#### Batch Data Ingestion & Preparation

##### https://www.citibikenyc.com/system-data

In [2]:
from pyspark.sql.types import *
from pyspark.sql import functions as psf

In [3]:
# Config section:
bronze_path = '/FileStore/tables/Citi-Bike/Bronze'
silver_path = '/FileStore/tables/Citi-Bike/Silver'
gold_path = '/FileStore/tables/Citi-Bike/Gold'

# Create folder structure on DBFS if absent: (ignores if present)
dbutils.fs.mkdirs(bronze_path)
dbutils.fs.mkdirs(silver_path)
dbutils.fs.mkdirs(gold_path)

In [4]:
dbutils.fs.ls('/FileStore/tables/Citi-Bike/Gold/dim_station/_delta_log/')

In [5]:
%sh
cat /dbfs/FileStore/tables/Citi-Bike/Gold/dim_station/_delta_log/00000000000000000000.json

###### Hive like Catalog DB named: citibike.
###### To store Project Tables & Views.

In [7]:
%sql
DROP DATABASE IF EXISTS citibike CASCADE;
CREATE DATABASE citibike;

In [8]:
# Verify that CSV Batch data files are placed in the Bronze DBFS layer.
# Manually downloaded from Citibike website for the Demo. FTP to be setup for Prod.
display(
  dbutils.fs.ls(bronze_path)
)

path,name,size
dbfs:/FileStore/tables/Citi-Bike/Bronze/JC_202001_citibike_tripdata.csv,JC_202001_citibike_tripdata.csv,4861189
dbfs:/FileStore/tables/Citi-Bike/Bronze/JC_202002_citibike_tripdata.csv,JC_202002_citibike_tripdata.csv,4274782
dbfs:/FileStore/tables/Citi-Bike/Bronze/JC_202003_citibike_tripdata.csv,JC_202003_citibike_tripdata.csv,3288964
dbfs:/FileStore/tables/Citi-Bike/Bronze/JC_202004_citibike_tripdata.csv,JC_202004_citibike_tripdata.csv,1712500
dbfs:/FileStore/tables/Citi-Bike/Bronze/JC_202005_citibike_tripdata.csv,JC_202005_citibike_tripdata.csv,4635457
dbfs:/FileStore/tables/Citi-Bike/Bronze/JC_202006_citibike_tripdata.csv,JC_202006_citibike_tripdata.csv,6819314


In [10]:
# Clean-up step (as & when needed)
# View and Delete unwanted files from Landing/Bronze directory:
#dbutils.fs.ls('/FileStore/tables/')
#dbutils.fs.rm('/FileStore/tables/JC_202006_citibike_tripdata.csv')

In [11]:
# schema for the raw trip history data
trip_schema = StructType([
  StructField('tripduration', IntegerType()),
  StructField('start_time', StringType()),
  StructField('stop_time',  StringType()),
  StructField('start_station_id', IntegerType()),
  StructField('start_station_name', StringType()),
  StructField('start_station_latitude', StringType()),
  StructField('start_station_longitude', StringType()),
  StructField('end_station_id', IntegerType()),
  StructField('end_station_name', StringType()),
  StructField('end_station_latitude', StringType()),
  StructField('end_station_longitude', StringType()),
  StructField('bike_id', IntegerType()),
  StructField('user_type', StringType()),
  StructField('birth_year', StringType()),
  StructField('user_gender', StringType()),
  ])

In [12]:
# read the raw trip history data to dataframe, without triggering job, by passing csv schema
bronze = spark.read.csv(
  bronze_path, 
  header=True,  
  schema=trip_schema
  )

In [13]:
#bronze.rdd.getNumPartitions()
#bronze = bronze.coalesce(200) # If we use Coalesce to reduce NumPartitions, then we can use Coalesce to INCREASE numPart till that old num !!
#bronze = bronze.repartition(201, ["start_station_id", "end_station_id"])
#bronze.rdd.getNumPartitions()

In [14]:
bronze = bronze.distinct()
#display(bronze.orderBy(bronze.start_time.desc()))
#bronze.distinct().count() #101046
bronze.count()  #101046

##### Complete Refresh / Append to Delta Lake - Silver Layer

In [16]:
# Save all combined de-duped CSVs as Delta Lake in Silver layer DBFS:
bronze.write.format('delta').mode('overwrite').save(silver_path)

