# Laboratorium 3

In [2]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import os
import warnings
warnings.filterwarnings('ignore')


input_folder = "/home/plorenc/Desktop/AiR_ISS/AVS/pedestrian/input/"
ref_folder = "/home/plorenc/Desktop/AiR_ISS/AVS/pedestrian/groundtruth"

def calculate_f1_score(thresh, I_ref):
    # Binaryzacja maski referencyjnej
    I_ref = cv2.threshold(I_ref, 127, 255, cv2.THRESH_BINARY)[1]

    # Obliczenie True Positives, False Positives, False Negatives
    TP = np.sum((thresh == 255) & (I_ref == 255))
    FP = np.sum((thresh == 255) & (I_ref == 0))
    FN = np.sum((thresh == 0) & (I_ref == 255))

    # Oblicz Precision i Recall
    precision = TP / (TP + FP) if (TP + FP) > 0 else 0
    recall = TP / (TP + FN) if (TP + FN) > 0 else 0

    # Oblicz F1-score
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    return (f1, precision, recall)

def f1_for_video(f1):
    sumf = 0
    num = 0
    for f in f1:
        if f[0] != 0:
            sumf += f[0]
            num += 1
    return round(sumf/num, 2)

def thres_and_morph(I, thres=10, ksize=3, iterations=1):
    _, I = cv2.threshold(I, thres, 255, cv2.THRESH_BINARY)
    I = cv2.medianBlur(I, ksize=ksize)
    
    #kernel = np.ones((ksize, ksize), np.uint8) 
    I = cv2.morphologyEx(I, cv2.MORPH_OPEN, (3, 3), iterations=iterations)

    return I


### zadanie 1 BUFFOR

In [3]:
import os


N = 60  # Rozmiar bufora
threshold_value = 20  # Wartość progowania dla detekcji ruchu

input_folder = "/home/plorenc/Desktop/AiR_ISS/AVS/pedestrian/input/"

ref_folder = "/home/plorenc/Desktop/AiR_ISS/AVS/pedestrian/groundtruth"
f1_med = []
f1_mean = []

first_image = cv2.imread(os.path.join(input_folder, "in000300.jpg"), cv2.IMREAD_GRAYSCALE)

YY, XX = first_image.shape 

BUF = np.zeros((YY, XX, N), np.uint8) # buffor 
iN = 0  # Licznik bufora

for i in range(900, 1100):  
    image_path = os.path.join(input_folder, f"in{i:06d}.jpg")
    ref_path = os.path.join(ref_folder, f"gt{i:06d}.png")

    IG = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)

    ref_img = cv2.imread(ref_path, cv2.IMREAD_GRAYSCALE)

    if iN < N:
        BUF[:, :, iN] = IG # buffor jest napełniany
        iN += 1
    else:
        BUF = np.roll(BUF, -1, axis=2)  # po przepełnieniu pierwszy eleent jest usuwany a jako ostani jest dołączony nowy obraz
        BUF[:, :, -1] = IG  

    if iN > 59:
        background_mean = np.mean(BUF, axis=2).astype(np.uint8) # z tensora po osotaniej osi jest obliczna srednia
        background_median = np.median(BUF, axis=2).astype(np.uint8) # z tensora po ostaniej osi jest liczona mediana

        fg_mask_mean = cv2.absdiff(IG, background_mean) # maska tła dla sredniej 
        fg_mask_median = cv2.absdiff(IG, background_median) # maska tła dla mediany 

        _, fg_mask_mean = cv2.threshold(fg_mask_mean, threshold_value, 255, cv2.THRESH_BINARY)
        _, fg_mask_median = cv2.threshold(fg_mask_median, threshold_value, 255, cv2.THRESH_BINARY)


        fg_mask_mean = cv2.medianBlur(fg_mask_mean, 5)
        fg_mask_median = cv2.medianBlur(fg_mask_median, 5)
        
        fg_mask_mean = cv2.morphologyEx(fg_mask_mean, cv2.MORPH_OPEN, (7, 7), iterations=3)
        fg_mask_median = cv2.morphologyEx(fg_mask_median, cv2.MORPH_OPEN, (7, 7), iterations=3)

        f1_med.append(calculate_f1_score(fg_mask_median, ref_img))
        f1_mean.append(calculate_f1_score(fg_mask_mean, ref_img))

        combined_image = cv2.hconcat([fg_mask_mean, fg_mask_median])
        cv2.imshow("Foreground Mask", combined_image)

        cv2.waitKey(10)

