# Convert Postgres Data for Arman

Arman has requested 2016 - 2018 (inclusive) data from `prj_volume.centreline_volumes`. This notebook downloads that data from Postgres, dumps it to zip files, and checks its integrity. It is based on `Convert Postgres Data for TEPs.ipynb`.

This notebook was run on my laptop, from `/home/czhu/Data/GitHub`.

In [1]:
%matplotlib inline
import sys
sys.path.append('./bdit_traffic_prophet/')
import importlib
import matplotlib.pyplot as plt
import numpy as np
import knowyourdata as kyd
import zipfile
import warnings

import pandas as pd
from traffic_prophet import cfg
import pathlib, os
import configparser

from traffic_prophet import connection
from traffic_prophet.countmatch import reader

defaultcolours = plt.rcParams['axes.prop_cycle'].by_key()['color']

# Postgres, Mapbox and Plotly credentials are read from a private config file
# in the home directory rather than being written into the notebook.
filepath = pathlib.Path.home().joinpath('.charlesconfig')
if not filepath.is_file():
    # Fail early with an actionable message instead of a bare ValueError.
    raise FileNotFoundError(
        "Credentials file {0} not found; it needs POSTGRES, MAPBOX and "
        "PLOTLY sections.".format(filepath))

vol_conn = connection.Connection(filepath, 'POSTGRES',
                                 'czhu.btp_centreline_volumes')
ll_conn = connection.Connection(filepath, 'POSTGRES',
                                'czhu.btp_centreline_lonlat')
config = configparser.RawConfigParser()
config.read(filepath.as_posix())
MAPBOX_TOKEN = config['MAPBOX']['token']
PLOTLY_USER = config['PLOTLY']['user']
PLOTLY_KEY = config['PLOTLY']['key']

In [2]:
from traffic_prophet.countmatch import reader

In [3]:
# Root of the local TEPs-I run directory (absolute, machine-specific path).
# The trailing slash matters: paths below are built by string concatenation.
teps_dir = "/mnt/c/Users/czhu5/Documents/VolumeModel/TEPs-I-EEDrun/"

## Dump data into zips

### Find all centreline IDs (including HW401 ones) already in use in TEPs

In [4]:
# TEPs 15-minute count zips for 2006 - 2016, one list per direction.
zipsn = [teps_dir + "PRTCS/negative/15min_counts_{0}.zip".format(yr)
         for yr in range(2006, 2017)]
zipsp = [teps_dir + "PRTCS/positive/15min_counts_{0}.zip".format(yr)
         for yr in range(2006, 2017)]
zips = zipsn + zipsp
rdr = reader.ReaderZip(zips)

# HW401 counts are identified by 're' appearing in the source filename (see
# the note in ReaderPostgresRaw.get_pgreader); keep (filename, id) pairs.
re_centrelines = [
    [count['filename'], count['centreline_id']]
    for zf in rdr.source
    for count in rdr.get_zipreader(zf)
    if 're' in count['filename']
]

# Deduplicated HW401 centreline IDs.
re_centreline_ids = list({pair[1] for pair in re_centrelines})

In [5]:
# Get all centreline IDs in mid_f_point.csv
df = pd.read_csv(teps_dir + "PRTCS/negative/mid_f_point.csv", header=None)
midpoint_centerline_ids = list(df[0].values)

In [6]:
# Get all centreline IDs in Landuse_pop_lane_speed.xlsx
df = pd.read_excel(teps_dir + "PRTCS/negative/locals/Landuse_pop_lane_speed.xlsx")
landuse_centreline_ids = list(df['centreline'].values)

In [7]:
available_teps_centreline_ids = list(
    set(midpoint_centerline_ids).intersection(set(landuse_centreline_ids)))

### Postgres reader

