## Spark Overview



<img src="https://spark.apache.org/docs/3.5.1/img/cluster-overview.png" width="1200" />



<img src="https://github.com/dbrownems/SparkDataEngineeringForSQLServerProfessionals/blob/main/cluster_overview2.png?raw=true" width="1200" />

In [None]:
%%pyspark
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

# Render the cluster diagram stored in the notebook's built-in resources,
# with axis ticks and labels hidden.
overview_path = f"{mssparkutils.nbResPath}/builtin/cluster_overview2.png"
image = mpimg.imread(overview_path)
plt.axis('off')
image_plot = plt.imshow(image)


# Introduction to Notebooks



Notebooks are the primary development tool for Spark.

 - Interactive development and data analysis tool
 - But they can also be saved and run as part of a job

 - And in addition to code, they support markdown, so you can embed rich documentation in your jobs

<details>
Additional resources:

Develop, execute, and manage Microsoft Fabric notebooks
https://learn.microsoft.com/en-us/fabric/data-engineering/author-execute-notebook

Python for beginners
https://learn.microsoft.com/en-us/training/paths/beginner-python/

Spark docs
https://spark.apache.org/docs/latest/

Delta docs
https://docs.delta.io/latest/index.html



</details>

# Python code in Notebooks

In [None]:
%%pyspark

# Top-level variables in a notebook live in the session namespace, so later
# cells can read (and reassign) them.
msg = "hello from python"

def print_message():
    """Print the current value of the session-level variable ``msg``."""
    # msg2 is a local variable: it exists only while this function runs.
    # The next cell tries to print it outside the function to show that.
    msg2 = msg
    print(msg)

# another session-level variable
a = 2


In [None]:
%%pyspark

# print the session variable defined in the previous cell
print(msg)

# run the function
print_message()

# change the value; print_message reads the session-level msg each time it runs
msg = "hello again"

# print the changed value
print_message()

# what is the print_message object? (functions are ordinary objects)
print(print_message)

# assign a variable to the function
f = print_message

# run that
f()

# f and print_message now refer to the same function object
print(f)

# msg2 isn't defined here: it's a local variable inside the print_message
# function, so this line raises NameError (that is the point of the demo)
print(msg2)

# notice that all the earlier statements in this cell still ran: Python runs
# statements one at a time and only hits the NameError when it reaches that
# line (python is an "interpreted" language)

# Working with Data

## Dataframe basics

In [None]:
%%pyspark
# Load the Delta table, keep only customers whose name starts with "A",
# and show the result.
df = (
    spark.read.format("Delta")
    .load("Tables/Sales_Customers")
    .where("CustomerName like 'A%'")
)
display(df)

In [None]:
%%pyspark

# the dataframe object has an API to transform the dataframe
# and you can easily do stuff like rename all the columns

def fix_col_name(name: str) -> str:
    """Convert an abbreviated UPPER_SNAKE column name to PascalCase.

    Abbreviated prefixes are expanded first ("cust_" -> "customer_",
    "addr_" -> "address_"). The name is then split on "_" and each part is
    capitalized, e.g. "CUST_NAME" -> "CustomerName",
    "ADDR_LINE1" -> "AddressLine1".
    """
    name = (
        name.lower()
        .replace("cust_", "customer_")
        # keep the trailing "_" so the following word is still capitalized
        .replace("addr_", "address_")
    )
    return "".join(part.capitalize() for part in name.split("_"))

df = spark.sql("select 1 ID, 'Ann' CUST_NAME, '123 Garden Way' CUST_ADDRESS")
display(df)

# Rename every column in one projection: toDF(*names) replaces the column
# names positionally. A loop of withColumnRenamed calls would stack one
# projection per column onto the query plan, and would leave the loop
# variable behind in the session namespace.
df = df.toDF(*[fix_col_name(name) for name in df.columns])

display(df)

In [None]:
-- list the tables in the current database
SHOW TABLES

In [None]:
%%pyspark
# Run Spark SQL from Python and show the resulting DataFrame.
bronze_customers_sql = "SELECT * FROM WideWorldImporters_bronze.Sales_Customers LIMIT 1000"
df = spark.sql(bronze_customers_sql)
display(df)

## Loading a Dimension

### Generating Dimension Keys

https://spark.apache.org/docs/latest/api/sql/index.html

In [None]:
-- Option 1: hash of the business key and the source system name
SELECT xxhash64(CustomerId,"CRM") AS ID, *
FROM WideWorldImporters_bronze.Sales_Customers
LIMIT 10;

-- Option 2: a GUID (UUID string) per row
SELECT uuid() AS ID, *
FROM WideWorldImporters_bronze.Sales_Customers
LIMIT 10;

