In [None]:
import torch
import numpy as np
import pandas as pd
from transformers import AutoModel, AutoTokenizer
from tqdm.notebook import tqdm

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split, cross_validate, StratifiedKFold
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC

from utils import train_test

## Preprocess data

In [None]:
# Read the preprocessed articles from the Feather file sitting next to the
# notebook and report how many rows/columns were loaded.
df = pd.read_feather(path='processed.feather')
print(f"Data frame shape: {df.shape}")

In [None]:
# Categories with fewer usable rows than this are left out of the experiment.
MIN_CATEGORY_COUNT = 500

# Drop incomplete rows *before* counting categories, so the threshold is
# applied to the rows that are actually used. This also makes the cell
# idempotent: re-running it yields the same `exclude_cols`.
df = df.dropna()
exclude_cols = [k for k, v in df['Category'].value_counts().items() if v < MIN_CATEGORY_COUNT]
data = df[~df['Category'].isin(exclude_cols)].copy()

# Fixed seed so the 80/20 split is reproducible between runs.
train_df, test_df = train_test_split(data, test_size=0.2, random_state=42, shuffle=True)

# Fit the label mapping on the training split only, then reuse it for the
# test split so both share the same integer encoding.
le = LabelEncoder()

train_df['Category'] = le.fit_transform(train_df['Category'])
test_df['Category'] = le.transform(test_df['Category'])

print(f"train_df shape: {train_df.shape}")
print(f"test_df shape: {test_df.shape}")

## Helper classes

In [None]:
class PhoBERT():
    """Class-level cache for the PhoBERT encoder and its tokenizer.

    The model and tokenizer are stored as class attributes so that every
    caller shares one loaded copy instead of reloading the checkpoint.
    """

    # Hugging Face checkpoint used for both the encoder and the tokenizer.
    MODEL_NAME = "vinai/phobert-base-v2"

    phobert = None
    tokenizer = None

    @staticmethod
    def loadModel():
        """Return the cached ``(model, tokenizer)`` pair, loading it on first use.

        Declared as a static method so it can be called both on the class
        (``PhoBERT.loadModel()``) and on an instance; previously an instance
        call raised ``TypeError`` because the function took no arguments.

        Returns:
            tuple: ``(phobert, tokenizer)`` as stored on the class.
        """
        # Reload both if either one is missing so the pair always matches.
        if PhoBERT.phobert is None or PhoBERT.tokenizer is None:
            PhoBERT.phobert = AutoModel.from_pretrained(PhoBERT.MODEL_NAME)
            PhoBERT.tokenizer = AutoTokenizer.from_pretrained(PhoBERT.MODEL_NAME)
        return PhoBERT.phobert, PhoBERT.tokenizer

In [None]:
class NewsDataset(Dataset):
    """Dataset that turns one text column into PhoBERT sentence embeddings.

    Each item is encoded on the fly: the text is tokenized, passed through
    the shared PhoBERT encoder without gradients, and the model's
    ``pooler_output`` is returned together with the row's ``Category`` label.
    """

    def __init__(self, df, col):
        """Keep the text/label columns and move the encoder to the best device.

        Args:
            df: DataFrame containing ``col`` and a ``'Category'`` column.
            col: Name of the text column to embed.
        """
        self.phobert, self.tokenizer = PhoBERT.loadModel()
        self.df = df[[col, 'Category']]
        self.device = self._select_device()
        # Inference only: make eval mode explicit so dropout is disabled.
        self.phobert = self.phobert.to(self.device).eval()

    @staticmethod
    def _select_device():
        """Pick CUDA if present, else Apple MPS, else CPU."""
        if torch.cuda.is_available():
            return torch.device('cuda')
        if torch.backends.mps.is_available():
            return torch.device('mps')
        return torch.device('cpu')

    def __len__(self,):
        """Number of rows available in the dataset."""
        return len(self.df)

    def __getitem__(self, index):
        """Return ``(embedding, label)`` for the row at position ``index``.

        The embedding stays on ``self.device``; the label is a CPU tensor.
        """
        sentence, label = self.df.iloc[index, :].values
        # A single sequence is encoded at a time, so no padding is needed;
        # truncation keeps the input within the 256-token limit used here.
        token_ids = self.tokenizer.encode(
            sentence,
            truncation=True,
            max_length=256,
            return_tensors='pt',
        ).to(self.device)
        with torch.no_grad():
            features = self.phobert(token_ids)['pooler_output']
        # Drop the leading batch dimension of size one.
        return features.squeeze(0), torch.tensor(label)

## Create features

In [None]:
feature_col = 'Title'


