## Imports

In [87]:
# Bibliotecas padrão
import io
import unidecode

# Bibliotecas de terceiros
import pandas as pd
import boto3
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# PySpark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql.functions import col, sum as _sum, lower, udf, first, avg, when

## Leitura dos dados da Bronze (S3)

In [None]:
s3 = boto3.client("s3")
bucket = "bronze-autoglass"

def read_parquet_from_s3(key):
    buffer = io.BytesIO()
    s3.download_fileobj(bucket, key, buffer)
    buffer.seek(0)
    return pd.read_parquet(buffer)

orders = read_parquet_from_s3("olist_orders_dataset.parquet")
items = read_parquet_from_s3("olist_order_items_dataset.parquet")
products = read_parquet_from_s3("olist_products_dataset.parquet")
sellers = read_parquet_from_s3("olist_sellers_dataset.parquet")
cnae = read_parquet_from_s3("cnae_classes.parquet")

In [None]:
spark = SparkSession.builder.getOrCreate()

#dataframes
orders_sdf = spark.createDataFrame(orders)
items_sdf = spark.createDataFrame(items)
products_sdf = spark.createDataFrame(products)
sellers_sdf = spark.createDataFrame(sellers)
cnae_sdf = spark.createDataFrame(cnae)

customers = read_parquet_from_s3("olist_customers_dataset.parquet")
customers_sdf = spark.createDataFrame(customers)

## Transformações

### a. Consolidar vendas por estado e categoria de produto

In [None]:
vendas_estado_categoria = (
    items_sdf
    .join(orders_sdf, "order_id")
    .join(customers_sdf, "customer_id")
    .join(products_sdf, "product_id")
    .groupBy("customer_state", "product_category_name")
    .agg(_sum("price").alias("total_vendas"))
)

In [83]:
vendas_estado_categoria.coalesce(1).write.mode("overwrite").parquet("/tmp/vendas_estado_categoria_temp")


