In [1]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import os
import pandas as pd
import random
bg_path= "data/images/1660712400.jpg"

# Utils

In [2]:
def get_beach_mask(bg_path):
    """
    Generates a mask highlighting the beach area from a given background image.

    This function processes the input image by applying several image processing techniques 
    such as grayscale conversion, Gaussian blurring, thresholding, dilation, and noise removal 
    to segment the beach area. It also identifies and combines lake/sky and mountain regions 
    to improve the segmentation of the beach area.

    Args:
        bg_path (str): The path to the input background image.

    Returns:
        numpy.ndarray: A binary mask (numpy array) where the beach area is marked as 255 
                        (white), and all other areas are 0 (black).
    """
    #Lake
    bg_img= cv2.imread(bg_path)
    bg_img=cv2.cvtColor(bg_img,cv2.COLOR_BGR2GRAY)
    bg_img= cv2.GaussianBlur(bg_img,(5,5),0)
    ret,bg_img = cv2.threshold(bg_img,120,255,cv2.THRESH_BINARY)
    kernel=np.ones((5,5))
    bg_img = cv2.dilate(bg_img,kernel,iterations = 4)
    mask_lake_sky=bg_img>0

    #mountain
    bg_img= cv2.imread(bg_path)
    bg_img=cv2.cvtColor(bg_img,cv2.COLOR_BGR2GRAY)
    clahe = cv2.createCLAHE()
    bg_img= cv2.GaussianBlur(bg_img,(5,5),0)
    bg_img = cv2.dilate(bg_img,kernel,iterations = 4)
    ret,mountain_bg = cv2.threshold(bg_img,90,255,cv2.THRESH_BINARY_INV)
    mountain_bg[mask_lake_sky]=0
    mask_mountain_bg=mountain_bg>0

    #Remove noise
    img=cv2.imread(bg_path)
    img=cv2.cvtColor(img,cv2.COLOR_BGR2GRAY)
    img= cv2.GaussianBlur(img,(5,5),0)
    img[mask_lake_sky] = 0
    img[mask_mountain_bg] = 0
    ret,img = cv2.threshold(img,20,255,cv2.THRESH_BINARY)
    kernel = np.ones((5,5),np.uint8)
    # img = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
    
    kernel = np.ones((3,3),np.uint8)
    img=  cv2.erode(img, kernel) 


    analysis = cv2.connectedComponentsWithStats(img, 
                                            4, 
                                            cv2.CV_32S) 
    (totalLabels, label_ids, values, centroid) = analysis 
    totalLabels,cv2.CC_STAT_AREA
    output = np.zeros(img.shape, dtype="uint8")
    idx= np.where(values[:, cv2.CC_STAT_AREA]>10000)[0]
    i=idx[-1]
    area = values[i, cv2.CC_STAT_AREA]   
    
    # Labels stores all the IDs of the components on the each pixel 
    # It has the same dimension as the threshold 
    # So we'll check the component 
    # then convert it to 255 value to mark it white 
    componentMask = (label_ids == i).astype("uint8") * 255
      
    # Creating the Final output mask 
    output = cv2.bitwise_or(output, componentMask)
    
    kernel=np.ones((15,15))
    output = cv2.dilate(output,kernel,iterations = 5)
    
    kernel=np.ones((10,10))
    
    output = cv2.morphologyEx(output, cv2.MORPH_CLOSE, kernel)
    
    beach_mask= output>0

    return beach_mask


