# Data Extraction
Load data from csv files which are stored in DBFS.

In [0]:
payment_df = spark.read.format('csv').option('sep', ',').load('/FileStore/divvy/payments.csv')
trip_df = spark.read.format('csv').option('sep', ',').load('/FileStore/divvy/trips.csv')
rider_df = spark.read.format('csv').option('sep', ',').load('/FileStore/divvy/riders.csv')
station_df = spark.read.format('csv').option('sep', ',').load('/FileStore/divvy/stations.csv')
display(trip_df)

_c0,_c1,_c2,_c3,_c4,_c5,_c6
89E7AA6C29227EFF,classic_bike,2021-02-12 16:14:56,2021-02-12 16:21:43,525,660,71934
0FEFDE2603568365,classic_bike,2021-02-14 17:52:38,2021-02-14 18:12:09,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09 19:10:18,2021-02-09 19:19:10,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02 17:49:41,2021-02-02 17:54:06,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23 15:07:23,2021-02-23 15:22:37,13216,TA1309000055,39608
BDAA7E3494E8D545,electric_bike,2021-02-24 15:43:33,2021-02-24 15:49:05,18003,KP1705001026,36267
A772742351171257,classic_bike,2021-02-01 17:47:42,2021-02-01 17:48:33,KP1705001026,KP1705001026,50104
295476889D9B79F8,classic_bike,2021-02-11 18:33:53,2021-02-11 18:35:09,18003,18003,19618
362087194BA4CC9A,classic_bike,2021-02-27 15:13:39,2021-02-27 15:36:36,KP1705001026,KP1705001026,16732
21630F715038CCB0,classic_bike,2021-02-20 08:59:42,2021-02-20 09:17:04,KP1705001026,KP1705001026,57068


# Workspace Cleanup
Recursively remove the data from delta lake and drop the associated tables if they are exist.

In [0]:
dbutils.fs.rm('/delta/divvy', recurse=True)

spark.sql("DROP TABLE IF EXISTS payments")
spark.sql("DROP TABLE IF EXISTS trips")
spark.sql("DROP TABLE IF EXISTS riders")
spark.sql("DROP TABLE IF EXISTS stations")
spark.sql("DROP TABLE IF EXISTS trip_dates")
spark.sql("DROP TABLE IF EXISTS payment_dates")

Out[16]: DataFrame[]

# Delta Lake Creation
Save the csv files to the `/delta/divvy` delta lake using overwrite mode. After that, create tables from the location of delta lake.

In [0]:
payment_df.write.format('delta').mode('overwrite').save('/delta/divvy/payments')
trip_df.write.format('delta').mode('overwrite').save('/delta/divvy/trips')
rider_df.write.format('delta').mode('overwrite').save('/delta/divvy/riders')
station_df.write.format('delta').mode('overwrite').save('/delta/divvy/stations')

In [0]:
spark.sql("CREATE TABLE payments USING DELTA LOCATION '/delta/divvy/payments'")
spark.sql("CREATE TABLE trips USING DELTA LOCATION '/delta/divvy/trips'")
spark.sql("CREATE TABLE riders USING DELTA LOCATION '/delta/divvy/riders'")
spark.sql("CREATE TABLE stations USING DELTA LOCATION '/delta/divvy/stations'")

Out[18]: DataFrame[]

# Star Schema Design
Generate fact and dimension tables. Create columns which help to query effectively. Designing the tables for the <a href="https://adb-29201057511283.3.azuredatabricks.net/?o=29201057511283#notebook/2129164476750309/command/4298484073601913">business questions</a>. Relational ERD for the Divvy Bikeshare Dataset is the following:
![ERD](/files/project_erd.jpeg)

## Helper Functions
Rename the columns and change the column type according to the ERD diagram.

In [0]:
from pyspark.sql.functions import col, cast


def rename_column(table_path, column_dict):
    df = spark.read.table(table_path)
    for fcol, tcol in column_dict.items():
        df = df.withColumnRenamed(fcol, tcol)

    df.write.format("delta").mode("overwrite").option("overwriteSchema", True).save('/delta/divvy/' + table_path)

def change_column_type(table_path, type_dict):
    df = spark.read.table(table_path)
    for c, t in type_dict.items():
        df = df.withColumn(c, col(c).cast(t))

    df.write.format("delta").mode("overwrite").option("overwriteSchema", True).save('/delta/divvy/' + table_path)

## Data Formatting
Rename the column names and change the column types.

In [0]:
payment_columns = {'_c0':'payment_id', '_c1':'date_id', '_c2':'amount', '_c3':'rider_id'}
payment_types = {'payment_id':'int', 'amount':'decimal', 'date_id':'date', 'rider_id':'int'}

trip_columns = {'_c0':'trip_id', '_c1':'rideable_type', '_c2':'started_at', '_c3':'ended_at', '_c4':'start_station_id', '_c5':'end_station_id', '_c6':'rider_id'}
trip_types = {'trip_id':'string', 'rideable_type':'string', 'started_at':'timestamp', 'ended_at':'timestamp', 'start_station_id':'int', 'end_station_id':'int', 'rider_id':'int'}

