In [None]:
import numpy as np
import pandas as pd
import pickle
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from collections import Counter
from fractions import Fraction
import re

# Preprocessing helpers
def case_folding(text):
    """Return ``text`` with every character converted to lowercase."""
    lowered = text.lower()
    return lowered

def cleansing(text):
    """Keep only ASCII letters and whitespace, then normalise the spacing.

    Every character that is neither ``a-z``/``A-Z`` nor whitespace is removed,
    each run of whitespace is collapsed into a single space, and leading and
    trailing whitespace is stripped from the result.
    """
    letters_only = re.sub(r'[^a-zA-Z\s]', '', text)
    single_spaced = re.sub(r'\s+', ' ', letters_only)
    return single_spaced.strip()

def tokenize(text):
    """Split ``text`` on runs of whitespace and return the list of tokens."""
    tokens = text.split()
    return tokens

def pad_sequences(sequences, max_length):
    """Force every sequence to exactly ``max_length`` items.

    Sequences shorter than ``max_length`` are right-padded with ``0``; all
    others are cut down to their first ``max_length`` items. A new list of
    new lists is returned; the inputs are not modified.
    """
    return [
        seq[:max_length] if len(seq) >= max_length
        else seq + [0] * (max_length - len(seq))
        for seq in sequences
    ]

def create_word_embedding(tokenized_texts):
    """Build the word <-> index lookup tables for a tokenized corpus.

    Index 0 is left unused so it can serve as the padding index. Words receive
    indices ``1..N`` in sorted order, which makes the mapping reproducible
    across interpreter runs (iterating a ``set`` of strings directly is not,
    because of string hash randomisation). The end-of-sequence marker
    ``'<EOS>'`` always gets the last index, ``N + 1``.

    Args:
        tokenized_texts: Iterable of token lists.

    Returns:
        A ``(word_to_index, index_to_word)`` tuple of dictionaries that are
        exact inverses of each other.
    """
    # '<EOS>' is reserved: drop it from the regular vocabulary so it cannot
    # end up with two different indices.
    vocab = sorted({word for text in tokenized_texts for word in text} - {'<EOS>'})
    word_to_index = {word: idx for idx, word in enumerate(vocab, start=1)}
    word_to_index['<EOS>'] = len(vocab) + 1
    # Derive the reverse table from the forward one so they can never disagree.
    index_to_word = {idx: word for word, idx in word_to_index.items()}
    return word_to_index, index_to_word

def encode_text(tokenized_text, word_to_index):
    """Map tokens to their indices and terminate the result with ``<EOS>``.

    Tokens that are missing from ``word_to_index`` are silently skipped. The
    ``'<EOS>'`` index is appended only when the vocabulary defines it.
    """
    indices = []
    for token in tokenized_text:
        if token in word_to_index:
            indices.append(word_to_index[token])
    if '<EOS>' in word_to_index:
        indices.append(word_to_index['<EOS>'])
    return indices

# Load the parallel corpus from an Excel workbook
def load_data(file_path):
    """Read the Excel file at ``file_path`` and return its two text columns.

    Returns:
        A ``(sources, targets)`` tuple holding the ``SOURCE`` and ``TARGET``
        columns as plain Python lists.
    """
    print("Loading data from Excel...")
    frame = pd.read_excel(file_path)
    sources = frame['SOURCE'].tolist()
    targets = frame['TARGET'].tolist()
    return sources, targets

