# Temporo spatial join

Method : 
1. Temporo Spatial Join : for each year - Spatial join between Senf & Seidl and the others datasets 
3. Group creation : groupby on the Senf & Seidl index. (One group per index)
2. Group work : 
    - Computing weights for each row
    - Computing score per disturbance type
    - Save group to a dict with the index as key
    - Create row with year, class, score, tree_type, essence  

In [1]:
# Loading 
import geopandas as gpd
epsg = 'epsg:2154'
senfseidl = gpd.read_parquet('../data/processed_datasets/SenfSeidl_joined_EPSG4326_FR.parquet').to_crs(epsg)
nfi = gpd.read_parquet('../data/processed_datasets/NFI_2003-2021_EPSG4326_FR.parquet').to_crs(epsg)
hm = gpd.read_parquet('../data/processed_datasets/health-monitoring_2007-2023_EPSG4326_FR.parquet').to_crs(epsg)
dfde = gpd.read_parquet('../data/processed_datasets/DFDE_1984_2021_EPSG4326_FR.parquet').to_crs(epsg)

# Utils

In [None]:
import pandas as pd

#compute weights
def spatial_weight(x) -> float:
    if x <= 1:
        return 1
    else: 
        return 1 - (x-1)/9 
    
def temporal_weight(x) -> float:
    if x <= 3:
        return 1 - x/12
    else: 
        return 0.75 * (1 - (x-3)/3)

from thefuzz import fuzz

def compute_tree_coherence(row_tt, row_e, ref_tt, ref_e) -> float:

    for essence in ref_e.split(','):
        if fuzz.token_set_ratio(row_e.lower(), essence.lower()) > 80:
            return 1
        
    if row_tt.lower() == ref_tt.lower():
        return 0.75 
    
    if row_tt.lower() == 'mixed' or ref_tt.lower() == 'mixed':
        return 0.5

    return 0.25 

#compute proba per class
dict_isin = {
    'Fire': ['Fire'],
    'Storm': ['Storm', 'Storm,Biotic'],
    'Drought': ['Drought'],
    'Biotic-dieback': ['Biotic-dieback', 'Biotic', 'Storm,Biotic', 'Other'],
    'Biotic-mortality': ['Biotic-mortality', 'Biotic', 'Storm,Biotic', 'Other'],
    'Tree-logging': ['Tree-logging', 'Other'],
    'Other': ['Other']
}

def compute_proba_per_class(gdf):
    dclasses = {}
    present_classes = [ k for k in gdf['class'].unique() if k != 'Other']
    if present_classes == []:
        return {'Other': gdf['p'].mean()}

    for c in [k for k,v in dict_isin.items() if set(v).intersection(set(present_classes))]:
        cond = gdf['class'].isin(dict_isin[c])
        for_ = gdf[cond]['p'].sum()
        against_ = gdf[~cond]['p'].sum()
        dclasses[c] = (for_ - against_) / len(gdf)

    return dclasses 

def compute_class_p_spread(d):
    #sort dict by value descending 
    d = {k: v for k, v in sorted(d.items(), key=lambda item: item[1], reverse=True)}
    keys = list(d.keys()) 
    values = list(d.values())
    spread = values[0] - values[1] if len(values) > 1 else np.nan 

    return keys[0], values[0], spread

def wrapper_class_proba_spread(group):
    dclasses = compute_proba_per_class(group)
    max_key, max_value, spread = compute_class_p_spread(dclasses)
    #return dataframe with index
    return pd.DataFrame({'class': max_key, 'p': max_value, 'spread': spread}, index=group.index[[0]]) 

import numpy as np

def compute_weight_on_merge(row):
    # spatial distance, spatial weight, temporal distance, temporal weight, tree correspondance weight, overall accuracy
    if row['dataset'] == 'senfseidl':
        return 0, 1, 0, 1, 1, 0.88, 0.88 
    elif row['dataset'] == 'dfde':
        sd = (row['area'] / 1e6)** (1/2) / 35
        oa = 0.95
    elif row['dataset'] in ['hm', 'nfi']:
        sd = row['sd'] / 1e3
        oa = 0.9

    if row['dataset'] in ['dfde', 'nfi']:
        td = min(abs(row['year_y'] - row['start_date'].year), abs(row['end_date'].year - row['year_y']))
    elif row['dataset'] == 'hm':
        td = min(abs(row['year_y'] - row['year']), abs(row['year'] - row['year_y']))
    
    tc = compute_tree_coherence(row['tree_type'], row['essence'], row['tree_type_y'], row['essence_y'])
    sw = spatial_weight(sd)
    tw = temporal_weight(td)

    return sd, sw, td, tw, tc, oa, np.mean([sw,tw,tc]) * oa

def get_prob(df):
    return df.apply(compute_weight_on_merge, axis=1, result_type='expand')


# Preprocessing 

Performing geometric computation here so we can entirely rely on Dask.DataFrame later for big data processing. 


In [2]:
#Senf & Seidl
senfseidl.year = senfseidl.year.astype(int)

number_to_class = {
    1:'Storm,Biotic', 
    2:'Fire',
    3:'Other'
}

