In [None]:
import os
import sys
sys.path.insert(0, os.path.abspath("../"))
import pandas
import numpy as np
from pathlib import Path
import t4c22
from t4c22.misc.t4c22_logging import t4c_apply_basic_logging_config
from t4c22.t4c22_config import load_basedir
from t4c22.misc.parquet_helpers import load_df_from_parquet
import tqdm
t4c_apply_basic_logging_config(loglevel="DEBUG")

# Experiment switches:
#   USE_ETA_BASELINE_SNAPSHOTS -> True reads a previously written snapshot instead of recomputing.
#   EXPERIMENT_NAME            -> embedded in the snapshot file name.
#   NUM_VOLUME_CLUSTERS        -> selects the precomputed cluster bounds to use.
USE_ETA_BASELINE_SNAPSHOTS = False
EXPERIMENT_NAME = 'cc_10_clusters'
NUM_VOLUME_CLUSTERS = 10

# Read the data root (BASEDIR) from t4c22_config.json; edit that file to point at your data.
BASEDIR = load_basedir(fn="t4c22_config.json", pkg=t4c22)

In [None]:
def load_train_input(city):
    """Load and concatenate all training counter files for ``city``.

    Reads every ``counters_*.parquet`` under ``BASEDIR/train/<city>/input`` in sorted
    file-name order and adds a ``vol`` column holding the row-wise sum of ``volumes_1h``.

    Args:
        city: city directory name, e.g. ``'london'``.

    Returns:
        The concatenated training input frame with the extra ``vol`` column.

    Raises:
        FileNotFoundError: if no matching parquet files exist. Without this check,
            ``pandas.concat`` would fail with an opaque "No objects to concatenate".
    """
    input_dir = BASEDIR / 'train' / city / 'input'
    train_input_frames = [
        pandas.read_parquet(train_input_file)
        for train_input_file in sorted(input_dir.glob('counters_*.parquet'))
    ]
    print(f'Read {len(train_input_frames)} training input files for {city}')
    if not train_input_frames:
        raise FileNotFoundError(f'No counters_*.parquet files found in {input_dir}')
    train_input = pandas.concat(train_input_frames)

    # Each volumes_1h cell is a sequence. Stack them into a 2-D array and sum per row.
    # This assumes all sequences have the same length.
    train_input['vol'] = np.array(train_input['volumes_1h'].to_numpy().tolist()).sum(axis=1)
    return train_input

# NOTE: a duplicate definition of get_cluster_ids used to live here. It was silently
# shadowed by the identical definition that follows get_cluster_id, so it was removed.


def load_tests(city):
    """Read the test counters for ``city`` and attach a per-row total volume column ``vol``."""
    test_path = BASEDIR / 'test' / city / 'input' / 'counters_test.parquet'
    frame = pandas.read_parquet(test_path)
    # Stack the per-row volumes_1h sequences into a matrix and total each row.
    volume_matrix = np.array(frame['volumes_1h'].to_numpy().tolist())
    frame['vol'] = volume_matrix.sum(axis=1)
    return frame

def get_cluster_id(volume_clusters, vol):
    """Return the id of the volume cluster whose range contains ``vol``.

    Args:
        volume_clusters: iterable of ``(cluster_id, lower_bound, upper_bound)`` tuples.
            Every range is half-open ``[lower_bound, upper_bound)``, except the last one:
            its upper bound is inclusive, so a value equal to the top edge still gets a
            cluster instead of falling through to -1.
        vol: total volume to classify.

    Returns:
        The matching cluster id, or -1 if ``vol`` lies outside every range (or is NaN).
    """
    last_cluster = None
    for cluster in volume_clusters:
        cluster_id, lower_bound, upper_bound = cluster
        if lower_bound <= vol < upper_bound:
            return cluster_id
        last_cluster = cluster
    # The static top edges appear to be the maximum observed volume. Keep that value in
    # the last cluster rather than mapping it to -1.
    if last_cluster is not None and vol == last_cluster[2]:
        return last_cluster[0]
    return -1


def get_cluster_ids(volume_clusters, df, group_fields):
    """Assign a volume cluster to every group of ``df``.

    Args:
        volume_clusters: ``(cluster_id, lower_bound, upper_bound)`` tuples, passed through
            to ``get_cluster_id``.
        df: frame with a numeric ``vol`` column plus the ``group_fields`` columns.
        group_fields: column names to group by, e.g. ``['day', 't']`` or ``['test_idx']``.

    Returns:
        DataFrame with one row per group. It holds the ``group_fields`` columns, the summed
        ``vol``, and the resulting ``cluster`` id (-1 when no cluster matches).
    """
    # Select ``vol`` before aggregating. Summing every column would also try to add up
    # list/array columns such as ``volumes_1h``, which is wasteful and can raise.
    df_groups = df.groupby(group_fields)[['vol']].sum().reset_index()
    df_groups['cluster'] = [get_cluster_id(volume_clusters, vol) for vol in df_groups['vol']]
    return df_groups

