## Obtenção da Base Via URL

In [4]:
from pathlib import Path
import os
import urllib.request
import zipfile
import sys

url = "https://resources.trase.earth/20251121/data/supply-chains/brazil_beef_v2_2_1.zip"
data_dir = Path(os.environ.get("TRASE_DATA_DIR") or Path.cwd() / "data" / "trase_data")
data_dir.mkdir(parents=True, exist_ok=True)
zip_path = data_dir / os.path.basename(url)
extract_dir = data_dir / zip_path.stem


def download_progress_hook(block_num, block_size, total_size):
    if total_size <= 0:
        return
    downloaded = block_num * block_size
    percent = min(downloaded / total_size * 100, 100)
    bar_len = 40
    filled_len = int(bar_len * percent / 100)
    bar = "#" * filled_len + "-" * (bar_len - filled_len)
    sys.stdout.write(f"\r[Baixando] |{bar}| {percent:6.2f}%")
    sys.stdout.flush()
    if downloaded >= total_size:
        sys.stdout.write("\n")


print(f"Baixando arquivo da Trase para {zip_path}...")
urllib.request.urlretrieve(url, str(zip_path), reporthook=download_progress_hook)
print(f"Download concluído! Arquivo salvo em: {zip_path}")


def extract_with_progress(zip_file, dest_dir):
    members = zip_file.infolist()
    total = len(members)
    for idx, member in enumerate(members, start=1):
        zip_file.extract(member, dest_dir)
        percent = idx / total * 100
        bar_len = 40
        filled_len = int(bar_len * percent / 100)
        bar = "#" * filled_len + "-" * (bar_len - filled_len)
        sys.stdout.write(f"\r[Extraindo] |{bar}| {percent:6.2f}%")
        sys.stdout.flush()
    sys.stdout.write("\n")


if not extract_dir.exists():
    print(f"Extraindo arquivos para {extract_dir}...")
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        extract_with_progress(zip_ref, extract_dir)
    print("Extração concluída!")
    
    # Remove arquivo ZIP após extração para economizar espaço
    if zip_path.exists():
        zip_path.unlink()
        print(f"Arquivo ZIP removido: {zip_path}")
else:
    print(f"Pasta {extract_dir} já existe. Pulei a extração.")
    
    # Remove ZIP mesmo se já extraiu antes (caso exista)
    if zip_path.exists():
        zip_path.unlink()
        print(f"Arquivo ZIP removido: {zip_path}")


Baixando arquivo da Trase para /home/jovyan/work/data/trase_data/brazil_beef_v2_2_1.zip...
[Baixando] |########################################| 100.00%
Download concluído! Arquivo salvo em: /home/jovyan/work/data/trase_data/brazil_beef_v2_2_1.zip
Pasta /home/jovyan/work/data/trase_data/brazil_beef_v2_2_1 já existe. Pulei a extração.
Arquivo ZIP removido: /home/jovyan/work/data/trase_data/brazil_beef_v2_2_1.zip


## Configuração da Sessão Spark e Minio

In [6]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("TraseBrazilBeef")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .getOrCreate()
)

spark

## Configurações Minio

In [9]:
from pyspark.sql import functions as F
from pathlib import Path
import boto3
from botocore.exceptions import ClientError

# Diretório em que os arquivos foram extraídos (já definido na sua célula anterior)
print("Diretório de extração:", extract_dir)

# Procura automaticamente o CSV extraído
csv_files = list(extract_dir.glob("*.csv"))
if not csv_files:
    raise FileNotFoundError(f"Nenhum CSV encontrado em {extract_dir}")

csv_path = str(csv_files[0])
print("Arquivo CSV encontrado:", csv_path)

# Configura caminhos no MinIO
BUCKET = "dados-supplychain"  # sem barra no final
BASE_PATH = f"s3a://{BUCKET}/trase/brazil_beef"

RAW_PATH    = f"{BASE_PATH}/raw/full_csv_dump"
BRONZE_PATH = f"{BASE_PATH}/bronze/full_supply_chain"

# Criar bucket automaticamente no MinIO se não existir
print(f"\nVerificando bucket '{BUCKET}' no MinIO...")
s3_client = boto3.client(
    's3',
    endpoint_url='http://minio:9000',
    aws_access_key_id='minioadmin',
    aws_secret_access_key='minioadmin',
    region_name='us-east-1'
)

try:
    s3_client.head_bucket(Bucket=BUCKET)
    print(f"Bucket '{BUCKET}' já existe.")
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == '404':
        print(f"Bucket '{BUCKET}' não existe. Criando...")
        s3_client.create_bucket(Bucket=BUCKET)
        print(f"Bucket '{BUCKET}' criado com sucesso!")
    else:
        print(f"Erro ao verificar bucket: {e}")
        raise

# Lê CSV original (dados RAW)
print("\nLendo CSV...")
df_raw = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(csv_path)
)

