# SeamlessM4T v2: text-to-text translation

In [12]:
import csv
import os
from tqdm import tqdm
from transformers import SeamlessM4Tv2ForTextToText, AutoProcessor

In [13]:
import torch

# Report whether CUDA is usable and how many devices are visible.
print(torch.cuda.is_available())
print(torch.cuda.device_count())

# Name the first GPU when there is one; otherwise say so explicitly.
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))
else:
    print("No GPU found")

True
1
NVIDIA GeForce RTX 2060


In [14]:
# Load the processor and the text-to-text model from the same checkpoint.
processor = AutoProcessor.from_pretrained("facebook/seamless-m4t-v2-large")
model = SeamlessM4Tv2ForTextToText.from_pretrained("facebook/seamless-m4t-v2-large")

# Run on the GPU when one is available; otherwise keep the model where it was loaded.
model = model.to("cuda") if torch.cuda.is_available() else model

Loading checkpoint shards: 100%|██████████| 2/2 [00:00<00:00,  2.18it/s]


In [15]:
def translation(source_lang, target_lang, text, cuda = False):
    """Translate ``text`` from ``source_lang`` to ``target_lang``.

    Relies on the module-level ``processor`` and ``model`` objects.

    Args:
        source_lang: Source language code passed to the processor as
            ``src_lang`` (e.g. "fra").
        target_lang: Target language code passed to ``model.generate`` as
            ``tgt_lang`` (e.g. "eng").
        text: The text to translate.
        cuda: Kept for backward compatibility. The inputs are always moved to
            the device the model lives on, so this flag no longer has to
            match the model's placement.

    Returns:
        The decoded translation, with special tokens removed.
    """
    text_inputs = processor(text, return_tensors="pt", src_lang=source_lang)

    # Follow the model's device instead of trusting the flag: the original
    # default (cuda=False) left the inputs on the CPU even when the model had
    # been moved to the GPU, which makes generate() fail with a device mismatch.
    text_inputs = text_inputs.to(model.device)

    output_tokens = model.generate(**text_inputs, tgt_lang=target_lang)
    translated_text = processor.decode(output_tokens[0], skip_special_tokens=True)

    return translated_text

### CSV translation

In [16]:
# CSV translation function with line-by-line saving
def translate_csv(input_csv, source_lang, target_lang):
    """Translate the "Text" column of a CSV file, resuming earlier progress.

    Each input row must provide "Speaker" and "Text" columns. The translated
    rows are appended to ``<input path without extension>_<target_lang>.csv``
    with the columns "Speaker" and "Translated_Text". If that output file
    already contains rows, the same number of input rows is skipped, so an
    interrupted run can simply be started again.

    Args:
        input_csv: Path of the UTF-8 encoded CSV file to translate.
        source_lang: Source language code forwarded to ``translation``.
        target_lang: Target language code forwarded to ``translation``; also
            used as the suffix of the output file name.

    Raises:
        FileNotFoundError: If ``input_csv`` does not exist.
        KeyError: If a row has no "Speaker" or "Text" column.
    """
    encoding = 'utf-8'
    output_csv = f"{os.path.splitext(input_csv)[0]}_{target_lang}.csv"

    # Always bound, so CPU-only machines no longer hit an UnboundLocalError.
    cuda = torch.cuda.is_available()

    # Count the rows already present in the output file (header included).
    existing_rows = 0
    try:
        with open(output_csv, mode='r', newline='', encoding=encoding) as outfile:
            existing_rows = sum(1 for _ in csv.reader(outfile))
    except FileNotFoundError:
        pass

    # The header is needed only when the output is missing or empty. The
    # decision is based on the OUTPUT file; checking the input file's position
    # made every resumed run write a second header in the middle of the data.
    needs_header = existing_rows == 0
    processed_rows = max(existing_rows - 1, 0)  # Subtract 1 for the header row

    # Read the input up front so tqdm knows the total, then drop the rows that
    # were already translated. Slicing cannot overrun the way repeated
    # next(reader) calls could when the output holds more rows than the input.
    with open(input_csv, mode='r', newline='', encoding=encoding) as infile:
        pending_rows = list(csv.DictReader(infile))[processed_rows:]

    # Open the output file in append mode so that earlier progress is kept
    with open(output_csv, mode='a', newline='', encoding=encoding) as outfile:
        writer = csv.DictWriter(outfile, fieldnames=["Speaker", "Translated_Text"])

        if needs_header:
            writer.writeheader()  # Header == column names
            outfile.flush()

        # Use tqdm to display a progress bar
        for row in tqdm(pending_rows, desc="Translating", unit="row"):
            speaker = row["Speaker"]
            text = row["Text"]

            # Translate the text
            translated_text = translation(source_lang, target_lang, text, cuda)

            # Write the row and flush, so the progress really is on disk if
            # the run is interrupted before the file is closed.
            writer.writerow({"Speaker": speaker, "Translated_Text": translated_text})
            outfile.flush()

In [17]:
# Input transcript and the language pair to translate between.
path_file = "csv/7-1_script_interview_clinique_2_21-08-2020.csv"
source_lang, target_lang = "fra", "eng"  # French -> English

# Translate the file row by row.
translate_csv(input_csv=path_file, source_lang=source_lang, target_lang=target_lang)

Translating: 100%|██████████| 191/191 [03:58<00:00,  1.25s/row]
