In [0]:
storage_key = dbutils.secrets.get(scope="SecretsAzure", key="blobStorageKey")

In [0]:
 # Montando a camada bronze da Azure
 dbutils.fs.mount(
     source = "wasbs://bronze@falcaristorage.blob.core.windows.net/", 
     mount_point = "/mnt/falcaristorage/bronze",
     extra_configs = {"fs.azure.account.key.falcaristorage.blob.core.windows.net":storage_key}
 )

True

In [0]:
 # Montando a camada silver da Azure
 dbutils.fs.mount(
     source = "wasbs://silver@falcaristorage.blob.core.windows.net/", 
     mount_point = "/mnt/falcaristorage/silver",
     extra_configs = {"fs.azure.account.key.falcaristorage.blob.core.windows.net":storage_key}
 )

True

In [0]:
 # Montando a camada gold da Azure
 dbutils.fs.mount(
     source = "wasbs://gold@falcaristorage.blob.core.windows.net/", 
     mount_point = "/mnt/falcaristorage/gold",
     extra_configs = {"fs.azure.account.key.falcaristorage.blob.core.windows.net":storage_key}
 )

True

In [0]:
# ler arquivo csv da camada bronze
df_sales_bronze = spark.read.format('csv').options(header='true', inferSchema='true', delimiter=';').load("dbfs:/mnt/falcaristorage/bronze/sales.csv")

In [0]:
# Visualizando o DataFrame
df_sales_bronze.show()

+--------+------------+-----------------+--------------------+----------+----------+-----+-----------+
|id_venda|id_comprador|             Nome|              e-mail|id_produto|quantidade|valor|data_compra|
+--------+------------+-----------------+--------------------+----------+----------+-----+-----------+
|       1|          89|    Maria Pereira|maria.pereira@gma...|        27|         7|48.41| 2023-07-06|
|       2|          13|      Rafael Lima|rafael.lima@yahoo...|        42|         6|46.64| 2023-10-01|
|       3|          34|      Pedro Silva|pedro.silva@yahoo...|         7|         4|18.37| 2023-03-01|
|       4|          95|  Gabriel Pereira|gabriel.pereira@h...|        14|         6|29.56| 2023-04-23|
|       5|          41| Gabriel Oliveira|gabriel.oliveira@...|        50|         5|65.08| 2023-04-15|
|       6|          55|       João Silva|joão.silva@outloo...|         9|         9|74.88| 2023-07-07|
|       7|           6|      Julia Alves|julia.alves@outlo...|        17|

In [0]:
# Importações 
from pyspark.sql.functions import lower, regexp_replace, to_date, format_number, year, month, avg, sum

In [0]:
# LIMPEZA DE DADOS
# Padronizar formato de email
df_sales_silver = df_sales_bronze.withColumn('e-mail', lower(regexp_replace('e-mail', '[^a-zA-Z0-9@.]', '')))

# removendo duplicatas  
df_sales_silver = df_sales_silver.dropDuplicates()

In [0]:
# carregando a tabela de categorias de produtos (arquivo csv)
df_categories = spark.read.csv("dbfs:/mnt/falcaristorage/bronze/categories.csv", header=True, inferSchema=True)

# Enriquecer os dados com a categoria do produto 
df_sales_silver_enriched  = df_sales_silver.join(df_categories, "id_produto", "left")

In [0]:
# visualizando o dataframe
df_sales_silver_enriched.show()

