In [None]:
 %tensorflow_version 2.x

from __future__ import absolute_import, division, print_function, unicode_literals

# Prefer the explicit v2 compatibility namespace; fall back to the plain
# package if that import raises.
try:
  import tensorflow.compat.v2 as tf
except Exception:
  import tensorflow as tf

# Turn on v2 behaviour before any other TensorFlow call in the notebook.
tf.enable_v2_behavior()

# Record the TensorFlow version this run used.
print(tf.__version__)

2.4.1


In [None]:
# Shared value passed to `map(num_parallel_calls=...)` and
# `prefetch(buffer_size=...)` further down in the notebook.
AUTOTUNE = tf.data.experimental.AUTOTUNE

In [None]:
import os
import cv2
import glob
import numpy as np
from zipfile import ZipFile
from tqdm import tqdm
import matplotlib.pyplot as plt
import pathlib
import shutil
import math

In [None]:
def clean_data_files():
  """Delete the working directories left over from a previous run.

  Removes `dataset`, `dev_dataset` and `models` (relative to the current
  working directory) when they exist; missing directories are skipped.
  """
  for stale_dir in ("dataset", "dev_dataset", "models"):
    if not os.path.isdir(stale_dir):
      continue
    shutil.rmtree(stale_dir, ignore_errors=False, onerror=None)

In [None]:
# Start from a clean slate: drop any extracted data and saved models.
clean_data_files()

In [None]:
# Mount Google Drive at /content/drive; `force_remount=True` is passed so the
# mount is redone even if a previous one exists.
from google.colab import drive
drive.mount('/content/drive',force_remount=True)

Mounted at /content/drive


In [None]:
# Uncompress the feature images and labels csv
def uncompress_features_labels(dir,name):
    """Extract the zip archive `dir` into the directory `name`.

    If `name` already exists as a directory, nothing is extracted and
    'Data extracted' is printed instead.

    Args:
        dir: Path of the zip archive to read.
        name: Destination directory for the archive contents.
    """
    if os.path.isdir(name):
        print('Data extracted')
        return
    with ZipFile(dir) as archive:
        archive.extractall(name)

In [None]:
# Location of the zipped image archive on the mounted Drive.
root_path = '/content/drive/My Drive/dataset/cocoon.zip'  #change dir to your project folder

In [None]:
# Unpack the archive into ./dataset (skipped when that directory already exists).
uncompress_features_labels(root_path,'dataset')

Data extracted


In [None]:
# Directory the archive was extracted into.
path = 'dataset'

In [None]:
# Count every file (name containing a dot) that sits two directories below
# the dataset root, e.g. dataset/<a>/<b>/<file>.
data_dir = pathlib.Path(path)
image_count = len(list(data_dir.glob('*/*/*.*')))
image_count

26

In [None]:
# Class names are the directories that directly contain the images
# (dataset/<top>/<class>/<image>), because `get_label` compares the image's
# parent directory against this array. Globbing only the first level returned
# the single wrapper folder, so no image ever matched a class.
# Sorted so the label index order is stable between runs.
CLASS_NAMES = np.array(sorted(item.name for item in data_dir.glob('*/*') if item.is_dir()))
CLASS_NAMES

array(['cocoon'], dtype='<U6')

In [None]:
# Training configuration used throughout the notebook.
BATCH_SIZE = 4    # elements per batch (see `prepare_for_training`)
IMG_SIZE = 224    # images are resized to IMG_SIZE x IMG_SIZE
EPOCS = 2         # number of epochs passed to `model.fit`

In [None]:
def flip(x: tf.Tensor) -> tf.Tensor:
    """Flip augmentation: random horizontal flip followed by random vertical flip.

    Args:
        x: Image to flip

    Returns:
        Augmented image
    """
    mirrored = tf.image.random_flip_left_right(x)
    return tf.image.random_flip_up_down(mirrored)

In [None]:
def color(x: tf.Tensor) -> tf.Tensor:
    """Color augmentation: random hue, saturation, brightness and contrast jitter.

    The four adjustments are applied in that order, each on the result of the
    previous one.

    Args:
        x: Image

    Returns:
        Augmented image
    """
    hue_shifted = tf.image.random_hue(x, 0.08)
    resaturated = tf.image.random_saturation(hue_shifted, 0.6, 1.6)
    rebrightened = tf.image.random_brightness(resaturated, 0.05)
    return tf.image.random_contrast(rebrightened, 0.7, 1.3)

