In [1]:
# iniciando a sessão com o pyspark
!pip install pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DesmatamentoBR") \
    .getOrCreate()



In [2]:
# importando os dados do kaggle
import kagglehub

# Download latest version
path = kagglehub.dataset_download("mbogernetto/brazilian-amazon-rainforest-degradation")

print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/mbogernetto/brazilian-amazon-rainforest-degradation?dataset_version_number=3...


100%|██████████| 45.9k/45.9k [00:00<00:00, 6.92MB/s]

Extracting files...
Path to dataset files: /root/.cache/kagglehub/datasets/mbogernetto/brazilian-amazon-rainforest-degradation/versions/3





In [4]:
# Verificando o tipo do arquivo
import os

path = '/root/.cache/kagglehub/datasets/mbogernetto/brazilian-amazon-rainforest-degradation/versions/3'
os.listdir(path)

['def_area_2004_2019.csv',
 'inpe_brazilian_amazon_fires_1999_2019.csv',
 'el_nino_la_nina_1999_2019.csv']

In [5]:
# Leitura dos dataframes utilizados
# Dados de queimadas (focos de incêndio por estado/mês)
fires_df = spark.read.csv(
    path + '/inpe_brazilian_amazon_fires_1999_2019.csv',
    header=True,
    inferSchema=True
)
fires_df.show(5)

# Eventos climáticos (El Niño / La Niña)
climate_events_df = spark.read.csv(
    path + '/el_nino_la_nina_1999_2019.csv',
    header=True,
    inferSchema=True
)
climate_events_df.show(5)

# Resumo anual por estado da Amazônia Legal
annual_summary_df = spark.read.csv(
    path + '/def_area_2004_2019.csv',
    header=True,
    inferSchema=True
)
annual_summary_df.show(5)

+----+-----+-----------+-------------------+-------------------+---------+
|year|month|      state|           latitude|          longitude|firespots|
+----+-----+-----------+-------------------+-------------------+---------+
|1999|    1|   AMAZONAS| -2.371113333333333| -59.89993333333334|        3|
|1999|    1|   MARANHAO| -2.257394722222222|-45.487830555555554|       36|
|1999|    1|MATO GROSSO|-12.660633333333333|-55.057988888888886|       18|
|1999|    1|       PARA| -2.474820459770115| -48.54696666666667|       87|
|1999|    1|   RONDONIA|           -12.8617|           -60.5131|        1|
+----+-----+-----------+-------------------+-------------------+---------+
only showing top 5 rows

+----------+--------+----------+--------+
|start year|end year|phenomenon|severity|
+----------+--------+----------+--------+
|      2004|    2005|   El Nino|    Weak|
|      2006|    2007|   El Nino|    Weak|
|      2014|    2015|   El Nino|    Weak|
|      2018|    2019|   El Nino|    Weak|
|     

In [None]:
# padronizar e limpeza do dados
fires = fires_df
climate = climate_events_df
summary = annual_summary_df

fires = fires.dropDuplicates()
climate = climate.dropDuplicates()
summary = summary.dropDuplicates()

In [None]:
# Verificando dados nulos
from pyspark.sql import functions as F
for df, name in zip([fires, climate, summary], ['fires', 'climate', 'summary']):
    print(f"{name}:")
    df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

fires:
+----+-----+-----+--------+---------+---------+
|year|month|state|latitude|longitude|firespots|
+----+-----+-----+--------+---------+---------+
|   0|    0|    0|       0|        0|        0|
+----+-----+-----+--------+---------+---------+

climate:
+----------+--------+----------+--------+
|start year|end year|phenomenon|severity|
+----------+--------+----------+--------+
|         0|       0|         0|       0|
+----------+--------+----------+--------+

summary:
+-----------+---+---+---+---+---+---+---+---+---+---------+
|Ano/Estados| AC| AM| AP| MA| MT| PA| RO| RR| TO|AMZ LEGAL|
+-----------+---+---+---+---+---+---+---+---+---+---------+
|          0|  0|  0|  0|  0|  0|  0|  0|  0|  0|        0|
+-----------+---+---+---+---+---+---+---+---+---+---------+