+----------+--------+------------+-----------------+--------------------+----------+-----+-----------+-----------+
|id_produto|id_venda|id_comprador|             Nome|              e-mail|quantidade|valor|data_compra|  categoria|
+----------+--------+------------+-----------------+--------------------+----------+-----+-----------+-----------+
|        42|       2|          13|      Rafael Lima|rafael.lima@yahoo...|         6|46.64| 2023-10-01|  Vestuário|
|         6|      18|          47|       Pedro Lima|pedro.lima@gmail.com|         1|28.57| 2023-01-24|Eletrônicos|
|        14|      12|          92|     Ana Oliveira|ana.oliveira@yaho...|         7|59.59| 2023-04-27|     Móveis|
|        41|      15|          93|      Ana Pereira|ana.pereira@gmail...|         8|82.16| 2023-01-11|Eletrônicos|
|         9|       6|          55|       João Silva|joo.silva@outlook...|         9|74.88| 2023-07-07|     Móveis|
|        18|      11|          21|    Julia Pereira|julia.pereira@out...|       

In [0]:
# salvando os dados na camada silver dentro do conteiner criado na azure
df_sales_silver_enriched.write.format("parquet").save("dbfs:/mnt/falcaristorage/silver/sales.csv") 

In [0]:
# Salvando os dados no formato delta 
df_sales_silver_enriched.write.format("delta").mode('overwrite').save("dbfs:/mnt/falcaristorage/silver/sales_clean.csv")

In [0]:
# criando uma tabela com spark sql para acessar os dados da camada silver
spark.sql("""
    CREATE TABLE IF NOT EXISTS sales_clean
    USING DELTA 
    LOCATION '/mnt/falcaristorage/silver/sales_clean.csv'
""")

DataFrame[]

In [0]:
# visualizando a tabela criada 
display(spark.sql("SELECT * FROM sales_clean"))

id_produto,id_venda,id_comprador,Nome,e-mail,quantidade,valor,data_compra,categoria
33,71,39,Ana Alves,ana.alves@yahoo.com,5,16.39,2023-10-26,Alimentos
34,82,93,Pedro Gomes,pedro.gomes@hotmail.com,3,32.69,2023-05-27,Móveis
17,100,83,Maria Lima,maria.lima@outlook.com,5,21.2,2023-10-23,Vestuário
15,181,7,Maria Fernandes,maria.fernandes@gmail.com,6,71.33,2023-07-17,Livros
4,110,60,Maria Souza,maria.souza@yahoo.com,6,90.21,2023-07-02,Móveis
16,175,6,Rafael Oliveira,rafael.oliveira@yahoo.com,9,44.75,2023-04-28,Eletrônicos
45,150,97,Pedro Santos,pedro.santos@hotmail.com,10,65.21,2023-01-06,Livros
3,132,12,Julia Santos,julia.santos@gmail.com,2,69.89,2023-01-03,Alimentos
23,101,95,João Alves,joo.alves@yahoo.com,2,79.25,2023-06-22,Alimentos
32,168,92,Pedro Gomes,pedro.gomes@gmail.com,6,41.69,2023-05-18,Vestuário


In [0]:
# Carregar dados da camada silver 
silver_sales_path = "/mnt/falcaristorage/silver/sales.csv"
df_sales_silver = spark.read.parquet(silver_sales_path)

# Converter a coluna de data (tipo string) para o tipo date
df_sales_silver = df_sales_silver.withColumn("data_compra", to_date(df_sales_silver["data_compra"], "yyyy-MM-dd"))

In [0]:
# Trabalhando da camada gold 

# Agrupando dados de vendas por mês
df_sales_month = (
    df_sales_silver
    .groupBy(year("data_compra").alias("ano"), month("data_compra").alias("month"))
    .agg(sum("valor").alias("total_vendas"))
    .orderBy("total_vendas", ascending=False)  # Ordena corretamente antes da formatação
)

df_sales_month = df_sales_month.withColumn("total_vendas", format_number("total_vendas", 2))

In [0]:
# visualizando o dataframe df_sales_month
df_sales_month.show()   

+----+-----+------------+
| ano|month|total_vendas|
+----+-----+------------+
|2023|    1|    1,157.72|
|2023|   11|    1,104.32|
|2023|    6|    1,082.06|
|2023|   10|    1,060.72|
|2023|    5|    1,012.46|
|2023|    4|      962.72|
|2023|    9|      942.71|
|2023|    7|      866.50|
|2023|    8|      795.92|
|2023|   12|      771.95|
|2023|    2|      706.32|
|2023|    3|      662.39|
+----+-----+------------+



