In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz

!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL").getOrCreate()



In [2]:
from pyspark.sql import functions as F
from pyspark.sql.functions import when, monotonically_increasing_id as miid
from IPython.display import display, HTML

df = (
    spark.read.option("header", "true")
    .csv("/content/Dados brutos.csv")
    .withColumn("data", F.to_date("data", "yyyy-MM-dd"))
    .withColumn("qtd_vendida", F.col("qtd_vendida").cast("integer"))
    .withColumn("valor_total", F.col("valor_total").cast("decimal(10,2)"))
)

df.createOrReplaceTempView("vendas_temp")

display(HTML(f"<pre>{df._jdf.showString(100, 20, False)}</pre>"))


In [3]:
dim_cliente = spark.sql(
    """
    SELECT
        ROW_NUMBER() OVER (ORDER BY nome_cliente) AS id_cliente,
        nome_cliente,
        cidade,
        estado
    FROM
        vendas_temp
    GROUP BY
        nome_cliente,
        cidade,
        estado
"""
)

dim_cliente.createOrReplaceTempView("dim_cliente")
display(HTML(f"<pre>{dim_cliente._jdf.showString(100, 20, False)}</pre>"))

In [4]:
dim_produto = spark.sql(
    """
    SELECT
        ROW_NUMBER() OVER (ORDER BY nome_produto) AS id_produto,
        nome_produto,
        categoria,
        fabricante
    FROM
        vendas_temp
    GROUP BY
        nome_produto,
        categoria,
        fabricante
"""
)

dim_produto.createOrReplaceTempView("dim_produto")
display(HTML(f"<pre>{dim_produto._jdf.showString(100, 20, False)}</pre>"))

In [5]:
dim_data = spark.sql("""
    WITH dim_data_raw AS (
        SELECT DISTINCT
            data,
            YEAR(data) AS ano,
            MONTH(data) AS mes,
            DAY(data) AS dia,
            DATE_FORMAT(data, 'E') AS dia_semana_en
        FROM
            vendas_temp
    )
    SELECT
        ROW_NUMBER() OVER (ORDER BY data) AS id_data,
        data,
        ano,
        mes,
        dia,
        CASE dia_semana_en
            WHEN 'Mon' THEN 'Segunda-feira'
            WHEN 'Tue' THEN 'Terça-feira'
            WHEN 'Wed' THEN 'Quarta-feira'
            WHEN 'Thu' THEN 'Quinta-feira'
            WHEN 'Fri' THEN 'Sexta-feira'
            WHEN 'Sat' THEN 'Sábado'
            WHEN 'Sun' THEN 'Domingo'
            ELSE dia_semana_en
        END AS dia_semana
    FROM
        dim_data_raw
""")

dim_data.createOrReplaceTempView("dim_data")
display(HTML(f"<pre>{dim_data._jdf.showString(100, 20, False)}</pre>"))


In [6]:
fato_vendas = spark.sql("""
    WITH base_fato AS (
        SELECT DISTINCT
            v.qtd_vendida,
            v.valor_total,
            dc.id_cliente,
            dp.id_produto,
            dd.id_data
        FROM vendas_temp v
        JOIN dim_cliente dc
            ON v.nome_cliente = dc.nome_cliente
           AND v.cidade = dc.cidade
           AND v.estado = dc.estado
        JOIN dim_produto dp
            ON v.nome_produto = dp.nome_produto
           AND v.categoria = dp.categoria
           AND v.fabricante = dp.fabricante
        JOIN dim_data dd
            ON v.data = dd.data
    )

    SELECT
        ROW_NUMBER() OVER (ORDER BY id_cliente, id_produto, id_data, qtd_vendida, valor_total) AS id_fato,
        qtd_vendida,
        valor_total,
        id_cliente,
        id_produto,
        id_data
    FROM base_fato
""")

display(HTML(f"<pre>{fato_vendas._jdf.showString(100, 20, False)}</pre>"))


In [7]:
dim_cliente.coalesce(1).write.csv("/content/dim_cliente", header=True, mode="overwrite")
dim_produto.coalesce(1).write.csv("/content/dim_produto", header=True, mode="overwrite")
dim_data.coalesce(1).write.csv("/content/dim_data", header=True, mode="overwrite")
fato_vendas.coalesce(1).write.csv("/content/fato_vendas", header=True, mode="overwrite")
