In [0]:
from pyspark.sql import SparkSession

In [0]:
# ingest payments.csv
payments_df = spark.read.format('csv') \
	.option('inferSchema','false') \
	.option('header','false') \
	.option('sep',',') \
	.load('/FileStore/alegiproject/payments.csv')

payments_df.show(3)

+---+----------+---+----+
|_c0|       _c1|_c2| _c3|
+---+----------+---+----+
|  1|2019-05-01|9.0|1000|
|  2|2019-06-01|9.0|1000|
|  3|2019-07-01|9.0|1000|
+---+----------+---+----+
only showing top 3 rows



In [0]:
# add header into payments_df
spark = SparkSession.builder.appName("AddHeader").getOrCreate()
payments_headers = ["payment_id","date","amount","account_number"]
payments_df = payments_df.toDF(*payments_headers)
payments_df.show(3)

+----------+----------+------+--------------+
|payment_id|      date|amount|account_number|
+----------+----------+------+--------------+
|         1|2019-05-01|   9.0|          1000|
|         2|2019-06-01|   9.0|          1000|
|         3|2019-07-01|   9.0|          1000|
+----------+----------+------+--------------+
only showing top 3 rows



In [0]:
# save payments_df as parquet in delta lake
payments_df.write.format("delta").mode("overwrite").save("/delta/payments")

In [0]:
# ingest payments.csv
riders_df = spark.read.format('csv') \
	.option('inferSchema','false') \
	.option('header','false') \
	.option('sep',',') \
	.load('/FileStore/alegiproject/riders.csv')

riders_df.show(3)

+----+--------+-----+--------------------+----------+----------+----------+----+
| _c0|     _c1|  _c2|                 _c3|       _c4|       _c5|       _c6| _c7|
+----+--------+-----+--------------------+----------+----------+----------+----+
|1000|   Diana|Clark| 1200 Alyssa Squares|1989-02-13|2019-04-23|      null|True|
|1001|Jennifer|Smith|     397 Diana Ferry|1976-08-10|2019-11-01|2020-09-01|True|
|1002|   Karen|Smith|644 Brittany Row ...|1998-08-10|2022-02-04|      null|True|
+----+--------+-----+--------------------+----------+----------+----------+----+
only showing top 3 rows



In [0]:
# add header into riders_df
spark = SparkSession.builder.appName("AddHeader").getOrCreate()
riders_headers = ["rider_id","first_name","last_name","address","birthday","account_start_date","account_end_date","is_member"]
riders_df = riders_df.toDF(*riders_headers)
riders_df.show(3)

+--------+----------+---------+--------------------+----------+------------------+----------------+---------+
|rider_id|first_name|last_name|             address|  birthday|account_start_date|account_end_date|is_member|
+--------+----------+---------+--------------------+----------+------------------+----------------+---------+
|    1000|     Diana|    Clark| 1200 Alyssa Squares|1989-02-13|        2019-04-23|            null|     True|
|    1001|  Jennifer|    Smith|     397 Diana Ferry|1976-08-10|        2019-11-01|      2020-09-01|     True|
|    1002|     Karen|    Smith|644 Brittany Row ...|1998-08-10|        2022-02-04|            null|     True|
+--------+----------+---------+--------------------+----------+------------------+----------------+---------+
only showing top 3 rows



In [0]:
# save riders_df as parquet in delta lake
riders_df.write.format("delta").mode("overwrite").save("/delta/riders")

In [0]:
# ingest stations.csv
stations_df = spark.read.format('csv') \
	.option('inferSchema','false') \
	.option('header','false') \
	.option('sep',',') \
	.load('/FileStore/alegiproject/stations.csv')

stations_df.show(3)

+------------+--------------------+-----------------+------------------+
|         _c0|                 _c1|              _c2|               _c3|
+------------+--------------------+-----------------+------------------+
|         525|Glenwood Ave & To...|        42.012701|-87.66605799999999|
|KA1503000012|  Clark St & Lake St|41.88579466666667|-87.63110066666668|
|         637|Wood St & Chicago...|        41.895634|        -87.672069|
+------------+--------------------+-----------------+------------------+
only showing top 3 rows



