In [9]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # or any {'0', '1', '2'}   
import tensorflow as tf

import numpy as np

from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.layers import Conv2D, GlobalAveragePooling2D, GlobalAveragePooling1D, DepthwiseConv2D, MaxPooling2D
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import LayerNormalization
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import MultiHeadAttention
from tensorflow.keras.layers import Reshape
from tensorflow.keras.layers import ReLU
from tensorflow.keras.layers import Add
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard


# Side length in pixels of the square input images; used as the generators'
# target_size and to build the model input shape.
IMAGE_SIZE = 96


def get_pcam_generators(base_dir, train_batch_size=32, val_batch_size=32):
    """Create the training and validation image iterators.

    Images are read from ``<base_dir>/train+val/train`` and
    ``<base_dir>/train+val/valid`` with a target size of
    IMAGE_SIZE x IMAGE_SIZE, rescaled by 1/255 and labelled in binary mode.

    Args:
        base_dir: directory that contains the ``train+val`` folder.
        train_batch_size: batch size of the training iterator.
        val_batch_size: batch size of the validation iterator.

    Returns:
        Tuple ``(train_gen, val_gen)`` of directory iterators.
    """
    split_root = os.path.join(base_dir, 'train+val')

    # one shared generator: both splits only get the pixel rescaling
    rescaler = ImageDataGenerator(rescale=1. / 255)
    side_lengths = (IMAGE_SIZE, IMAGE_SIZE)

    def _iterate(subset, batch_size):
        # Directory iterator for one split folder.
        return rescaler.flow_from_directory(os.path.join(split_root, subset),
                                            target_size=side_lengths,
                                            batch_size=batch_size,
                                            class_mode='binary')

    training = _iterate('train', train_batch_size)
    validation = _iterate('valid', val_batch_size)

    return training, validation

In [None]:
def transformer_block(x, shape, channel, num_heads=8, dropout_rate=0.1):
    """Downsampling transformer stage: strided conv, self-attention, MLP.

    Args:
        x: input tensor. Either a 4-D feature map
            (batch, height, width, channels) or a 3-D token sequence
            (batch, tokens, channels) such as the one this function returns.
        shape: static shape of ``x`` (callers pass ``x.shape``). Its length
            tells a feature map from a token sequence.
        channel: number of output channels, i.e. the token embedding size.
        num_heads: number of attention heads.
        dropout_rate: dropout rate used after the attention and after the MLP.

    Returns:
        3-D tensor (batch, tokens, channel), where tokens is the product of
        the spatial dimensions after the stride-2 convolution.

    Raises:
        ValueError: if a 3-D input cannot be arranged as a square grid.
    """
    ff_dim = 4 * channel

    # A previous transformer stage hands over a flat token sequence; Conv2D
    # needs a spatial grid, so fold the tokens back into a square map first.
    if len(shape) == 3:
        tokens, token_channels = shape[1], shape[2]
        if tokens is None:
            raise ValueError("token count must be static to rebuild the spatial grid")
        side = int(round(tokens ** 0.5))
        if side * side != tokens:
            raise ValueError(
                "cannot arrange %d tokens as a square grid" % tokens)
        x = Reshape((side, side, token_channels))(x)

    # Halve the spatial resolution and project to `channel` feature maps.
    # (The filter count used to be channel * shape[1], which scaled the
    # channels by the spatial height and no longer matched the MLP width.)
    x = Conv2D(filters=channel, kernel_size=(3, 3), strides=(2, 2), padding='same')(x)

    # flatten the grid into a token sequence
    _, height, width, channels = x.shape
    x = Reshape((height * width, channels))(x)

    # Pre-normalization
    x = LayerNormalization(epsilon=1e-6)(x)

    # Multi-head self-attention
    attn_output = MultiHeadAttention(num_heads=num_heads, key_dim=channel)(x, x)
    attn_output = Dropout(dropout_rate)(attn_output)  # dropout to prevent overfitting

    # Residual connection + post-normalization
    x = LayerNormalization(epsilon=1e-6)(Add()([x, attn_output]))

    # Feed-forward network: expand to ff_dim with GELU, project back to
    # `channel`. The expansion line previously ended in a stray comma, which
    # wrapped the tensor in a tuple and broke the following Dense call.
    ffn_output = Dense(ff_dim, activation="gelu")(x)
    ffn_output = Dense(channel, activation='linear')(ffn_output)
    ffn_output = Dropout(dropout_rate)(ffn_output)  # dropout for generalization

    # The residual uses the tensor that entered the MLP, not the MLP output.
    output = LayerNormalization(epsilon=1e-6)(Add()([x, ffn_output]))

    return output

