In [8]:
import pickle
from collections import OrderedDict
from datetime import datetime, timedelta
from itertools import product

import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd

from py.consts import HOURS_OF_DAY, MONTHS_OF_YEAR, SIDES, DAYS_OF_WEEK

In [9]:
# Load the flitsers dataset and parse its `datum` column as datetimes.
df = pd.read_parquet("../data/flitsers.parquet")
# Bracket indexing is used for the assignment: attribute-style assignment only
# works when the column already exists and silently sets a plain attribute otherwise.
df["datum"] = pd.to_datetime(df["datum"])

# Distinct road numbers and calendar years present in the data.
all_roads = df["wegnummer"].unique()
all_years = df["datum"].dt.year.unique()

In [10]:
# Build the prediction frame for tomorrow, starting with tomorrow's calendar labels.
# NOTE(review): %A / %B follow the process locale -- confirm they match the labels
# used in DAYS_OF_WEEK / MONTHS_OF_YEAR.
tomorrow = timedelta(days=1) + datetime.now()
tomorrow_day = f"{tomorrow:%A}"      # full weekday name
tomorrow_month = f"{tomorrow:%B}"    # full month name
tomorrow_year = f"{tomorrow:%Y}"     # four-digit year, kept as a string



# Every (start, stop) pair of hours where the start lies strictly before the stop.
all_start_stop_hour_combinations = [
    (start, stop)
    for start, stop in product(HOURS_OF_DAY, repeat=2)
    if start < stop
]

# Axes of the prediction grid, in output-column order.
# Year, month and day are deliberately not crossed in here (disabled axes:
# "year": all_years, "month": MONTHS_OF_YEAR, "day": DAYS_OF_WEEK).
data = OrderedDict(
    [
        ("zijde", SIDES),
        ("wegnummer", all_roads),
        ("start_stop_hours", all_start_stop_hour_combinations),
    ]
)

# One record per combination of the axes above.
cartesian_product = list(product(*data.values()))
pred_tomorrow = pd.DataFrame.from_records(cartesian_product, columns=data.keys())


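# NOTE: disabled experiment (chunked Cartesian product via dask.delayed). It is stale:
# it reads data["year"], data["month"] and data["day"], which are commented out of
# `data` above, so it would raise KeyError if re-enabled as written.
# def compute_cartesian_chunk(chunk_size: int, list1_chunk, list2_chunk, list3_chunk, list4_chunk, list5_chunk, list6_chunk: list):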
#     cartesian_product_chunk = list(product(list1_chunk, list2_chunk, list3_chunk, list4_chunk, list5_chunk, list6_chunk))
#     return pd.DataFrame(cartesian_product_chunk, columns=data.keys())
# 
# # Set the chunk size (adjust this based on available memory)
# chunk_size = 2
# 
# # Partition each list into chunks
# list1_partitions = [data["zijde"][i:i+chunk_size] for i in range(0, len(data["zijde"]), chunk_size)]
# list2_partitions = [data["year"] for _ in range(len(list1_partitions))]  # Keep list2 unchanged
# list3_partitions = [data["month"] for _ in range(len(list1_partitions))]  # Keep list3 unchanged
# list4_partitions = [data["day"] for _ in range(len(list1_partitions))]  # Keep list4 unchanged
# list5_partitions = [data["wegnummer"] for _ in range(len(list1_partitions))]  # Keep list5 unchanged
# list6_partitions = [data["start_stop_hours"] for _ in range(len(list1_partitions))]  # Keep list6 unchanged
# 
# # Create Dask delayed objects for each chunk of the Cartesian product
# delayed_results = [dask.delayed(compute_cartesian_chunk)(chunk_size, list1_chunk, list2_chunk, list3_chunk, list4_chunk, list5_chunk, list6_chunk)
#                    for list1_chunk, list2_chunk, list3_chunk, list4_chunk, list5_chunk, list6_chunk in zip(list1_partitions, list2_partitions, list3_partitions, list4_partitions, list5_partitions, list6_partitions)]
# 
# # Compute the Cartesian product in parallel while avoiding memory issues
# results = dask.compute(*delayed_results)
# 
# # Concatenate the results into a single Dask DataFrame
# pred_tomorrow = dd.from_pandas(pd.concat(results, ignore_index=True), npartitions=1)

# Sanity check: frame dimensions on the first line, then the first rows.
print(pred_tomorrow.shape, pred_tomorrow.head(), sep="\n")

(483552, 3)
   zijde wegnummer start_stop_hours
0  links      N321           (0, 1)
1  links      N321           (0, 2)
2  links      N321           (0, 3)
3  links      N321           (0, 4)
4  links      N321           (0, 5)


In [11]:
# Unzip the (start, stop) tuples into two parallel sequences, store each as its
# own column, then discard the combined tuple column.
start_hours, stop_hours = zip(*pred_tomorrow["start_stop_hours"])
pred_tomorrow["start_hour"] = start_hours
pred_tomorrow["stop_hour"] = stop_hours
pred_tomorrow = pred_tomorrow.drop(columns="start_stop_hours")

In [12]:
from py.utils import one_hot_encode_all

# Columns handed to the project's one-hot encoder. Judging by the rendered output,
# each becomes a set of `<column>_<value>` indicator columns -- see py.utils for
# the exact contract.
categorical_columns = ["zijde", "wegnummer", "start_hour", "stop_hour"]

# Convert the pandas frame to a 3-partition dask frame and encode it in one step.
ddf_pred_tomorrow = one_hot_encode_all(
    ddf=dd.from_pandas(pred_tomorrow, npartitions=3),
    columns=categorical_columns,
    dtype=int,
)

ddf_pred_tomorrow.head()

Unnamed: 0,zijde_links,zijde_rechts,wegnummer_A10,wegnummer_A12,wegnummer_A13,wegnummer_A15,wegnummer_A16,wegnummer_A17,wegnummer_A18,wegnummer_A2,...,stop_hour_14,stop_hour_15,stop_hour_16,stop_hour_17,stop_hour_18,stop_hour_19,stop_hour_20,stop_hour_21,stop_hour_22,stop_hour_23
0,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [13]:
# Year flags are currently disabled. If re-enabled, note that `tomorrow_year` is a
# string (from strftime) while `all_years` comes from `.dt.year` (numbers), so the
# comparison needs a cast:
# for year in all_years:
#     y = 1 if year == int(tomorrow_year) else 0
#     ddf_pred_tomorrow[f"year_{year}"] = y

# Add one constant 0/1 column per month label and per weekday label; the column
# matching tomorrow's label is 1 on every row, all others are 0.
for prefix, labels, tomorrow_label in (
    ("month", MONTHS_OF_YEAR, tomorrow_month),
    ("day", DAYS_OF_WEEK, tomorrow_day),
):
    for label in labels:
        ddf_pred_tomorrow[f"{prefix}_{label}"] = int(label == tomorrow_label)

ddf_pred_tomorrow.head()

Unnamed: 0,zijde_links,zijde_rechts,wegnummer_A10,wegnummer_A12,wegnummer_A13,wegnummer_A15,wegnummer_A16,wegnummer_A17,wegnummer_A18,wegnummer_A2,...,month_October,month_November,month_December,day_Monday,day_Tuesday,day_Wednesday,day_Thursday,day_Friday,day_Saturday,day_Sunday
0,1,0,0,0,0,0,0,0,0,0,...,1,0,0,0,0,1,0,0,0,0
1,1,0,0,0,0,0,0,0,0,0,...,1,0,0,0,0,1,0,0,0,0
2,1,0,0,0,0,0,0,0,0,0,...,1,0,0,0,0,1,0,0,0,0
3,1,0,0,0,0,0,0,0,0,0,...,1,0,0,0,0,1,0,0,0,0
4,1,0,0,0,0,0,0,0,0,0,...,1,0,0,0,0,1,0,0,0,0


In [14]:
# Write the encoded prediction frame to parquet (plain string: the path has no
# placeholders, so no f-string is needed).
ddf_pred_tomorrow.to_parquet("../data/prediction_preset.parquet")