In [1]:
import geopandas as gpd
import pandas as pd
import numpy as np
import statistics as st
import os
from shapely.geometry import Point
import matplotlib.pyplot as plt

from tqdm import tqdm
from time import sleep

import hlb_utils as hlb

In [2]:
def get_overlay(gpd_canopy_r, gpd_canopy_e):
    """Overlay two layers and attach the original geometry of each side.

    Runs ``gpd.overlay`` with its default settings on the two inputs and, when
    the result is not empty, adds two columns:

    * ``geo_id1`` - geometry of the ``gpd_canopy_r`` row labelled ``id_1``
    * ``geo_id2`` - geometry of the ``gpd_canopy_e`` row labelled ``id_2``

    The ``id_1`` / ``id_2`` values are used as index labels of the inputs, so
    each input's ids must match its index.
    NOTE(review): presumably ``id_1`` / ``id_2`` come from both inputs carrying
    an ``id`` column that overlay suffixes - confirm.

    Returns the overlay result; an empty result is returned without the
    ``geo_id*`` columns.
    """
    overlay = gpd.overlay(gpd_canopy_r, gpd_canopy_e)

    # Nothing overlaps: hand the empty frame back untouched.
    if len(overlay) == 0:
        return overlay

    for geo_column, id_column, source in (
        ('geo_id1', 'id_1', gpd_canopy_r),
        ('geo_id2', 'id_2', gpd_canopy_e),
    ):
        overlay[geo_column] = source.loc[list(overlay[id_column]), 'geometry'].values

    return overlay


def get_pairs(canopy_r, canopy_e):
    """Pair every ``canopy_r`` polygon with its best-overlapping ``canopy_e`` row.

    For each index label of ``canopy_r`` the polygon is intersected with every
    geometry in ``canopy_e``; among the non-empty intersections the one with
    the largest area is selected. The pair is kept only when that intersection
    is a single ``Polygon``.

    Parameters
    ----------
    canopy_r, canopy_e : objects exposing ``.index``, ``.loc`` and (for
        ``canopy_e``) ``.intersection`` - GeoDataFrames in this notebook.

    Returns
    -------
    dict
        ``{canopy_r index label: canopy_e index label}``. Rows of ``canopy_r``
        without a qualifying overlap are absent from the dict.
    """
    pairs = {}

    with tqdm(total=len(canopy_r.index)) as pbar:
        pbar.set_description("Identificando pares")
        # Short pause kept from the original; presumably lets the bar render
        # before the loop starts - confirm.
        sleep(0.1)

        for ref_id in canopy_r.index:
            ref_polygon = canopy_r.loc[ref_id, 'geometry']

            overlaps = canopy_e.intersection(ref_polygon)
            overlaps = overlaps.loc[~overlaps.is_empty]

            # No sentinel id: a magic value such as -999 could collide with a
            # genuine index label and silently drop that pair.
            if overlaps.shape[0] >= 1:
                best_id = overlaps.area.sort_values(ascending=False).index[0]
                best_geom = overlaps.loc[best_id]
                if best_geom is not None and best_geom.geom_type == 'Polygon':
                    pairs[ref_id] = best_id

            pbar.update(1)

    return pairs


def get_label_pairs(pairs):
    """Turn a ``{key: value}`` mapping into ``'<key>-<value>'`` labels.

    Both sides are converted with ``float`` before being rendered, so the pair
    ``1 -> 2`` becomes ``'1.0-2.0'``. Labels follow the mapping's iteration
    order.
    """
    return [
        str(float(ref_id)) + '-' + str(float(est_id))
        for ref_id, est_id in pairs.items()
    ]

    
def compile_data(canopy_r, canopy_e, pairs):
    """Build the table of matched pairs with their intersection-over-union.

    Steps:
    1. overlay the two layers with ``get_overlay``;
    2. keep only overlay rows whose ``(id_1, id_2)`` combination is listed in
       ``pairs`` (compared through the ``'<id_1>-<id_2>'`` float labels);
    3. keep the first row per ``id_2``;
    4. add ``geo_union`` (union of both source geometries) and ``IoU``
       (overlay geometry area divided by the union area).

    Parameters
    ----------
    canopy_r, canopy_e : layers forwarded to ``get_overlay``.
    pairs : dict
        Mapping accepted by ``get_label_pairs``.

    Returns
    -------
    The filtered overlay table with the extra ``label_id``, ``geo_union`` and
    ``IoU`` columns.
    """
    label_pairs = get_label_pairs(pairs)

    intersec = get_overlay(canopy_r, canopy_e)
    intersec['label_id'] = [
        str(float(i1)) + '-' + str(float(i2))
        for i1, i2 in zip(intersec['id_1'], intersec['id_2'])
    ]

    # Explicit copy: new columns are assigned below, and writing into a
    # filtered slice triggers pandas' SettingWithCopyWarning.
    intersec = (
        intersec.loc[intersec['label_id'].isin(label_pairs)]
        .drop_duplicates('id_2')
        .copy()
    )

    intersec['geo_union'] = intersec['geo_id1'].union(intersec['geo_id2'])
    intersec['IoU'] = intersec['geometry'].area / intersec['geo_union'].area

    return intersec

    
