# ETL and EDA Notebook

# Part 1 - Amtrak Northeast Regional Train Data
* This project would not be possible without the diligent joint effort by [Chris Juckins](https://juckins.net/index.php) and [John Bobinyec](http://dixielandsoftware.net/Amtrak/status/StatusMaps/) to collect and preserve Amtrak's on-time performance records. Chris Juckins' archive of timetables was another invaluable resource which enabled me to sort through the trains and stations I chose to use in this project.
* The train data is sourced from [Amtrak Status Maps Archive Database (ASMAD)](https://juckins.net/amtrak_status/archive/html/home.php), and has been retrieved with Chris' permission.

### Overview of the Process
* Functions were written to scrape the HTML table returned by the search query and to process each column into the desired format
* Additional columns were added during processing to help join the train data with the weather data
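As a rough illustration of the scraping step, the sketch below turns an HTML table into a list of rows using only the standard library's `html.parser` (the notebook itself imports `lxml.html`). `TableRowParser`, `parse_table`, and the sample markup are hypothetical; the real ASMAD results table has more columns and its markup may differ.

```python
from html.parser import HTMLParser


class TableRowParser(HTMLParser):
    """Collect the text of every <td>/<th> cell, grouped by <tr> row."""

    def __init__(self):
        super().__init__()
        self.rows = []     # finished rows
        self._row = None   # cells of the row currently being read
        self._cell = None  # text fragments of the cell currently being read

    def handle_starttag(self, tag, attrs):
        if tag == "tr":
            self._row = []
        elif tag in ("td", "th") and self._row is not None:
            self._cell = []

    def handle_data(self, data):
        # Text outside a cell (e.g. whitespace between tags) is ignored
        if self._cell is not None:
            self._cell.append(data)

    def handle_endtag(self, tag):
        if tag in ("td", "th") and self._cell is not None and self._row is not None:
            self._row.append("".join(self._cell).strip())
            self._cell = None
        elif tag == "tr" and self._row is not None:
            self.rows.append(self._row)
            self._row = None


def parse_table(html):
    """Return the table as a list of rows, each a list of cell strings."""
    parser = TableRowParser()
    parser.feed(html)
    parser.close()
    return parser.rows


sample = """
<table>
  <tr><th>Train #</th><th>Station</th></tr>
  <tr><td>171</td><td>PVD</td></tr>
</table>
"""
rows = parse_table(sample)
header, data = rows[0], rows[1:]
```

The first row serves as the header, which is roughly what is needed to rebuild the table as a DataFrame.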

### Setup

In [None]:
import time
import requests
import re
import lxml.html as lh
import pandas as pd
import numpy as np
from datetime import date, timedelta
from trains_retrieve_and_process_data import * 

### Retrieve HTML table data and recreate as a Pandas DataFrame
* By default, data is collected for the previous day. Run after 5 a.m.: ASMAD updates around 4 a.m., so an earlier run retrieves no data
* Both arrival and departure data are collected and stored in a dictionary, further indexed by station
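The default described above (collect the previous day, and only after ASMAD's nightly update) can be sketched as a small helper. `default_date_range` and the `SAFE_RUN_HOUR` constant are hypothetical names for illustration; `retrieve_data` may implement its default differently.

```python
from datetime import datetime, time, timedelta

# Assumption: ASMAD finishes its nightly update around 4 a.m., so wait until 5 a.m.
SAFE_RUN_HOUR = 5


def default_date_range(now=None):
    """Return (start, end) covering the previous day, or raise if it is too early to run."""
    now = now or datetime.now()
    if now.time() < time(SAFE_RUN_HOUR):
        raise RuntimeError("Run after 5 a.m.; yesterday's data may not be in ASMAD yet.")
    yesterday = now.date() - timedelta(days=1)
    return yesterday, yesterday


start, end = default_date_range(datetime(2021, 6, 12, 9, 30))
```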

In [None]:
start = date(2021,6,11)
end = date(2021,6,11)

In [None]:
raw_data = retrieve_data(start=start, end=end)

In [None]:
depart = raw_data_to_raw_df(raw_data, 'Depart')
print(depart.shape[0])
depart.tail()

In [None]:
arrive = raw_data_to_raw_df(raw_data, 'Arrive')
print(arrive.shape[0])
arrive.tail()

### Save the raw DF to disk

In [None]:
arrive_filestring = './data/trains_raw/arrive_raw_{}_{}.csv'.format(str(start), str(end))
depart_filestring = './data/trains_raw/depart_raw_{}_{}.csv'.format(str(start), str(end))

arrive.to_csv(arrive_filestring, line_terminator='\n', index=False)
depart.to_csv(depart_filestring, line_terminator='\n', index=False)

### Process the raw DF with modifications/additions
* Modifications to the data:
    * Separate the Origin Date and Origin Week Day into two columns
    * Add separate columns for Origin Year and Origin Month
    * Split the Scheduled Arrival/Departure field into three separate columns: Date, Week Day, and Time
    * Calculate the time difference between the Scheduled and Actual Arrival/Departure
    * Convert the Service Disruption and Cancellation text flags to binary indicator columns
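A minimal sketch of these per-cell transformations, assuming ASMAD cells shaped like `06/11/2021 (Fr)` and `06/11/2021 (Fr) 10:05AM` (an assumption; the actual export format may differ). The helper names are hypothetical and `process_columns` may work differently. The midnight rule in `minutes_from_schedule` is also an assumption: a gap of more than 12 hours is treated as the actual time falling on the neighboring day.

```python
from datetime import datetime, time, timedelta


def split_origin(cell):
    """'06/11/2021 (Fr)' -> (date, 'Fr')"""
    date_part, weekday = cell.split(" ", 1)
    return datetime.strptime(date_part, "%m/%d/%Y").date(), weekday.strip("()")


def split_scheduled(cell):
    """'06/11/2021 (Fr) 10:05AM' -> (date, 'Fr', time)"""
    date_part, weekday, time_part = cell.split(" ")
    sched = datetime.strptime(date_part + " " + time_part, "%m/%d/%Y %I:%M%p")
    return sched.date(), weekday.strip("()"), sched.time()


def minutes_from_schedule(sched_dt, actual_time):
    """Minutes late (+) or early (-) relative to the scheduled datetime."""
    actual_dt = datetime.combine(sched_dt.date(), actual_time)
    # Assumption: a gap over 12 hours means the train crossed midnight
    if actual_dt - sched_dt > timedelta(hours=12):
        actual_dt -= timedelta(days=1)
    elif sched_dt - actual_dt > timedelta(hours=12):
        actual_dt += timedelta(days=1)
    return (actual_dt - sched_dt).total_seconds() / 60


def flag_to_int(text):
    """Convert a free-text flag cell to 1 if any text is present, else 0."""
    return 1 if text and text.strip() else 0


late = minutes_from_schedule(datetime(2021, 6, 11, 23, 50), time(0, 10))  # crossed midnight
```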
    
    

In [None]:
full_arrive = process_columns(arrive, 'Arrive')
full_arrive.head()

In [None]:
full_depart = process_columns(depart, "Depart")
full_depart.head()

### For new 2021 data, concatenate with previously retrieved and processed data from this year

In [None]:
arrive_filestring2021 = './data/trains/arrive_2021_processed.csv'
depart_filestring2021 = './data/trains/depart_2021_processed.csv'
        
prev_arrive2021 = pd.read_csv(arrive_filestring2021)
prev_depart2021 = pd.read_csv(depart_filestring2021)

In [None]:
new_arrive2021 = pd.concat([prev_arrive2021, full_arrive], ignore_index=True, axis=0)
new_depart2021 = pd.concat([prev_depart2021, full_depart], ignore_index=True, axis=0)

In [None]:
new_arrive2021.shape[0]

In [None]:
new_depart2021.shape[0]

### Drop duplicate rows

In [None]:
new_arrive2021.drop_duplicates(inplace=True, ignore_index=True)
new_arrive2021.shape[0]

In [None]:
new_depart2021.drop_duplicates(inplace=True, ignore_index=True)
new_depart2021.shape[0]
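Concatenating the previous file with the new rows and then dropping duplicates keeps the first occurrence of each fully identical row, so re-running the notebook over an overlapping date range should not double-count stops. A stdlib-only sketch of the same idea (`merge_unique` and the sample rows are hypothetical):

```python
def merge_unique(previous_rows, new_rows):
    """Concatenate two lists of rows and drop exact duplicates, keeping first occurrences in order."""
    # dict preserves insertion order (Python 3.7+), mirroring
    # pd.concat(...).drop_duplicates() with its default keep="first"
    return list(dict.fromkeys(tuple(row) for row in previous_rows + new_rows))


previous = [("171", "PVD", "2021-06-10"), ("171", "PVD", "2021-06-11")]
new = [("171", "PVD", "2021-06-11"), ("171", "PVD", "2021-06-12")]
merged = merge_unique(previous, new)
```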

In [None]:
new_arrive2021.head()

In [None]:
new_arrive2021.tail()

In [None]:
new_depart2021.head()

In [None]:
new_depart2021.tail()

In [None]:
new_arrive2021.to_csv(arrive_filestring2021, line_terminator='\n', index=False)
new_depart2021.to_csv(depart_filestring2021, line_terminator='\n', index=False)

# Part 2 - Visual Crossing Weather Data

### Setup

In [None]:
import requests
import os
import pandas as pd
import numpy as np
from datetime import date, timedelta

In [None]:
from weather_retrieve_and_process_data import *
assert os.environ.get('VC_TOKEN') is not None, 'empty token!'

### Retrieve unprocessed data

In [None]:
start = str(date(2021,6,10))
end = str(date.today()-timedelta(days=1))
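The weather files are stored one per location per year (e.g. `Providence_RI_weather_subset_2021.csv`), so a `start`/`end` range that crosses a year boundary may need to be cut into per-year windows. `split_by_year` is a hypothetical helper; `retrieve_weather_data` may already handle this internally.

```python
from datetime import date, timedelta


def split_by_year(start, end):
    """Split the inclusive ISO-date range [start, end] into per-calendar-year (start, end) pairs."""
    first = date.fromisoformat(start)
    last = date.fromisoformat(end)
    windows = []
    while first <= last:
        # Each window ends at Dec 31 or at the overall end date, whichever comes first
        year_end = min(date(first.year, 12, 31), last)
        windows.append((first.isoformat(), year_end.isoformat()))
        first = year_end + timedelta(days=1)
    return windows


windows = split_by_year("2020-12-30", "2021-01-02")
```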

In [None]:
successful_retrievals = retrieve_weather_data(start, end)

### Data Cleaning/Taking Subset of Columns

* Recent data is processed by year: new columns are added, minor string-format fixes are made, and a subset of the full column list is kept.
* The function processes the files that were successfully created in the previous step.
* This step assumes 2021 data is being read; it concatenates the previously retrieved data with the new data into a single combined file.
* The output shows the fraction of data kept. The data is almost always valid and complete ($> 99\%$ of the original data is retained).
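The retained fraction can be computed along these lines: keep only rows whose required fields are present, then divide by the original row count. The required field names and sample rows below are assumptions borrowed from the `weather_hourly` table, not necessarily the raw Visual Crossing headers or what `process_weather_data` checks.

```python
# Hypothetical required fields, borrowed from the weather_hourly table columns
REQUIRED_FIELDS = ("location", "obs_datetime", "temperature")


def keep_complete_rows(rows, required=REQUIRED_FIELDS):
    """Return (kept_rows, fraction_kept) for a list of dict rows."""
    kept = [row for row in rows if all(row.get(field) not in (None, "") for field in required)]
    fraction_kept = len(kept) / len(rows) if rows else 1.0
    return kept, fraction_kept


hours = [
    {"location": "Providence_RI", "obs_datetime": "2021-06-10T00:00:00", "temperature": "64.1"},
    {"location": "Providence_RI", "obs_datetime": "2021-06-10T01:00:00", "temperature": "63.5"},
    {"location": "Providence_RI", "obs_datetime": "2021-06-10T02:00:00", "temperature": ""},
    {"location": "Providence_RI", "obs_datetime": "2021-06-10T03:00:00", "temperature": "62.8"},
]
kept, fraction_kept = keep_complete_rows(hours)
print("Fraction of data kept: {:.2%}".format(fraction_kept))
```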

In [None]:
process_weather_data(successful_retrievals)

### Data sample for viewing

In [None]:
sample = pd.read_csv('./data/weather/Providence_RI_weather_subset_2021.csv')
sample.head()

In [None]:
sample.tail()

# Part 3a - Loading Data into a Postgres Database
Schema pictured below:
![Database Schema](data/schema/Final_DB_Schema.pdf)

### Setup

In [None]:
import psycopg2
import csv
import os
import sys 
import time
assert os.environ.get('DB_PASS') is not None, 'empty password!'

#### Functions to create and update tables in the database

In [None]:
def execute_command(conn, command):
    """
    Execute specified command in PostgreSQL database.
    """
    try:
        cur = conn.cursor()
        cur.execute(command)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        conn.rollback()

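def update_table(conn, command, csv_file):
    """
    Insert rows from a CSV file into the table specified by the command.
    A failing row is rolled back to a savepoint, so earlier rows in the file are kept.
    """
    cur = conn.cursor()
    with open(csv_file, newline='') as file:
        info_reader = csv.reader(file, delimiter=',')
        next(info_reader)  # Skip header
        for row in info_reader:
            cur.execute('SAVEPOINT row_insert')
            try:
                cur.execute(command, tuple(row))
            except (Exception, psycopg2.DatabaseError) as error:
                print(error)
                # Undo only this row; conn.rollback() would discard every uncommitted row
                cur.execute('ROLLBACK TO SAVEPOINT row_insert')
            cur.execute('RELEASE SAVEPOINT row_insert')
        conn.commit()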
        
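def update_trains(conn, command, arr_or_dep, csv_file):
    """
    Insert rows from a trains CSV file into the table specified by the command,
    prepending the arrival/departure label to each row.
    A failing row is rolled back to a savepoint, so earlier rows in the file are kept.
    """
    cur = conn.cursor()
    with open(csv_file, newline='') as file:
        info_reader = csv.reader(file, delimiter=',')
        next(info_reader)  # Skip header
        for row in info_reader:
            cur.execute('SAVEPOINT row_insert')
            try:
                cur.execute(command, tuple([arr_or_dep] + row))
            except (Exception, psycopg2.DatabaseError) as error:
                print(error)
                # Undo only this row; conn.rollback() would discard every uncommitted row
                cur.execute('ROLLBACK TO SAVEPOINT row_insert')
            cur.execute('RELEASE SAVEPOINT row_insert')
        conn.commit()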
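With psycopg2, calling `conn.rollback()` inside an insert loop discards every row inserted since the last commit, not just the failing one. A per-row savepoint limits the rollback to the bad row. The sketch below demonstrates that pattern with Python's built-in `sqlite3` module standing in for PostgreSQL so it runs without a server; `insert_rows_skipping_errors` and the sample station rows are hypothetical.

```python
import sqlite3


def insert_rows_skipping_errors(conn, command, rows):
    """Insert rows one by one, rolling back only the rows that fail. Returns the number skipped."""
    cur = conn.cursor()
    cur.execute("BEGIN")
    skipped = 0
    for row in rows:
        cur.execute("SAVEPOINT row_insert")
        try:
            cur.execute(command, row)
        except sqlite3.DatabaseError as error:
            print(error)
            skipped += 1
            # Undo just this row; a full ROLLBACK would also discard the earlier good rows
            cur.execute("ROLLBACK TO SAVEPOINT row_insert")
        cur.execute("RELEASE SAVEPOINT row_insert")
    cur.execute("COMMIT")
    return skipped


# isolation_level=None lets the function issue BEGIN/COMMIT itself
demo = sqlite3.connect(":memory:", isolation_level=None)
demo.execute(
    "CREATE TABLE station_info (station_code TEXT PRIMARY KEY, amtrak_station_name TEXT NOT NULL)"
)
rows = [("BOS", "Boston South Station"), ("PVD", None), ("NHV", "New Haven Union Station")]
skipped = insert_rows_skipping_errors(demo, "INSERT INTO station_info VALUES (?, ?)", rows)
kept = [code for (code,) in demo.execute("SELECT station_code FROM station_info ORDER BY station_code")]
```

The `PVD` row violates `NOT NULL` and is skipped, while `BOS` (inserted before it) survives.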

In [None]:
conn = psycopg2.connect(dbname='amtrakproject',
                        user=os.environ.get('USER'),
                        password=os.environ.get('DB_PASS'))
assert conn is not None, 'need to fix conn!!'
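libpq connection strings (the `"dbname=... user=... password=..."` form) treat spaces, single quotes, and backslashes specially: a value containing them must be wrapped in single quotes, with `'` and `\` escaped by a backslash. A password containing a space therefore breaks an unquoted `password=...` field. Passing keyword arguments to `psycopg2.connect` sidesteps the problem; the sketch below only illustrates the quoting rule, and `quote_conninfo_value`/`build_conninfo` are hypothetical helpers.

```python
def quote_conninfo_value(value):
    """Quote a libpq connection-string value: wrap in single quotes, escape backslashes and quotes."""
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return "'" + escaped + "'"


def build_conninfo(**params):
    """Build a 'key=value ...' conninfo string from keyword arguments, skipping None values."""
    return " ".join(
        "{}={}".format(key, quote_conninfo_value(str(val)))
        for key, val in params.items()
        if val is not None
    )


dsn = build_conninfo(dbname="amtrakproject", user="alice", password="p@ss word")
```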

In [None]:
create_station_info = """ 
                      DROP TABLE IF EXISTS station_info CASCADE;
                      CREATE TABLE station_info (
                          station_code text UNIQUE PRIMARY KEY,
                          amtrak_station_name text,
                          crew_change boolean,
                          weather_location_name text,
                          longitude real,
                          latitude real,
                          nb_next_station text,
                          sb_next_station text,
                          nb_mile numeric,
                          sb_mile numeric,
                          nb_stop_num numeric,
                          sb_stop_num numeric,
                          nb_miles_to_next numeric,
                          sb_miles_to_next numeric
                      );
                      """

insert_into_station_info = """
                           INSERT INTO
                               station_info (
                                   station_code,
                                   amtrak_station_name,
                                   crew_change,
                                   weather_location_name,
                                   longitude,
                                   latitude,
                                   nb_next_station,
                                   sb_next_station,
                                   nb_mile,
                                   sb_mile,
                                   nb_stop_num,
                                   sb_stop_num,
                                   nb_miles_to_next,
                                   sb_miles_to_next

                             )
                         VALUES
                             (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                         ON CONFLICT DO NOTHING;
                         """  

In [None]:
create_stops = """
                 DROP TABLE IF EXISTS stops CASCADE;
                 CREATE TABLE stops (
                     stop_id SERIAL PRIMARY KEY,
                     arrival_or_departure text, 
                     train_num text,
                     station_code text REFERENCES station_info,
                     direction text,
                     origin_date date,
                     origin_year int,
                     origin_month int,
                     origin_week_day text,
                     full_sched_arr_dep_datetime timestamp,
                     sched_arr_dep_date date,
                     sched_arr_dep_week_day text,
                     sched_arr_dep_time time,
                     act_arr_dep_time time,
                     full_act_arr_dep_datetime timestamp,
                     timedelta_from_sched numeric,
                     service_disruption boolean,
                     cancellations boolean
                 );
               """

insert_into_stops = """
                    INSERT INTO
                        stops (
                            arrival_or_departure,
                            train_num,
                            station_code,
                            direction,
                            origin_date,
                            origin_year,
                            origin_month,
                            origin_week_day,
                            full_sched_arr_dep_datetime,
                            sched_arr_dep_date,
                            sched_arr_dep_week_day,
                            sched_arr_dep_time,
                            act_arr_dep_time,
                            full_act_arr_dep_datetime,
                            timedelta_from_sched,
                            service_disruption,
                            cancellations
                          )
                      VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                      ON CONFLICT DO NOTHING;
                      """

In [None]:
create_weather = """
                 DROP TABLE IF EXISTS weather_hourly CASCADE;
                 CREATE TABLE weather_hourly (
                     weather_id SERIAL PRIMARY KEY,
                     location text,
                     obs_datetime timestamp,
                     temperature real,
                     precipitation real,
                     cloud_cover real,
                     weather_type text
                 );
                 """

insert_into_weather = """
                      INSERT INTO
                          weather_hourly (
                              location,
                              obs_datetime,
                              temperature,
                              precipitation,
                              cloud_cover,
                              weather_type
                      )
                      VALUES
                          (%s, %s, %s, %s, %s, %s) 
                      ON CONFLICT DO NOTHING;
                      """ 

In [None]:
create_route = """
               DROP TABLE IF EXISTS regional_route CASCADE;

               CREATE TABLE regional_route (
                 coord_id SERIAL PRIMARY KEY,
                 longitude real,
                 latitude real,
                 path_group numeric,
                 connecting_path text, 
                 nb_station_group text,
                 sb_station_group text
               );
               """

insert_into_route = """
                    INSERT INTO
                      regional_route (
                          longitude,
                          latitude, 
                          path_group,
                          connecting_path,
                          nb_station_group,
                          sb_station_group
                      )
                    VALUES 
                        (%s, %s, %s, %s, %s, %s) 
                    ON CONFLICT DO NOTHING;
                    """

In [None]:
create_table_cmds = [create_station_info, create_stops, create_weather, create_route]

for cmd in create_table_cmds:
    execute_command(conn, cmd)

In [None]:
# Insert all station facts into station info table
update_table(conn, insert_into_station_info, './data/facts/geo_stations_info.csv')

# Insert the route coordinates into the route table
update_table(conn, insert_into_route, './data/facts/NE_regional_lonlat.csv')

In [None]:
years = [2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021]

begin_everything = time.time()

# Insert all train data into arrival and departure data tables
for year in years:
    start = time.time()
    arrive_csv = './data/trains/arrive_{}_processed.csv'.format(year)
    depart_csv = './data/trains/depart_{}_processed.csv'.format(year)
    update_trains(conn, insert_into_stops, 'Arrival', arrive_csv)
    update_trains(conn, insert_into_stops, 'Departure', depart_csv)
    print('Finished adding year', year, 'to database in', time.time() - start, 'seconds')
print('COMPLETE in', time.time() - begin_everything, 'seconds')
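`insert_into_stops` ends in `ON CONFLICT DO NOTHING`, but that clause only skips a row when it would violate a unique constraint or primary key. The `stops` table's only key is the `SERIAL` `stop_id`, generated fresh for every insert, so re-inserting the same stop is never a conflict; this is consistent with duplicates later having to be removed from `stops_joined`. The sketch below shows the difference using Python's built-in `sqlite3` as a stand-in for PostgreSQL (SQLite 3.24+ accepts the same `ON CONFLICT DO NOTHING` clause). The `UNIQUE` constraint on the natural key is a hypothetical alternative, not part of the project's schema.

```python
import sqlite3

conn_demo = sqlite3.connect(":memory:")
conn_demo.executescript("""
    -- Surrogate key only, like the stops table: nothing can conflict
    CREATE TABLE stops_no_key (
        stop_id INTEGER PRIMARY KEY,
        train_num TEXT, station_code TEXT, origin_date TEXT, arrival_or_departure TEXT
    );
    -- Hypothetical variant with a UNIQUE constraint on the natural key
    CREATE TABLE stops_natural_key (
        stop_id INTEGER PRIMARY KEY,
        train_num TEXT, station_code TEXT, origin_date TEXT, arrival_or_departure TEXT,
        UNIQUE (origin_date, train_num, station_code, arrival_or_departure)
    );
""")

stop = ("171", "PVD", "2021-06-11", "Arrival")
for table in ("stops_no_key", "stops_natural_key"):
    for _ in range(2):  # insert the same stop twice
        conn_demo.execute(
            "INSERT INTO {} (train_num, station_code, origin_date, arrival_or_departure) "
            "VALUES (?, ?, ?, ?) ON CONFLICT DO NOTHING".format(table),
            stop,
        )

counts = {
    table: conn_demo.execute("SELECT COUNT(*) FROM {}".format(table)).fetchone()[0]
    for table in ("stops_no_key", "stops_natural_key")
}
```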

In [None]:
location_names_for_files = ['Boston_MA', 'Providence_RI', 'Kingston_RI', 'Westerly_RI', 'Mystic_CT',
                            'New_London_CT', 'Old_Saybrook_CT', 'New_Haven_CT', 'Bridgeport_CT', 
                            'Stamford_CT', 'New_Rochelle_NY', 'Manhattan_NY', 'Newark_NJ', 'Iselin_NJ', 
                            'Trenton_NJ', 'Philadelphia_PA', 'Wilmington_DE','Aberdeen_MD', 'Baltimore_MD',
                            'Baltimore_BWI_Airport_MD', 'New_Carrollton_MD', 'Washington_DC']

years = [2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021]

# Insert all weather data into the weather data table
begin_everything = time.time()
for location in location_names_for_files:
    start = time.time()
    for year in years:
        weather_csv = './data/weather/{}_weather_subset_{}.csv'.format(location, year)
        update_table(conn, insert_into_weather, weather_csv)
    print('Finished adding location', location, 'to database in', time.time() - start, 'seconds')
print("COMPLETE in", time.time() - begin_everything, "seconds")

In [None]:
create_dates_trains = """
                      DROP TABLE IF EXISTS dates_trains CASCADE;
                      CREATE TABLE dates_trains AS SELECT DISTINCT
                          origin_date,
                          train_num
                      FROM
                          stops
                      GROUP BY
                          origin_date,
                          train_num;

                      ALTER TABLE dates_trains
                          ADD COLUMN trip_id SERIAL PRIMARY KEY;
                      """

In [None]:
execute_command(conn, create_dates_trains)
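`dates_trains` gives each distinct (`origin_date`, `train_num`) pair its own `trip_id`, so all stops made by one train on one origin day share an identifier. The same assignment in plain Python, as an illustration only: `assign_trip_ids` and the sample stops are hypothetical, and the numbering order produced by `SERIAL` in SQL is not guaranteed to match the sorted order used here.

```python
def assign_trip_ids(stops):
    """Map each distinct (origin_date, train_num) pair to a 1-based trip id."""
    pairs = sorted({(s["origin_date"], s["train_num"]) for s in stops})
    return {pair: trip_id for trip_id, pair in enumerate(pairs, start=1)}


stops = [
    {"origin_date": "2021-06-11", "train_num": "171", "station_code": "BOS"},
    {"origin_date": "2021-06-11", "train_num": "171", "station_code": "PVD"},
    {"origin_date": "2021-06-11", "train_num": "66", "station_code": "WAS"},
    {"origin_date": "2021-06-12", "train_num": "171", "station_code": "BOS"},
]
trip_ids = assign_trip_ids(stops)
```

Both `171` stops on 2021-06-11 map to the same trip id, while the next day's `171` gets a new one.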

In [None]:
join_data = """
            DROP TABLE IF EXISTS stops_joined CASCADE;
            CREATE TABLE stops_joined AS
            SELECT
                *
            FROM
                stops s
                INNER JOIN (
                    SELECT
                        station_code AS si_station_code,
                        amtrak_station_name,
                        crew_change,
                        weather_location_name,
                        longitude,
                        latitude,
                        nb_next_station,
                        sb_next_station,
                        nb_mile,
                        sb_mile,
                        nb_stop_num,
                        sb_stop_num,
                        nb_miles_to_next,
                        sb_miles_to_next
                    FROM
                        station_info) si ON s.station_code = si.si_station_code
                INNER JOIN weather_hourly wh ON wh.location = si.weather_location_name
                    AND DATE_TRUNC('hour', s.full_act_arr_dep_datetime) = wh.obs_datetime
            ORDER BY
                s.full_sched_arr_dep_datetime;
            """

alter_joined_table = """
                      ALTER TABLE stops_joined
                          DROP COLUMN location,
                          DROP COLUMN obs_datetime,
                          DROP COLUMN weather_id,
                          DROP COLUMN weather_location_name,
                          DROP COLUMN longitude,
                          DROP COLUMN latitude;
                     """

In [None]:
execute_command(conn, join_data)

In [None]:
execute_command(conn, alter_joined_table)
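The join matches each stop to the weather observation for the hour in which the train actually arrived or departed: `DATE_TRUNC('hour', ...)` zeroes the minutes and seconds, lining the timestamp up with the hourly `obs_datetime` values. A stop with no matching hour, or with no actual time at all (so the truncated value is NULL), is dropped by the `INNER JOIN`. A Python sketch of the same lookup, using a hypothetical in-memory `weather` dictionary keyed by (location, hour):

```python
from datetime import datetime


def truncate_to_hour(ts):
    """Python analogue of PostgreSQL DATE_TRUNC('hour', ts)."""
    return ts.replace(minute=0, second=0, microsecond=0)


# Hypothetical hourly observations keyed by (weather location, hour)
weather = {
    ("Providence_RI", datetime(2021, 6, 11, 10)): {"temperature": 68.0, "weather_type": ""},
    ("Providence_RI", datetime(2021, 6, 11, 11)): {"temperature": 70.5, "weather_type": "Rain"},
}


def weather_for_stop(location, actual_dt):
    """Return the observation covering actual_dt, or None (an INNER JOIN would drop the stop)."""
    return weather.get((location, truncate_to_hour(actual_dt)))


obs = weather_for_stop("Providence_RI", datetime(2021, 6, 11, 10, 47, 12))
```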

### Remove duplicate rows
* 393 duplicate entries ended up in the dataset. Duplicates are identified by the tuple (`origin_date`, `train_num`, `station_code`, `arrival_or_departure`), which should be unique; for each tuple, the row with the lowest `stop_id` is kept

In [None]:
remove_duplicates = """
                    DELETE FROM stops_joined
                    WHERE stop_id IN (
                        SELECT stop_id
                        FROM (
                            SELECT
                                stop_id,
                                row_number() OVER (
                                    PARTITION BY origin_date, train_num, station_code, arrival_or_departure
                                    ORDER BY stop_id
                                ) AS rn
                            FROM stops_joined
                        ) ranked
                        WHERE rn >= 2
                    );
                    """

In [None]:
execute_command(conn, remove_duplicates)
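The `row_number()` query numbers the rows within each (`origin_date`, `train_num`, `station_code`, `arrival_or_departure`) group in `stop_id` order and deletes every row numbered 2 or higher. The same selection in plain Python, as a sketch for checking the logic; `duplicate_stop_ids`, `make_stop`, and the sample rows are hypothetical.

```python
def duplicate_stop_ids(rows):
    """Return the stop_ids to delete: every row after the first (lowest stop_id) per key."""
    seen = set()
    to_delete = []
    for row in sorted(rows, key=lambda r: r["stop_id"]):  # ORDER BY stop_id
        key = (row["origin_date"], row["train_num"], row["station_code"], row["arrival_or_departure"])
        if key in seen:
            to_delete.append(row["stop_id"])  # row_number() >= 2 in this partition
        else:
            seen.add(key)  # row_number() = 1, keep it
    return to_delete


def make_stop(stop_id, station_code, kind, origin_date="2021-06-11", train_num="171"):
    return {"stop_id": stop_id, "origin_date": origin_date, "train_num": train_num,
            "station_code": station_code, "arrival_or_departure": kind}


stops = [
    make_stop(7, "PVD", "Arrival"),
    make_stop(1, "PVD", "Arrival"),
    make_stop(2, "PVD", "Departure"),
    make_stop(3, "PVD", "Arrival"),
]
to_delete = duplicate_stop_ids(stops)
```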

In [None]:
add_trip_ids_col = """
                   ALTER TABLE stops_joined
                   ADD COLUMN trip_id integer;
                   """

In [None]:
create_function_get_stop_ids = """
                               CREATE FUNCTION get_all_stop_ids (origin_date date, train_num text, OUT stop_id int, OUT station_code text)
                                    RETURNS SETOF record
                                    AS $$
                                        SELECT
                                            stop_id,
                                            station_code
                                        FROM
                                            stops_joined
                                        WHERE
                                            origin_date = $1
                                        AND train_num = $2;

                                $$
                                LANGUAGE SQL;
                               """

In [None]:
execute_command(conn, add_trip_ids_col)
execute_command(conn, create_function_get_stop_ids)

In [None]:
add_cloud_level = """ALTER TABLE stops_joined ADD COLUMN cloud_level integer;"""

# CASE branches are checked in order, so each "<" bound closes a half-open bin with no gaps
set_cloud_level = """UPDATE
                        stops_joined
                     SET
                        cloud_level = (
                            CASE WHEN cloud_cover < 10 THEN
                                0
                            WHEN cloud_cover < 35 THEN
                                1
                            WHEN cloud_cover < 75 THEN
                                2
                            ELSE
                                3
                            END)
                     WHERE
                        cloud_cover IS NOT NULL;
                    """

execute_command(conn, add_cloud_level)
execute_command(conn, set_cloud_level)
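The cloud-cover binning can be mirrored in Python with `bisect_right` on the lower bounds 10, 35, and 75 (percent cloud cover). Each bin is half-open, so a fractional `real` value such as 34.9995 always lands in exactly one level. `cloud_level` here is a hypothetical helper for spot-checking the SQL results, not part of the project code.

```python
from bisect import bisect_right

CLOUD_THRESHOLDS = [10, 35, 75]  # percent cloud cover at which levels 1, 2, 3 begin


def cloud_level(cloud_cover):
    """0: < 10, 1: [10, 35), 2: [35, 75), 3: >= 75; a missing value stays None."""
    if cloud_cover is None:
        return None
    return bisect_right(CLOUD_THRESHOLDS, cloud_cover)
```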

In [None]:
add_below_freezing = """
                     ALTER TABLE stops_joined
                     ADD COLUMN below_freezing boolean;
                     """
# A boolean comparison assigns true/false directly to the boolean column
set_below_freezing = """
                     UPDATE
                        stops_joined
                     SET
                        below_freezing = (temperature < 32)
                     WHERE
                        temperature IS NOT NULL;
                     """

In [None]:
execute_command(conn, add_below_freezing)
execute_command(conn, set_below_freezing)

In [None]:
add_above_temp_thresh = """ALTER TABLE stops_joined ADD COLUMN above_temp_thresh boolean;"""
set_above_temp_thresh = """
                        UPDATE
                            stops_joined
                        SET
                            above_temp_thresh = (temperature >= 90)
                        WHERE
                            temperature IS NOT NULL;
                        """

execute_command(conn, add_above_temp_thresh)
execute_command(conn, set_above_temp_thresh)
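Both temperature flags reduce to simple threshold comparisons. The sketch below mirrors them in Python; the thresholds 32 and 90 come from the SQL above, while the Fahrenheit unit is inferred from those values rather than stated in the source. `temperature_flags` is a hypothetical helper.

```python
FREEZING_F = 32       # below_freezing threshold (assumed degrees Fahrenheit)
HOT_THRESHOLD_F = 90  # above_temp_thresh threshold (assumed degrees Fahrenheit)


def temperature_flags(temperature):
    """Return (below_freezing, above_temp_thresh); both None when temperature is missing."""
    if temperature is None:
        return None, None
    return temperature < FREEZING_F, temperature >= HOT_THRESHOLD_F
```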

In [None]:
add_precip_flag = """ALTER TABLE stops_joined ADD COLUMN precip_exists boolean;"""
set_precip_flag = """UPDATE
                        stops_joined
                     SET
                        precip_exists = (weather_type LIKE '%Snow%' OR weather_type LIKE '%Rain%')
                     WHERE
                        weather_type IS NOT NULL;
                    """

execute_command(conn, add_precip_flag)
execute_command(conn, set_precip_flag)


In [None]:
add_precip_level = """ALTER TABLE stops_joined ADD COLUMN precip_level integer;"""

# CASE branches are checked in order, so each bin is [lower, upper) with no gaps
set_precip_level = """
                    UPDATE
                        stops_joined
                    SET
                        precip_level = (
                            CASE WHEN precipitation < 0.01 THEN
                                0
                            WHEN precipitation < 0.10 THEN
                                1
                            WHEN precipitation < 0.25 THEN
                                2
                            WHEN precipitation < 0.50 THEN
                                3
                            WHEN precipitation < 0.75 THEN
                                4
                            ELSE
                                5
                            END)
                    WHERE
                        precipitation IS NOT NULL;
                   """

execute_command(conn, add_precip_level)
execute_command(conn, set_precip_level)
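The precipitation levels can be checked the same way with `bisect_right` on the lower bounds of levels 1 to 5, assuming level 3 is meant to span 0.25 up to 0.50 (the next level starts at 0.50). The units are whatever `weather_hourly.precipitation` stores, presumably inches; `precip_level` is a hypothetical helper.

```python
from bisect import bisect_right

# Lower bounds of precip_level 1..5 (units as stored in weather_hourly, presumably inches)
PRECIP_THRESHOLDS = [0.01, 0.10, 0.25, 0.50, 0.75]


def precip_level(precipitation):
    """Map an hourly precipitation amount to levels 0-5; a missing value stays None."""
    if precipitation is None:
        return None
    return bisect_right(PRECIP_THRESHOLDS, precipitation)
```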


In [None]:
add_worst_case_precip = """ALTER TABLE stops_joined
                           ADD COLUMN worst_case_precip integer;"""

set_worst_case_precip = """
                        UPDATE
                            stops_joined
                        SET
                            worst_case_precip = (
                                CASE WHEN weather_type LIKE '%Snow%'
                                    OR weather_type LIKE '%Thunderstorm%'
                                    OR weather_type LIKE '%Hail%' THEN
                                    3
                                WHEN weather_type LIKE '%Rain%' THEN
                                    2
                                WHEN weather_type = '' THEN
                                    0
                                ELSE
                                    1
                                END)
                        WHERE
                            weather_type IS NOT NULL;
                        """

execute_command(conn, add_worst_case_precip)
execute_command(conn, set_worst_case_precip)
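The severity score checks conditions in order: any snow, thunderstorm, or hail scores 3; otherwise rain scores 2; an empty weather type scores 0; anything else (e.g. fog) scores 1. A Python mirror of that ordering for spot checks, using case-sensitive substring tests like `LIKE`; `worst_case_precip` is a hypothetical helper.

```python
def worst_case_precip(weather_type):
    """Severity score: 3 snow/thunderstorm/hail, 2 rain, 0 no weather, 1 anything else."""
    if weather_type is None:
        return None
    if any(word in weather_type for word in ("Snow", "Thunderstorm", "Hail")):
        return 3
    if "Rain" in weather_type:
        return 2
    if weather_type == "":
        return 0
    return 1
```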