In [None]:
def rotate(x: tf.Tensor) -> tf.Tensor:
    """Rotation augmentation by a random number (0-3) of quarter turns.

    Args:
        x: Image

    Returns:
        Augmented image
    """
    # maxval is exclusive, so the draw is one of 0, 1, 2, 3.
    quarter_turns = tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32)
    return tf.image.rot90(x, quarter_turns)

In [None]:
def zoom(x: tf.Tensor) -> tf.Tensor:
    """Zoom augmentation: half of the time, replace the image by a random centred crop.

    Args:
        x: Image

    Returns:
        Augmented image (the input itself when no crop is applied)
    """
    # Candidate scales 0.80, 0.81, ... in steps of 0.01 (1.0 excluded).
    scales = np.arange(0.8, 1.0, 0.01)
    num_boxes = len(scales)

    # One box per scale, centred on (0.5, 0.5): [lo, lo, hi, hi].
    half_extent = 0.5 * scales
    lower = 0.5 - half_extent
    upper = 0.5 + half_extent
    boxes = np.stack([lower, lower, upper, upper], axis=1)

    def random_crop(img):
        # Produce every candidate crop, each resized to IMG_SIZE x IMG_SIZE ...
        crops = tf.image.crop_and_resize(
            [img],
            boxes=boxes,
            box_indices=np.zeros(num_boxes),
            crop_size=(IMG_SIZE, IMG_SIZE),
        )
        # ... and keep one of them at random.
        pick = tf.random.uniform(shape=[], minval=0, maxval=num_boxes, dtype=tf.int32)
        return crops[pick]

    choice = tf.random.uniform(shape=[], minval=0., maxval=1., dtype=tf.float32)

    # Crop only when the draw is >= 0.5; otherwise pass the image through.
    return tf.cond(choice < 0.5, lambda: x, lambda: random_crop(x))

In [None]:
# Dataset of file-path strings matching dataset/<a>/<b>/<file>.
# NOTE(review): `list_files` is called without `shuffle=`/`seed=`; confirm
# whether the resulting order is stable across iterations before splitting on it.
list_ds = tf.data.Dataset.list_files(str(data_dir/'*/*/*.*'))

In [None]:
# Peek at two of the listed paths (printed as bytes).
for f in list_ds.take(2):
  print(f.numpy())

b'dataset/cocoon/bad_ones/5.png'
b'dataset/cocoon/good_ones/g13.jpeg'


In [None]:
# 70 / 20 / 10 split; the test split takes whatever is left after rounding.
train_size = int(0.7 * image_count)
val_size = int(0.2 * image_count)
test_size = image_count - train_size - val_size

print("Total Images     : ", image_count)
print("train Images     : ", train_size)
print("validation Images: ", val_size)
print("test Images      : ", test_size)

# The buffer has to hold every file for a uniform shuffle. The previous
# int(test_size/2) evaluated to 1 for this dataset (i.e. no shuffling at all)
# and to 0 -- an invalid buffer size -- when fewer than two test images exist.
SUFFLE_BUFFER_SIZE = max(image_count, 1)
# Keras expects integer step counts; np.ceil returned floats ("5.0 steps").
STEPS_PER_EPOCH = int(math.ceil(train_size / BATCH_SIZE))
VALIDATION_STEPS = int(math.ceil(val_size / BATCH_SIZE))

# take()/skip() only give disjoint splits when the underlying order is the
# same on every pass. `list_ds` may yield a different order per iteration, so
# snapshot the paths once, sort them, and shuffle with a fixed seed and
# reshuffle_each_iteration=False. Train, validation and test then never
# exchange files between epochs.
SPLIT_SEED = 42
file_paths = sorted(p.numpy() for p in list_ds)
full_list_dataset = tf.data.Dataset.from_tensor_slices(
    tf.constant(file_paths, dtype=tf.string)
).shuffle(
    buffer_size=SUFFLE_BUFFER_SIZE,
    seed=SPLIT_SEED,
    reshuffle_each_iteration=False,
)
train_list_dataset = full_list_dataset.take(train_size)
test_list_dataset = full_list_dataset.skip(train_size)
val_list_dataset = test_list_dataset.take(val_size)
test_list_dataset = test_list_dataset.skip(val_size)

Total Images     :  26
train Images     :  18
validation Images:  5
test Images      :  3


In [None]:
def get_label(file_path):
  """Return a boolean vector marking which entry of CLASS_NAMES matches the
  file's parent directory name."""
  # Break the path into its components.
  segments = tf.strings.split(file_path, os.path.sep)
  # The component just before the file name is the class directory.
  class_dir = segments[-2]
  return class_dir == CLASS_NAMES

