# Notebook to cleanse bronze.sales into silver.sales

In [0]:
%pip install shapely

Define constant variables

In [0]:
# Source (bronze) table that is read by this notebook.
ORIGIN_SCHEMA = "bronze"
ORIGIN_TABLE = "sales"

# Destination (silver) table; it keeps the same table name as the source.
TARGET_SCHEMA = "silver"
TARGET_TABLE = ORIGIN_TABLE

# Table whose rows are combined with the sales records for location enrichment.
TABLE_TO_MERGE = "neighborhoods"

### Load most recent time-window data from bronze.sales

Filtering on the ingestion timestamp reduces processing time: only the partitions that match the requested time window are fetched, not the entire table.

In [0]:
from pyspark.sql.functions import col,current_timestamp, expr

# Read only the rows whose `ingestime` falls within the last 12 hours,
# rather than loading the whole bronze table.
bronze_df = (
    spark.read
    .table(f"{ORIGIN_SCHEMA}.{ORIGIN_TABLE}")
    .filter(col("ingestime") >= current_timestamp() - expr("INTERVAL 12 HOUR"))
)

Add a `silver_ingestime` timestamp column to keep track of when the data was processed into silver

In [0]:
# Stamp every row with the current timestamp under `silver_ingestime`.
bronze_df = bronze_df.withColumn("silver_ingestime", current_timestamp())

# Reorder so that `silver_ingestime` is the first column, followed by the
# remaining columns in their existing order.
columns = ["silver_ingestime"] + [
    name for name in bronze_df.columns if name != "silver_ingestime"
]
bronze_df = bronze_df.select(*columns)

### Data set enrichment

Rename the `date` column to `event_date`

In [0]:
# `date` becomes `event_date`; all other columns are left untouched.
bronze_df = bronze_df.withColumnRenamed(existing="date", new="event_date")

#### Date/Time enrichment

In [0]:
from pyspark.sql.functions import (
    col, to_timestamp, date_format, dayofmonth, hour,
    minute, month, second, year
)

# Parse `event_date` (pattern "dd/MM/yyyy HH:mm:ss") into a temporary timestamp
# column, derive the partition key and the individual date/time parts from it,
# then drop the temporary column again.
bronze_df = (
    bronze_df
    .withColumn("timestamp_date", to_timestamp(col("event_date"), "dd/MM/yyyy HH:mm:ss"))
    .withColumn("partition_date", date_format(col("timestamp_date"), "ddMMyyyy"))
    .withColumn("event_day", dayofmonth(col("timestamp_date")))
    .withColumn("event_hour", hour(col("timestamp_date")))
    .withColumn("event_minute", minute(col("timestamp_date")))
    .withColumn("event_month", month(col("timestamp_date")))
    .withColumn("event_second", second(col("timestamp_date")))
    .withColumn("event_year", year(col("timestamp_date")))
    .drop("timestamp_date")
)

#### Location enrichment

In [0]:
# Load the neighborhoods table, keep only the three columns needed for the
# location lookup, and rename them to the names used in the sales output:
# `name` -> `district`, `identification` -> `neighborhood`.
neighborhoods_df = (
    spark.read
    .table(f"{TARGET_SCHEMA}.{TABLE_TO_MERGE}")
    .select("name", "identification", "geojson")
    .withColumnRenamed("name", "district")
    .withColumnRenamed("identification", "neighborhood")
)


Perform cross join between sales and neighborhoods

In [0]:
# Pair every sales row with every neighborhood row; the point-in-polygon check
# applied later decides which pairs are kept.
complete_df = bronze_df.crossJoin(other=neighborhoods_df)

Define a function to validate if a given point is inside a Polygon/Multipolygon

In [0]:
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import BooleanType
from shapely.geometry import Point, shape
import json

