In [131]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Activation, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import categorical_crossentropy
from tensorflow.keras import layers
from keras.utils import to_categorical

from matplotlib.image import imread
from sklearn.model_selection import train_test_split
from sklearn.cluster import MiniBatchKMeans
from sklearn.utils import shuffle
import numpy as np
import pandas as pd

from __future__ import print_function
from ladder_net import get_ladder_network_fc

# each image dimension is (128, 384). After segmentation the following will be the dimensions of each character.
charHeight = 128
charWidth = 128
numImages = 50000

vectorLength = 16384
imgPath = "SoML-50/SoML-50/data/"
csvPath = "SoML-50/SoML-50/annotations.csv"
manual_path = "manual.csv"


In [68]:
class Manual:
    def __init__(self, path):
        self.df = pd.read_csv (path)
        self.df.set_index ('Image', inplace =True)
  
    def getOperator (self,index):
        index = int (index) 
        if (1 <= index <= 1000):
            char = str(self.df['operator'][index])
            if (char == '+'):
                return 0
            elif (char == '-'):
                return 1 
            elif (char == '*'):
                return 2
            elif (char == '/'):
                return 3
            else: 
                raise Exception ("Invalid operator at index : " + str(index))
        raise Exception("Index is limited to [1,1000] only. Given: " + str(index))
    
    def getOp1 (self,index):
        index = int (index) 
        if (1 <= index <= 1000):
            return str(self.df['op1'][index])
        raise Exception("Index is limited to [1,1000] only. Given: " + str(index))
    
    def getOp2 (self,index):
        index = int (index) 
        if (1 <= index <= 1000):
            return str(self.df['op2'][index])
        raise Exception("Index is limited to [1,1000] only. Given: " + str(index))

manual = Manual (manual_path)
print (manual.getOperator (100))
    

1


In [69]:
class Annotations:
    def __init__ (self, path):                                                           
        self.df = pd.read_csv (path)
        self.df.set_index ('Image', inplace =True)
        
    def normalizeImg (self,npArr):
        return npArr / 255.0
    
    def getPathOfImg (self,index):
        return (imgPath + str (index) + ".jpg")
    
    def getLabelOfImg (self,index):
        return (self.df.loc[(str(index) + '.jpg')]['Label'])

    def getValueOfImg (self, index):
        return (int(self.df.loc[(str(index) + '.jpg')]['Value']))
    
    def getSegmentedVectors (self,index):
        """ This function returns a numpy array of the three character images of shape (128,128) present in index.jpg. 
            Also, the operator is always present at [0] and other two operands at [1] and [2] in the order in which the operator has to be applied."""

        image = imread (self.getPathOfImg(index))
        label = self.getLabelOfImg (index)
        if (label == 'prefix'):
            charArray = np.array ([image[:, 0:charWidth],image[:, charWidth:(2*charWidth)],image[:, (2*charWidth):]])
        elif (label == 'postfix'):
            charArray = np.array ([image[:, (2*charWidth):],image[:, 0:charWidth],image[:, charWidth:(2*charWidth)]])
        else:
            charArray = np.array ([image[:, charWidth:(2*charWidth)],image[:, 0:charWidth],image[:, (2*charWidth):]])
        charArray = np.array ([self.normalizeImg(i) for i in charArray])
        return charArray


annotations = Annotations (csvPath)

print (annotations.getSegmentedVectors (5))        

[[[1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  ...
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]]

 [[1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  ...
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]]

 [[1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  ...
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]]]