In [None]:
-- Option 3: https://spark.apache.org/docs/latest/api/sql/index.html#monotonically_increasing_id
-- The IDs only increase, but the sequence has big gaps when the data is
-- processed across multiple worker nodes.
SELECT monotonically_increasing_id() AS ID, *
FROM WideWorldImporters_gold.Dimension_Customer_by_postalcode

In [None]:
-- or use SQL analytic functions to assign monotonically increasing keys.
-- Existing customers keep their CustomerKey. New customers (no dimension
-- match, so c.CustomerKey is null and they share one window partition) get
-- (max key in the dimension) + 1, + 2, ...
-- The max is taken over the WHOLE dimension, not just the joined rows, so a
-- new key cannot collide with a dimension row whose customer is no longer in
-- the source. coalesce(..., 0) covers the first load into an empty dimension,
-- where max() would otherwise be null and every new key would be null.
SELECT coalesce(c.CustomerKey,
                (SELECT coalesce(max(d.CustomerKey), 0)
                 FROM WideWorldImporters_gold.Dimension_Customer d)
                  + row_number() OVER (PARTITION BY c.CustomerKey ORDER BY s.CustomerID)) AS CustomerKey,
       c.CustomerKey AS ExistingDimKey,
       s.CustomerID
FROM WideWorldImporters_bronze.Sales_Customers s
LEFT JOIN WideWorldImporters_gold.Dimension_Customer c
  ON s.CustomerID = c.WWICustomerID
ORDER BY CustomerKey;

### Temporary Views and Temporary Tables

In [None]:
-- Temporary views are very cool:
-- like Common Table Expressions or subqueries, but much more powerful.
-- They have session lifetime, rather than statement lifetime.
-- Key assignment: existing customers keep their CustomerKey; new customers
-- get (max key in the whole dimension) + 1, + 2, ... The max is over the
-- whole dimension so new keys cannot collide with rows for customers that
-- left the source, and coalesce(..., 0) handles an empty dimension.
CREATE OR REPLACE TEMP VIEW CustomerKeys AS
SELECT coalesce(c.CustomerKey,
                (SELECT coalesce(max(d.CustomerKey), 0)
                 FROM WideWorldImporters_gold.Dimension_Customer d)
                  + row_number() OVER (PARTITION BY c.CustomerKey ORDER BY s.CustomerID)) AS CustomerKey,
       s.CustomerID
FROM WideWorldImporters_bronze.Sales_Customers s
LEFT JOIN WideWorldImporters_gold.Dimension_Customer c
  ON s.CustomerID = c.WWICustomerID

In [None]:
-- show the plan for the view; compare it with the EXPLAIN after CACHE TABLE below
EXPLAIN SELECT * FROM CustomerKeys

In [None]:
-- But temp views can be cached, and then they essentially become temp tables.
-- The data is cached on the executor VMs, so this is useful for Delta tables too.
CACHE TABLE CustomerKeys

In [None]:
-- after CACHE TABLE, the plan should read from the cached in-memory data
EXPLAIN SELECT * FROM CustomerKeys

### Merging the dimension


In [None]:
SELECT *
FROM WideWorldImporters_gold.Dimension_Customer
LIMIT 10

In [None]:
-- describe WideWorldImporters_gold.Dimension_Customer;
-- describe WideWorldImporters_bronze.Sales_Customers;

-- Source rows for the dimension MERGE. Each bronze customer gets its
-- surrogate key from CustomerKeys, and descriptive attributes are looked up
-- from the related bronze tables. The aliases presumably match the
-- Dimension_Customer column names, since the MERGE uses UPDATE SET * /
-- INSERT * (verify with the DESCRIBE statements above).
CREATE OR REPLACE TEMP VIEW CustomerMergeSource AS
SELECT k.CustomerKey            AS CustomerKey,
       c.CustomerID             AS WWICustomerID,
       c.CustomerName           AS Customer,
       bc.CustomerName          AS BillToCustomer,
       cat.CustomerCategoryName AS Category,
       bg.BuyingGroupName       AS BuyingGroup,
       p.FullName               AS PrimaryContact,
       c.PostalPostalCode       AS PostalCode,
       CAST(0 AS INT)           AS LineageKey,
       c.ValidFrom,
       c.ValidTo
FROM WideWorldImporters_bronze.Sales_Customers c
LEFT JOIN CustomerKeys k
  ON k.CustomerID = c.CustomerID
LEFT JOIN WideWorldImporters_bronze.Sales_Customers bc
  ON c.BillToCustomerID = bc.CustomerID
