# Setup

In [1]:
import pandas as pd
import numpy as np
import warnings

from io import StringIO
from calitp.tables import tbl
from datetime import date, datetime
from siuba import *
from plotnine import *

def friendly_date(x): 
    return datetime.strptime(x, "%Y-%m-%d").strftime("%b %d")

E0614 22:15:25.836378840      16 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
E0614 22:15:25.846955507      16 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies


In [2]:
DIR_PATH="2022/05/4"

#CALITP_ITP_ID = 10
#CALITP_ITP_ID=45
#CALITP_ITP_ID=269
#CALITP_ITP_ID=108
#CALITP_ITP_ID=106
CALITP_ITP_ID=350
CALITP_URL_NUMBER = 0
DEBUG = True

DATE_START = "2022-05-01"
DATE_END = "2022-05-31"
PUBLISH_DATE = "2022-06-01"

CHANGE_PLOT_BREAKS = ["Added", "Removed", "Unchanged"]
CHANGE_PLOT_VALUES = ["#51BF9D","#E16B26", "#8CBCCB"]

In [3]:
DATE_TODAY=date.today()
START_MONTH_DAY = friendly_date(DATE_START)
END_MONTH_DAY = friendly_date(DATE_END)

WEEK_MARKERS = pd.date_range(DATE_START, DATE_END, freq="W").astype(str).tolist()
BIWEEKLY_MARKERS = pd.date_range(DATE_START, DATE_END, freq="2W").astype(str).tolist()

In [4]:
if not DEBUG:
    warnings.filterwarnings("ignore")

In [5]:
# Convenience functions ----

filter_start = filter(
    _.calitp_extracted_at <= DATE_START,
    _.calitp_deleted_at.fillna("2099-01-01") > DATE_START,
)

filter_end = filter(
    _.calitp_extracted_at <= DATE_END,
    _.calitp_deleted_at.fillna("2099-01-01") > DATE_END,
)

filter_itp = filter(
    _.calitp_itp_id == CALITP_ITP_ID, _.calitp_url_number == CALITP_URL_NUMBER
)

def collect_to_dict(df):
    """Return the first row of a DataFrame as a dictionary.
    
    If there are no rows, then all dictionary values are None.
    """
    
    result = collect(df)
    
    if len(result) == 0:
        return {k: None for k in result.columns}
    # elif len(result) > 1:
    #     raise ValueError("table contained more than 1 row")
    else:
        return result.iloc[0,:].to_dict()


select_rm_calitp = select(
    -_.calitp_itp_id,
    -_.calitp_url_number,
    -_.calitp_hash,
    -_.calitp_extracted_at,
    -_.calitp_deleted_at,
)

def percent_format(labels):
    return ["{:.0f}%".format(v*100) for v in labels]

def query_id_changes(start_table, end_table, id_vars):
    """Calculate id variables that are removed, added, or unchanged between tables.
    
    It works by adding a special column to each table, performing a full join,
    then checking where the special column is null.
    """
    sym_id_vars = [_[k] for k in id_vars]

    is_in_start = start_table >> select(*id_vars) >> mutate(is_in_start=True)
    is_in_end = end_table >> select(*id_vars) >> mutate(is_in_end=True)

    baseline = start_table >> count(*id_vars) >> rename(n_baseline="n")
    tallies = (
        is_in_start
        >> full_join(_, is_in_end, id_vars)
        >> count(*sym_id_vars, _.is_in_start, _.is_in_end)
        >> mutate(
            status=case_when(
                _,
                {
                    _.is_in_end.isna(): "Removed",
                    _.is_in_start.isna(): "Added",
                    True: "Unchanged",
                },
            )
        )
        >> count(*sym_id_vars[:-1], _.status)
        >> group_by(*sym_id_vars[:-1])
        >> mutate(percent=_.n / _.n.sum())
    )

    return tallies

In [41]:
# Data ====
# 1. High level feed info ----

## start/end dates now dt.date, need to format downstream?...
## collect here for 1 less query, small table after all
tbl_dim_feeds = (tbl.views.gtfs_schedule_dim_feeds()
     >> filter_end
     >> filter_itp
     >> collect()
    )

status = (
    tbl_dim_feeds   
    >> select(_.calitp_agency_name, _.raw_gtfs_schedule_url)
    >> pipe(collect_to_dict)
    )

