# In [None]:
from conf import *
import random
import pyspark
import time
import math
import numpy as np
import subprocess
from sys import exit

from datetime import date, datetime, timedelta

from pyspark.sql import Row, DataFrame, SparkSession
from pyspark import SparkConf, SparkContext
from pyspark import StorageLevel
from operator import add
from sklearn.metrics.pairwise import haversine_distances
from interval_tree import Interval, Node, IntervalTree
from sklearn.neighbors import BallTree

from pathlib import Path
from event import event


def event_transformer(line: str) -> event:
    """Build an ``event`` object from one normalised comma-separated row.

    The expected layout is the 16-column row emitted by ``format_event_row``:
    id, kind ('T'/'W'), refined type, start, end, lat, lng, distance,
    airport code, number, street, side, city, county, state, zip code.

    Args:
        line: A single row; CR/LF characters are removed before splitting.

    Returns:
        An ``event`` with empty ``childs`` and ``parents`` sets. Rows whose
        second column is 'W' become weather events with zeroed
        coordinates/distance/number and 'NA' address fields; every other row
        is treated as a traffic event.

    Raises:
        IndexError: If the row has fewer columns than the branch reads.
        ValueError: If a traffic row's lat/lng/distance/number is not numeric.
    """
    r = line.replace('\r', '').replace('\n', '').split(',')

    if r[1] == 'W':
        return event(eventId=r[0], type='W', refinedType=r[2], startTime=r[3], endTime=r[4],
                     locationLat=0, locationLng=0, distance=0, airportCode=r[8], number=0,
                     street='NA', side='NA', city='NA', county='NA', state='NA',
                     zipCode='NA', childs=set(), parents=set())

    # A missing house number is encoded as 'N/A' or an empty column.
    number = 0 if r[9] in ('N/A', '') else int(r[9])

    # The airport code lives in column 8 for both kinds of row; column 15 is
    # the zip code (it was previously copied into airportCode by mistake).
    return event(eventId=r[0], type='T', refinedType=r[2], startTime=r[3], endTime=r[4],
                 locationLat=float(r[5]), locationLng=float(r[6]), distance=float(r[7]),
                 airportCode=r[8], number=number, street=r[10], side=r[11], city=r[12],
                 county=r[13], state=r[14], zipCode=r[15], childs=set(), parents=set())

def str_transformer(e: event) -> str:
    """Serialise an event into one comma-separated output row.

    The row has 18 columns: the 16 event columns followed by the
    ';'-joined child ids and the ';'-joined parent ids (each empty when the
    corresponding set is empty). Weather events (``e.type == 'W'``) emit
    'N/A' for every location/address column and keep only the airport code.
    """
    children_col = ';'.join(e.childs)
    parents_col = ';'.join(e.parents)

    # Columns shared by both kinds of event.
    head = f'{e.eventId},{e.type},{e.refinedType},{e.startTime},{e.endTime}'
    tail = f'{children_col},{parents_col}'

    if e.type == 'W':
        body = f'N/A,N/A,N/A,{e.airportCode},N/A,N/A,N/A,N/A,N/A,N/A,N/A'
    else:
        body = (f'{e.locationLat},{e.locationLng},{e.distance},{e.airportCode},{e.number},'
                f'{e.street},{e.side},{e.city},{e.county},{e.state},{e.zipCode}')

    return f'{head},{body},{tail}'
    


# Stop the SparkContext that is already bound to ``sc`` (it is not created in
# this file) so that the configuration below is applied to a fresh context.
sc.stop()

# Create new config
conf = pyspark.SparkConf().setAll([("spark.driver.maxResultSize", '16g'), ('spark.executor.memoryOverhead', '16g'), ('spark.executor.memory', '16g')])

# Chain config -> appName -> getOrCreate on ONE builder. Calling
# ``SparkSession.builder.config(conf=conf)`` as a separate statement and then
# accessing ``SparkSession.builder`` again loses the settings on PySpark
# versions where ``builder`` returns a new Builder on every access.
spark = SparkSession.builder.config(conf=conf).appName('test_03_11_1').getOrCreate()
sc = spark.sparkContext

