In [1]:
!pip install sacrebleu

Collecting sacrebleu
  Downloading sacrebleu-2.5.1-py3-none-any.whl.metadata (51 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.8/51.8 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting portalocker (from sacrebleu)
  Downloading portalocker-3.2.0-py3-none-any.whl.metadata (8.7 kB)
Downloading sacrebleu-2.5.1-py3-none-any.whl (104 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m104.1/104.1 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading portalocker-3.2.0-py3-none-any.whl (22 kB)
Installing collected packages: portalocker, sacrebleu
Successfully installed portalocker-3.2.0 sacrebleu-2.5.1


# Semantor-

Standard translation models often overlook the rich semantic information conveyed by punctuation, treating a question and a statement as nearly identical. The Semantor project is an experimental English-to-Hindi translation model that directly tackles this gap. Built on a PyTorch Transformer, its core innovation is a custom encoder that creates a powerful, unified embedding by fusing a word's traditional contextual meaning with a unique semantic value derived from its punctuation. By interpreting marks like '?' and '!' as direct signals of intent, the model gains a deeper, more human-like understanding of the source text, aiming to produce translations that are not just literally correct but also contextually nuanced.

In [2]:
# import pandas as pd
# dict=pd.read_csv('/kaggle/input/english-to-hindi-dataset/Dataset_English_Hindi.csv')
# vocab_dict={i:dict[0][i] } for i range(len(dict[0]))

In [3]:
!pip install sacrebleu



In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from typing import List, Dict, Tuple
import sacrebleu
from collections import Counter
import random

class SentenceEmbedderWithAttention(nn.Module):
    def __init__(self, sem_dim: int = 8, model_dim: int = 8, num_heads: int = 2):
        super().__init__()
        self.sem_dim = sem_dim
        self.model_dim = model_dim
        self.embed_dim = sem_dim + model_dim
        self.num_heads = num_heads
        self.head_dim = self.embed_dim // num_heads
        
        assert self.embed_dim % num_heads == 0, f"embed_dim ({self.embed_dim}) must be divisible by num_heads ({num_heads})"
        
        self.punctuation = {
            ',': 'continuation',
            '.': 'declaration', 
            '?': 'question',
            '!': 'intense_emotion',
            '"': 'quotes',
            '(': 'detail_start',
            ')': 'detail_end',
            ';': 'pause',
            ':': 'explanation',
            '-': 'dash'
        }
        self.punct2indx = {p: i+1 for i, p in enumerate(self.punctuation)}
        self.punct2indx['None'] = 0
        self.num_punct = len(self.punct2indx)
        
        self.sem_emb = self._init_semantic_embedding()
        self.W_q = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.W_k = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.W_v = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.out_proj = nn.Linear(self.embed_dim, self.embed_dim)
        self.layer_norm = nn.LayerNorm(self.embed_dim)
        self.ffn = nn.Sequential(
                nn.Linear(self.embed_dim, self.embed_dim * 4),
                nn.ReLU(),
                nn.Linear(self.embed_dim * 4, self.embed_dim)
               )

    def _init_semantic_embedding(self) -> nn.Embedding:
        embedding = nn.Embedding(self.num_punct, self.sem_dim)
        nn.init.uniform_(embedding.weight, -0.1, 0.1)
        return embedding

    def _get_positional_encoding(self, position: int, d_model: int) -> np.ndarray:
        pos_enc = np.zeros(d_model)
        for i in range(0, d_model, 2):
            pos_enc[i] = np.sin(position / (10000 ** (i / d_model)))
            if i + 1 < d_model:
                pos_enc[i+1] = np.cos(position / (10000 ** (i / d_model)))
        return pos_enc

    def _get_semantic_embeddings(self, sentence: str) -> np.ndarray:
        semantic_data = []
        words = sentence.split()

        for word in words:
            punct_idx = 0
            for punct in self.punctuation.keys():
                if word.endswith(punct):
                    punct_idx = self.punct2indx[punct]
                    break
            emb_vec = self.sem_emb(torch.tensor([punct_idx], dtype=torch.long))
            semantic_data.append(emb_vec.detach().numpy()[0])

        return np.array(semantic_data)

    def embed_sentence(self, sentence: str) -> torch.Tensor:
        words = sentence.split()
        if len(words) == 0:
            return torch.empty(0, self.embed_dim)

        semantic_embeddings = torch.tensor(
            self._get_semantic_embeddings(sentence), dtype=torch.float32
        )
        # Fix the numpy array creation warning
        pos_encodings = np.array([self._get_positional_encoding(idx, self.model_dim) for idx in range(len(words))])
        positional_encodings = torch.tensor(pos_encodings, dtype=torch.float32)
        combined_embeddings = torch.cat((semantic_embeddings, positional_encodings), dim=1)
        return combined_embeddings 

    def _split_heads(self, x: torch.Tensor) -> torch.Tensor:
        batch_size, seq_len, _ = x.size()
        return x.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

    def self_attention(self, embeddings: torch.Tensor) -> torch.Tensor:
        x = embeddings.unsqueeze(0)  
        Q = self.W_q(x)
        K = self.W_k(x)
        V = self.W_v(x)
        Q, K, V = map(self._split_heads, (Q, K, V)) 
        scores = torch.matmul(Q, K.transpose(-2, -1)) / (self.head_dim ** 0.5)
        attn_weights = torch.softmax(scores, dim=-1)
        out = torch.matmul(attn_weights, V) 
        out = out.transpose(1, 2).contiguous().view(1, -1, self.embed_dim) 
        out = self.out_proj(out)
        return out.squeeze(0) 

    def forward_with_attention(self, sentence: str) -> torch.Tensor:
        embeddings = self.embed_sentence(sentence) 
        attended = self.self_attention(embeddings)
        x = self.layer_norm(embeddings + attended)
        ffn_out = self.ffn(x)
        output = self.layer_norm(x + ffn_out)
        return output


class SimpleDecoder(nn.Module):
    def __init__(self, embed_dim, vocab_size, num_heads=2):
        super().__init__()
        self.embed_dim = embed_dim
        self.vocab_size = vocab_size
        
        
        self.token_embedding = nn.Embedding(vocab_size, embed_dim)
        
        self.self_attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.cross_attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, embed_dim * 4),
            nn.ReLU(),
            nn.Linear(embed_dim * 4, embed_dim)
        )
        self.out_proj = nn.Linear(embed_dim, vocab_size)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.norm3 = nn.LayerNorm(embed_dim)
    
    def forward(self, tgt_tokens, memory):
        if isinstance(tgt_tokens, torch.Tensor) and tgt_tokens.dtype == torch.long:
            tgt_embeddings = self.token_embedding(tgt_tokens)
        else:
            tgt_embeddings = tgt_tokens
            
        if tgt_embeddings.dim() == 2:
            tgt_embeddings = tgt_embeddings.unsqueeze(0)
        if memory.dim() == 2:
            memory = memory.unsqueeze(0)
        
        seq_len = tgt_embeddings.size(1)
        causal_mask = self._causal_mask(seq_len)
        
        attn_out, _ = self.self_attn(tgt_embeddings, tgt_embeddings, tgt_embeddings, 
                                    attn_mask=causal_mask, is_causal=True)
        x = self.norm1(tgt_embeddings + attn_out)
        attn_out, _ = self.cross_attn(x, memory, memory)
        x = self.norm2(x + attn_out)
        
        
        ffn_out = self.ffn(x)
        x = self.norm3(x + ffn_out)
        
        
        logits = self.out_proj(x)
        return logits.squeeze(0) if logits.size(0) == 1 else logits
    
    def _causal_mask(self, size):
        mask = torch.triu(torch.ones(size, size), diagonal=1)
        return mask.masked_fill(mask == 1, float('-inf'))


