<a href="https://colab.research.google.com/github/Rob-A-B/wildfire-hotspots/blob/feature/queimadas.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install dask dask[dataframe] pyarrow



In [2]:
!pip install cudf-cu12 dask-cudf-cu12 --extra-index-url=https://pypi.nvidia.com

Looking in indexes: https://pypi.org/simple, https://pypi.nvidia.com


In [3]:
import os, datetime
from google.colab import files
import shutil, os, datetime
import os, glob, datetime
import pandas as pd
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import re
from google.colab import drive
import gc

import dask.dataframe as dd
import dask_cudf

In [None]:
# Mount Google Drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
hoje = datetime.date.today()
BASE   = "/content/projeto_pcb"
BRONZE_RAW = f"{BASE}/dados/bronze/raw"                   # onde guardamos os arquivos "brutos" por parti√ß√£o
BRONZE_CUR = f"{BASE}/dados/bronze/current"               # dataset consolidado (√∫nico arquivo)
os.makedirs(BRONZE_RAW, exist_ok=True)
os.makedirs(BRONZE_CUR, exist_ok=True)


DRIVE_CSV_PATH = "/content/drive/My Drive/8_periodo/Big Data/datasets"
os.makedirs(DRIVE_CSV_PATH, exist_ok=True)

In [6]:
# 1) Descobrir todos os CSVs diretamente de um link ou ID de pasta compartilhada do Google Drive
# Using gdown to download all files from a shared Google Drive folder.
# This requires the folder to be shared with "Anyone with the link".

# Replace with your actual shared Google Drive FOLDER link or ID
shared_folder_link_or_id = 'https://drive.google.com/drive/folders/1hRgLe3t-zGgWqFH7ZAtW0m6XQdgOW3bh?usp=sharing' # <-- **REPLACE THIS**

download_dir = f"{DRIVE_CSV_PATH}/downloaded_csvs"
os.makedirs(download_dir, exist_ok=True)

print(f"üì¶ Attempting to download all files from shared Google Drive folder: {shared_folder_link_or_id}")

try:
    # Use gdown to download all files from the folder
    # The --folder flag indicates that the provided link/ID is for a folder
    # The -O flag specifies the output directory
    !gdown --no-cookies --no-check-certificate --folder "{shared_folder_link_or_id}" -O "{download_dir}"

    # After downloading, list the files in the download directory to process them
    arquivos_csv = [f for f in os.listdir(download_dir) if f.endswith(".csv") and os.path.isfile(os.path.join(download_dir, f))]
    print(f"üì¶ Downloaded {len(arquivos_csv)} CSVs to {download_dir}.\n")

except Exception as e:
    print(f"‚ö†Ô∏è Error during download: {e}")
    print("Please ensure gdown is installed (`!pip install gdown`), the folder link/ID is correct, and the folder is shared with 'Anyone with the link'.")
    arquivos_csv = [] # Ensure arquivos_csv is empty if download fails


# 2) Mover/copy to Bronze/raw partitioned and read into pandas DataFrames
# Now process the downloaded files from the download_dir
for nome in arquivos_csv:
    caminho_src = os.path.join(download_dir, nome)

    # extrai AAAAMM do nome (ex.: focos_mensal_br_202311.csv)
    m = re.search(r'(\d{6})', nome)
    aaaamm = m.group(1) if m else f"{hoje.year}{hoje:%m}"
    ano, mes = aaaamm[:4], aaaamm[4:]

    # destino particionado (save as parquet in bronze raw)
    dest_dir = os.path.join(BRONZE_RAW, f"ano={ano}", f"mes={mes}")
    os.makedirs(dest_dir, exist_ok=True)
    nome_parquet = nome.replace(".csv", ".parquet")
    caminho_dst_parquet = os.path.join(dest_dir, nome_parquet)


    try:
        df_temp = pd.read_csv(caminho_src)
        df_temp["origem_arquivo"] = nome

        # salva como Parquet em bronze raw
        df_temp.to_parquet(caminho_dst_parquet, index=False)
        print(f"‚úÖ Processado e salvo como Parquet: {nome}  ‚ûú  {caminho_dst_parquet}")

        del df_temp # Free up memory
        gc.collect() # Run garbage collection

    except Exception as e:
        print(f"‚ö†Ô∏è Erro ao processar {caminho_src}: {e}")