In [0]:
# add header into stations_df
spark = SparkSession.builder.appName("AddHeader").getOrCreate()
stations_headers = ["station_id","station_name","latitude","longitude"]
stations_df = stations_df.toDF(*stations_headers)
stations_df.show(3)

+------------+--------------------+-----------------+------------------+
|  station_id|        station_name|         latitude|         longitude|
+------------+--------------------+-----------------+------------------+
|         525|Glenwood Ave & To...|        42.012701|-87.66605799999999|
|KA1503000012|  Clark St & Lake St|41.88579466666667|-87.63110066666668|
|         637|Wood St & Chicago...|        41.895634|        -87.672069|
+------------+--------------------+-----------------+------------------+
only showing top 3 rows



In [0]:
# save stations_df as parquet in delta late
stations_df.write.format("delta").mode("overwrite").save("/delta/stations")

In [0]:
# ingest trips.csv
trips_df = spark.read.format('csv') \
	.option('inferSchema','false') \
	.option('header','false') \
	.option('sep',',') \
	.load('/FileStore/alegiproject/trips.csv')

trips_df.show(3)

+----------------+-------------+-------------------+-------------------+------------+------------+-----+
|             _c0|          _c1|                _c2|                _c3|         _c4|         _c5|  _c6|
+----------------+-------------+-------------------+-------------------+------------+------------+-----+
|89E7AA6C29227EFF| classic_bike|2021-02-12 16:14:56|2021-02-12 16:21:43|         525|         660|71934|
|0FEFDE2603568365| classic_bike|2021-02-14 17:52:38|2021-02-14 18:12:09|         525|       16806|47854|
|E6159D746B2DBB91|electric_bike|2021-02-09 19:10:18|2021-02-09 19:19:10|KA1503000012|TA1305000029|70870|
+----------------+-------------+-------------------+-------------------+------------+------------+-----+
only showing top 3 rows



In [0]:
# add header into trips_df
spark = SparkSession.builder.appName("AddHeader").getOrCreate()
trips_headers = ["trip_id","rideable_type","started_at","ended_at","start_station_id","end_station_id","rider_id"]
trips_df = trips_df.toDF(*trips_headers)
trips_df.show(3)

+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
|         trip_id|rideable_type|         started_at|           ended_at|start_station_id|end_station_id|rider_id|
+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
|89E7AA6C29227EFF| classic_bike|2021-02-12 16:14:56|2021-02-12 16:21:43|             525|           660|   71934|
|0FEFDE2603568365| classic_bike|2021-02-14 17:52:38|2021-02-14 18:12:09|             525|         16806|   47854|
|E6159D746B2DBB91|electric_bike|2021-02-09 19:10:18|2021-02-09 19:19:10|    KA1503000012|  TA1305000029|   70870|
+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
only showing top 3 rows



In [0]:
# save trips_df as parquet in delta lake
trips_df.write.format("delta").mode("overwrite").save("/delta/trips")

In [0]:
# ingest dates.csv
dates_df = spark.read.format('csv') \
	.option('inferSchema','false') \
	.option('header','false') \
	.option('sep',',') \
	.load('/FileStore/alegiproject/dates.csv')
    
dates_df.show(3)

+--------+--------+----+---+-------+---+---+---+---+---------+----+
|     _c0|     _c1| _c2|_c3|    _c4|_c5|_c6|_c7|_c8|      _c9|_c10|
+--------+--------+----+---+-------+---+---+---+---+---------+----+
|19910101|1/1/1991|1991|  1|January|  1|  1|  1|  3|  Tuesday|   1|
|19910102|1/2/1991|1991|  1|January|  1|  2|  2|  4|Wednesday|   1|
|19910103|1/3/1991|1991|  1|January|  1|  3|  3|  5| Thursday|   1|
+--------+--------+----+---+-------+---+---+---+---+---------+----+
only showing top 3 rows