senfseidl['class'] = senfseidl['cause'].map(number_to_class)
senf_seidl_col = ['year', 'geometry', 'class', 'tree_type', 'essence']
senfseidl = senfseidl[senf_seidl_col]
senfseidl.drop_duplicates(inplace=True)
senfseidl.dropna(inplace=True)

In [3]:
# DFDE

#class
dict_class = {
    'Fire': ['Fire'],
    'Storm': ['Wind'],
    'Drought': ['Summer drought', 'Frost'],
    'Biotic': [
        'Ips typographus', 'Pissodes spp.', 'Bark beetles', 'Bombix desparate',
        'Zeiraphera diniana', 'Biotic', 'Insects', 'Other insects', 'Biotic;Abiotic',
        'Pityogenes chalcographus', 'Tetropium luridum;Tetropium fuscum',
        'Ips acuminatus', 'Tomicus piniperda;Tomicus minor',
        'Phaenops cyanea', 'Pissodes pini', 'Ips cembrae',
        'Tetropium gabrieli', 'Agrilus biguttatus', 'Agrilus viridis',
        'Xyloterus lineatus', 'Erannis defoliaria',
        'Operophtera brumata;Operophtera fagata', 'Lymantria dispar',
        'Thaumetopoea processionea', 'Hylobius abietis',
        'Melolontha hippocastani;Melolontha melolontha',
        'Microtus agrestis;Microtus arvalis;Clethrionomys glareolus',
        'Arvicola terrestris', 'Lophodermium seditiosum',
        'Sphaeropsis sapinea', 'Heterobasidion annosum',
        'Armillaria mellea', 'Chalara fraxinea;Hymenoscyphus fraxineus',
        'Beech decline', 'Oak decline', 'Viscum album', 'Ips sexdentatus'
    ],
    'Tree-logging': [],
    'Other': ['Accident']
}

def get_class(x):
    for key, values in dict_class.items():
        if x in values:
            return key
    return 'Other'

dfde['class'] = dfde['cause'].apply(get_class)

#geometry
dname_geom = {k:v for k,v in zip(dfde['name'].tolist(), dfde['geometry'].tolist())}
dname_geom = {k:v.buffer(5000).simplify(5000) for k,v in dname_geom.items()}
dname_area = {k:v.area for k,v in dname_geom.items()}

#drop duplicates 
dfde.drop_duplicates(subset=['name', 'start_date', 'end_date', 'essence', 'cause', 'notes'], inplace=True)
dfde['geometry'] = dfde['name'].apply(lambda x: dname_geom[x]) 

#compute area here !
dfde['area'] = dfde['name'].apply(lambda x: dname_area[x])

#clean date
import pandas as pd
dfde['start_date'] = pd.to_datetime(dfde['start_date'])
dfde['end_date'] = pd.to_datetime(dfde['end_date'])

#keep_col
dfde_col = ['start_date', 'end_date', 'geometry', 'class', 'tree_type', 'essence', 'cause', 'notes', 'area']
dfde = dfde[dfde_col]

dfde.dropna(inplace=True)
dfde.drop_duplicates(inplace=True)


In [4]:
#nfi 

#filtering
nfi = nfi[ (nfi['probability'] >= 0.1) ]
nfi = nfi[ ~((nfi['class'] == 'Tree-logging')&(nfi['intensity']==0)) ]

#correct start_date
from datetime import timedelta
import pandas as pd
def get_start_date(row):
    if not pd.isnull(row['start_date']):
        return row['start_date']
    else:
        return row['end_date'] - timedelta(days=5*365.25)
    

nfi['start_date'] = nfi.apply(get_start_date, axis=1)

#keep col 
nfi_col = ['start_date', 'end_date', 'geometry', 'class', 'tree_type', 'essence']
nfi = nfi[nfi_col]

nfi.dropna(inplace=True)
nfi.drop_duplicates(inplace=True)

In [5]:
#hm
def get_class(x):
    if x  == 'biotic-factor':
        return 'Biotic'
    else :
        return 'Other'
    
hm['class'] = hm['class'].apply(get_class)
hm['year'] = hm['year'].astype(int)
hm.drop_duplicates(inplace=True)
hm.dropna(inplace=True)

hm.rename(columns={'LIB_Problème principal':'cause', 'Remarques':'notes'}, inplace=True)

# RD Joining

Joining ~ 10s / year

In [6]:
#full Dask -> 20s 
# with sjoin_nearest and sd computation -> 8s  
import dask_geopandas as dgpd
import dask.dataframe as dd

years = senfseidl['year'].unique()

temporal_buffer = 5 #years 
spatial_buffer = 5000 #meters
# nfi.geometry = nfi.geometry.buffer(spatial_buffer)
# hm.geometry = hm.geometry.buffer(spatial_buffer)

nfi['dataset'] = 'nfi'
hm['dataset'] = 'hm'
dfde['dataset'] = 'dfde'
senfseidl['dataset'] = 'senfseidl'

year = 2010

