In [234]:
%%pyspark
import pandas as pd
from datetime import datetime

StatementMeta(sparkpool03, 9, 234, Finished, Available)

In [235]:
def deep_ls(path: str, max_depth=1):
    """
    List all files and folders in specified path and
    subfolders within maximum recursion depth.

    Files found at every visited level are yielded. While ``max_depth > 1``
    each sub-folder is listed recursively; once the depth budget is used up,
    the folders at that level are yielded themselves instead of being entered.

    :param path: folder to list (passed straight to ``mssparkutils.fs.ls``).
    :param max_depth: number of folder levels to list, counting ``path``
        itself as level 1.
    :return: generator of the FileInfo objects returned by ``mssparkutils.fs.ls``;
        at each level files come first, then the folders / their contents.
    """

    def _is_dir(info):
        # Prefer the explicit directory flag. The former ``size == 0`` test
        # misclassified empty (zero-byte) files as folders and recursed into them.
        flag = getattr(info, "isDir", None)
        if flag is None:
            # NOTE(review): fallback for FileInfo objects without ``isDir``;
            # keeps the original size-based heuristic.
            return info.size == 0
        # Tolerate both an attribute (bool) and a method-style ``isDir()``.
        return bool(flag()) if callable(flag) else bool(flag)

    entries = mssparkutils.fs.ls(path)

    # Files at this level are always returned.
    for entry in entries:
        if not _is_dir(entry):
            yield entry

    folders = [entry for entry in entries if _is_dir(entry)]
    if max_depth > 1:
        # Depth budget remains: descend into each folder.
        for folder in folders:
            yield from deep_ls(folder.path, max_depth - 1)
    else:
        # Depth budget exhausted: return the folders themselves.
        yield from folders

def convertfiles2df(files):
    """
    Turn FileInfo-like objects into a pandas DataFrame so the listing can be displayed.

    :param files: iterable of objects exposing ``path``, ``name`` and ``size`` attributes.
    :return: DataFrame with columns ``path``, ``name``, ``size``, sorted by ``path``
        ascending (the original positional index is kept).
    """
    # Arrow-based transfers are switched off because the frame is tiny.
    # NOTE(review): this is a session-wide Spark setting, and this key is the legacy
    # name (newer Spark uses spark.sql.execution.arrow.pyspark.enabled). Nothing in
    # this function converts between Spark and pandas, so confirm the intent.
    spark.conf.set("spark.sql.execution.arrow.enabled", "false")

    columns = ['path', 'name', 'size']
    records = []
    for info in files:
        records.append([getattr(info, column) for column in columns])

    listing = pd.DataFrame(records, columns=columns)
    return listing.sort_values('path')

StatementMeta(sparkpool03, 9, 235, Finished, Available)

In [261]:
# Raw-zone folder holding the SalesOrderHeader CSV extracts; list everything
# beneath it, up to 20 folder levels deep.
root = 'abfss://raw@csresearchdpolaplakest.dfs.core.windows.net/erpcore/AdventureWorks/SalesOrderHeader'
files = [*deep_ls(root, max_depth=20)]

StatementMeta(sparkpool03, 9, 261, Finished, Available)

In [262]:
# Tabulate the listing with pandas so it can be sorted and displayed.
df_list_of_files = convertfiles2df(files=files)

StatementMeta(sparkpool03, 9, 262, Finished, Available)

In [263]:
# Confirm the listing frame exposes the path / name / size columns.
df_list_of_files.columns

StatementMeta(sparkpool03, 9, 263, Finished, Available)

Index(['path', 'name', 'size'], dtype='object')

In [264]:
# Order by full path, greatest first, so the newest Year=/Month=/Day=/Time=
# partition appears at the top of the displayed table.
df_list_of_files = df_list_of_files.sort_values(by='path', ascending=False)
display(df_list_of_files)

StatementMeta(sparkpool03, 9, 264, Finished, Available)

SynapseWidget(Synapse.DataFrame, a4785ad8-e800-4d71-86a1-f5fea000c8ed)

