In [1]:
# Authenticate this kernel with the Hugging Face Hub.
# The dataset loader defined later in this notebook notes that the
# Beni-Mellal-Tamazight dataset may require authentication.
from huggingface_hub import login
login()  # Will prompt for your token

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [16]:
# Remove any previously saved checkpoint so the training cell retrains.
# Pure-Python deletion works on every OS (`!del` only exists in Windows cmd)
# and stays silent when the file is already gone.
import os

if os.path.exists('tamazight_model.pt'):
    os.remove('tamazight_model.pt')


In [17]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import numpy as np
from datasets import load_dataset
# Tifinagh transliteration mapping.
# Maps each Latin letter to its Tifinagh glyph. Upper- and lower-case Latin
# letters are both listed and point to the same glyph, so converting to
# Tifinagh discards letter case.
LATIN_TO_TIFINAGH = {
    'a': 'ⴰ', 'b': 'ⴱ', 'c': 'ⵛ', 'č': 'ⵞ', 'd': 'ⴷ', 'ḍ': 'ⴹ',
    'e': 'ⴻ', 'ɛ': 'ⵄ', 'f': 'ⴼ', 'g': 'ⴳ', 'ǧ': 'ⴵ', 'ɣ': 'ⵖ',
    'h': 'ⵀ', 'ḥ': 'ⵃ', 'i': 'ⵉ', 'j': 'ⵊ', 'k': 'ⴽ', 'l': 'ⵍ',
    'm': 'ⵎ', 'n': 'ⵏ', 'q': 'ⵇ', 'r': 'ⵔ', 'ṛ': 'ⵕ', 's': 'ⵙ',
    'ṣ': 'ⵚ', 't': 'ⵜ', 'ṭ': 'ⵟ', 'u': 'ⵓ', 'w': 'ⵡ', 'x': 'ⵅ',
    'y': 'ⵢ', 'z': 'ⵣ', 'ẓ': 'ⵥ',
    'A': 'ⴰ', 'B': 'ⴱ', 'C': 'ⵛ', 'Č': 'ⵞ', 'D': 'ⴷ', 'Ḍ': 'ⴹ',
    'E': 'ⴻ', 'Ɛ': 'ⵄ', 'F': 'ⴼ', 'G': 'ⴳ', 'Ǧ': 'ⴵ', 'Ɣ': 'ⵖ',
    'H': 'ⵀ', 'Ḥ': 'ⵃ', 'I': 'ⵉ', 'J': 'ⵊ', 'K': 'ⴽ', 'L': 'ⵍ',
    'M': 'ⵎ', 'N': 'ⵏ', 'Q': 'ⵇ', 'R': 'ⵔ', 'Ṛ': 'ⵕ', 'S': 'ⵙ',
    'Ṣ': 'ⵚ', 'T': 'ⵜ', 'Ṭ': 'ⵟ', 'U': 'ⵓ', 'W': 'ⵡ', 'X': 'ⵅ',
    'Y': 'ⵢ', 'Z': 'ⵣ', 'Ẓ': 'ⵥ'
}

# Reverse table: Tifinagh glyph -> Latin letter. Keys are lower-cased while
# inverting, so the reverse direction always yields lower-case Latin text.
TIFINAGH_TO_LATIN = {v: k.lower() for k, v in LATIN_TO_TIFINAGH.items()}

def tifinagh_to_latin(text):
    """Transliterate Tifinagh characters in ``text`` to Latin letters.

    Each character is looked up in ``TIFINAGH_TO_LATIN``; characters that
    have no entry (spaces, punctuation, Latin text, ...) pass through
    unchanged.
    """
    return ''.join(TIFINAGH_TO_LATIN.get(glyph, glyph) for glyph in text)

