
# Bike Share Data


## Extract Step: Read data from csv and store it as delta

### Read stations.csv

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Specify schema
schema = StructType([
  StructField("station_id", StringType(), True),
  StructField("name", StringType(), True),
  StructField("latitude", FloatType(), True),
  StructField("longitude", FloatType(), True)
])

# Read csv
df = spark.read.format("csv") \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .schema(schema) \
  .load(f"/FileStore/data/bike-sharing/stations.csv")

display(df.take(10))

# Write to delta
df.write.format("delta").mode("overwrite").save("/delta/bike-sharing/bronze_stations")

station_id,name,latitude,longitude
KA1503000012,Clark St & Lake St,41.88579559326172,-87.631103515625
637,Wood St & Chicago Ave,41.895633697509766,-87.67206573486328
13216,State St & 33rd St,41.83473205566406,-87.62582397460938
18003,Fairbanks St & Superior St,41.895809173583984,-87.62025451660156
KP1705001026,LaSalle Dr & Huron St,41.89487838745117,-87.63232421875
13253,Lincoln Ave & Waveland Ave,41.94879531860352,-87.67527770996094
KA1503000044,Rush St & Hubbard St,41.890174865722656,-87.62618255615234
KA1504000140,Winchester Ave & Elston Ave,41.92403793334961,-87.6764144897461
TA1305000032,Clinton St & Madison St,41.88224029541016,-87.64106750488281
TA1306000012,Wells St & Huron St,41.894752502441406,-87.6343994140625


### Read riders.csv

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, BooleanType

# Specify schema
schema = StructType([
  StructField("rider_id", IntegerType(), True),
  StructField("first", StringType(), True),
  StructField("last", StringType(), True),
  StructField("address", StringType(), True),
  StructField("birthday", DateType(), True),
  StructField("account_start_date", DateType(), True),
  StructField("account_end_date", DateType(), True),
  StructField("is_member", BooleanType(), True)
])

# Read csv
df = spark.read.format("csv") \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .schema(schema) \
  .load(f"/FileStore/data/bike-sharing/riders.csv")

display(df.take(10))

# Write to delta
df.write.format("delta").mode("overwrite").save("/delta/bike-sharing/bronze_riders")

rider_id,first,last,address,birthday,account_start_date,account_end_date,is_member
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False
1006,Alicia,Taylor,1137 Angela Locks,2004-01-30,2020-11-27,2021-12-01,True
1007,Benjamin,Fernandez,979 Phillips Ways,1988-01-11,2016-12-11,,False
1008,John,Crawford,7691 Evans Court,1987-02-21,2021-03-28,2021-07-01,True
1009,Victoria,Ritter,9922 Jim Crest Apt. 319,1981-02-07,2020-06-12,2021-11-01,True
1010,Tracy,Austin,92973 Mary Ville,1996-04-07,2019-12-27,,True


### Read payments.csv

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DecimalType, DateType

# Specify schema
schema = StructType([
  StructField("payment_id", IntegerType(), True),
  StructField("date", DateType(), True),
  StructField("amount", DecimalType(), True),
  StructField("rider_id", IntegerType(), True),
])

# Read csv
df = spark.read.format("csv") \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .schema(schema) \
  .load(f"/FileStore/data/bike-sharing/payments.csv")

display(df.take(10))

# Write to delta
df.write.format("delta").mode("overwrite").save("/delta/bike-sharing/bronze_payments")

payment_id,date,amount,rider_id
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000
6,2019-10-01,9.0,1000
7,2019-11-01,9.0,1000
8,2019-12-01,9.0,1000
9,2020-01-01,9.0,1000
10,2020-02-01,9.0,1000
11,2020-03-01,9.0,1000


### Read trips.csv

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, StringType, TimestampType, IntegerType

# Specify schema
schema = StructType([
  StructField("trip_id", StringType(), True),
  StructField("rideable_type", StringType(), True),
  StructField("started_at", TimestampType(), True),
  StructField("ended_at", TimestampType(), True),
  StructField("start_station_id", IntegerType(), True),
  StructField("end_station_id", IntegerType(), True),
  StructField("rider_id", IntegerType(), True),
])

