In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *

In [0]:
%sql
--DROP DATABASE IF EXISTS ADVENTUREWORKS_GOLD CASCADE

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS ADVENTUREWORKS_GOLD
LOCATION '/mnt/03-gold/ADVENTUREWORKS/'

In [0]:
from pyspark.sql.functions import col, format_number

# Consulta SQL
query = """
SELECT
    C.CustomerID,
    CONCAT(C.FirstName, ' ', C.LastName) AS CustomerName,
    A.City,
    A.StateProvince,
    A.CountryRegion,
    P.ProductID,
    P.Name AS ProductName,
    O.SalesOrderID AS OrderID,
    O.OrderDate,
    D.OrderQty AS Quantity,
    cast(D.LineTotal as decimal(18,2)) AS TotalPrice
FROM adventureworks_silver.tb_Customer C
JOIN adventureworks_silver.tb_CustomerAddress CA ON C.CustomerID = CA.CustomerID
JOIN adventureworks_silver.tb_Address A ON CA.AddressID = A.AddressID
JOIN adventureworks_silver.tb_SalesOrderHeader O ON C.CustomerID = O.CustomerID
JOIN adventureworks_silver.tb_SalesOrderDetail D ON O.SalesOrderID = D.SalesOrderID
JOIN adventureworks_silver.tb_Product P ON D.ProductID = P.ProductID;
"""

# Executa a consulta SQL e carrega o resultado em um DataFrame
df_fato_vendas_adventureworks = spark.sql(query)

#Atualiza a tabela Fato_Vendas_AdventureWorks na camada Gold do Delta Lake
df_fato_vendas_adventureworks.write\
    .mode("overwrite")\
    .option("overwriteSchema","true")\
    .saveAsTable("ADVENTUREWORKS_GOLD.fato_vendas_adventureworks")

In [0]:
import os

output_path = f"dbfs:/mnt/03-gold/ADVENTUREWORKS/fato_vendas_adventureworks_csv"
output_folder = "/dbfs/mnt/03-gold/ADVENTUREWORKS/fato_vendas_adventureworks_csv"
output_filename = "fato_vendas_adventureworks.csv"

# Crie o diretório de saída, se ele não existir
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

# Reduz o DataFrame a uma única partição e, em seguida, salva-o como um arquivo CSV
df_fato_vendas_adventureworks.coalesce(1).write \
    .mode("overwrite") \
    .option("header", "true") \
    .option("delimiter", "|") \
    .csv(output_path)

# Lista o conteúdo do diretório de saída
files = os.listdir(output_folder)

# Encontra o arquivo gerado (pode haver apenas um arquivo no diretório)
for file in files:
    if file.startswith("part-") and file.endswith(".csv"):
        # Renomeie o arquivo gerado
        os.rename(os.path.join(output_folder, file), os.path.join(output_folder, output_filename))
    elif not file.endswith(".csv"):
        os.remove(os.path.join(output_folder, file))
