## Workbook for local actions related to the streaming UK Rail workload

In [24]:
import awswrangler as aw
import json
import boto3
import pandas as pd
import numpy as np
import os
from datetime import datetime

In [25]:
# Ensure that the correct session is used
boto3.setup_default_session(region_name="us-east-1")

Read the location export

In [57]:
location_df = pd.read_json('./LocationData/TiplocPublicExport_2021-07-28_15-45.json')
# The read file contains tiplocs in json format - this line reformats those json cells (which 
# get parsed into dictionaries by pandas) into their own rows
df_save_locations = pd.DataFrame((d for d in location_df['Tiplocs']))
# Select only the information that is required
df_save_locations = df_save_locations[['Name', 'Tiploc', 'Stanox', 'Latitude', 'Longitude']]
df_save_locations

Unnamed: 0,Name,Tiploc,Stanox,Latitude,Longitude
0,Aachen,AACHEN,5.0,50.767721,6.091281
1,Abercwmboi,ABCWM,78128.0,51.690694,-3.402551
2,Penywaun Bus,ABDAPEN,,51.730537,-3.484104
3,Aberdare,ABDARE,78100.0,51.714525,-3.441859
4,Trecynon,ABDATRE,,51.721419,-3.459253
...,...,...,...,...,...
11199,Training Location Alpha,ZTRGALP,,53.593103,-0.000494
11200,Training Location Brian,ZTRGBRI,,53.593129,0.000923
11201,Training Location Deltad,ZTRGDED,,53.593161,0.002006
11202,Training Location James,ZTRGJAM,,53.593154,0.003176


Prepare the dataset of locations to save. First obtain the schema
of the dataframe

In [44]:
saveSchema = aw.catalog.extract_athena_types(df_save_locations)[0]
saveSchema['stanox'] = 'string'
saveSchema

{'name': 'string',
 'tiploc': 'string',
 'stanox': 'string',
 'latitude': 'double',
 'longitude': 'double'}

Next, deduplicate the locations by stanox (the key for location in the
streaming dataset)

In [54]:
dedupe = df_save_locations.groupby('stanox').first().reset_index()
dedupe

Unnamed: 0,stanox,name,tiploc,latitude,longitude
0,1001,Thurso,THURSO,58.589893,-3.527793
1,10021,Aspatria,ASPTRIA,54.758960,-3.331890
2,10023,Maryport,MPRT,54.711187,-3.494391
3,10026,Flimby,FLIMBY,54.689933,-3.520584
4,1003,Thurso UKAEA,THURSOU,58.589065,-3.528059
...,...,...,...,...,...
10097,9541,Rylstone S.B.,RYLSTON,54.030788,-2.053228
10098,9542,Rylstone Tilcon (GBRf),RYLSGBF,54.050636,-2.023828
10099,9543,Rylstone (swinden) Quarry,RYLSQRY,54.045146,-2.029240
10100,9545,Rylstone Lc,RYLSLC,54.045146,-2.029240


Finaly, save the location dataframe to the location table. First create the table
with the schema extracted from the dataframe

In [55]:
aw.catalog.create_parquet_table(database = "train_silver", 
                                table = "location", 
                                path = "s3://train-silver/location/",
                                columns_types = saveSchema,
                                compression = "snappy",
                                table_type = "GOVERNED")

And then write the data

In [56]:
aw.s3.to_parquet(
            df=dedupe,
            dataset=True,
            mode="append",
            database="train_silver",
            table="location"
        )

{'paths': ['s3://train-silver/location/4e7accbb3c61406d979e6a818e1db2b7.snappy.parquet'],
 'partitions_values': {}}

## Experimentation zone

These cells relate to various bits of experimentation performed to understand how AWS data wrangler works with Athena and Lake Formation

In [None]:
for subdir, dirs, files in os.walk('./sample_data/'):
    for file in files:
        file_loc = os.path.join(subdir, file)
        f = open(file_loc)
        print(file_loc)
        filedata = json.load(f)
        rows = []
        for row in filedata:
            row_to_add = row['body']
            row_to_add['msg_type'] = row['header']['msg_type']
            rows.append(row_to_add)
            
        frame = pd.DataFrame(rows)        
        frame['segment_timestamp'] = np.nan
        if 'actual_timestamp' in frame.columns:
            frame['segment_timestamp'] = frame['segment_timestamp'].fillna(frame['actual_timestamp'])
        if 'creation_timestamp' in frame.columns:
            frame['segment_timestamp'] = frame['segment_timestamp'].fillna(frame['creation_timestamp'])
        if 'dep_timestamp' in frame.columns:
            frame['segment_timestamp'] = frame['segment_timestamp'].fillna(frame['dep_timestamp'])
        if 'event_timestamp' in frame.columns:
            frame['segment_timestamp'] = frame['segment_timestamp'].fillna(frame['event_timestamp'])
                
        frame['segment_date'] = frame['segment_timestamp'].apply(lambda x: datetime.fromtimestamp(int(x) / 1000.0).date())
        frame = frame.drop(columns=['segment_timestamp'])
        print('Calling s3 to parquet')
        aw.s3.to_parquet(
            df=frame,
            dataset=True,
            mode="append",
            database="train_bronze",
            table="train_movements_governed",
            catalog_versioning=True,  # Optional
            #schema_evolution=True,
            partition_cols=['segment_date']
        )
        
        break

