# Initial Data Transformation

## Library imports

Use **pip** or **conda** to install the required packages/libraries before running the notebook.


In [None]:
#!pip install -r requirements.txt

import requests
from pathlib import Path
from datetime import datetime
from IPython.display import Markdown

import pandas as pd
import geopandas as gpd
from shapely.geometry import Point

import h3
import h3pandas  # More specialised H3 functions, specifically for use with Pandas

# Pandas alternatives (just for speed comparisons)
import vaex
import dask
#import polars
#import cudf

# Imports for interactive map visualisation.
# (Must mark this notebook as Trusted to see it.)
import folium
from folium.plugins import HeatMap, HeatMapWithTime
#import pydeck


## Display settings

Set the notebook display option to show all columns in full.

In [None]:
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', None)

## Data folders

This checks if the data files for the challenge are present, otherwise it downloads them from the S3 bucket.

Original files are stored in the **data** folder, while processed output will be written to the **output** folder.


In [None]:
data_url = 'https://cct-ds-code-challenge-input-data.s3.af-south-1.amazonaws.com/'

data_folder = Path('./data/')
output_folder = Path('./output/')

data_files = [
    'sr.csv.gz',
    'sr_hex.csv.gz',
    #'sr_hex_truncated.csv',
    'city-hex-polygons-8.geojson',
    #'city-hex-polygons-8-10.geojson',
]

# Create `data` and `output` subfolders if they don't exist yet
data_folder.mkdir(parents=False, exist_ok=True)
output_folder.mkdir(parents=False, exist_ok=True)

for data_file in data_files:
    file_path = data_folder / data_file
    if not file_path.exists():
        display(Markdown(f'Downloading *{data_file}*...'))
        file_url = data_url + data_file
        response = requests.get(file_url)
        response.raise_for_status()
        with open(file_path, 'wb') as f:
            f.write(response.content)
    else:
        display(Markdown(f'*{data_file}* already exists.'))

display(Markdown('**Done.**'))

## Schema inspection

The first part of the task involves joining the features from `city-hex-polygons-8.geojson` to the service request entries in `sr_hex.csv.gz`.

Let's see what the schema looks like by just showing 1 item from each...

In [None]:
pd.read_csv(data_folder / 'sr.csv.gz', nrows=1, index_col=0)

In [None]:
gpd.read_file(data_folder / 'city-hex-polygons-8.geojson', index_col='index').head(1)

## Calculate the hexagon index

There appears to be no common key column that we can join on. We can calculate the H3 hexagon index from the `latitude` and `longitude` of each service request. That can then be matched up with the `index` of the geojson feature.

(We could also have done a point-in-polygon check on the polygon geometry, but that would have been much less efficient. Doing a search for the nearest centroid would be similarly inefficient, while also potentially inaccurate for points in the outer corners of the hexagon.)


In [None]:
%%time

# Load the CSV file into a Pandas DataFrame
df = pd.read_csv(data_folder / 'sr.csv.gz', index_col=0)

# Replace NaN values in the latitude and longitude columns with 0
# NOTE: This is optional - h3pandas can work with NaNs.
#df[['latitude', 'longitude']] = df[['latitude', 'longitude']].fillna(value=0)

# Create a h3pandas DataFrame, calculating the h3 id from the latitude and longitude
h3df = df[['latitude', 'longitude']].h3.geo_to_h3(resolution=8, lat_col='latitude', lng_col='longitude')

# Append the h3 id column to the original DataFrame
df['h3_level8_index'] = h3df.index.to_frame(index=False)

# Set the h3 id to '0' if coordinates are missing
# NOTE: This is optional - h3pandas already does it internally.
#df.loc[df['latitude'].isnull() | df['longitude'].isnull(), 'h3_level8_index'] = 0

# Keep for later (validation)
df_pandas = df.copy()

## Validation

Check to make sure our results match the reference CSV file.



In [None]:

# Check if the Pandas DataFrame contains the same values as the provided reference file
df_csv = pd.read_csv(data_folder / 'sr_hex.csv.gz', index_col=None)
matched = df_pandas.equals(df_csv)
display(Markdown(f'Pandas == sr_hex.csv: **{matched}**'))



## Join on hexagon index

Now that we have a common key, we can merge the two DataFrames together.


In [None]:
%%time

# Read CSV file into Vaex dataframe
df_sr = vaex.from_csv(data_folder / 'sr_hex.csv.gz', index_col=None)
#print(len(df_sr))  # 941634

