In [None]:
import numpy as np
import cv2
import os
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import json
from collections import defaultdict, Counter

from typing import Optional, Dict, Sequence, Tuple

import base64
import csv

import math
import datetime
from tqdm import tqdm

import random

from google.cloud import storage, aiplatform
from google.cloud.aiplatform.gapic.schema import predict
from google.oauth2 import service_account

Specify your Google Cloud Project and region to use:

In [None]:
# Google Cloud project that owns the bucket, datasets, models and endpoints
gcloud_project = 'gd-gcp-rnd-visual-quality-ctrl'

In [None]:
# Vertex AI region used for datasets, training jobs and endpoints
gcloud_location = 'us-central1'

In [None]:
# location passed to create_bucket for the GCS bucket
gcloud_storage_location = 'us'

# Main functions

## Video I/O functions

Functions to convert a video to a list of images and back

In [None]:
def split_video(input_video_path, output_dir, sample_rate=60):
    '''
    Splits a video into frames and saves every `sample_rate`-th frame as a JPEG.

    input_video_path: path to the input video
    output_dir: existing directory for storing the image frames
    sample_rate: keep one frame out of every `sample_rate` frames
        (60 on a 30 fps video = 1 frame every 2 s); must be positive
    returns: list of (image_name, image_path) pairs, where image_name is
        "frame_<frame index>" without the extension
    raises: ValueError if sample_rate is not positive;
        IOError if the video cannot be opened or a frame cannot be written
    '''
    if sample_rate <= 0:
        raise ValueError(f'sample_rate must be positive, got {sample_rate}')

    video_capture = cv2.VideoCapture(input_video_path)
    if not video_capture.isOpened():
        raise IOError(f'Cannot open video: {input_video_path}')

    result = []
    frame_id = 0
    try:
        while True:
            success, image = video_capture.read()
            if not success:
                break
            if frame_id % sample_rate == 0:
                image_name = f"frame_{frame_id}"
                save_path = os.path.join(output_dir, f"{image_name}.jpg")
                # cv2.imwrite reports failure (e.g. a missing output_dir) by returning False
                if not cv2.imwrite(save_path, image):
                    raise IOError(f'Cannot write frame to {save_path}')
                result.append((image_name, save_path))
            frame_id += 1
    finally:
        # release the video handle even if writing a frame fails
        video_capture.release()

    return result

In [None]:
def save_video_from_frames(frames, save_path, fps=60):
    '''
    Creates an MP4 video from a sequence of image files.

    frames: list of (image name, image path) pairs, in playback order
    save_path: path of the video file to write
    fps: frame rate of the output video
    The output frame size is taken from the first frame.
    raises: ValueError if frames is empty; IOError if a frame cannot be read
    '''
    if not frames:
        raise ValueError('No frames to write')

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = None
    try:
        for frame_name, frame_path in tqdm(frames):
            frame = cv2.imread(frame_path)
            if frame is None:
                # cv2.imread returns None instead of raising
                raise IOError(f'Cannot read frame {frame_name} from {frame_path}')
            if out is None:
                out = cv2.VideoWriter(save_path, fourcc, fps, (frame.shape[1], frame.shape[0]))
            out.write(frame.astype('uint8'))
    finally:
        # finalize the file even if a frame fails midway
        if out is not None:
            out.release()

## Image cropping

Bounding boxes of different objects may have different sizes. We crop each object, resize it to fit the target size while preserving its aspect ratio, and pad it to a square so that all crops have the same size.

In [None]:
# target size of object crops produced by crop_n_resize_image
# (used both for the classification dataset and at inference time)
IMAGE_WIDTH = 256
IMAGE_HEIGHT = 256
# color used to pad crops to the target size
PAD_COLOR = (0, 0, 0)

In [None]:
def crop_n_resize_image(img, bbox, size, padColor=0):
    '''
    Crops a bounding box out of an image, resizes the crop to fit into `size`
    while preserving its aspect ratio, and pads the remainder with `padColor`.

    img: image array (H x W or H x W x C)
    bbox: integer bounding box in X, Y, W, H format; parts outside the image are clipped
    size: target size, unpacked as (height, width). NOTE(review): callers in this
        notebook pass (IMAGE_WIDTH, IMAGE_HEIGHT); this only matters for non-square targets
    padColor: scalar or per-channel color of the padding
    returns: an image of exactly the target size
    raises: ValueError if the bounding box does not overlap the image
    '''
    img_h, img_w = img.shape[:2]
    # clip the box to the image: negative start indices would otherwise wrap
    # around in numpy slicing and produce a wrong or empty crop
    x0 = max(0, bbox[0])
    y0 = max(0, bbox[1])
    x1 = min(img_w, bbox[0] + bbox[2])
    y1 = min(img_h, bbox[1] + bbox[3])
    if x1 <= x0 or y1 <= y0:
        raise ValueError(f'Bounding box {bbox} does not overlap the image of size {img_w}x{img_h}')
    crop = img[y0:y1, x0:x1].copy()

    # cropped image size
    h, w = crop.shape[:2]
    # desired output size
    sh, sw = size

    # INTER_AREA is preferred for shrinking, INTER_CUBIC for enlarging
    interp = cv2.INTER_AREA if (h > sh or w > sw) else cv2.INTER_CUBIC

    aspect = w / h
    # comparing with the target aspect (not 1) keeps padding non-negative for non-square targets
    target_aspect = sw / sh

    if aspect > target_aspect:  # wider than the target: fit the width, pad top/bottom
        new_w = sw
        # at least 1 pixel: cv2.resize rejects zero-sized outputs for very thin boxes
        new_h = max(1, int(np.round(new_w / aspect)))
        pad_vert = (sh - new_h) / 2
        pad_top, pad_bot = int(np.floor(pad_vert)), int(np.ceil(pad_vert))
        pad_left, pad_right = 0, 0
    elif aspect < target_aspect:  # taller than the target: fit the height, pad left/right
        new_h = sh
        new_w = max(1, int(np.round(new_h * aspect)))
        pad_horz = (sw - new_w) / 2
        pad_left, pad_right = int(np.floor(pad_horz)), int(np.ceil(pad_horz))
        pad_top, pad_bot = 0, 0
    else:  # same aspect ratio as the target
        new_h, new_w = sh, sw
        pad_left, pad_right, pad_top, pad_bot = 0, 0, 0, 0

    # a color image with a single scalar pad color: use it for every channel
    if len(img.shape) == 3 and not isinstance(padColor, (list, tuple, np.ndarray)):
        padColor = [padColor] * 3

    # scale and pad
    scaled_img = cv2.resize(crop, (new_w, new_h), interpolation=interp)
    scaled_img = cv2.copyMakeBorder(scaled_img, pad_top, pad_bot, pad_left, pad_right,
                                    borderType=cv2.BORDER_CONSTANT, value=padColor)

    return scaled_img

## Multi-object tracking

In [None]:
# object detection is meant to be re-run every DETECTION_RATE frames to pick up new objects
# (the main loop using it is outside this view -- confirm)
DETECTION_RATE = 30

In [None]:
# centroid-distance thresholds (pixels); presumably passed to MultiTracker as
# dst_thresh / upd_dst_thresh, whose defaults have the same values -- confirm
DST_THRESHOLD = 100
UPD_DST_THRESHOLD = 30

DETECTION_RATE - object detection is run once every DETECTION_RATE frames (mainly to detect new objects entering the frame).

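DST_THRESHOLD - the centroid distance (in pixels) below which two boxes are considered to belong to the same object (so that previously detected objects are not counted twice).

UPD_DST_THRESHOLD - if a new detection is closer than this distance to an already tracked object, the object's tracker is re-initialized with the new box.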

In [None]:
def get_center(box):
    '''
    Computes the center point of a bounding box.
    box: a bounding box in X, Y, W, H format
    returns: (center_x, center_y)
    '''
    half_width = box[2] / 2
    half_height = box[3] / 2
    return box[0] + half_width, box[1] + half_height

In [None]:
class MultiTracker:
    '''
    Tracks bounding boxes of detected objects through the video.

    New objects are registered with add_boxes() (which takes object detection
    results as input). update() advances every still-tracked object by one frame.
    Object ids are indices into the internal lists and never change.
    '''
    def __init__(self, inactive_thresh=2, upd_area_thresh=2.0, dst_thresh=100, upd_dst_thresh=30,
                 tracker_func=cv2.legacy.TrackerKCF_create):
        '''
        inactive_thresh: once an object's tracker has failed {inactive_thresh} consecutive
          updates, the object is no longer updated
        upd_area_thresh: a new detection of an already tracked object replaces the tracked
          box if its area is at least {upd_area_thresh} times larger
        dst_thresh: centroid distance below which a detection is considered to be the same
          object as a tracked one
        upd_dst_thresh: a detection whose centroid is closer than this to the tracked box
          replaces it (the tracker is re-initialized with the fresh detection)
        tracker_func: OpenCV tracker creation function for individual objects
        '''
        self.trackers = []       # one OpenCV tracker per object id
        self.boxes = []          # last known X, Y, W, H box per object id
        self.inactive_time = []  # consecutive failed updates per object id (0 = active)
        self.inactive_thresh = inactive_thresh
        self.upd_area_thresh = upd_area_thresh
        self.dst_thresh = dst_thresh
        self.upd_dst_thresh = upd_dst_thresh
        self.tracker_func = tracker_func

    def get_objects(self):
        '''
        returns a list of (object id, bounding box) pairs for objects whose last update succeeded
        '''
        result = []
        for obj_id, box in enumerate(self.boxes):
            if self.inactive_time[obj_id] == 0:
                result.append((obj_id, box))
        return result

    @staticmethod
    def _get_dist(box1, box2):
        '''
        returns the distance between the centroids of two bounding boxes
        box1, box2: bounding boxes in X, Y, W, H format
        '''
        cx1, cy1 = get_center(box1)
        cx2, cy2 = get_center(box2)
        return math.hypot(cx1 - cx2, cy1 - cy2)

    def _create_tracker(self, box, frame):
        '''
        initializes a new tracker for an object
        box: a bounding box in X, Y, W, H format
        frame: the entire frame image
        '''
        tracker = self.tracker_func()
        tracker.init(frame, box)
        return tracker

    def add_boxes(self, boxes, frame):
        '''
        Registers detected boxes: each box is matched to the nearest active object;
        boxes farther than dst_thresh from every active object start new objects.
        boxes: a list of boxes in X, Y, W, H format
        frame: the entire frame image
        '''
        for box in boxes:
            # find the nearest active object. The nearest, not the first one within
            # dst_thresh, so a detection is not attributed to a neighbouring object
            nearest_id, nearest_dist = None, None
            for obj_id, existing_box in enumerate(self.boxes):
                if self.inactive_time[obj_id] != 0:
                    continue
                dist = self._get_dist(box, existing_box)
                if nearest_dist is None or dist < nearest_dist:
                    nearest_id, nearest_dist = obj_id, dist

            if nearest_id is None or nearest_dist >= self.dst_thresh:
                # this box is a new object -> create a new tracker for it
                self.trackers.append(self._create_tracker(box, frame))
                self.inactive_time.append(0)
                self.boxes.append(box)
                continue

            existing_box = self.boxes[nearest_id]
            # the detection and the tracked box represent the same object. Replace the
            # tracked box if the detection is very close (re-anchor the tracker on a fresh,
            # accurate detection) OR significantly bigger (probably the object was only
            # partially visible when it was first detected, e.g. entering the frame)
            if (nearest_dist < self.upd_dst_thresh) or \
                    (box[2] * box[3] >= existing_box[2] * existing_box[3] * self.upd_area_thresh):
                self.trackers[nearest_id] = self._create_tracker(box, frame)
                self.inactive_time[nearest_id] = 0
                self.boxes[nearest_id] = box

    def update(self, frame):
        '''
        updates the positions of objects that are still tracked, based on a new frame
        frame: the next frame image
        '''
        for obj_id, tracker in enumerate(self.trackers):
            if self.inactive_time[obj_id] < self.inactive_thresh:
                success, bbox = tracker.update(frame)
                if success:
                    self.inactive_time[obj_id] = 0
                    self.boxes[obj_id] = bbox
                else:
                    self.inactive_time[obj_id] += 1

## Functions to make a demo video

In [None]:
def add_info(img, info):
    '''
    Appends a white panel with "key: value" text lines below an image.
    img: a 3-channel uint8 image
    info: list of (key, value) pairs, laid out column by column in two columns
    returns: the image with the text panel stacked underneath
    '''
    width = img.shape[1]

    n_cols = 2
    n_rows = (len(info) + n_cols - 1) // n_cols
    line_height = 70
    font_scale, text_color, text_thickness = 1.4, (0, 0, 0), 2

    panel = np.full((50 + n_rows * line_height, width, 3), 255, dtype=np.uint8)
    col_step = width // n_cols - 100

    for item_idx, (key, value) in enumerate(info):
        col, row = divmod(item_idx, n_rows)
        origin = (50 + col_step * col, 70 + row * line_height)
        cv2.putText(panel, f"{key}: {value}", origin, cv2.FONT_HERSHEY_SIMPLEX,
                    font_scale, text_color, text_thickness)

    return cv2.vconcat([img, panel])

In [None]:
def make_frames_with_tracking_boxes(frames, tracking_ad_results, output_result_dir):
    '''
    Draws bounding boxes and summary statistics on video frames and saves them to a folder.

    frames: a list of frames with their names (each frame is a pair: a name and a path to the image)
    tracking_ad_results: dict with all tracking and anomaly detection results; keys are frame names and
        values are lists of bounding boxes, each bounding box is a tuple (idx, box, is_anomaly, confidence)
            idx - object id, persisted between frames
            box - bounding box in X, Y, W, H format
            is_anomaly - boolean flag for anomaly classification
            confidence - confidence of the classification verdict, or None if the object was
                not classified (such boxes are drawn in blue with the id only)
    output_result_dir: a directory to save resulting video frames
    returns: list of (frame name, saved image path) pairs. Processing stops at the first
        frame that has no entry in tracking_ad_results.
    raises: IOError if a frame image cannot be read
    '''
    def _percentage(part, total):
        # a frame (or the start of the video) may contain no objects: show 0 instead of dividing by zero
        return '{:.2f}'.format(part / total * 100 if total else 0.0)

    result_frames = []
    objects_total = set()
    anomalies_total = set()
    for frame_name, frame_path in tqdm(frames):
        if frame_name not in tracking_ad_results:
            break
        objects_frame = set()
        anomalies_frame = set()

        frame = cv2.imread(frame_path)
        if frame is None:
            raise IOError(f'Cannot read frame {frame_name} from {frame_path}')
        boxes = tracking_ad_results[frame_name]

        for idx, box, is_anomaly, confidence in boxes:
            x, y, w, h = (int(v) for v in box)
            if confidence is None:
                color = (255, 0, 0)  # blue in OpenCV's BGR order: not classified
                box_name = str(idx)
            else:
                # blend from green (surely normal) to red (surely anomalous)
                anomaly_prob = confidence if is_anomaly else 1 - confidence
                color = (0, 255 * (1 - anomaly_prob), 255 * anomaly_prob)
                box_name = '{} {} {:.1f}'.format(idx, "A" if is_anomaly else "N", confidence)
            cv2.putText(frame, box_name, (x, y - 15), cv2.FONT_HERSHEY_PLAIN, 2, color, 2)
            cv2.rectangle(frame, (x, y), (x + w, y + h), color, 3)
            objects_total.add(idx)
            objects_frame.add(idx)
            if is_anomaly:
                anomalies_total.add(idx)
                anomalies_frame.add(idx)

        info = [
            ("#objects total", len(objects_total)),
            ("#anomalies total", len(anomalies_total)),
            ("anomalies percentage total", _percentage(len(anomalies_total), len(objects_total))),
            ("#objects in the frame", len(objects_frame)),
            ("#anomalies in the frame", len(anomalies_frame)),
            ("anomalies percentage in the frame", _percentage(len(anomalies_frame), len(objects_frame))),
        ]
        frame = add_info(frame, info)

        save_path = os.path.join(output_result_dir, f'{frame_name}.jpg')
        cv2.imwrite(save_path, frame)

        result_frames.append((frame_name, save_path))
    return result_frames

# Demo pipeline

## Define I/O locations

In [None]:
# GCS bucket for the uploaded images and dataset manifests
gcs_bucket = 'gd-rnd-visual-quality-ctrl'

In [None]:
# top-level folder (object prefix) inside the bucket
gcs_folder = 'cv_anomaly'

In [None]:
# GCS URI of the input video; Cloud Storage URIs use the gs:// scheme (s3:// is Amazon S3)
gcs_input_video_path = os.path.join('gs://', gcs_bucket, gcs_folder, 'input_video', 'boxes.mp4')

In [None]:
# local working directory for the downloaded video, extracted frames and manifests
workdir = '../data/boxes/'
os.makedirs(workdir, exist_ok=True)

In [None]:
# public download URL of the demo video (dl=1 makes Dropbox serve the raw file)
INPUT_VIDEO_DOWNLOAD_LINK = 'https://www.dropbox.com/s/pot5874yqh11f7w/boxes.mp4?dl=1'

In [None]:
# local path the input video is downloaded to
input_video_path = os.path.join(workdir, 'boxes.mp4')

In [None]:
# root directory for all generated outputs
output_directory = os.path.join(workdir, 'output')
os.makedirs(output_directory, exist_ok=True)

In [None]:
# directory for the frames extracted from the input video
input_video_frames_dir = os.path.join(output_directory, 'input_frames')
# create the directory just defined (cv2.imwrite silently fails when it is missing)
os.makedirs(input_video_frames_dir, exist_ok=True)

In [None]:
# directory for result frames with drawn boxes (presumably passed to make_frames_with_tracking_boxes)
output_result_dir = os.path.join(output_directory, 'result_frames')
os.makedirs(output_result_dir, exist_ok=True)

In [None]:
# path of the rendered demo video
result_video_path = os.path.join(output_directory, 'pipeline_demo.mp4')

In [None]:
def read_image(image_name):
    '''
    Reads a frame saved by split_video from input_video_frames_dir.
    image_name: frame name without the extension (e.g. "frame_0")
    returns: the image as an RGB array (OpenCV reads BGR, so channels are converted)
    raises: FileNotFoundError if the image cannot be read
    '''
    image_path = os.path.join(input_video_frames_dir, f'{image_name}.jpg')
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising
        raise FileNotFoundError(f'Cannot read image: {image_path}')
    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

## Download the input video to working directory

In [None]:
# IPython shell magic: download the demo video to input_video_path
!wget -O "{input_video_path}" "{INPUT_VIDEO_DOWNLOAD_LINK}"

## Split input video into frames

In [None]:
# keep every 2nd frame; frames is a list of (frame name, image path) pairs
frames = split_video(input_video_path, input_video_frames_dir, sample_rate=2)

## Format datasets in VertexAI format

We labeled two separate datasets for object detection and image (anomaly) classification based on our input video. The dataset for object detection contains bounding boxes for every object in the video frames. The image classification dataset extends the object detection dataset, adding object IDs (objects have the same id across different frames) and class labels (normal/anomaly).

### Create a GCS bucket to store images and manifests

In [None]:
storage_client = storage.Client(project=gcloud_project)

bucket = storage_client.bucket(gcs_bucket)
# create the bucket only if it does not exist yet, so that re-running the
# notebook does not fail with a "bucket already exists" conflict
if not bucket.exists():
    bucket = storage_client.create_bucket(bucket, location=gcloud_storage_location)

### Object detection

#### I/O locations

In [None]:
# CSV with manually labelled bounding boxes for object detection
manual_annotations_path = '../resources/boxes_annotations/manual_detection_annotations.csv'

In [None]:
# local JSONL manifest in Vertex AI import format
output_manifest_path = os.path.join(workdir, 'detection_annotations_gc.jsonl')

In [None]:
# GCS prefix for the detection dataset
gcs_subfolder = os.path.join(gcs_folder, 'detection_dataset')

In [None]:
# GCS prefix for the uploaded detection images
upload_images_folder = os.path.join(gcs_subfolder, 'images')

In [None]:
# object path of the detection manifest inside the bucket
# ("mainfest" is misspelled but the name is referenced by later cells)
gcs_detection_mainfest_path = os.path.join(gcs_subfolder, 'annotations.jsonl')

#### Read manual annotations

In [None]:
# IPython shell magic: preview the first lines of the annotations CSV
!head -n 5 {manual_annotations_path}

In [None]:
# the csv module expects files opened with newline='' so that quoted fields
# containing line breaks are parsed correctly
with open(manual_annotations_path, newline='') as f:
    manual_annotations = list(csv.DictReader(f))

In [None]:
# the csv module yields strings; convert bounding box columns to float
numeric_fields = ['bbox_x', 'bbox_y', 'bbox_width', 'bbox_height']

for row in manual_annotations:
    for field in numeric_fields:
        if field in row:
            row[field] = float(row[field])

#### Assign every frame to only one split (train / validation / test)

In [None]:
# "frame_<id>.<ext>" -> <id> (the frame index in the video)
image_name_to_id = {
    name: int(name.split('_')[1].split('.')[0])
    for name in (annotation['image_name'] for annotation in manual_annotations)
}

In [None]:
# unique annotated frame indices in chronological order
sorted_frames = sorted(set(image_name_to_id.values()))

In [None]:
# rank of every frame in chronological order. A dict lookup replaces
# sorted_frames.index(), which made this loop quadratic in the number of frames
frame_rank = {frame_id: rank for rank, frame_id in enumerate(sorted_frames)}

# split by time so that neighbouring (near-identical) frames do not leak between splits:
# first 70% of frames -> training, next 15% -> validation, last 15% -> test
img_name_to_data_split = {}
for image_name, frame_id in image_name_to_id.items():
    pos = frame_rank[frame_id] / len(sorted_frames)
    if pos < 0.7:
        data_split = 'training'
    elif pos < 0.85:
        data_split = 'validation'
    else:
        data_split = 'test'
    img_name_to_data_split[image_name] = data_split

#### Create a manifest file in VertexAI format

In [None]:
# frame size, used below to normalize bounding boxes to [0, 1]
img_height, img_width = read_image(frames[0][0]).shape[:2]
img_width, img_height

In [None]:
# group annotation rows by image file name
img_to_annotations = defaultdict(list)
for row in manual_annotations:
    img_to_annotations[row['image_name']].append(row)

In [None]:
# one Vertex AI import record per image: GCS URI, normalized boxes and the data split
gc_annotations = []
for image_name, image_annotations in img_to_annotations.items():
    gcs_path = os.path.join('gs://', gcs_bucket, upload_images_folder, image_name)
    boxes_annotations = []

    for manual_annotation in image_annotations:
        x = manual_annotation['bbox_x']
        y = manual_annotation['bbox_y']
        w = manual_annotation['bbox_width']
        h = manual_annotation['bbox_height']

        # normalized coordinates are clamped to [0, 1] on both sides:
        # manual boxes may extend past any frame edge, not only the right/bottom one
        annotation = {
            "displayName": "object",
            "xMin": min(1, max(0, x / img_width)),
            "xMax": min(1, max(0, (x + w) / img_width)),
            "yMin": min(1, max(0, y / img_height)),
            "yMax": min(1, max(0, (y + h) / img_height)),
        }
        boxes_annotations.append(annotation)

    gc_annotations.append({
        "imageGcsUri": gcs_path,
        "boundingBoxAnnotations": boxes_annotations,
        "dataItemResourceLabels": {
            "aiplatform.googleapis.com/ml_use": img_name_to_data_split[image_name]
        }
    })

In [None]:
# JSON Lines: one JSON record per line
manifest_lines = [f'{json.dumps(item)}\n' for item in gc_annotations]
with open(output_manifest_path, mode='wt') as manifest_file:
    manifest_file.write(''.join(manifest_lines))

#### Upload images and manifest to gcs

In [None]:
# every annotated image file name
input_images = list(img_name_to_data_split)

In [None]:
# GCS client for uploading the detection dataset
storage_client = storage.Client(project=gcloud_project)

In [None]:
# bucket handle used for the uploads below
bucket = storage_client.bucket(gcs_bucket)

In [None]:
# upload the annotated frames. They were extracted by split_video into
# input_video_frames_dir (assumes image_name includes the .jpg extension,
# as implied by the parsing in image_name_to_id)
for image_name in tqdm(input_images):
    image_path = os.path.join(input_video_frames_dir, image_name)
    destination_path = os.path.join(upload_images_folder, image_name)
    blob = bucket.blob(destination_path)
    blob.upload_from_filename(image_path)

In [None]:
# upload the detection manifest next to the images
bucket.blob(gcs_detection_mainfest_path).upload_from_filename(output_manifest_path)

### Anomaly classification

#### I/O locations

In [None]:
# CSV with manually labelled boxes, object ids and normal/anomaly classes
manual_annotations_path = '../resources/boxes_annotations/manual_cls_annotations.csv'

In [None]:
# local JSONL manifest for the classification dataset
output_manifest_path = os.path.join(workdir, 'classification_annotations_gc.jsonl')

In [None]:
# directory for the cropped object images of the classification dataset
cls_frames_dir = os.path.join(output_directory, 'cls_frames')
# create the directory just defined (crops are written here with cv2.imwrite,
# which silently fails when the directory is missing)
os.makedirs(cls_frames_dir, exist_ok=True)

In [None]:
# GCS prefix for the classification dataset
gcs_subfolder = os.path.join(gcs_folder, 'classification_dataset')

In [None]:
# GCS prefix for the uploaded object crops
upload_images_folder = os.path.join(gcs_subfolder, 'images')

In [None]:
# object path of the classification manifest inside the bucket
# ("mainfest" is misspelled but the name is referenced by later cells)
gcs_cls_mainfest_path = os.path.join(gcs_subfolder, 'annotations.jsonl')

#### Read manual annotations

In [None]:
# IPython shell magic: preview the first lines of the annotations CSV
!head -n 5 {manual_annotations_path}

In [None]:
# the csv module expects files opened with newline='' so that quoted fields
# containing line breaks are parsed correctly
with open(manual_annotations_path, newline='') as f:
    manual_annotations = list(csv.DictReader(f))

In [None]:
# the csv module yields strings; convert bounding box columns to float
numeric_fields = ['bbox_x', 'bbox_y', 'bbox_width', 'bbox_height']

for row in manual_annotations:
    for field in numeric_fields:
        if field in row:
            row[field] = float(row[field])

#### Train/test split for objects

In this subsection we divide all objects into 3 disjoint sets (train / validation / test). This prevents data leaks during model training and makes it possible to evaluate the model quality correctly. However, for our demo we ignore this split, because we have only one short video and not enough examples of anomalous objects to train the models properly (only 4 anomalous objects).

In [None]:
# inspect one annotation row (CSV columns as dict keys)
manual_annotations[0]

In [None]:
# unique object ids across all frames
all_objects = {annotation['object_id'] for annotation in manual_annotations}
len(all_objects)

In [None]:
# dedicated, seeded Random instance so the object split does not depend on the global random state
FIXED_RANDOM_SEED = 10
fixed_random = random.Random(FIXED_RANDOM_SEED)

In [None]:
# fractions of objects in the train / validation / test splits
TRAIN_SIZE = 0.7
VAL_SIZE = 0.15
TEST_SIZE = 0.15

# compare with a tolerance: float sums of fractions are not always exactly 1
# (e.g. 0.1 + 0.2 != 0.3). Raise instead of assert, which is stripped under -O
if not math.isclose(TRAIN_SIZE + VAL_SIZE + TEST_SIZE, 1.0):
    raise ValueError('TRAIN_SIZE + VAL_SIZE + TEST_SIZE must sum to 1')

In [None]:
# sort before shuffling: object ids are strings (read from CSV), and the iteration
# order of a set of strings depends on the per-process hash seed (PYTHONHASHSEED),
# so shuffling list(set) would not be reproducible despite the fixed seed
all_objects = sorted(all_objects)
fixed_random.shuffle(all_objects)

In [None]:
# end indices of the train and validation parts of the shuffled object list
# (round() on a float already returns an int)
train_lim = round(len(all_objects) * TRAIN_SIZE)
val_lim = round(len(all_objects) * (TRAIN_SIZE + VAL_SIZE))

In [None]:
# disjoint object sets for the three splits
train_objects, val_objects, test_objects = (
    set(all_objects[:train_lim]),
    set(all_objects[train_lim:val_lim]),
    set(all_objects[val_lim:]),
)

In [None]:
# split sizes: train / validation / test
print(f'{len(train_objects)} / {len(val_objects)} / {len(test_objects)}')

#### Iterate over object frames and save them to separate images

In [None]:
# only annotated frames whose index is a multiple of SAMPLE_RATE contribute object crops
SAMPLE_RATE = 20

In [None]:
# crop every annotated object from the sampled frames and group crops by object id
obj_to_annotations = defaultdict(list)
for row in tqdm(manual_annotations):
    image_name = row['image_name'].split('.')[0]
    obj_id = row['object_id']
    label = row['class']
    frame_id = int(image_name[len('frame_'):])
    if frame_id % SAMPLE_RATE != 0:
        continue

    image = read_image(image_name)
    x, y, w, h = (int(row[field]) for field in ('bbox_x', 'bbox_y', 'bbox_width', 'bbox_height'))
    image_bbox = crop_n_resize_image(image, (x, y, w, h), (IMAGE_WIDTH, IMAGE_HEIGHT), PAD_COLOR)

    save_name = f'{image_name}_{x}_{y}_{w}_{h}_{obj_id}_{label}.jpg'
    save_path = os.path.join(cls_frames_dir, save_name)
    # read_image returns RGB; cv2.imwrite expects BGR
    cv2.imwrite(save_path, cv2.cvtColor(image_bbox, cv2.COLOR_RGB2BGR))
    obj_to_annotations[obj_id].append((save_name, save_path, label))

In [None]:
# attach the split name to every crop of each object. Vertex AI's ml_use label values
# are 'training' / 'validation' / 'test', the same values used for the detection
# dataset and in the (commented-out) filter splits of the classification training run
annotations = []
for split_objects, split_name in zip((train_objects, val_objects, test_objects),
                                     ('training', 'validation', 'test')):
    for idx in split_objects:
        for annotations_row in obj_to_annotations[idx]:
            annotations.append(annotations_row + (split_name, ))

In [None]:
# number of crops per (class label, split) pair
Counter((label, split) for *_, label, split in annotations)

#### Create a manifest file in VertexAI format

In [None]:
# one Vertex AI single-label classification record per crop
gc_annotations = [
    {
        "imageGcsUri": os.path.join('gs://', gcs_bucket, upload_images_folder, crop_name),
        "classificationAnnotation": {"displayName": crop_label},
        "dataItemResourceLabels": {"aiplatform.googleapis.com/ml_use": crop_split},
    }
    for crop_name, _crop_path, crop_label, crop_split in annotations
]

In [None]:
# JSON Lines: one JSON record per line
manifest_lines = [f'{json.dumps(item)}\n' for item in gc_annotations]
with open(output_manifest_path, mode='wt') as manifest_file:
    manifest_file.write(''.join(manifest_lines))

#### Upload images and manifest to gcs

In [None]:
# GCS client for uploading the classification dataset
storage_client = storage.Client(project=gcloud_project)

In [None]:
# bucket handle used for the uploads below
bucket = storage_client.bucket(gcs_bucket)

In [None]:
# upload every object crop under the classification images prefix
for crop_name, crop_path, _, _ in tqdm(annotations):
    crop_blob = bucket.blob(os.path.join(upload_images_folder, crop_name))
    crop_blob.upload_from_filename(crop_path)

In [None]:
# upload the classification manifest next to the crops
bucket.blob(gcs_cls_mainfest_path).upload_from_filename(output_manifest_path)

## Create and train VertexAI models

In [None]:
# default project and region for the aiplatform SDK calls below
aiplatform.init(project=gcloud_project, location=gcloud_location)

### Creating detection dataset

In [None]:
# display name of the Vertex AI detection dataset
detection_dataset_name = 'detection_ds'

In [None]:
# full gs:// URI of the uploaded detection manifest
gcs_detection_manifest_uri = os.path.join('gs://', gcs_bucket, gcs_detection_mainfest_path)


In [None]:
# create a Vertex AI image dataset from the detection manifest (bounding-box import schema)
detection_dataset = aiplatform.ImageDataset.create(
    display_name=detection_dataset_name,
    gcs_source=[gcs_detection_manifest_uri],
    import_schema_uri=aiplatform.schema.dataset.ioformat.image.bounding_box,
    sync=True,
)

detection_dataset.wait()

### Creating anomaly classification dataset

In [None]:
# display name of the Vertex AI anomaly classification dataset
cls_dataset_name = 'anomaly_classification_ds'

In [None]:
# full gs:// URI of the uploaded classification manifest
gcs_cls_manifest_uri = os.path.join('gs://', gcs_bucket, gcs_cls_mainfest_path)

In [None]:
# create a Vertex AI image dataset from the classification manifest (single-label schema)
cls_dataset = aiplatform.ImageDataset.create(
    display_name=cls_dataset_name,
    gcs_source=[gcs_cls_manifest_uri],
    import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
    sync=True,
)

cls_dataset.wait()

### Creating/training object detection model

In [None]:
# display name of the trained detection model
detection_model_name = 'object_detection_model'

In [None]:
# training budget: 20000 milli node hours = 20 node hours
detection_budget_milli_node_hours = 20000

In [None]:
# display name of the detection training job
detection_job_name = f'{detection_model_name}_training_job'

In [None]:
# AutoML training job for the object detection model. It uses the detection job's
# own name; cls_job_name is only defined further below, so it raised NameError here
detection_job = aiplatform.AutoMLImageTrainingJob(
    display_name=detection_job_name,
    model_type="CLOUD",
    prediction_type="object_detection"
)

In [None]:
# start training with the time-based split from the manifest's ml_use labels;
# sync=False returns immediately (detection_model.wait() below blocks until done)
detection_model = detection_job.run(
    dataset=detection_dataset,
    model_display_name=detection_model_name,
    training_filter_split="labels.aiplatform.googleapis.com/ml_use=training",
    validation_filter_split="labels.aiplatform.googleapis.com/ml_use=validation",
    test_filter_split="labels.aiplatform.googleapis.com/ml_use=test",
    budget_milli_node_hours=detection_budget_milli_node_hours,
    sync=False,
)

In [None]:
# Vertex AI has no separate "test dataset" resource: the test split is selected through
# the "aiplatform.googleapis.com/ml_use" labels in the manifests imported above
# (see test_filter_split in the training run). This cell previously contained AWS Lookout
# for Vision create_dataset code referencing undefined names (dataset_folder, client, project).
print('Detection dataset: ' + detection_dataset.resource_name)
print('Classification dataset: ' + cls_dataset.resource_name)

### Creating/training anomaly classification model

Model training will take several hours.

In [None]:
# display name of the trained anomaly classification model
cls_model_name = 'anomaly_classification_model'

In [None]:
# training budget: 10000 milli node hours = 10 node hours
cls_budget_milli_node_hours = 10000

In [None]:
# display name of the classification training job
cls_job_name = f'{cls_model_name}_training_job'

In [None]:
# AutoML training job for single-label (normal / anomaly) image classification
cls_job = aiplatform.AutoMLImageTrainingJob(
    display_name=cls_job_name,
    model_type="CLOUD",
    prediction_type="classification",
    multi_label=False,
)

In [None]:
# start classification training; sync=False returns immediately (cls_model.wait() below blocks)
cls_model = cls_job.run(
    dataset=cls_dataset,
    model_display_name=cls_model_name,
# as we don't have enough data, we will use random split instead for our demo scenario
# (this may lead to the model overfitting)
#    training_filter_split="labels.aiplatform.googleapis.com/ml_use=training",
#    validation_filter_split="labels.aiplatform.googleapis.com/ml_use=validation",
#    test_filter_split="labels.aiplatform.googleapis.com/ml_use=test",
    budget_milli_node_hours=cls_budget_milli_node_hours,
    sync=False,
)

### Wait until the both training jobs finish

In [None]:
# block until the asynchronous detection training finishes
detection_model.wait()

In [None]:
# block until the asynchronous classification training finishes
cls_model.wait()

## Deploy vertexai models for inference

In [None]:
# The AI Platform services require regional API endpoints. Derive it from
# gcloud_location so it always matches the region the endpoints are created in.
API_ENDPOINT = f"{gcloud_location}-aiplatform.googleapis.com"

client_options = {"api_endpoint": API_ENDPOINT}
# Initialize client that will be used to create and send requests.
# This client only needs to be created once, and can be reused for multiple requests.
vertexai_client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)

### Object detection

In [None]:
# display name of the endpoint serving the detection model
detection_endpoint_name = f'{gcloud_project}-detection-endpoint'

In [None]:
# create an (empty) endpoint in the configured project and region
detection_endpoint = aiplatform.Endpoint.create(
    display_name=detection_endpoint_name,
    project=gcloud_project,
    location=gcloud_location,
)

In [None]:
# deploy the trained detection model to the endpoint
detection_model.deploy(detection_endpoint)

In [None]:
def predict_image_object_detection(
    project: str,
    endpoint_id: str,
    image: np.ndarray,
    vertexai_client,
    location: str = "us-central1",
    confidence_threshold: float = 0.0
):
    '''
    Sends one image to a deployed Vertex AI object detection endpoint.

    project: Google Cloud project id
    endpoint_id: id of the endpoint the detection model is deployed to
    image: image array; it is JPEG-encoded with cv2.imencode, which treats 3-channel
        arrays as BGR. NOTE(review): read_image() returns RGB, so passing its output
        sends red/blue-swapped images, while training images were written from BGR
        frames -- confirm the channel order used by callers.
    vertexai_client: an aiplatform.gapic.PredictionServiceClient
    location: region of the endpoint
    confidence_threshold: minimal confidence, passed as a prediction parameter
    returns: the first prediction as a dict (e.g. 'bboxes' with normalized
        [xMin, xMax, yMin, yMax] boxes -- see the schema referenced below)
    raises: ValueError if the image cannot be JPEG-encoded
    '''
    success, image_bytes = cv2.imencode('.jpg', image)
    if not success:
        raise ValueError('Failed to JPEG-encode the image')

    # The format of each instance should conform to the deployed model's prediction input schema.
    encoded_content = base64.b64encode(image_bytes).decode("utf-8")
    instance = predict.instance.ImageObjectDetectionPredictionInstance(
        content=encoded_content,
    ).to_value()
    instances = [instance]
    # See gs://google-cloud-aiplatform/schema/predict/params/image_object_detection_1.0.0.yaml for the format of the parameters.
    parameters = predict.params.ImageObjectDetectionPredictionParams(
        confidence_threshold=confidence_threshold,
    ).to_value()
    endpoint = vertexai_client.endpoint_path(
        project=project, location=location, endpoint=endpoint_id
    )
    response = vertexai_client.predict(
        endpoint=endpoint, instances=instances, parameters=parameters
    )
    # See gs://google-cloud-aiplatform/schema/predict/prediction/image_object_detection_1.0.0.yaml for the format of the predictions.
    predictions = response.predictions
    return dict(predictions[0])

### Anomaly classification

In [None]:
# display name of the endpoint serving the classification model
cls_endpoint_name = f'{gcloud_project}-classification-endpoint'

In [None]:
# create an (empty) endpoint in the configured project and region
classification_endpoint = aiplatform.Endpoint.create(
    display_name=cls_endpoint_name,
    project=gcloud_project,
    location=gcloud_location,
)

In [None]:
# deploy the trained classification model to the endpoint
cls_model.deploy(classification_endpoint)

In [None]:
def predict_image_classification(
    project: str,
    endpoint_id: str,
    image: np.ndarray,
    vertexai_client,
    location: str = "us-central1",
    confidence_threshold: float = 0.5
):
    '''
    Sends one image to a deployed Vertex AI image classification endpoint.

    project: Google Cloud project id
    endpoint_id: id of the endpoint the classification model is deployed to
    image: image array; it is JPEG-encoded with cv2.imencode, which treats 3-channel
        arrays as BGR. NOTE(review): crops of read_image() output are RGB, while the
        training crops were converted to BGR before saving -- confirm the channel order.
    vertexai_client: an aiplatform.gapic.PredictionServiceClient
    location: region of the endpoint
    confidence_threshold: minimal confidence, passed as a prediction parameter
        (0.5 was previously hard-coded and stays the default)
    returns: the first prediction as a dict (e.g. 'displayNames' and 'confidences'
        -- see the schema referenced below)
    raises: ValueError if the image cannot be JPEG-encoded
    '''
    success, image_bytes = cv2.imencode('.jpg', image)
    if not success:
        raise ValueError('Failed to JPEG-encode the image')

    # The format of each instance should conform to the deployed model's prediction input schema.
    encoded_content = base64.b64encode(image_bytes).decode("utf-8")
    instance = predict.instance.ImageClassificationPredictionInstance(
        content=encoded_content,
    ).to_value()
    instances = [instance]
    # See gs://google-cloud-aiplatform/schema/predict/params/image_classification_1.0.0.yaml for the format of the parameters.
    parameters = predict.params.ImageClassificationPredictionParams(
        confidence_threshold=confidence_threshold,
    ).to_value()
    endpoint = vertexai_client.endpoint_path(
        project=project, location=location, endpoint=endpoint_id
    )
    response = vertexai_client.predict(
        endpoint=endpoint, instances=instances, parameters=parameters
    )
    # See gs://google-cloud-aiplatform/schema/predict/prediction/image_classification_1.0.0.yaml for the format of the predictions.
    predictions = response.predictions
    return dict(predictions[0])

## Vertexai prediction functions

In [None]:
def vertexai_get_bboxes(image, confidence_threshold=0.001):
    '''
    Detects objects on an image with the deployed Vertex AI detection model.

    image: image array; its size is used to convert normalized boxes to pixels
    confidence_threshold: minimal detection confidence passed to the endpoint
    returns: list of bounding boxes in X, Y, W, H format (pixels)
    '''
    preds = predict_image_object_detection(gcloud_project, detection_endpoint.name, image,
                                           vertexai_client, location=gcloud_location,
                                           confidence_threshold=confidence_threshold)
    # scale by the size of the image that was actually sent. The previous version read
    # the global sample_frame, giving wrong boxes for any differently sized image
    frame_height, frame_width = image.shape[:2]
    result = []
    for x_min, x_max, y_min, y_max in preds['bboxes']:
        x, y = x_min * frame_width, y_min * frame_height
        w, h = (x_max - x_min) * frame_width, (y_max - y_min) * frame_height
        result.append((x, y, w, h))
    return result

In [None]:
def vertexai_classify_anomalies(image):
    '''
    Classifies an object crop with the deployed Vertex AI classification model.
    image: an object crop (see crop_n_resize_image)
    returns: (is_anomaly, confidence) for the first returned label; any label other
        than 'normal' counts as an anomaly
    '''
    prediction = predict_image_classification(gcloud_project, classification_endpoint.name,
                                              image, vertexai_client)
    top_label = prediction['displayNames'][0]
    top_confidence = prediction['confidences'][0]
    return top_label != 'normal', top_confidence

### Example of predictions for a video frame

In [None]:
# first extracted frame as an RGB array (read_image converts from BGR)
sample_frame = read_image(frames[0][0])

In [None]:
# detected boxes on the sample frame, X, Y, W, H in pixels
_boxes = vertexai_get_bboxes(sample_frame)

Detection predictions:

In [None]:
fig, ax = plt.subplots(figsize=(20, 20))
ax.imshow(sample_frame)
# outline every detected box
box_patches = [
    patches.Rectangle((x, y), w, h, linewidth=3, edgecolor='cyan', facecolor='none')
    for x, y, w, h in _boxes
]
for box_patch in box_patches:
    ax.add_patch(box_patch)
plt.show()

Anomaly classification predictions: boolean flag is_anomaly and confidence score

In [None]:
# show and classify the first two detected objects
for raw_box in _boxes[:2]:
    int_box = tuple(int(coord) for coord in raw_box)
    crop = crop_n_resize_image(sample_frame, int_box, (IMAGE_WIDTH, IMAGE_HEIGHT), PAD_COLOR)
    plt.imshow(crop)
    plt.show()
    print(vertexai_classify_anomalies(crop))

## Main step: do object tracking and anomaly detection

In [None]:
# presumably the minimal intersection-area fraction used together with intersect_rectangles /
# get_area in the main loop (usage is outside this view -- confirm)
AREA_INTERSECTION_THRESH = 0.5

In [None]:
def _get_coordinates(rect):
    '''
    returns (min x, min y, max x, max y) of a rectangle; negative widths/heights are
    normalized so the minimum values always come first. In image coordinates
    (y growing downward) these are the top-left and bottom-right corners.
    rect: rectangle in X, Y, W, H format
    '''
    left, y_lo, width, height = rect
    right, y_hi = left + width, y_lo + height
    return min(left, right), min(y_lo, y_hi), max(left, right), max(y_lo, y_hi)


def intersect_rectangles(rect1, rect2):
    '''
    returns the overlap of two rectangles as an (X, Y, W, H) tuple, or None when the
    overlap has no positive area (rectangles that only share an edge also give None)
    rect1, rect2: rectangles in X, Y, W, H format
    '''
    ax0, ay0, ax1, ay1 = _get_coordinates(rect1)
    bx0, by0, bx1, by1 = _get_coordinates(rect2)
    left, right = max(ax0, bx0), min(ax1, bx1)
    low, high = max(ay0, by0), min(ay1, by1)
    if not (left < right and low < high):
        return None
    return left, low, right - left, high - low
    
def get_area(rect):
    '''
    returns the area (W * H) of a rectangle
    rect: a rectangle in X, Y, W, H format
    '''
    _, _, width, height = rect
    return width * height

Objects that touch the left or right edge are considered partially visible (because in our scenario the "conveyor belt" moves from left to right).

In [None]:
# Width of each of the left and right edge areas as a fraction of the image width (0.05 = 5%).
EDGE_WIDTH_PERCENT = 0.05

EDGE_WIDTH_PERCENT is the width of each of the left and right edge areas, as a fraction of the total image width (e.g. 0.05 = 5%).

In [None]:
def not_touches_edge(box, frame_shape, edge_width_percent=None):
    '''
    checks that a bounding box lies fully between the left and right edge areas of an image
    (objects touching the left or right edge are treated as partially visible)
    box: a bounding box in X, Y, W, H format
    frame_shape: image shape in H, W[, C] format (only the width, frame_shape[1], is used)
    edge_width_percent: width of each edge area as a fraction of the image width;
        defaults to the module-level EDGE_WIDTH_PERCENT
    returns: True if the box reaches into neither the left nor the right edge area
    '''
    if edge_width_percent is None:
        edge_width_percent = EDGE_WIDTH_PERCENT
    x_min = frame_shape[1] * edge_width_percent
    x_max = frame_shape[1] - x_min
    x0, _, w, _ = box
    x1 = x0 + w
    # Both edges matter: objects enter the frame on one side and leave it on the other,
    # and in both cases the visible crop is clipped.
    return x_min <= x0 and x1 <= x_max

In [None]:
def get_anomaly_confidence(ad_predictions, object_id, num_preds=2):
    '''
    Aggregates the stored anomaly probabilities of one tracked object.
    ad_predictions: mapping object id -> list of anomaly probabilities
    object_id: tracked object id to look up
    num_preds: minimum number of stored probabilities needed to produce a label
    returns: (None, None) while fewer than num_preds probabilities are stored; otherwise
        (is_anomaly, confidence), where is_anomaly is True when the mean probability
        exceeds 0.5 and confidence is the mean probability of the chosen class
    '''
    scores = ad_predictions[object_id]
    if len(scores) < num_preds:
        return None, None
    mean_score = sum(scores) / len(scores)
    is_anomaly = mean_score > 0.5
    return is_anomaly, (mean_score if is_anomaly else 1 - mean_score)
    
def upd_predictions(ad_predictions, object_id, prediction, num_preds=2):
    '''
    Records one anomaly classification result for a tracked object and returns the
    updated aggregate label.
    ad_predictions: mapping object id -> list of stored anomaly probabilities
        (e.g. defaultdict(list)); the list for object_id is appended to in place
    object_id: tracked object id
    prediction: (is_anomaly, confidence) pair, as returned by vertexai_classify_anomalies
    num_preds: minimum number of stored predictions required before a label is returned
    returns: (is_anomaly, confidence), or (None, None) while fewer than num_preds
        predictions are stored
    '''
    preds = ad_predictions[object_id]
    is_anomaly, confidence = prediction
    # Store the probability of the *anomaly* class so per-frame results can be averaged.
    preds.append(confidence if is_anomaly else 1 - confidence)
    # Forward num_preds (it was previously ignored, so callers could not change it).
    return get_anomaly_confidence(ad_predictions, object_id, num_preds)

The main pipeline steps:
* iterate over frames, detecting new objects and tracking existing ones
    * iterate over objects and drop overlapping boxes (this rarely happens; overlaps come from false positives of the tracking and detection algorithms)
    * iterate over objects and assign anomaly classification labels (running the anomaly classification model on new objects and caching the resulting labels)

In [None]:
downscale_ratio = 0.4

# Trackers run on downscaled frames for speed, so the distance thresholds are scaled by the same ratio.
tracker = MultiTracker(dst_thresh=DST_THRESHOLD * downscale_ratio, tracker_func=cv2.legacy.TrackerKCF_create,
                       upd_area_thresh=1.15, upd_dst_thresh=UPD_DST_THRESHOLD * downscale_ratio)
# frame name -> list of (object id, box, is_anomaly, confidence)
tracking_ad_results = {}

# object id -> stored per-frame anomaly probabilities (appended by upd_predictions)
ad_predictions = defaultdict(list)


def _filter_overlapping_boxes(boxes):
    '''
    Removes near-duplicate boxes: for every pair whose intersection covers at least
    AREA_INTERSECTION_THRESH of the smaller box's area, the smaller box is dropped
    (on an exact area tie, the one appearing later in `boxes` is dropped).
    boxes: list of (object id, box in X, Y, W, H format)
    returns: the surviving (object id, box) pairs, in their original order
    '''
    overlapped_ids = set()
    for pos, (idx1, box1) in enumerate(boxes):
        area1 = get_area(box1)
        # Visit each unordered pair once; visiting both orders dropped BOTH boxes
        # whenever their areas were equal.
        for idx2, box2 in boxes[pos + 1:]:
            if idx1 == idx2:
                continue
            intersection = intersect_rectangles(box1, box2)
            if intersection is None:
                continue
            area2 = get_area(box2)
            if get_area(intersection) >= min(area1, area2) * AREA_INTERSECTION_THRESH:
                overlapped_ids.add(idx1 if area1 < area2 else idx2)
    return [(idx, box) for idx, box in boxes if idx not in overlapped_ids]


for frame_name, frame_path in tqdm(frames):
    frame = cv2.imread(frame_path)

    downscale_size = (int(frame.shape[1] * downscale_ratio), int(frame.shape[0] * downscale_ratio))
    downscaled_frame = cv2.resize(frame, downscale_size)

    # update existing trackers with the new frame
    tracker.update(downscaled_frame)

    # do object detection to add new objects
    boxes = vertexai_get_bboxes(frame)
    downscaled_boxes = np.array(boxes) * downscale_ratio
    tracker.add_boxes(downscaled_boxes, downscaled_frame)

    # tracked boxes, scaled back to full-resolution pixel coordinates
    boxes = [(idx, np.array(box) / downscale_ratio) for idx, box in tracker.get_objects()]

    # filter overlapping boxes
    boxes = _filter_overlapping_boxes(boxes)

    # add anomaly detection results
    result = []
    for idx, bbox in boxes:
        is_anomaly, confidence = get_anomaly_confidence(ad_predictions, idx)
        # keep classifying an object until its aggregated label reaches 0.9 confidence
        if is_anomaly is None or confidence < 0.9:
            if not_touches_edge(bbox, frame.shape):
                bbox = tuple(map(int, bbox))
                image_bbox = crop_n_resize_image(frame, bbox, (IMAGE_WIDTH, IMAGE_HEIGHT), PAD_COLOR)
                pred = vertexai_classify_anomalies(image_bbox)
                is_anomaly, confidence = upd_predictions(ad_predictions, idx, pred)
            elif is_anomaly is None:
                # object is partially visible and has no label yet: do not classify the clipped crop
                # (will be done later, when the object fully enters the frame)
                is_anomaly, confidence = False, None
            # otherwise the object is at an edge but already labelled: keep the label accumulated
            # from earlier frames instead of resetting it to (False, None)
        result.append((idx, bbox, is_anomaly, confidence))

    tracking_ad_results[frame_name] = result
        

## Clean up Vertex AI resources (to save costs)

In [None]:
# Undeploy every model from the detection endpoint so the endpoint can be deleted next.
for model in detection_endpoint.list_models():
    detection_endpoint.undeploy(model.id)

In [None]:
# Delete the detection endpoint (its models were undeployed in the previous cell).
detection_endpoint.delete()

In [None]:
# Undeploy every model from the classification endpoint so the endpoint can be deleted next.
for model in classification_endpoint.list_models():
    classification_endpoint.undeploy(model.id)

In [None]:
# Delete the classification endpoint (its models were undeployed in the previous cell).
classification_endpoint.delete()

## Save the resulting frames and make a demo video

In [None]:
# Render the per-frame tracking/anomaly results (helper defined earlier in the notebook).
# NOTE(review): presumably writes annotated images into output_result_dir and returns
# (name, path) pairs, the format save_video_from_frames expects - confirm.
result_frames = make_frames_with_tracking_boxes(frames, tracking_ad_results, output_result_dir)

In [None]:
# Encode the rendered frames into the demo video at 30 fps.
save_video_from_frames(result_frames, result_video_path, fps=30)

# Final cleanup: delete all created resources from Google Cloud

In [None]:
# Delete the trained anomaly classification model.
cls_model.delete()

In [None]:
# Delete the trained object detection model.
detection_model.delete()

In [None]:
# Delete the classification dataset.
cls_dataset.delete()

In [None]:
# Delete the detection dataset.
detection_dataset.delete()

In [None]:
# Delete the Cloud Storage bucket used by this notebook.
storage_client = storage.Client(project=gcloud_project)

bucket = storage_client.bucket(gcs_bucket)
# A bucket can only be deleted once it is empty, and Bucket.delete(force=True) refuses
# buckets holding more than 256 objects, so delete the objects explicitly first.
for blob in storage_client.list_blobs(bucket):
    blob.delete()
bucket.delete()