In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
from torch.optim import SGD
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

seed = 3047
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class SVM(nn.Module):
  def __init__(self):
    # TODO design your model
    super(SVM, self).__init__() 
    self.w = nn.Parameter(torch.randn((1, 32)).to(torch.float32))
    self.f = nn.Sequential(
                  nn.Linear(107, 32),
                  nn.Dropout(0.5),
                )
  def transform(self, x):
    x = self.f(x)
    return x
  def kernel(self, x):
    pass
  def forward(self, x):
    f = torch.matmul(self.transform(x), self.w.T)
    
    return f

In [3]:
class HingeLoss(nn.Module):
  def __init__(self, C):
    super(HingeLoss, self).__init__()  
    self.C = C
  def forward(self, y, f):
    loss = 0
    for i in range(len(y)):
      loss = loss + max(0, 1-y[i]*f[i])  # define Hinge loss
    loss = loss * self.C
    return loss


In [4]:
def cal_mu_std():
  X = pd.read_csv("./dataset/train.csv")
  mu = X.drop(['y'], axis=1).mean() # The mean of whole features except label y
  std = X.drop(['y'], axis=1).std() # The std of whole features except label y

  return mu, std

class TrainDataset(Dataset):
  def __init__(self, split, mu=None, std=None):
    X = pd.read_csv(f"{split}.csv")
    
    Y = X['y'].values.reshape(-1) * 2 - 1
    self.mu, self.std = mu, std
    # print(mu, std)
    X = self.normalize(X.drop(['y'], axis=1), self.mu, self.std)
    X = np.concatenate((X, np.ones((X.shape[0], 1))), 1)
    self.Y = torch.from_numpy(Y).to(torch.float32)
    self.X = torch.from_numpy(X).to(torch.float32)

  def normalize(self, X, mu=None, std=None):
    X = (X-mu)/std
    
    return X
  
  def __len__(self):
    return self.X.size(0)

  def __getitem__(self, idx):
    return self.X[idx], self.Y[idx]

class TestDataset(Dataset):
  def __init__(self, mu, std):
    X = pd.read_csv("./dataset/X_test")
    X = self.normalize(X, mu, std)
    X = np.concatenate((X, np.ones((X.shape[0], 1))), 1)
    self.X = torch.from_numpy(X).to(torch.float32)

  def normalize(self, X, mu_x, std_x):
    X = (X-mu_x)/std_x
    
    return X
  
  def __len__(self):
    return self.X.size(0)

  def __getitem__(self, idx):
    return self.X[idx]

In [5]:
def train(train_data, val_data, model, optim, C, device='cuda:0'):
    epoch = 100
    objective = HingeLoss(C)
    steps = 0
    best = 0

    for e in range(epoch):
      train_total_loss = 0
      for tr in train_data:
        steps += 1
        x_train, y_train = tr
        x_train, y_train = x_train.to(device), y_train.to(device)
        pred = model(x_train).squeeze(1)
        loss = objective(pred, y_train) + 1 / 2 * torch.sum(model.w[:-1] ** 2)
        
        optim.zero_grad()
        loss.backward() #retain_graph=True
        optim.step()

        train_total_loss += (loss.item() / len(train_data))

        if steps % 100 == 0:
          model.eval()
          with torch.no_grad():
            acc = []
            for val in val_data:
              x_val, y_val = val
              x_val , y_val = x_val.to(device), y_val.to(device)
              pred = model(x_val).squeeze(1)
              pred = (pred > 0) * 2 - 1
              
              result = (y_val == pred)
              acc += [(float(result.sum()) / result.size(0))]
            acc = sum(acc) / len(acc)
            print(f'Steps {steps}| Train Loss = {train_total_loss}| Val acc = {acc}')
            if acc > best:
              torch.save(model.state_dict(), './model/best_handcraft.ckpt')
              best = acc        
          model.train()
    return model

In [6]:
lr = 0.01
batch = 32
C = 1
device = 'cuda:0'

In [7]:
mu, std = cal_mu_std()
trainset = TrainDataset('./dataset/train', mu, std)
devset = TrainDataset('./dataset/val', mu, std)
testset = TestDataset(mu, std)

train_dataloader = DataLoader(trainset, batch, True, drop_last=False)
val_dataloader = DataLoader(devset, 1, False)
test_dataloader = DataLoader(testset, 1, False)

model = SVM().to(device)
model.train()
optim = SGD(model.parameters(), lr)
model = train(train_dataloader, val_dataloader, model, optim, C, device)

Steps 100| Train Loss = 4.254057230371418| Val acc = 0.8136666666666666
Steps 200| Train Loss = 6.514630773882849| Val acc = 0.8296666666666667
Steps 300| Train Loss = 8.187162278018475| Val acc = 0.8356666666666667
Steps 400| Train Loss = 10.377509416181812| Val acc = 0.8366666666666667
Steps 500| Train Loss = 12.141175177448241| Val acc = 0.826
Steps 600| Train Loss = 13.998730886808211| Val acc = 0.8223333333333334
Steps 700| Train Loss = 15.976334617767511| Val acc = 0.8116666666666666
Steps 800| Train Loss = 18.100007889848776| Val acc = 0.837
Steps 900| Train Loss = 20.24538290113599| Val acc = 0.7943333333333333
Steps 1000| Train Loss = 1.5157028684368377| Val acc = 0.8183333333333334
Steps 1100| Train Loss = 3.500360795687804| Val acc = 0.822
Steps 1200| Train Loss = 5.8324105050656705| Val acc = 0.7996666666666666
Steps 1300| Train Loss = 7.9660625981562045| Val acc = 0.8176666666666667
Steps 1400| Train Loss = 10.00904294448497| Val acc = 0.808
Steps 1500| Train Loss = 12.167

KeyboardInterrupt: 

In [None]:
best_model = model
best_model.load_state_dict(torch.load('./model/best_handcraft.ckpt'))
best_model = best_model.eval()

y_test = []
for x in test_dataloader:
  x = x.to(device)
  y = best_model(x)
  y_test.append(((y > 0) * 1).item())



In [None]:
import csv
with open('./testing_result/predict_handcraft.csv', 'w', newline='') as csvf:
    # 建立 CSV 檔寫入器
    writer = csv.writer(csvf)
    writer.writerow(['id','label'])
    for i in range(len(y_test)):
      writer.writerow( [i + 1, int(y_test[i])] )