# Read GeoJSON file into GeoPandas dataframe
df_geo = gpd.read_file(data_folder / 'city-hex-polygons-8.geojson', index_col='index')
#print(len(df_geo))  # 3832

# Rename key column to match the other dataframe
df_sr.rename('h3_level8_index', 'index')

# Convert from GeoPandas to Vaex dataframe, depending on what kind of dataframe we want after merging
df_geo = vaex.from_pandas(df_geo)
#print(type(df_geo))  # <class 'vaex.dataframe.DataFrameLocal'>

# Merge the dataframes on the common index column
# NOTE: Vaex does not support outer join yet,
#       so we do a left join, which keeps all the service requests,
#       but discards any unmatched (empty) hexagon features.
merged = df_sr.join(df_geo, how='left', on='index', allow_duplication=False)
#print(len(merged))  # 941634 (same as service request count)

print(f'Column names after merge: { merged.column_names }\n')
#merged.head(5)

## Find unmatched entries

Even without merging, a comparison of the hex id's on both sides will indicate if there are any rows that won't match.

1. Check for service requests with hex id's that don't exist in the provided GeoJSON.

2. Check for hex id's in the GeoJSON that have no associated service requests.

In [None]:

# Count number of service requests without coordinates
num_zero = len(df_sr[df_sr['index']=='0'])

# Get list of h3 index strings from both dataframes
sr_list = df_sr['index'].tolist()
geo_list = df_geo['index'].tolist()

# Convert to list to remove duplicates
sr_set = set(sr_list)
geo_set = set(geo_list)

# Only keep non-zero h3 id's
sr_set.discard('0')

# Get the difference between the sets
sr_unmatched = sr_set - geo_set
geo_unmatched = geo_set - sr_set

# Display some stats
display(Markdown(f'## Statistics'))
display(Markdown('&nbsp;'))
display(Markdown(f'Total service requests: **{ len(sr_list) }**'))
display(Markdown(f'- Without coordinates: **{ num_zero }**'))
display(Markdown(f'- Inside unlisted hexagons: **{ len(sr_unmatched) }**'))
display(Markdown(f'`{ list(sr_unmatched) }`'))
display(Markdown('&nbsp;'))
display(Markdown(f'Total listed hexagons: **{ len(geo_set) }**'))
display(Markdown(f'- With service requests: **{ len(sr_set) }**'))
display(Markdown(f'- Without service requests: **{ len(geo_unmatched) }**'))
display(Markdown('&nbsp;'))


## Error threshold

It is not entirely clear what the intent is with this part of the challenge instructions:

> Include logging that lets the executor know how many of the records failed to join, and include a join error threshold above which the script will error out. Please motivate why you have selected the error threshold that you have.

Maybe it's for SQL solutions?
Even a 100% failure rate doesn't necessarily mean there is a problem with the script. People might just be leaving out the coordinates when creating service requests, which would lead to an unjoined row that will not be shown on the map. Instead of triggering errors, these incomplete entries could instead be flagged for clean-up before even starting the merge.

## Distributed computing

Vaex already makes it possible to process the data in chunks, or across multiple processes on a single computer. But if the idea is to split the work across multiple machines on a network, we can use a Dask cluster. The syntax is similar to Pandas and Vaex. It can even utilise the GPU (with RAPIDS cuDF).

For now, we stick with just Vaex, though.


## Grouping / Aggregation

Show the total number of service requests per:
- Suburb (`official_suburb`)
- Directorate (`directorate`)
- Month (new column)
- hexagon (`index`)


In [None]:
# Add calculated MMM YYYY column

# Define a function to extract month name and year from timestamp string
def get_month_name(timestamp_str):
    timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S%z')
    return timestamp.strftime('%b %Y')

# Create a new column called 'month_name' containing the short month name and year format
merged['month_name'] = merged['creation_timestamp'].apply(get_month_name)

# List of months
print(set(merged['month_name'].tolist()))

# List of directorates
print(set(merged['directorate'].tolist()))


In [None]:
%%time

# Service requests by suburb for specific directorate and month

df_only_water = merged[merged['directorate'] == 'WATER AND SANITATION']

df_only_recent = df_only_water[df_only_water['month_name'] == 'Dec 2020']

agg = df_only_recent.groupby(['official_suburb']).agg({'per_suburb': 'count'})

agg = agg.sort('per_suburb', ascending=False)

agg.head(10)

## Preliminary visualisations

Some plots and charts to get a sense of the data before building a proper dashboard.

First prepare all the hexagon shapes, then all the individual points. Then plot it all on a Folium map.

In [None]:
df = merged
#df = df_only_water