In [0]:
# Principais compradores em termos de receita
df_main_buyers = df_sales_silver.groupBy("id_comprador", "nome").agg(format_number(sum("valor"), 2).alias("total_comprado")).orderBy("total_comprado", ascending=False)

In [0]:
# visualizando o dataframe df_main_buyers
df_main_buyers.show()

+------------+-----------------+--------------+
|id_comprador|             nome|total_comprado|
+------------+-----------------+--------------+
|          98|  Paula Fernandes|         99.98|
|          97|Gabriel Fernandes|         99.92|
|          14|       Lucas Lima|         98.84|
|          73|  Lucas Fernandes|         98.75|
|          57|    Pedro Pereira|         97.72|
|          37|       João Silva|         96.57|
|           4|   Julia Oliveira|         96.54|
|          44|   João Rodrigues|         95.95|
|          67|      Paula Silva|         95.64|
|           2|    Lucas Pereira|         95.43|
|          46|   João Fernandes|         95.31|
|          86|      Pedro Silva|         95.27|
|          17|      Pedro Silva|         94.88|
|           3|     Carla Santos|         94.73|
|          84|     Ana Oliveira|         94.69|
|          17|        Ana Souza|         93.21|
|          64|       João Silva|         92.14|
|          11|     Rafael Alves|        

In [0]:
# Salvando o resultado das agregações por mês e ano 
df_sales_month.write.format("delta").mode("overwrite").save("/mnt/falcaristorage/gold/sales_month")


In [0]:
# Salvando o resutado dos pricipais compradores
df_main_buyers.write.format("delta").mode("overwrite").save("/mnt/falcaristorage/gold/main_buyers")

In [0]:
# Criando Views sales_month e main_buyers para facilitar a consulta
df_sales_month.createOrReplaceTempView("sales_month_view")
df_main_buyers.createOrReplaceTempView("main_buyers_view")

In [0]:
%sql
SELECT * FROM sales_month_view

ano,month,total_vendas
2023,1,1157.72
2023,11,1104.32
2023,6,1082.06
2023,10,1060.72
2023,5,1012.46
2023,4,962.72
2023,9,942.71
2023,7,866.5
2023,8,795.92
2023,12,771.95


In [0]:
%sql
SELECT * FROM main_buyers_view

id_comprador,nome,total_comprado
98,Paula Fernandes,99.98
97,Gabriel Fernandes,99.92
14,Lucas Lima,98.84
73,Lucas Fernandes,98.75
57,Pedro Pereira,97.72
37,João Silva,96.57
4,Julia Oliveira,96.54
44,João Rodrigues,95.95
67,Paula Silva,95.64
2,Lucas Pereira,95.43


In [0]:
# pegar as informações dos secrets
username = dbutils.secrets.get(scope="SecretsAzure", key="AZZURE_SQL_DB_USERNAME")
password = dbutils.secrets.get(scope="SecretsAzure", key="AZZURE_SQL_DB_PASSWORD")


In [0]:
# URL de conexão com o Azure SQL Database
url = "jdbc:sqlserver://falcari-spark-server.database.windows.net:1433;database=falcari-database"

# Opções de conexão com o Azure SQL Database
connection_properties = {
    "user": username, 
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
 }

In [0]:
# salvando a tabela vendas por mes e ano no banco de dados Azure SQL Database
query = """
    SELECT * FROM sales_month_view
"""

result_df = spark.sql(query)

result_df.write.jdbc(url, "sales_month", mode="overwrite", properties=connection_properties)

In [0]:
# salvando a tabela principais clientes no banco de dados Azure SQL Database
query = """
    SELECT * FROM main_buyers_view
"""

result_df = spark.sql(query)

result_df.write.jdbc(url, "main_buyers", mode="overwrite", properties=connection_properties)