In [None]:
!git clone https://github.com/fizyr/keras-retinanet.git
%cd keras-retinanet

In [None]:
!pip install Pillow
!pip install -r requirements.txt
!pip install . --user
!python setup.py build_ext --inplac

#Upload Zipped LPCV Data to proceed

In [None]:
from tensorflow import keras
import tensorflow_datasets as tfds
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import pandas as pd
import os
import tensorflow as tf
import numpy as np
import glob
from shutil import copyfile
import shutil
from PIL import Image
import cv2

In [None]:
%rm -r LPCV

In [None]:
!unzip LPCVtrain.zip

In [None]:
%mkdir ./LPCV/valid
%mkdir ./LPCV/valid/images
%mkdir ./LPCV/valid/labels

In [None]:
data = open("LPCV/dataset.txt", "w")
data.write("Person,0\nBall,1")
data.close()

In [None]:
def contrast_image(image, new_image, folder):
    img = Image.open(folder+'/images/'+image)
    img = np.array(img)
    img = tf.image.random_contrast(img, 2, 5, 42)
    img = img.numpy()
    im = Image.fromarray(img)
    im.save(folder+'/images/'+new_image)
    copy_label(image, "contrast", folder)
def noise_image(image, new_image, folder):
    img = Image.open(folder+'/images/'+image)
    img = np.array(img)
    img = tf.image.stateless_random_jpeg_quality(img, 2, 20, (42,42))
    img = img.numpy()
    im = Image.fromarray(img)
    im.save(folder+'/images/'+new_image)
    copy_label(image, "noise", folder)
def grayscale_image(image, new_image, folder):
    img = Image.open(folder+'/images/'+image)
    img = np.array(img)
    img = tf.image.rgb_to_grayscale(img)
    img = img.numpy()
    img = img.reshape((img.shape[0], img.shape[1]))
    im = Image.fromarray(img, 'L')
    im.save(folder+'/images/'+new_image)
    copy_label(image, "grayscale", folder)
def saturate_image(image, new_image, folder):
    img = Image.open(folder+'/images/'+image)
    img = np.array(img)
    img = tf.image.random_saturation(img, 2, 120, 42)
    img = img.numpy()
    im = Image.fromarray(img)
    im.save(folder+'/images/'+new_image)
    copy_label(image, "saturate", folder)
def hue_image(image, new_image, folder):
    img = Image.open(folder+'/images/'+image)
    img = np.array(img)
    img = tf.image.random_hue(img, 0.1)
    img = img.numpy()
    im = Image.fromarray(img)
    im.save(folder+'/images/'+new_image)
    copy_label(image, "hue", folder)
def flip_image_lr(image, new_image, folder):
    img = Image.open(folder+'/images/'+image)
    img = np.array(img)
    img = tf.image.flip_left_right(img)
    img = img.numpy()
    im = Image.fromarray(img)
    im.save(folder+'/images/'+new_image)
    flip_label_lr(image[:image.find('.')]+".txt", image[:image.find('.')]+"_flip_lr.txt", folder)
def flip_image_ud(image, new_image, folder):
    img = Image.open(folder+'/images/'+image)
    img = np.array(img)
    img = tf.image.flip_up_down(img)
    img = img.numpy()
    im = Image.fromarray(img)
    im.save(folder+'/images/'+new_image)
    flip_label_ud(image[:image.find('.')]+".txt", image[:image.find('.')]+"_flip_ud.txt", folder)
def rot_90(image, new_image, folder):
    img = Image.open(folder+'/images/'+image)
    img = np.array(img)
    img = tf.image.rot90(img)
    img = img.numpy()
    im = Image.fromarray(img)
    im.save(folder+'/images/'+new_image)
    label_rot_90(image[:image.find('.')]+".txt", image[:image.find('.')]+"_rot_90.txt", folder)
def flip_label_lr(label_file, new_label_file, folder):
    label_file = folder+"/labels/"+label_file
    new_label_file = folder+"/labels/"+new_label_file
    with open(label_file) as f:
        lines = f.readlines()
        new_lines = []
        for x in lines:
            words = x.split(" ")
            words[1] = str(1-float(words[1]))
            new_lines.append(" ".join(words))
    f.close()
    with open(new_label_file, 'w') as f:
        for x in new_lines:
            f.write(x)
