<a href="https://colab.research.google.com/github/SarveshPatil99/Adversarial-Robustness-Enhancement/blob/main/TML_train_mix.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!gdown --fuzzy https://drive.google.com/file/d/1-8d_JyH50-b23vnQXSn2BLM1E4Mh-0k5/view?usp=drive_link
!unzip -q speckle_0.01.zip

In [None]:
!gdown --fuzzy https://drive.google.com/uc?id=18MZZwgKjTGlB2Y4Esw_lRIvbpOaANbnh
!unzip -q original_dataset_stylegan3_10000.zip

In [None]:
!unzip -q list_files.zip

In [None]:
import cv2
from pathlib import Path
from tqdm.auto import tqdm
import tensorflow as tf
import numpy as np
import pathlib
import os
from tensorflow.keras.layers import Input, Conv2D, SeparableConv2D, MaxPooling2D, BatchNormalization, Dropout, Flatten, Dense, ReLU, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.utils import plot_model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import RMSprop
import pickle
from matplotlib import pyplot as plt
from random import sample, shuffle, seed
from tensorflow.keras import mixed_precision
# Select the 'mixed_float16' Keras dtype policy globally, before any layer is created.
mixed_precision.set_global_policy('mixed_float16')

In [None]:
# from pathlib import Path
# Path('list_files/train').mkdir(parents=True)
# Path('list_files/val').mkdir(parents=True)
# Path('list_files/test').mkdir(parents=True)

# for ratio in np.arange(0,1.1,1/8):
#   # print(ratio)
#   for split in ['train', 'val', 'test']:
#     total_files = []
#     for label in ['real', 'fake']:
#       og_files =  [f'original/{split}/{label}/{f}' for f in os.listdir(f'original/{split}/{label}')]
#       noisy_files =  [f'noisy_data/speckle_0.01/{split}/{label}/{f}' for f in os.listdir(f'noisy_data/speckle_0.01/{split}/{label}')]
#       seed(0)
#       total_files += sample(og_files, int(ratio * len(og_files)))
#       seed(0)
#       total_files += sample(noisy_files, int((1 - ratio) * len(noisy_files)))
#     seed(0)
#     shuffle(total_files)
#     print(f'{split}_og_{ratio*100:.1f}_noisy_{(1-ratio)*100:.1f}')
#     with open(f'list_files/{split}/og_{ratio*100:.1f}_noisy_{(1-ratio)*100:.1f}.txt', 'w') as f:
#       f.write('\n'.join(total_files))

# Sanity-check every pre-built list file: for each original/noisy mixing ratio
# (multiples of 1/8 from 0 to 1) and each split, the real and fake classes must
# be balanced and the share of original images must match the ratio.
# The loop variable is deliberately NOT called `ratio`, so running this cell
# never overwrites the global `ratio` setting used for training.
for check_ratio in np.arange(0,1.1,1/8):
  print(check_ratio)
  for split in ['train', 'val', 'test']:
    print(split, end = ' ')
    list_path = f'list_files/{split}/og_{check_ratio*100:.1f}_noisy_{(1-check_ratio)*100:.1f}.txt'
    with open(list_path, 'r') as f:
      # Drop blank lines (e.g. a trailing newline) so they are not counted as entries.
      lines = [line.strip() for line in f.read().split('\n') if line.strip()]
    # The class is the parent directory of each file (the same component
    # create_label reads); the data source is the first path component.
    labels = [pathlib.PurePosixPath(line).parent.name for line in lines]
    sources = [pathlib.PurePosixPath(line).parts[0] for line in lines]
    len_real = labels.count('real')
    len_fake = labels.count('fake')
    len_og = sources.count('original')
    len_noisy = sources.count('noisy_data')
    print(f"{len_og} og {len_noisy} noisy")
    total = len_og + len_noisy
    # Fail with a clear message instead of a ZeroDivisionError on an empty list.
    assert total > 0, f'{list_path} lists no original or noisy images'
    assert len_real == len_fake, f'{list_path}: {len_real} real vs {len_fake} fake'
    # Tolerant float comparison; a genuinely wrong mix is still far outside the tolerance.
    assert np.isclose(len_og / total, check_ratio), f'{list_path}: original share {len_og / total} != {check_ratio}'

In [None]:
# Dataset / input-pipeline settings shared by the cells below.
noise_type = 'speckle_0.01'            # tag written into the saved model's filename
classes = np.array(['real', 'fake'])   # order defines the one-hot label layout
img_height = img_width = 256
batch_size = 16
# Share of original images in the mix; selects which list file is read
# (list files are named og_<ratio*100>_noisy_<(1-ratio)*100>.txt).
ratio = 0

def create_label(image_path):
  """Build the one-hot label for an image from its parent directory name.

  The second-to-last '/'-separated component of `image_path` is compared
  against every entry of the global `classes` array; the resulting boolean
  mask is returned as float32.
  """
  path_parts = tf.strings.split(image_path,'/')
  class_name = path_parts[-2]
  is_class = classes == class_name
  return tf.cast(is_class,tf.float32)

