In [0]:
# Step 1: Stage Bronze - Ingest the data into Azure Databricks using Delta Lake to create the Bronze data store

In [0]:
# 1. Payment Tables
payment_df = spark.read.format("csv") \
    .option("inferSchema", "false") \
    .option("header", "false") \
    .option("sep", ",") \
    .load("/FileStore/Data/payments.csv")


#Renamed the column name 
payment_df = payment_df.withColumnRenamed('_c0','payment_id')
payment_df = payment_df.withColumnRenamed('_c1','date')
payment_df = payment_df.withColumnRenamed('_c2','amount')
payment_df = payment_df.withColumnRenamed('_c3','rider_id')


#Input Data to the Data store
payment_df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .save("/delta/bronze/bronze_payments")

In [0]:
payment_df.show()

+----------+----------+------+--------+
|payment_id|      date|amount|rider_id|
+----------+----------+------+--------+
|         1|2019-05-01|   9.0|    1000|
|         2|2019-06-01|   9.0|    1000|
|         3|2019-07-01|   9.0|    1000|
|         4|2019-08-01|   9.0|    1000|
|         5|2019-09-01|   9.0|    1000|
|         6|2019-10-01|   9.0|    1000|
|         7|2019-11-01|   9.0|    1000|
|         8|2019-12-01|   9.0|    1000|
|         9|2020-01-01|   9.0|    1000|
|        10|2020-02-01|   9.0|    1000|
|        11|2020-03-01|   9.0|    1000|
|        12|2020-04-01|   9.0|    1000|
|        13|2020-05-01|   9.0|    1000|
|        14|2020-06-01|   9.0|    1000|
|        15|2020-07-01|   9.0|    1000|
|        16|2020-08-01|   9.0|    1000|
|        17|2020-09-01|   9.0|    1000|
|        18|2020-10-01|   9.0|    1000|
|        19|2020-11-01|   9.0|    1000|
|        20|2020-12-01|   9.0|    1000|
+----------+----------+------+--------+
only showing top 20 rows



In [0]:
# 2. Riders Tables
riders_df = spark.read.format("csv") \
    .option("inferSchema", "false") \
    .option("header", "false") \
    .option("sep", ",") \
    .load("/FileStore/Data/riders.csv")


#Renamed the column name 
riders_df = riders_df.withColumnRenamed('_c0','rider_id')
riders_df = riders_df.withColumnRenamed('_c1','first')
riders_df = riders_df.withColumnRenamed('_c2','last')
riders_df = riders_df.withColumnRenamed('_c3','address')
riders_df = riders_df.withColumnRenamed('_c4','birthday')
riders_df = riders_df.withColumnRenamed('_c5','account_start_date')
riders_df = riders_df.withColumnRenamed('_c6','account_end_date')
riders_df = riders_df.withColumnRenamed('_c7','is_member')


riders_df.write.format("delta") \
    .option("delta.columnMapping.mode", "name") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .save("/delta/bronze/bronze_riders")

In [0]:
riders_df.show()

+--------+-----------+---------+--------------------+----------+------------------+----------------+---------+
|rider_id|      first|     last|             address|  birthday|account_start_date|account_end_date|is_member|
+--------+-----------+---------+--------------------+----------+------------------+----------------+---------+
|    1000|      Diana|    Clark| 1200 Alyssa Squares|1989-02-13|        2019-04-23|            null|     True|
|    1001|   Jennifer|    Smith|     397 Diana Ferry|1976-08-10|        2019-11-01|      2020-09-01|     True|
|    1002|      Karen|    Smith|644 Brittany Row ...|1998-08-10|        2022-02-04|            null|     True|
|    1003|      Bryan|  Roberts|996 Dickerson Tur...|1999-03-29|        2019-08-26|            null|    False|
|    1004|      Jesse|Middleton|7009 Nathan Expre...|1969-04-11|        2019-09-14|            null|     True|
|    1005|  Christine|Rodriguez|224 Washington Mi...|1974-08-27|        2020-03-24|            null|    False|
|

In [0]:
# 3. Stations Tables
stations_df = spark.read.format("csv") \
    .option("inferSchema", "false") \
    .option("header", "false") \
    .option("sep", ",") \
    .load("/FileStore/Data/stations.csv")


#Renamed the column name 
stations_df = stations_df.withColumnRenamed('_c0','station_id')
stations_df = stations_df.withColumnRenamed('_c1','name')
stations_df = stations_df.withColumnRenamed('_c2','latitude')
stations_df = stations_df.withColumnRenamed('_c3','longtitude')


stations_df.write.format("delta") \
    .option("delta.columnMapping.mode", "name") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .save("/delta/bronze/bronze_stations")


In [0]:
stations_df.show()

