In [1]:
#import libraries
import notebookutils
from pyspark.sql.functions import col

StatementMeta(, 489eccfd-6531-4423-812e-63253c0e27e6, 3, Finished, Available, Finished)

# <u>Best Practice #1:</u>
## Use notebookutils for orchestration, file management, and dynamic references!

In [17]:
import notebookutils

#Need help?
notebookutils.help()

#Note the variables Library! I didn't have time to build a demo for this, but it's cool!

StatementMeta(, dab5a874-c969-429d-84f3-7ee558fe65c8, 21, Finished, Available, Finished)

In [6]:
#Call a notebook in the same workspace
output = notebookutils.notebook.run("Sparks_LargeDFTest", 5000, {"destination_file": "This is a passed-in value"})
display(output)

StatementMeta(, dab5a874-c969-429d-84f3-7ee558fe65c8, 10, Finished, Available, Finished)

'The passed-in parameter was This is a passed-in value'

In [None]:
# Returns a lakehouse object
lakehouse = notebookutils.lakehouse.get("LHS_DataBard_Demo")
print(lakehouse)

In [11]:
# I can now connect to the tables in the lakhouse using an absolute path derived from the get call above
df = spark.read.format("delta").load(f"{lakehouse.properties['abfsPath']}/Tables/dbo/dimension_customer")
display(df)

StatementMeta(, 489eccfd-6531-4423-812e-63253c0e27e6, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, de0ebe6c-9073-4371-b409-60bd6890396e)

# ETL 1: Row/column filtering!

# <u>Best Practice #1.5:</u> 

## If using a single lakehouse, consider using default lakehouses for relative paths.

In [4]:
# Notice this call is to a relative path to the file, not an absolute path
# This is enabled by the default lakehouse defined in the explorer on the left
# If this doesn't work, check if your lakehouse is attached!
df = spark.read.option("header","true").csv('Files/Input/updated_music_industry_data.csv')
#display(df)

#What if I want specific columns or records?
#Once in a dataframe, you can select specific columns from there
subset_columns_df = df.select("id", "FirstName", "LastName", "Genre", "Age")
#display(subset_columns_df)

#Filter on a predicate
subset_rows_df = subset_columns_df.filter((subset_columns_df["Age"] >= 20) & (subset_columns_df["Age"] <= 40))
display(subset_rows_df)

#subset_rows_df.explain(extended=True)

StatementMeta(, 489eccfd-6531-4423-812e-63253c0e27e6, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, aee80c6a-2109-4c9d-a1ba-d308770e7e8e)

In [3]:
#Does performance improve if I perform all of the operations in a single call?
#Not significantly
#Here's the same code snippet as the previous cell, but as a single statement
single_call_df = spark.read.option("header", "true").csv('Files/Input/updated_music_industry_data.csv') \
    .select("id", "FirstName", "LastName", "Genre", "Age") \
    .filter((col("Age") >= 20) & (col("Age") <= 40))
display(single_call_df)

#single_call_df.explain(extended=True)

StatementMeta(, 489eccfd-6531-4423-812e-63253c0e27e6, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1614f78c-aa4a-4e0c-a110-c0b0500b9e7c)

# Both examples above filter the columns from the file directly. How? Lazy Evaluation
- Spark execution plans use Lazy Evaluation. Essentially this means Spark doesn't perform every operation exactly when you notate it.
- What triggers the execution? Writing to files, printing to console, and aggregations are the most common examples.
## Execution example: Column Pruning
- In the above column filtering case, Spark handles this behind the scenes using <u>'column pruning'</u>. However, how effective pruning is depends on your source file.
- Certain files types, like <u>Parquet, Delta, Avro, and ORC</u>, have optimizations for column pruning.
- For other common files types like CSV, pruning can occur, but not as efficiently. 
- Depending on situation, the entire file may have to be read into memory before selecting specific columns.