rider_columns = {'_c0':'rider_id', '_c1':'first', '_c2':'last', '_c3':'address', '_c4':'birthday', '_c5':'account_start_date', '_c6':'account_end_date', '_c7':'is_member'}
rider_types = {'rider_id':'int', 'first':'string', 'last':'string', 'address':'string', 'birthday':'date', 'account_start_date':'date', 'account_end_date':'date', 'is_member':'boolean'}

station_columns = {'_c0':'station_id', '_c1':'name', '_c2':'latitude', '_c3':'longitude'}
station_types = {'station_id':'string', 'name':'string', 'latitude':'float', 'longitude':'float'}

rename_column('riders', rider_columns)
change_column_type('riders', rider_types)

rename_column('stations', station_columns)
change_column_type('stations', station_types)

rename_column('trips', trip_columns)
change_column_type('trips', trip_types)

rename_column('payments', payment_columns)
change_column_type('payments', payment_types)

Create additional columns considering the business outcome questions

In [0]:
from pyspark.sql.functions import date_trunc, datediff, to_date

# Calculate the trip duration in seconds
trip_df = spark.read.table('trips')
trip_df = trip_df.withColumn("duration", (col("ended_at") - col("started_at")).cast("long"))\
                .withColumn("time_id", date_trunc("hour", col("started_at")))

# Calculate the age at account start date in year
rider_df = spark.read.table('riders')
rider_df = rider_df.withColumn("age_at_account_start", (datediff(rider_df.account_start_date, rider_df.birthday)/365).cast("int"))
rider_df.write.format("delta").mode("overwrite").option("overwriteSchema", True).save('/delta/divvy/riders')

# Store all columns from riders table except rider_id
rider_columns = rider_df.columns
rider_columns.remove('rider_id')

# Calculate the age of the rider at time of the ride
trip_df = trip_df.join(rider_df, 'rider_id')\
    .withColumn("age_at_ride_time", (datediff(to_date(col("started_at")), col("birthday"))/365).cast("int"))\
    .drop(*rider_columns)

# Reorder the columns in the following strucutre: Primary Key -> Fields -> Foreign Key(s)
trip_df.select('trip_id', 'duration', 'rideable_type', 'age_at_ride_time', 'started_at', 'ended_at', 'start_station_id', 'end_station_id', 'time_id', 'rider_id')\
    .write.format("delta").mode("overwrite").option("overwriteSchema", True).save('/delta/divvy/trips')

payment_df = spark.read.table('payments')
payment_df.select('payment_id', 'amount', 'date_id', 'rider_id').write.format("delta").mode("overwrite").option("overwriteSchema", True).save('/delta/divvy/payments')

## Generate Date Dimensions
Seperate date table dimension will be created for the payment and for the trip. Because tables differ from their time granurality.
- Trip date dimension will be generated hourly due to the interest of time of the day (morning, afternoon, night, evening) information.
- Payment date dimension will be generated daily due to the interest of money spending per month, quarter, year.

In [0]:
from pyspark.sql.functions import min, max
from pyspark.sql.functions import explode, sequence, to_timestamp

# Retrieve the maximum and minimum dates from payment and trip table
payment_min_date, payment_max_date = payment_df.select(min('date_id'), max('date_id')).first()
trip_min_date, trip_max_date = trip_df.select(min('time_id'), max('time_id')).first()

print(trip_min_date, ' ', trip_max_date)
print(payment_min_date, ' ', payment_max_date)

# Generate temporary date views: one of the view is generated daily, the other view is generated hourly
spark.sql(f"SELECT explode(sequence(to_date('{payment_min_date}'), to_date('{payment_max_date}'), INTERVAL 1 DAY)) AS date").createOrReplaceTempView('payment_dates_view')
spark.sql(f"SELECT explode(sequence(to_timestamp('{trip_min_date}'), to_timestamp('{trip_max_date}'), INTERVAL 1 HOUR)) AS time").createOrReplaceTempView('trip_dates_view')

2021-02-01 01:00:00   2022-01-31 23:00:00
2013-02-01   2022-02-01


In [0]:
%sql SELECT * FROM trip_dates_view

time
2021-02-01T01:00:00.000+0000
2021-02-01T02:00:00.000+0000
2021-02-01T03:00:00.000+0000
2021-02-01T04:00:00.000+0000
2021-02-01T05:00:00.000+0000
2021-02-01T06:00:00.000+0000
2021-02-01T07:00:00.000+0000
2021-02-01T08:00:00.000+0000
2021-02-01T09:00:00.000+0000
2021-02-01T10:00:00.000+0000


In [0]:
trip_dates = spark.sql("""
SELECT
    time as time_id,
    dayofweek(time) AS day_of_week,
    CASE WHEN date_format(time, 'H') BETWEEN 5 AND 11 THEN 'morning'
        WHEN date_format(time, 'H') BETWEEN 12 AND 16 THEN 'afternoon'
        WHEN date_format(time, 'H') BETWEEN 17 AND 21 THEN 'evening'
        ELSE 'night'
     END AS time_of_day
FROM trip_dates_view
ORDER BY time
""")