In [None]:
class ANN: 
    def __init__ (self,manualPath, csvPath):
        self.manual = Manual (manualPath)
        self.annotations = Annotations (csvPath)
        
        self.inputSize = (128*128)
        self.numOperators = 4
        self.numOperands = 10
        self.numBinaryClasses = 2
        self.epochs = 10
        
        # divide dataset into 10 sets. labeled dataset into 10 of sizes 100 each. and unlabeled into 10 of 4000 size each. rest use for testing (9000)
        self.numMiniBatches = 10
        
        #initialze all three models
        operatorModel = get_ladder_network_fc (layer_sizes=[self.inputSize, 1000, 500, 250, 250, 250, self.numOperators])
        operandModel = get_ladder_network_fc (layer_sizes=[self.inputSize, 1000, 500, 250, 250, 250, self.numOperands])
        binaryModel = get_ladder_network_fc (layer_sizes=[self.inputSize, 1000, 500, 250, 250, 250, self.numBinaryClasses])
        
    def getLabeledMiniBatch (self,batchNum): 
        '''returns a tuple x_labeled and y_labeled'''
        if (batchNum < 0 or batchNum >= self.numMiniBatches):
            raise ValueError ("batchNum should be between -1 and" + str(self.numMiniBatches))
        
        numElements = 1000 // self.numMiniBatches
        
        x_operator_labeled = np.empty (shape = [numElements, 128,128])
        y_operator_labeled = np.empty (shape = [numElements,1])
        x_operand_labeled = np.empty (shape = [2*numElements,128,128])
        y_operand_labeled = np.empty (shape = [2*numElements,1])
        x_binary_labeled = np.empty (shape = [3 * numElements, 128,128])
        y_binary_labeled = np.empty (shape = [3 * numElements, 1])
        
        for i in range (batchNum * numElements + 1,(batchNum + 1) * numElements + 1):  
            segments = self.annotations.getSegmentedVectors (i)
            x_operator_labeled[(i-1)%numElements] = segments[0]
            y_operator_labeled[(i-1)%numElements] = self.manual.getOperator(i)
            
            x_operand_labeled[(i-1)%numElements] = segments[1]
            x_operand_labeled[((i-1)%numElements) + numElements] = segments[2]
            
            y_operand_labeled[(i-1)%numElements] = self.manual.getOp1 (i)            
            y_operand_labeled[((i-1)%numElements) + numElements] = self.manual.getOp2 (i)
            
            x_binary_labeled[(i-1)%numElements] = segments[0]            
            x_binary_labeled[((i-1)%numElements) + numElements] = segments[1]            
            x_binary_labeled[((i-1)%numElements) + (2*numElements)] = segments[2]
            
            y_binary_labeled[(i-1)%numElements] =  0           
            y_binary_labeled[((i-1)%numElements) + numElements] =  1          
            y_binary_labeled[((i-1)%numElements) + (2*numElements)] =  1 
                      
        y_operator_labeled = to_categorical (y_operator_labeled, num_classes = 4)
        y_operand_labeled = to_categorical (y_operand_labeled, num_classes = 10)
        y_binary_labeled = to_categorical (y_binary_labeled, num_classes = 2)
        
        x_binary_labeled , y_binary_labeled = shuffle (x_binary_labeled, y_binary_labeled) 
        
        return (x_operator_labeled, y_operator_labeled, x_operand_labeled, y_operand_labeled, x_binary_labeled, y_binary_labeled)
        
    
    def getUnlabeledMiniBatch (self, batchNum):
        '''returns a tuple x_unlabeled and y_unlabeled'''
        if (batchNum < 0 or batchNum >= self.numMiniBatches):
            raise ValueError ("batchNum should be between -1 and" + str(self.numMiniBatches))
        
        numElements = 40000 // self.numMiniBatches
        
        x_operator_unlabeled = np.empty (shape = [numElements, 128,128])
        x_operand_unlabeled = np.empty (shape = [2*numElements,128,128])
        x_binary_unlabeled = np.empty (shape = [3 * numElements, 128,128])
        
        for i in range (batchNum * numElements + 1001,(batchNum + 1) * numElements + 1001):  
            segments = self.annotations.getSegmentedVectors (i)
            
            x_operator_unlabeled[(i-1001)%numElements] = segments[0]
            
            x_operand_unlabeled[(i-1001)%numElements] = segments[1]
            x_operand_unlabeled[((i-1001)%numElements) + numElements] = segments[2]
                   
            x_binary_unlabeled[(i-1001)%numElements] = segments[0]            
            x_binary_unlabeled[((i-1001)%numElements) + numElements] = segments[1]            
            x_binary_unlabeled[((i-1001)%numElements) + (2*numElements)] = segments[2]
        
        
        x_binary_unlabeled = shuffle(x_binary_unlabeled)
        
        return (x_operator_unlabeled, x_operand_unlabeled, x_binary_unlabeled)
        
    
    def trainOneEpoche (self, epochNum):
        for batchNum in range (self.numMiniBatches):
            print ("Epoche = " + str(epochNum), "batchNum = " + str(batchNum))
            
             
            
        pass
    

        

In [110]:
operatorX = np.empty (shape = [1000, 128, 128])
operandX = np.empty (shape = [2000, 128, 128])