feed_info = (tbl_dim_feeds
    >> select(_.feed_publisher_name, _.feed_publisher_url, _.feed_lang,
           _.default_lang, _.feed_start_date, _.feed_end_date, _.feed_version,
           _.feed_contact_email, _.feed_contact_url)
    >> pipe(collect_to_dict)
    )

## specific date makes more sense (end of month, maintain consistency)
routes_on_date = tbl.views.gtfs_schedule_fact_daily_feed_routes() >> filter(_.date == BIWEEKLY_MARKERS[-1])
tbl_routes = (tbl.views.gtfs_schedule_dim_routes() 
            >> inner_join(_, routes_on_date, on = 'route_key')
            >> filter_itp 
        )
_n_routes = tbl_routes >> count() >> collect()

stops_on_date = tbl.views.gtfs_schedule_fact_daily_feed_stops() >> filter(_.date == BIWEEKLY_MARKERS[-1])
tbl_stops = (tbl.views.gtfs_schedule_dim_stops() 
            >> inner_join(_, stops_on_date, on = 'stop_key')
            >> filter_itp 
        )
_n_stops = tbl_stops >> count() >> collect()

feed_info["n_routes"] = int(_n_routes.loc[0, "n"])
feed_info["n_stops"] = int(_n_stops.loc[0, "n"])

In [46]:
# 2. Monthly metrics ----
# Service hours per day. Note that the queried table calculates service
# hours per service id, so we need to sum across service ids for the day
_cross_cal = (
    tbl.views.dim_date()
    >> filter(_.full_date.between(DATE_START, DATE_END))
    >> select(_.service_date == _.full_date)
)

## IN PROGRESS move to warehouse (goin' well)
tbl_daily_service_hours = (
    tbl.views.gtfs_schedule_fact_daily_service()
    >> filter_itp
    >> filter(_.service_date.between(DATE_START, DATE_END))
    >> right_join(_, _cross_cal, ["service_date"])
    >> collect()
    >> group_by(_.service_date)
    >> summarize(
        ttl_service_hours=(_.last_arrival_ts.max() - _.first_departure_ts.min()) / 3600,
        ttl_service_hours2=_.ttl_service_hours.sum(),
    )
    >> mutate(
        ttl_service_hours=_.ttl_service_hours.astype(float).round(2),
        service_date=_.service_date.astype("datetime64[ns]"),
    )
)

# number of days where a feed did not have any trips in service
n_days_no_service = (
    tbl_daily_service_hours
    >> filter(_.ttl_service_hours.isna())
    >> pipe(lambda d: {"n": d.shape[0]})
)

# 3. Stop and Route ID Changes ----

query_id_changes = (
    # note that we remove these ids from the base fact table, since they are accessible via the
    # join below, and will likely be removed from the table in the future.
    select(-_.calitp_itp_id, -_.calitp_url_number)
    >> filter(_.metric_period == "month")
    >> inner_join(
        _,
        tbl.views.gtfs_schedule_dim_feeds() >> select(_.feed_key, _.calitp_itp_id, _.calitp_url_number),
        "feed_key"
    )
    >> filter_itp 
    # note that metric dates for calendar type roll-ups (e.g. month, quarter) is the first day of the next
    # period (e.g. metric date June 1st rolls up May).
    >> filter(_.metric_date == PUBLISH_DATE)
    >> rename(status = "change_status")
    >> mutate(percent = _.n / _.n.sum())
    >> collect()
)

tbl_stops_changed = tbl.views.gtfs_schedule_fact_stop_id_changes() >> query_id_changes
tbl_routes_changed = tbl.views.gtfs_schedule_fact_route_id_changes() >> query_id_changes

In [48]:
tbl_stops_changed

Unnamed: 0,feed_key,metric_period,metric_date,status,n,calitp_url_number,calitp_itp_id,percent
0,-637281990126960778,month,2022-06-01,Unchanged,159,0,350,1.0


In [50]:
tbl.views.gtfs_schedule_fact_daily_service()

