## Input the evaluation image folder

In [1]:
# Folder holding the evaluation images; the classification and ratio cells read from it.
img_dir = "./Evaluation Dataset Images"

## Predict results from classification model

In [2]:
import torch

from utils_RHM.tools import (
    evaluate_model,
    load_saved_model,
    process_dataset,
    process_directory,
)

# Pre-process the evaluation folder. The cell output shows a `_classes.csv` being saved
# there -- presumably by this helper; confirm in utils_RHM.tools.
process_directory(img_dir)

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# process_dataset hands back the data loader and a model object; load_saved_model then
# loads './pretrained_model/classification.pt' into that model.
img_data, model = process_dataset(img_dir, device)
model = load_saved_model('./pretrained_model/classification.pt', model)

# Model outputs for the evaluation images; the next cell thresholds them into labels.
all_predictions = evaluate_model(img_data, model, device)

Saving to /Data/home/Dennis/CV_in_Construction/CAE_hackathon/MainCategoryClassification/Evaluation Dataset Images/_classes.csv
<torch.utils.data.dataloader.DataLoader object at 0x7fb71ab83e80>


                                                  

In [3]:
import pandas as pd
import torch
import numpy as np
import os
# Probability cut-off applied independently to every class (multi-label output).
threshold = 0.5
data = []
# Submission column name -> position of that class in the classifier's output vector.
category_info = {'Aeration':0, 'Discolouration_colour':1, 'Discolouration_Outfall':2,'Wildlife_Fish': 3,'Modified_Channel':4, 'Obstruction':5,
                 'Outfall':6, 'Outfall_Aeration':7, 'Outfall_Screen':8, 'Outfall_Spilling':9,
                 'Rubbish':10, 'Sensor':11, 'Wildlife_Algal':12, 'Wildlife_Birds':13,
                 'Wildlife_Other':14}
for idx, logits in enumerate(all_predictions):
    probs = torch.sigmoid(logits)
    # Threshold on the tensor and move the result to the CPU before converting to NumPy:
    # NumPy cannot read a tensor that lives on a CUDA device or that tracks gradients.
    predicted_classes = (probs >= threshold).to(torch.int64).detach().cpu().numpy()
    # ImageID is the file name without its directory and extension.
    ID = os.path.splitext(os.path.basename(img_data.dataset.img_path[idx]))[0]
    row = {'ImageID': ID}
    for category, index in category_info.items():
        row[category] = predicted_classes[index]
    data.append(row)

df_prediction = pd.DataFrame(data)
df_submision = pd.read_csv('./Evaluation Dataset Images/Hackathon Model Evaluation submission.csv')
# Keep one prediction per ImageID so the left merge returns exactly one row per submission
# row. The column copy below aligns on the row index, so an extra merged row would
# silently shift values onto the wrong images.
merged_df = pd.merge(df_submision, df_prediction.drop_duplicates(subset='ImageID'),
                     on='ImageID', how='left', suffixes=('', '_df2'))
# Columns present in both frames come back from the merge with the '_df2' suffix on the
# prediction side; copy those predicted values into the submission table.
for column in df_submision.columns:
    if column != 'ImageID' and column in df_prediction.columns:
        df_submision[column] = merged_df[column + '_df2']

## Calculate the ratio of aeration, modified channel, and obstruction

### Some functions

In [4]:
from ultralytics import YOLO
# Loaded detectors keyed by device string, so the weights file is read once per device
# instead of once per image.
_YOLO_MODELS = {}


def yolo_predict(image_path, specific_classes, device):
    """Run the object detector on one image and keep boxes of the requested classes.

    Args:
        image_path: Path of the image handed to the detector.
        specific_classes: Container of class ids to keep. Each detected class id is
            converted with ``.item()`` before the membership test.
        device: Device the detector is moved to; also used as the cache key.

    Returns:
        A list of ``(x1, y1, x2, y2, class_id)`` tuples of plain Python numbers, one per
        kept box, read from ``result.boxes.xyxy`` and ``result.boxes.cls``.
    """
    cache_key = str(device)
    model = _YOLO_MODELS.get(cache_key)
    if model is None:
        # First call for this device: load the weights and remember the model.
        model = YOLO('./pretrained_model/object_detection.pt').to(device)
        _YOLO_MODELS[cache_key] = model

    results = model(image_path, verbose=False)
    yolo_boxes = []
    for result in results:
        for box, class_id in zip(result.boxes.xyxy, result.boxes.cls):
            x1, y1, x2, y2 = box[:4]
            class_id = class_id.item()
            if class_id in specific_classes:
                yolo_boxes.append((x1.item(), y1.item(), x2.item(), y2.item(), class_id))
    return yolo_boxes


