# Postprocessing Smoke Test

This notebook tests the revised version of the unacceptable gaps table updater.

In [176]:
import configparser
import contextlib
import datetime
import pathlib

import numpy as np
import pandas as pd
import psycopg2
import pytz
from ipywidgets import interact
from plotly import graph_objs as go

import intersection_tmc_notebook03test as itmc

# Database credentials are read from ~/.charlesconfig; the [POSTGRES] section is
# later unpacked into psycopg2.connect(...) by every query cell.
config = configparser.ConfigParser()
config.read((pathlib.Path.home() / '.charlesconfig').as_posix())
postgres_settings = config['POSTGRES']

In [177]:
local_tz = pytz.timezone(zone='US/Eastern')  # used below to localize the processing start time

## Smoke Test

Both the old and new versions of the updater insert rows into `miovision_api.unacceptable_gaps(intersection_uid, gap_start, gap_end, gap_minute, allowed_gap, accept)`.

To test this, we generated two tables, `czhu.unacceptable_gaps_old` and `czhu.unacceptable_gaps_new`, then populated the former using a version of the old script and the latter using a version of the new one. The gap-size lookup averages volumes over the 60 days ending 2020-04-01 (roughly 2020-02-01 to 2020-04-01), and gaps are searched for between 2020-03-01 and 2020-04-01.

In [178]:
# Create two empty scratch tables with the columns of
# miovision_api.unacceptable_gaps: one receives the old updater's output and the
# other the new updater's output, so the two can be compared row by row.
sql_query = """CREATE TABLE czhu.unacceptable_gaps_old
(
    intersection_uid integer,
    gap_start timestamp without time zone,
    gap_end timestamp without time zone,
    gap_minute integer,
    allowed_gap integer,
    accept boolean,
    CONSTRAINT unacceptable_gaps_old_key UNIQUE (intersection_uid, gap_start, gap_end)
);

CREATE TABLE czhu.unacceptable_gaps_new
(
    intersection_uid integer,
    gap_start timestamp without time zone,
    gap_end timestamp without time zone,
    gap_minute integer,
    allowed_gap integer,
    accept boolean,
    CONSTRAINT unacceptable_gaps_new_key UNIQUE (intersection_uid, gap_start, gap_end)
);
"""

# psycopg2's `with conn:` commits (or rolls back) but does NOT close the
# connection; contextlib.closing releases it once the DDL is committed.
with contextlib.closing(psycopg2.connect(database='bigdata', **postgres_settings)) as conn:
    with conn, conn.cursor() as cur:
        cur.execute(sql_query)

In [179]:
# Materialize the old gap-size lookup. For each intersection, period
# (Weekday/Weekend) and hour of day, it stores the average hourly volume over the
# 60 days ending 2020-04-01 and the allowed gap size derived from it. The allowed
# gap size is compared against gap_minute in the updater queries below.
# NOTE(review): an average of exactly 1500 matches no WHEN branch, so gap_size is
# NULL and such gaps can never be flagged. The new updater uses the same CASE,
# so the old/new comparison is unaffected; kept as-is to mirror the old script.
sql_query = """CREATE MATERIALIZED VIEW czhu.gapsize_lookup_old
TABLESPACE pg_default
AS
 WITH temp(period, isodow) AS (
         VALUES ('Weekday'::text,'[1,6)'::int4range), ('Weekend'::text,'[6,8)'::int4range)
        ), mio AS (
         SELECT volumes.intersection_uid,
            datetime_bin(volumes.datetime_bin, 60) AS hourly_bin,
            sum(volumes.volume) AS vol
           FROM miovision_api.volumes
          WHERE volumes.datetime_bin > ('2020-04-01'::timestamp without time zone - '60 days'::interval) AND
                volumes.datetime_bin <= '2020-04-01'::timestamp without time zone
          GROUP BY volumes.intersection_uid, (datetime_bin(volumes.datetime_bin, 60))
        )
 SELECT mio.intersection_uid,
    d.period,
    mio.hourly_bin::time without time zone AS time_bin,
    avg(mio.vol) AS avg_vol,
        CASE
            WHEN avg(mio.vol) < 100::numeric THEN 20
            WHEN avg(mio.vol) >= 100::numeric AND avg(mio.vol) < 500::numeric THEN 15
            WHEN avg(mio.vol) >= 500::numeric AND avg(mio.vol) < 1500::numeric THEN 10
            WHEN avg(mio.vol) > 1500::numeric THEN 5
            ELSE NULL::integer
        END AS gap_size
   FROM mio
     CROSS JOIN temp d
  WHERE date_part('isodow'::text, mio.hourly_bin)::integer <@ d.isodow
  GROUP BY mio.intersection_uid, d.period, (mio.hourly_bin::time without time zone)
WITH DATA;

CREATE UNIQUE INDEX czhu_gapsize_lookup_old_intersection_uid_period_time_bin_idx
    ON czhu.gapsize_lookup_old USING btree
    (intersection_uid, period COLLATE pg_catalog."default", time_bin);
"""

