In [1]:
import pickle
UNK_TOKEN = "<UNK>"
PAD_TOKEN = "<PAD>"
BOS_TOKEN = "시"
EOS_TOKEN = "끝"
SPLIT_TOKEN = "▁"
def create_digest_cedict(mono_file, poly_file, output_file):
    cedict = {}

    # Process monophonic characters
    with open(mono_file, 'r', encoding='utf-8') as f:
        for line in f:
            char, pron = line.strip().split('\t')
            cedict[char] = [pron]

    # Process polyphonic characters
    with open(poly_file, 'r', encoding='utf-8') as f:
        for line in f:
            char, prons = line.strip().split('\t')
            cedict[char] = prons.split(',')

    # Save to pickle file
    with open(output_file, 'wb') as f:
        pickle.dump(cedict, f)

# Create digest_cedict.pkl
create_digest_cedict('MONOPHONIC_CHARS.txt', 'POLYPHONIC_CHARS.txt', 'digest_cedict.pkl')

In [2]:
import pickle


def create_char2idx(sent_files, output_file):
    char2idx = {}
    idx = 0

    for sent_file in sent_files:
        with open(sent_file, "r", encoding="utf-8") as f:
            for line in f:
                for char in line.strip():
                    if char not in char2idx:
                        char2idx[char] = idx
                        idx += 1

    # Add special tokens
    char2idx[UNK_TOKEN] = idx
    char2idx[PAD_TOKEN] = idx + 1
    char2idx[BOS_TOKEN] = idx + 2
    char2idx[EOS_TOKEN] = idx + 3

    # Save to pickle file
    with open(output_file, "wb") as f:
        pickle.dump(char2idx, f)


# Create char2idx.pkl
create_char2idx(["train.sent", "dev.sent", "test.sent"], "char2idx.pkl")

In [3]:
def create_class2idx(lb_files, output_file):
    class2idx = {}
    idx = 0

    for lb_file in lb_files:
        with open(lb_file, "r", encoding="utf-8") as f:
            for line in f:
                for pron in line.strip().split():
                    if pron not in class2idx:
                        class2idx[pron] = idx
                        idx += 1

    # Add special tokens
    class2idx[UNK_TOKEN] = idx
    class2idx[PAD_TOKEN] = idx + 1

    # Save to pickle file
    with open(output_file, "wb") as f:
        pickle.dump(class2idx, f)


# Create class2idx.pkl
create_class2idx(["train.lb", "dev.lb", "test.lb"], "class2idx.pkl")

In [4]:
import numpy as np
import pickle


def initialize_np_ckpt(char2idx, class2idx, embedding_dim=64, lstm_hidden_dim=32):
    state_dict = {}

    # Initialize embedding weights
    state_dict["embedding.weight"] = np.random.randn(
        len(char2idx), embedding_dim
    ).astype(np.float32)

    # Initialize LSTM weights and biases
    state_dict["lstm.weight_ih_l0"] = np.random.randn(
        4 * lstm_hidden_dim, embedding_dim
    ).astype(np.float32)
    state_dict["lstm.weight_hh_l0"] = np.random.randn(
        4 * lstm_hidden_dim, lstm_hidden_dim
    ).astype(np.float32)
    state_dict["lstm.bias_ih_l0"] = np.zeros(4 * lstm_hidden_dim, dtype=np.float32)
    state_dict["lstm.bias_hh_l0"] = np.zeros(4 * lstm_hidden_dim, dtype=np.float32)

    state_dict["lstm.weight_ih_l0_reverse"] = np.random.randn(
        4 * lstm_hidden_dim, embedding_dim
    ).astype(np.float32)
    state_dict["lstm.weight_hh_l0_reverse"] = np.random.randn(
        4 * lstm_hidden_dim, lstm_hidden_dim
    ).astype(np.float32)
    state_dict["lstm.bias_ih_l0_reverse"] = np.zeros(
        4 * lstm_hidden_dim, dtype=np.float32
    )
    state_dict["lstm.bias_hh_l0_reverse"] = np.zeros(
        4 * lstm_hidden_dim, dtype=np.float32
    )

    # Initialize fully connected layer weights and biases
    state_dict["logit_layer.0.weight"] = np.random.randn(
        lstm_hidden_dim, 2 * lstm_hidden_dim
    ).astype(np.float32)
    state_dict["logit_layer.0.bias"] = np.zeros(lstm_hidden_dim, dtype=np.float32)
    state_dict["logit_layer.2.weight"] = np.random.randn(
        len(class2idx), lstm_hidden_dim
    ).astype(np.float32)
    state_dict["logit_layer.2.bias"] = np.zeros(len(class2idx), dtype=np.float32)

    # Save to pickle file
    with open("np_ckpt.pkl", "wb") as f:
        pickle.dump(state_dict, f)