senfseidl_year = senfseidl[senfseidl['year'] == year]
nfi_year = nfi[(nfi['start_date'].dt.year >= year - temporal_buffer) & (nfi['end_date'].dt.year <= year + temporal_buffer)]
hm_year = hm[(hm['year'] >= year - temporal_buffer) & (hm['year'] <= year + temporal_buffer)]
dfde_year = dfde[(dfde['start_date'].dt.year >= year - temporal_buffer) & (dfde['end_date'].dt.year <= year + temporal_buffer)]

senfseidl_year_ = dgpd.from_geopandas(senfseidl_year, npartitions=10)
# nfi_year = dgpd.from_geopandas(nfi_year, npartitions=10)
# hm_year = dgpd.from_geopandas(hm_year, npartitions=10)
dfde_year_ = dgpd.from_geopandas(dfde_year, npartitions=10)

# senfseidl_nfi_year = nfi_year.sjoin(senfseidl_year)
# senfseidl_hm_year = hm_year.sjoin(senfseidl_year)
senfseidl_hm_year = hm_year.sjoin_nearest(senfseidl_year, max_distance=spatial_buffer, distance_col='sd')
senfseidl_nfi_year = nfi_year.sjoin_nearest(senfseidl_year, max_distance=spatial_buffer, distance_col='sd')
senfseidl_dfde_year = dfde_year_.sjoin(senfseidl_year_)

#concat with dask_geopandas
import dask.dataframe as dd
concatenation = dd.concat([senfseidl_nfi_year, senfseidl_hm_year, senfseidl_dfde_year], axis=0).compute()

In [7]:
#entire dataset -> 2.3s
print(concatenation.shape)

col = ['start_date', 'end_date', 'geometry', 'year_left', 'class_left', 'tree_type_left', 'essence_left', 'dataset_left', 'cause', 'notes', 'area', 'sd']
all_index_right = concatenation['index_right'].unique()
concatenation = concatenation[['index_right']+col]
rename = {c: c.split('_left')[0] for c in col}
concatenation = concatenation.rename(columns=rename)

senfseidl_year['index_right'] = senfseidl_year.index

#l'order of senfseidl_year and co is important. If we want to ise iloc[0] on the group to retrieve senfseidl row, we have to stick to this order.
concatenation = dd.concat([senfseidl_year.loc[all_index_right], concatenation], axis=0).compute()

(354418, 19)


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  super().__setitem__(key, value)


In [8]:
concatenation

Unnamed: 0,year,geometry,class,tree_type,essence,dataset,index_right,start_date,end_date,cause,notes,area,sd
2424927,2010.0,"POLYGON ((318824.872 6257012.037, 318794.980 6...",Other,Mixed,larch,senfseidl,2424927,NaT,NaT,,,,
2424928,2010.0,"POLYGON ((318846.967 6257104.615, 318876.859 6...",Other,Mixed,larch,senfseidl,2424928,NaT,NaT,,,,
2424949,2010.0,"POLYGON ((320572.848 6259442.594, 320483.176 6...",Other,Mixed,"mixed,laricio pine, black pine",senfseidl,2424949,NaT,NaT,,,,
2424950,2010.0,"POLYGON ((320353.214 6259541.838, 320383.105 6...",Other,Mixed,"deciduous oaks,laricio pine, black pine",senfseidl,2424950,NaT,NaT,,,,
2424953,2010.0,"POLYGON ((315790.192 6261751.147, 315849.974 6...",Other,Broadleaf,broadleaf,senfseidl,2424953,NaT,NaT,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...
64876,,POINT (777643.312 6415230.264),Biotic-dieback,Broadleaf,European Ash,nfi,2531745,2010-12-27 18:00:00,2015-12-28,,,,2032.403655
64886,,POINT (553066.658 6200340.091),Storm,Broadleaf,Pedunculate Oak,nfi,2503011,2010-12-27 18:00:00,2015-12-28,,,,1650.712193
64890,,POINT (193749.856 6855715.893),Tree-logging,Broadleaf,Pedunculate Oak,nfi,2426031,2010-12-27 18:00:00,2015-12-28,,,,4609.385803
64891,,POINT (834333.883 6736479.550),Biotic-dieback,Broadleaf,Pedunculate Oak,nfi,2542883,2010-12-27 18:00:00,2015-12-28,,,,1852.003375


## Methode 1 : Concat -> Groupby -> Apply

- Version normal : ok 
- Version vectorisée : 2x plus lent
- Version parallelisée avec Joblib : 2x plus lent
- Version avec Dask : ne fonctionne pas

25min / year 

40 year ~ 17h

In [None]:
def compute_weight(row, reference):
    # spatial distance, spatial weight, temporal distance, temporal weight, tree correspondance weight, overall accuracy
    if row['dataset'] == 'senfseidl':
        return 0, 1, 0, 1, 1, 0.88, 0.88 
    elif row['dataset'] == 'dfde':
        sd = (row['area'] / 1e6)** (1/2) / 35
        oa = 0.95
    elif row['dataset'] in ['hm', 'nfi']:
        sd = row['sd'] / 1e3
        oa = 0.9

    if row['dataset'] in ['dfde', 'nfi']:
        td = min(abs(reference['year'] - row['start_date'].year), abs(row['end_date'].year - reference['year']))
    elif row['dataset'] == 'hm':
        td = min(abs(reference['year'] - row['year']), abs(row['year'] - reference['year']))
    
    tc = compute_tree_coherence(row['tree_type'], row['essence'], reference['tree_type'], reference['essence'])
    sw = spatial_weight(sd)
    tw = temporal_weight(td)

    return sd, sw, td, tw, tc, oa, sw * tw * tc * oa