def flip_label_ud(label_file, new_label_file, folder):
    label_file = folder+"/labels/"+label_file
    new_label_file = folder+"/labels/"+new_label_file
    with open(label_file) as f:
        lines = f.readlines()
        new_lines = []
        for x in lines:
            words = x.split(" ")
            words[2] = str(1-float(words[2]))
            new_lines.append(" ".join(words))
    f.close()
    with open(new_label_file, 'w') as f:
        for x in new_lines:
            f.write(x)
def label_rot_90(label_file, new_label_file, folder):
    label_file = folder+"/labels/"+label_file
    new_label_file = folder+"/labels/"+new_label_file
    with open(label_file) as f:
        lines = f.readlines()
        new_lines = []
        for x in lines:
            words = x.split(" ")
            x = words[1]
            y = words[2]
            w = words[3]
            h = words[4]
            words[1] = y
            words[2] = str(1-float(x))
            words[3] = h
            words[4] = w
            new_lines.append(" ".join(words))
            # new_lines.append(" ".join(words[:len(words)-1]))
    f.close()
    with open(new_label_file, 'w') as f:
        for x in new_lines:
            f.write(x)
def copy_label(image, augmentation, folder):
    filename = image[:image.find('.')]
    new_filename = folder+'/labels/'+filename+'_'+augmentation+'.txt'
    filename = folder+'/labels/'+filename+'.txt'
    copyfile(filename, new_filename)

In [None]:
aug_list = ['_contrast', '_noise', '_flip_lr', '_flip_ud', '_rot_90', '_saturate', '_hue']
data_dir = "./LPCV/train"

In [None]:
for root, dirs, files in os.walk(data_dir+'/images/', topdown=False):
    for file in files:
      count = 0
      for j in aug_list:
        filename = file[:file.find('.')]+j+file[file.find('.'):]
        if count==0: contrast_image(file, filename, data_dir)
        if count==1: noise_image(file, filename, data_dir)
        if count==2: flip_image_lr(file, filename, data_dir)
        if count==3: flip_image_ud(file, filename, data_dir)
        if count==4: rot_90(file, filename, data_dir)
        if count==5: saturate_image(file, filename, data_dir)
        if count==6: hue_image(file, filename, data_dir)
        count+=1

In [None]:
for root, dirs, files in os.walk(data_dir+'/images/', topdown=False):
    for file in files:
        filename = file[:file.find('.')]+"_grayscale"+file[file.find('.'):]
        grayscale_image(file, filename, data_dir)

In [None]:
def moveFiles(src, dst):
  files = []
  for i in os.listdir(src):
    if i.find("contrast")==-1 and i.find("hue")==-1 and i.find("flip")==-1 and i.find("gray")==-1 and i.find("noise")==-1 and i.find("rot")==-1 and i.find("saturate")==-1:
      files.append(i)
  files = sorted(files)
  listt = np.random.RandomState(seed=12).permutation(files)[:30]
  for f in listt:
    shutil.copy(os.path.join(src, f), dst)
    p = os.path.join(src, f)
    os.remove(p)

In [None]:
moveFiles("LPCV/train/images", "LPCV/valid/images")
moveFiles("LPCV/train/labels", "LPCV/valid/labels")

