In [None]:
import pickle
# Special vocabulary symbols shared by the vocabulary builders and the data preparation code.
UNK_TOKEN = "<UNK>"  # fallback entry looked up for characters / labels missing from a vocabulary
PAD_TOKEN = "<PAD>"  # entry whose id is used to pad sequences to a common length
BOS_TOKEN = "시"  # single Hangul character added to char2idx; presumably a begin-of-sequence marker - TODO confirm
EOS_TOKEN = "끝"  # single Hangul character added to char2idx; presumably an end-of-sequence marker - TODO confirm
SPLIT_TOKEN = "▁"  # not referenced by any visible cell; purpose to be confirmed

def _read_char_pron_pairs(path):
    """Yield ``(char, pron)`` pairs from a UTF-8, tab-separated file, skipping blank lines."""
    with open(path, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
            stripped = line.strip()
            if not stripped:
                # Blank lines (typically a trailing newline) carry no entry.
                continue
            fields = stripped.split('\t')
            if len(fields) != 2:
                raise ValueError(
                    f"{path}:{line_no}: expected 'char<TAB>pron', got {stripped!r}"
                )
            yield fields[0], fields[1]


def create_digest_cedict(mono_file, poly_file, output_file):
    """Build a character -> list-of-pronunciations dictionary and pickle it.

    Args:
        mono_file: Path of a tab-separated file with one ``char<TAB>pron`` entry
            per line. Each character maps to a one-element list; if a character
            is repeated, the last entry wins.
        poly_file: Path of a file in the same format where a character may occur
            on several lines. All its pronunciations are collected in file order.
        output_file: Path the resulting dictionary is pickled to.

    Raises:
        ValueError: If a non-blank line does not consist of exactly two
            tab-separated fields.

    Blank lines in either file are ignored.
    """
    cedict = {char: [pron] for char, pron in _read_char_pron_pairs(mono_file)}

    poly_entries = {}
    for char, pron in _read_char_pron_pairs(poly_file):
        poly_entries.setdefault(char, []).append(pron)

    # Entries from the polyphonic file replace a same-character entry from the
    # monophonic file instead of being merged with it.
    cedict.update(poly_entries)

    with open(output_file, 'wb') as f:
        pickle.dump(cedict, f)

# Build the character -> pronunciation-list lookup from the two character tables and pickle it.
create_digest_cedict('MONOPHONIC_CHARS.txt', 'POLYPHONIC_CHARS.txt', 'digest_cedict.pkl')

In [None]:
def create_char2idx(sent_files, output_file):
    """Build the character vocabulary (char -> integer id) and pickle it.

    Args:
        sent_files: Iterable of paths to UTF-8 text files. Every character of
            every whitespace-stripped line receives an id in order of first
            appearance.
        output_file: Path the resulting dictionary is pickled to.

    Id layout: ``PAD_TOKEN`` is always id 0, corpus characters follow, then
    ``UNK_TOKEN``, ``BOS_TOKEN`` and ``EOS_TOKEN`` are appended. Ids are
    contiguous, so ``len(char2idx)`` is a valid embedding-table size.
    """
    # PAD must own id 0: compute_loss derives sequence lengths by counting the
    # non-zero ids of each row, which is only correct when padding is 0 and no
    # real character is.
    char2idx = {PAD_TOKEN: 0}

    for sent_file in sent_files:
        with open(sent_file, "r", encoding="utf-8") as f:
            for line in f:
                for char in line.strip():
                    if char not in char2idx:
                        char2idx[char] = len(char2idx)

    # BOS/EOS are single characters and could also occur in the corpus. Keep the
    # id they already have instead of re-assigning it, which would leave a hole
    # in the id range.
    for token in (UNK_TOKEN, BOS_TOKEN, EOS_TOKEN):
        if token not in char2idx:
            char2idx[token] = len(char2idx)

    with open(output_file, "wb") as f:
        pickle.dump(char2idx, f)


# The vocabulary is built from all three splits, so every character occurring in
# these files receives its own id.
create_char2idx(["train.sent", "dev.sent", "test.sent"], "char2idx.pkl")

In [None]:
def create_class2idx(lb_files, output_file):
    """Assign an integer id to every pronunciation label and pickle the mapping.

    Args:
        lb_files: Iterable of paths to UTF-8 label files whose lines hold
            whitespace-separated labels.
        output_file: Path the resulting dictionary is pickled to.

    Labels are numbered in order of first appearance. ``UNK_TOKEN`` and
    ``PAD_TOKEN`` are then stored with the next two ids.
    """
    class2idx = {}

    for path in lb_files:
        with open(path, "r", encoding="utf-8") as handle:
            for row in handle:
                for label in row.split():
                    # A new label gets the current vocabulary size as its id;
                    # a known label is left untouched.
                    class2idx.setdefault(label, len(class2idx))

    first_special_id = len(class2idx)
    class2idx[UNK_TOKEN] = first_special_id
    class2idx[PAD_TOKEN] = first_special_id + 1

    with open(output_file, "wb") as handle:
        pickle.dump(class2idx, handle)


# The label vocabulary is built from all three splits, so every label occurring
# in these files receives its own id.
create_class2idx(["train.lb", "dev.lb", "test.lb"], "class2idx.pkl")

In [None]:
import numpy as np


def initialize_np_ckpt(
    char2idx,
    class2idx,
    embedding_dim=64,
    lstm_hidden_dim=32,
    output_file="np_ckpt.pkl",
):
    """Create a randomly initialised NumPy checkpoint and pickle it.

    Args:
        char2idx: Mapping from character to integer id; sizes the embedding table.
        class2idx: Mapping from label to integer id; sizes the output layer.
        embedding_dim: Width of each embedding vector.
        lstm_hidden_dim: Hidden size of each LSTM direction.
        output_file: Path the state dict is pickled to (defaults to the
            previously hard-coded ``"np_ckpt.pkl"``).

    Weight matrices are float32 draws from ``np.random.randn``; biases are
    float32 zeros. Nothing is returned.
    """

    def _table_size(mapping):
        # Size a table so the largest id is a valid row even if ids have gaps.
        return max(len(mapping), max(mapping.values(), default=-1) + 1)

    def _randn(*shape):
        return np.random.randn(*shape).astype(np.float32)

    def _zeros(size):
        return np.zeros(size, dtype=np.float32)

    vocab_size = _table_size(char2idx)
    num_classes = _table_size(class2idx)
    gate_dim = 4 * lstm_hidden_dim  # rows of the stacked LSTM gate matrices

    # Entries are created in the original order so that, for a given global
    # NumPy seed, the same random values land in the same tensors.
    state_dict = {
        "embedding.weight": _randn(vocab_size, embedding_dim),
        "lstm.weight_ih_l0": _randn(gate_dim, embedding_dim),
        "lstm.weight_hh_l0": _randn(gate_dim, lstm_hidden_dim),
        "lstm.bias_ih_l0": _zeros(gate_dim),
        "lstm.bias_hh_l0": _zeros(gate_dim),
        "lstm.weight_ih_l0_reverse": _randn(gate_dim, embedding_dim),
        "lstm.weight_hh_l0_reverse": _randn(gate_dim, lstm_hidden_dim),
        "lstm.bias_ih_l0_reverse": _zeros(gate_dim),
        "lstm.bias_hh_l0_reverse": _zeros(gate_dim),
        "logit_layer.0.weight": _randn(lstm_hidden_dim, 2 * lstm_hidden_dim),
        "logit_layer.0.bias": _zeros(lstm_hidden_dim),
        "logit_layer.2.weight": _randn(num_classes, lstm_hidden_dim),
        "logit_layer.2.bias": _zeros(num_classes),
    }

    with open(output_file, "wb") as f:
        pickle.dump(state_dict, f)


# Load the vocabularies written by the earlier cells. The context managers close
# the file handles, which the bare pickle.load(open(...)) form did not.
with open("char2idx.pkl", "rb") as vocab_file:
    char2idx = pickle.load(vocab_file)
with open("class2idx.pkl", "rb") as vocab_file:
    class2idx = pickle.load(vocab_file)

# Write a freshly initialised checkpoint sized to these vocabularies.
initialize_np_ckpt(char2idx, class2idx)

In [None]:
from tqdm import tqdm
from g2pM2 import G2pM


def load_data(sent_file, lb_file):
    """Read a sentence file and its label file.

    Args:
        sent_file: Path of a UTF-8 file with one sentence per line.
        lb_file: Path of a UTF-8 file with whitespace-separated labels per line.

    Returns:
        ``(sentences, labels)``: the whitespace-stripped lines of ``sent_file``
        and, for each line of ``lb_file``, its list of labels.
    """
    sentences = []
    with open(sent_file, "r", encoding="utf-8") as sent_handle:
        for raw in sent_handle:
            sentences.append(raw.strip())

    labels = []
    with open(lb_file, "r", encoding="utf-8") as lb_handle:
        for raw in lb_handle:
            # split() without arguments also discards leading/trailing whitespace.
            labels.append(raw.split())

    return sentences, labels


def prepare_data(sentences, labels, char2idx, class2idx):
    """Convert sentences and label sequences into padded id arrays.

    Args:
        sentences: Sequence of strings; each character is looked up in
            ``char2idx`` (falling back to the ``UNK_TOKEN`` id).
        labels: Sequence of label lists, paired with ``sentences`` by ``zip``
            (extra entries in the longer sequence are ignored).
        char2idx: Character -> id mapping containing ``UNK_TOKEN`` and ``PAD_TOKEN``.
        class2idx: Label -> id mapping containing ``UNK_TOKEN`` and ``PAD_TOKEN``.

    Returns:
        ``(input_ids, target_ids, target_indices)``: two NumPy arrays padded to
        the longest input sequence, and for each sentence the list of positions
        in its label sequence whose label is present in ``class2idx``. An empty
        dataset yields two empty arrays and an empty list.
    """
    input_ids = []
    target_ids = []
    target_indices = []
    for sent, label in zip(sentences, labels):
        input_ids.append([char2idx.get(char, char2idx[UNK_TOKEN]) for char in sent])
        target_ids.append([class2idx.get(pron, class2idx[UNK_TOKEN]) for pron in label])
        target_indices.append([i for i, pron in enumerate(label) if pron in class2idx])

    # default=0 keeps an empty dataset from raising "max() arg is an empty sequence".
    max_length = max((len(seq) for seq in input_ids), default=0)

    # Both arrays are padded to the longest *input* sequence.
    input_ids = [
        seq + [char2idx[PAD_TOKEN]] * (max_length - len(seq)) for seq in input_ids
    ]
    target_ids = [
        seq + [class2idx[PAD_TOKEN]] * (max_length - len(seq)) for seq in target_ids
    ]

    return np.array(input_ids), np.array(target_ids), target_indices


def get_batches(data, batch_size):
    """Yield consecutive mini-batches from a prepared dataset.

    Args:
        data: ``(inputs, targets, target_indices)`` triple of equally long sequences.
        batch_size: Number of examples per batch; the final batch may be smaller.

    Yields:
        ``(inputs, targets, target_indices)`` for each batch, with the first two
        converted to NumPy arrays and the third kept as a plain slice.
    """
    all_inputs, all_targets, all_indices = data
    for start in range(0, len(all_inputs), batch_size):
        window = slice(start, start + batch_size)
        input_chunk = all_inputs[window]
        target_chunk = all_targets[window]
        index_chunk = all_indices[window]
        yield np.array(input_chunk), np.array(target_chunk), index_chunk


def compute_loss(model, inputs, targets, target_indices):
    """Mean cross-entropy over the target positions of one batch.

    Args:
        model: Object providing ``reverse_sequence``, ``get_embedding``,
            ``fw_lstm_cell``, ``bw_lstm_cell`` and ``fc_layer``. Each LSTM cell
            is called with the embedding of one time step and the previous
            state, and the first element of its result is used as the hidden
            vector.
        inputs: 2-D integer array of character ids, one row per sentence.
        targets: 2-D integer array of class ids, indexed as ``targets[row, position]``.
        target_indices: For each row, the list of positions that are scored.

    Returns:
        The summed negative log-probability of the target classes divided by the
        number of scored positions, or ``0.0`` when the batch has no scored
        position (previously this case raised).
    """
    batch_size = len(inputs)

    # Flatten the per-sentence positions into parallel (row, column) index lists
    # so hidden states and target classes can each be gathered in one step.
    rows = []
    cols = []
    for row in range(batch_size):
        for col in target_indices[row]:
            rows.append(row)
            cols.append(col)

    total_targets = len(rows)
    if total_targets == 0:
        # Nothing to score: avoid the division by zero and empty-array indexing.
        return 0.0

    # Counts the non-zero ids in each row. This equals the true sequence length
    # only if the padding id is 0 and no real character has id 0 - verify
    # against the vocabulary in use.
    lengths = np.sum(np.sign(inputs), axis=1)
    max_length = max(lengths)

    rev_seq = model.reverse_sequence(inputs, lengths)
    fw_emb = model.get_embedding(inputs)
    bw_emb = model.get_embedding(rev_seq)

    # Run both directions step by step, collecting the hidden vector of each step.
    fw_states, bw_states = None, None
    fw_hs = []
    bw_hs = []
    for step in range(max_length):
        fw_states = model.fw_lstm_cell(fw_emb[:, step, :], fw_states)
        bw_states = model.bw_lstm_cell(bw_emb[:, step, :], bw_states)
        fw_hs.append(fw_states[0])
        bw_hs.append(bw_states[0])
    fw_hiddens = np.stack(fw_hs, axis=1)
    # The backward pass consumed reversed sequences; flip its outputs back so
    # that position i of both directions refers to the same character.
    bw_hiddens = model.reverse_sequence(np.stack(bw_hs, axis=1), lengths)

    outputs = np.concatenate([fw_hiddens, bw_hiddens], axis=2)

    # One gather covers both the single-sentence and the multi-sentence batch.
    target_hidden = outputs[rows, cols]
    logits = model.fc_layer(target_hidden)

    # Softmax with the row maximum subtracted for numerical stability.
    exp_logits = np.exp(logits - np.max(logits, axis=1, keepdims=True))
    softmax_probs = exp_logits / np.sum(exp_logits, axis=1, keepdims=True)

    target_classes = np.asarray(targets)[rows, cols]
    target_probs = softmax_probs[np.arange(total_targets), target_classes]

    # The small constant guards against log(0).
    loss = -np.log(target_probs + 1e-9)
    return np.sum(loss) / total_targets


def update_weights(model, grads, learning_rate, beta1, beta2, epsilon, t, m, v):
    """Apply one Adam step to the model attributes named in ``grads``.

    Args:
        model: Object whose attributes (looked up by name in its ``__dict__``)
            hold the parameter arrays; they are updated in place.
        grads: Mapping from parameter name to gradient.
        learning_rate: Step size.
        beta1: Decay rate of the first-moment estimate.
        beta2: Decay rate of the second-moment estimate.
        epsilon: Constant added to the denominator.
        t: Step counter used for bias correction.
        m: Mapping of first-moment estimates, replaced entry by entry.
        v: Mapping of second-moment estimates, replaced entry by entry.
    """
    first_correction = 1 - beta1**t
    second_correction = 1 - beta2**t
    params = vars(model)

    for name in grads:
        gradient = grads[name]
        m[name] = beta1 * m[name] + (1 - beta1) * gradient
        v[name] = beta2 * v[name] + (1 - beta2) * (gradient**2)

        # Bias-corrected moment estimates.
        m_hat = m[name] / first_correction
        v_hat = v[name] / second_correction

        params[name] -= learning_rate * m_hat / (np.sqrt(v_hat) + epsilon)


def save_model(model, output_file):
    """Pickle the model's parameters under their checkpoint key names.

    Args:
        model: Object exposing the thirteen parameter attributes listed below.
        output_file: Path the state dict is pickled to.
    """
    # (checkpoint key, model attribute) pairs, in the order they are stored.
    key_to_attr = (
        ("embedding.weight", "embeddings"),
        ("lstm.weight_ih_l0", "weight_ih"),
        ("lstm.weight_hh_l0", "weight_hh"),
        ("lstm.bias_ih_l0", "bias_ih"),
        ("lstm.bias_hh_l0", "bias_hh"),
        ("lstm.weight_ih_l0_reverse", "weight_ih_reverse"),
        ("lstm.weight_hh_l0_reverse", "weight_hh_reverse"),
        ("lstm.bias_ih_l0_reverse", "bias_ih_reverse"),
        ("lstm.bias_hh_l0_reverse", "bias_hh_reverse"),
        ("logit_layer.0.weight", "hidden_weight_l0"),
        ("logit_layer.0.bias", "hidden_bias_l0"),
        ("logit_layer.2.weight", "hidden_weight_l1"),
        ("logit_layer.2.bias", "hidden_bias_l1"),
    )
    state_dict = {key: getattr(model, attr) for key, attr in key_to_attr}

    with open(output_file, "wb") as handle:
        pickle.dump(state_dict, handle)


if __name__ == "__main__":
    # Special tokens, re-declared here with the same values as in the first cell.
    UNK_TOKEN = "<UNK>"
    PAD_TOKEN = "<PAD>"
    BOS_TOKEN = "시"
    EOS_TOKEN = "끝"
    SPLIT_TOKEN = "▁"

    model = G2pM()

    train_sentences, train_labels = load_data("train.sent", "train.lb")
    dev_sentences, dev_labels = load_data("dev.sent", "dev.lb")

    # Context managers close the vocabulary files after loading.
    with open("char2idx.pkl", "rb") as vocab_file:
        char2idx = pickle.load(vocab_file)
    with open("class2idx.pkl", "rb") as vocab_file:
        class2idx = pickle.load(vocab_file)

    train_data = prepare_data(train_sentences, train_labels, char2idx, class2idx)
    dev_data = prepare_data(dev_sentences, dev_labels, char2idx, class2idx)

    # Training and Adam hyper-parameters.
    epochs = 5
    batch_size = 32
    learning_rate = 0.001
    beta1 = 0.9
    beta2 = 0.999
    epsilon = 1e-8

    def _zeros_like_params(source):
        """Return a zero array for every ndarray attribute of ``source``."""
        return {
            param: np.zeros_like(value)
            for param, value in source.__dict__.items()
            if isinstance(value, np.ndarray)
        }

    # Adam state: step counter plus first and second moment estimates.
    t = 0
    m = _zeros_like_params(model)
    v = _zeros_like_params(model)

    for epoch in range(epochs):
        train_loss = 0
        total_targets = 0
        with tqdm(total=len(train_data[0]), desc=f"Epoch {epoch+1}/{epochs}") as pbar:
            for inputs, targets, target_indices in get_batches(train_data, batch_size):
                t += 1
                target_count = sum(len(indices) for indices in target_indices)
                loss = compute_loss(model, inputs, targets, target_indices)
                # compute_loss returns the mean over the batch's targets. Weight
                # it by the target count so that dividing the running sum by
                # total_targets gives a true per-target average.
                train_loss += loss * target_count

                # NOTE(review): no backward pass is implemented; the gradients
                # are all zeros, so the Adam step below leaves every parameter
                # unchanged and the model does not learn. TODO: compute real
                # gradients of the loss here.
                grads = _zeros_like_params(model)

                update_weights(
                    model, grads, learning_rate, beta1, beta2, epsilon, t, m, v
                )

                total_targets += target_count
                pbar.update(len(inputs))
                pbar.set_postfix(
                    {
                        "Train Loss": (
                            train_loss / total_targets if total_targets > 0 else 0.0
                        )
                    }
                )

        dev_loss = 0
        total_dev_targets = 0
        for inputs, targets, target_indices in get_batches(dev_data, batch_size):
            dev_target_count = sum(len(indices) for indices in target_indices)
            loss = compute_loss(model, inputs, targets, target_indices)
            # Same re-weighting as for the training loss.
            dev_loss += loss * dev_target_count
            total_dev_targets += dev_target_count

        avg_train_loss = (
            train_loss / total_targets if total_targets > 0 else float("inf")
        )
        avg_dev_loss = (
            dev_loss / total_dev_targets if total_dev_targets > 0 else float("inf")
        )

        print(
            f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss}, Dev Loss: {avg_dev_loss}"
        )

    save_model(model, "trained_np_ckpt.pkl")