Unnamed: 0,feed_key,calitp_itp_id,calitp_url_number,service_date,service_id,ttl_service_hours,n_trips,n_routes,first_departure_ts,last_arrival_ts,service_window
0,-1950217576129328381,177,0,2022-04-07,c_17267_b_20618_d_15,71.5,90,3,21600,76980,15.383333
1,-5399973046735831493,99,0,2021-12-13,c_1992_b_33281_d_127,24.0,64,1,25200,65400,11.166667
2,698190465679356930,343,1,2022-07-09,c_23877_b_33141_d_112,0.4,1,1,63000,64440,0.4
3,6055868810463182223,208,0,2021-10-10,023-Sunday-11-202103-1111111-,26.5,15,1,23700,80340,15.733333
4,2163366114810222902,190,0,2021-12-23,c_1992_b_33281_d_127,24.0,64,1,25200,65400,11.166667


In [49]:
tbl_daily_service_hours

Unnamed: 0,service_date,ttl_service_hours,ttl_service_hours2
0,2022-05-01,12.15,45.3
1,2022-05-02,18.05,103.166667
2,2022-05-03,18.05,103.166667
3,2022-05-04,18.05,103.166667
4,2022-05-05,18.05,103.166667
5,2022-05-06,18.05,103.166667
6,2022-05-07,12.52,46.116667
7,2022-05-08,12.15,45.3
8,2022-05-09,18.05,103.166667
9,2022-05-10,18.05,103.166667


In [47]:
tbl.views.gtfs_schedule_fact_stop_id_changes()

Unnamed: 0,feed_key,calitp_itp_id,calitp_url_number,metric_period,metric_date,change_status,n
0,8446634525599948032,61,1,quarter,2022-01-01,Added,1497
1,1317138533117163520,110,0,quarter,2021-10-01,Added,1
2,1317138533117163520,110,0,quarter,2021-10-01,Unchanged,254
3,-9079380371442109952,101,0,quarter,2022-04-01,Unchanged,124
4,-6382563470680836095,172,0,quarter,2021-10-01,Unchanged,909


In [None]:
# 4. Feed files being checked for ----

file_categories = pd.DataFrame(
    {
        "shapes.txt": "Visual display",
        "pathways.txt": "Navigation",
        "levels.txt": "Navigation",
        "fare_rules.txt": "Fares",
        "fare_leg_rules.txt": "Fares",
        "feed_info.txt": "Technical contacts",
    }.items(),
    columns=["name", "category"],
)

importance = ["Visual display", "Navigation", "Fares", "Technical contacts"]

tbl_file_check = (
    tbl.gtfs_schedule_history.calitp_files_updates()
    >> filter_itp
    >> filter(_.calitp_extracted_at.isin(BIWEEKLY_MARKERS))
    >> select(_.name, _.calitp_extracted_at)
    >> collect()
    >> right_join(_, file_categories, ["name"])
    >> mutate(
        calitp_extracted_at=_.calitp_extracted_at.fillna("missing").astype(str),
        success="✅",
    )
    >> spread(_.calitp_extracted_at, _.success)
    >> select(-_.missing)
    >> arrange(_.category.apply(importance.index))
    >> select(_.category, _.contains(""))
    >> pipe(_.fillna(""))
)

# Analyze validation notices ----

from sqlalchemy.sql import func

tbl_biweekly = (
    tbl.views.dim_date()
    >> filter(_.full_date.isin(BIWEEKLY_MARKERS))
    >> select(_.date == _.full_date)
)

tbl_validation_notices_raw = (
    tbl.gtfs_schedule_type2.validation_notices()
    >> filter_itp
    >> inner_join(
        _,
        tbl_biweekly,
        sql_on = (lambda lhs, rhs:
                  (lhs.calitp_extracted_at <= rhs.date) &
                  (func.coalesce(lhs.calitp_deleted_at, "2099-01-01") > rhs.date)
        )
    )
    # do a simple count of the number of notices on each date
    >> select(_.date, _.code, _.severity)
    >> count(_.date, _.code, _.severity)
    >> collect()
)

In [None]:
# create a DataFrame with correct columns and no rows
_validation_empty = pd.DataFrame(
    dict([("code", []), ("severity", []), *zip(BIWEEKLY_MARKERS, [tuple()] * len(BIWEEKLY_MARKERS))]),
)

# spread validation notices so dates are columns, and counts are values
# TODO: when we spread the dates out, the counts sometimes can become floats,
# so have decimals, which might look funky. We can fix this by either converting
# the counts to strings before the code below
if not tbl_validation_notices_raw.shape[0]:
    _tbl_validation_notices = _validation_empty