tempOperatorY = [manual.getOperator(i) for i in range (1, 1001)]
tempOperand1Y = [manual.getOp1(i) for i in range (1, 1001)]
tempOperand2Y = [manual.getOp2(i) for i in range (1, 1001)]
tempOperand1Y.extend (tempOperand2Y)

operatorY = np.array (to_categorical(tempOperatorY))
operandY = np.array (to_categorical(tempOperand1Y))

print (operandY[1002])

for img in range (1, 1001):
    segments = annotations.getSegmentedVectors (img)
    operatorX[img-1] = segments[0]
    operandX[img-1] = segments[1]
    operandX[img+999] = segments[2]
     
print (operatorX.shape, operandX.shape)
operatorX = np.expand_dims (operatorX, -1)
operandX = np.expand_dims (operandX, -1)
print (operatorX.shape, operandX.shape)
# make sure images have shape (128,128,1)


[0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]
(1000, 128, 128) (2000, 128, 128)
(1000, 128, 128, 1) (2000, 128, 128, 1)


In [116]:
operator_clf = keras.Sequential (
    [
        keras.Input (shape = (128,128,1)),
        layers.Conv2D(32, kernel_size = (3,3), activation = "relu"),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Flatten(),
        layers.Dropout(0.5),
        layers.Dense(4, activation="softmax"),
    ]
)

# operator_clf.summary()

In [117]:
batch_size = 50
epochs = 10

operator_clf.compile (loss = "categorical_crossentropy",optimizer="adam", metrics=["accuracy"])
operator_clf.fit(operatorX,operatorY, batch_size=batch_size, epochs=epochs, validation_split=0.1)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f4be936f370>

In [118]:
score = operator_clf.evaluate (operatorX[0:1000], operatorY[0:1000], verbose = 0)
print("Test loss:", score[0])
print("Test accuracy:", score[1])

Test loss: 0.07282477617263794
Test accuracy: 0.9829999804496765


In [125]:
operand_clf = keras.Sequential (
    [
        keras.Input (shape = (128,128,1)),
        layers.Conv2D(64, kernel_size = (3,3), activation = "relu"),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Flatten(),
        layers.Dropout(0.5),
        layers.Dense(10, activation="softmax"),
    ]
)

# operand_clf.summary()

In [126]:
operand_clf.compile (loss = "categorical_crossentropy",optimizer="adam", metrics=["accuracy"])
operand_clf.fit(operandX,operandY, batch_size=batch_size, epochs=epochs, validation_split=0.1)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f4b942be310>

In [142]:
OperandScore = operand_clf.evaluate (operandX[1000:2000], operandY[1000:2000], verbose = 0)
print("Test loss:", OperandScore[0])
print("Test accuracy:", OperandScore[1])

Test loss: 0.2557852864265442
Test accuracy: 0.9539999961853027


In [137]:
binaryX = np.concatenate ((operatorX, operandX))
tempBinary0Y = [0 for i in range (1000)]
tempBinary1Y = [1 for i in range (2000)]
tempBinary0Y.extend (tempBinary1Y)
binaryY = np.array (to_categorical (tempBinary0Y))

binaryX, binaryY = shuffle (binaryX, binaryY)
print (binaryY[0:5])

[[0. 1.]
 [0. 1.]
 [0. 1.]
 [1. 0.]
 [0. 1.]]


In [139]:
binary_clf = keras.Sequential (
    [
        keras.Input (shape = (128,128,1)),
        layers.Conv2D(64, kernel_size = (3,3), activation = "relu"),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Flatten(),
        layers.Dropout(0.5),
        layers.Dense(2, activation="sigmoid"),
    ]
)

# binary_clf.summary()

In [143]:
binary_clf.compile (loss = "binary_crossentropy",optimizer="adam", metrics=["accuracy"])
binary_clf.fit(binaryX,binaryY, batch_size=batch_size, epochs=epochs, validation_split=0.1)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f4b582b7490>

In [144]:
binaryScore = binary_clf.evaluate (binaryX[0:3000], binaryY[0:3000], verbose = 0)
print("Test loss:", binaryScore[0])
print("Test accuracy:", binaryScore[1])

Test loss: 0.02167610079050064
Test accuracy: 0.9959999918937683


In [59]:
imgIndex = [ i for i in range (1001, numImages+1)]
train_set, test_set = train_test_split (imgIndex, test_size= 0.05, random_state= 1)


In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
!ls gdrive/MyDrive
!unzip gdrive/MyDrive/SoML-50.zip