# psycopg2's `with conn:` commits (or rolls back) but does NOT close the
# connection; contextlib.closing releases it once the view is committed.
with contextlib.closing(psycopg2.connect(database='bigdata', **postgres_settings)) as conn:
    with conn, conn.cursor() as cur:
        cur.execute(sql_query)

In [180]:
# Old updater. It finds per-intersection gaps longer than one minute in
# miovision_api.volumes between 2020-03-01 and 2020-04-01, then looks up the
# allowed gap size in czhu.gapsize_lookup_old (matched on intersection, hour of
# gap_start and Weekday/Weekend). Unacceptable gaps are inserted into
# czhu.unacceptable_gaps_old, keeping at most one per intersection and hour.
sql_query = """WITH ful AS (
	SELECT generate_series('2020-03-01', '2020-04-01', interval '1 minute')::timestamp without time zone 
		AS datetime_bin
), grp AS (
	SELECT
	vol.volume_uid,
	vol.intersection_uid,
    ful.datetime_bin,
	dense_rank() OVER (ORDER BY ful.datetime_bin)
	- dense_rank() OVER (PARTITION BY vol.intersection_uid ORDER BY vol.datetime_bin) AS diff
	FROM ful
	LEFT JOIN miovision_api.volumes vol
	USING (datetime_bin)
), island AS (
	SELECT grp.intersection_uid, 
	MAX(datetime_bin) - MIN(datetime_bin) + interval '1 minute' AS island_size, 
	MIN(datetime_bin) AS time_min, MAX(datetime_bin) AS time_max
	FROM grp
	GROUP BY grp.intersection_uid, diff
	ORDER BY grp.intersection_uid, time_min
), gap AS (
	SELECT
	    ROW_NUMBER() OVER(ORDER BY island.intersection_uid, time_min) AS rn,
		island.intersection_uid,
		island_size,
	    time_min,
	    time_max,
	    LAG(time_max,1) OVER (PARTITION BY island.intersection_uid ORDER BY island.intersection_uid, time_min) AS prev_time_end,
		time_min - LAG(time_max,1) OVER (PARTITION BY island.intersection_uid ORDER BY island.intersection_uid, time_min) AS gap_size
	FROM island
	ORDER BY rn
), sel AS (
	SELECT gap.intersection_uid, gap.prev_time_end AS gap_start, gap.time_min AS gap_end, 
	date_part('epoch'::text, gap.gap_size) / 60::integer AS gap_minute
	FROM gap 
	WHERE gap.gap_size > '00:01:00'::time  --where gap size is greater than 1 minute
	GROUP BY gap.intersection_uid, time_min, prev_time_end, gap.gap_size
	ORDER BY gap.intersection_uid, gap_start
), acceptable AS (
	-- THEN, MATCH IT TO THE LOOKUP TABLE TO CHECK IF ACCEPTABLE
	SELECT sel.intersection_uid, sel.gap_start, sel.gap_end,
		sel.gap_minute, gapsize_lookup.gap_size AS allowed_gap,
	CASE WHEN gap_minute < gapsize_lookup.gap_size THEN TRUE
	WHEN gap_minute >= gapsize_lookup.gap_size THEN FALSE
	END AS accept
	FROM sel 
	LEFT JOIN czhu.gapsize_lookup_old gapsize_lookup
	ON sel.intersection_uid = gapsize_lookup.intersection_uid
	AND DATE_TRUNC('hour', sel.gap_start)::time = gapsize_lookup.time_bin
	AND (CASE WHEN EXTRACT(isodow FROM sel.gap_start) IN (1,2,3,4,5) THEN 'Weekday'
	WHEN EXTRACT(isodow FROM sel.gap_start) IN (6,7) THEN 'Weekend'
	ELSE NULL END)  = gapsize_lookup.period
)
-- INSERT INTO THE TABLE
-- DISTINCT ON cause there might be more than 1 gap happening in the same hour for the same intersection
INSERT INTO czhu.unacceptable_gaps_old(intersection_uid, gap_start, gap_end, gap_minute, allowed_gap, accept)
SELECT DISTINCT ON (intersection_uid, DATE_TRUNC('hour', acceptable.gap_start)) *
FROM acceptable
WHERE accept = false
ORDER BY DATE_TRUNC('hour', acceptable.gap_start), intersection_uid"""