In [17]:
# Verify write to Silver layer:
slvrDF = spark.read.format('delta').load(silver_path)

In [18]:

# Validation step
# Verify Counts/Schema/Top few records etc.
print("slvrDF.count()  :   ", slvrDF.count())
print("================================================")
print("slvrDF.dtypes  :   ", slvrDF.dtypes)
print("================================================")
print("slvrDF.head()  :   ", slvrDF.head())
print("================================================")
print("slvrDF.schema  :   ", slvrDF.schema)

In [19]:
# Spark SQL based approach to analyze data:
#%sql
#select count(*) from silver

##### Time Travel SQL on Delta Lake

In [21]:
slvrDF_ttrvl = spark.read.format('delta').option("timestampAsOf", "2020-07-30").load(silver_path)
slvrDF_ttrvl.count()

#%sql
#select count(*) from silver TIMESTAMP AS OF "2019-01-01"

In [22]:
# View top few rows using databricks's display function, which enables charts & more
display(slvrDF.head(10))

tripduration,start_time,stop_time,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bike_id,user_type,birth_year,user_gender
977,2020-06-01 13:50:23.1360,2020-06-01 14:06:40.1460,3481,York St,40.71649,-74.04105,3185,City Hall,40.7177325,-74.043845,42330,Subscriber,1982,2
1902,2020-06-01 19:04:27.8520,2020-06-01 19:36:10.1530,3185,City Hall,40.7177325,-74.043845,3185,City Hall,40.7177325,-74.043845,42105,Subscriber,1985,1
2616,2020-06-02 08:24:20.3350,2020-06-02 09:07:57.2860,3194,McGinley Square,40.7253399253558,-74.06762212514877,3270,Jersey & 6th St,40.72528910781132,-74.04557168483734,42154,Subscriber,1993,1
4456,2020-06-02 12:43:38.1080,2020-06-02 13:57:54.5090,3213,Van Vorst Park,40.71848892,-74.047726625,3213,Van Vorst Park,40.71848892,-74.047726625,39066,Customer,1969,0
389,2020-06-02 13:22:00.3990,2020-06-02 13:28:29.9400,3195,Sip Ave,40.73089709786179,-74.06391263008118,3678,Fairmount Ave,40.72572613742557,-74.07195925712585,42481,Subscriber,1967,1
533,2020-06-02 17:33:48.7820,2020-06-02 17:42:42.2420,3195,Sip Ave,40.73089709786179,-74.06391263008118,3193,Lincoln Park,40.7246050998869,-74.07840594649315,31242,Subscriber,1994,2
2510,2020-06-02 18:29:01.9040,2020-06-02 19:10:52.7490,3214,Essex Light Rail,40.7127742,-74.0364857,3267,Morris Canal,40.7124188237569,-74.03852552175522,42580,Customer,1992,1
859,2020-06-02 18:51:55.9090,2020-06-02 19:06:15.8570,3638,Washington St,40.7242941,-74.0354826,3267,Morris Canal,40.7124188237569,-74.03852552175522,42133,Subscriber,1985,2
7013,2020-06-03 18:20:24.4290,2020-06-03 20:17:17.5280,3192,Liberty Light Rail,40.7112423,-74.0557013,3192,Liberty Light Rail,40.7112423,-74.0557013,42393,Customer,1969,0
800,2020-06-03 18:46:06.9230,2020-06-03 18:59:27.3370,3276,Marin Light Rail,40.71458403535893,-74.04281705617905,3639,Harborside,40.7192517,-74.034234,29072,Customer,1991,1


##### Create a Station Master Dimension table

In [24]:
slvrDF.createOrReplaceTempView("silver")

startDF = slvrDF.select(["start_station_id","start_station_name"]).distinct().withColumnRenamed("start_station_id","station_id").withColumnRenamed("start_station_name","station_name")
endDF = slvrDF.select(["end_station_id","end_station_name"]).distinct().withColumnRenamed("end_station_id","station_id").withColumnRenamed("end_station_name","station_name")

unionDF = startDF.unionAll(endDF).distinct()

#display(unionDF)

unionDF.write.format('delta').mode('overwrite').save(f"{gold_path}/dim_station")

