In [0]:
from datetime import datetime
from pyspark.sql.functions import lit

# Resolve the run date exactly once so every derived path carries the same
# extracted_at value, even if the cell happens to execute across midnight.
extracted_at_param = datetime.now().strftime('%Y-%m-%d')

# Date-stamped root folders of the Silver (input) and Gold (output) layers
source_origin_param = "dbfs:/FileStore/FileStore/Vendas_Silver/extracted_at=%s" % extracted_at_param
dest_origin_param = "dbfs:/FileStore/FileStore/Vendas_Gold/extracted_at=%s" % extracted_at_param

# Dataset base paths; format-specific suffixes are appended by later cells
SILVER_PATH = "{}/{}".format(source_origin_param, 'sales_data_sample_silver')
GOLD_PATH = "{}/{}".format(dest_origin_param, 'sales_data_sample_gold')

print(SILVER_PATH)
print(GOLD_PATH)

dbfs:/FileStore/FileStore/Vendas_Silver/extracted_at=2024-01-29/sales_data_sample_silver
dbfs:/FileStore/FileStore/Vendas_Gold/extracted_at=2024-01-29/sales_data_sample_gold


In [0]:
# Location of the Silver-layer Delta table for this run
SILVER_PATH_DELTA = SILVER_PATH + ".delta"
print(SILVER_PATH_DELTA)

# Load the Silver sales data. The former "fetchsize" option was dropped:
# it is a JDBC data-source setting and does not apply to a Delta read.
dfVendas = spark.read.format("delta").load(SILVER_PATH_DELTA)
dfVendas.display()

dbfs:/FileStore/FileStore/Vendas_Silver/extracted_at=2024-01-29/sales_data_sample_silver.delta


ORDERNUMBER,QUANTITYORDERED,PRICEEACH,ORDERLINENUMBER,SALES,ORDERDATE,STATUS,QTR_ID,MONTH_ID,YEAR_ID,PRODUCTLINE,MSRP,PRODUCTCODE,CUSTOMERNAME,PHONE,ADDRESSLINE1,ADDRESSLINE2,CITY,STATE,POSTALCODE,COUNTRY,TERRITORY,CONTACTLASTNAME,CONTACTFIRSTNAME,DEALSIZE,DATA_CARGA
10107,30,95.7,2,2871.0,,Shipped,1,2,2003,Motorcycles,95,S10_1678,Land of Toys Inc.,2125557818,897 Long Airport Avenue,,NYC,NY,10022,USA,,Yu,Kwai,Small,2024-01-29
10121,34,81.35,5,2765.9,,Shipped,2,5,2003,Motorcycles,95,S10_1678,Reims Collectables,26.47.1555,59 rue de l'Abbaye,,Reims,,51100,France,EMEA,Henriot,Paul,Small,2024-01-29
10134,41,94.74,2,3884.34,,Shipped,3,7,2003,Motorcycles,95,S10_1678,Lyon Souveniers,+33 1 46 62 7555,27 rue du Colonel Pierre Avia,,Paris,,75508,France,EMEA,Da Cunha,Daniel,Medium,2024-01-29
10145,45,83.26,6,3746.7,,Shipped,3,8,2003,Motorcycles,95,S10_1678,Toys4GrownUps.com,6265557265,78934 Hillside Dr.,,Pasadena,CA,90003,USA,,Young,Julie,Medium,2024-01-29
10159,49,100.0,14,5205.27,,Shipped,4,10,2003,Motorcycles,95,S10_1678,Corporate Gift Ideas Co.,6505551386,7734 Strong St.,,San Francisco,CA,,USA,,Brown,Julie,Medium,2024-01-29
10168,36,96.66,1,3479.76,,Shipped,4,10,2003,Motorcycles,95,S10_1678,Technics Stores Inc.,6505556809,9408 Furth Circle,,Burlingame,CA,94217,USA,,Hirano,Juri,Medium,2024-01-29
10180,29,86.13,9,2497.77,,Shipped,4,11,2003,Motorcycles,95,S10_1678,Daedalus Designs Imports,20.16.1555,"184, chausse de Tournai",,Lille,,59000,France,EMEA,Rance,Martine,Small,2024-01-29
10188,48,100.0,1,5512.32,,Shipped,4,11,2003,Motorcycles,95,S10_1678,Herkku Gifts,+47 2267 3215,"Drammen 121, PR 744 Sentrum",,Bergen,,N 5804,Norway,EMEA,Oeztan,Veysel,Medium,2024-01-29
10201,22,98.57,2,2168.54,,Shipped,4,12,2003,Motorcycles,95,S10_1678,Mini Wheels Co.,6505555787,5557 North Pendale Street,,San Francisco,CA,,USA,,Murphy,Julie,Small,2024-01-29
10211,41,100.0,14,4708.44,,Shipped,1,1,2004,Motorcycles,95,S10_1678,Auto Canal Petit,(1) 47.55.6555,"25, rue Lauriston",,Paris,,75016,France,EMEA,Perrier,Dominique,Medium,2024-01-29