def wrappper_weight_group(group):
    reference = group.iloc[0]
    group[['sd', 'sw', 'td', 'tw', 'tc', 'oa', 'p']] = group.apply(lambda x: compute_weight(x,reference), axis=1, result_type='expand')
    return group

In [18]:
#simplifying code : using concat and groupby for appending the reference (senfseidl) row. 

#entire dataset -> 2.3s
co = concatenation
print(co.shape)

col = ['start_date', 'end_date', 'geometry', 'year_left', 'class_left', 'tree_type_left', 'essence_left', 'dataset_left', 'cause', 'notes', 'area', 'sd']
all_index_right = co['index_right'].unique()
co = co[['index_right']+col]
rename = {c: c.split('_left')[0] for c in col}
co = co.rename(columns=rename)

senfseidl_year['index_right'] = senfseidl_year.index

#l'order of senfseidl_year and co is important. If we want to ise iloc[0] on the group to retrieve senfseidl row, we have to stick to this order.
groups = dd.concat([senfseidl_year.loc[all_index_right], co], axis=0).compute().groupby(by='index_right')
len(groups)

(354418, 19)


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  super().__setitem__(key, value)
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


92236

La version vectorisée n'est pas plus rapide car les groupes sont petits. De 2 à 50 lignes. 

In [14]:
def wrapper(group):
    reference = group.iloc[0]
    group[['sd', 'sw', 'td', 'tw', 'tc', 'oa', 'p']] = group.apply(lambda x: compute_weight(x, reference), axis=1, result_type='expand')
    return group

In [14]:
#130 000 -> 22min
disturbances = groups.apply(wrapper)

In [169]:
#1000 -> 14.5s 
#10000 -> 2min 14s
from itertools import islice
for name, group in islice(groups, 1000):
    vectorized_compute_weight(group)


In [170]:
#1000 -> 6.6s 
#10000 -> 1mins 
#130 000 -> 13min
from itertools import islice
for name, group in islice(groups, 1000):
    group[['sd', 'sw', 'td', 'tw', 'tc', 'oa', 'p']] = group.apply(lambda x: compute_weight(x, group.iloc[0]), axis=1, result_type='expand')

In [11]:
from dask.distributed import Client

client = Client() 

print(client.dashboard_link) 

http://127.0.0.1:8787/status


In [27]:
dtypes = {'year': 'float64',
 'geometry': 'geometry',
 'class': 'object',
 'tree_type': 'object',
 'essence': 'object',
 'dataset': 'object',
 'index_right': 'int64',
 'start_date': 'datetime64[ns]',
 'end_date': 'datetime64[ns]',
 'cause': 'string',
 'notes': 'string',
 'area': 'float64',
 'sd': 'float64',
 'sw': 'float64',
 'td': 'float64',
 'tw': 'float64',
 'tc': 'float64',
 'oa': 'float64',
 'p': 'float64'}

meta = pd.DataFrame(columns=dtypes.keys()).astype(dtypes)
co_pd = pd.DataFrame(co)
senfseidl_year_pd = pd.DataFrame(senfseidl_year.loc[all_index_right])
from dask.diagnostics import ProgressBar
with ProgressBar():
    gr = dd.concat([senfseidl_year_pd, co_pd], axis=0).groupby(by='index_right').apply(wrapper, meta=meta).compute()

This may cause some slowdown.
Consider scattering data ahead of time and using futures.
Key:       ('shuffle-transfer-a1a63667780cb7392e35539ec39715c9', 0)
Function:  shuffle_transfer
args:      (           year  ... _partitions
2424927  2010.0  ...           1
2424928  2010.0  ...           0
2424949  2010.0  ...           1
2424950  2010.0  ...           1
2424953  2010.0  ...           1
...         ...  ...         ...
5470026  2010.0  ...           1
5470029  2010.0  ...           1
5470077  2010.0  ...           0
5470086  2010.0  ...           0
5470094  2010.0  ...           1

[92236 rows x 14 columns], 'a1a63667780cb7392e35539ec39715c9', 0, 2, '_partitions', Empty DataFrame
Columns: [year, geometry, class, tree_type, essence, dataset, index_right, start_date, end_date, cause, notes, area, sd, _partitions]
Index: [], {0, 1})
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle a1a63667780cb7392e35539ec39715c9')"



RuntimeError: shuffle_transfer failed during shuffle a1a63667780cb7392e35539ec39715c9

## Méthode 2 : Merge -> Map partition

30s / year

40 year ~ 20min

In [16]:
merge = pd.merge(concatenation, senfseidl_year, left_on='index_right', right_index=True)
drop = ['geometry_y', 'class_y', 'dataset_y', 'index_right_y', 'index_right_x']
merge = merge.drop(columns=drop)
rename = {c: c.split('_x')[0] for c in merge.columns}
merge = merge.rename(columns=rename).reset_index()

