## Univus Pass detector
Detects whether a Univus pass is fake, expired, invalid, or OK.

In [1]:
import os
from PIL import Image
import cv2
import numpy as np
from tensorflow.keras import models
import pandas as pd
import tensorflow as tf


# Turn off GPU computation for TensorFlow by hiding all CUDA devices.
# NOTE(review): this is set after `import tensorflow`; presumably it still
# takes effect because no GPU has been used yet - confirm.
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

# Report whether TensorFlow can still see a GPU device.
if tf.test.gpu_device_name():
    print('GPU found')
else:
    print("No GPU found")

No GPU found


### Video Conversion and formatting to images

In [2]:
# Saves the first frame of a video as a JPEG image
def video_to_images(input_loc):
    """Extract the first frame of a video and save it as ``frame1.jpg``.

    The frame is written to ``./prediction_temp/VideoFrames/frame1.jpg``.
    The output directory is created if it does not exist yet.

    Args:
        input_loc: Path of the video file to read.

    Raises:
        IOError: If no frame can be read from the video, or the frame
            cannot be written to disk.
    """
    output_loc = './prediction_temp/VideoFrames/'
    filename = 'frame1'
    os.makedirs(output_loc, exist_ok=True)

    vidcap = cv2.VideoCapture(input_loc)
    try:
        # Seek explicitly to the start so the very first frame is captured.
        vidcap.set(cv2.CAP_PROP_POS_MSEC, 0)
        success, image = vidcap.read()
    finally:
        # Always free the capture handle, even if reading fails.
        vidcap.release()

    if not success:
        raise IOError(f'Could not read a frame from video: {input_loc}')

    # Build the path with os.path.join rather than a hard-coded "\\" so the
    # file lands at .../VideoFrames/frame1.jpg on every operating system.
    output_path = os.path.join(output_loc, f'{filename}.jpg')
    if not cv2.imwrite(output_path, image):
        raise IOError(f'Could not write frame to: {output_path}')


In [3]:
# Reformat an image to the size and colour mode used for Red/Green detection
def format_image(img):
    """Resize *img*, convert it to RGB and save it as a JPEG.

    The result is written to ``./prediction_temp/greenred.jpg``; nothing is
    returned.

    Args:
        img: Image object exposing ``resize``; the caller in this file
            passes the result of ``Image.open``.
    """
    target_size = (720, 1280)  # (horizontal, vertical)
    formatted = img.resize(target_size).convert('RGB')
    formatted.save('./prediction_temp/greenred.jpg', 'JPEG')

### Prediction of green or red passes

In [4]:
# Predicts whether the formatted pass image is red or green
def predict_green_red_pass(model):
    """Classify ``./prediction_temp/greenred.jpg`` with *model*.

    Args:
        model: Object exposing ``predict`` that accepts a batch of images.

    Returns:
        Index of the highest score in the model output
        (Red = 0, Green = 1).
    """
    # OpenCV reads images in BGR order, so convert to RGB first.
    bgr_frame = cv2.imread('./prediction_temp/greenred.jpg')
    rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
    # Wrap the single image in a batch of size one.
    batch = np.array([rgb_frame])
    scores = model.predict(batch)
    return np.argmax(scores)

### OCR for date retrieval from image

In [5]:
import re
from datetime import datetime, timezone, timedelta
import pytesseract

# Extracts text from an image using Tesseract OCR
def extract_text_from_image(img):
    """Run Tesseract OCR over *img* and return the recognised text.

    Args:
        img: Image array that is converted with ``cv2.COLOR_BGR2GRAY``
            before OCR.

    Returns:
        A one-element list holding the OCR output string.
    """
    # OCR is run on a grayscale version of the image.
    grayscale_image = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # --psm 11 = sparse text, --oem 3 = default engine.
    ocr_config = '--psm 11 --oem 3'
    return [pytesseract.image_to_string(grayscale_image, config=ocr_config)]
    
# Uses regex patterns to pull date and time strings out of text
def extract_time_date_data(text):
    """Find every date-like and time-like substring in *text*.

    Args:
        text: String to search (OCR output in this file).

    Returns:
        A ``(dates, times)`` tuple of lists: ``dates`` holds matches of the
        ``YYYY MMM DD`` pattern, ``times`` holds matches of ``HH:MM:SS``.
        Either list may be empty.
    """
    date_pattern = re.compile(r'\d{4}\s\w{3,4}\s\d{1,2}')  # YYYY MMM DD
    time_pattern = re.compile(r'\d{2}:\d{2}:\d{2}')  # HH:MM:SS
    found_dates = date_pattern.findall(text)
    found_times = time_pattern.findall(text)
    return found_dates, found_times