In [0]:
# Gold enrichment: TOTAL_PRODUTOS = QUANTITYORDERED * PRICEEACH for each row
total_produtos = dfVendas["QUANTITYORDERED"] * dfVendas["PRICEEACH"]
dfVendas_Gold = dfVendas.withColumn("TOTAL_PRODUTOS", total_produtos)

In [0]:
# Write the Gold output.
# The base path is rebuilt from dest_origin_param instead of being appended
# onto GOLD_PATH, so re-running this cell always targets the same two
# locations rather than stacking another suffix on a previously mutated value.
GOLD_BASE_PATH = "{}/{}".format(dest_origin_param, 'sales_data_sample_gold')

# Parquet copy
PARQUET_PATH = GOLD_BASE_PATH + '.parquet'
dfVendas_Gold.write.format("parquet").mode("overwrite").save(PARQUET_PATH)

# Delta copy, partitioned by YEAR_ID. GOLD_PATH is repointed at it because the
# CREATE TABLE cell further down uses GOLD_PATH as the table LOCATION.
GOLD_PATH = GOLD_BASE_PATH + '.delta'
dfVendas_Gold.write.format("delta").mode("overwrite").option("mergeSchema", "true").partitionBy("YEAR_ID").save(GOLD_PATH)

In [0]:
from pyspark.sql.functions import sum, avg

# Grand total of TOTAL_PRODUTOS over every row
total_produtos_df = dfVendas_Gold.select(sum("TOTAL_PRODUTOS"))
total_produtos_df.show()

# SALES summed per PRODUCTLINE
sales_by_productline_df = dfVendas_Gold.groupBy("PRODUCTLINE").agg(sum("SALES"))
sales_by_productline_df.show()

+-------------------+
|sum(TOTAL_PRODUTOS)|
+-------------------+
|  8290886.785461426|
+-------------------+

+----------------+------------------+
|     PRODUCTLINE|        sum(SALES)|
+----------------+------------------+
|     Motorcycles|1166388.3392333984|
|    Vintage Cars| 1903150.835571289|
|           Ships| 714437.1301269531|
|Trucks and Buses|1127789.8432617188|
|    Classic Cars|3919615.6607666016|
|          Trains|226243.46899414062|
|          Planes| 975003.5713500977|
+----------------+------------------+



In [0]:
# Create the TESTE_DATABRICKS database unless it is already there
create_database_sql = """
          CREATE DATABASE IF NOT EXISTS TESTE_DATABRICKS 
          """
spark.sql(create_database_sql)

Out[28]: DataFrame[]

In [0]:
# Switch the session to TESTE_DATABRICKS, then drop any existing VENDAS
# table so the CREATE TABLE below does not collide with it.
use_database_sql = """
          USE TESTE_DATABRICKS
"""
drop_table_sql = """
    DROP TABLE IF EXISTS VENDAS
"""
for setup_sql in (use_database_sql, drop_table_sql):
    spark.sql(setup_sql)

# Register VENDAS as a Delta table over the files stored at GOLD_PATH.
# Left as the final expression so the cell still echoes the call's result.
create_table_sql = f"""
    CREATE TABLE VENDAS
    USING DELTA
    LOCATION '{GOLD_PATH}'
"""
spark.sql(create_table_sql)

Out[29]: DataFrame[]

In [0]:
# Read every column of the registered VENDAS table back and render it
vendas_query = """SELECT * FROM TESTE_DATABRICKS.VENDAS"""
df_gold = spark.sql(vendas_query)
display(df_gold)

ORDERNUMBER,QUANTITYORDERED,PRICEEACH,ORDERLINENUMBER,SALES,ORDERDATE,STATUS,QTR_ID,MONTH_ID,YEAR_ID,PRODUCTLINE,MSRP,PRODUCTCODE,CUSTOMERNAME,PHONE,ADDRESSLINE1,ADDRESSLINE2,CITY,STATE,POSTALCODE,COUNTRY,TERRITORY,CONTACTLASTNAME,CONTACTFIRSTNAME,DEALSIZE,DATA_CARGA,TOTAL_PRODUTOS
10211,41,100.0,14,4708.44,,Shipped,1,1,2004,Motorcycles,95,S10_1678,Auto Canal Petit,(1) 47.55.6555,"25, rue Lauriston",,Paris,,75016,France,EMEA,Perrier,Dominique,Medium,2024-01-29,4100.0
10223,37,100.0,1,3965.66,,Shipped,1,2,2004,Motorcycles,95,S10_1678,"Australian Collectors, Co.",03 9520 4555,636 St Kilda Road,Level 3,Melbourne,Victoria,3004,Australia,APAC,Ferguson,Peter,Medium,2024-01-29,3700.0
10237,23,100.0,7,2333.12,,Shipped,2,4,2004,Motorcycles,95,S10_1678,Vitachrome Inc.,2125551500,2678 Kingston Rd.,Suite 101,NYC,NY,10022,USA,,Frick,Michael,Small,2024-01-29,2300.0
10251,28,100.0,2,3188.64,,Shipped,2,5,2004,Motorcycles,95,S10_1678,Tekni Collectables Inc.,2015559350,7476 Moss Rd.,,Newark,NJ,94019,USA,,Brown,William,Medium,2024-01-29,2800.0
10263,34,100.0,2,3676.76,,Shipped,2,6,2004,Motorcycles,95,S10_1678,Gift Depot Inc.,2035552570,25593 South Bay Ln.,,Bridgewater,CT,97562,USA,,King,Julie,Medium,2024-01-29,3400.0
10275,45,92.83,1,4177.35,,Shipped,3,7,2004,Motorcycles,95,S10_1678,La Rochelle Gifts,40.67.8555,"67, rue des Cinquante Otages",,Nantes,,44000,France,EMEA,Labrune,Janine,Medium,2024-01-29,4177.35
10285,36,100.0,6,4099.68,,Shipped,3,8,2004,Motorcycles,95,S10_1678,Marta's Replicas Co.,6175558555,39323 Spinnaker Dr.,,Cambridge,MA,51247,USA,,Hernandez,Marta,Medium,2024-01-29,3600.0
10299,23,100.0,9,2597.39,,Shipped,3,9,2004,Motorcycles,95,S10_1678,"Toys of Finland, Co.",90-224 8555,Keskuskatu 45,,Helsinki,,21240,Finland,EMEA,Karttunen,Matti,Small,2024-01-29,2300.0
10309,41,100.0,5,4394.38,,Shipped,4,10,2004,Motorcycles,95,S10_1678,Baane Mini Imports,07-98 9555,Erling Skakkes gate 78,,Stavern,,4110,Norway,EMEA,Bergulfsen,Jonas,Medium,2024-01-29,4100.0
10318,46,94.74,1,4358.04,,Shipped,4,11,2004,Motorcycles,95,S10_1678,Diecast Classics Inc.,2155551555,7586 Pompton St.,,Allentown,PA,70267,USA,,Yu,Kyung,Medium,2024-01-29,4358.04


In [0]:
# Namespaced import kept local to this cell: the aggregations below must be
# Spark's sum, and should not depend on an earlier cell having replaced
# Python's builtin sum with it.
from pyspark.sql import functions as F

# Build the aggregated table: total quantity ordered and total sales per
# year / month / product line / city.
colunas_selecionadas = ["YEAR_ID", "MONTH_ID", "PRODUCTLINE", "CITY"]
df_gold_agg = df_gold.groupBy(colunas_selecionadas).agg(
    F.sum("QUANTITYORDERED").alias("QTDE_TOTAL_VENDA"),
    F.sum("SALES").alias("TOTAL_VENDA"),
)
display(df_gold_agg)

YEAR_ID,MONTH_ID,PRODUCTLINE,CITY,QTDE_TOTAL_VENDA,TOTAL_VENDA
2004,5,Classic Cars,Paris,125,15154.81005859375
2004,2,Trucks and Buses,Brisbane,83,6823.3302001953125
2004,8,Vintage Cars,Torino,493,45263.36010742188
2004,10,Classic Cars,Madrid,342,31513.8203125
2004,9,Vintage Cars,Torino,73,7765.340087890625
2004,5,Classic Cars,Espoo,144,25901.18017578125
2004,11,Classic Cars,San Rafael,33,1225.2900390625
2004,11,Ships,Lule,80,6673.2900390625
2004,11,Planes,Bergamo,342,28551.32983398437
2004,2,Vintage Cars,Melbourne,61,3217.4600830078125


In [0]:
# Write the aggregated Gold output.
# The prefix is rebuilt from dest_origin_param instead of being appended onto
# the current GOLD_PATH, so re-running this cell does not pile up extra
# "_agg.delta" suffixes. The ".delta_agg" naming is kept as-is so the output
# locations match what the notebook has always produced.
GOLD_DELTA_PATH = "{}/{}".format(dest_origin_param, 'sales_data_sample_gold') + '.delta'

# Parquet copy
PARQUET_PATH = GOLD_DELTA_PATH + '_agg.parquet'
df_gold_agg.write.format("parquet").mode("overwrite").save(PARQUET_PATH)

# Delta copy, partitioned by YEAR_ID. GOLD_PATH ends up pointing at the
# aggregated table, exactly as it did before.
GOLD_PATH = GOLD_DELTA_PATH + '_agg.delta'
df_gold_agg.write.format("delta").mode("overwrite").option("mergeSchema", "true").partitionBy("YEAR_ID").save(GOLD_PATH)