In [1]:
!pip install -r requirements.txt
!pip install joblib
from PIL import Image
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
import joblib
import cv2
import os
import matplotlib.pyplot as plt



In [2]:
from sklearn.metrics import accuracy_score


In [3]:
# Dataset layout: <input_dir>/<category>/<image files>. A sample's label is the
# index of its category in `categories` (0 for 'empty', 1 for 'not_empty').
input_dir = 'data'
categories = ['empty', 'not_empty']
images = []
labels = []

for category_idx, category in enumerate(categories):
    category_dir = os.path.join(input_dir, category)
    # Sorted so the sample order does not depend on the filesystem's listing order.
    for file in sorted(os.listdir(category_dir)):
        img_path = os.path.join(category_dir, file)
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals an undecodable/non-image file by returning None
            # instead of raising; passing that to cv2.resize would crash the cell.
            print('Skipping unreadable image: {}'.format(img_path))
            continue
        # Shrink to 15x15 and flatten so every sample is a fixed-length vector.
        img = cv2.resize(img, (15, 15))
        images.append(img.flatten())
        labels.append(category_idx)
    

In [4]:
# Turn the Python lists collected above into arrays for scikit-learn.
images = np.asarray(images)
labels = np.asarray(labels)

# Hold out 20% of the samples for testing. `stratify=labels` keeps the class
# proportions the same in both splits; the fixed `random_state` makes the
# shuffle (and therefore every downstream score) reproducible across runs.
x_train, x_test, y_train, y_test = train_test_split(
    images,
    labels,
    test_size=0.2,
    shuffle=True,
    stratify=labels,
    random_state=42,
)

In [5]:
# Search space for GridSearchCV. Each dict replaces the pipeline step named
# 'classifier' with a different estimator and lists the hyper-parameters to try
# for it (the 'classifier__' prefix routes a value to that step).
param_grid = [
    {
        'classifier': [SVC()],
        'classifier__kernel': ['rbf', 'linear'],
        'classifier__C': [1, 10, 100],
    },
    {
        # Seeded so the forest, and thus the search result, is reproducible.
        'classifier': [RandomForestClassifier(random_state=42)],
        'classifier__n_estimators': [10, 50, 100],
        'classifier__max_features': [1, 2, 3],
    },
    {
        # NOTE(review): default max_iter on unscaled pixel features may emit
        # convergence warnings -- confirm whether scaling/max_iter is needed.
        'classifier': [LogisticRegression()],
        'classifier__C': [1, 10, 100],
    },
]

In [6]:
from sklearn.pipeline import Pipeline

# Single-step pipeline. The SVC here is only a placeholder: the grid search
# swaps the 'classifier' step for each candidate estimator in param_grid.
pipe = Pipeline(steps=[('classifier', SVC())])

In [7]:
# Exhaustive search over param_grid with 5-fold cross-validation, using all
# available CPU cores (n_jobs=-1).
grid_search = GridSearchCV(
    estimator=pipe,
    param_grid=param_grid,
    cv=5,
    n_jobs=-1,
)

In [8]:
# Run the cross-validated search on the training split only; the test split
# stays untouched for the final evaluation below.
grid_search.fit(x_train, y_train)

In [9]:
# Estimator refit by GridSearchCV using the best-scoring parameter combination.
best_est = grid_search.best_estimator_

In [10]:
# Predict labels for the held-out test split.
y_pred = best_est.predict(x_test)

In [11]:
# accuracy_score expects (y_true, y_pred); keyword arguments make the roles
# explicit. Accuracy is symmetric, so the value itself is unchanged.
score = accuracy_score(y_true=y_test, y_pred=y_pred)
# Show the held-out accuracy as the cell output.
score

In [12]:
# Persist the selected estimator so the video-processing cells can reload it.
joblib.dump(value=best_est, filename='Park_classify')

['Park_classify']

In [13]:
def bounding_boxx(connected_components, coef=1):
    """Convert connected-component statistics into a list of bounding boxes.

    Parameters
    ----------
    connected_components : tuple
        4-tuple ``(totalLabels, label_ids, values, centroid)`` as produced by
        ``cv2.connectedComponentsWithStats``. Only the label count and the
        statistics array (``values``) are used.
    coef : int or float, optional
        Scale factor applied to every coordinate and size before truncating to
        ``int``. Use it when the mask and the frames have different
        resolutions. Defaults to 1 (no scaling).

    Returns
    -------
    list of list of int
        One ``[x, y, w, h]`` entry per component with label >= 1.
    """
    total_labels, _label_ids, stats, _centroids = connected_components
    stat_columns = (
        cv2.CC_STAT_LEFT,
        cv2.CC_STAT_TOP,
        cv2.CC_STAT_WIDTH,
        cv2.CC_STAT_HEIGHT,
    )

    slots = []
    # Start at 1: label 0 is skipped (OpenCV assigns label 0 to the background).
    for label in range(1, total_labels):
        slots.append([int(stats[label, column] * coef) for column in stat_columns])

    return slots

In [14]:
from joblib import load
# Reload the classifier that an earlier cell saved to 'Park_classify'.
# NOTE: joblib files are pickle-based -- only load files you trust.
model = load('Park_classify')

