# Estudio sobre contaminación mundial

### Análisis de la huella ecológica mundial

Las National Footprint Accounts (NFA) en asociacón de la  Global Footprint Network (www.footprintnetwork.org) combinan cada año más de 30 conjuntos de datos para calcular la Huella Ecológica y la biocapacidad de los países de todo el mundo en más de 50 años.

El objetivo de este compromiso es arrojar resultados de cuánta área se requiere para proporcionar los servicios ecológicos (regeneración de recursos y asimilación de desechos) consumidos por la humanidad ("Huella ecológica"), y cuánta área biológicamente productiva existe para proporcionar estos servicios ecológicos ( "biocapacidad") en cada año. Con ambos valores en la mano, podemos evaluar la sostenibilidad general de los países de todo el mundo y comprender mejor la necesidad colectiva de que la humanidad reduzca su impacto en la naturaleza.

En este estudio vamos a intentar estudiar y predecir la capacidad por hectaréa de biocapacidad y la huella ecológica de estas, en el estudio biocapTotGHA y EFConsTotGHA. También intentaremos hacer estudios de esto mismo pero sobre la poulicón sobre carbón.

Estos datos han sido obtenidos de: https://www.kaggle.com/footprintnetwork/national-footprint-accounts-2018

### Creación del dataset

In [4]:
from pyspark.sql import SparkSession
import os
import sys
from pyspark import SparkContext
from pyspark import SparkConf

spark = SparkSession \
    .builder \
    .appName("Entrega - data") \
    .getOrCreate()

Primero vamos a importar el dataframe de ejemplo, la integración directa con kaggle es bastante compleja y por eso hemos optado por importar el dataset a githubusercontent ya que este es relativamente pequeño

In [6]:
%sh
cd /tmp

#  Removemos el fichero antiguo 
if [ -e /tmp/NFA2018.csv]
then
    rm -f /NFA2018.csv
fi

# Descargamos el fichero 
wget https://raw.githubusercontent.com/davidcerezal/deep-learning-examples/master/NFA2018.csv

Ahora vamos a convertir el dataset a un dataframe, limipiaremos también algunas variable redudantes.

In [8]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

dbutils.fs.cp("file:///tmp/NFA2018.csv", "/tmp")

patentData = sqlContext.read.format('csv').option("header", 'true').load("/tmp/NFA2018.csv")

patentData.cache()

patentData = patentData.drop("crop_land")
patentData = patentData.drop("grazing_land")
patentData = patentData.drop("forest_land")
patentData = patentData.drop("fishing_ground")
patentData = patentData.drop("built_up_land")

#Filter the bio Capacity and foot print frames, to join later
patentData_bio = patentData.filter("record == 'BiocapTotGHA'").withColumnRenamed("total","BiocapTotGHA").drop('record').drop('carbon').drop('Percapita GDP (2010 USD)').withColumnRenamed("ISO alpha-3 code","ISO")
patentData_foot = patentData.filter("record == 'EFConsTotGHA'").withColumnRenamed("total","EFConsTotGHA").drop('record').drop('ISO alpha-3 code').drop('Percapita GDP (2010 USD)')

#Join and clean up of duplicated columns
patentData = patentData_bio.join(patentData_foot,(patentData_bio.country == patentData_foot.country) & (patentData_bio.year == patentData_foot.year)).drop(patentData_foot.country).drop(patentData_foot.UN_region).drop(patentData_foot.UN_subregion).drop(patentData_foot.year).drop(patentData_foot.population)

#World data
world_data = patentData.filter(patentData.country == 'World')

#countries Data
patentData = patentData.filter(patentData.country != 'World')

patentData = patentData.withColumn("year_c", patentData.year.cast('double')).drop('year')
patentData = patentData.withColumn("population_c", patentData.population.cast('double')).drop('population')
patentData = patentData.withColumn("BiocapTotGHA_c", patentData.BiocapTotGHA.cast('double')).drop('BiocapTotGHA')
patentData = patentData.withColumn("EFConsTotGHA_c", patentData.EFConsTotGHA.cast('double')).drop('EFConsTotGHA')
patentData = patentData.withColumn("carbon_c", patentData.carbon.cast('double')).drop('carbon')