def load(image_path):
  """Read one PNG file and return it with its one-hot label.

  Args:
    image_path: scalar string tensor holding the path of a PNG image whose
      parent directory names its class.

  Returns:
    (image, label): the decoded image with pixel values divided by 255, and
    the label produced by `create_label`.
  """
  raw_bytes = tf.io.read_file(image_path)
  # Force three channels so grayscale or RGBA PNGs still match the
  # (256, 256, 3) model input instead of failing later inside the network.
  image = tf.image.decode_png(raw_bytes, channels=3) / 255
  # Resizing is intentionally disabled; images are fed at their stored size.
  # image = tf.image.resize(image, [img_height, img_width])

  label = create_label(image_path)

  return image, label

def read_file_list(split):
  """Return the image paths listed for `split` at the current global `ratio`.

  Blank lines (for example a trailing newline) are dropped so they never
  reach `tf.io.read_file` as empty paths.
  """
  list_path = f'list_files/{split}/og_{ratio*100:.1f}_noisy_{(1-ratio)*100:.1f}.txt'
  with open(list_path, 'r') as f:
    files = [line.strip() for line in f.read().split('\n') if line.strip()]
  if not files:
    raise ValueError(f'{list_path} does not list any image')
  return files

def build_dataset(files, training = False):
  """Turn a list of image paths into a batched dataset of (image, label) pairs.

  Args:
    files: list of image paths accepted by `load`.
    training: when True the paths are reshuffled on every pass and batches are
      prefetched; otherwise the decoded batches are cached for repeated passes.
  """
  ds = tf.data.Dataset.from_tensor_slices(files)
  if training:
    # Shuffle the (cheap) path strings before decoding so each epoch sees a
    # different batch composition; the seed keeps runs repeatable.
    ds = ds.shuffle(len(files), seed=0, reshuffle_each_iteration=True)
  ds = ds.map(load, num_parallel_calls=tf.data.AUTOTUNE)
  ds = ds.batch(batch_size)
  if training:
    return ds.prefetch(buffer_size=tf.data.AUTOTUNE)
  return ds.cache()

train_files = read_file_list('train')
train_ds = build_dataset(train_files, training = True)
train_ds_length = len(train_ds)

val_files = read_file_list('val')
val_ds = build_dataset(val_files)
val_ds_length = len(val_ds)

test_files = read_file_list('test')
test_ds = build_dataset(test_files)
test_ds_length = len(test_ds)

print(f'train_ds_length: {train_ds_length}, val_ds_length: {val_ds_length}, test_ds_length: {test_ds_length}')

In [None]:
# Pull a single batch from the training set and show its first four images,
# each titled with the class name selected by the argmax of its label.
for x, y in train_ds.take(1):
  pass
fig, axes = plt.subplots(1, 4, figsize=(8,2))
for i, ax in enumerate(axes):
  ax.imshow(x[i])
  ax.axis('off')
  ax.set_title(classes[tf.argmax(y[i])])
fig.tight_layout()

In [None]:
def conv_bn_relu(filters,x,idx,label):
  """Apply a 3x3 separable convolution, batch normalisation and ReLU to `x`.

  Args:
    filters: number of output filters of the separable convolution.
    x: input tensor.
    idx, label: combined into the suffix of the three layer names
      (conv_<idx><label>, bn_<idx><label>, relu_<idx><label>).

  Returns:
    The output tensor of the ReLU layer.
  """
  suffix = f'{idx}{label}'
  stages = [
    SeparableConv2D(filters, 3, padding='same', kernel_initializer='he_uniform',name=f'conv_{suffix}'),
    BatchNormalization(name=f'bn_{suffix}'),
    ReLU(name=f'relu_{suffix}'),
    # Dropout(rate=0.1,name=f'dropout_{suffix}'),  # disabled
  ]
  for stage in stages:
    x = stage(x)
  return x

def create_model(input_shape = (256, 256, 3), n_filters = 16, n_blocks = 5, n_classes = 2):
  """Build the separable-convolution classifier.

  The network stacks `n_blocks` blocks, each made of two `conv_bn_relu` stages
  followed by max pooling, with the filter count doubling from one block to
  the next. A global average pool and a softmax layer form the head.

  Args:
    input_shape: shape of one input image.
    n_filters: filter count of the first block.
    n_blocks: number of conv/conv/pool blocks.
    n_classes: number of softmax outputs.

  Returns:
    An uncompiled Keras `Model`. With the default arguments the architecture
    is the original 5-block, 16-filter, 2-class network.
  """
  input_layer = Input(input_shape,name='input')
  x = input_layer
  for i in range(n_blocks):
    # 16, 32, 64, ... with the defaults.
    block_filters = int(n_filters) * 2 ** i
    x = conv_bn_relu(block_filters,x,i,'a')
    x = conv_bn_relu(block_filters,x,i,'b')
    x = MaxPooling2D(name=f'maxpool_{i}')(x)
  # Alternatives that were tried and left disabled:
  # x = Flatten(name='flatten')(x)
  x = GlobalAveragePooling2D(name='global_pool')(x)
  # x = Dense(32,activation='relu',name='dense_0')(x)
  # x = Dropout(rate=0.2,name=f'dropout_dense')(x)
  # The output layer is pinned to float32 although the notebook selects the
  # mixed_float16 policy globally.
  x = Dense(n_classes,activation='softmax',name='dense_0', dtype = 'float32')(x)

  model = Model(inputs = [input_layer], outputs = [x])

  return model