In [0]:
# add header into dates_df
spark = SparkSession.builder.appName("AddHeader").getOrCreate()
dates_headers = ["datekey","date","year","month_num","month_name","week_num","day_num_of_year","day_num_of_month","day_num_of_week","day_name","quarter"]
dates_df = dates_df.toDF(*dates_headers)
dates_df.show(3)

+--------+--------+----+---------+----------+--------+---------------+----------------+---------------+---------+-------+
| datekey|    date|year|month_num|month_name|week_num|day_num_of_year|day_num_of_month|day_num_of_week| day_name|quarter|
+--------+--------+----+---------+----------+--------+---------------+----------------+---------------+---------+-------+
|19910101|1/1/1991|1991|        1|   January|       1|              1|               1|              3|  Tuesday|      1|
|19910102|1/2/1991|1991|        1|   January|       1|              2|               2|              4|Wednesday|      1|
|19910103|1/3/1991|1991|        1|   January|       1|              3|               3|              5| Thursday|      1|
+--------+--------+----+---------+----------+--------+---------------+----------------+---------------+---------+-------+
only showing top 3 rows



In [0]:
# save dates_df as parquet in delta lake
dates_df.write.format("delta").mode("overwrite").save("/delta/dates")

In [0]:
# create payments table
spark.sql("CREATE TABLE payments USING DELTA LOCATION '/delta/payments'")

Out[22]: DataFrame[]

In [0]:
# create riders table
spark.sql("CREATE TABLE riders USING DELTA LOCATION '/delta/riders'")

Out[27]: DataFrame[]

In [0]:
# create stations table
spark.sql("CREATE TABLE stations USING DELTA LOCATION '/delta/stations'")

Out[28]: DataFrame[]

In [0]:
# create trips table
spark.sql("CREATE TABLE trips USING DELTA LOCATION '/delta/trips'")

Out[29]: DataFrame[]

In [0]:
# create dates table
spark.sql("CREATE TABLE dates USING DELTA LOCATION '/delta/dates'")

Out[30]: DataFrame[]

In [0]:
# creating fact_payment table
payments_tbl = spark.table("default.payments")
payments_tbl.show(3)

+----------+----------+------+--------------+
|payment_id|      date|amount|account_number|
+----------+----------+------+--------------+
|         1|2019-05-01|   9.0|          1000|
|         2|2019-06-01|   9.0|          1000|
|         3|2019-07-01|   9.0|          1000|
+----------+----------+------+--------------+
only showing top 3 rows



In [0]:
# create date_key column
from pyspark.sql.functions import regexp_replace
payments_tbl = payments_tbl.withColumn("date_key", regexp_replace("date", "-", ""))
payments_tbl.show(3)

+----------+----------+------+--------------+--------+
|payment_id|      date|amount|account_number|date_key|
+----------+----------+------+--------------+--------+
|         1|2019-05-01|   9.0|          1000|20190501|
|         2|2019-06-01|   9.0|          1000|20190601|
|         3|2019-07-01|   9.0|          1000|20190701|
+----------+----------+------+--------------+--------+
only showing top 3 rows



In [0]:
# change data type to follow star schema
payments_tbl = payments_tbl.selectExpr("cast(payment_id as int) payment_id", "cast(date as date) date", "cast(amount as float) amount", "cast(account_number as int) account_number", "cast(date_key as int) date_key")
payments_tbl.show(3)

+----------+----------+------+--------------+--------+
|payment_id|      date|amount|account_number|date_key|
+----------+----------+------+--------------+--------+
|         1|2019-05-01|   9.0|          1000|20190501|
|         2|2019-06-01|   9.0|          1000|20190601|
|         3|2019-07-01|   9.0|          1000|20190701|
+----------+----------+------+--------------+--------+
only showing top 3 rows



In [0]:
# save as delta table
payments_tbl.write.format("delta").mode("overwrite").saveAsTable("fact_payment")

In [0]:
# creating dim_rider table
riders_tbl = spark.table("default.riders")
riders_tbl.show(3)