display(patentData)


### Conocer el dataset
Ahora vamos a conoces un poco nuestro, dataset, ver quien es el mas contaminante o el mas bioético.

In [10]:
#Podemos hacer consultas sobre el desarrollo mundial
display(world_data.select( "year", "BiocapTotGHA", "EFConsTotGHA", "carbon", "population"))

#BiocapTotGHA = Total biocapacity in global hectares (gha)
#EFConsTotGHA = Total Ecological Footprint of consumption in global hectares (gha)

#Podemos observar rápidamente que hemos superado claramente nuestra capacidad de producción 

In [11]:
#Podemos hacer consultas tan geniales como estas
last_year_data = patentData.filter('year_c = 2014')
display(last_year_data.select('ISO', 'BiocapTotGHA_c'))

#Total biocapacity in global hectares (gha)

ISO,BiocapTotGHA_c
ARM,2362550.102
AFG,15942685.87
ALB,3038634.272
DZA,21084114.19
AGO,55489126.23
ATG,84838.93322
ARG,288354829.2
AUS,313020660.3
AUT,25673659.36
BHS,3577704.445


In [12]:
#Total Ecological Footprint of consumption in global hectares (gha)
display(last_year_data.select('ISO', 'EFConsTotGHA_c'))

ISO,EFConsTotGHA_c
ARM,6068198.904
AFG,24424655.11
ALB,6188638.631
DZA,95340572.5
AGO,37767604.8
ATG,380368.67
ARG,158584521.3
AUS,162657858.8
AUT,50049029.1
BHS,1847637.762


In [13]:
#Podemos filtar también por aquellos que no consumen más alla de su capacidad o al contrario
last_year_data.createOrReplaceTempView("last_year_data")
less_countries = spark.sql("""SELECT country, BiocapTotGHA_c - EFConsTotGHA_c as left_cosumption FROM last_year_data WHERE BiocapTotGHA_c > EFConsTotGHA_c ORDER BY left_cosumption DESC""")
display(less_countries)

country,left_cosumption
Brazil,1190032830.6
Canada,256224253.30000007
Russian Federation,187683876.69999996
Australia,150362801.5
"Congo, Democratic Republic of",144204424.16
Bolivia,141670069.07
Argentina,129770307.89999998
Colombia,85767902.97999999
Guyana,50363366.009
Peru,46273693.94


In [14]:
less_countries = patentData.filter("country = 'Brazil' or country = 'Canada' or country = 'Russian Federation' or country = 'Australia'")
less_countries = less_countries.withColumn('left_comsuption', less_countries.BiocapTotGHA_c - less_countries.EFConsTotGHA_c)
display(less_countries.select('country', 'year_c', 'left_comsuption'))

In [15]:
last_year_data.createOrReplaceTempView("last_year_data")
higher_countries = spark.sql("""SELECT country, EFConsTotGHA_c - BiocapTotGHA_c as required_cosumption FROM last_year_data WHERE BiocapTotGHA_c < EFConsTotGHA_c ORDER BY required_cosumption DESC""")
display(higher_countries)

country,required_cosumption
China,3830182480.0
United States of America,1527696779.0
India,865136162.9
Japan,524894092.69000006
Germany,262660677.7
"Korea, Republic of",256868620.19000003
United Kingdom,231217200.85
"Iran, Islamic Republic of",206996437.64
Italy,200707198.09
Saudi Arabia,172629005.57


In [16]:
most_countries = patentData.filter("country = 'China' or country = 'United States of America' or country = 'India' or country = 'Japan'")
most_countries = most_countries.withColumn('req_comsuption', most_countries.EFConsTotGHA_c -  most_countries.BiocapTotGHA_c)
display(most_countries.select('country', 'year_c', 'req_comsuption'))

country,year_c,req_comsuption
India,1961.0,58949792.80000001
India,1962.0,64419644.49999997
India,1963.0,70595253.49999997
India,1964.0,71203838.0
India,1965.0,81276048.89999998
India,1966.0,90686505.39999998
India,1967.0,92324188.9
India,1968.0,93187685.89999998
India,1969.0,93834501.1
India,1970.0,97198475.7