In [None]:
# Expandir anos dos fenômenos climáticos
climate = climate.withColumn(
    "year", F.explode(F.sequence(F.col("start year"), F.col("end year")))
).drop("start year", "end year")

climate.show(5)

+----------+--------+----+
|phenomenon|severity|year|
+----------+--------+----+
|   La Nina|    Weak|2008|
|   La Nina|    Weak|2009|
|   La Nina|  Strong|1999|
|   La Nina|  Strong|2000|
|   La Nina|    Weak|2005|
+----------+--------+----+
only showing top 5 rows



In [None]:
# Padronizar tipos e colunas
from pyspark.sql.functions import col, when

summary = summary.withColumnRenamed("Ano/Estados", "year")

summary_long = summary.select(
    "year",
    *[F.struct(F.lit(col).alias("state"), F.col(col).alias("total_fires")).alias(col) for col in summary.columns if col not in ["year", "AMZ LEGAL"]]
).select("year", *summary.columns[1:-1])

# Transformar wide para long (explode)
summary_long = summary.selectExpr("year", "stack(9, 'AC', AC, 'AM', AM, 'AP', AP, 'MA', MA, 'MT', MT, 'PA', PA, 'RO', RO, 'RR', RR, 'TO', TO) as (state, summary_fires)")

# padronizando o nome dos estados para ficar igual a tabela fires
summary_long = summary_long.withColumn(
    "state",
    when(col("state") == "AC", "ACRE")
    .when(col("state") == "AM", "AMAZONAS")
    .when(col("state") == "AP", "AMAPA")
    .when(col("state") == "MA", "MARANHAO")
    .when(col("state") == "MT", "MATO GROSSO")
    .when(col("state") == "PA", "PARA")
    .when(col("state") == "RO", "RONDONIA")
    .when(col("state") == "RR", "RORAIMA")
    .when(col("state") == "TO", "TOCANTINS")
    .otherwise(col("state"))
)

summary_long.show(5)

+----+-----------+-------------+
|year|      state|summary_fires|
+----+-----------+-------------+
|2009|       ACRE|          167|
|2009|   AMAZONAS|          405|
|2009|      AMAPA|           70|
|2009|   MARANHAO|          828|
|2009|MATO GROSSO|         1049|
+----+-----------+-------------+
only showing top 5 rows



In [None]:
# Agregar focos por ano/estado
fires_agg = fires.groupBy("year", "state").agg(F.sum("firespots").alias("total_fires"))

fires_agg.show(5)

+----+-----------+-----------+
|year|      state|total_fires|
+----+-----------+-----------+
|2014|MATO GROSSO|      15677|
|2004|MATO GROSSO|      70422|
|2002|   AMAZONAS|      10203|
|2009|      AMAPA|       2456|
|2018|  TOCANTINS|        281|
+----+-----------+-----------+
only showing top 5 rows



In [None]:
# fazer a junção
fires_climate = fires_agg.join(climate, on="year", how="left")
fires_final = fires_climate.join(summary_long, on=["year", "state"], how="left")

fires_final.show(25)

+----+-----------+-----------+----------+-----------+-------------+
|year|      state|total_fires|phenomenon|   severity|summary_fires|
+----+-----------+-----------+----------+-----------+-------------+
|2014|MATO GROSSO|      15677|   El Nino|       Weak|         1075|
|2004|MATO GROSSO|      70422|   El Nino|       Weak|        11814|
|2002|   AMAZONAS|      10203|   El Nino|   Moderate|         NULL|
|2009|      AMAPA|       2456|   El Nino|   Moderate|           70|
|2009|      AMAPA|       2456|   La Nina|       Weak|           70|
|2018|  TOCANTINS|        281|   La Nina|       Weak|           25|
|2018|  TOCANTINS|        281|   El Nino|       Weak|           25|
|2016|       ACRE|       7684|   La Nina|       Weak|          372|
|2016|       ACRE|       7684|   El Nino|Very Strong|          372|
|2012|       ACRE|       4720|   La Nina|   Moderate|          305|
|2001|   AMAZONAS|       1297|   La Nina|       Weak|         NULL|
|2018|   AMAZONAS|      11446|   La Nina|       

