In [1]:
!pip install -r ../requirements.txt



In [23]:
import gzip
import logging
import os
import shutil
import time
import urllib.request
from datetime import datetime

import geopandas as gpd
import pandas as pd
from shapely.geometry import Point
#from h3 import geo_to_h3

import warnings
warnings.filterwarnings("ignore")

In [None]:
# Root logger: INFO and above, each message prefixed with its timestamp.
logging.basicConfig(
    format='%(asctime)s - %(message)s',
    level=logging.INFO,
)

# Wall-clock timestamp captured before the first log line
# (presumably used later to report elapsed time -- confirm against later cells).
start_time = time.time()
logging.info("Starting...")

In [None]:


# Remote file name -> name of the notebook-level variable that will hold the loaded data.
file_map = {
    "sr.csv.gz": "df_sr",
    "sr_hex.csv.gz": "df_sr_hex",
    "sr_hex_truncated.csv": "df_sr_hex_truncated",
    "city-hex-polygons-8.geojson": "gdf_city_hex_8",
    "city-hex-polygons-8-10.geojson": "gdf_city_hex_8_10"
}

base_url = "https://cct-ds-code-challenge-input-data.s3.af-south-1.amazonaws.com/"


def _download(url, dest):
    """Stream ``url`` into the local file ``dest``, replacing any existing copy.

    Overwriting keeps the cell re-runnable: a second run refreshes the file
    instead of leaving a stale copy on disk.

    Raises:
        urllib.error.URLError: if the request fails (``HTTPError`` for HTTP
            error statuses), so a failed download is not silently ignored.
    """
    with urllib.request.urlopen(url) as response, open(dest, 'wb') as f_out:
        shutil.copyfileobj(response, f_out)


