In [None]:
import os
import zipfile
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np
import PIL
from PIL import Image, ImageOps
import pandas as pd
from pathlib import Path
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras import layers
from tensorflow.keras import Model
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from sklearn.preprocessing import MinMaxScaler

In [None]:
!pip install PyDrive

In [None]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

In [None]:
# Authenticate the Colab user, then give PyDrive the application-default credentials
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
# Drive client used by the download cell below
drive = GoogleDrive(gauth)

In [None]:
# Fetch both Drive files into the current working directory:
# the image archive first, then the CSV data.
for drive_file_id, local_name in (
    ("10K44l5A__7DrNTZStVP6-vebBbWp9f4Y", 'Dataset.zip'),
    ("1v73UDBS2_yUMZ0yTAIrN8i5JyiepIAKo", 'PM10_Data.csv'),
):
    downloaded = drive.CreateFile({'id': drive_file_id})
    downloaded.GetContentFile(local_name)

In [None]:
# Unpack the downloaded image archive into /tmp/Dataset
test_local_zip = "/content/Dataset.zip"

# The context manager closes the archive even when extraction raises
with zipfile.ZipFile(test_local_zip, 'r') as zip_ref:
    zip_ref.extractall('/tmp/Dataset')

In [None]:
# Root folder of the extracted dataset; it is expected to hold one sub-folder per class
train_dir = '/tmp/Dataset'

# Folders with the Good, Moderate and Dangerous pictures, in that order
train_Good_dir, train_Moderate_dir, train_Dangerous_dir = [
    os.path.join(train_dir, class_name)
    for class_name in ('Good', 'Moderate', 'Dangerous')
]

# Report how many directory entries each class folder contains
print(f"There are {len(os.listdir(train_Good_dir))} images of Good.\n")
print(f"There are {len(os.listdir(train_Moderate_dir))} images of Moderate.\n")
print(f"There are {len(os.listdir(train_Dangerous_dir))} images of Dangerous.\n")

In [None]:
# Turn all images to jpg, one class folder at a time.
# NOTE(review): turn_to_jpg is not defined in the visible part of this notebook — confirm
# that a cell defining it runs before this one; otherwise a fresh kernel raises NameError here.
for class_dir in (train_Good_dir, train_Moderate_dir, train_Dangerous_dir):
    turn_to_jpg(class_dir)

In [None]:
# Take the first entry os.listdir reports for the Good folder and load it
first_good_name = os.listdir(train_Good_dir)[0]
sample_image = load_img(os.path.join(train_Good_dir, first_good_name))

# Array representation of the sample, used to report its dimensions
sample_array = img_to_array(sample_image)

print(f"Each image has shape: {sample_array.shape}")

In [None]:
# Read CSV File
# Later cells index the rows positionally: column 0 is used as the image file stem
# (".jpg" is appended) and column 2 as the class sub-folder name.
csv_path = "/content/PM10_Data.csv"
df = pd.read_csv(csv_path)
# Preview the first 30 rows
df.head(30)

In [None]:
def remove_missing_image(dataset, image_path):
    """Return the row positions whose image file is missing on disk.

    Args:
        dataset: Anything ``np.array`` accepts as a 2-D table. In every row,
            position 0 is the image file stem (without ".jpg") and position 2
            is the name of the class sub-folder.
        image_path: Root directory that contains one sub-folder per class.

    Returns:
        list[int]: Zero-based row positions for which
        ``<row[0]>.jpg`` is not listed in ``<image_path>/<row[2]>``.

    Raises:
        OSError: Propagated from ``os.listdir`` when a class sub-folder
            referenced by a row cannot be listed (e.g. it does not exist).
    """
    rows = np.array(dataset)
    # Each class folder is listed once and cached as a set, instead of
    # re-reading the directory for every single row.
    files_by_class = {}
    missing_positions = []
    for position, row in enumerate(rows):
        class_name = row[2]
        if class_name not in files_by_class:
            files_by_class[class_name] = set(os.listdir(os.path.join(image_path, class_name)))
        if row[0] + ".jpg" not in files_by_class[class_name]:
            missing_positions.append(position)
    return missing_positions

In [None]:
# Split data with ratio train 90% and validation 10%.
# A fixed random_state makes the split identical on every Restart & Run All.
SPLIT_SEED = 42
randomize_train = df.sample(frac = 0.9, random_state = SPLIT_SEED)
randomize_val = df.drop(randomize_train.index)