LEFT JOIN WideWorldImporters_bronze.Sales_CustomerCategories cat
  ON cat.CustomerCategoryID = c.CustomerCategoryID
LEFT JOIN WideWorldImporters_bronze.Sales_BuyingGroups bg
  ON c.BuyingGroupId = bg.BuyingGroupID
LEFT JOIN WideWorldImporters_bronze.Application_People p
  ON p.PersonID = c.PrimaryContactPersonID
        



### Validate the data

In [None]:
%%pyspark 

# Guard the MERGE below. Every source row needs a CustomerKey, and no key may
# appear on more than one source row. Each check collects the offending rows
# to the driver and fails the run with a count and one example row. The
# checks run in order, so the null-key check fails first.
key_checks = [
    ("select WWICustomerID from CustomerMergeSource where CustomerKey is null",
     "Invalid CustomerKey values"),
    ("select CustomerKey from CustomerMergeSource group by CustomerKey having count(*)>1",
     "Duplicate CustomerKey values"),
]

for check_sql, problem in key_checks:
    ids = spark.sql(check_sql).collect()
    if ids:
        raise ValueError(f"{problem} for {len(ids)} keys example: {ids[0]}")

### Upsert the Dimension

In [None]:
-- Upsert: update customers that already exist (matched on the business key)
-- and insert new ones. SET * / INSERT * map source columns to target columns
-- by name.
MERGE INTO WideWorldImporters_gold.Dimension_Customer dest
USING CustomerMergeSource src
ON src.WWICustomerID = dest.WWICustomerID
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *


## Bring in Semi-Structured Data (JSON)

In [None]:
%%sql

-- Add columns for the customer coordinates loaded later in this notebook.
-- NOTE(review): not re-runnable as-is; ADD COLUMNS should fail once the columns exist.
alter table WideWorldImporters_gold.Dimension_Customer add columns( Latitude float, Longitude float )

```
[
    {
        "CustomerID": 1,
        "Location": "POINT (-102.6201979 41.4972022)"
    },
    {
        "CustomerID": 2,
        "Location": "POINT (-115.8743507 48.7163356)"
    },
    {
        "CustomerID": 3,
        "Location": "POINT (-112.7271223 34.2689145)"
    },
    {
        "CustomerID": 4,
        "Location": "POINT (-98.580361 37.2811339)"
    },
    ...
]
```

In [None]:
%%pyspark 

# Peek at the raw file as text (one row per line) before deciding how to parse it.
# limit() keeps the result a DataFrame for display(); take(20) would return
# a plain Python list of Row objects instead.
df = spark.read.text("Files/CustomerLocations.json").limit(20)
display(df)

In [None]:
%%pyspark

# Data files without a built-in schema (like JSON) need one supplied explicitly.
# You can infer the schema once, and then save and modify it if you like.
from pyspark.sql.types import *

schema = StructType(
    [
        StructField("CustomerID", IntegerType(), True),
        StructField("Location", StringType(), True),
    ]
)

# multiLine parses a JSON document that spans several lines (the sample above
# is a JSON array) instead of expecting one JSON object per line.
dfCustLocations = (
    spark.read
    .schema(schema)
    .option("multiLine", True)
    .json("Files/CustomerLocations.json")
)

display(dfCustLocations)

In [None]:
-- NOTE: this is expected to fail. dfCustLocations is a Python variable, not a
-- table or view, so SQL cannot see it. The next cell registers the DataFrame
-- as the temp view CustLocations.
select * 
from dfCustLocations

In [None]:
%%pyspark

# Register the DataFrame as a temp view so SQL cells can query it as CustLocations.
dfCustLocations.createOrReplaceTempView(name="CustLocations")

In [None]:
-- Location values look like 'POINT (-123.8860114 47.4631419)';
-- splitting on spaces gives an array like ["POINT", "(-123.8860114", "47.4631419)"].
  
  select CustomerId, split(Location,' ') locSplit
  from CustLocations

In [None]:


-- Parse 'POINT (<long> <lat>)' strings without any library. For example,
-- split(Location, ' ') gives ["POINT","(-120.1290272","36.0041223)"]; then
-- strip the parentheses and cast the two numbers (try_cast yields null on bad input).
CREATE OR REPLACE TEMP VIEW CustLocations2 AS
WITH q AS (
  SELECT CustomerId, split(Location,' ') AS locSplit
  FROM CustLocations
)
SELECT CustomerId,
       try_cast(replace(locSplit[1],'(','') AS double) AS Long,
       try_cast(replace(locSplit[2],')','') AS double) AS Lat
FROM q;


In [None]:
%%pyspark
# Install shapely, which the cells below use to parse WKT POINT strings.
# NOTE(review): consider pinning a version so reruns are reproducible.
%pip install shapely

