In [2]:
import re
import math
from collections import defaultdict, Counter
from typing import List, Tuple, Dict
import logging

class NGramModel:
    """
    Count-based N-gram language model.

    Intended for n=2 (bigram) and n=3 (trigram). Probabilities are plain
    maximum-likelihood estimates with a small constant floor for unseen
    events; no real smoothing is applied.
    """

    def __init__(self, n: int = 2):
        """
        Initialise an empty N-gram model.

        Args:
            n (int): order of the model (2 for bigram, 3 for trigram)
        """
        self.n = n
        self.ngram_counts = defaultdict(int)    # n-gram -> occurrence count
        self.context_counts = defaultdict(int)  # (n-1)-gram context -> count
        self.vocabulary = set()                 # distinct training words
        self.total_words = 0                    # number of training tokens

        # Special tokens
        self.start_token = "<s>"
        self.end_token = "</s>"
        self.unk_token = "<unk>"

        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def preprocess_text(self, text: str) -> List[str]:
        """
        Lower-case and tokenise raw text.

        Args:
            text (str): raw text

        Returns:
            List[str]: word tokens plus the punctuation marks ``.``, ``!``, ``?``
        """
        text = text.lower()
        return re.findall(r'\b\w+\b|[.!?]', text)

    def add_sentence_markers(self, words: List[str]) -> List[str]:
        """
        Wrap a token list with start and end markers.

        Args:
            words (List[str]): token list

        Returns:
            List[str]: ``n - 1`` start tokens, the words, then one end token
        """
        # n - 1 start markers give the first real word a full-length context.
        start_markers = [self.start_token] * (self.n - 1)
        return start_markers + words + [self.end_token]

    def get_ngrams(self, words: List[str]) -> List[Tuple[str, ...]]:
        """
        Produce every contiguous n-gram of a token list.

        Args:
            words (List[str]): token list

        Returns:
            List[Tuple[str, ...]]: n-grams in order of appearance
        """
        return [tuple(words[i:i + self.n])
                for i in range(len(words) - self.n + 1)]

    def get_contexts(self, words: List[str]) -> List[Tuple[str, ...]]:
        """
        Produce the (n-1)-gram context of every n-gram of a token list.

        Args:
            words (List[str]): token list

        Returns:
            List[Tuple[str, ...]]: one context per n-gram, in the same order
            as ``get_ngrams``
        """
        return [tuple(words[i:i + self.n - 1])
                for i in range(len(words) - self.n + 1)]

    def train(self, train_file: str):
        """
        Accumulate n-gram and context counts from a text file.

        Each non-empty line is treated as one sentence.

        Args:
            train_file (str): path of the training file (UTF-8)
        """
        self.logger.info(f"Start training {self.n}-gram model...")

        line_count = 0
        with open(train_file, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue

                words = self.preprocess_text(line)
                if not words:
                    continue

                words_with_markers = self.add_sentence_markers(words)

                # The vocabulary and token total exclude the sentence markers.
                self.vocabulary.update(words)
                self.total_words += len(words)

                for ngram in self.get_ngrams(words_with_markers):
                    self.ngram_counts[ngram] += 1

                for context in self.get_contexts(words_with_markers):
                    self.context_counts[context] += 1

                line_count += 1
                if line_count % 10000 == 0:
                    self.logger.info(f"Already processed {line_count} lines...")

        self.logger.info(f"Training completed!")
        self.logger.info(f"Total words: {self.total_words}")
        self.logger.info(f"Vocabulary size: {len(self.vocabulary)}")
        self.logger.info(f"N-gram total: {len(self.ngram_counts)}")
        self.logger.info(f"Context total: {len(self.context_counts)}")

    def get_probability(self, ngram: Tuple[str, ...]) -> float:
        """
        Return the maximum-likelihood conditional probability of an n-gram.

        P(w_n | w_1..w_{n-1}) = Count(w_1..w_n) / Count(w_1..w_{n-1})

        Args:
            ngram (Tuple[str, ...]): n-gram of exactly ``self.n`` tokens

        Returns:
            float: the conditional probability; ``1e-10`` when the context was
            never seen, ``0.0`` when the context was seen but the n-gram was not

        Raises:
            ValueError: if ``ngram`` does not have ``self.n`` tokens
        """
        if len(ngram) != self.n:
            raise ValueError(f"N-gram長度應為 {self.n}")

        # Use .get() rather than indexing: indexing a defaultdict inserts the
        # missing key, which would silently grow the count tables (and the
        # reported number of n-gram types) while scoring unseen text.
        context_count = self.context_counts.get(ngram[:-1], 0)
        if context_count == 0:
            return 1e-10  # floor for unseen contexts, avoids probability 0

        return self.ngram_counts.get(ngram, 0) / context_count

    def get_sentence_probability(self, sentence: str) -> float:
        """
        Return the log-probability of a sentence.

        Args:
            sentence (str): raw sentence text

        Returns:
            float: sum of natural-log n-gram probabilities (including the
            end-of-sentence prediction); ``-inf`` when the sentence has no tokens
        """
        words = self.preprocess_text(sentence)
        if not words:
            return float('-inf')

        ngrams = self.get_ngrams(self.add_sentence_markers(words))

        log_prob = 0.0
        for ngram in ngrams:
            prob = self.get_probability(ngram)
            # Zero-probability n-grams are floored so the log stays finite.
            log_prob += math.log(prob) if prob > 0 else math.log(1e-10)

        return log_prob

    def calculate_perplexity(self, test_file: str) -> float:
        """
        Compute the perplexity of the model on a test file.

        PP = exp(-1/N * sum(log P(sentence))), where N is the number of word
        tokens in the test file (sentence markers are not counted).

        Args:
            test_file (str): path of the test file (UTF-8)

        Returns:
            float: perplexity; ``inf`` when the test file contains no words
        """
        self.logger.info(f"Calculating {self.n}-gram model perplexity...")

        total_log_prob = 0.0
        total_words = 0

        with open(test_file, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue

                words = self.preprocess_text(line)
                if not words:
                    continue

                total_log_prob += self.get_sentence_probability(line)
                total_words += len(words)

        if total_words == 0:
            # Perplexity is undefined without tokens; avoid dividing by zero.
            self.logger.warning("Test set contains no words; perplexity is undefined")
            return float('inf')

        avg_log_prob = total_log_prob / total_words
        perplexity = math.exp(-avg_log_prob)

        self.logger.info(f"測試集總詞數: {total_words}")
        self.logger.info(f"平均對數概率: {avg_log_prob:.6f}")
        self.logger.info(f"{self.n}-gram 困惑度: {perplexity:.2f}")

        return perplexity

    def generate_text(self, context: Tuple[str, ...], max_length: int = 20) -> str:
        """
        Sample a continuation of the given context.

        At each step the next word is drawn with probability proportional to
        the training count of the n-gram it completes. Generation stops at the
        end token, when the context has no known continuation, or after
        ``max_length`` words.

        Args:
            context (Tuple[str, ...]): initial context of ``n - 1`` tokens
            max_length (int): maximum number of words to generate

        Returns:
            str: context plus generated words, space-joined, with start
            tokens removed

        Raises:
            ValueError: if ``context`` does not have ``n - 1`` tokens
        """
        import random

        if len(context) != self.n - 1:
            raise ValueError(f"Context長度應為 {self.n - 1}")

        result = list(context)
        current_context = context

        for _ in range(max_length):
            # Every word observed after current_context, with its count.
            followers = [
                (ngram[-1], count)
                for ngram, count in self.ngram_counts.items()
                if count > 0 and ngram[:-1] == current_context
            ]
            if not followers:
                break

            candidates, weights = zip(*followers)
            next_word = random.choices(candidates, weights=weights, k=1)[0]

            if next_word == self.end_token:
                break

            result.append(next_word)
            current_context = tuple(result[-(self.n - 1):])

        # Drop start markers so only real words are returned.
        filtered_result = [word for word in result if word != self.start_token]
        return ' '.join(filtered_result)

In [None]:
"""
N-gram language model training and evaluation script.
Trains the model on train.txt and evaluates it on test.txt,
comparing n=2 (bigram) against n=3 (trigram).
"""

import time
import os
from ngram_model import NGramModel

def _percent_change(baseline, value):
    """Return the relative change from baseline to value in percent (NaN for a zero baseline)."""
    if baseline == 0:
        return float('nan')
    return ((value - baseline) / baseline) * 100


def _example_contexts(n):
    """Return the fixed demo contexts used for text generation with an n-gram model."""
    if n == 2:
        return [("add",), ("cook",), ("bake",)]
    return [("add", "the"), ("cook", "for"), ("bake", "at")]


def _load_incomplete_lines(path="incomplete.txt"):
    """Return the stripped, non-empty lines of the incomplete-sentence file."""
    with open(path, "r", encoding="utf-8") as f:
        return [line.strip() for line in f if line.strip()]


def _completion_context(words, n, start_token="<s>"):
    """Return the last n-1 words as a context tuple, left-padded with start tokens."""
    tail = words[-(n - 1):]
    return tuple([start_token] * (n - 1 - len(tail)) + tail)


def _complete_text(model, incomplete_text, words, n, max_length, start_token="<s>"):
    """Return incomplete_text extended with the words the model generates after it."""
    context = _completion_context(words, n, start_token)
    completion = model.generate_text(context, max_length=max_length)

    # generate_text removes start tokens from its output, so the prefix to
    # strip must not contain them either; otherwise a padded context never
    # matches and the last prompt word is repeated in the result.
    context_str = ' '.join(word for word in context if word != start_token)
    if completion.startswith(context_str):
        new_words = completion[len(context_str):].strip()
        if new_words:
            return incomplete_text + " " + new_words
        return incomplete_text + " [no completion]"
    return incomplete_text + " " + completion


def main():
    """
    Train and evaluate bigram and trigram models, then report the results.

    Reads ``train.txt`` and ``test.txt`` from the working directory, prints
    per-model statistics, generation samples and (for the trigram model)
    completions of the lines in ``incomplete.txt`` when that file exists,
    prints a comparison table, and writes a detailed report to
    ``ngram_results.txt``.
    """
    train_file = "train.txt"
    test_file = "test.txt"

    print("=" * 60)
    print("N-gram model Training and Evaluation")
    print("=" * 60)

    results = {}
    trained_models = {}  # Store trained models for text generation

    for n in [2, 3]:
        print(f"\n{'='*20} N={n} ({'Bigram' if n==2 else 'Trigram'}) {'='*20}")

        start_time = time.time()

        model = NGramModel(n=n)

        print(f"Train {n}-gram model...")
        model.train(train_file)

        training_time = time.time() - start_time
        print(f"Training time: {training_time:.2f} seconds")

        # Calculate perplexity
        start_time = time.time()
        perplexity = model.calculate_perplexity(test_file)
        test_time = time.time() - start_time

        print(f"Testing time: {test_time:.2f} seconds")

        # Save results and model
        results[n] = {
            'perplexity': perplexity,
            'training_time': training_time,
            'test_time': test_time,
            'vocab_size': len(model.vocabulary),
            'total_words': model.total_words,
            'ngram_types': len(model.ngram_counts),
            'context_types': len(model.context_counts)
        }
        trained_models[n] = model

        print(f"\n{n}-gram model text generation examples:")
        try:
            for context in _example_contexts(n):
                generated = model.generate_text(context, max_length=15)
                print(f"  Context: {' '.join(context)} -> {generated}")
        except Exception as e:
            print(f"  Error occurred during text generation: {e}")

        # Test with incomplete.txt for text completion
        if os.path.exists("incomplete.txt"):
            try:
                incomplete_lines = _load_incomplete_lines()

                # Console completions are shown for the trigram model only;
                # bigram completions appear in the results file.
                if n == 3 and incomplete_lines:
                    # Header is printed once, not once per input line.
                    print(f"\n{n}-gram model incomplete text completion:")
                    for incomplete_text in incomplete_lines:
                        words = model.preprocess_text(incomplete_text)
                        if not words:
                            continue
                        full_completion = _complete_text(
                            model, incomplete_text, words, n, max_length=20)
                        print(f"  '{incomplete_text}' -> '{full_completion}'")

            except Exception as e:
                print(f"  Error processing incomplete.txt: {e}")
        else:
            print("  incomplete.txt not found, skipping completion test")

    # Results comparison
    print("\n" + "=" * 60)
    print("Results Comparison")
    print("=" * 60)

    print(f"{'Metric':<20} {'Bigram (n=2)':<15} {'Trigram (n=3)':<15} {'Difference':<15}")
    print("-" * 65)

    bigram_pp = results[2]['perplexity']
    trigram_pp = results[3]['perplexity']
    pp_diff = _percent_change(bigram_pp, trigram_pp)

    print(f"{'Perplexity':<20} {bigram_pp:<15.2f} {trigram_pp:<15.2f} {pp_diff:+.2f}%")

    bigram_time = results[2]['training_time']
    trigram_time = results[3]['training_time']
    time_diff = _percent_change(bigram_time, trigram_time)

    print(f"{'Training time':<20} {bigram_time:<15.2f} {trigram_time:<15.2f} {time_diff:+.2f}%")

    bigram_ngrams = results[2]['ngram_types']
    trigram_ngrams = results[3]['ngram_types']
    ngram_diff = _percent_change(bigram_ngrams, trigram_ngrams)

    print(f"{'N-gram types':<20} {bigram_ngrams:<15,} {trigram_ngrams:<15,} {ngram_diff:+.2f}%")

    print(f"{'Vocabulary size':<20} {results[2]['vocab_size']:<15,} {results[3]['vocab_size']:<15,} {'Same':<15}")
    print(f"{'Total words':<20} {results[2]['total_words']:<15,} {results[3]['total_words']:<15,} {'Same':<15}")

    # Save detailed results to a file
    with open("ngram_results.txt", "w", encoding="utf-8") as f:
        f.write("N-gram Language Model Evaluation Results\n")
        f.write("=" * 40 + "\n\n")

        for n in [2, 3]:
            model_name = "Bigram" if n == 2 else "Trigram"
            f.write(f"{model_name} (n={n}) Results:\n")
            f.write(f"  Perplexity: {results[n]['perplexity']:.2f}\n")
            f.write(f"  Training time: {results[n]['training_time']:.2f} seconds\n")
            f.write(f"  Testing time: {results[n]['test_time']:.2f} seconds\n")
            f.write(f"  Vocabulary size: {results[n]['vocab_size']:,}\n")
            f.write(f"  Total words: {results[n]['total_words']:,}\n")
            f.write(f"  N-gram types: {results[n]['ngram_types']:,}\n")
            f.write(f"  Context types: {results[n]['context_types']:,}\n")
            f.write("\n")

        f.write("Comparison Results:\n")
        f.write(f"  Perplexity difference: {pp_diff:+.2f}%\n")
        f.write(f"  Training time difference: {time_diff:+.2f}%\n")
        f.write(f"  N-gram types difference: {ngram_diff:+.2f}%\n")

        # Save text generation examples using the already trained models
        f.write("\nText Generation Examples:\n")
        for n in [2, 3]:
            model_name = "Bigram" if n == 2 else "Trigram"
            f.write(f"\n{model_name} Generation Examples:\n")

            model = trained_models[n]

            # Basic examples
            for context in _example_contexts(n):
                try:
                    generated = model.generate_text(context, max_length=15)
                    f.write(f"  Context: {' '.join(context)} -> {generated}\n")
                except Exception as e:
                    f.write(f"  Context: {' '.join(context)} -> Error: {e}\n")

            # Incomplete text completion examples
            if os.path.exists("incomplete.txt"):
                f.write(f"\n{model_name} Incomplete Text Completions:\n")
                try:
                    incomplete_lines = _load_incomplete_lines()

                    # Only the first 8 prompts are written to the report.
                    for incomplete_text in incomplete_lines[:8]:
                        words = model.preprocess_text(incomplete_text)
                        if not words:
                            continue

                        try:
                            full_completion = _complete_text(
                                model, incomplete_text, words, n, max_length=8)
                            f.write(f"  '{incomplete_text}' -> '{full_completion}'\n")
                        except Exception as e:
                            f.write(f"  '{incomplete_text}' -> Error: {e}\n")
                except Exception as e:
                    f.write(f"  Error processing incomplete.txt: {e}\n")

    print(f"\nDetailed results have been saved to ngram_results.txt")
    print("Program execution completed!")

# Run the training/evaluation pipeline only when executed as a script.
if __name__ == "__main__":
    main()

In [None]:
#!/usr/bin/env python3
"""
RNN Language Model Implementation
A recurrent neural network language model implemented with PyTorch.
"""

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import re
from collections import Counter, defaultdict
import logging
import time
from typing import List, Tuple, Dict
import os

# Configure logging: INFO level with timestamp, level name and message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class Vocabulary:
    """Vocabulary that maps words to integer indices and back."""

    def __init__(self):
        self.word2idx = {}           # word -> index
        self.idx2word = {}           # index -> word
        self.word_count = Counter()  # word -> observed frequency

        # Special tokens
        self.pad_token = '<PAD>'
        self.unk_token = '<UNK>'
        self.start_token = '<START>'
        self.end_token = '<END>'

        # Register the special tokens first so they get indices 0..3.
        self.add_word(self.pad_token)
        self.add_word(self.unk_token)
        self.add_word(self.start_token)
        self.add_word(self.end_token)

    def _register(self, word: str) -> int:
        """Assign the next free index to word if it has none; return its index."""
        if word not in self.word2idx:
            idx = len(self.word2idx)
            self.word2idx[word] = idx
            self.idx2word[idx] = word
        return self.word2idx[word]

    def add_word(self, word: str) -> int:
        """Add one occurrence of word to the vocabulary and return its index."""
        idx = self._register(word)
        self.word_count[word] += 1
        return idx

    def build_vocab(self, texts: List[str], min_freq: int = 2):
        """
        Build the vocabulary from a collection of texts.

        Every word whose frequency reaches ``min_freq`` receives an index;
        rarer words stay out of the vocabulary.

        Args:
            texts: raw text lines
            min_freq: minimum frequency for a word to be admitted
        """
        logger.info("Building vocabulary...")

        # Count word frequencies.
        for text in texts:
            self.word_count.update(self.preprocess_text(text))

        # Admit frequent words. Registration must not go through add_word():
        # that would bump each admitted word's count a second time and modify
        # the counter while it is being iterated.
        for word, count in self.word_count.items():
            if count >= min_freq:
                self._register(word)

        logger.info(f"Vocabulary size: {len(self.word2idx)}")
        logger.info(f"Most common words: {self.word_count.most_common(10)}")

    def preprocess_text(self, text: str) -> List[str]:
        """Lower-case the text and split it into word tokens."""
        text = text.lower().strip()
        return re.findall(r'\b\w+\b', text)

    def text_to_indices(self, text: str) -> List[int]:
        """
        Convert text to an index sequence.

        The result starts with the start-token index and ends with the
        end-token index; out-of-vocabulary words map to the unknown token.
        """
        unk_idx = self.word2idx[self.unk_token]
        indices = [self.word2idx[self.start_token]]
        indices.extend(self.word2idx.get(word, unk_idx)
                       for word in self.preprocess_text(text))
        indices.append(self.word2idx[self.end_token])
        return indices

    def indices_to_text(self, indices: List[int]) -> str:
        """
        Convert an index sequence back to text.

        Unknown indices and the pad/start/end tokens are dropped; the unknown
        token itself is kept.
        """
        hidden = (self.pad_token, self.start_token, self.end_token)
        words = []
        for idx in indices:
            word = self.idx2word.get(idx)
            if word is not None and word not in hidden:
                words.append(word)
        return ' '.join(words)

    def __len__(self):
        return len(self.word2idx)

class TextDataset(Dataset):
    """Dataset of fixed-length context windows paired with the next token."""

    def __init__(self, texts: List[str], vocab: Vocabulary, seq_length: int = 50):
        self.vocab = vocab
        self.seq_length = seq_length
        self.sequences = []  # list of (input index list, target index)

        self.prepare_sequences(texts)

    def prepare_sequences(self, texts: List[str]):
        """
        Turn every text into (context window, next token) training pairs.

        For each position the window holds up to ``seq_length`` preceding
        indices and is left-padded with the pad index to exactly
        ``seq_length`` entries.
        """
        logger.info("Preparing training sequences...")

        window = self.seq_length
        for text in texts:
            token_ids = self.vocab.text_to_indices(text)

            # Nothing to predict from fewer than two tokens.
            if len(token_ids) < 2:
                continue

            # Slide over the sequence: tokens up to `pos` predict token pos+1.
            for pos, target in enumerate(token_ids[1:]):
                history = token_ids[max(0, pos - window + 1):pos + 1]

                # Left-pad short histories to the fixed window size.
                missing = window - len(history)
                if missing > 0:
                    pad_idx = self.vocab.word2idx[self.vocab.pad_token]
                    history = [pad_idx] * missing + history

                self.sequences.append((history, target))

        logger.info(f"Created {len(self.sequences)} training sequences")

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        history, target = self.sequences[idx]
        inputs = torch.tensor(history, dtype=torch.long)
        label = torch.tensor(target, dtype=torch.long)
        return inputs, label

class RNNLanguageModel(nn.Module):
    """RNN language model: embedding -> multi-layer RNN -> dropout -> linear."""

    def __init__(self, vocab_size: int, embed_dim: int = 128, hidden_dim: int = 128,
                 num_layers: int = 2, dropout: float = 0.2):
        super().__init__()

        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # Inter-layer RNN dropout only applies when there is more than one layer.
        rnn_dropout = dropout if num_layers > 1 else 0

        # Word embedding layer
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # Recurrent layer(s)
        self.rnn = nn.RNN(embed_dim, hidden_dim, num_layers,
                          batch_first=True, dropout=rnn_dropout)
        # Dropout applied to the final RNN output
        self.dropout = nn.Dropout(dropout)
        # Projection from hidden state to vocabulary logits
        self.linear = nn.Linear(hidden_dim, vocab_size)

        self.init_weights()

    def init_weights(self):
        """Re-initialise embedding and output weights uniformly; zero the output bias."""
        bound = 0.1
        nn.init.uniform_(self.embedding.weight, -bound, bound)
        nn.init.zeros_(self.linear.bias)
        nn.init.uniform_(self.linear.weight, -bound, bound)

    def forward(self, x, hidden=None):
        """
        Compute next-token logits from the last time step.

        Returns:
            tuple: (logits of shape (batch_size, vocab_size), final hidden state)
        """
        embedded = self.embedding(x)                  # (batch_size, seq_len, embed_dim)
        rnn_out, hidden = self.rnn(embedded, hidden)  # (batch_size, seq_len, hidden_dim)

        # Only the output of the final time step feeds the classifier.
        final_step = rnn_out[:, -1, :]                # (batch_size, hidden_dim)
        logits = self.linear(self.dropout(final_step))  # (batch_size, vocab_size)

        return logits, hidden

    def init_hidden(self, batch_size, device):
        """Return an all-zero initial hidden state on the given device."""
        shape = (self.num_layers, batch_size, self.hidden_dim)
        return torch.zeros(*shape).to(device)

class RNNTrainer:
    """Training, evaluation and sampling helper for an RNN language model."""

    def __init__(self, model, vocab, device):
        """
        Args:
            model: language model exposing ``init_hidden`` and returning
                ``(logits, hidden)`` when called
            vocab: vocabulary exposing ``word2idx``, the special-token
                attributes, ``text_to_indices`` and ``indices_to_text``
            device: torch device that batches are moved to
        """
        self.model = model
        self.vocab = vocab
        self.device = device

        # Loss (pad targets are ignored) and optimizer
        self.criterion = nn.CrossEntropyLoss(ignore_index=vocab.word2idx[vocab.pad_token])
        self.optimizer = optim.Adam(model.parameters(), lr=0.001)

    def train_epoch(self, dataloader):
        """
        Train for one epoch.

        Returns:
            float: sample-weighted average of the batch losses
        """
        self.model.train()
        total_loss = 0
        total_samples = 0

        for batch_idx, (data, targets) in enumerate(dataloader):
            data, targets = data.to(self.device), targets.to(self.device)
            batch_size = data.size(0)

            # Each batch starts from a fresh zero hidden state.
            hidden = self.model.init_hidden(batch_size, self.device)

            # Forward pass
            outputs, _ = self.model(data, hidden)
            loss = self.criterion(outputs, targets)

            # Backward pass
            self.optimizer.zero_grad()
            loss.backward()

            # Clip gradients to limit exploding updates.
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=5.0)

            self.optimizer.step()

            total_loss += loss.item() * batch_size
            total_samples += batch_size

            if batch_idx % 100 == 0:
                logger.info(f'Batch {batch_idx}, Loss: {loss.item():.4f}')

        return total_loss / total_samples

    def evaluate(self, dataloader):
        """
        Evaluate the model without updating it.

        Returns:
            float: sample-weighted average of the batch losses
        """
        self.model.eval()
        total_loss = 0
        total_samples = 0

        with torch.no_grad():
            for data, targets in dataloader:
                data, targets = data.to(self.device), targets.to(self.device)
                batch_size = data.size(0)

                hidden = self.model.init_hidden(batch_size, self.device)
                outputs, _ = self.model(data, hidden)
                loss = self.criterion(outputs, targets)

                total_loss += loss.item() * batch_size
                total_samples += batch_size

        return total_loss / total_samples

    def generate_text(self, start_text: str, max_length: int = 50, temperature: float = 1.0,
                      seq_length: int = 50):
        """
        Sample a continuation of ``start_text``.

        Args:
            start_text: prompt text
            max_length: maximum number of tokens to generate
            temperature: softmax temperature; must be positive
            seq_length: length of the left-padded context window fed to the
                model (should match the window used for training)

        Returns:
            str: prompt plus generated words as decoded by the vocabulary

        Raises:
            ValueError: if ``temperature`` or ``seq_length`` is not positive
        """
        if temperature <= 0:
            raise ValueError("temperature must be positive")
        if seq_length <= 0:
            raise ValueError("seq_length must be positive")

        self.model.eval()

        word2idx = self.vocab.word2idx
        pad_idx = word2idx[self.vocab.pad_token]
        end_idx = word2idx[self.vocab.end_token]

        # text_to_indices() closes the sequence with the end token. The prompt
        # is to be continued, so that marker must not be part of the context.
        indices = self.vocab.text_to_indices(start_text)
        if indices and indices[-1] == end_idx:
            indices = indices[:-1]
        if not indices:
            indices = [word2idx[self.vocab.start_token]]

        generated = list(indices)

        with torch.no_grad():
            for _ in range(max_length):
                # Most recent tokens, left-padded to the fixed window size.
                input_seq = generated[-seq_length:]
                if len(input_seq) < seq_length:
                    input_seq = [pad_idx] * (seq_length - len(input_seq)) + input_seq

                input_tensor = torch.tensor([input_seq], dtype=torch.long).to(self.device)
                hidden = self.model.init_hidden(1, self.device)

                # Predict the next-token distribution.
                outputs, _ = self.model(input_tensor, hidden)
                probabilities = torch.softmax(outputs / temperature, dim=-1)

                # Random sampling
                next_word_idx = torch.multinomial(probabilities, 1).item()

                # Stop once the model emits the end token.
                if next_word_idx == end_idx:
                    break

                generated.append(next_word_idx)

        return self.vocab.indices_to_text(generated)

def load_data(file_path: str) -> List[str]:
    """
    Read a UTF-8 text file into a list of stripped, non-empty lines.

    Returns an empty list (after logging an error) when the file is missing.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            stripped_lines = (raw.strip() for raw in handle)
            texts = [entry for entry in stripped_lines if entry]
    except FileNotFoundError:
        logger.error(f"File {file_path} not found!")
        return []

    logger.info(f"Loaded {len(texts)} texts from {file_path}")
    return texts

def test_incomplete_sentences(trainer, incomplete_file: str):
    """
    Print model completions for the first ten prompts of a file.

    Each non-empty line of ``incomplete_file`` is a prompt. A missing file is
    logged as an error and nothing is printed.
    """
    logger.info("Testing incomplete sentence completion...")

    try:
        with open(incomplete_file, 'r', encoding='utf-8') as handle:
            prompts = []
            for raw in handle:
                prompt = raw.strip()
                if prompt:
                    prompts.append(prompt)
    except FileNotFoundError:
        logger.error(f"File {incomplete_file} not found!")
        return

    banner = "=" * 60
    print("\n" + banner)
    print("RNN 模型文本補全結果")
    print(banner)

    # Only the first 10 prompts are tried.
    for prompt in prompts[:10]:
        completed = trainer.generate_text(prompt, max_length=20, temperature=0.8)
        print(f"輸入: {prompt}")
        print(f"補全: {completed}")
        print("-" * 50)

def main():
    """
    Train the RNN language model on train.txt and demonstrate generation.

    Builds the vocabulary and dataset, trains for a fixed number of epochs,
    saves the model to rnn_model.pth, then prints completions for
    incomplete.txt and a few sample generations.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    logger.info(f"Using device: {device}")

    # Hyperparameters
    HIDDEN_DIM = 128
    NUM_LAYERS = 2
    LEARNING_RATE = 0.001  # not passed on; the trainer sets its own learning rate
    NUM_EPOCHS = 10
    BATCH_SIZE = 32
    SEQ_LENGTH = 50

    # Load data; nothing to do without training text.
    train_texts = load_data('train.txt')
    if not train_texts:
        return

    # Build vocabulary
    vocab = Vocabulary()
    vocab.build_vocab(train_texts, min_freq=3)

    # Create dataset
    train_dataset = TextDataset(train_texts, vocab, SEQ_LENGTH)
    train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

    # The same settings build the model and are stored in the checkpoint.
    hyperparameters = {
        'vocab_size': len(vocab),
        'embed_dim': HIDDEN_DIM,
        'hidden_dim': HIDDEN_DIM,
        'num_layers': NUM_LAYERS
    }
    model = RNNLanguageModel(**hyperparameters).to(device)

    parameter_count = sum(p.numel() for p in model.parameters())
    logger.info(f"Model parameters: {parameter_count:,}")

    # Create the trainer
    trainer = RNNTrainer(model, vocab, device)

    # Train the model
    logger.info("Starting training...")
    for epoch in range(NUM_EPOCHS):
        epoch_number = epoch + 1
        start_time = time.time()

        train_loss = trainer.train_epoch(train_dataloader)

        epoch_time = time.time() - start_time
        logger.info(f'Epoch {epoch_number}/{NUM_EPOCHS}, Train Loss: {train_loss:.4f}, Time: {epoch_time:.2f}s')

        # Show a generated sample every third epoch.
        if epoch_number % 3 == 0:
            print(f"\nEpoch {epoch_number} 生成範例:")
            sample_text = trainer.generate_text("add salt", max_length=15)
            print(f"Generated: {sample_text}")

    # Save the model
    checkpoint = {
        'model_state_dict': model.state_dict(),
        'vocab': vocab,
        'hyperparameters': hyperparameters
    }
    torch.save(checkpoint, 'rnn_model.pth')

    logger.info("Model saved to rnn_model.pth")

    # Try completing the incomplete sentences
    test_incomplete_sentences(trainer, 'incomplete.txt')

    # Generate a few sample texts
    print("\n" + "="*60)
    print("RNN 模型文本生成範例")
    print("="*60)

    for prompt in ["add", "cook", "bake", "mix"]:
        generated = trainer.generate_text(prompt, max_length=20)
        print(f"起始詞: {prompt}")
        print(f"生成文本: {generated}")
        print("-" * 50)

# Run RNN training and the generation demo only when executed as a script.
if __name__ == "__main__":
    main()

In [None]:
#!/usr/bin/env python3
"""
LSTM Language Model Implementation
A long short-term memory (LSTM) language model implemented with PyTorch.
"""

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import re
from collections import Counter, defaultdict
import logging
import time
from typing import List, Tuple, Dict
import os

# Configure logging: INFO level with timestamped "time - level - message"
# records, plus a module-level logger used by the classes and functions below.
# Note: logging.basicConfig() is a no-op when the root logger already has
# handlers attached.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class Vocabulary:
    """Bidirectional mapping between words and integer indices.

    Four special tokens are registered on construction, in this order, so
    they receive indices 0-3: ``<PAD>``, ``<UNK>``, ``<START>``, ``<END>``.
    """

    def __init__(self):
        # Lookup tables; kept in sync by _register().
        self.word2idx = {}
        self.idx2word = {}
        # Occurrence count per word; build_vocab() applies min_freq to it.
        self.word_count = Counter()

        # Special tokens
        self.pad_token = '<PAD>'
        self.unk_token = '<UNK>'
        self.start_token = '<START>'
        self.end_token = '<END>'

        # Register the special tokens first so their indices are fixed.
        for token in (self.pad_token, self.unk_token,
                      self.start_token, self.end_token):
            self.add_word(token)

    def _register(self, word: str) -> int:
        """Give *word* the next free index if it is new; return its index.

        Unlike add_word(), this does not touch ``word_count``.
        """
        idx = self.word2idx.get(word)
        if idx is None:
            idx = len(self.word2idx)
            self.word2idx[word] = idx
            self.idx2word[idx] = word
        return idx

    def add_word(self, word: str) -> int:
        """Register *word* if needed, count one occurrence, return its index."""
        idx = self._register(word)
        self.word_count[word] += 1
        return idx

    def build_vocab(self, texts: List[str], min_freq: int = 2):
        """Count the words of *texts* and register the frequent ones.

        Args:
            texts: Raw text lines; each is tokenized with preprocess_text().
            min_freq: Minimum occurrence count a word needs to get an index.
                Rarer words stay out of the vocabulary and are later mapped
                to the ``<UNK>`` index by text_to_indices().
        """
        logger.info("Building vocabulary...")

        # Tally word frequencies.
        for text in texts:
            self.word_count.update(self.preprocess_text(text))

        # Register frequent words. _register() is used instead of add_word()
        # so that a word is not counted a second time here, which would leave
        # every in-vocabulary count one higher than its real frequency.
        for word, count in self.word_count.items():
            if count >= min_freq:
                self._register(word)

        logger.info(f"Vocabulary size: {len(self.word2idx)}")
        logger.info(f"Most common words: {self.word_count.most_common(10)}")

    def preprocess_text(self, text: str) -> List[str]:
        """Lower-case and strip *text*, then return its ``\\w+`` word tokens."""
        return re.findall(r'\b\w+\b', text.lower().strip())

    def text_to_indices(self, text: str) -> List[int]:
        """Encode *text* as ``[<START>, word indices..., <END>]``.

        Words without an index are encoded as the ``<UNK>`` index.
        """
        unk_idx = self.word2idx[self.unk_token]
        indices = [self.word2idx[self.start_token]]
        indices.extend(self.word2idx.get(word, unk_idx)
                       for word in self.preprocess_text(text))
        indices.append(self.word2idx[self.end_token])
        return indices

    def indices_to_text(self, indices: List[int]) -> str:
        """Decode *indices* to a space-joined string.

        Unknown indices and the ``<PAD>``/``<START>``/``<END>`` tokens are
        skipped; ``<UNK>`` is kept in the output.
        """
        hidden = (self.pad_token, self.start_token, self.end_token)
        words = [self.idx2word[idx] for idx in indices
                 if idx in self.idx2word and self.idx2word[idx] not in hidden]
        return ' '.join(words)

    def __len__(self):
        """Return the number of registered words, special tokens included."""
        return len(self.word2idx)

class TextDataset(Dataset):
    """Next-token prediction samples built from raw texts.

    Each text is encoded with the vocabulary; every position except the last
    then yields one ``(context window, next token)`` pair. A window holds at
    most ``seq_length`` tokens and is left-padded with the PAD index when the
    available history is shorter.
    """

    def __init__(self, texts: List[str], vocab: Vocabulary, seq_length: int = 50):
        self.vocab = vocab
        self.seq_length = seq_length
        # List of (context index list, target index) tuples.
        self.sequences = []

        self.prepare_sequences(texts)

    def prepare_sequences(self, texts: List[str]):
        """Fill ``self.sequences`` with one sample per predictable token."""
        logger.info("Preparing training sequences...")

        window = self.seq_length
        for text in texts:
            token_ids = self.vocab.text_to_indices(text)

            # Nothing to predict from fewer than two tokens.
            if len(token_ids) < 2:
                continue

            # `end` is the position of the target; the context is everything
            # before it, truncated to the most recent `window` tokens.
            for end in range(1, len(token_ids)):
                context = token_ids[max(0, end - window):end]

                missing = window - len(context)
                if missing > 0:
                    pad_idx = self.vocab.word2idx[self.vocab.pad_token]
                    context = [pad_idx] * missing + context

                self.sequences.append((context, token_ids[end]))

        logger.info(f"Created {len(self.sequences)} training sequences")

    def __len__(self):
        """Return the number of prepared samples."""
        return len(self.sequences)

    def __getitem__(self, idx):
        """Return sample *idx* as a pair of ``torch.long`` tensors."""
        context, target = self.sequences[idx]
        context_tensor = torch.tensor(context, dtype=torch.long)
        target_tensor = torch.tensor(target, dtype=torch.long)
        return context_tensor, target_tensor

class LSTMLanguageModel(nn.Module):
    """LSTM language model that scores the next token for a context window.

    Pipeline: embedding -> stacked LSTM -> dropout on the last time step ->
    linear projection to vocabulary-sized logits.

    Args:
        vocab_size: Number of rows in the embedding / size of the output layer.
        embed_dim: Embedding dimension.
        hidden_dim: LSTM hidden-state dimension.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied before the output layer, and
            between LSTM layers when ``num_layers > 1``.
    """

    def __init__(self, vocab_size: int, embed_dim: int = 128, hidden_dim: int = 128,
                 num_layers: int = 2, dropout: float = 0.2):
        super().__init__()

        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # Token embedding
        self.embedding = nn.Embedding(vocab_size, embed_dim)

        # LSTM stack. Inter-layer dropout is only passed when there is more
        # than one layer; otherwise 0 is used.
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers,
                            batch_first=True,
                            dropout=dropout if num_layers > 1 else 0)

        # Dropout applied to the last time step's output
        self.dropout = nn.Dropout(dropout)

        # Projection to vocabulary logits
        self.linear = nn.Linear(hidden_dim, vocab_size)

        self.init_weights()

    def init_weights(self):
        """Initialize embedding and output weights uniformly in [-0.1, 0.1)
        and zero the output bias. LSTM weights keep their default init."""
        init_range = 0.1
        self.embedding.weight.data.uniform_(-init_range, init_range)
        self.linear.bias.data.zero_()
        self.linear.weight.data.uniform_(-init_range, init_range)

    def forward(self, x, hidden=None):
        """Compute next-token logits for a batch of index sequences.

        Args:
            x: Index tensor of shape (batch_size, seq_len).
            hidden: Optional ``(h0, c0)`` tuple passed to the LSTM.

        Returns:
            ``(logits, hidden)`` where ``logits`` has shape
            (batch_size, vocab_size) and ``hidden`` is the LSTM's final
            ``(h_n, c_n)`` state.
        """
        embedded = self.embedding(x)  # (batch_size, seq_len, embed_dim)
        lstm_out, hidden = self.lstm(embedded, hidden)  # (batch_size, seq_len, hidden_dim)

        # Only the last time step is used for the prediction.
        last_output = lstm_out[:, -1, :]  # (batch_size, hidden_dim)

        logits = self.linear(self.dropout(last_output))  # (batch_size, vocab_size)
        return logits, hidden

    def init_hidden(self, batch_size, device):
        """Return zero-filled ``(h0, c0)`` states created directly on *device*.

        Each tensor has shape (num_layers, batch_size, hidden_dim).
        """
        shape = (self.num_layers, batch_size, self.hidden_dim)
        h0 = torch.zeros(shape, device=device)
        c0 = torch.zeros(shape, device=device)
        return (h0, c0)