+------------+--------------------+------------------+------------------+
|  station_id|                name|          latitude|        longtitude|
+------------+--------------------+------------------+------------------+
|         525|Glenwood Ave & To...|         42.012701|-87.66605799999999|
|KA1503000012|  Clark St & Lake St| 41.88579466666667|-87.63110066666668|
|         637|Wood St & Chicago...|         41.895634|        -87.672069|
|       13216|  State St & 33rd St|        41.8347335|       -87.6258275|
|       18003|Fairbanks St & Su...| 41.89580766666667|-87.62025316666669|
|KP1705001026|LaSalle Dr & Huro...|         41.894877|        -87.632326|
|       13253|Lincoln Ave & Wav...|         41.948797|        -87.675278|
|KA1503000044|Rush St & Hubbard St|         41.890173|-87.62618499999999|
|KA1504000140|Winchester Ave & ...| 41.92403733333333|-87.67641483333334|
|TA1305000032|Clinton St & Madi...|         41.882242|-87.64106600000001|
|TA1306000012| Wells St & Huron St| 41

In [0]:
# 4. Trips Tables
trips_df = spark.read.format("csv") \
    .option("inferSchema", "false") \
    .option("header", "false") \
    .option("sep", ",") \
    .load("/FileStore/Data/trips.csv")


#Renamed the column name 
trips_df = trips_df.withColumnRenamed('_c0','trip_id')
trips_df = trips_df.withColumnRenamed('_c1','rideable_type')
trips_df = trips_df.withColumnRenamed('_c2','start_at')
trips_df = trips_df.withColumnRenamed('_c3','ended_at')  
trips_df = trips_df.withColumnRenamed('_c4','start_station_id')  
trips_df = trips_df.withColumnRenamed('_c5','end_station_id')  
trips_df = trips_df.withColumnRenamed('_c6','rider_id')  


trips_df.write.format("delta") \
    .option("delta.columnMapping.mode", "name") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .save("/delta/bronze/bronze_trips")

In [0]:
trips_df.show()

+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
|         trip_id|rideable_type|           start_at|           ended_at|start_station_id|end_station_id|rider_id|
+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
|89E7AA6C29227EFF| classic_bike|2021-02-12 16:14:56|2021-02-12 16:21:43|             525|           660|   71934|
|0FEFDE2603568365| classic_bike|2021-02-14 17:52:38|2021-02-14 18:12:09|             525|         16806|   47854|
|E6159D746B2DBB91|electric_bike|2021-02-09 19:10:18|2021-02-09 19:19:10|    KA1503000012|  TA1305000029|   70870|
|B32D3199F1C2E75B| classic_bike|2021-02-02 17:49:41|2021-02-02 17:54:06|             637|  TA1305000034|   58974|
|83E463F23575F4BF|electric_bike|2021-02-23 15:07:23|2021-02-23 15:22:37|           13216|  TA1309000055|   39608|
|BDAA7E3494E8D545|electric_bike|2021-02-24 15:43:33|2021-02-24 15:49:05|           18003

In [0]:
#Step 2: Create a Gold Data Store

In [0]:
payment_bronze_df = spark.read.format("delta").load("/delta/bronze/bronze_payments")
payment_bronze_df.createOrReplaceTempView("payment_bronze_view")

payment_gold_df = spark.sql("""
    SELECT
        payment_id,
        TO_DATE(date) AS date,
        CAST(amount AS DECIMAL(10, 2)) AS amount,
        rider_id
    FROM
        payment_bronze_view
    WHERE
        date IS NOT NULL
""")

payment_gold_df.write.format("delta").mode("overwrite").save("/delta/gold/payment")
payment_gold_df.show()

+----------+----------+------+--------+
|payment_id|      date|amount|rider_id|
+----------+----------+------+--------+
|         1|2019-05-01|  9.00|    1000|
|         2|2019-06-01|  9.00|    1000|
|         3|2019-07-01|  9.00|    1000|
|         4|2019-08-01|  9.00|    1000|
|         5|2019-09-01|  9.00|    1000|
|         6|2019-10-01|  9.00|    1000|
|         7|2019-11-01|  9.00|    1000|
|         8|2019-12-01|  9.00|    1000|
|         9|2020-01-01|  9.00|    1000|
|        10|2020-02-01|  9.00|    1000|
|        11|2020-03-01|  9.00|    1000|
|        12|2020-04-01|  9.00|    1000|
|        13|2020-05-01|  9.00|    1000|
|        14|2020-06-01|  9.00|    1000|
|        15|2020-07-01|  9.00|    1000|
|        16|2020-08-01|  9.00|    1000|
|        17|2020-09-01|  9.00|    1000|
|        18|2020-10-01|  9.00|    1000|
|        19|2020-11-01|  9.00|    1000|
|        20|2020-12-01|  9.00|    1000|
+----------+----------+------+--------+
only showing top 20 rows



In [0]:
payment_gold_df.show()

In [0]:
#Create Rider table for gold store
riders_bronze_df = spark.read.format("delta").load("/delta/bronze/bronze_riders")
riders_bronze_df.createOrReplaceTempView("riders_bronze_view")

riders_gold_df = spark.sql("""
    SELECT
        riders_bronze_view.rider_id,
        first,
        last,
        address,
        TO_DATE(birthday) AS birthday,
        TO_DATE(account_start_date) AS account_start_date,
        TO_DATE(account_end_date) AS account_end_date,
        is_member
    FROM
        riders_bronze_view
    WHERE
        rider_id IS NOT NULL
""")

