In [1]:
# IMPORTANDO BIBLIOTECAS
import os
import shutil
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id, year, month, dayofmonth, to_date

In [2]:
# CRIANDO SESSÃO SPARK
spark = SparkSession.builder \
    .appName("Desafio Gaudium - Data Mart") \
    .getOrCreate()

In [3]:
# Ler os dados
df = spark.read.csv("dados_brutos_csv/dados_brutos.csv", header=True, inferSchema=True)

In [4]:
# Mostra os dados
df.show(5)

+-------------+--------------+------+------------+---------+----------+----------+-----------+-----------+
| nome_cliente|        cidade|estado|nome_produto|categoria|fabricante|      data|qtd_vendida|valor_total|
+-------------+--------------+------+------------+---------+----------+----------+-----------+-----------+
|Lucas Pereira|  Porto Alegre|    RS|  Detergente|  Limpeza|       Ypê|2024-01-26|          6|         90|
|Lucas Pereira|  Porto Alegre|    RS|      Feijão| Alimento|   Kicaldo|2024-01-14|         10|        240|
| Ana Oliveira|Rio de Janeiro|    RJ|Refrigerante|   Bebida| Coca-Cola|2024-01-15|          3|        150|
| Pedro Santos|      Curitiba|    PR|      Feijão| Alimento|   Kicaldo|2024-01-28|          4|        152|
| Pedro Santos|      Curitiba|    PR|       Arroz| Alimento|     Camil|2024-01-24|          3|         87|
+-------------+--------------+------+------------+---------+----------+----------+-----------+-----------+
only showing top 5 rows



### CRIACAO DAS TABELAS DIMENSÃO

In [5]:
# Dimensão Cliente
dim_cliente = df.select("nome_cliente").distinct() \
    .withColumn("id_cliente", monotonically_increasing_id())

In [6]:
# Dimensão Produto
dim_produto = df.select("nome_produto", "categoria", "fabricante").distinct() \
    .withColumn("id_produto", monotonically_increasing_id())

In [7]:
# Dimensão Localidade
dim_localidade = df.select("cidade", "estado").distinct() \
    .withColumn("id_localidade", monotonically_increasing_id())

In [8]:
# Dimensão Tempo
dim_tempo = df.select("data").distinct() \
    .withColumn("data", to_date(col("data"), "yyyy-MM-dd")) \
    .withColumn("ano", year(col("data"))) \
    .withColumn("mes", month(col("data"))) \
    .withColumn("dia", dayofmonth(col("data"))) \
    .withColumn("id_tempo", monotonically_increasing_id())

### CRIACAO DA TABELA FATO

In [9]:
# Juntando todos os IDs para a tabela fato
df_fato = df \
    .join(dim_cliente, "nome_cliente", "left") \
    .join(dim_produto, ["nome_produto", "categoria", "fabricante"], "left") \
    .join(dim_localidade, ["cidade", "estado"], "left") \
    .join(dim_tempo, "data", "left") \
    .select(
        col("id_cliente"),
        col("id_produto"),
        col("id_localidade"),
        col("id_tempo"),
        col("qtd_vendida"),
        col("valor_total")
    )

### EXPORTANDO OS DADOS PARA CSV

In [10]:
# Definir o caminho da pasta
output_path = "tabelas_fato_dim/"

# Salvando Dimensões
dim_cliente.select("id_cliente", "nome_cliente") \
    .write.mode("overwrite").option("header", True).csv(output_path + "dim_cliente")

dim_produto.select("id_produto", "nome_produto", "categoria", "fabricante") \
    .write.mode("overwrite").option("header", True).csv(output_path + "dim_produto")

dim_localidade.select("id_localidade", "cidade", "estado") \
    .write.mode("overwrite").option("header", True).csv(output_path + "dim_localidade")

dim_tempo.select("id_tempo", "data", "ano", "mes", "dia") \
    .write.mode("overwrite").option("header", True).csv(output_path + "dim_tempo")

# Salvando Fato
df_fato.write.mode("overwrite").option("header", True).csv(output_path + "fato_vendas")

def mover_renomear_csv(pasta_saida, nome_final_csv):
    arquivos = os.listdir(pasta_saida)
    for arquivo in arquivos:
        if arquivo.startswith("part-") and arquivo.endswith(".csv"):
            origem = os.path.join(pasta_saida, arquivo)
            destino = os.path.join("tabelas_fato_dim", nome_final_csv)
            shutil.move(origem, destino)
            print(f"Arquivo {arquivo} renomeado para {nome_final_csv}")
    # Após mover, remover a pasta vazia
    shutil.rmtree(pasta_saida)
    print(f"Pasta {pasta_saida} removida.")

# Executando para cada dimensão/fato
mover_renomear_csv("tabelas_fato_dim/dim_cliente", "dim_cliente.csv")
mover_renomear_csv("tabelas_fato_dim/dim_produto", "dim_produto.csv")
mover_renomear_csv("tabelas_fato_dim/dim_localidade", "dim_localidade.csv")
mover_renomear_csv("tabelas_fato_dim/dim_tempo", "dim_tempo.csv")
mover_renomear_csv("tabelas_fato_dim/fato_vendas", "fato_vendas.csv")

Arquivo part-00000-a7c3353f-1c38-4a5a-839c-1e211754a307-c000.csv renomeado para dim_cliente.csv
Pasta tabelas_fato_dim/dim_cliente removida.
Arquivo part-00000-12dd9753-039f-4bef-90df-3d592ef10e3a-c000.csv renomeado para dim_produto.csv
Pasta tabelas_fato_dim/dim_produto removida.
Arquivo part-00000-2f7eac53-d46a-4d25-9a00-21261f932111-c000.csv renomeado para dim_localidade.csv
Pasta tabelas_fato_dim/dim_localidade removida.
Arquivo part-00000-149525de-409e-4b59-ab33-7544e7c58215-c000.csv renomeado para dim_tempo.csv
Pasta tabelas_fato_dim/dim_tempo removida.
Arquivo part-00000-938b97d3-f9ab-42fc-b652-d1de7c421448-c000.csv renomeado para fato_vendas.csv
Pasta tabelas_fato_dim/fato_vendas removida.


#### FINALIZANDO SESSÃO

In [11]:
spark.stop()
print("Processo concluído com sucesso!")

Processo concluído com sucesso!