print("Schema RAW:")
df_raw.printSchema()
df_raw.show(5)


Diretório de extração: /home/jovyan/work/data/trase_data/brazil_beef_v2_2_1
Arquivo CSV encontrado: /home/jovyan/work/data/trase_data/brazil_beef_v2_2_1/brazil_beef_v2_2_1.csv

Verificando bucket 'dados-supplychain' no MinIO...
Bucket 'dados-supplychain' já existe.

Lendo CSV...


                                                                                

Schema RAW:
root
 |-- year: integer (nullable = true)
 |-- country_of_production: string (nullable = true)
 |-- biome: string (nullable = true)
 |-- state_of_production: string (nullable = true)
 |-- state_of_production_trase_id: string (nullable = true)
 |-- municipality_of_production: string (nullable = true)
 |-- municipality_of_production_trase_id: string (nullable = true)
 |-- logistics_hub: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- port_of_export: string (nullable = true)
 |-- exporter: string (nullable = true)
 |-- exporter_group: string (nullable = true)
 |-- zero_deforestation_brazil_beef: string (nullable = true)
 |-- forest_500_beef: string (nullable = true)
 |-- importer: string (nullable = true)
 |-- importer_group: string (nullable = true)
 |-- country_of_destination: string (nullable = true)
 |-- economic_bloc: string (nullable = true)
 |-- country_of_production_trase_id: string (nullable = true)
 |-- country_of_destination_trase_id: strin

## Camada Raw

In [10]:
# 1) Salva RAW como Parquet (backup fiel dos dados de origem)
(
    df_raw
    .write
    .mode("overwrite")
    .parquet(RAW_PATH)
)
print(f"RAW salvo em: {RAW_PATH}")

25/11/30 03:07:29 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

RAW salvo em: s3a://dados-supplychain/trase/brazil_beef/raw/full_csv_dump


## Camada Bronze

In [11]:
# 2) Cria BRONZE: tipagem mínima + particionamento por ano
df_bronze = (
    df_raw
    .withColumn("year", F.col("year").cast("int"))
)

(
    df_bronze
    .write
    .mode("overwrite")
    .partitionBy("year")
    .parquet(BRONZE_PATH)
)
print(f"BRONZE salvo em: {BRONZE_PATH}")

25/11/30 03:08:50 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/11/30 03:08:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/11/30 03:08:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/11/30 03:08:54 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/11/30 03:08:55 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/11/30 03:08:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                      

BRONZE salvo em: s3a://dados-supplychain/trase/brazil_beef/bronze/full_supply_chain


## Camada Silver

In [12]:
SILVER_BASE = f"{BASE_PATH}/silver"

# Lê a camada BRONZE
df_bronze_reload = spark.read.parquet(BRONZE_PATH)

# Garante tipos numéricos
numeric_cols = [
    "cattle_deforestation_5_year_total_exposure",
    "co2_gross_emissions_cattle_deforestation_5_year_total_exposure",
    "co2_net_emissions_cattle_deforestation_5_year_total_exposure",
    "land_use",
    "volume",
    "fob"
]

df_bronze_num = df_bronze_reload
for col in numeric_cols:
    if col in df_bronze_num.columns:
        df_bronze_num = df_bronze_num.withColumn(col, F.col(col).cast("double"))

# Forest 500 às vezes vem como string, vamos tentar converter pra número
if "forest_500_beef" in df_bronze_num.columns:
    df_bronze_num = df_bronze_num.withColumn(
        "forest_500_beef_num",
        F.regexp_replace("forest_500_beef", "[^0-9.]", "").cast("double")
    )
else:
    df_bronze_num = df_bronze_num.withColumn("forest_500_beef_num", F.lit(None).cast("double"))

# Cria flag ZDC (zero_deforestation_brazil_beef == 'yes' ou algo assim)
if "zero_deforestation_brazil_beef" in df_bronze_num.columns:
    df_bronze_num = df_bronze_num.withColumn(
        "zdc_flag",
        F.when(F.lower(F.col("zero_deforestation_brazil_beef")).isin("yes", "y", "true", "sim"), 1.0).otherwise(0.0)
    )
else:
    df_bronze_num = df_bronze_num.withColumn("zdc_flag", F.lit(0.0))

# --- SILVER: agregação por estado + ano ---
group_cols_state = ["year", "state_of_production", "state_of_production_trase_id"]