25/05/23 11:29:39 WARN TaskSetManager: Stage 127 contains a task of very large size (4147 KiB). The maximum recommended task size is 1000 KiB.
25/05/23 11:29:44 WARN TaskSetManager: Stage 128 contains a task of very large size (2073 KiB). The maximum recommended task size is 1000 KiB.
25/05/23 11:29:49 WARN TaskSetManager: Stage 130 contains a task of very large size (3305 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

### b. Enriquecer os dados dos vendedores com descrições do CNAE

#### Para enriquecer os dados dos vendedores com descrições do CNAE, buqueicolunas que se relacionam nas tabelas para fazer o Join. A possibilidade que encontrei está na coluna descricao da tabela cnae_sdf com a coluna product_category_name da tabela products_sdf. 

In [None]:
cnae_sdf.printSchema() 

root
 |-- id: string (nullable = true)
 |-- descricao: string (nullable = true)
 |-- descricao_normalizada: string (nullable = true)



In [89]:
sellers_sdf.printSchema() 

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: long (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



In [None]:
products_sdf.printSchema() 

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: double (nullable = true)
 |-- product_description_lenght: double (nullable = true)
 |-- product_photos_qty: double (nullable = true)
 |-- product_weight_g: double (nullable = true)
 |-- product_length_cm: double (nullable = true)
 |-- product_height_cm: double (nullable = true)
 |-- product_width_cm: double (nullable = true)
 |-- categoria_normalizada: string (nullable = true)



In [91]:
orders_sdf.printSchema() 

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: string (nullable = true)
 |-- order_approved_at: string (nullable = true)
 |-- order_delivered_carrier_date: string (nullable = true)
 |-- order_delivered_customer_date: string (nullable = true)
 |-- order_estimated_delivery_date: string (nullable = true)



In [92]:
items_sdf.printSchema() 

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: long (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: string (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



In [None]:
products_sdf.select("product_category_name").distinct().show(truncate=False)



+---------------------------------+
|product_category_name            |
+---------------------------------+
|pcs                              |
|bebes                            |
|artes                            |
|cine_foto                        |
|moveis_decoracao                 |
|pc_gamer                         |
|construcao_ferramentas_construcao|
|tablets_impressao_imagem         |
|fashion_roupa_masculina          |
|artigos_de_festas                |
|artigos_de_natal                 |
|la_cuisine                       |
|flores                           |
|livros_tecnicos                  |
|telefonia_fixa                   |
|construcao_ferramentas_seguranca |
|cool_stuff                       |
|eletrodomesticos                 |
|livros_importados                |
|pet_shop                         |
+---------------------------------+
only showing top 20 rows


                                                                                

In [None]:
cnae_sdf.select("descricao").distinct().show(truncate=False)

[Stage 18:>                                                         (0 + 4) / 4]

+-------------------------------------------------------------------------------------------------------------------------------+
|descricao                                                                                                                      |
+-------------------------------------------------------------------------------------------------------------------------------+
|FABRICAÇÃO DE TINTAS DE IMPRESSÃO                                                                                              |
|FABRICAÇÃO DE ESPECIARIAS, MOLHOS, TEMPEROS E CONDIMENTOS                                                                      |
|FABRICAÇÃO DE ARTEFATOS TÊXTEIS PARA USO DOMÉSTICO                                                                             |
|FABRICAÇÃO DE ARTEFATOS DE CORDOARIA                                                                                           |
|FABRICAÇÃO DE ACESSÓRIOS DO VESTUÁRIO, EXCETO PARA SEGURANÇA E PROTEÇÃO                  

                                                                                

#### Normalização de Texto para garantir que as comparações sejam feitas de forma consistente

In [None]:
# Função para normalizar o texto
def normalizar(texto):
    if texto:
        texto = unidecode.unidecode(texto.lower())
        return texto
    return ''

# Aplicar normalização às colunas relevantes
products_sdf = products_sdf.withColumn("categoria_normalizada", lower(col("product_category_name")))
cnae_sdf = cnae_sdf.withColumn("descricao_normalizada", lower(col("descricao")))

# Coletar os dados para o driver
produtos = [normalizar(row['categoria_normalizada']) for row in products_sdf.select("categoria_normalizada").distinct().collect()]
descricoes_cnae = [normalizar(row['descricao_normalizada']) for row in cnae_sdf.select("descricao_normalizada").distinct().collect()]

# Combinar os textos para o vetor TF-IDF
corpus = produtos + descricoes_cnae

# Vetorização TF-IDF
vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform(corpus)

# Calcular a similaridade cosseno entre produtos e descrições do CNAE
similaridade = cosine_similarity(tfidf_matrix[:len(produtos)], tfidf_matrix[len(produtos):])

# Criar um DataFrame com os resultados
resultados = []
for idx, categoria in enumerate(produtos):
    similaridades = list(enumerate(similaridade[idx]))
    similaridades_ordenadas = sorted(similaridades, key=lambda x: x[1], reverse=True)[:3]  # Top 3 correspondências
    correspondencias = [(descricoes_cnae[i], score) for i, score in similaridades_ordenadas]
    resultados.append({
        'categoria_produto': categoria,
        'cnae_correspondencias': correspondencias
    })

# Converter para DataFrame do Pandas para visualização
df_resultados = pd.DataFrame(resultados)
print(df_resultados)


                                                                                

                     categoria_produto  \
0                                  pcs   
1                                bebes   
2                                artes   
3                            cine_foto   
4                     moveis_decoracao   
..                                 ...   
69                             bebidas   
70                     fashion_esporte   
71  construcao_ferramentas_ferramentas   
72                   cds_dvds_musicais   
73                                       

                                cnae_correspondencias  
0   [(extracao de minerais metalicos nao ferrosos ...  
1   [(extracao de minerais metalicos nao ferrosos ...  
2   [(artes cenicas, espetaculos e atividades comp...  
3   [(extracao de minerais metalicos nao ferrosos ...  
4   [(extracao de minerais metalicos nao ferrosos ...  
..                                                ...  
69  [(comercio atacadista de bebidas, 0.7317682757...  
70  [(extracao de minerais metalicos nao ferros

In [58]:
display(df_resultados)

Unnamed: 0,categoria_produto,cnae_correspondencias
0,pcs,[(extracao de minerais metalicos nao ferrosos ...
1,bebes,[(extracao de minerais metalicos nao ferrosos ...
2,artes,"[(artes cenicas, espetaculos e atividades comp..."
3,cine_foto,[(extracao de minerais metalicos nao ferrosos ...
4,moveis_decoracao,[(extracao de minerais metalicos nao ferrosos ...
...,...,...
69,bebidas,"[(comercio atacadista de bebidas, 0.7317682757..."
70,fashion_esporte,[(extracao de minerais metalicos nao ferrosos ...
71,construcao_ferramentas_ferramentas,[(extracao de minerais metalicos nao ferrosos ...
72,cds_dvds_musicais,[(extracao de minerais metalicos nao ferrosos ...


#### É possível verificar pela amostra que a associação da coluna product_category_name com as decrições do CNAE funcionou bem. Porém a sellers_sdf não tem coluna correspondente, então para enriquecer a tabela sellers usei a order_items_df. 

In [None]:
spark = SparkSession.builder.getOrCreate()

# Definir o esquema para o df do PySpark
schema = StructType([
    StructField("categoria_produto", StringType(), True),
    StructField("cnae_correspondencias", ArrayType(StringType()), True)
])

# Converter o df do Pandas para o PySpark
correspondencias_sdf = spark.createDataFrame(df_resultados, schema=schema)


In [69]:
df_resultados.dtypes

categoria_produto        object
cnae_correspondencias    object
dtype: object

In [None]:
df_resultados["cnae_correspondencias"] = df_resultados["cnae_correspondencias"].astype(str)
df_resultados_sdf = spark.createDataFrame(df_resultados)


In [None]:
df_resultados_sdf = spark.createDataFrame(df_resultados)

# Juntar items com produtos para pegar a categoria do produto
items_with_categories = items_sdf.join(
    products_sdf.select("product_id", "product_category_name"),
    on="product_id",
    how="left"
)

# Juntar com df_resultados para obter correspondências CNAE
items_with_cnae = items_with_categories.join(
    df_resultados_sdf,
    items_with_categories["product_category_name"] == df_resultados_sdf["categoria_produto"],
    how="left"
)

# Selecionar seller_id com categoria e CNAE (podem haver várias por seller)
seller_cnae_raw = items_with_cnae.select(
    "seller_id",
    "categoria_produto",
    "cnae_correspondencias"
).dropDuplicates(["seller_id", "categoria_produto"])

# Agregar uma correspondência de CNAE por seller (ajuste aqui se quiser lógica diferente)
seller_cnae_agg = seller_cnae_raw.groupBy("seller_id").agg(
    first("categoria_produto").alias("categoria_produto"),
    first("cnae_correspondencias").alias("cnae_correspondencias")
)

# Juntar com sellers_sdf
sellers_enriched = sellers_sdf.join(
    seller_cnae_agg,
    on="seller_id",
    how="left"
)

# Exibir o resultado
sellers_enriched.show()


25/05/23 10:53:14 WARN TaskSetManager: Stage 54 contains a task of very large size (3305 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+--------------------+----------------------+-----------------+------------+--------------------+---------------------+
|           seller_id|seller_zip_code_prefix|      seller_city|seller_state|   categoria_produto|cnae_correspondencias|
+--------------------+----------------------+-----------------+------------+--------------------+---------------------+
|e38db885400cd35c7...|                 70740|         brasilia|          DF|     cama_mesa_banho| [('extracao de mi...|
|05a48cc8859962767...|                  5372|        sao paulo|          SP|  ferramentas_jardim| [('extracao de mi...|
|c0f3eea2e14555b6f...|                  4195|        sao paulo|          SP|       esporte_lazer| [('extracao de mi...|
|3442f8959a84dea7e...|                 13023|         campinas|          SP|       esporte_lazer| [('extracao de mi...|
|4e6015589b781adaa...|                 11440|          guaruja|          SP|       esporte_lazer| [('extracao de mi...|
|768a86e36ad6aae3d...|                  

In [None]:
sellers_enriched.coalesce(1).write.mode("overwrite").parquet("/tmp/vendedores_descricao_cnae_temp")

25/05/23 11:21:27 WARN TaskSetManager: Stage 109 contains a task of very large size (3305 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

### c. Categorizar produtos em faixas de valor médio (baixo, médio, alto)

In [None]:
# Verificação de preços para definir os parâmetros de baixo, médio e alto

# preço médio por produto
preco_medio_por_produto = (
    items_sdf.groupBy("product_id")
    .agg(avg("price").alias("preco_medio"))
)

# Juntar com a tabela de produtos para trazer a categoria
produtos_com_categoria = preco_medio_por_produto.join(products_sdf.select("product_id", "product_category_name"), on="product_id", how="left")

# 3. Coletar os valores para análise estatística
stats = produtos_com_categoria.select("preco_medio").toPandas()["preco_medio"].describe()
print(stats)


25/05/23 11:44:53 WARN TaskSetManager: Stage 143 contains a task of very large size (3305 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

count    32951.000000
mean       145.302464
std        246.895756
min          0.850000
25%         39.900000
50%         79.000000
75%        154.900000
max       6735.000000
Name: preco_medio, dtype: float64


In [None]:
# Classificar os produtos com base nos quartis observados (40 e 155)
produtos_categorizados = produtos_com_categoria.withColumn(
    "faixa_valor",
    when(col("preco_medio") < 40, "baixo")
    .when((col("preco_medio") >= 40) & (col("preco_medio") < 155), "medio")
    .otherwise("alto")
)

# Selecionar apenas o que será salvo/analisado
df_final = produtos_categorizados.select(
    "product_id",
    "product_category_name",
    "faixa_valor"
)

# Exibir resultado
df_final.show()


25/05/23 11:50:49 WARN TaskSetManager: Stage 148 contains a task of very large size (3305 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+--------------------+---------------------+-----------+
|          product_id|product_category_name|faixa_valor|
+--------------------+---------------------+-----------+
|0b0172eb0fd18479d...|          eletronicos|      baixo|
|42a2bd596fda1baef...|      cama_mesa_banho|      medio|
|460a66fcc404a3d73...| utilidades_domest...|      baixo|
|30360c8b0b2ac6918...|   relogios_presentes|       alto|
|13b4ff901d43edec6...|      cama_mesa_banho|       alto|
|290ada89b05e1dca2...|       telefonia_fixa|      baixo|
|7724696de32f44179...|        esporte_lazer|       alto|
|abe236a52dbc43e90...|      cama_mesa_banho|      medio|
|e85cdca8790ea0026...|          moveis_sala|      medio|
|eb1549e153933dfee...| industria_comerci...|      medio|
|35bc6c77029697004...| informatica_acess...|       alto|
|878699846fa5ea02a...|     moveis_decoracao|       alto|
|2f9c2888168b8c2d8...|            telefonia|      medio|
|75f3ef6a5cb0f2d5a...|       consoles_games|       alto|
|3f1a741cf55913844...|         

In [86]:
df_final.coalesce(1).write.mode("overwrite").parquet("/tmp/produtos_faixa_valor_temp")

25/05/23 11:53:40 WARN TaskSetManager: Stage 154 contains a task of very large size (3305 KiB). The maximum recommended task size is 1000 KiB.
                                                                                