# testing with one MBConv and one ViT layer with differing metalayers
def inverted_residual_block(input, indim, channel, expand=4):
    """Downsampling convolution stage: expand, depthwise conv, project.

    Args:
        input: 4-D feature map tensor.
        indim: static shape of ``input``; only its last entry (the channel
            count) is used.
        channel: number of output channels.
        expand: multiplier applied to the input channel count in the first
            convolution.

    Returns:
        Feature map with ``channel`` channels; the spatial size is reduced by
        the stride-2 convolution.
    """
    # expand to expand * input channels and reduce the resolution by a factor 2
    m = Conv2D(filters=expand * indim[-1], kernel_size=(3, 3), strides=(2, 2),
               activation=None, padding='same')(input)

    # depthwise convolution
    m = DepthwiseConv2D((3, 3), activation=None, padding='same', use_bias=False)(m)

    # squeeze to the desired amount of channels
    output = Conv2D(channel, (1, 1), activation=None, padding='same', use_bias=False)(m)
    output = BatchNormalization()(output)

    # Use a Keras layer for the activation: calling tf.nn.gelu directly on a
    # symbolic tensor is rejected by Keras 3 while building a functional model.
    output = tf.keras.layers.Activation("gelu")(output)

    return output

def CoAtNet(input_shape,
            channels=(64, 96, 192, 384, 768),
            dropout_rate=.3,
            num_heads=8,
            num_classes=1):
    """Build a small CoAtNet-style classifier (conv stages, then attention).

    Args:
        input_shape: shape of one input image, e.g. (96, 96, 3).
        channels: five channel widths: stem, two convolutional stages and two
            transformer stages, in that order.
        dropout_rate: dropout rate used inside the transformer stages.
        num_heads: number of attention heads per transformer stage.
        num_classes: number of sigmoid output units.

    Returns:
        An uncompiled Keras ``Model``.
    """
    inputs = Input(shape=input_shape)

    # Stem: stride-2 convolution. Takes its width from channels[0] instead of
    # a hard-coded 64 (identical for the default configuration).
    x = Conv2D(filters=channels[0], kernel_size=(3, 3), strides=(2, 2), padding='same')(inputs)

    # 2x CNN block
    x = inverted_residual_block(x, x.shape, channel=channels[1])
    x = inverted_residual_block(x, x.shape, channel=channels[2])

    # 2x transformer block
    x = transformer_block(x, x.shape, channel=channels[3], num_heads=num_heads, dropout_rate=dropout_rate)
    x = transformer_block(x, x.shape, channel=channels[4], num_heads=num_heads, dropout_rate=dropout_rate)

    # Average over the token axis. GlobalAveragePooling1D has no pool-size or
    # stride arguments, so it is called without positional arguments.
    x = GlobalAveragePooling1D()(x)

    # Dropout goes before the classifier: applied after the sigmoid it would
    # zero and rescale the predicted probabilities during training.
    x = Dropout(.1)(x)
    outputs = Dense(num_classes, activation="sigmoid")(x)

    return Model(inputs, outputs)

In [2]:
# Move the working directory one level up and remember it; a later cell
# builds the data path from `dir`.
# NOTE(review): re-running this cell moves up one more level each time, and
# `dir` shadows the built-in function of the same name.
os.chdir('..')
dir=os.getcwd()

In [3]:
"""
def inverted_residual_block(input, expand=64, squeeze=16):
    m = Conv2D(expand, (1,1), activation='relu')(input)
    m = DepthwiseConv2D((3,3), activation='relu')(m)
    output = Conv2D(squeeze, (1,1), activation='relu')(m)
    return output
"""