# Precomputed volume-cluster boundaries, keyed by NUM_VOLUME_CLUSTERS and then by city.
# Each entry is a (cluster_id, lower_bound, upper_bound) tuple, unpacked by get_cluster_id.
# Within a city the ranges are contiguous: each upper bound is the next cluster's lower bound.
# NOTE(review): these look like quantile edges of the per-time-slot summed 'vol' totals
# produced by get_cluster_ids. Confirm how they were derived before editing them.
STATIC_VOLUME_CLUSTERS = {
    # A single cluster per city.
    1: {
        'london': [(0, -46599.0, 7806420.0)],
        'madrid': [(0, 263688.0, 12361861.0)],
        'melbourne': [(0, 35418.79999999999, 6885818.7)]
    },
    # Ten contiguous clusters per city.
    10: {
        'london': [
            (0, -46599.0, 932240.2999999999),
            (1, 932240.2999999999, 1554726.4),
            (2, 1554726.4, 2588921.9000000004),
            (3, 2588921.9000000004, 3921954.0),
            (4, 3921954.0, 4863117.5),
            (5, 4863117.5, 5771968.0),
            (6, 5771968.0, 6005322.7),
            (7, 6005322.7, 6232288.4),
            (8, 6232288.4, 6493853.5),
            (9, 6493853.5, 7806420.0)
        ],
        'madrid': [
            (0, 263688.0, 923316.4),
            (1, 923316.4, 1911206.0),
            (2, 1911206.0, 3344747.900000001),
            (3, 3344747.900000001, 4972842.2),
            (4, 4972842.2, 6422392.0),
            (5, 6422392.0, 7669201.8),
            (6, 7669201.8, 8975374.4),
            (7, 8975374.4, 9762011.8),
            (8, 9762011.8, 10563257.9),
            (9, 10563257.9, 12361861.0)
        ],
        'melbourne': [
            (0, 35418.79999999999, 344195.01666666666),
            (1, 344195.01666666666, 588418.9400000001),
            (2, 588418.9400000001, 1072442.9133333336),
            (3, 1072442.9133333336, 1817113.62),
            (4, 1817113.62, 2663038.05),
            (5, 2663038.05, 3292800.466666667),
            (6, 3292800.466666667, 3936777.729999998),
            (7, 3936777.729999998, 4662487.213333336),
            (8, 4662487.213333336, 5325993.843333334),
            (9, 5325993.843333334, 6885818.7)
        ]
    }
}
# Use the precomputed bounds when they exist for this cluster count; otherwise derive them per city.
# NOTE(review): find_volume_clusters is not defined in the visible notebook cells, so this
# fallback raises NameError unless it is provided elsewhere.
if NUM_VOLUME_CLUSTERS not in STATIC_VOLUME_CLUSTERS:
    print('Computing volume clusters:')
    city_volume_clusters = {
        city_name: find_volume_clusters(city_name)
        for city_name in ('london', 'madrid', 'melbourne')
    }
else:
    city_volume_clusters = STATIC_VOLUME_CLUSTERS[NUM_VOLUME_CLUSTERS]
print(city_volume_clusters)

