In [2]:
import os
import pandas as pd
import numpy as np
import dask
import dask.dataframe as dd

In [3]:
from IPython.display import Audio, display

# URL of the clip played by allDone().
# Exactly one assignment is live so the selected clip is obvious at a glance;
# previously three assignments overwrote each other and only the last one had
# any effect. The other candidates are kept as comments for easy switching.
# url_done = "https://sound.peal.io/ps/audios/000/000/537/original/woo_vu_luvub_dub_dub.wav"
# url_done = "https://www.myinstants.com/media/sounds/taco-bell-bong-sfx.mp3"
# url_done = "https://www.myinstants.com/media/sounds/tindeck_1.mp3"
# url_done = "https://www.myinstants.com/media/sounds/dun_dun_1.mp3"
url_done = "https://www.myinstants.com/media/sounds/magic_immune.mp3"


def allDone():
    """Display an autoplaying audio widget for the clip at ``url_done``.

    The data-processing cells in this notebook call this as their final
    statement, so the clip plays once the cell has finished.
    """
    finished_clip = Audio(url=url_done, autoplay=True)
    display(finished_clip)


# URL of the clip played by the custom exception handler (play_sound).
# Only one assignment is live; the alternative that used to be assigned and
# then immediately overwritten is kept as a comment for reference.
# url_exception = "http://www.wav-sounds.com/movie/austinpowers.wav"
url_exception = "https://www.myinstants.com/media/sounds/roblox-death-sound_1.mp3"


def play_sound(self, etype, value, tb, tb_offset=None):
    """Custom exception hook: show the traceback, then play the failure clip.

    ``self`` must provide ``showtraceback``; ``etype``, ``value`` and ``tb``
    describe the exception and are forwarded to it as one tuple, together
    with ``tb_offset``. Afterwards an autoplaying audio widget for
    ``url_exception`` is displayed.
    """
    exc_info = (etype, value, tb)
    self.showtraceback(exc_info, tb_offset=tb_offset)
    failure_clip = Audio(url=url_exception, autoplay=True)
    display(failure_clip)


# Register play_sound as the handler for every Exception raised in a cell.
get_ipython().set_custom_exc((Exception,), play_sound)

In [4]:
from dask.distributed import Client

# Create a dask.distributed client with four workers.
# NOTE(review): the warning captured below ("Perhaps you already have a
# cluster running?") suggests another cluster was already up when this cell
# ran -- confirm the cell is executed only once per kernel.
client = Client(n_workers=4)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 49875 instead


In [5]:
# Data folder: "data" inside the parent of the current working directory.
data_dir = os.path.join(os.getcwd(), os.pardir, "data")

In [6]:
import datetime


def lower_case_sort_columns(df):
    """Return a frame with lower-cased column names in a fixed column order.

    Column names are lower-cased and the result is restricted to the known
    stop-event columns listed below, in that order; any other column of the
    input is left out of the result.

    The input frame is not modified: the renaming is done with ``rename``
    instead of assigning to ``df.columns``, which would silently rename the
    caller's frame as well.

    Parameters
    ----------
    df : DataFrame
        Frame whose column names are strings (any letter case).

    Returns
    -------
    DataFrame
        New frame holding only the listed columns.

    Raises
    ------
    KeyError
        With pandas, when one of the listed columns is missing after
        lower-casing.
    """
    ordered_columns = [
        "apc_status",
        "arrive_time",
        "data_source",
        "direction",
        "door",
        "dwell",
        "estimated_load",
        "leave_time",
        "lift",
        "location_id",
        "maximum_speed",
        "offs",
        "ons",
        "pattern_distance",
        "route_number",
        "schedule_status",
        "service_date",
        "service_key",
        "stop_time",
        "train",
        "train_mileage",
        "trip_number",
        "vehicle_number",
        "x_coordinate",
        "y_coordinate",
    ]

    # An explicit mapping keeps the rename independent of the frame backend.
    lowered = df.rename(columns={name: name.lower() for name in df.columns})
    return lowered[ordered_columns]