def get_TruePositive(gpd_intersection, IoU_threshold = 0.5):
    """Return the rows whose ``IoU`` column is strictly above ``IoU_threshold``."""
    above_threshold = gpd_intersection['IoU'] > IoU_threshold
    return gpd_intersection.loc[above_threshold]


def get_TruePositive2(gpd_intersection, IoU_threshold = 0.5):
    """Alternative true-positive filter based on coverage and centroid.

    A row is kept when
    * the area of its ``geometry`` exceeds ``IoU_threshold`` times the area of
      ``geo_id1``, and
    * its ``geometry`` contains the centroid of ``geo_id1``.
    Afterwards only the first row per ``geo_id1`` and per ``geo_id2`` is kept.

    The mask is computed from ``gpd_intersection`` itself; the previous
    version read a global named ``over`` instead of its own argument.

    Returns a new frame; the input is not modified.
    """
    covers_enough = (
        gpd_intersection['geometry'].area
        > IoU_threshold * gpd_intersection['geo_id1'].area
    )
    TP = gpd_intersection[covers_enough]
    TP = TP.loc[TP['geometry'].contains(TP['geo_id1'].centroid)]

    # Reassign instead of inplace=True: dropping in place on a slice raises
    # SettingWithCopyWarning.
    TP = TP.drop_duplicates('geo_id1')
    TP = TP.drop_duplicates('geo_id2')

    return TP


def calculate_IoU(gpd_TruePositive):
    """Add ``union_geo`` and ``IoU`` columns to ``gpd_TruePositive``.

    ``union_geo`` is, row by row, the union of ``geo_id1`` and ``geo_id2``;
    ``IoU`` is the area of ``geometry`` divided by the area of that union.
    The frame is modified in place and also returned.
    """
    gpd_TruePositive['union_geo'] = [
        gpd_TruePositive.loc[row, 'geo_id1'].union(gpd_TruePositive.loc[row, 'geo_id2'])
        for row in gpd_TruePositive.index
    ]

    union_area = gpd.GeoSeries(gpd_TruePositive['union_geo']).area
    gpd_TruePositive['IoU'] = gpd_TruePositive['geometry'].area / union_area
    return gpd_TruePositive


def get_FalseNegative(gpd_canopy_r, gpd_TruePositive):
    """Return the rows of ``gpd_canopy_r`` that have no true-positive match.

    A row counts as matched when its index label appears in the ``id_1``
    column of ``gpd_TruePositive``.

    The lookup uses the ``gpd_canopy_r`` argument; the previous version read a
    global named ``canopy_r`` and ignored the argument.
    """
    matched = gpd_canopy_r.index.isin(gpd_TruePositive['id_1'])
    return gpd_canopy_r.loc[~matched]


def get_FalsePositive(gpd_canopy_e, gpd_TruePositive):
    """Return the rows of ``gpd_canopy_e`` that have no true-positive match.

    A row counts as matched when its index label appears in the ``id_2``
    column of ``gpd_TruePositive``.
    """
    # One vectorised membership test instead of rebuilding a Python list of
    # ids for every index label.
    matched = gpd_canopy_e.index.isin(gpd_TruePositive['id_2'])
    return gpd_canopy_e.loc[~matched]