In [11]:
from torchvision import transforms
from PIL import Image
import torchvision.models.segmentation as segmentation
# Segmentation networks keyed by device string. Building the network and reading the
# checkpoint once avoids repeating that work for every image.
_DEEPLAB_MODELS = {}


def _load_deeplab(device):
    """Return the cached segmentation network for ``device``, building it on first use."""
    cache_key = str(device)
    if cache_key not in _DEEPLAB_MODELS:
        net = segmentation.deeplabv3_resnet101(weights=segmentation.DeepLabV3_ResNet101_Weights.DEFAULT)
        # Replace the last classifier layer with a single-output-channel convolution so the
        # architecture matches what the checkpoint below is loaded into.
        net.classifier[4] = torch.nn.Conv2d(256, 1, kernel_size=(1, 1), stride=(1, 1))
        # map_location lets a checkpoint saved from a GPU session load on a CPU-only machine.
        state_dict = torch.load('./pretrained_model/river_segmentation.pth', map_location=device)
        net.load_state_dict(state_dict)
        net.to(device)
        net.eval()
        _DEEPLAB_MODELS[cache_key] = net
    return _DEEPLAB_MODELS[cache_key]


def deeplab_predict(image_path, device):
    """Segment one image and return a boolean mask of the positive pixels.

    Args:
        image_path: Path of the image to segment; it is opened and converted to RGB.
        device: Device on which the network runs.

    Returns:
        A boolean CPU tensor that is True where the network's ``'out'`` value is greater
        than 0 (presumably shaped (H, W) after the squeeze -- confirm against the caller).
    """
    deeplab_model = _load_deeplab(device)

    # Close the file handle as soon as the RGB copy has been made.
    with Image.open(image_path) as source_image:
        image = source_image.convert("RGB")

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    # Add a leading batch dimension of size one.
    input_tensor = transform(image).unsqueeze(0).to(device)

    with torch.no_grad():
        output = deeplab_model(input_tensor)['out']
    pred_mask = output[0].cpu().squeeze()
    return pred_mask > 0

In [12]:
def calculate_overlap(yolo_boxes, mask, class_coefficients):
    """Sum, per class id, the weighted share of the mask covered by each box.

    Args:
        yolo_boxes: Iterable of ``(x1, y1, x2, y2, class_id)`` tuples.
        mask: Tensor converted with ``.cpu().numpy()``; its sum is used as the mask area.
        class_coefficients: Mapping of integer class id to a multiplier. Ids missing from
            the mapping use 1.0.

    Returns:
        Dict mapping ``int(class_id)`` to the accumulated ``share * coefficient`` over all
        boxes of that class. A box's share is its overlap with the mask divided by the mask
        area, or 0 when the mask area is not positive.
    """
    mask_arr = mask.cpu().numpy()
    total_area = np.sum(mask_arr)
    area_is_positive = total_area > 0
    totals = {}

    for x1, y1, x2, y2, class_id in yolo_boxes:
        key = int(class_id)

        # Paint the box into an array shaped like the mask (coordinates truncated to int).
        box_region = np.zeros_like(mask_arr)
        box_region[int(y1):int(y2), int(x1):int(x2)] = 1

        shared_pixels = np.sum(np.logical_and(box_region, mask_arr))
        share = shared_pixels / total_area if area_is_positive else 0
        weight = class_coefficients.get(key, 1.0)
        totals[key] = totals.get(key, 0) + share * weight
    return totals

In [13]:
import os
# Every file under img_dir (searched recursively) whose name ends in '.jpg', ignoring case.
img_paths = [
    os.path.join(folder, name)
    for folder, _subfolders, names in os.walk(img_dir)
    for name in names
    if name.lower().endswith('.jpg')
]

