In [1]:
import torch
import json
import numpy as np
import math
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset,  DataLoader
from torch import Tensor

from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, classification_report, confusion_matrix
from sklearn import metrics
import pandas as pd


In [2]:
# Read both halves of training set 1: human-written and machine-generated samples.
def _read_json(path):
    """Parse the JSON document stored at `path` and return the decoded object."""
    with open(path) as handle:
        return json.load(handle)

set1_human = _read_json('dataset/set1_human.json')
set1_machine = _read_json('dataset/set1_machine.json')

# Report how many samples each source contributes.
print(len(set1_human), len(set1_machine))

122584 3500


In [3]:
# Load the samples from dataset/test.json that the final predictions are made for.
with open('dataset/test.json') as test_file:
    testing_set = json.load(test_file)

In [4]:
# Tag every sample in place with its class: 1.0 for human, 0.0 for machine.
for samples, class_value in ((set1_human, 1.0), (set1_machine, 0.0)):
    for sample in samples:
        sample['label'] = class_value

In [5]:
# Label vectors parallel to the two sources (ones for human, zeros for machine).
human_labels = np.full(len(set1_human), 1.0)
machine_labels = np.zeros((len(set1_machine),))

In [6]:
# Pool both sources and hold out 10% of the samples. Stratifying on the labels
# keeps the human/machine ratio the same in both partitions, and the fixed
# random_state makes the split identical on every run so scores are comparable.
data = set1_human + set1_machine
labels = np.concatenate([human_labels, machine_labels])
train_data, test_data, _, _ = train_test_split(
    data, labels, test_size=0.1, stratify=labels, random_state=42
)
print(len(train_data), len(test_data))

113475 12609


In [7]:
max_len = 256     # token ids kept per sample: longer texts are cut, shorter ones zero-padded
batch_size = 256  # samples per DataLoader batch