cv2.destroyAllWindows()

In [4]:
f1_for_video(f1_mean), f1_for_video(f1_med)

(0.53, 0.77)

### zadanie 2 Aproksymacja mediany i średniej

In [5]:
alpha = 0.2

first_image = cv2.imread(os.path.join(input_folder, "in000300.jpg"), cv2.IMREAD_GRAYSCALE)
YY, XX = first_image.shape

bg_mean = first_image.astype(np.float32)
bg_median = first_image.astype(np.uint8)

f1_med = []
f1_mean = []

for i in range(300, 1100, 1):
    I_path = os.path.join(input_folder, f"in{i:06d}.jpg")
    I_ref_path = os.path.join(ref_folder, f"gt{i:06d}.png")

    I = cv2.imread(I_path, cv2.IMREAD_GRAYSCALE)
    I_ref = cv2.imread(I_ref_path, cv2.IMREAD_GRAYSCALE)

    bg_mean = alpha*I + (1-alpha)*bg_mean
    bg_mean = bg_mean.astype(np.uint8)

    bg_median[bg_median < I] += 1
    bg_median[bg_median > I] -= 1

    diff_mean = cv2.absdiff(I, bg_mean)
    diff_median = cv2.absdiff(I, bg_median)

    diff_mean = thres_and_morph(diff_mean, ksize=3, iterations=2)
    diff_median = thres_and_morph(diff_median, ksize=3, iterations=2)

    combine = cv2.hconcat([diff_mean, diff_median])

    f1_med.append(calculate_f1_score(diff_median, I_ref))
    f1_mean.append(calculate_f1_score(diff_mean, I_ref))

    cv2.imshow("test", combine)
    cv2.waitKey(10)

    
cv2.destroyAllWindows()


In [6]:
f1_for_video(f1_mean), f1_for_video(f1_med)

(0.62, 0.81)

### Zadanie 3 Polityka konserwatywna i liberalna

In [None]:
alpha = 0.2

first_image = cv2.imread(os.path.join(input_folder, "in000300.jpg"), cv2.IMREAD_GRAYSCALE)
YY, XX = first_image.shape

bg_mean = first_image.astype(np.float32)
bg_median = first_image.astype(np.uint8)

f1_med = []
f1_mean = []

for i in range(300, 900, 1):
    I_path = os.path.join(input_folder, f"in{i:06d}.jpg")
    I_ref_path = os.path.join(ref_folder, f"gt{i:06d}.png")

    I = cv2.imread(I_path, cv2.IMREAD_GRAYSCALE)
    I_ref = cv2.imread(I_ref_path, cv2.IMREAD_GRAYSCALE)

    _, mask_mean = cv2.threshold(bg_mean, 30, 255, cv2.THRESH_BINARY)
    _, mask_median = cv2.threshold(bg_median, 30, 255, cv2.THRESH_BINARY)

    mask_mean = mask_mean == 0 # zostało sklaifikowanie jako tło 
    mask_median = mask_median == 0 # zostało sklaifikowanie jako tło 

    bg_mean[mask_mean] = alpha * I[mask_mean] + (1 - alpha) * bg_mean[mask_mean]
    
    bg_median[(bg_median < I) & (mask_median == 0)] += 1
    bg_median[(bg_median > I) & (mask_median == 0)] -= 1

    diff_mean = cv2.absdiff(I, bg_mean.astype(np.uint8))
    diff_median = cv2.absdiff(I, bg_median)

    diff_mean = thres_and_morph(diff_mean, ksize=5, iterations=2)
    diff_median = thres_and_morph(diff_median, ksize=5, iterations=2)

    combine = cv2.hconcat([diff_mean, diff_median])

    f1_med.append(calculate_f1_score(diff_median, I_ref))
    f1_mean.append(calculate_f1_score(diff_mean, I_ref))

    cv2.imshow("test", combine)
    cv2.waitKey(10)