# LSTM model
class LSTM(nn.Module):
    """Embedding layer, LSTM, and a linear projection onto the vocabulary.

    One score vector of size ``vocab_size`` is produced for every input
    position.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_dim, out_features=vocab_size)

    def forward(self, x):
        """Return per-position vocabulary scores for the index tensor ``x``.

        The LSTM is built with ``batch_first=True``, so a batched ``x`` is
        laid out as (batch, sequence). The final LSTM state is discarded.
        """
        hidden_states, _ = self.lstm(self.embedding(x))
        return self.fc(hidden_states)

# Train the model
def train_model(model, optimizer, criterion, source_seqs, target_seqs, word_to_index, index_to_word, max_length, epochs=25, batch_size=2):
    """Train ``model`` on paired index sequences using shuffled mini-batches.

    Every batch is padded or truncated to ``max_length`` with
    ``pad_sequences``. The model output and the targets are then flattened so
    that ``criterion`` is evaluated once per sequence position.

    Every tenth batch the current loss is printed together with up to five
    sample predictions from that batch.

    Args:
        model: Module called with the padded source tensor.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(flat_output, flat_targets)``.
        source_seqs: List of encoded source sequences (lists of ints).
        target_seqs: List of encoded target sequences, aligned with
            ``source_seqs``.
        word_to_index: Vocabulary passed through to ``predict``.
        index_to_word: Reverse vocabulary used to print samples.
        max_length: Length every sequence is padded or truncated to.
        epochs: Number of passes over the data.
        batch_size: Number of sequence pairs per optimisation step.

    Returns:
        None. The model is updated in place and left in training mode.
    """
    num_samples = len(source_seqs)
    # Counts a trailing partial batch too (plain floor division would not).
    num_batches = len(range(0, num_samples, batch_size))

    model.train()
    for epoch in range(epochs):
        total_loss = 0
        print(f"Epoch {epoch + 1}/{epochs}")

        # Reshuffle every epoch. New lists are bound to the local names, so
        # the caller's lists are never reordered.
        permutation = np.random.permutation(num_samples)
        source_seqs = [source_seqs[i] for i in permutation]
        target_seqs = [target_seqs[i] for i in permutation]

        for i in range(0, num_samples, batch_size):
            optimizer.zero_grad()

            batch_sources = source_seqs[i:i + batch_size]
            batch_targets = target_seqs[i:i + batch_size]

            source_tensor = torch.tensor(pad_sequences(batch_sources, max_length=max_length))
            target_tensor = torch.tensor(pad_sequences(batch_targets, max_length=max_length))

            output = model(source_tensor)
            # Collapse all leading dimensions so each row is one position's
            # score vector and each target entry is one position's label.
            output = output.view(-1, output.shape[-1])
            target_tensor = target_tensor.view(-1)

            loss = criterion(output, target_tensor)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

            # Progress report every 10 batches
            if i % (batch_size * 10) == 0:
                print(f"Batch {i // batch_size}/{num_batches}, Loss: {loss.item():.4f}")

                # Show predictions for a few items of the current batch
                for sample_source, sample_target in zip(batch_sources[:5], batch_targets[:5]):
                    source_text = ' '.join([index_to_word.get(idx, '') for idx in sample_source if idx != 0])
                    target_text = ' '.join([index_to_word.get(idx, '') for idx in sample_target if idx != 0])
                    predicted_text = predict(model, source_text, word_to_index, index_to_word, max_length=max_length)

                    print(f"SOURCE: {source_text}")
                    print(f"TARGET: {target_text}")
                    print(f"PREDICTION: {predicted_text}")
                    print()

                # predict() switches the model to eval mode; switch back so
                # the remaining batches are still run in training mode.
                model.train()

        # total_loss is a sum of per-batch losses, so average over batches
        # (not samples); also avoids dividing by zero on empty data.
        mean_loss = total_loss / num_batches if num_batches else 0.0
        print(f"Epoch {epoch + 1}/{epochs}, Loss: {mean_loss:.4f}")

# Run a prediction for a single source sentence
def predict(model, source_text, word_to_index, index_to_word, max_length):
    """Translate ``source_text`` by greedy per-position decoding.

    The text goes through the same preprocessing as the training data
    (lowercase, cleanse, tokenize, encode, pad to ``max_length``). For every
    output position the highest-scoring index is taken. Decoding stops at the
    first padding index (0) or ``<EOS>`` index, and an index equal to the
    previously emitted one is skipped.

    The model is put in eval mode for the forward pass and returned to
    training mode afterwards if it was in training mode on entry, so the
    function can be called from inside a training loop.

    Args:
        model: Module mapping an index tensor to per-position scores.
        source_text: Raw source sentence.
        word_to_index: Vocabulary used to encode the source.
        index_to_word: Reverse vocabulary used to decode the output.
        max_length: Length the encoded source is padded or truncated to.

    Returns:
        The predicted words joined by single spaces.
    """
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            tokenized_source = tokenize(cleansing(case_folding(source_text)))
            encoded_source = encode_text(tokenized_source, word_to_index)
            padded_source = pad_sequences([encoded_source], max_length=max_length)[0]
            # unsqueeze(0) adds the batch dimension; squeeze(0) removes it again.
            source_tensor = torch.tensor(padded_source).unsqueeze(0)
            output = model(source_tensor).squeeze(0)

            # One argmax over the score dimension instead of one per position.
            best_indices = output.argmax(dim=-1).tolist()
    finally:
        if was_training:
            model.train()

    eos_token = word_to_index.get('<EOS>', None)
    predicted_indices = []
    for token_idx in best_indices:
        if token_idx == 0:  # Padding index: nothing meaningful follows
            break
        if eos_token is not None and token_idx == eos_token:  # End of sequence
            break
        if predicted_indices and token_idx == predicted_indices[-1]:  # Skip immediate repeats
            continue
        predicted_indices.append(token_idx)

    return ' '.join(index_to_word.get(idx, '') for idx in predicted_indices).strip()

# Manual corpus-level BLEU score
def _count_ngrams(tokens, n):
    """Return a Counter of the n-grams (as tuples) occurring in ``tokens``."""
    return Counter(tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1))


def calculate_bleu_score(references, hypotheses, max_n=4):
    """Compute corpus-level BLEU for tokenized hypotheses.

    Clipped n-gram matches and hypothesis n-gram totals are accumulated over
    the whole corpus as raw integer counts, one pair per order ``1..max_n``.
    The score is the brevity penalty times the geometric mean of the
    resulting precisions. Without smoothing, a precision of zero at any order
    makes the whole score zero.

    Args:
        references: One entry per sentence; each entry is a list of reference
            token lists. Only the first reference (``entry[0]``) is used.
        hypotheses: One token list per sentence, aligned with ``references``.
        max_n: Highest n-gram order included.

    Returns:
        The BLEU score; ``0.0`` for an empty corpus or when any n-gram order
        has no match.
    """
    matched = Counter()
    attempted = Counter()
    reference_lengths = Counter()
    hypothesis_lengths = Counter()

    for reference, hypothesis in zip(references, hypotheses):
        reference_tokens = reference[0]
        reference_lengths[len(reference_tokens)] += 1
        hypothesis_lengths[len(hypothesis)] += 1

        for n in range(1, max_n + 1):
            hypothesis_ngrams = _count_ngrams(hypothesis, n)
            reference_ngrams = _count_ngrams(reference_tokens, n)
            # Counter intersection keeps min(count) per n-gram, i.e. the
            # clipped count. Raw integers are summed here on purpose: a
            # Fraction would reduce itself and lose the true totals.
            matched[n] += sum((hypothesis_ngrams & reference_ngrams).values())
            attempted[n] += sum(hypothesis_ngrams.values())

    p_n = [matched[n] / attempted[n] if attempted[n] > 0 else 0 for n in range(1, max_n + 1)]

    # A zero precision means log(0) in the geometric mean: the score is zero.
    # Dropping such terms would score a hypothesis with no overlap as 1.0.
    if any(p <= 0 for p in p_n):
        return 0.0

    bp = brevity_penalty(reference_lengths, hypothesis_lengths)
    return bp * np.exp(sum(np.log(p) for p in p_n) / max_n)

# Modified (clipped) n-gram precision
def modified_precision(reference, hypothesis, n):
    """Return the clipped n-gram precision of ``hypothesis`` as a Fraction.

    Each hypothesis n-gram is credited at most as many times as it occurs in
    ``reference``. The denominator is the number of hypothesis n-grams, with
    a floor of 1 so that a hypothesis shorter than ``n`` gives ``0``.
    """
    def ngram_counts(tokens):
        return Counter(tuple(tokens[start:start + n]) for start in range(len(tokens) - n + 1))

    in_hypothesis = ngram_counts(hypothesis)
    in_reference = ngram_counts(reference)

    # Counter intersection takes the minimum count for every shared n-gram.
    clipped_total = sum((in_hypothesis & in_reference).values())
    hypothesis_total = sum(in_hypothesis.values())

    return Fraction(clipped_total, max(1, hypothesis_total))

# Brevity penalty
def brevity_penalty(reference_lengths, hypothesis_lengths):
    """Return the BLEU brevity penalty for a corpus.

    Both arguments map a sentence length to the number of sentences with that
    length, so the total token count of each side is the sum of
    ``length * count`` (summing the counts alone would only give the number
    of sentences).

    Args:
        reference_lengths: Mapping ``length -> count`` for the references.
        hypothesis_lengths: Mapping ``length -> count`` for the hypotheses.

    Returns:
        ``1.0`` when the hypotheses are at least as long as the references,
        ``exp(1 - reference_total / hypothesis_total)`` when they are
        shorter, and ``0.0`` when the hypotheses contain no tokens at all.
    """
    reference_total = sum(length * count for length, count in reference_lengths.items())
    hypothesis_total = sum(length * count for length, count in hypothesis_lengths.items())

    if hypothesis_total == 0:
        # Avoids dividing by zero; an empty output gets no credit.
        return 0.0
    if hypothesis_total >= reference_total:
        return 1.0
    return np.exp(1 - reference_total / hypothesis_total)

# Main entry point: train and evaluate the NMT model
def _decode_indices(indices, index_to_word, skip):
    """Join the words for ``indices``, leaving out every index found in ``skip``."""
    return ' '.join(index_to_word.get(idx, '') for idx in indices if idx not in skip)


def main(file_path):
    """Train the LSTM on the corpus at ``file_path``, evaluate it and save it.

    Steps: load the ``SOURCE``/``TARGET`` columns, lowercase and cleanse
    them, build a shared vocabulary, encode the sentences, split 85% / 15%
    into train and test sets, train, then report BLEU and the exact-match
    ratio on the test set. Finally the model is pickled to
    ``muna_model.pkl``.

    Args:
        file_path: Path of the Excel workbook holding the parallel corpus.
    """
    source_texts, target_texts = load_data(file_path)

    # Preprocess the raw text
    source_texts = [cleansing(case_folding(text)) for text in source_texts]
    target_texts = [cleansing(case_folding(text)) for text in target_texts]

    max_length = 350  # Length every sequence is padded or truncated to

    source_tokenized = [tokenize(text) for text in source_texts]
    target_tokenized = [tokenize(text) for text in target_texts]

    # Build one vocabulary shared by source and target
    word_to_index, index_to_word = create_word_embedding(source_tokenized + target_tokenized)

    # Encode the text as index sequences
    source_seqs = [encode_text(tokenized_text, word_to_index) for tokenized_text in source_tokenized]
    target_seqs = [encode_text(tokenized_text, word_to_index) for tokenized_text in target_tokenized]

    # Split into training and test data (85% train, 15% test)
    source_train, source_test, target_train, target_test = train_test_split(source_seqs, target_seqs, test_size=0.15, random_state=42)

    vocab_size = len(word_to_index) + 1  # Plus one for the padding index
    embedding_dim = 128
    hidden_dim = 256

    model = LSTM(vocab_size, embedding_dim, hidden_dim)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # Ignore index 0 (padding)

    # Train the model
    train_model(model, optimizer, criterion, source_train, target_train, word_to_index, index_to_word, max_length=max_length, epochs=25)

    # Evaluate on the test data
    model.eval()

    # predict() never emits '<EOS>', so the marker is dropped here as well.
    # Keeping it would make an exact match impossible and add one unmatched
    # token to every BLEU reference. It would also feed the literal word
    # "eos" back into predict() after cleansing.
    skipped_indices = {0, word_to_index.get('<EOS>')}
    test_pairs = [
        (
            _decode_indices(source_seq, index_to_word, skipped_indices),
            _decode_indices(target_seq, index_to_word, skipped_indices),
        )
        for source_seq, target_seq in zip(source_test, target_test)
    ]

    # Show the sentences used as test data
    print("Kalimat yang dijadikan data testing:")
    for source_text, target_text in test_pairs:
        print(f"SOURCE: {source_text}")
        print(f"TARGET: {target_text}")
        print()

    references = []
    hypotheses = []
    predictions = []
    exact_matches = 0

    print("\nProses prediksi dan evaluasi:")
    for source_text, target_text in test_pairs:
        predicted_text = predict(model, source_text, word_to_index, index_to_word, max_length=max_length)
        predictions.append(predicted_text)

        references.append([target_text.split()])
        hypotheses.append(predicted_text.split())

        if predicted_text == target_text:
            exact_matches += 1

        print(f"SOURCE: {source_text}")
        print(f"TARGET: {target_text}")
        print(f"PREDICTION: {predicted_text}")
        print()

    bleu_score = calculate_bleu_score(references, hypotheses)
    exact_match_ratio = exact_matches / len(test_pairs) if test_pairs else 0.0

    # Final test listing. The predictions computed above are reused here
    # rather than running the model a second time.
    print("\nProses Testing:")
    for (source_text, target_text), predicted_text in zip(test_pairs, predictions):
        print(f"SOURCE: {source_text}")
        print(f"TARGET: {target_text}")
        print(f"PREDICTION: {predicted_text}")
        print()

    print(f"\nHasil Evaluasi:")
    print(f"BLEU Score: {bleu_score:.4f}")
    print(f"Exact Match Ratio: {exact_match_ratio:.4f}")

    # Save the model
    # NOTE(review): this pickles the whole module object, so loading needs the
    # LSTM class to be importable, and the vocabulary is not stored with it.
    # Consider saving model.state_dict() plus word_to_index instead. Only
    # unpickle files from trusted sources.
    with open("muna_model.pkl", "wb") as f:
        pickle.dump(model, f)
    print("Model saved as muna_model.pkl")
    
# Script entry point: run the whole train/evaluate/save pipeline on
# Corpus_muna.xlsx when this file is executed directly (not when imported).
if __name__ == "__main__":
    main("Corpus_muna.xlsx")
