In [1]:
import os
import pandas as pd
from phonemize import phonemize
import phonemizer
import spacy
import torch
from tqdm import tqdm
import pickle
import yaml
from torch.utils.data import Dataset, DataLoader
import numpy as np
from concurrent.futures import ThreadPoolExecutor

In [None]:
# Select the compute device: CUDA when PyTorch can see a GPU, CPU otherwise.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

In [None]:
# Load the YAML configuration used by later cells (output folder, dataset parameters).
config_path = "Configs/config.yml"
# Open through a context manager so the file handle is closed as soon as parsing
# finishes, and read as UTF-8 so the result does not depend on the platform default.
with open(config_path, 'r', encoding='utf-8') as config_file:
    config = yaml.safe_load(config_file)

In [None]:
# Shared eSpeak backend for Urdu ('ur'), created once and reused for every text.
# Punctuation is preserved and stress marks are requested in the phoneme output.
global_phonemizer = phonemizer.backend.EspeakBackend(
    language='ur',
    with_stress=True,
    preserve_punctuation=True,
)

In [None]:
# Build the Urdu tokenizer from a blank spaCy pipeline.
# prefer_gpu() lets spaCy use a GPU when one is usable and stays on CPU otherwise.
spacy.prefer_gpu()
nlp = spacy.blank('ur')
# spaCy `Language` objects have no `.to(device)` method (that is the PyTorch module
# API), so the former `nlp.to(device)` call raised AttributeError on CUDA machines.
# Device selection for spaCy is handled entirely by prefer_gpu() above.
tokenizer = nlp.tokenizer

In [None]:
class TextDataset(Dataset):
    """Map-style dataset exposing a sequence of texts to a ``DataLoader``."""

    def __init__(self, texts):
        # Kept by reference: no copy or preprocessing happens here.
        self.texts = texts

    def __len__(self):
        """Return the number of texts held by the dataset."""
        size = len(self.texts)
        return size

    def __getitem__(self, idx):
        """Return the text stored at position ``idx``."""
        item = self.texts[idx]
        return item

def batch_process_texts(batch_texts):
    """
    Tokenize and phonemize every text in a batch.

    The texts are tokenized together through ``nlp.pipe`` and then phonemized
    one at a time with the shared ``global_phonemizer``.

    Args:
        batch_texts: The texts to process.

    Returns:
        list[dict]: One record per input text, in input order, with the keys
        ``'text'`` (the original text), ``'tokens'`` (the spaCy token strings)
        and ``'phonemized'`` (whatever ``phonemize`` returns for that text).
    """
    # Materialise all Docs for the batch before any phonemization starts.
    parsed_docs = list(nlp.pipe(batch_texts, batch_size=32))

    # NOTE(review): `phonemize` is imported from the local `phonemize` module;
    # confirm that it accepts the phonemizer through a `backend=` keyword.
    return [
        {
            'text': raw_text,
            'tokens': [tok.text for tok in parsed_doc],
            'phonemized': phonemize(raw_text, backend=global_phonemizer),
        }
        for parsed_doc, raw_text in zip(parsed_docs, batch_texts)
    ]

In [None]:
def process_shard(shard, batch_size=32, num_workers=0):
    """
    Tokenize and phonemize one shard of texts, batch by batch.

    Args:
        shard: Sequence of texts; it is wrapped in a ``TextDataset``.
        batch_size: Number of texts handed to ``batch_process_texts`` per call.
        num_workers: Number of DataLoader worker processes. Defaults to 0
            (batches are assembled in the calling thread). The dataset only
            indexes an in-memory sequence, so workers add process start-up and
            pickling cost without speeding anything up. Worker processes are
            also fragile here: this function is run from a thread pool, and on
            platforms that spawn workers a Dataset class defined in a notebook
            cannot be re-imported by the child process.

    Returns:
        list: The records produced by ``batch_process_texts`` for every batch,
        concatenated in shard order.
    """
    # pin_memory is intentionally not set: it only affects tensors, and the
    # batches here are plain lists of strings.
    dataloader = DataLoader(
        TextDataset(shard),
        batch_size=batch_size,
        num_workers=num_workers,
    )

    processed_data = []
    for batch in tqdm(dataloader, desc="Processing batches"):
        processed_data.extend(batch_process_texts(batch))

    return processed_data