def compute_image_metric_level_table(predictions_points,gt_points, MAX_ERROR_DISTANCE):
    """
    Computes a table of image metric results for comparing predicted points against ground truth points.
    This function calculates the Euclidean distance between each predicted point and ground truth points, 
    and determines whether each predicted point is a true positive (TP), false positive (FP), or a false negative (FN).
    
    The result is a table of predictions with corresponding ground truth matches, and the number of false negatives.

    Args:
        predictions_points (numpy.ndarray): A 2D array of shape (N, 2) where each row represents a predicted point 
                                             with (x, y) coordinates.
        gt_points (numpy.ndarray): A 2D array of shape (M, 2) where each row represents a ground truth point 
                                    with (x, y) coordinates.
        MAX_ERROR_DISTANCE (float): The maximum allowable Euclidean distance to consider a match between a 
                                     predicted point and a ground truth point.

    Returns:
        tuple: 
            - table_results (numpy.ndarray): A 2D array of shape (N, 6) with columns:
                (pred_x, pred_y, tp, fp, matched_gt_x, matched_gt_y)
                where `tp` is 1 if the predicted point is a true positive, `fp` is 1 if it's a false positive, 
                and `matched_gt_x`, `matched_gt_y` are the coordinates of the matched ground truth point (or None if no match).
                
            - fn (int): The number of false negatives, i.e., the number of ground truth points that were not matched to any prediction.

    """
    gt_points_remaining = gt_points.copy()  # Copy the ground truth points to keep track of unmatched ones
    table_results = []  # To store results as a list of tuples: (pred_x, pred_y, tp, fp, gt_x, gt_y)
    for pred in predictions_points:
        best_distance_match = 900  # Initialize best match distance as a large number
        best_idx_match = -1  # Index of the best match ground truth poin
        # Iterate over remaining ground truth points
        for idx, gt in enumerate(gt_points_remaining):
            # Calculate Euclidean distance using np.linalg.norm
            eucledian_distance = np.linalg.norm(pred - gt)  # Pred and gt are arrays or tuples, eucledian_distance is a scalar
            # Check if this is the best (smallest) distance so far
            if eucledian_distance < best_distance_match:
                best_distance_match = eucledian_distance
                best_idx_match = idx  # Store the index of the matching ground truth point
    
        # Check if the best match is within the allowable error distance
        if best_distance_match <= MAX_ERROR_DISTANCE:
            # Add the match result to the table: (pred_x, pred_y, tp, fp, matched_gt_x, matched_gt_y)
            table_results.append((pred[0], pred[1], 1, 0, gt_points_remaining[best_idx_match][0], gt_points_remaining[best_idx_match][1]))
            # Remove the matched ground truth point from the remaining list
            gt_points_remaining = np.delete(gt_points_remaining, best_idx_match, axis=0)
        else:
            # If no match is found within the error distance, mark it as a false positive
            table_results.append((pred[0], pred[1], 0, 1, None, None))
    table_results= np.array(table_results)
    fn = len(gt_points_remaining)  # Already calculated earlier
    return table_results,fn


# Main

In [5]:
df= pd.read_csv('data/annotation_points.csv',header=None)
df.columns= ['label','x','y','filename','witdth','height']
images_path =  "data/images/"+df.filename.unique()
images_path = np.insert(images_path, 0, bg_path, axis=0)
beach_mask = get_beach_mask(bg_path)
len(images_path)

[ WARN:0@41.610] global loadsave.cpp:241 findDecoder imread_('images/1660712400.jpg'): can't open/read file: check file path/integrity


error: OpenCV(4.10.0) /io/opencv/modules/imgproc/src/color.cpp:196: error: (-215:Assertion failed) !_src.empty() in function 'cvtColor'


