Extracting and Ingesting Data (Bronze stage)

In [None]:
from pyspark.sql import functions as F

In [None]:
# Load the headerless payments CSV with inferred column types, then assign
# the column names explicitly.
payments_columns = ["payment_id", "date", "amount", "rider_id"]
df_payments = (
    spark.read.format("csv")
    .options(inferSchema="true", header="false", sep=",")
    .load("dbfs:/FileStore/payments.csv")
    .toDF(*payments_columns)
)

# Preview the first rows.
display(df_payments.limit(10))

In [None]:
# Persist the payments DataFrame in Delta format, overwriting any existing data at the path.
df_payments.write.save("/delta/payments", format="delta", mode="overwrite")

In [None]:
# Load the headerless riders CSV with inferred column types, then assign
# the column names explicitly.
riders_columns = [
    "rider_id",
    "first_name",
    "last_name",
    "address",
    "birthday",
    "account_start_date",
    "account_end_date",
    "is_member",
]
df_riders = (
    spark.read.format("csv")
    .options(inferSchema="true", header="false", sep=",")
    .load("dbfs:/FileStore/riders.csv")
    .toDF(*riders_columns)
)

# Preview the first rows.
display(df_riders.limit(10))

In [None]:
# Persist the riders DataFrame in Delta format, overwriting any existing data at the path.
df_riders.write.save("/delta/riders", format="delta", mode="overwrite")

In [None]:
# Load the headerless stations CSV with inferred column types, then assign
# the column names explicitly.
stations_columns = ["station_id", "name", "latitude", "longitude"]
df_stations = (
    spark.read.format("csv")
    .options(inferSchema="true", header="false", sep=",")
    .load("dbfs:/FileStore/stations.csv")
    .toDF(*stations_columns)
)

# Preview the first rows.
display(df_stations.limit(10))

In [None]:
# Persist the stations DataFrame in Delta format, overwriting any existing data at the path.
df_stations.write.save("/delta/stations", format="delta", mode="overwrite")

In [None]:
# Load the headerless trips CSV with inferred column types, then assign
# the column names explicitly.
trips_columns = [
    "trip_id",
    "rideable_type",
    "start_at",
    "ended_at",
    "start_station_id",
    "end_station_id",
    "rider_id",
]
df_trips = (
    spark.read.format("csv")
    .options(inferSchema="true", header="false", sep=",")
    .load("dbfs:/FileStore/trips.csv")
    .toDF(*trips_columns)
)

# Preview the first rows.
display(df_trips.limit(10))

In [None]:
# Persist the trips DataFrame in Delta format, overwriting any existing data at the path.
df_trips.write.save("/delta/trips", format="delta", mode="overwrite")

Creating Tables with saveAsTable (Silver stage)

In [None]:
# Recreate the stag_payments table from the Delta data stored at /delta/payments.
spark.sql("DROP TABLE IF EXISTS stag_payments")
df = spark.read.load("/delta/payments", format="delta")
df.write.saveAsTable("stag_payments", format="delta", mode="overwrite")

In [None]:
# Recreate the stag_riders table from the Delta data stored at /delta/riders.
spark.sql("DROP TABLE IF EXISTS stag_riders")
df = spark.read.load("/delta/riders", format="delta")
df.write.saveAsTable("stag_riders", format="delta", mode="overwrite")


In [None]:
# Recreate the stag_trips table from the Delta data stored at /delta/trips.
spark.sql("DROP TABLE IF EXISTS stag_trips")
df = spark.read.load("/delta/trips", format="delta")
df.write.saveAsTable("stag_trips", format="delta", mode="overwrite")


In [None]:
# Recreate the stag_stations table from the Delta data stored at /delta/stations.
spark.sql("DROP TABLE IF EXISTS stag_stations")
df = spark.read.load("/delta/stations", format="delta")
df.write.saveAsTable("stag_stations", format="delta", mode="overwrite")

Transforming Data (Gold stage)

In [None]:
# Create the curated_data database if it does not already exist; the cells
# below save their fact and dimension tables into it.
spark.sql("CREATE DATABASE IF NOT EXISTS curated_data")

In [None]:

# Rebuild the fact_payments table from stag_payments, adding a date_id key
# made of the year followed by the zero-padded month and day of "date".
if spark.catalog.tableExists("curated_data.fact_payments"):
    spark.sql("drop table curated_data.fact_payments")

df = spark.table("stag_payments")

payment_date = F.col("date")
payment_date_id = F.concat(
    F.year(payment_date),
    F.lpad(F.month(payment_date), 2, '0'),
    F.lpad(F.dayofmonth(payment_date), 2, '0'),
)
df_transformed = df.withColumn("date_id", payment_date_id).distinct()