# 3) Consolidate everything into a single Dask DataFrame (Bronze/current) by reading the Parquet files
# Update the path to read from the BRONZE_RAW directory where the processed parquet files are saved
# Only attempt to read if there are processed files
if arquivos_csv:
    try:
        bronze_ddf = dask_cudf.read_parquet(os.path.join(BRONZE_RAW, "*/*/*.parquet"))

        # Convert 'ano' and 'mes' to Int64 as category dtype is not supported by parquet writer
        bronze_ddf["ano"] = bronze_ddf["ano"].astype("Int64")
        bronze_ddf["mes"] = bronze_ddf["mes"].astype("Int64")

        # Simple idempotence: remove exact duplicate rows
        bronze_ddf = bronze_ddf.drop_duplicates()

        print(f"\nüß± Bronze consolidated: {len(bronze_ddf):,} linhas, {bronze_ddf.shape[1]} colunas")
        # For Dask, we use .head() to see the first few rows. This triggers computation.
        display(bronze_ddf.head())

        # 4) Salva consolidado (Parquet)
        out_parquet = os.path.join(BRONZE_CUR, "focos_bronze_consolidado.parquet")

        # To save as a single file, we repartition to 1 partition before saving
        bronze_ddf.repartition(npartitions=1).to_parquet(out_parquet)

        print(f"\nüíæ Consolidado salvo em:\n - {out_parquet}")

    except Exception as e:
        print(f"‚ö†Ô∏è Erro ao consolidar ou salvar Bronze: {e}")
        # Handle the case where no parquet files were created
        if "Empty dataset" in str(e):
             print("No parquet files were created. Please check the download and processing steps.")

else:
    print("\nSkipping consolidation and saving Bronze as no CSV files were downloaded or processed.")

üì¶ Attempting to download all files from shared Google Drive folder: https://drive.google.com/drive/folders/1hRgLe3t-zGgWqFH7ZAtW0m6XQdgOW3bh?usp=sharing
Retrieving folder contents
Retrieving folder 1ovB3SBy-8dycREpOpS6R8IvZf1M9Pucf silver
Processing file 1C0kSP8H1texrVhDk3phK3JpbFzobhOvn focos_mensal_br_202401.csv
Processing file 1A5R0ddrYc9h1mdC13BcZgionHE2DOBqG focos_mensal_br_202402.csv
Processing file 1FfQ3dMqa4P8nJsj6aduzmuRK7KTGIic9 focos_mensal_br_202403.csv
Processing file 1xPIWC_TV-mF3Va_N4x2DpXtzqmMv-Uxt focos_mensal_br_202404.csv
Processing file 1mqey7AzyRkuBped-T_Qx5hXl2EfE4WA0 focos_mensal_br_202405.csv
Processing file 1NMvoGVZM2txO2O_31i28oBEfABs2eT9A focos_mensal_br_202406.csv
Processing file 1oysu2-kQSqWy8RnHU08SLefVi2hzV3SY focos_mensal_br_202407.csv
Processing file 11UijgNQ7DXDfulVRP4lXS05tbqmrjg-n focos_mensal_br_202408.csv
Processing file 1Byntu2ERioK27Mj8-lRY8Ylq7tHGZuZO focos_mensal_br_202409.csv
Processing file 1O8YNTuMNGJ-8ZrVZTHUM9IHK-ckgX_v6 focos_mensal_br

Unnamed: 0,id,lat,lon,data_hora_gmt,satelite,municipio,estado,pais,municipio_id,estado_id,pais_id,numero_dias_sem_chuva,precipitacao,risco_fogo,bioma,frp,origem_arquivo,ano,mes
20867,d0b2cb29-c10d-3dfb-9ac5-b490c0f03638,-30.03362,-51.11917,2024-01-07 03:53:00,NOAA-21,PORTO ALEGRE,RIO GRANDE DO SUL,Brasil,4314902,43,33,-999.0,0.6,-999.0,Pampa,2.6,focos_mensal_br_202401.csv,2024,1
5504,0b5505dd-3cfb-3e28-b1f0-4f89951b3f99,3.9103,-61.8839,2024-01-03 20:04:43,GOES-16,AMAJARI,RORAIMA,Brasil,1400027,14,33,3.0,0.0,0.08,Amaz√¥nia,96.1,focos_mensal_br_202401.csv,2024,1
20871,ee918bbc-91ba-3ebc-ac93-a139b6ebd3cd,-3.46594,-39.2058,2024-01-07 04:12:00,NPP-375D,PARAIPABA,CEAR√Å,Brasil,2310258,23,33,5.0,0.0,0.84,Caatinga,1.4,focos_mensal_br_202401.csv,2024,1
5507,d2a7e5eb-d47c-3ebe-9441-fe152a5a6e8e,3.8921,-61.8654,2024-01-03 20:04:44,GOES-16,AMAJARI,RORAIMA,Brasil,1400027,14,33,3.0,0.0,0.08,Amaz√¥nia,355.7,focos_mensal_br_202401.csv,2024,1
20875,5f5dbda9-0b4e-32f7-a4d1-513a026c6731,-2.88609,-40.52451,2024-01-07 04:12:00,NPP-375D,JIJOCA DE JERICOACOARA,CEAR√Å,Brasil,2307254,23,33,-999.0,0.75,0.62,Caatinga,0.9,focos_mensal_br_202401.csv,2024,1



