In [1]:
import cv2
import numpy as np
import easyocr
import imutils
from skimage.metrics import structural_similarity as ssim
from datetime import datetime

import firebase_admin
from firebase_admin import credentials, firestore

# Firebase initialisation.
# The guard skips initialisation when firebase_admin already holds an app
# (`_apps` is a private attribute of the library; presumably its registry of
# initialised apps, which makes re-running this cell safe - TODO confirm).
if not firebase_admin._apps:
    cred = credentials.Certificate("serviceAccountKey.json")
    firebase_admin.initialize_app(cred)
db = firestore.client()

# Number of frames per second to process
FPS_PROCESSING = 3

# Dimensions of the virtual gates
RECT_WIDTH = 90
RECT_HEIGHT = 20

# Top-left corners of the gates (x, y coordinates)
ENTRANCE_RECT_TOP_LEFT = (745, 840)
EXIT_RECT_TOP_LEFT = (162, 1030)

# Similarity threshold: a region whose SSIM score against its reference image
# is <= this value is treated as changed (gate zones and parking spots alike)
SIMILARITY_THRESHOLD = 0.7

# Table of all available parking spots
PARKING_SPOTS = [
    # spot number, x1, x2, y1, y2, status
    # status: 0 - free, 1 - occupied
    [1, 850, 1065, 632, 704, 0],
    [2, 850, 1065, 514, 604, 0],
    [3, 850, 1065, 390, 485, 0],
    [4, 760, 833, 22, 293, 0],
    [5, 655, 730, 22, 293, 0],
    [6, 546, 628, 22, 293, 0],
    [7, 415, 519, 22, 293, 0],
    [8, 27, 270, 17, 110, 0],
    [9, 27, 270, 140, 218, 0],
    [10, 27, 270, 246, 335, 0],
    [11, 27, 270, 363, 443, 0]
]

# Coordinates of the entrance and exit zones.
# Used as frame[zone[0]:zone[1], zone[2]:zone[3]], i.e. the layout is
# [row_start, row_end, col_start, col_end].
ENTRANCE_ZONE = [840, 1060, 750, 845]
EXIT_ZONE = [835, 1060, 140, 250]

# In-memory database of the vehicles using the car park, keyed by licence plate
vehicle_data = {}

# Per-gate flags: set to 1 by find_license_plate once a plate has been recorded
# for the vehicle currently in the zone, and reset to 0 by the zone-processing
# functions when the zone looks like its reference image again
ENTRY_ROTATION = 0
EXIT_ROTATION = 0