# Read csv
df = spark.read.format("csv") \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .schema(schema) \
  .load(f"/FileStore/data/bike-sharing/trips.csv")

display(df.take(10))

# Write to delta
df.write.format("delta").mode("overwrite").save("/delta/bike-sharing/bronze_trips")

trip_id,rideable_type,started_at,ended_at,start_station_id,end_station_id,rider_id
0FEFDE2603568365,classic_bike,2021-02-14T17:52:38Z,2021-02-14T18:12:09Z,525.0,16806.0,47854
E6159D746B2DBB91,electric_bike,2021-02-09T19:10:18Z,2021-02-09T19:19:10Z,,,70870
B32D3199F1C2E75B,classic_bike,2021-02-02T17:49:41Z,2021-02-02T17:54:06Z,637.0,,58974
83E463F23575F4BF,electric_bike,2021-02-23T15:07:23Z,2021-02-23T15:22:37Z,13216.0,,39608
BDAA7E3494E8D545,electric_bike,2021-02-24T15:43:33Z,2021-02-24T15:49:05Z,18003.0,,36267
A772742351171257,classic_bike,2021-02-01T17:47:42Z,2021-02-01T17:48:33Z,,,50104
295476889D9B79F8,classic_bike,2021-02-11T18:33:53Z,2021-02-11T18:35:09Z,18003.0,18003.0,19618
362087194BA4CC9A,classic_bike,2021-02-27T15:13:39Z,2021-02-27T15:36:36Z,,,16732
21630F715038CCB0,classic_bike,2021-02-20T08:59:42Z,2021-02-20T09:17:04Z,,,57068
A977EB7FE7F5CD3A,classic_bike,2021-02-20T08:58:16Z,2021-02-20T08:58:41Z,,,32712


## Load Step: Read data from delta and store it as tables

### Create tables from delta files

In [0]:
# Create tables from delta files
spark.sql("CREATE TABLE IF NOT EXISTS trip USING DELTA LOCATION '/delta/bike-sharing/bronze_trips'")
spark.sql("CREATE TABLE IF NOT EXISTS rider USING DELTA LOCATION '/delta/bike-sharing/bronze_riders'")
spark.sql("CREATE TABLE IF NOT EXISTS payment USING DELTA LOCATION '/delta/bike-sharing/bronze_payments'")
spark.sql("CREATE TABLE IF NOT EXISTS station USING DELTA LOCATION '/delta/bike-sharing/bronze_stations'")

DataFrame[]

## Transform Step: Read data from tables and create dimension and fact tables

### Create date dimension table


In [0]:
# Create table for date dimension
spark.sql("drop table gold_dim_date")
spark.sql("CREATE TABLE IF NOT EXISTS gold_dim_date (date DATE, day_of_week BIGINT, hour_of_day BIGINT, month BIGINT, quarter BIGINT, year BIGINT, date_id LONG)")


DataFrame[]

### Create payment fact table

In [0]:
from pyspark.sql.functions import monotonically_increasing_id, dayofweek, hour, month, quarter, year, to_date

# Read payments from table
df_payments = spark.table("payment")
# Read dates from table
df_dim_date = spark.table("gold_dim_date")

# Iterate over date-like columns and replace it with date_id
for datelike in ["date"]:
  # Rename datelike column to date
  df_payments = df_payments.withColumnRenamed(datelike, "date")
  
  # Extract distinct dates
  df_dates = df_payments.select("date").distinct()
  # Calculate day of the week
  df_dates = df_dates.withColumn("day_of_week", dayofweek("date"))
  # Calculate hour of the day
  df_dates = df_dates.withColumn("hour_of_day", hour("date"))
  # Calculate month
  df_dates = df_dates.withColumn("month", month("date"))
  # Calculate month
  df_dates = df_dates.withColumn("quarter", quarter("date"))
  # Calculate month
  df_dates = df_dates.withColumn("year", year("date"))
  
  # Identify dates that are not already present in dim_date table
  df_dates_new = df_dates.join(df_dim_date, on=["date"], how="left_anti")

  # Generate date IDs
  df_dates_new = df_dates_new.withColumn("date_id", monotonically_increasing_id())

  # Add new dates to dim_date
  df_dim_date = df_dim_date.union(df_dates_new)
  
  # Replace date column with date_id
  df_payments = df_payments.join(df_dim_date, on="date", how="left").drop("date", "day_of_week", "hour_of_day", "month", "quarter", "year")