üíæ Consolidado salvo em:
 - /content/projeto_pcb/dados/bronze/current/focos_bronze_consolidado.parquet


In [None]:
import shutil
import os

# Define the source path (the silver parquet file)
# Assuming the silver_parquet variable from the previous cell (4msZhIXbeoEg) holds the correct path
# If the previous cell was not run, you might need to redefine silver_parquet
# silver_parquet = '/content/projeto_pcb/dados/silver/focos_silver_20251012.parquet' # Example if needed

# Define the destination path in your Google Drive
# **IMPORTANT:** Replace 'My Drive/Your_Destination_Folder' with the actual path in your Drive
# This should be the same folder you intend to upload to.
drive_destination_folder = "/content/drive/My Drive/8_periodo/Big Data/datasets/silver" # Replace with your desired folder path in Drive
os.makedirs(drive_destination_folder, exist_ok=True) # Create the destination folder if it doesn't exist

# Define the full destination path including the filename
# Extract the filename from the silver_parquet source path
silver_file_name = os.path.basename(silver_parquet)
silver_parquet_destination = os.path.join(drive_destination_folder, silver_file_name)

print("Fonte:", silver_parquet)
print("Destino:", silver_parquet_destination)
print("Pasta destino:", drive_destination_folder)

# Copy the file
try:
    shutil.copy(silver_parquet, silver_parquet_destination)
    print(f"‚úÖ Arquivo salvo no Google Drive: {silver_parquet_destination}")
except FileNotFoundError:
    print(f"‚ö†Ô∏è Erro: Arquivo silver parquet n√£o encontrado em {silver_parquet}. Certifique-se de que a c√©lula anterior foi executada com sucesso.")
except Exception as e:
    print(f"‚ö†Ô∏è Erro ao salvar arquivo no Google Drive: {e}")

In [None]:
bronze_ddf.info()

<class 'dask_cudf._expr.collection.DataFrame'>
Columns: 19 entries, id to mes
dtypes: object(10), float64(5), int64(4)

In [None]:
bronze_ddf.isna().sum().compute()

id                            0
lat                           0
lon                           0
data_hora_gmt                 0
satelite                      0
municipio                     0
estado                        0
pais                          0
municipio_id                  0
estado_id                     0
pais_id                       0
numero_dias_sem_chuva    125852
precipitacao             125852
risco_fogo               125852
bioma                         4
frp                      374354
origem_arquivo                0
ano                           0
mes                           0
dtype: int64

In [None]:
hoje = datetime.date.today()
BASE = "/content/projeto_pcb"
BRONZE = "/content/projeto_pcb/dados/bronze/current"
SILVER = "/content/projeto_pcb/dados/silver"
os.makedirs(SILVER, exist_ok=True)

In [None]:
df_temp = None
del df_temp
gc.collect()

41

In [None]:
bronze_ddf = bronze_ddf.drop(columns=['id', 'lat','lon','municipio_id','estado_id','pais_id','pais','origem_arquivo', 'ano', 'mes'])

In [None]:

paths = sorted(glob.glob(os.path.join(BRONZE, "*.parquet")))
assert paths, "Nenhum arquivo Parquet encontrado em bronze."

bronze_ddf = dd.read_parquet(paths[0]) # Assuming the consolidated file is the first one

esperadas = {"data_hora_gmt","satelite","municipio","estado"}
# Check for missing columns (Dask DataFrames have columns attribute)
faltando = esperadas - set(bronze_ddf.columns)
assert not faltando, f"Faltam colunas no dataset: {faltando}"

bronze_ddf["data_hora_gmt"] = bronze_ddf["data_hora_gmt"].astype('datetime64[ns, UTC]')