+--------+----------+---------+--------------------+----------+------------------+----------------+---------+
|rider_id|first_name|last_name|             address|  birthday|account_start_date|account_end_date|is_member|
+--------+----------+---------+--------------------+----------+------------------+----------------+---------+
|    1000|     Diana|    Clark| 1200 Alyssa Squares|1989-02-13|        2019-04-23|            null|     True|
|    1001|  Jennifer|    Smith|     397 Diana Ferry|1976-08-10|        2019-11-01|      2020-09-01|     True|
|    1002|     Karen|    Smith|644 Brittany Row ...|1998-08-10|        2022-02-04|            null|     True|
+--------+----------+---------+--------------------+----------+------------------+----------------+---------+
only showing top 3 rows



In [0]:
# select columns as per star schema and change data type
from pyspark.sql.functions import col
riders_tbl = riders_tbl.select("rider_id","first_name","last_name","address","birthday","is_member") \
    .selectExpr("cast(rider_id as int) rider_id", "cast(first_name as string) first_name", "cast(last_name as string) last_name", "cast(address as string) address", "cast(birthday as date)", "cast(is_member as boolean) is_member")
riders_tbl.show(3)

+--------+----------+---------+--------------------+----------+---------+
|rider_id|first_name|last_name|             address|  birthday|is_member|
+--------+----------+---------+--------------------+----------+---------+
|    1000|     Diana|    Clark| 1200 Alyssa Squares|1989-02-13|     true|
|    1001|  Jennifer|    Smith|     397 Diana Ferry|1976-08-10|     true|
|    1002|     Karen|    Smith|644 Brittany Row ...|1998-08-10|     true|
+--------+----------+---------+--------------------+----------+---------+
only showing top 3 rows



In [0]:
# save as delta table
riders_tbl.write.format("delta").mode("overwrite").saveAsTable("dim_rider")

In [0]:
# creating dim_station table
stations_tbl = spark.table("default.stations")
stations_tbl.show(3)

+------------+--------------------+-----------------+------------------+
|  station_id|        station_name|         latitude|         longitude|
+------------+--------------------+-----------------+------------------+
|         525|Glenwood Ave & To...|        42.012701|-87.66605799999999|
|KA1503000012|  Clark St & Lake St|41.88579466666667|-87.63110066666668|
|         637|Wood St & Chicago...|        41.895634|        -87.672069|
+------------+--------------------+-----------------+------------------+
only showing top 3 rows



In [0]:
# change data type as per star schema
stations_tbl = stations_tbl.selectExpr("cast(station_id as string) station_id", "cast(station_name as string) station_name", "cast(latitude as float) latitude","cast(longitude as float) longitude")
stations_tbl.show(3)

+------------+--------------------+---------+----------+
|  station_id|        station_name| latitude| longitude|
+------------+--------------------+---------+----------+
|         525|Glenwood Ave & To...|  42.0127| -87.66606|
|KA1503000012|  Clark St & Lake St|41.885796|  -87.6311|
|         637|Wood St & Chicago...|41.895634|-87.672066|
+------------+--------------------+---------+----------+
only showing top 3 rows



In [0]:
# save as delta table
stations_tbl.write.format("delta").mode("overwrite").saveAsTable("dim_station")

In [0]:
# creating dim_date table
dates_tbl = spark.table("default.dates")
dates_tbl.show(3)

+--------+--------+----+---------+----------+--------+---------------+----------------+---------------+---------+-------+
| datekey|    date|year|month_num|month_name|week_num|day_num_of_year|day_num_of_month|day_num_of_week| day_name|quarter|
+--------+--------+----+---------+----------+--------+---------------+----------------+---------------+---------+-------+
|19910101|1/1/1991|1991|        1|   January|       1|              1|               1|              3|  Tuesday|      1|
|19910102|1/2/1991|1991|        1|   January|       1|              2|               2|              4|Wednesday|      1|
|19910103|1/3/1991|1991|        1|   January|       1|              3|               3|              5| Thursday|      1|
+--------+--------+----+---------+----------+--------+---------------+----------------+---------------+---------+-------+
only showing top 3 rows