# Store 'riders' DataFrame as Delta Lake table
riders_gold_df.write.format("delta").mode("overwrite").save("/delta/gold/riders")


In [0]:
riders_gold_df.show()

+--------+-----------+---------+--------------------+----------+------------------+----------------+---------+
|rider_id|      first|     last|             address|  birthday|account_start_date|account_end_date|is_member|
+--------+-----------+---------+--------------------+----------+------------------+----------------+---------+
|    1000|      Diana|    Clark| 1200 Alyssa Squares|1989-02-13|        2019-04-23|            null|     True|
|    1001|   Jennifer|    Smith|     397 Diana Ferry|1976-08-10|        2019-11-01|      2020-09-01|     True|
|    1002|      Karen|    Smith|644 Brittany Row ...|1998-08-10|        2022-02-04|            null|     True|
|    1003|      Bryan|  Roberts|996 Dickerson Tur...|1999-03-29|        2019-08-26|            null|    False|
|    1004|      Jesse|Middleton|7009 Nathan Expre...|1969-04-11|        2019-09-14|            null|     True|
|    1005|  Christine|Rodriguez|224 Washington Mi...|1974-08-27|        2020-03-24|            null|    False|
|

In [0]:
#Create Station table for gold store
stations_bronze_df = spark.read.format("delta").load("/delta/bronze/bronze_stations")
stations_bronze_df.createOrReplaceTempView("stations_bronze_view")

stations_gold_df = spark.sql("""
    SELECT
        stations_bronze_view.station_id,
        name,
        CAST(latitude AS FLOAT) AS latitude,
        CAST(longtitude AS FLOAT) AS longitude
    FROM
        stations_bronze_view
    WHERE
        station_id IS NOT NULL
""")

# Store 'Stations' DataFrame as Delta Lake table
stations_gold_df.write.format("delta").mode("overwrite").save("/delta/gold/stations")

In [0]:
stations_gold_df.show()

+------------+--------------------+---------+----------+
|  station_id|                name| latitude| longitude|
+------------+--------------------+---------+----------+
|         525|Glenwood Ave & To...|  42.0127| -87.66606|
|KA1503000012|  Clark St & Lake St|41.885796|  -87.6311|
|         637|Wood St & Chicago...|41.895634|-87.672066|
|       13216|  State St & 33rd St|41.834732|-87.625824|
|       18003|Fairbanks St & Su...| 41.89581|-87.620255|
|KP1705001026|LaSalle Dr & Huro...| 41.89488|-87.632324|
|       13253|Lincoln Ave & Wav...|41.948795| -87.67528|
|KA1503000044|Rush St & Hubbard St|41.890175| -87.62618|
|KA1504000140|Winchester Ave & ...|41.924038|-87.676414|
|TA1305000032|Clinton St & Madi...| 41.88224| -87.64107|
|TA1306000012| Wells St & Huron St|41.894753|  -87.6344|
|       13133|Damen Ave & Cortl...| 41.91598| -87.67734|
|      SL-005|Indiana Ave & Roo...| 41.86789| -87.62304|
|       13235|Southport Ave & W...| 41.94815| -87.66394|
|TA1307000139| MLK Jr Dr & 29th

In [0]:
#Create Trips tables for gold store
trips_bronze_df = spark.read.format("delta").load("/delta/bronze/bronze_trips")
trips_bronze_df.createOrReplaceTempView("trips_bronze_view")

trips_gold_df = spark.sql("""
    SELECT
        trips_bronze_view.trip_id,
        rideable_type,
        start_at,
        ended_at,
        start_station_id,
        end_station_id,
        rider_id
    FROM
        trips_bronze_view
    WHERE
        trip_id IS NOT NULL
""")

# Store 'trips' DataFrame as Delta Lake table
trips_gold_df.write.format("delta").mode("overwrite").save("/delta/gold/trips")

In [0]:
trips_gold_df.show()

+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
|         trip_id|rideable_type|           start_at|           ended_at|start_station_id|end_station_id|rider_id|
+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
|222BB8E5059252D7| classic_bike|2021-06-13 09:48:47|2021-06-13 10:07:23|    KA1503000064|         13021|   34062|
|1826E16CB5486018| classic_bike|2021-06-21 22:59:13|2021-06-21 23:04:29|    TA1306000010|         13021|    5342|
|3D9B6A0A5330B04D| classic_bike|2021-06-18 16:06:42|2021-06-18 16:12:02|    TA1305000030|         13021|    3714|
|07E82F5E9C9E490F| classic_bike|2021-06-17 16:46:23|2021-06-17 17:02:45|    TA1305000034|         13021|   18793|
|A8E94BAECBF0C2DD|  docked_bike|2021-06-13 17:36:29|2021-06-13 18:30:39|    TA1308000009|  TA1308000009|   43342|
|378F4AB323AA1D14|  docked_bike|2021-06-13 13:20:10|2021-06-13 14:06:14|    TA1308000009

In [0]:
# Input the table to databricks trips tables
trips_gold_df_table = spark.read.format("delta") \
    .load("/delta/gold/trips")

trips_gold_df_table.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("trips")   