# Return values of classify_spot. EMPTY is True, so sum(Boxs_status) in the
# video loop counts the free spots.
EMPTY = True
NOT_EMPTY = False
def classify_spot(img):
    """Classify one cropped parking spot as empty or occupied.

    The crop is resized to 15x15 and flattened into a single feature row, the
    same preprocessing applied to the training images, then passed to the
    module-level ``model``.

    Parameters
    ----------
    img : array-like image
        Crop of a single parking spot, in a form ``cv2.resize`` accepts.

    Returns
    -------
    bool
        ``EMPTY`` when the model predicts class 0, otherwise ``NOT_EMPTY``.
    """
    resized = cv2.resize(img, (15, 15))
    # One sample -> a (1, n_features) row, as scikit-learn's predict() expects.
    features = resized.reshape(1, -1)
    # predict() returns an array with one entry per row; take the scalar
    # explicitly instead of relying on the truthiness of a 1-element array.
    prediction = model.predict(features)[0]

    return EMPTY if prediction == 0 else NOT_EMPTY
    

In [15]:
def pixel_diff(img1, img2):
    """Return the absolute difference between the mean values of two images.

    Each image is reduced to the mean over all of its elements; the result is
    the magnitude of the gap between those two means.
    """
    mean_first = np.mean(img1)
    mean_second = np.mean(img2)
    return np.abs(mean_first - mean_second)

In [16]:
video_path = 'parking_1920_1080_loop.mp4'
mask_path = 'mask_1920_1080.png'

# Load the spot mask as a single-channel (grayscale) image: flag 0.
mask = cv2.imread(mask_path, 0)
if mask is None:
    # cv2.imread returns None instead of raising when the file is missing or
    # cannot be decoded; fail here with a clear message rather than deeper
    # inside connectedComponentsWithStats.
    raise FileNotFoundError('Could not read mask image: {}'.format(mask_path))

# Create a VideoCapture object
cap = cv2.VideoCapture(video_path)

# Each 4-connected blob in the mask is one parking spot; turn the blobs into
# [x, y, w, h] boxes.
connected_components = cv2.connectedComponentsWithStats(mask, 4, cv2.CV_32S)
Boxs = bounding_boxx(connected_components)

In [17]:
frame_count = 0
prev_frame = None  # last sampled frame, kept without any drawn overlays
steps = 30  # spots are re-evaluated once every `steps` frames
Boxs_status = [None for j in Boxs]  # latest classify_spot result per box
diffs = [None for j in Boxs]  # per-box change versus prev_frame

try:
    # Check if video opened successfully
    if not cap.isOpened():
        # Raise instead of calling exit(): exit() would shut down the notebook
        # kernel, while an exception just stops this cell.
        raise RuntimeError("Error: Could not open video.")

    # Read and display frame by frame
    while True:
        ret, frame = cap.read()  # Read a frame
        if not ret:
            # End of stream or read failure: `frame` is not a usable image, so
            # stop before anything below tries to slice, copy or draw on it.
            break

        if frame_count % steps == 0:
            if prev_frame is None:
                # First sampled frame: nothing to compare with, classify all spots.
                arr_ = range(len(Boxs))
            else:
                for box_idx, (x1, y1, w, h) in enumerate(Boxs):
                    diffs[box_idx] = pixel_diff(
                        frame[y1:y1+h, x1:x1+w, :],
                        prev_frame[y1:y1+h, x1:x1+w, :],
                    )

                # Only re-classify spots whose change is more than 40% of the
                # largest change seen. When nothing changed at all (max is 0),
                # skip the division and re-classify nothing.
                max_diff = np.amax(diffs) if diffs else 0
                if max_diff > 0:
                    arr_ = [j for j, diff in enumerate(diffs) if diff / max_diff > 0.4]
                else:
                    arr_ = []

            for box_idx in arr_:
                x1, y1, w, h = Boxs[box_idx]
                spot_crop = frame[y1:y1+h, x1:x1+w, :]
                Boxs_status[box_idx] = classify_spot(spot_crop)

            # Copy before the rectangles are drawn so the next comparison is
            # made against the raw image, not the annotated one.
            prev_frame = frame.copy()

        for (x1, y1, w, h), status in zip(Boxs, Boxs_status):
            # Colours are BGR: green for an empty spot, red otherwise.
            color = (0, 255, 0) if status else (0, 0, 255)
            frame = cv2.rectangle(frame, (x1, y1), (x1+w, y1+h), color, 2)

        # Count truthy (EMPTY) statuses; also safe if a status is still None.
        available = sum(1 for status in Boxs_status if status)
        cv2.putText(frame, 'Parking Spots Available: {} / {}'.format(str(available), str(len(Boxs_status))), (100, 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

        cv2.imshow('Frame', frame)  # Display the frame

        # Press 'q' on the keyboard to exit
        if cv2.waitKey(25) & 0xFF == ord('q'):
            break
        frame_count += 1
finally:
    # Release the VideoCapture object and close windows, even after an error.
    cap.release()
    cv2.destroyAllWindows()