In [8]:
class ReaderPostgresRaw(reader.ReaderBase):
    """Hacked method of accessing the raw 15-minute bin table from Postgres.

    ``self.source`` must provide ``tablename`` and a ``connect()`` method
    usable as a context manager whose value ``pandas.read_sql`` accepts; in
    this notebook it is ``vol_conn``.
    """

    def get_pgreader(self, year):
        """Yield one year of 15-minute bins, one record per count location.

        Parameters
        ----------
        year : int
            Calendar year of ``count_bin`` to extract.

        Yields
        ------
        dict
            One per (centreline_id, dir_bin) pair, with keys 'filename'
            (always the dummy value 'fromPG'), 'centreline_id' (int),
            'direction' (int), 'year', and 'data': a DataFrame with columns
            'Timestamp', '15-minute Volume' and 'Count Type', in timestamp
            order.
        """
        # The table name is supplied by the notebook itself; int() guarantees
        # that only a number is ever interpolated for the year.
        sql_cmd = (
            ("SELECT centreline_id, dir_bin, count_bin, volume, count_type "
             "FROM {dbt} WHERE EXTRACT(year from count_bin) = {year} "
             "ORDER BY centreline_id, dir_bin, count_bin")
            .format(dbt=self.source.tablename, year=int(year)))

        # Leave the connection context as soon as the query has run, so it is
        # not held open while the caller slowly consumes the records.
        with self.source.connect() as db_con:
            all_data = pd.read_sql(sql_cmd, db_con,
                                   parse_dates=['count_bin', ])

        for (centreline_id, direction), df in all_data.groupby(
                ['centreline_id', 'dir_bin']):
            data = df[['count_bin', 'volume', 'count_type']].copy()
            data.columns = ['Timestamp', '15-minute Volume', 'Count Type']
            data.reset_index(drop=True, inplace=True)

            # Filename is used to flag for HW401 data in Arman's zip files,
            # so just pass a dummy value here.  Note that we can't use
            # 'postgres' here since it contains 're'!
            yield {'filename': 'fromPG',
                   'centreline_id': int(centreline_id),
                   'direction': int(direction),
                   'data': data,
                   'year': year}

    def write_db_to_zip(self, year, fpath="./"):
        """Writes a year's worth of 15 minute bins to TEPs-format zip files.

        Each record from ``get_pgreader`` becomes a tab-separated member named
        ``{centreline_id}_99999_{year}.txt``. Records with positive direction
        go into ``15min_counts_{year}_positive.zip``, all others into
        ``15min_counts_{year}_negative.zip``, both inside ``fpath``.
        Centreline IDs in ``re_centreline_ids`` (HW401), or absent from
        ``available_teps_centreline_ids``, are skipped with a warning.

        Parameters
        ----------
        year : int
            Year to extract.
        fpath : str or path-like, optional
            Output directory (default: current directory). A trailing
            separator is optional.
        """
        # Build sets once so each per-record membership test is O(1) rather
        # than a scan of the notebook-level lists.
        hw401_ids = set(re_centreline_ids)
        teps_ids = set(available_teps_centreline_ids)

        path_pos = os.path.join(
            fpath, "15min_counts_{0}_positive.zip".format(year))
        path_neg = os.path.join(
            fpath, "15min_counts_{0}_negative.zip".format(year))

        # The with-block guarantees both archives are finalised (central
        # directory written) and closed, even if the query or a record fails.
        with zipfile.ZipFile(path_pos, 'w') as fhzp, \
                zipfile.ZipFile(path_neg, 'w') as fhzn:
            for tc in self.get_pgreader(year):
                cid = tc['centreline_id']
                # Keep centreline_ids on HW401 and those with no land use
                # data out of the zips.
                if cid in hw401_ids:
                    warnings.warn("{0} found in HW401 IDs!".format(cid))
                    continue
                if cid not in teps_ids:
                    warnings.warn("{0} doesn't have TEPs land use/geographic data!".format(cid))
                    continue

                # Extract data from dict and convert it to TEPs format.
                data = tc['data']
                # DD-MMM-YYYY HH:MM:SS, the format favoured by Matlab. Spelled
                # out instead of %T, which is a non-portable strftime
                # extension. NOTE(review): %b is locale-dependent; Matlab
                # presumably expects English month abbreviations.
                data['Timestamp'] = data['Timestamp'].dt.strftime(
                    r"%d-%b-%Y %H:%M:%S")
                data['Nonsense'] = '999:9999999'
                data['Centreline ID'] = cid
                data['Direction'] = tc['direction']
                # Output to csv, but dump to string instead of file.
                datastr = data[['Nonsense', 'Centreline ID', 'Direction',
                                'Timestamp', '15-minute Volume',
                                'Count Type']].to_csv(
                    None, sep='\t', na_rep='N/A', header=False, index=False)

                filename = "{0}_99999_{1}.txt".format(cid, year)
                target = fhzp if tc['direction'] > 0 else fhzn
                target.writestr(filename, datastr)