In [18]:
#~ 30s 

from dask.distributed import Client

client = Client() 
print(client.dashboard_link) 
dd_merge = dd.from_pandas(merge.reset_index(), npartitions=10)

from dask.diagnostics import ProgressBar
import traceback
meta = {'sd': 'float64',
        'sw': 'float64',
        'td': 'float64',
        'tw': 'float64',
        'tc': 'float64',
        'oa': 'float64',
        'p': 'float64'}

def get_prob(df):
    return df.apply(compute_weight_on_merge, axis=1, result_type='expand')
    
# try: 
with ProgressBar():
    results = dd_merge.map_partitions(get_prob, meta=meta, enforce_metadata=False).compute()
#     raise Exception("ERROR HERE") # Some code that caused the exception/error
# except:
#     traceback.print_exc()

results.columns = ['sd', 'sw', 'td', 'tw', 'tc', 'oa', 'p']
client.close()

http://127.0.0.1:8787/status


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


In [45]:
merge = merge.drop(columns=['sd'])
disturbances = pd.concat([merge, results], axis=1)
disturbances.rename(columns={'index_right': 'index_ref'}, inplace=True)
disturbances.drop(columns=['year_y', 'tree_type_y', 'essence_y'], inplace=True)

In [46]:
disturbances

Unnamed: 0,index,index_ref,year,geometry,class,tree_type,essence,dataset,start_date,end_date,cause,notes,area,sd,sw,td,tw,tc,oa,p
0,2424927,2424927,2010.0,"POLYGON ((318824.872 6257012.037, 318794.980 6...",Other,Mixed,larch,senfseidl,NaT,NaT,,,,0.000000,1.000000,0.0,1.000000,1.00,0.88,0.880000
1,716,2424927,,"POLYGON ((306824.713 6263439.296, 324087.178 6...",Storm,mixed,broadleaves and conifers (conifers: 51 to 89%),dfde,2009-01-24,2009-01-24,Wind,Klaus storm - area: area affected by the storm...,9.504405e+10,8.808347,0.132406,1.0,0.916667,0.75,0.95,0.569706
2,717,2424927,,"POLYGON ((306824.713 6263439.296, 324087.178 6...",Storm,broadleaf,broadleaves,dfde,2009-01-24,2009-01-24,Wind,Klaus storm,9.504405e+10,8.808347,0.132406,1.0,0.916667,0.50,0.95,0.490540
3,718,2424927,,"POLYGON ((306824.713 6263439.296, 324087.178 6...",Storm,conifer,conifers,dfde,2009-01-24,2009-01-24,Wind,Klaus storm,9.504405e+10,8.808347,0.132406,1.0,0.916667,0.50,0.95,0.490540
4,719,2424927,,"POLYGON ((306824.713 6263439.296, 324087.178 6...",Storm,conifer,maritime pine,dfde,2009-01-24,2009-01-24,Wind,Klaus storm,9.504405e+10,8.808347,0.132406,1.0,0.916667,0.50,0.95,0.490540
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
446649,5470086,5470086,2010.0,"POLYGON ((1215037.117 6116545.426, 1214947.610...",Fire,Conifer,"laricio pine, black pine",senfseidl,NaT,NaT,,,,0.000000,1.000000,0.0,1.000000,1.00,0.88,0.880000
446650,23359,5470086,2013.0,POINT (1215621.207 6116339.642),Biotic,conifer,Pines,hm,NaT,NaT,Erodé,Parquet en régénération - PLC au stade gaulis ...,,0.619281,1.000000,3.0,0.750000,0.75,0.90,0.750000
446651,5470094,5470094,2010.0,"POLYGON ((1210124.853 6138064.278, 1210154.685...",Fire,Mixed,"mixed,maritime pine",senfseidl,NaT,NaT,,,,0.000000,1.000000,0.0,1.000000,1.00,0.88,0.880000
446652,17383,5470094,2012.0,POINT (1209622.093 6139189.644),Biotic,conifer,Pines,hm,NaT,NaT,Cochenille du pin maritime,Foyer actif de Matsucoccus Feytaudii avec fort...,,1.091095,0.989878,2.0,0.833333,0.50,0.90,0.696964


In [47]:
groups = disturbances.groupby(by='index_ref')

In [9]:
def compute_proba_per_class(gdf):
    dclasses = {}
    present_classes = [ k for k in gdf['class'].unique() if k != 'Other']
    if present_classes == []:
        return {'Other': gdf['p'].mean()}

    for c in [k for k,v in dict_isin.items() if set(v).intersection(set(present_classes))]:
        cond = gdf['class'].isin(dict_isin[c])
        for_ = gdf[cond]['p'].sum()
        against_ = gdf[~cond]['p'].sum()
        dclasses[c] = (for_ - against_) / len(gdf)

    return dclasses 

