### Refining the data before curation

#### Costumers

O que é Particionamento no Spark?
É uma técnica de organização de dados que:

Divide fisicamente os arquivos por valores de colunas

Cria subpastas automáticas no formato nome_coluna=valor

Melhora performance em consultas filtradas por essas colunas

In [3]:
# spark_config.py
import os
from pyspark.sql import SparkSession

def get_spark_session(app_name):
    """Configuração compartilhada por todos os scripts"""
    # 1. Configuração do Hadoop para Windows
    hadoop_path = "C:\\hadoop"
    os.environ['HADOOP_HOME'] = hadoop_path
    os.environ['PATH'] = f"{os.environ['PATH']};{hadoop_path}\\bin"
    os.environ['HADOOP_USER_NAME'] = "user"
    
    # 2. Configurações essenciais do Spark
    return SparkSession.builder \
        .appName(app_name) \
        .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
        .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
        .config("spark.sql.parquet.compression.codec", "snappy") \
        .getOrCreate()

In [2]:
# customer_refinement_fixed.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os

# 1. Configuração de caminhos ABSOLUTOS
BASE_DIR = os.path.dirname(os.path.abspath(os.getcwd()))  # Volta um nível para o projeto
INPUT_PATH = os.path.join(BASE_DIR, "data_lake", "parquet", "brazilian_ecommerce_v4", "olist_customers_dataset")
OUTPUT_PATH = os.path.join(BASE_DIR, "data_lake", "refined", "customers")  # Pasta definitiva

# 2. Função de transformação
def transform_customers(df):
    # Limpeza
    df_clean = (df
        .withColumn("customer_city_clean", lower(trim(col("customer_city"))))
        .withColumn("customer_state_clean", upper(trim(col("customer_state"))))
        .dropDuplicates(["customer_id"])
    )
    
    # Cria região (exemplo simplificado)
    return df_clean.withColumn(
        "customer_region",
        when(col("customer_state_clean").isin(["SP", "RJ", "MG", "ES"]), "Sudeste")
        .when(col("customer_state_clean").isin(["RS", "SC", "PR"]), "Sul")
        .when(col("customer_state_clean").isin(["MT", "MS", "GO", "DF"]), "Centro-Oeste")
        .when(col("customer_state_clean").isin(["AM", "PA", "AC", "RO", "RR", "AP", "TO"]), "Norte")
        .when(col("customer_state_clean").isin(["BA", "SE", "AL", "PE", "PB", "RN", "CE", "PI", "MA"]), "Nordeste")
        .otherwise("Outros")  # Apenas para casos extremos não mapeados
    )

# 3. Execução principal
if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("CustomerRefinement") \
        .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
        .getOrCreate()

    try:
        # Garante que a pasta de output existe
        os.makedirs(OUTPUT_PATH, exist_ok=True)
        
        print("📂 Lendo dados de:", INPUT_PATH)
        customers_df = spark.read.parquet(INPUT_PATH)
        
        print("🔄 Transformando dados...")
        customers_refined = transform_customers(customers_df)
        
        print("💾 Salvando em:", OUTPUT_PATH)
        (customers_refined.write
            .mode("overwrite")
            .partitionBy("customer_region", "customer_state_clean")  # Partição dupla
            .parquet(OUTPUT_PATH))
        
        # Verificação
        print("✅ Estrutura criada:")
        for root, dirs, files in os.walk(OUTPUT_PATH):
            print(f"📁 {root.replace(BASE_DIR, '...')}")
            for dir in dirs:
                if "=" in dir:  # Mostra apenas pastas de partição
                    print(f"   └── {dir}")

    except Exception as e:
        print(f"❌ Erro: {str(e)}")
    finally:
        spark.stop()

📂 Lendo dados de: c:\Users\pacie\Desktop\dataplatform-pipelines\data_lake\parquet\brazilian_ecommerce_v4\olist_customers_dataset
❌ Erro: An error occurred while calling o62.parquet.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:817)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1415)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1620)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:739)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2078)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2122)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:961)
	at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:218)
	at org.apache.spark.util.HadoopFSUtil

### Geolocation

In [21]:
# geolocation_refinement.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os

# Configuração de caminhos
BASE_DIR = os.path.dirname(os.path.abspath(os.getcwd()))  # Volta um nível
INPUT_PATH = os.path.join(BASE_DIR, "data_lake", "parquet", "brazilian_ecommerce_v4", "olist_geolocation_dataset")
OUTPUT_PATH = os.path.join(BASE_DIR, "data_lake", "refined", "geolocation")

