## Transform the data into the star schema for a Gold data store

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType
from pyspark.sql import functions as f
from pyspark.sql.types import StringType

#### Table fact_payments

In [0]:
# Load the staged payments table and preview the first rows.
payments_df = spark.table("staging_payments")
payments_df.show(n=5)

+----------+----------+------+--------+
|payment_id|      date|amount|rider_id|
+----------+----------+------+--------+
|       154|2018-03-01|  8.63|    1007|
|       493|2021-08-01|   9.0|    1020|
|       760|2019-03-01|   9.0|    1032|
|       819|2021-08-01|  20.9|    1034|
|       962|2015-06-01|   9.0|    1040|
+----------+----------+------+--------+
only showing top 5 rows



In [0]:
# Persist the payments fact table as a Delta table.
# The table name must match the one dropped just above ("fact_payments");
# the previous code saved to "fact_payment", so the drop never targeted the
# table actually being written and the name disagreed with the other tables.
spark.sql("DROP TABLE IF EXISTS fact_payments")
(
    payments_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("fact_payments")
)

#### Table dim_stations

In [0]:
# Load the staged stations table and preview the first rows.
stations_df = spark.table("staging_stations")
stations_df.show(n=5)

+------------+--------------------+-----------------+------------------+
|  station_id|                name|         latitude|         longitude|
+------------+--------------------+-----------------+------------------+
|       13277|Broadway & Belmon...|        41.940106|        -87.645451|
|      RP-002|    Warren Park East|   42.00454962194|    -87.6806661451|
|       16916|Central Ave & Mad...|41.88016233333333|-87.76319483333332|
|         319|     Roscoe & Harlem|            41.94|            -87.81|
|TA1305000002|Wabash Ave & Roos...|        41.867227|        -87.625961|
+------------+--------------------+-----------------+------------------+
only showing top 5 rows



In [0]:
# Replace any existing dim_stations table with the current stations frame,
# stored in Delta format.
spark.sql("DROP TABLE IF EXISTS dim_stations")
(
    stations_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_stations")
)

#### Table dim_riders

In [0]:
# Load the staged riders table and preview the first rows.
rider_df = spark.table("staging_riders")
rider_df.show(n=5)

+--------+---------+----------+--------------------+----------+------------------+----------------+---------+
|rider_id|  address|first_name|           last_name|  birthday|account_start_date|account_end_date|is_member|
+--------+---------+----------+--------------------+----------+------------------+----------------+---------+
|    1076|    David|     Lewis| 189 Deborah Estates|1991-11-13|        2019-12-26|            null|     True|
|    1081|  Raymond|      Wang|35233 Griffin Ran...|1993-01-26|        2019-10-31|            null|     True|
|    1353|Elizabeth|      Byrd|666 Villanueva Po...|1972-12-26|        2021-11-25|            null|     True|
|    1721|    Maria|       Cox|85213 Thomas Expr...|1999-07-04|        2014-08-01|            null|     True|
|    1933|   Robert|   Francis|52423 Roberts Byp...|1986-07-27|        2021-06-23|            null|     True|
+--------+---------+----------+--------------------+----------+------------------+----------------+---------+
only showi

In [0]:
# Replace any existing dim_riders table with the current riders frame,
# stored in Delta format.
spark.sql("DROP TABLE IF EXISTS dim_riders")
(
    rider_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_riders")
)

#### Table fact_trips

In [0]:
# Build the trips fact: staged trips enriched with two derived measures.
trips = spark.table("staging_trips")

# Trip length in minutes, rounded to the nearest whole minute.
duration_minutes = f.round(
    (f.unix_timestamp("ended_at") - f.unix_timestamp("started_at")) / 60
)

# Age in years, measured from birthday to account_start_date
# (i.e. the rider's age when the account was opened, not at trip time).
age_years = f.round(
    f.datediff(
        f.to_date(rider_df.account_start_date),
        f.to_date(rider_df.birthday),
    ) / 365.25
)

