<a href="https://colab.research.google.com/github/alchua1996/DeepLearning/blob/master/Fashion_MNIST.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.data import Dataset, TensorDataset, DataLoader,SubsetRandomSampler
from torchvision.datasets import FashionMNIST
from torchvision import transforms as tfs
from sklearn.preprocessing import StandardScaler
from PIL import Image

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg


In [None]:
def L(r,j): #low pass
    if(2**j * r <= np.pi/2):
        out = 1
    elif(np.pi/2 <= 2**j * r <= np.pi):
        out = np.cos(np.pi/2 * np.log2(2* 2**(j) * r/np.pi))
    else:
        out = 0
    return out

def H(r,j): #high pass
    if(2**j * r <= np.pi/2):
        out = 0
    elif(np.pi/2 <= 2**j * r <= np.pi/4):
        out = np.cos(np.pi/2 * np.log2(2**(j) * r/np.pi))
    else:
        out = 1
    return out

def G_q(theta, Q,q): #directional cones
    alpha = 2**(Q - 1) * np.math.factorial(Q-1) / np.sqrt(Q*np.math.factorial(2*(Q-1)))
    r1 = theta - np.pi * q/Q
    if(r1 < -np.pi):
        r1 += 2*np.pi
    r2 = theta - np.pi * (q - Q)/Q
    if(r2 > np.pi):
        r2 -= 2*np.pi
    if(np.abs(r1) <= np.pi/2 and np.abs(r2) <= np.pi/2):
        out = np.cos(r1)**(Q-1) + np.cos(r2)**(Q-1)
    elif(np.abs(r1) <= np.pi/2 and np.abs(r2) > np.pi/2):
        out = np.cos(r1)**(Q-1)
    elif(np.abs(r1) > np.pi/2 and np.abs(r2) <= np.pi/2):
        out = np.cos(r2)**(Q-1)
    else:
        out = 0
    return alpha * out

In [None]:
def polar_grid(sample_rate_x, sample_rate_y): 
    grid = np.zeros((sample_rate_x,sample_rate_y,2), dtype = 'float')
    x = np.linspace(-np.pi, np.pi, num=sample_rate_x, endpoint=False)
    y = np.linspace(-np.pi, np.pi, num=sample_rate_y, endpoint=False)
    xx, yy = np.meshgrid(x, y, sparse=False, indexing='ij')
    rr = np.sqrt(xx**2 + yy**2)
    theta = np.arctan2(yy,xx)
    grid[:,:,0] = rr
    grid[:,:,1] = theta
    return grid 

In [None]:
def L_grid(grid, J=0):
    N = grid.shape[0]
    M = grid.shape[1]
    L_matrix = np.zeros((N,M), dtype = 'float')
    for n in range(N):
        for m in range(M):
            L_matrix[n,m] = L(grid[n,m,0],J)
    return L_matrix

def H_grid(grid, j=0):
    N = grid.shape[0]
    M = grid.shape[1]
    H_matrix = np.zeros((N,M))
    for n in range(N):
        for m in range(M):
            H_matrix[n,m] = H(grid[n,m,0],j)
    return H_matrix

def G_grid(grid,Q):
    N = grid.shape[0]
    M = grid.shape[1]
    G_matrix = np.zeros((N,M,Q))
    for n in range(N):
        for m in range(M):
            for q in range(Q):
                G_matrix[n,m,q] = G_q(grid[n,m,1], Q, q)
    return G_matrix            

def psi_grid(grid, Q, J):
    N = grid.shape[0]
    M = grid.shape[1]
    psi_matrix = np.zeros((N,M,Q,J))
    G = G_grid(grid,Q)
    for j in range(J):
        L = L_grid(grid, j)
        H = H_grid(grid, j+1)
        for q in range(Q):
            psi_matrix[:,:,q,j] = G[:,:,q] * L * H     
    return psi_matrix

In [None]:
def conv_frequency(img, filter):
    convolved_img = np.fft.ifft2(np.fft.fft2(img) * np.fft.fftshift(filter))
    return convolved_img