class LSTMTrainer:
    """Training, evaluation and sampling helper for an LSTM language model.

    Args:
        model: Module called as ``model(data, hidden)`` returning
            ``(logits, hidden)`` and providing ``init_hidden(batch, device)``.
        vocab: Vocabulary providing ``word2idx``, the special-token
            attributes, ``text_to_indices`` and ``indices_to_text``.
        device: Device that batches and initial states are placed on.
        learning_rate: Adam learning rate.
        seq_length: Length of the left-padded context window fed to the
            model during generation; should match the window length the
            model was trained with.

    Raises:
        ValueError: If ``seq_length`` is smaller than 1.
    """

    def __init__(self, model, vocab, device, learning_rate: float = 0.001,
                 seq_length: int = 50):
        if seq_length < 1:
            raise ValueError("seq_length must be at least 1")

        self.model = model
        self.vocab = vocab
        self.device = device
        self.seq_length = seq_length

        # Loss and optimizer. PAD targets are excluded from the loss.
        self.criterion = nn.CrossEntropyLoss(ignore_index=vocab.word2idx[vocab.pad_token])
        self.optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    def _batch_loss(self, data, targets):
        """Run one forward pass from a fresh zero state.

        Returns:
            ``(loss, batch_size)`` for the given batch.
        """
        data, targets = data.to(self.device), targets.to(self.device)
        batch_size = data.size(0)
        hidden = self.model.init_hidden(batch_size, self.device)
        outputs, _ = self.model(data, hidden)
        return self.criterion(outputs, targets), batch_size

    @staticmethod
    def _mean_loss(total_loss, total_samples):
        """Return the per-sample mean, or NaN when no sample was seen."""
        if total_samples == 0:
            return float("nan")
        return total_loss / total_samples

    def train_epoch(self, dataloader):
        """Train for one pass over *dataloader*.

        Gradients are clipped to a total norm of 5.0 before each optimizer
        step, and the batch loss is logged every 100 batches.

        Returns:
            The sample-weighted mean training loss (NaN if the dataloader
            yielded no batches).
        """
        self.model.train()
        total_loss = 0.0
        total_samples = 0

        for batch_idx, (data, targets) in enumerate(dataloader):
            loss, batch_size = self._batch_loss(data, targets)

            # Backward pass
            self.optimizer.zero_grad()
            loss.backward()

            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=5.0)

            self.optimizer.step()

            total_loss += loss.item() * batch_size
            total_samples += batch_size

            if batch_idx % 100 == 0:
                logger.info(f'Batch {batch_idx}, Loss: {loss.item():.4f}')

        return self._mean_loss(total_loss, total_samples)

    def evaluate(self, dataloader):
        """Compute the sample-weighted mean loss without updating the model.

        Returns:
            The mean loss (NaN if the dataloader yielded no batches).
        """
        self.model.eval()
        total_loss = 0.0
        total_samples = 0

        with torch.no_grad():
            for data, targets in dataloader:
                loss, batch_size = self._batch_loss(data, targets)
                total_loss += loss.item() * batch_size
                total_samples += batch_size

        return self._mean_loss(total_loss, total_samples)

    def generate_text(self, start_text: str, max_length: int = 50, temperature: float = 1.0):
        """Sample a continuation of *start_text* from the model.

        Args:
            start_text: Prompt; encoded with ``vocab.text_to_indices``.
            max_length: Maximum number of tokens to sample.
            temperature: Logits are divided by this value before the softmax;
                must be positive.

        Returns:
            The prompt plus the sampled tokens, decoded with
            ``vocab.indices_to_text``.

        Raises:
            ValueError: If ``temperature`` is not positive.
        """
        if temperature <= 0:
            raise ValueError("temperature must be positive")

        self.model.eval()

        word2idx = self.vocab.word2idx
        pad_idx = word2idx[self.vocab.pad_token]
        end_idx = word2idx[self.vocab.end_token]

        generated = list(self.vocab.text_to_indices(start_text))
        # text_to_indices() closes the prompt with <END>. Remove it so the
        # model continues the prompt instead of being conditioned on an
        # end-of-text marker.
        if generated and generated[-1] == end_idx:
            generated.pop()
        if not generated:
            generated = [word2idx[self.vocab.start_token]]

        with torch.no_grad():
            for _ in range(max_length):
                # Most recent tokens, left-padded to the fixed window length.
                context = generated[-self.seq_length:]
                context = [pad_idx] * (self.seq_length - len(context)) + context

                input_tensor = torch.tensor([context], dtype=torch.long).to(self.device)
                hidden = self.model.init_hidden(1, self.device)

                # Predict the next-token distribution
                outputs, _ = self.model(input_tensor, hidden)
                probabilities = torch.softmax(outputs / temperature, dim=-1)

                # Random sampling
                next_word_idx = torch.multinomial(probabilities, 1).item()

                # Stop as soon as the end token is sampled
                if next_word_idx == end_idx:
                    break

                generated.append(next_word_idx)

        return self.vocab.indices_to_text(generated)

