In [0]:
%sql
-- Pre-check to inspect the csv file. Do not include it in the job.
/*
select * 
from read_files(
  '/Volumes/job_catalog/default/dataset/orders/',
  format => 'csv'
  );
*/

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, current_timestamp, col

#
# Ingest the orders CSV data from the volume path below and create the bronze_orders table
#

#Step 1: Read the csv file and create a Spark DataFrame
file_path = "/Volumes/job_catalog/default/dataset/orders/"

# Define schema for the product JSON
product_schema = StructType([
    StructField("curr", StringType()),
    StructField("id", StringType()),
    StructField("name", StringType()),
    StructField("price", IntegerType()),
    StructField("qty", IntegerType()),
    StructField("unit", StringType())
])

df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("quote", '"')
    .option("escape", '"')
    .load(file_path)
    .withColumn("processing_time", current_timestamp())
    .withColumn("file_name", col("_metadata.file_path"))
)

df = df.withColumn("product_json", from_json(col("ordered_products"), product_schema))
df = df.drop("ordered_products").withColumnRenamed("product_json", "ordered_products")

# Rearrange the columns so it looks like in the demo video
df = df.select(
    "customer_id",
    "number_of_line_items",
    "order_datetime",
    "order_number",
    "ordered_products",
    "processing_time",
    "file_name"
)

#display(df)

#Step 2: Create a delta table
(df
.write
.mode("overwrite")
.format("delta")
.saveAsTable("job_catalog.default.bronze_orders")
)

In [0]:
# Post-check cell. Do not include it later in the job
#spark.read.table("job_catalog.default.bronze_orders").display()