In [38]:
def get_next_layer(coeffs,filters,num_angles, d):
    #current number of wavelet coefficients in coeffs array
    num_coeffs = num_angles**(d-1)
    N = coeffs.shape[0]
    count = 0
    if(d == 1):
        next_layer = np.zeros((N,N, num_angles), dtype = 'complex')
        for m in range(num_angles):
            next_layer[:,:,count] = conv_frequency(coeffs, filters[:,:,m])
            count += 1 
    else:
        next_layer = np.zeros((N,N, num_coeffs * num_angles), dtype = 'complex')
        count = 0
        for n in range(num_coeffs):
            for m in range(num_angles):
                next_layer[:,:,count] = conv_frequency(coeffs[:,:,n], filters[:,:,m])
                count += 1 
    return next_layer

In [39]:
def wavelet_scattering_transform(img, psi, phi, num_angles, depth):
    #number of wavelet coefficients
    num_coeff = 0
    for n in range(depth+1):
        num_coeff += num_angles**n

    #wavelet coefficient index
    count = 0

    #initialize array to store all wavelet coefficients
    wavelet_coefficients = np.zeros((img.shape[0],img.shape[0],num_coeff), dtype = 'complex')

    for d in range(depth+1):
        if d == 0:
            curr_layer = img 
            wavelet_coefficients[:,:,count] = conv_frequency(curr_layer, phi)
            count+=1
        else: 
            next_layer = get_next_layer(curr_layer, psi[:,:,:,d-1], num_angles, d)
            next_layer = np.abs(next_layer)
            for n in range(num_angles**d):
                wavelet_coefficients[:,:,count] = conv_frequency(next_layer[:,:,n],phi)
                count += 1 
            curr_layer = next_layer        
    return wavelet_coefficients

In [40]:
def downsample(x):
    x_down = np.zeros(int(len(x)/2))
    for n in range(len(x)):
        if (n % 2 == 0):
            x_down[int(n/2)] = x[n]
    return x_down

In [55]:
depth  = 2
num_angles = 4
grid = polar_grid(28, 28)
psi = psi_grid(grid, num_angles, depth)
phi = L_grid(grid, depth)

num_coeff = 0
for n in range(depth+1):
    num_coeff += num_angles**n

In [56]:
TRAIN_BATCH = 100
VAL_BATCH = 100
TEST_BATCH = 1

# Transform data to PIL images
transforms = tfs.Compose([tfs.ToTensor()])

# Train/Val Subsets
train_mask = range(50000)
val_mask = range(50000, 60000)

# Download/Load Dataset
train_dataset = FashionMNIST('./data', train=True, transform=transforms, download=True)
test_dataset = FashionMNIST('./data', train=False, transform=transforms, download=True)

train_loader = DataLoader(train_dataset, batch_size=len(train_dataset))
test_loader = DataLoader(test_dataset, batch_size=len(test_dataset))

train_feat = next(iter(train_loader))[0].numpy()
train_feat.reshape((len(train_dataset),28,28))
test_feat = next(iter(test_loader))[0].numpy()
test_feat.reshape((len(test_dataset),28,28))

train_labels = next(iter(train_loader))[1]
test_labels = next(iter(test_loader))[1]

In [57]:
train_transform = np.zeros((train_feat.shape[0], int(28*28*num_coeff/2)))
for n in range(train_feat.shape[0]):
    sample = train_feat[n,:,:].reshape((28,28))
    trans_sample = np.real(wavelet_scattering_transform(sample, psi, phi, num_angles, depth))
    trans_sample = np.reshape(trans_sample,28*28*num_coeff)
    downsample_x = downsample(trans_sample)
    train_transform[n,:] = downsample_x 

test_transform = np.zeros((test_feat.shape[0], int(28*28*num_coeff/2)))
for n in range(test_feat.shape[0]):
    sample = test_feat[n,:,:].reshape((28,28))
    trans_sample = np.real(wavelet_scattering_transform(sample, psi, phi, num_angles, depth))
    trans_sample = np.reshape(trans_sample,28*28*num_coeff)
    downsample_x = downsample(trans_sample)
    test_transform[n,:] = downsample_x 



In [None]:
# from sklearn.decomposition import PCA
# train_pca = PCA(train_transform)
# test_pca = train_pca *


In [58]:
train_transform_T = torch.Tensor(train_transform)
test_transform_T = torch.Tensor(test_transform)
train_data = TensorDataset(train_transform_T, train_labels)
test_data = TensorDataset(test_transform_T, test_labels)

In [59]:
# Data Loaders
train_load = DataLoader(train_data, batch_size=TRAIN_BATCH, sampler=SubsetRandomSampler(train_mask))
val_load = DataLoader(train_data, batch_size=VAL_BATCH, sampler=SubsetRandomSampler(val_mask))
test_load = DataLoader(test_data, batch_size=TEST_BATCH)

