In [None]:
!pip install transformers datasets soundfile accelerate speechbrain==0.5.16

In [None]:
import os

from huggingface_hub import login

# Never hard-code the access token in the notebook. It is read from the HF_TOKEN
# environment variable; when that is unset or empty, None is passed so that
# login() falls back to its interactive prompt.
login(token=os.environ.get("HF_TOKEN") or None)


In [3]:
from datasets import Dataset, Audio
import pandas as pd
def load_datasset(data_path):
    """Build an audio dataset from a pipe-delimited manifest file.

    Each non-blank line of ``data_path`` has the form
    ``<audio path>|<transcription>``.

    Args:
        data_path: Path to the manifest text file.

    Returns:
        A ``Dataset`` with an ``audio`` column (cast to the ``Audio`` feature)
        and a ``transcription`` column.

    Raises:
        ValueError: If a non-blank line contains no ``|`` separator.
    """
    records = []
    # Explicit UTF-8 so the result does not depend on the platform's default encoding.
    with open(data_path, 'r', encoding='utf-8') as f:
        for line_number, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. an extra newline at the end of the file).
                continue
            # Split on the first '|' only, so a transcription may itself contain '|'.
            audio_path, separator, text = line.partition('|')
            if not separator:
                raise ValueError(
                    f"{data_path}:{line_number}: expected "
                    f"'<audio path>|<transcription>', got {line!r}"
                )
            records.append({'audio': audio_path, 'transcription': text})
    df = pd.DataFrame(records)
    dataset = Dataset.from_pandas(df)
    dataset = dataset.cast_column('audio', Audio())
    return dataset

import os

# Location of the "<audio path>|<transcription>" manifest. Set the TTS_DATA_PATH
# environment variable to point elsewhere instead of editing this
# machine-specific default.
path_to_my_local_data = os.environ.get("TTS_DATA_PATH", r'C:\program1\my_project\data.txt')
dataset = load_datasset(path_to_my_local_data)


In [None]:
# Number of examples that were loaded from the manifest.
dataset.num_rows

In [None]:
# Show the dataset summary as the cell's result instead of printing it.
dataset

In [6]:
# Re-declare the audio column with a 16 kHz target sampling rate.
dataset = dataset.cast_column(column="audio", feature=Audio(sampling_rate=16000))

In [7]:
from transformers import SpeechT5Processor

# Repository id of the fine-tuned checkpoint; the same id is reused further down
# when the model weights are loaded.
checkpoint ="arielSultan/final_project_55000"
# The processor is loaded from that checkpoint as well.
processor = SpeechT5Processor.from_pretrained(checkpoint)


In [8]:
# Keep a direct handle on the processor's tokenizer; its vocabulary is compared
# against the characters found in the dataset in the cells below.
tokenizer = processor.tokenizer

In [None]:
# Peek at three rows (indices 2-4) to sanity-check the loaded columns.
dataset[2:5]

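Let's normalize the transcriptions and store the result in a new column called `normalized_text`. First, check which characters in the raw transcriptions are missing from the tokenizer's vocabulary.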

In [None]:
def extract_all_chars(batch, column="transcription"):
    """Collect every distinct character that occurs in one text column of a batch.

    Args:
        batch: Mapping from column name to a list of strings (the input of a
            batched ``map`` call).
        column: Name of the text column to scan. Defaults to ``"transcription"``,
            the column this function originally hard-coded.

    Returns:
        A dict of single-element lists: ``"vocab"`` holds the sorted list of
        distinct characters and ``"all_text"`` the space-joined text.
    """
    all_text = " ".join(batch[column])
    # Sorted so the vocabulary order is reproducible; plain set order is not.
    vocab = sorted(set(all_text))
    return {"vocab": [vocab], "all_text": [all_text]}


# Scan the whole dataset in one batched call and keep only the function's
# output columns (every original column is removed).
vocabs = dataset.map(
    extract_all_chars,
    remove_columns=dataset.column_names,
    batched=True,
    batch_size=-1,
    keep_in_memory=True,
)

dataset_vocab = set(vocabs["vocab"][0])
# Iterating the vocabulary mapping yields its keys, i.e. the token strings.
tokenizer_vocab = set(tokenizer.get_vocab())

In [None]:
# Characters used in the transcriptions that the tokenizer has no entry for.
dataset_vocab.difference(tokenizer_vocab)

In [None]:
import re

def normalize_text(text):
    """Return a cleaned copy of ``text`` for use as model input.

    The text is lower-cased, every character that is not a word character,
    whitespace or an apostrophe is dropped, and runs of whitespace are
    collapsed to single spaces (leading/trailing whitespace is removed).
    """
    lowered = text.lower()
    # Keeps \w (letters, digits, underscore), whitespace and the apostrophe
    without_punctuation = re.sub(r'[^\w\s\']', '', lowered)
    # split() with no argument discards all whitespace runs, including at the ends
    return ' '.join(without_punctuation.split())

# Row-wise helper that attaches the cleaned transcription to an example
def add_normalized_text(example):
    """Store the normalized form of ``example['transcription']`` under ``'normalized_text'``.

    The example is updated in place and also returned.
    """
    cleaned = normalize_text(example['transcription'])
    example['normalized_text'] = cleaned
    return example

# Add the normalized_text column to every row
dataset = dataset.map(function=add_normalized_text)

# Show rows 2-4 to check the new column
print(dataset[2:5])

In [None]:
def extract_all_chars(batch, column="normalized_text"):
    """Collect every distinct character that occurs in one text column of a batch.

    Args:
        batch: Mapping from column name to a list of strings (the input of a
            batched ``map`` call).
        column: Name of the text column to scan. Defaults to
            ``"normalized_text"``, the column this definition originally hard-coded.

    Returns:
        A dict of single-element lists: ``"vocab"`` holds the sorted list of
        distinct characters and ``"all_text"`` the space-joined text.
    """
    all_text = " ".join(batch[column])
    # Sorted so the vocabulary order is reproducible; plain set order is not.
    vocab = sorted(set(all_text))
    return {"vocab": [vocab], "all_text": [all_text]}


# Repeat the character scan, this time after normalization, again as one
# batched call that keeps only the function's output columns.
vocabs = dataset.map(
    extract_all_chars,
    remove_columns=dataset.column_names,
    batched=True,
    batch_size=-1,
    keep_in_memory=True,
)

dataset_vocab = set(vocabs["vocab"][0])
# Iterating the vocabulary mapping yields its keys, i.e. the token strings.
tokenizer_vocab = set(tokenizer.get_vocab())

In [None]:
# Characters still present in the text that the tokenizer has no entry for.
dataset_vocab.difference(tokenizer_vocab)

In [None]:
import os
import torch
from speechbrain.pretrained import EncoderClassifier

# Pretrained speaker encoder used to derive one embedding per utterance
spk_model_name = "speechbrain/spkrec-xvect-voxceleb"

# Run on the GPU when one is available, otherwise on the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

# Model files are stored under /tmp/<model name>
speaker_model = EncoderClassifier.from_hparams(
    source=spk_model_name,
    savedir=os.path.join("/tmp", spk_model_name),
    run_opts={"device": device},
)


def create_speaker_embedding(waveform):
    """Encode a waveform with ``speaker_model`` and return the embedding as a NumPy array.

    The encoder output is normalized along dimension 2, squeezed to drop
    size-one dimensions and moved to the CPU before conversion.
    """
    # Inference only: no gradients are needed for the speaker encoder
    with torch.no_grad():
        encoded = speaker_model.encode_batch(torch.tensor(waveform))
        encoded = torch.nn.functional.normalize(encoded, dim=2)
    return encoded.squeeze().cpu().numpy()

In [16]:
def prepare_dataset(example):
    """Turn one dataset row into the features used for training.

    Runs ``processor`` on the row's normalized text and audio, drops the batch
    dimension from the resulting labels, and adds a speaker embedding computed
    from the same audio. Empty audio or text is only reported with a printed
    message; processing continues regardless.
    """
    audio = example["audio"]

    # Diagnostics only: suspicious rows are reported, not skipped
    audio_is_empty = (not audio) or len(audio["array"]) == 0
    if audio_is_empty:
        print("Empty audio found!")
    text = example["normalized_text"]
    if not text:
        print("Empty transcription found!")

    features = processor(
        text=text,
        audio_target=audio["array"],
        sampling_rate=audio["sampling_rate"],
        return_attention_mask=False,
    )

    # "labels" comes back with a leading batch axis; keep its single entry
    features["labels"] = features["labels"][0]

    # x-vector obtained from the SpeechBrain speaker encoder
    features["speaker_embeddings"] = create_speaker_embedding(audio["array"])

    return features

In [None]:
# Smoke-test the featurization on the first row before mapping the whole dataset,
# then list the keys it produced.
processed_example = prepare_dataset(dataset[0])
list(processed_example.keys())

In [None]:
# Inspect the shape of the speaker embedding computed for the sample row.
processed_example["speaker_embeddings"].shape

In [None]:
# Featurize every row. The original columns are removed so only the outputs of
# prepare_dataset remain; the result gets its own name so `dataset` is untouched here.
dataset_copy = dataset.map(prepare_dataset, remove_columns=dataset.column_names)

In [None]:
# The upper bound was previously 300.
def is_not_too_long(input_ids, min_length=50, max_length=320):
    """Tell whether a token sequence has an acceptable length.

    Despite the name, both ends are checked: very short sequences are rejected too.

    Args:
        input_ids: Sequence of token ids for one example.
        min_length: Smallest accepted length (inclusive). Defaults to 50.
        max_length: Largest accepted length (inclusive). Defaults to 320.

    Returns:
        True when ``min_length <= len(input_ids) <= max_length``.
    """
    return min_length <= len(input_ids) <= max_length
# Print the example count before and after the length filter
print(dataset_copy.num_rows)
dataset = dataset_copy.filter(is_not_too_long, input_columns=["input_ids"])
print(dataset.num_rows)


In [21]:
# Hold out 10% of the examples for evaluation. The seed is fixed so the split is
# identical on every run; with an unseeded split, a new session that resumes
# training from a checkpoint would evaluate on examples already trained on.
dataset = dataset.train_test_split(test_size=0.1, seed=42)

In [22]:
from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class TTSDataCollatorWithPadding:
    """Pad a list of featurized examples into one batch of tensors.

    Calling the collator returns the padded batch with ``input_ids``, ``labels``
    (padded positions set to -100) and ``speaker_embeddings``.
    """

    # Object whose ``pad`` method batches the token ids and the target features.
    processor: Any
    # Reduction factor used to trim the target length. When left as None it is
    # read from the notebook-level ``model`` at call time, as the original code did.
    reduction_factor: Union[int, None] = None

    def __call__(
        self, features: List[Dict[str, Union[List[int], torch.Tensor]]]
    ) -> Dict[str, torch.Tensor]:
        """Collate ``features`` into a padded batch.

        Args:
            features: Examples with ``input_ids``, ``labels`` and
                ``speaker_embeddings`` entries.

        Returns:
            The padded batch, without the ``decoder_attention_mask`` entry.
        """
        input_ids = [{"input_ids": feature["input_ids"]} for feature in features]
        label_features = [{"input_values": feature["labels"]} for feature in features]
        speaker_features = [feature["speaker_embeddings"] for feature in features]

        # collate the inputs and targets into a batch, using the processor that
        # was handed to this collator rather than a same-named global
        batch = self.processor.pad(
            input_ids=input_ids, labels=label_features, return_tensors="pt"
        )

        # replace padding with -100 to ignore loss correctly
        batch["labels"] = batch["labels"].masked_fill(
            batch["decoder_attention_mask"].unsqueeze(-1).ne(1), -100
        )

        # not used during fine-tuning
        del batch["decoder_attention_mask"]

        reduction_factor = self.reduction_factor
        if reduction_factor is None:
            # backward-compatible fallback to the notebook-level model
            reduction_factor = model.config.reduction_factor

        # round down target lengths to multiple of reduction factor
        if reduction_factor > 1:
            target_lengths = [len(feature["input_values"]) for feature in label_features]
            max_length = max(
                length - length % reduction_factor for length in target_lengths
            )
            batch["labels"] = batch["labels"][:, :max_length]

        # also add in the speaker embeddings
        batch["speaker_embeddings"] = torch.tensor(speaker_features)

        return batch

In [23]:
# Collator instance handed to the trainer further down.
# NOTE: calling it reads the notebook-level `model`, which is only created in a
# later cell, so that cell must have run before the first batch is collated.
data_collator = TTSDataCollatorWithPadding(processor=processor)

In [24]:
from transformers import SpeechT5ForTextToSpeech

# Load the model weights from the same checkpoint id the processor was loaded from.
model = SpeechT5ForTextToSpeech.from_pretrained(checkpoint)

In [25]:
from functools import partial

# disable cache during training since it's incompatible with gradient checkpointing
model.config.use_cache = False

# bind use_cache=True as the default for calls made through model.generate, so
# generation keeps using the cache even though the config flag is off
model.generate = partial(model.generate, use_cache=True)

In [None]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    # Where checkpoints are written; also pushed to the Hub (change to a repo name of your choice)
    output_dir="final_project_70000",
    push_to_hub=True,
    # Optimisation: 4 examples per device x 8 accumulation steps per optimizer update
    per_device_train_batch_size=4,
    gradient_accumulation_steps=8,
    learning_rate=1e-4,
    warmup_steps=100,
    max_steps=70000,
    # Memory / speed trade-offs
    gradient_checkpointing=True,
    fp16=True,
    # Evaluation, checkpointing and logging cadence (all counted in steps)
    evaluation_strategy="steps",
    per_device_eval_batch_size=2,
    eval_steps=100,
    save_steps=1000,
    logging_steps=25,
    report_to=["tensorboard"],
    # Model selection: reload the best checkpoint at the end, lower metric is better
    load_best_model_at_end=True,
    greater_is_better=False,
    label_names=["labels"],
)

In [None]:
from transformers import Seq2SeqTrainer
from huggingface_hub import login

import os

# SECURITY: a Hugging Face access token used to be hard-coded on this line.
# Treat that token as leaked: revoke it in the Hub account settings and create a
# new one. The token is now read from the HF_TOKEN environment variable; when it
# is unset or empty, login() falls back to its interactive prompt.
login(token=os.environ.get("HF_TOKEN") or None)

# Wire the model, the train/test splits and the padding collator into the trainer.
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    data_collator=data_collator,
    tokenizer=processor,
)

In [None]:
import os

# Build the checkpoint path with os.path.join so it works on every platform.
# The previous literal used a backslash inside a normal string, which is only a
# path separator on Windows and relied on "\c" not being an escape sequence.
trainer.train(
    resume_from_checkpoint=os.path.join("final_project_55000", "checkpoint-55000")
)

In [None]:
# Upload the training result through the trainer (requires the login done above).
trainer.push_to_hub()

# Inference

In [None]:
from transformers import SpeechT5ForTextToSpeech
# Load the model to use for inference from its repository id.
# NOTE: this rebinds the notebook-level name `model` that the training cells used.
model = SpeechT5ForTextToSpeech.from_pretrained(
    "arielSultan/final_project_70000"
)

In [34]:
# Use the first test example's speaker embedding as the voice for synthesis;
# unsqueeze(0) adds a leading batch dimension.
example = dataset["test"][0]
speaker_embeddings = torch.tensor(example["speaker_embeddings"]).unsqueeze(0)
# Also dump the raw embedding values to var.txt as text
with open('var.txt', 'w') as file:
    file.write(str(example["speaker_embeddings"]))

In [None]:
# Text to synthesize.
# NOTE(review): unlike the training text, this string is not passed through
# normalize_text and still contains ',' and '.' — confirm that is intended.
final_text="shalom martsey mihlelet netaneyahh aniy hhabot hhamzooyayf shel seport beyhoot gvohhahh , hhakol sheatem shomeiym karega eyno amiytiy venotsar al ydey beynahh melahootiyt . nismah lishmoa et dathem veshetagiyshoo otanoo kvar al  meahh  tizkroo tamiyd am yisrael hay ."

# Tokenize the text into PyTorch tensors for the model.
inputs = processor(text=final_text, return_tensors="pt")

In [62]:
from transformers import SpeechT5HifiGan

# Load the HiFi-GAN vocoder and pass it to generate_speech together with the
# token ids and the speaker embedding chosen above.
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")
speech = model.generate_speech(inputs["input_ids"], speaker_embeddings, vocoder=vocoder)

In [63]:
from IPython.display import Audio
import soundfile as sf

# NOTE: this import rebinds the name `Audio`, which earlier cells use for the
# `datasets` Audio feature; re-run the datasets import before re-running them.
waveform = speech.numpy()

# Save the audio to a file (e.g., 'output.wav')
sf.write('output3.wav', waveform, 16000)

# The player is only rendered when it is the cell's last expression; previously
# it was created before sf.write and silently discarded.
Audio(waveform, rate=16000)