<a href="https://colab.research.google.com/github/KaushikRoy94/Work/blob/master/cifar10_project_CNN_ResNet_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [75]:
#!/usr/bin/env python

"""
    convnet/prep.py
    
    Grabs the first two classes of CIFAR10 and saves them as numpy arrays
    
    Since we don't assume everyone has access to GPUs, we do this to create
    a dataset that can be trained in a reasonable amount of time on a CPU.
"""

import os
import sys
import numpy as np
import json
import argparse
import math
from math import ceil
from time import time
from keras.initializers import RandomUniform
import torch
import keras.backend as K
from torchvision import transforms, datasets
import numpy as np
from keras import layers
import keras
from keras.losses import sparse_categorical_crossentropy
from keras.layers import Input, Add, Dense, Activation, ZeroPadding2D, BatchNormalization, Flatten, Conv2D, AveragePooling2D, MaxPooling2D, GlobalMaxPooling2D
from keras.models import Model, load_model
from keras.preprocessing import image
from keras.utils import layer_utils
from keras.utils.data_utils import get_file
from keras.applications.imagenet_utils import preprocess_input
import pydot
from IPython.display import SVG
from keras.utils.vis_utils import model_to_dot
from keras.utils import plot_model
import scipy.misc
from matplotlib.pyplot import imshow
%matplotlib inline
from keras.optimizers import SGD
from keras.preprocessing.image import ImageDataGenerator
from keras.utils.vis_utils import plot_model


if __name__ == "__main__":
    
    # --
    # Load data
    
    print('prep.py: dowloading cifar10', file=sys.stderr)
    trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transforms.ToTensor())
    testset  = datasets.CIFAR10(root='./data', train=False, download=True, transform=transforms.ToTensor())
    
    X_train, y_train = zip(*[(x, y) for x, y in trainset if y <= 1])
    X_test, y_test   = zip(*[(x, y) for x, y in testset if y <= 1])
    
    X_train = np.array(torch.stack(X_train)).astype(np.float32)
    X_test  = np.array(torch.stack(X_test)).astype(np.float32)
    y_train = np.array(y_train).astype(np.int64)
    y_test  = np.array(y_test).astype(np.int64)
    
    # --
    # Scale and center data
    
    X_mean = X_train.transpose(1, 0, 2, 3).reshape(3, -1).mean(axis=-1).reshape(1, 3, 1, 1)
    X_std = X_train.transpose(1, 0, 2, 3).reshape(3, -1).std(axis=-1).reshape(1, 3, 1, 1)
    
    X_train = (X_train - X_mean) / X_std
    X_test  = (X_test - X_mean) / X_std
    
    # --
    # Save to file
    
    os.makedirs('data/cifar2', exist_ok=True)
    
    print('prep.py: saving to data/cifar2', file=sys.stderr)
    np.save('data/cifar2/X_train.npy', X_train)
    np.save('data/cifar2/X_test.npy', X_test)
    
    np.save('data/cifar2/y_train.npy', y_train)
    np.save('data/cifar2/y_test.npy', y_test)

prep.py: dowloading cifar10


Files already downloaded and verified
Files already downloaded and verified


prep.py: saving to data/cifar2


In [80]:

"""
    convnet/main.py
"""

# --
# User code
# Note: Depending on how you implement your model, you'll likely have to change the parameters of these
# functions.  They way their shown is just one possble way that the code could be structured.
 

def identity_block(X, filters, input_channels):
    
    # Save the input value
    X_shortcut = X

    # Component of main path
    scale = math.sqrt(1 / (9 * input_channels))
    X = ZeroPadding2D((1,1))(X)
    X = Conv2D(filters=filters, kernel_size=(3, 3), strides=(1, 1), kernel_initializer=RandomUniform(minval=-scale, maxval=scale, seed=None), use_bias=False)(X)
    print(X.shape)
    X = BatchNormalization( momentum=0.99,epsilon=0.001, beta_initializer='zeros', gamma_initializer='ones',  moving_mean_initializer='zeros', moving_variance_initializer='ones')(X)
    X = Activation('relu')(X)
    
    # Final step: Add shortcut value to main path
    X = Add()([X, X_shortcut])

    return X
  