def get_metrics(canopy_r, canopy_e):
    """Score ``canopy_e`` against ``canopy_r``.

    Pairs are found with ``get_pairs``, turned into an IoU table with
    ``compile_data`` and split into true positives (``get_TruePositive`` with
    its default threshold), false negatives and false positives.

    Side effect: an ``id`` column equal to the index is written into both
    input frames, because ``get_overlay`` looks geometries up by those ids.

    Returns
    -------
    tuple
        ``((precision, recall, f1_score), (TP, FN, FP), dados)`` where
        ``dados`` is the table returned by ``compile_data``. A ratio whose
        denominator is zero is reported as ``0.0``.
    """
    # The original assigned canopy_e['id'] twice; the second assignment was
    # evidently meant for canopy_r.
    canopy_r['id'] = canopy_r.index
    canopy_e['id'] = canopy_e.index

    pares = get_pairs(canopy_r, canopy_e)
    dados = compile_data(canopy_r, canopy_e, pares)

    TP = get_TruePositive(dados)
    FN = get_FalseNegative(canopy_r, TP)
    FP = get_FalsePositive(canopy_e, TP)

    n_tp, n_fn, n_fp = TP.shape[0], FN.shape[0], FP.shape[0]

    # Guard the denominators so an empty layer yields 0.0 instead of a
    # ZeroDivisionError.
    precision = n_tp / (n_tp + n_fp) if (n_tp + n_fp) else 0.0
    recall = n_tp / (n_tp + n_fn) if (n_tp + n_fn) else 0.0

    # f1_score always gets a value; previously it stayed unbound when
    # precision + recall was 0 and the return statement raised.
    if precision + recall != 0:
        f1_score = (2 * precision * recall) / (precision + recall)
    else:
        f1_score = 0.0

    return (precision, recall, f1_score), (TP, FN, FP), dados

def iter_detection_score(canopy_e):
    """Slice ``canopy_e`` by increasing ``detection_score`` thresholds.

    Thresholds run from 0.90 up to (not including) 1 in steps of 0.01. For
    each threshold ``t`` the result holds a copy of the rows with
    ``detection_score >= t``.

    Returns
    -------
    dict
        ``{threshold rounded to 2 decimals: filtered copy of canopy_e}``.
    """
    slices = {}

    for raw_threshold in np.arange(0.90, 1, 0.01):
        # np.arange with a float step can produce values that are slightly off
        # the intended two-decimal number. Round first so the filter uses the
        # same value as the dict key; otherwise a score sitting exactly on a
        # threshold may be excluded from its own slice.
        threshold = round(raw_threshold, 2)
        slices[threshold] = canopy_e.loc[canopy_e['detection_score'] >= threshold].copy()

    return slices

def get_threshold_gaph(canopy_r, canopy_e):
    """Compute the metric triple for every detection-score slice.

    ``canopy_e`` is split with ``iter_detection_score`` and each slice is
    scored against ``canopy_r`` with ``get_metrics``; only the first element
    of that result (the metrics tuple) is kept.

    Returns ``{threshold: metrics tuple}``.
    """
    return {
        threshold: get_metrics(canopy_r, subset)[0]
        for threshold, subset in iter_detection_score(canopy_e).items()
    }

In [None]:
def calculate_mAP(over, gpd_canopy_r, gpd_canopy_e):
    """Average the precision obtained at a range of IoU thresholds.

    For every threshold in ``np.arange(0.5, 0.96, 0.05)`` the true positives
    of ``over`` and the false positives of ``gpd_canopy_e`` are counted and
    the precision ``TP / (TP + FP)`` is computed; the mean of those values is
    returned. ``gpd_canopy_r`` is accepted but not used.
    """
    def precision_at(iou_threshold):
        matched = get_TruePositive(over, iou_threshold)
        unmatched = get_FalsePositive(gpd_canopy_e, matched)
        return matched.shape[0] / (matched.shape[0] + unmatched.shape[0])

    return np.mean([precision_at(t) for t in np.arange(0.5, 0.96, 0.05)])

In [None]:
sector = 'dem'
shape_path_aux = '../datasets/segmentacao_todos/results_{}/'.format(sector)

# Collect every GeoJSON result of the sector. The listing is sorted because
# os.listdir returns entries in arbitrary order and the ids assigned below
# depend on the concatenation order.
result_paths = [
    os.path.join(shape_path_aux, filename)
    for filename in sorted(os.listdir(shape_path_aux))
    if os.path.splitext(filename)[1].lower() == '.geojson'
]

pols = pd.concat([gpd.read_file(i) for i in result_paths], axis=0).reset_index(drop=True)

# .copy(): the geometry column is overwritten right after, and assigning into
# a filtered slice triggers SettingWithCopyWarning.
pols = pols[pols.geometry.is_valid].copy()

# Buffer out and back in by 0.1, then simplify with tolerance 0.03.
pols['geometry'] = pols.buffer(0.1).buffer(-0.1).simplify(0.03)
pols = pols.explode('geometry').reset_index(drop=True)

# 1-based index, mirrored into the `id` column.
pols = pols.set_index(keys=pd.Index(range(1, pols.shape[0] + 1)))
pols['id'] = pols.index

pols.to_file('../datasets/segmentacao_todos/merged_{}.geojson'.format(sector), driver='GeoJSON')

