In [1]:
! rm -r ../lake/*
! rm -r ../spark-warehouse/*
! rm -r ../metastore_db/*
! rm -r ../temp/*

In [2]:
import os
os.environ['SPARK_VERSION'] = '3.5.0'

In [3]:
from pyspark.sql import SparkSession
from delta import *
import pydeequ
from pydeequ.analyzers import *

spark = SparkSession.builder.master('local[*]').appName('quick-start') \
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension') \
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
    .config('spark.jars.packages', pydeequ.deequ_maven_coord) \
    .config('spark.jars.excludes', pydeequ.f2j_maven_coord) \
    .config('spark.sql.warehouse.dir', '../spark-warehouse') \
    .config('spark.driver.extraJavaOptions', '-Dderby.system.home="../metastore_db/"') \
    .config('spark.driver.memory', '10g') \
    .config('spark.driver.maxResultSize', '10g') \
    .config('spark.sql.repl.eagerEval.enabled', True) \
    .config('spark.databricks.delta.schema.autoMerge.enabled', True) \
    .config('spark.databricks.delta.autoCompact.enabled', True) \
    .enableHiveSupport() \
    .getOrCreate()

In [4]:
spark.sql('SELECT namespace as database_name FROM {df}',df = spark.sql('SHOW DATABASES')).show(truncate=False)

+-------------+
|database_name|
+-------------+
|default      |
+-------------+



In [5]:
spark.sql('CREATE DATABASE IF NOT EXISTS AdventureWorks')
spark.sql('CREATE DATABASE IF NOT EXISTS OtherDB')

spark.sql('SELECT namespace as database_name FROM {df}',df = spark.sql('SHOW DATABASES')).show(truncate=False)

+--------------+
|database_name |
+--------------+
|adventureworks|
|default       |
|otherdb       |
+--------------+



In [6]:
spark.sql('USE AdventureWorks')
spark.sql('SELECT CURRENT_SCHEMA() AS current_database').show(truncate=False)

+----------------+
|current_database|
+----------------+
|adventureworks  |
+----------------+



## Load data into Bronze zone

In [7]:
jdbc_options = {
      'url': 'jdbc:sqlserver://localhost:1433;database=AdventureWorks2022;trustServerCertificate=true',
      'driver': 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
      'user': 'SA',
      'password': 'PiIs&&&31415926535'
}

In [8]:
qryStr = '(SELECT * FROM Person.Person) t'
person_df = spark.read.format('jdbc').option('dbtable', qryStr ).options(**jdbc_options).load()
spark.createDataFrame(person_df.dtypes, ['column_name', 'data_type']).show(person_df.count(), truncate=False)

+---------------------+---------+
|column_name          |data_type|
+---------------------+---------+
|BusinessEntityID     |int      |
|PersonType           |string   |
|NameStyle            |boolean  |
|Title                |string   |
|FirstName            |string   |
|MiddleName           |string   |
|LastName             |string   |
|Suffix               |string   |
|EmailPromotion       |int      |
|AdditionalContactInfo|string   |
|Demographics         |string   |
|rowguid              |string   |
|ModifiedDate         |timestamp|
+---------------------+---------+



In [9]:
sql = '''
CREATE TABLE IF NOT EXISTS AdventureWorks.Person (
    BusinessEntityID INT,
    PersonType STRING,
    Title STRING,
    FirstName STRING,
    MiddleName STRING,
    LastName STRING,
    Suffix STRING
) USING DELTA LOCATION '../lake/bronze/AdventureWorks/Person/Person'
'''
spark.sql(sql)

spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN AdventureWorks')).show(truncate=False)

+--------------+----------+
|database_name |table_name|
+--------------+----------+
|adventureworks|person    |
+--------------+----------+



In [10]:
spark.sql('''
    INSERT INTO AdventureWorks.Person
    SELECT BusinessEntityID, PersonType, Title, FirstName, MiddleName, LastName, Suffix FROM {df}
    ''', df = person_df)

spark.sql('SELECT COUNT(*) AS person_row_count FROM AdventureWorks.Person').show(truncate=False)

+----------------+
|person_row_count|
+----------------+
|19972           |
+----------------+



In [11]:
qryStr = '(SELECT * FROM Person.BusinessEntity) t'
entity_df = spark.read.format('jdbc').option('dbtable', qryStr ).options(**jdbc_options).load()
spark.createDataFrame(entity_df.dtypes, ['column_name', 'data_type']).show(entity_df.count(), truncate=False)

+----------------+---------+
|column_name     |data_type|
+----------------+---------+
|BusinessEntityID|int      |
|rowguid         |string   |
|ModifiedDate    |timestamp|
+----------------+---------+



In [12]:
sql = '''
CREATE TABLE IF NOT EXISTS AdventureWorks.BusinessEntity (
    BusinessEntityID INT,
    ModifiedDate TIMESTAMP
) USING DELTA LOCATION '../lake/bronze/AdventureWorks/Person/BusinessEntity'
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN AdventureWorks')).show(truncate=False)

+--------------+--------------+
|database_name |table_name    |
+--------------+--------------+
|adventureworks|businessentity|
|adventureworks|person        |
+--------------+--------------+



In [13]:
spark.sql('''
    INSERT INTO AdventureWorks.BusinessEntity
    SELECT BusinessEntityID, ModifiedDate
    FROM {df}
''', df = entity_df)

spark.sql('SELECT COUNT(*) AS entity_row_count FROM AdventureWorks.BusinessEntity').show(truncate=False)

+----------------+
|entity_row_count|
+----------------+
|20777           |
+----------------+



In [14]:
qryStr = '(SELECT * FROM Person.EmailAddress) t'
email_df = spark.read.format('jdbc').option('dbtable', qryStr ).options(**jdbc_options).load()
spark.createDataFrame(email_df.dtypes, ['column_name', 'data_type']).show(email_df.count(), truncate=False)

+----------------+---------+
|column_name     |data_type|
+----------------+---------+
|BusinessEntityID|int      |
|EmailAddressID  |int      |
|EmailAddress    |string   |
|rowguid         |string   |
|ModifiedDate    |timestamp|
+----------------+---------+



In [15]:
sql = '''
CREATE TABLE IF NOT EXISTS AdventureWorks.EmailAddress (
    BusinessEntityID INT,
    EmailAddress STRING,
    ModifiedDate TIMESTAMP
) USING DELTA LOCATION '../lake/bronze/AdventureWorks/Person/EmailAddress'
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN AdventureWorks')).show(truncate=False)

+--------------+--------------+
|database_name |table_name    |
+--------------+--------------+
|adventureworks|businessentity|
|adventureworks|emailaddress  |
|adventureworks|person        |
+--------------+--------------+



In [16]:
spark.sql('''
    INSERT INTO AdventureWorks.EmailAddress
    SELECT BusinessEntityID, EmailAddress, ModifiedDate FROM {df}
''', df = email_df)

spark.sql('SELECT COUNT(*) AS email_row_count FROM AdventureWorks.EmailAddress').show(truncate=False)

+---------------+
|email_row_count|
+---------------+
|19972          |
+---------------+



In [17]:
qryStr = '(SELECT * FROM Person.PersonPhone) t'
phone_df = spark.read.format('jdbc').option('dbtable', qryStr ).options(**jdbc_options).load()
spark.createDataFrame(phone_df.dtypes, ['column_name', 'data_type']).show(phone_df.count(), truncate=False)

+-----------------+---------+
|column_name      |data_type|
+-----------------+---------+
|BusinessEntityID |int      |
|PhoneNumber      |string   |
|PhoneNumberTypeID|int      |
|ModifiedDate     |timestamp|
+-----------------+---------+



In [18]:
sql = '''
CREATE TABLE IF NOT EXISTS AdventureWorks.PersonPhone (
    BusinessEntityID INT,
    PhoneNumber STRING,
    ModifiedDate TIMESTAMP
) USING DELTA LOCATION '../lake/bronze/AdventureWorks/Person/PersonPhone'
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN AdventureWorks')).show(truncate=False)

+--------------+--------------+
|database_name |table_name    |
+--------------+--------------+
|adventureworks|businessentity|
|adventureworks|emailaddress  |
|adventureworks|person        |
|adventureworks|personphone   |
+--------------+--------------+



In [19]:
spark.sql('''
    INSERT INTO AdventureWorks.PersonPhone
    SELECT BusinessEntityID, PhoneNumber, ModifiedDate FROM {df}
''', df = phone_df)

spark.sql('SELECT COUNT(*) AS phone_row_count FROM AdventureWorks.PersonPhone').show(truncate=False)

+---------------+
|phone_row_count|
+---------------+
|19972          |
+---------------+



In [20]:
flat_query = '''
SELECT
    be.BusinessEntityID,
    p.PersonType,
    p.Title,
    p.FirstName,
    p.MiddleName,
    p.LastName,
    p.Suffix,
    ea.EmailAddress,
    pp.PhoneNumber
FROM AdventureWorks.BusinessEntity be 
INNER JOIN AdventureWorks.Person p ON be.BusinessEntityID = p.BusinessEntityID
INNER JOIN AdventureWorks.EmailAddress ea ON ea.BusinessEntityID = be.BusinessEntityID 
INNER JOIN AdventureWorks.PersonPhone pp ON pp.BusinessEntityID = be.BusinessEntityID
'''

spark.sql(flat_query).limit(5)

BusinessEntityID,PersonType,Title,FirstName,MiddleName,LastName,Suffix,EmailAddress,PhoneNumber
1,EM,,Ken,J,Sánchez,,ken0@adventure-wo...,697-555-0142
2,EM,,Terri,Lee,Duffy,,terri0@adventure-...,819-555-0175
3,EM,,Roberto,,Tamburello,,roberto0@adventur...,212-555-0187
4,EM,,Rob,,Walters,,rob0@adventure-wo...,612-555-0100
5,EM,Ms.,Gail,A,Erickson,,gail0@adventure-w...,849-555-0139


In [21]:
qryStr = '(SELECT * FROM Sales.SalesOrderDetail) t'
sod_df = spark.read.format('jdbc').option('dbtable', qryStr ).options(**jdbc_options).load()
spark.createDataFrame(sod_df.dtypes, ['column_name', 'data_type']).show(sod_df.count(), truncate=False)

+---------------------+-------------+
|column_name          |data_type    |
+---------------------+-------------+
|SalesOrderID         |int          |
|SalesOrderDetailID   |int          |
|CarrierTrackingNumber|string       |
|OrderQty             |smallint     |
|ProductID            |int          |
|SpecialOfferID       |int          |
|UnitPrice            |decimal(19,4)|
|UnitPriceDiscount    |decimal(19,4)|
|LineTotal            |decimal(38,6)|
|rowguid              |string       |
|ModifiedDate         |timestamp    |
+---------------------+-------------+



In [22]:
sql = '''
CREATE TABLE IF NOT EXISTS AdventureWorks.SalesOrderDetail (
    SalesOrderDetailID INT,
    SalesOrderID INT,
    OrderQty SMALLINT,
    ProductID INT,
    UnitPrice DECIMAL(19,4),
    UnitPriceDiscount DECIMAL(19,4),
    ModifiedDate TIMESTAMP
) USING DELTA LOCATION '../lake/bronze/AdventureWorks/Sales/SalesOrderDetail'
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN AdventureWorks')).show(truncate=False)

+--------------+----------------+
|database_name |table_name      |
+--------------+----------------+
|adventureworks|businessentity  |
|adventureworks|emailaddress    |
|adventureworks|person          |
|adventureworks|personphone     |
|adventureworks|salesorderdetail|
+--------------+----------------+



In [23]:
spark.sql('''
    INSERT INTO AdventureWorks.SalesOrderDetail
    SELECT SalesOrderDetailID, SalesOrderID, OrderQty, ProductID, UnitPrice, UnitPriceDiscount, ModifiedDate
    FROM {df}
''', df = sod_df)

spark.sql('SELECT COUNT(*) AS sod_row_count FROM AdventureWorks.SalesOrderDetail').show(truncate=False)

+-------------+
|sod_row_count|
+-------------+
|121317       |
+-------------+



In [24]:
qryStr = '(SELECT * FROM Sales.SalesOrderHeader) t'
soh_df = spark.read.format('jdbc').option('dbtable', qryStr ).options(**jdbc_options).load()
spark.createDataFrame(soh_df.dtypes, ['column_name', 'data_type']).show(soh_df.count(), truncate=False)

+----------------------+-------------+
|column_name           |data_type    |
+----------------------+-------------+
|SalesOrderID          |int          |
|RevisionNumber        |int          |
|OrderDate             |timestamp    |
|DueDate               |timestamp    |
|ShipDate              |timestamp    |
|Status                |int          |
|OnlineOrderFlag       |boolean      |
|SalesOrderNumber      |string       |
|PurchaseOrderNumber   |string       |
|AccountNumber         |string       |
|CustomerID            |int          |
|SalesPersonID         |int          |
|TerritoryID           |int          |
|BillToAddressID       |int          |
|ShipToAddressID       |int          |
|ShipMethodID          |int          |
|CreditCardID          |int          |
|CreditCardApprovalCode|string       |
|CurrencyRateID        |int          |
|SubTotal              |decimal(19,4)|
|TaxAmt                |decimal(19,4)|
|Freight               |decimal(19,4)|
|TotalDue              |d

In [25]:
sql = '''
CREATE TABLE IF NOT EXISTS AdventureWorks.SalesOrderHeader (
    SalesOrderID INT,
    OrderDate TIMESTAMP,
    ShipDate TIMESTAMP,
    DueDate TIMESTAMP,
    OnlineOrderFlag BOOLEAN,
    CustomerID INT,
    TerritoryID INT,
    TaxAmt DECIMAL(19,4),
    Freight DECIMAL(19,4),
    TotalDue DECIMAL(19,4),
    ModifiedDate TIMESTAMP
) USING DELTA LOCATION '../lake/bronze/AdventureWorks/Sales/SalesOrderHeader'
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN AdventureWorks')).show(truncate=False)

+--------------+----------------+
|database_name |table_name      |
+--------------+----------------+
|adventureworks|businessentity  |
|adventureworks|emailaddress    |
|adventureworks|person          |
|adventureworks|personphone     |
|adventureworks|salesorderdetail|
|adventureworks|salesorderheader|
+--------------+----------------+



In [26]:
spark.sql('''
    INSERT INTO AdventureWorks.SalesOrderHeader
    SELECT
        SalesOrderID, OrderDate, ShipDate, DueDate, OnlineOrderFlag, CustomerID, TerritoryID, TaxAmt, Freight, TotalDue, ModifiedDate
    FROM {df}
''', df = soh_df)

spark.sql('SELECT COUNT(*) AS soh_row_count FROM AdventureWorks.SalesOrderHeader').show(truncate=False)

+-------------+
|soh_row_count|
+-------------+
|31465        |
+-------------+



In [27]:
qryStr = '(SELECT * FROM Sales.Customer) t'
customer_df = spark.read.format('jdbc').option('dbtable', qryStr ).options(**jdbc_options).load()
spark.createDataFrame(customer_df.dtypes, ['column_name', 'data_type']).show(customer_df.count(), truncate=False)

+-------------+---------+
|column_name  |data_type|
+-------------+---------+
|CustomerID   |int      |
|PersonID     |int      |
|StoreID      |int      |
|TerritoryID  |int      |
|AccountNumber|string   |
|rowguid      |string   |
|ModifiedDate |timestamp|
+-------------+---------+



In [28]:
sql = '''
CREATE TABLE IF NOT EXISTS AdventureWorks.Customer (
    CustomerID INT,
    PersonID INT,
    StoreID INT,
    TerritoryID INT,
    ModifiedDate TIMESTAMP
) USING DELTA LOCATION '../lake/bronze/AdventureWorks/Sales/Customer'
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN AdventureWorks')).show(truncate=False)

+--------------+----------------+
|database_name |table_name      |
+--------------+----------------+
|adventureworks|businessentity  |
|adventureworks|customer        |
|adventureworks|emailaddress    |
|adventureworks|person          |
|adventureworks|personphone     |
|adventureworks|salesorderdetail|
|adventureworks|salesorderheader|
+--------------+----------------+



In [29]:
spark.sql('''
    INSERT INTO AdventureWorks.Customer
    SELECT CustomerID, PersonID, StoreID, TerritoryID, ModifiedDate FROM {df}
''', df = customer_df)

spark.sql('SELECT COUNT(*) AS customer_row_count FROM AdventureWorks.Customer').show(truncate=False)

+------------------+
|customer_row_count|
+------------------+
|19820             |
+------------------+



In [30]:
flat_query = '''
SELECT
    soh.OrderDate, soh.CustomerID, soh.TerritoryID, sod.ProductID, sod.OrderQty, sod.UnitPrice 
FROM AdventureWorks.SalesOrderHeader soh 
INNER JOIN AdventureWorks.SalesOrderDetail sod ON soh.SalesOrderID = sod.SalesOrderID 
INNER JOIN AdventureWorks.Customer c ON c.CustomerID = soh.CustomerID 
'''

spark.sql(flat_query).limit(4)

OrderDate,CustomerID,TerritoryID,ProductID,OrderQty,UnitPrice
2011-05-31 00:00:00,29825,5,776,1,2024.994
2011-05-31 00:00:00,29825,5,777,3,2024.994
2011-05-31 00:00:00,29825,5,778,1,2024.994
2011-05-31 00:00:00,29825,5,771,1,2039.994


In [31]:
qryStr = '(SELECT * FROM Production.Product) t'
product_df = spark.read.format('jdbc').option('dbtable', qryStr ).options(**jdbc_options).load()
spark.createDataFrame(product_df.dtypes, ['column_name', 'data_type']).show(product_df.count(), truncate=False)

+---------------------+-------------+
|column_name          |data_type    |
+---------------------+-------------+
|ProductID            |int          |
|Name                 |string       |
|ProductNumber        |string       |
|MakeFlag             |boolean      |
|FinishedGoodsFlag    |boolean      |
|Color                |string       |
|SafetyStockLevel     |smallint     |
|ReorderPoint         |smallint     |
|StandardCost         |decimal(19,4)|
|ListPrice            |decimal(19,4)|
|Size                 |string       |
|SizeUnitMeasureCode  |string       |
|WeightUnitMeasureCode|string       |
|Weight               |decimal(8,2) |
|DaysToManufacture    |int          |
|ProductLine          |string       |
|Class                |string       |
|Style                |string       |
|ProductSubcategoryID |int          |
|ProductModelID       |int          |
|SellStartDate        |timestamp    |
|SellEndDate          |timestamp    |
|DiscontinuedDate     |timestamp    |
|rowguid    

In [32]:
sql = '''
CREATE TABLE IF NOT EXISTS AdventureWorks.Product (
    ProductID INT,
    ProductSubcategoryID INT,
    Name STRING,
    Color STRING,
    ListPrice DECIMAL(19,4),
    Size STRING,
    Weight DECIMAL(8,2),
    ModifiedDate TIMESTAMP
) USING DELTA LOCATION '../lake/bronze/AdventureWorks/Production/Product'
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN AdventureWorks')).show(truncate=False)

+--------------+----------------+
|database_name |table_name      |
+--------------+----------------+
|adventureworks|businessentity  |
|adventureworks|customer        |
|adventureworks|emailaddress    |
|adventureworks|person          |
|adventureworks|personphone     |
|adventureworks|product         |
|adventureworks|salesorderdetail|
|adventureworks|salesorderheader|
+--------------+----------------+



In [33]:
spark.sql('''
    INSERT INTO AdventureWorks.Product
    SELECT ProductID, ProductSubcategoryID, Name, Color, ListPrice, Size, Weight, ModifiedDate FROM {df}
''', df = product_df)

spark.sql('SELECT COUNT(*) AS product_row_count FROM AdventureWorks.Product').show(truncate=False)

+-----------------+
|product_row_count|
+-----------------+
|504              |
+-----------------+



In [34]:
qryStr = '(SELECT * FROM Production.ProductCategory) t'
category_df = spark.read.format('jdbc').option('dbtable', qryStr ).options(**jdbc_options).load()
spark.createDataFrame(category_df.dtypes, ['column_name', 'data_type']).show(category_df.count(), truncate=False)

+-----------------+---------+
|column_name      |data_type|
+-----------------+---------+
|ProductCategoryID|int      |
|Name             |string   |
|rowguid          |string   |
|ModifiedDate     |timestamp|
+-----------------+---------+



In [35]:
sql = '''
CREATE TABLE IF NOT EXISTS AdventureWorks.ProductCategory (
    ProductCategoryID INT,
    Name STRING,
    ModifiedDate TIMESTAMP
) USING DELTA LOCATION '../lake/bronze/AdventureWorks/Production/ProductCategory'
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN AdventureWorks')).show(truncate=False)

+--------------+----------------+
|database_name |table_name      |
+--------------+----------------+
|adventureworks|businessentity  |
|adventureworks|customer        |
|adventureworks|emailaddress    |
|adventureworks|person          |
|adventureworks|personphone     |
|adventureworks|product         |
|adventureworks|productcategory |
|adventureworks|salesorderdetail|
|adventureworks|salesorderheader|
+--------------+----------------+



In [36]:
spark.sql('''
    INSERT INTO AdventureWorks.ProductCategory
    SELECT ProductCategoryID, Name, ModifiedDate FROM {df}
''', df = category_df)

spark.sql('SELECT COUNT(*) AS category_row_count FROM AdventureWorks.ProductCategory').show(truncate=False)

+------------------+
|category_row_count|
+------------------+
|4                 |
+------------------+



In [37]:
qryStr = '(SELECT * FROM Production.ProductSubcategory) t'
subcategory_df = spark.read.format('jdbc').option('dbtable', qryStr ).options(**jdbc_options).load()
spark.createDataFrame(subcategory_df.dtypes, ['column_name', 'data_type']).show(subcategory_df.count(), truncate=False)

+--------------------+---------+
|column_name         |data_type|
+--------------------+---------+
|ProductSubcategoryID|int      |
|ProductCategoryID   |int      |
|Name                |string   |
|rowguid             |string   |
|ModifiedDate        |timestamp|
+--------------------+---------+



In [38]:
sql = '''
CREATE TABLE IF NOT EXISTS AdventureWorks.ProductSubcategory (
    ProductSubcategoryID INT,
    ProductCategoryID INT,
    Name STRING,
    ModifiedDate TIMESTAMP
) USING DELTA LOCATION '../lake/bronze/AdventureWorks/Production/ProductSubcategory'
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN AdventureWorks')).show(truncate=False)

+--------------+------------------+
|database_name |table_name        |
+--------------+------------------+
|adventureworks|businessentity    |
|adventureworks|customer          |
|adventureworks|emailaddress      |
|adventureworks|person            |
|adventureworks|personphone       |
|adventureworks|product           |
|adventureworks|productcategory   |
|adventureworks|productsubcategory|
|adventureworks|salesorderdetail  |
|adventureworks|salesorderheader  |
+--------------+------------------+



In [39]:
spark.sql('''
    INSERT INTO AdventureWorks.ProductSubcategory
    SELECT ProductSubcategoryID, ProductCategoryID, Name, ModifiedDate FROM {df}
''', df = subcategory_df)

spark.sql('SELECT COUNT(*) AS subcategory_row_count FROM AdventureWorks.ProductSubcategory').show(truncate=False)

+---------------------+
|subcategory_row_count|
+---------------------+
|37                   |
+---------------------+



In [40]:
flat_query = '''
SELECT
    pc.Name as CategoryName,
    ps.Name as SubCategoryName,
    p.Name as ProductName,
    p.Color,
    p.Size,
    p.Weight 
FROM AdventureWorks.ProductCategory pc 
INNER JOIN AdventureWorks.ProductSubcategory ps ON pc.ProductCategoryID = ps.ProductCategoryID 
INNER JOIN AdventureWorks.Product p ON p.ProductSubcategoryID = ps.ProductSubcategoryID 
'''

spark.sql(flat_query).limit(5)

CategoryName,SubCategoryName,ProductName,Color,Size,Weight
Bikes,Mountain Bikes,Mountain-500 Blac...,Black,52,28.68
Bikes,Mountain Bikes,Mountain-500 Blac...,Black,48,28.42
Bikes,Mountain Bikes,Mountain-500 Blac...,Black,44,28.13
Bikes,Mountain Bikes,Mountain-500 Blac...,Black,42,27.77
Bikes,Mountain Bikes,Mountain-500 Blac...,Black,40,27.35


## Perform more complex queries

In [41]:
# مشتریانی که بین اولین و دومین سفارششان، کمتر از ۱ ماه طول کشیده باشد

query = '''
WITH CustomerFirstTwoOrders AS (
    SELECT 
        CustomerID,
        OrderDate,
        ROW_NUMBER() OVER (PARTITION BY CustomerID ORDER BY OrderDate) AS RowNum
    FROM AdventureWorks.SalesOrderHeader
)
SELECT 
    c.CustomerID,
    MIN(f.OrderDate) AS FirstOrderDate,
    MAX(f.OrderDate) AS SecondOrderDate,
    DATEDIFF(DAY, MIN(f.OrderDate), MAX(f.OrderDate)) AS DaysBetweenOrders
FROM CustomerFirstTwoOrders f
INNER JOIN AdventureWorks.Customer c ON f.CustomerID = c.CustomerID
WHERE f.RowNum <= 2
GROUP BY c.CustomerID
HAVING DATEDIFF(DAY, MIN(f.OrderDate), MAX(f.OrderDate)) < 31;
'''

spark.sql(query).limit(7).show(truncate=False)

+----------+-------------------+-------------------+-----------------+
|CustomerID|FirstOrderDate     |SecondOrderDate    |DaysBetweenOrders|
+----------+-------------------+-------------------+-----------------+
|11015     |2013-06-20 00:00:00|2013-06-20 00:00:00|0                |
|11016     |2013-07-12 00:00:00|2013-07-12 00:00:00|0                |
|11019     |2013-07-15 00:00:00|2013-08-04 00:00:00|20               |
|11020     |2013-05-31 00:00:00|2013-05-31 00:00:00|0                |
|11021     |2013-06-25 00:00:00|2013-06-25 00:00:00|0                |
|11022     |2013-06-22 00:00:00|2013-06-22 00:00:00|0                |
|11024     |2013-11-27 00:00:00|2013-12-26 00:00:00|29               |
+----------+-------------------+-------------------+-----------------+



## Work with files

In [42]:
spark.sql('USE OtherDB')
spark.sql('SELECT current_schema() AS current_database').show(truncate=False)

+----------------+
|current_database|
+----------------+
|otherdb         |
+----------------+



In [43]:
crime_df = spark.read.option('header', 'true').option('inferSchema', 'true').option('delimiter', ',').csv('../../data/crime.csv')
spark.createDataFrame(crime_df.dtypes, ['column_name', 'data_type']).show(crime_df.count(), truncate=False)

+---------------------------+---------+
|column_name                |data_type|
+---------------------------+---------+
|_c0                        |int      |
|arrest_key                 |int      |
|arrest_date                |date     |
|pd_desc                    |string   |
|ofns_desc                  |string   |
|law_code                   |string   |
|law_cat_cd                 |string   |
|age_group                  |string   |
|perp_sex                   |string   |
|perp_race                  |string   |
|latitude                   |double   |
|longitude                  |double   |
|arrest_boro                |string   |
|arrest_precinct            |int      |
|jurisdiction_code          |double   |
|:@computed_region_f5dn_yrer|double   |
|:@computed_region_yeji_bk3q|double   |
|:@computed_region_92fq_4b7q|double   |
|:@computed_region_sbqj_enih|double   |
+---------------------------+---------+



In [44]:
spark.sql('''
    SELECT
        arrest_date, pd_desc, ofns_desc, age_group, perp_sex, perp_race, latitude, longitude
    FROM {df}
    LIMIT 3
''', df = crime_df)

arrest_date,pd_desc,ofns_desc,age_group,perp_sex,perp_race,latitude,longitude
2019-01-26,SEXUAL ABUSE,SEX CRIMES,45-64,M,BLACK,40.800694331000045,-73.94110928599997
2019-02-06,CRIMINAL SALE OF ...,CONTROLLED SUBSTA...,25-44,M,UNKNOWN,40.75783900300007,-73.99121211099998
2016-01-06,RAPE 3,RAPE,25-44,M,BLACK,40.64865008500004,-73.95033556299995


In [45]:
spark.sql('''
    SELECT
        YEAR(arrest_date) AS year,
        MONTH(arrest_date) AS month,
        COALESCE(ofns_desc, Null) AS description,
        COALESCE(age_group, Null) AS age_group,
        CASE
            WHEN perp_sex = 'M' THEN 'Male' 
            WHEN perp_sex = 'F' THEN 'Female' 
        ELSE Null END AS gender,
        latitude,
        longitude
    FROM {df}
    LIMIT 3
''', df = crime_df)

year,month,description,age_group,gender,latitude,longitude
2019,1,SEX CRIMES,45-64,Male,40.800694331000045,-73.94110928599997
2019,2,CONTROLLED SUBSTA...,25-44,Male,40.75783900300007,-73.99121211099998
2016,1,RAPE,25-44,Male,40.64865008500004,-73.95033556299995


In [46]:
sql = '''
CREATE TABLE IF NOT EXISTS crime (
    arrest_date DATE,
    pd_desc STRING,
    ofns_desc STRING,
    age_group STRING,
    perp_sex STRING,
    perp_race STRING,
    latitude DOUBLE,
    longitude DOUBLE
) USING DELTA LOCATION '../lake/bronze/OtherDB/crime'
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN OtherDB')).show(truncate=False)

+-------------+----------+
|database_name|table_name|
+-------------+----------+
|otherdb      |crime     |
+-------------+----------+



In [47]:
spark.sql('''
    INSERT INTO crime
    SELECT arrest_date, pd_desc, ofns_desc, age_group, perp_sex, perp_race, latitude, longitude FROM {df} ''', df = crime_df)

spark.sql('SELECT COUNT(*) AS crime_row_count FROM crime').show(truncate=False)

+---------------+
|crime_row_count|
+---------------+
|3881989        |
+---------------+



In [48]:
sql = '''
CREATE OR REPLACE VIEW vw_crime AS
    SELECT
        YEAR(arrest_date) AS year,
        MONTH(arrest_date) AS month,
        COALESCE(ofns_desc, Null) AS description,
        COALESCE(age_group, Null) AS age_group,
        CASE 
            WHEN perp_sex = 'M' THEN 'Male' 
            WHEN perp_sex = 'F' THEN 'Female' 
        ELSE Null END AS gender,
        latitude,
        longitude
    FROM crime
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, viewName as view_name FROM {df}', df = spark.sql('SHOW VIEWS IN OtherDB')).show(truncate=False)
spark.sql('SELECT * FROM vw_crime LIMIT 4')

+-------------+---------+
|database_name|view_name|
+-------------+---------+
|otherdb      |vw_crime |
+-------------+---------+



year,month,description,age_group,gender,latitude,longitude
2019,1,SEX CRIMES,45-64,Male,40.800694331000045,-73.94110928599997
2019,2,CONTROLLED SUBSTA...,25-44,Male,40.75783900300007,-73.99121211099998
2016,1,RAPE,25-44,Male,40.64865008500004,-73.95033556299995
2018,11,RAPE,25-44,Male,40.67458330800008,-73.93022154099998


## Query all files inside a directory

In [49]:
trip_df = spark.read.parquet('../../data/trip/*/*.parquet')

