In [None]:
%%capture
!pip install transformers==4.35.2
!pip install datasets==2.15.0
!pip install soundfile==0.12.1
!pip install speechbrain==0.5.16

In [None]:
pip install transformers huggingface_hub

In [2]:
from huggingface_hub import HfFolder

# Never hard-code the Hugging Face token in the notebook: any shared or committed
# copy leaks the credential. Read it from the HF_TOKEN environment variable and
# fall back to an interactive prompt so nothing sensitive is stored in the file.
# NOTE(review): the token that was previously embedded here must be treated as
# compromised and revoked in the Hugging Face account settings.
import os
from getpass import getpass

token = os.environ.get("HF_TOKEN") or getpass("Hugging Face token: ")
if not token:
    raise ValueError("No Hugging Face token provided (set HF_TOKEN or enter it at the prompt).")
HfFolder.save_token(token)

In [1]:
from datasets import load_dataset,Dataset, Audio

# Number of examples materialised from each streamed split.
split_sizes = {"train": 5500, "validation": 500, "test": 500}

# Stream the English VoxPopuli configuration instead of downloading it in full.
dataset_stream = load_dataset("facebook/voxpopuli", "en", streaming=True)

# Cast the audio column to 16 kHz and take the subset from the *cast* stream.
# (Previously the cast result was stored in `dataset` but the examples were
# taken from the uncast `dataset_stream`, so the cast never took effect.)
dataset = {}
for split_name, n_examples in split_sizes.items():
    cast_stream = dataset_stream[split_name].cast_column("audio", Audio(sampling_rate=16000))
    # Materialise the first `n_examples` items as a regular in-memory Dataset.
    dataset[split_name] = Dataset.from_list(list(cast_stream.take(n_examples)))

# Convenience handles used by the rest of the notebook.
train_dataset = dataset["train"]
valid_dataset = dataset["validation"]
test_dataset = dataset["test"]



In [3]:
from transformers import SpeechT5Processor

# Hub identifier of the pretrained SpeechT5 text-to-speech checkpoint.
checkpoint = "microsoft/speecht5_tts"

# Processor used below for tokenising text and featurising the target audio.
processor = SpeechT5Processor.from_pretrained(checkpoint)

preprocessor_config.json:   0%|          | 0.00/433 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/232 [00:00<?, ?B/s]