df_silver_state = (
    df_bronze_num
    .groupBy(*group_cols_state)
    .agg(
        F.sum("volume").alias("total_volume_t"),
        F.sum("fob").alias("total_fob_usd"),
        F.sum("cattle_deforestation_5_year_total_exposure").alias("total_cattle_defor_ha"),
        F.sum("co2_gross_emissions_cattle_deforestation_5_year_total_exposure").alias("total_co2_gross_tco2"),
        F.sum("co2_net_emissions_cattle_deforestation_5_year_total_exposure").alias("total_co2_net_tco2"),
        F.sum("land_use").alias("total_pasture_area_ha"),
        F.avg("forest_500_beef_num").alias("avg_forest500_beef"),
        F.sum("zdc_flag").alias("sum_zdc_flag"),
        F.sum(F.when(F.col("zdc_flag") == 1.0, F.col("volume")).otherwise(0.0)).alias("zdc_volume_t")
    )
    .withColumn(
        "zdc_share_volume",
        F.when(F.col("total_volume_t") > 0, F.col("zdc_volume_t") / F.col("total_volume_t")).otherwise(None)
    )
)

df_silver_state.write.mode("overwrite").parquet(f"{SILVER_BASE}/flows_by_state")
print(f"SILVER (flows_by_state) salvo em: {SILVER_BASE}/flows_by_state")


                                                                                

SILVER (flows_by_state) salvo em: s3a://dados-supplychain/trase/brazil_beef/silver/flows_by_state


In [13]:
# --- SILVER: agregação por município + estado + ano ---
group_cols_muni = [
    "year",
    "state_of_production",
    "state_of_production_trase_id",
    "municipality_of_production",
    "municipality_of_production_trase_id"
]

df_silver_muni = (
    df_bronze_num
    .groupBy(*group_cols_muni)
    .agg(
        F.sum("volume").alias("total_volume_t"),
        F.sum("fob").alias("total_fob_usd"),
        F.sum("cattle_deforestation_5_year_total_exposure").alias("total_cattle_defor_ha"),
        F.sum("co2_gross_emissions_cattle_deforestation_5_year_total_exposure").alias("total_co2_gross_tco2"),
        F.sum("co2_net_emissions_cattle_deforestation_5_year_total_exposure").alias("total_co2_net_tco2"),
        F.sum("land_use").alias("total_pasture_area_ha"),
        F.avg("forest_500_beef_num").alias("avg_forest500_beef"),
        F.sum("zdc_flag").alias("sum_zdc_flag"),
        F.sum(F.when(F.col("zdc_flag") == 1.0, F.col("volume")).otherwise(0.0)).alias("zdc_volume_t")
    )
    .withColumn(
        "zdc_share_volume",
        F.when(F.col("total_volume_t") > 0, F.col("zdc_volume_t") / F.col("total_volume_t")).otherwise(None)
    )
)

df_silver_muni.write.mode("overwrite").parquet(f"{SILVER_BASE}/flows_by_municipality")
print(f"SILVER (flows_by_municipality) salvo em: {SILVER_BASE}/flows_by_municipality")


                                                                                

SILVER (flows_by_municipality) salvo em: s3a://dados-supplychain/trase/brazil_beef/silver/flows_by_municipality


## Camada Gold

In [14]:
GOLD_BASE = f"{BASE_PATH}/gold"

df_flows_state = spark.read.parquet(f"{SILVER_BASE}/flows_by_state")

df_gold_state = (
    df_flows_state
    .groupBy("state_of_production", "state_of_production_trase_id")
    .agg(
        F.sum("total_volume_t").alias("sum_volume_t"),
        F.sum("total_fob_usd").alias("sum_fob_usd"),
        F.sum("total_cattle_defor_ha").alias("sum_cattle_defor_ha"),
        F.sum("total_co2_gross_tco2").alias("sum_co2_gross_tco2"),
        F.sum("total_pasture_area_ha").alias("sum_pasture_area_ha"),
        F.avg("avg_forest500_beef").alias("avg_forest500_beef_longterm"),
        F.avg("zdc_share_volume").alias("avg_zdc_share_volume_longterm")
    )
    .orderBy(F.desc("sum_volume_t"))
)

df_gold_state.write.mode("overwrite").parquet(f"{GOLD_BASE}/exports_by_state")
print(f"GOLD (exports_by_state) salvo em: {GOLD_BASE}/exports_by_state")

df_gold_state.show(10, truncate=False)


                                                                                

GOLD (exports_by_state) salvo em: s3a://dados-supplychain/trase/brazil_beef/gold/exports_by_state
+-------------------+----------------------------+--------------------+---------------------+--------------------+---------------------+--------------------+---------------------------+-----------------------------+
|state_of_production|state_of_production_trase_id|sum_volume_t        |sum_fob_usd          |sum_cattle_defor_ha |sum_co2_gross_tco2   |sum_pasture_area_ha |avg_forest500_beef_longterm|avg_zdc_share_volume_longterm|
+-------------------+----------------------------+--------------------+---------------------+--------------------+---------------------+--------------------+---------------------------+-----------------------------+
|UNKNOWN STATE      |BR-XX                       |1.1052985838506183E8|2.2020755068544426E10|4.0639158991566114E7|1.2934377391384726E10|1.4700830750077863E9|4.173482980504926          |0.0                          |
|SAO PAULO          |BR-35            