In [50]:
spark.sql('SELECT COUNT(*) count_rides FROM {df}', df = trip_df).show(truncate=False)

+-----------+
|count_rides|
+-----------+
|565621687  |
+-----------+



In [51]:
spark.sql('''
    SELECT
        tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, payment_type, fare_amount, mta_tax, tip_amount, tolls_amount, total_amount
    FROM {df}
    LIMIT 3
''', df = trip_df)

tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,payment_type,fare_amount,mta_tax,tip_amount,tolls_amount,total_amount
2016-01-01 00:12:22,2016-01-01 00:29:14,1,3.2,1,14.0,0.5,3.06,0.0,18.36
2016-01-01 00:41:31,2016-01-01 00:55:10,2,1.0,2,9.5,0.5,0.0,0.0,10.8
2016-01-01 00:53:37,2016-01-01 00:59:57,1,0.9,2,6.0,0.5,0.0,0.0,7.3


## Use DuckDB to safely bring multiple files 

In [52]:
import duckdb
conn = duckdb.connect('../temp/tempdb.duckdb')
conn.sql('''
    SELECT COUNT(*) AS trip_row_count FROM read_parquet('../../data/trip/*/*.parquet')
''').df()

Unnamed: 0,trip_row_count
0,565621687


In [53]:
sql = '''
CREATE OR REPLACE TABLE trip AS
    SELECT
        tpep_pickup_datetime,
        tpep_dropoff_datetime,
        passenger_count,
        trip_distance,
        payment_type,
        fare_amount,
        mta_tax,
        tip_amount,
        tolls_amount,
        total_amount
    FROM read_parquet('../../data/trip/*/*.parquet')
'''