class TranslationTrainer:
    def __init__(self, encoder, decoder, vocab_dict, learning_rate=0.001):
        self.encoder = encoder
        self.decoder = decoder
        self.vocab_dict = vocab_dict
        self.reverse_vocab = {v: k for k, v in vocab_dict.items()}
        
        self.criterion = nn.CrossEntropyLoss(ignore_index=0) 
        
        self.optimizer = optim.Adam(
            list(encoder.parameters()) + list(decoder.parameters()), 
            lr=learning_rate,
            weight_decay=1e-4
        )
        
        self.scheduler = optim.lr_scheduler.StepLR(self.optimizer, step_size=10, gamma=0.9)
        
        self.loss_history = []
        self.bleu_history = []
    
    def tokens_to_ids(self, tokens: List[str]) -> List[int]:
        
        ids = []
        for token in tokens:
            if token in self.reverse_vocab:
                ids.append(self.reverse_vocab[token])
            else:
                print(f"Warning: Unknown token '{token}' replaced with <UNK>")
                ids.append(self.reverse_vocab['<UNK>'])
        return ids
    
    def prepare_target_sequence(self, target_tokens: List[str]) -> Tuple[torch.Tensor, torch.Tensor]:

        target_with_bos = ['<BOS>'] + target_tokens
        target_with_eos = target_tokens + ['<EOS>']
        
        input_ids = torch.tensor(self.tokens_to_ids(target_with_bos), dtype=torch.long)
        target_ids = torch.tensor(self.tokens_to_ids(target_with_eos), dtype=torch.long)
        
        return input_ids, target_ids
    
    def train_step(self, source_sentence: str, target_tokens: List[str]) -> float:
        self.encoder.train()
        self.decoder.train()
        self.optimizer.zero_grad()
        
        encoder_outputs = self.encoder.forward_with_attention(source_sentence)
        
    
        decoder_input, target_labels = self.prepare_target_sequence(target_tokens)
    
        logits = self.decoder(decoder_input, encoder_outputs)
        
    
        loss = self.criterion(logits.view(-1, logits.size(-1)), target_labels.view(-1))
        
        loss.backward()
        
        torch.nn.utils.clip_grad_norm_(
            list(self.encoder.parameters()) + list(self.decoder.parameters()), 
            max_norm=1.0
        )
        
        self.optimizer.step()
        
        return loss.item()
    
    def train_epoch(self, train_data: List[Dict], script_type='roman') -> float:
        total_loss = 0.0
        num_batches = 0
    
        shuffled_data = train_data.copy()
        random.shuffle(shuffled_data)
        
        for sample in shuffled_data:
            source = sample['source']
            target_key = f'target_{script_type}'
            
            if target_key in sample:
                target = sample[target_key]
                loss = self.train_step(source, target)
                total_loss += loss
                num_batches += 1
        
        avg_loss = total_loss / num_batches if num_batches > 0 else 0.0
        self.loss_history.append(avg_loss)
        
        self.scheduler.step()
        
        return avg_loss
    
    def generate_translation(self, source_sentence: str, max_length: int = 15) -> List[str]:
        """Generate translation using greedy decoding"""
        self.encoder.eval()
        self.decoder.eval()
        
        with torch.no_grad():
            encoder_outputs = self.encoder.forward_with_attention(source_sentence)
            current_tokens = [self.reverse_vocab['<BOS>']]
            generated_tokens = []
            
            for _ in range(max_length):
                input_tensor = torch.tensor(current_tokens, dtype=torch.long)
                logits = self.decoder(input_tensor, encoder_outputs)
                next_token_id = torch.argmax(logits[-1]).item()
                next_token = self.vocab_dict[next_token_id]
                if next_token == '<EOS>':
                    break
                if next_token not in {'<BOS>', '<PAD>', '<UNK>'}:
                    generated_tokens.append(next_token)
                current_tokens.append(next_token_id)
            
            return generated_tokens