In [None]:
# Training hyperparameters.
lr = 1e-3
epochs = 50
rLR_patience = 5
es_patience = 10
loss = 'categorical_crossentropy'
metrics = ['accuracy']
# Only written into the output filename; create_model() below is called with its defaults.
n_filters = 16

# Output locations: the filename encodes the data mix and the main hyperparameters.
model_type = f'{noise_type}_og_{ratio*100:.1f}_noisy_{(1-ratio)*100:.1f}'
model_filename = f'{model_type}_sg3=20k_n={n_filters}_epoch={epochs}_lr={lr:.0e}'
model_path = f'saved/models/{model_filename}.h5'
history_path = f'saved/histories/{model_filename}.pkl'
for out_dir in ('saved/models', 'saved/histories'):
  os.makedirs(out_dir, exist_ok=True)

# All three callbacks watch validation accuracy (higher is better).
watch = dict(monitor='val_accuracy', mode='max', verbose=1)
checkpoint = ModelCheckpoint(filepath=model_path, save_best_only=True, **watch)
reduce_lr = ReduceLROnPlateau(factor=1/np.sqrt(10), patience=rLR_patience, min_lr=1e-6, **watch)
earlystopper = EarlyStopping(patience=es_patience, **watch)
callbacks = [checkpoint, reduce_lr, earlystopper]

model = create_model()
print(model.count_params())
optimizer = RMSprop(learning_rate = lr)
model.compile(optimizer = optimizer, loss = loss, metrics = metrics)
hist = model.fit(train_ds, epochs = epochs, validation_data = val_ds, callbacks = callbacks, verbose = 1)

# Persist the per-epoch metrics next to the checkpoint.
with open(history_path, 'wb') as history_file:
  pickle.dump(hist.history, history_file)

In [None]:
# Restore the weights saved by the checkpoint callback, then draw one figure
# per metric with the training and validation curves.
model.load_weights(model_path)
for metric in ('accuracy', 'loss'):
  plt.plot(hist.history[metric])
  plt.plot(hist.history[f'val_{metric}'])
  plt.title(f'model {metric}')
  plt.ylabel(metric)
  plt.xlabel('epoch')
  plt.legend(['train', 'val'])
  plt.show()

In [None]:
from tensorflow.keras.models import load_model
# Reload the checkpointed model from disk and score it on the test list
# that was built for the selected ratio.
model = load_model(filepath=model_path)
model.evaluate(x=test_ds)

In [None]:
# Score the model on the original test images only.
# Uses its own dataset name so the `test_ds` built for the selected ratio is
# not overwritten (re-running the earlier evaluation cell keeps its meaning).
# Directory listings are sorted so the file order is the same on every run.
og_test_files = [
  f'original/test/{label}/{name}'
  for label in ('real', 'fake')
  for name in sorted(os.listdir(f'original/test/{label}'))
]
og_test_ds = tf.data.Dataset.from_tensor_slices(og_test_files)
og_test_ds = og_test_ds.map(load, num_parallel_calls=tf.data.AUTOTUNE)
# No cache(): the dataset is consumed once, and a cache held under a separate
# name would stay in memory for the rest of the session.
og_test_ds = og_test_ds.batch(batch_size)
model.evaluate(og_test_ds)

In [None]:
!gdown --fuzzy https://drive.google.com/file/d/1l3Hjb4LemQNEj2Muf7T87dCAYHroafux/view?usp=sharing
!unzip -q adversarial_fgsm_0.001_big_56.zip

In [None]:
# Score the model on the images under adversarial_fgsm/test.
# Uses its own names so neither `test_ds` nor the original-image file list is
# overwritten with adversarial data.
# Directory listings are sorted so the file order is the same on every run.
adv_test_files = [
  f'adversarial_fgsm/test/{label}/{name}'
  for label in ('real', 'fake')
  for name in sorted(os.listdir(f'adversarial_fgsm/test/{label}'))
]
adv_test_ds = tf.data.Dataset.from_tensor_slices(adv_test_files)
adv_test_ds = adv_test_ds.map(load, num_parallel_calls=tf.data.AUTOTUNE)
# No cache(): the dataset is consumed once, and a cache held under a separate
# name would stay in memory for the rest of the session.
adv_test_ds = adv_test_ds.batch(batch_size)
model.evaluate(adv_test_ds)