# Ship the project modules to the executors so the functions run there can
# import them.
sc.addPyFile('file:/event.py')
sc.addPyFile('file:/interval_tree.py')
storage_level = pyspark.StorageLevel.MEMORY_AND_DISK
sc.setLogLevel("OFF")

#print(spark.sparkContext.getConf().getAll())

# SUBSET and CITY are not defined in this file (presumably they come from
# ``from conf import *``): SUBSET is True to process only the CITY subset and
# False to process the entire datasets.
if SUBSET:
    f_name = f'_{CITY}'
    res = input(f'You are using a SUBSET ({CITY}). Are you sure? [y/n]:')
else:
    res = input('You are using ENTIRE DATASET. Are you sure? [y/n]:')
    f_name = ''

# Anything other than an explicit 'y' aborts the run.
if res != 'y':
    exit()

start = time.time()

# ``f_name`` is '' for the full datasets or '_<CITY>' for a subset.
traffic_path = f'file:/datasets/TrafficEvents_Aug16_Dec20_Publish{f_name}.csv'
weather_path = f'file:/datasets/WeatherEvents_Aug16_Dec20_Publish{f_name}.csv'

def format_airport_row(r: str) -> tuple:
    """Turn one 'zip,airport,...' line into a ``(zip, airport)`` pair.

    The first column is passed through ``int`` and back to ``str``, so
    leading zeros and surrounding whitespace are dropped from the zip code.
    The second column is returned unchanged.

    Args:
        r: One comma-separated line with at least two columns.

    Returns:
        A 2-tuple ``(zip_code, airport_code)`` of strings (the previous
        ``-> str`` annotation did not match what the function returns).

    Raises:
        ValueError: If the first column is not an integer.
        IndexError: If the line has fewer than two columns.
    """
    parts = r.split(',')
    return (str(int(parts[0])), parts[1])

# Phase 1: open the three text inputs and report how many raw lines the
# traffic and weather files contain.
start_phase_1 = time.time()
print("\nLoading Traffic and Weather events...")

input_rdd = sc.textFile(traffic_path)
input_rdd2 = sc.textFile(weather_path)

# Airport lookup: drop the header line (it starts with 'Zip,') and turn every
# remaining line into a (zip, airport) pair. ``airport_path`` is not defined
# in this file.
airport_lines = sc.textFile(airport_path)
input_rdd3 = airport_lines.filter(lambda it: not it.startswith('Zip,')).map(format_airport_row)

raw_traffic_total = input_rdd.count()
raw_weather_total = input_rdd2.count()
print("%d Traffic events\n%d Weather events" % (raw_traffic_total, raw_weather_total))
print("Elapsed Time: %.2fs" % (time.time()-start_phase_1))

def format_event_row(r: str, i: int) -> str:
    """Rewrite a raw CSV line as a normalised 16-column event row.

    A line starting with 'T' is treated as a traffic event, anything else as
    a weather event. The event id is regenerated as 'T-<i+1>' / 'W-<i+1>'.

    Args:
        r: The raw comma-separated line.
        i: Zero-based position of the line; the new id uses ``i + 1``.

    Returns:
        The normalised row. Traffic rows keep raw columns 1, 5, 6 and 8-18;
        weather rows keep columns 1 and 2 (joined with '-'), 3, 4 and 8, with
        'N/A' in every other position.

    Raises:
        IndexError: If the line has fewer columns than the branch reads.
    """
    fields = r.split(',')
    seq = str(i + 1)

    if not r.startswith('T'):
        columns = ['W-' + seq, 'W', fields[1] + '-' + fields[2], fields[3], fields[4],
                   'N/A', 'N/A', 'N/A', fields[8]]
        columns.extend(['N/A'] * 7)
        return ','.join(columns)

    columns = ['T-' + seq, 'T', fields[1], fields[5], fields[6]]
    # Explicit indexing (not a slice) so a short line still raises IndexError.
    columns.extend(fields[k] for k in range(8, 19))
    return ','.join(columns)


