In [1]:
from models.s3 import S3Client
from models.availability_model_trainer import AvailabilityModelTrainer
from models.availability_model_trainer import FEATURES_ORDER

from io import BytesIO
import joblib
import duckdb
import pyarrow.dataset as ds
import pandas as pd
import tempfile
import os

2023-02-19 20:27:58,415 - youconfigme.youconfigme - INFO - searching for config on /home/dml/proyectos/bicisba/research/models/settings.ini
2023-02-19 20:27:58,416 - youconfigme.youconfigme - INFO - searching for config on /home/dml/proyectos/bicisba/research/settings.ini


In [2]:
# Project S3 wrapper; later cells call `s3_cli.client.Bucket(...)` on it to list and fetch objects.
s3_cli = S3Client()

In [4]:
# Training window configuration: TRAINING_PERIOD consecutive calendar days,
# the last of which is CURRENT_DATE.
CURRENT_DATE = "2022/09/10"
TRAINING_PERIOD = 21
# Daily DatetimeIndex; DATES[-1] is CURRENT_DATE itself.
DATES = pd.date_range(end=CURRENT_DATE, periods=TRAINING_PERIOD, freq="D")

In [5]:
# Preview the S3 key prefix of the most recent day's partition (month/day are not zero-padded).
# Built from the integer date fields: the "%-m" / "%-d" strftime flags are a platform
# extension and are rejected by some C libraries (e.g. on Windows).
"silver/status/year={0.year}/month={0.month}/day={0.day}".format(DATES[-1])

'silver/status/year=2022/month=9/day=10'

In [6]:
# Download every object of the training window into a scratch directory, mirroring the
# S3 key layout locally so the key=value partition folders are preserved on disk.
temp_dir = tempfile.TemporaryDirectory()
frame_bucket = s3_cli.client.Bucket("frame")

for day in DATES:
    # The prefix is built from integer fields ("%-m" / "%-d" are not portable strftime
    # flags) and ends with "/": S3 prefixes are plain string prefixes, so ".../day=1"
    # would also match day=10..19 and pull days outside the training window.
    day_prefix = f"silver/status/year={day.year}/month={day.month}/day={day.day}/"
    for parquet_object in frame_bucket.objects.filter(Prefix=day_prefix):
        parquet_temp_path = os.path.join(temp_dir.name, parquet_object.key)
        # Create the nested partition folders on demand.
        os.makedirs(os.path.dirname(parquet_temp_path), exist_ok=True)
        frame_bucket.download_file(Key=parquet_object.key, Filename=parquet_temp_path)




In [7]:
# Open the downloaded parquet tree as one pyarrow dataset (hive-style partitioning)
# and register it on a fresh DuckDB connection under the name "status".
status_root = temp_dir.name + "/silver/status"
dataset = ds.dataset(status_root, format="parquet", partitioning="hive")
con = duckdb.connect().register("status", dataset)

In [10]:
# Distinct station identifiers present in the registered "status" data.
distinct_stations_df = con.execute("select distinct(station_id) from status").df()
station_ids = distinct_stations_df["station_id"].values
# Display how many stations were found.
len(station_ids)

333

In [8]:
for station_id in station_ids[:1]:
    # One query per look-ahead offset i (1..15). Each row carries the current readings
    # plus, taken from the i-th following IN_SERVICE row of the same station:
    #   minutes_bt_check - minute() of the timestamp difference to that row
    #   bikes_available  - num_bikes_available of that row
    horizon_frames = [
        con.execute(
            f"""
            select
                hour,
                dayofweek(make_timestamp(year, month, day, hour, minute, 0.0)) as dow,
                num_bikes_available,
                num_bikes_disabled,
                num_docks_available,
                num_docks_disabled,
                minute(lead(make_timestamp(year, month, day, hour, minute, 0.0), {i}) over (
                    partition by station_id
                    order by make_timestamp(year, month, day, hour, minute, 0.0) asc
                ) - make_timestamp(year, month, day, hour, minute, 0.0))  as minutes_bt_check,
                lead(num_bikes_available, {i}) over (
                    partition by station_id
                    order by make_timestamp(year, month, day, hour, minute, 0.0) asc
                ) as bikes_available,
            from
                status
            where
                station_id = {station_id} and
                status = 'IN_SERVICE'
            """
        ).df()
        for i in range(1,16)
    ]
    # Stack the per-offset frames; the list is dropped right away to free memory.
    mins_df = pd.concat(horizon_frames)
    del horizon_frames
    # Binary target: 1 when at least one bike is available at the look-ahead row.
    mins_df["bikes_a"] = mins_df["bikes_available"].gt(0).astype(int)

