# Udacity Project: Divvy Bikeshare dataset

In this project, you'll build a data lake solution for Divvy bikeshare.

Divvy is a bike sharing program in Chicago, Illinois, USA. Riders purchase a pass at a kiosk or use a mobile application to unlock a bike at a station in the city and use it for a specified amount of time. Bikes can be returned to the same station or to another station. The City of Chicago makes the anonymized bike trip data publicly available for projects like this one, where we can analyze the data.

Since the data from Divvy are anonymous, we have generated fake rider and account profiles, along with fake payment data, to accompany the trip data.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, DoubleType, DateType, TimestampType

spark = SparkSession.builder.appName("Divvy Bikeshare Dataset").getOrCreate()

# Extract Step
The notebook should contain Python code to extract information from CSV files stored in Databricks and write it to the Delta file system.

## Payments

In [None]:
schema = StructType([
    StructField("payment_id", IntegerType(), nullable=False),
    StructField("date", DateType(), nullable=False),
    StructField("amount", DoubleType(), nullable=False),
    StructField("account_number", IntegerType(), nullable=False)
])
payments_df = spark.read.csv("/FileStore/divvy/payments.csv", schema=schema)
payments_df.show()
payments_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("/delta/payments")

## Riders

In [None]:
schema = StructType([
    StructField("rider_id", IntegerType(), nullable=False),
    StructField("first", StringType(), nullable=False),
    StructField("last", StringType(), nullable=False),
    StructField("address", StringType(), nullable=False),
    StructField("birthday", DateType(), nullable=False),
    StructField("account_start_date", DateType(), nullable=False),
    StructField("account_end_date", DateType(), nullable=True),
    StructField("is_member", StringType(), nullable=False)
])
riders_df = spark.read.format("csv").load("/FileStore/divvy/riders.csv", schema=schema)
riders_df.show()
riders_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("/delta/riders")

## Stations

In [None]:
schema = StructType([
    StructField("station_key", StringType(), nullable=False),
    StructField("name", StringType(), nullable=False),
    StructField("lat", DoubleType(), nullable=False),
    StructField("long", DoubleType(), nullable=False)
])
stations_df = spark.read.format("csv").load("/FileStore/divvy/stations.csv", schema=schema)
stations_df.show()
stations_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("/delta/stations")

## Trips

In [None]:
schema = StructType([
    StructField("trip_id", StringType(), nullable=False),
    StructField("rideable_type", StringType(), nullable=False),
    StructField("start_at", TimestampType(), nullable=False),
    StructField("ended_at", TimestampType(), nullable=False),
    StructField("start_station_id", StringType(), nullable=False),
    StructField("end_station_id", StringType(), nullable=False),
    StructField("rider_id", IntegerType(), nullable=False)
])
trips_df = spark.read.format("csv").load("/FileStore/divvy/trips.csv", schema=schema)
trips_df.show()
trips_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("/delta/trips")

# Load Step
The notebook should contain code that creates tables and loads data from Delta files. The learner should use spark.sql statements to create the tables and then load data from the files that were extracted in the Extract step.

**NOTE**: The table schemas are already defined in the Extract step (when writing to Delta Lake above), so the `CREATE TABLE` statements below do not repeat them.
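
The four staging tables in this step follow the same drop-and-create pattern, so the statements can be generated from a mapping instead of being written out by hand. The following is a minimal sketch, not part of the original notebook: the helper `staging_ddl` and the mapping `STAGING_TABLES` are hypothetical names, while the table names and Delta paths are the ones used in this notebook.

```python
from typing import List

# Hypothetical mapping (not in the original notebook): staging table -> Delta path.
STAGING_TABLES = {
    "staging_payment": "/delta/payments",
    "staging_rider": "/delta/riders",
    "staging_station": "/delta/stations",
    "staging_trip": "/delta/trips",
}


def staging_ddl(table: str, location: str) -> List[str]:
    """Return the two SQL statements that (re)create one staging table."""
    return [
        f"DROP TABLE IF EXISTS {table}",
        f'CREATE TABLE {table} USING DELTA LOCATION "{location}"',
    ]


for table, location in STAGING_TABLES.items():
    for statement in staging_ddl(table, location):
        # In the notebook, each statement would be passed to spark.sql(statement).
        print(statement)
```

The sketch only builds and prints the SQL strings, so it can be checked without a Spark session.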

## Payment table


In [None]:
_ = spark.sql("""
    DROP TABLE IF EXISTS staging_payment;
""")
_ = spark.sql("""
    CREATE TABLE staging_payment
    USING DELTA LOCATION "/delta/payments";
""")

## Rider table

In [None]:
_ = spark.sql("""
    DROP TABLE IF EXISTS staging_rider;
""")
_ = spark.sql("""
    CREATE TABLE staging_rider
    USING DELTA LOCATION "/delta/riders";
""")

## Station table

In [None]:
_ = spark.sql("""
    DROP TABLE IF EXISTS staging_station;
""")
_ = spark.sql("""
    CREATE TABLE staging_station
    USING DELTA LOCATION "/delta/stations";
""")

## Trip table

In [None]:
_ = spark.sql("""
    DROP TABLE IF EXISTS staging_trip;
""")
_ = spark.sql("""
    CREATE TABLE staging_trip
    USING DELTA LOCATION "/delta/trips";
""")

# Transform Step

- The transform scripts should, at a minimum, write to Delta, use overwrite mode, and save the result as a Delta table.
- The dimension Python scripts should match the schema diagram. Dimensions should generate appropriate keys and should not contain facts.
- The fact table Python scripts should contain appropriate keys from the dimensions. In addition, the fact table scripts should appropriately generate the correct facts based on the diagrams provided in the first step.

## Dimension tables

### Date Dimension

In [None]:
date_df = spark.sql("""
    SELECT DISTINCT
        CAST(date_format(d, 'yyyyMMdd') AS INT) AS date_key,
        d AS date_date,
        EXTRACT(YEAR FROM d) AS year,
        EXTRACT(QUARTER FROM d) AS quarter,
        EXTRACT(MONTH FROM d) AS month,
        EXTRACT(WEEK FROM d) AS week_of_year,
        EXTRACT(DAYOFWEEK FROM d) AS weekday,
        CASE 
            WHEN weekday(d) IN (5, 6) THEN 1
            ELSE 0
        END AS is_weekend
    FROM (
        SELECT TRY_CAST(date AS DATE) AS d FROM staging_payment
        UNION ALL
        SELECT TRY_CAST(account_start_date AS DATE) FROM staging_rider
        UNION ALL
        SELECT TRY_CAST(account_end_date AS DATE) FROM staging_rider
        UNION ALL
        SELECT TRY_CAST(birthday AS DATE) FROM staging_rider
        UNION ALL
        SELECT TRY_CAST(start_at AS DATE) FROM staging_trip
        UNION ALL
        SELECT TRY_CAST(ended_at AS DATE) FROM staging_trip
    ) t
    WHERE d IS NOT NULL
""")

date_df.write.format("delta").mode("overwrite").saveAsTable("dim_date")
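
To make the derived columns easier to verify, here is a minimal plain-Python sketch of the same attributes for a single date. It is not part of the original notebook, and `date_dim_row` is a hypothetical helper. It assumes that `EXTRACT(WEEK ...)` yields the ISO week number, that `EXTRACT(DAYOFWEEK ...)` numbers days from 1 = Sunday to 7 = Saturday, and that `weekday()` numbers days from 0 = Monday to 6 = Sunday.

```python
from datetime import date


def date_dim_row(d: date) -> dict:
    """Compute date-dimension attributes for one date in plain Python."""
    return {
        "date_key": int(d.strftime("%Y%m%d")),  # yyyyMMdd as an integer
        "year": d.year,
        "quarter": (d.month - 1) // 3 + 1,
        "month": d.month,
        "week_of_year": d.isocalendar()[1],  # ISO week number
        "weekday": d.isoweekday() % 7 + 1,  # 1 = Sunday ... 7 = Saturday
        # Python's weekday() is Monday = 0, so 5 and 6 are Saturday and Sunday.
        "is_weekend": 1 if d.weekday() in (5, 6) else 0,
    }


row = date_dim_row(date(2021, 2, 6))  # 6 February 2021 was a Saturday
print(row)
```

Comparing a few rows of `dim_date` against this function is a quick way to spot an off-by-one in the weekday or weekend logic.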

### Rider Dimension

In [None]:
rider_df = spark.sql("""
    SELECT
        ROW_NUMBER() OVER (ORDER BY TRY_CAST(rider_id AS INT)) AS rider_id,
        rider_id AS rider_key,
        first AS first,
        last AS last,
        address AS address,
        birthday AS birthday_date,
        CAST(date_format(TRY_CAST(account_start_date AS DATE),'yyyyMMdd') AS INT) AS account_start_date_key,
        CAST(date_format(TRY_CAST(account_end_date AS DATE),'yyyyMMdd') AS INT) AS account_end_date_key,
        CASE WHEN LOWER(TRIM(is_member)) IN ('1','true','yes','y') THEN 1 ELSE 0 END AS is_member,
        -- age at account start (years); returns NULL if birthday or account_start_date invalid
        CASE
            WHEN TRY_CAST(birthday AS DATE) IS NOT NULL AND TRY_CAST(account_start_date AS DATE) IS NOT NULL
            THEN DATEDIFF(year, TRY_CAST(birthday AS DATE), TRY_CAST(account_start_date AS DATE))
            ELSE NULL
        END AS rider_age_at_account_start
    FROM staging_rider
""")

rider_df.write.format("delta").mode("overwrite").saveAsTable("dim_rider")
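
The two derived rider columns can be checked outside Spark with a small plain-Python sketch. It is not part of the original notebook, and `normalize_is_member` and `whole_years_between` are hypothetical helpers. The age calculation assumes that `DATEDIFF(year, start, end)` counts completed years and that the end date is not before the start date.

```python
from datetime import date
from typing import Optional

# Same truthy strings as the CASE expression in the rider query.
TRUTHY = {"1", "true", "yes", "y"}


def normalize_is_member(raw: Optional[str]) -> int:
    """Map a raw is_member string to 1 or 0; missing values map to 0."""
    if raw is None:
        return 0
    return 1 if raw.strip().lower() in TRUTHY else 0


def whole_years_between(start: Optional[date], end: Optional[date]) -> Optional[int]:
    """Completed years from start to end; None if either date is missing."""
    if start is None or end is None:
        return None
    # Subtract one year when the anniversary has not been reached yet.
    not_reached = (end.month, end.day) < (start.month, start.day)
    return end.year - start.year - int(not_reached)


print(normalize_is_member(" True "))
print(whole_years_between(date(1990, 6, 15), date(2020, 6, 14)))
```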

### Time Dimension

In [None]:
time_df = spark.sql("""
    SELECT
        DISTINCT
        hour(dts) * 60 + minute(dts) AS time_key,
        hour(dts) AS hour,
        minute(dts) AS minute,
        CASE 
            WHEN hour(dts) BETWEEN 5 AND 11 THEN 'morning' 
            WHEN hour(dts) BETWEEN 12 AND 16 THEN 'afternoon' 
            WHEN hour(dts) BETWEEN 17 AND 20 THEN 'evening' 
            ELSE 'night'
        END AS time_of_day,
        CASE 
            WHEN hour(dts) IN (7,8,16,17) THEN 1 
            ELSE 0 
        END AS is_rush_hour
    FROM (
        SELECT TRY_CAST(start_at AS timestamp) AS dts FROM staging_trip
        UNION ALL
        SELECT TRY_CAST(ended_at AS timestamp) FROM staging_trip
    ) t
    WHERE dts IS NOT NULL
""")

time_df.write.format("delta").mode("overwrite").saveAsTable("dim_time")
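
The key and bucketing rules of the time dimension are simple enough to restate in plain Python, which helps when checking boundary hours. This sketch is not part of the original notebook, and `time_dim_row` is a hypothetical helper that mirrors the `CASE` expressions in the query above.

```python
def time_dim_row(hour: int, minute: int) -> dict:
    """Compute time-dimension attributes for one hour/minute pair."""
    if 5 <= hour <= 11:
        time_of_day = "morning"
    elif 12 <= hour <= 16:
        time_of_day = "afternoon"
    elif 17 <= hour <= 20:
        time_of_day = "evening"
    else:
        time_of_day = "night"
    return {
        "time_key": hour * 60 + minute,  # minutes since midnight, 0 to 1439
        "hour": hour,
        "minute": minute,
        "time_of_day": time_of_day,
        "is_rush_hour": 1 if hour in (7, 8, 16, 17) else 0,
    }


print(time_dim_row(7, 30))
print(time_dim_row(23, 15))
```

Because `time_key` is the number of minutes since midnight, the dimension can hold at most 1440 distinct rows.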

### Station Dimension

In [None]:
station_df = spark.sql("""
    SELECT
        ROW_NUMBER() OVER (ORDER BY station_key) AS station_id,
        station_key AS station_key,
        name AS name,
        lat AS lat,
        long AS long
    FROM staging_station
""")

station_df.write.format("delta").mode("overwrite").saveAsTable("dim_station")

## Fact tables

### Trip fact

In [None]:

trip_df = spark.sql("""
    SELECT
        st.trip_id AS trip_id,
        dr.rider_key AS rider_key,
        dd_start_date.date_key AS start_date_key,
        dt_start_time.time_key AS start_time_key,
        dd_end_date.date_key AS end_date_key,
        dt_end_time.time_key AS end_time_key,
        DATEDIFF(minute, TRY_CAST(st.start_at AS TIMESTAMP), TRY_CAST(st.ended_at AS TIMESTAMP)) AS trip_duration_minutes,
        ss.station_key AS start_station_key,
        es.station_key AS end_station_key,
        DATEDIFF(year, dr.birthday_date, TRY_CAST(st.start_at AS TIMESTAMP)) AS rider_age_at_trip_start
    FROM staging_trip AS st
    LEFT JOIN dim_date AS dd_start_date
        ON CAST(date_format(TRY_CAST(st.start_at AS TIMESTAMP), 'yyyyMMdd') AS INT) = dd_start_date.date_key
    LEFT JOIN dim_time AS dt_start_time
        ON EXTRACT(hour FROM TRY_CAST(st.start_at AS TIMESTAMP)) * 60 + EXTRACT(minute FROM TRY_CAST(st.start_at AS TIMESTAMP)) = dt_start_time.time_key
    LEFT JOIN dim_date AS dd_end_date
        ON CAST(date_format(TRY_CAST(st.ended_at AS TIMESTAMP), 'yyyyMMdd') AS INT) = dd_end_date.date_key
    LEFT JOIN dim_time AS dt_end_time
        ON EXTRACT(hour FROM TRY_CAST(st.ended_at AS TIMESTAMP)) * 60 + EXTRACT(minute FROM TRY_CAST(st.ended_at AS TIMESTAMP)) = dt_end_time.time_key
    LEFT JOIN dim_rider AS dr
        ON st.rider_id = dr.rider_key
    -- Join dim_station twice: once for the start station, once for the end station
    LEFT JOIN dim_station AS ss
        ON st.start_station_id = ss.station_key
    LEFT JOIN dim_station AS es
        ON st.end_station_id = es.station_key
""")

trip_df.write.format("delta").mode("overwrite").saveAsTable("fact_trip")
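
The `trip_duration_minutes` fact can be sanity-checked with a plain-Python equivalent. This sketch is not part of the original notebook, and `trip_duration_minutes` as a Python function is a hypothetical helper. It assumes that `DATEDIFF(minute, start, end)` counts completed minutes and that a trip never ends before it starts.

```python
from datetime import datetime
from typing import Optional


def trip_duration_minutes(
    start_at: Optional[datetime], ended_at: Optional[datetime]
) -> Optional[int]:
    """Completed minutes between two timestamps; None if either is missing."""
    if start_at is None or ended_at is None:
        return None
    # Floor division drops the partial minute at the end of the trip.
    return int((ended_at - start_at).total_seconds() // 60)


start = datetime(2021, 2, 6, 10, 0, 0)
end = datetime(2021, 2, 6, 10, 25, 59)
print(trip_duration_minutes(start, end))
```

If negative durations appear in `fact_trip`, they point to rows where `ended_at` precedes `start_at` in the source data rather than to a problem in the join logic.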

### Payment fact

In [None]:
payment_df = spark.sql("""
    SELECT
        sp.payment_id AS payment_id,
        dd.date_key AS date_key,
        sp.amount AS amount,
        dr.rider_key AS rider_key
    FROM staging_payment AS sp
    LEFT JOIN dim_date AS dd
        ON CAST(date_format(TRY_CAST(sp.date AS DATE), 'yyyyMMdd') AS INT) = dd.date_key
    LEFT JOIN dim_rider AS dr
      ON sp.account_number = dr.rider_key;
""")

payment_df.write.format("delta").mode("overwrite").saveAsTable("fact_payment")