In [None]:
# criar colunas derivadas
# intensidade de fogo
from pyspark.sql.functions import col, when, avg
from pyspark.sql.window import Window

# Calcular os quartis
quantiles = fires_final.approxQuantile("total_fires", [0.25, 0.75], 0.01)
q1 = quantiles[0]
q3 = quantiles[1]

# Criar coluna fire_level com base nos quartis
fires_final = fires_final.withColumn(
    "fire_level",
    when(col("total_fires") <= q1, "Low")
    .when((col("total_fires") > q1) & (col("total_fires") <= q3), "Medium")
    .otherwise("High")
)

# Média de focos por estado (sem considerar o ano atual)
window_spec = Window.partitionBy("state")

# Cria a coluna fire_anomaly que mostra se houve menos ou mais queimadas em relação com a média
fires_final = fires_final.withColumn(
    "avg_state_fires", avg("total_fires").over(window_spec)
).withColumn(
    "fire_anomaly", col("total_fires") - col("avg_state_fires")
)

fires_final.show(5)

+----+-----+-----------+----------+-----------+-------------+----------+-----------------+------------------+
|year|state|total_fires|phenomenon|   severity|summary_fires|fire_level|  avg_state_fires|      fire_anomaly|
+----+-----+-----------+----------+-----------+-------------+----------+-----------------+------------------+
|2016| ACRE|       7684|   La Nina|       Weak|          372|    Medium|6214.757575757576| 1469.242424242424|
|2016| ACRE|       7684|   El Nino|Very Strong|          372|    Medium|6214.757575757576| 1469.242424242424|
|2012| ACRE|       4720|   La Nina|   Moderate|          305|    Medium|6214.757575757576|-1494.757575757576|
|2005| ACRE|      15993|   El Nino|       Weak|          592|      High|6214.757575757576| 9778.242424242424|
|2005| ACRE|      15993|   La Nina|       Weak|          592|      High|6214.757575757576| 9778.242424242424|
+----+-----+-----------+----------+-----------+-------------+----------+-----------------+------------------+
only showi

In [None]:
# Agregando por ano e estado
from pyspark.sql.functions import avg, sum

fires_agg_df = fires_final.groupBy("year", "state").agg(
    avg("total_fires").alias("avg_total_fires"),
    avg("summary_fires").alias("avg_summary_fires"),
    avg("fire_anomaly").alias("avg_fire_anomaly")
)
fires_agg_df.show()


+----+-----+---------------+-----------------+------------------+
|year|state|avg_total_fires|avg_summary_fires|  avg_fire_anomaly|
+----+-----+---------------+-----------------+------------------+
|2016| ACRE|         7684.0|            372.0| 1469.242424242424|
|2012| ACRE|         4720.0|            305.0|-1494.757575757576|
|2005| ACRE|        15993.0|            592.0| 9778.242424242424|
|2010| ACRE|         8661.0|            259.0| 2446.242424242424|
|2001| ACRE|          829.0|             NULL|-5385.757575757576|
|2018| ACRE|         6626.0|            444.0|  411.242424242424|
|2014| ACRE|         4398.0|            309.0|-1816.757575757576|
|2007| ACRE|         8549.0|            184.0| 2334.242424242424|
|2004| ACRE|         7271.0|            728.0| 1056.242424242424|
|2000| ACRE|          430.0|             NULL|-5784.757575757576|
|1999| ACRE|          347.0|             NULL|-5867.757575757576|
|2002| ACRE|         7985.0|             NULL| 1770.242424242424|
|2013| ACR

In [None]:
# (Análise exploratória 1) - Evolução Anual das Queimadas por Estado
from pyspark.sql.functions import avg

fires_by_year_state = fires_agg_df.groupBy("year", "state").agg(
    avg("avg_total_fires").alias("avg_total_fires")
).orderBy("year", "state")

fires_by_year_state.show()

+----+-----------+---------------+
|year|      state|avg_total_fires|
+----+-----------+---------------+
|1999|       ACRE|          347.0|
|1999|      AMAPA|          101.0|
|1999|   AMAZONAS|         1048.0|
|1999|   MARANHAO|         4136.0|
|1999|MATO GROSSO|        28538.0|
|1999|       PARA|        20478.0|
|1999|   RONDONIA|         7121.0|
|1999|    RORAIMA|          220.0|
|1999|  TOCANTINS|          869.0|
|2000|       ACRE|          430.0|
|2000|      AMAPA|          253.0|
|2000|   AMAZONAS|          857.0|
|2000|   MARANHAO|         4500.0|
|2000|MATO GROSSO|        17242.0|
|2000|       PARA|        18201.0|
|2000|   RONDONIA|         5505.0|
|2000|    RORAIMA|          362.0|
|2000|  TOCANTINS|          818.0|
|2001|       ACRE|          829.0|
|2001|      AMAPA|         1300.0|
+----+-----------+---------------+
only showing top 20 rows



In [None]:
# (Análise exploratória 2) Estados com Maiores e Menores Anomalias de Queimadas
from pyspark.sql.functions import avg

anomaly_by_state = fires_agg_df.groupBy("state").agg(
    avg("avg_fire_anomaly").alias("avg_fire_anomaly")
).orderBy("avg_fire_anomaly", ascending=False)

anomaly_by_state.show()


+-----------+-------------------+
|      state|   avg_fire_anomaly|
+-----------+-------------------+
|MATO GROSSO| 1669.8484848484852|
|   MARANHAO|  263.5411255411259|
|   RONDONIA| 248.30735930735906|
|       PARA| 176.61904761904762|
|    RORAIMA| 49.038961038961176|
|      AMAPA|  41.46320346320334|
|  TOCANTINS| 31.969696969696997|
|       ACRE|-192.32900432900456|
|   AMAZONAS| -383.5584415584416|
+-----------+-------------------+



In [None]:
# (Análise exploratória 3) Impacto dos fenômenos climáticos nas queimadas por estado ao longo do tempo
from pyspark.sql.functions import avg, count, when

# Filtrar dados com fenômeno não nulo
df_phenomenon = fires_final.filter(fires_final.phenomenon.isNotNull())

# Calcular média de queimadas por estado, ano e fenômeno
fires_phenomenon_summary = df_phenomenon.groupBy("state", "year", "phenomenon") \
    .agg(
        avg("total_fires").alias("avg_total_fires"),
    ) \
    .orderBy("state", "year")

fires_phenomenon_summary.show(20)

+-----+----+----------+---------------+
|state|year|phenomenon|avg_total_fires|
+-----+----+----------+---------------+
| ACRE|1999|   La Nina|          347.0|
| ACRE|2000|   La Nina|          430.0|
| ACRE|2001|   La Nina|          829.0|
| ACRE|2002|   El Nino|         7985.0|
| ACRE|2003|   El Nino|        10523.0|
| ACRE|2004|   El Nino|         7271.0|
| ACRE|2005|   El Nino|        15993.0|
| ACRE|2005|   La Nina|        15993.0|
| ACRE|2006|   El Nino|         6198.0|
| ACRE|2006|   La Nina|         6198.0|
| ACRE|2007|   El Nino|         8549.0|
| ACRE|2007|   La Nina|         8549.0|
| ACRE|2008|   La Nina|         5699.0|
| ACRE|2009|   La Nina|         3511.0|
| ACRE|2009|   El Nino|         3511.0|
| ACRE|2010|   La Nina|         8661.0|
| ACRE|2010|   El Nino|         8661.0|
| ACRE|2011|   La Nina|         3191.0|
| ACRE|2012|   La Nina|         4720.0|
| ACRE|2014|   El Nino|         4398.0|
+-----+----+----------+---------------+
only showing top 20 rows



In [None]:
# (Análise exploratória 4) correlação entre Severidade dos Fenômenos Climáticos e Nível de Queimadas
from pyspark.sql.functions import count, collect_set

df_severity_fire_level = fires_final.filter("severity IS NOT NULL") \
    .groupBy("severity", "fire_level").agg(
    count("*").alias("occurrences")
).orderBy("severity", "fire_level")


