In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("LeituraCSV") \
    .master("local[*]") \
    .getOrCreate()

print("Spark iniciado com sucesso! Versão:", spark.version)

Spark iniciado com sucesso! Versão: 4.0.1


In [5]:
customers = spark.read.csv(r"C:\Users\Visitante\Desktop\Bootcamp\Data-Engineering-and-Big-Data-Project\Dataset Principal\olist\olist_customers_dataset.csv", header=True, inferSchema=True)
geolocation = spark.read.csv(r"C:\Users\Visitante\Desktop\Bootcamp\Data-Engineering-and-Big-Data-Project\Dataset Principal\olist\olist_geolocation_dataset.csv", header=True, inferSchema=True)
order_items = spark.read.csv(r"C:\Users\Visitante\Desktop\Bootcamp\Data-Engineering-and-Big-Data-Project\Dataset Principal\olist\olist_order_items_dataset.csv", header=True, inferSchema=True)
order_payments = spark.read.csv(r"C:\Users\Visitante\Desktop\Bootcamp\Data-Engineering-and-Big-Data-Project\Dataset Principal\olist\olist_order_payments_dataset.csv", header=True, inferSchema=True)
orders = spark.read.csv(r"C:\Users\Visitante\Desktop\Bootcamp\Data-Engineering-and-Big-Data-Project\Dataset Principal\olist\olist_orders_dataset.csv", header=True, inferSchema=True)
products = spark.read.csv(r"C:\Users\Visitante\Desktop\Bootcamp\Data-Engineering-and-Big-Data-Project\Dataset Principal\olist\olist_products_dataset.csv", header=True, inferSchema=True)


In [6]:
# Importa as funções necessárias do módulo pyspark.sql.functions:
# 'col' para facilitar a referência às colunas
# 'avg' para calcular médias numéricas
from pyspark.sql.functions import col, avg


# A tabela 'geolocation' contém várias coordenadas (latitude/longitude)
# para um mesmo CEP. Como CEPs de uma mesma cidade tendem a ser próximos,
# podemos calcular a média das coordenadas por cidade e estado para simplificar a análise.
geo_base = (
    geolocation
    .groupBy("geolocation_city", "geolocation_state")  # Agrupa por cidade e estado
    .agg(
        avg("geolocation_lat").alias("avg_lat"),  # Calcula a média da latitude
        avg("geolocation_lng").alias("avg_lng")   # Calcula a média da longitude
    )
)


# A partir das tabelas principais, realizamos uma série de JOINs
# para consolidar todas as informações do cliente, pedido, produto,
# pagamento e localização em um único DataFrame.

df_base = (
    # Tabela principal: clientes
    customers.alias("C")

    # JOIN 1: Pedidos (orders)
    # Ligação entre cliente e pedido por 'customer_id'
    # Tipo RIGHT JOIN: mantém todos os pedidos, mesmo que algum cliente esteja ausente
    .join(
        orders.alias("O"),
        col("C.customer_id") == col("O.customer_id"),
        "right"
    )

    # JOIN 2: Itens do pedido (order_items)
    # Ligação por 'order_id' para incluir informações de produto e valores
    .join(
        order_items.alias("I"),
        col("O.order_id") == col("I.order_id"),
        "left"
    )

    # JOIN 3: Produtos (products)
    # Ligação por 'product_id' para trazer a categoria e características do produto
    .join(
        products.alias("P"),
        col("I.product_id") == col("P.product_id"),
        "left"
    )

    # JOIN 4: Pagamentos (order_payments)
    # Ligação por 'order_id' para adicionar os valores de pagamento
    .join(
        order_payments.alias("PY"),
        col("O.order_id") == col("PY.order_id"),
        "left"
    )

    # JOIN 5: Geolocalização agregada (geo_base)
    # Ligação entre cidade e estado do cliente e cidade/estado da base geográfica
    # Esse join adiciona a latitude e longitude médias da região do cliente
    .join(
        geo_base.alias("G"),
        (col("C.customer_city") == col("G.geolocation_city")) &
        (col("C.customer_state") == col("G.geolocation_state")),
        "left"
    )

   
    # Mantém apenas as colunas relevantes para a análise.
    .select(
        # ---- Pedidos ----
        col("O.order_id"),
        col("O.order_status"),
        col("O.order_purchase_timestamp"),

        # ---- Clientes ----
        col("C.customer_city"),
        col("C.customer_state"),

        # ---- Geolocalização (médias) ----
        col("G.avg_lat").alias("latitude_media"),
        col("G.avg_lng").alias("longitude_media"),

        # ---- Produtos ----
        col("P.product_id"),
        col("P.product_category_name"),

        # ---- Itens do pedido ----
        col("I.price"),
        col("I.freight_value"),

        # ---- Pagamentos ----
        col("PY.payment_value")
    )
)