In [0]:
# add weekend flag column
from pyspark.sql.functions import expr
dates_tbl = dates_tbl.withColumn("is_weekend", expr("CASE WHEN day_name in ('Saturday','Sunday') THEN 1 ELSE 0 END"))
dates_tbl.show(3)

+--------+--------+----+---------+----------+--------+---------------+----------------+---------------+---------+-------+----------+
| datekey|    date|year|month_num|month_name|week_num|day_num_of_year|day_num_of_month|day_num_of_week| day_name|quarter|is_weekend|
+--------+--------+----+---------+----------+--------+---------------+----------------+---------------+---------+-------+----------+
|19910101|1/1/1991|1991|        1|   January|       1|              1|               1|              3|  Tuesday|      1|         0|
|19910102|1/2/1991|1991|        1|   January|       1|              2|               2|              4|Wednesday|      1|         0|
|19910103|1/3/1991|1991|        1|   January|       1|              3|               3|              5| Thursday|      1|         0|
+--------+--------+----+---------+----------+--------+---------------+----------------+---------------+---------+-------+----------+
only showing top 3 rows



In [0]:
# rename column
dates_tbl = dates_tbl.withColumnRenamed("datekey","date_key")
dates_tbl.show(3)

+--------+--------+----+---------+----------+--------+---------------+----------------+---------------+---------+-------+----------+
|date_key|    date|year|month_num|month_name|week_num|day_num_of_year|day_num_of_month|day_num_of_week| day_name|quarter|is_weekend|
+--------+--------+----+---------+----------+--------+---------------+----------------+---------------+---------+-------+----------+
|19910101|1/1/1991|1991|        1|   January|       1|              1|               1|              3|  Tuesday|      1|         0|
|19910102|1/2/1991|1991|        1|   January|       1|              2|               2|              4|Wednesday|      1|         0|
|19910103|1/3/1991|1991|        1|   January|       1|              3|               3|              5| Thursday|      1|         0|
+--------+--------+----+---------+----------+--------+---------------+----------------+---------------+---------+-------+----------+
only showing top 3 rows



In [0]:
# change data type
dates_tbl = dates_tbl.selectExpr("cast(date_key as int) date_key", "cast(date as date) date", "cast(year as int) year", "cast(month_num as int) month_num","cast(month_name as string) month_name","cast(week_num as int) week_num","cast(day_num_of_year as int) day_num_of_year","cast(day_num_of_month as int) day_num_of_month","cast(day_num_of_week as int) day_num_of_week", "cast(day_name as string) day_name","cast(quarter as int) quarter","cast(is_weekend as boolean) is_weekend")
dates_tbl.show(3)

+--------+----+----+---------+----------+--------+---------------+----------------+---------------+---------+-------+----------+
|date_key|date|year|month_num|month_name|week_num|day_num_of_year|day_num_of_month|day_num_of_week| day_name|quarter|is_weekend|
+--------+----+----+---------+----------+--------+---------------+----------------+---------------+---------+-------+----------+
|19910101|null|1991|        1|   January|       1|              1|               1|              3|  Tuesday|      1|     false|
|19910102|null|1991|        1|   January|       1|              2|               2|              4|Wednesday|      1|     false|
|19910103|null|1991|        1|   January|       1|              3|               3|              5| Thursday|      1|     false|
+--------+----+----+---------+----------+--------+---------------+----------------+---------------+---------+-------+----------+
only showing top 3 rows



In [0]:
# save as delta table
dates_tbl.write.format("delta").mode("overwrite").saveAsTable("dim_date")

In [0]:
# creating fact_trip table
trips_tbl = spark.table("default.trips")
trips_tbl.show(3)

+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
|         trip_id|rideable_type|         started_at|           ended_at|start_station_id|end_station_id|rider_id|
+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
|89E7AA6C29227EFF| classic_bike|2021-02-12 16:14:56|2021-02-12 16:21:43|             525|           660|   71934|
|0FEFDE2603568365| classic_bike|2021-02-14 17:52:38|2021-02-14 18:12:09|             525|         16806|   47854|
|E6159D746B2DBB91|electric_bike|2021-02-09 19:10:18|2021-02-09 19:19:10|    KA1503000012|  TA1305000029|   70870|
+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
only showing top 3 rows



