# Cleaning and Aggregation

In [1]:
import dask.dataframe as dd
import geopandas as gpd
import h3
from shapely import wkt

In [3]:
# Directory on the local disk that receives every CSV written by this notebook.
outputpath = '/Users/catherinehayden/WB/cluj'

# Column dtypes enforced while parsing the alerts export.
alert_dtypes = {
    'confidence': 'int8',
    'type': 'category',
    'subtype': 'category',
    'roadType': 'float32',  # float rather than int so missing values (NA) parse
    'reliability': 'int8',
    'magvar': 'int16',
    'street': 'object',
}

# Read the gzipped alerts file into a (lazy) Dask dataframe.
# NOTE(review): blocksize=None presumably because a gzip stream cannot be
# split into several partitions -- confirm against the Dask read_csv docs.
ddf = dd.read_csv(
    "RO_9clujnapoca_alerts.000000000000.csv.gz",
    dtype=alert_dtypes,
    blocksize=None,
    compression="gzip",
)


# Parse the timestamps as UTC, then convert them to Romanian local time so that
# every calendar field derived below reflects the local clock.
ddf["ts"] = dd.to_datetime(ddf["ts"], utc=True).dt.tz_convert('Europe/Bucharest')
# Note: after changing timezone to bucharest, we have 2 observations taking place on jan 1 2022
ddf["date"] = ddf["ts"].dt.date
ddf["dayofweek"] = ddf["ts"].dt.dayofweek # monday = 0, sunday = 6
ddf['year'] = ddf['ts'].dt.year
ddf['month'] = ddf['ts'].dt.month
ddf['hour'] = ddf['ts'].dt.hour

# Daypart code derived from the local hour (set to Nielsen Audio dayparting
# times, per the original author's note):
#   1 -> 06-09   2 -> 10-15   3 -> 16-19   4 -> 20-23   5 -> 00-05
# The ranges are disjoint and together cover 0-23, so exactly one boolean term
# is True per row and the sum equals that term's code. The former placeholder
# assignment (`timeofday = 1`) was overwritten immediately and has been removed.
ddf['timeofday'] = (
    ((ddf['hour'] >= 6) & (ddf['hour'] <= 9)) * 1
    + ((ddf['hour'] >= 10) & (ddf['hour'] <= 15)) * 2
    + ((ddf['hour'] >= 16) & (ddf['hour'] <= 19)) * 3
    + ((ddf['hour'] >= 20) & (ddf['hour'] <= 23)) * 4
    + ((ddf['hour'] >= 0) & (ddf['hour'] <= 5)) * 5
)

# computations with geopandas and pandas
# Materialise the lazy Dask graph as an in-memory pandas DataFrame.
df = ddf.compute()

# Snapshot the frame before any coordinate-derived columns are added.
# DataFrame.to_csv returns None when given a path, so the result is not kept.
df.to_csv(outputpath + "/geodata.csv", index = False)

# Parse the WKT strings into shapely geometries on a *copy* of the frame, so the
# helper `coordinates` column never has to be added to (and then dropped from) df.
gdf = gpd.GeoDataFrame(
    df.assign(coordinates=df['geoWKT'].apply(wkt.loads)),
    geometry='coordinates',
)
# NOTE(review): .x / .y assume every geoWKT value is a POINT -- confirm.
df['lon'] = gdf.geometry.x
df['lat'] = gdf.geometry.y

# adding in rush hour times sent by our Romanian colleagues
def rush(series):
    """Classify one hour of the day into its rush-hour period.

    Despite its name, ``series`` is a single scalar hour value: the function
    is meant to be applied element-wise to an hour column.

    Returns:
        "Morning Rush" for hours 7 through 9 (inclusive),
        "Afternoon Rush" for hours 16 through 19 (inclusive),
        "No Rush" for anything else.
    """
    if 7 <= series <= 9:
        return "Morning Rush"
    if 16 <= series <= 19:
        return "Afternoon Rush"
    return "No Rush"
# Label every alert with its rush-hour period, based on the local hour.
df['rush'] = df['hour'].apply(rush)