In [None]:
def create_annotations(data_dir):
  for root, dirs, files in os.walk(data_dir+'/labels/', topdown=False):
      new_lines = []
      for file in files:
          im_name = file[:len(file)-3]+"jpg"
          if im_name.find('grayscale')==-1:
              im = cv2.imread(data_dir+"/images/"+im_name)
              height = im.shape[0]
              width = im.shape[1]
          else:
              im = cv2.imread(data_dir+"/images/"+im_name, cv2.IMREAD_GRAYSCALE)
              height = im.shape[0]
              width = im.shape[1]
          with open(data_dir+'/labels/'+file) as f:
              lines = f.readlines()
              for x in lines:
                  words = x.split(" ")
                  xp = float(words[1])
                  yp = float(words[2])
                  wp = float(words[3])
                  hp = float(words[4])
                  temp = int(words[0])
                  x1 = int(round(xp*width-(wp*width)/2,0))
                  x2 = int(round(x1+wp*width,0))
                  y1 = int(round(yp*height-(hp*height)/2,0))
                  y2 = int(round(y1+hp*height,0))
                  words[0] = str(x1)
                  words[1] = str(y1)
                  words[2] = str(x2)
                  words[3] = str(y2)
                  if temp == 0:
                      words[4] = "Person"
                  else:
                      words[4] = "Ball"
                  words.insert(0,("images/"+im_name))
                  new_lines.append(words[:len(words)-1])
      df = pd.DataFrame(new_lines)
      df.to_csv(data_dir+'/_annotations.csv', index=False, header=False)

In [None]:
create_annotations("./LPCV/train")

In [None]:
create_annotations("./LPCV/valid")

In [None]:
!keras_retinanet/bin/train.py --steps 100 --weighted-average csv ./LPCV/train/_annotations.csv ./LPCV/dataset.txt --val-annotations ./LPCV/valid/_annotations.csv

In [None]:
!python keras_retinanet/bin/convert_model.py ./snapshots/resnet50_csv_21.h5 ./inference_models/lpcv_inference50.h5

In [None]:
import tensorflow as tf
from tensorflow import keras
from keras_retinanet import models
from keras_retinanet import layers, initializers
from PIL import Image
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
import cv2
import os
import time
import pathlib
print(tf.version.VERSION)
tf.compat.v1.enable_v2_behavior()

In [None]:
from keras_retinanet import models
from keras_retinanet.utils.image import read_image_bgr, preprocess_image, resize_image
from keras_retinanet.utils.visualization import draw_box, draw_caption
from keras_retinanet.utils.colors import label_color
from keras_retinanet.utils.gpu import setup_gpu
from keras.layers import Input
from keras.models import Model

In [None]:
model =  models.load_model('./lpcv_inference50.h5', backbone_name="resnet50")

# Code Below Can be used to run the inference model on an image file or a video

In [None]:
labels_to_names = {0: 'Person', 1: 'Ball'}

In [None]:
def detect_image(image, model, class_dict, start):
  if type(image)==str:
    image = read_image_bgr(image)
  else:
    image = np.ascontiguousarray(Image.fromarray(image).convert('RGB'))
    image = image[:, :, ::-1]
  # copy to draw on
  draw = image.copy()
  draw = cv2.cvtColor(draw, cv2.COLOR_BGR2RGB)
  # preprocess image for network
  image = preprocess_image(image)
  image, scale = resize_image(image)
  # process image
  n = time.time()
  boxes, scores, labels = model.predict(np.expand_dims(image, axis=0))
  print("processing time: ", time.time() - n)

  # correct for image scale
  boxes /= scale
  # visualize detections
  for box, score, label in zip(boxes[0], scores[0], labels[0]):
      # scores are sorted so we can break
      if score < 0.5:
          break
          
      color = label_color(label)
      
      b = box.astype(int)
      draw_box(draw, b, color=color)
      
      caption = "{} {:.3f}".format(class_dict[label], score)
      draw_caption(draw, b, caption)
  return draw, time.time() - start

In [None]:
draw, detect_time = detect_image('shot.png', model, labels_to_names, time.time())
plt.figure(figsize=(15, 15))
plt.axis('off')
plt.imshow(draw)
plt.show()

In [None]:
def read_video(video, model, class_dict):
  cap = cv2.VideoCapture(video)
  frame_width = int(cap.get(3))
  frame_height = int(cap.get(4))
  frame_size = (frame_width,frame_height)
  fps = 60
  output_file = "TESTOUT2"+video
  output = cv2.VideoWriter(output_file, cv2.VideoWriter_fourcc(*'XVID'), 60, frame_size)
  count = 0
  times = []
  while(cap.isOpened() and count < 1000):
    print(count)
    count+=1
    ret, frame = cap.read()
    if ret == False:
      break
    timer = 0
    start = time.time()
    if count%2 == 0:
      draw, timer = detect_image(frame, model, class_dict, start)
    else:
      draw = frame
      timer = time.time()-start
      print("processing time: ", timer)
    output.write(draw)
    times.append(timer)
  cap.release()
  output.release()
  timeave = np.mean(np.array(times))
  print(timeave)