# Phase 2: drop header lines and rows with an empty key column, renumber the
# surviving rows and key them (traffic by raw column 18, weather by raw
# column 8).
start_phase_2 = time.time()
print("\nRemoving Traffic events without Zip Code and Weather events without airportCode...")

traffic_rows = input_rdd.filter(lambda it: not (it.startswith('EventId,') or it.split(',')[18] == ''))
traffic_rdd_map = traffic_rows.zipWithIndex().map(
    lambda it: (it[0].split(',')[18], format_event_row(it[0], it[1])))

weather_rows = input_rdd2.filter(lambda it: not (it.startswith('EventId,') or it.split(',')[8] == ''))
weather_rdd_map = weather_rows.zipWithIndex().map(
    lambda it: (it[0].split(',')[8], format_event_row(it[0], it[1])))

t1_count = traffic_rdd_map.count()
w1_count = weather_rdd_map.count()
print("%d Traffic events (after first cleaning)\n%d Weather events (after first cleaning)" % (t1_count,w1_count))
print("Elapsed Time: %.2fs" % (time.time()-start_phase_2))

# One list of normalised rows per key.
traffic_rdd_gbk = traffic_rdd_map.groupByKey().mapValues(list)
weather_rdd_gbk = weather_rdd_map.groupByKey().mapValues(list)

# Neighbour radius used by the BallTree queries, converted to an angle on the
# unit sphere. NOTE(review): 3958.7564 looks like the Earth radius in miles,
# which would make the threshold 0.2 miles - confirm.
distanceThresh = 0.2
dist_unit_sphere = distanceThresh / 3958.7564

def integrateSimilarWeatherEvents(events):
    """Merge near-duplicate weather events within one group of rows.

    Every row is parsed with ``event_transformer``. Two events are merged
    when they lie within ``dist_unit_sphere`` of each other, have the same
    ``refinedType`` and the gap between their time intervals is below the
    threshold picked from ``wTimeThreshs`` (multiplied by 60 and compared
    with seconds). The surviving event takes the union of both time spans;
    the absorbed one is flagged ``toBeMerged``.

    Args:
        events: Iterable of normalised event rows (strings).

    Returns:
        The list of events that were not absorbed into another event.
    """
    parsed = [event_transformer(row) for row in events]
    if not parsed:
        # BallTree cannot be built from an empty array.
        return []

    # ``event_transformer`` gives weather rows lat/lng 0, so in practice all
    # weather events of a group are mutual neighbours here.
    coords = np.deg2rad(np.array([[e.locationLat, e.locationLng] for e in parsed]))
    bt = BallTree(coords, metric='haversine')
    neighs_idx = bt.query_radius(coords, r=dist_unit_sphere)

    new_events = {}

    for idx, neighbours in enumerate(neighs_idx):
        e1 = parsed[idx]

        # Already absorbed by an earlier event: it neither merges nor is kept.
        if e1.toBeMerged:
            continue

        for other in neighbours:
            if other == idx:
                continue

            e2 = parsed[other]
            if e1.refinedType != e2.refinedType:
                continue

            # Gap between the two intervals; negative when they overlap.
            timeDiff = max((e1.startTime - e2.endTime).total_seconds(),
                           (e2.startTime - e1.endTime).total_seconds())

            if 'snow' in e1.refinedType:
                th = wTimeThreshs['snow']
            elif 'rain' in e1.refinedType:
                th = wTimeThreshs['rain']
            else:
                th = wTimeThreshs['default']

            if timeDiff < (th*60):
                e1.startTime = min(e1.startTime, e2.startTime)
                e1.endTime = max(e1.endTime, e2.endTime)
                e2.toBeMerged = True

        new_events[e1.eventId] = e1

    # An event stored earlier can be absorbed by a later one (its interval may
    # only come within reach after that later event grew), so drop everything
    # flagged as merged instead of returning stale duplicates.
    return [e for e in new_events.values() if not e.toBeMerged]
    