In [None]:
df = aw.athena.read_sql_query('SELECT max(canx_timestamp) AS max_date FROM "AwsDataCatalog"."train_silver"."journey";', database="train_silver")
df.fillna(0)
df.head()

In [None]:
df.head()['max_date'][0]

In [None]:
df.count()[0] == 0


## Batch Job dev

This notebook code was used to test the various elements of the AWS batch job that writes journeys and stops

In [None]:
max_journey_timestamp = 0

journey_query = """SELECT train_id, loc_stanox as stanox, CAST(canx_timestamp as bigint) AS canx_timestamp, segment_date 
FROM "AwsDataCatalog"."train_bronze"."train_movements_governed" 
WHERE canx_type = 'AT ORIGIN' 
AND cast(canx_timestamp AS bigint) > {}""".format(max_journey_timestamp)

print(journey_query)


journey_df = aw.athena.read_sql_query(journey_query, database="train_bronze")

In [None]:
journey_df.count()

In [None]:
journey_df = aw.athena.read_sql_query('SELECT count(*) as all_journey_count FROM "train_silver"."journey";', database="train_silver")
journey_df

In [None]:
stop_df = aw.athena.read_sql_query('SELECT count(*) as all_journey_count FROM "train_silver"."stop";', database="train_silver")
stop_df

In [None]:
df = aw.athena.read_sql_query('SELECT max(canx_timestamp) AS max_date FROM "AwsDataCatalog"."train_silver"."journey";', database="train_silver")
max_journey_timestamp = df.head()['max_date']

In [None]:
max_journey_timestamp = df.head()['max_date'][0]
max_journey_timestamp

In [None]:
query = """SELECT a.train_id, a.loc_stanox as stanox_code, 
       a.planned_timestamp as planned_arrival_int, 
       a.actual_timestamp as actual_arrival_int,
       a.variation_status as time_status,
       b.planned_timestamp as planned_departure_int, 
       b.actual_timestamp as actual_departure_int,
       CASE a.planned_event_type 
           WHEN 'DESTINATION' THEN True
           ELSE False
       END AS is_terminus
FROM "train_bronze"."train_movements_governed" as a
     LEFT JOIN "train_bronze"."train_movements_governed" as b
     ON  a.train_id = b.train_id
     AND a.loc_stanox = b.loc_stanox
     AND a.event_type = 'ARRIVAL'  
     AND b.event_type = 'DEPARTURE'  
WHERE a.event_type = 'ARRIVAL'  
AND (b.event_type = 'DEPARTURE' or a.planned_event_type = 'DESTINATION') 
AND (CAST(a.actual_timestamp as bigint) > {} OR CAST(b.actual_timestamp as bigint) > {})""".format(0, 0)

stop_df = aw.athena.read_sql_query(query, database="train_bronze")

In [None]:
stop_df['planned_departure_int'] = stop_df['planned_departure_int'].fillna('0')
stop_df['actual_departure_int'] = stop_df['actual_departure_int'].fillna('0')

def calc_date(planned, actual):
    if planned == 0:
        return datetime.fromtimestamp(int(actual) / 1000.0)
    return datetime.fromtimestamp(int(planned) / 1000.0)

def value_with_fallback(value, fallback):
    if value == 0:
        return fallback
    return value

def calc_date_none(actual):
    if actual == 0:
        return None
    return datetime.fromtimestamp(int(actual) / 1000.0)

def calc_delay(expected, actual):
    if expected == 0 or actual == 0:
        return 0
    return (actual - expected) / 1000.0

stop_df["arrival"] = stop_df['actual_arrival_int'].apply(lambda x: datetime.fromtimestamp(int(x) / 1000.0))
stop_df["planned_arrival_int"].replace({"": "0"}, inplace=True)
stop_df["delay_seconds"] = stop_df.apply(lambda x: calc_delay(int(x['planned_arrival_int']), int(x['actual_arrival_int'])), axis=1)
stop_df["planned_arrival"] = stop_df.apply(lambda x: calc_date(int(x['planned_arrival_int']), int(x['actual_arrival_int'])), axis=1)

stop_df["planned_departure_int"].replace({"": "0"}, inplace=True)
stop_df["departure"] = stop_df['actual_departure_int'].apply(lambda x: calc_date_none(x))
stop_df["planned_departure"] = stop_df['planned_departure_int'].apply(lambda x: calc_date_none(x))
stop_df["depart_timestamp"] = stop_df.apply(lambda x: value_with_fallback(int(x['actual_departure_int']), int(x['actual_arrival_int'])), axis=1)

stop_df.drop(['planned_arrival_int', 'actual_arrival_int', 'planned_departure_int', 'actual_departure_int'], inplace=True, axis=1)
stop_df


In [None]:
df = aw.athena.read_sql_query('SELECT * FROM "AwsDataCatalog"."train_silver"."stop" LIMIT 50;', database="train_silver")
df