for c in ["satelite","municipio","estado"]:
      bronze_ddf[c] = bronze_ddf[c].astype(str).str.strip()

bronze_ddf = bronze_ddf.dropna(subset=["data_hora_gmt"])

# Identify and replace -999 with NaN in numeric columns
numeric_cols = bronze_ddf.select_dtypes(include=np.number).columns
# Use Dask's assign with a dictionary comprehension for replacement
replace_dict = {col: bronze_ddf[col].replace(-999, np.nan) for col in numeric_cols}
bronze_ddf = bronze_ddf.assign(**replace_dict)


# Calculate and print proportion of NaNs per column using Dask
nan_proportion_per_column = bronze_ddf.isnull().sum().compute() / len(bronze_ddf) * 100
print("\nPropor√ß√£o de NaN por coluna (%):")
print(nan_proportion_per_column)

# Calculate and print proportion of rows with NaN using Dask
rows_with_nan = bronze_ddf.isnull().any(axis=1).sum().compute()
proportion_rows_with_nan = rows_with_nan / len(bronze_ddf) * 100
print(f"\nPropor√ß√£o de linhas com NaN no total do dataset (%): {proportion_rows_with_nan:.2f}%")

bronze_ddf = bronze_ddf.dropna()

# Extract date components using Dask's .dt accessor
bronze_ddf["ano"] = bronze_ddf["data_hora_gmt"].dt.year.astype("Int64")
bronze_ddf["mes"] = bronze_ddf["data_hora_gmt"].dt.month.astype("Int64")
bronze_ddf["dia"] = bronze_ddf["data_hora_gmt"].dt.day.astype("Int64")

# carga que prof pediu
bronze_ddf["dt_carga"] = datetime.date.today().isoformat()

# Salva silver as Parquet
SILVER_FILE =  f"focos_silver_{datetime.date.today():%Y%m%d}.parquet"
silver_parquet = os.path.join(SILVER,SILVER_FILE)
os.makedirs(SILVER, exist_ok=True)

# Save as a single parquet file
bronze_ddf.repartition(npartitions=1).to_parquet(silver_parquet)


print("Silver salvo:", silver_parquet, "| linhas:", len(bronze_ddf))

display(bronze_ddf.head())


Propor√ß√£o de NaN por coluna (%):
id                       0.000000
lat                      0.000000
lon                      0.000000
data_hora_gmt            0.000000
satelite                 0.000000
municipio                0.000000
estado                   0.000000
pais                     0.000000
municipio_id             0.000000
estado_id                0.000000
pais_id                  0.000000
numero_dias_sem_chuva    1.916553
precipitacao             1.180835
risco_fogo               2.209782
bioma                    0.000038
frp                      3.512460
origem_arquivo           0.000000
ano                      0.000000
mes                      0.000000
dtype: float64

Propor√ß√£o de linhas com NaN no total do dataset (%): 6.23%
Silver salvo: /content/projeto_pcb/dados/silver/focos_silver_20251012.parquet | linhas: 9993721


Unnamed: 0,id,lat,lon,data_hora_gmt,satelite,municipio,estado,pais,municipio_id,estado_id,...,numero_dias_sem_chuva,precipitacao,risco_fogo,bioma,frp,origem_arquivo,ano,mes,dia,dt_carga
16932,2d3ed0b6-5b0c-3184-8cc9-0611e10fd6a3,-19.4659,-55.6796,2024-01-06 14:06:53+00:00,GOES-16,AQUIDAUANA,MATO GROSSO DO SUL,Brasil,5001102,50,...,2.0,0.0,0.3,Pantanal,84.2,focos_mensal_br_202401.csv,2024,1,6,2025-10-12
16939,c64e73e1-134d-3c50-8e17-4df3865b92ba,-9.2355,-37.5969,2024-01-06 14:15:56+00:00,GOES-16,CANAPI,ALAGOAS,Brasil,2701605,27,...,9.0,0.0,1.0,Caatinga,109.8,focos_mensal_br_202401.csv,2024,1,6,2025-10-12
16942,ca3325dd-2a9e-34f3-8c8a-8c640670d4bf,-9.6129,-37.9285,2024-01-06 14:15:58+00:00,GOES-16,CANIND√â DE S√ÉO FRANCISCO,SERGIPE,Brasil,2801207,28,...,13.0,0.0,1.0,Caatinga,119.8,focos_mensal_br_202401.csv,2024,1,6,2025-10-12
16943,1bb0fb77-8ec5-3e70-a030-637c07af6dd7,-9.612,-37.9807,2024-01-06 14:15:58+00:00,GOES-16,CANIND√â DE S√ÉO FRANCISCO,SERGIPE,Brasil,2801207,28,...,14.0,0.0,1.0,Caatinga,113.5,focos_mensal_br_202401.csv,2024,1,6,2025-10-12
16948,e7aa32a5-796f-363e-bb7c-32fa8718234e,-9.5428,-37.5481,2024-01-06 14:15:58+00:00,GOES-16,S√ÉO JOS√â DA TAPERA,ALAGOAS,Brasil,2708402,27,...,15.0,0.0,1.0,Caatinga,102.5,focos_mensal_br_202401.csv,2024,1,6,2025-10-12


