In [3]:
# @title Imports
# Load the Drive helper and mount
from google.colab import drive
import sqlite3
import pandas as pd
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# @title Pyspark Setup
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
from pyspark.sql.functions import col, lit, sum, count, date_format, to_date, year, month, dayofmonth, quarter, weekofyear, monotonically_increasing_id
from pyspark.sql.window import Window

spark= SparkSession \
       .builder \
       .appName("Liven Tech") \
       .getOrCreate()

spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better

spark

Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,703 kB]
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:9 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [8,985 kB]
Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:12 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [2,953 kB]
Get:13 http://archive.ubuntu.com/ubunt

In [9]:
# @title Funções auxiliares
def read_database_table(table, database_url):
  try:
    conn = sqlite3.connect(database_url)

    # Lê a tabela
    df_table = pd.read_sql_query(f"SELECT * FROM {table}", conn)
    conn.close()

    # Converte para Spark
    df_spark = spark.createDataFrame(df_table)
    return df_spark
  except:
    print(f"Falha na leitura da tabela: {table}")
    raise Exception("Falha na leitura da tabela!")


def check_data_quality(data_frame, columns):
  errors = []
  for column in columns:
    if data_frame.filter(col(column).isNull()).count() > 0:
      errors.append(f"Coluna {column} possui valores nulos!")

  if errors:
    error_message = "\n".join(errors)
    raise Exception(f"Falha no processo de Data Quality:\n{error_message}")
  else:
    print(f"Dataset validado com sucesso!")


In [7]:
# @title Leitura dos dados originais
database_url = "/content/drive/MyDrive/liven-tech-db/olist.sqlite"

products_df = read_database_table("products", database_url)
order_items_df = read_database_table("order_items", database_url)
orders_df = read_database_table("orders", database_url)
customers_df = read_database_table("customers", database_url)
sellers_df = read_database_table("sellers", database_url)
order_payments_df = read_database_table("order_payments", database_url)
order_reviews_df = read_database_table("order_reviews", database_url)
product_category_translation_df = read_database_table("product_category_name_translation", database_url)

In [11]:
# @title Verificação de dados para as chaves, mas extensivel para outras colunas

check_data_quality(products_df, ["product_id"])
check_data_quality(order_items_df, ["order_id", "order_item_id", "product_id", "seller_id"])
check_data_quality(orders_df, ["order_id", "customer_id"])
check_data_quality(customers_df, ["customer_id", "customer_unique_id", "customer_zip_code_prefix"])
check_data_quality(sellers_df, ["seller_id", "seller_zip_code_prefix"])
check_data_quality(order_payments_df, ["order_id"])
check_data_quality(order_reviews_df, ["review_id", "order_id"])
check_data_quality(product_category_translation_df, ["product_category_name"])

Dataset validado com sucesso!
Dataset validado com sucesso!
Dataset validado com sucesso!
Dataset validado com sucesso!
Dataset validado com sucesso!
Dataset validado com sucesso!
Dataset validado com sucesso!
Dataset validado com sucesso!


In [12]:
# @title Criação da dimensão PRODUTO
dim_produto = products_df.join(
    product_category_translation_df,
    products_df.product_category_name == product_category_translation_df.product_category_name,
    "left"
).select(
    products_df.product_id.alias("id_produto"),
    products_df.product_id.alias("product_id_original"),
    products_df.product_category_name.alias("categoria_pt"),
    product_category_translation_df.product_category_name_english.alias("categoria_en"),
    products_df.product_weight_g.alias("peso_g"),
    products_df.product_length_cm.alias("comprimento_cm"),
    products_df.product_height_cm.alias("altura_cm"),
    products_df.product_width_cm.alias("largura_cm")
)

dim_produto.createOrReplaceTempView("dim_produto")

In [13]:
# @title Criação da dimensão CLIENTE
dim_cliente = customers_df.select(
    customers_df.customer_id.alias("id_cliente"),
    customers_df.customer_unique_id.alias("customer_id_original"),
    customers_df.customer_city.alias("cidade"),
    customers_df.customer_state.alias("estado"),
    customers_df.customer_zip_code_prefix.alias("cep_prefixo")
)

dim_cliente.createOrReplaceTempView("dim_cliente")

