# Identifying Missing Data in the Caltrans/PeMS Dataset

The California Department of Transportation (Caltrans) collects data that describes the flow of traffic on California freeways. Caltrans stores these data in a database called PeMS. The data describe the number of counts per unit time meaured by roughly 45,000 sensors on a 30-second cadence. The type of sensor varies considerably, e.g. radar and magnetometers (see Chapter 1 of the [Introduction to PeMS User Guide](https://pems.dot.ca.gov/Papers/PeMS_Intro_User_Guide_v6.pdf)). 

In some cases, these data are missing. Faulty or broken sensors do not collect data. Or sensor data is not wirelessly transmitted back to PeMS. In addition, Caltrans performs some calculations to convert these raw sensor data into physical observables such as speed. These calculations include some assumptions such as the length of the vehicle, or $g$. Based on the quality of the assumption, these data can include errors.

In this notebook, we will take a look at the nature of the missing data. Some questions to ask:
1. Are all the data available for District 5 from the District Map and County Chart during 2023?
2. If data are missing, do they occur in any spatial or temporal clusters?
3. Are there any outliers or unexpected values in the data?
4. Are all the data available for all the available districts in recent decade, 2013-2023? And again, is there any pattern to the missing data? Are there any odd values?

### Setup

In [1]:
import ibis
import os
import itertools

import numpy as np
import pandas as pd
import seaborn as sns
import ibis.selectors as s
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

from dotenv import load_dotenv
from functools import reduce
from datetime import datetime as dt_obj
from datetime import timedelta

%matplotlib inline
%config InlineBackend.figure_format = 'retina'

pd.set_option('display.max_columns', 200)
pd.set_option('display.max_rows', 3000)

In [2]:
load_dotenv(override=True)
USERNAME = os.getenv('USERNAME')
PASSWORD = os.getenv('PASSWORD')

In [3]:
con_personal = ibis.snowflake.connect(
    user=USERNAME,
    password=PASSWORD,
    role='TRANSFORMER_DEV',
    warehouse='TRANSFORMING_XS_DEV',
    account="VSB79059-DSE_CALTRANS_PEMS",
    database="TRANSFORM_DEV/DBT_MBOBRA",
)

Create a `DBT_SCHEMA` by going into Snowflake -> Databases -> TRANSFORM_DEV and clicking on the blue button that says "+ Schema". Here I added schema named `DBT_MBOBRA`.

In [4]:
station_metadata = con_personal.table("STATION_META", database='RAW_PRD', schema='CLEARINGHOUSE');
station_raw = con_personal.table("STATION_RAW", database='RAW_PRD', schema='CLEARINGHOUSE');
#station_status = con.table("STATION_STATUS");

Insufficient privileges to operate on account 'NGB13288'


In [5]:
station_metadata_df = station_metadata.execute(limit=10)
station_raw_df = station_raw.execute(limit=10)
#station_status_df = station_status.execute(limit=10)

In [6]:
station_metadata_df

Unnamed: 0,FILENAME,ID,FWY,DIR,DISTRICT,COUNTY,CITY,STATE_PM,ABS_PM,LATITUDE,LONGITUDE,LENGTH,TYPE,LANES,NAME,USER_ID_1,USER_ID_2,USER_ID_3,USER_ID_4
0,clhouse/meta/d08/2024/01/d08_text_meta_2024_01...,801230,10,W,8,71,48788,0.591,47.356,34.082004,-117.699964,0.845,ML,4,MONTE VISTA AVE,1,,,
1,clhouse/meta/d08/2024/01/d08_text_meta_2024_01...,801231,10,W,8,71,48788,0.591,47.356,34.082004,-117.699964,,OR,2,MONTE VISTA AVE,1,,,
2,clhouse/meta/d08/2024/01/d08_text_meta_2024_01...,801232,10,E,8,71,48788,0.721,47.486,34.082149,-117.697705,0.996,ML,4,MONTE VISTA AVE,1,,,
3,clhouse/meta/d08/2024/01/d08_text_meta_2024_01...,801233,10,E,8,71,48788,0.721,47.486,34.082149,-117.697705,,OR,2,MONTE VISTA AVE,1,,,
4,clhouse/meta/d08/2024/01/d08_text_meta_2024_01...,801234,10,W,8,71,48788,1.168,47.933,34.085065,-117.690768,0.66,ML,4,CENTRAL AVE,1,,,
5,clhouse/meta/d08/2024/01/d08_text_meta_2024_01...,801236,10,W,8,71,48788,1.168,47.933,34.085065,-117.690768,,OR,2,CENTRAL AVE,1,,,
6,clhouse/meta/d08/2024/01/d08_text_meta_2024_01...,801237,10,E,8,71,48788,1.342,48.107,34.086058,-117.68797,0.595,ML,4,CENTRAL AVE,1,,,
7,clhouse/meta/d08/2024/01/d08_text_meta_2024_01...,801238,10,E,8,71,48788,1.342,48.107,34.086058,-117.68797,,OR,2,CENTRAL AVE,1,,,
8,clhouse/meta/d08/2024/01/d08_text_meta_2024_01...,801239,10,E,8,71,81344,1.912,48.677,34.087192,-117.67825,0.523,ML,4,BENSON AVE,1,,,
9,clhouse/meta/d08/2024/01/d08_text_meta_2024_01...,801240,10,W,8,71,81344,1.912,48.677,34.087372,-117.678335,0.544,ML,4,BENSON AVE,1,,,


In [7]:
station_raw_df

Unnamed: 0,FILENAME,SAMPLE_TIMESTAMP,SAMPLE_DATE,ID,FLOW_1,OCCUPANCY_1,SPEED_1,FLOW_2,OCCUPANCY_2,SPEED_2,FLOW_3,OCCUPANCY_3,SPEED_3,FLOW_4,OCCUPANCY_4,SPEED_4,FLOW_5,OCCUPANCY_5,SPEED_5,FLOW_6,OCCUPANCY_6,SPEED_6,FLOW_7,OCCUPANCY_7,SPEED_7,FLOW_8,OCCUPANCY_8,SPEED_8
0,clhouse/raw/d12/2021/06/d12_text_station_raw_2...,2021-06-23 18:25:35,2021-06-23,1219464,13,0.086,85.0,,,,,,,,,,,,,,,,,,,,,
1,clhouse/raw/d08/2021/06/d08_text_station_raw_2...,2021-06-23 18:21:33,2021-06-23,828143,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,,,,,,,,,,,,,,
2,clhouse/raw/d12/2021/06/d12_text_station_raw_2...,2021-06-23 23:45:33,2021-06-23,1205276,3,0.017,101.0,7.0,0.043,90.0,3.0,0.018,94.0,2.0,0.01,112.0,0.0,0.0,0.0,,,,,,,,,
3,clhouse/raw/d08/2021/06/d08_text_station_raw_2...,2021-06-23 18:21:33,2021-06-23,827811,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,,,,,,,,,,,,,,
4,clhouse/raw/d03/2021/06/d03_text_station_raw_2...,2021-06-23 00:24:35,2021-06-23,316140,1,0.0,0.0,4.0,0.027,71.0,,,,,,,,,,,,,,,,,,
5,clhouse/raw/d08/2021/06/d08_text_station_raw_2...,2021-06-23 18:21:33,2021-06-23,828188,0,0.0,0.0,0.0,0.0,0.0,,,,,,,,,,,,,,,,,,
6,clhouse/raw/d12/2021/06/d12_text_station_raw_2...,2021-06-23 23:45:35,2021-06-23,1202276,0,0.0,0.0,,,,,,,,,,,,,,,,,,,,,
7,clhouse/raw/d08/2021/06/d08_text_station_raw_2...,2021-06-23 18:21:36,2021-06-23,809497,0,0.0,0.0,0.0,0.0,0.0,,,,,,,,,,,,,,,,,,
8,clhouse/raw/d03/2021/06/d03_text_station_raw_2...,2021-06-23 04:37:20,2021-06-23,316940,0,0.0,0.0,0.0,0.0,0.0,,,,,,,,,,,,,,,,,,
9,clhouse/raw/d08/2021/06/d08_text_station_raw_2...,2021-06-23 18:21:11,2021-06-23,823650,0,0.0,0.0,3.0,0.053,0.0,,,,,,,,,,,,,,,,,,


### Question 1. Are all the data available for District 5 from the [District Map and County Chart](https://cwwp2.dot.ca.gov/documentation/district-map-county-chart.htm) during 2023?

1. Filter the station metadata file to look at `DISTRICT` values of 5.
2. Strip away all old versions of the data by selecting the most recent unique ID.
3. Select all station IDs within District 5 that took data in 2023.
4. Merge the raw and metadata files on the keyword `ID`.
5. Create a coverage map of the timestamps for all freeways using the keyword `SAMPLE_TIMESTAMP`.

##### 1. Filter the station metadata file to look at `DISTRICT` values of 5.

In [8]:
district_5_filter = station_metadata.filter(station_metadata["DISTRICT"] == "5")

In [9]:
district_5_df = district_5_filter.execute()

##### 2. Strip away all old versions of the data by selecting the most recent unique ID.

Create a new table in Snowflake called `DISTRICT_5_RECENT_VERSION` that contains metadata for the most recent unique IDs.

Identify the number of unique values of ID in District 5.

In [10]:
print('There are {} unique values of ID in District 5.'.format(district_5_df['ID'].nunique()))

There are 706 unique values of ID in District 5.


Construct a new column called `DATA_VERSION`.

In [11]:
district_5_df['DATA_VERSION'] = np.NaN

Extract the date from the `FILENAME` keyword. Populate these dates in the `DATA_VERSION` keyword.

In [12]:
data_version = [dt_obj.strptime(filename[39:49], '%Y_%m_%d') for filename in district_5_df['FILENAME'].values]

In [13]:
district_5_df['DATA_VERSION'] = data_version

Select the most recent `DATA_VERSION` for each unique ID. Drop the rest. The dataframe `district_5_recent_version_df` contains the most recent metadata values for each station ID.

In [14]:
unique_IDs = district_5_df['ID'].value_counts().index.to_list()

In [15]:
drop_these_rows = []
for i in range(len(unique_IDs)):
    ID_subset = district_5_df[district_5_df['ID'] == unique_IDs[i]]
    index_for_max_value = ID_subset['DATA_VERSION'].idxmax()
    indices_for_rows_to_drop = ID_subset.drop(index_for_max_value).index.to_list()
    drop_these_rows.append(indices_for_rows_to_drop)

In [16]:
drop_these_rows_flattened = list(itertools.chain.from_iterable(drop_these_rows))

In [17]:
district_5_recent_version_df = district_5_df.drop(drop_these_rows_flattened).reset_index(drop=True)

In [18]:
DISTRICT_5_RECENT_VERSION = ibis.memtable(district_5_recent_version_df, name='DISTRICT_5_RECENT_VERSION')

Create a table in Snowflake called `DISTRICT_5_RECENT_VERSION`.

To create the table for the first time, uncomment and run the commented cell below.
If the table is already created, skip the cell below.

In [19]:
district_5_recent = con_personal.create_table("DISTRICT_5_RECENT_VERSION", DISTRICT_5_RECENT_VERSION, overwrite=True)

Have some of these data been updated in 2023? Yes.

In [20]:
max(district_5_recent_version_df['DATA_VERSION'])

Timestamp('2023-11-17 00:00:00')

##### 3. Select all station IDs within District 5 that took data in 2023.

Selecting dates using a datetime object will call the Snowflake [date_from_parts()](https://docs.snowflake.com/en/sql-reference/functions/date_from_parts) function, which will search through all the partitions. To optimize the query, select dates in string format. Selecting one year of data for one district takes 3.5 minutes of execution time, but 9.5 minutes of wall time. See [this query](https://app.snowflake.com/vsb79059/dse_caltrans_pems/#/compute/history/queries/01b18b20-0001-fb1d-003e-3887000f305e). However, there is not enough memory to merge the metadata and raw dataframes on the keyword `ID`.

In [21]:
# Select dates within the year 2023
date_selection_start = '2023-03-01'
date_selection_end = '2023-03-30'

In [22]:
print(date_selection_start)
print(date_selection_end)

2023-03-01
2023-03-30


In [23]:
station_raw_filter_ibis_table = station_raw.select("SAMPLE_DATE","SAMPLE_TIMESTAMP","FILENAME","ID").filter(
    (station_raw["SAMPLE_DATE"] >= date_selection_start) & (station_raw["SAMPLE_DATE"] < date_selection_end)
).filter(
    station_raw["FILENAME"].contains("/d05")
).select("ID","SAMPLE_TIMESTAMP","SAMPLE_DATE")

In [24]:
%%time
station_raw_filter_df = station_raw_filter_ibis_table.execute()

CPU times: user 13 s, sys: 2.68 s, total: 15.7 s
Wall time: 32.9 s


In [25]:
# A list of all the station IDs that took data in 2023
IDs_in_D5_with_2023_data = station_raw_filter_df['ID'].value_counts().index.to_list();
len(IDs_in_D5_with_2023_data)

570

##### 4. Merge the raw and metadata files on the keyword `ID`.

Create a new table called `MERGED_IBIS_TABLE`, which merges the raw data with the most recent metadata values for each station ID.

In [26]:
merged_ibis_table = con_personal.create_table("MERGED_IBIS_TABLE", station_raw_filter_ibis_table.join(district_5_recent, ["ID"], how='inner'), overwrite=True)

##### 5. Create aggregations

In [27]:
number_of_30_second_slots_per_day = 24*60*2

In [29]:
raw_and_metadata_merged_ibis_table = con_personal.table("MERGED_IBIS_TABLE");

In [30]:
raw_and_metadata_merged_ibis_table_aggregated = raw_and_metadata_merged_ibis_table.select("ID","SAMPLE_DATE").group_by(["ID", "SAMPLE_DATE"]).count()

In [32]:
aggregate_df = raw_and_metadata_merged_ibis_table_aggregated.execute()

In [33]:
aggregate_df = aggregate_df.rename(columns={"CountStar()": "TOTAL_COUNT"})

In [34]:
aggregate_df["PERCENT_COVERAGE"] = aggregate_df["TOTAL_COUNT"]/number_of_30_second_slots_per_day

In [35]:
aggregate_df

Unnamed: 0,ID,SAMPLE_DATE,TOTAL_COUNT,PERCENT_COVERAGE
0,501018143,2023-03-10,2870,0.996528
1,501014052,2023-03-10,2814,0.977083
2,501010051,2023-03-10,2859,0.992708
3,500014121,2023-03-10,2873,0.997569
4,500014031,2023-03-10,2640,0.916667
...,...,...,...,...
15617,500011143,2023-03-21,2644,0.918056
15618,500013081,2023-03-21,1531,0.531597
15619,500010153,2023-03-14,2833,0.983681
15620,500013081,2023-03-26,710,0.246528


Push this back to Snowflake as a table.

In [36]:
AGGREGATED_TABLE = ibis.memtable(aggregate_df, name='AGGREGATED_TABLE')

In [37]:
AGGREGATED_TABLE

In [38]:
aggregated_ibis_table = con_personal.create_table("AGGREGATED_TABLE", AGGREGATED_TABLE, overwrite=True)