def convolutional_block(X, filters, input_channels):

    # Save the input value
    X_shortcut = X

    # Component of main path
    scale = math.sqrt(1 / (9 * input_channels))
    X = ZeroPadding2D((1,1))(X)
    X = Conv2D(filters=filters, kernel_size=(3, 3), strides=(1, 1), kernel_initializer=RandomUniform(minval=-scale, maxval=scale, seed=None), use_bias=False)(X)
    print(X.shape)
    X = BatchNormalization( momentum=0.99,epsilon=0.001, beta_initializer='zeros', gamma_initializer='ones',  moving_mean_initializer='zeros', moving_variance_initializer='ones')(X)
    X = Activation('relu')(X)

    # Shortcut path
    scale = math.sqrt(1 / ( input_channels))
    X_shortcut = Conv2D(filters=filters, kernel_size=(1, 1), strides=(1, 1), padding='valid', kernel_initializer=RandomUniform(minval=-scale, maxval=scale, seed=None), use_bias=False)(X_shortcut)
    
    # Final step: Add shortcut value to main path
    X = Add()([X, X_shortcut])
    
    return X
  
  
 
def make_model(input_channels, output_classes, scale_alpha, optimizer, lr, momentum):

    # ... your code here ...
  
    image_shape = (32, 32, input_channels)
    inputs = Input(image_shape)
    
    # Stage 1
    scale = math.sqrt(1 / (9 * input_channels))
    x=ZeroPadding2D((1,1))(inputs)
    x = Conv2D(filters=16, kernel_size=(3, 3), strides=(1, 1), kernel_initializer=RandomUniform(minval=-scale, maxval=scale, seed=None), use_bias=False)(x)
    print(x.shape)
    x = BatchNormalization( momentum=0.99,epsilon=0.001, beta_initializer='zeros', gamma_initializer='ones',  moving_mean_initializer='zeros', moving_variance_initializer='ones')(x)
    x = Activation('relu')(x)
    
    # Stage 2
    x = convolutional_block(x, filters=32, input_channels=16)
    x = identity_block(x, filters=32, input_channels=16)
    
    # Stage 3
    print("x before stage 3",x.shape)
    x = MaxPooling2D((2, 2), strides=(2, 2))(x)
    print("x after stage 3",x.shape)
    
    # Stage 4
    x = convolutional_block(x, filters=64, input_channels=32)
    x = identity_block(x, filters=64, input_channels=32)
    
    # Stage 5
    print("x before stage 5",x.shape)
    x = MaxPooling2D((2, 2), strides=(2, 2))(x)
    print("x after stage 3",x.shape)
    
    # Stage 6
    x = convolutional_block(x, filters=128, input_channels=64)
    x = identity_block(x, filters=128, input_channels=64)
    
    # Stage 7
    (_,x_h,x_w,x_c)=x.shape
    x = GlobalMaxPooling2D()(x)
 
    # Stage 8
    _,input_dim = x.shape
    print("input dim=",input_dim)
    scale = math.sqrt(1 /128)
    x = Dense(output_classes, activation='linear',kernel_initializer=RandomUniform(minval=-scale, maxval=scale, seed=None), use_bias=False)(x)
    
    # Stage 9
    print(x.shape)
    #x = x[:,0]*scale_alpha
    #x = x[:,1]*scale_alpha
    
    # Output layer
    x = Activation('softmax')(x)
    
    # Create model
    model = Model(inputs, x)
 
    if optimizer == "SGD":
        sgd = SGD(lr=lr, momentum=momentum)
        model.compile(optimizer=sgd, loss=sparse_categorical_crossentropy,
            metrics=['accuracy'])
    else:
        model.compile(optimizer, loss=sparse_categorical_crossentropy,
                metrics=['accuracy'])
    return model
 
 
def make_train_dataloader(X, y, batch_size, shuffle):
    # ... your code here ...
    data = ImageDataGenerator()
    dataloader = data.flow(X, y, batch_size, shuffle=shuffle)
    return dataloader
 
 
def make_test_dataloader(X, batch_size, shuffle):
    # ... your code here ...
    data1 = ImageDataGenerator()
    dataloader = data1.flow(X, None, batch_size, shuffle=shuffle)
    return dataloader
 
 
def train_one_epoch(model, dataloader, steps):
    # ... your code here ...
    model.fit_generator(dataloader, epochs=1,
            steps_per_epoch=steps)
    return model
 
 
def predict(model, dataloader, steps):
    # ... your code here ...
    probablities = model.predict_generator(dataloader,
            steps=steps)
    predictions = []
 
    for i in range(probablities.shape[0]):
        max_probablity = -1
        best_class = -1
        for j in range(probablities.shape[1]):
            if probablities[i][j] > max_probablity:
                max_probablity = probablities[i][j]
                best_class = j
        predictions.append(best_class)
    predictions = np.array(predictions)
    return predictions
 