class MyDataset(Dataset):
    """Labeled dataset yielding fixed-length token-id tensors.

    Each element of `data` is a dict with a 'txt' sequence of integer token
    ids and a numeric 'label'.

    Args:
        data: indexable collection of sample dicts.
        seq_len: optional fixed output length; when None the notebook-level
            `max_len` is used (looked up at item-access time).
    """

    def __init__(self, data, seq_len=None):
        self.data = data
        self.seq_len = seq_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return `(text, label)`: a long tensor of the target length and a scalar label tensor."""
        target_len = max_len if self.seq_len is None else self.seq_len
        sample = self.data[idx]
        tokens = sample['txt'][:target_len]

        # Start from an all-zero (padding) buffer with an explicit integer dtype.
        # Letting torch.tensor infer the dtype would turn an empty text into a
        # float32 tensor, which nn.Embedding cannot index with.
        text = torch.zeros(target_len, dtype=torch.long)
        if len(tokens) > 0:
            text[:len(tokens)] = torch.tensor(tokens, dtype=torch.long)

        label = torch.tensor(sample['label'])

        return text, label

In [8]:
# Wrap each split in a dataset and a batched loader. Only the training loader
# shuffles; the held-out split is iterated in a fixed order.
train_dataset, test_dataset = MyDataset(train_data), MyDataset(test_data)

train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [9]:
class Classifier(nn.Module):
    """Bag-of-embeddings binary classifier.

    Token ids are embedded, averaged over the sequence dimension and fed to a
    three-layer MLP that emits one raw logit per sample.

    Args:
        vocab_size: number of rows in the embedding table.
        emb_size: embedding dimension.
        hidden_size1: width of the first hidden layer.
        hidden_size2: width of the second hidden layer.
    """

    def __init__(self, vocab_size, emb_size, hidden_size1, hidden_size2):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, emb_size)

        self.classifier = nn.Sequential(
            nn.Linear(emb_size, hidden_size1),
            nn.ReLU(),
            nn.Linear(hidden_size1, hidden_size2),
            nn.ReLU(),
            nn.Linear(hidden_size2, 1),
        )

    def forward(self, text):
        """Compute logits for a batch of token-id sequences.

        Args:
            text: integer tensor of shape (batch_size, max_len).

        Returns:
            Tensor of shape (batch_size,) holding one logit per sample.
        """
        # Each token id becomes one embedding row; the rows of a sentence are
        # then averaged into a single vector (every position contributes,
        # including any padding positions).
        text_emb = self.embedding(text)      # (batch_size, max_len, emb_size)
        text_emb = text_emb.mean(dim=1)      # (batch_size, emb_size)
        output = self.classifier(text_emb)   # (batch_size, 1)
        # Remove only the trailing singleton dimension. A bare squeeze() would
        # also drop the batch dimension when batch_size == 1, producing a 0-dim
        # tensor that no longer matches the label shape.
        return output.squeeze(-1)


In [10]:
num_epochs = 10
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Seed torch's global RNG so weight initialisation (and any shuffling that
# draws from the global generator) is repeatable across runs.
torch.manual_seed(42)

model = Classifier(vocab_size=5000, emb_size=256, hidden_size1=150, hidden_size2=100).to(device)

optimizer = optim.Adam(model.parameters(), lr=1e-3)

# pos_weight scales the loss term of positive targets (label 1.0) by 0.2.
# The tensor is created on `device` so it lives alongside the model outputs
# and labels the criterion is called with.
criterion = nn.BCEWithLogitsLoss(
    pos_weight=torch.tensor(0.2, dtype=torch.float, device=device)
)

def train_one_epoch(model, train_loader):
    """Run one optimisation pass over `train_loader`.

    Relies on the notebook-level `device`, `criterion` and `optimizer`.

    Returns:
        (train_f1, mean_loss): macro F1 of the rounded predictions collected
        during the pass, and the loss averaged over the number of batches.
    """
    model.train()

    running_loss = 0.0
    collected_preds, collected_targets = [], []

    for batch in train_loader:
        inputs = batch[0].to(device)
        targets = batch[1].to(device)

        logits = model(inputs)
        loss = criterion(logits, targets)

        # Standard update: clear old gradients, backpropagate, step.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # Sigmoid probabilities rounded to hard 0/1 decisions.
        collected_preds.extend(torch.round(torch.sigmoid(logits)).tolist())
        collected_targets.extend(targets.tolist())

    mean_loss = running_loss / len(train_loader)
    macro_f1 = f1_score(collected_targets, collected_preds, average='macro')

    return macro_f1, mean_loss

def test(model, test_loader):
    """Evaluate `model` on `test_loader` with gradient tracking disabled.

    Relies on the notebook-level `device` and `criterion`.

    Returns:
        (mean_loss, targets, preds): loss averaged over the number of batches,
        the list of true labels, and the list of rounded sigmoid predictions.
    """
    model.eval()

    running_loss = 0.0
    collected_preds, collected_targets = [], []

    with torch.no_grad():
        for batch in test_loader:
            inputs = batch[0].to(device)
            targets = batch[1].to(device)

            logits = model(inputs)
            running_loss += criterion(logits, targets).item()

            collected_preds.extend(torch.round(torch.sigmoid(logits)).tolist())
            collected_targets.extend(targets.tolist())

    mean_loss = running_loss / len(test_loader)

    return mean_loss, collected_targets, collected_preds


# Train for num_epochs, evaluating on the held-out split after every epoch and
# checkpointing the weights whenever the held-out macro F1 sets a new best.
score = 0.
for epoch in range(num_epochs):
    train_f1, train_loss = train_one_epoch(model, train_loader)

    test_loss, test_targets, test_preds = test(model, test_loader)
    test_f1 = f1_score(test_targets, test_preds, average='macro')

    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}, Train F1: {train_f1:.4f}, Test F1: {test_f1:.4f}')

    # Nothing to save unless this epoch strictly improves on the best score.
    if not test_f1 > score:
        continue

    score = test_f1
    torch.save(model.state_dict(), "classifier.pt")

KeyboardInterrupt: 

In [None]:
class TestDataset(Dataset):
    """Unlabeled dataset yielding fixed-length token-id tensors.

    Each element of `data` is a dict with a 'txt' sequence of integer token ids.

    Args:
        data: indexable collection of sample dicts.
        seq_len: optional fixed output length; when None the notebook-level
            `max_len` is used (looked up at item-access time).
    """

    def __init__(self, data, seq_len=None):
        self.data = data
        self.seq_len = seq_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return the sample's token ids as a long tensor of the target length."""
        target_len = max_len if self.seq_len is None else self.seq_len
        tokens = self.data[idx]['txt'][:target_len]

        # Start from an all-zero (padding) buffer with an explicit integer dtype.
        # Letting torch.tensor infer the dtype would turn an empty text into a
        # float32 tensor, which nn.Embedding cannot index with.
        text = torch.zeros(target_len, dtype=torch.long)
        if len(tokens) > 0:
            text[:len(tokens)] = torch.tensor(tokens, dtype=torch.long)

        return text

In [None]:
# Point test_dataset / test_loader at the unlabeled samples from testing_set.
# Note that this rebinds both names, which earlier referred to the held-out split.
test_dataset = TestDataset(data=testing_set)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Predict with the best checkpoint written by the training loop rather than
# with whatever weights happen to be in memory (e.g. after an interrupted or
# over-trained run). Fall back to the in-memory weights if no checkpoint exists.
try:
    model.load_state_dict(torch.load("classifier.pt", map_location=device))
except FileNotFoundError:
    print("classifier.pt not found; predicting with the weights currently in memory")

model.eval()
test_preds = []

with torch.no_grad():
    for batch in test_loader:

        text = batch.to(device)

        x_n = model(text)

        # Sigmoid probabilities rounded to hard 0/1 decisions. reshape(-1)
        # guarantees a 1-D tensor so a final batch holding a single sample
        # still yields a list that can be appended to test_preds.
        test_preds += torch.round(torch.sigmoid(x_n)).reshape(-1).tolist()
        


In [None]:
# Convert the float predictions (0.0 / 1.0) to plain ints.
y_pred = list(map(int, test_preds))

In [None]:
# Single-column frame holding one predicted class per row.
df = pd.DataFrame(data=y_pred, columns=['Predicted'])

In [None]:
# Write the predictions to out.csv (pandas' default index column is included).
df.to_csv(path_or_buf='out.csv')