def parse_stop_event(df, minutes_per_time_cat=5):
    """Add date, deviance and time-bucket columns to a stop-event frame.

    The result contains every input column, with ``service_date`` replaced
    by a datetime column, plus these derived columns (in this order):

    * ``day_of_year`` and ``day_of_week`` of ``service_date``
    * ``arrival_deviance``: ``stop_time - arrive_time``
    * ``arrive_deviance_departure_delta``:
      ``arrival_deviance + leave_time - arrive_time``
    * ``time_cat_stop_time``, ``time_cat_arrive_time``,
      ``time_cat_leave_time``: the time value floor-divided by 60 and then by
      ``minutes_per_time_cat``

    The input frame is left untouched: all new columns are attached with
    ``assign`` instead of item assignment, which would also alter the
    caller's dask frame.

    Parameters
    ----------
    df : dask DataFrame
        Must provide ``service_date`` (strings such as ``DDMONYYYY:...``),
        ``stop_time``, ``arrive_time`` and ``leave_time``.
        NOTE(review): the time columns are presumably seconds since
        midnight -- confirm against the data dictionary.
    minutes_per_time_cat : int, optional
        Width of one time bucket; defaults to 5, the value that used to be
        hard-coded.

    Returns
    -------
    dask DataFrame
        New frame with the columns described above.
    """
    month_dict = {
        "JAN": "01",
        "FEB": "02",
        "MAR": "03",
        "APR": "04",
        "MAY": "05",
        "JUN": "06",
        "JUL": "07",
        "AUG": "08",
        "SEP": "09",
        "OCT": "10",
        "NOV": "11",
        "DEC": "12",
    }

    def parse_date(date_str):
        # Everything before the first ":" is the date: two day characters,
        # a three-letter upper-case month, then the year.
        date = date_str.split(":")[0]
        day = date[:2]
        month = month_dict[date[2:5]]
        year = date[5:]
        return year + "/" + month + "/" + day

    service_date = dd.to_datetime(
        df["service_date"].apply(parse_date, meta=("service_date", str)),
        format="%Y/%m/%d",
    )
    arrival_deviance = df["stop_time"] - df["arrive_time"]

    # Insertion order of this dict fixes the order of the appended columns.
    derived = {
        "service_date": service_date,
        "day_of_year": service_date.dt.day_of_year,
        "day_of_week": service_date.dt.day_of_week,
        "arrival_deviance": arrival_deviance,
        "arrive_deviance_departure_delta": (
            arrival_deviance + df["leave_time"] - df["arrive_time"]
        ),
    }
    for time_column in ("stop_time", "arrive_time", "leave_time"):
        derived["time_cat_" + time_column] = (
            df[time_column] // 60 // minutes_per_time_cat
        )

    return df.assign(**derived)

In [14]:
from dask_ml.preprocessing import Categorizer

# Folder that holds the raw stop-event CSV exports.
stop_events = os.path.join(data_dir, "stop_event")

stop_event_file_names = [
    "2 trimet_stop_event - Fall 2019.csv",
    "2 trimet_stop_event - Mar to Aug 2020.csv",
    "2 trimet_stop_event - Spring 2019.csv",
    "2 trimet_stop_event - Summer 2019.csv",
    "2 trimet_stop_event - Winter 2018.csv",
    "2 trimet_stop_event - Winter 2019-20.csv",
    "trimet_stop_event - Fall 2018 v2.csv",
]

# Columns that are read as float32; the mapping is the same for every file.
float32_dtypes = dict.fromkeys(
    [
        "LOCATION_DISTANCE",
        "PATTERN_DISTANCE",
        "TRAIN_MILEAGE",
        "X_COORDINATE",
        "Y_COORDINATE",
    ],
    "float32",
)

