In [None]:
import sys
import subprocess
import pkg_resources

# Third-party distributions this notebook imports. matplotlib and plotly are
# used by the plotting cells further down, so they are checked here as well;
# otherwise a fresh environment passes this cell and fails later on import.
required = {'numpy', 'pandas', 'tensorflow', 'keras', 'matplotlib', 'plotly'}
installed = {pkg.key for pkg in pkg_resources.working_set}
missing = required - installed

if missing:
    # Install with the kernel's own interpreter so the packages land in the
    # environment this notebook is actually running in.
    python = sys.executable
    # sorted() keeps the pip command line deterministic between runs.
    subprocess.check_call([python, '-m', 'pip', 'install', *sorted(missing)], stdout=subprocess.DEVNULL)

In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd

### Import images

In [None]:
import os
import random

if not os.path.exists('keras_classification_sample'):
  !git clone https://github.com/pazamelin/keras_classification_sample.git

IMG_DIR = f'./keras_classification_sample/concrete-cracks/'
CONFIG = {
    'image_size': (227, 227),
    'batch_size': 25,
    'validation_split': 0.2,
    'seed': random.randint(0, 99999) 
}

train_ds = tf.keras.utils.image_dataset_from_directory(IMG_DIR, **CONFIG, subset="training")
validation_ds = tf.keras.utils.image_dataset_from_directory(IMG_DIR, **CONFIG, subset="validation")

class_names = train_ds.class_names
print(f'Class_names: {class_names}')

AUTOTUNE = tf.data.AUTOTUNE
train_ds = train_ds.cache().prefetch(buffer_size=AUTOTUNE)
validation_ds = validation_ds.cache().prefetch(buffer_size=AUTOTUNE)

In [None]:
import matplotlib.pyplot as plt

# Preview the first ten images of one training batch in a 2 x 5 grid,
# each titled with its class name.
GRID_ROWS, GRID_COLS = 2, 5

plt.figure(figsize=(25, 10))
for images, labels in train_ds.take(1):
  for cell in range(GRID_ROWS * GRID_COLS):
    plt.subplot(GRID_ROWS, GRID_COLS, cell + 1)
    pixels = images[cell].numpy().astype("uint8")
    plt.imshow(pixels)
    plt.title(class_names[labels[cell]])
    plt.axis("off")

## Custom CNN

### Instantiate a model

In [None]:
from tensorflow.keras import models, layers