# mapping obs to h3 hexagons
# Walk the two coordinate columns in lockstep instead of `df.apply(axis=1)`:
# row-wise apply constructs a Series for every row just to read two floats,
# and on an empty frame it can return a DataFrame that cannot be assigned to a
# single column. The list is built from df itself, so it lines up row for row.
H3_RESOLUTION = 10  # H3 resolution of the hexagon grid used for aggregation
df["h3"] = [
    h3.geo_to_h3(lat, lon, H3_RESOLUTION)
    for lat, lon in zip(df["lat"], df["lon"])
]

## Exporting full cleaned data

In [None]:
# Write the cleaned table (including the derived lon/lat, rush and h3 columns)
# to disk without the pandas index.
fullclean_path = outputpath + "/fullclean.csv"
df.to_csv(fullclean_path, index=False)

## Aggregation

In [None]:
# by date
# Number of alerts (non-null uuid values) per local calendar date and H3 cell.
# NOTE(review): this table names its count column 'AlertCounts' (plural) while
# the other aggregations in this notebook use 'AlertCount' -- kept as-is so the
# output schema does not change; confirm which spelling consumers expect.
datehex = (
    df.groupby(["date", "h3"], as_index=False)["uuid"]
    .count()
    .rename(columns={'uuid': 'AlertCounts'})
)
datehex.to_csv(outputpath + "/datehex.csv", index=False)

# by day of the week
# Alerts per (weekday, H3 cell, alert type).
# `type` is read as a categorical column, and with pandas' historical default
# (observed=False) a groupby on a categorical key emits a row for every
# combination of key values -- including combinations that never occur, with a
# count of 0. The hour-of-day/type table in this notebook drops those rows;
# this one did not. observed=True avoids building them in the first place, and
# the explicit filter keeps the "only positive counts" contract identical.
weekdayhextype = (
    df.groupby(["dayofweek", "h3", "type"], as_index=False, observed=True)["uuid"]
    .count()
    .rename(columns={'uuid': 'AlertCount'})
)
weekdayhextype = weekdayhextype[weekdayhextype.AlertCount > 0]

weekdayhextype.to_csv(outputpath + "/weekdayhextype.csv", index = False)


# Alerts per (weekday, H3 cell), all alert types combined.
weekdayhex = (
    df.groupby(["dayofweek", "h3"], as_index=False)["uuid"]
    .count()
    .rename(columns={'uuid': 'AlertCount'})
)

weekdayhex.to_csv(outputpath + "/weekdayhex.csv", index = False)

# by hour of the day
# Alerts per (local hour, H3 cell, alert type).
# `type` is categorical, so without observed=True pandas (historical default)
# first materialises every hour x cell x type combination with a count of 0
# and the filter below then throws most of them away. observed=True skips that
# blow-up and makes the result independent of the pandas default; the filter
# is kept so the written rows are exactly the ones with a positive count.
hourhextype = (
    df.groupby(["hour", "h3", "type"], as_index=False, observed=True)["uuid"]
    .count()
    .rename(columns={'uuid': 'AlertCount'})
)

hourhextype = hourhextype[hourhextype.AlertCount > 0]

hourhextype.to_csv(outputpath + "/hourhextype.csv", index = False)


# Alerts per (local hour, H3 cell), all alert types combined.
hourhex = (
    df.groupby(["hour", "h3"], as_index=False)["uuid"]
    .count()
    .rename(columns={'uuid': 'AlertCount'})
)

hourhex.to_csv(outputpath + "/hourhex.csv", index = False)

# by rush hour designation
# Alerts per (rush-hour period, H3 cell).
rushhex = (
    df.groupby(["rush", "h3"], as_index=False)["uuid"]
    .count()
    .rename(columns={'uuid': 'AlertCount'})
)
rushhex.to_csv(outputpath + "/rushhex.csv", index=False)


# Alerts per (rush-hour period, H3 cell, weekday).
rushhexweekday = (
    df.groupby(["rush", "h3", "dayofweek"], as_index=False)["uuid"]
    .count()
    .rename(columns={'uuid': 'AlertCount'})
)
rushhexweekday.to_csv(outputpath + "/rushhexweekday.csv", index=False)