df_phenomenon_by_severity = fires_final.filter("severity IS NOT NULL") \
    .groupBy("severity") \
    .agg(collect_set("phenomenon").alias("phenomena"))

df_phenomenon_by_severity.show(truncate=False)

df_severity_fire_level.show()

+-----------+------------------+
|severity   |phenomena         |
+-----------+------------------+
|Strong     |[La Nina]         |
|Very Strong|[El Nino]         |
|Weak       |[El Nino, La Nina]|
|Moderate   |[El Nino, La Nina]|
+-----------+------------------+

+-----------+----------+-----------+
|   severity|fire_level|occurrences|
+-----------+----------+-----------+
|   Moderate|      High|         14|
|   Moderate|       Low|         10|
|   Moderate|    Medium|         30|
|     Strong|      High|         13|
|     Strong|       Low|         20|
|     Strong|    Medium|         21|
|Very Strong|      High|          5|
|Very Strong|       Low|          2|
|Very Strong|    Medium|         11|
|       Weak|      High|         44|
|       Weak|       Low|         39|
|       Weak|    Medium|         79|
+-----------+----------+-----------+



In [None]:
# (Análise exploratória 5) Detecção de Outliers na Média de Focos de Incêndio por Estado e Ano utilizando quartis
from pyspark.sql.functions import col, percentile_approx

# Coletar Q1 e Q3
q1, q3 = fires_agg_df.approxQuantile("avg_total_fires", [0.25, 0.75], 0.01)
iqr = q3 - q1

# Definir limites de outliers
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

# Filtrar anos outliers
df_outliers = fires_agg_df.filter(
    (col("avg_total_fires") < lower_bound) | (col("avg_total_fires") > upper_bound)
)

df_outliers.select("year", "state", "avg_total_fires", "avg_fire_anomaly") \
    .orderBy("avg_total_fires", ascending=False) \
    .show(40)

+----+-----------+---------------+------------------+
|year|      state|avg_total_fires|  avg_fire_anomaly|
+----+-----------+---------------+------------------+
|2002|       PARA|       106849.0|           62155.0|
|2002|MATO GROSSO|        79680.0| 52998.51515151515|
|2004|       PARA|        74214.0|           29520.0|
|2005|       PARA|        71477.0|           26783.0|
|2004|MATO GROSSO|        70422.0| 43740.51515151515|
|2007|       PARA|        68491.0|           23797.0|
|2010|       PARA|        57196.0|           12502.0|
|2006|       PARA|        55840.0|           11146.0|
|2005|MATO GROSSO|        53489.0|26807.515151515152|
|2003|       PARA|        53040.0|            8346.0|
|2007|MATO GROSSO|        52399.0|25717.515151515152|
|2003|MATO GROSSO|        50713.0|24031.515151515152|
|2017|       PARA|        49770.0|            5076.0|
|2008|       PARA|        48449.0|            3755.0|
|2015|       PARA|        43164.0|           -1530.0|
|2009|       PARA|        41

In [None]:
# salvando os dataframes que serão usados no loocker studio
from google.colab import drive
drive.mount('/content/drive')

fires_by_year_state.coalesce(1).write.mode("overwrite").option("header", "true")\
    .csv("/content/drive/MyDrive/queimadas/fires_state_year")

anomaly_by_state.coalesce(1).write.mode("overwrite").option("header", "true")\
    .csv("/content/drive/MyDrive/queimadas/fire_by_phenomenon")

fires_phenomenon_summary.coalesce(1).write.mode("overwrite").option("header", "true")\
    .csv("/content/drive/MyDrive/queimadas/severity_phenomenon")

df_severity_fire_level.coalesce(1).write.mode("overwrite").option("header", "true")\
    .csv("/content/drive/MyDrive/queimadas/severity_fire")

df_outliers.coalesce(1).write.mode("overwrite").option("header", "true")\
    .csv("/content/drive/MyDrive/queimadas/df_outliers")

fires_final.coalesce(1).write.mode("overwrite").option("header", "true")\
    .csv("/content/drive/MyDrive/queimadas/fires_final")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Finalizando sessão com o spark
spark.stop()