# 🚀 Exercise 1 - Developing Spark Applications  

Welcome to this hands-on lab! In this exercise, you'll explore key concepts and techniques to develop Spark applications efficiently. Let's dive in!  


### 1.2.3 Understanding Markdown vs. Code Cells
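In a Fabric notebook, Markdown cells hold formatted notes and Code cells hold executable code. Combining the two keeps explanations next to the code they describe, which helps both development and collaboration.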

1. To insert a new Markdown or Code cell, hover above or below an existing cell. Options to add a Markdown or Code cell appear; select the type you need.

In [1]:
# Import required libraries
from pyspark.sql import Row

# Create a sample DataFrame with 3 records
data = [Row(id=1, name="Alice", age=25),
        Row(id=2, name="Bob", age=30),
        Row(id=3, name="Charlie", age=35)]

df = spark.createDataFrame(data)

# Display the DataFrame
df.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 25|
|  2|    Bob| 30|
|  3|Charlie| 35|
+---+-------+---+



**Using PySpark, Scala, or Spark SQL**: 
You can process data with PySpark, Scala, or Spark SQL; choose the language you are most comfortable with. A new notebook defaults to PySpark.

You can change the language for the whole notebook, or override it for a single cell by starting that cell with one of these magic commands:

- %%spark for Scala.
- %%pyspark for PySpark.
- %%sql for Spark SQL.

In [2]:
%%pyspark
# Import required libraries
from pyspark.sql import Row

# Create a sample DataFrame with 3 records
data = [Row(id=1, name="Alice", age=25),
        Row(id=2, name="Bob", age=30),
        Row(id=3, name="Charlie", age=35)]

df = spark.createDataFrame(data)

# Display the DataFrame
df.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 25|
|  2|    Bob| 30|
|  3|Charlie| 35|
+---+-------+---+



In [3]:
%%spark
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Define schema
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

// Create sample data
val data = Seq(
  Row(1, "Alice", 25),
  Row(2, "Bob", 30),
  Row(3, "Charlie", 35)
)

// Create DataFrame
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

// Display DataFrame
df.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 25|
|  2|    Bob| 30|
|  3|Charlie| 35|
+---+-------+---+

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false),StructField(name,StringType,false),StructField(age,IntegerType,false))
data: Seq[org.apache.spark.sql.Row] = List([1,Alice,25], [2,Bob,30], [3,Charlie,35])
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]


In [4]:
%%sql

CREATE OR REPLACE TEMP VIEW people AS
SELECT 1 AS id, 'Alice' AS name, 25 AS age UNION ALL
SELECT 2 AS id, 'Bob' AS name, 30 AS age UNION ALL
SELECT 3 AS id, 'Charlie' AS name, 35 AS age;

SELECT * FROM people

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 3 rows and 3 fields>

## 1.3 Spark Basics: Reading, Transforming, and Writing Data with Bronze, Silver, and Gold Layers

### 1.3.1 Bronze Layer: Load Raw Data

#### Load JSON Data into a DataFrame

To begin, load the order data from Azure Data Lake Storage (ADLS) into a Spark DataFrame. The path ends in `*.json`, so Spark reads every JSON file in the ordersjson folder.

In a new code cell, use the following code to:
- read the JSON files
- print the schema
- display the data

In [6]:
from pyspark.sql.functions import from_json,col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

adls_path = "abfss://fabricconlab@vengcatadls001.dfs.core.windows.net/ordersjson/*.json"

json_schema = StructType([
    StructField("OrderID", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("OrderDate", StringType(), True),
    StructField("CustomerName", StringType(), True),
    StructField("Email", StringType(), True),
    StructField("Product", StringType(), True),
    StructField("ProductID", IntegerType(), True),
    StructField("ListPrice", DoubleType(), True),
    StructField("Discount", DoubleType(), True)
])

# Load JSON data from ADLS
order_df = spark.read.schema(json_schema).json(adls_path)

# from_json is not needed here because spark.read.json parses the files directly;
# it is used in the Bonus section, where the JSON arrives as a string column.

order_df.printSchema()

order_df.show(truncate=False)

root
 |-- OrderID: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- ListPrice: double (nullable = true)
 |-- Discount: double (nullable = true)

+-------+--------+----------+-------------------+-------------------------------+-----------------------+---------+---------+--------+
|OrderID|Quantity|OrderDate |CustomerName       |Email                          |Product                |ProductID|ListPrice|Discount|
+-------+--------+----------+-------------------+-------------------------------+-----------------------+---------+---------+--------+
|SO49171|1       |2021-01-01|Mariah Foster      |mariah21@adventure-works.com   |Road-250 Black, 48     |1        |2181.5625|174.525 |
|SO49172|1       |2021-01-01|Brian Howard       |brian23@adventure-works.com    |Road-250 

#### Read Parquet Data

In [9]:
# Define the ADLS path for the Parquet files
adls_parquet_path = "abfss://fabricconlab@vengcatadls001.dfs.core.windows.net/product_data_parquet/"

# Read Parquet files into a DataFrame
product_df = spark.read.parquet(adls_parquet_path)

# Show the contents of the DataFrame
product_df.show(truncate=False)

+---------+-------------------------+--------------+---------+
|ProductID|ProductName              |Category      |ListPrice|
+---------+-------------------------+--------------+---------+
|771      |Mountain-100 Silver, 38  |Mountain Bikes|3399.99  |
|772      |Mountain-100 Silver, 42  |Mountain Bikes|3399.99  |
|773      |Mountain-100 Silver, 44  |Mountain Bikes|3399.99  |
|774      |Mountain-100 Silver, 48  |Mountain Bikes|3399.99  |
|775      |Mountain-100 Black, 38   |Mountain Bikes|3374.99  |
|776      |Mountain-100 Black, 42   |Mountain Bikes|3374.99  |
|777      |Mountain-100 Black, 44   |Mountain Bikes|3374.99  |
|778      |Mountain-100 Black, 48   |Mountain Bikes|3374.99  |
|779      |Mountain-200 Silver, 38  |Mountain Bikes|2319.99  |
|780      |Mountain-200 Silver, 42  |Mountain Bikes|2319.99  |
|781      |Mountain-200 Silver, 46  |Mountain Bikes|2319.99  |
|782      |Mountain-200 Black, 38   |Mountain Bikes|2294.99  |
|783      |Mountain-200 Black, 42   |Mountain Bikes|229

### 1.3.2 Silver Layer: Cleaning and De-duplicating Data

Now that both the JSON and Parquet data are loaded into DataFrames, let's clean the order and product data and prepare it for the Silver layer.

**Remove duplicates** from the orders DataFrame and **filter out incomplete rows** from the products DataFrame.

In [8]:
from pyspark.sql import functions as F

# Remove rows of order_df that are duplicated across all columns
orders_df_cleaned = order_df.dropDuplicates()

# Show the cleaned data to verify
orders_df_cleaned.show(truncate=False)

# Filter out rows where ProductID or ProductName is null
filtered_products_df = product_df.filter(
    (F.col("ProductID").isNotNull()) & (F.col("ProductName").isNotNull())
)

# Show the filtered data to verify
filtered_products_df.show(truncate=False)

+-------+--------+----------+----------------+-----------------------------+--------------------------+---------+---------+--------+
|OrderID|Quantity|OrderDate |CustomerName    |Email                        |Product                   |ProductID|ListPrice|Discount|
+-------+--------+----------+----------------+-----------------------------+--------------------------+---------+---------+--------+
|SO49216|1       |2021-01-04|Rafael Sun      |rafael14@adventure-works.com |Road-650 Red, 44          |1        |782.99   |62.6392 |
|SO49426|1       |2021-01-26|Melinda Carlson |melinda13@adventure-works.com|Road-650 Red, 60          |1        |782.99   |62.6392 |
|SO51038|1       |2021-05-27|Emily Rodriguez |emily21@adventure-works.com  |Mountain-200 Black, 46    |1        |2049.0982|163.9279|
|SO51195|1       |2021-06-01|Melody Jimenez  |melody6@adventure-works.com  |Mountain-400-W Silver, 40 |1        |769.49   |61.5592 |
|SO51240|1       |2021-06-03|Megan Anderson  |megan13@adventure-works
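
Note that `dropDuplicates()` with no arguments only drops rows that match in every column. Two rows that share an OrderID but differ in any other field both survive, whereas `dropDuplicates(["OrderID"])` (used in the Bonus section) keeps one row per key. The sketch below models that difference in plain Python so it runs without a Spark session. The sample rows, the order IDs, and the helper names `drop_full_duplicates` and `drop_duplicates_by_key` are made up for illustration; they are not part of the lab data or the Spark API.

```python
# Plain-Python model of full-row vs. key-based de-duplication.
# Rows are (OrderID, Quantity, Product) tuples invented for this example.
rows = [
    ("SO1", 1, "Road-250 Black, 48"),
    ("SO1", 1, "Road-250 Black, 48"),   # exact copy of the first row
    ("SO1", 2, "Road-250 Black, 48"),   # same OrderID, different Quantity
    ("SO2", 1, "Road-650 Red, 44"),
]


def drop_full_duplicates(rows):
    """Keep one copy of each row that is identical in every column."""
    seen = set()
    result = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            result.append(row)
    return result


def drop_duplicates_by_key(rows, key_index=0):
    """Keep one row per key value (here, the first one seen).

    Spark does not promise which of the matching rows it keeps;
    keeping the first is a simplification of this model.
    """
    seen = set()
    result = []
    for row in rows:
        key = row[key_index]
        if key not in seen:
            seen.add(key)
            result.append(row)
    return result


full = drop_full_duplicates(rows)
by_key = drop_duplicates_by_key(rows)

print(len(rows), len(full), len(by_key))  # prints: 4 3 2
```

In the lab itself, comparing `order_df.count()` with `orders_df_cleaned.count()` is one way to see how many exact duplicates were removed.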

**Storing Cleaned Data into Silver Layer**


In [9]:
# Save the cleaned orders to the Silver layer as a Delta table
orders_df_cleaned.write.format("delta").mode("overwrite").saveAsTable("orders_silver")

In [11]:
filtered_products_df.write.format("delta").mode("overwrite").saveAsTable("products_silver")

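## Bonus

The two cells below read JSON files as a stream with Structured Streaming, parse the JSON string held in the json_data field, and write each micro-batch to the orders1 Delta table. The first cell appends every batch; the second merges on OrderID so that orders already in the table are not inserted again.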

In [40]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

outer_json_schema = StructType([
    StructField("json_data", StringType(), True)
])

inner_json_schema = StructType([
    StructField("OrderID", StringType(), True),
    StructField("Product", StringType(), True)
])

adls_path = "abfss://fabricconlab@vengcatadls001.dfs.core.windows.net/jsonstreaming/*.json"

input_df = spark.readStream \
    .schema(outer_json_schema) \
    .json(adls_path)

input_df.printSchema()

parsed_df = input_df.withColumn("parsed_json", from_json(col("json_data"), inner_json_schema)) \
    .select("parsed_json.*")

parsed_df.printSchema()

def process_batch(df, epoch_id):
    df.show(truncate=False)
    df.write \
        .format("delta") \
        .mode("append") \
        .saveAsTable("orders1")

query = parsed_df.writeStream \
    .outputMode("append") \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "abfss://fabricconlab@vengcatadls001.dfs.core.windows.net/jsonstreaming/checkpoints/") \
    .start()

query.awaitTermination()

root
 |-- json_data: string (nullable = true)

root
 |-- OrderID: string (nullable = true)
 |-- Product: string (nullable = true)



In [41]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

outer_json_schema = StructType([
    StructField("json_data", StringType(), True)
])

inner_json_schema = StructType([
    StructField("OrderID", StringType(), True),
    StructField("Product", StringType(), True)
])

adls_path = "abfss://fabricconlab@vengcatadls001.dfs.core.windows.net/jsonstreaming/*.json"

input_df = spark.readStream \
    .schema(outer_json_schema) \
    .json(adls_path)

input_df.printSchema()

parsed_df = input_df.withColumn("parsed_json", from_json(col("json_data"), inner_json_schema)) \
    .select("parsed_json.*")

parsed_df.printSchema()

def process_batch(df, epoch_id):
    df.show(truncate=False)

    # Keep one row per OrderID within this micro-batch
    df = df.dropDuplicates(["OrderID"])

    # Insert only orders whose OrderID is not already in orders1.
    # The table must already exist; the previous cell creates it.
    delta_table = DeltaTable.forName(spark, "orders1")

    delta_table.alias("existing") \
        .merge(
            df.alias("new"),
            "existing.OrderID = new.OrderID"
        ) \
        .whenNotMatchedInsertAll() \
        .execute()

query = parsed_df.writeStream \
    .outputMode("append") \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "abfss://fabricconlab@vengcatadls001.dfs.core.windows.net/jsonstreaming/checkpoints/") \
    .start()

query.awaitTermination()

root
 |-- json_data: string (nullable = true)

root
 |-- OrderID: string (nullable = true)
 |-- Product: string (nullable = true)
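
The difference between the two streaming cells shows up when the same order arrives more than once: appending writes it again, while a merge that has only a when-not-matched insert clause skips keys that are already in the target. Below is a small plain-Python model of that behaviour, assuming a table can be pictured as a list of rows (append) or as a dict keyed by OrderID (merge). The function names, batches, and order IDs are hypothetical, and the model ignores everything else Delta Lake does (transactions, schema checks, file management).

```python
# Plain-Python model of "append" vs. an insert-only merge on OrderID.
# Batches and order IDs are invented for this example.


def append_batch(table, batch):
    """Model of mode("append"): every incoming row is written."""
    table.extend(batch)


def merge_insert_only(table, batch):
    """Model of merge(...).whenNotMatchedInsertAll() keyed on OrderID.

    Rows whose OrderID already exists in the table are left untouched;
    only rows with a new OrderID are inserted.
    """
    # De-duplicate inside the batch first, as process_batch does.
    unique = {}
    for row in batch:
        unique.setdefault(row["OrderID"], row)
    for order_id, row in unique.items():
        if order_id not in table:
            table[order_id] = row


batch_1 = [
    {"OrderID": "SO1", "Product": "Road-250 Black, 48"},
    {"OrderID": "SO2", "Product": "Road-650 Red, 44"},
]
# The second batch replays SO2 and adds one new order.
batch_2 = [
    {"OrderID": "SO2", "Product": "Road-650 Red, 44"},
    {"OrderID": "SO3", "Product": "Mountain-200 Black, 46"},
]

appended = []   # stands in for a table written with append
merged = {}     # stands in for a table written with the merge

for batch in (batch_1, batch_2):
    append_batch(appended, batch)
    merge_insert_only(merged, batch)

print(len(appended))    # 4 -> SO2 is stored twice
print(sorted(merged))   # ['SO1', 'SO2', 'SO3']
```

Replaying `batch_2` through `merge_insert_only` a second time leaves `merged` unchanged, which is the property that makes the merge-based cell safe to re-run on data it has already seen.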