conn.sql(sql)
conn.close()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [54]:
conn = duckdb.connect('../temp/tempdb.duckdb')
conn.sql('COPY trip TO "../temp/trip.parquet" (FORMAT PARQUET)')
conn.close()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [55]:
trip_df = spark.read.parquet('../temp/trip.parquet')
spark.createDataFrame(trip_df.dtypes, ['column_name', 'data_type']).show(trip_df.count(), truncate=False)

+---------------------+-------------+
|column_name          |data_type    |
+---------------------+-------------+
|tpep_pickup_datetime |timestamp_ntz|
|tpep_dropoff_datetime|timestamp_ntz|
|passenger_count      |bigint       |
|trip_distance        |double       |
|payment_type         |bigint       |
|fare_amount          |double       |
|mta_tax              |double       |
|tip_amount           |double       |
|tolls_amount         |double       |
|total_amount         |double       |
+---------------------+-------------+



In [56]:
# partition table for larger datasets

sql = '''
CREATE TABLE IF NOT EXISTS trip (
    tpep_pickup_datetime TIMESTAMP_NTZ,
    tpep_dropoff_datetime TIMESTAMP_NTZ,
    passenger_count BIGINT,
    trip_distance DOUBLE,
    payment_type BIGINT,
    fare_amount DOUBLE,
    mta_tax DOUBLE,
    tip_amount DOUBLE,
    tolls_amount DOUBLE,
    total_amount DOUBLE
) USING DELTA LOCATION '../lake/bronze/OtherDB/trip' PARTITIONED BY (payment_type)
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN OtherDB')).show(truncate=False)

+-------------+----------+
|database_name|table_name|
+-------------+----------+
|otherdb      |crime     |
|otherdb      |trip      |
|otherdb      |vw_crime  |
+-------------+----------+



In [57]:
spark.sql('''
    INSERT INTO trip
    SELECT
        tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, payment_type, fare_amount, mta_tax, tip_amount, tolls_amount, total_amount
    FROM {df}