# Load char2idx and class2idx
char2idx = pickle.load(open("char2idx.pkl", "rb"))
class2idx = pickle.load(open("class2idx.pkl", "rb"))

# Initialize np_ckpt.pkl
initialize_np_ckpt(char2idx, class2idx)

In [5]:
digest_cedict = pickle.load(open("digest_cedict.pkl", "rb"))
char2idx = pickle.load(open("char2idx.pkl", "rb"))
class2idx = pickle.load(open("class2idx.pkl", "rb"))

# Print statistics
print("Length of digest_cedict:", len(digest_cedict))
print("Length of char2idx:", len(char2idx))
print("Length of class2idx:", len(class2idx))
state_dict = pickle.load(open("np_ckpt.pkl", "rb"))

print("Dimensions of embedding.weight:", state_dict["embedding.weight"].shape)
print("Dimensions of lstm.weight_ih_l0:", state_dict["lstm.weight_ih_l0"].shape)
print("Dimensions of lstm.weight_hh_l0:", state_dict["lstm.weight_hh_l0"].shape)
print("Dimensions of lstm.bias_ih_l0:", state_dict["lstm.bias_ih_l0"].shape)
print("Dimensions of lstm.bias_hh_l0:", state_dict["lstm.bias_hh_l0"].shape)
print(
    "Dimensions of lstm.weight_ih_l0_reverse:",
    state_dict["lstm.weight_ih_l0_reverse"].shape,
)
print(
    "Dimensions of lstm.weight_hh_l0_reverse:",
    state_dict["lstm.weight_hh_l0_reverse"].shape,
)
print(
    "Dimensions of lstm.bias_ih_l0_reverse:",
    state_dict["lstm.bias_ih_l0_reverse"].shape,
)
print(
    "Dimensions of lstm.bias_hh_l0_reverse:",
    state_dict["lstm.bias_hh_l0_reverse"].shape,
)
print("Dimensions of logit_layer.0.weight:", state_dict["logit_layer.0.weight"].shape)
print("Dimensions of logit_layer.0.bias:", state_dict["logit_layer.0.bias"].shape)
print("Dimensions of logit_layer.2.weight:", state_dict["logit_layer.2.weight"].shape)
print("Dimensions of logit_layer.2.bias:", state_dict["logit_layer.2.bias"].shape)

Length of digest_cedict: 27006
Length of char2idx: 6358
Length of class2idx: 1593
Dimensions of embedding.weight: (6358, 64)
Dimensions of lstm.weight_ih_l0: (128, 64)
Dimensions of lstm.weight_hh_l0: (128, 32)
Dimensions of lstm.bias_ih_l0: (128,)
Dimensions of lstm.bias_hh_l0: (128,)
Dimensions of lstm.weight_ih_l0_reverse: (128, 64)
Dimensions of lstm.weight_hh_l0_reverse: (128, 32)
Dimensions of lstm.bias_ih_l0_reverse: (128,)
Dimensions of lstm.bias_hh_l0_reverse: (128,)
Dimensions of logit_layer.0.weight: (32, 64)
Dimensions of logit_layer.0.bias: (32,)
Dimensions of logit_layer.2.weight: (1593, 32)
Dimensions of logit_layer.2.bias: (1593,)


In [7]:
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import pickle
from tqdm import tqdm


