   # Exploração de Dados com PySpark
   Este notebook demonstra como usar o PySpark para carregar e processar dados de um pipeline de e-commerce.

   ## Importação de Bibliotecas
   Importamos as bibliotecas necessárias para criar uma sessão Spark e manipular dados.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, BooleanType
import json
import re

   ## Mapeamento de Tipos
   Definimos um dicionário para mapear tipos de dados de strings para tipos Spark.

In [2]:
type_mapping = {
    "string": StringType(),
    "integer": IntegerType(),
    "double": DoubleType(),
    "timestamp": TimestampType(),
    "boolean": BooleanType()
}

   ## Função para Carregar Esquema de JSON
   Esta função lê um arquivo JSON e retorna um esquema Spark.

In [3]:
def load_schema_from_json(json_path):
    with open(json_path, "r") as f:
        schema_json = json.load(f)
    return StructType([
        StructField(f["name"], type_mapping[f["type"]], f["nullable"])
        for f in schema_json
    ])

   ## Função para Converter Nomes de Colunas para Snake Case
   Esta função converte os nomes das colunas de um DataFrame para o formato snake_case.

In [None]:
def to_snake_case(df):
    new_columns = [re.sub(r'(?<!^)(?=[A-Z])', '_', col).lower() for col in df.columns]
    return df.toDF(*new_columns)

   ## Criação da Sessão Spark
   Criamos uma sessão Spark para processar os dados.

In [5]:
spark = SparkSession.builder.appName("EcommercePipeline").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/06/03 21:02:11 WARN Utils: Your hostname, DESKTOP-542JM0I, resolves to a loopback address: 127.0.1.1; using 172.18.30.52 instead (on interface eth0)
25/06/03 21:02:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/03 21:02:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


   ## Informações dos Datasets
   Definimos um dicionário com informações sobre os datasets, incluindo caminhos para os arquivos CSV e seus esquemas.

In [6]:
datasets_info = {
        "customers": {
            "csv": "../data_source/olist_customers_dataset.csv",
            "schema": "../config/schemas/customers_schema.json"
        },
        "geolocation": {
            "csv": "../data_source/olist_geolocation_dataset.csv",
            "schema": "../config/schemas/geolocation_schema.json"
        },
        "order_items": {
            "csv": "../data_source/olist_order_items_dataset.csv",
            "schema": "../config/schemas/order_items_schema.json"
        },
        "order_payments": {
            "csv": "../data_source/olist_order_payments_dataset.csv",
            "schema": "../config/schemas/order_payments_schema.json"
        },
          "order_reviews": {
            "csv": "../data_source/olist_order_reviews_dataset.csv",
            "schema": "../config/schemas/order_reviews_schema.json"
        },
        "orders": {
            "csv": "../data_source/olist_orders_dataset.csv",
            "schema": "../config/schemas/orders_schema.json"
        },
        "product_category_name_translation": {
            "csv": "../data_source/product_category_name_translation.csv",
            "schema": "../config/schemas/product_category_name_translation_schema.json"
        },
        "products": {
            "csv": "../data_source/olist_products_dataset.csv",
            "schema": "../config/schemas/products_schema.json"
        },
        "sellers": {
            "csv": "../data_source/olist_sellers_dataset.csv",
            "schema": "../config/schemas/sellers_schema.json"
        },
    }

   ## Processamento dos Datasets
   Iteramos sobre cada dataset, carregando o esquema e os dados, e aplicamos a função de conversão para snake_case.

In [7]:
for name, info in datasets_info.items():
    print(f"Processando dataset: {name}")

    schema = load_schema_from_json(info["schema"])
    df = spark.read.schema(schema).option("header", True).csv(info["csv"])

    df_clean = to_snake_case(df)
    df.printSchema()

spark.stop()

Processando dataset: customers
root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)

Processando dataset: geolocation
root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)

Processando dataset: order_items
root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)

Processando dataset: order_payments
root
 |-- order_id: string (nullable = true)
 |-- pa