# psycopg2's `with conn:` commits (or rolls back) but does NOT close the
# connection; contextlib.closing releases it once the insert is committed.
with contextlib.closing(psycopg2.connect(database='bigdata', **postgres_settings)) as conn:
    with conn, conn.cursor() as cur:
        cur.execute(sql_query)

In [181]:
# New updater. It uses the same gap detection as the old updater above, but
# computes the gap-size lookup inline (CTE `gapsize_lookup`) instead of reading
# it from a materialized view. Unacceptable gaps are inserted into
# czhu.unacceptable_gaps_new, keeping at most one per intersection and hour.
sql_query = """WITH wkdy_lookup(period, isodow) AS (
         VALUES ('Weekday'::text,'[1,6)'::int4range), ('Weekend'::text,'[6,8)'::int4range)
), mio AS (
 SELECT volumes.intersection_uid,
    datetime_bin(volumes.datetime_bin, 60) AS hourly_bin,
    sum(volumes.volume) AS vol
   FROM miovision_api.volumes
  WHERE volumes.datetime_bin > (GREATEST('2020-04-01'::timestamp without time zone, '2019-03-01'::timestamp without time zone) - '60 days'::interval) AND
        volumes.datetime_bin <= GREATEST('2020-04-01'::timestamp without time zone, '2019-03-01'::timestamp without time zone)
  GROUP BY volumes.intersection_uid, (datetime_bin(volumes.datetime_bin, 60))
), gapsize_lookup AS (
 SELECT mio.intersection_uid,
    d.period,
    mio.hourly_bin::time without time zone AS time_bin,
    avg(mio.vol) AS avg_vol,
        CASE
            WHEN avg(mio.vol) < 100::numeric THEN 20
            WHEN avg(mio.vol) >= 100::numeric AND avg(mio.vol) < 500::numeric THEN 15
            WHEN avg(mio.vol) >= 500::numeric AND avg(mio.vol) < 1500::numeric THEN 10
            WHEN avg(mio.vol) > 1500::numeric THEN 5
            ELSE NULL::integer
        END AS gap_size
   FROM mio
     CROSS JOIN wkdy_lookup d
  WHERE date_part('isodow'::text, mio.hourly_bin)::integer <@ d.isodow
  GROUP BY mio.intersection_uid, d.period, (mio.hourly_bin::time without time zone)
), ful AS (
	SELECT generate_series('2020-03-01', '2020-04-01', interval '1 minute')::timestamp without time zone 
		AS datetime_bin
), grp AS (
	SELECT
	vol.volume_uid,
	vol.intersection_uid,
    ful.datetime_bin,
	dense_rank() OVER (ORDER BY ful.datetime_bin)
	- dense_rank() OVER (PARTITION BY vol.intersection_uid ORDER BY vol.datetime_bin) AS diff
	FROM ful
	LEFT JOIN miovision_api.volumes vol
	USING (datetime_bin)
), island AS (
	SELECT grp.intersection_uid, 
	MAX(datetime_bin) - MIN(datetime_bin) + interval '1 minute' AS island_size, 
	MIN(datetime_bin) AS time_min, MAX(datetime_bin) AS time_max
	FROM grp
	GROUP BY grp.intersection_uid, diff
	ORDER BY grp.intersection_uid, time_min
), gap AS (
	SELECT
	    ROW_NUMBER() OVER(ORDER BY island.intersection_uid, time_min) AS rn,
		island.intersection_uid,
		island_size,
	    time_min,
	    time_max,
	    LAG(time_max,1) OVER (PARTITION BY island.intersection_uid ORDER BY island.intersection_uid, time_min) AS prev_time_end,
		time_min - LAG(time_max,1) OVER (PARTITION BY island.intersection_uid ORDER BY island.intersection_uid, time_min) AS gap_size
	FROM island
	ORDER BY rn
), sel AS (
	SELECT gap.intersection_uid, gap.prev_time_end AS gap_start, gap.time_min AS gap_end, 
	date_part('epoch'::text, gap.gap_size) / 60::integer AS gap_minute
	FROM gap 
	WHERE gap.gap_size > '00:01:00'::time  --where gap size is greater than 1 minute
	GROUP BY gap.intersection_uid, time_min, prev_time_end, gap.gap_size
	ORDER BY gap.intersection_uid, gap_start
), acceptable AS (
	-- THEN, MATCH IT TO THE LOOKUP TABLE TO CHECK IF ACCEPTABLE
	SELECT sel.intersection_uid, sel.gap_start, sel.gap_end,
		sel.gap_minute, gapsize_lookup.gap_size AS allowed_gap,
	CASE WHEN gap_minute < gapsize_lookup.gap_size THEN TRUE
	WHEN gap_minute >= gapsize_lookup.gap_size THEN FALSE
	END AS accept
	FROM sel 
	LEFT JOIN gapsize_lookup
	ON sel.intersection_uid = gapsize_lookup.intersection_uid
	AND DATE_TRUNC('hour', sel.gap_start)::time = gapsize_lookup.time_bin
	AND (CASE WHEN EXTRACT(isodow FROM sel.gap_start) IN (1,2,3,4,5) THEN 'Weekday'
	WHEN EXTRACT(isodow FROM sel.gap_start) IN (6,7) THEN 'Weekend'
	ELSE NULL END)  = gapsize_lookup.period
)
-- INSERT INTO THE TABLE
-- DISTINCT ON cause there might be more than 1 gap happening in the same hour for the same intersection
INSERT INTO czhu.unacceptable_gaps_new(intersection_uid, gap_start, gap_end, gap_minute, allowed_gap, accept)
SELECT DISTINCT ON (intersection_uid, DATE_TRUNC('hour', acceptable.gap_start)) *
FROM acceptable
WHERE accept = false
ORDER BY DATE_TRUNC('hour', acceptable.gap_start), intersection_uid
"""

