In [72]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PrevisaoVendasHackathon").getOrCreate()

df_transacoes = spark.read.parquet("dados/transacoes.parquet")
df_produtos = spark.read.parquet("dados/cadastroprodutos.parquet")
df_pdvs = spark.read.parquet("dados/cadastropdv.parquet")

df_transacoes.printSchema()
df_transacoes.show(5)
df_produtos.printSchema()
df_produtos.show(5)
df_pdvs.printSchema()
df_pdvs.show(5) 

root
 |-- internal_store_id: string (nullable = true)
 |-- internal_product_id: string (nullable = true)
 |-- distributor_id: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- reference_date: date (nullable = true)
 |-- quantity: double (nullable = true)
 |-- gross_value: double (nullable = true)
 |-- net_value: double (nullable = true)
 |-- gross_profit: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- taxes: double (nullable = true)



                                                                                

+-------------------+-------------------+--------------+----------------+--------------+--------+------------------+------------------+------------------+------------------+------------------+
|  internal_store_id|internal_product_id|distributor_id|transaction_date|reference_date|quantity|       gross_value|         net_value|      gross_profit|          discount|             taxes|
+-------------------+-------------------+--------------+----------------+--------------+--------+------------------+------------------+------------------+------------------+------------------+
|7384367747233276219| 328903483604537190|             9|      2022-07-13|    2022-07-01|     1.0|            38.125|         37.890625|10.042625427246094| 3.950000047683716|          0.234375|
|3536908514005606262|5418855670645487653|             5|      2022-03-21|    2022-03-01|     6.0|            107.25|106.44000244140625| 24.73200225830078|17.100000381469727|0.8100000023841858|
|3138231730993449825|10870055626757

detalhe que pode causar problemas mais tarde: produtos e df_pdvs têm uma coluna chamada categoria. quando fizer o join, o df completo terá duas colunas com o mesmo nome, o que é ambíguo e pode gerar erros em operações futuras.

renomear as colunas pra nao ter ambiguidade

In [73]:
df_produtos_renomeado = df_produtos.withColumnRenamed("categoria", "categoria_produto")
df_pdvs_renomeado = df_pdvs.withColumnRenamed("categoria", "categoria_pdv")

df_produtos_renomeado.show(5)
df_pdvs_renomeado.show(5)

+-------------------+-----------------+--------------------+-----------------+-------------+-------------------+--------------------+--------------------+
|            produto|categoria_produto|           descricao|            tipos|        label|       subcategoria|               marca|          fabricante|
+-------------------+-----------------+--------------------+-----------------+-------------+-------------------+--------------------+--------------------+
|2282334733936076502|Distilled Spirits|JOSEPH CARTRON CA...|Distilled Spirits|         Core|Liqueurs & Cordials| Joseph Cartron Cafe|            Spiribam|
|6091840953834683482|Distilled Spirits|SPRINGBANK 18 YEA...|Distilled Spirits|    Specialty|      Scotch Whisky|Springbank 18 Yea...|Pacific Edge Wine...|
|1968645851245092408|Distilled Spirits|J BRANDT TRIPLE S...|Distilled Spirits|Private Label|Liqueurs & Cordials| J Brandt Triple Sec|     Sazerac Spirits|
| 994706710729219179|            Draft|REFORMATION CASHM...|          

a op combina nossa tabela principal de Vendas (df_transacoes) com o Catálogo de Produtos (df_produtos_renomeado), usando como chave de ligação as colunas internal_product_id (Vendas) e produto (Catálogo).

left join garante que todas as vendas sejam mantidas, mesmo que não exista um produto correspondente no catálogo. Quando não houver correspondência, as colunas do catálogo serão preenchidas com valores vazios (null).

result = tabela enriquecida com as informações do produto associado a cada venda.

In [74]:
df_completo = df_transacoes.join(
    df_produtos_renomeado,
    df_transacoes["internal_product_id"] == df_produtos_renomeado["produto"], 
    "left"
).join(
    df_pdvs_renomeado,
    df_transacoes["internal_store_id"] == df_pdvs_renomeado["pdv"],
    "left"
)

In [75]:
print("Schema do DataFrame unificado:")
df_completo.printSchema()

print("\nAmostra dos dados unificados:")
df_completo.show(5)