# Convert column formats
df_dim_date = df_dim_date.withColumn("date", to_date("date"))

display(df_payments.take(10))

# Save dim_date to table
df_dim_date.write.format("delta").mode("overwrite").saveAsTable(f"gold_dim_date")

# Save payments to table
df_payments.write.format("delta").mode("overwrite").saveAsTable(f"gold_fact_payment")


payment_id,amount,rider_id,date_id
1064470,9.0,42106,30
1064467,9.0,42106,33
1064465,9.0,42106,34
1064469,9.0,42106,43
1064464,9.0,42106,50
1064468,9.0,42106,57
1064471,9.0,42106,61
1064463,9.0,42106,63
1064466,9.0,42106,84
1064462,9.0,42106,106


### Create trip fact table

In [0]:
from pyspark.sql.functions import monotonically_increasing_id, unix_timestamp, round, col, dayofweek, hour, month, quarter, year, to_date

# Read trips from table
df_trips = spark.table("trip")
# Read riders from table
df_riders = spark.table("rider")
# Read dates from table
df_dim_date = spark.table("gold_dim_date")

# Calculate duration in minutes
df_trips = df_trips.withColumn("duration_in_minutes", round((unix_timestamp("ended_at") - unix_timestamp("started_at")) / 60, 2))
# Join with riders table
df_trips = df_trips.join(df_riders, "rider_id", "inner")
# Calculate rider age at trip start
df_trips = df_trips.withColumn("rider_age_at_trip_start", round((unix_timestamp("started_at") - unix_timestamp("birthday")) / 60 / 60/ 24 / 360, 0))
# Drop rider columns
df_trips = df_trips.drop("first", "last", "address", "birthday", "account_start_date", "account_end_date", "is_member")

# Iterate over date-like columns and replace it with date_id
for datelike in ["started_at", "ended_at"]:  
  # Rename datelike column to date
  df_trips = df_trips.withColumnRenamed(datelike, "date")
  
  # Extract distinct dates
  df_dates = df_trips.select("date").distinct()
  # Calculate day of the week
  df_dates = df_dates.withColumn("day_of_week", dayofweek("date"))
  # Calculate hour of the day
  df_dates = df_dates.withColumn("hour_of_day", hour("date"))
  # Calculate month
  df_dates = df_dates.withColumn("month", month("date"))
  # Calculate month
  df_dates = df_dates.withColumn("quarter", quarter("date"))
  # Calculate month
  df_dates = df_dates.withColumn("year", year("date"))
  
  # Identify dates that are not already present in dim_date table
  df_dates_new = df_dates.join(df_dim_date, on=["date"], how="left_anti")

  # Generate date IDs
  df_dates_new = df_dates_new.withColumn("date_id", monotonically_increasing_id())

  # Add new dates to dim_date
  df_dim_date = df_dim_date.union(df_dates_new)
  
  # Replace date column with date_id
  df_trips = df_trips.join(df_dim_date, on="date", how="left").drop("date", "day_of_week", "hour_of_day", "month", "quarter", "year")
  # Rename date ID column
  df_trips = df_trips.withColumnRenamed("date_id", f"{datelike}_date_id").drop("date_id")

# Convert date to date format
df_dim_date = df_dim_date.withColumn("date", to_date("date"))

display(df_trips.take(10))

# Save dim_date to table
df_dim_date.write.format("delta").mode("overwrite").saveAsTable(f"gold_dim_date")