# psycopg2's `with conn:` commits (or rolls back) but does NOT close the
# connection; contextlib.closing releases it once the insert is committed.
with contextlib.closing(psycopg2.connect(database='bigdata', **postgres_settings)) as conn:
    with conn, conn.cursor() as cur:
        cur.execute(sql_query)

In [182]:
# Load both scratch tables for comparison. The connection is closed explicitly
# because psycopg2's own context manager only ends the transaction.
with contextlib.closing(psycopg2.connect(database='bigdata', **postgres_settings)) as conn:
    df_ug_old = pd.read_sql("SELECT * FROM czhu.unacceptable_gaps_old", con=conn)
    df_ug_new = pd.read_sql("SELECT * FROM czhu.unacceptable_gaps_new", con=conn)

In [183]:
# Outer-join the two result tables on every data column, tagging each side with
# an origin flag. With how='outer', a row that exists in only one table survives
# the merge with the other side's flag missing (NaN), which the following
# assertions detect. The default inner join would drop such rows and make those
# assertions pass unconditionally.
join_columns = [col for col in df_ug_old.columns if col != 'from_old']

df_ug_united = pd.merge(
    df_ug_old.assign(from_old=True),
    df_ug_new.assign(from_new=True),
    on=join_columns,
    how='outer',
)

