# Dallas Food Inspection Data Transformation

### Read Dallas parquet file from silver container in azure data lake

In [0]:
# Define storage account info
storage_account_name = "foodinspection2025stg"
storage_account_key = "uI7JBnr/H6GB9Qhcglge+1gciYSvBpIm/G98cSsiJp6Cos+kqBfDQClmGCWWpZ+wXGtP3SsVeCbj+AStiA9Jvg=="
container_name = "silver"
mount_point = "/mnt/silver"

# Configure the mount with the storage credentials
configs = {
  f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key
}

try:
  dbutils.fs.mount(
    source = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net",
    mount_point = mount_point,
    extra_configs = configs
  )
  print(f"Successfully mounted {container_name} to {mount_point}")
except Exception as e:
  print(f"Mount point already exists or error: {str(e)}")

# Now read the parquet file from the mount point
df_dallas = spark.read.parquet(f"{mount_point}/dallas.parquet")

# Display sample data
df_dallas.show(5)

# Get schema information
df_dallas.printSchema()

# Count rows
row_count = df_dallas.count()
print(f"Total rows: {row_count}")

Mount point already exists or error: An error occurred while calling o399.mount.
: java.rmi.RemoteException: java.lang.IllegalArgumentException: requirement failed: Directory already mounted: /mnt/silver; nested exception is: 
	java.lang.IllegalArgumentException: requirement failed: Directory already mounted: /mnt/silver
	at com.databricks.backend.daemon.data.client.DbfsClient.send0(DbfsClient.scala:135)
	at com.databricks.backend.daemon.data.client.DbfsClient.sendIdempotent(DbfsClient.scala:69)
	at com.databricks.backend.daemon.dbutils.DBUtilsCore.createOrUpdateMount(DBUtilsCore.scala:1053)
	at com.databricks.backend.daemon.dbutils.DBUtilsCore.$anonfun$mount$1(DBUtilsCore.scala:1079)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:571)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:667)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:685)
	at com.dat

In [0]:
df_dallas_copy = df_dallas.select("*")

In [0]:
df_dallas_copy.select("violation_description1").show(5, truncate=False)

+-----------------------------------------------------------------+
|violation_description1                                           |
+-----------------------------------------------------------------+
|44 Trash can provided for papertowel waste                       |
|10 Clean Sight and Touch                                         |
|28 Original cont. of RTE/PHF/Day1= day of opening                |
|35 Eating food, chewing gum, drinking beverages, or using tobacco|
|02 Cold Hold (41øF/45øF or below)                                |
+-----------------------------------------------------------------+
only showing top 5 rows



### Transformations 

1. Convert Texas to 'TX'
2. Add inspection id with '_dallas' suffix
3. Calculate risk based on inspection score
4. Calculate results based on inspection score
5. Mark business_type as 'Unknown'

In [0]:
from pyspark.sql.functions import concat, row_number, lit, when, col, monotonically_increasing_id
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Define window specification
window_spec = Window.orderBy(monotonically_increasing_id())

# Add both columns to df_dallas
df_dalls_details = df_dallas_copy.withColumn(
                    "state",
                    F.when(F.col("state") == "Texas", "TX")
                    .otherwise(F.col("state"))
                ).withColumn("inspection_id", 
                          concat(row_number().over(window_spec).cast("string"), 
                                lit("_dallas"))
                ).withColumn("risk",
                        when((col("inspection_score") >= 79) & (col("inspection_score") <= 100), "Low")
                        .when((col("inspection_score") >= 60) & (col("inspection_score") <= 78), "Medium")
                        .otherwise("High")
                ).withColumn("results",
                        when((col("inspection_score") >= 80) & (col("inspection_score") <= 100), "Pass")
                        .when((col("inspection_score") >= 60) & (col("inspection_score") <= 78), "Pass w/ Conditions")
                        .when((col("inspection_score") >= 60) & (col("inspection_score") <= 69), "Fail")
                        .otherwise("Out of Business")
                ).withColumn("business_type", lit("Unknown")
                ).withColumn("license_number", lit(-9999))
                 
df_dalls_details.select("inspection_id", "restaurant_name", "risk", "results").show(5)

+-------------+--------------------+----+---------------+
|inspection_id|     restaurant_name|risk|        results|
+-------------+--------------------+----+---------------+
|     1_dallas|CHIPOTLE MEXICAN ...|High|Out of Business|
|     2_dallas|       LUSH CATERING|High|Out of Business|
|     3_dallas|  THE WINSTON SCHOOL|High|Out of Business|
|     4_dallas|MARY'S TWISTED KI...|High|Out of Business|
|     5_dallas|BELMONTS CAFE ON ...|High|Out of Business|
+-------------+--------------------+----+---------------+
only showing top 5 rows