In [14]:
import pandas as pd
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Detector class ids of interest and the submission column each one feeds.
specific_classes = [0, 4, 5]
class_names = {0: 'Aeration_Extent', 4: 'Proportion_Modification', 5: 'Obstruction'}
# Per-class multipliers passed to calculate_overlap; constant, so defined once.
class_coefficients = {0: 0.8, 4: 2, 5: 3}
data = []
for img_path in img_paths:
    # ImageID is the file name without its directory and extension.
    ID = os.path.splitext(os.path.basename(img_path))[0]

    yolo_boxes = yolo_predict(img_path, specific_classes, device)
    mask = deeplab_predict(img_path, device)
    overlap_ratios = calculate_overlap(yolo_boxes, mask, class_coefficients)

    # Start from zero for every image. A single accumulator shared by the whole loop
    # would carry the totals of earlier images into every later row.
    merged_ratios = {name: 0 for name in class_names.values()}
    for class_id, ratio in overlap_ratios.items():
        merged_ratios[class_names[int(class_id)]] += ratio

    # NOTE(review): calculate_overlap returns fractions of the mask area (times the class
    # coefficient), while the later cells cap values at 100 and use 25/100 cut-offs.
    # Confirm whether a percentage scale (x100) was intended before relying on the categories.
    data.append({'ImageID': ID, **merged_ratios})

In [None]:
df_prediction = pd.DataFrame(data)

# Cap each ratio column at 100.
for ratio_column in ('Proportion_Modification', 'Aeration_Extent', 'Obstruction'):
    above_cap = df_prediction[ratio_column] > 100
    df_prediction.loc[above_cap, ratio_column] = 100

# Left-merge on ImageID. Columns present in both frames come back with the '_df2' suffix
# on the prediction side, and those values overwrite the submission columns.
merged_df = df_submision.merge(df_prediction, on='ImageID', how='left', suffixes=('', '_df2'))
shared_columns = [
    name for name in df_submision.columns
    if name != 'ImageID' and name in df_prediction.columns
]
for name in shared_columns:
    df_submision[name] = merged_df[name + '_df2']

## Wildlife category compilation

In [None]:
# Wildlife sub-category columns: any non-missing, non-zero entry marks the image as wildlife.
columns_to_check = ['Wildlife_Fish', 'Wildlife_Algal', 'Wildlife_Invertebrate','Wildlife_Birds','Wildlife_Other']
wildlife_values = df_submision[columns_to_check]
condition = (wildlife_values.notna() & (wildlife_values != 0)).any(axis=1)

# Rows with a positive sub-category become 1 and the others keep their current value;
# anything that is not exactly 1 is then normalised to 0.
flagged = np.where(condition, 1, df_submision['Wildlife'])
df_submision['Wildlife'] = [1 if value == 1 else 0 for value in flagged]

## Categorize the ratios of aeration, modified channel, and obstruction

In [None]:
def categorize(value):
    """Translate a numeric ratio into its submission label.

    Args:
        value: Number to classify. A string or ``None`` is returned unchanged, so applying
            the function to an already-categorised column is harmless.

    Returns:
        'Full' for values of 100 or more, 'Partial' for 25 up to (not including) 100,
        'Minor' for values strictly between 0 and 25, 'None' for exactly 0, and ``None``
        for anything else (NaN or negative numbers).
    """
    # Already categorised (or missing) entries pass straight through; comparing a string
    # with a number would raise TypeError when the cell is run a second time.
    if value is None or isinstance(value, str):
        return value
    # '>=' rather than '==' so a value above 100 is not reported as 'Partial'.
    if value >= 100:
        return 'Full'
    if value >= 25:
        return 'Partial'
    if value > 0:
        return 'Minor'
    if value == 0:
        return 'None'
    # NaN and negative inputs match none of the ranges above.
    return None
# Replace the numeric ratios in each of these columns with their category label.
for ratio_column in ('Aeration_Extent', 'Proportion_Modification', 'Obstruction'):
    df_submision[ratio_column] = df_submision[ratio_column].apply(categorize)


## Export to a CSV file

In [None]:
# Write the completed submission table to disk without the DataFrame index column.
submission_csv = 'Model_Evaluation_Submission_Team1_Final.csv'
df_submision.to_csv(submission_csv, index=False)