In [1]:
import numpy as np
import pandas as pd
import torch, os, re
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchmetrics.functional as metrics
from torch.utils.data import Dataset, DataLoader
from typing import List


In [37]:
def find_file(path: str, extension: str, files: list = [], include_path: bool = False):
    if not extension.startswith("."):
        extension += "."
    for data in os.listdir(path):
        if os.path.isdir(path + data):
            find_file(path + data + "/", extension, files, include_path)
        else:
            if data.endswith(extension):
                files.append(path + data if include_path else data)
    return files


In [38]:
def extract_data(sentence: List[str], lang: str):
    data = {chr(s): 0 for s in range(ord("a"), ord("z") + 1)}
    # data["lang"] = lang
    count = 0
    for letter in sentence:
        if letter.lower() in data.keys():
            count += 1
            data[letter.lower()] += 1
    result = list(map(lambda x: x / count, list(data.values())))
    return result + [float(lang)]


In [39]:
main_folder = "../../data/lang_data/"
files = find_file(main_folder, ".txt", include_path=True)


In [40]:
lang_list = []
for file in files:
    lang = os.path.basename(file)[:2]
    if lang not in lang_list:
        lang_list.append(lang)
lang_list


['en', 'fr', 'id', 'tl']

In [41]:
train_list = []
valid_list = []
# test_list = []



for idx, lang in enumerate(lang_list):
    for file in files:
        file_name = os.path.basename(file)
        if file_name[:2] == lang:
            with open(file, "r", encoding="utf-8") as f:
                if file.find("train") != -1:
                    train_list.append(extract_data(f.read(), idx))
                else:
                    valid_list.append(extract_data(f.read(), idx))
                # if file.find("train") != -1 and file_name[4] != "0":
                #     train_list.append(extract_data(f.read(), idx))
                # elif file.find("train") != -1 and file_name[4] == "0":
                #     valid_list.append(extract_data(f.read(), idx))
                # else:
                #     test_list.append(extract_data(f.read(), idx))
train_array = np.array(train_list)
valid_array = np.array(valid_list)
# test_array = np.array(test_list)


In [42]:
train_array.shape, train_array.ndim, valid_array.shape, valid_array.ndim  # , test_array.shape, test_array.ndim


((40, 27), 2, (16, 27), 2)

In [43]:
class LangModule(nn.Module):
    def __init__(self, in_, out_, node_list):
        super().__init__()
        node_list = [in_] + node_list + [out_]
        layer_num = len(node_list) - 1
        self.layers = nn.ModuleList()
        for i in range(layer_num):
            self.layers.append(nn.Linear(node_list[i], node_list[i + 1]))
            if i != layer_num - 1:
                self.layers.append(nn.BatchNorm1d(node_list[i + 1]))
                self.layers.append(nn.ReLU())

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x


In [44]:
class LangDataset(Dataset):
    def __init__(self, X_data, y_data):
        super().__init__()
        X_data = X_data if isinstance(X_data, np.ndarray) else X_data.values
        y_data = y_data if isinstance(y_data, np.ndarray) else y_data.values
        self.feature = torch.FloatTensor(X_data)
        self.target = torch.LongTensor(y_data)

    def __len__(self):
        return self.feature.shape[0]

    def __getitem__(self, idx):
        return self.feature[idx], self.target[idx]

    def getInOut(self):
        return self.feature.shape[1], len(torch.unique(self.target))


In [45]:
trainDS = LangDataset(train_array[:, :-1], train_array[:, -1])
validDS = LangDataset(valid_array[:, :-1], valid_array[:, -1])
# testDS = LangDataset(test_array[:, :-1], test_array[:, -1])


In [46]:
batch = 2
trainDL = DataLoader(trainDS, batch_size=batch)
validDL = DataLoader(validDS, batch_size=batch)
# testDL = DataLoader(testDS, batch_size=batch)


In [47]:
in_, out_ = trainDS.getInOut()
node_list = [16, 8]
model = LangModule(in_, out_, node_list)
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())


In [52]:
for feature, target in trainDL:
    print(feature, target)
    break


tensor([[0.0760, 0.0128, 0.0457, 0.0461, 0.1053, 0.0157, 0.0192, 0.0437, 0.0740,
         0.0017, 0.0054, 0.0538, 0.0263, 0.0775, 0.0897, 0.0165, 0.0000, 0.0777,
         0.0614, 0.0805, 0.0259, 0.0098, 0.0141, 0.0007, 0.0200, 0.0004],
        [0.0967, 0.0124, 0.0446, 0.0333, 0.1058, 0.0213, 0.0160, 0.0394, 0.0820,
         0.0010, 0.0044, 0.0449, 0.0274, 0.0785, 0.0724, 0.0212, 0.0018, 0.0654,
         0.0690, 0.0880, 0.0279, 0.0082, 0.0149, 0.0021, 0.0157, 0.0056]]) tensor([0, 0])