In [17]:
#Podemos definir udf que nos pueden ayudar mucho como por ejemplo en el siguiente caso
spark.udf.register("calculate_percentage", lambda biocap, req: int(((biocap - req)/req*100)) if biocap > req else int(0-((req - biocap)/biocap*100)))

#Países menos agresivos en sus emisiones
percetages = spark.sql("""SELECT country, CONCAT(CAST(calculate_percentage(BiocapTotGHA_c, EFConsTotGHA_c) as int), '%') as percetage FROM last_year_data ORDER BY CAST(calculate_percentage(BiocapTotGHA_c, EFConsTotGHA_c) as int) DESC""")
display(percetages)

country,percetage
French Guiana,3864%
Suriname,2326%
Guyana,2295%
Gabon,846%
Congo,763%
Central African Republic,554%
Bolivia,436%
"Congo, Democratic Republic of",254%
Uruguay,246%
Namibia,212%


In [18]:
#Países más agresivos en sus emisiones
percetages = spark.sql("""SELECT country, CONCAT(CAST(calculate_percentage(BiocapTotGHA_c, EFConsTotGHA_c) as int), '%') as real_percentage FROM last_year_data ORDER BY CAST(calculate_percentage(BiocapTotGHA_c, EFConsTotGHA_c) as int) ASC""")
display(percetages)

country,real_percentage
Singapore,-9886%
Bermuda,-4810%
Réunion,-2817%
Barbados,-2067%
Cayman Islands,-1673%
United Arab Emirates,-1651%
Israel,-1644%
Bahrain,-1529%
Saudi Arabia,-1351%
Cyprus,-1262%


### Creación del modelo

In [20]:
#Vamos a eliminar algunas variables por ser redundantes
patentData = patentData.na.fill(0)
patentData = patentData.drop('country')

#La variable que vamos a intentar predecir
patentData = patentData.withColumnRenamed('BiocapTotGHA_c', 'label')
patentData = patentData.fillna({'label': 0})


#primero podemos empezar creando la partición de train y test, en este caso para conservar las variables de tiempo en las predicciones no lo vamos a hacer aleatoriamente y lo filtraremos por año, dejando los tres últimos para test. 
patentData_train = patentData.filter(patentData.year_c < 2011)
patentData_test = patentData.filter(patentData.year_c > 2011)
display(patentData)

Preparación de los datos para el modelo. Para ellos creamos diferentes stages, con OneHotEncoderEstimator, StringIndexer y VectorAssembler.

In [22]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

stages = []
categoricalColumns = ['ISO', 'UN_region', 'UN_subregion']
numericCols = ['year_c', 'population_c', 'EFConsTotGHA_c', 'carbon_c']

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
assembler.setHandleInvalid("keep")
stages += [assembler]

print(stages)


Primero vamos a empezar con un Random Forest Regressor

In [24]:
rf = RandomForestRegressor(featuresCol = 'features', labelCol = 'label')

stages += [rf]
pipeline = Pipeline(stages=stages)

#Seleccionamos los hiperparámetros a buscar
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [int(x) for x in np.linspace(start = 10, stop = 50, num = 3)]) \
    .addGrid(rf.maxDepth, [int(x) for x in np.linspace(start = 5, stop = 25, num = 3)]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=3)

cvModel = crossval.fit(patentData_train)

## Evaluación del modelo

vamos a evaluar ahora a los resultados de este.

In [26]:
predictions = cvModel.transform(patentData_test)

predictions = cvModel.transform(patentData_test)
predictions.select('ISO','year_c', 'population_c', 'BiocapTotGHA_c', 'EFConsTotGHA_c', 'carbon_c').show(10)

In [27]:
import numpy as np
import matplotlib.pyplot as plt

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(predictions)

display(rfResult)

Podemos ver los mejores hiperparámetros que para este ha encontrado

In [29]:
bestPipeline = cvModel.bestModel
bestModel = bestPipeline.stages[6]

print('numTrees - ', bestModel.getNumTrees)
print('maxDepth - ', bestModel.getOrDefault('maxDepth'))