In [None]:
def process_dataset(path, num_shards=4):
    """
    Read every supported file in a directory and tokenize/phonemize its texts.

    Supported inputs (any other file is ignored):
      * ``.txt``  - the whole file is treated as a single text.
      * ``.csv``  - tab-separated; each row contributes its ``'summery'`` and
        ``'title'`` columns joined by a space (column names exactly as the
        data files use them).
      * ``.xlsx`` - each row contributes its ``'Text'`` column with emoji
        characters removed.

    Missing values and empty/whitespace-only texts are skipped. The remaining
    texts are split into at most ``num_shards`` contiguous shards which are
    processed by ``process_shard`` on a thread pool.

    Args:
        path: Directory containing the input files.
        num_shards: Upper bound on the number of shards / worker threads.
            Values below 1 are treated as 1.

    Returns:
        list: The records returned by ``process_shard`` for every shard,
        flattened into one list that follows the input order. Empty when no
        usable text was found.
    """
    # Emoji and pictographic symbols only. The previous pattern, [^\x00-\x7F]+,
    # removed every non-ASCII character and therefore erased all Urdu text.
    emoji_pattern = (
        '['
        '\U0001F000-\U0001FAFF'  # emoticons, pictographs, transport, flags, ...
        '\u2300-\u23FF'          # miscellaneous technical (watch, alarm clock, ...)
        '\u2600-\u27BF'          # miscellaneous symbols and dingbats
        '\u2B00-\u2BFF'          # miscellaneous symbols and arrows (star, ...)
        '\uFE0F'                 # emoji variation selector
        ']+'
    )

    collected = {}

    print("Reading input files...")
    # Sorted so the output order does not depend on the directory listing order.
    for file in tqdm(sorted(os.listdir(path))):
        file_path = os.path.join(path, file)
        if file.endswith('.txt'):
            with open(file_path, 'r', encoding='utf-8') as f:
                collected[file] = f.read()
        elif file.endswith('.csv'):
            df = pd.read_csv(file_path, delimiter='\t', encoding='utf-8')
            collected[file] = df['summery'] + ' ' + df['title']
        elif file.endswith('.xlsx'):
            df = pd.read_excel(file_path)
            collected[file] = df['Text'].str.replace(emoji_pattern, '', regex=True)

    # Flatten everything into one list of candidate texts.
    candidates = []
    for texts in collected.values():
        if isinstance(texts, pd.Series):
            candidates.extend(texts.tolist())
        else:
            candidates.append(texts)

    # Drop NaN cells (pandas represents them as floats) and blank texts; they
    # cannot be tokenized or phonemized meaningfully.
    dataset = [text for text in candidates if isinstance(text, str) and text.strip()]
    skipped = len(candidates) - len(dataset)
    if skipped:
        print(f"Skipped {skipped} empty or non-text entries")

    print(f"Total number of texts to process: {len(dataset)}")

    if not dataset:
        return []

    # Ceiling division with at least one text per shard: a shard size of 0
    # would make range() raise, and floor division would create an extra
    # shard beyond the requested count.
    shard_count = max(1, min(num_shards, len(dataset)))
    shard_size = -(-len(dataset) // shard_count)
    shards = [dataset[i:i + shard_size] for i in range(0, len(dataset), shard_size)]

    # NOTE(review): the shards run on threads that share the module-level `nlp`
    # and `global_phonemizer`; confirm both are safe to call concurrently.
    with ThreadPoolExecutor(max_workers=len(shards)) as executor:
        shard_results = list(executor.map(process_shard, shards))

    # executor.map preserves shard order, so the flattened list follows the input order.
    return [item for shard in shard_results for item in shard]


In [None]:
def save_processed_dataset(processed_data, output_path):
    """
    Pickle the processed dataset to ``output_path``.

    Args:
        processed_data: Any picklable object (here, the list of processed records).
        output_path: Destination file. Missing parent directories are created;
            a bare filename is written to the current working directory.
    """
    # dirname() is '' for a bare filename, and os.makedirs('') raises
    # FileNotFoundError, so only create the directory when there is one.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    with open(output_path, 'wb') as f:
        pickle.dump(processed_data, f)

    print(f'Dataset saved to {output_path}')


In [None]:
def process_token_mapping(processed_data, config):
    """
    Build and save a mapping from every token to its lower-cased form and id.

    The tokens are the strings stored under ``'tokens'`` in each processed
    record, plus the word separator from the config. Each distinct token maps
    to ``{'word': <lower-cased token>, 'token': <id>}``, where the id is the
    position of the lower-cased word in the sorted list of distinct lower-cased
    words. Tokens that differ only by case therefore share an id, and the ids
    are identical from run to run.

    The previous implementation called ``tokenizer.decode`` / ``tokenizer.encode``,
    which the spaCy tokenizer used in this notebook does not provide, so it
    raised AttributeError before anything was written.

    NOTE(review): the id scheme above is defined here; confirm that the code
    consuming the token map expects ids of this form.

    Args:
        processed_data: Iterable of processed records; entries that are not
            dicts are ignored.
        config: Loaded configuration. Reads ``config['dataset_params']['word_separator']``
            and ``config['dataset_params']['token_maps']`` (output pickle path).
    """
    dataset_params = config['dataset_params']
    special_token = dataset_params['word_separator']

    # Get all unique tokens
    print("Collecting unique tokens...")
    unique_tokens = {special_token}
    for item in tqdm(processed_data):
        if isinstance(item, dict):
            unique_tokens.update(item.get('tokens', []))

    print("Processing token cases...")
    # str() keeps this working if the configured separator is not a string.
    lowered = {token: str(token).lower() for token in unique_tokens}
    # Sorting makes the ids independent of set iteration order.
    vocabulary = {
        word: index for index, word in enumerate(sorted(set(lowered.values())))
    }

    token_maps = {}
    for token, word in tqdm(lowered.items()):
        token_maps[token] = {'word': word, 'token': vocabulary[word]}

    # Save token mapping, creating the target directory when the path has one.
    maps_path = dataset_params['token_maps']
    maps_dir = os.path.dirname(maps_path)
    if maps_dir:
        os.makedirs(maps_dir, exist_ok=True)
    with open(maps_path, 'wb') as handle:
        pickle.dump(token_maps, handle)
    print(f'Token mapper saved to {config["dataset_params"]["token_maps"]}')

In [None]:
# Run the tokenize + phonemize pipeline over every supported file in the input folder.
input_path = 'Data/To Use'
processed_data = process_dataset(path=input_path)

In [None]:
# Persist the processed records as a pickle inside the configured data folder.
output_path = os.path.join(config['data_folder'], 'processed_dataset.pkl')
save_processed_dataset(processed_data=processed_data, output_path=output_path)

In [None]:
# Build the token map from the processed records and write it to the configured path.
process_token_mapping(processed_data=processed_data, config=config)
print("Processing complete!")