In [None]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.metrics import SparseCategoricalAccuracy,Mean
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input

from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.metrics import BinaryAccuracy

from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
import pickle
import random
import numpy as np 


Constants

In [None]:
BS = 32
EPOCHS = 5
L_R = 1e-4

STEP_PER_EPOCH = 2792 //BS #---- len(dataset_train)//BS
STEP_PER_VAL_EPOCH = 1300 //BS #---len(dataset_test)//BS


dict_architecture = {}

Loadin & Preprocessing Dataset with ImageDataGenerator

In [None]:

datatest = ImageDataGenerator(
    preprocessing_function = preprocess_input
)
datagen = ImageDataGenerator (
    preprocessing_function = preprocess_input,
    rotation_range=20,
    zoom_range=0.15,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.15,
    horizontal_flip=True,
    fill_mode="nearest"
)

X_train = datagen.flow_from_directory(
    "dataset_2/train_set",
    target_size=(224,224),
    color_mode='grayscale',
    batch_size=BS,
    class_mode="sparse",
    shuffle=True,
    seed=213
)

X_test = datatest.flow_from_directory(
    "dataset_2/test_set",
    target_size=(224,224),
    color_mode='grayscale',
    batch_size=BS,
    class_mode="sparse",
    shuffle=True,
    seed=213,
)

#
    # i=0
    # for X,y in X_train:
    #     for image in X:
    #         plt.figure()
    #         plt.imshow(image)
    #         print("label : ",y[i])
    #         i+=1
    #         # print("Max : ",image.max())
    #         # print("Min : ",image.min())
    #         # print("Ecart Type : ",image.std())
    #         # print("Moyenne : ",image.mean())
    #         if i==5:
    #             break
    #     break

    # i=0
    # print("----------------- TEST --------------------")
    # for X,y in X_test:
    #     for image in X:
    #         plt.figure()
    #         plt.imshow(image)
    #         print("label : ",y[i])
    #         i+=1
    #         # print("Max : ",image.max())
    #         # print("Min : ",image.min())
    #         # print("Ecart Type : ",image.std())
    #         # print("Moyenne : ",image.mean())
    #         if i==10:
    #             break
    #     break

Loading & Preprocessing Dataset

In [None]:
# X_Temp = open('X_Data','rb')
# X_Data = pickle.load(X_Temp)

# Y_Temp = open('Y_Data','rb')
# Y_Data = pickle.load(Y_Temp)

# X_Data = np.array(X_Data)
# Y_Data = np.array(Y_Data)

# print(X_Data.shape)
# print(Y_Data.shape)


# X_train,X_test,y_train,y_test = train_test_split(X_Data,Y_Data,test_size=0.2,random_state=42,shuffle=True)

# X_train = X_train.astype('float32')
# X_test = X_test.astype('float32')

# def augment(image):
#   image = tf.image.random_flip_left_right(image)
#   return image

# for i in range(len(X_train)):
#   X_train[i] = augment(X_train[i])


# for i in range(5):
#   plt.figure()
#   print(y_train[i])
#   plt.imshow(X_train[i])
#   plt.show()


# # ---------------------------------------- Normalisation MinMax
# scaler = MinMaxScaler()
# X_train = scaler.fit_transform(X_train.reshape(X_train.shape[0],-1))
# X_test = scaler.transform(X_test.reshape(X_test.shape[0],-1))
# pickle.dump(scaler, open('scaler.pkl', 'wb'))
# #---------------------------------------- Normalisation StandarScaler
# # scaler = StandardScaler()
# # X_train = scaler.fit_transform(X_train.reshape(X_train.shape[0],-1))
# # X_test = scaler.transform(X_test.reshape(X_test.shape[0],-1))
# # pickle.dump(scaler, open('scaler.pkl', 'wb'))


# X_train = X_train.reshape(-1,224,224,1) 
# X_test = X_test.reshape(-1,224,224,1) 

# print(X_train.max())
# print(X_test.max())
# print(X_test.mean())
# print(X_test.std())


Creating Model 

In [None]:
#------------------ Bloc TikaiNet -----------------------

class TikaiNetBlock(layers.Layer):
    
    def __init__(self,filters,strides,number,id_block,**kwargs):
        super(TikaiNetBlock,self).__init__()
        self.number = number
        self.id = id_block
        self.dw = layers.DepthwiseConv2D(kernel_size=3,strides=strides,padding="same",name=f"dw_id{id_block}")
        self.bn = layers.BatchNormalization(name=f"bn_id{id_block}")
        self.bn_2 = layers.BatchNormalization(name=f"bn_2_id{id_block}")
        self.relu = layers.ReLU(name=f"relu_id{id_block}")
        self.relu_2 = layers.ReLU(name=f"relu_2_id{id_block}")
        self.conv_2 = layers.Conv2D(filters=filters,kernel_size=1,strides=1,name=f"conv_2_id{id_block}",padding="same")
        self.sc_2 = layers.SeparableConv2D(filters=filters,kernel_size=1,strides=1,name=f"sc_2_id{id_block}",padding="same")
        self.drop = layers.Dropout(0.5)

    def call(self,X):
        
        X = self.dw(X)
        X = self.relu(X)
        X = self.bn(X)

        if self.number%2==0:
            dict_architecture[self.id] = self.number
            X = self.conv_2(X)
        else:
            dict_architecture[self.id] = self.number
            X = self.sc_2(X)


        X = self.relu_2(X)
        X = self.bn_2(X)
        return X
    

