In [0]:
# Station dimension: a straight projection of the descriptive station columns.
df_station = spark.read.table("station")

station_columns = ["station_id", "name", "latitude", "longitude"]
dim_station = df_station.select(*station_columns)

# Persist as a managed Delta table, replacing any previous version.
(
    dim_station.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_station")
)

In [0]:
from pyspark.sql.functions import md5, concat_ws, year, month, dayofmonth, dayofweek
from pyspark.sql.functions import col, date_trunc, quarter

df_payment = spark.read.table("payment")
df_trip = spark.read.table("trip")


def _keyed_dates(df, date_expr):
    """Project one date expression of `df` into (date_id, date) rows.

    date_id is the MD5 of the value's string form. The payment branch uses
    exactly the expression the payment fact uses for its date_id, so the
    keys of existing payment rows do not change.
    """
    return df.select(
        md5(concat_ws("", date_expr.cast("string"))).alias("date_id"),
        date_expr.alias("date"),
    )


# A date dimension needs exactly one row per calendar day and must cover every
# day a fact can point at: payment dates plus the days on which trips start
# and end. Trip timestamps carry a time of day, so they are truncated to
# midnight before being keyed.
# NOTE(review): assumes payment.date is a midnight timestamp, so that both
# sources render as 'yyyy-MM-dd 00:00:00' and share keys — confirm against the
# payment table schema.
date_keys = (
    _keyed_dates(df_payment, col("date"))
    .union(_keyed_dates(df_trip, date_trunc("day", col("started_at"))))
    .union(_keyed_dates(df_trip, date_trunc("day", col("ended_at"))))
    # Without this the table holds one row per payment/trip and date_id is
    # not a usable primary key.
    .dropDuplicates(["date_id"])
)

dim_date = date_keys.select(
    "date_id",
    "date",
    year("date").alias("year"),
    quarter("date").alias("quarter"),
    month("date").alias("month"),
    dayofmonth("date").alias("day"),
    # Spark's dayofweek: 1 = Sunday ... 7 = Saturday.
    dayofweek("date").alias("weekday"),
)

dim_date.write.format("delta").mode("overwrite").saveAsTable("dim_date")

In [0]:
# NOTE: `sum` here is Spark's aggregate and shadows Python's builtin `sum`
# for the rest of the notebook session.
from pyspark.sql.functions import datediff, year, current_date, sum, col

df_rider = spark.read.table("rider")
df_payment = spark.read.table("payment")
df_trip = spark.read.table("trip")

# Reduce each child table to ONE row per rider before joining. Joining the raw
# payment and trip tables to rider at the same time pairs every payment with
# every trip of that rider, which multiplies both sums.
payment_totals = (df_payment
                  .groupBy("rider_id")
                  .agg(sum("amount").alias("total_payed")))

trip_totals = (df_trip
               .groupBy("rider_id")
               .agg(
                   # datediff(end, start) = end - start in whole days.
                   sum(datediff(col("ended_at"), col("started_at"))).alias("total_duration")
               ))

# Left joins keep riders that have no payments or no trips yet, so every
# rider_id in the rider table gets a dimension row.
joined_df = (df_rider
             .join(payment_totals, "rider_id", "left")
             .join(trip_totals, "rider_id", "left")
             # Riders without payments/trips get 0 rather than NULL totals.
             .na.fill(0, subset=["total_payed", "total_duration"]))

dim_rider = joined_df.select(
    "rider_id",
    "address",
    "first",
    "last",
    "birthday",
    "account_start_date",
    "account_end_date",
    "is_member",
    "total_payed",
    "total_duration",
    # Age in (fractional, 365-day) years on the day the account was opened.
    # Measuring against current_date() would report today's age and change on
    # every run.
    (datediff(col("account_start_date"), col("birthday")) / 365).alias("age_start_account"),
)
dim_rider.write.format("delta").mode("overwrite").saveAsTable("dim_rider")

In [0]:
from pyspark.sql.functions import datediff, minute, hour, md5, concat_ws
# `col` is imported here so the cell does not depend on an earlier cell having
# been run first.
from pyspark.sql.functions import col, date_trunc

df_trip = spark.read.table("trip")

trip_start = col("started_at")
trip_end = col("ended_at")

# Difference between the two clock readings in whole minutes: day difference
# plus hour difference plus minute difference. Seconds are ignored.
trip_minutes = (
    datediff(trip_end, trip_start) * 24 * 60
    + (hour(trip_end) - hour(trip_start)) * 60
    + (minute(trip_end) - minute(trip_start))
)

fact_trip = df_trip.select(
    col("trip_id"),
    col("rider_id"),
    col("start_station_id"),
    col("end_station_id"),
    trip_minutes.alias("duration_minutes"),
    # Date keys are per DAY: truncate to midnight before hashing. Hashing the
    # raw timestamp would give a different key for every second of the day.
    md5(concat_ws("", date_trunc("day", trip_start).cast("string"))).alias("date_start_id"),
    md5(concat_ws("", date_trunc("day", trip_end).cast("string"))).alias("date_end_id"),
)
fact_trip.write.format("delta").mode("overwrite").saveAsTable("fact_trip")

In [0]:
# `col` is imported here so the cell runs on its own instead of relying on an
# import made by an earlier cell.
from pyspark.sql.functions import md5, concat_ws, col

df_payment = spark.read.table("payment")

# One row per payment; date_id is the MD5 of the payment date's string form.
fact_payment = df_payment.select(
    col("payment_id"),
    col("rider_id"),
    md5(concat_ws("", col("date").cast("string"))).alias("date_id"),
    col("amount"),
)
fact_payment.write.format("delta").mode("overwrite").saveAsTable("fact_payment")

In [0]:
# Sanity check: print a handful of rows from the persisted date dimension.
dim_date_preview = spark.sql("SELECT * FROM dim_date LIMIT 5")
dim_date_preview.show()

+--------------------+-------------------+----+-------+-----+---+-------+
|             date_id|               date|year|quarter|month|day|weekday|
+--------------------+-------------------+----+-------+-----+---+-------+
|a772a0d1a8bd3662c...|2019-05-01 00:00:00|2019|      2|    5|  1|      4|
|553969c5cf601b52d...|2019-06-01 00:00:00|2019|      2|    6|  1|      7|
|c4149503a2ab5a01f...|2019-07-01 00:00:00|2019|      3|    7|  1|      2|
|8d5503db9e4042bcf...|2019-08-01 00:00:00|2019|      3|    8|  1|      5|
|c027f06afee66571b...|2019-09-01 00:00:00|2019|      3|    9|  1|      1|
+--------------------+-------------------+----+-------+-----+---+-------+