In [0]:
spark.sql("""
    CREATE TABLE Trip_Fact
    USING delta
    AS SELECT * FROM trips
""")

Out[60]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Input the table to databricks payments tables
payment_gold_df_table = spark.read.format("delta") \
    .load("/delta/gold/payment")

payment_gold_df_table.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("payment")

In [0]:
# Create Payment fact table from the payment table
spark.sql("""
    CREATE TABLE Payment_Fact
    USING delta
    AS SELECT * FROM payment
""")

Out[6]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Create Date_Dim
spark.sql("""
    CREATE TABLE Date_Dim
    USING delta
    AS SELECT DISTINCT date,
        day(date) AS day,
        dayofweek(date) AS day_of_week,
        month(date) AS month,
        quarter(date) AS quarter,
        year(date) AS year,
        CASE WHEN dayofweek(date) IN (1, 7) THEN 1 ELSE 0 END AS is_weekend
    FROM payment
""")


Out[7]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Create Time_Dim
spark.sql("""
    CREATE TABLE Time_Dim
    USING delta
    AS
    SELECT DISTINCT
        start_at AS time,
        HOUR(start_at) AS hour,
        MINUTE(start_at) AS minute,
        SECOND(start_at) AS second
    FROM trips
    UNION
    SELECT DISTINCT
        ended_at AS time,
        HOUR(ended_at) AS hour,
        MINUTE(ended_at) AS minute,
        SECOND(ended_at) AS second
    FROM trips
""")


Out[9]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Input the table to databricks stations tables
stations_gold_df_table = spark.read.format("delta") \
    .load("/delta/gold/stations")

stations_gold_df_table.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("stations")

In [0]:
# Create Station_Dim
spark.sql("""
    CREATE TABLE Station_Dim
    USING delta
    AS SELECT * FROM stations
""")

Out[11]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Input riders to databricks
riders_gold_df_table = spark.read.format("delta") \
    .load("/delta/gold/riders")

riders_gold_df_table.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("riders")

In [0]:
# Create Rider_Dim
spark.sql("""
    CREATE TABLE Rider_Dim
    USING delta
    AS SELECT * FROM riders
""")

Out[22]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
#Step 4: Transforming Data into the Star Schema

In [0]:
# Fact Table insert
spark.sql("""
    INSERT INTO Payment_Fact
    SELECT *
    FROM payment
""")

Out[12]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Date_Dim Insert
spark.sql("""
    INSERT INTO Date_Dim
    SELECT DISTINCT date,
        day(date) AS day,
        dayofweek(date) AS day_of_week,
        month(date) AS month,
        quarter(date) AS quarter,
        year(date) AS year,
        CASE WHEN dayofweek(date) IN (1, 7) THEN 1 ELSE 0 END AS is_weekend
    FROM payment
""")

Out[13]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Time_Dim Insert
spark.sql("""
    INSERT INTO Time_Dim
    SELECT DISTINCT
        start_at AS time,
        HOUR(start_at) AS hour,
        MINUTE(start_at) AS minute,
        SECOND(start_at) AS second
    FROM trips
    UNION
    SELECT DISTINCT
        ended_at AS time,
        HOUR(ended_at) AS hour,
        MINUTE(ended_at) AS minute,
        SECOND(ended_at) AS second
    FROM trips
""")


Out[14]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Station_Dim Insert
spark.sql("""
    INSERT INTO Station_Dim
    SELECT *
    FROM stations
""")

Out[15]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
spark.sql("""
    INSERT INTO Rider_Dim
    SELECT 
        rider_id,
        first,
        last,
        address,
        birthday,
        account_start_date,
        account_end_date,
        is_member
    FROM riders
""")

Out[23]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Analyze money spent (using Payment_Fact table):

In [0]:
spark.sql("""
    SELECT month, quarter, year, SUM(amount) AS total_amount
    FROM Payment_Fact PF, Date_Dim DD
    WHERE PF.`date` = DD.`date`
    GROUP BY month, quarter, year
    ORDER BY total_amount DESC
""").show()

+-----+-------+----+------------+
|month|quarter|year|total_amount|
+-----+-------+----+------------+
|    2|      1|2022|  2397885.08|
|    1|      1|2022|  2361996.72|
|   12|      4|2021|  2316919.68|
|   11|      4|2021|  2264562.44|
|   10|      4|2021|  2222626.40|
|    9|      3|2021|  2163974.12|
|    8|      3|2021|  2109123.04|
|    7|      3|2021|  2052054.68|
|    6|      2|2021|  2000826.60|
|    5|      2|2021|  1942877.00|
|    4|      2|2021|  1887006.64|
|    3|      1|2021|  1835136.68|
|    2|      1|2021|  1789721.48|
|    1|      1|2021|  1739564.24|
|   12|      4|2020|  1682818.20|
|   11|      4|2020|  1639681.76|
|   10|      4|2020|  1593460.12|
|    9|      3|2020|  1542305.36|
|    8|      3|2020|  1497586.20|
|    7|      3|2020|  1449733.96|
+-----+-------+----+------------+
only showing top 20 rows



In [0]:
spark.sql("""
    SELECT *
    FROM Payment_Fact PF
""").show()