In [None]:
sector = 'dem'

# hlb.Processing.join_vectors lives in hlb_utils (not visible in this
# notebook); it is given the sector's results folder and returns two layers.
# NOTE(review): what distinguishes the two returned layers is not shown here;
# the output file names suggest "raw" vs. "filtered" - confirm in hlb_utils.
P = hlb.Processing()
canopy_e, pols_filtred = P.join_vectors('../datasets/segmentacao_todos/results_{}'.format(sector))

# Persist both returned layers as GeoJSON.
canopy_e.to_file('../datasets/segmentacao_todos/canopy_{}_raw.geojson'.format(sector), driver='GeoJSON')
pols_filtred.to_file('../datasets/segmentacao_todos/canopy_{}_filtred.geojson'.format(sector), driver='GeoJSON')

## Mask RCNN + Patch_merge

In [3]:
# Layer passed as `canopy_r` to get_metrics in the cells below (presumably the
# reference / ground-truth canopies - confirm).
# NOTE(review): absolute, machine-specific path; the notebook only runs where
# this file exists.
canopy_r = gpd.read_file('D:/FelipeSa/OneDrive/Felipe_INPE/Segmentacao/Analise_final/train_canopy_all_cut_final.geojson')
# get_overlay looks geometries up by id, so the id must equal the index label.
canopy_r['id'] = canopy_r.index

### RGB

In [4]:
# Forward slashes: the previous '..\datasets\...' literal relied on invalid
# escape sequences (\d, \s, \c) in a non-raw string and only resolved on
# Windows. The other cells of this notebook already use '../datasets/...'.
canopy_e = gpd.read_file('../datasets/segmentacao_todos/canopies_rgb_filtered.geojson')
# get_overlay looks geometries up by id, so the id must equal the index label.
canopy_e['id'] = canopy_e.index

In [5]:
# df = canopy_e.loc[canopy_e['detection_score'] >= 0.99].copy()
# Score a copy: get_metrics writes an `id` column into the frame it receives.
df = canopy_e.copy()

# get_metrics returns ((precision, recall, f1_score), (TP, FN, FP), pair table);
# the pair table is discarded here.
metrics, dfs, _ = get_metrics(canopy_r, df)

(TP, FN, FP) = dfs

#FN.to_file('..\datasets\segmentacao_todos\FN_rgb.geojson', driver='GeoJSON')
#FP.to_file('..\datasets\segmentacao_todos\FP_rgb.geojson', driver='GeoJSON')

Identificando pares: 100%|█████████████████████████████████████████████████████████| 2026/2026 [02:03<00:00, 16.41it/s]


In [None]:
# Export the unmatched rows of both layers. Forward slashes replace the
# '..\datasets\...' literals, which relied on invalid escape sequences in a
# non-raw string and only resolved on Windows.
FN.to_file('../datasets/segmentacao_todos/FN_rgb2.geojson', driver='GeoJSON')
FP.to_file('../datasets/segmentacao_todos/FP_rgb2.geojson', driver='GeoJSON')

In [6]:
# (precision, recall, f1_score) returned by get_metrics for the RGB detections.
metrics

(0.9861042183622829, 0.9807502467917077, 0.9834199455580303)

In [None]:
# Re-display of the same (precision, recall, f1_score) tuple; this cell has no
# recorded output.
metrics

In [None]:
# {detection-score threshold: (precision, recall, f1_score)} for the RGB layer.
# Each threshold triggers a full get_metrics run, including the pairing loop.
data_rgb = get_threshold_gaph(canopy_r, canopy_e)
data_rgb

In [None]:
# x: thresholds; y: the (precision, recall, f1_score) tuples, so three curves
# are drawn, in that order. No labels or legend are set.
plt.plot(data_rgb.keys(), data_rgb.values())

### DEM

In [8]:
# Forward slashes: the previous '..\datasets\...' literal relied on invalid
# escape sequences (\d, \s, \c) in a non-raw string and only resolved on
# Windows. The other cells of this notebook already use '../datasets/...'.
canopy_e = gpd.read_file('../datasets/segmentacao_todos/canopies_dem_filtered.geojson')
# get_overlay looks geometries up by id, so the id must equal the index label.
canopy_e['id'] = canopy_e.index

In [9]:
#df = canopy_e.loc[canopy_e['detection_score'] >= 0.99].copy()
# Score a copy: get_metrics writes an `id` column into the frame it receives.
df = canopy_e.copy()