In [0]:
import re
from pyspark.sql import functions as F

df = df_dalls_details

# discover and sort columns 
def sorted_cols(prefix):
    return sorted(
      [c for c in df.columns if c.startswith(prefix)],
      key=lambda c: int(re.search(r"(\d+)$", c).group(1))
    )

desc_cols   = sorted_cols("violation_description")
detail_cols = sorted_cols("violation_detail")
memo_cols   = sorted_cols("violation_memo")

# build a desc_arr from all description columns
desc_arr = F.array(*[F.col(c) for c in desc_cols])

# build a comment_arr by pairing detail/memo
max_cm = max(len(detail_cols), len(memo_cols))
comment_elems = []

for i in range(max_cm):
    d = F.col(detail_cols[i]) if i < len(detail_cols) else F.lit(None)
    m = F.col(memo_cols[i])   if i < len(memo_cols)   else F.lit(None)

    # trim & define “unknown” predicates
    d_trim    = F.trim(d)
    m_trim    = F.trim(m)
    d_unknown = d.isNull() | (d_trim == "") | (F.lower(d_trim) == "unknown")
    m_unknown = m.isNull() | (m_trim == "") | (F.lower(m_trim) == "unknown")

    # build the conditional per rules:
    # 1) both unknown      => "Unknown"
    # 2) only detail valid => detail
    # 3) only memo valid   => memo
    # 4) both valid        => detail + " " + memo
    comment_elems.append(
        F.when(d_unknown & m_unknown, F.lit("Unknown"))
         .when(~d_unknown & m_unknown, d_trim)
         .when(d_unknown & ~m_unknown, m_trim)
         .otherwise(F.concat_ws(" ", d_trim, m_trim))
    )

comment_arr = F.array(*comment_elems)

zipped = F.arrays_zip(desc_arr.alias("desc"), comment_arr.alias("comments"))

Split violation description into code, description

Rules followed for transformation:

1. if violation code is not present, and description is not present -> insert (9999, 'Unknown')
2. if violation code is not present, and description is present -> insert (-1, description)
3. if violation code is present, and description is present -> insert (code, desc)

In [0]:
df_out = df.withColumn("zipped", zipped) \
    .withColumn("violation_code",
        F.transform("zipped", lambda x:
            F.when(
                x["desc"].isNull() |
                (F.trim(x["desc"]) == "") |
                (F.lower(F.trim(x["desc"])) == "unknown"),
                F.lit(-9999)
            )
            .when(
                F.regexp_extract(x["desc"], r"^(\d+)", 1) != "",
                F.regexp_extract(x["desc"], r"^(\d+)", 1).cast("int")
            )
            .otherwise(F.lit(-1))
        )
    ) \
    .withColumn("violation_description",
        F.transform("zipped", lambda x:
            F.when(
                x["desc"].isNull() |
                (F.trim(x["desc"]) == "") |
                (F.lower(F.trim(x["desc"])) == "unknown"),
                F.lit("Unknown")
            )
            .when(
                F.regexp_extract(x["desc"], r"^\d+\s+(.*)", 1) != "",
                F.regexp_extract(x["desc"], r"^\d+\s+(.*)", 1)
            )
            .otherwise(F.trim(x["desc"]))
        )
    ) \
    .withColumn("violation_comments",
        F.transform("zipped", lambda x:
            F.when(
                x["comments"].isNull() |
                (F.trim(x["comments"]) == ""),
                F.lit("Unknown")
            )
            .otherwise(F.trim(x["comments"]))
        )
    ) \
    .drop("zipped")

# all three arrays now have the same length 
df_out.select("inspection_id",
              "restaurant_name",
    "violation_code",
    "violation_description",
    "violation_comments"
).show(2, truncate=False)

+-------------+----------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Flatten the data

In [0]:
from pyspark.sql import functions as F

df_zipped = df_out.withColumn(
    "zipped2",
    F.arrays_zip(
        "violation_code",
        "violation_description",
        "violation_comments"
    )
)

# flatten
df_exploded = df_zipped.withColumn("z", F.explode("zipped2"))

# select *all original columns except* the 3 array cols + zipped2,z, 
cols_to_drop = ["violation_code", "violation_description", "violation_comments", "zipped2", "z"]
df_flat = df_exploded.select(
    *[c for c in df_exploded.columns if c not in cols_to_drop],
    F.col("z.violation_code").alias("violation_code"),
    F.col("z.violation_description").alias("violation_description"),
    F.col("z.violation_comments").alias("violation_comments")
)

# drop duplicates
df_flat_dedup = df_flat.dropDuplicates(
    ["inspection_id", "violation_code", "violation_description", "violation_comments"]
)

df_flat_dedup.select("inspection_id",
              "restaurant_name",
    "violation_code",
    "violation_description",
    "violation_comments"
).show(2, truncate=False)