def latin_to_tifinagh(text):
    """Transliterate Latin letters in ``text`` to Tifinagh glyphs.

    Every key of ``LATIN_TO_TIFINAGH`` is substituted in turn, longest keys
    first so that a multi-character key is replaced before any shorter key
    it contains. Characters without a mapping are left as they are.
    """
    longest_first = sorted(LATIN_TO_TIFINAGH, key=len, reverse=True)
    converted = text
    for latin_key in longest_first:
        converted = converted.replace(latin_key, LATIN_TO_TIFINAGH[latin_key])
    return converted

class Vocabulary:
    """Character-level vocabulary with four reserved special tokens.

    Indices 0-3 are reserved for ``<PAD>``, ``<SOS>``, ``<EOS>`` and
    ``<UNK>``; every other character receives the next free index in the
    order it is first added.
    """

    def __init__(self):
        specials = ('<PAD>', '<SOS>', '<EOS>', '<UNK>')
        self.char2idx = {token: index for index, token in enumerate(specials)}
        self.idx2char = {index: token for index, token in enumerate(specials)}
        self.n_chars = len(specials)

    def add_char(self, char):
        """Register ``char`` under the next free index; no-op if already known."""
        if char in self.char2idx:
            return
        new_index = self.n_chars
        self.char2idx[char] = new_index
        self.idx2char[new_index] = char
        self.n_chars = new_index + 1

    def build_vocab(self, texts):
        """Add every character of every string in ``texts``."""
        for text in texts:
            for char in text:
                self.add_char(char)

class TranslationDataset(Dataset):
    """Character-level parallel dataset yielding ``(source, target)`` index tensors.

    Args:
        source_texts: Sequence of source strings (lower-cased on access).
        target_texts: Sequence of target strings, aligned with ``source_texts``.
        src_vocab: Object exposing a ``char2idx`` dict for the source side.
        tgt_vocab: Object exposing a ``char2idx`` dict for the target side.
        max_len: Maximum number of tokens per encoded sequence. For the
            target this budget includes the ``<SOS>`` and ``<EOS>`` markers.
    """

    def __init__(self, source_texts, target_texts, src_vocab, tgt_vocab, max_len=100):
        self.source_texts = source_texts
        self.target_texts = target_texts
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.source_texts)

    def __getitem__(self, idx):
        """Return ``(src_indices, tgt_indices)`` as 1-D ``torch.long`` tensors.

        The target is always framed as ``<SOS> ... <EOS>``. Over-long targets
        are truncated *before* ``<EOS>`` is appended, so the end marker is
        never cut off (the decoder needs it to learn when to stop).
        """
        src_text = self.source_texts[idx].lower()
        tgt_text = self.target_texts[idx]

        src_map = self.src_vocab.char2idx
        tgt_map = self.tgt_vocab.char2idx
        # Look the UNK index up instead of hard-coding 3.
        src_unk = src_map['<UNK>']
        tgt_unk = tgt_map['<UNK>']

        # Encode source (one index per character, so slicing the text first
        # is equivalent to slicing the index list).
        src_indices = [src_map.get(c, src_unk) for c in src_text[:self.max_len]]

        # Encode target: reserve two slots for <SOS>/<EOS>. With max_len < 2
        # the result is still [<SOS>, <EOS>] so the framing stays intact.
        body_budget = max(self.max_len - 2, 0)
        tgt_indices = [tgt_map['<SOS>']]
        tgt_indices += [tgt_map.get(c, tgt_unk) for c in tgt_text[:body_budget]]
        tgt_indices.append(tgt_map['<EOS>'])

        # Explicit dtype: torch.tensor([]) would otherwise default to float32,
        # which nn.Embedding rejects and which breaks padding with long tensors.
        return (
            torch.tensor(src_indices, dtype=torch.long),
            torch.tensor(tgt_indices, dtype=torch.long),
        )

def collate_fn(batch):
    """Pad a list of ``(src, tgt)`` tensor pairs into two batch-first tensors.

    Shorter sequences are right-padded with 0 (the ``<PAD>`` index) up to the
    longest sequence on their side of the batch.
    """
    src_seqs = [src for src, _ in batch]
    tgt_seqs = [tgt for _, tgt in batch]

    pad = nn.utils.rnn.pad_sequence
    padded_src = pad(src_seqs, padding_value=0, batch_first=True)
    padded_tgt = pad(tgt_seqs, padding_value=0, batch_first=True)
    return padded_src, padded_tgt