# Drop the rows whose image file is not present in the dataset folder
not_found_image = remove_missing_image(randomize_train, train_dir)
train_image = np.delete(np.array(randomize_train), not_found_image, axis=0)

not_found_image = remove_missing_image(randomize_val, train_dir)
val_image = np.delete(np.array(randomize_val), not_found_image, axis=0)

In [None]:
def process_image(image_path, image_list, target_size=(150, 150)):
    """Load one dataset image as a flattened grayscale pixel vector.

    Args:
        image_path: Root directory that contains one sub-folder per class.
        image_list: Indexable row; position 0 is the image file stem
            (without ".jpg") and position 2 is the class sub-folder name.
        target_size: ``(width, height)`` handed to ``Image.resize``.
            Defaults to ``(150, 150)``.

    Returns:
        numpy.ndarray: 1-D array with ``width * height`` grayscale pixel
        values of the resized image.
    """
    file_name = os.path.join(image_path, image_list[2], image_list[0] + ".jpg")
    # The context manager releases the file handle as soon as the pixels have
    # been read; resize() and grayscale() both return new in-memory images.
    with Image.open(file_name) as ori_image:
        gray_image = ImageOps.grayscale(ori_image.resize(target_size))
    return np.array(gray_image).flatten()

In [None]:
# Decode every training row into its pixel vector and stack the vectors into one array
image_array = np.array([process_image(train_dir, row) for row in train_image])

# Same for the validation rows
val_array = np.array([process_image(train_dir, row) for row in val_image])

In [None]:
# Strip column 0 (image file stem) and column 2 (class folder) in a single pass,
# leaving the remaining column(s) as the labels.
train_labels = np.delete(train_image, [0, 2], axis=1)
val_labels = np.delete(val_image, [0, 2], axis=1)

In [None]:
# process_image returns flattened 150x150 grayscale vectors, so the stacked arrays are
# 2-D (num_images, 22500). The side length therefore cannot be read from shape[2];
# restore the spatial layout from the known image size instead.
IMAGE_SIDE = 150
image_array = np.reshape(image_array, (image_array.shape[0], IMAGE_SIDE, IMAGE_SIDE, 1))
val_array = np.reshape(val_array, (val_array.shape[0], IMAGE_SIDE, IMAGE_SIDE, 1))

# The InceptionV3 base is built with input_shape (150, 150, 3), so copy the single
# gray channel into three identical channels.
image_array = np.repeat(image_array, 3, axis=-1)
val_array = np.repeat(val_array, 3, axis=-1)

In [None]:
# Scale pixel values by 1/255. The arrays are rebound to the scaled result,
# so running this cell twice divides twice.
image_array = np.divide(image_array, 255.0)
val_array = np.divide(val_array, 255.0)

In [None]:
# Convert the label arrays to 64-bit floats
train_labels = train_labels.astype(np.float64)
val_labels = val_labels.astype(np.float64)

In [None]:
# Display the shape of the training image array
image_array.shape

In [None]:
# Make each label array a column vector with one row per image
train_labels = train_labels.reshape((image_array.shape[0], 1))
val_labels = val_labels.reshape((val_array.shape[0], 1))

In [None]:
# Display the shape of the training label array
train_labels.shape

In [None]:
# Test generators
# NOTE(review): despite the names, these are plain aliases of the label arrays —
# no image data is bundled with them and they are not generator objects.
train_generator, validation_generator = train_labels, val_labels

Transfer Learning

In [None]:
# Download the inception v3 weights
!wget --no-check-certificate \
    https://storage.googleapis.com/mledu-datasets/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5 \
    -O /tmp/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5

In [None]:
# Import the inception model  
from tensorflow.keras.applications.inception_v3 import InceptionV3

# Path of the locally downloaded InceptionV3 "notop" weights file
# (no model is created here; this only records where the weights live)
local_weights_file = '/tmp/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5'

In [None]:
def create_pre_trained_model(local_weights_file, input_shape=(150, 150, 3)):
  """Build a frozen InceptionV3 base (no classification top) from local weights.

  Args:
    local_weights_file: Path to the weights file passed to ``load_weights``.
    input_shape: Input shape handed to ``InceptionV3``. Defaults to
      ``(150, 150, 3)``.

  Returns:
    The InceptionV3 model with the weights loaded and every layer marked
    as non-trainable.
  """
  # weights=None builds the architecture only; the values come from the local file
  pre_trained_model = InceptionV3(input_shape = input_shape,
                                  include_top = False,
                                  weights = None)

  pre_trained_model.load_weights(local_weights_file)

  # Freeze every layer. The loop variable is named `layer` so it does not shadow
  # the keras `layers` module the notebook imports.
  for layer in pre_trained_model.layers:
    layer.trainable = False

  return pre_trained_model

