In [None]:
import numpy as np
import glob
import re
import shutil
import random
import itertools
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt

In [57]:
!unzip "/content/drive/MyDrive/FewShot/final_weights_5.zip" -d "/content/weights/"

Archive:  /content/drive/MyDrive/FewShot/final_weights_5.zip
replace /content/weights/best_one.data-00000-of-00001? [y]es, [n]o, [A]ll, [N]one, [r]ename: A
  inflating: /content/weights/best_one.data-00000-of-00001  
  inflating: /content/weights/checkpoint  
  inflating: /content/weights/best_one.index  


In [None]:
#@title loading the model (or directly load the architecture)
class SQD(tf.keras.layers.Layer):
    def __init__(self):
        super(SQD, self).__init__()
    def call(self, inputs):
        x, y = inputs
        diff = tf.subtract(x, y)
        return tf.square(diff)

def wave_downsample():
    input = tf.keras.Input(batch_size = batch_size,shape = (1024,19))
    output = tf.keras.layers.LSTM(10,return_sequences = True)(input)
    output = tf.keras.layers.Dense(2)(output)
    output = tf.keras.layers.Flatten()(output)
    output = tf.keras.layers.Activation('relu')(output)
    output = tf.keras.layers.Dropout(0.3)(output)
    output = tf.keras.layers.Dense(20)(output)
    output = tf.keras.layers.Dropout(0.1)(output)
    model = tf.keras.Model(inputs = [input],outputs = [output])
    return model
     
def get_model():
    wave1 = tf.keras.Input(batch_size = batch_size,shape = (1024,19),name = 'wave1')
    wave2 = tf.keras.Input(batch_size = batch_size,shape = (1024,19),name = 'wave2')
    state = tf.keras.Input(batch_size = batch_size,shape = (2,),name = 'state')
    
    output1 = tf.keras.layers.BatchNormalization()(wave1)
    output2 = tf.keras.layers.BatchNormalization()(wave2)

    down_sampler = wave_downsample()
    output1 = down_sampler(output1)
    output2 = down_sampler(output2)

    output1 = tf.keras.layers.Activation('tanh')(output1)
    output1 = tf.keras.layers.Dropout(0.3)(output1)

    output2 = tf.keras.layers.Activation('tanh')(output2)
    output2 = tf.keras.layers.Dropout(0.3)(output2)

    output = SQD()([output1,output2])
    output = tf.keras.layers.concatenate([output,state])
    output = tf.keras.layers.Dense(10)(output)
    output = tf.keras.layers.Dropout(0.2)(output)
    output = tf.keras.layers.Dense(1,activation = 'sigmoid',name = 'difference' )(output)
    
    model = tf.keras.Model(inputs = [wave1,wave2,state],outputs = [output],name = 'FewShot')
    return model

model.compile(optimizer = tf.keras.optimizers.Adam(),
              loss = 'binary_crossentropy',
              metrics = ['acc'])

model = get_model()

In [95]:
model.load_weights("/content/weights/best_one")

<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x7f1382e26950>

Cleaning data for testing

In [None]:
!unzip "/content/FINAL_DATASET.zip"

In [None]:
files = glob.glob("/content/Dataset/*/*")
files

In [9]:
def clean(path):
    df = pd.read_csv(path)
    
    for column in df.columns:
        if df[column].dtype == 'object':
            print("Sample : ",path," feature : ",column," is uncleaned")
            df[column] = pd.to_numeric(df[column], errors='coerce')
            df[column] = df[column].fillna(method='ffill')
            df[column] = df[column].fillna(method='bfill')
    df = df.iloc[:1024,:]

    while df.isnull().sum().values.sum() != 0:
        print("Sample : ",path,"getting cleaned")
        df = df.fillna(method = 'ffill')
        df = df.fillna(method = 'bfill')
    df.to_csv(path, index=False)
     
for i in files:
    clean(i)

