In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
import tensorflow.keras.backend as K
from tensorflow.data import Dataset
from tensorflow.keras.utils import Sequence
from tensorflow.keras.optimizers import Adam
from keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Model
from keras import layers
from sklearn.model_selection import StratifiedKFold
from keras.preprocessing.image import ImageDataGenerator

# Define Model

In [2]:
def get_complete_model(base_model, operation_shape=4, image_shape=(32, 32, 3)):
    """Build the two-image regression model around a shared digit model.

    The same `base_model` is applied to both image inputs. Its two outputs
    are concatenated with a dense projection of the operation vector, and
    two dense layers reduce the result to a single value.

    Args:
        base_model: Callable Keras model applied to each image input; it is
            always called with `training=False`.
        operation_shape: Length of the operation vector (int), or a complete
            shape tuple without the batch axis.
        image_shape: Shape of one input image, without the batch axis.

    Returns:
        A `Model` with inputs `[image_A, image_B, operation]` and the output
        of the final `Dense(1)` layer.
    """
    # Normalise a bare int to a shape tuple so the Input layer always
    # receives an explicit `shape=` tuple.
    if isinstance(operation_shape, int):
        op_shape = (operation_shape,)
    else:
        op_shape = tuple(operation_shape)

    # Define network inputs (creation order kept: image A, image B, operation)
    input_image_A = layers.Input(shape=tuple(image_shape))
    input_image_B = layers.Input(shape=tuple(image_shape))
    input_operation = layers.Input(shape=op_shape)

    # Digit recognition blocks: one shared model applied to both images
    digit_A_prediction = base_model(input_image_A, training=False)
    digit_B_prediction = base_model(input_image_B, training=False)

    # Expand operation vector to 10 units
    expanded_operation = layers.Dense(10)(input_operation)

    # Concatenate both digit outputs and the expanded operation on the last axis
    cat_vector = layers.Concatenate(-1)([digit_A_prediction, digit_B_prediction, expanded_operation])

    # Regression head.
    # NOTE(review): none of the Dense layers here sets an activation, so the
    # head is a purely linear function of its inputs — confirm this is intended.
    regression1 = layers.Dense(12)(cat_vector)
    output = layers.Dense(1)(regression1)

    # Model creation
    model = Model(inputs=[input_image_A, input_image_B, input_operation], outputs=output)
    return model

In [3]:
# Load the previously saved convolutional model from disk; it is used below as the shared base model.
base_model = keras.models.load_model("./model/conv/convnet.h5")

In [4]:
# Mark the loaded base model as non-trainable so fitting the combined model does not update its weights.
base_model.trainable = False

In [5]:
# Assemble the combined two-image model around the base model and print its layer summary.
model = get_complete_model(base_model)
model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 32, 32, 3)]  0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            [(None, 32, 32, 3)]  0                                            
__________________________________________________________________________________________________
input_3 (InputLayer)            [(None, 4)]          0                                            
__________________________________________________________________________________________________
sequential_1 (Sequential)       (None, 10)           551466      input_1[0][0]                    
                                                                 input_2[0][0]                

## Test Model With Random Weights

In [6]:
%%time

# Smoke test: run one forward pass on random inputs to check that the model wiring works.
# A and B are single random images of shape (1, 32, 32, 3) with values in [0, 1).
A = np.random.random_sample((1, 32,32,3))
B = np.random.random_sample((1, 32,32,3))
# One 4-element operation vector with only the last entry set
# (which operation that index encodes is not shown in this notebook section).
operation = np.array([[0, 0, 0, 1]])
model.predict([A, B, operation])

Wall time: 12.6 s


array([[-0.0760373]], dtype=float32)

# Import Data

In [7]:
# Locations of the pre-built NumPy arrays, relative to the notebook's working directory.
left_numbers_path = './data/npy_files/left_numbers.npy'
right_numbers_path = './data/npy_files/right_numbers.npy'
# The file name on disk is spelled 'opperands'; keep it in sync with the stored file.
operands_path = './data/npy_files/opperands.npy'
labels_path = './data/npy_files/labels.npy'

