# 3. Medallion Architecture

A medallion architecture is a data design pattern used to logically organize data within a lakehouse. It consists of three layers, and data quality improves progressively from one layer to the next: the Bronze layer contains raw data, the Silver layer holds validated and cleansed data, and the Gold layer holds enriched, business-ready data geared towards analytics.

![medallion](./screenshots/medallion.png)
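To make the layering concrete, here is a minimal plain-Python sketch (not Databricks code) of records moving through the three layers. The records, field names, and the validation rule (drop rows whose fare does not parse as a number) are hypothetical. They only illustrate how each layer refines the one before it.

```python
from statistics import mean

# Bronze (hypothetical): raw records kept exactly as they arrived, strings and all
bronze = [
    {"trip_id": "1", "fare_amount": "12.5"},
    {"trip_id": "2", "fare_amount": "not-a-number"},
    {"trip_id": "3", "fare_amount": "7.5"},
]


def to_silver(rows):
    """Validate and cleanse: cast types and drop rows whose fare is not numeric."""
    cleaned = []
    for row in rows:
        try:
            fare = float(row["fare_amount"])
        except ValueError:
            continue  # hypothetical validation rule: skip unparseable fares
        cleaned.append({"trip_id": int(row["trip_id"]), "fare_amount": fare})
    return cleaned


def to_gold(rows):
    """Aggregate the cleansed rows into a small business-ready summary."""
    return {"trip_count": len(rows), "avg_fare": mean(r["fare_amount"] for r in rows)}


silver = to_silver(bronze)
gold = to_gold(silver)
print(gold)  # {'trip_count': 2, 'avg_fare': 10.0}
```

In the workshop below, the same progression happens with Spark tables instead of Python lists.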


In [0]:
%run "./1. Configure"

### Data Engineering using PySpark
This is how we are going to build an example medallion architecture from our raw data in Python (PySpark).

![medallion-workshop](./screenshots/medallion-workshop.png)

In [0]:
# Set the database to use
sql_command = f"USE SCHEMA {my_database}"
spark.sql(sql_command)

#### Silver Layer in PySpark

![silver-flagged](./screenshots/silver-flagged.png)

In [0]:
# Bronze layer: load the raw trip data from the bronze_trips table
bronze_trips = (
    spark.read.table("bronze_trips")
)

**Flagged Rides** step by step:

It starts with a dataset called `bronze_trips`.

It keeps only the rides that match one of these suspicious conditions:

* The pickup and dropoff are in the same zip code, but the fare is greater than 50,

* The trip distance is less than 5 miles, but the fare is greater than 50.

From the remaining rides, it selects four columns:

* The pickup time, rounded down to the start of the week (`date_trunc('week', tpep_pickup_datetime)`) and named `week`,

* The pickup zip code (renamed to `zip`),

* The fare amount,

* The trip distance.
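The filter and the week truncation can be checked outside Spark with a small plain-Python sketch. It assumes Spark's documented behavior for `date_trunc('week', ...)`, which truncates to midnight on the Monday of the week. The sample trips are hypothetical rows that reuse the `bronze_trips` column names.

```python
from datetime import datetime, timedelta


def trunc_to_week(ts: datetime) -> datetime:
    """Mimic date_trunc('week', ts): midnight on the Monday of ts's week."""
    monday = ts - timedelta(days=ts.weekday())  # weekday(): Monday == 0
    return monday.replace(hour=0, minute=0, second=0, microsecond=0)


def is_flagged(trip: dict) -> bool:
    """Same predicate as the filter: (same zip AND fare > 50) OR (distance < 5 AND fare > 50)."""
    return (
        (trip["pickup_zip"] == trip["dropoff_zip"] and trip["fare_amount"] > 50)
        or (trip["trip_distance"] < 5 and trip["fare_amount"] > 50)
    )


# Hypothetical trips (2016-02-17 is a Wednesday, so its week starts on 2016-02-15)
trips = [
    {"tpep_pickup_datetime": datetime(2016, 2, 17, 14, 30), "pickup_zip": 10001,
     "dropoff_zip": 10001, "fare_amount": 52.0, "trip_distance": 8.0},   # same zip, high fare
    {"tpep_pickup_datetime": datetime(2016, 2, 18, 9, 0), "pickup_zip": 10001,
     "dropoff_zip": 10002, "fare_amount": 60.0, "trip_distance": 3.2},   # short trip, high fare
    {"tpep_pickup_datetime": datetime(2016, 2, 19, 22, 15), "pickup_zip": 10003,
     "dropoff_zip": 10004, "fare_amount": 55.0, "trip_distance": 12.0},  # long trip: not flagged
    {"tpep_pickup_datetime": datetime(2016, 2, 20, 7, 45), "pickup_zip": 10005,
     "dropoff_zip": 10005, "fare_amount": 20.0, "trip_distance": 1.0},   # low fare: not flagged
]

# Filter first, then project the four output columns
flagged = [
    {
        "week": trunc_to_week(t["tpep_pickup_datetime"]),
        "zip": t["pickup_zip"],
        "fare_amount": t["fare_amount"],
        "trip_distance": t["trip_distance"],
    }
    for t in trips
    if is_flagged(t)
]
```

Both conditions require `fare_amount > 50`. The predicate can therefore be written equivalently as `fare_amount > 50 AND (pickup_zip = dropoff_zip OR trip_distance < 5)`.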

In [0]:
from pyspark.sql.functions import date_trunc

# Silver layer 1: Flagged rides
silver_flagged_rides = (
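    bronze_trips
    # Filter first, while pickup_zip and dropoff_zip are still present;
    # the projection below drops dropoff_zip and renames pickup_zip to zip
    .filter("(pickup_zip = dropoff_zip AND fare_amount > 50) OR (trip_distance < 5 AND fare_amount > 50)")
    .selectExpr(
        "date_trunc('week', tpep_pickup_datetime) as week",
        "pickup_zip as zip",
        "fare_amount",
        "trip_distance",
    )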
)
display(silver_flagged_rides)

In [0]:
# write the silver layer
silver_flagged_rides.write.mode("overwrite").saveAsTable("silver_flagged_rides")

![silver-weekly](./screenshots/silver-weekly.png)

**Weekly Stats** step by step:

It starts with the dataset `bronze_trips`.

It creates a new column called `week`, which rounds each trip’s pickup time down to the start of that week.

It then groups all the trips by `week`.

For each week, it calculates two averages:

* the average fare amount (`avg(fare_amount)`),

* the average trip distance (`avg(trip_distance)`).

It renames these averages to `avg_amount` (for fares) and `avg_distance` (for distances).

Finally, it sorts the results by `week` in ascending order.
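Here is the same group-and-average logic as a plain-Python sketch over hypothetical `(pickup time, fare, distance)` tuples. Like the PySpark code, it buckets trips by the Monday that starts their week and returns one row per week in ascending order. Unlike Spark's `avg`, this sketch does not skip null values.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def trunc_to_week(ts: datetime) -> datetime:
    """Mimic date_trunc('week', ts): midnight on the Monday of ts's week."""
    monday = ts - timedelta(days=ts.weekday())
    return monday.replace(hour=0, minute=0, second=0, microsecond=0)


# Hypothetical trips: (pickup time, fare_amount, trip_distance)
trips = [
    (datetime(2016, 2, 9, 8, 0), 10.0, 2.0),    # Tuesday, week of 2016-02-08
    (datetime(2016, 2, 11, 18, 30), 20.0, 4.0),  # Thursday, week of 2016-02-08
    (datetime(2016, 2, 17, 14, 30), 30.0, 6.0),  # Wednesday, week of 2016-02-15
]

# GROUP BY week: accumulate [fare_sum, distance_sum, count] per week
totals = defaultdict(lambda: [0.0, 0.0, 0])
for pickup, fare, distance in trips:
    acc = totals[trunc_to_week(pickup)]
    acc[0] += fare
    acc[1] += distance
    acc[2] += 1

# AVG per week, then ORDER BY week
weekly_stats = [
    {"week": week, "avg_amount": fare_sum / n, "avg_distance": dist_sum / n}
    for week, (fare_sum, dist_sum, n) in sorted(totals.items())
]
```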

In [0]:
# Silver layer 2: Weekly statistics
silver_weekly_stats = (
    bronze_trips
    .withColumn("week", date_trunc("week", bronze_trips.tpep_pickup_datetime))
    .groupBy("week")
    .agg(
        {"fare_amount": "avg", "trip_distance": "avg"}
    )
    .withColumnRenamed("avg(fare_amount)", "avg_amount")
    .withColumnRenamed("avg(trip_distance)", "avg_distance")
    .orderBy("week")
)
display(silver_weekly_stats)

In [0]:
silver_weekly_stats.write.mode("overwrite").saveAsTable("silver_weekly_stats")

#### Gold Layer in PySpark

![gold](./screenshots/gold.png)

**gold_top_n** starts with `silver_flagged_rides` (the suspicious trips dataset).

It left-joins that with `silver_weekly_stats` (the weekly averages dataset) on the `week` column, so each ride also has the average fare and distance for its week. Because it is a left join, a ride whose week has no match is still kept, with null averages.

It selects these fields to keep:

* The week,

* The weekly average fare (rounded to 2 decimals),

* The weekly average distance (rounded to 3 decimals),

* The ride’s fare amount,

* The ride’s trip distance,

* The ride’s pickup zip.

It sorts the rides by fare_amount in descending order (highest first).

It takes only the top 3 rides.
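The join, rounding, sort, and limit can be sketched in plain Python as well. The rows below are hypothetical. One week is deliberately missing from the stats to show that a left join keeps the ride with `None` (null) averages. Note that Python's `round()` and Spark's `ROUND` (HALF_UP) can disagree on exact ties, so this is an approximation of the rounding step.

```python
from datetime import datetime

# Hypothetical rows shaped like silver_flagged_rides
flagged_rides = [
    {"week": datetime(2016, 2, 8), "zip": 10001, "fare_amount": 75.0, "trip_distance": 1.2},
    {"week": datetime(2016, 2, 8), "zip": 10002, "fare_amount": 52.0, "trip_distance": 4.1},
    {"week": datetime(2016, 2, 15), "zip": 10003, "fare_amount": 98.0, "trip_distance": 0.4},
    {"week": datetime(2016, 2, 22), "zip": 10004, "fare_amount": 61.0, "trip_distance": 2.5},
]

# Hypothetical rows shaped like silver_weekly_stats, keyed by week for lookup.
# The week of 2016-02-22 is deliberately missing to show left-join behavior.
stats_by_week = {
    datetime(2016, 2, 8): {"avg_amount": 12.3456, "avg_distance": 2.71828},
    datetime(2016, 2, 15): {"avg_amount": 13.0, "avg_distance": 3.14159},
}


def top_n(rides, stats, n=3):
    """Left join rides to weekly stats on week, then keep the n highest fares."""
    joined = []
    for ride in rides:
        week_stats = stats.get(ride["week"])  # left join: no match -> None (SQL NULL)
        joined.append({
            "week": ride["week"],
            # Python's round() can differ from Spark's HALF_UP ROUND on exact ties
            "avg_amount": round(week_stats["avg_amount"], 2) if week_stats else None,
            "avg_distance": round(week_stats["avg_distance"], 3) if week_stats else None,
            "fare_amount": ride["fare_amount"],
            "trip_distance": ride["trip_distance"],
            "zip": ride["zip"],
        })
    # Equivalent of ORDER BY fare_amount DESC LIMIT n
    return sorted(joined, key=lambda r: r["fare_amount"], reverse=True)[:n]


gold = top_n(flagged_rides, stats_by_week)
```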

In [0]:
# Gold layer: Top N rides to investigate
gold_top_n = (
    silver_flagged_rides
    .join(silver_weekly_stats, "week", "left")
    .selectExpr(
        "week",
        "ROUND(avg_amount, 2) as avg_amount",
        "ROUND(avg_distance, 3) as avg_distance",
        "fare_amount",
        "trip_distance",
        "zip",
    )
    .orderBy("fare_amount", ascending=False)
    .limit(3)
)
display(gold_top_n)

In [0]:
gold_top_n.write.mode("overwrite").saveAsTable("gold_top_n")

### Data Engineering using SQL

This is how we are going to build an example medallion architecture from our raw data in SQL. The logic is the same as in PySpark.

![medallion-workshop](./screenshots/medallion-workshop.png)

#### Silver Layer in SQL


![silver-flagged](./screenshots/silver-flagged.png)

In [0]:
%sql
SELECT
    date_trunc('week', tpep_pickup_datetime) AS week,
    pickup_zip AS zip,
    fare_amount,
    trip_distance
FROM bronze_trips
WHERE
    (pickup_zip = dropoff_zip AND fare_amount > 50)
    OR
    (trip_distance < 5 AND fare_amount > 50)


In [0]:
%sql
-- now add the create table statement to save the query
CREATE OR REPLACE TABLE silver_flagged_rides AS
SELECT
    date_trunc('week', tpep_pickup_datetime) AS week,
    pickup_zip AS zip,
    fare_amount,
    trip_distance
FROM bronze_trips
WHERE
    (pickup_zip = dropoff_zip AND fare_amount > 50)
    OR
    (trip_distance < 5 AND fare_amount > 50)


![silver-weekly](./screenshots/silver-weekly.png)

In [0]:
%sql
SELECT
    date_trunc('week', tpep_pickup_datetime) AS week,
    AVG(fare_amount) AS avg_amount,
    AVG(trip_distance) AS avg_distance
FROM bronze_trips
GROUP BY
    date_trunc('week', tpep_pickup_datetime)
ORDER BY
    week

In [0]:
%sql
-- save this query as a silver table
CREATE OR REPLACE TABLE silver_weekly_stats AS
SELECT
    date_trunc('week', tpep_pickup_datetime) AS week,
    AVG(fare_amount) AS avg_amount,
    AVG(trip_distance) AS avg_distance
FROM bronze_trips
GROUP BY
    date_trunc('week', tpep_pickup_datetime)
ORDER BY
    week


#### Gold Layer in SQL

![gold](./screenshots/gold.png)

In [0]:
%sql
SELECT
    sf.week,
    ROUND(sw.avg_amount, 2) AS avg_amount,
    ROUND(sw.avg_distance, 3) AS avg_distance,
    sf.fare_amount,
    sf.trip_distance,
    sf.zip
FROM silver_flagged_rides sf
LEFT JOIN silver_weekly_stats sw
    ON sf.week = sw.week
ORDER BY
    sf.fare_amount DESC
LIMIT 3
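To sanity-check the gold query's join, ordering, and limit without a Databricks workspace, you can run the same SELECT unchanged on an in-memory SQLite database from Python's standard library. This is only a local sketch. The rows are hypothetical, and `week` is stored as plain text because SQLite has no `date_trunc` or timestamp type. SQLite's `ROUND` is also not guaranteed to match Databricks SQL in every edge case.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Hypothetical silver tables; week is stored as text for this local check
conn.executescript("""
CREATE TABLE silver_flagged_rides (week TEXT, zip INTEGER, fare_amount REAL, trip_distance REAL);
CREATE TABLE silver_weekly_stats (week TEXT, avg_amount REAL, avg_distance REAL);
INSERT INTO silver_flagged_rides VALUES
    ('2016-02-08', 10001, 75.0, 1.2),
    ('2016-02-08', 10002, 52.0, 4.1),
    ('2016-02-15', 10003, 98.0, 0.4),
    ('2016-02-22', 10004, 61.0, 2.5);
INSERT INTO silver_weekly_stats VALUES
    ('2016-02-08', 12.3456, 2.71828),
    ('2016-02-15', 13.0, 3.14159);
""")

# The gold query from the cell above, unchanged
rows = conn.execute("""
SELECT
    sf.week,
    ROUND(sw.avg_amount, 2) AS avg_amount,
    ROUND(sw.avg_distance, 3) AS avg_distance,
    sf.fare_amount,
    sf.trip_distance,
    sf.zip
FROM silver_flagged_rides sf
LEFT JOIN silver_weekly_stats sw
    ON sf.week = sw.week
ORDER BY
    sf.fare_amount DESC
LIMIT 3
""").fetchall()
conn.close()
```

The ride from the week of 2016-02-22 appears with `None` averages, which is the left-join behavior the gold table relies on.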


In [0]:
%sql
-- write as a gold table
CREATE OR REPLACE TABLE gold_top_n AS
SELECT
    sf.week,
    ROUND(sw.avg_amount, 2) AS avg_amount,
    ROUND(sw.avg_distance, 3) AS avg_distance,
    sf.fare_amount,
    sf.trip_distance,
    sf.zip
FROM silver_flagged_rides sf
LEFT JOIN silver_weekly_stats sw
    ON sf.week = sw.week
ORDER BY
    sf.fare_amount DESC
LIMIT 3


### Additional Databricks Training

If you are interested in learning more about data engineering and the medallion architecture with SQL and PySpark in Databricks, please check out the training material at https://customer-academy.databricks.com/learn. You can set up an account with our Customer Academy using your `.gov` email address.

- [Getting Started with Databricks for Data Engineering](https://customer-academy.databricks.com/learn/courses/2469/get-started-with-databricks-for-data-engineering)
