In [None]:
import math
import os
import random
from datetime import date, datetime, timedelta
from random import randrange

import geopandas as gpd
import pandas as pd
import pyodbc
from geopy.distance import great_circle
from tqdm.auto import tqdm

# Enable tqdm's pandas integration
tqdm.pandas()

# Base directories of the shared project data (values redacted here).
# SHARED_PROJECT_PATH_POLY is the folder that holds 'bp_polygons_osm.geojson'.
SHARED_PROJECT_PATH = '...'
SHARED_PROJECT_PATH_POLY = '...'

# SQL Server connection parameters; each one is interpolated into the ODBC
# connection string used to read the raw accident alerts.
# NOTE(review): avoid committing real credentials in the notebook -- consider
# loading them from environment variables or a secrets store.
SERVER = '...'
DATABASE = '...'
USERNAME = '...'
PASSWORD = '...'
TRUSTSERVERCERT = '...'
TRUSTEDCONN = '...'

---

## Reading and preprocessing accident alerts

In [None]:
# Read raw accident data
# ODBC connection string assembled from the configuration constants
connection_string = f'''
    DRIVER={{ODBC Driver 18 for SQL Server}};
    SERVER={SERVER};
    DATABASE={DATABASE};
    Trusted_Connection={TRUSTEDCONN};
    UID={USERNAME};
    PWD={PASSWORD};
    TrustServerCertificate={TRUSTSERVERCERT};
'''
# Only alerts of type 'ACCIDENT' are loaded
sql_query = f'''
    SELECT *
    FROM ...
    WHERE type = 'ACCIDENT'
'''
conn = pyodbc.connect(connection_string)
try:
    df_acc = pd.read_sql(sql_query, conn)
finally:
    # Release the database connection even when the query fails; the result
    # is already held in df_acc, so the connection is not needed afterwards.
    conn.close()

In [None]:
# Process raw accident data: drop exact duplicates, then collapse to one row per alert uuid
df_acc = (
    df_acc
    .drop_duplicates()
    .groupby('uuid')
    .agg({
        'city': 'first',
        'confidence': 'max',
        'nThumbsUp': 'first',
        'street': 'first',
        'country': 'first',
        'subtype': 'first',
        'roadType': 'first',
        'reliability': 'max',
        'magvar': 'first',
        'reportRating': 'first',
        'ts': 'first',
        'geoWKT': 'first'
    })
    .reset_index()
)

# Timestamps as proper datetimes
df_acc['ts'] = pd.to_datetime(df_acc['ts'])

# Parse the WKT strings (declared as EPSG:4326) and reproject to EPSG:23700
df_acc['geometry'] = gpd.GeoSeries.from_wkt(df_acc['geoWKT'])
df_acc = gpd.GeoDataFrame(df_acc, geometry=df_acc.geometry, crs='EPSG:4326')
df_acc = df_acc.to_crs('EPSG:23700')

# The WKT text is redundant once the geometry column exists
df_acc = df_acc.drop(columns=['geoWKT'])

In [None]:
# Read Budapest polygons and reproject them to EPSG:23700 (same CRS as the alerts)
gdf_poly = gpd.read_file(
    os.path.join(SHARED_PROJECT_PATH_POLY, 'bp_polygons_osm.geojson')
)
gdf_poly = gdf_poly.to_crs('EPSG:23700')

In [None]:
# Filter accidents for Budapest
# Keep only the alerts that lie within the first polygon of gdf_poly.
# The explicit copy turns the filtered slice into an independent frame, so
# adding columns to it afterwards does not raise SettingWithCopyWarning.
df_acc = df_acc[df_acc.within(gdf_poly.iloc[0].geometry)].copy()

In [None]:
# Geographic (WGS84, EPSG:4326) coordinates of every alert, used for the
# great-circle distance in retrieve_neighbors.
s_geometry_wgs = df_acc.to_crs('EPSG:4326')['geometry']

# In a geographic CRS x is the longitude and y is the latitude.
# assign() returns a new frame, so re-running the cell is safe and no
# chained-assignment warning is triggered on the filtered frame.
df_acc = df_acc.assign(
    latitude=s_geometry_wgs.y,
    longitude=s_geometry_wgs.x,
)

---