def transform_geolocation_data(df):
    """Transforma os dados de geolocalização com:
    - Limpeza de textos
    - Categorização por região
    - Controle de qualidade
    """
    # Limpeza e padronização
    df_clean = (df
        .withColumn("geolocation_city_clean", lower(trim(col("geolocation_city"))))
        .withColumn("geolocation_state_clean", upper(trim(col("geolocation_state"))))
        .na.drop(subset=["geolocation_zip_code_prefix"])  # Remove registros sem CEP
    )
    
    # Cria região geográfica (mesma lógica dos clientes)
    df_with_region = df_clean.withColumn(
        "region",
        when(col("geolocation_state_clean").isin(["SP", "RJ", "MG", "ES"]), "Sudeste")
        .when(col("geolocation_state_clean").isin(["RS", "SC", "PR"]), "Sul")
        .when(col("geolocation_state_clean").isin(["MT", "MS", "GO", "DF"]), "Centro-Oeste")
        .when(col("geolocation_state_clean").isin(["AM", "PA", "AC", "RO", "RR", "AP", "TO"]), "Norte")
        .otherwise("Nordeste")
    )
    
    # Cria ponto geográfico (lat+lng)
    return df_with_region.withColumn(
        "geo_point",
        format_string("POINT(%s %s)", col("geolocation_lng"), col("geolocation_lat"))
    )

if __name__ == "__main__":
    spark = SparkSession.builder.appName("GeolocationRefinement").getOrCreate()
    
    try:
        # Leitura
        print("📍 Lendo dados de geolocalização...")
        geo_df = spark.read.parquet(INPUT_PATH)
        
        # Transformação
        print("🛠️ Processando dados...")
        geo_refined = transform_geolocation_data(geo_df)
        
        # Escrita com particionamento por região/estado
        print("💾 Salvando em:", OUTPUT_PATH)
        (geo_refined.write
            .mode("overwrite")
            .partitionBy("region", "geolocation_state_clean")
            .option("compression", "snappy")
            .parquet(OUTPUT_PATH))
        
        # Verificação
        print("✅ Dados salvos! Estrutura:")
        print(f"Total de registros: {geo_refined.count()}")
        print("\nAmostra dos dados transformados:")
        geo_refined.show(3)
        
        print("\nExemplo de estrutura gerada:")
        for region in ["Sudeste", "Nordeste", "Sul"]:
            path = os.path.join(OUTPUT_PATH, f"region={region}")
            if os.path.exists(path):
                print(f"📁 {path}")
                print(f"   Estados: {os.listdir(path)}")

    except Exception as e:
        print(f"❌ Falha no processamento: {str(e)}")
        if 'geo_df' in locals():
            print("\nEsquema original:", geo_df.printSchema())
    finally:
        spark.stop()

📍 Lendo dados de geolocalização...
🛠️ Processando dados...
💾 Salvando em: c:\Users\pacie\Desktop\Projeto A\data_lake\refined\geolocation
✅ Dados salvos! Estrutura:
Total de registros: 1000163