In [0]:
# rename rider_id to member_id
trips_tbl = trips_tbl.withColumnRenamed("rider_id","member_id")
trips_tbl.show(3)

+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+
|         trip_id|rideable_type|         started_at|           ended_at|start_station_id|end_station_id|member_id|
+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+
|89E7AA6C29227EFF| classic_bike|2021-02-12 16:14:56|2021-02-12 16:21:43|             525|           660|    71934|
|0FEFDE2603568365| classic_bike|2021-02-14 17:52:38|2021-02-14 18:12:09|             525|         16806|    47854|
|E6159D746B2DBB91|electric_bike|2021-02-09 19:10:18|2021-02-09 19:19:10|    KA1503000012|  TA1305000029|    70870|
+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+
only showing top 3 rows



In [0]:
# change data type to timestamp for time calculation
from pyspark.sql.functions import *
trips_tbl = trips_tbl.withColumn("started_at", to_timestamp("started_at", 'yyyy-MM-dd HH:mm:ss'))
trips_tbl = trips_tbl.withColumn("ended_at", to_timestamp("ended_at", 'yyyy-MM-dd HH:mm:ss'))
trips_tbl.show(3)

+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+
|         trip_id|rideable_type|         started_at|           ended_at|start_station_id|end_station_id|member_id|
+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+
|89E7AA6C29227EFF| classic_bike|2021-02-12 16:14:56|2021-02-12 16:21:43|             525|           660|    71934|
|0FEFDE2603568365| classic_bike|2021-02-14 17:52:38|2021-02-14 18:12:09|             525|         16806|    47854|
|E6159D746B2DBB91|electric_bike|2021-02-09 19:10:18|2021-02-09 19:19:10|    KA1503000012|  TA1305000029|    70870|
+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+
only showing top 3 rows



In [0]:
# create date_key column
trips_tbl = trips_tbl.withColumn('start_date', col('started_at').cast('date'))
trips_tbl = trips_tbl.withColumn("date_key", regexp_replace("start_date", "-", ""))
trips_tbl.show(3)

+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+----------+--------+
|         trip_id|rideable_type|         started_at|           ended_at|start_station_id|end_station_id|member_id|start_date|date_key|
+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+----------+--------+
|222BB8E5059252D7| classic_bike|2021-06-13 09:48:47|2021-06-13 10:07:23|    KA1503000064|         13021|    34062|2021-06-13|20210613|
|1826E16CB5486018| classic_bike|2021-06-21 22:59:13|2021-06-21 23:04:29|    TA1306000010|         13021|     5342|2021-06-21|20210621|
|3D9B6A0A5330B04D| classic_bike|2021-06-18 16:06:42|2021-06-18 16:12:02|    TA1305000030|         13021|     3714|2021-06-18|20210618|
+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+----------+--------+
only showing top 3 rows



In [0]:
# extract hour into a separate column
trips_tbl = trips_tbl.withColumn('hour', hour(trips_tbl.started_at))
trips_tbl.show(3)

+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+----------+--------+----+
|         trip_id|rideable_type|         started_at|           ended_at|start_station_id|end_station_id|member_id|start_date|date_key|hour|
+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+----------+--------+----+
|222BB8E5059252D7| classic_bike|2021-06-13 09:48:47|2021-06-13 10:07:23|    KA1503000064|         13021|    34062|2021-06-13|20210613|   9|
|1826E16CB5486018| classic_bike|2021-06-21 22:59:13|2021-06-21 23:04:29|    TA1306000010|         13021|     5342|2021-06-21|20210621|  22|
|3D9B6A0A5330B04D| classic_bike|2021-06-18 16:06:42|2021-06-18 16:12:02|    TA1305000030|         13021|     3714|2021-06-18|20210618|  16|
+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+----------+--------+----+
only showing top 3 r