def compute_class_p_spread(d):
    #sort dict by value descending 
    d = {k: v for k, v in sorted(d.items(), key=lambda item: item[1], reverse=True)}

    keys = list(d.keys()) 
    values = list(d.values())

    spread = values[0] - values[1] if len(values) > 1 else np.nan 

    return keys[0], values[0], spread

def wrapper_class_proba_spread(group):
    dclasses = compute_proba_per_class(group)
    max_key, max_value, spread = compute_class_p_spread(dclasses)
    #return dataframe with index
    return pd.DataFrame({'class': max_key, 'p': max_value, 'spread': spread}, index=group.index[[0]]) 


In [92]:
from itertools import islice
for name, group in islice(groups, 100):
    wrapper_class_proba_spread(group)

In [87]:
group

Unnamed: 0,index,index_ref,year,geometry,class,tree_type,essence,dataset,start_date,end_date,cause,notes,area,sd,sw,td,tw,tc,oa,p
186,2425266,2425266,2010.0,"POLYGON ((216638.494 6792249.478, 216578.666 6...",Other,Broadleaf,broadleaf,senfseidl,NaT,NaT,,,,0.0,1.0,0.0,1.0,1.0,0.88,0.88
187,11922,2425266,2010.0,POINT (216764.134 6791334.000),Other,broadleaf,Oaks,hm,NaT,NaT,Dégât dû au gel,"Dégats important sur l',ensemble des quadrats ...",,0.924059,1.0,0.0,1.0,0.75,0.9,0.825


In [137]:
wrapper_class_proba_spread(group)

Unnamed: 0,class,p,spread
268,Biotic-dieback,0.805748,0.0


In [89]:
a = compute_proba_per_class(group)
a

{'Other': 0.8525}

In [91]:
compute_class_p_spread(a)

('Other', 0.8525, nan)

In [138]:
#92000 -> >10min (pd)
#92000 -> 6min (dd groupby apply)
#92000 -> 4min (dd map_partitions) but issue on the index

client = Client() 
print(client.dashboard_link) 


r_ = dd.from_pandas(disturbances[['index_ref', 'class', 'sd', 'sw', 'td', 'tw', 'tc', 'oa', 'p']], npartitions=10)

dtypes = {
    'class': 'string',
    'p': 'float64',
    'spread': 'float64'
}

# with ProgressBar():
#     r = r_.groupby(by='index_ref').apply(wrapper_class_proba_spread, meta=meta).compute()
# client.close()

r = r_.map_partitions(lambda df: df.groupby('index_ref',as_index=True).apply(wrapper_class_proba_spread), meta=meta).compute()
client.close()

Perhaps you already have a cluster running?
Hosting the HTTP server on port 52831 instead


http://127.0.0.1:52831/status


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


In [124]:
len(disturbances.groupby(by='index_ref').groups)

92236

In [139]:
r

Unnamed: 0_level_0,Unnamed: 1_level_0,class,p,spread
index_ref,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2424927,0,Storm,0.083111,0.166223
2424928,6,Storm,0.097208,0.194416
2424949,12,Storm,0.082907,0.165813
2424950,18,Storm,0.232265,
2424953,23,Storm,0.200598,
...,...,...,...,...
5470026,446643,Fire,0.143339,0.286678
5470029,446645,Fire,0.081608,0.163217
5470077,446647,Fire,0.065000,
5470086,446649,Fire,0.065000,0.130000


In [144]:
r.reset_index().drop_duplicates(subset=['index_ref'])

Unnamed: 0,index_ref,level_1,class,p,spread
0,2424927,0,Storm,0.083111,0.166223
1,2424928,6,Storm,0.097208,0.194416
2,2424949,12,Storm,0.082907,0.165813
3,2424950,18,Storm,0.232265,
4,2424953,23,Storm,0.200598,
...,...,...,...,...,...
92239,5470026,446643,Fire,0.143339,0.286678
92240,5470029,446645,Fire,0.081608,0.163217
92241,5470077,446647,Fire,0.065000,
92242,5470086,446649,Fire,0.065000,0.130000


# Test for one year

In [13]:
#remove all warnings
import warnings
warnings.filterwarnings('ignore')

#full Dask -> 20s 
# with sjoin_nearest and sd computation -> 8s  
import dask_geopandas as dgpd
import dask.dataframe as dd
import pandas as pd

years = senfseidl['year'].unique()

temporal_buffer = 5 #years 
spatial_buffer = 5000 #meters

nfi['dataset'] = 'nfi'
hm['dataset'] = 'hm'
dfde['dataset'] = 'dfde'
senfseidl['dataset'] = 'senfseidl'

year = 2010

# SPATIAL AND TEMPORAL JOIN
senfseidl_year = senfseidl[senfseidl['year'] == year]
nfi_year = nfi[(nfi['start_date'].dt.year >= year - temporal_buffer) & (nfi['end_date'].dt.year <= year + temporal_buffer)]
hm_year = hm[(hm['year'] >= year - temporal_buffer) & (hm['year'] <= year + temporal_buffer)]
dfde_year = dfde[(dfde['start_date'].dt.year >= year - temporal_buffer) & (dfde['end_date'].dt.year <= year + temporal_buffer)]