## ST-DBSCAN clustering

In [None]:
"""
INPUTS:
    df={o1,o2,...,on} Set of objects
    spatial_threshold = Maximum geographical coordinate (spatial) distance value
    temporal_threshold = Maximum non-spatial distance value
    min_neighbors = Minimum number of points within Eps1 (spatial_threshold) and Eps2 (temporal_threshold) distance
OUTPUT:
    C = {c1,c2,...,ck} Set of clusters
"""
def ST_DBSCAN(df, spatial_threshold, temporal_threshold, min_neighbors):
    """
    Label the rows of ``df`` with spatio-temporal DBSCAN clusters.

    INPUTS:
        df = set of objects; it is handed to retrieve_neighbors, which reads
             the 'ts', 'latitude' and 'longitude' columns
        spatial_threshold = maximum spatial distance (forwarded to retrieve_neighbors)
        temporal_threshold = maximum temporal distance (forwarded to retrieve_neighbors)
        min_neighbors = minimum number of neighbors a point needs to be a core point
    OUTPUT:
        The same ``df`` object (modified in place) with a new 'cluster' column:
        1, 2, ... for cluster members and -1 for noise.
    """
    NOISE = -1
    UNMARKED = 777777
    cluster_label = 0
    stack = []

    # initialize each point with unmarked
    df['cluster'] = UNMARKED

    for index in df.index:
        if df.at[index, 'cluster'] != UNMARKED:
            continue  # already assigned to a cluster or flagged as noise

        neighborhood = retrieve_neighbors(index, df, spatial_threshold, temporal_threshold)
        if len(neighborhood) < min_neighbors:
            df.at[index, 'cluster'] = NOISE
            continue

        # found a core point: open a new cluster
        cluster_label += 1
        df.at[index, 'cluster'] = cluster_label

        # the direct neighbors of the seed core are labeled unconditionally
        for neig_index in neighborhood:
            df.at[neig_index, 'cluster'] = cluster_label
            stack.append(neig_index)

        # grow the cluster through every member that is itself a core point
        while stack:
            current_point_index = stack.pop()
            new_neighborhood = retrieve_neighbors(current_point_index, df, spatial_threshold, temporal_threshold)
            if len(new_neighborhood) < min_neighbors:
                continue  # border point: belongs to the cluster but does not extend it

            for neig_index in new_neighborhood:
                neig_cluster = df.at[neig_index, 'cluster']
                if neig_cluster == UNMARKED:
                    # TODO: verify cluster average before add new point
                    df.at[neig_index, 'cluster'] = cluster_label
                    stack.append(neig_index)
                elif neig_cluster == NOISE:
                    # A point flagged as noise earlier is reachable from this
                    # core point, so it joins the cluster as a border point.
                    # It is known not to be a core point, hence it is not
                    # pushed on the stack.
                    df.at[neig_index, 'cluster'] = cluster_label
    return df


def retrieve_neighbors(index_center, df, spatial_threshold, temporal_threshold):
    """
    Return the index labels of all rows of ``df`` that are neighbors of the
    row ``index_center``: their 'ts' lies within ``temporal_threshold`` minutes
    of the center's 'ts' and their great-circle distance to the center is at
    most ``spatial_threshold`` meters. The center itself is never included.
    """
    center = df.loc[index_center]
    center_coords = (center['latitude'], center['longitude'])

    # temporal window around the center point
    half_window = timedelta(minutes=temporal_threshold)
    in_window = df[(df['ts'] >= center['ts'] - half_window) & (df['ts'] <= center['ts'] + half_window)]

    # spatial filter on the temporally close candidates
    candidates = zip(in_window.index, in_window['latitude'], in_window['longitude'])
    return [
        idx
        for idx, lat, lon in candidates
        if idx != index_center
        and great_circle(center_coords, (lat, lon)).meters <= spatial_threshold
    ]