+----------+----------+------+--------+
|payment_id|      date|amount|rider_id|
+----------+----------+------+--------+
|         1|2019-05-01|  9.00|    1000|
|         2|2019-06-01|  9.00|    1000|
|         3|2019-07-01|  9.00|    1000|
|         4|2019-08-01|  9.00|    1000|
|         5|2019-09-01|  9.00|    1000|
|         6|2019-10-01|  9.00|    1000|
|         7|2019-11-01|  9.00|    1000|
|         8|2019-12-01|  9.00|    1000|
|         9|2020-01-01|  9.00|    1000|
|        10|2020-02-01|  9.00|    1000|
|        11|2020-03-01|  9.00|    1000|
|        12|2020-04-01|  9.00|    1000|
|        13|2020-05-01|  9.00|    1000|
|        14|2020-06-01|  9.00|    1000|
|        15|2020-07-01|  9.00|    1000|
|        16|2020-08-01|  9.00|    1000|
|        17|2020-09-01|  9.00|    1000|
|        18|2020-10-01|  9.00|    1000|
|        19|2020-11-01|  9.00|    1000|
|        20|2020-12-01|  9.00|    1000|
+----------+----------+------+--------+
only showing top 20 rows



In [0]:
spark.sql("""
    SELECT *
    FROM rider_dim PF
""").show()

+--------+-----------+---------+--------------------+----------+------------------+----------------+---------+
|rider_id|      first|     last|             address|  birthday|account_start_date|account_end_date|is_member|
+--------+-----------+---------+--------------------+----------+------------------+----------------+---------+
|    1000|      Diana|    Clark| 1200 Alyssa Squares|1989-02-13|        2019-04-23|            null|     True|
|    1001|   Jennifer|    Smith|     397 Diana Ferry|1976-08-10|        2019-11-01|      2020-09-01|     True|
|    1002|      Karen|    Smith|644 Brittany Row ...|1998-08-10|        2022-02-04|            null|     True|
|    1003|      Bryan|  Roberts|996 Dickerson Tur...|1999-03-29|        2019-08-26|            null|    False|
|    1004|      Jesse|Middleton|7009 Nathan Expre...|1969-04-11|        2019-09-14|            null|     True|
|    1005|  Christine|Rodriguez|224 Washington Mi...|1974-08-27|        2020-03-24|            null|    False|
|

In [0]:
spark.sql("""
    SELECT *
    FROM station_dim
""").show()

+------------+--------------------+---------+----------+
|  station_id|                name| latitude| longitude|
+------------+--------------------+---------+----------+
|         525|Glenwood Ave & To...|  42.0127| -87.66606|
|KA1503000012|  Clark St & Lake St|41.885796|  -87.6311|
|         637|Wood St & Chicago...|41.895634|-87.672066|
|       13216|  State St & 33rd St|41.834732|-87.625824|
|       18003|Fairbanks St & Su...| 41.89581|-87.620255|
|KP1705001026|LaSalle Dr & Huro...| 41.89488|-87.632324|
|       13253|Lincoln Ave & Wav...|41.948795| -87.67528|
|KA1503000044|Rush St & Hubbard St|41.890175| -87.62618|
|KA1504000140|Winchester Ave & ...|41.924038|-87.676414|
|TA1305000032|Clinton St & Madi...| 41.88224| -87.64107|
|TA1306000012| Wells St & Huron St|41.894753|  -87.6344|
|       13133|Damen Ave & Cortl...| 41.91598| -87.67734|
|      SL-005|Indiana Ave & Roo...| 41.86789| -87.62304|
|       13235|Southport Ave & W...| 41.94815| -87.66394|
|TA1307000139| MLK Jr Dr & 29th

In [0]:
spark.sql("""
    SELECT *
    FROM date_dim
""").show()

+----------+---+-----------+-----+-------+----+----------+
|      date|day|day_of_week|month|quarter|year|is_weekend|
+----------+---+-----------+-----+-------+----+----------+
|2017-02-01|  1|          4|    2|      1|2017|         0|
|2018-07-01|  1|          1|    7|      3|2018|         1|
|2021-08-01|  1|          1|    8|      3|2021|         1|
|2021-11-01|  1|          2|   11|      4|2021|         0|
|2017-08-01|  1|          3|    8|      3|2017|         0|
|2021-05-01|  1|          7|    5|      2|2021|         1|
|2020-01-01|  1|          4|    1|      1|2020|         0|
|2019-02-01|  1|          6|    2|      1|2019|         0|
|2015-07-01|  1|          4|    7|      3|2015|         0|
|2015-03-01|  1|          1|    3|      1|2015|         1|
|2013-08-01|  1|          5|    8|      3|2013|         0|
|2021-03-01|  1|          2|    3|      1|2021|         0|
|2015-04-01|  1|          4|    4|      2|2015|         0|
|2020-05-01|  1|          6|    5|      2|2020|         

In [0]:
spark.sql("""
    SELECT *
    FROM time_dim
""").show()