+-------------+----------------+--------------+---------------------+------------------+
|inspection_id|restaurant_name |violation_code|violation_description|violation_comments|
+-------------+----------------+--------------+---------------------+------------------+
|10000_dallas |7-ELEVEN #38461A|-9999         |Unknown              |Unknown           |
|10001_dallas |FRESHII         |-9999         |Unknown              |Unknown           |
+-------------+----------------+--------------+---------------------+------------------+
only showing top 2 rows



In [0]:
df_flat_dedup.filter(F.col("inspection_id") == "1_dallas").select("inspection_id",
    "violation_code",
    "violation_description",
    "violation_comments"
).show(truncate=False)

+-------------+--------------+---------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|inspection_id|violation_code|violation_description                                                                  |violation_comments                                                                                                                                                                                                                                                                            

Number of rows with Violation code -9999 and Violation description 'Unknown' before removing duplicate rows:

In [0]:
df_flat_dedup.filter(F.col("violation_code") == -9999).count()

28223

In [0]:
w = Window.partitionBy("inspection_id")

# add a flag: 1 if there's any code != -9999 in that group, else 0
df_flagged = df_flat_dedup.withColumn(
    "has_real_violation",
    F.max((F.col("violation_code") != -9999).cast("int")).over(w)
)

# filter:
#  – if has_real_violation == 1, keep only rows where code != -9999  
#  – if has_real_violation == 0, keep the placeholder row
df_final = df_flagged.filter(
    ((F.col("has_real_violation") == 1) & (F.col("violation_code") != -9999))
    | (F.col("has_real_violation") == 0)
).drop("has_real_violation")

# 4) inspect
df_final.select("inspection_id",
    "restaurant_name",
    "violation_code",
    "violation_description",
    "violation_comments"
).show(3, truncate=False)

+-------------+----------------+--------------+---------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|inspection_id|restaurant_name |violation_code|violation_description|violation_comments                                                                                                                                                                                                                                                                                                                                             

Number of rows with Violation code -9999 and description 'Unknown' after removing duplicate rows:

In [0]:
df_final.filter(F.col("violation_code") == -9999).count()

2665

In [0]:
df_dallas_food_inspection = df_final.withColumnRenamed("restaurant_name", "business_name").withColumnRenamed("street_address", "address")

In [0]:
# Define the list of columns to keep
final_columns = [
    'inspection_id',
    'business_name',
    'business_type',
    'license_number',
    'address',
    'zipcode',
    'city',
    'state',
    'inspection_type',
    'inspection_date',
    'risk',
    'results',
    'violation_code',
    'violation_description',
    'violation_comments'
    
]

df_dallas_final = df_dallas_food_inspection.select(*final_columns)

df_dallas_final.show(5, truncate=False)

+-------------+----------------+-------------+--------------+---------------------+-------+------+-----+---------------+---------------+----+---------------+--------------+---------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|inspection_id|business_name   |business_type|license_number|address              |zipcode|city  |state|inspection_type|inspection_date|risk|results        |violation_code|violation_description                        |violation_comments                                   

In [0]:
null_counts = df_dallas_final.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df_dallas_final.columns
])

null_counts.show(truncate=False)

+-------------+-------------+-------------+--------------+-------+-------+----+-----+---------------+---------------+----+-------+--------------+---------------------+------------------+
|inspection_id|business_name|business_type|license_number|address|zipcode|city|state|inspection_type|inspection_date|risk|results|violation_code|violation_description|violation_comments|
+-------------+-------------+-------------+--------------+-------+-------+----+-----+---------------+---------------+----+-------+--------------+---------------------+------------------+
|0            |0            |0            |0             |0      |0      |0   |0    |0              |0              |0   |0      |0             |0                    |0                 |
+-------------+-------------+-------------+--------------+-------+-------+----+-----+---------------+---------------+----+-------+--------------+---------------------+------------------+



In [0]:
df_dallas_final.count()

140478

In [0]:
# Set up Snowflake options
sfOptions = {
    "sfURL": "KOMAXUA-FHA53164.snowflakecomputing.com",  
    "sfDatabase": "FOOD_INSPECTION_DB",                 
    "sfSchema": "RAW_STAGE_SCHEMA",                     
    "sfWarehouse": "DADABI_WH",                         
    "sfRole": "DEVELOPER",                              
    "sfUser": "DADABI_USER",                            
    "sfPassword": "snowflake123#"                       
}

### Load the cleaned and transformed data to Snowflake Dallas stage table

In [0]:
# Write DataFrame to Snowflake
(
    df_dallas_final.write
    .format("net.snowflake.spark.snowflake")
    .options(**sfOptions)
    .option("dbtable", "STG_DALLAS")
    .mode("overwrite")
    .save()
)