#------------------ Model TikaiNet -----------------------

class TikaiNetV1(tf.keras.Model):
    def __init__(self):
        super(TikaiNetV1,self).__init__()
        self.dw_s1_64  = TikaiNetBlock(64,1,2,id_block=1)
        self.dw_s2_128 = TikaiNetBlock(128,2,2,id_block=2)
        self.dw_s1_128 = TikaiNetBlock(128,1,2,id_block=3)
        self.dw_s2_256 = TikaiNetBlock(256,2,2,id_block=4)
        self.dw_s1_256 = TikaiNetBlock(256,1,2,id_block=5)
        self.dw_s2_512 = TikaiNetBlock(512,2,2,id_block=6)

        self.dw_s1_512_1 = TikaiNetBlock(512,1,2,id_block=7)
        self.dw_s1_512_2 = TikaiNetBlock(512,1,2,id_block=8)
        self.dw_s1_512_3 = TikaiNetBlock(512,1,2,id_block=9)
        self.dw_s1_512_4 = TikaiNetBlock(512,1,2,id_block=10)
        self.dw_s1_512_5 = TikaiNetBlock(512,1,2,id_block=11)

        self.dw_s2_1024 = TikaiNetBlock(1024,2,2,id_block=12)
        self.dw_s1_1024 = TikaiNetBlock(1024,1,2,id_block=13)

       
                
    def call(self,X):
        X =  self.dw_s1_64(X)
        
        X =  self.dw_s2_128(X)
        X =  self.dw_s1_128(X)
        X =  self.dw_s2_256(X)
        X =  self.dw_s1_256(X)
        X =  self.dw_s2_512(X)
        
        X = self.dw_s1_512_1(X)
        X = self.dw_s1_512_2(X)  
        X = self.dw_s1_512_3(X)
        X = self.dw_s1_512_4(X)
        X = self.dw_s1_512_5(X)

        X =  self.dw_s2_1024(X)
        X =  self.dw_s1_1024(X)

        return X

Compiling & Training & Saving

In [None]:
# Tikai = TikaiNetV1()
# Tikai.build((None,112,112,32))

model = Sequential()

model.add(layers.Conv2D(32,3,2,padding="same",input_shape=(224,224,1),name="first_conv"))
#model.add(layers.SeparableConv2D(32,3,2,padding="same",input_shape=(224,224,1),name="first_Sconv"))
model.add(layers.ReLU(name="first_relu"))
model.add(layers.BatchNormalization(name="first_bn"))


#model.add(Tikai)
#model.add(TikaiNetBlock(64,2,1,id_block=0))
model.add(TikaiNetBlock(128,2,1,id_block=2))
model.add(TikaiNetBlock(256,2,1,id_block=4))
model.add(TikaiNetBlock(512,2,1,id_block=6))
model.add(TikaiNetBlock(1024,2,1,id_block=12))

#model.add(layers.Dropout(0.5))

model.add(TikaiNetBlock(64,1,2,id_block=1))
model.add(TikaiNetBlock(128,1,2,id_block=3))
model.add(TikaiNetBlock(256,1,2,id_block=5))

for i in range(5):
    model.add(TikaiNetBlock(512,1,2,id_block=(7+i)))

model.add(TikaiNetBlock(1024,1,2,id_block=13))


model.add(layers.AveragePooling2D(pool_size=7,strides=1,name="avg_pooling"))
model.add(layers.Flatten(name="flatten"))

model.add(layers.Dense(128))
model.add(layers.ReLU())
#model.add(layers.Dropout(0.5))
model.add(layers.Dense(2,name="last_dense"))
model.add(layers.Softmax())
#model.add(layers.Dense(1,activation="sigmoid",name="last_dense"))

model.summary()

with open('model_summary.txt', 'w') as f:
    model.summary(print_fn=lambda x: f.write(x + '\n'))

Train Step Mode Eager (Slower-Easier)

In [None]:
# model.compile(optimizer= Adam(learning_rate=L_R),
#               loss = BinaryCrossentropy(),
#               metrics=['accuracy'])

# history = model.fit (
#     X_train, y_train, batch_size=BS,
#     validation_data=(X_test, y_test),
#     epochs = EPOCHS,
# )

