## Transform the data
In this notebook we transformed the data into the star schema for a Gold data store

In [0]:
# Create a new schema
spark.sql('''
          CREATE SCHEMA IF NOT EXISTS gold
          ''')

Out[1]: DataFrame[]

#### Dimension Riders

In [0]:
# Load the data from the table
riders_df = spark.read.table("staging.riders")
riders_df.show(2)

+--------+----------+---------+-------------------+----------+------------------+----------------+---------+
|rider_id|first_name|last_name|            address|  birthday|account_start_date|account_end_date|is_member|
+--------+----------+---------+-------------------+----------+------------------+----------------+---------+
|    1000|     Diana|    Clark|1200 Alyssa Squares|1989-02-13|        2019-04-23|            null|     true|
|    1001|  Jennifer|    Smith|    397 Diana Ferry|1976-08-10|        2019-11-01|      2020-09-01|     true|
+--------+----------+---------+-------------------+----------+------------------+----------------+---------+
only showing top 2 rows



In [0]:
# create the dimension table
riders_df.dropDuplicates(["rider_id"]).write.format("delta").mode("overwrite").saveAsTable("gold.dim_riders")

In [0]:
# Verify the data
spark.sql('''
          SELECT * FROM gold.dim_riders;
          ''').show(2)

+--------+----------+---------+--------------------+----------+------------------+----------------+---------+
|rider_id|first_name|last_name|             address|  birthday|account_start_date|account_end_date|is_member|
+--------+----------+---------+--------------------+----------+------------------+----------------+---------+
|    1005| Christine|Rodriguez|224 Washington Mi...|1974-08-27|        2020-03-24|            null|    false|
|    1008|      John| Crawford|    7691 Evans Court|1987-02-21|        2021-03-28|      2021-07-01|     true|
+--------+----------+---------+--------------------+----------+------------------+----------------+---------+
only showing top 2 rows



#### Dimension Stations

In [0]:
# Load the data from the table
stations_df = spark.read.table("staging.stations")
stations_df.show(2)

+------------+--------------------+-----------------+------------------+
|  station_id|                name|         latitude|         longitude|
+------------+--------------------+-----------------+------------------+
|         525|Glenwood Ave & To...|        42.012701|-87.66605799999999|
|KA1503000012|  Clark St & Lake St|41.88579466666667|-87.63110066666668|
+------------+--------------------+-----------------+------------------+
only showing top 2 rows



In [0]:
# create the station dimension table
stations_df.dropDuplicates(["station_id"]) \
    .withColumnRenamed("name", "station_name") \
    .write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold.dim_stations")

In [0]:
# Verify the data
spark.sql('''
          SELECT * FROM gold.dim_stations;
          ''').show(2)

+----------+--------------------+----------------+-----------------+
|station_id|        station_name|        latitude|        longitude|
+----------+--------------------+----------------+-----------------+
|     13001|Michigan Ave & Wa...|41.8839840647265|-87.6246839761734|
|     13006|LaSalle St & Wash...|       41.882664|        -87.63253|
+----------+--------------------+----------------+-----------------+
only showing top 2 rows



#### Dimension Dates

In [0]:
import pyspark.sql.functions as f
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Load the data from the table
trips_df = spark.read.table("staging.trips")
trips_df.show(2)

+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
|         trip_id|rideable_type|           start_at|           ended_at|start_station_id|end_station_id|rider_id|
+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
|89E7AA6C29227EFF| classic_bike|2021-02-12 16:14:56|2021-02-12 16:21:43|             525|           660|   71934|
|0FEFDE2603568365| classic_bike|2021-02-14 17:52:38|2021-02-14 18:12:09|             525|         16806|   47854|
+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
only showing top 2 rows



In [0]:
# Create a partition by date
min_date = trips_df.selectExpr('MIN(start_at) AS started_at').first().asDict()['started_at']
max_date = trips_df.selectExpr('DATEADD(year, 5, MAX(start_at)) AS started_at').first().asDict()['started_at']
expression = f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)"
dim_dates = spark.createDataFrame([(1,)], ["date_id"])
dim_dates = dim_dates.withColumn("dateinit", f.explode(f.expr(expression)))
dim_dates = dim_dates.withColumn("date", f.to_timestamp(dim_dates.dateinit, "yyyy-MM-dd"))

In [0]:
dim_dates.show(4)