In [None]:
def decode_img(img):
  """Decode encoded image bytes into a float32 IMG_SIZE x IMG_SIZE, 3-channel tensor."""
  target_size = [IMG_SIZE, IMG_SIZE]
  # Compressed bytes -> 3-D uint8 tensor with three channels.
  decoded = tf.image.decode_jpeg(img, channels=3)
  # uint8 -> float32; `convert_image_dtype` rescales to the [0,1] range.
  scaled = tf.image.convert_image_dtype(decoded, tf.float32)
  # Bring every image to the common spatial size.
  return tf.image.resize(scaled, target_size)

In [None]:
def process_path(file_path):
  """Map a file path to an `(image, label)` pair without augmentation."""
  label = get_label(file_path)
  # Read the raw file contents and decode them into an image tensor.
  raw_bytes = tf.io.read_file(file_path)
  return decode_img(raw_bytes), label

In [None]:
def process_path_flip(file_path):
  """Map a file path to an `(image, label)` pair with the `flip` augmentation applied."""
  # Load and label exactly like the plain pipeline, then augment the image.
  img, label = process_path(file_path)
  return flip(img), label

In [None]:
def process_path_color(file_path):
  """Map a file path to an `(image, label)` pair with the `color` augmentation applied."""
  # Load and label exactly like the plain pipeline, then augment the image.
  img, label = process_path(file_path)
  return color(img), label

In [None]:
def process_path_rotate(file_path):
  """Map a file path to an `(image, label)` pair with the `rotate` augmentation applied."""
  # Load and label exactly like the plain pipeline, then augment the image.
  img, label = process_path(file_path)
  return rotate(img), label

In [None]:
def process_path_zoom(file_path):
  """Map a file path to an `(image, label)` pair with the `zoom` augmentation applied."""
  # Load and label exactly like the plain pipeline, then augment the image.
  img, label = process_path(file_path)
  return zoom(img), label

In [None]:
# Set `num_parallel_calls` so multiple images are loaded/processed in parallel.
labeled_normal_ds = train_list_dataset.map(process_path, num_parallel_calls=AUTOTUNE)
labeled_flipped_ds = train_list_dataset.map(process_path_flip, num_parallel_calls=AUTOTUNE)
labeled_color_ds = train_list_dataset.map(process_path_color, num_parallel_calls=AUTOTUNE)
labeled_rotate_ds = train_list_dataset.map(process_path_rotate, num_parallel_calls=AUTOTUNE)
labeled_zoomed_ds = train_list_dataset.map(process_path_zoom, num_parallel_calls=AUTOTUNE)

train_dataset = labeled_normal_ds
train_dataset = train_dataset.concatenate(labeled_flipped_ds)
train_dataset = train_dataset.concatenate(labeled_flipped_ds)
train_dataset = train_dataset.concatenate(labeled_rotate_ds)
train_dataset = train_dataset.concatenate(labeled_zoomed_ds)


labeled_normal_ds = val_list_dataset.map(process_path, num_parallel_calls=AUTOTUNE)
labeled_flipped_ds = val_list_dataset.map(process_path_flip, num_parallel_calls=AUTOTUNE)
labeled_color_ds = val_list_dataset.map(process_path_color, num_parallel_calls=AUTOTUNE)
labeled_rotate_ds = val_list_dataset.map(process_path_rotate, num_parallel_calls=AUTOTUNE)
labeled_zoomed_ds = val_list_dataset.map(process_path_zoom, num_parallel_calls=AUTOTUNE)

val_dataset = labeled_normal_ds
val_dataset = val_dataset.concatenate(labeled_flipped_ds)
val_dataset = val_dataset.concatenate(labeled_flipped_ds)
val_dataset = val_dataset.concatenate(labeled_rotate_ds)
val_dataset = val_dataset.concatenate(labeled_zoomed_ds)

In [None]:
def prepare_for_training(ds, cache=False, shuffle_buffer_size=SUFFLE_BUFFER_SIZE):
  """Turn a dataset of elements into an endless, shuffled, batched, prefetched stream.

  Args:
    ds: Dataset to prepare.
    cache: Falsy to skip caching; a string to cache to that file name (for
      data that does not fit in memory); any other truthy value to cache in
      memory.
    shuffle_buffer_size: Buffer size handed to `shuffle`.

  Returns:
    The prepared dataset, batched by BATCH_SIZE.
  """
  if cache:
    ds = ds.cache(cache) if isinstance(cache, str) else ds.cache()

  # shuffle -> repeat forever -> batch -> prefetch, so batches are fetched in
  # the background while the model is training.
  return (
      ds.shuffle(buffer_size=shuffle_buffer_size)
      .repeat()
      .batch(BATCH_SIZE)
      .prefetch(buffer_size=AUTOTUNE)
  )