# O parâmetro truncate=False evita que valores longos sejam cortados.
df_base.show(20, truncate=False)


+--------------------------------+------------+------------------------+-----------------------+--------------+-------------------+-------------------+--------------------------------+---------------------------------+------+-------------+-------------+
|order_id                        |order_status|order_purchase_timestamp|customer_city          |customer_state|latitude_media     |longitude_media    |product_id                      |product_category_name            |price |freight_value|payment_value|
+--------------------------------+------------+------------------------+-----------------------+--------------+-------------------+-------------------+--------------------------------+---------------------------------+------+-------------+-------------+
|76c6e866289321a7c93b82b54852dc33|delivered   |2017-01-23 18:29:09     |faxinalzinho           |RS            |-27.421768635363264|-52.67502180601173 |ac1789e492dcd698c5c10b97a671243a|moveis_decoracao                 |19.9  |16.05        

In [19]:
# A função 'dropDuplicates()' remove linhas duplicadas do DataFrame,
# considerando apenas as colunas especificadas.
 
# Como um mesmo pedido pode conter vários produtos (e vice-versa em casos de erro),
# o objetivo é garantir que cada combinação de pedido e produto apareça apenas uma vez.
df_base = df_base.dropDuplicates(["order_id", "product_id"])
df_base = df_base.dropDuplicates(["order_id", "product_id"])

df_base.show(20, truncate=False)

+--------------------------------+------------+------------------------+--------------+--------------+-------------------+-------------------+--------------------------------+---------------------------------+------+-------------+-------------+----------+
|order_id                        |order_status|order_purchase_timestamp|customer_city |customer_state|latitude_media     |longitude_media    |product_id                      |product_category_name            |price |freight_value|payment_value|order_date|
+--------------------------------+------------+------------------------+--------------+--------------+-------------------+-------------------+--------------------------------+---------------------------------+------+-------------+-------------+----------+
|00048cc3ae777c65dbb7d2a0634bc1ea|delivered   |2017-05-15 21:42:34     |uberaba       |MG            |-19.752823866371905|-47.935837449992846|ef92defde845ab8450f9d70c526ef70f|utilidades_domesticas            |21.9  |12.69        |34

In [20]:
# - col() - facilita a referência a colunas.
# - when() - cria expressões condicionais (tipo "if").
# - count() - conta o número de ocorrências.
from pyspark.sql.functions import col, when, count

# Cria uma lista de contagens de valores nulos (NULL) para cada coluna do df_base.
# 'when(col(c).isNull(), c)' verifica se o valor da coluna 'c' é nulo.
# 'count(...)' conta quantas vezes essa condição é verdadeira.
# 'alias(c)' renomeia o resultado com o nome da própria coluna.

df_base.select([count(when(col(c).isNull(), c)).alias(c) for c in df_base.columns]).show()


+--------+------------+------------------------+-------------+--------------+--------------+---------------+----------+---------------------+-----+-------------+-------------+----------+
|order_id|order_status|order_purchase_timestamp|customer_city|customer_state|latitude_media|longitude_media|product_id|product_category_name|price|freight_value|payment_value|order_date|
+--------+------------+------------------------+-------------+--------------+--------------+---------------+----------+---------------------+-----+-------------+-------------+----------+
|       0|           0|                       0|            0|             0|             0|              0|         0|                    0|    0|            0|            0|         0|
+--------+------------+------------------------+-------------+--------------+--------------+---------------+----------+---------------------+-----+-------------+-------------+----------+



In [9]:
#Como algumas linhas nao fazem sentido estar nulo, precisa tirar essas linhas da tabela para nao interferir

from pyspark.sql.functions import col

# Limpeza geral da tabela
df_base = (
    df_base
    # Remove linhas essenciais que não podem ser nulas
    .dropna(subset=["product_id", "price", "payment_value", "latitude_media", "longitude_media"])

    # Substitui valores ausentes em colunas opcionais
    .fillna({
        "product_category_name": "desconhecido",
        "freight_value": 0
    })
)



In [10]:
df_base.select([count(when(col(c).isNull(), c)).alias(c) for c in df_base.columns]).show()


+--------+------------+------------------------+-------------+--------------+--------------+---------------+----------+---------------------+-----+-------------+-------------+
|order_id|order_status|order_purchase_timestamp|customer_city|customer_state|latitude_media|longitude_media|product_id|product_category_name|price|freight_value|payment_value|
+--------+------------+------------------------+-------------+--------------+--------------+---------------+----------+---------------------+-----+-------------+-------------+
|       0|           0|                       0|            0|             0|             0|              0|         0|                    0|    0|            0|            0|
+--------+------------+------------------------+-------------+--------------+--------------+---------------+----------+---------------------+-----+-------------+-------------+