In [49]:
model


LangModule(
  (layers): ModuleList(
    (0): Linear(in_features=26, out_features=16, bias=True)
    (1): BatchNorm1d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Linear(in_features=16, out_features=8, bias=True)
    (4): BatchNorm1d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
    (6): Linear(in_features=8, out_features=4, bias=True)
  )
)

In [50]:
def learning(model, dataset, mode="train"):
    if mode == "train":
        model.train()
    elif mode == "valid":
        model.eval()
    elif mode == "test":
        model.eval()
    else:
        raise ValueError("mode 값은 train, valid, test 중 하나여야 합니다.")

    loss_list = []
    for feature, target in dataset:
        pre_target = model(feature)
        loss = loss_fn(pre_target, target)
        loss_list.append(loss.item())
        if mode == "train":
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    loss = np.mean(loss_list)
    acc = metrics.accuracy(
        pre_target, target, task="multiclass", num_classes=out_
    ).item()
    f1 = metrics.f1_score(
        pre_target, target, task="multiclass", num_classes=out_, average="macro"
    ).item()
    return loss, acc, f1


In [51]:
epochs = 1000
train_list = [[], [], []]
valid_list = [[], [], []]
for epoch in range(epochs + 1):
    train_loss, train_acc, train_f1 = learning(model, trainDL, "train")
    train_list[0].append(train_loss)
    train_list[1].append(train_acc)
    train_list[2].append(train_f1)
    valid_loss, valid_acc, valid_f1 = learning(model, validDL, "valid")
    valid_list[0].append(valid_loss)
    valid_list[1].append(valid_acc)
    valid_list[2].append(valid_f1)

    print(f"[EPOCH {epoch}]")
    print(f"[TRAIN] loss: {train_loss:.4f} acc: {train_acc:.4f} f1: {train_f1:.4f}")
    print(f"[VALID] loss: {valid_loss:.4f} acc: {valid_acc:.4f} f1: {valid_f1:.4f}")


[EPOCH 0]
[TRAIN] loss: 1.5216 acc: 0.0000 f1: 0.0000
[VALID] loss: 1.4331 acc: 0.0000 f1: 0.0000
[EPOCH 1]
[TRAIN] loss: 1.4887 acc: 0.0000 f1: 0.0000
[VALID] loss: 1.4177 acc: 0.0000 f1: 0.0000
[EPOCH 2]
[TRAIN] loss: 1.4794 acc: 0.0000 f1: 0.0000
[VALID] loss: 1.3538 acc: 0.0000 f1: 0.0000
[EPOCH 3]
[TRAIN] loss: 1.4677 acc: 0.0000 f1: 0.0000
[VALID] loss: 1.2960 acc: 0.0000 f1: 0.0000
[EPOCH 4]
[TRAIN] loss: 1.4637 acc: 0.0000 f1: 0.0000
[VALID] loss: 1.3454 acc: 0.0000 f1: 0.0000
[EPOCH 5]
[TRAIN] loss: 1.4560 acc: 0.0000 f1: 0.0000
[VALID] loss: 1.5824 acc: 0.5000 f1: 0.3333
[EPOCH 6]
[TRAIN] loss: 1.4588 acc: 0.0000 f1: 0.0000
[VALID] loss: 1.2081 acc: 0.5000 f1: 0.3333
[EPOCH 7]
[TRAIN] loss: 1.4341 acc: 0.0000 f1: 0.0000
[VALID] loss: 1.1028 acc: 0.5000 f1: 0.3333
[EPOCH 8]
[TRAIN] loss: 1.4319 acc: 0.0000 f1: 0.0000
[VALID] loss: 1.1449 acc: 0.5000 f1: 0.3333
[EPOCH 9]
[TRAIN] loss: 1.4349 acc: 0.0000 f1: 0.0000
[VALID] loss: 1.0898 acc: 0.5000 f1: 0.3333
[EPOCH 10]
[TRAIN] l

In [53]:
model(torch.FloatTensor(valid_array[:, :-1])).argmax(dim=1)


tensor([3, 3, 3, 3, 3, 3, 3, 3, 1, 1, 3, 1, 3, 3, 3, 1])