In [185]:
# Each merged row must carry both origin flags. A missing flag (only possible if
# the merge keeps unmatched rows, i.e. an outer join) marks a row that appears
# in just one of the two tables.
assert df_ug_united['from_old'].notna().all()
assert df_ug_united['from_new'].notna().all()

The two tables are identical.

In [186]:
# Remove the scratch view and tables created for the smoke test.
sql_query = """DROP MATERIALIZED VIEW czhu.gapsize_lookup_old;
DROP TABLE czhu.unacceptable_gaps_new;
DROP TABLE czhu.unacceptable_gaps_old;
"""

# psycopg2's `with conn:` commits (or rolls back) but does NOT close the
# connection; contextlib.closing releases it once the drops are committed.
with contextlib.closing(psycopg2.connect(database='bigdata', **postgres_settings)) as conn:
    with conn, conn.cursor() as cur:
        cur.execute(sql_query)

## Test Sequential Processing

Next, we test a simple script for sequential, month-by-month aggregation using `intersection_tmc.py` (imported here from `intersection_tmc_notebook03test`), and plot the results using IPyWidgets. Detailed analysis can be found in the replies following [this comment on GitHub](https://github.com/CityofToronto/bdit_data-sources/issues/331#issuecomment-703867196).

In [65]:
def daterange(start_time, n_months):
    """Generate consecutive calendar-month intervals.

    Parameters
    ----------
    start_time : datetime.datetime
        Any datetime within the first month to yield. Only its ``year`` and
        ``month`` are read; the day, time of day and ``tzinfo`` are ignored.
    n_months : int
        Number of monthly intervals to yield (none when ``n_months <= 0``).

    Yields
    ------
    tuple of (datetime.datetime, datetime.datetime)
        ``(month_start, next_month_start)`` as naive datetimes at midnight on
        the first of each month, e.g. ``(2020-12-01, 2021-01-01)``.
    """
    # Number months consecutively from year 0 so divmod handles year rollover.
    first_month = start_time.year * 12 + start_time.month - 1
    for month_index in range(first_month, first_month + n_months):
        lo_year, lo_month0 = divmod(month_index, 12)
        hi_year, hi_month0 = divmod(month_index + 1, 12)
        yield (datetime.datetime(lo_year, lo_month0 + 1, 1, 0, 0, 0),
               datetime.datetime(hi_year, hi_month0 + 1, 1, 0, 0, 0))

In [67]:
# Run the aggregation one calendar month at a time, starting March 2020.
# `daterange` only reads the year and month of start_time and yields naive
# datetimes, so the localization here does not reach process_data.
start_time = local_tz.localize(datetime.datetime(2020, 3, 1, 0, 0, 0))
n_months = 2

# Outer `closing` closes the connection once all months are done; inner
# `with conn:` keeps the original commit-on-success / rollback-on-error behaviour.
with contextlib.closing(psycopg2.connect(database='bigdata', **postgres_settings)) as conn:
    with conn:
        for (c_start_t, c_end_t) in daterange(start_time, n_months):
            print("Processing date range", c_start_t, " - ", c_end_t)
            itmc.process_data(conn, c_start_t, c_end_t)

Processing date range 2020-03-01 00:00:00  -  2020-04-01 00:00:00


01 Oct 2020 16:47:02     	INFO    NOTICE:  Found a total of 174 (hours) gaps that are unacceptable

01 Oct 2020 16:47:02     	INFO    Updated gapsize table and found gaps exceeding allowable size
01 Oct 2020 18:10:21     	INFO    Aggregated to 15 minute bins
01 Oct 2020 18:14:56     	INFO    Completed data processing for 2020-03-01 00:00:00
01 Oct 2020 18:15:09     	INFO    report_dates done


Processing date range 2020-04-01 00:00:00  -  2020-05-01 00:00:00


01 Oct 2020 18:17:55     	INFO    NOTICE:  Found a total of 142 (hours) gaps that are unacceptable

01 Oct 2020 18:17:55     	INFO    Updated gapsize table and found gaps exceeding allowable size
01 Oct 2020 19:17:56     	INFO    Aggregated to 15 minute bins
01 Oct 2020 19:22:13     	INFO    Completed data processing for 2020-04-01 00:00:00
01 Oct 2020 19:22:26     	INFO    report_dates done


Fortunately, we saved some archival data in `covid.miovision_summary_20200922backup` that covers the same time frame, so the newly processed volumes can be compared against it.

In [173]:
# Query templates for the comparison plots, filled in with
# str.format(intersection_uid=..., class_type=...).
# NOTE(review): the values are interpolated into the SQL text rather than bound
# as query parameters, so only trusted values should be passed.
#
# Archived volumes from covid.miovision_summary_20200922backup for
# 2020-03-01 to 2020-04-30, returned as (datetime_bin, volume).
old_miov_query = """SELECT datetime_bin, volume_actual volume FROM covid.miovision_summary_20200922backup
WHERE intersection_uid = {intersection_uid} AND class_type = '{class_type}'
      AND datetime_bin BETWEEN '2020-03-01' AND '2020-04-30 23:59:59'
ORDER BY datetime_bin"""

# Volumes recomputed from miovision_api.volumes_15min for the same period and
# summed into hourly bins, returned as (datetime_bin, volume). A 15-minute bin
# counts only if its classification_uid = 1 volume is positive. An hour is kept
# only when all four of its 15-minute bins count, and only the
# intersection/class pairs listed in `intersection_classes` are produced.
new_miov_query = """WITH valid_bins AS (
	 SELECT volumes_15min.intersection_uid,
		volumes_15min.datetime_bin
	   FROM miovision_api.volumes_15min
	   WHERE datetime_bin BETWEEN '2020-03-01' AND '2020-04-30 23:59:59'
	  GROUP BY volumes_15min.intersection_uid, volumes_15min.datetime_bin
	 HAVING sum(
			CASE
				WHEN volumes_15min.classification_uid = 1 THEN volumes_15min.volume
				ELSE NULL::numeric
			END) > 0::numeric
	), valid_classes AS (
	 SELECT unnest(x.classes) AS class_type
	   FROM ( SELECT ARRAY['Lights'::text, 'Trucks'::text, 'Cyclists'::text, 'Pedestrians'::text] AS classes) x
	), intersection_classes(intersection_uid, class_type) AS (
	 VALUES (2,'Cyclists'::text), (2,'Pedestrians'::text), (2,'Lights'::text), (2,'Trucks'::text), (4,'Cyclists'::text), (4,'Pedestrians'::text), (4,'Lights'::text), (4,'Trucks'::text), (8,'Pedestrians'::text), (8,'Lights'::text), (8,'Trucks'::text), (17,'Pedestrians'::text), (17,'Lights'::text), (17,'Trucks'::text), (18,'Cyclists'::text), (18,'Pedestrians'::text), (18,'Lights'::text), (18,'Trucks'::text), (21,'Cyclists'::text), (21,'Pedestrians'::text), (21,'Lights'::text), (21,'Trucks'::text), (25,'Cyclists'::text), (25,'Pedestrians'::text), (25,'Lights'::text), (25,'Trucks'::text), (26,'Cyclists'::text), (26,'Pedestrians'::text), (26,'Lights'::text), (26,'Trucks'::text), (28,'Cyclists'::text), (28,'Pedestrians'::text), (28,'Lights'::text), (28,'Trucks'::text), (31,'Cyclists'::text), (31,'Pedestrians'::text), (31,'Lights'::text), (31,'Trucks'::text)
	), all_data AS (
	 SELECT volumes_15min.intersection_uid,
			CASE
				WHEN volumes_15min.classification_uid = 1 THEN 'Lights'::text
				WHEN volumes_15min.classification_uid = ANY (ARRAY[4, 5]) THEN 'Trucks'::text
				ELSE classifications.class_type
			END AS class_type,
    		volumes_15min.datetime_bin,
    		sum(volumes_15min.volume) AS total_volume
	   FROM miovision_api.volumes_15min
		 JOIN miovision_api.classifications USING (classification_uid)
	  WHERE (classifications.class_type = ANY (ARRAY['Vehicles'::text, 'Pedestrians'::text, 'Cyclists'::text])) AND volumes_15min.datetime_bin >= '2019-01-01 00:00:00'::timestamp without time zone AND "left"(volumes_15min.leg, 1) <> "left"(volumes_15min.dir, 1)
	  GROUP BY volumes_15min.intersection_uid, (
			CASE
				WHEN volumes_15min.classification_uid = 1 THEN 'Lights'::text
				WHEN volumes_15min.classification_uid = ANY (ARRAY[4, 5]) THEN 'Trucks'::text
				ELSE classifications.class_type
			END), volumes_15min.datetime_bin
	), hourly_data AS (
	SELECT row_number() OVER () AS uid,
	a.intersection_uid,
	b.class_type,
	date_trunc('hour'::text, a.datetime_bin) AS datetime_bin,
	COALESCE(sum(d.total_volume), 0::numeric) AS total_volume
	FROM valid_bins a
	 CROSS JOIN valid_classes b
	 JOIN intersection_classes c USING (intersection_uid, class_type)
	 LEFT JOIN all_data d USING (intersection_uid, datetime_bin, class_type)
	GROUP BY a.intersection_uid, b.class_type, (date_trunc('hour'::text, a.datetime_bin))
	HAVING count(DISTINCT a.datetime_bin) = 4
)
SELECT datetime_bin,
       total_volume volume
FROM hourly_data
WHERE intersection_uid = {intersection_uid} AND class_type = '{class_type}' AND datetime_bin BETWEEN '2020-03-01' AND '2020-04-30 23:59:59';
"""

def get_comparison_plot(intersection_uid=26, class_type='Lights'):
    """Plot recomputed-minus-archived volumes for one intersection and class.

    Runs ``old_miov_query`` (archived volumes) and ``new_miov_query``
    (volumes recomputed from ``miovision_api.volumes_15min``) for the given
    intersection and class, outer-joins the results on ``datetime_bin`` and
    displays two Plotly figures:

    * a time series of ``volume_new - volume_old``. Bins present in both
      results are drawn as a line. Bins only in the archived data are orange
      markers at ``-volume_old``; bins only in the new data are green markers
      at ``volume_new``.
    * a histogram of the differences for bins present in both results.

    Parameters
    ----------
    intersection_uid : int, default 26
        Intersection to compare. Substituted into the SQL text with
        ``str.format``, so it must be a trusted value (in this notebook it
        comes from a dropdown of known ids).
    class_type : str, default 'Lights'
        Class to compare. Substituted between single quotes in the SQL text
        with ``str.format``, so it must also be trusted.

    Returns
    -------
    None
        The figures are displayed with ``Figure.show()``.
    """
    query_fields = dict(intersection_uid=intersection_uid, class_type=class_type)

    # psycopg2's connection context manager only ends the transaction; it does
    # not close the connection. This function runs on every widget change, so
    # close the connection explicitly instead of leaking one per call.
    with contextlib.closing(psycopg2.connect(database='bigdata', **postgres_settings)) as conn:
        df_old = pd.read_sql(old_miov_query.format(**query_fields), con=conn)
        df_new = pd.read_sql(new_miov_query.format(**query_fields), con=conn)

    df_merged = (pd.merge(df_old, df_new, on="datetime_bin", how="outer", suffixes=("_old", "_new"))
                 .set_index('datetime_bin').sort_index())
    # Availability masks must be taken before missing volumes are zero-filled.
    not_in_new = df_merged['volume_new'].isna()
    not_in_old = df_merged['volume_old'].isna()
    in_both = ~not_in_new & ~not_in_old
    df_merged = df_merged.fillna(0.)
    df_merged['vol_difference'] = df_merged['volume_new'] - df_merged['volume_old']

    fig = go.Figure()

    fig.add_trace(go.Scatter(
        x=df_merged.loc[in_both, 'vol_difference'].index,
        y=df_merged.loc[in_both, 'vol_difference'],
        mode='lines',
        name='Valid'))

    # One-sided bins: with the missing side treated as 0, the difference is
    # -volume_old (old only) or volume_new (new only).
    fig.add_trace(go.Scatter(
        x=df_merged.loc[not_in_new, 'volume_old'].index,
        y=-df_merged.loc[not_in_new, 'volume_old'],
        mode='markers',
        marker=dict(color='#f79400'),
        name='Old Only'))

    fig.add_trace(go.Scatter(
        x=df_merged.loc[not_in_old, 'volume_new'].index,
        y=df_merged.loc[not_in_old, 'volume_new'],
        mode='markers',
        marker=dict(color='#006130'),
        name='New Only'))

    fig.update_layout(
        title={
            'text': ("Difference in Volumes for "
                     "intersection_uid = {0}; class = {1}").format(intersection_uid, class_type),
            'font_size': 14
        },
        xaxis_title="Date",
        yaxis_title="New - Old Volume",
        xaxis_rangeslider_visible=True,
        margin=dict(l=40, r=40, t=80, b=40),
    )

    fig2 = go.Figure()

    fig2.add_trace(
        go.Histogram(
            histfunc="count",
            histnorm="probability",
            x=df_merged.loc[in_both, 'vol_difference'].values,
        )
    )

    fig2.update_layout(
        xaxis_title="New - Old Volume",
        yaxis_title="Fraction of Valid Data",
        height=200,
        margin=dict(l=40, r=40, t=40, b=40),
    )

    fig.show()
    fig2.show()

In [174]:
# Intersections present in the archived backup table; these become the options
# of the intersection dropdown in the comparison widget.
sql_query = """SELECT DISTINCT intersection_uid FROM covid.miovision_summary_20200922backup
ORDER BY 1"""

# Close the connection explicitly: psycopg2's `with conn:` only ends the transaction.
with contextlib.closing(psycopg2.connect(database='bigdata', **postgres_settings)) as conn:
    df_intnames = pd.read_sql(sql_query, con=conn)

# .tolist() yields plain Python ints rather than numpy scalars for the widget.
intnames = df_intnames['intersection_uid'].tolist()

In [175]:
# Interactive comparison: one dropdown for the intersections found in the backup
# table and one for the class; the trailing ';' suppresses the cell's repr output.
interact(get_comparison_plot,
         intersection_uid=intnames,
         class_type=['Lights', 'Pedestrians', 'Trucks']);

interactive(children=(Dropdown(description='intersection_uid', index=7, options=(2, 4, 8, 17, 18, 21, 25, 26, …

<function __main__.get_comparison_plot(intersection_uid=26, class_type='Lights')>