### Dump data from Postgres to zips

In [9]:
pgreader = ReaderPostgresRaw(vol_conn)

In [10]:
pgreader.write_db_to_zip(2016)

In [11]:
pgreader.write_db_to_zip(2017)

In [12]:
pgreader.write_db_to_zip(2018)

In [13]:
pgreader.write_db_to_zip(2019)



## Check data integrity

To check whether this data dump is consistent with the last one given to Arman (and with the dump made in February 2020 to generate the 2018 GHG inventory for EED), I created a few consistency-check functions.

In [15]:
def get_problem_counts(rdr_old, rdr_new):
    """Collect daily counts that differ between two datasets.

    A count is a problem if its daily totals disagree between the old and new
    data, or if the count (or some of its days) exists in only one of them.
    Days absent from one side are represented by NaN on that side.

    Parameters
    ----------
    rdr_old : reader.ReaderZip
        Old daily counts, after ``read()`` has been called.
    rdr_new : reader.ReaderZip
        New daily counts, after ``read()`` has been called.

    Returns
    -------
    problem_counts : dict
        Maps each count location key (direction * centreline_id) to a
        DataFrame with 'Daily Count_old' and 'Daily Count_new' columns,
        holding only the inconsistent or missing days.
    """
    old_counts = rdr_old.counts
    new_counts = rdr_new.counts

    def one_sided(daily, present_col, absent_col):
        # Every day of a count found in only one dataset is missing from the
        # other, so the absent column is entirely NaN.
        table = daily[['Daily Count']].copy().rename(
            columns={'Daily Count': present_col})
        table[absent_col] = np.nan
        return table

    problems = {}
    for cid in set(new_counts.keys()) | set(old_counts.keys()):
        if cid not in old_counts.keys():
            problems[cid] = one_sided(new_counts[cid].data,
                                      "Daily Count_new", "Daily Count_old")
            continue
        if cid not in new_counts.keys():
            problems[cid] = one_sided(old_counts[cid].data,
                                      "Daily Count_old", "Daily Count_new")
            continue

        # Outer join on the index so days present in only one dataset appear
        # with NaN on the other side.
        both = pd.merge(old_counts[cid].data[['Daily Count']],
                        new_counts[cid].data[['Daily Count']],
                        how='outer', left_index=True, right_index=True,
                        suffixes=('_old', '_new'))
        # NaN is never "close" to anything, so missing days are flagged too.
        differs = ~np.isclose(both["Daily Count_old"],
                              both["Daily Count_new"], atol=1e-10, rtol=1e-4)
        if differs.any():
            problems[cid] = both.loc[differs, :].copy()

    return problems


def get_mismatch_count(df):
    """Count mismatched lines in one problem-count table.

    Returns ``(n_missing, n_inconsistent)``. ``n_missing`` is the number of
    NaN cells, i.e. days present in only one dataset. ``n_inconsistent`` is
    twice the number of fully populated rows, because a day present in both
    datasets with different values is a unique line in each of them.
    """
    n_missing = df.isnull().values.sum()
    n_inconsistent = 2. * len(df.dropna())
    return n_missing, n_inconsistent