# Rotate an image about its centre without cropping the corners
def rotate_image(image, angle):
    """Return `image` rotated by `angle` degrees on a canvas resized to fit it.

    The rotation matrix is built around the (integer) image centre with scale
    1.0; the output size is derived from the absolute cosine/sine terms of that
    matrix and the translation is shifted so the rotated content is centred on
    the new canvas. Uncovered areas are filled with black.

    Args:
        image: array whose first two dimensions are (height, width).
        angle: rotation angle in degrees, passed to cv2.getRotationMatrix2D.

    Returns:
        The warped image produced by cv2.warpAffine.
    """
    height, width = image.shape[:2]
    pivot = (width // 2, height // 2)
    matrix = cv2.getRotationMatrix2D(pivot, angle, 1.0)

    abs_cos = np.abs(matrix[0, 0])
    abs_sin = np.abs(matrix[0, 1])

    # Bounding box of the rotated rectangle
    out_width = int((height * abs_sin) + (width * abs_cos))
    out_height = int((height * abs_cos) + (width * abs_sin))

    # Move the rotation centre to the centre of the enlarged canvas
    matrix[0, 2] += (out_width / 2) - pivot[0]
    matrix[1, 2] += (out_height / 2) - pivot[1]

    return cv2.warpAffine(image, matrix, (out_width, out_height), borderValue=(0, 0, 0))


# Compute the similarity of two images
def calculate_similarity_ssim(image1, image2):
    """Return the SSIM score of two images, or None when their shapes differ.

    Args:
        image1: first image (array with a `.shape`).
        image2: second image (array with a `.shape`).

    Returns:
        The first element returned by `ssim(image1, image2, full=True)`, or
        None if `image1.shape != image2.shape` (no comparison is attempted).
    """
    shapes_match = image1.shape == image2.shape
    if not shapes_match:
        return None

    # full=True also yields the per-pixel similarity map, which is not needed here
    mean_score, _similarity_map = ssim(image1, image2, full=True)
    return mean_score


# Read a vehicle's licence plate from a gate-zone image
# https://www.youtube.com/watch?v=Nmplz6wZFcU
def find_license_plate(rotated_frame, reference_texts, reader, entry=False, exit=False):
    """Try to read a known licence plate from `rotated_frame`.

    The image is re-rotated by every integer angle in [-15, 14] (the range
    stops before +15). For each angle the largest contours are searched for a
    four-corner polygon, the polygon's bounding box is cropped from the
    grayscale image and passed to the OCR reader. Only the first OCR detection
    is considered, and only if its text is listed in `reference_texts`.

    On a match the event is recorded through `add_entry_exit_data` with the
    current wall-clock time and the matching global flag (ENTRY_ROTATION or
    EXIT_ROTATION) is set to 1.

    Args:
        rotated_frame: BGR image of the gate zone.
        reference_texts: collection of accepted plate strings.
        reader: OCR object exposing `readtext(image)`.
        entry: record a match as an entry.
        exit: record a match as an exit (checked only when `entry` is false).

    Returns:
        The matched plate text, or None when nothing was recognised (or when
        neither `entry` nor `exit` is set).
    """
    global ENTRY_ROTATION
    global EXIT_ROTATION
    for angle in range(-15, 15, 1):
        rotated_img = rotate_image(rotated_frame, angle)
        gray = cv2.cvtColor(rotated_img, cv2.COLOR_BGR2GRAY)
        bfilter = cv2.bilateralFilter(gray, 11, 17, 17)
        edged = cv2.Canny(bfilter, 30, 200)

        keypoints = cv2.findContours(edged.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
        contours = imutils.grab_contours(keypoints)
        contours = sorted(contours, key=cv2.contourArea, reverse=True)[:10]

        # First (largest) contour that simplifies to a quadrilateral
        location = None
        for contour in contours:
            approx = cv2.approxPolyDP(contour, 10, True)
            if len(approx) == 4:
                location = approx
                break
        if location is None:
            continue

        # Fill the quadrilateral on a blank mask; only its extent is used below.
        # (The former cv2.bitwise_and call was removed: its result was discarded.)
        mask = np.zeros(gray.shape, np.uint8)
        cv2.drawContours(mask, [location], 0, 255, -1)

        # np.where yields (row indices, column indices)
        rows, cols = np.where(mask == 255)
        if rows.size == 0:
            # Nothing was drawn; np.min would raise on an empty array
            continue
        cropped_image = gray[np.min(rows):np.max(rows) + 1, np.min(cols):np.max(cols) + 1]

        result = reader.readtext(cropped_image)
        if not result:
            continue

        # Second-to-last field of the first detection
        # (EasyOCR detections are documented as (bbox, text, confidence))
        text = result[0][-2]
        if text not in reference_texts:
            continue

        if entry:
            print(f"wjazd - {text}")
            entry_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            add_entry_exit_data(text, entry_time=entry_time)
            ENTRY_ROTATION = 1
            return text
        if exit:
            print(f"wyjazd - {text}")
            exit_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            add_entry_exit_data(text, exit_time=exit_time)
            EXIT_ROTATION = 1
            return text
    return None


# Crop and process the entrance zone
def process_entrance_region(frame, first_entrance_region, reference_texts, reader):
    """Detect a vehicle in the entrance zone and try to read its plate.

    The zone is cropped from `frame` using ENTRANCE_ZONE (read as
    [row_start, row_end, col_start, col_end]), converted to grayscale and
    compared with the reference image.

    * Score unavailable (shape mismatch): nothing is done and None is returned
      (previously this raised TypeError on `None <= float`).
    * Score <= SIMILARITY_THRESHOLD: the zone changed. If no plate has been
      recorded yet for this vehicle (ENTRY_ROTATION == 0) the crop is rotated
      by -90 degrees and handed to `find_license_plate`.
    * Otherwise the zone matches the reference and ENTRY_ROTATION is reset.

    Args:
        frame: current BGR video frame.
        first_entrance_region: grayscale reference image of the zone.
        reference_texts: accepted plate strings.
        reader: OCR reader forwarded to `find_license_plate`.

    Returns:
        The recognised plate text, or None.
    """
    global ENTRY_ROTATION
    row_start, row_end, col_start, col_end = ENTRANCE_ZONE
    selected_entrance_region = frame[row_start:row_end, col_start:col_end]
    selected_entrance_region_gray = cv2.cvtColor(selected_entrance_region, cv2.COLOR_BGR2GRAY)
    ssim_score_entrance = calculate_similarity_ssim(first_entrance_region, selected_entrance_region_gray)

    if ssim_score_entrance is None:
        # Images could not be compared; leave the flag untouched
        return None

    if ssim_score_entrance <= SIMILARITY_THRESHOLD:
        if ENTRY_ROTATION == 0:
            # Rotate only when OCR will actually run
            rotated_frame = rotate_image(selected_entrance_region, -90)
            return find_license_plate(rotated_frame, reference_texts, reader, True, False)
    else:
        ENTRY_ROTATION = 0

    return None


# Crop and process the exit zone
def process_exit_region(frame, first_exit_region, reference_texts, reader):
    """Detect a vehicle in the exit zone and try to read its plate.

    The zone is cropped from `frame` using EXIT_ZONE (read as
    [row_start, row_end, col_start, col_end]), converted to grayscale and
    compared with the reference image.

    * Score unavailable (shape mismatch): nothing is done and None is returned
      (previously this raised TypeError on `None <= float`).
    * Score <= SIMILARITY_THRESHOLD: the zone changed. If no plate has been
      recorded yet for this vehicle (EXIT_ROTATION == 0) the crop is rotated
      by 90 degrees and handed to `find_license_plate`.
    * Otherwise the zone matches the reference and EXIT_ROTATION is reset.

    Args:
        frame: current BGR video frame.
        first_exit_region: grayscale reference image of the zone.
        reference_texts: accepted plate strings.
        reader: OCR reader forwarded to `find_license_plate`.

    Returns:
        The recognised plate text, or None.
    """
    global EXIT_ROTATION
    row_start, row_end, col_start, col_end = EXIT_ZONE
    selected_exit_region = frame[row_start:row_end, col_start:col_end]
    selected_exit_region_gray = cv2.cvtColor(selected_exit_region, cv2.COLOR_BGR2GRAY)
    ssim_score_exit = calculate_similarity_ssim(first_exit_region, selected_exit_region_gray)

    if ssim_score_exit is None:
        # Images could not be compared; leave the flag untouched
        return None

    if ssim_score_exit <= SIMILARITY_THRESHOLD:
        if EXIT_ROTATION == 0:
            # Rotate only when OCR will actually run
            rotated_frame = rotate_image(selected_exit_region, 90)
            return find_license_plate(rotated_frame, reference_texts, reader, False, True)
    else:
        EXIT_ROTATION = 0

    return None


# Update a vehicle's entry/exit times and parking spot
def add_entry_exit_data(plate, entry_time=None, exit_time=None, parking_spot=None):
    """Create or update the record of `plate` in `vehicle_data` and persist it.

    For an unknown plate a new record is created from the given values.
    For a known plate exactly one update is applied, in this priority:

    1. `exit_time` given: store it and mark the spot as the string "None"
       (the marker this file uses for "vehicle has left").
    2. `parking_spot` given: store it as the current spot and append it to
       `visited_spots` if not already listed.
    3. `entry_time` given and the record already has an exit time: the
       vehicle is re-entering, so a new visit starts - `entry_time` is
       replaced, `exit_time` is cleared and `parking_spot` is reset to None so
       the vehicle is eligible for a spot assignment again. (Previously a
       re-entry was silently ignored.) An entry reported for a vehicle that is
       still inside changes nothing.

    The record is written to Firebase on every call.

    Args:
        plate: licence plate string, used as the record key.
        entry_time: entry timestamp string, if reporting an entry.
        exit_time: exit timestamp string, if reporting an exit.
        parking_spot: parking spot number, if reporting a spot assignment.
    """
    if plate not in vehicle_data:
        vehicle_data[plate] = {
            "entry_time": entry_time,
            "exit_time": exit_time,
            "parking_spot": parking_spot if exit_time is None else "None",
            "visited_spots": [parking_spot] if parking_spot else []
        }
    else:
        record = vehicle_data[plate]

        if exit_time:
            record["exit_time"] = exit_time
            record["parking_spot"] = "None"

        elif parking_spot:
            record["parking_spot"] = parking_spot
            visited_spots = record.setdefault("visited_spots", [])
            if parking_spot not in visited_spots:
                visited_spots.append(parking_spot)

        elif entry_time and record.get("exit_time") is not None:
            # Re-entry after a completed visit: start a fresh visit
            record["entry_time"] = entry_time
            record["exit_time"] = None
            record["parking_spot"] = None

    # Persist to Firebase
    save_to_firebase(plate)


# Write a vehicle's record to the Firebase database
def save_to_firebase(plate):
    """Store the `vehicle_data` record of `plate` in the "vehicles" collection.

    Every field whose value is None is written as the string "None"; the
    fields entry_time, exit_time, parking_spot (default "None") and
    visited_spots (default []) are added when the record lacks them.

    Args:
        plate: key into `vehicle_data`, also used as the document id.

    Raises:
        KeyError: if `plate` has no record in `vehicle_data`.
    """
    record = vehicle_data[plate]

    # Replace empty values with the string 'None'
    payload = {
        field: ("None" if value is None else value)
        for field, value in record.items()
    }

    # Make sure the document always carries the four standard fields
    required_fields = (
        ("entry_time", "None"),
        ("exit_time", "None"),
        ("parking_spot", "None"),
        ("visited_spots", []),
    )
    for field, fallback in required_fields:
        payload.setdefault(field, fallback)

    db.collection("vehicles").document(plate).set(payload)


# Wipe the database
def clear_database():
    """Delete every document of every collection returned by `db.collections()`.

    Note that this is not limited to the "vehicles" collection: each streamed
    document of each listed collection is deleted one by one.
    """
    for collection in db.collections():
        collection_name = collection.id
        for snapshot in collection.stream():
            document_ref = db.collection(collection_name).document(snapshot.id)
            document_ref.delete()


# Process the state of the parking spots
def process_parking_spot(prev_parking_spots_table, parking_spots_table):
    """Compare every parking spot with its reference image and update its state.

    For each entry of PARKING_SPOTS (by index) the two corresponding images
    are compared:

    * no score (shape mismatch): the spot is skipped;
    * score <= SIMILARITY_THRESHOLD (spot looks different from the reference):
      a spot already flagged occupied is left alone; otherwise the first
      vehicle in `vehicle_data` whose "parking_spot" is None gets the spot,
      the spot is flagged occupied and the function returns immediately;
    * score above the threshold: the spot is flagged free (a message is
      printed if it was occupied).

    Fixes over the previous version: a score of exactly 0.0 is no longer
    mistaken for "no score", and an occupied spot is skipped regardless of the
    order in which vehicles are stored, so it can no longer be handed to a
    second vehicle.

    Args:
        prev_parking_spots_table: reference images, one per parking spot.
        parking_spots_table: current images, one per parking spot.

    Returns:
        (plate, spot_number) for a new assignment, otherwise (None, None).
    """
    for parking_number, spot in enumerate(PARKING_SPOTS):
        spot_number = spot[0]

        ssim_score_parking = calculate_similarity_ssim(
            prev_parking_spots_table[parking_number],
            parking_spots_table[parking_number],
        )
        if ssim_score_parking is None:
            continue

        if ssim_score_parking <= SIMILARITY_THRESHOLD:
            if spot[5] == 1:
                # Already occupied - nothing to assign
                continue

            for plate, data in vehicle_data.items():
                if data.get("parking_spot") is None:
                    # Assign the spot to the first vehicle still waiting for one
                    data["parking_spot"] = spot_number
                    add_entry_exit_data(plate, parking_spot=spot_number)
                    spot[5] = 1
                    print(f"Przypisano miejsce parkingowe {spot_number} do pojazdu: {plate}")
                    return plate, spot_number
        else:
            if spot[5] == 1:
                # The spot has been vacated
                print(f"Zwolniono miejsce parkingowe: {spot_number}")
            spot[5] = 0

    return None, None


# Build the table of parking-spot images
def create_parking_spots_table(frame):
    """Return one grayscale crop of `frame` per entry of PARKING_SPOTS.

    Each spot is [number, x1, x2, y1, y2, status]; the crop is
    frame[y1:y2, x1:x2], converted with cv2.COLOR_BGR2GRAY. The result is in
    the same order as PARKING_SPOTS.

    Args:
        frame: BGR video frame.

    Returns:
        List of grayscale images.
    """
    return [
        cv2.cvtColor(frame[y1:y2, x1:x2], cv2.COLOR_BGR2GRAY)
        for _number, x1, x2, y1, y2, _status in PARKING_SPOTS
    ]


# Draw the virtual gates
def create_virtual_gate(frame, entrance_text, exit_text):
    """Draw a closed gate on `frame` for every gate without a recognised plate.

    A gate is drawn (filled rectangle plus a caption 10 px above it, both in
    BGR (255, 0, 0)) only while the corresponding text is None; when a plate
    text is present the gate is simply not drawn.

    Args:
        frame: image to draw on (modified in place by OpenCV).
        entrance_text: plate recognised at the entrance, or None.
        exit_text: plate recognised at the exit, or None.
    """
    gate_color = (255, 0, 0)
    gates = (
        (entrance_text, ENTRANCE_RECT_TOP_LEFT, "   WJAZD"),
        (exit_text, EXIT_RECT_TOP_LEFT, "  WYJAZD"),
    )

    for recognised_text, top_left, caption in gates:
        if recognised_text is not None:
            continue
        left, top = top_left[0], top_left[1]
        cv2.rectangle(frame, top_left, (left + RECT_WIDTH, top + RECT_HEIGHT), gate_color, -1)
        cv2.putText(frame, caption, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, gate_color, 2)


# Draw the frames around the parking spots
def create_parking_bounding_box(frame=None):
    """Draw a labelled rectangle for every parking spot.

    Free spots (status 0) are drawn in green with the caption "Miejsce <n>";
    occupied spots are drawn in red with the plate of the first vehicle in
    `vehicle_data` whose "parking_spot" equals the spot number (the caption is
    the text "None" when no vehicle matches). The rectangle is shifted left by
    DISPLAY_INACCURACY pixels; the caption is not.

    Args:
        frame: image to draw on (modified in place by OpenCV). When omitted,
            the module-level variable `frame` is used, which keeps the
            original zero-argument call working.

    Raises:
        NameError: if `frame` is omitted and no module-level `frame` exists.
    """
    if frame is None:
        try:
            frame = globals()["frame"]
        except KeyError:
            raise NameError("name 'frame' is not defined") from None

    DISPLAY_INACCURACY = 12

    # One pass over the vehicles; setdefault keeps the first plate per spot
    plate_by_spot = {}
    for plate, data in vehicle_data.items():
        plate_by_spot.setdefault(data.get("parking_spot"), plate)

    for spot in PARKING_SPOTS:
        x1, x2, y1, y2, status = spot[1], spot[2], spot[3], spot[4], spot[5]
        license_plate = plate_by_spot.get(spot[0])

        if status == 0:
            # Green for free spots
            color = (0, 255, 0)
            cv2.putText(frame, f"Miejsce {spot[0]}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
        else:
            # Red for occupied spots
            color = (0, 0, 255)
            cv2.putText(frame, f"{license_plate}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
        cv2.rectangle(frame, (x1-DISPLAY_INACCURACY, y1), (x2-DISPLAY_INACCURACY, y2), color, 2)


# Print the table with the vehicle data
def print_vehicle_data_table(vehicle_data):
    """Print one summary line per vehicle.

    A missing or None exit time and an empty list of visited spots are both
    shown as the text "None"; visited spots are joined with ", ".

    Args:
        vehicle_data: mapping of plate -> record; each record must contain
            'entry_time' and may contain 'exit_time' and 'visited_spots'.

    Raises:
        KeyError: if a record has no 'entry_time'.
    """
    for plate, record in vehicle_data.items():
        departed_at = record.get('exit_time')
        departed_label = "None" if departed_at is None else departed_at

        spots_label = ", ".join(str(spot) for spot in record.get("visited_spots", []))
        spots_label = spots_label or "None"

        print(f"Pojazd: {plate}, Wjazd: {record['entry_time']}, Wyjazd: {departed_label}, Zajmowane miejsca parkingowe: {spots_label}")

# Reference list of licence plates accepted by the OCR step
reference_texts = ["EL-101", "EL-801", "EL-501", "EL-301", "EL-601"]

# Video and OCR initialisation
video_path = "OBG.mp4"
reader = easyocr.Reader(['en'])
cap = cv2.VideoCapture(video_path)

if not cap.isOpened():
    print("Nie można otworzyć pliku wideo.")
    # exit() is an interactive-shell helper; SystemExit stops the cell instead
    raise SystemExit(1)

# Some containers report 0 fps; fall back to 30 to avoid a division by zero
fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
frame_time = max(1, int(1000 / fps))

# Number of video frames between two processed frames, so that about
# FPS_PROCESSING frames per second are analysed. (The previous expression
# FPS_PROCESSING * (30 / FPS_PROCESSING) always evaluated to 30, so the
# setting had no effect and the real frame rate was ignored.)
multiplier = fps / FPS_PROCESSING
frame_interval = max(1, round(multiplier))
frame_count = 0
scale_factor = 0.7

entrance_text = None
exit_text = None
first_entrance_region = None
first_exit_region = None
prev_parking_spots_table = []
parking_spots_table = []
clear_database()

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if frame_count == 0:
            # The first frame provides the reference images.
            # Entrance
            first_entrance_region = frame[ENTRANCE_ZONE[0]:ENTRANCE_ZONE[1], ENTRANCE_ZONE[2]:ENTRANCE_ZONE[3]]
            first_entrance_region = cv2.cvtColor(first_entrance_region, cv2.COLOR_BGR2GRAY)

            # Exit
            first_exit_region = frame[EXIT_ZONE[0]:EXIT_ZONE[1], EXIT_ZONE[2]:EXIT_ZONE[3]]
            first_exit_region = cv2.cvtColor(first_exit_region, cv2.COLOR_BGR2GRAY)

            # Table of parking-spot images
            prev_parking_spots_table = create_parking_spots_table(frame)

        frame_count += 1

        if frame_count % frame_interval == 0:
            # The processing functions below record every event themselves
            # (vehicle_data + Firebase), so nothing is re-recorded here; the
            # former duplicate add_entry_exit_data calls only repeated writes.

            # Parking spots
            parking_spots_table = create_parking_spots_table(frame)
            plate, parking_number = process_parking_spot(prev_parking_spots_table, parking_spots_table)

            # Entrance
            entrance_text = process_entrance_region(frame, first_entrance_region, reference_texts, reader)

            # Exit
            exit_text = process_exit_region(frame, first_exit_region, reference_texts, reader)

        # Virtual gates
        create_virtual_gate(frame, entrance_text, exit_text)

        # Frames around the parking spots (draws on the module-level `frame`)
        create_parking_bounding_box()

        resized_frame = cv2.resize(frame, (0, 0), fx=scale_factor, fy=scale_factor)
        cv2.imshow("PARKING", resized_frame)
        # Esc stops playback
        if cv2.waitKey(frame_time) & 0xFF == 27:
            break
finally:
    # Release the capture and close the window even if processing failed
    cap.release()
    cv2.destroyAllWindows()

# TEST CASE - A VEHICLE CHANGING ITS PARKING SPOT
# add_entry_exit_data("EL-101", parking_spot=9)
# add_entry_exit_data("EL-101", parking_spot=5)

# Print the table with the collected data
print("\n")
print_vehicle_data_table(vehicle_data)

Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


wjazd - EL-101
Przypisano miejsce parkingowe 4 do pojazdu: EL-101
wjazd - EL-801
Przypisano miejsce parkingowe 6 do pojazdu: EL-801
wjazd - EL-501
Przypisano miejsce parkingowe 1 do pojazdu: EL-501
wjazd - EL-301
Przypisano miejsce parkingowe 11 do pojazdu: EL-301
wjazd - EL-601
Przypisano miejsce parkingowe 3 do pojazdu: EL-601
Zwolniono miejsce parkingowe: 4
wyjazd - EL-101
Zwolniono miejsce parkingowe: 6
wyjazd - EL-801
Zwolniono miejsce parkingowe: 1
wyjazd - EL-501
Zwolniono miejsce parkingowe: 11
wyjazd - EL-301
Zwolniono miejsce parkingowe: 3
wyjazd - EL-601


Pojazd: EL-101, Wjazd: 2025-01-28 18:59:17, Wyjazd: 2025-01-28 18:59:44, Zajmowane miejsca parkingowe: 4
Pojazd: EL-801, Wjazd: 2025-01-28 18:59:21, Wyjazd: 2025-01-28 18:59:48, Zajmowane miejsca parkingowe: 6
Pojazd: EL-501, Wjazd: 2025-01-28 18:59:26, Wyjazd: 2025-01-28 18:59:54, Zajmowane miejsca parkingowe: 1
Pojazd: EL-301, Wjazd: 2025-01-28 18:59:31, Wyjazd: 2025-01-28 18:59:57, Zajmowane miejsca parkingowe: 11
Pojaz