# UDF logic: receives lat/lon and geojsons per row
def point_in_polygon(lat, lon, geojson):
    """Return whether the point (lon, lat) lies inside a GeoJSON geometry.

    Args:
        lat: Latitude of the point (used as the y coordinate).
        lon: Longitude of the point (used as the x coordinate).
        geojson: GeoJSON geometry serialized as a JSON string.

    Returns:
        True when the parsed geometry contains the point. False when any
        input is missing or empty, or when the geometry cannot be parsed or
        evaluated.
    """
    # A missing coordinate or geometry can never match; decide that up front
    # instead of relying on an exception raised further down.
    if lat is None or lon is None or not geojson:
        return False

    try:
        parsed = json.loads(geojson)
        geometry = shape(parsed)
        # bool() guarantees a plain Python bool for the BooleanType UDF result.
        return bool(geometry.contains(Point(lon, lat)))
    except Exception:
        # Best effort: one malformed row must not fail the whole job.
        # Unlike a bare `except:`, this does not swallow KeyboardInterrupt
        # or SystemExit.
        return False

# Wrap the Python function as a Spark UDF that returns a boolean column.
point_in_polygon_udf = udf(point_in_polygon, returnType=BooleanType())

Apply the function and return the district and neighborhood for each record.

In [0]:
# Flag each (sale, neighborhood) pair with the point-in-polygon result and keep
# only the pairs where the flag is true.
complete_df = (
    complete_df
    .withColumn(
        "match_location",
        point_in_polygon_udf(col("latitude"), col("longitude"), col("geojson")),
    )
    .filter(col("match_location"))
)

# Keep just the sale keys plus the matched location names.
records_df = complete_df.select(
    "order_id", "customer_id", "employee_id", "district", "neighborhood"
)

Join against the original bronze_df to find the records whose location is unknown; those records are then filtered out

In [0]:
# Left-join the matched locations back onto the sales rows. Rows without a
# match get the "unknown" placeholder, and rows whose district is "unknown"
# are then removed.
bronze_df = (
    bronze_df
    .join(records_df, on=["order_id", "customer_id", "employee_id"], how="left")
    .fillna({"district": "unknown", "neighborhood": "unknown"})
    .filter(col("district") != "unknown")
)

### Create Schema in Catalog

In [0]:
# No-op when the target schema already exists.
spark.sql(sqlQuery=f"CREATE SCHEMA IF NOT EXISTS {TARGET_SCHEMA}")

#### Create Empty table in the schema before merge

Get schema of bronze DataFrame

In [0]:
# Schema of the enriched DataFrame, captured for reuse when creating the table.
bronze_schema = bronze_df.schema

Create empty table

In [0]:
# Make sure the Delta target exists before the MERGE, without touching data
# that is already there.
#
# mode("ignore") turns saveAsTable into a no-op when the table already exists.
# The previous mode("overwrite") recreated the table as empty on every run,
# which erased everything merged by earlier runs and reduced the MERGE to a
# plain insert. The "overwriteSchema" option only applies to overwrites, so it
# is no longer needed.
empty_df = spark.createDataFrame([], schema=bronze_schema)
(
    empty_df.write
    .format("delta")
    .partitionBy("silver_ingestime")
    .mode("ignore")
    .saveAsTable(f"{TARGET_SCHEMA}.{TARGET_TABLE}")
)

### Write into silver.sales using MERGE

Load silver.sales table as Delta Table

In [0]:
from delta.tables import DeltaTable
# Handle on the target Delta table, used as the MERGE target.
silver_table = DeltaTable.forName(
    sparkSession=spark,
    tableOrViewName=f"{TARGET_SCHEMA}.{TARGET_TABLE}",
)

Perform the MERGE

In [0]:
# Upsert keyed on order_id: rows already present in the target are updated with
# all source columns, rows not yet present are inserted.
(
    silver_table.alias("target")
    .merge(
        source=bronze_df.alias("source"),
        condition="target.order_id IS NOT NULL AND target.order_id = source.order_id",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)