# Custom CNN: four Conv2D + MaxPooling2D stages with doubling filter counts,
# then a dense head ending in a single sigmoid unit for binary classification.
model = models.Sequential([
    # Explicit Input layer rather than the `input_shape=` argument on the
    # first Conv2D, which newer Keras versions warn about.
    layers.Input(shape=(227, 227, 3)),
    # The image datasets feed unscaled pixel values (the preview cell casts
    # them to uint8). Scaling to [0, 1] inside the model means fit(),
    # evaluate() and predict() all see the same normalisation.
    layers.Rescaling(1.0 / 255),
    layers.Conv2D(32, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(128, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(256, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Flatten(),
    layers.Dense(512, activation='relu'),
    layers.Dense(1, activation='sigmoid'),
])

model.summary()

### Configure the model for training

In [None]:
from tensorflow.keras import optimizers

# Binary cross-entropy pairs with the single sigmoid output unit. The metric
# is registered under the name 'acc', which is the key the history plots
# read back ('acc' / 'val_acc').
rmsprop = optimizers.RMSprop(learning_rate=1e-5)
model.compile(optimizer=rmsprop, loss='binary_crossentropy', metrics=['acc'])

### Fit the model

In [None]:
import time

# timing callback
class TimeHistory(tf.keras.callbacks.Callback):
    """Keras callback that records how long each training epoch takes.

    After training, ``times`` holds one duration in seconds per completed
    epoch, in the order the epochs ran.
    """

    def on_train_begin(self, logs=None):
        # Start from an empty list so one instance can be reused across
        # several fit() calls without mixing their timings.
        self.times = []

    def on_epoch_begin(self, epoch, logs=None):
        # perf_counter() is monotonic, so the measured duration cannot be
        # distorted by system clock adjustments the way time.time() can.
        self.epoch_time_start = time.perf_counter()

    def on_epoch_end(self, epoch, logs=None):
        self.times.append(time.perf_counter() - self.epoch_time_start)
        
time_callback = TimeHistory()

# fit model
# The training schedule is collected in one mapping so the fit() call stays short.
fit_options = dict(
    epochs=10,
    steps_per_epoch=32,
    validation_data=validation_ds,
    validation_steps=50,
    callbacks=[time_callback],
)
history = model.fit(train_ds, **fit_options)

In [None]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go

# Accuracy curves go in the left subplot, loss curves in the right one.
fig = make_subplots(rows=1, cols=2,
                    subplot_titles=['Training and Validation Accuracy',
                                    'Training and Validation Loss'])

plots_positions = [(1, 1)] * 2 + [(1, 2)] * 2
metric_names = ['acc', 'val_acc', 'loss', 'val_loss']
# Number the epochs that were actually recorded: if training stops early, the
# history is shorter than the requested history.params['epochs'].
epochs = list(range(1, len(history.history['loss']) + 1))

for metric, (plot_row, plot_col) in zip(metric_names, plots_positions):
  fig.add_trace(
    go.Scatter(name=metric,
               x=epochs,
               y=history.history[metric]),
    row=plot_row,
    col=plot_col
  )

# Label the axes so the figure can be read on its own.
fig.update_xaxes(title_text='epoch')
fig.update_yaxes(title_text='accuracy', row=1, col=1)
fig.update_yaxes(title_text='loss', row=1, col=2)

fig.show()


### Model evaluation

In [None]:
FV_IMG_DIR = './keras_classification_sample/final-validation'

# Load the held-out images with the same image size and batch size as the
# training data, taken from CONFIG instead of repeating the literals.
evaluate_ds = tf.keras.utils.image_dataset_from_directory(
    FV_IMG_DIR,
    image_size=CONFIG['image_size'],
    batch_size=CONFIG['batch_size'],
)
print("Evaluate on test data")
# No batch_size argument here: the dataset is already batched, and Keras
# documents that batch_size must not be specified for dataset inputs.
custom_cnn_fv = model.evaluate(evaluate_ds)
print("test loss, test acc:", custom_cnn_fv)

In [None]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import plotly.express as px
import os, random
import matplotlib.pyplot as plt

columns = 4

# Pick columns // 2 random images from each class folder, alternating
# positive / negative so the two classes are interleaved in the figure.
img_names = []
for _ in range(columns // 2):
  for label_dir in ('positive', 'negative'):
    folder = os.path.join(FV_IMG_DIR, label_dir)
    img_names.append(os.path.join(folder, random.choice(os.listdir(folder))))


plt.figure(figsize=(25, 10))
for i, img_name in enumerate(img_names):
  ax = plt.subplot(1, columns, i + 1)
  img = tf.keras.preprocessing.image.load_img(img_name, target_size=(227, 227))
  img_array = tf.keras.preprocessing.image.img_to_array(img)
  # imshow expects floats in [0, 1]; the model itself is given img_array as loaded.
  plt.imshow(img_array / 255.)
  img_batch = tf.expand_dims(img_array, 0)  # Create batch axis
  # predict() returns an array with one row per image and one column for the
  # sigmoid unit; take the scalar so the title shows a number, not "[0.98]".
  score = float(model.predict(img_batch)[0][0])
  plt.title(f'negative: {1 - score:.3f}, positive: {score:.3f}')
  plt.axis("off")


## Framework Network: TODO

## Training Times Comparison

In [None]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go

# Left: seconds spent in each epoch, one line per model.
# Right: summed training time, one bar per model.
fig = make_subplots(rows=1, cols=2,
                    subplot_titles=['training time per epoch',
                                    'total training time'])
# Parallel lists: to compare another model, append its per-epoch times and
# its display name.
timing_data = [time_callback.times]
timing_names = ['custom CNN']

for data, name in zip(timing_data, timing_names):
  # plot time per epoch; the x-axis is derived from each series' own length
  # so models trained for different numbers of epochs can share the plot
  fig.add_trace(
    go.Scatter(name=name,
               x=list(range(1, len(data) + 1)),
               y=data),
    row=1, col=1
  )

# plot total time
total_times = [sum(data) for data in timing_data]
fig.add_trace(
  go.Bar(name='total time',
         x=timing_names,
         y=total_times),
  row=1, col=2
)

# The recorded values are differences of time.time() readings, i.e. seconds.
fig.update_xaxes(title_text='epoch', row=1, col=1)
fig.update_yaxes(title_text='seconds', row=1, col=1)
fig.update_yaxes(title_text='seconds', row=1, col=2)
fig.show()


## Final validation comparison

In [None]:
# Recap of the custom CNN's final-validation result as returned by
# model.evaluate (loss first, then the 'acc' metric).
final_validation_metrics = custom_cnn_fv
print("test loss, test acc:", final_validation_metrics)