# Parse date and time strings into a datetime object
def parse_date_time_data(date, time):
    """Combine the first date and first time string into a ``datetime``.

    Args:
        date: List of date strings such as ``'2022 Apr 13'``; only the first
            entry is used. An empty list falls back to ``'2020 Jan 01'``.
        time: List of ``HH:MM:SS`` strings; only the first entry is used.
            An empty list falls back to ``'00:00:00'``.

    Returns:
        A naive ``datetime`` parsed with the format ``%Y %b %d %H:%M:%S``.

    Raises:
        ValueError: If the combined string does not match that format.
    """
    default_date = '2020 Jan 01'
    default_time = '00:00:00'
    # Pick the fallback locally instead of appending to the caller's lists,
    # so the arguments are left untouched.
    date_string = date[0] if len(date) > 0 else default_date
    time_string = time[0] if len(time) > 0 else default_time
    date_time_string = date_string + ' ' + time_string
    return datetime.strptime(date_time_string, "%Y %b %d %H:%M:%S")

# Determines if time is within threshold. Outputs 1 if within, -1 if not
def check_datetime_in_window(date_time_object):
    """Check whether a timestamp is less than one day older than "now".

    "Now" is taken in Singapore time (GMT+8) regardless of the timezone of
    the machine running the code.

    Args:
        date_time_object: The ``datetime`` to check. A naive value is
            interpreted as Singapore local time; a timezone-aware value is
            compared as-is.

    Returns:
        ``1`` if the timestamp is less than 86400 seconds before the current
        time, otherwise ``-1``. A timestamp in the future gives a negative
        difference and is therefore reported as ``1``.
    """
    # 86400 seconds in a day
    threshold = 86400  # Time in seconds

    timezone_offset = +8.0  # SG GMT +8
    timezone_info = timezone(timedelta(hours=timezone_offset))
    # Use the timezone-aware current time; a plain datetime.now() would
    # silently depend on the host machine's local timezone.
    now = datetime.now(timezone_info)
    if date_time_object.tzinfo is None:
        date_time_object = date_time_object.replace(tzinfo=timezone_info)
    difference = now - date_time_object  # Current time minus image time

    # TODO: update to score based on whether the date or the time was missing
    if difference.total_seconds() < threshold:
        return 1
    return -1

# Main function to determine if the date/time shown in an image is within the time period
def determine_time_validity(img):
    """Read the date and time from *img* and check them against the window.

    Args:
        img: Image passed to ``extract_text_from_image`` for OCR.

    Returns:
        The result of ``check_datetime_in_window`` for the first text that
        yields a parseable date and time, or ``0`` when no text contains
        both a date and a time that can be parsed.
    """
    for text in extract_text_from_image(img):
        dates, times = extract_time_date_data(text)
        if len(dates) == 0 or len(times) == 0:
            continue
        try:
            date_time_object = parse_date_time_data(dates, times)
        except ValueError:
            # The regex can match strings that are not a real date (e.g. an
            # unknown month abbreviation); treat those as missing data
            # instead of aborting the whole prediction.
            continue
        return check_datetime_in_window(date_time_object)
    return 0  # return 0 for missing data

### YOLO Model for motion tracking

In [6]:
import torch

def aggregate_velocity(change_rates):
    """Count the sampled change rates that exceed the abnormal-speed limit.

    The first entry is always ignored.

    Args:
        change_rates: Sequence of per-sample rates of change.

    Returns:
        ``None`` when *change_rates* is empty, otherwise the number of
        entries after the first that are greater than 15.
    """
    if len(change_rates) != 0:
        return sum(
            1
            for position, rate in enumerate(change_rates)
            if position > 0 and rate > 15
        )
    return None

def aggregate_direction(directions):
    horizontal_count = 0
    vertical_count = 0
    static_count = 0
    no_direction = 0
    for direction in directions:
        if direction == 'Horizontal':
            horizontal_count += 1
        elif direction == 'Vertical':
            vertical_count += 1
        elif direction == 'Static':
            static_count += 1
        else:
            no_direction += 1
    if static_count > horizontal_count and static_count > vertical_count:
        return 'Static'
    if no_direction > horizontal_count and no_direction > vertical_count:
        return 'No Logo Detectable'
    if horizontal_count > vertical_count:
        return 'Horizontal'
    return 'Vertical'