+-------+----------+-------------------+
|date_id|  dateinit|               date|
+-------+----------+-------------------+
|      1|2021-02-01|2021-02-01 00:00:00|
|      1|2021-02-02|2021-02-02 00:00:00|
|      1|2021-02-03|2021-02-03 00:00:00|
|      1|2021-02-04|2021-02-04 00:00:00|
+-------+----------+-------------------+
only showing top 4 rows



In [0]:
# Create the dim_dates dateframe
dim_dates_df = dim_dates \
            .withColumn("date_id", f.regexp_replace(to_date(dim_dates.date), "-", "").cast(IntegerType())) \
            .withColumn("day", f.dayofweek(dim_dates.date)) \
            .withColumn("week", f.weekofyear(dim_dates.date)) \
            .withColumn("month", f.month(dim_dates.date)) \
            .withColumn("quarter", f.quarter(dim_dates.date)) \
            .withColumn("year", f.year(dim_dates.date)) \
            .withColumn("is_weekend", f.dayofweek(dim_dates.date).isin([1,7]).cast("int")) \
            .drop(f.col("dateinit")) \
            .drop(f.col("date"))

In [0]:
# Let's diplay 52 rows from the dataframe
dim_dates_df.dropDuplicates(["date_id"]).show(5)

+--------+---+----+-----+-------+----+----------+
| date_id|day|week|month|quarter|year|is_weekend|
+--------+---+----+-----+-------+----+----------+
|20230530|  3|  22|    5|      2|2023|         0|
|20240118|  5|   3|    1|      1|2024|         0|
|20240212|  2|   7|    2|      1|2024|         0|
|20240908|  1|  36|    9|      3|2024|         1|
|20250528|  4|  22|    5|      2|2025|         0|
+--------+---+----+-----+-------+----+----------+
only showing top 5 rows



In [0]:
# create the dimension table
dim_dates_df.dropDuplicates(["date_id"]).write.format("delta").mode("overwrite").saveAsTable("gold.dim_dates")


In [0]:
# Verify the data
spark.sql('''
          SELECT * FROM gold.dim_dates;
          ''').show(2)

+--------+---+----+-----+-------+----+----------+
| date_id|day|week|month|quarter|year|is_weekend|
+--------+---+----+-----+-------+----+----------+
|20230530|  3|  22|    5|      2|2023|         0|
|20240118|  5|   3|    1|      1|2024|         0|
+--------+---+----+-----+-------+----+----------+
only showing top 2 rows



### Fact Payment

In [0]:
# Load the data from the table
payments_df = spark.read.table("staging.payments")
payments_df.show(2)

+----------+----------+------+--------+
|payment_id|      date|amount|rider_id|
+----------+----------+------+--------+
|         1|2019-05-01|   9.0|    1000|
|         2|2019-06-01|   9.0|    1000|
+----------+----------+------+--------+
only showing top 2 rows



In [0]:
# create the payments fact table
payments_df.dropDuplicates(["payment_id"]) \
    .write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold.fact_payments")

In [0]:
# Verify the data
spark.sql('''
          SELECT * FROM gold.fact_payments;
          ''').show(2)

+----------+----------+------+--------+
|payment_id|      date|amount|rider_id|
+----------+----------+------+--------+
|       148|2017-09-01|  6.86|    1007|
|       463|2021-09-01| 22.35|    1018|
+----------+----------+------+--------+
only showing top 2 rows



#### Fact Trips

For Fact Trips table we used spark.sql with a SQL query, we could have used pyspark.sql.DataFrame.join but it 's more work and we would need to upload the other tables in different dataframe

In [0]:
# Load the data from the table
trips_df = spark.read.table("staging.trips")
trips_df.show(2)

+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
|         trip_id|rideable_type|           start_at|           ended_at|start_station_id|end_station_id|rider_id|
+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
|89E7AA6C29227EFF| classic_bike|2021-02-12 16:14:56|2021-02-12 16:21:43|             525|           660|   71934|
|0FEFDE2603568365| classic_bike|2021-02-14 17:52:38|2021-02-14 18:12:09|             525|         16806|   47854|
+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
only showing top 2 rows



In [0]:
trips_df.count()

Out[68]: 4584921

In [0]:
# Load the data into a dataframe
fact_trips_df = spark.sql('''
                SELECT
                    t.trip_id,
                    t.rideable_type,
                    t.start_at,
                    t.ended_at,
                    DATEDIFF(MINUTE, t.start_at, t.ended_at) AS Duration,
                    DATEDIFF(YEAR, r.birthday, t.ended_at) AS rider_age,
                    t.start_station_id,
                    t.end_station_id,
                    t.rider_id,
                    REPLACE(p.date,"-", "") AS date_id
                FROM staging.trips t
                JOIN staging.riders r ON (t.rider_id=r.rider_id)
                JOIN staging.payments p ON (t.rider_id=p.rider_id)
            ''')

