# NYCbuswatcher Shipment --> Data Lake Migration

### Step 1. Convert each shipment JSON in `shipment_folder` into its own parquet file.

In [1]:
import os, json
import pandas as pd
import numpy as np

In [2]:
# Root folder of one day's raw shipment JSONs (presumably year/month/day = 2022/2/2);
# the individual files sit in sub-folders below it.
shipment_folder = os.path.join(
    "/Volumes/nice/bigdata/bus_depot/bus-data-migrations/nyc/data/in/nyc_shipments",
    "2022",
    "2",
    "2",
)

In [3]:
# Collect the full path of every shipment JSON under shipment_folder. The files are nested
# in sub-folders, e.g. .../2022/2/2/0/BX16/shipment_2022-2-2-0-BX16.json.
# Only *.json files are kept: any other file in the tree (e.g. a macOS .DS_Store) would
# otherwise be handed to json.load in the conversion loop.
# Sorted so the processing order is deterministic regardless of filesystem walk order.
shipment_list = sorted(
    os.path.join(root, filename)
    for root, _, filenames in os.walk(shipment_folder)
    for filename in filenames
    if filename.endswith(".json")
)
len(shipment_list)

5832

In [4]:
# Peek at one collected path to check the folder layout before converting everything.
shipment_list[0]

'/Volumes/nice/bigdata/bus_depot/bus-data-migrations/nyc/data/in/nyc_shipments/2022/2/2/0/BX16/shipment_2022-2-2-0-BX16.json'

In [5]:
# Step 1 output: each shipment JSON is written as its own parquet file
# (pandas' to_parquet with the fastparquet engine, in the conversion loop).
from pyspark.sql.types import IntegerType  # NOTE(review): not referenced in this notebook

# Destination folder for the per-shipment parquet files (note the trailing slash).
out_path = (
    "/Volumes/nice/bigdata/bus_depot/bus-data-migrations/nyc/data/out/"
    "nyc_shipments_as_parquets/"
)
# exist_ok defaults to False, so this raises FileExistsError when the folder already
# exists (e.g. on a re-run) — presumably a guard against mixing in stale output.
# Delete the folder before re-running this step.
os.makedirs(out_path)
# Every per-shipment parquet file must share one schema so Spark can read them together,
# so the numeric columns are forced to a single dtype here.
def df_clean(df, columns=("passenger_count", "bearing", "next_stop_d_along_route", "next_stop_d")):
    """Force the given numeric columns of one shipment frame to float64.

    pandas infers dtypes per file (e.g. ``convert_dtypes()`` may give Int64 for an
    all-integer column in one shipment and Float64 in another), which would leave the
    individual parquet files with conflicting schemas. Coercing every listed column the
    same way gives all files the same types for these columns.

    Parameters
    ----------
    df : pandas.DataFrame
        One shipment's records. Must contain every column in ``columns``.
        Modified in place.
    columns : iterable of str, optional
        Columns to coerce; defaults to the four numeric columns this notebook cleans.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object, with each listed column as float64. Values that cannot
        be parsed as numbers become NaN.

    Raises
    ------
    KeyError
        If a listed column is missing from ``df``.
    """
    for column in columns:
        # errors="coerce" maps unparseable values to NaN instead of raising, so a single
        # malformed value cannot abort the migration; astype then pins the dtype to
        # plain float64 (nullable NA becomes NaN).
        df[column] = pd.to_numeric(df[column], errors="coerce").astype("float64")
    return df


# Convert each shipment JSON into its own parquet file.
# A shipment that cannot be read, normalized, or cleaned is skipped and recorded rather
# than silently ignored: the old bare `except: pass` fell through with the previous
# iteration's `df`, writing that data again under this shipment's file name.
skipped_shipments = []
for shipment in shipment_list:
    try:
        with open(shipment) as data_file:
            data = json.load(data_file)
        # Records are taken from the top-level "buses" key (json_normalize record_path).
        df = df_clean(pd.json_normalize(data, "buses").convert_dtypes())
    except (OSError, ValueError, KeyError, TypeError) as err:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
        # KeyError covers a missing "buses" key or a column df_clean expects.
        skipped_shipments.append((shipment, repr(err)))
        continue
    # shipment_2022-2-2-0-BX16.json -> <out_path>/shipment_2022-2-2-0-BX16.parquet
    stem = os.path.splitext(os.path.basename(shipment))[0]
    out_file = os.path.join(out_path, stem + ".parquet")
    df.to_parquet(out_file, engine="fastparquet")

print(f"converted {len(shipment_list) - len(skipped_shipments)} shipments, skipped {len(skipped_shipments)}")
skipped_shipments[:10]