def get_logo_speed(velocity):
    """Translate an aggregated velocity value into a speed rating.

    Args:
        velocity: Value produced by ``aggregate_velocity``, or ``None``
            when no rates were collected.

    Returns:
        ``'No Speed'`` for ``None``, ``'Normal Speed'`` when the value is
        at most 4, otherwise ``'Abnormal Fast'``.
    """
    # Identity check: ``== None`` can be overridden by the operand's type.
    if velocity is None:
        return 'No Speed'
    if velocity <= 4:
        return 'Normal Speed'
    return 'Abnormal Fast'

# Calculate movement direction and rate of change from detection coordinates
def calculate_direction(coordinates, prev_X, prev_Y):
    """Classify the movement of the first detection since the last sample.

    Args:
        coordinates: Indexable collection of detections; only the first one
            is used, and its first two values are taken as x and y.
        prev_X: x value from the previous sample.
        prev_Y: y value from the previous sample.

    Returns:
        ``(newX, newY, movement, rate_of_change)`` where ``movement`` is
        ``'Static'`` (both deltas below 1), ``'Horizontal'`` (x delta larger
        than y delta) or ``'Vertical'``, and ``rate_of_change`` is the
        dominant absolute delta (0 for static), divided by 100 when it
        exceeds 100.
    """
    # Focus on a single logo: the first detection only.
    newX = coordinates[0][0]
    newY = coordinates[0][1]
    delta_x = abs(newX - prev_X)
    delta_y = abs(newY - prev_Y)

    if delta_x < 1 and delta_y < 1:
        movement, rate_of_change = 'Static', 0
    elif delta_y < delta_x:
        movement, rate_of_change = 'Horizontal', delta_x
    else:
        movement, rate_of_change = 'Vertical', delta_y

    # Scale down very large jumps.
    if rate_of_change > 100:
        rate_of_change /= 100
    return newX, newY, movement, rate_of_change

def predict_speed_direction(model, video_path):
    """Sample detections from a video and summarise logo speed and direction.

    Frames are read sequentially and a detection is sampled each time the
    frame counter reaches ``frame_limit``, until ``total_movement_sets``
    samples are collected or the video runs out of frames.

    Args:
        model: Callable detector; ``model(frame)`` must return an object
            whose ``xyxy[0]`` holds the detections for that frame.
        video_path: Path of the video file to analyse.

    Returns:
        A tuple ``(velocity, direction)`` holding the results of
        ``aggregate_velocity`` and ``aggregate_direction`` over the samples.
    """
    total_movement_sets = 10
    frame_limit = 5
    previous_x = 0
    previous_y = 0
    movement_sets = 0
    frame_count = 0
    change_rates = []
    directions = []

    cap = cv2.VideoCapture(video_path)
    try:
        while cap.isOpened() and movement_sets < total_movement_sets:
            ret, frame = cap.read()
            if not ret:
                # No more frames (or a read error): stop instead of passing
                # an empty frame to the detector.
                break
            if frame_count == frame_limit:
                # Run the detector only on sampled frames; the results for
                # all other frames were never used.
                results = model(frame)
                # xyxy = xmin, ymin, xmax, ymax, confidence, class
                coordinates = results.xyxy[0]
                if len(coordinates) != 0:
                    newX, newY, direction, change_rate = calculate_direction(
                        coordinates, previous_x, previous_y)
                    previous_x = newX
                    previous_y = newY
                    directions.append(direction)
                    change_rates.append(change_rate)
                else:
                    directions.append('NO IMAGE')
                movement_sets += 1
                frame_count = 0
            frame_count += 1
            # Exit loop
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
    finally:
        # Release the capture even if the detector raises.
        cap.release()
        cv2.destroyAllWindows()

    velocity = aggregate_velocity(change_rates)
    predicted_direction = aggregate_direction(directions)
    return velocity, predicted_direction

### FakeRealClassifier

In [7]:
def predict_fake_real_pass(model):
    """Classify the saved video frame as a fake or real pass.

    Loads ``./prediction_temp/VideoFrames/frame1.jpg`` as a 480x480
    grayscale image, scales the pixel values by 1/255 and feeds it to
    *model* as a batch of one.

    Args:
        model: Object exposing ``predict``.

    Returns:
        Index of the highest score in the model output
        (Fake = 0, Real = 1).
    """
    keras_image = tf.keras.preprocessing.image
    frame = keras_image.load_img(
        './prediction_temp/VideoFrames/frame1.jpg',
        color_mode="grayscale",
        target_size=(480, 480),
    )
    scaled_pixels = keras_image.img_to_array(frame) / 255
    batch = np.array([scaled_pixels])
    scores = model.predict(batch)
    return np.argmax(scores)