In [8]:
# Load the four arrays into memory; the next cell prints their shapes.
left_numbers = np.load(left_numbers_path)
right_numbers = np.load(right_numbers_path)
operands = np.load(operands_path)
labels = np.load(labels_path)

In [9]:
# Report the shape of every loaded array as a quick sanity check that they share the same first dimension.
print(f'SHAPES:\n - left_numbers: {left_numbers.shape}\n - right_numbers: {right_numbers.shape}\n - opperands: {operands.shape}\n - labels: {labels.shape}')

SHAPES:
 - left_numbers: (23951, 32, 32, 3)
 - right_numbers: (23951, 32, 32, 3)
 - opperands: (23951, 4)
 - labels: (23951,)


# Create Image Augmentation Object

In [10]:
class JoinedGen(Sequence):
    """Sequence that yields aligned `([left, right, operand], label)` batches.

    Two iterators are created from the same `datagen`: one pairs the left
    images with the labels, the other pairs the right images with the
    operands. Both shuffle, so they must share a single index order for the
    items of one batch to come from the same samples.

    Args:
        datagen: Object exposing `flow(x, y, batch_size=..., shuffle=...)`
            that returns an indexable iterator with `on_epoch_end()` and an
            `index_array` attribute.
        left_numbers: Left-hand images.
        right_numbers: Right-hand images.
        operands: Operation vectors, one per sample.
        labels: Regression targets, one per sample.
        batch_size: Number of samples per batch.
    """

    def __init__(self, datagen, left_numbers, right_numbers, operands, labels, batch_size):
        super().__init__()
        self.gen1 = datagen.flow(left_numbers, labels, batch_size=batch_size, shuffle=True)
        self.gen2 = datagen.flow(right_numbers, operands, batch_size=batch_size, shuffle=True)

        assert len(self.gen1) == len(self.gen2)

        # Each iterator draws its own shuffle order. Without an initial sync
        # every batch served before the first epoch ends would pair a left
        # image/label with the right image/operand of a different sample.
        self.on_epoch_end()

    def __len__(self):
        """Return the number of batches per epoch."""
        return len(self.gen1)

    def __getitem__(self, i):
        """Return batch `i` as `([left_images, right_images, operands], labels)`."""
        x1, y = self.gen1[i]
        x2, op = self.gen2[i]

        return [x1, x2, op], y

    def on_epoch_end(self):
        """Draw a new sample order and make both iterators use it."""
        self.gen1.on_epoch_end()
        self.gen2.on_epoch_end()
        # gen1's order is authoritative; gen2 reuses the very same index array.
        self.gen2.index_array = self.gen1.index_array

In [11]:
# Augmentation settings: small random rotation, zoom, vertical shift and shear.
# No horizontal shift or flip is configured.
datagen = ImageDataGenerator(rotation_range=8,
                             zoom_range=[0.95, 1.05],
                             height_shift_range=0.10,
                             shear_range=0.15)

In [12]:
# Demo generator over the full dataset with batches of 30, used by the shape checks below.
joined_generator = JoinedGen(datagen, left_numbers, right_numbers, operands, labels, 30)

## Left Numbers Batch

In [13]:
# First batch -> inputs -> element 0 (left images).
joined_generator[0][0][0].shape

(30, 32, 32, 3)

## Right Numbers Batch

In [14]:
# First batch -> inputs -> element 1 (right images).
joined_generator[0][0][1].shape

(30, 32, 32, 3)

## Operators Batch

In [15]:
# First batch -> inputs -> element 2 (operation vectors).
joined_generator[0][0][2].shape

(30, 4)

## Labels Batch

In [16]:
# First batch -> targets (labels).
joined_generator[0][1].shape

(30,)

# Define Best Learning Rate