# <u>Best Practice #2:</u> 
## When working in Fabric, <u>get your data into Delta format as far upstream as possible.</u> This will enable more dynamic options for efficiently processing data.


# Load (Be careful)

In [5]:
#We briefly pause our 'extract' here by writing the data to delta.
#Then we can try to do more performant things down the road.

df.write.format("delta").mode("overwrite").save("Tables/Bronze/updated_music_industry_data")

#Let's save it to csv again just to show different syntax
df.write.format("csv").mode("overwrite").save("Files/Processed/updated_music_industry_data.csv")

#...on second thought, you may want to check that CSV write. 
#You may not have gotten the result you're looking for.
#Spark is distributed. Just because you read a single file doesn't mean you will write a single file.


StatementMeta(, 489eccfd-6531-4423-812e-63253c0e27e6, 7, Finished, Available, Finished)

In [6]:
#What if we want to see how the data is partitioned.
#delta_df = spark.read.format("delta").load("abfss://84e6d815-34b7-49bb-a433-ebec208e5cdb@onelake.dfs.fabric.microsoft.com/b51dab19-219b-4c3a-b8ac-014d9ddedf2e/Tables/Bronze/updated_music_industry_data")

partitions = df.rdd.getNumPartitions()
display(f"Number of partitions: {partitions}")

#How many files were written?
files = notebookutils.fs.ls("Files/Processed/updated_music_industry_data.csv")

display(len(files))

StatementMeta(, 489eccfd-6531-4423-812e-63253c0e27e6, 8, Finished, Available, Finished)

'Number of partitions: 73'

74

# How do we see Spark's execution plan. Allow me to Explain

In [8]:
#So, like SQL, there's an execution plan being created to optimize the efforts requested by your code.

subset_columns_df.explain(mode="extended") #explain provides the actual execution plan
subset_rows_df.explain(mode="extended")
#With time, you can learn to read them like you can execution plans of other engines.

StatementMeta(, 489eccfd-6531-4423-812e-63253c0e27e6, 10, Finished, Available, Finished)