# 1. Define Dataset and DataLoader
class CantoneseDataset(Dataset):
    def __init__(
        self,
        sent_file,
        lb_file,
        char2idx,
        class2idx,
        pad_token="<PAD>",
        unk_token="<UNK>",
    ):
        self.sentences, self.labels = self.load_data(sent_file, lb_file)
        self.char2idx = char2idx
        self.class2idx = class2idx
        self.pad_token = pad_token
        self.unk_token = unk_token
        self.prepared_data = self.prepare_data()

    def load_data(self, sent_file, lb_file):
        with open(sent_file, "r", encoding="utf-8") as f:
            sentences = [line.strip() for line in f]
        with open(lb_file, "r", encoding="utf-8") as f:
            labels = [line.strip().split() for line in f]
        return sentences, labels

    def prepare_data(self):
        input_ids = []
        target_ids = []
        target_indices = []
        for sent, label in zip(self.sentences, self.labels):
            input_id = [
                self.char2idx.get(char, self.char2idx[self.unk_token]) for char in sent
            ]
            target_id = [
                self.class2idx.get(pron, self.class2idx[self.unk_token])
                for pron in label
            ]
            input_ids.append(input_id)
            target_ids.append(target_id)

            # Compute target indices for polyphonic characters
            target_idx = [i for i, pron in enumerate(label) if pron in self.class2idx]
            target_indices.append(target_idx)

        # Pad sequences
        max_length = max(len(seq) for seq in input_ids)
        input_ids = [
            seq + [self.char2idx[self.pad_token]] * (max_length - len(seq))
            for seq in input_ids
        ]
        target_ids = [
            seq + [self.class2idx[self.pad_token]] * (max_length - len(seq))
            for seq in target_ids
        ]

        return list(zip(input_ids, target_ids, target_indices))

    def __len__(self):
        return len(self.prepared_data)

    def __getitem__(self, idx):
        input_id, target_id, target_idx = self.prepared_data[idx]
        return {
            "input_ids": torch.tensor(input_id, dtype=torch.long),
            "target_ids": torch.tensor(target_id, dtype=torch.long),
            "target_indices": target_idx,  # Keep as list for variable lengths
        }


def collate_fn(batch):
    inputs = torch.stack([item["input_ids"] for item in batch])
    targets = torch.stack([item["target_ids"] for item in batch])
    target_indices = [item["target_indices"] for item in batch]
    return inputs, targets, target_indices


# 2. Define the Model
class G2pM(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_classes, padding_idx):
        super(G2pM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=padding_idx)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, bidirectional=True, batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2, num_classes)  # Bidirectional
        # Initialize weights
        self.init_weights()

    def init_weights(self):
        nn.init.xavier_uniform_(self.embedding.weight)
        for name, param in self.lstm.named_parameters():
            if "weight" in name:
                nn.init.xavier_uniform_(param)
            elif "bias" in name:
                nn.init.constant_(param, 0)
        nn.init.xavier_uniform_(self.fc.weight)
        nn.init.constant_(self.fc.bias, 0)

    def forward(self, inputs, target_indices):
        """
        Args:
            inputs: [batch_size, seq_len]
            target_indices: list of lists containing target positions for each sample
        Returns:
            logits: [total_targets, num_classes]
        """
        embedded = self.embedding(inputs)  # [batch_size, seq_len, embed_dim]
        packed_output, _ = self.lstm(embedded)  # [batch_size, seq_len, hidden_dim*2]

        # Extract target hidden states
        target_hidden = []
        for i, indices in enumerate(target_indices):
            for idx in indices:
                if idx < packed_output.size(
                    1
                ):  # Ensure index is within sequence length
                    target_hidden.append(packed_output[i, idx, :])
        if target_hidden:
            target_hidden = torch.stack(target_hidden)  # [total_targets, hidden_dim*2]
        else:
            target_hidden = torch.empty(0, self.lstm.hidden_size * 2).to(
                packed_output.device
            )

        logits = self.fc(target_hidden)  # [total_targets, num_classes]
        return logits


# 3. Training and Evaluation Functions
def train_epoch(model, loader, criterion, optimizer, device, class2idx):
    model.train()
    running_loss = 0.0
    total_targets = 0

    progress = tqdm(loader, desc="Training", leave=False)
    for batch in progress:
        inputs, targets, target_indices = batch
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()

        logits = model(inputs, target_indices)  # [total_targets, num_classes]
        if logits.numel() == 0:
            continue  # Skip if there are no target indices in the batch

        # Flatten targets based on target_indices
        active_targets = targets[targets != class2idx["<PAD>"]].view(-1)
        loss = criterion(logits, active_targets)

        loss.backward()
        optimizer.step()

        running_loss += loss.item() * logits.size(0)
        total_targets += logits.size(0)

        avg_loss = running_loss / total_targets if total_targets > 0 else 0.0
        progress.set_postfix({"Loss": f"{avg_loss:.4f}"})

    avg_loss = running_loss / total_targets if total_targets > 0 else 0.0
    return avg_loss