# Save payments to table
df_trips.write.format("delta").mode("overwrite").saveAsTable(f"gold_fact_trip")

rider_id,trip_id,rideable_type,start_station_id,end_station_id,duration_in_minutes,rider_age_at_trip_start,started_at_date_id,ended_at_date_id
33902,5E98DA99CB0B52E4,classic_bike,13353.0,13242.0,25.32,38.0,17180040363,103191
70456,F6F309843C09CAAC,classic_bike,,,12.72,37.0,8590818167,207869
37747,ADFF32195521E952,classic_bike,13288.0,,8.12,20.0,17179932936,10220
16732,362087194BA4CC9A,classic_bike,,,22.95,20.0,717516,473701
45050,5B788004F8A5204C,classic_bike,13353.0,13242.0,24.03,29.0,34126,61443
19618,295476889D9B79F8,classic_bike,18003.0,18003.0,1.27,24.0,8590452269,545415
57068,21630F715038CCB0,classic_bike,,,17.37,48.0,592079,557441
6693,378F4AB323AA1D14,docked_bike,,,46.07,29.0,472219,320614
43342,A8E94BAECBF0C2DD,docked_bike,,,54.17,29.0,404130,2212499
71976,268A00298A05078B,classic_bike,,,12.92,34.0,17180656812,295583


### Create rider dimension table

In [None]:
from pyspark.sql.functions import when, current_timestamp, col, monotonically_increasing_id, dayofweek, hour, month, quarter, year

# Read rider from table
df_riders = spark.table("rider")

# Save riders to table
df_riders.write.format("delta").mode("overwrite").saveAsTable(f"gold_dim_rider")

### Create station dimension table

In [0]:
# Read stations from table
df_stations = spark.table("station")

# Save stations to table
df_stations.write.format("delta").mode("overwrite").saveAsTable(f"gold_dim_station")

## Business Questions

### Time spent per ride based on date and time factors such as day of week and time of day

In [0]:
from pyspark.sql.functions import round, avg

# Read trips from table
df_trips = spark.table("gold_fact_trip")
# Read dates from table
df_dates = spark.table("gold_dim_date")
# Join with date dimension
df_trips = df_trips.join(df_dates, df_trips.started_at_date_id == df_dates.date_id, "inner")

# Group by day of the week and aggregate average duration
df_grouped = df_trips.groupBy("day_of_week").agg(round(avg("duration_in_minutes"), 2).alias("avg_duration_in_minutes")).orderBy("day_of_week")

display(df_grouped)

day_of_week,avg_duration_in_minutes
1,21.01
2,21.77
3,20.58
4,20.51
5,20.99
6,20.69
7,20.58


In [0]:
from pyspark.sql.functions import round, avg

# Read trips from table
df_trips = spark.table("gold_fact_trip")
# Read dates from table
df_dates = spark.table("gold_dim_date")
# Join with date dimension
df_trips = df_trips.join(df_dates, df_trips.started_at_date_id == df_dates.date_id, "inner")

# Group by hour of day and aggregate average duration
df_grouped = df_trips.groupBy("hour_of_day").agg(round(avg("duration_in_minutes"), 2).alias("avg_duration_in_minutes")).orderBy("hour_of_day")

display(df_grouped)

hour_of_day,avg_duration_in_minutes
0,18.25
1,22.04
2,21.08
3,23.47
4,20.83
5,23.24
6,20.77
7,20.42
8,21.31
9,18.55



### Time spent per ride based on which station is the starting and / or ending station

In [0]:
from pyspark.sql.functions import round, avg, desc

# Read trips from table
df_trips = spark.table("gold_fact_trip")
# Drop rows without a start station
df_trips = df_trips.dropna(subset=["start_station_id"])

# Group by start station and aggregate average duration
df_grouped = df_trips.groupBy("start_station_id").agg(round(avg("duration_in_minutes"), 2).alias("avg_duration_in_minutes")).orderBy(desc("avg_duration_in_minutes"))

