### Read CSV file to delta format

In [0]:
# import library

from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType

#### Payments

In [0]:

# payment extraction
payments_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "false") \
    .option("sep", ",") \
    .load("dbfs:/FileStore/tables/payments.csv") \
    .toDF("payment_id","date","amount","rider_id")

In [0]:
display(payments_df.limit(10))

payment_id,date,amount,rider_id
1,2019-05-01,9.0,1000
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000
6,2019-10-01,9.0,1000
7,2019-11-01,9.0,1000
8,2019-12-01,9.0,1000
9,2020-01-01,9.0,1000
10,2020-02-01,9.0,1000


In [0]:
# Write data to delta location
payments_df.write.format("delta") \
    .mode("overwrite") \
    .save("/delta/payments")
     

#### Riders


In [0]:
# riders extraction
riders_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "false") \
    .option("sep", ",") \
    .load("dbfs:/FileStore/tables/riders.csv") \
    .toDF("rider_id", "first_name", "last_name", "address", "birthday", "account_start_date", "account_end_date", "is_member")
   

In [0]:
display(riders_df.limit(10))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False
1006,Alicia,Taylor,1137 Angela Locks,2004-01-30,2020-11-27,2021-12-01,True
1007,Benjamin,Fernandez,979 Phillips Ways,1988-01-11,2016-12-11,,False
1008,John,Crawford,7691 Evans Court,1987-02-21,2021-03-28,2021-07-01,True
1009,Victoria,Ritter,9922 Jim Crest Apt. 319,1981-02-07,2020-06-12,2021-11-01,True


In [0]:
# Write data to delta location
riders_df.write.format("delta") \
    .mode("overwrite") \
    .save("/delta/riders")

#### Stations

In [0]:

# stations extraction
stations_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "false") \
    .option("sep", ",") \
    .load("dbfs:/FileStore/tables/stations.csv") \
    .toDF("station_id", "name", "latitude", "longitude")


In [0]:
display(stations_df.limit(10))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001


In [0]:
# Write data to delta location
stations_df.write.format("delta") \
    .mode("overwrite") \
    .save("/delta/stations")

#### Trips

In [0]:
# trips extraction
trips_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "false") \
    .option("sep", ",") \
    .load("dbfs:/FileStore/tables/trips.csv") \
    .toDF("trip_id", "rideable_type", "start_at", "ended_at", "start_station_id", "end_station_id", "rider_id")


In [0]:
display(trips_df.limit(10))

trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
89E7AA6C29227EFF,classic_bike,2021-02-12T16:14:56Z,2021-02-12T16:21:43Z,525,660,71934
0FEFDE2603568365,classic_bike,2021-02-14T17:52:38Z,2021-02-14T18:12:09Z,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09T19:10:18Z,2021-02-09T19:19:10Z,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02T17:49:41Z,2021-02-02T17:54:06Z,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23T15:07:23Z,2021-02-23T15:22:37Z,13216,TA1309000055,39608
BDAA7E3494E8D545,electric_bike,2021-02-24T15:43:33Z,2021-02-24T15:49:05Z,18003,KP1705001026,36267
A772742351171257,classic_bike,2021-02-01T17:47:42Z,2021-02-01T17:48:33Z,KP1705001026,KP1705001026,50104
295476889D9B79F8,classic_bike,2021-02-11T18:33:53Z,2021-02-11T18:35:09Z,18003,18003,19618
362087194BA4CC9A,classic_bike,2021-02-27T15:13:39Z,2021-02-27T15:36:36Z,KP1705001026,KP1705001026,16732
21630F715038CCB0,classic_bike,2021-02-20T08:59:42Z,2021-02-20T09:17:04Z,KP1705001026,KP1705001026,57068


In [0]:
# Write data to delta location
trips_df.write.format("delta") \
    .mode("overwrite") \
    .save("/delta/trips")
     

#### Load data to table

In [0]:
# Load payments
if(spark._jsparkSession.catalog().tableExists('default', 'staging_payments') == True):
    spark.sql("DROP TABLE IF EXISTS staging_payments")
spark.sql("CREATE TABLE staging_payments USING DELTA LOCATION '/delta/payments'")


DataFrame[]

In [0]:
# Load riders
if(spark._jsparkSession.catalog().tableExists('default', 'staging_riders') == True):
    spark.sql("DROP TABLE IF EXISTS staging_riders")
spark.sql("CREATE TABLE staging_riders USING DELTA LOCATION '/delta/riders'")


DataFrame[]

In [0]:

# Load stations
if(spark._jsparkSession.catalog().tableExists('default', 'staging_stations') == True):
    spark.sql("DROP TABLE IF EXISTS staging_stations")    
spark.sql("CREATE TABLE staging_stations USING DELTA LOCATION '/delta/stations'")


DataFrame[]

In [0]:
# Load trips
if(spark._jsparkSession.catalog().tableExists('default', 'staging_trips') == True):
    spark.sql("DROP TABLE IF EXISTS staging_trips")
spark.sql("CREATE TABLE staging_trips USING DELTA LOCATION '/delta/trips'")

DataFrame[]

#### Transform data to star schmema


#### Payment

In [0]:
payments = spark.table("default.staging_payments")
     

In [0]:
display(payments.limit(10))

payment_id,date,amount,rider_id
1,2019-05-01,9.0,1000
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000
6,2019-10-01,9.0,1000
7,2019-11-01,9.0,1000
8,2019-12-01,9.0,1000
9,2020-01-01,9.0,1000
10,2020-02-01,9.0,1000


In [0]:
# Write data to fact_payments
spark.sql("drop table if exists default.fact_payments")
payments.dropDuplicates(["payment_id"]).write.format("delta").mode("overwrite").saveAsTable("default.fact_payments")


#### Rider

In [0]:
riders = spark.table("default.staging_riders")
     

In [0]:
display(riders.limit(10))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
57257,Mark,Mcfarland,9928 Hunter Ranch,1982-02-01,2020-12-05,,False
57258,Mark,Davis,20036 Barrett Summit Apt. 714,1963-07-28,2017-07-12,,True
57259,Bryan,Manning,089 Sarah Square,1984-11-05,2018-08-10,,True
57260,Michele,Rowe,3157 Nicole Ferry Apt. 826,1997-09-21,2016-06-03,,True
57261,John,Mckenzie,312 Jessica Wells,2002-10-13,2016-02-01,2018-07-01,True
57262,Tami,Rivera,910 Lopez Pass Apt. 426,2001-06-29,2020-11-12,,True
57263,Joseph,Hodge,411 Mccoy Haven,1993-04-30,2020-05-29,,False
57264,Lauren,Brown,667 Rodriguez Ramp,2002-10-20,2020-05-10,,False
57265,Stephanie,Reed,90789 Fowler Circle,1993-03-20,2018-09-05,2019-09-01,True
57266,Brittney,Lamb,55635 Valerie Falls,1993-12-09,2020-10-10,,True


In [0]:
# Write data to dim_rider
spark.sql("drop table if exists default.dim_rider")
riders.dropDuplicates(["rider_id"]).write.format("delta").mode("overwrite").saveAsTable("default.dim_rider")
     


#### Station

In [0]:
stations = spark.table("default.staging_stations")


In [0]:
display(stations.limit(10))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001


In [0]:
# Write data to dim_station
spark.sql("drop table if exists default.dim_station")
stations.dropDuplicates(["station_id"]).write.format("delta").mode("overwrite").saveAsTable("default.dim_station")
 

#### Trips

In [0]:
trips = spark.table("default.staging_trips")


In [0]:
display(trips.limit(10))


trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
F6F309843C09CAAC,classic_bike,2021-09-02T18:36:00Z,2021-09-02T18:48:43Z,TA1306000026,TA1307000041,70456
BD496FA19316E89C,classic_bike,2021-09-02T17:23:28Z,2021-09-02T17:32:43Z,TA1306000026,TA1309000019,24732
657E606A35206CC1,classic_bike,2021-09-05T23:13:43Z,2021-09-05T23:30:26Z,15642,TA1305000041,11875
B10DEEC4A5D90EB3,electric_bike,2021-09-09T18:26:00Z,2021-09-09T18:40:17Z,TA1307000001,TA1308000049,33064
268A00298A05078B,classic_bike,2021-09-06T18:49:27Z,2021-09-06T19:02:22Z,TA1306000026,TA1308000049,71976
41E9E476004232FD,electric_bike,2021-09-04T21:41:58Z,2021-09-04T22:03:02Z,13235,TA1305000041,43348
3F89D8A2BEE07478,classic_bike,2021-09-11T16:10:38Z,2021-09-11T16:36:36Z,13235,TA1307000041,1159
0BE315967E011514,electric_bike,2021-09-22T21:53:46Z,2021-09-22T22:11:58Z,13235,TA1309000019,75374
AE58DD50675A5E4E,classic_bike,2021-09-05T00:41:37Z,2021-09-05T00:54:15Z,TA1306000026,TA1308000049,3290
B435E6AE404F547E,classic_bike,2021-09-11T17:23:49Z,2021-09-11T17:37:19Z,TA1306000026,TA1307000041,31610