In [14]:
# @title Criação da dimensão VENDEDOR
dim_vendedor = sellers_df.select(
    sellers_df.seller_id.alias("id_vendedor"),
    sellers_df.seller_id.alias("seller_id_original"),
    sellers_df.seller_city.alias("cidade"),
    sellers_df.seller_state.alias("estado"),
    sellers_df.seller_zip_code_prefix.alias("cep_prefixo")
)

dim_vendedor.createOrReplaceTempView("dim_vendedor")

In [15]:
# @title Criação da dimensão PAGAMENTO
dim_pagamento = order_payments_df.select(
    order_payments_df.payment_type.alias("tipo_pagamento"),
    order_payments_df.payment_installments.alias("parcelas")
).distinct().withColumn(
    "id_pagamento",
    monotonically_increasing_id()  # Criar um ID único
)

dim_pagamento.createOrReplaceTempView("dim_pagamento")

In [16]:
# @title Criação da dimensão TEMPO
dim_tempo = orders_df.select(
    to_date(orders_df.order_purchase_timestamp).alias("data_completa")
).distinct().select(
    date_format(col("data_completa"), "yyyyMMdd").cast("int").alias("id_data"),
    col("data_completa"),
    year("data_completa").alias("ano"),
    month("data_completa").alias("mes"),
    dayofmonth("data_completa").alias("dia"),
    quarter("data_completa").alias("trimestre"),
    weekofyear("data_completa").alias("semana")
)

dim_tempo.createOrReplaceTempView("dim_tempo")

In [17]:
# @title Criação da tabela fato VENDAS
fato_vendas = order_items_df.join(
    orders_df,
    order_items_df.order_id == orders_df.order_id,
    "inner"
).join(
    dim_produto,
    order_items_df.product_id == dim_produto.product_id_original,
    "inner"
).join(
    dim_cliente,
    orders_df.customer_id == dim_cliente.id_cliente,
    "inner"
).join(
    dim_vendedor,
    order_items_df.seller_id == dim_vendedor.seller_id_original,
    "inner"
).join(
    order_payments_df,
    order_items_df.order_id == order_payments_df.order_id,
    "left"
).join(
    dim_pagamento,
    (order_payments_df.payment_type == dim_pagamento.tipo_pagamento) &
    (order_payments_df.payment_installments == dim_pagamento.parcelas),
    "left"
).join(
    dim_tempo,
    to_date(orders_df.order_purchase_timestamp) == dim_tempo.data_completa,
    "inner"
).select(
    order_items_df.order_id.alias("id_venda"),
    dim_produto.id_produto,
    dim_cliente.id_cliente,
    dim_vendedor.id_vendedor,
    dim_pagamento.id_pagamento,
    dim_tempo.id_data,
    lit(1).alias("quantidade_itens"),  # Cada linha é um item
    order_items_df.price.alias("valor_unitario"),
    order_items_df.freight_value.alias("frete")
).groupBy(
    "id_venda", "id_produto", "id_cliente", "id_vendedor", "id_pagamento", "id_data"
).agg(
    sum("quantidade_itens").alias("quantidade_itens"),
    sum("valor_unitario").alias("valor_faturado"),
    sum("frete").alias("frete_total")
)

fato_vendas.createOrReplaceTempView("fato_vendas")

In [18]:
# @title Criação da tabela fato AVALIACOES
fato_avaliacoes = order_reviews_df.join(
    orders_df,
    order_reviews_df.order_id == orders_df.order_id,
    "inner"
).join(
    dim_cliente,
    orders_df.customer_id == dim_cliente.id_cliente,
    "inner"
).join(
    dim_tempo,
    to_date(order_reviews_df.review_creation_date) == dim_tempo.data_completa,
    "inner"
).select(
    order_reviews_df.review_id.alias("id_avaliacao"),
    orders_df.order_id.alias("id_pedido"),
    dim_cliente.id_cliente,
    dim_tempo.id_data.alias("id_data_avaliacao"),
    order_reviews_df.review_score,
    # Contar pedidos do cliente como "volume_compras"
    count(orders_df.order_id).over(
        Window.partitionBy(orders_df.customer_id)
    ).alias("volume_compras")
)

fato_avaliacoes.createOrReplaceTempView("fato_avaliacoes")