In [266]:
# Pick the newest extract. The listed paths appear to embed zero-padded
# Year=/Month=/Day=/Time= partition folders (e.g. Month=04/Day=05), so the
# lexicographically greatest path is the latest file.
# max() is used instead of iloc[0] so the choice no longer depends on the sort
# applied in an earlier cell; an empty listing now fails with a clear message.
if df_list_of_files.empty:
    raise FileNotFoundError("No files were listed; cannot select a source file to load")
source_path = df_list_of_files['path'].max()

StatementMeta(sparkpool03, 9, 266, Finished, Available)

In [267]:
# Show which file was selected for loading.
source_path

StatementMeta(sparkpool03, 9, 267, Finished, Available)

'abfss://raw@csresearchdpolaplakest.dfs.core.windows.net/erpcore/AdventureWorks/SalesOrderHeader/Year=2022/Month=04/Day=05/Time=15:22:50/SalesOrderHeader.csv'

In [269]:
# Load the selected raw CSV extract. The first row holds the column names, and
# Spark infers each column's type from the data.
df_sales_order_header = spark.read.csv(source_path, header=True, inferSchema=True)

StatementMeta(sparkpool03, 9, 269, Finished, Available)

In [270]:
# Print the first 20 rows as a text table (long values truncated).
df_sales_order_header.show(n=20, truncate=True)

StatementMeta(sparkpool03, 9, 270, Finished, Available)

+------------+--------------+--------------------+--------------------+--------------------+------+---------------+----------------+-------------------+--------------+----------+---------------+---------------+-----------------+----------------------+-----------+---------+---------+----------+--------------------+--------------------+--------------------+
|SalesOrderID|RevisionNumber|           OrderDate|             DueDate|            ShipDate|Status|OnlineOrderFlag|SalesOrderNumber|PurchaseOrderNumber| AccountNumber|CustomerID|ShipToAddressID|BillToAddressID|       ShipMethod|CreditCardApprovalCode|   SubTotal|   TaxAmt|  Freight|  TotalDue|             Comment|             rowguid|        ModifiedDate|
+------------+--------------+--------------------+--------------------+--------------------+------+---------------+----------------+-------------------+--------------+----------+---------------+---------------+-----------------+----------------------+-----------+---------+---------+-

In [271]:
# Preview 10 rows in the rich table widget before null handling.
display(df_sales_order_header.limit(num=10))

StatementMeta(sparkpool03, 9, 271, Finished, Available)

SynapseWidget(Synapse.DataFrame, c89c835a-f139-4607-9c6b-0ab1524e2bd3)

In [272]:
# Number of rows in the loaded extract.
df_sales_order_header.count()

StatementMeta(sparkpool03, 9, 272, Finished, Available)

32

In [246]:
# Inspect the inferred schema.
# NOTE(review): this cell's execution count (In [246]) precedes the load cell (In [269]),
# so the schema shown may come from an earlier load; re-run after loading to confirm.
df_sales_order_header.printSchema()

StatementMeta(sparkpool03, 9, 246, Finished, Available)