In [None]:
# Build the frozen InceptionV3 base from the downloaded weights file
pre_trained_model = create_pre_trained_model(local_weights_file)

# Print the layer-by-layer summary of the base model
pre_trained_model.summary()

In [None]:
class myCallback(tf.keras.callbacks.Callback):
  """Stop training as soon as the logged 'accuracy' exceeds 99.9%."""

  def on_epoch_end(self, epoch, logs=None):
    """Check the epoch's logs and request an early stop when the threshold is passed.

    Args:
      epoch: Index of the epoch that just finished (unused).
      logs: Metric dictionary for the epoch; may be None.
    """
    # `logs` can be None, and 'accuracy' is missing when that metric is not tracked;
    # comparing None with a float would raise TypeError.
    accuracy = (logs or {}).get('accuracy')
    if accuracy is not None and accuracy > 0.999:
      print("\nReached 99.9% accuracy so cancelling training!")
      self.model.stop_training = True

In [None]:
def output_of_last_layer(pre_trained_model, layer_name='mixed7'):
  """Return the output tensor of the layer at which the base model is cut.

  Args:
    pre_trained_model: Model exposing ``get_layer(name)``.
    layer_name: Name of the layer whose output is wanted. Defaults to
      ``'mixed7'``.

  Returns:
    The ``output`` attribute of the selected layer.
  """
  last_desired_layer = pre_trained_model.get_layer(layer_name)
  last_output = last_desired_layer.output
  # Read the shape from the output tensor itself: the layer-level `output_shape`
  # attribute is not available in every Keras version.
  print('last layer output shape: ', last_output.shape)
  print('last layer output: ', last_output)

  return last_output

In [None]:
# Output tensor of the chosen InceptionV3 layer; the new head is attached to it
last_output = output_of_last_layer(pre_trained_model)

In [None]:
# Print the Python type of the pre-trained base model object
print(f"The pretrained model has type: {type(pre_trained_model)}")

In [None]:
def create_final_model(pre_trained_model, last_output, dense_units=1024,
                       dropout_rate=0.2, learning_rate=0.0001):
  """Attach a small dense head to the pre-trained base and compile the model.

  Args:
    pre_trained_model: Base model; its ``input`` becomes the input of the
      final model.
    last_output: Tensor of the base model on which the head is stacked.
    dense_units: Width of the hidden fully connected layer. Defaults to 1024.
    dropout_rate: Dropout rate applied after the hidden layer. Defaults to 0.2.
    learning_rate: Learning rate of the RMSprop optimizer. Defaults to 0.0001.

  Returns:
    The compiled model (RMSprop, binary cross-entropy loss, accuracy metric).
  """
  # Flatten the output layer to 1 dimension
  x = layers.Flatten()(last_output)

  # Fully connected hidden layer with ReLU activation, followed by dropout
  x = layers.Dense(dense_units, activation='relu')(x)
  x = layers.Dropout(dropout_rate)(x)

  # Single sigmoid output unit.
  # NOTE(review): sigmoid + binary cross-entropy assumes targets in [0, 1] — confirm
  # this matches the label column fed to fit().
  x = layers.Dense(1, activation='sigmoid')(x)

  # Create the complete model by using the Model class
  model = Model(inputs=pre_trained_model.input, outputs=x)

  # Compile the model
  model.compile(optimizer = RMSprop(learning_rate=learning_rate),
                loss = 'binary_crossentropy',
                metrics = ['accuracy'])

  return model

In [None]:
# model in a variable
model = create_final_model(pre_trained_model, last_output)

# Inspect parameters
total_params = model.count_params()
# Depending on the Keras version a weight's shape is a TensorShape or a plain tuple;
# converting to a tuple first and multiplying the dimensions works for both.
num_trainable_params = sum(int(np.prod(tuple(w.shape))) for w in model.trainable_weights)

print(f"There are {total_params:,} total parameters in this model.")
print(f"There are {num_trainable_params:,} trainable parameters in this model.")

In [None]:
# Train on the image tensors together with their labels. `train_generator` and
# `validation_generator` only alias the label arrays, so they cannot serve as model input.
callbacks = myCallback()
history = model.fit(image_array, train_labels,
                    validation_data = (val_array, val_labels),
                    epochs = 100,
                    verbose = 2,
                    # fit() documents `callbacks` as a list of callback instances
                    callbacks = [callbacks])