''', df = trip_df)

spark.sql('SELECT COUNT(*) AS trip_row_count FROM OtherDB.trip').show(truncate=False)

+--------------+
|trip_row_count|
+--------------+
|565621687     |
+--------------+



In [58]:
! rm -r ../temp/trip.parquet

## Fast query a delta table using DuckDB

In [59]:
import duckdb
duckdb.sql('''SELECT extension_name, installed, description FROM duckdb_extensions() WHERE extension_name='delta' ''').df()

Unnamed: 0,extension_name,installed,description
0,delta,True,Adds support for Delta Lake


In [60]:
duckdb.sql('LOAD DELTA')

sql = '''
SELECT
    payment_type AS 'Payment type',
    COALESCE(AVG(passenger_count), 0) AS 'Passengers Average',
    COALESCE(SUM(total_amount), 0) AS Amount
FROM delta_scan('../lake/bronze/OtherDB/trip/')
GROUP BY payment_type
ORDER BY 3 DESC, 2 DESC
'''

duckdb.sql(sql).df()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,Payment type,Passengers Average,Amount
0,1,1.548711,7722359000.0
1,2,1.625162,2371349000.0
2,0,0.0,174156600.0
3,3,1.261405,50615900.0
4,4,1.305835,10854760.0
5,5,1.116883,798.2


In [61]:
sql = '''
CREATE OR REPLACE VIEW vw_trip_duration AS
    SELECT
        total_amount as amount,
        UNIX_TIMESTAMP(tpep_dropoff_datetime) - UNIX_TIMESTAMP(tpep_pickup_datetime) AS duration,
        trip_distance as distance,
        hour(tpep_pickup_datetime) as hour,
        CAST(passenger_count AS INT) AS passenger,
        payment_type as payment
    FROM trip
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN OtherDB')).show(truncate=False)

+-------------+----------------+
|database_name|table_name      |
+-------------+----------------+
|otherdb      |crime           |
|otherdb      |trip            |
|otherdb      |vw_crime        |
|otherdb      |vw_trip_duration|
+-------------+----------------+



In [62]:
spark.sql('SELECT * FROM vw_trip_duration LIMIT 3').show(truncate=False)

+------+--------+--------+----+---------+-------+
|amount|duration|distance|hour|passenger|payment|
+------+--------+--------+----+---------+-------+
|67.8  |3168    |22.0    |13  |1        |1      |
|12.35 |363     |0.9     |15  |2        |1      |
|12.3  |633     |0.8     |13  |1        |1      |
+------+--------+--------+----+---------+-------+



## Create view for reports

In [63]:
sql = '''
CREATE OR REPLACE VIEW vw_trip_report AS
    WITH trip AS (
        SELECT
            MONTH(tpep_pickup_datetime) AS month,
            CAST(passenger_count AS INT) AS passenger_count,
            payment_type,
            UNIX_TIMESTAMP(tpep_dropoff_datetime) - UNIX_TIMESTAMP(tpep_pickup_datetime) AS duration
    FROM trip
    )
    SELECT
        CASE month
            WHEN 1 THEN 'January'
            WHEN 2 THEN 'February'
            WHEN 3 THEN 'March'
            WHEN 4 THEN 'April'
            WHEN 5 THEN 'May'
            WHEN 6 THEN 'June'
            WHEN 7 THEN 'July'
            WHEN 8 THEN 'August'
            WHEN 9 THEN 'September'
            WHEN 10 THEN 'October'
            WHEN 11 THEN 'November'
            WHEN 12 THEN 'December'
        END AS month,
        CASE payment_type
            WHEN 0 THEN 'Cash'
            WHEN 1 THEN 'Credit Card'
            WHEN 2 THEN 'Debit Card'
            WHEN 3 THEN 'Free of Charge'
            ELSE 'Unknown'
        END AS payment_type,
        COALESCE(
            SUM(passenger_count), 0
        ) AS total_passenger_count
    FROM trip
    GROUP BY month, payment_type
    ORDER BY total_passenger_count DESC
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, viewName as view_name FROM {df}', df = spark.sql('SHOW VIEWS IN OtherDB')).show(truncate=False)

+-------------+----------------+
|database_name|view_name       |
+-------------+----------------+
|otherdb      |vw_crime        |
|otherdb      |vw_trip_duration|
|otherdb      |vw_trip_report  |
+-------------+----------------+



In [64]:
spark.sql('SELECT * FROM vw_trip_report LIMIT 10').show(truncate=False)

+---------+------------+---------------------+
|month    |payment_type|total_passenger_count|
+---------+------------+---------------------+
|March    |Credit Card |56547957             |
|February |Credit Card |55005683             |
|January  |Credit Card |54953193             |
|October  |Credit Card |52828619             |
|May      |Credit Card |52607317             |
|April    |Credit Card |52167256             |
|June     |Credit Card |50495234             |
|November |Credit Card |49611323             |
|December |Credit Card |49373428             |
|September|Credit Card |47257834             |
+---------+------------+---------------------+



## Data Quality check

In [65]:
trip_df = spark.sql('SELECT * FROM OtherDB.trip')

In [66]:
analysisResult = AnalysisRunner(spark) \
                    .onData(trip_df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness('tpep_pickup_datetime')) \
                    .addAnalyzer(ApproxCountDistinct('payment_type')) \
                    .addAnalyzer(Mean('passenger_count')) \
                    .addAnalyzer(Correlation('payment_type', 'tip_amount')) \
                    .addAnalyzer(Compliance('more-5 passenger_count', 'passenger_count >= 5')) \
                    .addAnalyzer(Compliance('less-0 passenger_count', 'passenger_count <= 0')) \
                    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show(truncate=False)

+-----------+-----------------------+-------------------+---------------------+
|entity     |instance               |name               |value                |
+-----------+-----------------------+-------------------+---------------------+
|Multicolumn|payment_type,tip_amount|Correlation        |-3.518101135256185E-4|
|Column     |tpep_pickup_datetime   |Completeness       |1.0                  |
|Column     |more-5 passenger_count |Compliance         |0.06527827848298186  |
|Dataset    |*                      |Size               |5.65621687E8         |
|Column     |payment_type           |ApproxCountDistinct|6.0                  |
|Column     |passenger_count        |Mean               |1.5685495143314883   |
|Column     |less-0 passenger_count |Compliance         |0.009154656051934586 |
+-----------+-----------------------+-------------------+---------------------+





## Silver zone

In [67]:
spark.sql('CREATE DATABASE IF NOT EXISTS Warehouse')
spark.sql('SHOW DATABASES').show(truncate=False)
spark.sql('SELECT current_schema() AS current_database').show(truncate=False)

+--------------+
|namespace     |
+--------------+
|adventureworks|
|default       |
|otherdb       |
|warehouse     |
+--------------+

+----------------+
|current_database|
+----------------+
|otherdb         |
+----------------+



In [68]:
sql = '''
SELECT
    p.ProductID,
    p.Name AS ProductName,
    p.Color,
    p.Size,
    p.Weight,
    sc.Name AS SubCategory,
    c.Name AS Category