senfseidl_year_ = dgpd.from_geopandas(senfseidl_year, npartitions=10)
dfde_year_ = dgpd.from_geopandas(dfde_year, npartitions=10)

senfseidl_hm_year = hm_year.sjoin_nearest(senfseidl_year, max_distance=spatial_buffer, distance_col='sd')
senfseidl_nfi_year = nfi_year.sjoin_nearest(senfseidl_year, max_distance=spatial_buffer, distance_col='sd')
senfseidl_dfde_year = dfde_year_.sjoin(senfseidl_year_)

#concat with dask_geopandas
concatenation = dd.concat([senfseidl_nfi_year, senfseidl_hm_year, senfseidl_dfde_year], axis=0).compute()

print('Spatial and temporal join done.')
#CONCATENATION WITH REF
#entire dataset -> 2.3s
col = ['start_date', 'end_date', 'geometry', 'year_left', 'class_left', 'tree_type_left', 'essence_left', 'dataset_left', 'cause', 'notes', 'area', 'sd']
all_index_right = concatenation['index_right'].unique()
concatenation = concatenation[['index_right']+col]
rename = {c: c.split('_left')[0] for c in col}
concatenation = concatenation.rename(columns=rename)

senfseidl_year['index_right'] = senfseidl_year.index

#l'order of senfseidl_year and co is important. If we want to ise iloc[0] on the group to retrieve senfseidl row, we have to stick to this order.
concatenation = dd.concat([senfseidl_year.loc[all_index_right], concatenation], axis=0).compute()

print('Concatenation with reference done.')
#MERGING
merge = pd.merge(concatenation, senfseidl_year, left_on='index_right', right_index=True)
drop = ['geometry_y', 'class_y', 'dataset_y', 'index_right_y', 'index_right_x']
merge = merge.drop(columns=drop)
rename = {c: c.split('_x')[0] for c in merge.columns}
merge = merge.rename(columns=rename).reset_index()

print('Merging done.')

#COMPUTING WEIGHT
dd_merge = dd.from_pandas(merge.reset_index(), npartitions=10)
meta = {'sd': 'float64',
        'sw': 'float64',
        'td': 'float64',
        'tw': 'float64',
        'tc': 'float64',
        'oa': 'float64',
        'p': 'float64'}

def get_prob(df):
    return df.apply(compute_weight_on_merge, axis=1, result_type='expand')
    

results = dd_merge.map_partitions(get_prob, meta=meta, enforce_metadata=False).compute()


results.columns = ['sd', 'sw', 'td', 'tw', 'tc', 'oa', 'p']


#concatenation with weights
merge = merge.drop(columns=['sd'])
disturbances = pd.concat([merge, results], axis=1)
disturbances.rename(columns={'index_right': 'index_ref'}, inplace=True)
disturbances.drop(columns=['year_y', 'tree_type_y', 'essence_y'], inplace=True)

print('Computing weights done.')

#GROUPBY INDEX_REF and Classification 
r_ = dd.from_pandas(disturbances[['index_ref', 'class', 'sd', 'sw', 'td', 'tw', 'tc', 'oa', 'p']], npartitions=10)

dtypes = {
    'class': 'string',
    'p': 'float64',
    'spread': 'float64'
}

meta = pd.DataFrame(columns=dtypes.keys()).astype(dtypes)
r = r_.map_partitions(lambda df: df.groupby('index_ref',as_index=True).apply(wrapper_class_proba_spread), meta=meta).compute()
r = r.reset_index().drop_duplicates(subset=['index_ref'])
print('Groupby index_ref and classification done.')

#ATTRIBUTION CONSTRUCTION
col_left = ['year', 'geometry', 'tree_type', 'essence']
left = senfseidl_year.loc[all_index_right].sort_index()[col_left]
col_right = ['index_ref', 'class', 'p', 'spread']
right = r.sort_values(by='index_ref')[col_right]
attribution = pd.merge(left, right, left_index=True, right_on='index_ref')

print('Attribution construction done.')



Spatial and temporal join done.
Concatenation with reference done.
Merging done.
Computing weights done.
Groupby index_ref and classification done.


# RUN

In [31]:
import os
import dask_geopandas as dgpd
import dask.dataframe as dd
import pandas as pd