cv2.destroyAllWindows()


In [None]:
f1_for_video(f1_mean), f1_for_video(f1_med)

(0.74, 0.82)

### Zadanie 4 MOG

In [None]:
first_image = cv2.imread(os.path.join(input_folder, "in000300.jpg"), cv2.IMREAD_GRAYSCALE)
YY, XX = first_image.shape

bg_mean = first_image.astype(np.float32)
bg_median = first_image.astype(np.uint8)

f1_mog = []

history = 500
varThras = 10

bg_subtractor = cv2.createBackgroundSubtractorMOG2(history=history, varThreshold=varThras, detectShadows=False)

for i in range(300, 1100, 2):
    I_path = os.path.join(input_folder, f"in{i:06d}.jpg")
    I_ref_path = os.path.join(ref_folder, f"gt{i:06d}.png")

    I = cv2.imread(I_path, cv2.IMREAD_GRAYSCALE)
    I_ref = cv2.imread(I_ref_path, cv2.IMREAD_GRAYSCALE)
    bg = bg_subtractor.apply(I, learningRate=-1)

    bg = thres_and_morph(bg, ksize=5, iterations=2)

    f1_mog.append(calculate_f1_score(bg, I_ref))

    cv2.imshow("test", bg)
    cv2.waitKey(10)

cv2.destroyAllWindows()

In [None]:
f1_for_video(f1_mog)

0.83

### Zadanie 5 KNN

In [None]:
first_image = cv2.imread(os.path.join(input_folder, "in000300.jpg"), cv2.IMREAD_GRAYSCALE)
YY, XX = first_image.shape

bg_mean = first_image.astype(np.float32)
bg_median = first_image.astype(np.uint8)

f1_knn = []

history = 500
dist2Threshold = 70

bg_subtractor = cv2.createBackgroundSubtractorKNN(history=history, dist2Threshold=dist2Threshold, detectShadows=False)

for i in range(300, 1100, 2):
    I_path = os.path.join(input_folder, f"in{i:06d}.jpg")
    I_ref_path = os.path.join(ref_folder, f"gt{i:06d}.png")

    I = cv2.imread(I_path, cv2.IMREAD_GRAYSCALE)
    I_ref = cv2.imread(I_ref_path, cv2.IMREAD_GRAYSCALE)
    bg = bg_subtractor.apply(I, learningRate=-1)

    bg = thres_and_morph(bg, ksize=5, iterations=2)

    f1_knn.append(calculate_f1_score(bg, I_ref))

    cv2.imshow("test", bg)
    cv2.waitKey(10)

cv2.destroyAllWindows()

In [None]:
f1_for_video(f1_knn)

0.82

### Sieć neuronowa

### Zadanie 6 Dodatkowe

In [55]:
I_prew = cv2.imread(os.path.join(input_folder, "in000300.jpg"), cv2.IMREAD_GRAYSCALE)
YY, XX = I_prew.shape

# Parametry
threshold_tolerance = 0.8
N = 30  # Rozmiar bufora
iN = 0

# Inicjalizacja bufora tła
BUFFOR = np.zeros([YY, XX, N], dtype=np.uint8)