def integrateSimilarTrafficIncidents(events):
    """Merge near-duplicate traffic incidents within one group of rows.

    Every row is parsed with ``event_transformer``. Two incidents are merged
    when they lie within ``dist_unit_sphere`` of each other, start less than
    5 minutes apart, share ``refinedType``, ``street`` and ``side``, and the
    type contains 'Congestion'. The surviving incident takes the union of
    both time spans and the larger distance; the absorbed one is flagged
    ``toBeMerged``.

    Args:
        events: Iterable of normalised event rows (strings).

    Returns:
        The list of incidents that were not absorbed into another incident.
    """
    _trTimeThresh = 5  # minutes; compared below against seconds via * 60

    parsed = [event_transformer(row) for row in events]
    if not parsed:
        # BallTree cannot be built from an empty array.
        return []

    coords = np.deg2rad(np.array([[e.locationLat, e.locationLng] for e in parsed]))
    bt = BallTree(coords, metric='haversine')
    neighs_idx = bt.query_radius(coords, r=dist_unit_sphere)

    new_events = {}

    for idx, neighbours in enumerate(neighs_idx):
        e1 = parsed[idx]

        # Already absorbed by an earlier incident: it neither merges nor is kept.
        if e1.toBeMerged:
            continue

        for other in neighbours:
            if other == idx:
                continue

            e2 = parsed[other]
            timeDiff = abs((e1.startTime - e2.startTime).total_seconds())

            if timeDiff < (_trTimeThresh*60) and e1.refinedType==e2.refinedType and 'Congestion' in e1.refinedType and e1.street==e2.street and e1.side==e2.side:
                e1.startTime = min(e1.startTime, e2.startTime)
                e1.endTime = max(e1.endTime, e2.endTime)
                e1.distance = max(e1.distance, e2.distance)
                e2.toBeMerged = True

        new_events[e1.eventId] = e1

    # An incident stored earlier can be absorbed by a later one (e.g. after
    # the later one's start time moved), so drop everything flagged as merged
    # instead of returning stale duplicates.
    return [e for e in new_events.values() if not e.toBeMerged]

def findChildParents(data):
    """Link related events of one join group through ``childs``/``parents``.

    Args:
        data: A pair ``(traffic_groups, weather_events)`` where
            ``traffic_groups`` is an iterable of collections of traffic
            events and ``weather_events`` is an iterable of weather events.

    Returns:
        A list holding every (de-duplicated) traffic event followed by every
        weather event, with their ``childs``/``parents`` sets updated in place.
    """
    weatherEvents = data[1]
    trafficEvents = data[0]
    weather_ids_dict = {}
    interval_tree = IntervalTree()

    # Flatten the per-group collections into one list without duplicates.
    trafficEvents = list(set().union(*trafficEvents))

    coords = np.deg2rad(np.array([[e.locationLat, e.locationLng] for e in trafficEvents]))
    bt = BallTree(coords, metric='haversine')
    neighs_idx = bt.query_radius(coords, r=dist_unit_sphere)

    for e in weatherEvents:
        weather_ids_dict[e.eventId] = e
        # Exact-key lookup with a 5-minute fallback (replaces a bare
        # ``except:`` around the same lookup).
        # NOTE(review): integrateSimilarWeatherEvents picks thresholds by
        # substring ('snow'/'rain' in refinedType); an exact lookup on the
        # full refinedType may never hit - confirm the intended behaviour.
        th = wTimeThreshs.get(e.refinedType, 5)

        # The indexed interval starts 5 minutes after the weather event
        # starts and ends ``th`` minutes after it ends.
        interval = Interval(e.startTime + timedelta(minutes=5), e.endTime + timedelta(minutes=th))
        interval_tree.add(interval, event_id=e.eventId)

    for idx, neighbours in enumerate(neighs_idx):
        e1 = trafficEvents[idx]

        for other in neighbours:
            if idx == other:
                continue

            e2 = trafficEvents[other]

            # Only incidents on the same street and side whose intervals,
            # padded by trTimeThresh minutes, touch each other are related.
            if e1.street != e2.street or \
                    e1.side != e2.side or \
                    (e1.endTime + timedelta(minutes=trTimeThresh)) < e2.startTime or \
                    e1.startTime > (e2.endTime + timedelta(minutes=trTimeThresh)):
                continue

            # The earlier incident is the parent. NOTE(review): with equal
            # start times each pair is visited from both sides and ends up
            # as parent AND child of each other - confirm this is intended.
            if e1.startTime < e2.startTime:
                e1.childs.add(e2.eventId)
                e2.parents.add(e1.eventId)
            else:
                e2.childs.add(e1.eventId)
                e1.parents.add(e2.eventId)

        # Weather events whose indexed interval contains the incident's start
        # time become parents of the incident.
        events_of_interests = interval_tree.query_point(e1.startTime)

        for ed in events_of_interests:
            e_id = ed['event_id']
            e1.parents.add(e_id)
            weather_ids_dict[e_id].childs.add(e1.eventId)

    res = trafficEvents + list(weather_ids_dict.values())
    return res
    