# Save to delta lake and create table using delta location
trip_dates.write.format('delta').mode('overwrite').save('/delta/divvy/trip_dates')
spark.sql("CREATE TABLE trip_dates USING DELTA LOCATION '/delta/divvy/trip_dates'")

Out[32]: DataFrame[]

In [0]:
payment_dates = spark.sql("""
SELECT
    date AS date_id,
    INT(date_format(date, 'M')) AS month,
    INT(date_format(date, 'Q')) AS quarter,
    INT(date_format(date, 'y')) as year
FROM payment_dates_view
ORDER BY date
""")

# Save to delta lake and create table using delta location
payment_dates.write.format('delta').mode('overwrite').save('/delta/divvy/payment_dates')
spark.sql("CREATE TABLE payment_dates USING DELTA LOCATION '/delta/divvy/payment_dates'")

Out[33]: DataFrame[]

- ~~Update documentation~~
- ~~Implement star schema~~
- ~~Create date_id~~
- ~~Rearrange columns (PK, fields, FKs)~~
- Query buisiness question
- Generate star schema PDF

# Buisiness Questions

Answering the buisiness questions.
- Analyze how much time is spent per ride
  * Based on date and time factors such as day of week and time of day
  * Based on which station is the starting and / or ending station
  * Based on age of the rider at time of the ride
  * Based on whether the rider is a member or a casual rider
- Analyze how much money is spent
  * Per month, quarter, year
  * Per member, based on the age of the rider at account start
- EXTRA CREDIT - Analyze how much money is spent per member
  * Based on how many rides the rider averages per month
  * Based on how many minutes the rider spends on a bike per month

In [0]:
# Load the fact and dimension tables
payment_df = spark.read.table('payments')
trip_df = spark.read.table('trips')

rider_df = spark.read.table('riders')
station_df = spark.read.table('stations')
trip_date_df = spark.read.table('trip_dates')
payment_date_df = spark.read.table('payment_dates')

In [0]:
trip_df.join(trip_date_df, 'time_id').display()

time_id,trip_id,duration,rideable_type,age_at_ride_time,started_at,ended_at,start_station_id,end_station_id,rider_id,day_of_week,time_of_day
2021-02-12T16:00:00.000+0000,89E7AA6C29227EFF,407,classic_bike,37,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525.0,660.0,71934,6,afternoon
2021-02-14T17:00:00.000+0000,0FEFDE2603568365,1171,classic_bike,38,2021-02-14T17:52:38.000+0000,2021-02-14T18:12:09.000+0000,525.0,16806.0,47854,1,evening
2021-02-09T19:00:00.000+0000,E6159D746B2DBB91,532,electric_bike,33,2021-02-09T19:10:18.000+0000,2021-02-09T19:19:10.000+0000,,,70870,3,evening
2021-02-02T17:00:00.000+0000,B32D3199F1C2E75B,265,classic_bike,19,2021-02-02T17:49:41.000+0000,2021-02-02T17:54:06.000+0000,637.0,,58974,3,evening
2021-02-23T15:00:00.000+0000,83E463F23575F4BF,914,electric_bike,71,2021-02-23T15:07:23.000+0000,2021-02-23T15:22:37.000+0000,13216.0,,39608,3,afternoon
2021-02-24T15:00:00.000+0000,BDAA7E3494E8D545,332,electric_bike,27,2021-02-24T15:43:33.000+0000,2021-02-24T15:49:05.000+0000,18003.0,,36267,4,afternoon
2021-02-01T17:00:00.000+0000,A772742351171257,51,classic_bike,33,2021-02-01T17:47:42.000+0000,2021-02-01T17:48:33.000+0000,,,50104,2,evening
2021-02-11T18:00:00.000+0000,295476889D9B79F8,76,classic_bike,24,2021-02-11T18:33:53.000+0000,2021-02-11T18:35:09.000+0000,18003.0,18003.0,19618,5,evening
2021-02-27T15:00:00.000+0000,362087194BA4CC9A,1377,classic_bike,19,2021-02-27T15:13:39.000+0000,2021-02-27T15:36:36.000+0000,,,16732,7,afternoon
2021-02-20T08:00:00.000+0000,21630F715038CCB0,1042,classic_bike,47,2021-02-20T08:59:42.000+0000,2021-02-20T09:17:04.000+0000,,,57068,7,morning


# References

* Drop delta table: https://stackoverflow.com/a/59042576/10721627
* Rename column name: https://stackoverflow.com/a/67236170/10721627
* Change column type: https://stackoverflow.com/a/74391411/10721627 
* Date difference: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.datediff.html
* Cast string to timestamp: https://stackoverflow.com/a/52369875/10721627
* Simple string name for types: https://stackoverflow.com/a/32286450/10721627
* Generate date dimension: https://www.bluegranite.com/blog/generate-a-calendar-dimension-in-spark
* Datetime patterns for formatting: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
* Join dataframes: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html
* Rearrange columns: https://stackoverflow.com/a/42913947/10721627