def load_data(file_path: str) -> List[str]:
    """Read the non-blank lines of a UTF-8 text file.

    Args:
        file_path: Path of the file to read.

    Returns:
        The lines with surrounding whitespace removed, blank lines dropped.
        An empty list is returned (and an error logged) when the file does
        not exist.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            stripped_lines = (raw.strip() for raw in handle)
            texts = [entry for entry in stripped_lines if entry]
    except FileNotFoundError:
        logger.error(f"File {file_path} not found!")
        return []

    logger.info(f"Loaded {len(texts)} texts from {file_path}")
    return texts

def test_incomplete_sentences(trainer, incomplete_file: str):
    """Print model completions for the first ten prompts in a file.

    Args:
        trainer: Object whose ``generate_text(text, max_length=...,
            temperature=...)`` produces the completion.
        incomplete_file: UTF-8 file with one prompt per line; blank lines are
            ignored. If the file is missing an error is logged and nothing is
            printed.
    """
    logger.info("Testing incomplete sentence completion...")

    try:
        with open(incomplete_file, 'r', encoding='utf-8') as handle:
            prompts = [entry for entry in map(str.strip, handle) if entry]
    except FileNotFoundError:
        logger.error(f"File {incomplete_file} not found!")
        return

    banner = "=" * 60
    print("\n" + banner)
    print("LSTM 模型文本補全結果")
    print(banner)

    # Only the first ten prompts are tried.
    for prompt in prompts[:10]:
        completion = trainer.generate_text(prompt, max_length=20, temperature=0.8)
        print(f"輸入: {prompt}")
        print(f"補全: {completion}")
        print("-" * 50)

def main():
    """Train the LSTM language model on ``train.txt`` and print samples.

    Steps: pick the device, load the training lines, build the vocabulary and
    dataset, train for ``NUM_EPOCHS`` epochs (printing a sample every third
    epoch), save a checkpoint to ``lstm_model.pth``, complete the prompts in
    ``incomplete.txt`` and finally generate text for a few start words.
    Returns early, without training, when no training text could be loaded.
    """
    # Select the device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    logger.info(f"Using device: {device}")

    # Hyperparameters
    HIDDEN_DIM = 128
    NUM_LAYERS = 2
    LEARNING_RATE = 0.001
    NUM_EPOCHS = 10
    BATCH_SIZE = 32
    SEQ_LENGTH = 50

    # Load the data
    train_texts = load_data('train.txt')
    if not train_texts:
        return

    # Build the vocabulary
    vocab = Vocabulary()
    vocab.build_vocab(train_texts, min_freq=3)

    # Create the dataset
    train_dataset = TextDataset(train_texts, vocab, SEQ_LENGTH)
    train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

    # Create the model (the embedding size is set equal to the hidden size)
    model = LSTMLanguageModel(
        vocab_size=len(vocab),
        embed_dim=HIDDEN_DIM,
        hidden_dim=HIDDEN_DIM,
        num_layers=NUM_LAYERS
    ).to(device)

    logger.info(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")

    # Create the trainer
    trainer = LSTMTrainer(model, vocab, device)
    # The trainer is constructed without a learning-rate argument, so push
    # LEARNING_RATE into the optimizer explicitly; otherwise the
    # hyperparameter declared above would be silently ignored.
    for param_group in trainer.optimizer.param_groups:
        param_group['lr'] = LEARNING_RATE

    # Train the model
    logger.info("Starting training...")
    for epoch in range(NUM_EPOCHS):
        start_time = time.time()

        train_loss = trainer.train_epoch(train_dataloader)

        epoch_time = time.time() - start_time
        logger.info(f'Epoch {epoch+1}/{NUM_EPOCHS}, Train Loss: {train_loss:.4f}, Time: {epoch_time:.2f}s')

        # Generate a sample every third epoch
        if (epoch + 1) % 3 == 0:
            print(f"\nEpoch {epoch+1} 生成範例:")
            sample_text = trainer.generate_text("add salt", max_length=15)
            print(f"Generated: {sample_text}")

    # Save the model
    # NOTE(review): 'vocab' is stored as a pickled Vocabulary instance, so
    # loading this checkpoint presumably needs the class to be importable and
    # full (non weights-only) unpickling - confirm this is intended.
    torch.save({
        'model_state_dict': model.state_dict(),
        'vocab': vocab,
        'hyperparameters': {
            'vocab_size': len(vocab),
            'embed_dim': HIDDEN_DIM,
            'hidden_dim': HIDDEN_DIM,
            'num_layers': NUM_LAYERS
        }
    }, 'lstm_model.pth')

    logger.info("Model saved to lstm_model.pth")

    # Test completion of incomplete sentences
    test_incomplete_sentences(trainer, 'incomplete.txt')

    # Generate some example texts
    print("\n" + "="*60)
    print("LSTM 模型文本生成範例")
    print("="*60)

    test_prompts = ["add", "cook", "bake", "mix"]
    for prompt in test_prompts:
        generated = trainer.generate_text(prompt, max_length=20)
        print(f"起始詞: {prompt}")
        print(f"生成文本: {generated}")
        print("-" * 50)

# Script entry point: call main() only when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()