def get_mismatch_fraction(rdr_old, rdr_new, problem_counts):
    """Fraction of all daily-count lines that are unique to one dataset.

    A line is unique to a dataset either because its value differs from the
    other dataset or because the other dataset lacks it entirely. The
    denominator is the total number of lines in ``rdr_old`` plus ``rdr_new``.
    """
    n_unique = sum(sum(get_mismatch_count(table))
                   for table in problem_counts.values())
    n_lines = sum(count.data.shape[0]
                  for source in (rdr_old, rdr_new)
                  for count in source.counts.values())
    return n_unique / n_lines


def get_mismatch_breakdown(rdr_old, rdr_new, problem_counts):
    """Get the breakdown of mismatch count by type.

    Counts the total number of missing and inconsistent entries.

    Parameters
    ----------
    rdr_old, rdr_new : reader.ReaderZip
        Not used here; the signature parallels ``get_mismatch_fraction``.
    problem_counts : dict
        Output of ``get_problem_counts``.

    Returns
    -------
    numpy.ndarray
        Two-element float array ``[n_missing, n_inconsistent]`` summed over
        all problem tables (see ``get_mismatch_count``). Both entries are 0
        when ``problem_counts`` is empty.
    """
    # reshape(-1, 2) keeps a two-column layout even when problem_counts is
    # empty; otherwise sum(axis=0) collapses to a scalar and indexing the
    # result with [0] / [1] fails.
    mismatches = np.array([get_mismatch_count(x)
                           for x in problem_counts.values()],
                          dtype=float).reshape(-1, 2)
    return mismatches.sum(axis=0)

### 2017 data

In [16]:
# Read in old data.
rdr_old = reader.ReaderZip("/mnt/c/Users/czhu5/Documents/VolumeModel/TEPs-I-EEDrun/PRTCS/negative/15min_counts_2017.zip")
rdr_old.read()

In [17]:
rdr_new = reader.ReaderZip("./15min_counts_2017_negative.zip")
rdr_new.read()

Which centreline IDs are in the new data but not the old?

In [18]:
# Count locations present in the new data but absent from the old.
set(rdr_new.counts.keys()).difference(rdr_old.counts.keys())

set()

Which centreline IDs are in the old data but not the new?

In [19]:
# Count locations present in the old data but absent from the new.
set(rdr_old.counts.keys()).difference(rdr_new.counts.keys())

{-20050749,
 -14203393,
 -14047336,
 -13515818,
 -8203281,
 -3994385,
 -2884350,
 -441581,
 -440741,
 -9437}

So we're missing a number of counts that are no longer available.

In [20]:
problem_counts = get_problem_counts(rdr_old, rdr_new)

In [21]:
problem_counts.keys()