### Predictions

In [8]:
# Carry out all predictions for one video
def predict_pass(input_video_path, green_red_model, yolo_model, fake_real_model, nn_model):
    """Run every detector on a video and combine the results with *nn_model*.

    Five features are collected in order: correct direction, correct speed,
    green/red prediction, fake/real prediction and date-time validity. A
    short report is printed along the way.

    Args:
        input_video_path: Path of the video to analyse.
        green_red_model: Model passed to ``predict_green_red_pass``.
        yolo_model: Detector passed to ``predict_speed_direction``.
        fake_real_model: Model passed to ``predict_fake_real_pass``.
        nn_model: Model whose ``predict`` receives the one-row feature frame.

    Returns:
        Whatever ``nn_model.predict`` returns for the feature row.
    """
    frame_path = './prediction_temp/VideoFrames/frame1.jpg'
    divider = '-----------------------------------'

    # Save a raw frame of the video to the prediction_temp folder for use.
    video_to_images(input_video_path)

    print(divider)
    # Movement metrics.
    velocity, direction = predict_speed_direction(yolo_model, input_video_path)
    speed_rating = get_logo_speed(velocity)

    is_correct_direction = 1 if direction == 'Horizontal' else 0
    if is_correct_direction:
        print('Logo movement in correct direction: ' + direction)
    else:
        print('Improper logo movement detected: ' + direction + ', expected Horizontal')

    is_correct_speed = 1 if speed_rating == 'Normal Speed' else 0
    if is_correct_speed:
        print('Normal logo speed detected')
    else:
        print('Logo speed anomaly: ' + speed_rating)

    # The saved frame feeds green/red detection, fake/real detection and OCR.
    format_image(Image.open(frame_path))
    green_red_prediction = predict_green_red_pass(green_red_model)
    colour_message = {
        0: 'Pass Colour: Red',
        1: 'Pass Colour: Green',
    }.get(green_red_prediction)
    if colour_message is not None:
        print(colour_message)

    # The frame path is fixed inside predict_fake_real_pass for now.
    fake_real_prediction = predict_fake_real_pass(fake_real_model)
    status_message = {
        0: 'Pass Status: Fake',
        1: 'Pass Status: Real',
    }.get(fake_real_prediction)
    if status_message is not None:
        print(status_message)

    is_datetime_valid = determine_time_validity(cv2.imread(frame_path))
    datetime_message = {
        1: 'Date time is valid',
        0: 'Missing Date Time Data',
        -1: 'Expired Date time detected',
    }.get(is_datetime_valid)
    if datetime_message is not None:
        print(datetime_message)
    print(divider)

    outputs = [
        is_correct_direction,
        is_correct_speed,
        green_red_prediction,
        fake_real_prediction,
        is_datetime_valid,
    ]
    prediction_df = pd.DataFrame(columns=['isCorrectDirection', 'isCorrectSpeed', 'isGreenPass', 'isRealPass', 'isNotExpired'])
    prediction_df.loc[len(prediction_df)] = outputs

    return nn_model.predict(prediction_df)


In [9]:
# Load in pre-trained models
# Keras model used by predict_green_red_pass.
green_red_model = models.load_model('GreenRedClassifier.model')
# Custom YOLOv5 weights loaded through torch.hub; force_reload=True asks
# torch.hub to fetch the repository again rather than use its cache.
yolo_model = torch.hub.load('ultralytics/yolov5', 'custom', path='./TrainedYoloModel/best3.pt', force_reload=True)
# Keras model used by predict_fake_real_pass.
fake_real_model = models.load_model('./FakePassModel/fakepass_model.h5')
# Keras model that combines the five per-detector features in predict_pass.
nn_model = models.load_model('NN.h5')

Downloading: "https://github.com/ultralytics/yolov5/archive/master.zip" to C:\Users\dojh1/.cache\torch\hub\master.zip
YOLOv5  2022-4-13 torch 1.8.2+cu111 CPU

Fusing layers... 
Model summary: 213 layers, 7055974 parameters, 0 gradients, 15.9 GFLOPs
Adding AutoShape... 