# Left join keeps every trip even when no matching rider row exists.
# rider_id is present on both sides, so the select qualifies it with the
# staging_trips table name.
fact_trips = (
    trips.join(rider_df, trips.rider_id == rider_df.rider_id, "left")
    .withColumn("duration", duration_minutes)
    .withColumn("rider_age", age_years)
    .select(
        "trip_id",
        "staging_trips.rider_id",
        "rideable_type",
        "started_at",
        "ended_at",
        "duration",
        "rider_age",
        "start_station_id",
        "end_station_id",
    )
)
fact_trips.show(5)

+----------------+--------+-------------+-------------------+-------------------+--------+---------+----------------+--------------+
|         trip_id|rider_id|rideable_type|         started_at|           ended_at|duration|rider_age|start_station_id|end_station_id|
+----------------+--------+-------------+-------------------+-------------------+--------+---------+----------------+--------------+
|AFA0D5E6BCCB4364|   13758| classic_bike|2021-02-27 09:42:27|2021-02-27 10:05:18|    23.0|     24.0|           13058|         13156|
|B1893BA60B66971B|   28569| classic_bike|2021-02-14 23:28:58|2021-02-14 23:45:19|    16.0|     24.0|    TA1307000107|  TA1309000025|
|08244C4BC7103B93|   32453| classic_bike|2021-02-06 12:09:41|2021-02-06 12:47:28|    38.0|     52.0|             517|           523|
|E53F1483EE997E22|   73224| classic_bike|2021-02-20 14:07:16|2021-02-20 14:20:34|    13.0|     39.0|           15622|  TA1309000026|
|13FEB69519CCA839|   30873|electric_bike|2021-02-01 15:47:23|2021-02-

In [0]:
# Replace any existing fact_trips table with the freshly built trips fact,
# stored in Delta format.
spark.sql("DROP TABLE IF EXISTS fact_trips")
(
    fact_trips.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("fact_trips")
)

### Create dim_date table

In [0]:
# Calendar bounds: earliest and latest trip start found in staging_trips.
date_bounds = trips.agg(
    f.min("started_at").alias("first_start"),
    f.max("started_at").alias("last_start"),
).first()
min_date = date_bounds["first_start"]
max_date = date_bounds["last_start"]

# Spark SQL expression producing one array element per day between the bounds.
expression = f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)"

# Start from a single placeholder row; its date_id column is overwritten below
# and only exists so that date_id stays the first column of the result.
calendar_seed = spark.createDataFrame([(1,)], ["date_id"])

# One row per calendar day, then the day re-expressed as a timestamp.
calendar_days = calendar_seed.withColumn("dateinit", f.explode(f.expr(expression)))
calendar_days = calendar_days.withColumn(
    "date", f.to_timestamp(f.col("dateinit"), "yyyy-MM-dd")
)

# Derive the calendar attributes, use the timestamp's string form as the key,
# and discard the helper columns.
dim_date = (
    calendar_days
    .withColumn("day_of_week", f.dayofweek(f.col("date")))
    .withColumn("day_of_month", f.dayofmonth(f.col("date")))
    .withColumn("month", f.month(f.col("date")))
    .withColumn("quarter", f.quarter(f.col("date")))
    .withColumn("year", f.year(f.col("date")))
    .withColumn("date_id", f.col("date").cast(StringType()))
    .drop("dateinit", "date")
)
dim_date.show(5)

+-------------------+-----------+------------+-----+-------+----+
|            date_id|day_of_week|day_of_month|month|quarter|year|
+-------------------+-----------+------------+-----+-------+----+
|2021-02-01 00:00:00|          2|           1|    2|      1|2021|
|2021-02-02 00:00:00|          3|           2|    2|      1|2021|
|2021-02-03 00:00:00|          4|           3|    2|      1|2021|
|2021-02-04 00:00:00|          5|           4|    2|      1|2021|
|2021-02-05 00:00:00|          6|           5|    2|      1|2021|
+-------------------+-----------+------------+-----+-------+----+
only showing top 5 rows



In [0]:
# Persist the date dimension as a Delta table.
# The frame written here must be dim_date; the previous code wrote fact_trips
# into the dim_date table, so the date dimension ended up holding trip rows.
spark.sql("DROP TABLE IF EXISTS dim_date")
(
    dim_date.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_date")
)