display(df_grouped.take(10))

start_station_id,avg_duration_in_minutes
556,538.04
665,535.64
587,409.7
564,353.39
16915,317.65
20205,308.64
20230,285.37
642,283.13
537,280.02
20211,266.13


In [0]:
from pyspark.sql.functions import round, avg, desc

# Read trips from table
df_trips = spark.table("gold_fact_trip")
# Drop rows without a start station
df_trips = df_trips.dropna(subset=["end_station_id"])

# Group by start station and aggregate average duration
df_grouped = df_trips.groupBy("end_station_id").agg(round(avg("duration_in_minutes"), 2).alias("avg_duration_in_minutes")).orderBy(desc("avg_duration_in_minutes"))

display(df_grouped.take(10))

end_station_id,avg_duration_in_minutes
16903,569.73
572,502.02
20110,376.76
20235,371.81
587,367.32
20204,316.16
20206,276.37
576,275.55
583,259.76
20116,245.5


### Time spent per ride based on age of the rider at time of the ride

In [0]:
from pyspark.sql.functions import hour, round, unix_timestamp, year, to_date, avg, desc

# Read trips from table
df_trips = spark.table("gold_fact_trip")

# Group by rider age and aggregate average duration
df_grouped = df_trips.groupBy("rider_age_at_trip_start").agg(round(avg("duration_in_minutes"), 2).alias("avg_duration_in_minutes")).orderBy("rider_age_at_trip_start")

display(df_grouped.take(10))

rider_age_at_trip_start,avg_duration_in_minutes
15.0,34.73
16.0,25.71
17.0,19.05
18.0,22.83
19.0,21.45
20.0,17.48
21.0,18.92
22.0,19.97
23.0,24.1
24.0,23.83


### Time spent per ride based on whether the rider is a member or a casual rider

In [0]:
from pyspark.sql.functions import hour, round, unix_timestamp, year, to_date, avg, desc

# Read trips from table
df_trips = spark.table("gold_fact_trip")
# Read riders from table
df_riders = spark.table("gold_dim_rider")

# Join with rider dimension
df_trips = df_trips.join(df_riders, "rider_id", "inner")

# Group by membership type and aggragate average duration
df_grouped = df_trips.groupBy("is_member").agg(round(avg("duration_in_minutes"), 2).alias("avg_duration_in_minutes"))

display(df_grouped)

is_member,avg_duration_in_minutes
True,21.3
False,21.2


### Money spent per month

In [0]:
from pyspark.sql.functions import sum

# Read payments from table
df_payments = spark.table("gold_fact_payment")
# Read dates from table
df_dates = spark.table("gold_dim_date")
# Join with date dimension
df_payments = df_payments.join(df_dates, "date_id", "inner")

# Group by month and aggregate total amount
df_grouped = df_payments.groupBy("month").agg(sum("amount").alias("money_spent")).orderBy("month")

display(df_grouped)

month,money_spent
1,1855960
2,1908071
3,1348931
4,1395922
5,1441363
6,1491380
7,1539179
8,40510690
9,1642068
10,1696403


### Money spent per quarter


In [0]:
from pyspark.sql.functions import sum

# Read payments from table
df_payments = spark.table("gold_fact_payment")
# Read dates from table
df_dates = spark.table("gold_dim_date")
# Join with date dimension
df_payments = df_payments.join(df_dates, "date_id", "inner")

# Group by quarter and aggregate total amount
df_grouped = df_payments.groupBy("quarter").agg(sum("amount").alias("money_spent")).orderBy("quarter")

display(df_grouped)

quarter,money_spent
1,5112962
2,4328665
3,43691937
4,5243748


### Money spent per year


In [0]:
from pyspark.sql.functions import sum

# Read payments from table
df_payments = spark.table("gold_fact_payment")
# Read dates from table
df_dates = spark.table("gold_dim_date")
# Join with date dimension
df_payments = df_payments.join(df_dates, "date_id", "inner")