In [None]:
read_video('7p3b_02M.m4v', model, labels_to_names)

In [None]:
from google.colab import files
files.download('TESTOUT7p3b_02M.m4v') 

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Code Below is Creates and is able to perform inference from a tflite file, however it is slow and inefficient as of now. 
Inference must be done on a local machine to perform in a reasonable time. Colab is incredibly inneficient and most likely will crash at some point. There is still a scaling issue that must be straightened out in order to draw the boxes correctly.

In [None]:
fixed_input = Input((256,256,3))
fixed_model = Model(fixed_input,model(fixed_input))
fixed_model.summary()

In [None]:
def representative_dataset():
    for _ in range(100):
      data = np.random.rand(1, 256, 256, 3)
      yield [data.astype(np.float32)]
def convert_model_to_tflite(model_path, filename = "converted_model.tflite"):
  model = models.load_model(model_path, backbone_name='resnet50')
  fixed_input = Input((256,256,3))
  fixed_model = Model(fixed_input,model(fixed_input))
  converter = tf.lite.TFLiteConverter.from_keras_model(fixed_model)
  # converter.representative_dataset = representative_dataset
  converter.optimizations = [tf.lite.Optimize.DEFAULT]
  converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS,
    tf.lite.OpsSet.SELECT_TF_OPS,
  ]
  # converter.inference_input_type = tf.int8  # or tf.uint8
  # converter.inference_output_type = tf.int8
  tflite_model = converter.convert()
  open(filename, "wb").write(tflite_model)
  print(convert_bytes(get_file_size("converted_model.tflite"), "MB"))

In [None]:
def get_file_size(file_path):
    size = os.path.getsize(file_path)
    return size
    
def convert_bytes(size, unit=None):
    if unit == "KB":
        return print('File size: ' + str(round(size / 1024, 3)) + ' Kilobytes')
    elif unit == "MB":
        return print('File size: ' + str(round(size / (1024 * 1024), 3)) + ' Megabytes')
    else:
        return print('File size: ' + str(size) + ' bytes')

In [None]:
convert_model_to_tflite('./lpcv_inference50.h5')

In [None]:
image = "baseball.jpg"

In [None]:
def tflite_inference(image, model):
  original_image = read_image_bgr(image)
  original_image, scale = resize_image(original_image)
  image_data = cv2.resize(original_image, (256,256))
  image_data = preprocess_image(image_data)
  image_data = np.expand_dims(image_data, axis=0)
  interpreter = tf.lite.Interpreter(model_path=model)
  input_details = interpreter.get_input_details()
  output_details = interpreter.get_output_details()
  interpreter.allocate_tensors()
  interpreter.set_tensor(input_details[0]['index'], image_data)
  start = time.time()
  interpreter.invoke()
  tot = time.time() - start
  scores = interpreter.get_tensor(output_details[0]['index'])
  boxes = interpreter.get_tensor(output_details[1]['index'])
  labels = interpreter.get_tensor(output_details[2]['index'])
  boxes /= scale
  draw = original_image.copy()
  draw = cv2.cvtColor(draw, cv2.COLOR_BGR2RGB)
  for box, score, label in zip(boxes[0], scores[0], labels[0]):
    if score < 0.5:
      break
    color = label_color(label)
    b = box.astype(int)
    draw_box(draw, b, color=color)
        
    caption = "{} {:.3f}".format(labels_to_names[label], score)
    draw_caption(draw, b, caption)
  return draw, tot

In [None]:
draw, tot_time = tflite_inference('shot.png', 'converted_model.tflite')
print(tot_time)

In [None]:
plt.figure(figsize=(15, 15))
plt.axis('off')
plt.imshow(draw)
plt.show()