dict_keys([-30023559, -6868868, -913249, -909146, -20037402, -20164356, -913152, -1146615, -1146568, -7929516, -14659244, -1146507, -8322682, -8351286, -13975074, -1146358, -1146335, -445884, -14011679, -908543, -7416, -441581, -908449, -445532, -30010427, -12336171, -13515818, -12336156, -8203281, -445309, -1141572, -30087992, -30087988, -30018291, -14662366, -11070091, -1145405, -14203393, -1145317, -113111, -30022069, -440741, -1145215, -1141002, -13969576, -14047336, -444497, -444412, -1978, -440202, -20080518, -440171, -444264, -12334941, -12334937, -1841, -443987, -443975, -14177858, -12334629, -12334583, -6837741, -30012667, -1144036, -9437, -107733, -14255077, -30040979, -890, -8754031, -6853472, -20050749, -30016282, -9212691, -3994385, -2884350, -10834665, -20140757, -20050591, -1147466, -1147434, -20054568, -442915, -1147406, -1147347, -913864, -8540609, -8352168, -1147258, -30003565, -1147201, -106797, -10133796, -30011654, -1147135, -1147113, -30036166, -1146997, -14073969

In [22]:
mismatch_frac = get_mismatch_fraction(rdr_old, rdr_new, problem_counts)
mismatch_pct = 100 * mismatch_frac
print(f"Percentage of lines with mismatches: {mismatch_pct:0.3f}%")

Percentage of lines with mismatches: 2.454%


In [23]:
mismatch_break = get_mismatch_breakdown(rdr_old, rdr_new, problem_counts)
n_missing, n_inconsistent = mismatch_break[0], mismatch_break[1]
print(f"Missing values: {n_missing:0.1f}; inconsistent values: {n_inconsistent:0.1f}")

Missing values: 924.0; inconsistent values: 0.0


So there are 924 values that were available back in February but are missing from the current data dump; this represents about 2.5% of all data. None of the daily counts available in both datasets have values inconsistent with one another.

### 2016 data

Now we take a look at 2016 data, comparing the original data processed into a zip file by Arman with the one we just created.

In [24]:
rdr_old = reader.ReaderZip("/mnt/c/Users/czhu5/Documents/VolumeModel/TEPs-I-EEDrun/PRTCS/negative/15min_counts_2016.zip")
rdr_old.read()

In [25]:
rdr_new = reader.ReaderZip("./15min_counts_2016_negative.zip")
rdr_new.read()

In [30]:
# Count locations present in the new data but absent from the old.
set(rdr_new.counts.keys()).difference(rdr_old.counts.keys())

{-30075992,
 -30073995,
 -30073908,
 -30070016,
 -30013725,
 -30008185,
 -20142356,
 -14675961,
 -14674278,
 -14673213,
 -14672915,
 -14624107,
 -14230112,
 -14184390,
 -14063147,
 -14037577,
 -14037514,
 -14037155,
 -14035998,
 -14035996,
 -14025568,
 -14025469,
 -14025284,
 -14024688,
 -14020124,
 -14013502,
 -14013481,
 -14003891,
 -13323711,
 -13297428,
 -12102593,
 -11774649,
 -11714744,
 -11631428,
 -11273041,
 -11272989,
 -11226820,
 -10877626,
 -10864309,
 -10864277,
 -10516346,
 -10486145,
 -10475569,
 -10223667,
 -10222712,
 -10010771,
 -9234036,
 -9212691,
 -9085798,
 -8771611,
 -8677261,
 -8677227,
 -8676918,
 -8676871,
 -8676863,
 -8676772,
 -8491783,
 -8457366,
 -8457323,
 -8457317,
 -8457315,
 -8344825,
 -8155563,
 -8067830,
 -8033769,
 -7930670,
 -7930588,
 -7929673,
 -7762785,
 -7204482,
 -7009490,
 -6624394,
 -6619735,
 -5868170,
 -4570674,
 -4429233,
 -3369829,
 -3369767,
 -3065754,
 -3065748,
 -2884227,
 -1147466,
 -1146683,
 -1146332,
 -1146204,
 -1146057,
 -114585

In [31]:
# Count locations present in the old data but absent from the new.
set(rdr_old.counts.keys()).difference(rdr_new.counts.keys())

{-30073941,
 -30066680,
 -14663487,
 -13502340,
 -13502069,
 -7941494,
 -5101256,
 -910041,
 -909609,
 -909292,
 -908665,
 -908251,
 -908165,
 -908093,
 -908081,
 -908053,
 -445781,
 -445695,
 -445177,
 -445162,
 -444895,
 -444632,
 -444620,
 -444602,
 -444010,
 -443777,
 -443687,
 -443397,
 -443225,
 -443129,
 -443125,
 -442793,
 -442619,
 -442118,
 -441560,
 -439181,
 -439159,
 -438928,
 -438892,
 -438841,
 -437972,
 -108420,
 -108404,
 -108373,
 -108276,
 -108252,
 -107655,
 -107645,
 -106873,
 -106799,
 -105318}

In [26]:
problem_counts = get_problem_counts(rdr_old, rdr_new)

In [27]:
problem_counts.keys()

dict_keys([-1145856, -1145855, -1022, -444412, -14675961, -30066680, -109553, -14024688, -909292, -14255077, -908251, -14184390, -12102593, -14664635, -1978, -3065754, -3065748, -30040979, -439183, -914317, -8677261, -439181, -440202, -30023559, -20080518, -108420, -13502340, -908165, -445309, -890, -10516346, -30008185, -439159, -108404, -440171, -8677227, -10948456, -109416, -109415, -444264, -3369829, -9085798, -7762785, -6853472, -14025568, -105318, -12334941, -12334937, -443225, -108373, -12334931, -441170, -11273041, -7929673, -1141572, -11631428, -14663487, -908093, -1146683, -9109303, -30087988, -908081, -7930670, -1139498, -3369767, -1145635, -108322, -441121, -11272989, -20037402, -30016282, -908053, -13297428, -9212691, -13503251, -913167, -8491783, -442118, -20164356, -446207, -14025469, -443129, -445177, -1146615, -8067830, -443125, -108276, -445162, -105192, -13503206, -1142500, -14303970, -8491741, -7930588, -108252, -910041, -906966, -20140757, -437972, -1146568, -51012

In [28]:
mismatch_frac = get_mismatch_fraction(rdr_old, rdr_new, problem_counts)
mismatch_pct = 100 * mismatch_frac
print(f"Percentage of lines with mismatches: {mismatch_pct:0.3f}%")

Percentage of lines with mismatches: 31.126%


In [29]:
mismatch_break = get_mismatch_breakdown(rdr_old, rdr_new, problem_counts)
n_missing, n_inconsistent = mismatch_break[0], mismatch_break[1]
print(f"Missing values: {n_missing:0.1f}; inconsistent values: {n_inconsistent:0.1f}")

Missing values: 18185.0; inconsistent values: 0.0


That's a lot of missing values! There are no inconsistent ones, which is another clue that what we're looking at is data missing from either the old dataset or the new one.

Here's one example of new data that's missing.

In [32]:
# Inspect one problem location (key -30003565: direction * centreline_id).
problem_counts[-30003565]

Unnamed: 0_level_0,Unnamed: 1_level_0,Daily Count_old,Daily Count_new
Year,Day of Year,Unnamed: 2_level_1,Unnamed: 3_level_1
2016,1,37267.0,
2016,10,43231.0,
2016,136,40023.0,
2016,157,28850.0,
2016,236,43884.0,
2016,238,41700.0,
2016,240,41597.0,
2016,241,37229.0,
2016,242,38195.0,
2016,243,41930.0,


In [33]:
# New 2016 daily counts for location -30003565, for comparison.
rdr_new.counts[-30003565].data

Unnamed: 0_level_0,Unnamed: 1_level_0,Date,Daily Count
Year,Day of Year,Unnamed: 2_level_1,Unnamed: 3_level_1
2016,2,2016-01-02,45397.0
2016,3,2016-01-03,43970.0
2016,4,2016-01-04,54155.0
2016,5,2016-01-05,58119.0
2016,6,2016-01-06,60504.0
2016,...,...,...
2016,277,2016-10-03,66925.0
2016,278,2016-10-04,70376.0
2016,279,2016-10-05,69778.0
2016,280,2016-10-06,72915.0


In [34]:
# Old 2016 daily counts for location -30003565, for comparison.
rdr_old.counts[-30003565].data

Unnamed: 0_level_0,Unnamed: 1_level_0,Date,Daily Count
Year,Day of Year,Unnamed: 2_level_1,Unnamed: 3_level_1
2016,1,2016-01-01,37267.0
2016,2,2016-01-02,45397.0
2016,3,2016-01-03,43970.0
2016,4,2016-01-04,54155.0
2016,5,2016-01-05,58119.0
2016,...,...,...
2016,277,2016-10-03,66925.0
2016,278,2016-10-04,70376.0
2016,279,2016-10-05,69778.0
2016,280,2016-10-06,72915.0


I manually confirmed that `prj_volume.centreline_volumes` does not have any data for `30003565` on January 1, 2016 (the first day missing from the new data above), so this is truly missing data.