In [17]:
def train_val_split_indexes(data_len, split_size):
    """Randomly split the positions `0 .. data_len-1` into train and validation.

    Args:
        data_len: Number of samples to split.
        split_size: Fraction of samples assigned to validation; the count is
            `int(split_size * data_len)` (truncated).

    Returns:
        `(train_idx, val_idx)`: two disjoint integer arrays covering every
        position exactly once. `train_idx` is in ascending order; `val_idx`
        is in the random order it was drawn in.

    Note:
        The returned values are positions in `range(data_len)`. When
        splitting a subset of a larger dataset, map them back through that
        subset's own index array before indexing the full data.
    """
    val_count = int(split_size * data_len)
    val_idx = np.random.choice(data_len, val_count, replace=False)

    # Boolean mask of the positions that were not drawn for validation.
    # flatnonzero always yields an integer array, even when nothing is left.
    keep_for_train = np.ones(data_len, dtype=bool)
    keep_for_train[val_idx] = False
    train_idx = np.flatnonzero(keep_for_train)

    return train_idx, val_idx

In [18]:
# Build a fresh instance of the combined model for the training run below (separate from `model` above).
lr_schedule_model = get_complete_model(base_model)

In [19]:
# Stop training once the monitored quantity has not improved for 8 epochs.
early_stopping = keras.callbacks.EarlyStopping(patience=8)
optimizer = keras.optimizers.Adam(learning_rate=1e-3, amsgrad=True)
# Keep only the best model seen so far on disk.
model_checkpoint = keras.callbacks.ModelCheckpoint(
    './best_model_save/model.h5',
    save_best_only=True)
# The model has a single linear output trained with MSE, so report the mean
# absolute error as the interpretable metric. 'accuracy' is kept only so that
# existing history keys stay available; it is not meaningful for regression.
lr_schedule_model.compile(optimizer=optimizer,
                          loss='mean_squared_error',
                          metrics=['mean_absolute_error', 'accuracy'])

In [20]:
# 5-fold splitter with shuffling and a fixed seed so the folds are reproducible.
kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

In [21]:
# Placeholder features/labels: the splitter only needs the sample count here.
# With all-zero labels the stratification has a single class to balance.
d = np.zeros((left_numbers.shape[0], 1))
l = np.zeros((left_numbers.shape[0], 1))
for fold_train_idx, test_idx in kfold.split(d, l):
    # train_val_split_indexes returns positions *within* the fold's training
    # portion. Map them back through fold_train_idx to get dataset indices;
    # using them directly would select rows 0..n-1 of the full data and
    # overlap the test fold.
    train_pos, val_pos = train_val_split_indexes(fold_train_idx.shape[0], 0.15)
    train_idx = fold_train_idx[train_pos]
    val_idx = fold_train_idx[val_pos]

    train_left_numbers = left_numbers[train_idx]
    train_right_numbers = right_numbers[train_idx]
    train_operands = operands[train_idx]
    train_labels = labels[train_idx]

    val_left_numbers = left_numbers[val_idx]
    val_right_numbers = right_numbers[val_idx]
    val_operands = operands[val_idx]
    val_labels = labels[val_idx]

    test_left_numbers = left_numbers[test_idx]
    test_right_numbers = right_numbers[test_idx]
    test_operands = operands[test_idx]
    test_labels = labels[test_idx]
    # Only the first fold is used.
    break

In [22]:
# NOTE(review): despite its name this generator is built from the *training* split (batch size 30),
# and it is not referenced again in the visible cells — confirm whether it is still needed.
test_gen = JoinedGen(datagen, train_left_numbers, train_right_numbers, train_operands, train_labels, 30)

In [23]:
# Train for up to 30 epochs on augmented training batches of 128.
# Validation uses the raw (non-augmented) validation arrays; early stopping and
# best-model checkpointing are attached as callbacks.
history = lr_schedule_model.fit(JoinedGen(datagen, train_left_numbers, train_right_numbers, train_operands, train_labels, 128),
                              epochs=30, validation_data=([val_left_numbers, val_right_numbers, val_operands], val_labels),
                              callbacks=[early_stopping, model_checkpoint])

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30