class Seq2SeqModel(nn.Module):
    """Bidirectional-LSTM encoder feeding a unidirectional LSTM decoder.

    The decoder's hidden size is twice the encoder's so that the encoder's
    forward and backward final states can be concatenated and used directly
    as the decoder's initial state. No attention is used: the decoder only
    sees the encoder through that initial ``(hidden, cell)`` pair.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, embed_size=256, hidden_size=512, num_layers=2, dropout=0.3):
        super().__init__()

        # Inter-layer LSTM dropout is only applied when there is more than one layer.
        rnn_dropout = dropout if num_layers > 1 else 0
        decoder_hidden = hidden_size * 2

        self.encoder_embed = nn.Embedding(src_vocab_size, embed_size, padding_idx=0)
        self.decoder_embed = nn.Embedding(tgt_vocab_size, embed_size, padding_idx=0)

        self.encoder = nn.LSTM(
            embed_size, hidden_size, num_layers,
            batch_first=True, dropout=rnn_dropout, bidirectional=True,
        )
        self.decoder = nn.LSTM(
            embed_size, decoder_hidden, num_layers,
            batch_first=True, dropout=rnn_dropout,
        )

        self.fc = nn.Linear(decoder_hidden, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def _merge_directions(self, state, batch_size):
        """Concatenate forward/backward encoder states along the feature axis.

        ``state`` is reshaped to ``(num_layers, 2, batch_size, -1)`` and the
        two direction slices are joined, giving one state per layer with
        twice the feature width.
        """
        per_layer = state.view(self.encoder.num_layers, 2, batch_size, -1)
        return torch.cat([per_layer[:, 0], per_layer[:, 1]], dim=2)

    def forward(self, src, tgt):
        """Return vocabulary logits for every position of ``tgt``.

        ``src`` and ``tgt`` are batch-first index tensors; the batch size is
        taken from ``src.size(0)``.
        """
        batch_size = src.size(0)

        # Encode
        embedded_src = self.dropout(self.encoder_embed(src))
        _, (enc_hidden, enc_cell) = self.encoder(embedded_src)

        # Prepare decoder initial state
        init_state = (
            self._merge_directions(enc_hidden, batch_size),
            self._merge_directions(enc_cell, batch_size),
        )

        # Decode
        embedded_tgt = self.dropout(self.decoder_embed(tgt))
        decoded, _ = self.decoder(embedded_tgt, init_state)

        return self.fc(decoded)

# Column names tried, in order, when reading a Beni-Mellal example.
_ENGLISH_COLUMNS = ('English', 'english', 'source', 'en')
_TAMAZIGHT_COLUMNS = ('tamazight', 'Tamazight', 'target', 'tzm', 'ber')


def _first_present(example, candidate_keys):
    """Return the value of the first key of ``candidate_keys`` found in ``example``, else None."""
    for key in candidate_keys:
        if key in example:
            return example[key]
    return None


def _load_weblate_pairs():
    """Load the Weblate ``en-ber`` train split as ``(english, tamazight_latin)`` pairs.

    Rows where either side is ``None`` are skipped. Each pair is built in
    full before it is stored, so a failure can never leave the two sides
    out of step.
    """
    weblate = load_dataset("Tamazight-NLP/Weblate-Translations", "en-ber")
    pairs = []
    for example in weblate['train']:
        english_text = example['source_string']
        tamazight_text = example['target_string']
        if english_text is None or tamazight_text is None:
            continue
        # Convert Tifinagh to Latin
        pairs.append((english_text, tifinagh_to_latin(tamazight_text)))
    return pairs


def load_and_preprocess_dataset(dataset_choice='both'):
    """Load datasets from Hugging Face

    Args:
        dataset_choice: 'both', 'weblate', or 'beni-mellal'

    Returns:
        source_texts, target_texts: Lists of English and Tamazight (Latin) texts.
        The two lists always have the same length and are index-aligned.

    Raises:
        ValueError: If no translation pair could be loaded at all.

    Loading errors of an individual dataset are printed (with traceback) and
    do not abort the function; a dataset that fails contributes no pairs at
    all rather than a partial, possibly misaligned, set.
    """
    import traceback

    source_texts = []
    target_texts = []

    def add_pairs(pairs):
        # Append both sides together so the two lists cannot drift apart.
        for english_text, tamazight_text in pairs:
            source_texts.append(english_text)
            target_texts.append(tamazight_text)

    # Dataset 1: Weblate-Translations
    if dataset_choice in ['both', 'weblate']:
        print("Loading Weblate-Translations dataset...")
        try:
            add_pairs(_load_weblate_pairs())
            print(f"✓ Loaded {len(source_texts)} examples from Weblate-Translations")
        except Exception as e:
            print(f"✗ Error loading Weblate-Translations: {e}")
            traceback.print_exc()

    # Dataset 2: Beni-Mellal-Tamazight
    if dataset_choice in ['both', 'beni-mellal']:
        print("\nLoading Beni-Mellal-Tamazight dataset...")
        print("Note: This dataset may require authentication.")
        print("If you haven't logged in, run: huggingface-cli login")

        try:
            # Read the parquet file directly and drop the audio column,
            # which is not needed for text translation.
            ds2 = load_dataset(
                "parquet",
                data_files={
                    "train": "hf://datasets/Tamazight-NLP/Beni-Mellal-Tamazight/data/train-00000-of-00001.parquet"
                }
            )
            ds2 = ds2.remove_columns(["Audio"])

            # Check what splits are available
            print("✓ Dataset loaded successfully!")
            print(f"  Available splits: {list(ds2.keys())}")

            beni_pairs = []

            # Iterate through all splits (train, test, validation, etc.)
            for split_name in ds2.keys():
                print(f"\nProcessing {split_name} split...")
                split_data = ds2[split_name]

                # Report the schema of non-empty splits to ease debugging.
                if len(split_data) > 0:
                    print(f"  Columns: {split_data.column_names}")
                    print(f"  Size: {len(split_data)} examples")
                    print(f"  First example keys: {list(split_data[0].keys())}")

                added_in_split = 0
                for example in split_data:
                    # Handle different possible column names
                    english_text = _first_present(example, _ENGLISH_COLUMNS)
                    tamazight_text = _first_present(example, _TAMAZIGHT_COLUMNS)

                    # Skip rows with a missing/empty side.
                    if not (english_text and tamazight_text):
                        continue

                    # Clean whitespace, then re-check: a whitespace-only
                    # value becomes empty after stripping.
                    english_text = str(english_text).strip()
                    tamazight_text = str(tamazight_text).strip()
                    if english_text and tamazight_text:
                        # Convert Tifinagh to Latin
                        beni_pairs.append((english_text, tifinagh_to_latin(tamazight_text)))
                        added_in_split += 1

                print(f"  Added {added_in_split} examples from {split_name}")

            # Commit only after every split was processed successfully.
            add_pairs(beni_pairs)
            print(f"\n✓ Total added from Beni-Mellal-Tamazight: {len(beni_pairs)} examples")
            print(f"✓ Total examples: {len(source_texts)}")

        except Exception as e:
            print(f"✗ Error loading Beni-Mellal-Tamazight: {e}")
            print(f"  Error type: {type(e).__name__}")
            traceback.print_exc()

            if dataset_choice == 'beni-mellal':
                print("\n  FALLBACK: Beni-Mellal dataset failed. Using Weblate instead...")
                if len(source_texts) == 0:
                    print("  Loading Weblate-Translations as fallback...")
                    try:
                        add_pairs(_load_weblate_pairs())
                        print(f"✓ Loaded {len(source_texts)} examples from Weblate-Translations (fallback)")
                    except Exception as fallback_error:
                        print(f"✗ Fallback also failed: {fallback_error}")
            else:
                print("  Continuing with Weblate-Translations only...")

    if len(source_texts) == 0:
        raise ValueError(
            "No data loaded! Please check:\n"
            "1. Internet connection\n"
            "2. Hugging Face authentication (run: huggingface-cli login)\n"
            "3. Dataset availability\n"
            "Try using dataset_choice='weblate' which doesn't require authentication."
        )

    print(f"\n{'='*60}")
    print(f"Final dataset size: {len(source_texts)} translation pairs")
    print(f"{'='*60}")

    # Show the first few pairs (each side cut to 60 characters)
    print("\nSample translations:")
    for i in range(min(5, len(source_texts))):
        en_preview = source_texts[i][:60]
        tz_preview = target_texts[i][:60]
        print(f"\n  [{i+1}] EN: {en_preview}")
        print(f"      TZM (Latin): {tz_preview}")
        print(f"      TZM (Tifinagh): {latin_to_tifinagh(tz_preview)}")

    return source_texts, target_texts

def train_model(source_texts, target_texts, epochs=10, batch_size=32, max_samples=None):
    """Train the Seq2Seq model from scratch and save it to ``./tamazight_model.pt``.

    Args:
        source_texts: List of source (English) strings.
        target_texts: List of target strings, index-aligned with ``source_texts``.
        epochs: Number of passes over the data.
        batch_size: Mini-batch size.
        max_samples: If truthy, train on only the first ``max_samples`` pairs.

    Returns:
        ``(model, src_vocab, tgt_vocab)``

    Raises:
        ValueError: If the two text lists differ in length or are empty.
    """
    # Fail fast with a clear message instead of a misaligned dataset or an
    # opaque error from inside the DataLoader.
    if len(source_texts) != len(target_texts):
        raise ValueError(
            f"source_texts and target_texts must be aligned: "
            f"got {len(source_texts)} and {len(target_texts)} items"
        )
    if len(source_texts) == 0:
        raise ValueError("Cannot train on an empty dataset")

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Limit samples for quick testing
    if max_samples:
        source_texts = source_texts[:max_samples]
        target_texts = target_texts[:max_samples]
        print(f"Training on {len(source_texts)} samples")

    # Build vocabularies
    print("Building vocabularies...")
    src_vocab = Vocabulary()
    tgt_vocab = Vocabulary()

    # The dataset and the inference code both lower-case the source text, so
    # the source vocabulary must be built from lower-cased text too.
    # Otherwise a letter that only ever occurs in upper case would be encoded
    # as <UNK> everywhere.
    src_vocab.build_vocab(text.lower() for text in source_texts)
    tgt_vocab.build_vocab(target_texts)

    print(f"Source vocab size: {src_vocab.n_chars}")
    print(f"Target vocab size: {tgt_vocab.n_chars}")

    # Create dataset
    dataset = TranslationDataset(source_texts, target_texts, src_vocab, tgt_vocab)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

    # Initialize model; index 0 (<PAD>) is excluded from the loss.
    model = Seq2SeqModel(src_vocab.n_chars, tgt_vocab.n_chars).to(device)
    criterion = nn.CrossEntropyLoss(ignore_index=0)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    # Training loop
    print("\nStarting training...")
    model.train()

    for epoch in range(epochs):
        total_loss = 0
        for i, (src, tgt) in enumerate(dataloader):
            src, tgt = src.to(device), tgt.to(device)

            # Teacher forcing: the decoder reads tokens [0, n-1) and is
            # trained to predict tokens [1, n).
            tgt_input = tgt[:, :-1]
            tgt_output = tgt[:, 1:]

            optimizer.zero_grad()

            # Forward pass
            output = model(src, tgt_input)

            # Calculate loss over all (batch, position) pairs at once
            loss = criterion(output.reshape(-1, tgt_vocab.n_chars), tgt_output.reshape(-1))

            # Backward pass, with gradient-norm clipping at 1.0
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

            total_loss += loss.item()

            if (i + 1) % 50 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Step [{i+1}/{len(dataloader)}], Loss: {loss.item():.4f}")

        avg_loss = total_loss / len(dataloader)
        print(f"Epoch [{epoch+1}/{epochs}] completed. Average Loss: {avg_loss:.4f}")

    print("\nTraining complete!")

    # Save model and vocabularies. The Vocabulary objects are pickled as-is,
    # so loading this file requires the Vocabulary class to be importable.
    torch.save({
        'model_state_dict': model.state_dict(),
        'src_vocab': src_vocab,
        'tgt_vocab': tgt_vocab,
        'model_config': {
            'src_vocab_size': src_vocab.n_chars,
            'tgt_vocab_size': tgt_vocab.n_chars
        }
    }, './tamazight_model.pt')

    print("Model saved to ./tamazight_model.pt")
    return model, src_vocab, tgt_vocab

def translate_to_latin(text, model_path='./tamazight_model.pt', max_len=100):
    """Translate English to Tamazight (Latin script)

    Args:
        text: English input string. An empty string yields an empty result
            without touching the checkpoint.
        model_path: Path of the checkpoint written by ``train_model``.
        max_len: Maximum number of characters to generate.

    Returns:
        The greedily decoded translation as a string.
    """
    # An empty source would produce a zero-length sequence that the LSTM
    # encoder cannot process; there is nothing to translate anyway.
    if not text:
        return ''

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Load model and vocabularies.
    # SECURITY NOTE: the checkpoint stores pickled Vocabulary objects, so it
    # must be loaded with weights_only=False (newer PyTorch releases default
    # to True and would refuse it). Unpickling can execute arbitrary code:
    # only load checkpoints you created yourself.
    checkpoint = torch.load(model_path, map_location=device, weights_only=False)
    src_vocab = checkpoint['src_vocab']
    tgt_vocab = checkpoint['tgt_vocab']

    model = Seq2SeqModel(src_vocab.n_chars, tgt_vocab.n_chars).to(device)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.eval()

    sos_idx = tgt_vocab.char2idx['<SOS>']
    eos_idx = tgt_vocab.char2idx['<EOS>']
    # Tokens that never appear in the output text.
    silent_tokens = {tgt_vocab.char2idx['<PAD>'], sos_idx, eos_idx}

    # Encode input (lower-cased, unknown characters map to <UNK>)
    src_unk = src_vocab.char2idx['<UNK>']
    src_indices = [src_vocab.char2idx.get(c, src_unk) for c in text.lower()]
    src_tensor = torch.tensor([src_indices], dtype=torch.long, device=device)

    decoded_chars = []
    with torch.no_grad():
        # Encode
        src_embed = model.encoder_embed(src_tensor)
        _, (hidden, cell) = model.encoder(src_embed)

        # Merge forward/backward encoder states per layer (batch size is 1)
        num_layers = model.encoder.num_layers
        hidden = hidden.view(num_layers, 2, 1, -1)
        hidden = torch.cat([hidden[:, 0], hidden[:, 1]], dim=2)
        cell = cell.view(num_layers, 2, 1, -1)
        cell = torch.cat([cell[:, 0], cell[:, 1]], dim=2)

        # Greedy decoding: feed the previous prediction back in, one
        # character at a time, carrying the LSTM state forward.
        decoder_input = torch.tensor([[sos_idx]], dtype=torch.long, device=device)

        for _ in range(max_len):
            tgt_embed = model.decoder_embed(decoder_input)
            decoder_out, (hidden, cell) = model.decoder(tgt_embed, (hidden, cell))
            output = model.fc(decoder_out)

            # Get predicted character
            pred_idx = output.argmax(2).item()

            # Stop if EOS
            if pred_idx == eos_idx:
                break

            # Add to result (skip PAD, SOS, EOS)
            if pred_idx not in silent_tokens:
                decoded_chars.append(tgt_vocab.idx2char[pred_idx])

            # Next input
            decoder_input = torch.tensor([[pred_idx]], dtype=torch.long, device=device)

    return ''.join(decoded_chars)

def translate_and_convert(text, model_path='./tamazight_model.pt'):
    """Translate English ``text`` to Tamazight and return it in Tifinagh script.

    The model produces Latin-script output, which is then transliterated.
    """
    return latin_to_tifinagh(translate_to_latin(text, model_path))

# Example usage: train a model if no checkpoint exists, then run a few
# sample translations.
if __name__ == "__main__":
    import os
    import shutil  # not used in this cell; kept in case later cells rely on it

    # Force retrain option: set to True to discard an existing checkpoint.
    FORCE_RETRAIN = False

    if FORCE_RETRAIN and os.path.exists('./tamazight_model.pt'):
        print("Deleting old model to retrain...")
        os.remove('./tamazight_model.pt')

    if not os.path.exists('./tamazight_model.pt'):
        print("Model not found. Training new model...")
        print("\n" + "="*60)
        print("IMPORTANT: Beni-Mellal dataset requires authentication!")
        print("="*60)
        print("\nIf you see authentication errors, run this command in terminal:")
        print("  huggingface-cli login")
        print("\nOr authenticate in Python:")
        print("  from huggingface_hub import login")
        print("  login()")
        print("\nAlternatively, use 'weblate' dataset which doesn't require auth:")
        print("  load_and_preprocess_dataset(dataset_choice='weblate')")
        print("="*60 + "\n")

        # Load both datasets (pass dataset_choice='beni-mellal' or 'weblate'
        # to train on a single one).
        source_texts, target_texts = load_and_preprocess_dataset(dataset_choice='both')

        # Train model
        # For quick testing: max_samples=1000, epochs=5
        # For better quality: max_samples=None, epochs=10-15
        model, src_vocab, tgt_vocab = train_model(
            source_texts, target_texts,
            epochs=10,
            batch_size=32,
            max_samples=None  # Set to 1000 for quick testing
        )
    else:
        print("Model found. Skipping training.")

    # Test translation
    print("\n=== Testing Translation ===")
    test_texts = ["Hello", "How are you?", "Thank you", "Good morning"]

    for test_text in test_texts:
        # Translate once and transliterate that result. Calling
        # translate_and_convert() as well would reload the checkpoint and
        # decode the same sentence a second time for an identical answer.
        latin = translate_to_latin(test_text)
        tifinagh = latin_to_tifinagh(latin)

        print(f"English: {test_text}")
        print(f"Tamazight (Latin): {latin}")
        print(f"Tamazight (Tifinagh): {tifinagh}")
        print()

Model not found. Training new model...

IMPORTANT: Beni-Mellal dataset requires authentication!

If you see authentication errors, run this command in terminal:
  huggingface-cli login

Or authenticate in Python:
  from huggingface_hub import login
  login()

Alternatively, use 'weblate' dataset which doesn't require auth:
  load_and_preprocess_dataset(dataset_choice='weblate')

Loading Weblate-Translations dataset...
✓ Loaded 1138 examples from Weblate-Translations

Loading Beni-Mellal-Tamazight dataset...
Note: This dataset may require authentication.
If you haven't logged in, run: huggingface-cli login
✓ Dataset loaded successfully!
  Available splits: ['train']

Processing train split...
  Columns: ['English', 'Tamazight']
  Size: 2322 examples
  First example keys: ['English', 'Tamazight']
  Added 2321 examples from train

✓ Total added from Beni-Mellal-Tamazight: 2321 examples
✓ Total examples: 3459

Final dataset size: 3459 translation pairs

Sample translations:

  [1] EN: Sear

In [18]:
translate_to_latin("Hello how are you")

'imshinnax a-š-i3jbn'