In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType,StringType
from pyspark.sql.functions import lit, current_timestamp,col


In [0]:
# Function to process and write batches of data into a Delta table

def process_batch(df, batch_id, table_name, txn_app_id=None):
    """Append one streaming micro-batch to a Delta table, tagged with audit columns.

    Meant to be used as a ``foreachBatch`` sink. It adds two columns and then
    appends the result to ``table_name`` with ``saveAsTable``:

    * ``batch_id``        - the micro-batch id, as a literal column.
    * ``ingest_datetime`` - ``current_timestamp()`` at write time.

    Args:
        df: The micro-batch DataFrame passed in by ``foreachBatch``.
        batch_id: The micro-batch id passed in by ``foreachBatch``.
        table_name: Name of the target table, e.g. ``'schema.table'``.
        txn_app_id: Optional stable application id for this stream. When given,
            the append carries Delta's ``txnAppId``/``txnVersion`` options,
            with ``batch_id`` as the version. A re-executed micro-batch is then
            skipped instead of appended twice. ``None`` (the default) keeps the
            plain append.
    """
    enriched = (
        df
        .withColumn("batch_id", lit(batch_id))                # track which micro-batch produced the row
        .withColumn("ingest_datetime", current_timestamp())   # ingestion time of the row
    )

    writer = enriched.write.format("delta").mode("append")
    if txn_app_id is not None:
        # foreachBatch is at-least-once: after a failure the same batch_id can be
        # replayed. Delta records (txnAppId, txnVersion) and ignores a repeat.
        writer = writer.option("txnAppId", txn_app_id).option("txnVersion", batch_id)
    writer.saveAsTable(table_name)


# Explicit read schema for the raw customer CSV files.
# CUSTOMER_ID is the only integer and the only non-nullable column; every other
# column (including ACCOUNT_MGR_ID and DATE_OF_BIRTH) is kept as a nullable string.
# NOTE(review): the dotted names (e.g. "CUST_ADDRESS.CITY") are flat column names,
# not nested struct fields, so downstream references need backticks,
# e.g. col("`CUST_ADDRESS.CITY`").
customer_schema = (
    StructType()
    .add("CUSTOMER_ID", IntegerType(), nullable=False)                  # unique identifier for each customer
    .add("CUST_FIRST_NAME", StringType(), nullable=True)                # first name
    .add("CUST_LAST_NAME", StringType(), nullable=True)                 # last name
    .add("CUST_ADDRESS.COUNTRY_ID", StringType(), nullable=True)        # address: country id
    .add("CUST_ADDRESS.STATE_PROVINCE", StringType(), nullable=True)    # address: state / province
    .add("CUST_ADDRESS.CITY", StringType(), nullable=True)              # address: city
    .add("CUST_ADDRESS.POSTAL_CODE", StringType(), nullable=True)       # address: postal code
    .add("CUST_ADDRESS.STREET_ADDRESS", StringType(), nullable=True)    # address: street
    .add("PHONE_NUMBER", StringType(), nullable=True)                   # phone number
    .add("CUST_EMAIL", StringType(), nullable=True)                     # email address
    .add("ACCOUNT_MGR_ID", StringType(), nullable=True)                 # account manager id
    .add("DATE_OF_BIRTH", StringType(), nullable=True)                  # date of birth (raw string)
    .add("MARITAL_STATUS", StringType(), nullable=True)                 # marital status
    .add("GENDER", StringType(), nullable=True)                         # gender
)


# Target bronze table, Auto Loader source directory, and streaming checkpoint directory.
customer_table_name = 'stoyan.bronze_customers'
customer_load_location = 's3://data-engineering-upskill-final-exam/stoyan/input_data/customers'
customer_checkpoint_location = 's3://data-engineering-upskill-final-exam/stoyan/bronze_customers/checkpoint'

# Streaming DataFrame that picks up customer CSV files from the source directory via Auto Loader.
# NOTE(review): with an explicit schema, Spark's CSV reader (enforceSchema=true by default)
# ignores the header names and assigns columns by position - confirm the files' column order
# matches customer_schema.
customer_stream = (
    spark.readStream
    .format("cloudFiles")                   # Auto Loader source
    .option("cloudFiles.format", "csv")     # files in the source directory are CSV
    .option("header", "true")               # first line of each file is a header row
    .option("multiLine", "true")            # allow quoted values that span several lines
    .schema(customer_schema)                # use the explicit schema, no inference
    .load(customer_load_location)
)




In [0]:
# Write the customer stream to the bronze Delta table, one micro-batch at a time via process_batch.
# availableNow=True processes the files currently present in the source location and then stops.
customer_query = (
    customer_stream.writeStream
    # Bind the table name as a default argument so the sink does not depend on the
    # notebook global still holding the same value when each batch runs.
    .foreachBatch(
        lambda df, batch_id, table_name=customer_table_name: process_batch(df, batch_id, table_name)
    )
    .option("checkpointLocation", customer_checkpoint_location)
    .trigger(availableNow=True)
    .outputMode("append")
    .start()
)

# start() returns immediately. Block until the available-now run has finished so later
# cells (and "Run All") see the fully loaded table and a failed query raises here.
customer_query.awaitTermination()

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7febccdca990>