+-------------------+----+------+------+
|               time|hour|minute|second|
+-------------------+----+------+------+
|2021-06-29 15:44:47|  15|    44|    47|
|2021-06-10 15:32:58|  15|    32|    58|
|2021-06-11 17:32:34|  17|    32|    34|
|2021-06-24 19:28:59|  19|    28|    59|
|2021-06-18 19:03:56|  19|     3|    56|
|2021-06-12 16:12:36|  16|    12|    36|
|2021-06-27 21:22:15|  21|    22|    15|
|2021-06-03 13:22:56|  13|    22|    56|
|2021-06-14 16:59:02|  16|    59|     2|
|2021-06-19 22:43:09|  22|    43|     9|
|2021-06-09 19:36:49|  19|    36|    49|
|2021-06-08 10:06:26|  10|     6|    26|
|2021-06-21 21:37:54|  21|    37|    54|
|2021-06-04 18:51:57|  18|    51|    57|
|2021-06-18 18:22:36|  18|    22|    36|
|2021-06-11 23:33:48|  23|    33|    48|
|2021-06-27 14:27:22|  14|    27|    22|
|2021-06-22 16:17:39|  16|    17|    39|
|2021-06-26 09:18:41|   9|    18|    41|
|2021-06-04 02:57:12|   2|    57|    12|
+-------------------+----+------+------+
only showing top

In [0]:
spark.sql("""
    SELECT *
    FROM trip_fact
""").show()

+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
|         trip_id|rideable_type|           start_at|           ended_at|start_station_id|end_station_id|rider_id|
+----------------+-------------+-------------------+-------------------+----------------+--------------+--------+
|89E7AA6C29227EFF| classic_bike|2021-02-12 16:14:56|2021-02-12 16:21:43|             525|           660|   71934|
|0FEFDE2603568365| classic_bike|2021-02-14 17:52:38|2021-02-14 18:12:09|             525|         16806|   47854|
|E6159D746B2DBB91|electric_bike|2021-02-09 19:10:18|2021-02-09 19:19:10|    KA1503000012|  TA1305000029|   70870|
|B32D3199F1C2E75B| classic_bike|2021-02-02 17:49:41|2021-02-02 17:54:06|             637|  TA1305000034|   58974|
|83E463F23575F4BF|electric_bike|2021-02-23 15:07:23|2021-02-23 15:22:37|           13216|  TA1309000055|   39608|
|BDAA7E3494E8D545|electric_bike|2021-02-24 15:43:33|2021-02-24 15:49:05|           18003

In [0]:
# Analyze how much money is spent per member (Extra Credit):

In [0]:
spark.sql("""
    SELECT rd.rider_id,
        CONCAT(rd.first, ' ', rd.last) AS rider_name,
        SUM(pf.amount) AS total_amount_earned
    FROM Payment_Fact pf
    JOIN Rider_Dim rd ON pf.rider_id = rd.rider_id
    GROUP BY rd.rider_id, rider_name
""").show()

+--------+------------------+-------------------+
|rider_id|        rider_name|total_amount_earned|
+--------+------------------+-------------------+
|    1268|         Kim Lyons|            1692.00|
|    1316|     Philip Barnes|             108.00|
|    2141|        Lisa Green|             994.96|
|    2167|     Sarah Stewart|             540.00|
|    2222|   Mathew Matthews|            1080.00|
|    2316|       Sarah Banks|            1800.00|
|    2361|    Janet Mcdonald|              72.00|
|    2416|       Rachel Cruz|             108.00|
|    2575|       Sally Scott|              36.00|
|    2700|    Randy Chambers|            3276.00|
|    3008|          Joy King|            1008.00|
|    3037|      Sophia Smith|            2808.00|
|    3088|      Maria Martin|             790.24|
|    3311|      Regina Bowen|             540.00|
|    3535|  Elizabeth Graham|            1404.00|
|    3700|       Karen Pitts|              98.48|
|    3836|    Stephanie Hull|              36.00|


In [0]:
#Step 5 Bussiness outcomes

In [0]:
# Analyze how much time is spent per ride:
spark.sql("""
    SELECT trip_id, 
           TIMESTAMP(start_at) AS start_time, 
           TIMESTAMP(ended_at) AS end_time, 
           round((UNIX_TIMESTAMP(ended_at) - UNIX_TIMESTAMP(start_at)) / 60, 3) AS ride_duration_minutes
    FROM trip_fact
""").show()

+----------------+-------------------+-------------------+---------------------+
|         trip_id|         start_time|           end_time|ride_duration_minutes|
+----------------+-------------------+-------------------+---------------------+
|89E7AA6C29227EFF|2021-02-12 16:14:56|2021-02-12 16:21:43|                6.783|
|0FEFDE2603568365|2021-02-14 17:52:38|2021-02-14 18:12:09|               19.517|
|E6159D746B2DBB91|2021-02-09 19:10:18|2021-02-09 19:19:10|                8.867|
|B32D3199F1C2E75B|2021-02-02 17:49:41|2021-02-02 17:54:06|                4.417|
|83E463F23575F4BF|2021-02-23 15:07:23|2021-02-23 15:22:37|               15.233|
|BDAA7E3494E8D545|2021-02-24 15:43:33|2021-02-24 15:49:05|                5.533|
|A772742351171257|2021-02-01 17:47:42|2021-02-01 17:48:33|                 0.85|
|295476889D9B79F8|2021-02-11 18:33:53|2021-02-11 18:35:09|                1.267|
|362087194BA4CC9A|2021-02-27 15:13:39|2021-02-27 15:36:36|                22.95|
|21630F715038CCB0|2021-02-20