spm_char.model:   0%|          | 0.00/238k [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/40.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/234 [00:00<?, ?B/s]

In [4]:
def extract_all_chars(batch):
    """Collect the distinct characters used by a batch of transcripts.

    Args:
        batch: Mapping with a "normalized_text" entry holding an iterable of strings.

    Returns:
        A dict with two single-element lists: "vocab" (list of the unique
        characters, in arbitrary order) and "all_text" (the transcripts joined
        with single spaces).
    """
    joined_text = " ".join(batch["normalized_text"])
    unique_chars = list(set(joined_text))
    # Each value is wrapped in a one-element list so the result forms a single row.
    return dict(vocab=[unique_chars], all_text=[joined_text])

from datasets import Dataset

# Defensive conversion: accept a plain dict of columns as well as a Dataset.
if isinstance(train_dataset, dict):
    train_dataset = Dataset.from_dict(train_dataset)

# Run extract_all_chars over the training split in batched mode and drop every
# original column, so only "vocab" and "all_text" remain in the result.
vocab_map_options = dict(
    batched=True,
    batch_size=-1,
    keep_in_memory=True,
    remove_columns=train_dataset.column_names,
)
vocabs = train_dataset.map(extract_all_chars, **vocab_map_options)

tokenizer = processor.tokenizer

# Characters seen in the transcripts vs. tokens known to the tokenizer.
# NOTE(review): the two sets are built here but not compared in this cell.
dataset_vocab = set(vocabs["vocab"][0])
tokenizer_vocab = set(tokenizer.get_vocab())

Map:   0%|          | 0/5500 [00:00<?, ? examples/s]

In [5]:
import os
import torch
from speechbrain.pretrained import EncoderClassifier

# Pretrained SpeechBrain x-vector speaker encoder.
spk_model_name = "speechbrain/spkrec-xvect-voxceleb"

# Fall back to CPU when no GPU is visible instead of failing on a hard-coded
# "cuda"; this matches the device selection used by the inference cell.
device = "cuda" if torch.cuda.is_available() else "cpu"
speaker_model = EncoderClassifier.from_hparams(
    source=spk_model_name,
    run_opts={"device": device},
    # Downloaded model files are cached under /tmp/<model name>.
    savedir=os.path.join("/tmp", spk_model_name),
)


def create_speaker_embedding(waveform):
    """Compute a normalised speaker embedding for one waveform.

    Args:
        waveform: Audio samples accepted by ``torch.tensor``.

    Returns:
        NumPy array holding the output of ``speaker_model.encode_batch`` after
        L2 normalisation along dim 2 and removal of singleton dimensions.
    """
    wave_tensor = torch.tensor(waveform)
    with torch.no_grad():
        embedding = speaker_model.encode_batch(wave_tensor)
        embedding = torch.nn.functional.normalize(embedding, dim=2)
    return embedding.squeeze().cpu().numpy()


def prepare_dataset(example):
    """Turn one raw example into model inputs.

    Args:
        example: Row with "normalized_text" and an "audio" dict providing
            "array" and "sampling_rate".

    Returns:
        The processor output for the text and target audio, with "labels"
        reduced to its first element and a "speaker_embeddings" entry added.
    """
    audio = example["audio"]
    waveform = audio["array"]

    features = processor(
        text=example["normalized_text"],
        audio_target=waveform,
        sampling_rate=audio["sampling_rate"],
        return_attention_mask=False,
    )

    # Strip off the batch dimension.
    features["labels"] = features["labels"][0]

    # Use SpeechBrain to obtain the x-vector for this utterance.
    features["speaker_embeddings"] = create_speaker_embedding(waveform)

    return features

hyperparams.yaml:   0%|          | 0.00/2.04k [00:00<?, ?B/s]

embedding_model.ckpt:   0%|          | 0.00/16.9M [00:00<?, ?B/s]

mean_var_norm_emb.ckpt:   0%|          | 0.00/3.20k [00:00<?, ?B/s]

classifier.ckpt:   0%|          | 0.00/15.9M [00:00<?, ?B/s]

label_encoder.txt:   0%|          | 0.00/129k [00:00<?, ?B/s]

In [None]:
# Raw dataset columns that are dropped once the model inputs have been computed.
columns_to_remove = [
    "audio_id",
    "language",
    "audio",
    "raw_text",
    "normalized_text",
    "gender",
    "speaker_id",
    "is_gold_transcript",
    "accent",
]

# Apply the processing function to every example of each split.
train_dataset = train_dataset.map(prepare_dataset)
valid_dataset = valid_dataset.map(prepare_dataset)
test_dataset = test_dataset.map(prepare_dataset)

# Drop the raw columns from all three splits.
train_dataset, test_dataset, valid_dataset = (
    split.remove_columns(columns_to_remove)
    for split in (train_dataset, test_dataset, valid_dataset)
)

Map:   0%|          | 0/5500 [00:00<?, ? examples/s]

Token indices sequence length is longer than the specified maximum sequence length for this model (618 > 600). Running this sequence through the model will result in indexing errors


In [None]:
# Display the number of examples currently held in the training split.
len(train_dataset)

In [None]:
def is_not_too_long(input_ids, max_length=220):
    """Tell whether a token sequence is short enough to keep.

    Args:
        input_ids: Sequence of token ids for one example.
        max_length: Exclusive upper bound on the number of tokens. Defaults to
            220, the threshold this notebook used as a hard-coded constant.

    Returns:
        True when ``len(input_ids)`` is strictly below ``max_length``.
    """
    return len(input_ids) < max_length

# Keep only the examples whose token sequence passes is_not_too_long.
train_dataset, test_dataset, valid_dataset = (
    split.filter(is_not_too_long, input_columns=["input_ids"])
    for split in (train_dataset, test_dataset, valid_dataset)
)

In [None]:
from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class TTSDataCollatorWithPadding:
    """Pad a list of preprocessed TTS examples into a single training batch.

    Each feature must provide "input_ids", "labels" and "speaker_embeddings".
    """

    # Object whose ``pad(input_ids=..., labels=..., return_tensors="pt")`` builds
    # the padded batch, including a "decoder_attention_mask" entry.
    processor: Any
    # Reduction factor used to round target lengths down. When left as None the
    # value is read from the notebook-level ``model.config.reduction_factor`` at
    # call time, which preserves the behaviour of existing callers.
    reduction_factor: Union[int, None] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Collate ``features`` into padded tensors.

        Returns:
            The padded batch with padded label positions set to -100, without
            "decoder_attention_mask", and with "speaker_embeddings" added.
        """
        input_ids = [{"input_ids": feature["input_ids"]} for feature in features]
        label_features = [{"input_values": feature["labels"]} for feature in features]
        speaker_features = [feature["speaker_embeddings"] for feature in features]

        # Collate the inputs and targets into a batch. Use the processor stored on
        # the instance rather than whatever global happens to be named `processor`.
        batch = self.processor.pad(input_ids=input_ids, labels=label_features, return_tensors="pt")

        # Replace padding with -100 so those positions are ignored by the loss.
        padding_mask = batch["decoder_attention_mask"].unsqueeze(-1).ne(1)
        batch["labels"] = batch["labels"].masked_fill(padding_mask, -100)
        # Not used during fine-tuning.
        del batch["decoder_attention_mask"]

        reduction_factor = self.reduction_factor
        if reduction_factor is None:
            # Backward-compatible fallback to the model defined in the notebook.
            reduction_factor = model.config.reduction_factor

        # Round target lengths down to a multiple of the reduction factor and
        # truncate the labels to the longest rounded length.
        if reduction_factor > 1:
            target_lengths = [len(feature["input_values"]) for feature in label_features]
            max_length = max(length - length % reduction_factor for length in target_lengths)
            batch["labels"] = batch["labels"][:, :max_length]

        # Also add in the speaker embeddings.
        batch["speaker_embeddings"] = torch.tensor(speaker_features)

        return batch

In [None]:
# Collator instance handed to the trainer; it pads batches using `processor`.
data_collator=TTSDataCollatorWithPadding(processor=processor)

In [None]:
from transformers import SpeechT5ForTextToSpeech

# Load the pretrained SpeechT5 TTS weights from the same checkpoint as the processor.
model=SpeechT5ForTextToSpeech.from_pretrained(checkpoint)
# Disable the generation cache for training.
# NOTE(review): presumably required because the training arguments enable
# gradient checkpointing — confirm.
model.config.use_cache=False

In [None]:
from transformers import Seq2SeqTrainingArguments, Seq2SeqTrainer

training_args = Seq2SeqTrainingArguments(
    output_dir="sohail3",
    # Batching: 8 examples per device x 8 accumulation steps per optimizer update.
    per_device_train_batch_size=8,
    gradient_accumulation_steps=8,
    per_device_eval_batch_size=2,
    gradient_checkpointing=True,
    # Optimisation schedule.
    learning_rate=2e-4,
    weight_decay=0.01,
    warmup_steps=50,
    num_train_epochs=3,
    # Evaluation, checkpointing and logging cadence (in steps).
    evaluation_strategy="steps",
    eval_steps=50,
    save_steps=100,
    logging_steps=25,
    # Reload the best checkpoint at the end; lower metric values are better.
    load_best_model_at_end=True,
    greater_is_better=False,
    label_names=["labels"],
    push_to_hub=False,
)

trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=valid_dataset,
    data_collator=data_collator,
    tokenizer=processor,
)

trainer.train()


In [None]:
# Evaluate the trained model on the held-out test split and display the metrics.
trainer.evaluate(test_dataset)

In [None]:
# Write the fine-tuned model and its processor side by side in one directory.
save_dir = './my_model_sohail1'
for artifact in (model, processor):
    artifact.save_pretrained(save_dir)

In [None]:
from huggingface_hub import upload_folder

# Upload the saved model folder to the Hugging Face Hub repository.
# The folder path is the same relative path the save cell wrote to, so the
# upload no longer depends on the working directory being /kaggle/working.
upload_folder(
    repo_id="sohail2003/pattern3.1",
    folder_path="./my_model_sohail1",
)

In [None]:
import os
import torch
from speechbrain.pretrained import EncoderClassifier
from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan
import torchaudio

# Path to a local fine-tuned checkpoint.
# NOTE(review): this path is not referenced by the loading code below.
checkpoint_path = "/kaggle/working/sohail/checkpoint-200"

# Load the processor and TTS model from the Hub repository.
# NOTE(review): the upload cell pushes to "sohail2003/pattern3.1" — confirm that
# "sohail2003/pattern3" is the repository intended here.
hub_repo = "sohail2003/pattern3"
processor = SpeechT5Processor.from_pretrained(hub_repo)
model = SpeechT5ForTextToSpeech.from_pretrained(hub_repo)

# HiFi-GAN vocoder passed to model.generate_speech below.
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")

# SpeechBrain speaker embedding model, placed on the GPU when one is available.
spk_model_name = "speechbrain/spkrec-xvect-voxceleb"
device = "cuda" if torch.cuda.is_available() else "cpu"
speaker_cache_dir = os.path.join("/tmp", spk_model_name)
speaker_model = EncoderClassifier.from_hparams(
    source=spk_model_name,
    run_opts={"device": device},
    savedir=speaker_cache_dir,
)

# Function to generate speaker embeddings.
# NOTE(review): this redefines the function of the same name from the training
# cells; this version moves the waveform tensor to `device` explicitly.
def create_speaker_embedding(waveform):
    """Return the normalised speaker embedding of ``waveform`` as a NumPy array."""
    wave_tensor = torch.tensor(waveform).to(device)
    with torch.no_grad():
        raw_embedding = speaker_model.encode_batch(wave_tensor)
        unit_embedding = torch.nn.functional.normalize(raw_embedding, dim=2)
    return unit_embedding.squeeze().cpu().numpy()

# Example dataset preparation (if needed for testing).
# NOTE(review): this redefines prepare_dataset from the training cells.
def prepare_dataset(example):
    """Convert one raw example into processor outputs plus a speaker embedding.

    Expects "normalized_text" and an "audio" dict with "array" and
    "sampling_rate"; returns the processor output with "labels" reduced to its
    first element and "speaker_embeddings" added.
    """
    audio = example["audio"]
    waveform = audio["array"]

    features = processor(
        text=example["normalized_text"],
        audio_target=waveform,
        sampling_rate=audio["sampling_rate"],
        return_attention_mask=False,
    )

    # Strip off the batch dimension.
    features["labels"] = features["labels"][0]
    # Use SpeechBrain to obtain the x-vector.
    features["speaker_embeddings"] = create_speaker_embedding(waveform)
    return features

def generate_random_speaker_embedding():
    """Draw a random unit-length speaker embedding.

    The speaker embeddings used elsewhere in this notebook are L2-normalised,
    so the random vector is normalised too; an unnormalised 512-dimensional
    Gaussian sample has a norm far larger than 1.

    Returns:
        A float32 tensor of shape (1, 512) with unit L2 norm along the last dim.
    """
    random_vector = torch.randn(1, 512).float()
    return torch.nn.functional.normalize(random_vector, dim=-1)

# Generate speech from text; a random speaker embedding is used by default.
def generate_speech(text, speaker_embedding=None, output_path="output.wav", sampling_rate=16000):
    """Synthesise ``text`` and save the audio to a file.

    Args:
        text: Text to synthesise.
        speaker_embedding: Optional speaker embedding tensor. When None, a random
            embedding from ``generate_random_speaker_embedding`` is used.
        output_path: Destination audio file. Defaults to "output.wav".
        sampling_rate: Sample rate written to the file. Defaults to 16000.

    Returns:
        The generated speech tensor with a leading channel dimension.
    """
    if speaker_embedding is None:
        speaker_embedding = generate_random_speaker_embedding()

    # Prepare the text input.
    inputs = processor(text=text, return_tensors="pt")

    # Generate the waveform with the vocoder.
    speech = model.generate_speech(inputs["input_ids"], speaker_embedding, vocoder=vocoder)

    # torchaudio.save is given a 2D [channels, samples] tensor: add a channel dim to 1D output.
    if speech.dim() == 1:
        speech = speech.unsqueeze(0)

    # Save a detached CPU copy so writing works regardless of where the model runs.
    torchaudio.save(output_path, speech.detach().cpu(), sampling_rate)
    print(f"Speech saved to '{output_path}'")
    return speech


# Example usage
text_input = "why are you doing that are you crazy "
# Generate speech (written to "output.wav" by generate_speech)
generated_speech = generate_speech(text_input)

# Play the audio (optional). Import the module under an alias so the name
# `Audio` imported from `datasets` in the data-loading cell is not shadowed
# in the shared kernel namespace.
import IPython.display as ipd
ipd.Audio("output.wav")