In [None]:
%%pyspark
from shapely import wkt

# Quick check that shapely parses a WKT point. WKT points are written as
# (x y), so x is the longitude here.
sample_point = wkt.loads('POINT (-123.8860114 47.4631419)')
sample_point.centroid.x

In [None]:
%%pyspark
from shapely import wkt
# NOTE(review): these wildcard imports can shadow Python builtins such as
# sum, max and round for the rest of the session; prefer explicit imports.
from pyspark.sql.functions import *
from pyspark.sql.types import *


def _centroid(s):
    """Parse the WKT string ``s`` and return its centroid, or None when ``s`` is None."""
    # Location is declared nullable in the schema used to read the JSON.
    # Without this guard, a null row would make the UDF raise and fail the job.
    if s is None:
        return None
    return wkt.loads(s).centroid


def lat(s):
    """Return the latitude (centroid y) of WKT geometry ``s``, or None for a null input."""
    centroid = _centroid(s)
    return None if centroid is None else float(centroid.y)


def lon(s):
    """Return the longitude (centroid x) of WKT geometry ``s``, or None for a null input."""
    centroid = _centroid(s)
    return None if centroid is None else float(centroid.x)


# Make both functions callable from Spark SQL. FloatType matches the float
# Latitude/Longitude columns added to Dimension_Customer.
spark.udf.register("lat", lat, FloatType())
spark.udf.register("lon", lon, FloatType())

lon('POINT (-123.8860114 47.4631419)')

In [None]:
-- the Python UDF registered above is now callable from SQL
SELECT lat('POINT (-123.8860114 47.4631419)') AS lat

In [None]:
-- Replace the split-based view with one that parses Location through the lon/lat UDFs.
CREATE OR REPLACE TEMP VIEW CustLocations2 AS
SELECT CustomerId,
       lon(location) AS Long,
       lat(location) AS Lat
FROM CustLocations;

In [None]:
-- NOTE(review): this is the SQL Server pattern of updating through a CTE.
-- A Spark SQL UPDATE expects a table as its target, so this statement is
-- expected to fail; the next cells show alternatives.
with q as
(
    select c.*, l.Long NewLongitude, l.Lat NewLatitude
    from WideWorldImporters_gold.Dimension_Customer c
    left join CustLocations2 l 
    on c.WWICustomerID = l.CustomerID
)
update q set Latitude = NewLatitude, Longitude = NewLongitude

In [None]:
-- UPDATE with correlated scalar subqueries. CustLocations2 exposes the
-- coordinates as Lat/Long (not Latitude/Longitude), so select l.Lat / l.Long.
-- An unqualified Latitude is not a column of l and would be looked up in
-- the outer query (or fail to resolve).
-- NOTE(review): Delta Lake UPDATE may reject subqueries in SET; the MERGE in
-- the next cell is the supported way to apply this update.
update WideWorldImporters_gold.Dimension_Customer c
set Latitude = (select l.Lat from CustLocations2 l where l.CustomerID = c.WWICustomerID ),
    Longitude = (select l.Long from CustLocations2 l where l.CustomerID = c.WWICustomerID )

   

In [None]:
-- Copy the parsed coordinates onto the matching dimension rows.
MERGE INTO WideWorldImporters_gold.Dimension_Customer dest
USING CustLocations2 src
ON src.CustomerID = dest.WWICustomerID
WHEN MATCHED THEN UPDATE SET Latitude = src.Lat, Longitude = src.Long

In [None]:
SELECT *
FROM WideWorldImporters_gold.Dimension_Customer
LIMIT 10


## Delta table history

In [None]:
-- list the table's versions and the operation that produced each one
DESCRIBE HISTORY WideWorldImporters_gold.Dimension_Customer

In [None]:
-- time travel: read the table as it was at version 1
SELECT *
FROM WideWorldImporters_gold.Dimension_Customer VERSION AS OF 1

In [None]:
-- roll the table back to version 1 (the restore is itself recorded in the history)
RESTORE TABLE WideWorldImporters_gold.Dimension_Customer TO VERSION AS OF 1

# Notebook Orchestration

In [None]:
%%pyspark

# Orchestrate the load in stages. Each inner list goes to one runMultiple
# call, and the calls are issued in order: dimensions first, then the facts.
# The last call's result is left as the cell's output.
load_stages = [
    ["LoadCustomerDim", "LoadDateDim", "LoadEmployeeDim"],
    ["LoadPurchaseFact", "LoadSaleFact"],
    ["LoadOrderFact"],
]
stage_results = [mssparkutils.notebook.runMultiple(stage) for stage in load_stages]
stage_results[-1]