Amostra dos dados transformados:
+---------------------------+------------------+-------------------+----------------+-----------------+----------------------+-----------------------+--------+--------------------+
|geolocation_zip_code_prefix|   geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|geolocation_city_clean|geolocation_state_clean|  region|           geo_point|
+---------------------------+------------------+-------------------+----------------+-----------------+----------------------+-----------------------+--------+--------------------+
|                      50760|-8.072588074383969| -34.91359211518264|          recife|               PE|                recife|                     PE|Nordeste|POINT(-34.9135921...|
|                      50740|-8.042627210658422|-3

### Order Items

In [1]:
# order_items_refinement.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os

def get_spark_session(app_name):
    """
    Configuração compartilhada por todos os scripts, com opções para Hadoop.
    
    Verifica a presença de 'winutils.exe' para evitar 'UnsatisfiedLinkError' no Windows.
    """
    # 1. Configuração do Hadoop para Windows (ajuste se necessário para outros SOs)
    # ATENÇÃO: Verifique o caminho para o seu diretório Hadoop e baixe o binário
    # do winutils.exe compatível com a sua versão do Spark/Hadoop.
    hadoop_path = "C:\\hadoop"
    winutils_path = os.path.join(hadoop_path, "bin", "winutils.exe")
    
    # Adicionando debug para verificar os caminhos
    print(f"✔️ Verificando HADOOP_HOME em: {hadoop_path}")
    print(f"✔️ Verificando winutils.exe em: {winutils_path}")
    
    # Verifica se o winutils.exe existe antes de tentar configurar as variáveis de ambiente
    if not os.path.exists(winutils_path):
        raise FileNotFoundError(
            f"❌ Erro: O arquivo winutils.exe não foi encontrado em '{winutils_path}'. "
            f"Isso geralmente indica um problema com a variável de ambiente HADOOP_HOME "
            f"ou que o binário necessário não está no local correto. "
            f"Por favor, verifique se a pasta '{hadoop_path}' contém a estrutura de pastas "
            f"'bin' e se o 'winutils.exe' está dentro dela."
        )
        
    os.environ['HADOOP_HOME'] = hadoop_path
    os.environ['PATH'] = f"{os.environ['PATH']};{hadoop_path}\\bin"
    os.environ['HADOOP_USER_NAME'] = "user"
    
    # 2. Configurações essenciais do Spark
    return SparkSession.builder \
        .appName(app_name) \
        .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
        .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
        .config("spark.sql.parquet.compression.codec", "snappy") \
        .getOrCreate()


# Configuração de caminhos
BASE_DIR = os.path.dirname(os.path.abspath(os.getcwd()))
INPUT_PATH = os.path.join(BASE_DIR, "data_lake", "parquet", "brazilian_ecommerce_v4", "olist_order_items_dataset")
OUTPUT_PATH = os.path.join(BASE_DIR, "data_lake", "refined", "order_items")

def transform_order_items(df):
    """Transformações principais para order_items"""
    # 1. Limpeza básica
    df_clean = df.withColumn(
        "shipping_limit_date", 
        to_timestamp(col("shipping_limit_date"))
    ).na.drop(subset=["order_id", "product_id", "seller_id"])
    
    # 2. Cálculo de valores totais
    df_with_totals = df_clean.withColumn(
        "total_value", 
        round(col("price") + col("freight_value"), 2)
    )
    
    # 3. Categorização por faixa de preço
    df_with_price_range = df_with_totals.withColumn(
        "price_range",
        when(col("price") < 50, "0-50")
        .when((col("price") >= 50) & (col("price") < 100), "50-100")
        .when((col("price") >= 100) & (col("price") < 150), "100-150") 
        .otherwise("150+")
    )
    
    # 4. Categorização por frete
    df_with_freight_category = df_with_price_range.withColumn(
        "freight_ratio",
        round(col("freight_value") / col("price"), 2)
    ).withColumn(
        "freight_category",
        when(col("freight_ratio") < 0.1, "Baixo")
        .when((col("freight_ratio") >= 0.1) & (col("freight_ratio") < 0.25), "Médio")
        .otherwise("Alto")
    )
    
    # 5. Adição de período do dia (baseado no horário do shipping)
    return df_with_freight_category.withColumn(
        "shipping_time_period",
        when(hour(col("shipping_limit_date")).between(6, 11), "Manhã")
        .when(hour(col("shipping_limit_date")).between(12, 17), "Tarde")
        .when(hour(col("shipping_limit_date")).between(18, 23), "Noite")
        .otherwise("Madrugada")
    )

if __name__ == "__main__":
    # Inicia a sessão Spark usando a função de configuração
    spark = get_spark_session("OrderItemsRefinement")
    
    try:
        # Leitura
        print("📦 Lendo dados de order_items...")
        order_items_df = spark.read.parquet(INPUT_PATH)
        
        # Transformação
        print("🛠️ Enriquecendo dados...")
        order_items_refined = transform_order_items(order_items_df)
        
        # Escrita com particionamento
        print("💾 Salvando dados refinados...")
        (order_items_refined.write
            .mode("overwrite")
            .partitionBy("price_range", "freight_category")  # Particionamento estratégico
            .parquet(OUTPUT_PATH))
        
        # Verificação
        print("✅ Processo concluído! Amostra dos dados:")
        order_items_refined.show(5)
        
        print("\n📊 Estatísticas:")
        order_items_refined.select(
            mean("price").alias("avg_price"),
            mean("freight_value").alias("avg_freight"),
            countDistinct("order_id").alias("unique_orders")
        ).show()
        
    except Exception as e:
        print(f"❌ Erro: {str(e)}")
    finally:
        spark.stop()


✔️ Verificando HADOOP_HOME em: C:\hadoop
✔️ Verificando winutils.exe em: C:\hadoop\bin\winutils.exe
📦 Lendo dados de order_items...
🛠️ Enriquecendo dados...
💾 Salvando dados refinados...
✅ Processo concluído! Amostra dos dados:
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+-----------+-----------+-------------+----------------+--------------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|total_value|price_range|freight_ratio|freight_category|shipping_time_period|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+-----------+-----------+-------------+----------------+--------------------+
|8ac26cb701a7887cc...|            1|4ebb87ba41ca44632...|7a67c85e85bb2ce85...|2017-05-22 16:05:14|109.99|        18.02|     128.01|    100-150|         0.16|           Médio|          