# Use only rows with valid hexagon id
df = df[df['index']!='0']

# Convert from Vaex to Pandas dataframe
df = df.to_pandas_df()

# Convert from Pandas to GeoPandas dataframe
gdf = gpd.GeoDataFrame(df, geometry='geometry')

# Group by h3 hexagon and add a column for number of entries per hexagon
grouped_gdf = gdf.groupby(['index']).size().reset_index(name='h3_sr_count')

# Merge columns we want to plot
gdf_with_count = gdf[['index', 'geometry']].drop_duplicates(subset=['index']).merge(grouped_gdf)

# Concatenate with unmatched 
gdf_unmatched = gpd.GeoDataFrame(df_geo[df_geo['index'].isin(geo_unmatched)][['index', 'geometry']].to_pandas_df(), geometry='geometry')
gdf_unmatched['h3_sr_count'] = 0  # Add a h3_sr_count column of 0 for all
gdf_final = pd.concat([gdf_with_count, gdf_unmatched], ignore_index=True)

gdf_final


In [None]:
# Get only relevant service request columns
datapoints = df_sr[['notification_number', 'cause_code', 'latitude', 'longitude']].to_pandas_df()

# Only keep points with coordinates
datapoints = datapoints.dropna(subset=['latitude', 'longitude'])

# Replace latitude and longitude with a Point geometry
datapoints['geometry'] = datapoints.apply(lambda row: Point(row['longitude'], row['latitude']), axis=1)

# Convert GeoPandas dataframe
datapoints = gpd.GeoDataFrame(datapoints, geometry='geometry')

# Set CRS (Coordinate Reference System) to WGS 84 datum
datapoints = datapoints.set_crs(epsg=4326)

datapoints

In [None]:
# Calculate centerpoint before setting CRS
mean_latlon = [gdf_final.geometry.centroid.y.mean(), gdf_final.geometry.centroid.x.mean()]

# Set CRS (Coordinate Reference System) to WGS 84 datum
gdf_crs = gdf_final.set_crs(epsg=4326)

# Normalise count values (0..1)
#gdf_crs['normalised_count'] = gdf_crs['h3_sr_count']/gdf_crs['h3_sr_count'].max()

# Create a map object
m = folium.Map(location=mean_latlon, zoom_start=10, tiles='cartodbpositron')

# Add choropleth layer(s)
choropleth0 = folium.Choropleth(
        geo_data=gdf_crs[gdf_crs['h3_sr_count']==0],
        name='choropleth0',
        data=gdf_crs[gdf_crs['h3_sr_count']==0],
        columns=['index', 'h3_sr_count'],
        key_on='feature.properties.index',
        fill_color='Greys',  # ColorBrewer code
        fill_opacity=0.5,
        line_opacity=0.5,
        legend_name='No Service Requests'
    )
choropleth1 = folium.Choropleth(
        geo_data=gdf_crs[gdf_crs['h3_sr_count']>0],
        name='choropleth1',
        data=gdf_crs[gdf_crs['h3_sr_count']>0],
        columns=['index', 'h3_sr_count'],
        key_on='feature.properties.index',
        fill_color='YlOrRd',  # ColorBrewer code
        fill_opacity=0.5,
        line_opacity=0.5,
        legend_name='Service Request Count'
    )

# Add the created layers to the map
choropleth0.add_to(m)
choropleth1.add_to(m)

# Create a marker per individual service request
#markers = folium.GeoJson(datapoints.to_json(), name='Points', tooltip=folium.features.GeoJsonTooltip(fields=['cause_code']))  # Too slow...
#markers.add_to(m)

# Add a heatmap layer instead of individual points
HeatMap(datapoints[['latitude', 'longitude']],
        radius=3,
        blur=5,
        min_opacity=1,
        gradient={0: 'blue', 0.25: 'cyan', 0.5: 'yellow', 0.75: 'orange', 1: 'red'},
        overlay=True,
        control=True,
        show=True,
        name='heatmap',
        legend_name='Service Request Heatmap'
       ).add_to(m)

# Add additional style/theme options.
#folium.TileLayer('OpenStreetMap', control=True).add_to(m)
#folium.TileLayer('Stamen Toner').add_to(m)
#folium.TileLayer('Stamen Terrain').add_to(m)
#folium.TileLayer('Stamen Water Color').add_to(m)
#folium.TileLayer('cartodbpositron').add_to(m)
#folium.TileLayer('cartodbdark_matter').add_to(m)

# Make it toggleable
folium.LayerControl().add_to(m)

m

---
