## Transformation Step

## 00_dim_riders

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [None]:
SILVER_TABLE = "divvy.dim_riders"

# Drop any previous version of the table before rebuilding it.
spark.sql(f"DROP TABLE IF EXISTS {SILVER_TABLE};")


# COMMAND ----------

df = spark.table("divvy.bronze_riders")

# Target type for every column that is cast. Each cast replaces the column of
# the same name, so the rider key stays `rider_id` (the previous code wrote the
# cast into a new `ride_id` column and left `rider_id` uncast).
rider_column_types = {
    "rider_id": "int",
    "birthday": "date",
    "account_start_date": "date",
    "account_end_date": "date",
    "is_member": "boolean",
}
for column_name, spark_type in rider_column_types.items():
    df = df.withColumn(column_name, df[column_name].cast(spark_type))

df.write.format("delta").mode("overwrite").saveAsTable(SILVER_TABLE)


## 01_dim_stations

In [None]:
SILVER_TABLE = "divvy.dim_stations"

# Drop any previous version of the table before rebuilding it.
spark.sql(f"DROP TABLE IF EXISTS {SILVER_TABLE};")


# COMMAND ----------

df = spark.table("divvy.bronze_stations")

# Cast both coordinate columns to float, replacing them in place.
# NOTE(review): "latitute" looks like a misspelling of "latitude", but it has
# to match the bronze column name for this lookup to work - confirm before
# renaming it anywhere.
for coordinate in ("latitute", "longitude"):
    df = df.withColumn(coordinate, df[coordinate].cast("float"))

stations_writer = df.write.format("delta").mode("overwrite")
stations_writer.saveAsTable(SILVER_TABLE)


## 02_dim_date

In [None]:
from pyspark.sql.functions import explode, sequence, to_date

SILVER_TABLE_NAME = "divvy.dim_date"

# Drop any previous version of the table before rebuilding it.
spark.sql(f"DROP TABLE IF EXISTS {SILVER_TABLE_NAME};")


# COMMAND ----------

# Inclusive bounds of the calendar, one row per day.
beginDate = "2015-01-01"
endDate = "2030-12-31"

# Generate the day sequence in SQL and expose it as the temp view "dates",
# which the dimension query reads from.
dates_query = f"select explode(sequence(to_date('{beginDate}'), to_date('{endDate}'), interval 1 day)) as date"
dates_df = spark.sql(dates_query)
dates_df.createOrReplaceTempView("dates")


# COMMAND ----------

# Build the date dimension from the "dates" temp view. The target table name
# comes from SILVER_TABLE_NAME so that the drop, create and optimize statements
# always address the same table.
spark.sql(
    f"""
create or replace table {SILVER_TABLE_NAME}
using delta
as select
  year(date) * 10000 + month(date) * 100 + day(date) as date_int,
  date,
  year(date) AS year,
  date_format(date, 'MMMM') as calendar_month,
  month(date) as month,
  date_format(date, 'EEEE') as calendar_day,
  dayofweek(date) as day_of_week,
  weekday(date) + 1 as day_of_week_start_monday,
  case
    when weekday(date) < 5 then 'Y'
    else 'N'
  end as is_week_day,
  dayofmonth(date) as day_of_month,
  case
    when date = last_day(date) then 'Y'
    else 'N'
  end as is_last_day_of_month,
  dayofyear(date) as day_of_year,
  weekofyear(date) as week_of_year_iso,
  quarter(date) as quarter_of_year
from
  dates
"""
)

# Compact the table and co-locate rows by date.
spark.sql(f"optimize {SILVER_TABLE_NAME} zorder by (date)")


## 03_fact_trips

In [None]:
# Databricks notebook source
from pyspark.sql.functions import months_between, col

SILVER_TABLE = "divvy.fact_trips"

# Drop any previous version of the table before rebuilding it.
spark.sql(f"DROP TABLE IF EXISTS {SILVER_TABLE};")

# COMMAND ----------

# Only the rider key and birthday are needed from the rider dimension; the key
# is aliased so it does not clash with the trips' own rider_id in the join.
riders_df = spark.table("divvy.dim_riders").select(
    col("rider_id").alias("rider_rider_id"), "birthday"
)

# Each withColumn sees the output of the previous one, so the *_date columns
# and time_spent are derived from the already-cast timestamps.
trips_df = (
    spark.table("divvy.bronze_trips")
    .withColumn("start_at", col("start_at").cast("timestamp"))
    .withColumn("ended_at", col("ended_at").cast("timestamp"))
    .withColumn("start_at_date", col("start_at").cast("date"))
    .withColumn("ended_at_date", col("ended_at").cast("date"))
    .withColumn("rider_id", col("rider_id").cast("int"))
    # Difference of the two timestamps after casting each to long.
    .withColumn(
        "time_spent", col("ended_at").cast("long") - col("start_at").cast("long")
    )
)


# COMMAND ----------

# Attach each rider's birthday to their trips; the inner join discards trips
# whose rider_id has no match in the rider dimension.
joined_df = trips_df.join(
    riders_df, trips_df.rider_id == riders_df.rider_rider_id, "inner"
)

# Rider age at trip start: months between birthday and start_at, divided by 12
# and cast to int.
joined_df = joined_df.withColumn(
    "rider_age_at_time",
    (months_between(joined_df.start_at, joined_df.birthday) / 12).cast("int"),
)

# DataFrame.drop returns a new DataFrame, so the result has to be re-assigned;
# otherwise the duplicate join key would still be written to the fact table.
joined_df = joined_df.drop("rider_rider_id")

joined_df.write.format("delta").mode("overwrite").saveAsTable(SILVER_TABLE)


## 04_fact_payments

In [None]:
from pyspark.sql.functions import months_between, col

SILVER_TABLE = "divvy.fact_payments"

# Drop any previous version of the table before rebuilding it.
spark.sql(f"DROP TABLE IF EXISTS {SILVER_TABLE};")

# COMMAND ----------

# Rider attributes needed to compute the age at account start; the key is
# aliased so it does not clash with the payments' own rider_id in the join.
riders_df = spark.table("divvy.dim_riders").select(
    col("rider_id").alias("rider_rider_id"), col("birthday"), col("account_start_date")
)
payments_df = spark.table("divvy.bronze_payments")

payments_df = payments_df.withColumn("payment_id", payments_df.payment_id.cast("int"))
payments_df = payments_df.withColumn("date", payments_df.date.cast("date"))
# A bare "decimal" means decimal(10,0) in Spark, which rounds away the
# fractional part of the amount. Keep two decimal places instead.
# NOTE(review): scale 2 assumes amounts are currency values with cents - confirm.
payments_df = payments_df.withColumn(
    "amount", payments_df.amount.cast("decimal(10,2)")
)
payments_df = payments_df.withColumn("rider_id", payments_df.rider_id.cast("int"))

# Attach rider attributes to each payment; the inner join discards payments
# whose rider_id has no match in the rider dimension.
joined_df = payments_df.join(
    riders_df, payments_df.rider_id == riders_df.rider_rider_id, "inner"
)

# Rider age when the account was opened: months between birthday and
# account_start_date, divided by 12 and cast to int.
joined_df = joined_df.withColumn(
    "rider_age_account_start",
    (months_between(joined_df.account_start_date, joined_df.birthday) / 12).cast("int"),
)

# DataFrame.drop returns a new DataFrame, so the result has to be re-assigned;
# otherwise these helper columns would still be written to the fact table.
joined_df = joined_df.drop("rider_rider_id", "birthday", "account_start_date")

joined_df.write.format("delta").mode("overwrite").saveAsTable(SILVER_TABLE)
