In [34]:
import random
import pandas as pd
import numpy as np
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from tqdm.auto import tqdm
from sklearn.metrics import f1_score

In [35]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [36]:
# Hyper parameter

EPOCHS = 400
LR = 1e-2
BS = 16384
SEED = 41

시드 고정

In [37]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PythonHashSeed'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

데이터 로드

In [38]:
train_df = pd.read_csv('./data/train.csv')
val_df = pd.read_csv('./data/val.csv')
test_df = pd.read_csv('./data/test.csv')

train_df = train_df.drop(columns = ['ID'])
val_df = val_df.drop(columns= ['ID'])

In [39]:
class MyDataset(Dataset):
    def __init__(self, df, eval_mode):
        self.df = df
        self.eval_mode = eval_mode
        if self.eval_mode:
            self.labels = self.df['Class'].values
            self.df = self.df.drop(columns = ['Class']).values
        else:
            self.df = self.df.values
    
    def __getitem__(self, index):
        if self.eval_mode:
            self.x = self.df[index]
            self.y = self.labels[index]
            return torch.Tensor(self.x), self.y
        else:
            self.x = self.df[index]
            return torch.Tensor(self.x)
        
    def __len__(self):
        return len(self.df)

In [40]:
train_dataset = MyDataset(df = train_df, eval_mode = False)
train_loader = DataLoader(train_dataset, batch_size = BS, shuffle = True, num_workers = 6)

val_dataset = MyDataset(df = val_df, eval_mode = True)
val_loader = DataLoader(val_dataset, batch_size=BS, shuffle = False, num_workers = 6)

1D AutoEncoder

In [41]:
class AutoEncoder(nn.Module):
    def __init__(self):
        super(AutoEncoder, self).__init__()
        self.Encoder = nn.Sequential(nn.Linear(30,64), nn.BatchNorm1d(64), nn.LeakyReLU(), nn.Linear(64,128), nn.BatchNorm1d(128), nn.LeakyReLU(), )
        self.Decoder = nn.Sequential(nn.Linear(128, 64), nn.BatchNorm1d(64), nn.LeakyReLU(), nn.Linear(64,30), )
    
    def forward(self, x):
        x = self.Encoder(x)
        x = self.Decoder(x)
        return x

In [42]:
class Trainer():
    def __init__(self, model, optimizer, train_loader, val_loader, scheduler, device):
        self.model = model
        self.optimizer = optimizer
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.scheduler = scheduler
        self.device = device
        
        # Loss Function
        
        self.criterion = nn.L1Loss().to(self.device)
        
        
    def fit(self, ):
        self.model.to(self.device)
        best_score = 0
        for epoch in range(EPOCHS):
            self.model.train()
            train_loss = []
            for x in iter(self.train_loader):
                x = x.float().to(self.device)
                self.optimizer.zero_grad()
                
                _x = self.model(x)
                loss = self.criterion(x, _x)
                
                loss.backward()
                self.optimizer.step()
                
                train_loss.append(loss.item())
                
            score = self.validation(self.model, 0.95)
            print(f'Epoch : [{epoch}] Train loss : [{np.mean(train_loss)}] Val Score : [{score}]')
            
            if self.scheduler is not None:
                self.scheduler.step(score)
                
            if best_score < score:
                best_score = score
                torch.save(model.module.state_dict(), './best_model.pth', _use_new_zipfile_serialization = False)
                
    def validation(self, eval_model, thr):
        cos = nn.CosineSimilarity(dim = 1, eps = 1e-6)
        eval_model.eval()
        pred = []
        true = []
        with torch.no_grad():
            for x, y in iter(self.val_loader):
                x = x.float().to(self.device)
                
                _x = self.model(x)
                diff = cos(x, _x).cpu().tolist()
                batch_pred = np.where(np.array(diff) < thr, 1,0).tolist()
                pred += batch_pred
                true += y.tolist()
                
        return f1_score(true, pred,average = 'macro')

모델 학습

In [43]:
model = nn.DataParallel(AutoEncoder())
model.eval()
optimizer = torch.optim.Adam(params = model.parameters(), lr = LR)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode = 'max', factor = 0.5, patience = 10, threshold_mode = 'abs', min_lr = 1e-8, verbose = True)

trainer = Trainer(model, optimizer, train_loader, val_loader, scheduler, device)
trainer.fit()

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/dataly/opt/anaconda3/envs/pytorch_env/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Users/dataly/opt/anaconda3/envs/pytorch_env/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'MyDataset' on <module '__main__' (built-in)>


KeyboardInterrupt: 

In [None]:
model = AutoEncoder()
model.load_state_dict(torch.load('./best_model.pth'))
model = nn.DataParallel(model)
model.eval()

In [None]:
test_df = test_df.drop(columns = ['ID'])

In [None]:
test_dataset = MyDataset(test_df, False)
test_loader = DataLoader(test_dataset, batch_size = BS, shuffle = False, num_workers = 6)

In [None]:
def prediction(model, thr, test_loader, device):
      model.to(device)
  model.eval()
  cos = nn.CosineSimilarity(dim = 1, eps = 1e-6)
  pred = []
  with torch.no_grad():
    for x in iter(test_loader):
      x = x.float().to(device)

      _x = model(x)

      diff = cos(x, _x).cpu().tolist()
      batch_pred = np.where(np.array(diff) < thr, 1, 0).tolist()
      pred += batch_pred
    return pred

In [None]:
preds = predicition(model, 0.95, test_loader, device)

In [None]:
submit = pd.read_csv('./sample_submission')
submit['Class'] = preds
submit.to_csv('./autoencoder_submit.csv', index = False)