FROM AdventureWorks.Product p
LEFT JOIN AdventureWorks.ProductSubCategory sc ON p.ProductSubcategoryID = sc.ProductSubcategoryID
LEFT JOIN AdventureWorks.ProductCategory c ON sc.ProductCategoryID = c.ProductCategoryID
'''

data = spark.sql(sql)

In [69]:
sql = '''
CREATE TABLE IF NOT EXISTS Warehouse.DimProduct (
    Sk INT NOT NULL,
    ProductID INT,
    ProductName STRING,
    Color STRING,
    Size STRING,
    Weight DECIMAL(8,2),
    SubCategory STRING,
    Category STRING,
    StartDate TIMESTAMP,
    EndDate TIMESTAMP
) USING DELTA LOCATION '../lake/silver/Warehouse/DimProduct'
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN Warehouse')).show(truncate=False)

+-------------+----------+
|database_name|table_name|
+-------------+----------+
|warehouse    |dimproduct|
+-------------+----------+



In [70]:
## Create Dimension with SCD type 2 support

sql = '''
INSERT INTO Warehouse.DimProduct
SELECT
    ROW_NUMBER() OVER(ORDER BY NULL) + (SELECT COALESCE(MAX(Sk), 0) FROM Warehouse.DimProduct) AS Sk,
    *,
    CURRENT_TIMESTAMP() AS StartDate, 
    NULL as EndDate
FROM {df}
'''