for i in range(301, 1100):
    I_path = os.path.join(input_folder, f"in{i:06d}.jpg")
    I = cv2.imread(I_path, cv2.IMREAD_GRAYSCALE)

    # Obliczenie różnicy pomiędzy klatkami
    I_dif = cv2.absdiff(I, I_prew)
    I_prew = I

    # Progowanie maski różnic
    _, bg = cv2.threshold(I_dif, np.mean(I_dif) * threshold_tolerance, 255, cv2.THRESH_BINARY)

    # Konwersja do wartości 0 lub 1 (uint8)
    bg = (bg > 0).astype(np.uint8)

    # Aktualizacja bufora
    if iN < N:
        BUFFOR[:, :, iN] = bg
        iN += 1
    else:
        BUFFOR = np.roll(BUFFOR, -1, axis=2)
        BUFFOR[:, :, -1] = bg

    # Generowanie maski tła po zapełnieniu bufora
    if iN == N:
        mask = np.sum(BUFFOR, axis=2)

        # Tworzenie maski obszarów tła
        bg_final = np.zeros([YY, XX], dtype=np.uint8)
        bg_final[mask > (threshold_tolerance * N)] = 255  # Próg dla stabilnych pikseli

        # Operacje morfologiczne (opcjonalne)
        bg_final = cv2.dilate(bg_final, None, iterations=2)
        bg_final = cv2.erode(bg_final, None, iterations=2)

        cv2.imshow("Background", bg_final)

    cv2.waitKey(10)

cv2.destroyAllWindows()

### Zadanie 7 Dodatkowe

Algorytm VIBE (Visual Background Extractor) wykrywa ruch poprzez przechowywanie zbioru próbek pikseli dla każdego punktu obrazu i porównywanie nowych wartości do tych próbek. Jeśli nowa wartość pasuje do co najmniej określonej liczby próbek, uznawana jest za tło, a w przeciwnym razie traktowana jako obiekt w ruchu. Co pewien czas algorytm losowo aktualizuje próbki, aby dostosować się do zmian w scenie. Dzięki temu VIBE jest szybki, adaptacyjny i efektywny w wykrywaniu obiektów w ruchu w statycznym tle.

In [44]:
class ViBE:
    def __init__(self, num_samples=100, min_matches=3, radius=30):
        self.num_samples = num_samples  # Liczba próbek tła dla każdego piksela
        self.min_matches = min_matches  # Minimalna liczba dopasowań do uznania piksela za tło
        self.radius = radius            # Promień porównywania jasności
        self.background_model = None    # Model tła

    def initialize(self, frame):
        """Inicjalizacja modelu tła na podstawie pierwszej klatki."""
        h, w = frame.shape
        self.background_model = np.zeros((h, w, self.num_samples), dtype=np.uint8) # Inicjacja modelu tła zerami 
        for i in range(self.num_samples):
            noise_x = np.random.randint(-5, 5, size=(h, w))  # dodanie szumu do modelu tła że był bardziej odporny na rzeczywiste zakłócenia  
            noise_y = np.random.randint(-5, 5, size=(h, w))
            x_idx = np.clip(np.arange(w) + noise_x, 0, w - 1)

            
            y_idx = np.clip(np.arange(h)[:, np.newaxis] + noise_y, 0, h - 1)
            
            self.background_model[:, :, i] = frame[y_idx, x_idx]


    def apply(self, frame):
        """Zwraca maskę pierwszego planu na podstawie aktualnej klatki."""
        if self.background_model is None:
            self.initialize(frame)
        
        h, w = frame.shape
        fg_mask = np.zeros((h, w), dtype=np.uint8)

        for i in range(self.num_samples):
            matches = cv2.absdiff(frame, self.background_model[:, :, i]) < self.radius # Odejmujemy warosci i patrzymy czy roznice sa znczące 
            fg_mask += matches.astype(np.uint8)

        fg_mask = (fg_mask < self.min_matches).astype(np.uint8) * 255
        return fg_mask


first_image_path = os.path.join(input_folder, "in000300.jpg")
first_image = cv2.imread(first_image_path, cv2.IMREAD_GRAYSCALE)

# inicjalizacja algorytmu ViBE
vibe = ViBE()
vibe.initialize(first_image)

f1_vibe = []