In [None]:
metrics_image_level=[]
metrics_person_level=[]
fn_global=0
theshold_person= 90
for img_path in images_path:
    img=cv2.imread(img_path)
    gray_img = cv2.cvtColor(img,cv2.COLOR_BGR2GRAY)
    gray_img= cv2.GaussianBlur(gray_img,(5,5),0)
    ret,gray_img = cv2.threshold(gray_img,theshold_person,255,cv2.THRESH_BINARY_INV)
    gray_img[~beach_mask] = 0

    # Apply the Component analysis function 
    analysis = cv2.connectedComponentsWithStats(gray_img,4,cv2.CV_32S) 
    (totalLabels, label_ids, values, centroid) = analysis 
    predictions_indx=(values[:, cv2.CC_STAT_AREA]  > 50) & (values[:, cv2.CC_STAT_AREA]  < 1000)
    predictions_points = centroid[predictions_indx]  # Get predicted points
    #Compute metrics
    filename= img_path.replace("images/","")
    gt_points = df[df.filename == filename][['x', 'y']].to_numpy()  # Get ground truth points as a numpy array
    
    metrics_image_level.append([filename, len(gt_points), len(predictions_points),] )

    MAX_ERROR_DISTANCE=130
    temp,fn = compute_image_metric_level_table(predictions_points,gt_points, MAX_ERROR_DISTANCE)
    fn_global+=fn
    new_col_filename = np.full((len(temp),1),filename)
    new_col_fn = np.full((len(temp),1),fn)
    temp = np.append(new_col_filename,temp,axis=1)
    temp = np.append(temp,new_col_fn, axis=1)
    if len(metrics_person_level)>0:
        metrics_person_level= np.concatenate((metrics_person_level,temp),axis=0)
    else:
        metrics_person_level = temp
metrics_image_level=np.array(metrics_image_level)
df_results_image = pd.DataFrame(metrics_image_level, columns=['filename','gt', 'pred'])
df_results_person = pd.DataFrame(metrics_person_level, columns=['filename','pred_x', 'pred_y', 'tp', 'fp', 'gt_x', 'gt_y','fn'])
# df_results_person.head()

metrics_image_level=np.array(metrics_image_level)
df_results_image = pd.DataFrame(metrics_image_level, columns=['filename','gt', 'pred'])
df_results_person = pd.DataFrame(metrics_person_level, columns=['filename','pred_x', 'pred_y', 'tp', 'fp', 'gt_x', 'gt_y','fn'])
# df_results_person.head()
df_aggregated = df_results_person.groupby('filename').agg({
    'tp': 'sum',   # Sum tp values
    'fp': 'sum',   # Sum fp values
    'fn': 'max'    # Take the max fn value
}).reset_index()
df_aggregated['precision'] = df_aggregated.tp /(df_aggregated.tp + df_aggregated.fp)*100
# df_aggregated['recall'] = df_aggregated.tp /(df_aggregated.tp + df_aggregated.fn)*100 if  df_aggregated.fn> 0 else 0
def calculate_recall(row):
    if row['fn'] > 0:
        return (row['tp'] / (row['tp'] + row['fn'])) * 100
    else:
        return 0
df_aggregated['recall'] = df_aggregated.apply(calculate_recall, axis=1)
df_aggregated['accurac'] = df_aggregated.tp /(df_aggregated.tp + df_aggregated.fp+df_aggregated.fn)*100

df_results_image.gt = df_results_image['gt'].astype(int)
df_results_image.pred = df_results_image['pred'].astype(int)
df_results_image["MSE"]=(df_results_image.gt - df_results_image.pred)**2
df_aggregated = df_aggregated.merge(df_results_image,how='inner',on='filename')
df_aggregated

In [None]:
df_aggregated.to_csv("results.csv",sep=';')

In [None]:
#Print the Mean Square ERRor
mse = df_results_image["MSE"].sum()/len(df_results_image)
print("Mean Square Error ",mse)

# Plot Sample

In [None]:

# filesnames = df_results_person.filename.unique()\
# filename = filesnames[random.randint(0,len(filesnames)-1)]
filename= '1660752000.jpg'
df_sample = df_results_person[df_results_person.filename  == filename]
df_sample_gt= df[df.filename == filename]
img= cv2.imread(f"images/{filename}")
img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
plt.figure(figsize=(20,15))
# plt.scatter(df_sample['x'],df_sample['y'], c ="blue")
plt.imshow(img)
df_tp= df_sample[df_sample.tp == 1]
df_fp= df_sample[df_sample.fp == 1]

plt.scatter(df_sample_gt['x'],df_sample_gt['y'], c ="blue")
plt.scatter(df_tp['pred_x'],df_tp['pred_y'], c ="green")
plt.scatter(df_fp['pred_x'],df_fp['pred_y'], c ="red")
plt.show()