In [14]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


### External Imports

In [15]:
import torch
import torchvision as tv
import random
import cv2 as cv
import numpy as np
import typing as t
import pathlib as pb
from ultralytics import YOLO

### Internal Imports

In [16]:
import vistraff as vt
from vistraff.alg import VehicleOccupancy
from vistraff.data import CarTrafficDataset, CarTrafficVideo, CarTrafficTaskTwoDataset, CarTrafficTaskThreeDataset

### Environment Setup

In [17]:
PATH_ROOT: pb.Path = pb.Path('.')
PATH_DATA: pb.Path = PATH_ROOT / 'data' / 'train'
PATH_ASSETS: pb.Path = PATH_ROOT / 'assets'
PATH_MASKS: pb.Path = PATH_ASSETS / 'masks'
PATH_MODELS: pb.Path = PATH_ASSETS / 'models'
SEED: int = 42

In [18]:
np.random.default_rng(SEED)
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)

### Data Loading

In [19]:
dataset = CarTrafficDataset(PATH_DATA)

In [None]:
    for i, frame in enumerate(dataset.context[context]):
        if (context_dir / f'frame_{i}.jpg').exists():
            continue
        cv.imwrite(str(context_dir / f'frame_{i}.jpg'), frame['image'])

In [22]:
for context in range(15):
    context_dir = pb.Path(PATH_DATA / 'context_videos_frames' / f'{context + 1:02}')
    context_dir.mkdir(exist_ok=True, parents=True)
    for i, frame in enumerate(dataset.context[context]):
        if (context_dir / f'frame_{i}.jpg').exists():
            continue
        cv.imwrite(str(context_dir / f'frame_{i}.jpg'), frame['image'])

KeyboardInterrupt: 

### Task 1

In [7]:
def show_img(img, s = 'name'):
    cv.imshow(s, img)
    cv.waitKey(0)
    cv.destroyWindow(s)

In [57]:
bgs: cv.BackgroundSubtractor = cv.createBackgroundSubtractorKNN(detectShadows=False)
vo = VehicleOccupancy(masks_path=PATH_MASKS)

for i, frame in enumerate(dataset.context[3]):
    # Read and smooth image
    image = frame['image']


    # Apply morphological operations
    fgmask: np.ndarray = bgs.apply(frame['image'], None)
    _, fgmask = cv.threshold(fgmask, 0, 255, cv.THRESH_BINARY)

    kernel = cv.getStructuringElement(cv.MORPH_CROSS, ksize=(3, 3))
    fgmask = cv.erode(fgmask, kernel, iterations=1)

    kernel = cv.getStructuringElement(cv.MORPH_CROSS, ksize=(5, 5))
    fgmask = cv.erode(fgmask, kernel, iterations=2)

    kernel = cv.getStructuringElement(cv.MORPH_ELLIPSE, ksize=(3, 3))
    fgmask = cv.dilate(fgmask, kernel, iterations=3)
    kernel = cv.getStructuringElement(cv.MORPH_ELLIPSE, ksize=(5, 5))
    fgmask = cv.dilate(fgmask, kernel, iterations=2)
    kernel = cv.getStructuringElement(cv.MORPH_ELLIPSE, ksize=(7, 7))
    fgmask = cv.dilate(fgmask, kernel, iterations=1)

    # Find the contours to do object detection
    contours, _ = cv.findContours(fgmask, cv.RETR_TREE, cv.CHAIN_APPROX_NONE)

    # Show FPS counter
    fg_image = cv.bitwise_and(image, image, mask=fgmask)
    image: np.ndarray = cv.putText(image, f'Frame: {i + 1}', (10, 30), cv.FONT_ITALIC, 1, (255, 255, 255), 2, cv.LINE_AA)
    frameCopy = image.copy()

    # Draw BBOXes
    for cnt in contours:
        if 2_000 < cv.contourArea(cnt):
            x, y, width, height = cv.boundingRect(cnt)
            cv.rectangle(frameCopy, (x, y), (x + width, y + height), (0, 0, 255), 2)
            cv.putText(frameCopy, 'Car Detected', (x , y - 10), cv.FONT_HERSHEY_SIMPLEX, 0.3, (0,255,0), 1, cv.LINE_AA)

    # Obtain mask for the predefined lanes
    # fgmask_lane: np.ndarray = (fgmask != 0) * (vo.lanes_mask != 0)
    # fgmask_lane = np.tile(fgmask_lane[..., np.newaxis], (1, 1, 3))

    # Draw
    cv.imshow('frame', image)
    cv.imshow('copy', frameCopy)
    cv.imshow('foreground', fg_image)
    cv.imshow('bg', bgs.getBackgroundImage())
    cv.waitKey(int((1 / frame['fps']) * 500))
cv.destroyAllWindows()

KeyboardInterrupt: 

### Prediction

### Saving and evaluating the results