In [None]:
def create_counts_cc(city):
    """Build per-edge, per-volume-cluster congestion-class proportions for ``city``.

    If ``USE_ETA_BASELINE_SNAPSHOTS`` is set, the previously written snapshot is read and
    returned. Otherwise the steps are:

    1. Each training (day, t) slot is assigned a volume cluster.
    2. That cluster is joined onto the training labels.
    3. Labels are counted per (u, v, cluster, cc).
    4. The counts are turned into class proportions.
    5. The result is written to the snapshot file and returned.

    Args:
        city: city name, used for the input paths and the snapshot file name.

    Returns:
        DataFrame with columns u, v, cluster, total, logit_green, logit_yellow, logit_red.
        The ``logit_*`` columns hold the fraction of labels with cc 1/2/3. They are
        proportions, not logits; the names are kept for downstream compatibility.
    """
    snapshot_file = BASEDIR / 'snapshots_cc' / f'cc_volume_cluster_baseline_{EXPERIMENT_NAME}_{city}.parquet'
    if USE_ETA_BASELINE_SNAPSHOTS:
        # Previously the loaded frame was discarded; return it so callers can use it.
        return pandas.read_parquet(snapshot_file)

    train_inputs_df = load_train_input(city)
    print(f'Inputs: {len(train_inputs_df)}')
    cluster_dates_df = get_cluster_ids(city_volume_clusters[city], train_inputs_df, ['day', 't'])
    print(f'Inputs grouped: {len(cluster_dates_df)}')

    # NOTE(review): load_train_labels is not defined in the visible notebook cells. It must
    # be provided elsewhere and return at least the columns u, v, day, t, cc.
    train_labels_df = load_train_labels(city)
    print(f'Labels: {len(train_labels_df)}')
    train_labels_df = train_labels_df.merge(cluster_dates_df, on=['day', 't'])
    print(f'Labels merged: {len(train_labels_df)}')

    # Number of label rows per (edge, volume cluster, congestion class).
    df_grouped = train_labels_df.groupby(['u', 'v', 'cluster', 'cc']).size().reset_index(name='n')
    assert df_grouped['n'].sum() == len(train_labels_df), (df_grouped['n'].sum(), len(train_labels_df))

    counts_cc_df = (
        df_grouped
        .pivot(index=['u', 'v', 'cluster'], columns='cc', values='n')
        # Keep exactly the classes 1/2/3 as columns, even when one never occurs. Any other
        # cc value would otherwise leave an int column name that to_parquet rejects.
        .reindex(columns=[1, 2, 3], fill_value=0)
        .fillna(0)
    )
    for cc_value in (1, 2, 3):
        assert counts_cc_df[cc_value].sum() == (train_labels_df['cc'] == cc_value).sum()

    counts_cc_df['total'] = counts_cc_df[1] + counts_cc_df[2] + counts_cc_df[3]
    # Rows without any cc 1/2/3 observation would give 0/0; there is nothing to estimate there.
    counts_cc_df = counts_cc_df[counts_cc_df['total'] > 0]
    counts_cc_df['logit_green'] = counts_cc_df[1] / counts_cc_df['total']
    counts_cc_df['logit_yellow'] = counts_cc_df[2] / counts_cc_df['total']
    counts_cc_df['logit_red'] = counts_cc_df[3] / counts_cc_df['total']
    counts_cc_df = counts_cc_df.drop(columns=[1, 2, 3]).reset_index()

    print(f'Edge/cluster rows: {len(counts_cc_df)}')
    snapshot_file.parent.mkdir(exist_ok=True, parents=True)
    counts_cc_df.to_parquet(snapshot_file, compression='snappy')
    return counts_cc_df
        
    

In [None]:
# Build (or load) the congestion-class count table for every city.
for city_name in ("london", "madrid", "melbourne"):
    create_counts_cc(city_name)

In [None]:
def _write_cluster_input(split, city, group_fields, output_columns):
    """Annotate each counters parquet of ``split``/``city`` with its volume cluster and save it.

    For each ``counters_*.parquet`` under ``BASEDIR/<split>/<city>/input``:

    1. The per-row total volume is summed per ``group_fields``.
    2. The sum is mapped to a cluster id with ``get_cluster_ids``.
    3. The cluster id is joined back onto the rows.
    4. ``output_columns`` are written to ``BASEDIR/<split>/<city>/cluster_input`` under the
       same file name.
    """
    group_fields = list(group_fields)
    input_dir = BASEDIR / split / city / 'input'
    output_dir = BASEDIR / split / city / 'cluster_input'
    output_dir.mkdir(exist_ok=True, parents=True)
    for input_file in sorted(input_dir.glob('counters_*.parquet')):
        counters_df = pandas.read_parquet(input_file)
        counters_df['vol'] = np.array(counters_df['volumes_1h'].to_numpy().tolist()).sum(axis=1)
        cluster_df = get_cluster_ids(city_volume_clusters[city], counters_df, group_fields)
        # Only the cluster id is needed; merging just that column avoids vol_x/vol_y clashes.
        counters_df = counters_df.merge(cluster_df[group_fields + ['cluster']], on=group_fields)
        # Path.name is the portable way to reuse the file name (no manual '/' splitting).
        counters_df[output_columns].to_parquet(output_dir / input_file.name, compression="snappy")


def add_train_cluster(city):
    """Write cluster-annotated copies of the training counter files for ``city``.

    Clusters are assigned per (day, t) time slot.
    """
    _write_cluster_input('train', city, ['day', 't'], ['node_id', 'day', 't', 'volumes_1h', 'cluster'])


def add_test_cluster(city):
    """Write cluster-annotated copies of the test counter files for ``city``.

    Clusters are assigned per ``test_idx``.
    """
    _write_cluster_input('test', city, ['test_idx'], ['node_id', 'volumes_1h', 'test_idx', 'cluster'])

# Write cluster-annotated train and test counter inputs for every city.
for city_name in ("london", "madrid", "melbourne"):
    add_train_cluster(city_name)
    add_test_cluster(city_name)