spark.sql(sql, df = data)
spark.sql('SELECT * FROM Warehouse.DimProduct LIMIT 3')

Sk,ProductID,ProductName,Color,Size,Weight,SubCategory,Category,StartDate,EndDate
1,1,Adjustable Race,,,,,,2024-11-01 10:37:...,
2,2,Bearing Ball,,,,,,2024-11-01 10:37:...,
3,3,BB Ball Bearing,,,,,,2024-11-01 10:37:...,


In [71]:
# create fact order

sql = '''
SELECT
    soh.SalesOrderID AS OrderId,
    sod.SalesOrderDetailID AS OrderDetailsId,
    CAST(soh.OrderDate AS DATE) AS OrderDate,
    soh.CustomerID AS CustomerId,
    sod.ProductID AS ProductId,
    sod.OrderQty AS Quantity,
    sod.UnitPrice AS Price
FROM AdventureWorks.SalesOrderHeader soh
LEFT JOIN AdventureWorks.SalesOrderDetail sod ON soh.SalesOrderID = sod.SalesOrderID
'''

data = spark.sql(sql)

In [72]:
sql = '''
CREATE TABLE IF NOT EXISTS Warehouse.FactOrder (
    OrderId INT,
    OrderDetailsId INT,
    OrderDate DATE,
    CustomerId INT,
    ProductId INT,
    Quantity SMALLINT,
    Price DECIMAL(19,4)
) USING DELTA LOCATION '../lake/silver/Warehouse/FactOrder'
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN Warehouse')).show(truncate=False)

+-------------+----------+
|database_name|table_name|
+-------------+----------+
|warehouse    |dimproduct|
|warehouse    |factorder |
+-------------+----------+



In [73]:
spark.sql('''
    INSERT INTO Warehouse.FactOrder
    SELECT * FROM {df}
''', df = data)

spark.sql('SELECT * FROM Warehouse.FactOrder LIMIT 3')

OrderId,OrderDetailsId,OrderDate,CustomerId,ProductId,Quantity,Price
43659,12,2011-05-31,29825,711,4,20.1865
43659,11,2011-05-31,29825,712,2,5.1865
43659,10,2011-05-31,29825,709,6,5.7


In [74]:
# create dimension date

spark.sql('SELECT MIN(OrderDate), MAX(OrderDate) FROM Warehouse.FactOrder').show(truncate=False)

+--------------+--------------+
|min(OrderDate)|max(OrderDate)|
+--------------+--------------+
|2011-05-31    |2014-06-30    |
+--------------+--------------+



In [75]:
from pyspark.sql.functions import col

sql = '''
SELECT 
    date_key AS DateKey,
    YEAR(date_key) AS Year,
    MONTH(date_key) AS MonthKey,
    MONTHNAME(date_key) AS MonthName,
    DAY(date_key) AS Day,
    DAYNAME(date_key) AS DayName,
    DAYOFYEAR(date_key) AS DayOfYear,
    DAYOFMONTH(date_key) AS DayOfMonth,
    QUARTER(date_key) AS Quarter
FROM (
        SELECT CAST(range AS DATE) AS date_key FROM RANGE(DATE '2011-05-30', DATE '2014-06-29', INTERVAL 1 DAY)
) q
'''

data = spark.createDataFrame( 
    duckdb.sql(sql).df()
)

data = data.withColumn('DateKey', col('DateKey').cast('date'))

In [76]:
sql = '''
CREATE TABLE IF NOT EXISTS Warehouse.DimDate (
    Sk INT NOT NULL,
    DateKey DATE,
    Year BIGINT,
    MonthKey BIGINT,
    MonthName STRING,
    Day BIGINT,
    DayName STRING,
    DayOfYear BIGINT,
    DayOfMonth BIGINT,
    Quarter BIGINT,
    StartDate TIMESTAMP,
    EndDate TIMESTAMP
) USING DELTA LOCATION '../lake/silver/Warehouse/DimDate'
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN Warehouse')).show(truncate=False)