In [None]:
def group_clusters(df_clust):
    """
    Collapse clustered alerts into one representative row per cluster.

    INPUT:
        df_clust = alerts with a 'cluster' column (-1 marks noise). The frame
                   passed in is not modified.
    OUTPUT:
        (df_clust, df_groups)
        df_clust  = GeoDataFrame (EPSG:23700) indexed by 'uuid' and sorted by 'ts':
                    one aggregated row per cluster plus every noise alert;
                    'n_alerts_clustered' holds the number of alerts behind a row
        df_groups = for every clustered alert its 'uuid' and 'gr_uuid', the
                    first uuid (in 'ts', 'uuid' order) of its cluster
    """
    # Sort and rename on a copy so the caller's frame is left untouched
    df_clust = (
        df_clust
        .sort_values(by=['ts', 'uuid'])
        .rename(columns={'nThumbsUp': 'n_alerts_clustered'})
    )
    df_clust['n_alerts_clustered'] = 1  # every raw alert counts once
    df_clust.loc[df_clust['subtype'] == '', 'subtype'] = 'UNKNOWN'

    is_clustered = df_clust['cluster'] != -1
    df_clustered = df_clust[is_clustered]
    df_noise = df_clust[df_clust['cluster'] == -1]

    # Generate the connection table between clustered alerts
    df_groups = df_clustered[['uuid']].copy()
    df_groups['gr_uuid'] = df_clustered.groupby('cluster')['uuid'].transform('first')

    # One representative row per cluster
    cluster_aggregations = {
        'uuid': 'first', 'city': 'first', 'confidence': 'max', 'street': 'first', 'country': 'first',
        'subtype': 'min', 'roadType': 'first', 'reliability': 'max', 'magvar': 'first',
        'reportRating': 'max', 'ts': 'first', 'geometry': 'first', 'n_alerts_clustered': 'count',
    }
    df_representatives = gpd.GeoDataFrame(
        df_clustered.groupby('cluster').agg(cluster_aggregations),
        crs='EPSG:23700',
    )

    # Representatives and unclustered alerts together form the result
    df_result = gpd.GeoDataFrame(pd.concat([df_representatives, df_noise]), crs='EPSG:23700')
    df_result = df_result.sort_values('ts').drop(columns=['cluster']).set_index('uuid')

    return df_result, df_groups

In [None]:
def random_date(start, end):
    """
    Draw a random datetime from the interval [start, end).

    The result is ``start`` plus a random whole number of seconds; any
    sub-second part of ``end - start`` is ignored.
    """
    # whole seconds between the two datetimes (floor division drops microseconds)
    span_seconds = (end - start) // timedelta(seconds=1)
    offset = timedelta(seconds=randrange(span_seconds))
    return start + offset