def inverted_residual_block(input, expand=64, squeeze=16):
    """Inverted residual block with batch normalization and a skip connection.

    A 1x1 convolution widens the input to ``expand`` channels, a 3x3
    depthwise convolution follows, and a 1x1 convolution narrows the result
    to ``squeeze`` channels. Every convolution is followed by batch
    normalization; the first two also by ReLU. The input is added to the
    result before the final ReLU.

    Args:
        input: 4-D feature map tensor.
        expand: channel count after the first 1x1 convolution.
        squeeze: channel count of the block output.

    Returns:
        Feature map tensor with ``squeeze`` channels.
    """
    def _batchnorm_relu(tensor):
        # BatchNorm followed by ReLU, each as a fresh layer.
        normalized = BatchNormalization()(tensor)
        return ReLU()(normalized)

    widened = Conv2D(expand, (1, 1), activation=None, padding='same', use_bias=False)(input)
    widened = _batchnorm_relu(widened)

    filtered = DepthwiseConv2D((3, 3), activation=None, padding='same', use_bias=False)(widened)
    filtered = _batchnorm_relu(filtered)

    narrowed = Conv2D(squeeze, (1, 1), activation=None, padding='same', use_bias=False)(filtered)
    narrowed = BatchNormalization()(narrowed)

    # Skip path: a bias-free 1x1 convolution matches the channel count when
    # the input does not already have `squeeze` channels.
    if input.shape[-1] == squeeze:
        skip = input
    else:
        skip = Conv2D(squeeze, (1, 1), padding='same', use_bias=False)(input)

    merged = Add()([skip, narrowed])
    return ReLU()(merged)

"""
De code hierboven lijkt heel erg op de code van Robin. De veranderingen:
- Na elke convolution stap wordt batchnormalization toegepast. Dit zou het model moeten helpen om beter te convergeren.
- Het blokje code van shortcut is toegevoegd zodat er als het goed is geen errors opdagen met problemen tussen de verschillende blokken
"""

'\nDe code hierboven lijkt heel erg op de code van Robin. De veranderingen:\n- Na elke convolution stap wordt batchnormalization toegepast. Dit zou het model moeten helpen om beter te convergeren.\n- Het blokje code van shortcut is toegevoegd zodat er als het goed is geen errors opdagen met problemen tussen de verschillende blokken\n'

In [4]:
def transformer_block(x, embed_dim, num_heads=8, ff_dim=256, dropout_rate=0.1):
    """Self-attention block followed by a two-layer MLP, both with residuals.

    Args:
        x: token sequence tensor.
        embed_dim: used as the attention ``key_dim`` and as the output width
            of the MLP.
        num_heads: number of attention heads.
        ff_dim: width of the hidden MLP layer.
        dropout_rate: dropout rate after the attention and after the MLP.

    Returns:
        Tensor produced by the final layer normalization.
    """
    # normalize before attending
    normed = LayerNormalization(epsilon=1e-6)(x)

    # self-attention: the normalized sequence is both query and value
    attention = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
    attended = Dropout(dropout_rate)(attention(normed, normed))

    # residual around the attention, then normalize again
    hidden = LayerNormalization(epsilon=1e-6)(normed + attended)

    # MLP: GELU expansion to ff_dim, linear projection to embed_dim
    mlp = tf.keras.Sequential([
        Dense(ff_dim, activation="gelu"),
        Dense(embed_dim),
    ])
    mlp_out = Dropout(dropout_rate)(mlp(hidden))

    # residual around the MLP and final normalization
    return LayerNormalization(epsilon=1e-6)(hidden + mlp_out)

In [7]:
# Scratch check: run self-attention on a random tensor and keep the result
# for inspection in the next cell.
# NOTE(review): with a 3-D input the first axis is presumably treated as the
# batch axis (96 sequences of 96 tokens with 192 features) — confirm this is
# the layout that was meant to be tested.
test=tf.random.normal([96,96,192])
output=MultiHeadAttention(num_heads=8,key_dim=192)(test,test)

In [8]:
# NOTE(review): this displays the whole tensor; `output.shape` alone would
# show that the attention layer returns the same shape as its input.
output