# Fetch every file into the working directory and load it under its mapped name.
for file_name, var_name in file_map.items():
    print(f"Processing {file_name}...")
    url = base_url + file_name
    _download(url, file_name)

    # Handle .csv.gz
    if file_name.endswith('.csv.gz'):
        # Unpack next to the archive (strip the ".gz" suffix), then delete the archive.
        # The decompressed CSV is left on disk.
        csv_name = file_name[:-3]
        with gzip.open(file_name, 'rb') as f_in, open(csv_name, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        os.remove(file_name)
        df = pd.read_csv(csv_name)
        # Later cells refer to the data by these names (df_sr, df_sr_hex, ...).
        globals()[var_name] = df

    # Handle .csv
    elif file_name.endswith('.csv'):
        df = pd.read_csv(file_name)
        globals()[var_name] = df

    # Handle .geojson
    elif file_name.endswith('.geojson'):
        gdf = gpd.read_file(file_name)
        globals()[var_name] = gdf

    else:
        print(f"Unsupported file format: {file_name}")


Processing sr.csv.gz...


Processing sr_hex.csv.gz...
Processing sr_hex_truncated.csv...
Processing city-hex-polygons-8.geojson...
Processing city-hex-polygons-8-10.geojson...


In [25]:
# Placeholder stored in h3_level8_index for rows without usable coordinates.
H3_NULL_VALUE = "0"
# Third argument of the (currently disabled) geo_to_h3 call.
resolution = 8

In [None]:

# --- Ensure hex index column is correctly named ---
if 'index' in gdf_city_hex_8.columns:
    gdf_city_hex_8 = gdf_city_hex_8.rename(columns={'index': 'h3_level8_index'})

# --- Rows with usable coordinates: latitude and longitude both present and non-zero ---
valid_coords = (
    df_sr['latitude'].notnull() &
    df_sr['longitude'].notnull() &
    (df_sr['latitude'] != 0) &
    (df_sr['longitude'] != 0)
)

# --- Convert the valid rows to points, tagged with the hex layer's CRS (no reprojection) ---
gdf_sr = gpd.GeoDataFrame(
    df_sr[valid_coords].copy(),
    geometry=gpd.points_from_xy(df_sr.loc[valid_coords, 'longitude'], df_sr.loc[valid_coords, 'latitude']),
    crs=gdf_city_hex_8.crs
)

# --- Spatial join to assign hex index ---
gdf_joined = gpd.sjoin(
    gdf_sr,
    gdf_city_hex_8[['h3_level8_index', 'geometry']],
    how="left",
    predicate="within"
)

# --- Locate the joined hex column ---
# When df_sr already carries h3_level8_index (e.g. this cell has been run before),
# sjoin suffixes the clashing names and the hex id arrives as 'h3_level8_index_right';
# otherwise it keeps its plain name. Accepting both keeps the cell re-runnable.
hex_col = next(
    (c for c in ('h3_level8_index_right', 'h3_level8_index') if c in gdf_joined.columns),
    None
)
if hex_col is None:
    print("Spatial join failed to produce h3_level8_index.")
    print("Join result columns:", gdf_joined.columns.tolist())
    raise KeyError("'h3_level8_index' missing after spatial join.")

# --- One hex id per source row ---
# A left join returns one row per (point, matching polygon) pair, so a point matched
# by several polygons would appear more than once. Keep the first match per source
# row and align on the row index rather than on position. Assumes df_sr has a unique index.
first_match = gdf_joined.loc[~gdf_joined.index.duplicated(keep='first'), hex_col]

# --- Assign back to full df_sr ---
# Rows without usable coordinates keep H3_NULL_VALUE (defined in the constants cell);
# valid rows with no containing hex end up as NaN.
df_sr['h3_level8_index'] = H3_NULL_VALUE
df_sr.loc[valid_coords, 'h3_level8_index'] = (
    first_match.reindex(df_sr.index[valid_coords]).to_numpy()
)

logging.info("H3 level 8 index assignment completed via spatial join.")


In [28]:
def validate_against_gold(df_generated, df_gold, key_column='notification_number'):
    """Compare a generated frame with a reference ("gold") frame and log the outcome.

    Both frames are sorted by ``key_column`` and re-indexed from zero, and the
    generated frame's columns are put into the gold frame's order, so the row
    and column order of the inputs does not influence the comparison.

    Args:
        df_generated: DataFrame produced by the notebook.
        df_gold: Reference DataFrame to compare against.
        key_column: Column both frames are sorted by before comparing.

    Returns:
        None. The result is reported through ``logging``: an INFO message when
        the frames are equal, a WARNING describing the mismatch otherwise.

    Raises:
        ValueError: if the two frames do not have the same set of columns.
    """
    # Sort both by the key so the comparison is row-order independent.
    df1 = df_generated.sort_values(by=key_column).reset_index(drop=True)
    df2 = df_gold.sort_values(by=key_column).reset_index(drop=True)

    # Check column equality
    if set(df1.columns) != set(df2.columns):
        missing_in_1 = set(df2.columns) - set(df1.columns)
        missing_in_2 = set(df1.columns) - set(df2.columns)
        raise ValueError(f"Column mismatch:\n- Missing in df1: {missing_in_1}\n- Missing in df2: {missing_in_2}")

    # Reorder columns to match the gold frame.
    df1 = df1[df2.columns]

    if df1.equals(df2):
        logging.info("Validation successful: df_sr matches df_sr_hex.")
        return

    # DataFrame.compare only accepts identically-labelled frames and raises on a
    # row-count mismatch, so report that case explicitly instead of crashing.
    if len(df1) != len(df2):
        logging.warning(
            f"DataFrames differ in row count: generated has {len(df1)} rows, gold has {len(df2)} rows."
        )
        return

    diff = df1.compare(df2)
    logging.warning(f"DataFrames differ. Sample differences:\n{diff.head()}")

In [29]:
# --- Remove the junk 'Unnamed: 0' column when it is present ---
has_unnamed_column = 'Unnamed: 0' in df_sr.columns
if has_unnamed_column:
    df_sr = df_sr.drop(columns=['Unnamed: 0'])
    logging.info("Dropped column 'Unnamed: 0' from df_sr.")

In [None]:
# NOTE(review): this cell repeats the spatial-join cell that appears earlier in the
# notebook; consider consolidating the two into a single function and one call.

# Alternative (disabled): direct H3 lookup. It needs geo_to_h3, whose import is
# commented out in the imports cell.
# # Assign H3 level 8 index
# df_sr['h3_level8_index'] = df_sr.apply(
#     lambda row: geo_to_h3(row['latitude'], row['longitude'], resolution),
#     axis=1
# )

# Ensure hex index column is correctly named
if 'index' in gdf_city_hex_8.columns:
    gdf_city_hex_8 = gdf_city_hex_8.rename(columns={'index': 'h3_level8_index'})

# -------------------- Valid Coordinate Mask --------------------
# Latitude and longitude both present and non-zero.
valid_coords = (
    df_sr['latitude'].notnull() &
    df_sr['longitude'].notnull() &
    (df_sr['latitude'] != 0) &
    (df_sr['longitude'] != 0)
)

# -------------------- Convert to GeoDataFrame --------------------
# Points are tagged with the hex layer's CRS; no reprojection is performed.
gdf_sr = gpd.GeoDataFrame(
    df_sr[valid_coords].copy(),
    geometry=gpd.points_from_xy(
        df_sr.loc[valid_coords, 'longitude'],
        df_sr.loc[valid_coords, 'latitude']
    ),
    crs=gdf_city_hex_8.crs
)

# -------------------- Spatial Join --------------------
gdf_joined = gpd.sjoin(
    gdf_sr,
    gdf_city_hex_8[['h3_level8_index', 'geometry']],
    how="left",
    predicate="within"
)

# -------------------- Validate --------------------
# The hex id arrives as 'h3_level8_index_right' only when df_sr already has an
# h3_level8_index column (sjoin suffixes clashing names); on a frame without it
# the plain name is kept. Accept both so the cell does not depend on whether an
# earlier cell has already added the column.
hex_col = next(
    (c for c in ('h3_level8_index_right', 'h3_level8_index') if c in gdf_joined.columns),
    None
)
if hex_col is None:
    print("Spatial join failed to produce h3_level8_index.")
    print("Join result columns:", gdf_joined.columns.tolist())
    raise KeyError("'h3_level8_index' missing after spatial join.")

# A left join returns one row per (point, matching polygon) pair; keep the first
# match per source row and align on the row index rather than on position.
# Assumes df_sr has a unique index.
first_match = gdf_joined.loc[~gdf_joined.index.duplicated(keep='first'), hex_col]

# -------------------- Assign Back to df_sr --------------------
# Rows without usable coordinates keep H3_NULL_VALUE; valid rows with no
# containing hex end up as NaN.
df_sr['h3_level8_index'] = H3_NULL_VALUE
df_sr.loc[valid_coords, 'h3_level8_index'] = (
    first_match.reindex(df_sr.index[valid_coords]).to_numpy()
)

logging.info("H3 level 8 index assignment completed via spatial join.")


In [31]:
# Remove whichever of the suffixed centroid columns are present in df_sr.
centroid_leftovers = {'centroid_lat_x', 'centroid_lat_y', 'centroid_lon_x', 'centroid_lon_y'}
stale_columns = [name for name in df_sr.columns if name in centroid_leftovers]
df_sr = df_sr.drop(columns=stale_columns)

# --- Validation ---
# Compare the generated frame against the reference frame df_sr_hex.
validate_against_gold(df_sr, df_sr_hex)

        h3_level8_index                 
                   self            other
243069  88ad360221fffff  88ad360227fffff
243342  88ad360221fffff  88ad360227fffff
255666  88ad36d5b1fffff  88ad36d5b5fffff
266753  88ad360221fffff  88ad360227fffff
291399  88ad360221fffff  88ad360227fffff


In [32]:
# Preview the first rows, transposed so that each column is shown as a line.
df_sr.head().transpose()

Unnamed: 0,0,1,2,3,4
notification_number,400583534,400555043,400589145,400538915,400568554
reference_number,9109491785.0,9108995239.0,9109614461.0,9108601346.0,
creation_timestamp,2020-10-07 06:55:18+02:00,2020-07-09 16:08:13+02:00,2020-10-27 10:21:59+02:00,2020-03-19 06:36:06+02:00,2020-08-25 09:48:42+02:00
completion_timestamp,2020-10-08 15:36:35+02:00,2020-07-14 14:27:01+02:00,2020-10-28 17:48:15+02:00,2021-03-29 20:34:19+02:00,2020-08-31 08:41:13+02:00
directorate,URBAN MOBILITY,URBAN MOBILITY,URBAN MOBILITY,URBAN MOBILITY,URBAN MOBILITY
department,Roads Infrastructure Management,Roads Infrastructure Management,Roads Infrastructure Management,Roads Infrastructure Management,Roads Infrastructure Management
branch,RIM Area Central,RIM Area East,RIM Area East,RIM Area North,RIM Area South
section,District: Blaauwberg,District : Somerset West,District : Somerset West,District : Bellville,District : Athlone
code_group,TD Customer complaint groups,TD Customer complaint groups,TD Customer complaint groups,TD Customer complaint groups,TD Customer complaint groups
code,Pothole&Defect Road Foot Bic Way/Kerbs,Manhole Cover/Gully Grid,Manhole Cover/Gully Grid,Paint Markings Lines&Signs,Pothole&Defect Road Foot Bic Way/Kerbs
