In [0]:
from pyspark.sql.functions import col, date_format, min, max, unix_timestamp, datediff

from pyspark.sql import functions as f
from pyspark.sql.types import StringType

In [0]:
# Names of the bronze Delta tables to load; each one is read from
# /FileStore/delta/bronze/bronze_<name> and later written to silver_<name>.
delta_names = "payments riders stations trips".split()

In [0]:
# read delta files into dataframes
# Every bronze table is bound to a notebook-level variable named df_<name>
# (df_payments, df_riders, df_stations, df_trips), which later cells use directly.
globals().update({
    f"df_{name}": spark.read.format("delta").load(f"/FileStore/delta/bronze/bronze_{name}")
    for name in delta_names
})

In [0]:
# manipulation of df_payments

# Rename `date` to `date_id` and reformat it as a yyyyMMdd string, the same
# key format the trips and dates cells produce for their date_id columns.
# Guarded so the cell is idempotent: after the first run there is no `date`
# column (the rename becomes a no-op), and re-running date_format on the
# already formatted yyyyMMdd string would not round-trip (nulls, or an error
# under ANSI mode).
if "date" in df_payments.columns:
    df_payments = (
        df_payments
        .withColumnRenamed("date", "date_id")
        .withColumn("date_id", date_format(col("date_id"), "yyyyMMdd"))
    )

df_payments.createOrReplaceTempView("temp_payments")


In [0]:
# manipulation of df_riders

# Prefix the descriptive rider attributes with `rider_` so they stay
# unambiguous once combined with other tables (rider_id keeps its name).
for old_name, new_name in {
    "first": "rider_firstname",
    "last": "rider_lastname",
    "address": "rider_address",
    "birthday": "rider_birthday",
    "account_start_date": "rider_account_start",
    "account_end_date": "rider_account_end",
    "is_member": "rider_member",
}.items():
    df_riders = df_riders.withColumnRenamed(old_name, new_name)

df_riders.createOrReplaceTempView("temp_riders")

In [0]:
# manipulation of df_stations

# Prefix the station attributes with `station_`.
for old_name, new_name in {
    "name": "station_name",
    "latitude": "station_latitude",
    "longitude": "station_longitude",
}.items():
    df_stations = df_stations.withColumnRenamed(old_name, new_name)

df_stations.createOrReplaceTempView("temp_stations")

In [0]:
# manipulation of df_trips

# Earliest and latest trip start, fetched in a single aggregation (one Spark
# job instead of two). The date-table cell uses them to bound the calendar.
min_date, max_date = df_trips.agg(
    f.min("started_at"), f.max("started_at")
).collect()[0]

# date_id: yyyyMMdd string key of the trip start.
# trip_duration: (trip_end - trip_start) in minutes (unix_timestamp is in seconds).
df_trips = (
    df_trips
    .withColumn("date_id", date_format(col("started_at"), "yyyyMMdd"))
    .withColumnRenamed("rideable_type", "trip_rideable_type")
    .withColumnRenamed("started_at", "trip_start")
    .withColumnRenamed("ended_at", "trip_end")
    .withColumn(
        "trip_duration",
        (unix_timestamp(col("trip_end")) - unix_timestamp(col("trip_start"))) / 60,
    )
)

# join rider birthday (inner join: trips whose rider_id has no match in
# df_riders are dropped — NOTE(review): confirm that is intended)
df_trips = df_trips.join(df_riders.select("rider_id", "rider_birthday"), "rider_id")

# trip_rider_age: approximate, fractional age in years at trip start
# (day difference / 365, so leap days are not accounted for)
df_trips = df_trips.withColumn(
    "trip_rider_age",
    datediff(col("trip_start"), col("rider_birthday")) / 365,
)

df_trips = df_trips.select(
    "trip_id",
    "date_id",
    "rider_id",
    "start_station_id",
    "end_station_id",
    "trip_start",
    "trip_end",
    "trip_duration",
    "trip_rider_age",
    "trip_rideable_type",
)

df_trips.createOrReplaceTempView("temp_trips")


In [0]:
# create date table

# One row per calendar day from the earliest to the latest trip start
# (min_date / max_date come from the trips cell; Spark's sequence() includes
# both endpoints).
# NOTE(review): the range only covers trip start dates — payment date_ids
# outside it will have no matching row here; confirm that is acceptable.
expression = f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)"

# A single-row frame exploded into one row per generated date.
df_dates = spark.range(1).select(f.explode(f.expr(expression)).alias("date_date"))

# f.dayofmonth is used instead of f.day, which only exists in PySpark >= 3.5.
# date_format accepts the DateType column directly, so there is no need to go
# through an intermediate timestamp column.
df_dates = df_dates.select(
    date_format(col("date_date"), "yyyyMMdd").alias("date_id"),
    col("date_date"),
    f.dayofmonth("date_date").alias("date_day"),
    f.month("date_date").alias("date_month"),
    f.year("date_date").alias("date_year"),
)

df_dates.createOrReplaceTempView("temp_dates")

In [0]:
# Add the date dimension to the list of tables that get saved as silver_<name>.
# Guarded so re-running this cell does not append "dates" a second time.
if "dates" not in delta_names:
    delta_names.append("dates")

In [0]:
# save dataframes as delta using spark SQL
# Each temp_<name> view is materialised as the Delta table silver_<name> at
# /FileStore/delta/silver/silver_<name>. CREATE OR REPLACE already handles an
# existing table, replacing it in one operation. A preceding DROP TABLE would
# be redundant, would leave a window with no table, and would discard table
# history; Databricks recommends REPLACE over drop-and-recreate.
for name in delta_names:
    spark.sql(f"""
        CREATE OR REPLACE TABLE silver_{name}
        USING DELTA
        LOCATION '/FileStore/delta/silver/silver_{name}'
        AS
        SELECT * FROM temp_{name}
    """)