for i in range(300, 500, 1):
    I_path = os.path.join(input_folder, f"in{i:06d}.jpg")
    I_ref_path = os.path.join(ref_folder, f"gt{i:06d}.png")

    I = cv2.imread(I_path, cv2.IMREAD_GRAYSCALE)
    I_ref = cv2.imread(I_ref_path, cv2.IMREAD_GRAYSCALE)

    fg_mask = vibe.apply(I)

    fg_mask = thres_and_morph(fg_mask, ksize=3)

    cv2.imshow("Foreground Mask", fg_mask)

    f1_vibe.append(calculate_f1_score(fg_mask, I_ref))

    cv2.waitKey(10)

cv2.destroyAllWindows()

In [22]:
f1_for_video(f1_vibe)

0.62

Algorytm PBAS (Pixel-Based Adaptive Segmenter) działa na zasadzie dynamicznej adaptacji progu detekcji ruchu dla każdego piksela. Tworzy histogram wartości pikseli w czasie i na jego podstawie ocenia, czy dany piksel należy do tła. W przeciwieństwie do VIBE, PBAS dostosowuje próg czułości indywidualnie dla każdego piksela, co pozwala lepiej radzić sobie z dynamicznie zmieniającym się tłem, np. falującą wodą czy migoczącymi światłami.

In [None]:
class PBAS:
    def __init__(self, history=100, threshold=2, alpha=0.05):
        self.history = history          # Liczba zapamiętanych klatek
        self.threshold = threshold      # Początkowa wartość progowa
        self.alpha = alpha              # Tempo adaptacji progu
        self.samples = None             # Historia wartości pikseli
        self.dynamic_threshold = None   # Dynamiczne progi dla każdego piksela

    def initialize(self, frame):
        """ Inicjalizacja pamięci dla historii pikseli """
        h, w = frame.shape
        self.samples = np.zeros((h, w, self.history), dtype=np.uint8)  
        self.dynamic_threshold = np.full((h, w), self.threshold, dtype=np.float32) # Tablica h na w wypełniona wartosciami self.threshold

        for i in range(self.history):
            self.samples[:, :, i] = frame

    def apply(self, frame):
        """ Aktualizuje model i zwraca maskę ruchu """
        if self.samples is None:
            self.initialize(frame)

        h, w = frame.shape
        fg_mask = np.ones((h, w), dtype=np.uint8) * 255

        # Obliczamy odległość do każdego z zapamiętanych pikseli
        dist = np.abs(self.samples - frame[:, :, None])
        #dist = cv2.absdiff(self.samples, frame[:, :, None])
        match = np.any(dist < self.dynamic_threshold[:, :, None], axis=2)

        # Ustawienie maski ruchu
        fg_mask[match] = 0

        self.dynamic_threshold += self.alpha * ((fg_mask == 255) - 0.5)

        # Aktualizacja historii pikseli
        rand_idx = np.random.randint(0, self.history, (h, w))
        update_mask = (np.random.rand(h, w) < 0.05)  # 5% losowych aktualizacji
        self.samples[np.arange(h)[:, None], np.arange(w), rand_idx] = frame

        return fg_mask
    
    
# Wczytanie pierwszego obrazu do inicjalizacji modelu
first_image_path = os.path.join(input_folder, "in000300.jpg")
first_image = cv2.imread(first_image_path, cv2.IMREAD_GRAYSCALE)

# Inicjalizacja algorytmu ViBE
pbsa = PBAS()
pbsa.initialize(first_image)

f1_pbsa = []

for i in range(300, 1100, 1):
    I_path = os.path.join(input_folder, f"in{i:06d}.jpg")
    I_ref_path = os.path.join(ref_folder, f"gt{i:06d}.png")

    I = cv2.imread(I_path, cv2.IMREAD_GRAYSCALE)

    fg_mask = pbsa.apply(I)

    cv2.imshow("Foreground Mask", fg_mask)

    fg_mask = thres_and_morph(fg_mask)

    fg_mask = f1_pbsa.append(fg_mask)

    cv2.waitKey(10)

cv2.destroyAllWindows()