### Step 2. Read all the parquet files and repartition them by route and hour using PySpark

In [6]:
import os, json
import pandas as pd

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType, LongType

# Number of local worker threads. In local mode, Spark takes its parallelism from the
# master URL ("local[N]"): plain "local" runs a single worker thread, and
# spark.executor.cores alone does not change that. The value is kept in the config
# as well for reference / if this is ever pointed at a cluster.
SPARK_LOCAL_CORES = 3

# NOTE: getOrCreate() returns an already-running session unchanged, so restart the
# kernel for changes to these settings to take effect.
spark = (
    SparkSession.builder
    .master(f"local[{SPARK_LOCAL_CORES}]")
    .appName("nyc_reparition_individual_parquets_by_route_hour_1")
    .config("spark.executor.cores", SPARK_LOCAL_CORES)
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/22 20:33:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [8]:
# Allow up to 1000 fields in Spark's debug string output (e.g. plans/schemas of this
# wide frame) before it truncates them.
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)

In [9]:
# Load every per-shipment parquet file written in Step 1 as one Spark DataFrame.
in_path = (
    "/Volumes/nice/bigdata/bus_depot/bus-data-migrations/nyc/data/out/"
    "nyc_shipments_as_parquets/"
)
df = spark.read.parquet(in_path)

# Sanity check: (row count, column count).
print((df.count(), len(df.columns)))



(2442497, 23)


                                                                                

In [10]:
# df.printSchema()

In [11]:
# Fix timestamp type: cast the "timestamp" column to Spark's TimestampType.
# https://sparkbyexamples.com/spark/pyspark-to_timestamp-convert-string-to-timestamp-type/
# Import only the functions this notebook uses; `from pyspark.sql.functions import *`
# would shadow Python builtins such as sum, min, max, round and abs.
from pyspark.sql.functions import col, dayofmonth, hour, month, to_timestamp, year

# Timestamp string -> TimestampType (not DateType: the time of day is kept).
# No format is passed, so Spark's default timestamp parsing is used.
# NOTE(review): parsed values are interpreted in spark.sql.session.timeZone, and the
# year/month/day/hour partition columns are derived from them — confirm that matches
# the intended (presumably New York local) time zone.
new_df = df.withColumn("timestamp", to_timestamp("timestamp"))
new_df.printSchema()

root
 |-- route: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- route_long: string (nullable = true)
 |-- direction: string (nullable = true)
 |-- service_date: string (nullable = true)
 |-- trip_id: string (nullable = true)
 |-- gtfs_shape_id: string (nullable = true)
 |-- route_short: string (nullable = true)
 |-- agency: string (nullable = true)
 |-- origin_id: string (nullable = true)
 |-- destination_name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- bearing: double (nullable = true)
 |-- progress_rate: string (nullable = true)
 |-- progress_status: string (nullable = true)
 |-- vehicle_id: string (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- next_stop_id: string (nullable = true)
 |-- next_stop_eta: string (nullable = true)
 |-- next_stop_d_along_route: double (nullable = true)
 |-- next_stop_d: double (nullable = true)
 |-- gtfs_block_id: string (nullable = true)



In [12]:
# Destination for the dataset partitioned by year/month/day/hour/route_short.
out_path = "/Volumes/nice/bigdata/bus_depot/bus-data-migrations/nyc/data/out/nyc_shipments_as_parquets_partitioned/"
# The partitioned write uses mode('overwrite'), so an existing folder is expected on a
# re-run; exist_ok=True stops this cell from raising FileExistsError and blocking
# Restart & Run All.
os.makedirs(out_path, exist_ok=True)

In [13]:
# Save, deriving year/month/day/hour columns from "timestamp" on the fly and using them
# (plus route_short) as the on-disk partition folders.
# https://stackoverflow.com/questions/52527888/spark-partition-data-writing-by-timestamp/52528333#52528333
(
    new_df
    .withColumn("year", year(col("timestamp")))
    .withColumn("month", month(col("timestamp")))
    .withColumn("day", dayofmonth(col("timestamp")))
    .withColumn("hour", hour(col("timestamp")))
    # Hash-partition by hour so all rows of one hour are in the same task; partitionBy
    # then splits that task's rows by route, so each route-hour folder is written by a
    # single task.
    .repartition("year", "month", "day", "hour")
    .write
    # With the default (static) partition overwrite mode this replaces what is at out_path.
    .mode("overwrite")
    .partitionBy("year", "month", "day", "hour", "route_short")
    .parquet(out_path)
)

                                                                                