In [0]:
fact_trips_df.count()

Out[70]: 117816784

From 4584921 to 117816784 is a lot, that's mean we have some duplicated data

In [0]:
# Let's display the first 10 rows to verify
fact_trips_df.show(10)

+----------------+-------------+-------------------+-------------------+--------+---------+----------------+--------------+--------+--------+
|         trip_id|rideable_type|           start_at|           ended_at|Duration|rider_age|start_station_id|end_station_id|rider_id| date_id|
+----------------+-------------+-------------------+-------------------+--------+---------+----------------+--------------+--------+--------+
|89E7AA6C29227EFF| classic_bike|2021-02-12 16:14:56|2021-02-12 16:21:43|       6|       37|             525|           660|   71934|20220201|
|89E7AA6C29227EFF| classic_bike|2021-02-12 16:14:56|2021-02-12 16:21:43|       6|       37|             525|           660|   71934|20220101|
|89E7AA6C29227EFF| classic_bike|2021-02-12 16:14:56|2021-02-12 16:21:43|       6|       37|             525|           660|   71934|20211201|
|89E7AA6C29227EFF| classic_bike|2021-02-12 16:14:56|2021-02-12 16:21:43|       6|       37|             525|           660|   71934|20211101|
|89E7A

We have noticed that we have some dupricates rows but we will drop the duplicates data before we insert them into the table.

In [0]:
# create the trips fact table
fact_trips_df.dropDuplicates(["trip_id"]) \
    .write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold.fact_trips")

In [0]:
# Verify the data and check if we still have the duplicates data
spark.sql('''
          SELECT * FROM gold.fact_trips;
          ''').show(10)

+----------------+-------------+-------------------+-------------------+--------+---------+----------------+--------------+--------+--------+
|         trip_id|rideable_type|           start_at|           ended_at|Duration|rider_age|start_station_id|end_station_id|rider_id| date_id|
+----------------+-------------+-------------------+-------------------+--------+---------+----------------+--------------+--------+--------+
|00000CAE95438C9D| classic_bike|2021-07-20 15:40:46|2021-07-20 17:38:17|     117|       22|           13022|  TA1305000003|   71748|20220201|
|00001DCF2BC423F4|  docked_bike|2021-06-13 12:00:49|2021-06-13 12:29:51|      29|       39|           13008|  TA1307000048|   21478|20220201|
|00002E8260690FFF| classic_bike|2021-09-29 17:43:26|2021-09-29 17:49:06|       5|       33|           13193|  TA1309000058|   23449|20220201|
|0000578C9F82736A| classic_bike|2021-10-06 22:31:26|2021-10-06 23:03:50|      32|       42|           13257|  KA1503000041|   46411|20220101|
|00008

In [0]:
# Check how many record we have in the table
spark.sql('''
          SELECT * FROM gold.fact_trips;
          ''').count()

Out[74]: 4463338

In [0]:
spark.sql('''
          SELECT t.trip_id, p.date, r.first_name, r.last_name, r.is_member, t.Duration, p.amount, t.rideable_type, d.is_weekend FROM gold.fact_trips t
          JOIN gold.dim_dates d
          ON t.date_id=d.date_id
          JOIN gold.dim_riders r
          ON r.rider_id=t.rider_id
          JOIN gold.fact_payments p
          ON p.rider_id=r.rider_id
          ''').show(5)

+----------------+----------+----------+---------+---------+--------+------+-------------+----------+
|         trip_id|      date|first_name|last_name|is_member|Duration|amount|rideable_type|is_weekend|
+----------------+----------+----------+---------+---------+--------+------+-------------+----------+
|07E0D1DCA9268F3C|2021-05-01|      John| Crawford|     true|       3|   9.0| classic_bike|         0|
|07E0D1DCA9268F3C|2021-06-01|      John| Crawford|     true|       3|   9.0| classic_bike|         0|
|07E0D1DCA9268F3C|2021-07-01|      John| Crawford|     true|       3|   9.0| classic_bike|         0|
|07E0D1DCA9268F3C|2021-04-01|      John| Crawford|     true|       3|   9.0| classic_bike|         0|
|0C50695A71DD5697|2021-05-01|      John| Crawford|     true|       8|   9.0| classic_bike|         0|
+----------------+----------+----------+---------+---------+--------+------+-------------+----------+
only showing top 5 rows