In [None]:
def perform_random_test(ts_last_proc, buffer_window,
                        spatial_threshold=300, temporal_threshold=200, min_neighbors=1):
    """
    Test incremental clustering with a random date
        - check whether the incremental implementation returns the same clustering result as clustering all alerts at once

    INCREMENTAL CLUSTERING:
        1. Cluster new alerts together with a buffer window of old alerts
        2. Overwrite old clusters that changed size in the new clustering
        3. Remove old clusters which are merged together in the new clusters (caused by new in-between data points)
        4. Append the clusters that start after ts_last_proc

    INPUTS:
        ts_last_proc = timestamp string ('%Y-%m-%d %H:%M:%S') separating old from new alerts
        buffer_window = number of hours of old alerts re-clustered together with the new ones
        spatial_threshold = spatial distance limit passed to ST_DBSCAN (meters)
        temporal_threshold = temporal distance limit passed to ST_DBSCAN (minutes)
        min_neighbors = core point threshold passed to ST_DBSCAN
    OUTPUT:
        (conflict, failed_ts, n_new, n_buffer)
        conflict  = False when old and new results could simply be appended, True when old clusters had to be updated
        failed_ts = ts_last_proc when the incremental result differs from clustering everything at once, else None
        n_new     = number of grouped rows of the new clustering between ts_last_proc and ts_last_new
        n_buffer  = number of grouped rows of the new clustering between ts_start_buff and ts_last_proc

    Reads the alerts from the global ``df_acc``.
    """
    # calculate date parameters (kept as strings, like ts_last_proc)
    ts_format = '%Y-%m-%d %H:%M:%S'
    last_proc = datetime.strptime(ts_last_proc, ts_format)
    ts_first_proc = (last_proc - timedelta(days=1)).strftime(ts_format)
    ts_last_new = (last_proc + timedelta(days=1)).strftime(ts_format)
    ts_start_buff = (last_proc - timedelta(hours=buffer_window)).strftime(ts_format)

    def _cluster_window(ts_from, ts_to):
        # Cut the alerts by date, cluster them and group the clusters
        df_window = df_acc[df_acc.ts.between(ts_from, ts_to)].sort_values('ts')
        df_window = ST_DBSCAN(df_window, spatial_threshold, temporal_threshold, min_neighbors)
        return group_clusters(df_window.drop(columns=['latitude', 'longitude']))

    df_acc_old, _ = _cluster_window(ts_first_proc, ts_last_proc)
    df_acc_new, df_groups_new = _cluster_window(ts_start_buff, ts_last_new)
    df_acc_both, _ = _cluster_window(ts_first_proc, ts_last_new)

    # counters reported with every outcome; df_acc_new is not modified below
    n_new = df_acc_new[df_acc_new.ts.between(ts_last_proc, ts_last_new)].shape[0]
    n_buffer = df_acc_new[df_acc_new.ts.between(ts_start_buff, ts_last_proc)].shape[0]

    # clusters that start after the last processed timestamp
    df_acc_new_only = df_acc_new[df_acc_new.ts > ts_last_proc]

    # continue if no conflicts, if nothing has to be overwritten
    n_alerts_incremental = df_acc_old.n_alerts_clustered.sum() + df_acc_new_only.n_alerts_clustered.sum()
    if n_alerts_incremental == df_acc_both.n_alerts_clustered.sum():
        return False, None, n_new, n_buffer

    # overwrite matching old clustered alerts with new ones
    #           matching: same uuid, different n_alerts_clustered
    # find corresponding clusters to overwrite
    df_acc_old_intersect = df_acc_old[df_acc_old['ts'] >= ts_start_buff][['n_alerts_clustered']]
    l_uuids_to_overwrite = (
        pd.merge(df_acc_new[['n_alerts_clustered']], df_acc_old_intersect, on='uuid', how='inner')
        .query('n_alerts_clustered_x != n_alerts_clustered_y')
        .index.to_list()
    )

    # If there is a new report between two old clusters, that might cause them to merge together
    #           -> remove all old clusters that have been reclustered together into one new cluster
    df_groups_to_drop = df_groups_new[df_groups_new['gr_uuid'].isin(l_uuids_to_overwrite)]
    df_absorbed = df_groups_to_drop[df_groups_to_drop['gr_uuid'] != df_groups_to_drop['uuid']].set_index('uuid')
    l_uuids_to_drop = pd.merge(df_absorbed, df_acc_old_intersect, on='uuid', how='inner').index.to_list()

    # overwrite changed old clusters
    df_acc_old.loc[l_uuids_to_overwrite] = df_acc_new.loc[l_uuids_to_overwrite]

    # remove reclustered old clusters
    df_acc_old = df_acc_old.drop(l_uuids_to_drop)

    # add new clusters
    df_acc_clust = pd.concat([df_acc_old, df_acc_new_only])

    # check if dfs are equal
    is_equal = df_acc_clust.sort_index().equals(df_acc_both.sort_index())
    return True, (None if is_equal else ts_last_proc), n_new, n_buffer

In [None]:
%%time

from dask import compute, delayed
from dask.diagnostics import ProgressBar

# Parameters of the randomized test run
RANDOM_SEED = 42            # fixed seed so the sampled dates are the same on every run
N_RANDOM_TESTS = 20000      # number of random timestamps to test
BUFFER_WINDOWS = [18]       # buffer windows (hours) to evaluate

random.seed(RANDOM_SEED)

# Sampling range for the random "last processed" timestamps
d1 = datetime(2020, 1, 1, 12, 0)
d2 = datetime(2025, 1, 1, 12, 0)

rand_dates = [random_date(d1, d2).strftime('%Y-%m-%d %H:%M:%S') for _ in range(N_RANDOM_TESTS)]

# results holds one entry per buffer window; each entry is a tuple with the
# perform_random_test output of every random date
results = []
for buffer_window in BUFFER_WINDOWS:
    delayed_results = [delayed(perform_random_test)(rand_date, buffer_window) for rand_date in rand_dates]
    with ProgressBar():
        results.append(compute(*delayed_results, scheduler="processes"))