def evaluate_epoch(model, loader, criterion, device, class2idx):
    model.eval()
    running_loss = 0.0
    total_targets = 0

    with torch.no_grad():
        progress = tqdm(loader, desc="Evaluating", leave=False)
        for batch in progress:
            inputs, targets, target_indices = batch
            inputs = inputs.to(device)
            targets = targets.to(device)

            logits = model(inputs, target_indices)  # [total_targets, num_classes]
            if logits.numel() == 0:
                continue  # Skip if there are no target indices in the batch

            active_targets = targets[targets != class2idx["<PAD>"]].view(-1)
            loss = criterion(logits, active_targets)

            running_loss += loss.item() * logits.size(0)
            total_targets += logits.size(0)

            avg_loss = running_loss / total_targets if total_targets > 0 else 0.0
            progress.set_postfix({"Loss": f"{avg_loss:.4f}"})

    avg_loss = running_loss / total_targets if total_targets > 0 else 0.0
    return avg_loss


# 4. Evaluation Function for Sentences
def evaluate_sentence(
    model, sentence, char2idx, idx2class, device, pad_token="<PAD>", unk_token="<UNK>"
):
    model.eval()
    with torch.no_grad():
        # Convert sentence to indices
        input_ids = [char2idx.get(char, char2idx[unk_token]) for char in sentence]
        input_tensor = (
            torch.tensor(input_ids, dtype=torch.long).unsqueeze(0).to(device)
        )  # [1, seq_len]

        # Since it's a single sentence, target_indices are all positions (or specific based on your use case)
        # Assuming you want predictions for all characters
        target_indices = [list(range(len(input_ids)))]

        # Get logits
        logits = model(input_tensor, target_indices)  # [seq_len, num_classes]
        if logits.numel() == 0:
            print("No target indices found in the sentence.")
            return []

        # Get predictions
        predictions = torch.argmax(logits, dim=1).cpu().numpy()  # [seq_len]

        # Map predictions to class labels
        predicted_labels = [idx2class.get(idx, "<UNK>") for idx in predictions]

    return predicted_labels


# 5. Main Training Script
def main():
    # Load mappings
    char2idx = pickle.load(open("char2idx.pkl", "rb"))
    class2idx = pickle.load(open("class2idx.pkl", "rb"))
    # Create inverse mapping for class indices
    idx2class = {idx: cls for cls, idx in class2idx.items()}

    # Parameters (adjust as needed)
    vocab_size = len(char2idx)
    embed_dim = 128
    hidden_dim = 256
    num_classes = len(class2idx)
    pad_idx = char2idx["<PAD>"]

    # Create datasets
    train_dataset = CantoneseDataset("train.sent", "train.lb", char2idx, class2idx)
    dev_dataset = CantoneseDataset("dev.sent", "dev.lb", char2idx, class2idx)

    # Create data loaders
    batch_size = 32
    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn
    )
    dev_loader = DataLoader(
        dev_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn
    )

    # Initialize model
    model = G2pM(vocab_size, embed_dim, hidden_dim, num_classes, padding_idx=pad_idx)

    # Move model to GPU if available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    learning_rate = 0.001
    optimizer = torch.optim.Adam(
        model.parameters(), lr=learning_rate, betas=(0.9, 0.999), eps=1e-8
    )

    # Training parameters
    epochs = 10

    for epoch in range(1, epochs + 1):
        print(f"\nEpoch {epoch}/{epochs}")

        train_loss = train_epoch(
            model, train_loader, criterion, optimizer, device, class2idx
        )
        print(f"Training Loss: {train_loss:.4f}")

        dev_loss = evaluate_epoch(model, dev_loader, criterion, device, class2idx)
        print(f"Validation Loss: {dev_loss:.4f}")

        # Optionally, save the model checkpoint
        torch.save(model.state_dict(), f"trained_pytorch_ckpt_epoch{epoch}.pth")

    # Save the final model
    torch.save(model.state_dict(), "trained_pytorch_final.pth")

    # Example Evaluation
    sentence = "然而，他红了20年以后，他竟退出了大家的视线。"
    predicted_labels = evaluate_sentence(model, sentence, char2idx, idx2class, device)
    print("\nSentence Evaluation:")
    for char, label in zip(sentence, predicted_labels):
        print(f"{char}: {label}")