In [21]:
#@title Faturamento por Categoria de Produto
query_faturamento = """
SELECT
    dp.categoria_pt AS categoria_produto,
    COUNT(DISTINCT fv.id_venda) AS quantidade_compras,
    SUM(fv.valor_faturado) AS valor_faturado_total,
    SUM(fv.frete_total) AS valor_frete_total,
    SUM(fv.valor_faturado + fv.frete_total) AS valor_total
FROM
    fato_vendas fv
JOIN
    dim_produto dp ON fv.id_produto = dp.id_produto
JOIN
    dim_vendedor dv ON fv.id_vendedor = dv.id_vendedor
JOIN
    dim_pagamento dpg ON fv.id_pagamento = dpg.id_pagamento
JOIN
    dim_tempo dt ON fv.id_data = dt.id_data
WHERE
    1=1
    -- Filtros
    -- AND dt.data_completa BETWEEN '2024-01-01' AND '2024-12-31'  -- Filtro por data
    -- AND dv.id_vendedor = 123  -- Ex Filtro por vendedor
    -- AND dp.id_produto = 456  -- Ex Filtro por produto
    -- AND dpg.tipo_pagamento = 'credit_card'  -- Ex Filtro por tipo de pagamento
GROUP BY
    dp.categoria_pt
ORDER BY
    valor_faturado_total DESC;"""

In [22]:
#debug
df_faturamento = spark.sql(query_faturamento)
display(df_faturamento)

categoria_produto,quantidade_compras,valor_faturado_total,valor_frete_total,valor_total
beleza_saude,8835,1297355.8000000168,188524.4900000005,1485880.2899999996
relogios_presentes,5624,1253143.3000000129,104335.51999999987,1357478.8200000045
cama_mesa_banho,9417,1092551.0200000338,217733.1100000008,1310284.1300000024
esporte_lazer,7720,1023996.3400000164,174528.0099999997,1198524.350000002
informatica_acess...,6689,942277.5700000122,153179.40999999968,1095456.9799999986
moveis_decoracao,6449,765093.8900000055,181499.2300000003,946593.1200000036
utilidades_domest...,5884,666586.9999999984,154264.2400000001,820851.2400000021
cool_stuff,3632,662309.4899999939,87840.53000000012,750150.0200000013
automotivo,3897,616752.5099999948,95689.56999999992,712442.080000001
ferramentas_jardim,3518,518217.5399999927,105634.43999999962,623851.9800000003


In [23]:
#@title Distribuição de avaliações por nota e volume de compras
query_nota = """
SELECT
    fa.review_score AS nota_avaliacao,
    COUNT(fa.id_avaliacao) AS quantidade_avaliacoes,
    AVG(fa.volume_compras) AS media_compras_cliente,
    MIN(fa.volume_compras) AS min_compras_cliente,
    MAX(fa.volume_compras) AS max_compras_cliente,
    dc.estado AS estado_cliente
FROM
    fato_avaliacoes fa
JOIN
    dim_cliente dc ON fa.id_cliente = dc.id_cliente
JOIN
    dim_tempo dt ON fa.id_data_avaliacao = dt.id_data
WHERE
    1=1
    -- Filtros
    -- AND dt.data_completa BETWEEN '2024-01-01' AND '2024-12-31'  -- Filtro por data avaliação
    -- AND dc.id_cliente = 789  -- Ex Filtro por cliente específico
    -- AND dc.estado = 'SP'  -- Ex Filtro por estado do cliente
GROUP BY
    fa.review_score, dc.estado
ORDER BY
    fa.review_score, dc.estado;"""

In [24]:
#debug
df_nota = spark.sql(query_nota)
display(df_nota)

nota_avaliacao,quantidade_avaliacoes,media_compras_cliente,min_compras_cliente,max_compras_cliente,estado_cliente
1,7,1.0,1,1,AC
1,72,1.0416666666666667,1,2,AL
1,12,1.1666666666666667,1,2,AM
1,3,1.0,1,1,AP
1,503,1.0059642147117296,1,2,BA
1,210,1.0095238095238095,1,2,CE
1,253,1.0118577075098814,1,2,DF
1,242,1.0041322314049588,1,2,ES
1,232,1.012931034482759,1,2,GO
1,131,1.0,1,1,MA


In [None]:
# Exemplo de escrita particionada
fato_vendas.repartition("id_data").write.partitionBy("id_data")\
    .mode("overwrite")\
    .format("parquet")\
    .save("/caminho/para/fato_vendas")


fato_avaliacoes.repartition("review_score").write.partitionBy("review_score")\
    .mode("overwrite")\
    .format("parquet")\
    .save("/caminho/para/fato_avaliacoes")