+-------------+----------+
|database_name|table_name|
+-------------+----------+
|warehouse    |dimdate   |
|warehouse    |dimproduct|
|warehouse    |factorder |
+-------------+----------+



In [77]:
sql = '''
INSERT INTO Warehouse.DimDate
SELECT
    ROW_NUMBER() OVER(ORDER BY NULL) + (SELECT COALESCE(MAX(Sk), 0) FROM Warehouse.DimDate) AS Sk,
    *,
    CURRENT_TIMESTAMP() AS StartDate, 
    NULL as EndDate
FROM {df}
'''

spark.sql(sql, df = data)
spark.sql('SELECT * FROM Warehouse.DimDate LIMIT 3')

Sk,DateKey,Year,MonthKey,MonthName,Day,DayName,DayOfYear,DayOfMonth,Quarter,StartDate,EndDate
1,2011-05-30,2011,5,May,30,Monday,150,30,2,2024-11-01 10:37:...,
2,2011-05-31,2011,5,May,31,Tuesday,151,31,2,2024-11-01 10:37:...,
3,2011-06-01,2011,6,June,1,Wednesday,152,1,2,2024-11-01 10:37:...,


In [78]:
# create dimension customer

sql = '''
SELECT
	c.CustomerID,
	CONCAT(
		COALESCE(p.FirstName, ''), ' ',
		COALESCE(p.MiddleName, ''), ' ',
		COALESCE (p.LastName, '')
	) AS FullName,
	ea.EmailAddress,
	pp.PhoneNumber 
FROM AdventureWorks.Customer c
INNER JOIN AdventureWorks.Person p ON c.PersonID = p.BusinessEntityID
INNER JOIN AdventureWorks.BusinessEntity be ON be.BusinessEntityID = p.BusinessEntityID 
INNER JOIN AdventureWorks.EmailAddress ea ON ea.BusinessEntityID = be.BusinessEntityID 
INNER JOIN AdventureWorks.PersonPhone pp ON pp.BusinessEntityID = be.BusinessEntityID 
'''

data = spark.sql(sql)

In [79]:
sql = '''
CREATE TABLE IF NOT EXISTS Warehouse.DimCustomer (
    Sk INT NOT NULL,
    CustomerID INT,
    FullName STRING,
    EmailAddress STRING,
    PhoneNumber STRING,
    StartDate TIMESTAMP,
    EndDate TIMESTAMP
) USING DELTA LOCATION '../lake/silver/Warehouse/DimCustomer'
'''

spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN Warehouse')).show(truncate=False)

+-------------+-----------+
|database_name|table_name |
+-------------+-----------+
|warehouse    |dimcustomer|
|warehouse    |dimdate    |
|warehouse    |dimproduct |
|warehouse    |factorder  |
+-------------+-----------+



In [80]:
sql = '''
INSERT INTO Warehouse.DimCustomer
SELECT
    ROW_NUMBER() OVER(ORDER BY NULL) + (SELECT COALESCE(MAX(Sk), 0) FROM Warehouse.DimCustomer) AS Sk,
    *,
    CURRENT_TIMESTAMP() AS StartDate, 
    NULL AS EndDate
FROM {df}
'''

spark.sql(sql, df = data)
spark.sql('SELECT * FROM Warehouse.DimCustomer LIMIT 3')

Sk,CustomerID,FullName,EmailAddress,PhoneNumber,StartDate,EndDate
1,29484,Gustavo Achong,gustavo0@adventur...,398-555-0132,2024-11-01 10:37:...,
2,29485,Catherine R. Abel,catherine0@advent...,747-555-0171,2024-11-01 10:37:...,
3,29486,Kim Abercrombie,kim2@adventure-wo...,334-555-0137,2024-11-01 10:37:...,


## Farsi data

In [81]:
digikala_df = spark.read.option('header', 'true').option('inferSchema', 'true').option('delimiter', ',').csv('../../data/digikala/digikala_orders.csv')
spark.createDataFrame(digikala_df.dtypes, ['column_name', 'data_type']).show(digikala_df.count(), truncate=False)

+---------------------+---------+
|column_name          |data_type|
+---------------------+---------+
|ID_Order             |int      |
|ID_Customer          |int      |
|ID_Item              |int      |
|DateTime_CartFinalize|timestamp|
|Amount_Gross_Order   |double   |
|city_name_fa         |string   |
|Quantity_item        |double   |
+---------------------+---------+



In [82]:
spark.sql('''
    SELECT * FROM {df} LIMIT 3
''', df = digikala_df)

ID_Order,ID_Customer,ID_Item,DateTime_CartFinalize,Amount_Gross_Order,city_name_fa,Quantity_item
2714054,469662,21386,2015-10-15 08:50:56,597982.0,محمود آباد,1.0
11104039,3063877,248497,2018-02-11 00:29:26,980000.0,خرمدره,1.0
4228130,3184893,50144,2016-06-14 00:30:08,229358.0,قرچک,1.0


In [83]:
sql = '''
CREATE TABLE IF NOT EXISTS OtherDB.dg_Orders (
    OrderID INT,
    CustomerID INT,
    ProductID INT,
    OrderDate TIMESTAMP,
    Amount DOUBLE,
    Quantity DOUBLE,
    CityName STRING
) USING DELTA LOCATION '../lake/bronze/OtherDB/dg_orders'
'''


spark.sql(sql)
spark.sql('SELECT namespace as database_name, tableName as table_name FROM {df}', df = spark.sql('SHOW TABLES IN OtherDB')).show(truncate=False)

+-------------+----------------+
|database_name|table_name      |
+-------------+----------------+
|otherdb      |crime           |
|otherdb      |dg_orders       |
|otherdb      |trip            |
|otherdb      |vw_crime        |
|otherdb      |vw_trip_duration|
|otherdb      |vw_trip_report  |
+-------------+----------------+



In [84]:
spark.sql('''
    INSERT INTO OtherDB.dg_Orders
    SELECT ID_Order, ID_Customer, ID_Item, DateTime_CartFinalize, Amount_Gross_Order, Quantity_item, city_name_fa
    FROM {df}
''', df = digikala_df)

spark.sql('SELECT * FROM OtherDB.dg_Orders LIMIT 3')

OrderID,CustomerID,ProductID,OrderDate,Amount,Quantity,CityName
1496884,468717,20187,2014-11-05 16:05:03,2844074.0,1.0,کرج
5958461,2397262,78180,2017-01-02 22:03:04,369450.0,1.0,کرمانشاه
6345216,1040870,273295,2017-02-11 01:20:22,156789.0,1.0,قزوین


In [85]:
spark.stop()