In [25]:
# Create Managed Table using spark SQL (alternative to Hive SQL)
tableName = "dim_station"

spark.sql(f"DROP TABLE IF EXISTS CITIBIKE.{tableName}")

spark.sql(f"""
  CREATE TABLE CITIBIKE.{tableName}
  USING DELTA
  LOCATION "{gold_path}/dim_station"
""")

spark.catalog.listTables()

In [26]:
%sql
select * from CITIBIKE.dim_station

station_id,station_name
3629,Adam Clayton Powell Blvd & W 126 St
83,Atlantic Ave & Fort Greene Pl
3256,Pier 40 - Hudson River Park
3540,Morningside Ave & W 123 St
3461,Murray St & Greenwich St
327,Vesey Pl & River Terrace
3623,W 120 St & Claremont Ave
3277,Communipaw & Berry Lane
534,Water - Whitehall Plaza
3649,W 129 St & Convent Ave


#### Perform Top N Analysis - Gold Layer:
* Busiest Station-ID: Stations with most Trips
* Stations with Longest Trips
* Stations with Shortest Trips
* Customers by Age group : 30+, 40+, 50+ yrs 
* Stations with Most Subscribers

###### Spark SQL approach:
###### Busiest Station-ID: Stations with most Trips

In [29]:
# DEFINE Busiest Station-ID: Stations with most Trips
slvrDF.createOrReplaceTempView("silver")

In [30]:
#stations_by_cnt = 
spark.sql('''
select station_id, count(station_id) AS trip_cnt
FROM 
  (
     select start_station_id as station_id from silver
     UNION ALL
     select end_station_id as station_id from silver
  ) AS station_ids
GROUP BY station_id
order by trip_cnt DESC
'''
).createOrReplaceTempView("dm_busiest_stations")

#display(stations_by_cnt) # IF a Dataframe was created, then we have seen Content this way

In [31]:
%sql
-- Analytics VIEW
select A.station_id, S.station_name, A.trip_cnt
from dm_busiest_stations A
INNER JOIN CITIBIKE.dim_station S
on A.station_id = S.station_id
LIMIT 7

station_id,station_name,trip_cnt
3186,Grove St PATH,21876
3192,Liberty Light Rail,14002
3199,Newport Pkwy,13990
3203,Hamilton Park,12863
3639,Harborside,11581
3195,Sip Ave,11382
3792,Columbus Dr at Exchange Pl,11056


##### Stations with Longest Trips

In [33]:
%sql
-- MEDIAN driven approach:
select DISTINCT A.*, S.station_name FROM (
  select start_station_id as station_id, tripduration, PERCENT_RANK() OVER (PARTITION BY start_station_id ORDER BY tripduration) AS median_tripduration
  from silver
  UNION ALL
  select end_station_id as station_id, tripduration, PERCENT_RANK() OVER (PARTITION BY start_station_id ORDER BY tripduration) AS median_tripduration
  from silver
) A
INNER JOIN CITIBIKE.dim_station S
on A.station_id = S.station_id
WHERE A.median_tripduration = 0.5
ORDER BY A.tripduration DESC, A.station_id
LIMIT 7

station_id,tripduration,median_tripduration,station_name
3638,1147,0.5,Washington St
3791,1147,0.5,Hoboken Ave at Monmouth St
3187,1101,0.5,Warren St
3277,1101,0.5,Communipaw & Berry Lane
3195,942,0.5,Sip Ave
3281,942,0.5,Leonard Gordon Park
3191,707,0.5,Union St


In [34]:
%sql
-- MEAN/AVG based approach:
select A.start_station_id, round(avg(A.tripduration),0) as avg_tripduration, S.station_name
from silver A
INNER JOIN CITIBIKE.dim_station S
on A.start_station_id = S.station_id
GROUP BY A.start_station_id, S.station_name
--order by tripduration DESC

start_station_id,avg_tripduration,station_name
3187,1283.0,Warren St
3281,1702.0,Leonard Gordon Park
3212,1182.0,Christ Hospital
3198,3595.0,Heights Elevator
3201,1359.0,Dey St
3791,2086.0,Hoboken Ave at Monmouth St
3209,917.0,Brunswick St
3694,1677.0,Jackson Square
3679,1904.0,Bergen Ave
3207,1541.0,Oakland Ave