def collect_features(loader, desc):
    """Run a DataLoader to exhaustion and stack its batches into NumPy arrays.

    Args:
        loader: Iterable yielding ``(features, labels)`` tensor batches.
        desc: Label shown on the progress bar.

    Returns:
        tuple: ``(features, labels)`` arrays concatenated along the first axis.
    """
    feature_batches = []
    label_batches = []
    for batch_features, batch_labels in tqdm(loader, desc=desc):
        # Features may live on an accelerator; bring them to host memory first.
        feature_batches.append(batch_features.detach().cpu().numpy())
        label_batches.append(batch_labels.numpy())
    return np.concatenate(feature_batches), np.concatenate(label_batches)


train_data = NewsDataset(df=train_df,col=feature_col)
train_loader = DataLoader(train_data, batch_size=64)

test_data = NewsDataset(df=test_df,col=feature_col)
test_loader = DataLoader(test_data, batch_size=64)

# Same extraction routine for both splits, replacing two copy-pasted loops.
train_features, train_labels = collect_features(train_loader, desc="Create training data")
print(train_features.shape, train_labels.shape)

test_features, test_labels = collect_features(test_loader, desc="Create testing data")
print(test_features.shape, test_labels.shape)

## Train - Test

In [None]:
# Support-vector classifier with the solver capped at 5000 iterations,
# scored with 3-fold stratified cross-validation on the training features.
model = SVC(max_iter=5000)
kfold = StratifiedKFold(n_splits=3)

scores = cross_validate(
    estimator=model,
    X=train_features,
    y=train_labels,
    scoring=['f1_weighted', 'precision_weighted', 'recall_weighted', 'accuracy'],
    cv=kfold,
    n_jobs=-1,
    verbose=2,
)

# One row per reported quantity, one column per fold, plus the fold average.
scores_df = pd.DataFrame(scores).T.assign(mean=lambda per_fold: per_fold.mean(axis=1))
scores_df

In [None]:
# k-nearest-neighbours classifier (k = 20), scored with 3-fold stratified
# cross-validation on the training features.
model = KNeighborsClassifier(n_neighbors=20)
kfold = StratifiedKFold(n_splits=3)

scores = cross_validate(
    estimator=model,
    X=train_features,
    y=train_labels,
    scoring=['f1_weighted', 'precision_weighted', 'recall_weighted', 'accuracy'],
    cv=kfold,
    n_jobs=-1,
    verbose=2,
)

# One row per reported quantity, one column per fold, plus the fold average.
scores_df = pd.DataFrame(scores).T.assign(mean=lambda per_fold: per_fold.mean(axis=1))
scores_df

In [None]:
# Logistic regression allowed up to 1000 solver iterations, scored with
# 3-fold stratified cross-validation on the training features.
model = LogisticRegression(max_iter=1000)
kfold = StratifiedKFold(n_splits=3)

scores = cross_validate(
    estimator=model,
    X=train_features,
    y=train_labels,
    scoring=['f1_weighted', 'precision_weighted', 'recall_weighted', 'accuracy'],
    cv=kfold,
    n_jobs=-1,
    verbose=2,
)

# One row per reported quantity, one column per fold, plus the fold average.
scores_df = pd.DataFrame(scores).T.assign(mean=lambda per_fold: per_fold.mean(axis=1))
scores_df

In [None]:
# Decision tree scored with 3-fold stratified cross-validation on the
# training features. Tree construction is randomized, so the seed is fixed
# (same value as the train/test split) to make the scores reproducible.
model = DecisionTreeClassifier(random_state=42)

kfold = StratifiedKFold(n_splits=3)

scores = cross_validate(model, train_features, train_labels, cv=kfold, scoring=['f1_weighted', 'precision_weighted', 'recall_weighted', 'accuracy'], n_jobs=-1, verbose=2)

# One row per reported quantity, one column per fold, plus the fold average.
scores_df = pd.DataFrame(scores).T
scores_df['mean'] = scores_df.mean(axis=1)
scores_df

In [None]:
# Random forest scored with 3-fold stratified cross-validation on the
# training features. Bootstrapping and feature sub-sampling are random, so
# the seed is fixed (same value as the train/test split) for reproducibility.
model = RandomForestClassifier(random_state=42)

kfold = StratifiedKFold(n_splits=3)

scores = cross_validate(model, train_features, train_labels, cv=kfold, scoring=['f1_weighted', 'precision_weighted', 'recall_weighted', 'accuracy'], n_jobs=-1, verbose=2)

# One row per reported quantity, one column per fold, plus the fold average.
scores_df = pd.DataFrame(scores).T
scores_df['mean'] = scores_df.mean(axis=1)
scores_df

In [None]:
# Final SVM handed to the project helper `train_test` together with the
# train and test splits. NOTE(review): `train_test` lives in utils and is not
# visible here; presumably it fits on the training data and reports metrics
# on the test data. Confirm before relying on its output.
# probability=True makes scikit-learn estimate class probabilities through an
# internal shuffled cross-validation, so the seed is fixed to keep the
# probability estimates reproducible between runs.
model = SVC(probability=True, random_state=42)
train_test(model, train_features, test_features, train_labels, test_labels)