# Group by year and aggregate total amount
df_grouped = df_payments.groupBy("year").agg(sum("amount").alias("money_spent")).orderBy("year")

display(df_grouped)

year,money_spent
2013,53705
2014,227451
2015,477229
2016,825176
2017,1308377
2018,2000331
2019,2979094
2020,4315927
2021,44999864
2022,1190158


### Money spent per member based on how many rides the rider averages per month

In [0]:
from pyspark.sql.functions import when, col, current_timestamp

# Read trips from table
df_trips = spark.table("gold_fact_trip")
# Group by rider and count trips
df_trips = df_trips.groupBy("rider_id").count().withColumnRenamed("count", "trip_count")

# Read payments from table
df_payments = spark.table("gold_fact_payment")
# Group by rider and sum up amount
df_payments = df_payments.groupBy("rider_id").agg(sum("amount").alias("money_spent"))

# Read riders from table
df_riders = spark.table("gold_dim_rider")
# Calculate account duration in months (use current date as end date if not set)
df_riders = df_riders.withColumn("account_duration_in_months", round((unix_timestamp(when(col("account_end_date").isNull(), current_timestamp()).otherwise(col("account_end_date"))) - unix_timestamp(col("account_start_date"))) / 60 / 60 / 24 / 30, 2))
# Join riders with trips and payments
df_riders = df_riders.join(df_trips, "rider_id", "inner").join(df_payments, "rider_id", "inner")
# Calculate money spent per monthly trips
df_riders = df_riders.withColumn("money_spent_per_trips_per_month", col("money_spent") / (col("trip_count") / col("account_duration_in_months")))
# Filter out non-members
df_riders = df_riders.filter(df_riders.is_member)
# Select colums
df_riders = df_riders.select("rider_id", "money_spent_per_trips_per_month")

display(df_riders.take(10))

rider_id,money_spent_per_trips_per_month
73683,8.20353982300885
51415,167.13000000000002
39285,149.42636363636362
53963,64.99858361774744
37251,191.49012605042017
46266,3.0796363636363635
4935,16.779071274298055
8086,3.210847975553858
5300,40.45869402985075
11858,29.33237288135593


### Money spent per member based on how many minutes the rider spends on a bike per month

In [0]:
from pyspark.sql.functions import when, col, current_timestamp, sum

# Read trips from table
df_trips = spark.table("gold_fact_trip")
# Group by rider and sum up duration
df_trips = df_trips.groupBy("rider_id").agg(sum("duration_in_minutes").alias("total_duration_in_minutes"))

# Read payments from table
df_payments = spark.table("gold_fact_payment")
# Group by rider and sum up amount
df_payments = df_payments.groupBy("rider_id").agg(sum("amount").alias("money_spent"))

# Read riders from table
df_riders = spark.table("gold_dim_rider")
# Calculate account duration in months (use current date as end date if not set)
df_riders = df_riders.withColumn("account_duration_in_months", round((unix_timestamp(when(col("account_end_date").isNull(), current_timestamp()).otherwise(col("account_end_date"))) - unix_timestamp(col("account_start_date"))) / 60 / 60 / 24 / 30, 2))
# Join riders with trips and payments
df_riders = df_riders.join(df_trips, "rider_id", "inner").join(df_payments, "rider_id", "inner")
# Calculate money spent per monthly trip duration
df_riders = df_riders.withColumn("money_spent_per_duration_per_month", col("money_spent") / (col("total_duration_in_minutes") / col("account_duration_in_months")))
# Filter out non-members
df_riders = df_riders.filter(col('is_member') == True)
# Select colums
df_riders = df_riders.select("rider_id", "money_spent_per_duration_per_month")

display(df_riders.take(10))

rider_id,money_spent_per_duration_per_month
73683,0.5566848725239684
51415,5.100091547146781
39285,6.307566798158312
53963,2.524422614388529
37251,4.521088323350324
46266,0.1433407240661442
4935,1.015854937456358
8086,0.0576345007279389
5300,2.4512216625824013
11858,2.128540680154972