In [None]:
print(silver_parquet_source)

/content/projeto_pcb/dados/silver/focos_silver_20251012.parquet


In [None]:
import shutil

SILVER_FILE =  f"focos_silver_{datetime.date.today():%Y%m%d}.parquet/*"
# Define the source path (the silver parquet file)
# Make sure the silver_parquet variable from the previous cell is accessible
silver_parquet_source = '/content/projeto_pcb/dados/silver/focos_silver_20251012.parquet/part.0.parquet'

# Define the destination path in your Google Drive
# **IMPORTANT:** Replace 'My Drive/Your_Destination_Folder' with the actual path in your Drive
drive_destination_folder = "/content/drive/My Drive/8_periodo/Big Data/datasets/silver"
os.makedirs(drive_destination_folder, exist_ok=True) # Create the destination folder if it doesn't exist

# Define the full destination path including the filename
silver_parquet_destination = os.path.join(drive_destination_folder, SILVER_FILE)

print("Fonte:", silver_parquet_source)
print("Destino:", silver_parquet_destination)
print("Pasta destino:", drive_destination_folder)

# Copy the file
try:
    shutil.copy(silver_parquet_source, silver_parquet_destination)
    print(f"‚úÖ Arquivo salvo no Google Drive: {silver_parquet_destination}")
except FileNotFoundError:
    print(f"‚ö†Ô∏è Erro: Arquivo silver parquet n√£o encontrado em {silver_parquet_source}. Certifique-se de que a c√©lula anterior foi executada com sucesso.")
except Exception as e:
    print(f"‚ö†Ô∏è Erro ao salvar arquivo no Google Drive: {e}")