<tf.Tensor: shape=(96, 96, 192), dtype=float32, numpy=
array([[[-7.85953074e-04, -1.69481151e-03, -1.45195071e-02, ...,
          9.14683565e-04,  3.73773742e-03,  9.34608839e-03],
        [-7.81970448e-04, -1.81844097e-03, -1.47203403e-02, ...,
          1.02955196e-03,  3.62323597e-03,  9.38561559e-03],
        [-5.24078147e-04, -1.56890042e-03, -1.47062074e-02, ...,
          9.89650376e-04,  3.63864983e-03,  9.25845280e-03],
        ...,
        [-7.07988336e-04, -1.50823663e-03, -1.47141553e-02, ...,
          1.05326530e-03,  3.67860077e-03,  9.56844632e-03],
        [-6.33458490e-04, -1.47872232e-03, -1.46559160e-02, ...,
          1.03846844e-03,  3.65743972e-03,  9.31038521e-03],
        [-6.48324611e-04, -1.75921246e-03, -1.44347120e-02, ...,
          1.37616694e-03,  3.94211989e-03,  9.26242489e-03]],

       [[-5.76260965e-03,  1.40514560e-02,  2.66433693e-03, ...,
          7.18184747e-05, -7.93055631e-03, -7.42277410e-03],
        [-5.74916881e-03,  1.40500814e-02,  2.54

In [None]:
# testing with one MBConv and one ViT layer with differing metalayers
def inverted_residual_block(input, expand=64, squeeze=16):
    """Inverted residual block with a single batch normalization.

    Three bias-free convolutions run back to back without activations in
    between: 1x1 to ``expand`` channels, 3x3 depthwise, 1x1 to ``squeeze``
    channels. Only the last one is batch-normalized. The input is added to
    the result before a ReLU.

    NOTE(review): this redefines ``inverted_residual_block``; once the cell
    has run, earlier definitions with the same name are shadowed.

    Args:
        input: 4-D feature map tensor.
        expand: channel count after the first 1x1 convolution.
        squeeze: channel count of the block output.

    Returns:
        Feature map tensor with ``squeeze`` channels.
    """
    conv_options = dict(activation=None, padding='same', use_bias=False)

    widened = Conv2D(expand, (1, 1), **conv_options)(input)
    filtered = DepthwiseConv2D((3, 3), **conv_options)(widened)
    narrowed = Conv2D(squeeze, (1, 1), **conv_options)(filtered)
    narrowed = BatchNormalization()(narrowed)

    # Skip path: project the input with a 1x1 convolution only when its
    # channel count differs from the block output.
    if input.shape[-1] == squeeze:
        skip = input
    else:
        skip = Conv2D(squeeze, (1, 1), padding='same', use_bias=False)(input)

    merged = Add()([skip, narrowed])
    return ReLU()(merged)

def CoAtNet(input_shape,
            MBConv1_expand=64, MBConv1_squeeze=8,
            num_heads1=4,
            num_classes=1):
    """Minimal hybrid model: one convolutional block and one transformer block.

    Args:
        input_shape: shape of one input image.
        MBConv1_expand: ``expand`` argument of the inverted residual block.
        MBConv1_squeeze: ``squeeze`` argument of the inverted residual block.
        num_heads1: number of attention heads of the transformer block.
        num_classes: number of sigmoid output units.

    Returns:
        An uncompiled Keras ``Model``.
    """
    inputs = Input(shape=input_shape)

    # convolutional stage: a single inverted residual block
    conv_features = inverted_residual_block(inputs, MBConv1_expand, MBConv1_squeeze)

    # shrink the feature map before attention: stride-2 conv, then 2x2 max pooling
    reduced = Conv2D(8, (3, 3), strides=2, padding='same', activation='relu')(conv_features)
    reduced = MaxPooling2D(pool_size=(2, 2))(reduced)

    # flatten the spatial grid into a token sequence, keeping the channel axis
    tokens = Reshape((-1, reduced.shape[-1]))(reduced)

    # transformer stage: a single block
    tokens = transformer_block(tokens, embed_dim=tokens.shape[-1], num_heads=num_heads1)

    pooled = GlobalAveragePooling1D()(tokens)
    predictions = Dense(num_classes, activation="sigmoid")(pooled)

    return Model(inputs, predictions)

In [31]:
# Build the network for 96x96 RGB inputs and print its layer table.
model = CoAtNet(input_shape=(96, 96, 3))
model.summary()

ResourceExhaustedError: {{function_node __wrapped__AddV2_device_/job:localhost/replica:0/task:0/device:GPU:0}} failed to allocate memory [Op:AddV2]

In [17]:
def CoAtNet(input_shape,
            MBConv1_expand=64, MBConv1_squeeze=16,
            MBConv2_expand=32, MBConv2_squeeze=8,
            num_heads1=4, num_heads2=4,
            num_classes=1):
    """Hybrid model: two convolutional blocks followed by two transformer blocks.

    Args:
        input_shape: shape of one input image.
        MBConv1_expand, MBConv1_squeeze: ``expand`` / ``squeeze`` arguments of
            the first inverted residual block.
        MBConv2_expand, MBConv2_squeeze: the same for the second block.
        num_heads1, num_heads2: attention heads of the first and second
            transformer block.
        num_classes: number of sigmoid output units.

    Returns:
        An uncompiled Keras ``Model``.
    """
    conv_stages = (
        (MBConv1_expand, MBConv1_squeeze),
        (MBConv2_expand, MBConv2_squeeze),
    )
    head_counts = (num_heads1, num_heads2)

    inputs = Input(shape=input_shape)

    # convolutional stages
    features = inputs
    for expand, squeeze in conv_stages:
        features = inverted_residual_block(features, expand, squeeze)

    # shrink the feature map before attention: stride-2 conv, then 2x2 max pooling
    features = Conv2D(32, (3, 3), strides=2, padding='same', activation='relu')(features)
    features = MaxPooling2D(pool_size=(2, 2))(features)

    # flatten the spatial grid into a token sequence, keeping the channel axis
    tokens = Reshape((-1, features.shape[-1]))(features)

    # transformer stages
    for heads in head_counts:
        tokens = transformer_block(tokens, embed_dim=tokens.shape[-1], num_heads=heads)

    pooled = GlobalAveragePooling1D()(tokens)
    predictions = Dense(num_classes, activation="sigmoid")(pooled)

    return Model(inputs, predictions)

In [14]:
# Build the two-stage network for 96x96 RGB inputs and print its layer table.
model = CoAtNet(input_shape=(96, 96, 3))
model.summary()

In [45]:
# build and compile the model
input_shape = (IMAGE_SIZE, IMAGE_SIZE, 3)
model = CoAtNet(input_shape)
model.compile(SGD(learning_rate=0.001, momentum=0.95), loss = 'binary_crossentropy', metrics=['accuracy'])
model_name = 'CoAtNet_test_16-3'


# save the model and weights
model_filepath = 'metadata/'+model_name + '.json'
weights_filepath = 'metadata/'+model_name + '_weights.keras'

# open() below fails when the output folder does not exist yet
os.makedirs('metadata', exist_ok=True)

model_json = model.to_json() # serialize model to JSON
with open(model_filepath, 'w') as json_file:
    json_file.write(model_json)


# get the data generators
# os.path.join replaces dir+'\Data': '\D' is an invalid escape sequence and
# the hard-coded backslash only worked on Windows
train_gen, val_gen = get_pcam_generators(os.path.join(dir, 'Data'))


# define the model checkpoint and Tensorboard callbacks
checkpoint = ModelCheckpoint(weights_filepath, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
tensorboard = TensorBoard(os.path.join('logs', model_name))
callbacks_list = [checkpoint, tensorboard]


# one epoch covers every full batch of the training / validation generator
train_steps = train_gen.n//train_gen.batch_size
val_steps = val_gen.n//val_gen.batch_size

  train_gen, val_gen = get_pcam_generators(dir+'\Data')


Found 144000 images belonging to 2 classes.
Found 16000 images belonging to 2 classes.


In [46]:
# Train for three epochs, validating after each one; the callbacks store the
# best checkpoint (lowest val_loss) and write TensorBoard logs.
history = model.fit(
    x=train_gen,
    epochs=3,
    steps_per_epoch=train_steps,
    validation_data=val_gen,
    validation_steps=val_steps,
    callbacks=callbacks_list,
)

Epoch 1/3
[1m  66/4500[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m47:25[0m 642ms/step - accuracy: 0.5865 - loss: 0.6739

KeyboardInterrupt: 

In [5]:
def CNN(input_shape,
        MBConv1_expand=64, MBConv1_squeeze=16,
        MBConv2_expand=32, MBConv2_squeeze=8,
        MBConv3_expand=16, MBConv3_squeeze=4,
        MBConv4_expand=8, MBConv4_squeeze=2):
    """Convolution-only baseline: four inverted residual blocks and a sigmoid unit.

    Args:
        input_shape: shape of one input image.
        MBConvN_expand, MBConvN_squeeze: ``expand`` / ``squeeze`` arguments of
            the N-th inverted residual block (N = 1..4).

    Returns:
        An uncompiled Keras ``Model`` with a single sigmoid output.
    """
    block_widths = (
        (MBConv1_expand, MBConv1_squeeze),
        (MBConv2_expand, MBConv2_squeeze),
        (MBConv3_expand, MBConv3_squeeze),
        (MBConv4_expand, MBConv4_squeeze),
    )

    inputs = Input(shape=input_shape)

    # 4x CNN block, applied in order
    features = inputs
    for expand, squeeze in block_widths:
        features = inverted_residual_block(features, expand, squeeze)

    # average each channel over the spatial grid, then classify
    pooled = GlobalAveragePooling2D()(features)
    predictions = Dense(1, activation="sigmoid")(pooled)

    return Model(inputs, predictions)

In [6]:
# Build the convolution-only baseline for 96x96 RGB inputs and print its layer table.
model = CNN(input_shape=(96, 96, 3))
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 96, 96, 3)]          0         []                            
                                                                                                  
 conv2d (Conv2D)             (None, 96, 96, 64)           192       ['input_1[0][0]']             
                                                                                                  
 batch_normalization (Batch  (None, 96, 96, 64)           256       ['conv2d[0][0]']              
 Normalization)                                                                                   
                                                                                                  
 re_lu (ReLU)                (None, 96, 96, 64)           0         ['batch_normalization[0][0

In [7]:
# build and compile the model
input_shape = (IMAGE_SIZE, IMAGE_SIZE, 3)
model = CNN(input_shape)
model.compile(SGD(learning_rate=0.01, momentum=0.95), loss = 'binary_crossentropy', metrics=['accuracy'])
model_name = 'CNN_test_1'


# save the model and weights
model_filepath = 'metadata/'+model_name + '.json'
# NOTE(review): newer Keras releases may only accept checkpoint paths ending
# in `.keras` or `.h5` — confirm the extension against the installed version.
weights_filepath = 'metadata/'+model_name + '_weights.hdf5'

os.makedirs("metadata", exist_ok=True)

model_json = model.to_json() # serialize model to JSON
with open(model_filepath, 'w') as json_file:
    json_file.write(model_json)


# get the data generators
# The data location can be overridden with the PCAM_DATA_DIR environment
# variable; the original machine-specific path stays as the fallback.
data_dir = os.environ.get("PCAM_DATA_DIR", "C:\\Users\\20223692\\OneDrive - TU Eindhoven\\data")
train_gen, val_gen = get_pcam_generators(data_dir)


# define the model checkpoint and Tensorboard callbacks
# `save_format` is not a ModelCheckpoint argument (the file format follows
# from the file extension), so it is no longer passed.
checkpoint = ModelCheckpoint(weights_filepath, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
tensorboard = TensorBoard(os.path.join('logs', model_name))
callbacks_list = [checkpoint, tensorboard]


# one epoch covers every full batch of the training / validation generator
train_steps = train_gen.n//train_gen.batch_size
val_steps = val_gen.n//val_gen.batch_size

Found 144000 images belonging to 2 classes.
Found 16000 images belonging to 2 classes.


In [8]:
# Train the baseline for three epochs, validating after each one; the
# callbacks store the best checkpoint (lowest val_loss) and TensorBoard logs.
history = model.fit(
    x=train_gen,
    epochs=3,
    steps_per_epoch=train_steps,
    validation_data=val_gen,
    validation_steps=val_steps,
    callbacks=callbacks_list,
)

Epoch 1/3

KeyboardInterrupt: 