def get_attribution_per_year(year, senfseidl, hm, nfi, dfde, temporal_buffer, spatial_buffer, outdir, verbose=0):

    #TEMPORAL AND SPATIAL JOIN ~ 8s
    senfseidl_year = senfseidl[senfseidl['year'] == year]
    nfi_year = nfi[(nfi['start_date'].dt.year >= year - temporal_buffer) & (nfi['end_date'].dt.year <= year + temporal_buffer)]
    hm_year = hm[(hm['year'] >= year - temporal_buffer) & (hm['year'] <= year + temporal_buffer)]
    dfde_year = dfde[(dfde['start_date'].dt.year >= year - temporal_buffer) & (dfde['end_date'].dt.year <= year + temporal_buffer)]

    senfseidl_year_ = dgpd.from_geopandas(senfseidl_year, npartitions=10)
    dfde_year_ = dgpd.from_geopandas(dfde_year, npartitions=10)

    senfseidl_hm_year = hm_year.sjoin_nearest(senfseidl_year, max_distance=spatial_buffer, distance_col='sd')
    senfseidl_nfi_year = nfi_year.sjoin_nearest(senfseidl_year, max_distance=spatial_buffer, distance_col='sd')
    senfseidl_dfde_year = dfde_year_.sjoin(senfseidl_year_)

    #concat with dask_geopandas
    concatenation = dd.concat([senfseidl_nfi_year, senfseidl_hm_year, senfseidl_dfde_year], axis=0).compute()
    if verbose > 0:
        print('Spatial and temporal join done.')
    
    #CONCATENATION WITH REF ~ 2s
    col = ['start_date', 'end_date', 'geometry', 'year_left', 'class_left', 'tree_type_left', 'essence_left', 'dataset_left', 'cause', 'notes', 'area', 'sd']
    all_index_right = concatenation['index_right'].unique()
    concatenation = concatenation[['index_right']+col]
    rename = {c: c.split('_left')[0] for c in col}
    concatenation = concatenation.rename(columns=rename)
    senfseidl_year['index_right'] = senfseidl_year.index

    #l'order of senfseidl_year and co is important. If we want to ise iloc[0] on the group to retrieve senfseidl row, we have to stick to this order.
    concatenation = dd.concat([senfseidl_year.loc[all_index_right], concatenation], axis=0).compute()
    if verbose > 0:
        print('Concatenation with reference done.')

    #MERGING ~ 1s 
    merge = pd.merge(concatenation, senfseidl_year, left_on='index_right', right_index=True)
    drop = ['geometry_y', 'class_y', 'dataset_y', 'index_right_y', 'index_right_x']
    merge = merge.drop(columns=drop)
    rename = {c: c.split('_x')[0] for c in merge.columns}
    merge = merge.rename(columns=rename).reset_index()

    if verbose > 0:
        print('Merging done.')
    #COMPUTING WEIGHT ~ 30s 
    dd_merge = dd.from_pandas(merge.reset_index(), npartitions=10)
    meta = {'sd': 'float64',
            'sw': 'float64',
            'td': 'float64',
            'tw': 'float64',
            'tc': 'float64',
            'oa': 'float64',
            'p': 'float64'}

    
    results = dd_merge.map_partitions(get_prob, meta=meta, enforce_metadata=False).compute()


    results.columns = ['sd', 'sw', 'td', 'tw', 'tc', 'oa', 'p']


    #concatenation with weights
    merge = merge.drop(columns=['sd'])
    disturbances = pd.concat([merge, results], axis=1)
    disturbances.rename(columns={'index_right': 'index_ref'}, inplace=True)
    disturbances.drop(columns=['year_y', 'tree_type_y', 'essence_y'], inplace=True)
    gpd.GeoDataFrame(disturbances, crs=epsg).to_parquet(os.path.join(outdir, f'disturbances_{year}.parquet'))
    if verbose > 0:
        print('Computing weights done.')
    
    #GROUPBY INDEX_REF and Classification ~ 4min 
    r_ = dd.from_pandas(disturbances[['index_ref', 'class', 'sd', 'sw', 'td', 'tw', 'tc', 'oa', 'p']], npartitions=10)
    dtypes = {
        'class': 'string',
        'p': 'float64',
        'spread': 'float64'
        }
    meta = pd.DataFrame(columns=dtypes.keys()).astype(dtypes)
    r = r_.map_partitions(lambda df: df.groupby('index_ref',as_index=True).apply(wrapper_class_proba_spread), meta=meta).compute()
    r = r.reset_index().drop_duplicates(subset=['index_ref'])
    if verbose > 0:
        print('Groupby index_ref and classification done.')

    #ATTRIBUTION CONSTRUCTION ~ 1s 
    col_left = ['year', 'geometry', 'tree_type', 'essence']
    left = senfseidl_year.loc[all_index_right].sort_index()[col_left]
    col_right = ['index_ref', 'class', 'p', 'spread']
    right = r.sort_values(by='index_ref')[col_right]
    attribution = pd.merge(left, right, left_index=True, right_on='index_ref')
    if verbose > 0:
        print('Attribution construction done.')

    #WRITING ~ 10s 
    gpd.GeoDataFrame(attribution, crs=epsg).to_parquet(os.path.join(outdir, f'attribution_{year}.parquet'))
    return attribution




In [32]:
#35y -> 223min 

#remove all warnings
import warnings
warnings.filterwarnings('ignore')

from tqdm import tqdm 

outdir = '../data/results'
os.makedirs(outdir, exist_ok=True)
years = senfseidl['year'].unique()

temporal_buffer = 5 #years 
spatial_buffer = 5000 #meters

nfi['dataset'] = 'nfi'
hm['dataset'] = 'hm'
dfde['dataset'] = 'dfde'
senfseidl['dataset'] = 'senfseidl'

for year in tqdm(years):
    get_attribution_per_year(year, senfseidl, hm, nfi, dfde, temporal_buffer, spatial_buffer, outdir)

100%|██████████| 35/35 [3:43:20<00:00, 382.86s/it]   