In [None]:
# Endless, shuffled, batched training stream (no caching requested).
train_ds = prepare_for_training(train_dataset)

In [None]:
# Pull one batch from the training stream to confirm the pipeline runs end to end.
train_image_batch, train_label_batch = next(iter(train_ds))


In [None]:
# Validation stream goes through the same shuffle/repeat/batch/prefetch steps.
val_ds = prepare_for_training(val_dataset)

# Pull one validation batch as a smoke test.
val_image_batch, val_label_batch = next(iter(val_ds))



In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Dense, MaxPool2D, Dropout, Flatten, Activation, MaxPooling2D

In [None]:
# Input shape of a single image: height, width, channels.
IMG_SHAPE = (IMG_SIZE, IMG_SIZE, 3)

import tensorflow_hub as hub
# Feature extractor loaded from TF Hub; `trainable=False` keeps its weights frozen.
mobilenet_v2 = hub.KerasLayer("https://tfhub.dev/google/tf2-preview/mobilenet_v2/feature_vector/4", input_shape=IMG_SHAPE, trainable=False)

# Small classification head on top of the extractor, ending in a 2-way softmax.
model = tf.keras.Sequential([
  mobilenet_v2,
#  Dense(16, activation='relu'),
  Dense(4, activation='relu'),
  Dense(2, activation='softmax')
])
# Derive the batch input shape from IMG_SHAPE instead of repeating 224, so the
# model follows IMG_SIZE if it is ever changed.
model.build([None, *IMG_SHAPE])  # Batch input shape.