Fonte: /content/projeto_pcb/dados/silver/focos_silver_20251012.parquet/part.0.parquet
Destino: /content/drive/My Drive/8_periodo/Big Data/datasets/silver/focos_silver_20251012.parquet/*
Pasta destino: /content/drive/My Drive/8_periodo/Big Data/datasets/silver
‚ö†Ô∏è Erro: Arquivo silver parquet n√£o encontrado em /content/projeto_pcb/dados/silver/focos_silver_20251012.parquet/part.0.parquet. Certifique-se de que a c√©lula anterior foi executada com sucesso.


In [None]:
bronze_df.isna().sum()

Unnamed: 0,0
data_hora_gmt,0
satelite,0
municipio,0
estado,0
numero_dias_sem_chuva,302472
precipitacao,302472
risco_fogo,302472
bioma,9
frp,511105
ano,0


In [None]:
bronze_ddf = None
del bronze_ddf
gc.collect()

0

In [None]:
GOLD = "/content/projeto_pcb/dados/gold"
os.makedirs(GOLD, exist_ok=True)

In [None]:
import dask.dataframe as dd
import os
import glob
import datetime
import pandas as pd # Keep pandas for final output display if needed
import dask_cudf # Import dask_cudf

GOLD = "/content/projeto_pcb/dados/gold"
os.makedirs(GOLD, exist_ok=True)

padrao_silver = "focos_silver_*.parquet"   # adjustment for parquet files

paths = sorted(glob.glob(os.path.join(SILVER, padrao_silver)))
assert paths, f"Nenhum arquivo Parquet Silver encontrado em: {SILVER}"

# Read the silver parquet files into a Dask DataFrame
# Dask can read multiple files matching a pattern directly
ddf_silver = dask_cudf.read_parquet(paths)

# Ensure 'data_hora_gmt' is datetime with UTC timezone
# ddf_silver["data_hora_gmt"] = ddf_silver["data_hora_gmt"].astype('datetime64[ns, UTC]') # Removed this line


# Normaliza√ß√µes √∫teis para agrega√ß√£o using Dask's .dt accessor
ddf_silver["ano"]  = ddf_silver["data_hora_gmt"].dt.year.astype("Int64")
ddf_silver["mes"]  = ddf_silver["data_hora_gmt"].dt.month.astype("Int64")
ddf_silver["dia"]  = ddf_silver["data_hora_gmt"].dt.day.astype("Int64")
ddf_silver["hora"] = ddf_silver["data_hora_gmt"].dt.hour.astype("Int64") # Convert to Int64

# Recreate 'data' column as a string for grouping
ddf_silver["data"] = ddf_silver["data_hora_gmt"].dt.strftime('%Y-%m-%d')


# Aggregations using Dask groupby
por_estado_mes = (
    ddf_silver.groupby(["estado","ano","mes"])
      .size()
      .reset_index()
      .rename(columns={0: "qtd_focos"}) # Rename the column after reset_index
      .compute() # Trigger computation and get pandas DataFrame
      .sort_values(["ano","mes","qtd_focos"], ascending=[True,True,False])
)


por_municipio_dia = (
    ddf_silver.groupby(["estado","municipio","data"])
      .size()
      .reset_index()
      .rename(columns={0: "qtd_focos"}) # Rename the column after reset_index
      .compute() # Trigger computation and get pandas DataFrame
      .sort_values(["data","qtd_focos"], ascending=[True,False])
)


por_satelite_mes = (
    ddf_silver.groupby(["satelite","ano","mes"])
      .size()
      .reset_index()
      .rename(columns={0: "qtd_focos"}) # Rename the column after reset_index
      .compute() # Trigger computation and get pandas DataFrame
      .sort_values(["ano","mes","qtd_focos"], ascending=[True,True,False])
)

por_satelite_total = (
    ddf_silver.groupby("satelite")
      .size()
      .reset_index()
      .rename(columns={0: "qtd_focos"}) # Rename the column after reset_index
      .compute() # Trigger computation and get pandas DataFrame
      .assign(perc=lambda x: (x["qtd_focos"] / x["qtd_focos"].sum())*100)
      .sort_values("qtd_focos", ascending=False)
      .round({"perc": 2})
)

# For spatial grid, compute first as it might be a large result
# Round lat/lon using Dask map_partitions if needed for large data
# ddf_silver["lat_cell"] = ddf_silver["lat"].map_partitions(lambda x: x.round(2))
# ddf_silver["lon_cell"] = ddf_silver["lon"].map_partitions(lambda x: x.round(2))
# Simple round for now, assuming it fits in memory after aggregation
ddf_silver["lat_cell"] = ddf_silver["lat"].round(2)
ddf_silver["lon_cell"] = ddf_silver["lon"].round(2)


grade_espacial = (
    ddf_silver.groupby(["lat_cell","lon_cell"])
      .size()
      .reset_index()
      .rename(columns={0: "qtd_focos"}) # Rename the column after reset_index
      .compute() # Trigger computation and get pandas DataFrame
      .sort_values("qtd_focos", ascending=False)
)

# ------------------------------
# Salva em GOLD
# ------------------------------
hoje = datetime.date.today().isoformat()

g1 = os.path.join(GOLD, f"gold_focos_por_estado_mes_{hoje}.parquet") # Save as parquet
g2 = os.path.join(GOLD, f"gold_focos_por_municipio_dia_{hoje}.parquet") # Save as parquet
g3 = os.path.join(GOLD, f"gold_focos_por_satelite_mes_{hoje}.parquet") # Save as parquet
g3b= os.path.join(GOLD, f"gold_focos_por_satelite_total_{hoje}.parquet") # Save as parquet
g4 = os.path.join(GOLD, f"gold_grade_espacial_{hoje}.parquet") # Save as parquet

# Save pandas DataFrames to parquet
por_estado_mes.to_parquet(g1, index=False)
por_municipio_dia.to_parquet(g2, index=False)
por_satelite_mes.to_parquet(g3, index=False)
por_satelite_total.to_parquet(g3b, index=False)
grade_espacial.to_parquet(g4, index=False)


print("Gold salvo:")
print(" -", g1)
print(" -", g2)
print(" -", g3)
print(" -", g3b)
print(" -", g4)

try:
    ult_ano  = por_estado_mes["ano"].max()
    ult_mes  = por_estado_mes.query("ano == @ult_ano")["mes"].max()
    top_estados = (
        por_estado_mes
        .query("ano == @ult_ano and mes == @ult_mes")
        .nlargest(10, "qtd_focos")
    )
    print(f"\nTOP estados no m√™s mais recente ({ult_ano}-{ult_mes:02d}):")
    print(top_estados.to_string())
except Exception as e:
    print("\n[Aviso] N√£o foi poss√≠vel imprimir TOP estados do m√™s mais recente:", e)

print("\n% por satelite (total):")
print(por_satelite_total.to_string())

Gold salvo:
 - /content/projeto_pcb/dados/gold/gold_focos_por_estado_mes_2025-10-12.parquet
 - /content/projeto_pcb/dados/gold/gold_focos_por_municipio_dia_2025-10-12.parquet
 - /content/projeto_pcb/dados/gold/gold_focos_por_satelite_mes_2025-10-12.parquet
 - /content/projeto_pcb/dados/gold/gold_focos_por_satelite_total_2025-10-12.parquet
 - /content/projeto_pcb/dados/gold/gold_grade_espacial_2025-10-12.parquet

TOP estados no m√™s mais recente (2025-10):
                 estado   ano  mes  qtd_focos
125            MARANH√ÉO  2025   10      57887
218           TOCANTINS  2025   10      55587
8                 GOI√ÅS  2025   10      46353
566               PIAU√ç  2025   10      44843
531               BAHIA  2025   10      37219
99         MINAS GERAIS  2025   10      33428
547         MATO GROSSO  2025   10      33184
175                PAR√Å  2025   10      19456
489                ACRE  2025   10       9619
82   MATO GROSSO DO SUL  2025   10       9395

% por satelite (total):
    s

In [None]:
# cont = df["Grey"].value_counts().sort_index()
# plt.bar(cont.index.astype(str), cont.values)
# plt.title("Distribui√ß√£o da classe Grey")
# plt.xlabel("Grey (0=n√£o cinza, 1=cinza)")
# # plt.ylabel("n")
# plt.show()


In [None]:
# agg = df.groupby("Grey")[["R","G","B"]].mean().reset_index()
# for ch in ["R","G","B"]:
#     plt.bar(agg["Grey"].astype(str), agg[ch], label=ch)
#     plt.title(f"M√©dia do canal {ch} por Grey")
#     plt.xlabel("Grey")
#     plt.ylabel(f"M√©dia {ch} (escala {'0‚Äì255' if df['R'].max()>1 else '0‚Äì1'})")
#     plt.show()


# Task
Explain how to integrate Dask with cuDF for GPU acceleration in Google Colab.

## Set up a gpu runtime

### Subtask:
Ensure your Colab notebook is using a GPU runtime.


## Install rapids

### Subtask:
Install the necessary RAPIDS libraries, including cuDF and Dask-cuDF.


**Reasoning**:
Install the necessary RAPIDS libraries (cuDF and dask-cuDF) using pip.



In [None]:
!pip install cudf-cu12 dask-cudf-cu12 --extra-index-url=https://pypi.nvidia.com

Looking in indexes: https://pypi.org/simple, https://pypi.nvidia.com


## Import libraries

### Subtask:
Import dask_cudf and dask.dataframe.


**Reasoning**:
Import the necessary libraries for working with Dask and cuDF.



In [None]:
import dask_cudf
import dask.dataframe as dd

## Create a dask-cudf dataframe

### Subtask:
Load your data into a cuDF DataFrame first, and then create a Dask-cuDF DataFrame from it. Alternatively, you can use Dask-cuDF's read functions directly if they support your data format.


**Reasoning**:
The data is available as a consolidated parquet file in the BRONZE_CUR directory. I will use `dask_cudf.read_parquet` to load this data directly into a Dask-cuDF DataFrame.



In [None]:
import os
import dask_cudf

BRONZE_CUR = "/content/projeto_pcb/dados/bronze/current"
bronze_parquet_path = os.path.join(BRONZE_CUR, "focos_bronze_consolidado.parquet")

# Load the data into a Dask-cuDF DataFrame
ddf_gpu = dask_cudf.read_parquet(bronze_parquet_path)

# Display the first few rows to verify
print("Dask-cuDF DataFrame loaded:")
display(ddf_gpu.head())

Dask-cuDF DataFrame loaded:


Unnamed: 0,id,lat,lon,data_hora_gmt,satelite,municipio,estado,pais,municipio_id,estado_id,pais_id,numero_dias_sem_chuva,precipitacao,risco_fogo,bioma,frp,origem_arquivo,ano,mes
37,3283c9ee-3a7e-3123-9c39-b2b83631915c,-18.0072,-57.401901,2024-01-01 01:11:32,METOP-C,CORUMB√Å,MATO GROSSO DO SUL,Brasil,5003207,50,33,0,10.58,-999.0,Pantanal,,focos_mensal_br_202401.csv,2024,1
44,1abcaee1-bd65-38f7-b12b-32586fe3953f,3.2409,-60.763302,2024-01-01 01:17:30,METOP-C,BOA VISTA,RORAIMA,Brasil,1400100,14,33,8,0.0,1.0,Amaz√¥nia,,focos_mensal_br_202401.csv,2024,1
51,66778b43-fcc3-36f7-8dcc-50a09854b6db,-18.0629,-57.3721,2024-01-01 02:56:48,GOES-16,CORUMB√Å,MATO GROSSO DO SUL,Brasil,5003207,50,33,0,14.42,0.0,Pantanal,55.9,focos_mensal_br_202401.csv,2024,1
70,cc6ebd2c-100b-399b-b635-60ce4844791e,-12.55343,-41.48535,2024-01-01 04:01:00,NOAA-21,PALMEIRAS,BAHIA,Brasil,2923506,29,33,8,2.57,0.51,Caatinga,2.0,focos_mensal_br_202401.csv,2024,1
74,0df506db-cc03-361e-b1a6-bf5d11f0ac16,-12.56825,-41.45536,2024-01-01 04:01:00,NOAA-21,LEN√á√ìIS,BAHIA,Brasil,2919306,29,33,9,1.58,0.68,Caatinga,0.7,focos_mensal_br_202401.csv,2024,1


## Perform gpu-accelerated operations

### Subtask:
Perform some basic data manipulation operations on the Dask-cuDF DataFrame (`ddf_gpu`) to demonstrate GPU acceleration.


**Reasoning**:
Perform data manipulation and aggregation on the dask-cudf dataframe to demonstrate GPU acceleration.



In [None]:
# 1. Select a few columns
selected_cols_ddf = ddf_gpu[['estado', 'municipio', 'precipitacao', 'risco_fogo', 'bioma']]

# 2. Filter the DataFrame based on a condition
# Filter for 'precipitacao' greater than 0 and 'risco_fogo' greater than 0
filtered_ddf = selected_cols_ddf[(selected_cols_ddf['precipitacao'] > 0) & (selected_cols_ddf['risco_fogo'] > 0)]

# 3. Perform a simple aggregation (e.g., calculate the mean of 'risco_fogo')
# Calculate the mean of 'risco_fogo' for the filtered data
mean_risco_fogo = filtered_ddf['risco_fogo'].mean()

# 4. Use the .compute() method to trigger the Dask computation
# Trigger the computation to get the result
computed_mean = mean_risco_fogo.compute()

# 5. Print or display the computed result
print(f"Computed mean 'risco_fogo' on filtered data (GPU accelerated): {computed_mean}")

Computed mean 'risco_fogo' on filtered data (GPU accelerated): 0.6301548414101147


## Monitor gpu usage (optional)

### Subtask:
Use tools like `nvidia-smi` or the Colab resource monitor to observe GPU memory usage and activity.


**Reasoning**:
Execute the `nvidia-smi` command to observe GPU usage and memory.



In [None]:
!nvidia-smi

Sun Oct 12 21:38:10 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   51C    P0             30W /   70W |     114MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

## Summary:

### Data Analysis Key Findings

*   The Colab environment needs to be manually configured to use a GPU runtime; this step cannot be performed programmatically.
*   The required RAPIDS libraries, including `cudf-cu12` and `dask-cudf-cu12`, were already installed in the environment.
*   The necessary libraries, `dask_cudf` and `dask.dataframe`, were successfully imported.
*   A Dask-cuDF DataFrame was successfully created by directly reading a parquet file using `dask_cudf.read_parquet`.
*   GPU-accelerated operations were successfully performed on the Dask-cuDF DataFrame, including column selection, filtering, and calculating the mean of the 'risco\_fogo' column, resulting in a computed mean of approximately 0.630.
*   The `nvidia-smi` command successfully executed, providing information about the GPU status (Tesla T4) and usage.

### Insights or Next Steps

*   Explore more complex Dask-cuDF operations (e.g., joins, groupbys) to further leverage GPU acceleration on larger datasets.
*   Compare the performance of Dask-cuDF operations on the GPU with equivalent Dask operations on the CPU to quantify the acceleration benefits.