In [0]:
# Analyze time-based factors:
spark.sql("""
    SELECT trip_id, 
           date_dim.day_of_week, 
           time_dim.hour, 
           time_dim.minute
    FROM trip_fact
    JOIN date_dim ON trip_fact.start_at = date_dim.date
    JOIN time_dim ON trip_fact.start_at = time_dim.time
""").show()

+----------------+-----------+----+------+
|         trip_id|day_of_week|hour|minute|
+----------------+-----------+----+------+
|42D4B6A9BBE42B39|          3|  15|    56|
|42D4B6A9BBE42B39|          3|  15|    56|
|246E53CB58DD424D|          1|  21|    26|
|246E53CB58DD424D|          1|  21|    26|
|D9FBB3DC6536583C|          1|  12|    32|
|D9FBB3DC6536583C|          1|  12|    32|
|3F4708BA32882D58|          3|  20|    59|
|3F4708BA32882D58|          3|  20|    59|
|D530A96BED14F1D2|          6|  11|    29|
|D530A96BED14F1D2|          6|  11|    29|
|2A03CCF2C1964880|          6|  16|     8|
|2A03CCF2C1964880|          6|  16|     8|
|39F7A0625024FEA8|          5|  22|     3|
|39F7A0625024FEA8|          5|  22|     3|
|32A4D4A8BC639CDB|          5|  11|    42|
|32A4D4A8BC639CDB|          5|  11|    42|
|07E326B5534970C1|          4|  18|    32|
|07E326B5534970C1|          4|  18|    32|
|A473AF58FB4060C5|          4|  18|    32|
|A473AF58FB4060C5|          4|  18|    32|
+----------

In [0]:
# Analyze based on starting and/or ending stations:
spark.sql("""
    SELECT trip_id, 
           start_station_id, 
           end_station_id
    FROM trip_fact
""").show()

+----------------+----------------+--------------+
|         trip_id|start_station_id|end_station_id|
+----------------+----------------+--------------+
|89E7AA6C29227EFF|             525|           660|
|0FEFDE2603568365|             525|         16806|
|E6159D746B2DBB91|    KA1503000012|  TA1305000029|
|B32D3199F1C2E75B|             637|  TA1305000034|
|83E463F23575F4BF|           13216|  TA1309000055|
|BDAA7E3494E8D545|           18003|  KP1705001026|
|A772742351171257|    KP1705001026|  KP1705001026|
|295476889D9B79F8|           18003|         18003|
|362087194BA4CC9A|    KP1705001026|  KP1705001026|
|21630F715038CCB0|    KP1705001026|  KP1705001026|
|A977EB7FE7F5CD3A|    KP1705001026|  KP1705001026|
|8B868B03D6753C2A|    KP1705001026|  KP1705001026|
|BD331D658B9D2C31|             525|           520|
|8DFEA9BAFE6BAA62|           13253|  TA1309000050|
|27BE9F6E67AFD86C|             525|         15578|
|9B790D47A0A0F7F1|    KA1503000044|  KA1504000142|
|3C2DF72600B1DE6C|    KA1503000

In [0]:
# Analyze based on age of the rider at the time of the ride:
spark.sql("""
    SELECT trip_id, 
           TIMESTAMPDIFF(year, rider_dim.birthday, trip_fact.start_at) AS rider_age
    FROM trip_fact
    JOIN rider_dim ON trip_fact.rider_id = rider_dim.rider_id
""").show()

+----------------+---------+
|         trip_id|rider_age|
+----------------+---------+
|222BB8E5059252D7|       30|
|222BB8E5059252D7|       30|
|1826E16CB5486018|       26|
|1826E16CB5486018|       26|
|3D9B6A0A5330B04D|       26|
|3D9B6A0A5330B04D|       26|
|07E82F5E9C9E490F|       18|
|07E82F5E9C9E490F|       18|
|A8E94BAECBF0C2DD|       28|
|A8E94BAECBF0C2DD|       28|
|378F4AB323AA1D14|       28|
|378F4AB323AA1D14|       28|
|38AD311DC2EB1FBE|       56|
|38AD311DC2EB1FBE|       56|
|1D466737F0B18097|       40|
|1D466737F0B18097|       40|
|27E1142E1ACFAEFB|       21|
|27E1142E1ACFAEFB|       21|
|67F2A115DAE77924|       37|
|67F2A115DAE77924|       37|
+----------------+---------+
only showing top 20 rows



In [0]:
# Analyze based on rider membership status:
spark.sql("""
    SELECT trip_id, 
           rider_dim.is_member
    FROM trip_fact
    JOIN rider_dim ON trip_fact.rider_id = rider_dim.rider_id
""").show()