if __name__ == "__main__":
    main()


Epoch 1/10


                                                                          

Training Loss: 4.4511


                                                                          

Validation Loss: 3.3043

Epoch 2/10


                                                                          

Training Loss: 1.9429


                                                                          

Validation Loss: 1.8303

Epoch 3/10


                                                                            

Training Loss: 0.9380


                                                                          

Validation Loss: 1.2080

Epoch 4/10


                                                                           

Training Loss: 0.5325


                                                                          

Validation Loss: 1.0434

Epoch 5/10


                                                                             

Training Loss: 0.3488


                                                                          

Validation Loss: 0.9268

Epoch 6/10


                                                                          

Training Loss: 0.2511


                                                                          

Validation Loss: 0.9741

Epoch 7/10


                                                                          

Training Loss: 0.1994


                                                                          

Validation Loss: 0.8937

Epoch 8/10


                                                                          

Training Loss: 0.1682


                                                                          

Validation Loss: 0.9187

Epoch 9/10


                                                                          

Training Loss: 0.1498


                                                                          

Validation Loss: 0.9245

Epoch 10/10


                                                                          

Training Loss: 0.1408


                                                                          

Validation Loss: 0.9141

Sentence Evaluation:
然: fung1
而: fung1
，: faa3
他: fung1
红: fung1
了: fung1
2: fung1
0: fung1
年: fung1
以: fung1
后: fung1
，: fung1
他: fung1
竟: fung1
退: laa1
出: laa1
了: laa1
大: leoi4
家: laa1
的: laa1
视: laa1
线: ngaat6
。: cak1


In [1]:
sentence = "然而，他红了20年以后，他竟退出了大家的视线。"

In [2]:
import ToJyutping

ToJyutping.get_jyutping_text(sentence)

'jin4 ji4, taa1 hung4 liu5 […] nin4 ji5 hau6, taa1 ging2 teoi3 ceot1 liu5 daai6 gaa1 dik1 si6 sin3.'

In [11]:


def main():
    # Load mappings
    char2idx = pickle.load(open("char2idx.pkl", "rb"))
    class2idx = pickle.load(open("class2idx.pkl", "rb"))
    # Create inverse mapping for class indices
    idx2class = {idx: cls for cls, idx in class2idx.items()}

    # Parameters (adjust as needed)
    vocab_size = len(char2idx)
    embed_dim = 128
    hidden_dim = 256
    num_classes = len(class2idx)
    pad_idx = char2idx["<PAD>"]

    # Create datasets
    train_dataset = CantoneseDataset("train.sent", "train.lb", char2idx, class2idx)
    dev_dataset = CantoneseDataset("dev.sent", "dev.lb", char2idx, class2idx)

    # Create data loaders
    batch_size = 32
    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn
    )
    dev_loader = DataLoader(
        dev_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn
    )

    # Initialize model
    model = G2pM(vocab_size, embed_dim, hidden_dim, num_classes, padding_idx=pad_idx)

    # Move model to GPU if available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # Load the trained model
    model.load_state_dict(torch.load("trained_pytorch_final.pth", map_location=device))
    model.eval()  # Set model to evaluation mode

    # Example Evaluation
    sentence = "然而，他红了20年以后，他竟退出了大家的视线。"
    predicted_labels = evaluate_sentence(model, sentence, char2idx, idx2class, device)
    print(predicted_labels)
    print("\nSentence Evaluation:")
    for char, label in zip(sentence, predicted_labels):
        print(f"{char}: {label}")


if __name__ == "__main__":
    main()

['fung1', 'fung1', 'faa3', 'fung1', 'fung1', 'fung1', 'fung1', 'fung1', 'fung1', 'fung1', 'fung1', 'fung1', 'fung1', 'fung1', 'laa1', 'laa1', 'laa1', 'leoi4', 'laa1', 'laa1', 'laa1', 'ngaat6', 'cak1']

Sentence Evaluation:
然: fung1
而: fung1
，: faa3
他: fung1
红: fung1
了: fung1
2: fung1
0: fung1
年: fung1
以: fung1
后: fung1
，: fung1
他: fung1
竟: fung1
退: laa1
出: laa1
了: laa1
大: leoi4
家: laa1
的: laa1
视: laa1
线: ngaat6
。: cak1


  model.load_state_dict(torch.load("trained_pytorch_final.pth", map_location=device))
