# Wildfire Analysis

In this Jupyter Notebook we present an analysis of Brazilian wildfires.

First, we import the required libraries and define a few constants.

In [1]:
import random
import os
from os import listdir

import seaborn as sns

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pyspark.sql.functions as F

from ipywidgets import interact, widgets

DATA_PATH = './data'
DATA_FILE = './data.zip'
KUDU_MASTER = 'kudu-master-1:7051'

Registering the `Kudu-Spark` connector from the Cloudera repository so that Spark downloads it on startup.

In [2]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.kudu:kudu-spark3_2.12:1.13.0.7.1.5.17-1 --repositories https://repository.cloudera.com/artifactory/cloudera-repos/ pyspark-shell'
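
The submit arguments repeat the connector coordinates that are also passed to the session builder. A minimal sketch of keeping them in one place follows; the names `KUDU_PACKAGE`, `CLOUDERA_REPO` and `build_submit_args` are hypothetical and not part of the original notebook.

```python
# Hypothetical constants; the values are copied from the cell above.
KUDU_PACKAGE = 'org.apache.kudu:kudu-spark3_2.12:1.13.0.7.1.5.17-1'
CLOUDERA_REPO = 'https://repository.cloudera.com/artifactory/cloudera-repos/'


def build_submit_args(package, repository):
    """Assemble the value for the PYSPARK_SUBMIT_ARGS environment variable."""
    return f'--packages {package} --repositories {repository} pyspark-shell'


submit_args = build_submit_args(KUDU_PACKAGE, CLOUDERA_REPO)
print(submit_args)
```

The resulting string could then be assigned to `os.environ['PYSPARK_SUBMIT_ARGS']` before the Spark session is created.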

Initializing *Spark*.

In [3]:
spark = SparkSession.builder.config('spark.jars.packages', 'org.apache.kudu:kudu-spark3_2.12:1.13.0.7.1.5.17-1').getOrCreate()
sc = SparkContext.getOrCreate()
sc.setLogLevel('OFF')

https://repository.cloudera.com/artifactory/cloudera-repos/ added as a remote repository with the name: repo-1


:: loading settings :: url = jar:file:/usr/local/spark-3.2.0-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.kudu#kudu-spark3_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ba54f106-369d-424a-b314-cf2aa187bf2c;1.0
	confs: [default]
	found org.apache.kudu#kudu-spark3_2.12;1.13.0.7.1.5.17-1 in repo-1
:: resolution report :: resolve 247ms :: artifacts dl 8ms
	:: modules in use:
	org.apache.kudu#kudu-spark3_2.12;1.13.0.7.1.5.17-1 from repo-1 in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-ba54f

Defining helper functions to read from and write to tables in the Kudu datastore.

In [4]:
def read_from_table(name):
    table = spark.read.option('kudu.master', KUDU_MASTER).option('kudu.table', f'impala::default.{name}').format('kudu').load()
    table.createOrReplaceTempView(name)

    return table

In [5]:
def write_to_table(df, name):
    df.write.option('kudu.master', KUDU_MASTER).option('kudu.table', f'impala::default.{name}').mode('append').format('kudu').save()

In [6]:
def persist_df(df, name):
    write_to_table(df, name)

    return read_from_table(name)
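
Both helpers build the same two connector options. As a sketch, they could be produced by one small function; `kudu_options` and its `database` parameter are hypothetical names introduced here for illustration, since the notebook always uses the `default` database.

```python
KUDU_MASTER = 'kudu-master-1:7051'  # same value as in the first cell


def kudu_options(name, database='default'):
    """Return the connector options for an Impala-managed Kudu table.

    Hypothetical helper: `database` is an assumption, the notebook
    hard-codes `default`.
    """
    return {
        'kudu.master': KUDU_MASTER,
        'kudu.table': f'impala::{database}.{name}',
    }


print(kudu_options('queimada'))
```

With such a helper, `spark.read.options(**kudu_options(name)).format('kudu').load()` should behave like the chained `.option(...)` calls above.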

Importing the data.

In [7]:
!unzip -n {DATA_FILE} -d {DATA_PATH}

Archive:  ./data.zip


In [8]:
data = None

print(f'loading data files from {DATA_FILE}')

for file in listdir(DATA_PATH):
    if not file.endswith('.csv'):
        continue

    print(f'... {file}')
    tmp = spark.read.csv(f'{DATA_PATH}/{file}', header=True, inferSchema=True)
    data = tmp if data is None else data.union(tmp)

print('done')

loading data files from ./data.zip
... Focos_2010-01-01_2010-12-31.csv
... Focos_2011-01-01_2011-12-31.csv
... Focos_2012-01-01_2012-12-31.csv
... Focos_2013-01-01_2013-12-31.csv
... Focos_2014-01-01_2014-12-31.csv
... Focos_2015-01-01_2015-12-31.csv
... Focos_2016-01-01_2016-12-31.csv
... Focos_2017-01-01_2017-12-31.csv
... Focos_2018-01-01_2018-12-31.csv
... Focos_2019-01-01_2019-12-31.csv
... Focos_2020-01-01_2020-12-31.csv
done
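
The loading loop relies on `listdir`, which returns directory entries in arbitrary order, so the order in which files are read is not guaranteed. A minimal sketch of a deterministic listing follows; `list_csv_files` is a hypothetical helper and the file names in the demonstration are made up.

```python
import os
import tempfile


def list_csv_files(directory):
    """Return the names of the CSV files in `directory`, sorted by name."""
    return sorted(name for name in os.listdir(directory) if name.endswith('.csv'))


# Demonstration on a throwaway directory with made-up file names.
with tempfile.TemporaryDirectory() as tmp_dir:
    for name in ('Focos_2011.csv', 'Focos_2010.csv', 'notes.txt'):
        open(os.path.join(tmp_dir, name), 'w').close()
    csv_files = list_csv_files(tmp_dir)

print(csv_files)
```

Since the yearly files are combined with `union`, the read order only affects the row order of the intermediate DataFrame, not the aggregated results.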

Printing the original table schema.

In [9]:
data.printSchema()

root
 |-- datahora: string (nullable = true)
 |-- satelite: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- bioma: string (nullable = true)
 |-- diasemchuva: string (nullable = true)
 |-- precipitacao: string (nullable = true)
 |-- riscofogo: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- frp: string (nullable = true)



Casting the measurement and coordinate columns to fixed-precision decimal types.

In [10]:
data = data.withColumn('diasemchuva', data.diasemchuva.cast(DecimalType(10, 5))) \
    .withColumn('precipitacao', data.precipitacao.cast(DecimalType(10, 5))) \
    .withColumn('riscofogo', data.riscofogo.cast(DecimalType(10, 5))) \
    .withColumn('latitude', data.latitude.cast(DecimalType(8, 5))) \
    .withColumn('longitude', data.longitude.cast(DecimalType(8, 5))) \
    .withColumn('frp', data.frp.cast(DecimalType(10, 5)))
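
To make the meaning of `DecimalType(precision, scale)` concrete, here is a plain-Python sketch using the standard `decimal` module. It assumes Spark's default (non-ANSI) cast behaviour, where values that cannot be parsed or do not fit become null, and it assumes half-up rounding; `to_decimal` is a hypothetical helper, not a Spark API.

```python
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP


def to_decimal(text, precision, scale):
    """Parse `text` into a Decimal with `scale` fractional digits.

    Returns None when the text is not a finite number or when the integer
    part needs more than `precision - scale` digits (assumed to mirror a
    null result in Spark).
    """
    try:
        value = Decimal(text)
        if not value.is_finite():
            return None
        # Decimal(1).scaleb(-5) is 0.00001, i.e. five fractional digits.
        quantized = value.quantize(Decimal(1).scaleb(-scale), rounding=ROUND_HALF_UP)
    except (InvalidOperation, TypeError, ValueError):
        return None
    if abs(quantized) >= Decimal(10) ** (precision - scale):
        return None
    return quantized


print(to_decimal('12.3', 10, 5))        # fits into decimal(10,5)
print(to_decimal('-12.345678', 8, 5))   # rounded to five fractional digits
print(to_decimal('1234.5', 8, 5))       # too many integer digits for decimal(8,5)
print(to_decimal('abc', 10, 5))         # not a number
```

Under these assumptions, `DecimalType(8, 5)` leaves three digits before the decimal point, which is enough for latitudes and longitudes in degrees.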

[T] Split the *datahora* field into month and year;

In [11]:
datahora = data.withColumn('datahora_', F.split(data.datahora, ' '))
data = datahora.withColumn('data', F.split(F.element_at(datahora.datahora_, 1), '/'))
ano_mes = data.withColumn('ano', F.element_at(data.data, 1).cast(IntegerType())) \
    .withColumn('mes', F.element_at(data.data, 2).cast(IntegerType()))

data = ano_mes.drop('datahora_').drop('data')
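
The same splitting logic can be checked on a single value in plain Python. The sketch assumes that *datahora* has the form `YYYY/MM/DD HH:MM:SS`, which is inferred from the order of the `element_at` calls above and is not confirmed by the data shown here; the sample value is made up.

```python
def split_year_month(datahora):
    """Extract (year, month) from a 'YYYY/MM/DD HH:MM:SS' string."""
    date_part = datahora.split(' ')[0]       # drop the time of day
    year, month = date_part.split('/')[:2]   # first two date components
    return int(year), int(month)


print(split_year_month('2018/01/15 17:30:00'))
```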

Printing the resulting table schema.

In [12]:
data.printSchema()

root
 |-- datahora: string (nullable = true)
 |-- satelite: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- bioma: string (nullable = true)
 |-- diasemchuva: decimal(10,5) (nullable = true)
 |-- precipitacao: decimal(10,5) (nullable = true)
 |-- riscofogo: decimal(10,5) (nullable = true)
 |-- latitude: decimal(8,5) (nullable = true)
 |-- longitude: decimal(8,5) (nullable = true)
 |-- frp: decimal(10,5) (nullable = true)
 |-- ano: integer (nullable = true)
 |-- mes: integer (nullable = true)



In [13]:
data = persist_df(data, 'queimada')

[T] Count how many wildfires occurred in each biome per month;

In [14]:
queimadas_bioma_mes = persist_df(
    data.groupBy('bioma', 'ano', 'mes').count(),
    'queimadas_bioma_mes'
)
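
As a small-scale illustration of what `groupBy(...).count()` computes, the following plain-Python sketch counts made-up sample rows per (biome, year, month) key.

```python
from collections import Counter

# Made-up sample rows: (bioma, ano, mes), one entry per detected fire.
rows = [
    ('Amazonia', 2018, 1),
    ('Amazonia', 2018, 1),
    ('Cerrado', 2018, 1),
    ('Amazonia', 2018, 2),
]

# Each distinct key becomes one output row with its number of occurrences.
counts = Counter(rows)
for (bioma, ano, mes), count in sorted(counts.items()):
    print(bioma, ano, mes, count)
```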

[T] Find the highest FRP per month in each state from 2018 onward;

In [15]:
frp_estado_mes = persist_df(
    data[data.ano >= 2018].groupBy('estado', 'ano', 'mes').agg(F.max('frp').alias('max_frp')),
    'frp_estado_mes'
)
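
The filter-then-aggregate step can be sketched in plain Python as follows. `max_frp_by_state_month` is a hypothetical helper and the sample rows are illustrative only.

```python
def max_frp_by_state_month(rows, first_year=2018):
    """Return {(estado, ano, mes): highest frp} for rows from `first_year` on.

    `rows` is an iterable of (estado, ano, mes, frp) tuples; missing FRP
    values are given as None and skipped. Groups whose FRP values are all
    missing are dropped here, whereas Spark would keep them with a null
    maximum.
    """
    best = {}
    for estado, ano, mes, frp in rows:
        if ano < first_year or frp is None:
            continue
        key = (estado, ano, mes)
        if key not in best or frp > best[key]:
            best[key] = frp
    return best


# Illustrative sample rows, not taken from the dataset.
sample = [
    ('RORAIMA', 2018, 1, 1018.0),
    ('RORAIMA', 2018, 1, 12.5),
    ('RORAIMA', 2017, 12, 2000.0),  # filtered out: before 2018
    ('PARA', 2018, 1, None),        # skipped: missing FRP
    ('PARA', 2018, 1, 291.3),
]
result = max_frp_by_state_month(sample)
print(result)
```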

[C] Return the ranking of the biomes with the most fire hotspots in a given month of a given year;

In [16]:
%%time
queimadas_bioma_mes[(queimadas_bioma_mes.mes == 1) & (queimadas_bioma_mes.ano == 2018)].orderBy(F.col('count').desc()).show()

+--------------+----+---+-----+
|         bioma| ano|mes|count|
+--------------+----+---+-----+
|      Amazonia|2018|  1| 1444|
|       Cerrado|2018|  1|  521|
|      Caatinga|2018|  1|  316|
|Mata Atlantica|2018|  1|  209|
|         Pampa|2018|  1|   40|
|      Pantanal|2018|  1|   23|
+--------------+----+---+-----+

CPU times: user 6.39 ms, sys: 310 µs, total: 6.7 ms
Wall time: 431 ms


[C] Return the ranking of the months with the most wildfires per biome;

In [17]:
def f_queimadas_bioma_mes_rank(x):
    w = Window.partitionBy('bioma').orderBy(F.col('count').desc())
    queimadas_bioma_mes_rank = queimadas_bioma_mes.withColumn('rank', F.row_number().over(w))
    return queimadas_bioma_mes_rank[queimadas_bioma_mes_rank.rank <= x].show(truncate=False)
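
The window above partitions the rows by biome, orders each partition by descending count and numbers the rows. A plain-Python sketch of that top-`n`-per-group pattern follows; `top_months_per_biome` is a hypothetical helper, the Amazonia figures are taken from the output further down and the Pampa figures are made up.

```python
from collections import defaultdict


def top_months_per_biome(rows, n):
    """Return {bioma: [(ano, mes, count, rank), ...]} with the n busiest months.

    `rows` is an iterable of (bioma, ano, mes, count) tuples.
    """
    by_biome = defaultdict(list)
    for bioma, ano, mes, count in rows:
        by_biome[bioma].append((ano, mes, count))

    ranking = {}
    for bioma, months in by_biome.items():
        # Sort each partition by count, largest first, and number the rows from 1.
        ordered = sorted(months, key=lambda month: month[2], reverse=True)
        ranking[bioma] = [
            (ano, mes, count, position)
            for position, (ano, mes, count) in enumerate(ordered[:n], start=1)
        ]
    return ranking


rows = [
    ('Amazonia', 2010, 9, 43933),
    ('Amazonia', 2010, 8, 45018),
    ('Amazonia', 2017, 9, 36569),
    ('Pampa', 2015, 3, 10),  # made-up value
    ('Pampa', 2016, 4, 25),  # made-up value
]
ranking = top_months_per_biome(rows, 2)
print(ranking)
```

Like `row_number`, this sketch gives tied counts distinct consecutive positions; `F.rank` or `F.dense_rank` would be the alternatives if ties should share a position. Note also that `show()` prints only the first 20 rows by default, which is why a call with `x = 20` lists Amazonia alone.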

In [18]:
%%time
f_queimadas_bioma_mes_rank(20)

+--------+----+---+-----+----+
|bioma   |ano |mes|count|rank|
+--------+----+---+-----+----+
|Amazonia|2010|8  |45018|1   |
|Amazonia|2010|9  |43933|2   |
|Amazonia|2017|9  |36569|3   |
|Amazonia|2020|9  |32017|4   |
|Amazonia|2019|8  |30900|5   |
|Amazonia|2015|9  |29326|6   |
|Amazonia|2020|8  |29307|7   |
|Amazonia|2018|9  |24803|8   |
|Amazonia|2012|9  |24067|9   |
|Amazonia|2017|8  |21244|10  |
|Amazonia|2012|8  |20687|11  |
|Amazonia|2014|9  |20522|12  |
|Amazonia|2015|8  |20471|13  |
|Amazonia|2016|9  |20460|14  |
|Amazonia|2014|8  |20113|15  |
|Amazonia|2019|9  |19925|16  |
|Amazonia|2015|10 |19469|17  |
|Amazonia|2016|8  |18340|18  |
|Amazonia|2020|10 |17326|19  |
|Amazonia|2011|9  |16987|20  |
+--------+----+---+-----+----+
only showing top 20 rows

CPU times: user 6.82 ms, sys: 2.88 ms, total: 9.69 ms
Wall time: 858 ms


In [19]:
@interact(x=widgets.IntSlider(min=1, max=30, step=1, value=5, description='yes', continuous_update=False))
def g(x):
    return f_queimadas_bioma_mes_rank(x)

interactive(children=(IntSlider(value=5, continuous_update=False, description='yes', max=30, min=1), Output())…

[C] Compute the ranking of the monthly average number of wildfires per biome from year *x* onward;

In [20]:
def f_queimadas_bioma_mes(x):
    return queimadas_bioma_mes[queimadas_bioma_mes.ano >= x].groupBy('bioma').agg(F.mean('count')).orderBy(F.col('avg(count)').desc()).show()
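
One property of this average is worth keeping in mind: it is taken over the rows of `queimadas_bioma_mes`, so a month in which a biome had no recorded fire contributes no row and does not lower the mean. The plain-Python sketch below illustrates this with made-up numbers; `mean_monthly_count` is a hypothetical helper.

```python
from collections import defaultdict
from statistics import mean


def mean_monthly_count(rows, first_year):
    """Return [(bioma, mean count)] sorted by descending mean.

    `rows` is an iterable of (bioma, ano, mes, count) tuples; only rows
    with ano >= first_year are considered.
    """
    per_biome = defaultdict(list)
    for bioma, ano, mes, count in rows:
        if ano >= first_year:
            per_biome[bioma].append(count)
    averages = {bioma: mean(counts) for bioma, counts in per_biome.items()}
    return sorted(averages.items(), key=lambda item: item[1], reverse=True)


# Made-up sample: Pampa has a row for only one of the two months.
rows = [
    ('Amazonia', 2019, 8, 300),
    ('Amazonia', 2019, 9, 100),
    ('Pampa', 2019, 8, 40),
]
averages = mean_monthly_count(rows, 2019)
print(averages)
```

In the sample, Pampa averages 40 rather than 20 because its fire-free month is simply absent from the input.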

In [21]:
%%time
f_queimadas_bioma_mes(2010)

+--------------+------------------+
|         bioma|        avg(count)|
+--------------+------------------+
|      Amazonia| 7447.568181818182|
|       Cerrado| 5783.295454545455|
|Mata Atlantica|1323.0681818181818|
|      Caatinga|1173.9848484848485|
|      Pantanal| 554.6060606060606|
|         Pampa| 85.52272727272727|
+--------------+------------------+

CPU times: user 3.35 ms, sys: 5.46 ms, total: 8.8 ms
Wall time: 731 ms


In [22]:
@interact(x=widgets.IntSlider(min=2010, max=2020, step=1, value=2015, description='yes', continuous_update=False))
def g(x):
    return f_queimadas_bioma_mes(x)

interactive(children=(IntSlider(value=2015, continuous_update=False, description='yes', max=2020, min=2010), O…

[C] Return the ranking of the hotspots with the highest FRP per state in a given month of a given year;

In [23]:
%%time
frp_estado_mes[(frp_estado_mes.mes == 1) & (frp_estado_mes.ano == 2018)].orderBy(F.col('max_frp').desc()).show()

+-------------------+----+---+----------+
|             estado| ano|mes|   max_frp|
+-------------------+----+---+----------+
|            RORAIMA|2018|  1|1018.00000|
|  RIO GRANDE DO SUL|2018|  1| 404.70000|
|               PARA|2018|  1| 291.30000|
|            ALAGOAS|2018|  1| 283.70000|
|              BAHIA|2018|  1| 264.10000|
|           MARANHAO|2018|  1| 227.50000|
|          TOCANTINS|2018|  1| 226.40000|
|            PARAIBA|2018|  1| 220.60000|
|        MATO GROSSO|2018|  1| 213.50000|
|              PIAUI|2018|  1| 199.00000|
|              CEARA|2018|  1| 169.40000|
|         PERNAMBUCO|2018|  1| 115.00000|
|RIO GRANDE DO NORTE|2018|  1| 104.90000|
|              AMAPA|2018|  1|  96.90000|
|       MINAS GERAIS|2018|  1|  92.00000|
| MATO GROSSO DO SUL|2018|  1|  84.30000|
|              GOIAS|2018|  1|  83.10000|
|            SERGIPE|2018|  1|  81.40000|
|           AMAZONAS|2018|  1|  72.00000|
|           RONDONIA|2018|  1|  58.90000|
+-------------------+----+---+----------+

In [24]:
#sc.stop()