In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import random
import pandas as pd
import numpy as np
import os
import csv
import cv2

from torchvision import transforms

from tqdm.auto import tqdm

from sklearn.preprocessing import RobustScaler

from sklearn.model_selection import train_test_split

import warnings
warnings.filterwarnings(action='ignore') 

# Funtion Define

In [43]:
def RemoveEmptyColumn(train_X, test):
    train_X = train_X.dropna(axis=1, how='all')
    test = test[train_X.columns]

    return train_X, test

def DropDuplicateColumns(train_X, test):
    train_X = train_X.loc[:,~train_X.T.duplicated(keep='first')]
    test = test[train_X.columns]

    return train_X, test

def RemoveOneValueColumn(train_X, test):
    for col in [x for x in train_X.columns if 'X_' in x]:
        if len(train_X[col].value_counts())==1:
            train_X = train_X.drop(col, axis=1)
        
    test = test[train_X.columns]

    return train_X, test

def ConcatProdLine(train_X, test):
    train_X['PROD_LINE'] = train_X['PRODUCT_CODE']+'_'+train_X['LINE']
    train_X = train_X.drop(['PRODUCT_CODE','LINE'],axis=1)
    test['PROD_LINE'] = test['PRODUCT_CODE']+'_'+test['LINE']
    test = test.drop(['PRODUCT_CODE','LINE'],axis=1)

    return train_X, test

def fillNa(train_X, test):
    train_X = train_X.fillna(0)
    test = test.fillna(0)
    
    return train_X, test

def DatascalingRobust(train_X, test):
    scaler = RobustScaler()

    num_features_train = [x for x in train_X.columns if "X" in x]
    #train_x.select_dtypes(exclude=['object']).columns.to_list()

    train_X[num_features_train] = scaler.fit_transform(train_X[num_features_train])
    test[num_features_train] = scaler.transform(test[num_features_train])

    return train_X, test

def OnehotEncoder(train_X, test):
    dummies_col = []
    for c in ['PROD_LINE']:
        df = pd.get_dummies(train_X[c])
        train_X[df.columns] = df
        train_X = train_X.drop(c, axis=1)
        df = pd.get_dummies(test[c])
        test[df.columns] = df
        test = test.drop(c, axis=1)
        dummies_col.extend(df.columns)
        
    return train_X, test, dummies_col

# CustomDataset

In [16]:
class CustomDataset(Dataset):
    def __init__(self, train_X, train_y):
        super(CustomDataset, self).__init__()
        self.train_X = train_X
        self.train_y = train_y
        
        self.train_X = torch.tensor(train_X.values, dtype=torch.float64)
        self.train_y = torch.tensor(train_y.values, dtype=torch.float64)

        
    def __len__(self):
        return len(self.train_X)
    
    def __getitem__(self, idx):
        x = self.train_X[idx]
        y = self.train_y[idx]
        return x, y

# Data Load

In [61]:
def load_data(df, mode):
    train_X = train_df.drop(columns=['Y_Quality', 'Y_Class', 'TIMESTAMP', 'PRODUCT_ID'])
    if mode == 'single':
        train_y = train_df['Y_Class']
    if mode == 'dual':
        train_y = train_df[['Y_Class', 'Y_Quality']]
        
    return train_X, train_y

In [62]:
train_df = pd.read_csv('C:/Users/mooha/Desktop/LG Aimers/open/train.csv')
test_df = pd.read_csv('C:/Users/mooha/Desktop/LG Aimers/open/test.csv')

In [63]:
train_X, train_y = load_data(train_df, 'dual')
test = test_df.drop(columns=['TIMESTAMP', 'PRODUCT_ID'])

train_X, val_X, train_y, val_y = train_test_split(train_X, train_y, test_size=0.2, random_state=42, stratify=train_df['Y_Class'])

# Data Preprocessing

In [66]:
# 우선은 test 대신에 val_X 로 대체 
train_X, val_X = RemoveEmptyColumn(train_X, val_X)
train_X, val_X = DropDuplicateColumns(train_X, val_X)
train_X, val_X = RemoveOneValueColumn(train_X, val_X)
train_X, val_X = ConcatProdLine(train_X, val_X)
train_X, val_X = fillNa(train_X, val_X)
train_X, val_X = DatascalingRobust(train_X, val_X)
train_X, val_X, prod_dum = OnehotEncoder(train_X, val_X)

위 데이터 전처리시 train_X 와 val_X 의 number of columns 가 다름. PROD_LINE 에서 val_X 가 하나 부족.

In [None]:
train_dataset = CustomDataset(train_X, train_y)
train_loader = DataLoader(train_dataset, batch_size = 8, shuffle=True, num_workers=0)

val_dataset = CustomDataset(val_X, val_y)
val_loader = DataLoader(val_dataset, batch_size = 8, shuffle=False, num_workers=0)

# Model Define

In [2]:
class Multiclass(nn.Module):
    def __init__(self):
        super().__init__()
        self.hidden = nn.Linear(1626, 8)
        self.Classification = nn.Linear(8, 1)
        self.Regression = nn.Linear(8, 1)

    def forward(self, x):
        x = self.hidden(x)
        y1= self.Classification(x)
        y2= self.Regression(x)
        return y1, y2

# Train

In [71]:
from sklearn.metrics import mean_squared_error 
EPOCHS = 1000
best_acc = - np.inf   # init to negative infinity

def train(model, optimizer, train_loader, val_loader, scheduler, device):
    model.to(device)
    loss_fn1 = nn.CrossEntropyLoss().to(device) # 다중클래스분류 손실 함수
    loss_fn2 = mean_squared_error().to(device) # 회귀 손실 함수
    
    model.train()
    
    for epoch in range(1, EPOCHS):
        train_loss = []
        for x, y1, y2 in tqdm(iter(train_loader)):
            x = x.to(device)
            y1 = y1.to(device)
            y2 = y2.to(device)
            
            optimizer.zero_grad()   # clear gradients 
            pre_class, pre_regression = model(x)
            
            loss = loss_fn1(pre_class, y1)
            loss2 = loss_fn2(pre_regression, y2)
            loss_total = loss + loss2
            
            loss_total.backward()
            
            optimizer.step()
            
            train_loss.append(loss_total.item())
            
        _val_loss, _val_acc = validation(model, loss_fn1, val_loader, device)
        _train_loss = np.mean(train_loss)
        print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val ACC : [{_val_acc:.5f}]')
        
        if scheduler is not None:
            scheduler.step(_val_acc)
            
        if best_val_acc < _val_acc:
            best_val_acc = _val_acc
            best_model = model
        
    return best_model

# Validation

In [70]:
def validation(model, ClassCriterion, val_loader, device):
    model.eval()
    val_loss = []
    val_acc = []
    for x, y1, y2 in tqdm(iter(val_loader)):
        x = x.to(device)
        y1 = y1.to(device)
        y2 = y2.to(device)
        
        probs,_ = model(x)
        
        loss = ClassCriterion(probs, y1)
        
        probs  = probs.cpu().detach().numpy()
        y1 = y1.cpu().detach().numpy()
        
        preds = probs > 0.5
        
        batch_acc = (y1 == preds).mean()
        
        val_acc.append(batch_acc)
        val_loss.append(loss.item())
        
    _val_loss = np.mean(val_loss)
    _val_acc = np.mean(val_acc)
    
    return _val_loss, _val_acc