else:
    out_wide = (
        tbl_validation_notices_raw >> mutate(date=_.date.astype(str)) >> spread(_.date, _.n)
    )
    
    _tbl_validation_notices = pd.concat(
        [_validation_empty, out_wide], ignore_index=True
    ).fillna("")

In [None]:
# create a DataFrame with correct columns and no rows
_validation_severity_empty = pd.DataFrame(
    dict([("code", []), ("severity", []), *zip(BIWEEKLY_MARKERS, [tuple()] * len(BIWEEKLY_MARKERS))]),
)

# spread validation notices so dates are columns, and counts are values
# TODO: when we spread the dates out, the counts sometimes can become floats,
# so have decimals, which might look funky. We can fix this by either converting
# the counts to strings before the code below
if not tbl_validation_notices_raw.shape[0]:
    _tbl_validation_severity = _validation_severity_empty
else:
    out_wide = (
        tbl_validation_notices_raw >> mutate(date=_.date.astype(str)) >> spread(_.date, _.n)
    )
    
    _tbl_validation_severity = pd.concat(
        [_validation_severity_empty, out_wide], ignore_index=True
    ).fillna("")

In [None]:
tbl_code_descriptions = (
    tbl.views.validation_code_descriptions()
    >> select(_.code, _.human_readable_description)
    >> collect()
    # convert code to snakecase,
    # note that this currently screws up cases like IO -> i_o
    >> mutate(code = _.code.str.replace(r'(?<!^)(?=[A-Z])', '_').str.lower().str.replace("_notice$", ""))
)

tbl_validation_notices_unfiltered = (
    _tbl_validation_notices 
    >> inner_join(_, tbl_code_descriptions, ["code"])
    >> select(_.code, _.human_readable_description, _.contains(""))
    #>> rename(error_name = _.code, error_description=_.human_readable_description)
)

tbl_validation_severity = (
    _tbl_validation_severity
    >> inner_join(_, tbl_code_descriptions, ["code"])
    >> select(_.code, _.human_readable_description, _.severity)
)

# hackily remove float decimal values by casting to string and removing ".0"
# see https://stackoverflow.com/a/68693516/269834
for col in tbl_validation_notices_unfiltered.columns[2:]:
    tbl_validation_notices_unfiltered[col] = tbl_validation_notices_unfiltered[col].astype(str).apply(lambda x: x.replace('.0',''))

In [None]:
#filter tbl_validation for severity

tbl_validation_notices = (
    tbl_validation_notices_unfiltered
    >> select(-_.severity)
)

## Dump data

In [None]:
# Note that everything is saved in this cell, except for plots,
# which are saved in the cell they're defined in

import json
import shutil

from pathlib import Path

def to_rowspan_table(df, span_col):
    d = df.to_dict(orient="split")
    row_span = df.groupby(span_col)[span_col].transform("count")
    not_first = df[span_col].duplicated()
    
    row_span[not_first] = 0
    
    d["rowspan"] = row_span.tolist()
    return d


out_dir = Path(f"outputs/{DIR_PATH}/data")
out_dir.mkdir(parents=True, exist_ok=True)


json.dump(feed_info, open(out_dir / "1_feed_info.json", "w"))
json.dump(status, open(out_dir / "1_status.json", "w"))

tbl_daily_service_hours.to_json(
    out_dir / "2_daily_service_hours.json", orient="records"
)
json.dump(n_days_no_service, open(out_dir / "2_n_days_no_service.json", "w"))

tbl_stops_changed.to_json(out_dir / "3_stops_changed.json", orient="records")
tbl_routes_changed.to_json(out_dir / "3_routes_changed.json", orient="records")

json.dump(to_rowspan_table(tbl_file_check, "category"), open(out_dir / "4_file_check.json", "w"))
tbl_validation_severity.to_json(out_dir / "4_validation_severity.json", orient="split")
tbl_validation_notices.to_json(out_dir / "5_validation_notices.json", orient="split")

# Monthly GTFS Quality Report

In [None]:
from IPython.display import Markdown

Markdown(f"""
Transit provider name: {feed_info["feed_publisher_name"]}

Date generated: {DATE_TODAY}
""")

