In [2]:
# Mount Google Drive inside the Colab VM so the dataset archive and model
# checkpoints under /content/drive are reachable from the following cells.
# force_remount=True re-mounts even if a previous mount is still present.
from google.colab import drive 
drive.mount('/content/drive',force_remount=True)

Mounted at /content/drive


In [3]:
# Copy the dataset archive from Drive into the working directory, then
# extract it. `unzip -u` only updates/creates files, `-q` keeps it quiet.
!cp -av 'drive/MyDrive/CV/final_project/data_pure.zip' './' 
!unzip -u -q "data_pure.zip"

'drive/MyDrive/CV/final_project/data_pure.zip' -> './data_pure.zip'


In [4]:
# Install/upgrade TensorFlow Addons quietly.
# NOTE(review): nothing in the cells shown here imports tensorflow_addons, and
# the version is unpinned -- confirm it is still needed and consider pinning.
!pip install -U -q tensorflow-addons

In [16]:
%tensorflow_version 2.x
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import pandas as pd
import numpy as np
import tensorflow.keras.layers as L
from tensorflow.keras.losses import MeanAbsoluteError
from tensorflow import keras
from tensorflow.keras.optimizers import Adam

# --- Run configuration -------------------------------------------------------
seed = 1          # passed to the data generators (shuffling / augmentation)
base_dir = './'   # root directory that contains the extracted `data/` folder
image_size=224    # images are resized to image_size x image_size

# Training labels: one row per image, with an `id` column and the regression
# targets `x` and `y` that the generators read below.
df = pd.read_csv(base_dir+"data/train.csv")
# Image files are stored as "<id>.jpg". A vectorised column concatenation
# replaces the slower row-by-row `apply(..., axis=1)`.
df['filename'] = df['id'] + '.jpg'

def prep_fn(img):
    """Convert an image array to float32 and divide every value by 255.

    Used as the `preprocessing_function` of the image generators. For 8-bit
    pixel data this maps values into the range [0, 1].

    Args:
        img: numpy array of pixel values (any numeric dtype).

    Returns:
        A new float32 array of the same shape; the input is not modified.
    """
    as_float = img.astype(np.float32)
    # An alternative scaling to [-1, 1] would be (scaled - 0.5) * 2; not used.
    return np.true_divide(as_float, 255.0)

# Augmentation applied to training images only: small shifts, zoom and
# rotation, no flips. `validation_split` reserves 10% of the dataframe.
data_gen_args = dict(preprocessing_function=prep_fn,
                     width_shift_range=0.05,
                     height_shift_range=0.2,
                     zoom_range=0.05,
                     rotation_range=5,
                     horizontal_flip=False,
                     vertical_flip=False,
                     validation_split=0.1)

train_datagen = ImageDataGenerator(**data_gen_args)
# Validation images are only rescaled (no augmentation). The split fraction is
# read from `data_gen_args` so both generators always use the same value.
val_datagen = ImageDataGenerator(preprocessing_function=prep_fn,
                                 validation_split=data_gen_args['validation_split'])

# Arguments shared by the training and validation flows. `target_size` is
# derived from `image_size` so it cannot drift from the model's input shape.
flow_args = dict(dataframe=df,
                 directory=base_dir+'data/train/',
                 x_col='filename',
                 y_col=['x','y'],
                 batch_size=32,
                 seed=seed,
                 shuffle=True,
                 class_mode='raw',   # yield the raw (x, y) values as targets
                 target_size=(image_size, image_size))

train_generator = train_datagen.flow_from_dataframe(subset="training", **flow_args)
val_generator = val_datagen.flow_from_dataframe(subset="validation", **flow_args)

Found 6750 validated image filenames.
Found 750 validated image filenames.


In [None]:
# Test-set image ids. The path goes through `base_dir`, like the training CSV.
df_test = pd.read_csv(base_dir + "data/imagenames.csv")
# Image files are stored as "<id>.jpg" (vectorised instead of row-wise apply).
df_test['filename'] = df_test['id'] + '.jpg'
# Placeholder targets: the generator is asked for `y_col=['x','y']`, so the
# columns must exist. They are overwritten with predictions later.
df_test['x']=0
df_test['y']=0
test_datagen = ImageDataGenerator(preprocessing_function=prep_fn)
# shuffle=False keeps batches in dataframe order, so predictions can be
# written back to `df_test` row by row.
test_generator = test_datagen.flow_from_dataframe(dataframe=df_test,
                                                directory=base_dir+'data/test/',
                                                x_col='filename',
                                                y_col=['x','y'],
                                                batch_size = 32,
                                                shuffle=False,
                                                class_mode='raw',
                                                target_size=(image_size, image_size))