In [11]:
from pyspark.sql.functions import to_date

# Converte o timestamp para apenas data
df_base = df_base.withColumn("order_date", to_date("order_purchase_timestamp"))

# Mostra resultado
df_base.select("order_purchase_timestamp", "order_date").show(10, truncate=False)


+------------------------+----------+
|order_purchase_timestamp|order_date|
+------------------------+----------+
|2017-05-14 20:28:25     |2017-05-14|
|2017-06-13 21:11:26     |2017-06-13|
|2017-11-28 21:00:44     |2017-11-28|
|2018-06-04 08:33:09     |2018-06-04|
|2017-07-12 18:45:24     |2017-07-12|
|2017-12-07 15:40:14     |2017-12-07|
|2017-03-09 16:18:47     |2017-03-09|
|2017-11-26 21:21:07     |2017-11-26|
|2017-10-24 11:34:33     |2017-10-24|
|2017-10-07 00:16:57     |2017-10-07|
+------------------------+----------+
only showing top 10 rows


In [12]:
from pyspark.sql.functions import lower, trim, col, upper

df_base = (
    df_base
    # Deixa todos os valores da coluna 'order_status' em letras minúsculas e remove espaços extras
    .withColumn("order_status", lower(trim(col("order_status"))))

    # Deixa todos os valores da coluna 'customer_state' em letras maiúsculas e remove espaços extras
    .withColumn("customer_state", upper(trim(col("customer_state"))))

    # Deixa todos os valores da coluna 'product_category_name' em letras minúsculas e remove espaços extras
    .withColumn("product_category_name", lower(trim(col("product_category_name"))))

    # Converte a coluna 'price' para tipo numérico (double)
    .withColumn("price", col("price").cast("double"))

    # Converte a coluna 'freight_value' para tipo numérico (double)
    .withColumn("freight_value", col("freight_value").cast("double"))

    # Converte a coluna 'payment_value' para tipo numérico (double)
    .withColumn("payment_value", col("payment_value").cast("double"))
)

# Mostra o esquema final do DataFrame (nomes das colunas e tipos de dados)
df_base.printSchema()

df_base.show(15, truncate=False)

root
 |-- order_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- latitude_media: double (nullable = true)
 |-- longitude_media: double (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = false)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = false)
 |-- payment_value: double (nullable = true)
 |-- order_date: date (nullable = true)

+--------------------------------+------------+------------------------+--------------+--------------+-------------------+-------------------+--------------------------------+---------------------------------+-----+-------------+-------------+----------+
|order_id                        |order_status|order_purchase_timestamp|customer_city |customer_state|latitude_media     |longitude_media    |produc

In [13]:
# Exibe estatísticas descritivas (como média, desvio padrão, mínimo e máximo)
# das colunas numéricas selecionadas: 'price', 'freight_value' e 'payment_value'
df_base.describe(["price", "freight_value", "payment_value"]).show()

# Agrupa os pedidos pelo status ('order_status') e conta quantos pedidos há em cada status
# Exemplo: delivered, shipped, canceled etc.
df_base.groupBy("order_status").count().show()

# Agrupa os clientes por estado ('customer_state') e conta quantos registros existem em cada um
# Isso ajuda a visualizar a distribuição de pedidos por estado brasileiro
df_base.groupBy("customer_state").count().show()



+-------+------------------+------------------+------------------+
|summary|             price|     freight_value|     payment_value|
+-------+------------------+------------------+------------------+
|  count|            102349|            102349|            102349|
|   mean|124.40438001347657|20.104408543316925|161.60666982578988|
| stddev|188.91316646363524|15.870970831547204| 221.0484858186441|
|    min|              0.85|               0.0|               0.0|
|    max|            6735.0|            409.68|          13664.08|
+-------+------------------+------------------+------------------+

+------------+------+
|order_status| count|
+------------+------+
|     shipped|  1126|
|    canceled|   465|
|    approved|     2|
|    invoiced|   322|
|   delivered|100122|
| unavailable|     7|
|  processing|   305|
+------------+------+

+--------------+-----+
|customer_state|count|
+--------------+-----+
|            SC| 3737|
|            RO|  258|
|            PI|  504|
|            AM

In [15]:
output_path = r"C:\Users\Visitante\Desktop\Bootcamp\ml-data-engineering-project\notebooks\ETL pronto"
df_base.write.mode("overwrite").parquet(output_path)