### Group Prediction For Evaluation

In [10]:
# Runs the prediction pipeline on every file in a folder and collects the labels
def load_video_and_predict_from_folder(folder, green_red_model, yolo_model, fake_real_model, nn_model):
    """Predict every video in *folder* and derive its true label from its name.

    Args:
        folder: Directory whose entries are each passed to ``predict_pass``.
        green_red_model: Forwarded to ``predict_pass``.
        yolo_model: Forwarded to ``predict_pass``.
        fake_real_model: Forwarded to ``predict_pass``.
        nn_model: Forwarded to ``predict_pass``.

    Returns:
        ``(actual_labels, predictions)``: two parallel lists. The actual
        label is ``1`` when the file name contains ``'red'`` and ``2``
        otherwise; the prediction is the argmax of the ``predict_pass``
        output.
    """
    actual_labels = []
    predictions = []
    # os.listdir gives no ordering guarantee; sort so runs are reproducible.
    for count, filename in enumerate(sorted(os.listdir(folder)), start=1):
        video_path = os.path.join(folder, filename)

        # Look at the file name only: searching the full path would label
        # every video 1 whenever the folder path itself contains 'red'.
        actual_labels.append(1 if 'red' in filename else 2)

        print('Predicting number: ' + str(count))
        prediction = predict_pass(video_path, green_red_model, yolo_model, fake_real_model, nn_model)
        predictions.append(np.argmax(prediction))
    return actual_labels, predictions

In [11]:
# Run the full pipeline on every video in the test folder, collecting the
# expected labels alongside the predicted ones.
actual_labels, predictions = load_video_and_predict_from_folder('../Test Data/TEST/', green_red_model, yolo_model, fake_real_model, nn_model)

Predicting number: 1
-----------------------------------
Logo movement in correct direction: Horizontal
Logo speed anomaly: Abnormal Fast
Pass Colour: Green
Pass Status: Real
Missing Date Time Data
-----------------------------------
Predicting number: 2
-----------------------------------
Improper logo movement detected: No Logo Detectable, expected Horizontal
Logo speed anomaly: No Speed
Pass Colour: Green
Pass Status: Real
Missing Date Time Data
-----------------------------------
Predicting number: 3
-----------------------------------
Improper logo movement detected: Vertical, expected Horizontal
Normal logo speed detected
Pass Colour: Green
Pass Status: Real
Missing Date Time Data
-----------------------------------
Predicting number: 4
-----------------------------------
Logo movement in correct direction: Horizontal
Normal logo speed detected
Pass Colour: Green
Pass Status: Fake
Expired Date time detected
-----------------------------------
Predicting number: 5
----------------

### Evaluation Metrics

In [24]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import f1_score

def score_model(predictions, target_value):
    """Print the F1 score, accuracy and precision of *predictions*.

    F1 is macro-averaged and precision is micro-averaged. A blank line is
    printed after the three metrics.

    Args:
        predictions: Predicted labels.
        target_value: True labels.
    """
    # NOTE(review): precision uses average='micro' while F1 uses 'macro' -
    # confirm the mix is intended.
    print('F1 score = {}'.format(f1_score(target_value, predictions, average='macro')))
    print('Accuracy = {}'.format(accuracy_score(target_value, predictions)))
    print('Precision = {}'.format(precision_score(target_value, predictions, average='micro')))
    print()

# Plot the confusion matrix
def print_confusion_matrix(predictions, true_labels):
    """Plot the confusion matrix of *predictions* against *true_labels*.

    Args:
        predictions: Predicted labels.
        true_labels: True labels.
    """
    matrix = confusion_matrix(true_labels, predictions)
    ConfusionMatrixDisplay(confusion_matrix=matrix).plot()

# Human-readable class names.
# NOTE(review): presumably index-aligned with the numeric labels
# (0 = Valid, 1 = Invalid, 2 = Fake) - confirm. Not referenced in the
# visible code.
classes = ['Valid', 'Invalid', 'Fake']

In [25]:
predictions

[2, 2, 2, 2, 2, 2, 2, 2, 2, 0, 0, 2, 2, 2, 2, 1, 1, 2, 2]

In [26]:
actual_labels

[2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 1, 2, 2]

In [27]:
score_model(predictions, actual_labels)

F1 score = 0.6458333333333334
Accuracy = 0.8947368421052632
Precision = 0.8947368421052632



In [28]:
print_confusion_matrix(predictions, actual_labels)

In [29]:
print('test')

test