In [79]:
class ANN(nn.Module):
    def __init__(self, nin, nout):
        super(ANN, self).__init__()
        self.dropout = nn.Dropout(0.2)
        self.Linear1 = nn.Linear(nin, 100)
        self.Linear2 = nn.Linear(100, 25)
        self.Linear3 = nn.Linear(25, nout)

    def forward(self, x):
        x = F.relu(self.Linear1(x))
        x = self.dropout(x)
        x = F.relu(self.Linear2(x))
        x = self.dropout(x)
        x = F.relu(self.Linear3(x))
        return F.softmax(x, dim = -1)
    
    def predict(self, x):
        x = F.relu(self.Linear1(x))
        x = F.relu(self.Linear2(x))
        x = F.relu(self.Linear3(x))
        return F.softmax(x, dim = -1)

def weights_init(m):
    if isinstance(m, nn.Linear):
        torch.nn.init.xavier_uniform(m.weight.data)

In [72]:
def train_eval(epoch):
    correct = 0
    total = 0
    loss_sum = 0
    for data, labels in train_load:
        outputs = model(data)
        predicted = torch.argmax(outputs.data,-1)
        total += labels.size(0)
        correct += (predicted.float() == labels.float()).sum()
        loss_sum += loss_function(outputs,labels)
        
    if (epoch+1) % 1 == 0:
        print("Epoch {}:".format(epoch+1))
        print('Train accuracy: %f %%' % (100.0 * correct / total))
        print('Train loss: %f' % (loss_sum.data.numpy().item() / total))

    #return 100.0 * correct / total, loss_sum.data.numpy().item() / total
    return
    
def val_eval(epoch):
    correct = 0
    total = 0
    loss_sum = 0
    for data, labels in val_load:
        outputs = model.predict(data)
        predicted = torch.argmax(outputs.data,-1)
        total += labels.size(0)
        correct += (predicted.float() == labels.float()).sum()
        loss_sum += loss_function(outputs,labels)

    if (epoch+1) % 1 == 0:
        print('Validation accuracy: %f %%' % (100.0 * correct / total))
        print('Validation loss: %f' % (loss_sum.data.numpy().item() / total))

    return 

In [None]:
model = ANN(int(28*28*num_coeff/2), 10)
weights_init(model)

loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr = 0.001)

for epoch in range(100):
    tot_loss = 0.0
    for i, (features, labels) in enumerate(train_load):
        optimizer.zero_grad() 
        output = model(features)
        loss = loss_function(output, labels)
        loss.backward()
        optimizer.step()
    tot_loss += loss.item()
    train_eval(epoch)
    val_eval(epoch)

Epoch 1:
Train accuracy: 71.167999 %
Train loss: 0.017560
Validation accuracy: 72.290001 %
Validation loss: 0.017391
Epoch 2:
Train accuracy: 79.162003 %
Train loss: 0.016753
Validation accuracy: 80.360001 %
Validation loss: 0.016585
Epoch 3:
Train accuracy: 80.578003 %
Train loss: 0.016593
Validation accuracy: 81.540001 %
Validation loss: 0.016467
Epoch 4:
Train accuracy: 81.253998 %
Train loss: 0.016517
Validation accuracy: 82.080002 %
Validation loss: 0.016410
Epoch 5:
Train accuracy: 81.472000 %
Train loss: 0.016487
Validation accuracy: 82.230003 %
Validation loss: 0.016395
Epoch 6:
Train accuracy: 82.012001 %
Train loss: 0.016420
Validation accuracy: 82.550003 %
Validation loss: 0.016331
Epoch 7:
Train accuracy: 83.047997 %
Train loss: 0.016329
Validation accuracy: 83.660004 %
Validation loss: 0.016241
Epoch 8:
Train accuracy: 84.150002 %
Train loss: 0.016211
Validation accuracy: 84.750000 %
Validation loss: 0.016132
Epoch 9:
Train accuracy: 85.323997 %
Train loss: 0.016103
Valida

In [63]:
correct = 0
total = 0
for data, labels in test_load:
    outputs = model.predict(data)
    predicted = torch.argmax(outputs.data,-1)
    total += labels.size(0)
    correct += (predicted.float() == labels.float()).sum()
print('Test accuracy: %f %%' % (100.0 * correct / total))



Test accuracy: 88.139999 %