class EnglishHindiTranslationEvaluator:
    def __init__(self, vocab_dict: Dict[int, str]):
        self.vocab_dict = vocab_dict
        self.reverse_vocab = {v: k for k, v in vocab_dict.items()}
        self.special_tokens = {'<BOS>', '<EOS>', '<PAD>', '<UNK>'}
    
    def calculate_bleu_metrics(self, predicted_tokens: List[str], 
                              reference_tokens: List[str]) -> Dict[str, float]:
        if not predicted_tokens or not reference_tokens:
            return {'bleu': 0.0, 'bleu_1': 0.0, 'bleu_2': 0.0, 'bleu_4': 0.0}
        
        predicted_sentence = ' '.join(predicted_tokens)
        reference_sentence = ' '.join(reference_tokens)
        
        bleu = sacrebleu.sentence_bleu(predicted_sentence, [reference_sentence])

        try:
            bleu_1 = sacrebleu.sentence_bleu(predicted_sentence, [reference_sentence], 
                                           max_ngram_order=1)
            bleu_2 = sacrebleu.sentence_bleu(predicted_sentence, [reference_sentence], 
                                           max_ngram_order=2)
            bleu_4 = sacrebleu.sentence_bleu(predicted_sentence, [reference_sentence], 
                                           max_ngram_order=4)
        except:
            bleu_1 = bleu_2 = bleu_4 = bleu
        
        return {
            'bleu': bleu.score,
            'bleu_1': bleu_1.score,
            'bleu_2': bleu_2.score,
            'bleu_4': bleu_4.score,
            'brevity_penalty': bleu.bp,
            'length_ratio': len(predicted_tokens) / len(reference_tokens) if reference_tokens else 0
        }
    
    def evaluate_model(self, trainer, test_data: List[Dict], script_type='roman') -> Dict:
        """Evaluate model on test data"""
        predictions = []
        references = []
        individual_scores = []
        
        target_key = f'target_{script_type}'
        
        for sample in test_data:
            source = sample['source']
            reference = sample[target_key]
            prediction = trainer.generate_translation(source)
            bleu_scores = self.calculate_bleu_metrics(prediction, reference)
            individual_scores.append(bleu_scores)
            
            predictions.append(prediction)
            references.append(reference)
        
        # Calculate average sentence BLEU
        avg_bleu = np.mean([s['bleu'] for s in individual_scores])
        
        pred_sentences = [' '.join(tokens) for tokens in predictions]
        ref_sentences = [' '.join(tokens) for tokens in references]
        
        if pred_sentences and ref_sentences:
            corpus_bleu = sacrebleu.corpus_bleu(pred_sentences, [ref_sentences])
        else:
            corpus_bleu = sacrebleu.BLEU.compute_bleu([],[])
        
        avg_bleu_1 = np.mean([s['bleu_1'] for s in individual_scores])
        avg_bleu_2 = np.mean([s['bleu_2'] for s in individual_scores])
        avg_bleu_4 = np.mean([s['bleu_4'] for s in individual_scores])
        
        return {
            'corpus_bleu': corpus_bleu.score,
            'avg_sentence_bleu': avg_bleu,  # Fixed: this variable is now properly defined
            'avg_bleu_1': avg_bleu_1,
            'avg_bleu_2': avg_bleu_2,
            'avg_bleu_4': avg_bleu_4,
            'individual_scores': individual_scores,
            'predictions': predictions,
            'references': references
        }