In [None]:
# Adam optimizer with categorical cross-entropy; accuracy is tracked as the
# metric (reported as `accuracy` / `val_accuracy`).
model.compile(optimizer="adam", loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# Create models/ and models/checkpoints/ in one idempotent call; this replaces
# two separate check-then-mkdir steps and is safe to re-run.
os.makedirs('models/checkpoints', exist_ok=True)

# NOTE: the second import rebinds `ModelCheckpoint`, so the tf.keras class is
# the one actually used below.
from keras.callbacks import ModelCheckpoint
from tensorflow.keras.callbacks import  ModelCheckpoint

# checkpoint: write weights only when validation accuracy improves; the file
# name records the epoch number and the val_accuracy reached.
filepath="models/checkpoints/weights-improvement-{epoch:02d}-{val_accuracy:.2f}.hdf5"
checkpoint = ModelCheckpoint(filepath, monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')
callbacks_list = [checkpoint]

In [None]:
# Both datasets repeat forever, so the step counts define where an epoch ends.
# They are cast to int because Keras documents these arguments as integers
# and the values may have been computed as floats.
history = model.fit(
    train_ds,
    steps_per_epoch=int(STEPS_PER_EPOCH),
    epochs=EPOCS,
    validation_data=val_ds,
    validation_steps=int(VALIDATION_STEPS),
    #callbacks=callbacks_list
)

Train for 5.0 steps, validate for 2.0 steps
Epoch 1/2
Epoch 2/2


In [None]:
# Persist the trained model under the path 'cocoon.model'.
#model.save('cocoon.tflite')
model.save('cocoon.model')



INFO:tensorflow:Assets written to: cocoon.model/assets


INFO:tensorflow:Assets written to: cocoon.model/assets


In [None]:
# Human-readable names for the two softmax outputs (index 0, index 1).
Cat = ['Defect','No Defect']
def prepare(filepath):
  """Load an image file as a model-ready batch of one.

  The image is converted to what the training pipeline produced: RGB channel
  order, float32 values scaled to [0, 1], resized to 224 x 224.

  Args:
    filepath: Path of the image file to load.

  Returns:
    float32 array of shape (1, 224, 224, 3).

  Raises:
    FileNotFoundError: if OpenCV cannot read the file.
  """
  IMG_SIZE = 224
  img_array = cv2.imread(filepath, cv2.IMREAD_COLOR)
  if img_array is None:
    # cv2.imread signals failure by returning None rather than raising.
    raise FileNotFoundError("Could not read image: {}".format(filepath))
  # OpenCV loads BGR; the training images were decoded as RGB.
  img_array = cv2.cvtColor(img_array, cv2.COLOR_BGR2RGB)
  new_array = cv2.resize(img_array, (IMG_SIZE,IMG_SIZE))
  # Training inputs were floats in [0, 1], not uint8 in [0, 255].
  new_array = new_array.astype(np.float32) / 255.0
  return new_array.reshape(-1, IMG_SIZE,IMG_SIZE, 3)

# Load the model from the path it was saved to ('cocoon.model');
# 'cocoon.tflite' is never written by this notebook.
model = tf.keras.models.load_model('cocoon.model')

prediction = model.predict(prepare('test1.jpg'))
a=prediction[0][0]
b=prediction[0][1]
# Pick the class by comparing the two probabilities directly (ties go to
# index 1, as before) and index `Cat` with that class. Indexing with
# floor/ceil of a probability broke when an output was exactly 0.0 or 1.0.
best = 0 if a > b else 1
print(round(prediction[0][best],3))
print(Cat[best])
print(prediction)

0.78
No Defect
[[0.21975358 0.78024644]]


In [None]:
from IPython.display import display, Javascript
from google.colab.output import eval_js
from base64 import b64decode

def take_photo(filename='photo.jpg', quality=0.8):
  """Capture one webcam frame in the browser and save it as a JPEG file.

  Injects a JavaScript helper that shows a live video preview with a
  'Capture' button, waits for the click, draws the current frame to a canvas
  and returns it as a JPEG data URL. The base64 payload of that URL is then
  decoded and written to `filename`.

  Args:
    filename: Path of the file to write.
    quality: Value forwarded to `canvas.toDataURL('image/jpeg', quality)`.

  Returns:
    The `filename` that was written.
  """
  js = Javascript('''
    async function takePhoto(quality) {
      const div = document.createElement('div');
      const capture = document.createElement('button');
      capture.textContent = 'Capture';
      div.appendChild(capture);

      const video = document.createElement('video');
      video.style.display = 'block';
      const stream = await navigator.mediaDevices.getUserMedia({video: true});

      document.body.appendChild(div);
      div.appendChild(video);
      video.srcObject = stream;
      await video.play();

      // Resize the output to fit the video element.
      google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

      // Wait for Capture to be clicked.
      await new Promise((resolve) => capture.onclick = resolve);

      const canvas = document.createElement('canvas');
      canvas.width = video.videoWidth;
      canvas.height = video.videoHeight;
      canvas.getContext('2d').drawImage(video, 0, 0);
      stream.getVideoTracks()[0].stop();
      div.remove();
      return canvas.toDataURL('image/jpeg', quality);
    }
    ''')
  # Make the helper available in the output frame, then run it.
  display(js)
  data = eval_js('takePhoto({})'.format(quality))
  # The result is a data URL; the base64 payload is the part after the comma.
  binary = b64decode(data.split(',')[1])
  with open(filename, 'wb') as f:
    f.write(binary)
  return filename

In [None]:
from IPython.display import Image
try:
  filename = take_photo()
  print('Saved to {}'.format(filename))
  
  # Show the image which was just taken.
  display(Image(filename))
  # Human-readable names for the two softmax outputs (index 0, index 1).
  Cat = ['Defect','No Defect']
  def prepare(filepath):
    """Load an image file as a (1, 224, 224, 3) float32 RGB batch in [0, 1].

    Matches the training pipeline, which decoded RGB images and scaled them
    to floats in [0, 1].

    Raises:
      FileNotFoundError: if OpenCV cannot read the file.
    """
    IMG_SIZE = 224
    img_array = cv2.imread(filepath, cv2.IMREAD_COLOR)
    if img_array is None:
      # cv2.imread signals failure by returning None rather than raising.
      raise FileNotFoundError("Could not read image: {}".format(filepath))
    # OpenCV loads BGR; the training images were decoded as RGB.
    img_array = cv2.cvtColor(img_array, cv2.COLOR_BGR2RGB)
    new_array = cv2.resize(img_array, (IMG_SIZE,IMG_SIZE))
    # Training inputs were floats in [0, 1], not uint8 in [0, 255].
    new_array = new_array.astype(np.float32) / 255.0
    return new_array.reshape(-1, IMG_SIZE,IMG_SIZE, 3)

  # Uses the `model` object already present in the kernel.
  #model = tf.keras.models.load_model('dataset/final3.model')

  prediction = model.predict(prepare(filename))
  a=prediction[0][0]
  b=prediction[0][1]
  # Compare the raw probabilities (ties go to index 1, as before); rounding
  # first could flip the decision for near-equal scores.
  best = 0 if a > b else 1
  print(round(prediction[0][best],3))
  print(Cat[best])
  print(prediction)
except Exception as err:
  # Errors will be thrown if the user does not have a webcam or if they do not
  # grant the page permission to access it.
  print(str(err))

name 'take_photo' is not defined