display(df_transformed.limit(10))

df_transformed.write.saveAsTable("Curated_data.fact_payments", mode="overwrite")

In [None]:

# Rebuild the dim_dates table: one row per distinct payment date, with a
# date_id key (year + zero-padded month + zero-padded day) and the
# individual calendar parts.
# NOTE(review): only dates present in stag_payments are included here, while
# the fact_trips cell derives its date_id from start_at — confirm that every
# trip date is expected to have a matching row in this dimension.
if spark.catalog.tableExists("curated_data.dim_dates"):
    spark.sql("drop table curated_data.dim_dates")

df = spark.table("default.stag_payments")

date_columns = [
    F.col("date"),
    F.concat(
        F.year("date"),
        F.lpad(F.month("date"), 2, '0'),
        F.lpad(F.dayofmonth("date"), 2, '0'),
    ).alias("date_id"),
    F.year("date").alias("year"),
    F.month("date").alias("month"),
    F.dayofmonth("date").alias("day"),
    F.dayofweek("date").alias("day_of_week"),
]
df_transformed = df.select(*date_columns).distinct()

df_transformed.write.saveAsTable("Curated_data.dim_dates", mode="overwrite")
display(df_transformed.limit(10))

In [None]:
# Rebuild the dim_riders table in curated_data from the de-duplicated
# stag_riders staging table.
#
# The drop must target the same table that is written below
# (curated_data.dim_riders). Dropping default.dim_riders instead fails with a
# "table not found" error whenever the curated table exists but no table of
# that name exists in the default database. IF EXISTS makes the drop a no-op
# on the first run.
spark.sql("DROP TABLE IF EXISTS curated_data.dim_riders")

df = spark.table("default.stag_riders")

df_transformed = df.dropDuplicates()
df_transformed.write.mode("overwrite").saveAsTable("curated_data.dim_riders")

display(df_transformed.limit(10))
     

In [None]:
# Rebuild the dim_stations table from the de-duplicated stag_stations table.
if spark.catalog.tableExists("curated_data.dim_stations"):
    spark.sql("drop table curated_data.dim_stations")

df = spark.table("default.stag_stations")

df_transformed = df.distinct()
df_transformed.write.saveAsTable("curated_data.dim_stations", mode="overwrite")

display(df_transformed.limit(10))


In [None]:
# Rebuild the fact_trips table: one row per de-duplicated trip, enriched with
#   * date_id   - year + zero-padded month + zero-padded day of start_at
#   * duration  - trip length in hours (ended_at - start_at)
#   * rider_age - the rider's age in whole years on the day the trip started
if spark.catalog.tableExists("curated_data.fact_trips"):
    spark.sql("drop table curated_data.fact_trips")


df = spark.table("default.stag_trips").dropDuplicates()
df_riders = spark.table("default.stag_riders").dropDuplicates()

df_transformed = df.withColumn(
    "date_id",
    F.concat(
        F.year(F.col("start_at")),
        F.lpad(F.month(F.col("start_at")), 2, '0'),
        F.lpad(F.dayofmonth(F.col("start_at")), 2, '0')
    )
)

# Left join so trips whose rider is missing from stag_riders are kept
# (their rider_age will be null).
df_joined = df_transformed.alias("df_transformed").join(
    df_riders.alias("df_riders"),
    F.col("df_transformed.rider_id") == F.col("df_riders.rider_id"),
    "left"
)


# unix_timestamp yields seconds, so divide by 3600 to get hours.
df_joined = df_joined.withColumn(
    "duration_hours",
    (F.unix_timestamp("df_transformed.ended_at") - F.unix_timestamp("df_transformed.start_at")) / 3600
)

# Age is the number of whole years elapsed between the birthday and the trip
# start date. A plain difference of calendar years would overstate the age by
# one for every trip taken before the rider's birthday in that year.
# to_date strips the time of day so only calendar dates are compared.
df_joined = df_joined.withColumn(
    "rider_age",
    F.floor(
        F.months_between(
            F.to_date(F.col("df_transformed.start_at")),
            F.to_date(F.col("df_riders.birthday")),
        ) / 12
    ).cast("int")
)


df_result = df_joined.select(
    F.col("df_transformed.trip_id"),
    F.col("df_transformed.rideable_type"),
    F.col("df_transformed.start_at"),
    F.col("df_transformed.ended_at"),
    F.col("df_transformed.start_station_id"),
    F.col("df_transformed.end_station_id"),
    F.col("df_transformed.date_id"),
    F.col("df_transformed.rider_id"),
    F.col("duration_hours").alias("duration"),
    F.col("rider_age")
)

display(df_result.limit(10))

df_result.write.mode("overwrite").saveAsTable("curated_data.fact_trips")

     