In [0]:
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, 
    DoubleType, ArrayType
)

# Schema of one order line item; used as the element type of the `items`
# array in `orders_schema`. Every field is declared nullable.
item_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("sku", StringType()),
        ("product_name", StringType()),
        ("category", StringType()),
        ("quantity", IntegerType()),
        ("unit_price", DoubleType()),
        ("line_total", DoubleType()),
    )
])

# Schema of the nested `payment` struct: three nullable string fields.
payment_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("method", "transaction_id", "status")
])

# Schema of the nested `shipping_address` struct. All parts of the address,
# including `zip_code`, are declared as nullable strings.
shipping_address_schema = StructType([
    StructField(address_part, StringType(), True)
    for address_part in (
        "street",
        "city",
        "state",
        "zip_code",
        "country",
    )
])

# Schema of the nested `_metadata` struct. All three fields, including
# `ingestion_timestamp`, are declared as nullable strings.
metadata_schema = StructType([
    StructField(meta_field, StringType(), True)
    for meta_field in (
        "source_system",
        "ingestion_timestamp",
        "batch_id",
    )
])

# Top-level schema of one order record. It composes the nested schemas
# declared above (`item_schema`, `payment_schema`, `shipping_address_schema`,
# `metadata_schema`). Every field is nullable; date/timestamp fields are
# declared as plain strings.
orders_schema = (
    StructType()
    .add("order_id", StringType(), True)
    .add("customer_id", StringType(), True)
    .add("order_date", StringType(), True)
    .add("order_timestamp_utc", StringType(), True)
    .add("items", ArrayType(item_schema), True)
    .add("subtotal", DoubleType(), True)
    .add("discount_percent", IntegerType(), True)
    .add("discount_amount", DoubleType(), True)
    # Author's note: the source mixes int and float values here; DoubleType
    # is used so both are read as doubles.
    .add("shipping_cost", DoubleType(), True)
    .add("tax_amount", DoubleType(), True)
    .add("total_amount", DoubleType(), True)
    .add("currency", StringType(), True)
    .add("payment", payment_schema, True)
    .add("shipping_address", shipping_address_schema, True)
    # Author's note: the source contains nulls here; the field is nullable.
    .add("notes", StringType(), True)
    .add("channel", StringType(), True)
    .add("_metadata", metadata_schema, True)
)

# Read the orders JSON file into `df` using the explicit schema.
#
# In PERMISSIVE mode Spark keeps the raw text of a malformed record only when
# the schema passed to the reader contains a string field whose name matches
# `columnNameOfCorruptRecord`. Without that field the raw text is discarded
# and the option has no effect. `orders_schema` declares no such field, so a
# read-time copy with the extra column is built here; `orders_schema` itself
# is left unmodified.
#
# NOTE: `df` therefore carries one extra nullable string column,
# `_corrupt_record`. A table previously written from the schema without this
# column must allow the schema change on the next write (for Delta tables,
# e.g. the `overwriteSchema` / `mergeSchema` write options).
corrupt_record_column = "_corrupt_record"
orders_read_schema = StructType(
    orders_schema.fields
    + [StructField(corrupt_record_column, StringType(), True)]
)

df = (
    spark.read
    .schema(orders_read_schema)
    # multiLine: a single JSON record may span several lines of the file.
    .option("multiLine", True)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", corrupt_record_column)
    .json("dbfs:/Volumes/workspace/landingzone/orders_files/orders.json")
)

In [0]:
# Create (or fully replace) the `bronze.orders` table from `df`.
#
# "overwrite" replaces the table contents on every run. `overwriteSchema`
# additionally lets the overwrite replace the table's schema, so a change to
# the declared schema does not make this full refresh fail with a schema
# mismatch against the existing table.
# NOTE(review): `overwriteSchema` is a Delta Lake write option; presumably
# `bronze.orders` is a Delta table - confirm.
(
    df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("bronze.orders")
)