In [0]:
# calculate trip duration
trips_tbl = trips_tbl.withColumn("trip_length_min", expr("datediff(minute,started_at,ended_at)"))
trips_tbl.show(3)

+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+----------+--------+----+---------------+
|         trip_id|rideable_type|         started_at|           ended_at|start_station_id|end_station_id|member_id|start_date|date_key|hour|trip_length_min|
+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+----------+--------+----+---------------+
|222BB8E5059252D7| classic_bike|2021-06-13 09:48:47|2021-06-13 10:07:23|    KA1503000064|         13021|    34062|2021-06-13|20210613|   9|             18|
|1826E16CB5486018| classic_bike|2021-06-21 22:59:13|2021-06-21 23:04:29|    TA1306000010|         13021|     5342|2021-06-21|20210621|  22|              5|
|3D9B6A0A5330B04D| classic_bike|2021-06-18 16:06:42|2021-06-18 16:12:02|    TA1305000030|         13021|     3714|2021-06-18|20210618|  16|              5|
+----------------+-------------+-------------------+------------

In [0]:
# calculate age_at_trip
trips_tbl = trips_tbl.join(riders_tbl, trips_tbl.member_id == riders_tbl.rider_id, "left") \
    .withColumn("age_at_trip", expr("datediff(year,birthday,started_at) - case when dateadd(year,datediff(year,birthday,started_at),birthday) > started_at then 1 else 0 end")) \
    .select("trip_id","rideable_type","member_id","started_at","ended_at","start_station_id","end_station_id","date_key","hour","trip_length_min","age_at_trip")
trips_tbl.show(3)

+----------------+-------------+---------+-------------------+-------------------+----------------+--------------+--------+----+---------------+-----------+
|         trip_id|rideable_type|member_id|         started_at|           ended_at|start_station_id|end_station_id|date_key|hour|trip_length_min|age_at_trip|
+----------------+-------------+---------+-------------------+-------------------+----------------+--------------+--------+----+---------------+-----------+
|222BB8E5059252D7| classic_bike|    34062|2021-06-13 09:48:47|2021-06-13 10:07:23|    KA1503000064|         13021|20210613|   9|             18|         30|
|1826E16CB5486018| classic_bike|     5342|2021-06-21 22:59:13|2021-06-21 23:04:29|    TA1306000010|         13021|20210621|  22|              5|         26|
|3D9B6A0A5330B04D| classic_bike|     3714|2021-06-18 16:06:42|2021-06-18 16:12:02|    TA1305000030|         13021|20210618|  16|              5|         26|
+----------------+-------------+---------+----------------

In [0]:
# change data type
trips_tbl = trips_tbl.selectExpr("cast(trip_id as int) trip_id","cast(rideable_type as string) rideable_type","cast(member_id as int) member_id","started_at","ended_at","start_station_id","end_station_id","cast(date_key as int) date_key","cast(hour as int) hour","cast(trip_length_min as int) trip_length_min","cast(age_at_trip as int) age_at_trip")
trips_tbl.show(3)

+-------+-------------+---------+-------------------+-------------------+----------------+--------------+--------+----+---------------+-----------+
|trip_id|rideable_type|member_id|         started_at|           ended_at|start_station_id|end_station_id|date_key|hour|trip_length_min|age_at_trip|
+-------+-------------+---------+-------------------+-------------------+----------------+--------------+--------+----+---------------+-----------+
|   null| classic_bike|    34062|2021-06-13 09:48:47|2021-06-13 10:07:23|    KA1503000064|         13021|20210613|   9|             18|         30|
|   null| classic_bike|     5342|2021-06-21 22:59:13|2021-06-21 23:04:29|    TA1306000010|         13021|20210621|  22|              5|         26|
|   null| classic_bike|     3714|2021-06-18 16:06:42|2021-06-18 16:12:02|    TA1305000030|         13021|20210618|  16|              5|         26|
+-------+-------------+---------+-------------------+-------------------+----------------+--------------+-------

In [0]:
# save as delta table
trips_tbl.write.format("delta").mode("overwrite").saveAsTable("fact_trip")