# Phase 3: merge similar traffic incidents inside every key group.
start_phase_3 = time.time()
print("\nChecking for similar Traffic events...")

# Persist the result: it is consumed by the count below, by the join lineage
# and again when the final traffic RDD is assembled. Without persisting,
# Spark re-runs the whole parse + BallTree merge for each of those uses.
traffic_flatmap = traffic_rdd_gbk.values().flatMap(integrateSimilarTrafficIncidents).persist(storage_level)
t2_count = traffic_flatmap.count()
print("Have found %d similar pairs of traffic incidents" % (t1_count-t2_count))
print("%d Traffic events (after similarity check)" % t2_count)
print("Elapsed Time: %.2fs" % (time.time()-start_phase_3))

traffic_rdd_map_v2 = traffic_flatmap.map(lambda it: (it.zipCode, it))
traffic_rdd_gbk_v2 = traffic_rdd_map_v2.groupByKey().mapValues(list) #key = zipCode, value = [TrafficEvents]

traffic_rdd_join = input_rdd3.join(traffic_rdd_gbk_v2) #key = zipCode, value = (airportCode, [TrafficEvents])
traffic_rdd_join_map = traffic_rdd_join.map(lambda it: (it[1][0],it[1][1])) #key = airportCode, value = [TrafficEvents]
traffic_rdd_join_map_gbk = traffic_rdd_join_map.groupByKey().mapValues(list) #key = airportCode, value = [[TrafficEvents]]

# Per-type time thresholds used when merging weather events; the merge code
# multiplies them by 60 before comparing with seconds.
wTimeThreshs = {'rain':15, 'snow':30, 'default':10}

# Phase 4: merge similar weather events inside every key group.
start_phase_4 = time.time()
print("\nChecking for similar Weather events...")

# Persist the result: it is consumed by the count below, by the airport-code
# grouping and again when the final weather RDD is assembled. Without
# persisting, Spark re-runs the whole merge for each of those uses.
weather_flatmap = weather_rdd_gbk.values().flatMap(integrateSimilarWeatherEvents).persist(storage_level)
w2_count = weather_flatmap.count()
print("Have found %d similar pairs of weather events" % (w1_count-w2_count))
print("%d Weather events (after similarity check)" % w2_count)
print("Elapsed Time: %.2fs" % (time.time()-start_phase_4))

'''
#### SAVING TO FILE ALL EVENTS DISTINCT

print("\nSaving to file All Events Distinct...")
traffic_flatmap_912 = traffic_flatmap.map(str_transformer)
weather_flatmap_912 = weather_flatmap.map(str_transformer)
    
traffic_flatmap_912.saveAsTextFile(f'file:/datasets/Output_Traffic_912')
weather_flatmap_912.saveAsTextFile(f'file:/datasets/Output_Weather_912')

#############
'''