== Parsed Logical Plan ==
'Project ['id, 'FirstName, 'LastName, 'Genre, 'Age]
+- Relation [id#891,Name#892,FirstName#893,LastName#894,Age#895,Genre#896,Address#897,Email#898,PhoneNumber#899,AgeBucket#900] csv

== Analyzed Logical Plan ==
id: string, FirstName: string, LastName: string, Genre: string, Age: string
Project [id#891, FirstName#893, LastName#894, Genre#896, Age#895]
+- Relation [id#891,Name#892,FirstName#893,LastName#894,Age#895,Genre#896,Address#897,Email#898,PhoneNumber#899,AgeBucket#900] csv

== Optimized Logical Plan ==
Project [id#891, FirstName#893, LastName#894, Genre#896, Age#895]
+- Relation [id#891,Name#892,FirstName#893,LastName#894,Age#895,Genre#896,Address#897,Email#898,PhoneNumber#899,AgeBucket#900] csv

== Physical Plan ==
*(1) Project [id#891, FirstName#893, LastName#894, Genre#896, Age#895]
+- FileScan csv [id#891,FirstName#893,LastName#894,Age#895,Genre#896] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[abfss://84e6d815-

# A basic read across a bunch of cells isn't the best example.

## Let's complicate things more with "display" actions thrown into the mix and see the impact of execution plans.

In [11]:
#This example has display and rowcount functions that should interrupt the execution plan
interrupted_delta_df = spark.read.format("delta").load("Tables/Bronze/updated_music_industry_data")
selected_interrupted_delta_df = interrupted_delta_df.select("id", "FirstName", "LastName", "Genre", "Age")
display(selected_interrupted_delta_df)
#selected_interrupted_delta_df.show()
rowcount = selected_uninterrupted_delta_df.count()
selected_interrupted_delta_df = selected_interrupted_delta_df.filter((selected_interrupted_delta_df["Age"] >= 20) & (selected_interrupted_delta_df["Age"] <= 40))

selected_interrupted_delta_df.explain(extended=True)

# Trigger another action to confirm lazy evaluation
print(f"Row count: {rowcount}")

StatementMeta(, 5d55e665-2c48-4ea0-9a20-f866e5098057, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8526bb4a-b922-4a19-8259-cb7c7a0ceed2)

== Parsed Logical Plan ==
'Filter ((Age#2024 >= 20) AND (Age#2024 <= 40))
+- Project [id#2020, FirstName#2022, LastName#2023, Genre#2025, Age#2024]
   +- Relation [id#2020,Name#2021,FirstName#2022,LastName#2023,Age#2024,Genre#2025,Address#2026,Email#2027,PhoneNumber#2028,AgeBucket#2029] parquet

== Analyzed Logical Plan ==
id: string, FirstName: string, LastName: string, Genre: string, Age: string
Filter ((cast(Age#2024 as int) >= 20) AND (cast(Age#2024 as int) <= 40))
+- Project [id#2020, FirstName#2022, LastName#2023, Genre#2025, Age#2024]
   +- Relation [id#2020,Name#2021,FirstName#2022,LastName#2023,Age#2024,Genre#2025,Address#2026,Email#2027,PhoneNumber#2028,AgeBucket#2029] parquet

== Optimized Logical Plan ==
Project [id#2020, FirstName#2022, LastName#2023, Genre#2025, Age#2024]
+- Filter (isnotnull(Age#2024) AND ((cast(Age#2024 as int) >= 20) AND (cast(Age#2024 as int) <= 40)))
   +- Relation [id#2020,Name#2021,FirstName#2022,LastName#2023,Age#2024,Genre#2025,Address#2026,Email

In [8]:
#What does this look like when explained via a delta table
#In most tests I've run, this cell should perform faster than the above statement.
uninterrupted_delta_df = spark.read.format("delta").load("Tables/Bronze/updated_music_industry_data")
selected_uninterrupted_delta_df = uninterrupted_delta_df.select("id", "FirstName", "LastName", "Genre", "Age")
selected_uninterrupted_delta_df = selected_uninterrupted_delta_df.filter((selected_uninterrupted_delta_df["Age"] >= 20) & (selected_uninterrupted_delta_df["Age"] <= 40))

display(selected_uninterrupted_delta_df)
#selected_uninterrupted_delta_df.show()
rowcount = selected_uninterrupted_delta_df.count()
selected_uninterrupted_delta_df.explain(extended=True)

# Trigger another action to confirm lazy evaluation
print(f"Row count: {rowcount}")

StatementMeta(, 5d55e665-2c48-4ea0-9a20-f866e5098057, 10, Finished, Available, Finished)

+---+---------+--------+-------+---+
| id|FirstName|LastName|  Genre|Age|
+---+---------+--------+-------+---+
|  0| Weird Al|Yankovic|Hip Hop| 34|
|  1| Weird Al|Yankovic|Hip Hop| 34|
|  2| Weird Al|Yankovic|Hip Hop| 34|
|  3| Weird Al|Yankovic|Hip Hop| 34|
|  4| Weird Al|Yankovic|Hip Hop| 34|
|  5| Weird Al|Yankovic|Hip Hop| 34|
|  6| Weird Al|Yankovic|Hip Hop| 34|
|  7| Weird Al|Yankovic|Hip Hop| 34|
|  8| Weird Al|Yankovic|Hip Hop| 34|
|  9| Weird Al|Yankovic|Hip Hop| 34|
| 10| Weird Al|Yankovic|Hip Hop| 34|
| 11| Weird Al|Yankovic|Hip Hop| 34|
| 12| Weird Al|Yankovic|Hip Hop| 34|
| 13| Weird Al|Yankovic|Hip Hop| 34|
| 14| Weird Al|Yankovic|Hip Hop| 34|
| 15| Weird Al|Yankovic|Hip Hop| 34|
| 16| Weird Al|Yankovic|Hip Hop| 34|
| 17| Weird Al|Yankovic|Hip Hop| 34|
| 18| Weird Al|Yankovic|Hip Hop| 34|
| 19| Weird Al|Yankovic|Hip Hop| 34|
+---+---------+--------+-------+---+
only showing top 20 rows

== Parsed Logical Plan ==
'Filter ((Age#1850 >= 20) AND (Age#1850 <= 40))
+- Project [

### Logically, both of the above cells return the same information. Yet one takes longer than the other.
### It's because of where 'display' is. This changes the execution plan.

# <u>Best Practice #3:</u> 
## For Production workloads, only perform write/display operations when needed (i.e. Display when you want to display.) 


# ETL from multiple tables
## Separate steps make more sense, and knowing Lazy Evaluation allows us to avoid impacting performance

In [3]:
#What about getting data from multiple tables?
#Join tables
df_customer = spark.read.format("delta").load("Tables/dbo/dimension_customer")
df_sales = spark.read.format("delta").load("Tables/dbo/fact_sale")
df_city = spark.read.format("delta").load("Tables/dbo/dimension_city")

#Join time. Join Sales and Customer
join_df = df_sales.join(df_customer, "CustomerKey", "inner")
#display(join_df) #only uncomment this for debugging, as it impacts performance

#Need to join more tables? Rinse and repeat
join_df = join_df.join(df_city, "CityKey", "inner")
display(join_df)

#Need more complex criteria? You can adjust the second argument to have more predicates
complex_df = df_sales.join(df_customer, (df_sales['CustomerKey'] == df_customer['CustomerKey']) & (df_sales['TotalIncludingTax'] >= 50.0), "inner")
#Do we have any sales transactions over $50?
display(complex_df)
#complex_df.explain(extended=True)

StatementMeta(, fcd57f82-09dd-4c0d-84aa-640f91020808, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 96953f19-fcba-4d13-8d16-a32760cab05e)

SynapseWidget(Synapse.DataFrame, 25e7cbb7-4412-4ec4-a086-935803b906f4)

### Can we use the same approach to perform a key lookup? Yes we can!

In [4]:
#Let's modify the above to perform a key lookup (it's already looked up, but humor me)
df_sales = spark.read.format("delta").load("Tables/dbo/fact_sale")
df_city = spark.read.format("delta").load("Tables/dbo/dimension_city")

#For performance, let's restrict city to the key we're using for the lookup,
#and a field of interest, such as a name
df_city = df_city.select("CityKey", "City")

#Now join on the key, and the new dataframe will have the new value
df_sales_withcity = df_sales.join(df_city, "CityKey", "inner")
display(df_sales_withcity)


StatementMeta(, ba6502b8-471b-4754-a1a6-6c8ba51e0e28, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a9f9ced9-5e33-47f7-ac1f-8ba0278733e7)

# Now you can build a reusable lookup function!

In [5]:
#Want to make your lookups reusable? Build a function!
def dimension_city_lookup(df):
    df_city = spark.read.format("delta").load("Tables/dbo/dimension_city")

    #For performance, let's restrict city to the key we're using for the lookup,
    #and a field of interest, such as a name
    df_city = df_city.select("CityKey", "City")

    #Now join on the key, and the new dataframe will have the new value
    #This assumes your passed in dataframe has a CityKey. If not, you could adapt this further.
    df_withcity = df.join(df_city, "CityKey", "inner")

    return df_withcity

#Test our function
df_sales = spark.read.format("delta").load("Tables/dbo/fact_sale")
display(df_sales)

df_sales = dimension_city_lookup(df_sales)
display(df_sales)

StatementMeta(, ba6502b8-471b-4754-a1a6-6c8ba51e0e28, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e5f57641-7da4-4b1f-9dd9-2d108b6c4686)

SynapseWidget(Synapse.DataFrame, 650b2997-68e1-4d5d-9223-0c19bedf42be)

# Debug Section

In [1]:
destination_location = 'Files/Input/music_industry_data.csv'
destination_location_updated = 'Files/Input/updated_music_industry_data.csv'
destination_location_delta = 'Tables/Bronze/updated_music_industry_full'

StatementMeta(, 1d7e1b1d-66d5-46ab-8309-63fefc73bfe1, 5, Finished, Available, Finished)

In [4]:
##Spark Version of creating a dataframe for ETL demo
##Create demo data
##How did I create the Weird Al Data?
from pyspark.sql.functions import lit, when, col, expr, rand, floor
import random

# Demo data. Let's create a bunch of possible values for every column
names = ["John Doe", "Jane Smith", "Alice Johnson", "Bob Brown", "Charlie Davis", "Diana Evans", "Eve Foster", "Frank Green", "Grace Harris"]
first_names = ["John", "Jane", "Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Smith", "Bill", "Jon", "Vivian", "Stacy", "Heidi", "Karen", "Otto", "Belinda"]
last_names = ["Doe", "Smith", "Johnson", "Brown", "Davis", "Evans", "Foster", "Green", "Harris", "Johann", "Pingel", "Kelbert", "Hiddleston", "Windsor", "Workmann", "Drews"]
genres = ["Rock", "Pop", "Jazz", "Classical", "Hip Hop", "Country", "Electronic", "Reggae", "Blues", "Metal"]
addresses = ["123 Main St", "456 Elm St", "789 Oak St", "101 Maple Ave", "202 Pine Rd", "303 Cedar Blvd", "404 Birch Ln", "505 Spruce Dr", "606 Willow Ct", "707 Aspen Pl"]
emails = ["example1@example.com", "example2@example.com", "example3@example.com", "example4@example.com", "example5@example.com", "example6@example.com", "example7@example.com", "example8@example.com", "example9@example.com", "example10@example.com"]
phone_numbers = ["555-1234", "555-5678", "555-8765", "555-4321", "555-6789", "555-9876", "555-3456", "555-6543", "555-7890", "555-0987"]

# Add in some environment info for where we're going to save our results
lakehouse_address = 'abfss://84e6d815-34b7-49bb-a433-ebec208e5cdb@onelake.dfs.fabric.microsoft.com/b389d6fd-e091-479b-ac43-6640a58407bd'
file_name = 'music_industry_data.csv'
updated_file_name = 'updated_music_industry_data.csv'

#Create some derived variables based on the environment info
file_address = f"{lakehouse_address}/Files/Input/{file_name}"
updated_file_address = f"{lakehouse_address}/Files/Input/{updated_file_name}"

# Create a function to generate random age as a string (whole number or decimal)
def generate_random_age():
    return random.randint(18, 70) if random.choice([True, False]) else random.uniform(18, 70)

# # Generate 100s of records in a Spark dataframe
# data = spark.range(0, 100000000).withColumn("Name", when((col("id") < 20) | (col("id") % 5 == 0), lit("Weird Al Yankovic"))
#                                             .otherwise(lit(random.choice(names)))) \
#                                 .withColumn("FirstName", when((col("id") < 20) | (col("id") % 5 == 0), lit("Weird Al"))
#                                             .otherwise(lit(random.choice(first_names)))) \
#                                 .withColumn("LastName", when((col("id") < 20) | (col("id") % 5 == 0), lit("Yankovic"))
#                                             .otherwise(lit(random.choice(last_names)))) \
#                                 .withColumn("Age", lit(generate_random_age())) \
#                                 .withColumn("Address", lit(random.choice(addresses))) \
#                                 .withColumn("Email", lit(random.choice(emails))) \
#                                 .withColumn("PhoneNumber", lit(random.choice(phone_numbers))) \
#                                 .withColumn("Genre", lit(random.choice(genres))) \
#                                 .withColumn("AgeBucket", lit("Unknown"))


data = spark.range(0, 100000000).withColumn(
    "Name",
    when((col("id") < 20) | (col("id") % 5 == 0), lit("Weird Al Yankovic"))
    .otherwise(expr("element_at(array(" + ", ".join([f"'{name}'" for name in names]) + "), cast(rand() * " + str(len(names)) + " + 1 as int))"))
).withColumn(
    "FirstName",
    when((col("id") < 20) | (col("id") % 5 == 0), lit("Weird Al"))
    .otherwise(expr("element_at(array(" + ", ".join([f"'{first_name}'" for first_name in first_names]) + "), cast(rand() * " + str(len(first_names)) + " + 1 as int))"))
).withColumn(
    "LastName",
    when((col("id") < 20) | (col("id") % 5 == 0), lit("Yankovic"))
    .otherwise(expr("element_at(array(" + ", ".join([f"'{last_name}'" for last_name in last_names]) + "), cast(rand() * " + str(len(last_names)) + " + 1 as int))"))
).withColumn(
    "Age",
    (floor(rand() * 53) + 18).cast("int")  # Random age between 18 and 70
).withColumn(
    "Genre",
    expr("element_at(array(" + ", ".join([f"'{genre}'" for genre in genres]) + "), cast(rand() * " + str(len(genres)) + " + 1 as int))")
).withColumn(
    "Address",
    expr("element_at(array(" + ", ".join([f"'{address}'" for address in addresses]) + "), cast(rand() * " + str(len(addresses)) + " + 1 as int))")
).withColumn(
    "Email",
    expr("element_at(array(" + ", ".join([f"'{email}'" for email in emails]) + "), cast(rand() * " + str(len(emails)) + " + 1 as int))")
).withColumn(
    "PhoneNumber",
    expr("element_at(array(" + ", ".join([f"'{phone}'" for phone in phone_numbers]) + "), cast(rand() * " + str(len(phone_numbers)) + " + 1 as int))")
).withColumn(
    "AgeBucket",
    lit("Unknown")
)
#If we don't want the ID anymore, so we'll remove that here
#data = data.drop("id")


# Display the DataFrame
#print(data)

# Save DataFrame to a CSV file
#data.write.mode("overwrite").format("csv").option("header", "true").save(destination_location)

# Now, let's mess with the data
# Obviously this isn't something you'd do in a production setting, this is purely for dramatic effect.
# Find the first record with the name 'Weird Al Yankovic'
first_weird_al_record = data.filter(col("Name") == "Weird Al Yankovic").limit(1).collect()[0]

data_updated = data.withColumn("Age", when(col("Name") == "Weird Al Yankovic", lit(first_weird_al_record["Age"])).otherwise(col("Age"))) \
                   .withColumn("Address", when(col("Name") == "Weird Al Yankovic", lit(first_weird_al_record["Address"])).otherwise(col("Address"))) \
                   .withColumn("Email", when(col("Name") == "Weird Al Yankovic", lit(first_weird_al_record["Email"])).otherwise(col("Email"))) \
                   .withColumn("PhoneNumber", when(col("Name") == "Weird Al Yankovic", lit(first_weird_al_record["PhoneNumber"])).otherwise(col("PhoneNumber"))) \
                   .withColumn("Genre", when(col("Name") == "Weird Al Yankovic", lit(first_weird_al_record["Genre"])).otherwise(col("Genre"))) \
                   .withColumn("AgeBucket", when(col("Name") == "Weird Al Yankovic", lit(first_weird_al_record["AgeBucket"])).otherwise(col("AgeBucket")))


# # Display the updated DataFrame
# print(data_updated)

# Save the updated DataFrame to a new CSV file
#df.to_csv(updated_file_address, index=False)
data_updated.write.format("csv").mode("overwrite").option("header", "true").save(destination_location_updated)
data_updated.write.format("delta").mode("overwrite").option("header", "true").save(destination_location_delta)

StatementMeta(, 1419a2d0-d122-47aa-9c24-79b059eaabe2, 8, Finished, Available, Finished)