# get_metrics returns ((precision, recall, f1_score), (TP, FN, FP), pair table);
# the pair table is discarded here.
metrics, dfs, _ = get_metrics(canopy_r, df)

(TP, FN, FP) = dfs

# Forward slashes replace the '..\datasets\...' literals, which relied on
# invalid escape sequences in a non-raw string and only resolved on Windows.
FN.to_file('../datasets/segmentacao_todos/FN_dem.geojson', driver='GeoJSON')
FP.to_file('../datasets/segmentacao_todos/FP_dem.geojson', driver='GeoJSON')

Identificando pares: 100%|█████████████████████████████████████████████████████████| 2026/2026 [02:02<00:00, 16.55it/s]


In [10]:
# (precision, recall, f1_score) returned by get_metrics for the DEM detections.
metrics

(0.9889280322093609, 0.9698914116485686, 0.9793172190381261)

In [None]:
# {detection-score threshold: (precision, recall, f1_score)} for the DEM layer.
# Each threshold triggers a full get_metrics run, including the pairing loop.
data_dem = get_threshold_gaph(canopy_r, canopy_e)
data_dem

In [None]:
# x: thresholds; y: the (precision, recall, f1_score) tuples, so three curves
# are drawn, in that order. No labels or legend are set.
plt.plot(data_dem.keys(), data_dem.values())

In [None]:
def get_TruePositive(gpd_intersection, IoU_threshold = 0.5):
    """Return the rows whose ``IoU`` column is strictly above ``IoU_threshold``.

    The filter is applied to the ``gpd_intersection`` argument; the previous
    body ignored it and read a global named ``intersec``.

    NOTE(review): this cell redefines ``get_TruePositive``, which is already
    defined near the top of the notebook; once executed it replaces that
    definition.
    """
    TP = gpd_intersection.loc[gpd_intersection['IoU'] > IoU_threshold]

    return TP




In [None]:
# `intersec` is not assigned by any visible cell, so on a fresh kernel this
# call raised NameError. Build the pair/IoU table explicitly first (this
# reruns the pairing loop).
intersec = compile_data(canopy_r, canopy_e, get_pairs(canopy_r, canopy_e))
calculate_mAP(intersec, canopy_r, canopy_e)

In [None]:
## Paper: Extraction of information about individual trees from high-spatial-resolution uav-acquired images of an orchard
## Metric: result classes

# NOTE(review): get_intersection and calculate_matches are not defined in the
# visible cells of this notebook - confirm where they come from before
# running this cell.
over = get_intersection(canopy_r, canopy_e)
match = calculate_matches(over)

canopy_e['result'] = ''
# .loc instead of .at: .at addresses one scalar row/column pair, whereas a
# whole array of labels is tagged here.
# Assumes the `id_2` values are index labels of canopy_e - TODO confirm.
canopy_e.loc[match['id_2'].values, 'result'] = 'match'

# canopy_e.to_file('../datasets/segmentacao_todos/validacao_over-match.geojson', driver='GeoJSON')

In [None]:
## Paper: Mask R-CNN refitting strategy for plant counting and sizing in uav imagery
## Metric: mean Average Precision (mAP)

# NOTE(review): get_intersection is not defined in the visible cells of this
# notebook - confirm where it comes from before running this cell.
over = get_intersection(canopy_r, canopy_e)
# get_TruePositive filters on an `IoU` column, which `over` must already have.
TP = get_TruePositive(over)
# calculate_IoU adds `union_geo` and `IoU` to TP in place and returns it.
TP = calculate_IoU(TP)
TP

In [None]:
# Forward slashes: the previous '..\datasets\...' literal relied on invalid
# escape sequences in a non-raw string and only resolved on Windows.
canopy_e = gpd.read_file('../datasets/segmentacao_todos/canopy_detection_result_rgb_raw.geojson')
canopy_e['id'] = canopy_e.index

# NOTE(review): get_intersection is not defined in the visible cells of this
# notebook - confirm where it comes from before running this cell.
over = get_intersection(canopy_r, canopy_e)
TP = get_TruePositive(over)
TP = calculate_IoU(TP)
FN = get_FalseNegative(canopy_r, TP)
FP = get_FalsePositive(canopy_e, TP)

precision = TP.shape[0]/(TP.shape[0]+FP.shape[0])
recall = TP.shape[0]/(TP.shape[0]+FN.shape[0])

# Always assign f1_score: previously, when precision + recall was 0, the name
# was either undefined or still held a value from an earlier cell.
if precision + recall != 0:
    f1_score = (2 * precision * recall) / (precision + recall)
else:
    f1_score = 0.0

precision, recall, f1_score