In [0]:
fact_trips = trips.alias("tbl1").join(riders.alias("tbl2"), col("tbl1.rider_id") == col("tbl2.rider_id"), "left") \
            .withColumn('duration', round((unix_timestamp("ended_at") - unix_timestamp('start_at'))/60)) \
            .withColumn('rider_age', round((unix_timestamp("account_start_date") - unix_timestamp('birthday'))/3600/24)) \
            .withColumnRenamed('start_at', 'start_time_id') \
            .withColumnRenamed('ended_at', 'end_time_id') \
            .select("trip_id", col("tbl1.rider_id"), "rideable_type", "start_station_id", "end_station_id", "start_time_id", "end_time_id", "duration", "rider_age")
            
display(fact_trips.limit(10))

trip_id,rider_id,rideable_type,start_station_id,end_station_id,start_time_id,end_time_id,duration,rider_age
F6F309843C09CAAC,70456,classic_bike,TA1306000026,TA1307000041,2021-09-02T18:36:00Z,2021-09-02T18:48:43Z,13.0,12650.0
BD496FA19316E89C,24732,classic_bike,TA1306000026,TA1309000019,2021-09-02T17:23:28Z,2021-09-02T17:32:43Z,9.0,8381.0
657E606A35206CC1,11875,classic_bike,15642,TA1305000041,2021-09-05T23:13:43Z,2021-09-05T23:30:26Z,17.0,10357.0
B10DEEC4A5D90EB3,33064,electric_bike,TA1307000001,TA1308000049,2021-09-09T18:26:00Z,2021-09-09T18:40:17Z,14.0,8602.0
268A00298A05078B,71976,classic_bike,TA1306000026,TA1308000049,2021-09-06T18:49:27Z,2021-09-06T19:02:22Z,13.0,12210.0
41E9E476004232FD,43348,electric_bike,13235,TA1305000041,2021-09-04T21:41:58Z,2021-09-04T22:03:02Z,21.0,20518.0
3F89D8A2BEE07478,1159,classic_bike,13235,TA1307000041,2021-09-11T16:10:38Z,2021-09-11T16:36:36Z,26.0,10632.0
0BE315967E011514,75374,electric_bike,13235,TA1309000019,2021-09-22T21:53:46Z,2021-09-22T22:11:58Z,18.0,12288.0
AE58DD50675A5E4E,3290,classic_bike,TA1306000026,TA1308000049,2021-09-05T00:41:37Z,2021-09-05T00:54:15Z,13.0,7979.0
B435E6AE404F547E,31610,classic_bike,TA1306000026,TA1307000041,2021-09-11T17:23:49Z,2021-09-11T17:37:19Z,14.0,12196.0


In [0]:

# Write data to fact_trip
spark.sql("drop table if exists default.fact_trip")
fact_trips.write.format("delta").mode("overwrite").saveAsTable("default.fact_trip")

#### Create time table

In [0]:
from pyspark.sql import functions as f
from pyspark.sql.types import StringType
# Get min date from trips
min_date = trips.selectExpr('MIN(start_at) AS started_at').first().asDict()['started_at']
# Add date range as max start_at from trip plus 5 years
max_date = trips.selectExpr('DATEADD(year, 5, MAX(start_at)) AS started_at').first().asDict()['started_at']
expression = f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)"
dim_time = spark.createDataFrame([(1,)], ["time_id"])

dim_time = dim_time.withColumn("dateinit", f.explode(f.expr(expression)))
dim_time = dim_time.withColumn("date", f.to_timestamp(dim_time.dateinit, "yyyy-MM-dd"))

dim_time = dim_time \
            .withColumn("day_of_week", f.dayofweek(dim_time.date)) \
            .withColumn("day_of_month", f.dayofmonth(dim_time.date)) \
            .withColumn("week_of_year", f.weekofyear(dim_time.date)) \
            .withColumn("year", f.year(dim_time.date)) \
            .withColumn("quarter", f.quarter(dim_time.date)) \
            .withColumn("month", f.month(dim_time.date)) \
            .withColumn("time_id", dim_time.date.cast(StringType())) \
            .drop(f.col("dateinit"))

In [0]:
# Write data to dim_time
spark.sql("drop table if exists default.dim_time")
dim_time.write.format("delta").mode("overwrite").saveAsTable("default.dim_time")