In [None]:
# -*- coding: utf-8 -*-
"""
Created on Tue May 17 2022

@author: Debabrata Ghorai, Ph.D.

Mobile phone screen on/off detection.
"""

import os
import random
import matplotlib.pyplot as plt

import cv2
import pandas as pd
import numpy as np
import tensorflow
import keras

from keras import layers
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import ModelCheckpoint
from keras.callbacks import EarlyStopping
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

In [None]:
def prepare_data(x_train, y_train, h=150, w=150):
  """Read and resize the image"""
  x_lists = []
  y_lists = []
  # loop over image and label list
  for i, j in zip(x_train, y_train):
    try:
      # read image
      img = cv2.imread(i)
      # resize image
      img_resize = cv2.resize(img, (w, h))
      # reshape numpy array
      img_tf = img_resize.reshape((1,)+img_resize.shape)
      # append final array and corresponding label to the list
      x_lists.append(img_tf)
      y_lists.append(int(j))
    except Exception as e:
      print('Cannot read the image: {}'.format(e))
  # return both the list
  return x_lists, y_lists

In [None]:
def build_model(height, width, channel, outnode):
  """CNN for image classification"""
  # build model
  model = keras.Sequential()
  # learn a total of 32 filters, kernel size 3x3
  model.add(layers.Conv2D(32, (3, 3), input_shape=(height, width, channel), padding="Same", activation='relu'))
  model.add(layers.MaxPooling2D((2, 2)))
  #  learn a total of 64 filters, kernel size 3x3
  model.add(layers.Conv2D(64, (3, 3), padding="Same", activation='relu'))
  model.add(layers.MaxPooling2D((2, 2)))
  #  learn a total of 100 filters, kernel size 3x3
  model.add(layers.Conv2D(64, (3, 3), padding="Same", activation='relu'))
  model.add(layers.MaxPooling2D((2, 2)))
  # flatten the 3D output to 1D
  model.add(layers.Flatten())
  # add dense layer with relu activation function
  model.add(layers.Dense(256, activation='relu'))
  model.add(layers.Dense(32, activation='relu'))
  # add output layer with softmx activation function 
  model.add(layers.Dense(outnode, activation='softmax'))
  # return model
  return model

In [None]:
def get_data_list(path):
    """Get all the files with full path"""
    filelist = []
    # loop over dirs, sub-dirs, and files
    for root, dirs, files in os.walk(path):
        for fname in files:
            if fname.endswith(".jpg"):
                fpath = os.path.join(root, fname)
                # append full path of the file in a list
                filelist.append(fpath)
    # return final data list
    return filelist

In [None]:
def create_training_data(filelist, print_stats=False):
    """Create model training data"""
    try:
        # build a dataframe with image full-path and category
        df = pd.DataFrame(filelist, columns=["data_path"])

        # append class type into dataframe
        for ix, row in df.iterrows():
            df.loc[ix, 'class_type'] = row['data_path'].split("\\")[-2]

        # get unique class and their counts
        tot_sample = df.shape[0]
        class_type_counts = df['class_type'].value_counts()
        class_type = class_type_counts.keys().tolist()

        # get dataset's basic stats
        if print_stats == True:
            print("Total image count in dataset: ", tot_sample)
            print("class type: ", class_type)
            for view in class_type:
                print(view+' total count: '+str(class_type_counts[view]))

        # create label of the images
        labels = {}
        for i, view in enumerate(class_type):
            print(view, i)
            labels[view] = i

        # add label column to df
        for ix, row in df.iterrows():
            df.loc[ix, 'label'] = labels[row['class_type']]

    except Exception as e:
        print('Error: {}'.format(e))
    
    # return final dataframe
    return df

In [None]:
def plot_images(filelist, n=12):
    """Plot figures"""
    # plot some sample images
    sample_files = random.sample(filelist, n)
    f, ax = plt.subplots(4, 3, figsize=(10, 10))
    # loop over sample images and plot them
    for i, f in enumerate(sample_files):
        image = plt.imread(f)
        ax[i//3, i%3].imshow(image)
        ax[i//3, i%3].axis('off')
    plt.show()

In [None]:
# main directory of the mobile phone dataset
path = "/dataset"

# reading the mobile phone dataset
# mobile phone screen on/off image kept in seperate sub-folders
# get list of all data paths from all the sub-directory
filelist = get_data_list(path)
df = create_training_data(filelist, print_stats=True)

# plot some sample images/pic
plot_images(filelist, n=12)


In [None]:
try:
    # train-test split
    x_train_t, x_test_t, y_train_t, y_test_t = train_test_split(df['data_path'], df['label'], test_size=0.2, random_state=0)

    # prepare model train/validation datasets
    x_lists, y_lists = prepare_data(x_train_t, y_train_t)
    X_train_arr = np.concatenate(x_lists)/255.0
    y_train_arr = tensorflow.keras.utils.to_categorical(y_lists)
    X_train, X_val, y_train, y_val = train_test_split(X_train_arr, y_train_arr, test_size=0.2, random_state=0)

    # sequential model inputs
    _, height, width, channel = X_train.shape
    outnode = len(df['label'].unique().tolist())
    print(height, width, channel, outnode)
except Exception as e:
    print('Error: {}'.format(e))

In [None]:
# set random seed
np.random.seed(0)
tensorflow.random.set_seed(0)

# compile model and check model summary
opt = tensorflow.keras.optimizers.Adam(learning_rate=0.001)
model = build_model(height, width, channel, outnode)
model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])

# model summary
model.summary()

In [None]:
# set callback parameters
mcp = ModelCheckpoint(filepath='model_1.h5', monitor='val_loss', save_best_only=True, mode='min', verbose=1)
es = EarlyStopping(monitor='val_loss', patience=10)
callbacks = [es, mcp]

In [None]:
# train model
history = model.fit(
        X_train, 
        y_train,
        batch_size=64,
        validation_data=(X_val, y_val),
        epochs=100,
        verbose=1,
        callbacks=callbacks
        )

In [None]:
# Load and evaluate the best model version
try:
    # pepare test data for model prediction
    x_test_lists, y_true = prepare_data(x_test_t, y_test_t)
    X_test = np.concatenate(x_test_lists)/255.0

    # load best model
    model = keras.models.load_model('model_1.h5')

    # predict on test data
    y_pred = model.predict(X_test)
    y_pred = np.argmax(y_pred, axis=1)

    # print accuracy score
    print(classification_report(y_true, y_pred))
except Exception as e:
    print('Error: {}'.format(e))