In [9]:
for station_id in station_ids[:1]:
    # Same feature query as a full 1..15 sweep, but sampling only the look-ahead
    # offsets 1, 4, 7, 10, 13 and 16 to keep the number of queries small.
    # NOTE(review): minute() on an interval presumably yields only its minutes
    # component, so gaps of an hour or more may wrap around — confirm.
    horizon_frames = [
        con.execute(
            f"""
            select
                hour,
                dayofweek(make_timestamp(year, month, day, hour, minute, 0.0)) as dow,
                num_bikes_available,
                num_bikes_disabled,
                num_docks_available,
                num_docks_disabled,
                minute(lead(make_timestamp(year, month, day, hour, minute, 0.0), {i}) over (
                    partition by station_id
                    order by make_timestamp(year, month, day, hour, minute, 0.0) asc
                ) - make_timestamp(year, month, day, hour, minute, 0.0))  as minutes_bt_check,
                lead(num_bikes_available, {i}) over (
                    partition by station_id
                    order by make_timestamp(year, month, day, hour, minute, 0.0) asc
                ) as bikes_available,
            from
                status
            where
                station_id = {station_id} and
                status = 'IN_SERVICE'
            """
        ).df()
        for i in range(1, 17, 3)
    ]
    # Stack the per-offset frames; the list is dropped right away to free memory.
    mins_df = pd.concat(horizon_frames)
    del horizon_frames
    # Binary target: 1 when at least one bike is available at the look-ahead row.
    mins_df["bikes_a"] = mins_df["bikes_available"].gt(0).astype(int)

In [10]:
for station_id in station_ids[:1]:
    # Single-round-trip variant: the per-offset selects (offsets 1, 4, ..., 16) are glued
    # into one statement instead of being concatenated in pandas.
    # "union all" is required here: a plain "union" de-duplicates rows, and since no
    # timestamp is selected, repeated observations with identical readings would be
    # silently collapsed, unlike the pd.concat-based construction.
    df_query = " union all ".join([
    f"""
    select
        station_id,
        hour,
        dayofweek(make_timestamp(year, month, day, hour, minute, 0.0)) as dow,
        num_bikes_available,
        num_bikes_disabled,
        num_docks_available,
        num_docks_disabled,
        minute(lead(make_timestamp(year, month, day, hour, minute, 0.0), {i}) over (
            order by make_timestamp(year, month, day, hour, minute, 0.0) asc
        ) - make_timestamp(year, month, day, hour, minute, 0.0))  as minutes_bt_check,
        lead(num_bikes_available, {i}) over (
            order by make_timestamp(year, month, day, hour, minute, 0.0) asc
        ) as bikes_available,
    from
        status
    where
        station_id = {station_id} and
        status = 'IN_SERVICE'
    """ for i in range(1, 17, 3)])
    mins_df2 = con.execute(df_query).df()
    # Binary target: 1 when at least one bike is available at the look-ahead row.
    mins_df2["bikes_a"] = (mins_df2["bikes_available"]>0).astype(int)