# Load each export lazily, normalise its columns and keep route 9 only.
stop_event_dfs = []
for file_name in stop_event_file_names:
    file_name = os.path.join(stop_events, file_name)
    df = dd.read_csv(file_name, dtype=float32_dtypes)
    df = lower_case_sort_columns(df)
    df = df[df["route_number"] == 9]
    stop_event_dfs.append(df)

# Stack all exports into one frame and add the derived columns.
df = parse_stop_event(dd.concat(stop_event_dfs, axis=0, ignore_index=True))

# Columns to be converted to categoricals, including the time buckets.
times = ["stop_time", "arrive_time", "leave_time"]
time_cats = [f"time_cat_{name}" for name in times]
categories = [
    "vehicle_number",
    "train",
    "route_number",
    "direction",
    "service_key",
    "location_id",
    "door",
    "lift",
    "apc_status",
    "day_of_year",
    "day_of_week",
] + time_cats

cat = Categorizer(columns=categories)
df = cat.fit_transform(df).sort_values("trip_number")

# Show the first rows without truncating rows or columns.
with pd.option_context("display.max_rows", None, "display.max_columns", None):
    print(df.head())

# Persist the combined frame, then signal completion.
save_file = os.path.join(data_dir, "mega_stop_event.hdf")
df.to_hdf(save_file, "/df", complevel=1)

allDone()

       apc_status  arrive_time  data_source direction door  dwell  \
202727          G        19379            2         0    0      0   
330011          G        19928            2         0    0      0   
330010          G        19914            2         0    0      0   
330009          G        19890            2         0    0      0   
330008          G        19874            2         0    0      0   

        estimated_load  leave_time lift  location_distance location_id  \
202727               1       19379    0                0.0        4564   
330011               0       19928    0                0.0        4612   
330010               0       19914    0                0.0        4610   
330009               0       19890    0                0.0       13957   
330008               0       19874    0                0.0        4606   

        maximum_speed  offs  ons  pattern_distance route_number  \
202727             32     0    0            1680.0            9   
330011

In [10]:
# Extra clean-up: reload the saved frame, drop unwanted columns, and write the
# slimmer frame to a second file.

save_file = os.path.join(data_dir, "mega_stop_event.hdf")
df = dd.read_hdf(save_file, "/df")

# Drop only the columns that are actually present. The pipeline visible in
# this notebook does not produce any of them (location_distance is filtered
# out by lower_case_sort_columns and the delta_* columns are never created),
# so an unconditional drop fails when the notebook is re-run from scratch,
# while an older saved file may still contain them.
unwanted_columns = [
    "location_distance",
    "delta_train_mileage",
    "delta_pattern_distance",
]
df = df.drop(columns=[name for name in unwanted_columns if name in df.columns])

# Summary statistics over (at most) the first 10000 rows.
print(df.head(10000).describe())
# print(df.shape[0].compute())
with pd.option_context("display.max_rows", None, "display.max_columns", None):
    print(df.head())

save_file = os.path.join(data_dir, "mega_stop_event_less_columns.hdf")
df.to_hdf(save_file, "/df", append=False, complevel=1)

allDone()

        arrive_time   data_source         dwell  estimated_load    leave_time  \
count  10000.000000  10000.000000  10000.000000    10000.000000  10000.000000   
mean   18927.180300      1.997700      3.968400        4.047400  18935.175400   
std     2113.901249      0.047906     17.070429        3.872861   2112.815768   
min    14486.000000      1.000000      0.000000        0.000000  14486.000000   
25%    16853.250000      2.000000      0.000000        1.000000  16857.000000   
50%    19716.000000      2.000000      0.000000        3.000000  19720.500000   
75%    20404.000000      2.000000      0.000000        6.000000  20408.000000   
max    22716.000000      2.000000    645.000000       23.000000  22716.000000   

       maximum_speed          offs           ons  pattern_distance  \
count   10000.000000  10000.000000  10000.000000      10000.000000   
mean       26.935200      0.181500      0.190900      24188.015625   
std         6.114372      0.758298      0.556856      18117.