def create_english_hindi_vocab():
    vocab_list = [
    
        '<PAD>', '<BOS>', '<EOS>', '<UNK>',
        'i', 'want', 'to', 'eat', 'rice', 'and', 'dal', 'hello', 'how', 'are', 'you', 
        'good', 'morning', 'what', 'is', 'your', 'name', 'thank', 'very', 'much', 
        'the', 'book', 'water', 'please', 'where', 'go',
    
        'मैं', 'चावल', 'और', 'दाल', 'खाना', 'चाहता', 'हूं', 'नमस्ते', 'आप', 'कैसे',
        'हैं', 'सुप्रभात', 'आपका', 'नाम', 'क्या', 'है', 'बहुत', 'धन्यवाद', 'पुस्तक', 
        'पानी', 'कृपया', 'कहां', 'जाना',
    
        'main', 'chawal', 'aur', 'daal', 'khaana', 'chahta', 'hun', 'namaste', 'aap', 
        'kaise', 'hain', 'suprabhat', 'aapka', 'naam', 'kya', 'hai', 'bahut', 
        'dhanyawad', 'pustak', 'paani', 'kripaya', 'kahan', 'jaana',
        '.', ',', '?', '!'
    ]
    

    vocab_dict = {i: token for i, token in enumerate(vocab_list)}
    
    print(f"Created vocabulary with {len(vocab_dict)} tokens (IDs 0-{len(vocab_dict)-1})")
    return vocab_dict