+----------------+---------+
|         trip_id|is_member|
+----------------+---------+
|222BB8E5059252D7|     True|
|222BB8E5059252D7|     True|
|1826E16CB5486018|     True|
|1826E16CB5486018|     True|
|3D9B6A0A5330B04D|     True|
|3D9B6A0A5330B04D|     True|
|07E82F5E9C9E490F|     True|
|07E82F5E9C9E490F|     True|
|A8E94BAECBF0C2DD|     True|
|A8E94BAECBF0C2DD|     True|
|378F4AB323AA1D14|     True|
|378F4AB323AA1D14|     True|
|38AD311DC2EB1FBE|     True|
|38AD311DC2EB1FBE|     True|
|1D466737F0B18097|     True|
|1D466737F0B18097|     True|
|27E1142E1ACFAEFB|     True|
|27E1142E1ACFAEFB|     True|
|67F2A115DAE77924|     True|
|67F2A115DAE77924|     True|
+----------------+---------+
only showing top 20 rows



In [0]:
# Analyze how much money is spent per month, quarter, and year:
spark.sql("""
    SELECT EXTRACT(month FROM date) AS month,
           EXTRACT(quarter FROM date) AS quarter,
           EXTRACT(year FROM date) AS year,
           SUM(amount) AS total_amount
    FROM payment_fact
    GROUP BY month, quarter, year
""").show()

+-----+-------+----+------------+
|month|quarter|year|total_amount|
+-----+-------+----+------------+
|    1|      1|2015|    58716.72|
|    8|      3|2020|   748793.10|
|   11|      4|2020|   819840.88|
|   12|      4|2015|   103565.96|
|    7|      3|2014|    39084.94|
|    6|      2|2021|  1000413.30|
|    1|      1|2021|   869782.12|
|    3|      1|2015|    65875.08|
|    2|      1|2014|    25169.84|
|    6|      2|2019|   486183.00|
|    9|      3|2020|   771152.68|
|    3|      1|2020|   644732.74|
|    2|      1|2013|       25.80|
|   10|      4|2021|  1111313.20|
|    6|      2|2018|   327605.28|
|    1|      1|2019|   412790.68|
|    3|      1|2019|   439194.82|
|   10|      4|2013|    15772.24|
|    3|      1|2013|     1635.50|
|   11|      4|2018|   383882.86|
+-----+-------+----+------------+
only showing top 20 rows



In [0]:
# Analyze how much money is spent per member based on average rides and minutes per month:
spark.sql("""
    SELECT rider_dim.rider_id,
           ROUND(COUNT(trip_fact.trip_id) / COUNT(DISTINCT EXTRACT(month FROM trip_fact.start_at)),3) AS avg_rides_per_month,
           ROUND(SUM((UNIX_TIMESTAMP(trip_fact.ended_at) - UNIX_TIMESTAMP(trip_fact.start_at)) / 60) / COUNT(DISTINCT EXTRACT(month FROM trip_fact.start_at)),3) AS avg_minutes_per_month
    FROM trip_fact
    JOIN rider_dim ON trip_fact.rider_id = rider_dim.rider_id
    GROUP BY rider_dim.rider_id
""").show()

+--------+-------------------+---------------------+
|rider_id|avg_rides_per_month|avg_minutes_per_month|
+--------+-------------------+---------------------+
|   70962|             22.182|              510.561|
|   67196|              5.556|               93.759|
|   13772|                2.0|                103.3|
|   20868|             27.333|              709.186|
|   64842|             26.667|               370.05|
|   70721|               10.4|              107.277|
|   13192|              3.143|              129.138|
|   64595|             93.833|               3263.9|
|   28503|             13.091|              176.685|
|   13865|               3.75|               58.733|
|   15271|             17.167|               312.75|
|   29089|             14.909|              188.197|
|   47880|             69.333|              1132.05|
|    8304|                2.0|               135.35|
|   68285|             13.167|              310.461|
|   28197|             62.833|             146

In [0]:
# Analyze how much money is spent per member based on the age of the rider at account start:
spark.sql("""
    SELECT rider_dim.rider_id,
           TIMESTAMPDIFF(year, rider_dim.birthday, rider_dim.account_start_date) AS rider_age_at_start,
           SUM(payment_fact.amount) AS total_amount_spent
    FROM payment_fact
    JOIN rider_dim ON payment_fact.rider_id = rider_dim.rider_id
    GROUP BY rider_dim.rider_id, rider_age_at_start
""").show()

+--------+------------------+------------------+
|rider_id|rider_age_at_start|total_amount_spent|
+--------+------------------+------------------+
|    1028|                31|           1642.88|
|    1197|                44|           4840.52|
|    1588|                30|             72.00|
|    1632|                15|            108.00|
|    2014|                52|            540.00|
|    2038|                48|           3096.00|
|    2309|                27|           3024.00|
|    2430|                40|            144.00|
|    2443|                24|            396.00|
|    2937|                46|           1260.00|
|    2945|                16|            468.00|
|    3206|                24|           1404.00|
|    3510|                25|           1188.00|
|    3607|                27|           2412.00|
|    3765|                15|           3357.40|
|    4175|                23|            756.00|
|    4333|                26|           1656.00|
|    4436|          