Found 1200 validated image filenames.


In [6]:
# Transformer / tokenizer hyper-parameters.
num_heads = 3                              # attention heads per block
projection_dim = 128                       # width of every token embedding
# Filters of each conv stage in the tokenizer; the final stage produces the
# token embedding, hence its width is `projection_dim`.
conv_filters = [32, 64, projection_dim]
# Every conv stage is followed by a stride-2 pooling, so the feature map side
# is image_size // 2**stages and the token count is that side squared.
num_patches = (image_size // (2 ** len(conv_filters))) ** 2

In [7]:
def mlp(x, hidden_units, dropout_rate):
    """Apply a Dense(GELU) + Dropout pair for each entry of `hidden_units`.

    Args:
        x: input tensor.
        hidden_units: iterable of layer widths, applied in order.
        dropout_rate: rate given to every Dropout layer.

    Returns:
        The tensor after the last Dropout, or `x` unchanged when
        `hidden_units` is empty.
    """
    out = x
    for width in hidden_units:
        dense = L.Dense(width, activation = tf.nn.gelu)
        drop = L.Dropout(dropout_rate)
        out = drop(dense(out))
    return out

In [8]:
class CCTTokenizer(L.Layer):
    """Convolutional tokenizer: turns an image into a sequence of tokens.

    A small conv stack (one 3x3 'same' convolution + 2x2 stride-2 max-pool per
    entry of the module-level `conv_filters`) produces a feature map whose
    spatial positions are flattened into a token sequence. A learned position
    embedding of shape (num_patches, projection_dim) is added to the tokens.

    The layer reads the module-level `conv_filters`, `num_patches` and
    `projection_dim` at construction time. For the addition in `call` to work,
    the last conv stage must output `projection_dim` channels and the flattened
    feature map must contain `num_patches` positions.

    Args:
        **kwargs: standard Keras `Layer` keyword arguments (e.g. `name`,
            `dtype`, `trainable`), forwarded to the base class so the layer
            can be named and re-created from its config.
    """
    def __init__(self, **kwargs):
        super(CCTTokenizer, self).__init__(**kwargs)
        self.num_patches = num_patches
        self.projection_dim = projection_dim

        # Attribute names are kept as-is: object-based checkpoints written by
        # `save_weights` are keyed on them.
        self.conv_model = keras.Sequential()
        for filters in conv_filters:
            self.conv_model.add(
                L.Conv2D(filters,(3,3),activation='relu', padding='same')
            )
            # Halves height and width.
            self.conv_model.add(
                L.MaxPool2D((2, 2), strides=(2, 2))
            )

        self.position_embedding = L.Embedding(
            input_dim = self.num_patches, output_dim = self.projection_dim
        )

    def call(self, images):
        """Return position-encoded tokens for a batch of images.

        Args:
            images: batch of images accepted by the conv stack.

        Returns:
            Tensor of shape (batch, height*width of the feature map, channels)
            with the position embedding added.
        """
        feature_map = self.conv_model(images)
        fm_shape = tf.shape(feature_map)
        # Flatten the two spatial axes into one sequence axis.
        tokens = tf.reshape(
            feature_map,
            (-1, fm_shape[1] * fm_shape[2], fm_shape[-1]),
        )
        positions = tf.range(start = 0, limit = self.num_patches, delta = 1)
        # (num_patches, projection_dim) broadcasts over the batch axis.
        return tokens + self.position_embedding(positions)

In [18]:
def attention_block(inputs, key_dim, mlp_dim,dropout=0):
    x = L.LayerNormalization(epsilon = 1e-6)(inputs)
    attention_output = L.MultiHeadAttention(
        num_heads = num_heads, key_dim = key_dim, dropout = dropout
    )(x, x)
    if(inputs.shape[-1]==key_dim):
        x = L.Add()([inputs,attention_output])
    skip = x
    x = L.LayerNormalization(epsilon = 1e-6)(x)
    x = mlp(x, hidden_units = mlp_dim, dropout_rate = dropout)
    if(skip.shape[-1]==x.shape[-1]):
        x = L.Add()([skip,x])
    return x

def vision_transformer(num_blocks=4, key_dim=128, mlp_dim=(256, 128), dropout=0):
    """Build the tokenizer + transformer regression model.

    Pipeline: image -> CCTTokenizer -> `num_blocks` attention blocks ->
    LayerNormalization -> first token -> Dense(64, relu) -> Dense(2).

    Args:
        num_blocks: number of stacked attention blocks.
        key_dim: per-head query/key size given to every attention block.
        mlp_dim: hidden widths of the MLP inside every attention block.
        dropout: dropout rate given to every attention block.

    Returns:
        An uncompiled `tf.keras.Model` mapping
        (image_size, image_size, 3) images to 2 outputs.

    The defaults reproduce the architecture that was previously hard-coded,
    so `vision_transformer()` builds the same model as before.
    """
    inputs = L.Input(shape = (image_size, image_size, 3))

    # Encode the image as a sequence of tokens.
    x = CCTTokenizer()(inputs)

    for _ in range(num_blocks):
        x = attention_block(x, key_dim, list(mlp_dim), dropout)

    x = L.LayerNormalization(epsilon = 1e-6)(x)

    # Reduce [batch, tokens, dim] to [batch, dim] by keeping the first token.
    # NOTE(review): the tokenizer does not prepend a dedicated class token, so
    # index 0 is simply the first position of the flattened feature map.
    # L.GlobalAveragePooling1D() was the alternative considered here.
    x = x[:,0,:]

    # Regression head.
    x = L.Dense(64,activation='relu')(x)
    out = L.Dense(2)(x)

    model = tf.keras.Model(inputs = inputs, outputs = out)

    return model

In [19]:
# Instantiate the network and attach the optimiser and loss. The targets are
# regressed with mean absolute error.
model = vision_transformer()
model.compile(
    optimizer=Adam(learning_rate=1e-3),
    loss=MeanAbsoluteError(),
)

In [60]:
# Print the layer-by-layer summary (output shapes and parameter counts).
model.summary()

Model: "model_15"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_13 (InputLayer)           [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
cct_tokenizer_11 (CCTTokenizer) (None, 784, 128)     193600      input_13[0][0]                   
__________________________________________________________________________________________________
layer_normalization_99 (LayerNo (None, 784, 128)     256         cct_tokenizer_11[0][0]           
__________________________________________________________________________________________________
multi_head_attention_44 (MultiH (None, 784, 128)     197888      layer_normalization_99[0][0]     
                                                                 layer_normalization_99[0][

In [20]:
with tf.device('/device:GPU:0'):
    # Resume from the checkpoint previously written to Drive.
    # NOTE(review): this raises if no checkpoint exists yet (first ever run).
    model.load_weights("drive/MyDrive/CV/Model/CCT")
    # Stop after 3 epochs without val_loss improvement and roll the model back
    # to the best epoch. Without restore_best_weights the weights saved
    # afterwards would be the ones from `patience` epochs past the best.
    callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3,
                                                restore_best_weights=True)
    # Step counts are taken from the generators instead of magic numbers.
    # In particular the whole validation subset is evaluated every epoch;
    # a fixed 20 steps covered only part of it, on a different shuffled subset
    # each time, which makes the monitored val_loss noisy.
    r=model.fit(train_generator, validation_data=val_generator,
                steps_per_epoch=len(train_generator),
                validation_steps=len(val_generator),
                epochs=100,callbacks=[callback])

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
 18/211 [=>............................] - ETA: 1:37 - loss: 3.8390

KeyboardInterrupt: ignored

In [None]:
# Persist the current weights to Drive. This is the same path prefix the
# training cell loads from, so a later session resumes from here.
model.save_weights("drive/MyDrive/CV/Model/CCT")

In [None]:
# Run inference over the whole test generator on the first GPU.
with tf.device('/device:GPU:0'):
    ypred = model.predict(test_generator)
# Displayed as the cell output: one row per test image, two predicted values.
ypred.shape

(1200, 2)

In [None]:
# Write the predictions back into the test dataframe. Row order matches
# because the test generator was created with shuffle=False.
df_test['x'] = ypred[:,0]
df_test['y'] = ypred[:,1]
# The helper column is not part of the submission. errors='ignore' keeps the
# cell re-runnable after the column has already been dropped.
df_test = df_test.drop(columns=['filename'], errors='ignore')
df_test.to_csv('pred_CCT.csv',index=False)
# Last expression of the cell so the notebook actually renders the preview
# (a bare `.head()` in the middle of a cell displays nothing).
df_test.head()