# model.save("TikaiNetV1",save_format="tf")


# print(history.history.keys())

# val_loss_curve =  history.history["val_loss"]
# val_acc_curve = history.history["val_accuracy"]
# acc_curve = history.history["accuracy"]
# loss_curve = history.history["loss"]

Train Step Mode Graph (Faster-Harder)

In [None]:
loss_of = SparseCategoricalCrossentropy()
#loss_of = BinaryCrossentropy()
optimizer = Adam(learning_rate=L_R, decay=L_R / EPOCHS)
#optimizer = SGD()

train_loss = Mean(name="train_loss")
train_acc = SparseCategoricalAccuracy(name="train_acc")
#train_acc = BinaryAccuracy(name="train_acc")

val_loss = Mean(name="valid_loss")
val_acc = SparseCategoricalAccuracy(name="valid_acc")
#val_acc = BinaryAccuracy(name="valid_acc")

model.compile(optimizer = optimizer,loss = loss_of,metrics=[SparseCategoricalAccuracy()])


loss_curve = []
acc_curve = []
val_loss_curve = []
val_acc_curve = []

@tf.function
def train_step(X,y):
    with tf.GradientTape() as tape:
        prediction = model(X,training=True)
        loss = loss_of(y,prediction)

    gradients = tape.gradient(loss,model.trainable_weights)
    optimizer.apply_gradients(zip(gradients,model.trainable_weights))
    train_loss.update_state(loss)
    train_acc.update_state(y,prediction)

@tf.function
def validation_step(X,y):
    prediction = model(X,training=False)
    loss = loss_of(y,prediction)
    val_loss.update_state(loss)
    val_acc.update_state(y,prediction)



for epoch in range(EPOCHS):
    nb_batchs = 0
    nb_batchs_val = 0
    # Training set
    for X, y in X_train:
        nb_batchs+=1
        train_step(X, y)
        print(f'Epoch: {epoch+1}/{EPOCHS} Batch: {nb_batchs}/{STEP_PER_EPOCH} loss: {train_loss.result()}, accuracy: {train_acc.result()}')
        if nb_batchs == STEP_PER_EPOCH:
            break

    # Validation set
    for X, y in X_test:
        nb_batchs_val+=1
        validation_step(X, y)
        if nb_batchs_val == STEP_PER_VAL_EPOCH:
            break
       
    print(f'Epoch: {epoch+1}/{EPOCHS} val_Loss: {val_loss.result()}, val_acc: {val_acc.result()}')
    
    val_acc_curve.append(val_acc.result())
    val_loss_curve.append(val_loss.result())
    loss_curve.append(train_loss.result())
    acc_curve.append(train_acc.result())

    if epoch == (EPOCHS-1):
        with open("model_params_result.txt","w") as fichier:
            fichier.write(f"Batch Size : {BS} \n --- \n ")
            fichier.write(f"Epochs : {EPOCHS} \n --- \n ")
            fichier.write(f"Leraning_Rate : {L_R} \n --- \n ")
            fichier.write(f"Step per epochs : {STEP_PER_EPOCH} \n --- \n ")
            fichier.write(f"Step per epochs val : {STEP_PER_VAL_EPOCH} \n --- \n ")
            fichier.write(f"train_acc : {train_acc.result()} \n --- \n ")
            fichier.write(f"val_acc : {val_acc.result()} \n --- \n ")
            fichier.write(f"train_loss : {train_loss.result()} \n --- \n ")
            fichier.write(f"val_loss : {train_acc.result()} \n --- \n ")
            fichier.write(f"Optimizer : Adam \n --- \n ")
            fichier.write(f"Metrics/Loss : SparseCategoricalCrossEntropy/Accuracy \n --- \n ")
            fichier.write(f"Architecture : {dict_architecture} \n --- \n ")
            fichier.write(f"Taille dict : {len(dict_architecture)} \n")

    val_loss.reset_states()
    val_acc.reset_states()
    train_acc.reset_states()
    train_loss.reset_states()



        
model.save("TikaiNetV1/")

Analyze Result

In [None]:
plt.figure()

plt.subplot(2,1,1)
plt.plot(val_acc_curve,c="red",label="Val_Acc")
plt.plot(acc_curve,c="blue",label="Accuracy")
plt.legend()

plt.subplot(2,1,2)
plt.plot(val_loss_curve,c="red",label="Val_Loss")
plt.plot(loss_curve,c="blue",label="Loss")
plt.legend()

plt.savefig("plot.png")
plt.show()

In [None]:
TikaiNet = tf.keras.models.load_model('TikaiNetV1')

i=0
for X,y in X_test:
    for image in X:
        i+=1 
        plt.figure()
        plt.imshow(image)
        image = image.reshape(1,224,224,1)
        A = TikaiNet.predict(image)
        print(A)
        if i == 10:
            break
    break