root
 |-- SalesOrderID: integer (nullable = true)
 |-- RevisionNumber: integer (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- DueDate: string (nullable = true)
 |-- ShipDate: string (nullable = true)
 |-- Status: integer (nullable = true)
 |-- OnlineOrderFlag: boolean (nullable = true)
 |-- SalesOrderNumber: string (nullable = true)
 |-- PurchaseOrderNumber: string (nullable = true)
 |-- AccountNumber: string (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- ShipToAddressID: integer (nullable = true)
 |-- BillToAddressID: integer (nullable = true)
 |-- ShipMethod: string (nullable = true)
 |-- CreditCardApprovalCode: string (nullable = true)
 |-- SubTotal: double (nullable = true)
 |-- TaxAmt: double (nullable = true)
 |-- Freight: double (nullable = true)
 |-- TotalDue: double (nullable = true)
 |-- Comment: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: string (nullable = true)

In [273]:
# Replace nulls with zero; a numeric fill value only affects numeric columns,
# so string and boolean columns are left as they are.
df_sales_order_header = df_sales_order_header.fillna(0)

StatementMeta(sparkpool03, 9, 273, Finished, Available)

In [274]:
# Replace nulls in string columns with an empty string.
df_sales_order_header = df_sales_order_header.fillna("")

StatementMeta(sparkpool03, 9, 274, Finished, Available)

In [275]:
# Preview 10 rows again, now with nulls filled.
display(df_sales_order_header.limit(num=10))

StatementMeta(sparkpool03, 9, 275, Finished, Available)

SynapseWidget(Synapse.DataFrame, 011267ef-9e63-453d-9a76-992f44e3c63b)

In [276]:
# Engineer new features: the order year and month, derived from OrderDate
from pyspark.sql.functions import year, month
df_sales_order_header = (
    df_sales_order_header
    .withColumn('OrderYear', year('OrderDate'))
    .withColumn('OrderMonth', month('OrderDate'))
)
df_sales_order_header.show()

StatementMeta(sparkpool03, 9, 276, Finished, Available)

+------------+--------------+--------------------+--------------------+--------------------+------+---------------+----------------+-------------------+--------------+----------+---------------+---------------+-----------------+----------------------+-----------+---------+---------+----------+--------------------+--------------------+--------------------+---------+----------+
|SalesOrderID|RevisionNumber|           OrderDate|             DueDate|            ShipDate|Status|OnlineOrderFlag|SalesOrderNumber|PurchaseOrderNumber| AccountNumber|CustomerID|ShipToAddressID|BillToAddressID|       ShipMethod|CreditCardApprovalCode|   SubTotal|   TaxAmt|  Freight|  TotalDue|             Comment|             rowguid|        ModifiedDate|OrderYear|OrderMonth|
+------------+--------------+--------------------+--------------------+--------------------+------+---------------+----------------+-------------------+--------------+----------+---------------+---------------+-----------------+--------------

In [277]:
# Register the prepared DataFrame as a temporary view so the %%sql cells
# below can query it by the name SalesOrderHeader.
df_sales_order_header.createOrReplaceTempView(name='SalesOrderHeader')

StatementMeta(sparkpool03, 9, 277, Finished, Available)

In [252]:
%%sql
-- Merge statement to handle upsert of sales order header changes into delta table
-- SCD type 2 pattern: close out the current row of a changed order and insert the
-- incoming row as the new current version.
-- NOTE(review): this cell's status shows Cancelled (In [252]). Also, the SELECT * on
-- aw_sales_order_header in In [280] returned 24 fields, while this INSERT targets 27
-- columns (adds BeginDate, EndDate, CurrentRecord). Confirm the target schema before use.
MERGE INTO aw_sales_order_header t
USING (
    -- Every incoming row keyed by SalesOrderID: existing orders MATCH (so their
    -- current row can be expired), unknown orders fall through to the INSERT.
    SELECT ord.SalesOrderID AS MergeKey, ord.*
    FROM SalesOrderHeader AS ord

    UNION ALL

    -- Changed orders a second time with a NULL key: NULL never equals a key, so
    -- these rows reach WHEN NOT MATCHED and are inserted as the new current version.
    SELECT NULL AS MergeKey, ord.*
    FROM SalesOrderHeader AS ord
    JOIN aw_sales_order_header AS de ON ord.SalesOrderID = de.SalesOrderID
    WHERE de.CurrentRecord = 1 AND ord.ModifiedDate<> de.ModifiedDate
) s
ON t.SalesOrderID = s.MergeKey
-- Expire the current row when the incoming ModifiedDate differs.
WHEN MATCHED AND t.CurrentRecord = 1 AND t.ModifiedDate <> s.ModifiedDate THEN
  UPDATE SET t.CurrentRecord = 0, 
             t.EndDate = (CURRENT_DATE)
-- New orders and new versions of changed orders: current rows open until 2999-12-31.
WHEN NOT MATCHED THEN
  INSERT (SalesOrderID, RevisionNumber, OrderDate, DueDate, ShipDate, Status, OnlineOrderFlag, SalesOrderNumber, PurchaseOrderNumber, AccountNumber, CustomerID, ShipToAddressID, BillToAddressID, ShipMethod, CreditCardApprovalCode, SubTotal, TaxAmt, Freight, TotalDue, Comment, rowguid, ModifiedDate, OrderYear, OrderMonth, BeginDate, EndDate, CurrentRecord)
    VALUES (s.SalesOrderID, s.RevisionNumber, s.OrderDate, s.DueDate, s.ShipDate, s.Status, s.OnlineOrderFlag, s.SalesOrderNumber, s.PurchaseOrderNumber, s.AccountNumber, s.CustomerID, s.ShipToAddressID, s.BillToAddressID, s.ShipMethod, s.CreditCardApprovalCode, s.SubTotal, s.TaxAmt, s.Freight, s.TotalDue, s.Comment, s.rowguid, s.ModifiedDate, s.OrderYear, s.OrderMonth, CURRENT_DATE, '2999-12-31', 1)

StatementMeta(sparkpool03, 9, 252, Finished, Cancelled)

In [278]:
%%sql
-- Upsert (SCD type 1): overwrite orders that already exist with the latest
-- extract, and append orders not yet present in the Delta table.
MERGE INTO aw_sales_order_header AS tgt
USING SalesOrderHeader AS src
  ON tgt.SalesOrderID = src.SalesOrderID
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *

StatementMeta(sparkpool03, 9, 278, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

In [279]:
%%sql
-- Retrieve the version/change history of the Delta table
-- (one row per committed version of aw_sales_order_header).
DESCRIBE HISTORY aw_sales_order_header

StatementMeta(sparkpool03, 9, 279, Finished, Available)

<Spark SQL result set with 3 rows and 14 fields>

In [280]:
%%sql
-- Spot-check a single order after the merge.
SELECT *
FROM   aw_sales_order_header
WHERE  SalesOrderID = 71774

StatementMeta(sparkpool03, 9, 280, Finished, Available)

<Spark SQL result set with 1 rows and 24 fields>

In [284]:
# Time travel: load the enriched SalesOrderHeader Delta table (presumably the storage
# behind aw_sales_order_header; confirm) as it was at a specific version.
# NOTE(review): DESCRIBE HISTORY above returned 3 rows, so with versions numbered from 0,
# version 2 may be the latest rather than a previous version. Confirm the intended version.
SALES_ORDER_HEADER_DELTA_PATH = "abfss://enriched@csresearchdpolaplakest.dfs.core.windows.net/erpcore/AdventureWorks/SalesOrderHeader"
DELTA_VERSION = 2
df = (spark.read
      .format("delta")
      .option("versionAsOf", DELTA_VERSION)
      .load(SALES_ORDER_HEADER_DELTA_PATH))
df.show()

StatementMeta(sparkpool03, 9, 284, Finished, Available)

+------------+--------------+--------------------+--------------------+--------------------+------+---------------+----------------+-------------------+--------------+----------+---------------+---------------+-----------------+----------------------+-----------+----------+----------+-----------+--------------------+--------------------+--------------------+---------+----------+
|SalesOrderID|RevisionNumber|           OrderDate|             DueDate|            ShipDate|Status|OnlineOrderFlag|SalesOrderNumber|PurchaseOrderNumber| AccountNumber|CustomerID|ShipToAddressID|BillToAddressID|       ShipMethod|CreditCardApprovalCode|   SubTotal|    TaxAmt|   Freight|   TotalDue|             Comment|             rowguid|        ModifiedDate|OrderYear|OrderMonth|
+------------+--------------+--------------------+--------------------+--------------------+------+---------------+----------------+-------------------+--------------+----------+---------------+---------------+-----------------+--------

In [31]:
# Inspect the first Delta transaction-log commit (version 0) as raw JSON text lines.
delta_log_commit_0 = "abfss://enriched@csresearchdpolaplakest.dfs.core.windows.net/erpcore/AdventureWorks/SalesOrderHeader/_delta_log/00000000000000000000.json"
display(spark.read.format("text").load(delta_log_commit_0))

StatementMeta(sparkpool03, 9, 31, Finished, Available)

SynapseWidget(Synapse.DataFrame, 3b622eb2-56ac-4469-8ee6-825711e3376b)