In [11]:
for station_id in station_ids[:1]:
    # CTE variant: filter the station's IN_SERVICE rows and build the timestamp once
    # (`base_status.ts`), then reuse it in every per-offset select.
    # NOTE(review): minute() on an interval presumably yields only its minutes
    # component, so gaps of an hour or more may wrap around — confirm.
    df_query = f"""
    WITH base_status AS (select
        station_id,
        hour,
        num_bikes_available,
        num_bikes_disabled,
        num_docks_available,
        num_docks_disabled,
        status,
        make_timestamp(year, month, day, hour, minute, 0.0) as ts,
    from
        status
    where
        station_id = {station_id} and
        status = 'IN_SERVICE')"""
    # "union all" keeps every observation; a plain "union" would de-duplicate rows that
    # share the same readings (no timestamp is selected) and skew the training data.
    df_query += " union all ".join([
    f"""
    select
        station_id,
        hour,
        dayofweek(ts) as dow,
        num_bikes_available,
        num_bikes_disabled,
        num_docks_available,
        num_docks_disabled,
        minute(lead(ts, {i}) over (
            order by ts asc
        ) - ts)  as minutes_bt_check,
        lead(num_bikes_available, {i}) over (
            order by ts asc
        ) as bikes_available,
    from
        base_status
    """ for i in range(1, 17, 3)])
    mins_df3 = con.execute(df_query).df()
    # Binary target: 1 when at least one bike is available at the look-ahead row.
    mins_df3["bikes_a"] = (mins_df3["bikes_available"]>0).astype(int)

In [12]:
# Train on the single station processed above (`station_id` is still bound from that
# loop). Rows with missing values, such as those without a look-ahead sample, are dropped.
avail_model_trainer = AvailabilityModelTrainer()
training_rows = mins_df3.dropna()
avail_model_trainer.train_station(station_id, training_rows)

In [13]:
# Persist the trained station pipelines, tagged with the last day of the training window.
# NOTE(review): presumably serializes them to the S3 bucket read back below — confirm in AvailabilityModelTrainer.
avail_model_trainer.dump_stations_pipelines(DATES[-1])



In [14]:
# Show the content of the text object that names the current availability model.
current_model_pointer = s3_cli.client.Bucket("frame").Object("models/current_availability_model.txt")
current_model_pointer.get()["Body"].read().decode()



'models/year=2022/month=09/10_availability.joblib'

In [15]:
# Round-trip check: fetch the serialized model for the last training day into memory
# and deserialize it. The key uses zero-padded month/day.
model_key = "models/" + DATES[-1].strftime('year=%Y/month=%m/%d') + "_availability.joblib"
with BytesIO() as model_buffer:
    s3_cli.client.Bucket("frame").download_fileobj(Key=model_key, Fileobj=model_buffer)
    # Rewind: the download leaves the cursor at the end of the buffer.
    model_buffer.seek(0)
    # joblib.load unpickles its input; only use it on artifacts from a trusted bucket.
    loaded_model = joblib.load(model_buffer)



In [16]:
# Sanity check: score the first three feature rows with the reloaded pipeline of the
# first station, with columns arranged in the order given by FEATURES_ORDER.
station_pipeline = loaded_model[station_ids[0]]
sample_features = mins_df[:3][FEATURES_ORDER].values
station_pipeline.predict_proba(sample_features)

array([[0., 1.],
       [0., 1.],
       [0., 1.]])

In [12]:
# Remove the scratch directory together with the parquet files downloaded into it.
# NOTE(review): `dataset` / `con` point at those files, so queries on "status" presumably fail after this — confirm.
temp_dir.cleanup()

# Whole pipeline

In [None]:
# End-to-end run through the trainer class, starting from a fresh trainer instance.
avail_model_trainer = AvailabilityModelTrainer()
# NOTE(review): create_dataset presumably builds the training frame for the window ending on CURRENT_DATE — confirm in the trainer module.
dataset_df = avail_model_trainer.create_dataset(CURRENT_DATE)

In [None]:
# Train on the full dataset frame built by create_dataset.
# NOTE(review): presumably fits one pipeline per station — confirm in AvailabilityModelTrainer.
avail_model_trainer.train_all_stations(dataset_df)

In [None]:
# Persist the trained pipelines, tagged with the last day of the training window.
# NOTE(review): current=True presumably also marks this dump as the current model — confirm in AvailabilityModelTrainer.
avail_model_trainer.dump_stations_pipelines(DATES[-1], current=True)