# --
# CLI
 
#def parse_args():
#    parser = argparse.ArgumentParser()
#    parser.add_argument('--cuda', action="store_true")
#    parser.add_argument('--num-epochs', type=int, default=5)
#    parser.add_argument('--lr', type=float, default=0.1)
#    parser.add_argument('--momentum', type=float, default=0.9)
#    parser.add_argument('--batch-size', type=int, default=128)
#    return parser.parse_args()
 
if __name__ == '__main__':
    
    #args = parse_args()
   
    # --
    # IO
   
    # X_train: tensor of shape (number of train observations, number of image channels, image height, image width)
    # X_test:  tensor of shape (number of train observations, number of image channels, image height, image width)
    # y_train: vector of [0, 1] class labels for each train image
    # y_test:  vector of [0, 1] class labels for each test image (don't look at these to make predictions!)
   
    X_train = np.load('data/cifar2/X_train.npy')
    X_test  = np.load('data/cifar2/X_test.npy')
    y_train = np.load('data/cifar2/y_train.npy')
    y_test  = np.load('data/cifar2/y_test.npy')
    X_train = np.transpose(X_train, (0, 2, 3, 1))
    X_test = np.transpose(X_test, (0, 2, 3, 1))
 
    # --
    # Define model
   
    model = make_model(
        input_channels=3,
        output_classes=2,
        scale_alpha=0.125,
        optimizer="SGD",
        lr=0.006,
        momentum=0.9,
    )
    
    # --
    # Train
        
    t = time()
    for epoch in range(5):
       
        # Train
        model = train_one_epoch(
            model=model,
            dataloader=make_train_dataloader(X_train, y_train, batch_size=128, shuffle=True),
            steps=ceil(X_train.shape[0] / 128)
        )
       
        # Evaluate
        preds = predict(
            model=model,
            dataloader=make_test_dataloader(X_test, batch_size=128, shuffle=False),
            steps=ceil(X_test.shape[0] / 128)
        )
       
        assert isinstance(preds, np.ndarray)
        assert preds.shape[0] == X_test.shape[0]
       
        test_acc = (preds == y_test.squeeze()).mean()
       
        print(json.dumps({
            "epoch"    : int(epoch),
            "test_acc" : test_acc,
            "time"     : time() - t
        }))
        sys.stdout.flush()
       
    elapsed = time() - t
    print('elapsed', elapsed, file=sys.stderr)
   
    # --
    # Save results
   
    os.makedirs('results', exist_ok=True)
   
    np.savetxt('results/preds', preds, fmt='%d')
    open('results/elapsed', 'w').write(str(elapsed))

(?, 32, 32, 16)
(?, 32, 32, 32)
(?, 32, 32, 32)
x before stage 3 (?, 32, 32, 32)
x after stage 3 (?, 16, 16, 32)
(?, 16, 16, 64)
(?, 16, 16, 64)
x before stage 5 (?, 16, 16, 64)
x after stage 3 (?, 8, 8, 64)
(?, 8, 8, 128)
(?, 8, 8, 128)
input dim= 128
(?, 2)
Epoch 1/1
{"epoch": 0, "test_acc": 0.912, "time": 16.279667377471924}
Epoch 1/1
{"epoch": 1, "test_acc": 0.7595, "time": 18.713507890701294}
Epoch 1/1
{"epoch": 2, "test_acc": 0.905, "time": 21.091830730438232}
Epoch 1/1
{"epoch": 3, "test_acc": 0.927, "time": 23.486514806747437}
Epoch 1/1
{"epoch": 4, "test_acc": 0.96, "time": 25.874765157699585}


elapsed 25.877500772476196


In [81]:

"""
    convnet/validate.py
"""


ACC_THRESHOLD = 0.95

if __name__ == '__main__':
    y_test = np.load('data/cifar2/y_test.npy')
    y_pred = np.array([int(xx) for xx in open('results/preds').read().splitlines()])
    
    test_acc = (y_test == y_pred).mean()
    
    # --
    # Log
    
    print(json.dumps({
        "test_acc" : float(test_acc),
        "status"   : "PASS" if test_acc >= ACC_THRESHOLD else "FAIL",
    }))
    
    does_pass = "PASS" if test_acc >= ACC_THRESHOLD else "FAIL"
    open('results/.pass', 'w').write(str(does_pass))

{"test_acc": 0.96, "status": "PASS"}