def create_english_hindi_training_data():
    training_data = [
        {
            'source': 'I want to eat rice and dal.',
            'target_devanagari': ['मैं', 'चावल', 'और', 'दाल', 'खाना', 'चाहता', 'हूं'],
            'target_roman': ['main', 'chawal', 'aur', 'daal', 'khaana', 'chahta', 'hun']
        },
        {
            'source': 'Hello, how are you?',
            'target_devanagari': ['नमस्ते', 'आप', 'कैसे', 'हैं'],
            'target_roman': ['namaste', 'aap', 'kaise', 'hain']
        },
        {
            'source': 'Good morning.',
            'target_devanagari': ['सुप्रभात'],
            'target_roman': ['suprabhat']
        },
        {
            'source': 'What is your name?',
            'target_devanagari': ['आपका', 'नाम', 'क्या', 'है'],
            'target_roman': ['aapka', 'naam', 'kya', 'hai']
        },
        {
            'source': 'Thank you very much.',
            'target_devanagari': ['बहुत', 'धन्यवाद'],
            'target_roman': ['bahut', 'dhanyawad']
        },
        {
            'source': 'I want water please.',
            'target_devanagari': ['मैं', 'पानी', 'चाहता', 'हूं', 'कृपया'],
            'target_roman': ['main', 'paani', 'chahta', 'hun', 'kripaya']
        },
        {
            'source': 'Where is the book?',
            'target_devanagari': ['पुस्तक', 'कहां', 'है'],
            'target_roman': ['pustak', 'kahan', 'hai']
        },
    ]
    return training_data


def validate_training_data_vocab(training_data, vocab_dict):
    """Validate that all tokens in training data exist in vocabulary"""
    reverse_vocab = {v: k for k, v in vocab_dict.items()}
    missing_tokens = set()
    
    for sample in training_data:
        for target_type in ['target_roman', 'target_devanagari']:
            if target_type in sample:
                for token in sample[target_type]:
                    if token not in reverse_vocab:
                        missing_tokens.add(token)
    
    if missing_tokens:
        print("Missing tokens in vocabulary:")
        for token in sorted(missing_tokens):
            print(f"  '{token}'")
        return False
    return True