Sample :  /content/Dataset/SETA/healthy_open2.csv  feature :  16  is uncleaned
Sample :  /content/Dataset/SETA/healthy_open11.csv  feature :  14  is uncleaned
Sample :  /content/Dataset/SETC/alzeimer_open5.csv  feature :  0  is uncleaned
Sample :  /content/Dataset/SETC/alzeimer_open5.csv getting cleaned
Sample :  /content/Dataset/SETC/alzeimer_open8.csv  feature :  0  is uncleaned
Sample :  /content/Dataset/SETC/alzeimer_open8.csv getting cleaned
Sample :  /content/Dataset/SETC/alzeimer_open6.csv  feature :  0  is uncleaned
Sample :  /content/Dataset/SETC/alzeimer_open6.csv getting cleaned
Sample :  /content/Dataset/SETC/alzeimer_open10.csv  feature :  0  is uncleaned
Sample :  /content/Dataset/SETC/alzeimer_open10.csv getting cleaned
Sample :  /content/Dataset/SETD/alzeimer_closed12.csv  feature :  18  is uncleaned
Sample :  /content/Dataset/SETB/healthy_closed2.csv  feature :  16  is uncleaned
Sample :  /content/Dataset/SETB/healthy_closed6.csv  feature :  14  is uncleaned


***TESTING TECHNIQUE*** : We will have a query sample of UNKNOWN CLASS which we need to predict as Healthy sample or Alzheimer sample. The query sample will be compared against stored reference samples of healthy and Alzheimer samples with the Similarity Few Shot NN trained. The intended results are: 


*   Query Sample belongs to 'Alzheimer' class if Similarity score between query sample and Alzheimer reference samples are closer to zero.
*   Query Sample belongs to 'Healthy' class if Similarity score between query sample and healthy reference samples are closer to zero.



In [15]:
reference = [] # For Comparing Against
query = [] # Query Samples for Detection
for i in range(0,48,12):
    reference += files[i:i+11]
    query += files[i+11:i+12]

In [20]:
healthy = []
alz = []
for afile in reference:
    if "healthy" in afile:
        healthy.append(afile)
    elif "alzeimer" in afile:
        alz.append(afile)    

Query Samples We will be testing are :

In [17]:
query

['/content/Dataset/SETA/healthy_open11.csv',
 '/content/Dataset/SETC/alzeimer_open7.csv',
 '/content/Dataset/SETD/alzeimer_closed8.csv',
 '/content/Dataset/SETB/healthy_closed1.csv']

Reading a sample

In [86]:
def to_numeric(csv):
    data = pd.read_csv(csv).values
    return data

def pair_up(x1,x2):
        sample = [x1,x2]
        if "closed" in x1:
            sample.append(0)
        elif "open" in x1:
            sample.append(1) 
        if "closed" in x2:
            sample.append(0)
        elif "open" in x2:
            sample.append(1)     

        sample[0] = to_numeric(x1)
        sample[1] = to_numeric(x2)

        return sample 

Getting Sample ready for Model Input

In [87]:
def as_input(sample):
    input = dict()
    input['wave1'] = tf.cast(tf.expand_dims(sample[0],0),dtype = tf.float32)
    input['wave2'] = tf.cast(tf.expand_dims(sample[1],0),dtype = tf.float32)
    input['state'] = tf.cast(tf.expand_dims([sample[2],sample[3]],0),dtype = tf.float32)
    return input

# TESTING 

In [92]:
def get_scores(afile):
    print("Query Sample :",afile)

    healthy_scores = []
    alz_scores = []

    reference_tests = 3 #No of times to compare 

    for ref in alz[:reference_tests]:
        pair = pair_up(afile,ref)
        pair = as_input(pair)
        output = np.squeeze(model(pair))
        alz_scores.append(output)
        
    for ref in healthy[:reference_tests]:
        pair = pair_up(afile,ref)
        pair = as_input(pair)
        output = np.squeeze(model(pair))
        healthy_scores.append(output)
    a_avg,h_avg = sum(alz_scores)/3,sum(healthy_scores)/3
    if a_avg > h_avg:
        print("Query Sample Class Predicted : Healthy Sample")
    else:
        print("Query Sample Class Predicted : Alzheimer Sample")    

In [93]:
for que in query:
    print("-----------------------------------------------------------------")
    get_scores(que)

-----------------------------------------------------------------
Query Sample : /content/Dataset/SETA/healthy_open11.csv
Query Sample Class Predicted : Healthy Sample
-----------------------------------------------------------------
Query Sample : /content/Dataset/SETC/alzeimer_open7.csv
Query Sample Class Predicted : Alzheimer Sample
-----------------------------------------------------------------
Query Sample : /content/Dataset/SETD/alzeimer_closed8.csv
Query Sample Class Predicted : Alzheimer Sample
-----------------------------------------------------------------
Query Sample : /content/Dataset/SETB/healthy_closed1.csv
Query Sample Class Predicted : Healthy Sample
