In [101]:
# Importa dependencias e criar spark session
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, dayofmonth, month, year, quarter, dayofweek
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

spark = SparkSession.builder \
    .appName("gaudium") \
    .getOrCreate()

In [91]:
# Carrega os dados brutos com esquema definido
schema = StructType([
    StructField("nome_cliente", StringType(), True),
    StructField("cidade", StringType(), True),
    StructField("estado", StringType(), True),
    StructField("nome_produto", StringType(), True),
    StructField("categoria", StringType(), True),
    StructField("fabricante", StringType(), True),
    StructField("data", DateType(), True),
    StructField("qtd_vendida", IntegerType(), True),
    StructField("valor_total", IntegerType(), True),
])
tabela_vendas = spark.read.csv("tabelas/dados_brutos.csv", header=True, schema=schema)
tabela_vendas.createOrReplaceTempView("vendas")

In [147]:
# dim_categoria
dim_categoria = spark.sql("SELECT DISTINCT(categoria) AS nome FROM vendas")
dim_categoria = dim_categoria.withColumn("id", expr("uuid()")).withColumnRenamed("categoria", "nome")
dim_categoria.createOrReplaceTempView("dim_categoria")

# dim_fabricante
dim_fabricante = spark.sql("SELECT DISTINCT(fabricante) AS nome FROM vendas")
dim_fabricante = dim_fabricante.withColumn("id", expr("uuid()")).withColumnRenamed("fabricante", "nome")
dim_fabricante.createOrReplaceTempView("dim_fabricante")

# dim_produto
dim_produto = spark.sql("""
SELECT
    DISTINCT(v.nome_produto) AS nome, c.id AS categoria_id, f.id AS fabricate_id
FROM
    vendas AS v
JOIN
    dim_categoria as c ON c.nome = categoria
JOIN
    dim_fabricante as f ON f.nome = fabricante
""")
dim_produto = dim_produto.withColumn("id", expr("uuid()"))
dim_produto.createOrReplaceTempView("dim_produto")

In [158]:
# dim_cliente
dim_cliente = spark.sql("SELECT DISTINCT(nome_cliente) FROM vendas")
dim_cliente = dim_cliente.withColumn("id", expr("uuid()")).withColumnRenamed("nome_cliente", "nome")
dim_cliente.createOrReplaceTempView("dim_cliente")

# dim_endereco
dim_endereco = spark.sql("SELECT DISTINCT cidade, estado FROM vendas")
dim_endereco = dim_endereco.withColumn("id", expr("uuid()"))
dim_endereco.createOrReplaceTempView("dim_endereco")

# dim_data
dim_data = spark.sql("SELECT DISTINCT(data) FROM vendas")
dim_data = dim_data.withColumn("dia", dayofmonth(dim_data["data"])) \
    .withColumn("mes", month(dim_data["data"])) \
    .withColumn("ano", year(dim_data["data"])) \
    .withColumn("trimestre", quarter(dim_data["data"])) \
    .withColumn("dia_semana", dayofweek(dim_data["data"]))
dim_data.createOrReplaceTempView("dim_data")

In [161]:
# fato_vendas
fato_vendas = spark.sql("""
SELECT
    v.qtd_vendida, v.valor_total, data,
    c.id AS cliente_id, e.id AS endereco_id, p.id AS produto_id
FROM
    vendas AS v
JOIN
    dim_cliente AS c ON c.nome = v.nome_cliente
JOIN
    dim_endereco AS e ON e.cidade = v.cidade AND e.estado = v.estado
JOIN
    dim_produto AS p ON p.nome = v.nome_produto
""")
fato_vendas.createOrReplaceTempView("fato_vendas")

In [174]:
# Salva as tabelas para arquivos .csv
dim_categoria.toPandas() \
    .to_csv("tabelas/dim_categoria.csv", mode="w", index=False, header=True)
dim_fabricante.toPandas() \
    .to_csv("tabelas/dim_fabricante.csv", mode="w", index=False, header=True)
dim_produto.toPandas() \
    .to_csv("tabelas/dim_produto.csv", mode="w", index=False, header=True)
dim_cliente.toPandas() \
    .to_csv("tabelas/dim_cliente.csv", mode="w", index=False, header=True)
dim_endereco.toPandas() \
    .to_csv("tabelas/dim_endereco.csv", mode="w", index=False, header=True)
dim_data.toPandas() \
    .to_csv("tabelas/dim_data.csv", mode="w", index=False, header=True)
fato_vendas.toPandas() \
    .to_csv("tabelas/fato_vendas.csv", mode="w", index=False, header=True)