def train_and_evaluate_model():
    print("=== English-Hindi Translation Model Training ===\n")
    vocab_dict = create_english_hindi_vocab()
    training_data = create_english_hindi_training_data()
    
    print(f"Vocabulary size: {len(vocab_dict)}")
    print(f"Training samples: {len(training_data)}")

    if not validate_training_data_vocab(training_data, vocab_dict):
        print("ERROR")
        return None, None, None
    else:
        print("training tokens found in vocabulary")
    
    
    encoder = SentenceEmbedderWithAttention(sem_dim=12, model_dim=12, num_heads=3)
    decoder = SimpleDecoder(embed_dim=24, vocab_size=len(vocab_dict), num_heads=3)
    
    trainer = TranslationTrainer(encoder, decoder, vocab_dict, learning_rate=0.002)
    evaluator = EnglishHindiTranslationEvaluator(vocab_dict)

    num_epochs = 200
    script_type = 'roman' 
    
    print(f"\nTraining for {num_epochs} epochs on {script_type} script...")
    print("-" * 60)
    
    try:
        for epoch in range(num_epochs):
            avg_loss = trainer.train_epoch(training_data, script_type)

            if (epoch + 1) % 5 == 0 or epoch == 0:
                results = evaluator.evaluate_model(trainer, training_data, script_type)
                trainer.bleu_history.append(results['avg_sentence_bleu'])
                
                print(f"Epoch {epoch+1:2d}: Loss = {avg_loss:.4f}, BLEU = {results['avg_sentence_bleu']:.2f}")
            else:
                print(f"Epoch {epoch+1:2d}: Loss = {avg_loss:.4f}")
                
    except Exception as e:
        print(f"Training failed with error: {e}")
        return None, None, None
    
    print("\n" + "--"*60)
    
    print("\n=== Translation Examples ===")
    test_sentences = [
        "Hello, how are you?",
        "I want to eat rice and dal.",
        "Good morning.",
        "What is your name?",
        "Thank you very much."
    ]
    
    for sentence in test_sentences:
        try:
            prediction = trainer.generate_translation(sentence)
            print(f"english-- {sentence}")
            print(f"hindi-- {' '.join(prediction)}")
            print("-" * 40)
        except Exception as e:
            print(f"Translation failed for '{sentence}': {e}")
    
    try:
        final_results = evaluator.evaluate_model(trainer, training_data, script_type)
        
        print("\n=== Performance Metrics ===")
        print(f"Corpus BLEU Score: {final_results['corpus_bleu']:.2f}")
        print(f"Average Sentence BLEU: {final_results['avg_sentence_bleu']:.2f}")
        print(f"Average BLEU-1: {final_results['avg_bleu_1']:.2f}")
        print(f"Average BLEU-2: {final_results['avg_bleu_2']:.2f}")
        print(f"Average BLEU-4: {final_results['avg_bleu_4']:.2f}")
        
        print(f"\nLoss reduction: {trainer.loss_history[0]:.4f} → {trainer.loss_history[-1]:.4f}")
        if len(trainer.bleu_history) > 1:
            print(f"BLEU improvement: {trainer.bleu_history[0]:.2f} → {trainer.bleu_history[-1]:.2f}")
        
        return trainer, evaluator, final_results
        
    except Exception as e:
        print(f"Final evaluation failed: {e}")
        return trainer, evaluator, None

if __name__ == "__main__":
    torch.manual_seed(42)
    np.random.seed(42)
    random.seed(42)
    
    trained_model, evaluator, results = train_and_evaluate_model()
    
    print("\n=== Model Trained ===")
    print("You can now use trained_model.generate_translation() to translate new sentences!")

=== English-Hindi Translation Model Training ===

Created vocabulary with 80 tokens (IDs 0-79)
Vocabulary size: 80
Training samples: 7
training tokens found in vocabulary

Training for 200 epochs on roman script...
------------------------------------------------------------
Epoch  1: Loss = 4.6326, BLEU = 0.75
Epoch  2: Loss = 3.9764
Epoch  3: Loss = 3.5491
Epoch  4: Loss = 3.2766
Epoch  5: Loss = 3.0195, BLEU = 0.00
Epoch  6: Loss = 2.7538
Epoch  7: Loss = 2.5040
Epoch  8: Loss = 2.2531
Epoch  9: Loss = 2.0343
Epoch 10: Loss = 1.8258, BLEU = 17.51
Epoch 11: Loss = 1.6321
Epoch 12: Loss = 1.4783
Epoch 13: Loss = 1.3254
Epoch 14: Loss = 1.2099
Epoch 15: Loss = 1.0918, BLEU = 30.96
Epoch 16: Loss = 0.9950
Epoch 17: Loss = 0.9157
Epoch 18: Loss = 0.8356
Epoch 19: Loss = 0.7713
Epoch 20: Loss = 0.7207, BLEU = 42.86
Epoch 21: Loss = 0.6638
Epoch 22: Loss = 0.6217
Epoch 23: Loss = 0.5841
Epoch 24: Loss = 0.5489
Epoch 25: Loss = 0.5278, BLEU = 42.86
Epoch 26: Loss = 0.4917
Epoch 27: Loss = 0

In [5]:
trained_model.generate_translation('Hello, how are you?')

['namaste', 'aap', 'kaise', 'hain']