# Data Engineering with Spark
This notebook serves as Part 3 of Lab 4: Data Engineering in Fabric Notebooks. The goal is to demonstrate the foundational steps of data engineering using PySpark, leading to the creation of Delta tables in the target Lakehouse. We will explore different methods of reading data into a DataFrame and how to transform this data effectively.

## Querying Data using Spark SQL and PySpark

This section demonstrates how to query data from tables using Spark SQL and PySpark. Spark SQL lets you run SQL queries directly against tables and DataFrames in Spark, which suits anyone already familiar with SQL syntax.

### SQL Query in Spark-SQL

You can execute SQL queries directly using the `%%sql` magic command or by setting the cell language to SQL. This approach does not require registering a DataFrame first. Because the results are not assigned to a DataFrame or a temp view, it is mainly suited to data exploration and profiling.

In [1]:
%%sql
SELECT 
    PackageTypeId,
    PackageTypeName,
    LastEditedBy,
    ValidFrom,
    ValidTo
FROM
    bronze_lakehouse_wtc.package_types -- change lakehouse reference

<Spark SQL result set with 14 rows and 5 fields>

### Creating a Temp View in Spark-SQL

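Creating a temp view registers the Spark-SQL query under a name, making it available for use elsewhere in the notebook. A temp view stores the query definition rather than a materialized copy of the results, so the query is re-evaluated each time the view is read. Temp views aren't persisted and are dropped when the session terminates.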

In [2]:
%%sql
-- Create temp view
CREATE OR REPLACE TEMP VIEW tmp_vw_package_types AS
SELECT 
    PackageTypeId,
    PackageTypeName,
    LastEditedBy,
    ValidFrom,
    ValidTo
FROM
    bronze_lakehouse_wtc.package_types; -- change lakehouse reference

-- Query view
SELECT *
FROM tmp_vw_package_types

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 14 rows and 5 fields>
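
The same kind of temp view can also be registered from a PySpark cell instead of `%%sql`. The following is a minimal sketch, assuming the `spark` session that the notebook provides; `register_temp_view` is a hypothetical helper name, not part of PySpark, and unlike the SQL cell above it exposes every column of the source table rather than a selected list.

```python
def register_temp_view(spark, source_table, view_name):
    """Register source_table under view_name as a session-scoped temp view.

    `spark` is assumed to be an active SparkSession (hypothetical helper).
    """
    # Load the table as a DataFrame; nothing is read until an action runs
    df = spark.table(source_table)
    # Replace any existing temp view with the same name
    df.createOrReplaceTempView(view_name)
    return df
```

In the notebook this might be called as `register_temp_view(spark, "bronze_lakehouse_wtc.package_types", "tmp_vw_package_types")`, after which the view can be queried from a `%%sql` cell.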

### Using Variables

Storing lakehouse names in variables lets the same code run against different lakehouses without editing each query, which helps to create dynamic data processing patterns. Set the lakehouse variables below before progressing.

In [3]:
bronze_lakehouse = 'bronze_lakehouse_wtc'
silver_lakehouse = 'silver_lakehouse_wtc'
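
Because these variables are later interpolated into table references, it can help to check that they contain only simple identifiers before building a name. The sketch below is one possible approach; `qualified_table` is a hypothetical helper introduced here for illustration, and the identifier rule it enforces (letters, digits, underscores) is an assumption rather than a Fabric requirement.

```python
import re


def qualified_table(lakehouse: str, table: str) -> str:
    """Build a 'lakehouse.table' reference from two simple identifiers."""
    for part in (lakehouse, table):
        # Reject anything other than letters, digits and underscores
        if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", part):
            raise ValueError(f"Not a simple identifier: {part!r}")
    return f"{lakehouse}.{table}"


bronze_lakehouse = "bronze_lakehouse_wtc"
print(qualified_table(bronze_lakehouse, "package_types"))
# prints: bronze_lakehouse_wtc.package_types
```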

## Reading Data into DataFrames

This section illustrates two ways to read a table into a Spark DataFrame. Both read the same source table, so choose whichever fits the requirements of your data processing task.

### SQL Query Execution in PySpark

Here, we use `spark.sql` to load data into a DataFrame, renaming the columns to snake_case in the query. The f-string substitutes the `bronze_lakehouse` variable into the table reference. This method is particularly useful if you are comfortable with SQL syntax.

In [4]:
# Use spark.sql to execute SQL queries
df_package_types = spark.sql(f"""
    SELECT 
        PackageTypeId       package_type_id,
        PackageTypeName     package_type_name,
        LastEditedBy        last_edited_by,
        ValidFrom           valid_from,
        ValidTo             valid_to
    FROM
        {bronze_lakehouse}.package_types
""")

# Display the result
display(df_package_types)

### Using PySpark DataFrame API
Alternatively, we can use the PySpark DataFrame API to load the same table. This approach is more native to Spark and utilizes the DataFrame API's methods for data manipulation. Unlike the SQL example above, this cell returns the columns under their original names.

In [5]:
# Load the data into a DataFrame using PySpark DataFrame API
df = spark.table(f"{bronze_lakehouse}.package_types")

# Show the DataFrame
display(df)
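
The cell above keeps the source column names, whereas the SQL version aliased them to snake_case. One way to get the same names through the DataFrame API is to derive them programmatically. This is a sketch under assumptions: `to_snake_case` and `snake_case_columns` are hypothetical helpers, and the regular expressions target PascalCase names like the ones in this lab, not every possible naming style.

```python
import re


def to_snake_case(name: str) -> str:
    """Convert a PascalCase or camelCase name to snake_case."""
    # First pass: break before capitalised words, e.g. 'Type' in 'PackageTypeId'
    partial = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
    # Second pass: break between a lowercase letter or digit and a following
    # capital, which also separates trailing acronyms such as 'ID'
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", partial).lower()


def snake_case_columns(df):
    """Return a new DataFrame with every column renamed to snake_case."""
    # DataFrame.toDF(*names) renames all columns positionally
    return df.toDF(*[to_snake_case(c) for c in df.columns])


print(to_snake_case("PackageTypeId"))    # prints: package_type_id
print(to_snake_case("StateProvinceID"))  # prints: state_province_id
```

For the five columns selected in the SQL example, `snake_case_columns(df)` would be expected to produce the same names as the hand-written aliases.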

## Writing the Package Types DataFrame to Silver Table
Next, we write the `df_package_types` DataFrame, with its snake_case column names, to a Delta table in the silver lakehouse. Silver tables hold data that has been cleansed and prepared to meet the requirements of a dimensional model, commonly used in data warehousing and analytics. The write uses `overwrite` mode together with `overwriteSchema`, so any existing table and its schema are replaced.

In [6]:
# Set target table name
target_table = f'{silver_lakehouse}.package_types'

df_package_types.write.format('delta').mode('overwrite').option("overwriteSchema", True).saveAsTable(target_table)
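
After an overwrite it can be useful to confirm that the table holds the rows that were written. The sketch below wraps the same write chain in a hypothetical helper, `overwrite_and_verify`, and compares row counts; it assumes an active `spark` session and that a simple count comparison is an adequate check for this lab.

```python
def overwrite_and_verify(spark, df, target_table):
    """Overwrite target_table with df as a Delta table, then check the row count."""
    expected_rows = df.count()
    (
        df.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")  # allow the table schema to be replaced
        .saveAsTable(target_table)
    )
    # Read the table back and compare against the source DataFrame
    actual_rows = spark.table(target_table).count()
    if actual_rows != expected_rows:
        raise ValueError(
            f"{target_table}: expected {expected_rows} rows, found {actual_rows}"
        )
    return actual_rows
```

In the notebook this might be called as `overwrite_and_verify(spark, df_package_types, target_table)`.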

## Assigning Explicit Schema and Aliasing Fields using DataFrame API

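In this section, we define the expected schema for the locations Delta table, use the PySpark DataFrame API to load the data, and alias the columns. Note that `spark.table` reads the schema stored with the Delta table, so `locations_schema` documents the expected structure rather than being applied on read.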

In [7]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define the expected schema for the locations Delta table
locations_schema = StructType([
    StructField("StateProvinceID", IntegerType(), True),
    StructField("StateProvinceCode", StringType(), True),
    StructField("StateProvinceName", StringType(), True),
    StructField("SalesTerritory", StringType(), True),
    StructField("CityID", IntegerType(), True),
    StructField("CityName", StringType(), True)
])

# Read the Delta table into a DataFrame (the table's own stored schema is used)
df_locations = spark.table(f'{bronze_lakehouse}.locations') #change lakehouse reference

# Alias the columns with snake_case names
df_aliased = df_locations.select(
    df_locations["StateProvinceID"].alias("state_province_id"),
    df_locations["StateProvinceCode"].alias("state_province_code"),
    df_locations["StateProvinceName"].alias("state_province_name"),
    df_locations["SalesTerritory"].alias("sales_territory"),
    df_locations["CityID"].alias("city_id"),
    df_locations["CityName"].alias("city_name")
)

# Show the aliased DataFrame
# display(df_aliased)

# Write silver table
df_aliased.write.format('delta').mode('overwrite').saveAsTable(f'{silver_lakehouse}.locations') #change lakehouse reference
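
Since a Delta table is read with its own stored schema, one way to put an expected schema to work is to compare it against what was actually loaded. The sketch below does this on plain `(column_name, type_name)` pairs, the shape returned by a DataFrame's `dtypes` attribute; `find_schema_mismatches` and `expected_locations` are hypothetical names, and the type names assume Spark's short forms (`int`, `string`).

```python
def find_schema_mismatches(expected, actual):
    """Compare two lists of (column_name, type_name) pairs.

    Returns a list of problem descriptions; an empty list means they match.
    """
    problems = []
    actual_types = dict(actual)
    for name, expected_type in expected:
        if name not in actual_types:
            problems.append(f"missing column: {name}")
        elif actual_types[name] != expected_type:
            problems.append(
                f"{name}: expected {expected_type}, found {actual_types[name]}"
            )
    expected_names = {name for name, _ in expected}
    for name, _ in actual:
        if name not in expected_names:
            problems.append(f"unexpected column: {name}")
    return problems


# Expected columns of the locations table, mirroring locations_schema
expected_locations = [
    ("StateProvinceID", "int"),
    ("StateProvinceCode", "string"),
    ("StateProvinceName", "string"),
    ("SalesTerritory", "string"),
    ("CityID", "int"),
    ("CityName", "string"),
]
```

In the notebook, `find_schema_mismatches(expected_locations, df_locations.dtypes)` would list any differences before the silver table is written. The expected pairs could also be derived from the `StructType` with `[(f.name, f.dataType.simpleString()) for f in locations_schema.fields]`.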

### This concludes the notebook portion of this lab.