Schema do DataFrame unificado:
root
 |-- internal_store_id: string (nullable = true)
 |-- internal_product_id: string (nullable = true)
 |-- distributor_id: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- reference_date: date (nullable = true)
 |-- quantity: double (nullable = true)
 |-- gross_value: double (nullable = true)
 |-- net_value: double (nullable = true)
 |-- gross_profit: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- taxes: double (nullable = true)
 |-- produto: string (nullable = true)
 |-- categoria_produto: string (nullable = true)
 |-- descricao: string (nullable = true)
 |-- tipos: string (nullable = true)
 |-- label: string (nullable = true)
 |-- subcategoria: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- fabricante: string (nullable = true)
 |-- pdv: string (nullable = true)
 |-- premise: string (nullable = true)
 |-- categoria_pdv: string (nullable = true)
 |-- zipcode: integer (nullable = true

                                                                                

remocao de coluna id duplicada:
pdf e internal store id

In [76]:
df_completo = df_completo.drop("produto").drop("pdv")

In [77]:
print("Schema do DataFrame unificado:")
df_completo.printSchema()

print("\nAmostra dos dados unificados:")
df_completo.show(5)

Schema do DataFrame unificado:
root
 |-- internal_store_id: string (nullable = true)
 |-- internal_product_id: string (nullable = true)
 |-- distributor_id: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- reference_date: date (nullable = true)
 |-- quantity: double (nullable = true)
 |-- gross_value: double (nullable = true)
 |-- net_value: double (nullable = true)
 |-- gross_profit: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- taxes: double (nullable = true)
 |-- categoria_produto: string (nullable = true)
 |-- descricao: string (nullable = true)
 |-- tipos: string (nullable = true)
 |-- label: string (nullable = true)
 |-- subcategoria: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- fabricante: string (nullable = true)
 |-- premise: string (nullable = true)
 |-- categoria_pdv: string (nullable = true)
 |-- zipcode: integer (nullable = true)


Amostra dos dados unificados:
+-------------------+-------------------

acima dessa celula foi feito tratamento de dados
abaixo dessa celula será feito a analise exploratoria de dados

In [78]:
#verificacoes basicas de quantidade e integridade
from pyspark.sql.functions import col, count, when, isnan, isnull

#tamanho do dataset
total_transacoes = df_completo.count()
print(f"Número total de transações em 2022: {total_transacoes}")

#verificar se os joins criaram valores nulos(o que indicaria transações com produtos ou PDVs não cadastrados)
# a quantidade que der null (join falhou), significa que as transacoes são vendas de lojas "fantasmas" ou não cadastradas
# venda sem ponto de venda
print("\nContagem de valores nulos após o join:")
df_completo.select([
    count(when(isnull(c), c)).alias(c) for c in ["categoria_produto", "categoria_pdv", "zipcode"]
]).show()

                                                                                

Número total de transações em 2022: 6560698

Contagem de valores nulos após o join:




+-----------------+-------------+-------+
|categoria_produto|categoria_pdv|zipcode|
+-----------------+-------------+-------+
|                0|        45582|  45582|
+-----------------+-------------+-------+



                                                                                

decidir o que fazer com as 45582 transacoes que nao tem ponto de venda nem zipcode -> ou apagar as linhas (nao é tanta coisa comparada ao todo) ou fazer uma imputação (atribuir um valor para as linhas nulas)

Como 45.582 representa uma porcentagem muito pequena do total de 6.5 milhões de transações (cerca de 0.7%), a abordagem mais simples e segura é remover essas linhas. não perderei muita informação e garantirei que o modelo treine apenas com dados completos.

In [79]:
df_completo_limpo = df_completo.na.drop(subset=["categoria_pdv"])

print(f"Tamanho original: {df_completo.count()}")
print(f"Tamanho após limpeza: {df_completo_limpo.count()}")

                                                                                

Tamanho original: 6560698




Tamanho após limpeza: 6515116


                                                                                

In [80]:
df_completo_limpo.printSchema()
df_completo_limpo.show(5)

root
 |-- internal_store_id: string (nullable = true)
 |-- internal_product_id: string (nullable = true)
 |-- distributor_id: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- reference_date: date (nullable = true)
 |-- quantity: double (nullable = true)
 |-- gross_value: double (nullable = true)
 |-- net_value: double (nullable = true)
 |-- gross_profit: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- taxes: double (nullable = true)
 |-- categoria_produto: string (nullable = true)
 |-- descricao: string (nullable = true)
 |-- tipos: string (nullable = true)
 |-- label: string (nullable = true)
 |-- subcategoria: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- fabricante: string (nullable = true)
 |-- premise: string (nullable = true)
 |-- categoria_pdv: string (nullable = true)
 |-- zipcode: integer (nullable = true)

+-------------------+-------------------+--------------+----------------+--------------+--------+-----

                                                                                

dado que o objetivo é prever a quantidade semanal por PDV e SKU, a análise deve ser focada em encontrar padrões relacionados a essas três dimensões: o quê (quantidade), onde (internal_store_id) e quando (transaction_date).

análise da variável alvo: quantity

questionamento: qual é a distribuição das quantidades vendidas por transação? tem outliers? ou negativos (devoluções)?

ajuda a identificar a necessidade de limpar os dados (ex: remover devoluções se não fizerem sentido para a previsão) e a entender a escala das vendas. outliers muito grandes podem distorcer o treinamento do modelo.

In [81]:

df_completo_limpo.select("quantity").describe().show()
# quantity que significa a quantidade vendida por CADA transacao 



+-------+-----------------+
|summary|         quantity|
+-------+-----------------+
|  count|          6515116|
|   mean|8.149641143197064|
| stddev|80.71476955086146|
|    min|          -1530.0|
|    max|          94230.0|
+-------+-----------------+



                                                                                

observações sobre as funcoes estatisticas basicas iniciais: 
6,5 milhoes de transacoes
a media da quantidade de produtos vendidos por transacao é 8
o desvio padrao é 10x maior que a media, é um sinal claro de que os seus dados têm uma variabilidade enorme e contêm outliers significativos
o valor max é de quase 100 mil produtos vendidos em uma unica transacao, indicando que a empresa pode ser uma possivel B2b ou um centro de distribuicao

o valor min mostra que temos devolucoes, se existe uma transacao com um valor negativo, existe a mesma transacao com um valor positivo, sendo assim, limpar essa linha negativa para que o treinamento da maquina fique mais preciso acaba sendo uma escolha bem condizente com o que queremos entregar 


In [82]:

print(f"Tamanho do DataFrame antes de filtrar as quantidades: {df_completo_limpo.count()}")

# Filtra o DataFrame para manter apenas as linhas onde a quantidade é maior que zero
df_vendas_reais = df_completo_limpo.filter(df_completo_limpo["quantity"] > 0)

print(f"Tamanho do DataFrame após filtrar (apenas vendas reais): {df_vendas_reais.count()}")

quantidade_removida = df_completo_limpo.count() - df_vendas_reais.count()
print(f"Número de transações removidas (quantidade <= 0): {quantidade_removida}")

# Agora, vamos rodar as estatísticas descritivas novamente neste novo DataFrame limpo
print("\nEstatísticas descritivas apenas para vendas reais:")
df_vendas_reais.select("quantity").describe().show()

                                                                                

Tamanho do DataFrame antes de filtrar as quantidades: 6515116


                                                                                

Tamanho do DataFrame após filtrar (apenas vendas reais): 6423880


                                                                                

Número de transações removidas (quantidade <= 0): 91236

Estatísticas descritivas apenas para vendas reais:




+-------+--------------------+
|summary|            quantity|
+-------+--------------------+
|  count|             6423880|
|   mean|   8.298247866619342|
| stddev|   81.26202216095105|
|    min|1.192092895507812...|
|    max|             94230.0|
+-------+--------------------+



                                                                                

existe um valor minimo que nao é .0 (o que é bem estranho tendo em vista que a quantidade deve ser um valor inteiro)
vou fazer a verificacao se alguma categoria do produto faz sentido ser vendido como peso ou volume

Cenário A: Existem Vendas por Peso/Volume
Se a tabela de saída mostrar produtos em categorias como "Hortifruti", "Padaria", "Frios" ou descrições que incluam "KG", "Granel", etc., isso é uma forte evidência de que seu dataset contém produtos que são vendidos por peso ou volume.

Decisão: Neste caso, não faça o cast para inteiro, pois você perderia informações valiosas. Você precisará manter a coluna como double e talvez arredondar para um número razoável de casas decimais se quiser.

Cenário B: São Apenas "Ruídos" do Tipo de Dado
Se a tabela de saída estiver vazia (contagem_fracionadas for 0), ou se ela mostrar quantidades que são extremamente próximas de um número inteiro (como 5.00000001 ou 2.99999998), isso confirma que não há vendas fracionadas de verdade. São apenas pequenas imprecisões de ponto flutuante.

Decisão: Neste caso, fazer o cast para inteiro é a abordagem correta e segura. Você estará limpando o "ruído" e deixando os dados consistentes com a realidade do negócio.

In [83]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType


# filtragem do df para encontrar linhas onde a coluna 'quantity' não é um número inteiro
# (quantity!= int(quantity)) faz exatamente essa verificação.
df_quantidades_fracionadas = df_vendas_reais.filter(
    col("quantity")!= col("quantity").cast(IntegerType())
)

contagem_fracionadas = df_quantidades_fracionadas.count()

if contagem_fracionadas > 0:
    print(f"{contagem_fracionadas} transações com quantidades que não são inteiras.")
    print("amostra:")
    
    df_quantidades_fracionadas.select("quantity", "categoria_produto", "descricao").show(truncate=False)
else:
    print("nenhuma transação com quantidade fracionada foi encontrada.")

95346 transações com quantidades que não são inteiras.
amostra:
+------------------+-----------------+--------------------------------------------------------------+
|quantity          |categoria_produto|descricao                                                     |
+------------------+-----------------+--------------------------------------------------------------+
|45.9999897480011  |Distilled Spirits|BLANTON'S BOURBON 6/750ML 93PF                                |
|14.000010132789612|Distilled Spirits|CARAVELLA LIMONCELLO 6/750ML 56PF                             |
|139.99979883432388|Distilled Spirits|FIREBALL 12/1L 66PF                                           |
|79.00000804662704 |Distilled Spirits|FIREBALL 12/1L 66PF                                           |
|5.9999880194664   |Distilled Spirits|GRAN GALA ORANGE LIQUEUR12/L 80PF                             |
|83.0000039935112  |Distilled Spirits|PEYCHAUD'S APERTIVO 12/750ML 22PF                             |
|5.999999880790710

claramente sao ruídos do tipo de dado, mas antes de realizar o casting, vou como o show mostra so as primeiras linhas, e nesse caso todos os produtos sao da mesma categoria, vou fazer outro show para ter certeza que a variedade de dados inteira tem o ruído, ou se é apenas coincidencia da categoria

In [84]:
from pyspark.sql.functions import rand

print("Exibindo 10 linhas aleatórias do DataFrame:")

# 1. orderBy(rand()) embaralha o DataFrame de forma aleatória.
# 2. show(10) pega as 10 primeiras linhas do DataFrame já embaralhado.
df_quantidades_fracionadas.orderBy(rand()).select("quantity", "categoria_produto", "descricao").show(10)

Exibindo 10 linhas aleatórias do DataFrame:
+------------------+-----------------+--------------------+
|          quantity|categoria_produto|           descricao|
+------------------+-----------------+--------------------+
| 51.00000041723251|Distilled Spirits|MR BOSTON PEACH S...|
| 40.99999189376831|Distilled Spirits|EAGLE RARE SINGLE...|
|112.00000667572021|Distilled Spirits|MARGARITAVILLE GO...|
|   1.9999920129776|Distilled Spirits| FIREBALL 12/1L 66PF|
|    4.999980032444|Distilled Spirits|BITTERMENS GRAPEF...|
|0.9999989867210388|Distilled Spirits|WELLER WILLIAM LA...|
| 43.00000846385956|Distilled Spirits|RYAN'S IRISH CREA...|
|4.0000081062316895|Distilled Spirits|SCORESBY RARE SCO...|
|2.0000009536743164|Distilled Spirits|GEORGE T STAGG 20...|
|22.999997973442078|Distilled Spirits|HANCOCK'S BOURBON...|
+------------------+-----------------+--------------------+
only showing top 10 rows


é possivel ver que não é coincidencia do tipo de produto
realização da mudanca do tipo de produto

In [85]:
from pyspark.sql.types import IntegerType
# .withColumn() substitui a coluna existente pela nova versão convertida.
df_vendas_final = df_vendas_reais.withColumn("quantity", df_vendas_reais["quantity"].cast(IntegerType()))

# Agora, vamos verificar o schema e as estatísticas novamente para confirmar a mudança.
print("\nSchema do DataFrame final (após cast para inteiro):")
df_vendas_final.printSchema()

print("\nEstatísticas descritivas do DataFrame final:")
df_vendas_final.select("quantity").describe().show()


Schema do DataFrame final (após cast para inteiro):
root
 |-- internal_store_id: string (nullable = true)
 |-- internal_product_id: string (nullable = true)
 |-- distributor_id: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- reference_date: date (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- gross_value: double (nullable = true)
 |-- net_value: double (nullable = true)
 |-- gross_profit: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- taxes: double (nullable = true)
 |-- categoria_produto: string (nullable = true)
 |-- descricao: string (nullable = true)
 |-- tipos: string (nullable = true)
 |-- label: string (nullable = true)
 |-- subcategoria: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- fabricante: string (nullable = true)
 |-- premise: string (nullable = true)
 |-- categoria_pdv: string (nullable = true)
 |-- zipcode: integer (nullable = true)


Estatísticas descritivas do DataFrame final:




+-------+-----------------+
|summary|         quantity|
+-------+-----------------+
|  count|          6423880|
|   mean|8.290080294152444|
| stddev| 81.2568283392018|
|    min|                0|
|    max|            94230|
+-------+-----------------+



                                                                                

acredito que apos todo esse tratamento, quantity está ok
valores inteiros
sem valores negativos (devolução)

Análise das Dimensões Principais: PDV e Produto
Agora, vamos entender os atores principais: os pontos de venda e os produtos.

pergunta: quais são os produtos e PDVs mais vendidos? A venda está concentrada em poucos itens/lojas?

Por que é útil? É provável que a "Lei de Pareto" (80/20) se aplique: uma minoria de produtos/PDVs é responsável pela maioria das vendas. Identificá-los é crucial. Talvez os produtos "campeões de venda" tenham um padrão de venda diferente dos produtos de "cauda longa".

resumo de produtos cauda lojga: