In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import datetime
from matplotlib.colors import ListedColormap

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import sys
import os

%matplotlib inline

class CustomDataset(Dataset):
    def __init__(self, file, scaler):
        self.df = pd.read_csv(file)
        self.sc = scaler

    def __len__(self):
        return self.df.shape[0]
    
    def __getitem__(self, idx):
        raw = self.df.iloc[idx].values
        if type(idx) == int:
            raw = raw.reshape(1, -1)
        raw = self.sc.transform(raw)
        data = torch.tensor(raw[:, :-1], dtype=torch.float32)
        label = torch.tensor(raw[:, -1], dtype=torch.float32)
        return data, label

In [17]:
np.random.seed(42)
torch.manual_seed(0)

def read_data(name,mode):
    x_data = []
    y_data = []
    label_data = []

    base_dir = os.getcwd()
    root = base_dir
    filename = '%s/%s_%s.csv' % (root,name,mode)

    i = 0
    with open(filename,'rt') as f:
        for line in f:
            line = line.replace('\n','')
            tokens = line.split(',')
            if i > 0:
                y = int(float(tokens[0]))
                x1 = float(tokens[1])
                x2 = float(tokens[2])
                x_data.append([1.0,x1,x2])
                y_data.append([y])
                temp = [0,0]
                temp[y] = 1
                label_data.append(temp)
            i = i + 1
    xs = np.array(x_data,dtype='float32')
    ys = np.array(y_data,dtype='float32')
    labels = np.array(label_data,dtype='float32')
    return(xs,ys,labels)

def draw_example(nodes,name,model):
    x_train, y_train, label_train = read_data(name,'train')
    x_test, y_test, label_test = read_data(name,'test')

    fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(6,6*(1+0)))

    h = .1  # step size in the mesh
    cmap_light = ListedColormap(['#FFAAAA', '#AAFFAA', '#AAAAFF'])
    cmap_bold = ListedColormap(['#FF0000', '#00FF00', '#0000FF'])

    x_min, x_max = x_train[:, 1].min() - 1, x_train[:, 1].max() + 1
    y_min, y_max = x_train[:, 2].min() - 1, x_train[:, 2].max() + 1
    xx, yy = np.meshgrid(np.arange(x_min, x_max, h),np.arange(y_min, y_max, h))
    Z = np.zeros([xx.shape[0],yy.shape[1]],dtype='float32')
    Z2 = np.zeros([nodes,xx.shape[0],yy.shape[1]],dtype='float32')
    grid_data = np.ones([1,3],dtype='float32')

    grid_data2 = np.zeros([1,3],dtype='float32')
    for i in range(xx.shape[0]):
        for j in range(yy.shape[1]):
            x = xx[i,j]
            y = yy[i,j]
            grid_data2[0,0] = 1.0
            grid_data2[0,1] = x
            grid_data2[0,2] = y

            x = torch.from_numpy(grid_data2)
            pred = model(x)
            if (pred[0].item()-6.0) > -4.5:
                yh = 1.0
            else:
                yh = 0.0
            Z[i,j] = yh
    ax.pcolormesh(xx, yy, Z, cmap=cmap_light)
    ax.scatter(x_test[:, 1], x_test[:, 2], c=y_test[:,0], cmap=cmap_bold)

FeedForward Model

In [None]:
device = torch.device("cpu")
# made with guidance from Pytorch tutorial
class FeedForward(nn.Module):
   def __init__(self):
      super(FeedForward, self).__init__()
      # first param is # of features in dataset
      # second param is # of hidden nodes
      self.linear1 = nn.Linear(3, 32) # input layer
      self.relu1 = nn.LeakyReLU()   # activation function \
                                    # used on nonlinearity after
      self.linear2 = nn.Linear(32, 16) # hidden layer
      self.relu2 = nn.LeakyReLU()   # activation function
      self.linear_out = nn.Linear(16, 1)  # output

   # forward pass where you just keep calculating with this one var
   # ^^ putting it through layers
   # also known as inference pass
   def forward(self, x):
      x = self.linear1(x)
      x = self.relu1(x)
      x = self.linear2(x)
      x = self.relu2(x)
      x = self.linear_out(x)
      return x
   
# the same result, just done sequentially
class moreAdvanced(nn.Module):
   def __init__(self, size_in, size_out, k, device=device):
      super(moreAdvanced, self).__init__()
      print(k)
      self.linear1 = nn.Linear(size_in, k)
      self.tanh1 = nn.Tanh()
      self.linear2 = nn.Linear(k, k//2)
      self.tanh2 = nn.Tanh()
      self.linear_out = nn.Linear(k//2, size_out)

   def forward(self, x):
      # print(x.shape)
      x = self.linear1(x)
      x = self.tanh1(x)
      x = self.linear2(x)
      x = self.tanh2(x)
      x = self.linear_out(x)
      # x = self.linearStack(x)
      return x

In [19]:

def train(dataloader, model, loss_func, optimizer):
    """
    A function to train sets

    PARAMETERS
    ----------
    dataloader      data

    model           model used

    loss_func       loss function to use

    optimizer       optimize data
    """
    model.train()
    train_loss = [] # keep track of loss -> learning curve

    now = datetime.datetime.now() # to track how long things take
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # make predictions and get error
        pred = model(X)
        loss = loss_func(pred, y.unsqueeze(1))

        # backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 10 == 0:
            loss, current = loss.item(), batch * len(X)
            iters = 10 * len(X)
            then = datetime.now()
            iter /= (then - now).total_seconds()
            # loss of every 10 batches:
            print(f"loss: {loss:>6f} [{current:>5d}/{17000}] ({iters:.1f} its/sec)")
            now = then
            train_loss.append(loss)

    return train_loss

def test(dataloader, model, loss_func):
    """
    A function to train sets

    PARAMETERS
    ----------
    dataloader      data

    model           model used

    loss_func       loss function to use
    """

    size = len(dataloader)
    num_batches = 170
    model.eval()
    test_loss = 0

    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_func(pred, y.unsqueeze(1)).item()
    test_loss /= num_batches
    print(f"Avg Loss: {test_loss:>8f}\n")
    return test_loss

In [None]:
sc = MinMaxScaler()

train_data = CustomDataset('center_surround_train.csv', sc)
test_data = CustomDataset('center_surround_test.csv', sc)
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
test_loader = DataLoader(test_data, batch_size=64, shuffle=True)

k = [2, 3, 5, 7, 9]
for i in k:
    model = moreAdvanced(3, 1, i, device)
    ff = model.to(device)
    loss_func = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(ff.parameters(), lr=1e-3)
    epochs = 20
    train_loss = []
    test_loss = []
    for t in range(epochs):
        print(f"Epoch {t+1}\n-------------------------\n")
        losses = train(train_loader, ff, loss_func, optimizer)
        train_loss.append(losses)
        test_loss.append(test(test_loader, ff, loss_func))
draw_example(2, 'center_surround', ff)
draw_example(3, 'center_surround', ff)
draw_example(5, 'center_surround', ff)
draw_example(7, 'center_surround', ff)
draw_example(9, 'center_surround', ff)
print("done")

2
Epoch 1
-------------------------



NotFittedError: This MinMaxScaler instance is not fitted yet. Call 'fit' with appropriate arguments before using this estimator.