weather_rdd_map_v2 = weather_flatmap.map(lambda it: (it.airportCode, it))
weather_rdd_gbk_v2 = weather_rdd_map_v2.groupByKey().mapValues(list) #key = airportCode, value = [WeatherEvents]

traffic_weather_join = traffic_rdd_join_map_gbk.join(weather_rdd_gbk_v2) #key = airportCode, value = ([[TrafficEvents]], [WeatherEvents])

# Phase 5: link traffic and weather events of every airport group.
start_phase_5 = time.time()
print(f'\nChecking for Childs and Parents events (trTimeThresh={trTimeThresh})...')

# Persist the linked events: the RDD is filtered twice below (traffic and
# weather), so without persisting findChildParents would run once per filter
# for every group, and again whenever the final RDDs are re-evaluated.
traffic_weather_join_map_cp = traffic_weather_join.values().flatMap(findChildParents).persist(storage_level) #value = ([TrafficEvents]+[WeatherEvents])

new_traffic_rdd = traffic_weather_join_map_cp.filter(lambda it: it.type=='T').map(lambda it: (it.eventId, it))
new_weather_rdd = traffic_weather_join_map_cp.filter(lambda it: it.type=='W').map(lambda it: (it.eventId, it))

# Events that never reached findChildParents (their key found no join
# partner) are kept unchanged; the linked versions replace the rest.
traffic_flatmap_tmp = traffic_flatmap.map(lambda it: (it.eventId, it))
tmp_traffic_rdd = traffic_flatmap_tmp.subtractByKey(new_traffic_rdd)
final_traffic_rdd = tmp_traffic_rdd.union(new_traffic_rdd)

weather_flatmap_tmp = weather_flatmap.map(lambda it: (it.eventId, it))
tmp_weather_rdd = weather_flatmap_tmp.subtractByKey(new_weather_rdd)
final_weather_rdd = tmp_weather_rdd.union(new_weather_rdd)
print("%d New Traffic events\n%d New Weather events" % (final_traffic_rdd.count(),final_weather_rdd.count()))
print("Elapsed Time: %.2fs" % (time.time()-start_phase_5))

print("%d Zip Codes\n%d Airport Codes" % (traffic_rdd_gbk_v2.count(),weather_rdd_gbk_v2.count()))
print("\nTotal Elapsed Time: %.2fs" % (time.time()-start))

# In [None]:
##### Code to save both datasets to file, REMEMBER to set the SUBSET flag #####
start = time.time()
print("\nSaving to file...")
final_traffic_rdd_v2 = final_traffic_rdd.map(lambda it: str_transformer(it[1]))
final_weather_rdd_v2 = final_weather_rdd.map(lambda it: str_transformer(it[1]))

# NOTE: this mutates ``f_name``; re-running the cell appends the suffix again.
f_name += f'_TTR-{trTimeThresh}'

final_traffic_rdd_v2.saveAsTextFile(f'file:/datasets/Output_Traffic{f_name}')
final_weather_rdd_v2.saveAsTextFile(f'file:/datasets/Output_Weather{f_name}')

# Concatenate Spark's part files into a single CSV per dataset, then remove
# the part-file directories. ``check_call`` raises CalledProcessError on a
# non-zero exit status, so the directories are only deleted after both
# concatenations succeeded (``call`` ignored failures and deleted them anyway).
# shell=True is required for the ``part*`` glob and the ``>`` redirection.
subprocess.check_call(f'cat Output_Traffic{f_name}/part* > TrafficEvents{f_name}.csv', shell=True, cwd='datasets/')
subprocess.check_call(f'cat Output_Weather{f_name}/part* > WeatherEvents{f_name}.csv', shell=True, cwd='datasets/')
subprocess.check_call(f'rm -r Output_Traffic{f_name}', shell=True, cwd='datasets/')
subprocess.check_call(f'rm -r Output_Weather{f_name}', shell=True, cwd='datasets/')

print("Elapsed Time: %.2fs" % (time.time()-start))