This is a monthly report, generated by the California Integrated Travel Project ([Cal-ITP](https://dot.ca.gov/cal-itp/cal-itp-gtfs)), summarizing issues discovered by [MobilityData](http://mobilitydata.io/)’s [GTFS Validator](https://github.com/MobilityData/gtfs-validator). This report is available for viewing by the general public to support continuous improvement of GTFS data and the experience of transit passengers. 

## Overview

In [None]:
Markdown(f"""
Feed location: {status["raw_gtfs_schedule_url"]}

Metrics for the most recent published version of the feed:

* Date published: {feed_info["feed_version"]}
* Number of routes in any service: {feed_info["n_routes"]}
* Number of stops in service: {feed_info["n_stops"]}
""")

## Aggregated Metrics for May

In [None]:
# TODO: 

# Markdown(f"""
# Days when the active feed was expired: {n_days_no_service["n"]}
# """)

Markdown(f"""
Days with no service hours: {n_days_no_service["n"]}
""")


In [None]:
p = (
    tbl_daily_service_hours
    >> ggplot(aes("service_date", "ttl_service_hours2"))
    + geom_line()
    + geom_point()
    + theme_minimal(base_size=16)    
    + theme(axis_text_x=element_text(angle=45, hjust=1))
    + scale_x_datetime(date_breaks="1 week")
    + expand_limits(y=0)
    + labs(
        y = "Total service hours",
        x = "Service date",
        #title="Service hours per day"
    )
)

p.save(out_dir / "2_service_hours.png", width=4, height=4, dpi=300)
#p.save(out_dir / "2_service_hours.png")

p.draw();

## Changes Since Previous Month

In [None]:
from siuba.dply.forcats import fct_rev

p = (
    pd.concat(
        [
            tbl_stops_changed >> mutate(kind="Stops"),
            tbl_routes_changed >> mutate(kind="Routes"),
        ]
    )
    >> mutate(status = fct_rev(_.status))
    >> ggplot(aes("kind", "n", fill="status"))
    + geom_col()
    + scale_fill_manual(limits = CHANGE_PLOT_BREAKS, values = CHANGE_PLOT_VALUES)
    + labs(
        x="GTFS schedule table",
        y="Number of IDs",
        title=f"IDs Changed Between {START_MONTH_DAY} and {END_MONTH_DAY}",
    )
    + theme_minimal()
        + theme(legend_position="bottom", legend_margin=30)
)


#p.save(out_dir / "3_id_changes.png")

p.draw();

### (Alternative version using percentages)

In [None]:
p = (
    pd.concat(
        [
            tbl_stops_changed >> mutate(kind="Stops"),
            tbl_routes_changed >> mutate(kind="Routes"),
        ]
    )
    >> mutate(status = fct_rev(_.status))    
    >> ggplot(aes("kind", "percent", fill="status"))
    + geom_col()
    + scale_fill_manual(limits = CHANGE_PLOT_BREAKS, values = CHANGE_PLOT_VALUES)
    + labs(
        x="GTFS schedule table",
        y="Percentage of IDs",
        #title=f"IDs Changed Between {START_MONTH_DAY} and {END_MONTH_DAY}",
    )
    + scale_y_continuous(labels=percent_format, breaks=np.arange(0, 1.2, 0.2))
    + theme_minimal(base_size=16)
    + theme(legend_position="bottom", legend_margin=30)
)

p.save(out_dir / "3_id_changes.png", width=4, height=4, dpi=300)
p.draw();

## Consistency with the [California GTFS Minimum Guidelines](https://dot.ca.gov/cal-itp/california-minimum-general-transit-feed-specification-gtfs-guidelines) for the feed downloaded


### Do the following files/fields exist?

In [None]:
tbl_file_check

### Validation Errors Observed

In [None]:
tbl_validation_severity

In [None]:
if tbl_validation_severity.shape[0] == 0:
    display(Markdown("No validation error observed in your feed."))
else:    
    display(tbl_validation_severity)

### Cal-ITP Validation Errors Observed

In [None]:
tbl_validation_notices

In [None]:
if tbl_validation_notices.shape[0] == 0:
    display(Markdown("No validation error observed in your feed."))
else:    
    display(tbl_validation_notices)

For more information about Cal-ITP, including the [Minimum GTFS Guidelines](https://dot.ca.gov/cal-itp/california-minimum-general-transit-feed-specification-gtfs-guidelines) and our [Transit Data Helpdesk](https://dot.ca.gov/programs/rail-and-mass-transportation/gtfs/helpdesk), contact [GTFSRT@dot.ca.gov](mailto:GTFSRT@dot.ca.gov).