In [17]:
df_base.coalesce(1).write.csv(
    r"C:\Users\Visitante\Desktop\Bootcamp\ml-data-engineering-project\notebooks\ETL pronto",
    header=True,
    mode="overwrite"
)


In [18]:
df_base.show(15, truncate=False)

+--------------------------------+------------+------------------------+--------------+--------------+-------------------+-------------------+--------------------------------+---------------------------------+-----+-------------+-------------+----------+
|order_id                        |order_status|order_purchase_timestamp|customer_city |customer_state|latitude_media     |longitude_media    |product_id                      |product_category_name            |price|freight_value|payment_value|order_date|
+--------------------------------+------------+------------------------+--------------+--------------+-------------------+-------------------+--------------------------------+---------------------------------+-----+-------------+-------------+----------+
|00048cc3ae777c65dbb7d2a0634bc1ea|delivered   |2017-05-15 21:42:34     |uberaba       |MG            |-19.752823866371905|-47.935837449992846|ef92defde845ab8450f9d70c526ef70f|utilidades_domesticas            |21.9 |12.69        |34.59 

In [17]:
#Pegando valores únicos
city_date_df = df_base \
    .select("customer_city", "customer_state", "order_date", "latitude_media", "longitude_media") \
    .distinct()

In [18]:
#Amostragem do dataset, o número no limit() é quantas amostras vamos pegar. máximo 10 mil
sampled_df = city_date_df.limit(5000)
sampled_df.show(5, truncate=False)

+-------------------+--------------+----------+-------------------+-------------------+
|customer_city      |customer_state|order_date|latitude_media     |longitude_media    |
+-------------------+--------------+----------+-------------------+-------------------+
|sao paulo          |SP            |2018-03-03|-23.570860168123758|-46.63324168698188 |
|sao jose dos campos|SP            |2018-05-01|-23.21087970704398 |-45.885186496536896|
|curitiba           |PR            |2017-02-21|-25.453054020386823|-49.27499629427101 |
|santos             |SP            |2018-01-15|-23.96252319074526 |-46.32780350647331 |
|sao joao do triunfo|PR            |2018-07-05|-25.68486273907674 |-50.29852266825625 |
+-------------------+--------------+----------+-------------------+-------------------+
only showing top 5 rows


In [19]:
import requests
import time

def get_weather(lat, lon, date):
  try:
    url = "https://archive-api.open-meteo.com/v1/archive"
    params = {
        "latitude": lat,
        "longitude": lon,
        "start_date": date,
        "end_date": date,
        "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,weathercode",
        "timezone": "America/Sao_Paulo"
    }
    r = requests.get(url, params=params)
    r.raise_for_status()
    return r.json()
  except Exception:
    return None


In [20]:
weather_results = []

for row in sampled_df.collect():
  lat = row["latitude_media"]
  lon = row["longitude_media"]
  date = row["order_date"].strftime("%Y-%m-%d")
  city = row["customer_city"]

  data = get_weather(lat, lon, date)
  if data:
    daily = data.get("daily", {})
    weather_results.append({
        "city": city,
        "date": date,
        "temp_max": daily.get("temperature_2m_max", [None])[0],
        "temp_min": daily.get("temperature_2m_min", [None])[0],
        "precip": daily.get("precipitation_sum", [0])[0],
        "weather": daily.get("weathercode", [None])[0]
    })
  time.sleep(0.01)

In [21]:
weather_df = spark.createDataFrame(weather_results)

olist_enriched_df = df_base.join(
    weather_df,
    (df_base.customer_city == weather_df.city) &
    (df_base.order_date == weather_df.date),
    "left"
)

In [22]:
olist_enriched_df.filter(col("weather").isNotNull()).show(5, truncate=False)

+--------------------------------+------------+------------------------+---------------------+--------------+-------------------+------------------+--------------------------------+---------------------+-----+-------------+-------------+----------+---------------------+----------+------+--------+--------+-------+
|order_id                        |order_status|order_purchase_timestamp|customer_city        |customer_state|latitude_media     |longitude_media   |product_id                      |product_category_name|price|freight_value|payment_value|order_date|city                 |date      |precip|temp_max|temp_min|weather|
+--------------------------------+------------+------------------------+---------------------+--------------+-------------------+------------------+--------------------------------+---------------------+-----+-------------+-------------+----------+---------------------+----------+------+--------+--------+-------+
|0020262c8a370bd5a174ea6a2a267321|delivered   |2017-11-

In [24]:
olist_enriched_df.coalesce(1).write.mode("overwrite").option("header", True).csv(
    r"C:\Users\Gabriel\Documents\ml-data-engineering-project\input\final-dataset"
)