# Banco Mundial: préstamos a países en desarrollo

Detalles sobre la organización en: https://www.worldbank.org/en/about/what-we-do

### Disponible en Kaggle en:
https://www.kaggle.com/theworldbank/world-banks-major-contracts

El Banco Mundial es una organización internacional que se fundó en 1944 para reconstruir Europa después de la Segunda Guerra Mundial. Es uno de una variedad de organizaciones que ayudan a dar forma y equilibrar la economía mundial. Hoy, su función principal es combatir la pobreza ofreciendo asistencia para el desarrollo a países del tercer mundo de ingresos medios y bajos.
Según el propio Banco Mundial, la organización tiene dos objetivos muy específicos para 2030:
* Poner fin a la pobreza extrema disminuyendo el porcentaje de personas que viven con menos de `$`1.90 por día a no más del 3% 
* Promover la prosperidad compartida fomentando el crecimiento de los ingresos del 40% inferior para cada país.

### Variables y significado

1. As of Date: 6-Sep-17, date when the file was generated. 
2. Fiscal Year: año fiscal en el que se ha llevado a cabo el préstamo.
3. Region: área geográfica del país al que se le ha concedido el préstamo.
4. Borrower Country: país al que se le ha concedido el préstamo.
5. Borrower Country Code: código de dicho país.
6. Project ID: código interno del préstamo.
7. Project Name: nombre del proyecto que se llevará a cabo.
8. Procurement Type: área para la cual se utilizará el préstamo.
9. Procurement Category: ídem al anterior pero con una categorización más amplia.
10. Procurement Method: tipo de adjudicación.
11. Product Line: área interna a la que se le ha concedido el préstamo.
12. Major Sector: economic sector económico al que se destinará.
13. WB Contract Number: número interno de contrato.
14. Contract Description: descripción.
15. Contract Signing Date: fecha en la que el contrato se oficializó.
16. Supplier: proveedor principal que trabaja en el proyecto.
17. Supplier Country: país del proveedor principal.
18. Supplier Country Code: código del país del proveedor.
19. Total Contract Amount (USD): cantidad total en dólares.
20. Borrower Contract Reference Number: número de referencia del contrato.

**Nombre completo del alumno: Miguel Benayas Penas**  

**INSTRUCCIONES**: en cada celda debes responder a la pregunta formulada, asegurándote de que el resultado queda guardado en la(s) variable(s) que por defecto vienen inicializadas a `None`. No se necesita usar variables intermedias, pero puedes hacerlo siempre que el resultado final del cálculo quede guardado exactamente en la variable que venía inicializada a None (debes reemplazar None por la secuencia de transformaciones necesarias, pero nunca cambiar el nombre de esa variable). **No olvides borrar la línea *raise NotImplementedError()* de cada celda cuando hayas completado la solución de esa celda y quieras probarla**.

Después de cada celda evaluable verás una celda con código. Ejecútala (no modifiques su código) y te dirá si tu solución es correcta o no. En caso de ser correcta, se ejecutará correctamente y no mostrará nada, pero si no lo es mostrará un error. Además de esas pruebas, se realizarán algunas más (ocultas) a la hora de puntuar el ejercicio, pero evaluar dicha celda es un indicador bastante fiable acerca de si realmente has implementado la solución correcta o no. Asegúrate de que, al menos, todas las celdas indican que el código es correcto antes de enviar el notebook terminado.

### Sobre el dataset anterior (Major_Contract_Awards.csv) se pide:

**(1 punto)** Ejercicio 1

* Leerlo tratando de que Spark infiera el tipo de dato de cada columna, y **cachearlo**. 
* Puesto que existen columnas que contienen una coma enmedio del valor, en esos casos los valores vienen entre comillas dobles. Spark ya contempla esta posibilidad y puede leerlas adecuadamente **si al leer le indicamos las siguientes opciones adicionales** además de las que ya sueles usar: `.option("quote", "\"").option("escape", "\"")`.
* Asegúrate de que las **filas que no tienen el formato correcto sean descartadas**, indicando también la opción `mode` con el valor `DROPMALFORMED` como vimos en clase.

In [1]:
# LÍNEA EVALUABLE, NO RENOMBRAR LAS VARIABLES
contractsDF = spark.read.option('inferSchema','true')\
                        .option("header", "true")\
                        .option("quote", "\"")\
                        .option("escape", "\"")\
                        .option("mode", "DROPMALFORMED")\
                        .csv("gs://bucketmbp/notebooks/jupyter/Major_Contract_Awards.csv")\
                        .cache()

# Sustituye None por las operaciones adecuadas
# YOUR CODE HERE
# raise NotImplementedError

In [2]:
from pyspark.sql.types import DoubleType
assert(contractsDF.count() == 148515)

**(1 punto)** Ejercicio 2

* La columna **Total Contract Amount (USD)** es en realidad numérica, pero todas las cantidades incluyen el signo `$` por lo que Spark la reconoce como string. Para corregir este comportamiento, vamos a eliminar el `$` de todas las filas utilizando la función `F.regexp_replace("Total Contract Amount (USD)", "\$", "")` donde `"\$"` es el string que queremos reemplazar (hay que escaparlo poniendo `\` delante porque sino el `$` se interpreta como un carácter especial), y siendo el nuevo string el string vacío, `""`. Esta función pertenece al paquete `pyspark.sql.functions`, por lo que ya funciona de manera distribuida, y devuelve como resultado un objeto columna transformado. 

* Aplica esta función dentro de la función `withColumn` para **reemplazar** la columna `Total Contract Amount (USD)` ya existente por la columna devuelta por `regexp_replace`. La manera de utilizarla es totalmente análoga a la utilización de, por ejemplo, la función `F.when` dentro de `withColumn`. 
* Aprovecha también para hacer un casting del objeto columna devuelto por regexp_replace, que es una columna de strings, a una columna de enteros: `F.regexp_replace(...).cast(...)`. Almacena el DF resultante en la variable `contractsDFenteros`, **cachéala** y utilízala a partir de este momento para trabajar en las celdas posteriores, salvo que la celda indique lo contrario.

In [3]:
# No olvides los imports que necesites...
# LÍNEAS EVALUABLES, NO RENOMBRAR LAS VARIABLES
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

contractsDFenteros = contractsDF.withColumn("Total Contract Amount (USD)",\
                     F.regexp_replace("Total Contract Amount (USD)", "\$", "")\
                     .cast(IntegerType()))\
                     .cache()
                     
# YOUR CODE HERE
#raise NotImplementedError

In [4]:
from pyspark.sql.types import IntegerType
assert(len(contractsDFenteros.columns) == len(contractsDF.columns))
assert(contractsDFenteros.count() == contractsDF.count())
assert(contractsDFenteros.schema["Total Contract Amount (USD)"].dataType == IntegerType())

**(1 punto)** Ejercicio 3

* Partiendo de `contractsDFenteros`, crear un nuevo DF donde la columna "Region" sea reemplazada por otra con mismo nombre, de tipo string en la que todos los valores de la columna original (LATIN AMERICA AND CARIBBEAN, SOUTH ASIA, OTHER ... etc) estén traducidos al español. Puedes elegir la traducción que más te guste, pero debe mantenerse el mismo número de categorías que ya había, que eran siete.
* El evaluador oculto comprobará que sigue habiendo el mismo número de ejemplos en cada categoría con el nuevo nombre, y que las categorías efectivamente se han traducido (ninguna se debe llamar igual que antes). Puedes cambiar AFRICA por África.

In [5]:
# LÍNEA EVALUABLE, NO RENOMBRAR VARIABLES

# ¿Cuáles son las categorías para 'Region'?
# contractsDFenteros.select('Region').distinct().collect() 

contractsTranslatedDF = contractsDFenteros.withColumn("Region",\
                        F.when(F.col("Region")=='LATIN AMERICA AND CARIBBEAN' , 'latinoamerica' )\
                        .when(F.col("Region")=='SOUTH ASIA' , 'surasia')\
                        .when(F.col("Region")=='AFRICA' , 'africa')\
                        .when(F.col("Region")=='MIDDLE EAST AND NORTH AFRICA' , \
                              'oriente medio y norafrica')\
                        .when(F.col("Region")=='EAST ASIA AND PACIFIC' , 'asia-pacifico')\
                        .when(F.col("Region")=='EUROPE AND CENTRAL ASIA' , 'europa y asia central')\
                        .otherwise('otros'))  
                                                                                                 
# YOUR CODE HERE
#raise NotImplementedError

In [6]:
assert(contractsTranslatedDF.select("Region").distinct().count() == 7)
g1 = contractsDF.groupBy("Region").count().withColumnRenamed("Region", "R1")
g2 = contractsTranslatedDF.groupBy("Region").agg(F.count("*").alias("c2"))
joinedDF = g1.join(g2, F.col("count") == g2.c2)
assert(joinedDF.count() == 7)
assert(joinedDF.where(F.col("count") == F.col("c2")).count() == 7)
assert(joinedDF.where(F.col("R1") == F.col("Region")).count() == 0)

**(1 punto)** Ejercicio 4

* Partiendo de `contractsTranslatedDF`, crear un nuevo DataFrame de **una sola fila** que contenga, **por este orden de columnas**, el **número** de categorías distintas existentes en cada una de las columnas `Procurement Type`, `Procurement Category` y `Procurement Method`. Pista: crear cada una de estas tres columnas al vuelo con `select`(). Renombrar cada columna de conteo para que se llame igual que la propia columna que estamos contando.

In [7]:
# LÍNEA EVALUABLE, NO RENOMBRAR VARIABLES
from pyspark.sql.types import IntegerType,StructType,StructField,StringType
from pyspark.sql import Row

pt_count = contractsDFenteros.select('Procurement Type').distinct().count()
pc_count = contractsDFenteros.select('Procurement Category').distinct().count()
pm_count = contractsDFenteros.select('Procurement Method').distinct().count()

schema = StructType([StructField('Procurement Type', IntegerType()),\
                     StructField('Procurement Category',IntegerType()),\
                    StructField('Procurement Method',IntegerType())])

rows = [Row(pt_count,pc_count,pm_count)]
numeroCategoriasDF = spark.createDataFrame(rows, schema)



# YOUR CODE HERE
#raise NotImplementedError

In [8]:
assert(len(numeroCategoriasDF.columns) == 3)
assert(numeroCategoriasDF.count() == 1)
categorias = numeroCategoriasDF.collect()[0]
assert(categorias["Procurement Type"] == 60)
assert(categorias["Procurement Category"] == 5)
assert(categorias["Procurement Method"] == 18)

**(3 puntos)** Ejercicio 5

* Partiendo de `contractsDFenteros` definido anteriormente, crear un **pipeline** formado por dos etapas: un indexador de la columna categórica `Procurement Method` y un discretizador (bucketizer) de la columna numérica que habíamos convertido a entero al principio, `Total Contract Amount (USD)`, de manera que sea convertida en una columna de números reales empezando en 0 y cuya parte decimal siempre sea 0.
* Para el indexador, si una vez entrenado le llegasen etiquetas que no ha visto antes, deberá eliminar esas filas (recordar la opción adecuada que vimos en clase). La columna de salida debe llamarse `ProcurementIndexed`.
* Para el discretizador, utilizar como puntos de corte los siguientes: (-Inf, 0, 100000, 200000, 300000, 400000, Inf). La columna de salida debe llamarse `TotalDiscretized`.
* Una vez creados ambos, componerlos para crear un Pipeline, y aplicarlo a `contractsDFenteros` para entrenar y a continuación también para predecir (es decir, transformarlo). El DF resultante de la transformación debe almacenarse en la variable `contractsTransformedDF`

In [9]:
# LÍNEA EVALUABLE, NO RENOMBRAR VARIABLES
# imports necesarios..........
from pyspark.ml.feature import StringIndexer, Bucketizer
from pyspark.ml import Pipeline, PipelineModel

splits = [-float("inf"), 0, 100000, 200000, 300000, 400000, float("inf")]

bucketizer = bucketizer = Bucketizer(splits = splits, inputCol = "Total Contract Amount (USD)",\
                                     outputCol = "TotalDiscretized")
indexer = StringIndexer(inputCol = "Procurement Method",outputCol = "ProcurementIndexed",\
                        handleInvalid='skip')

pipeline = Pipeline().setStages([indexer, bucketizer])

pipelinemodel = pipeline.fit(contractsDFenteros)
contractsTransformedDF = pipelinemodel.transform(contractsDFenteros)

# YOUR CODE HERE
#raise NotImplementedError

In [10]:
from pyspark.sql import functions as F
assert("TotalDiscretized" in contractsTransformedDF.columns)
assert("ProcurementIndexed" in contractsTransformedDF.columns)
assert(len(contractsTransformedDF.columns) == len(contractsDFenteros.columns) + 2)
assert(bucketizer in pipeline.getStages())
assert(indexer in pipeline.getStages())
from pyspark.sql.types import DoubleType
assert(contractsTransformedDF.schema["TotalDiscretized"].dataType == DoubleType())
assert(contractsTransformedDF.schema["ProcurementIndexed"].dataType == DoubleType())
assert(bucketizer.getSplits() == [-float("Inf"), 0, 100000, 200000, 300000, 400000, float("Inf")])
assert(bucketizer.getInputCol() == "Total Contract Amount (USD)")
assert(bucketizer.getOutputCol() == "TotalDiscretized")
assert(indexer.getInputCol() == "Procurement Method")
assert(indexer.getOutputCol() == "ProcurementIndexed")
assert(indexer.getHandleInvalid() == "skip")

**(3 puntos)** Ejercicio 6

* Añadir una nueva columna al DF `contractsDFenteros` llamada `Total Pais` (sin tilde) que contenga en cada fila el **importe total prestado al país (Borrower Country Code) de esa fila (es decir, el total del país al que corresponde cada proyecto)**. El nuevo DF debe tener el mismo número de filas y una columna más. **NO DEBE UTILIZARSE JOIN sino funciones de ventana** con una ventana por país, que debe almacenarse en la variable `paisWindow`. El resultado sería equivalente a una agrupación por países y agregación de suma de importe, y después juntar cada total del país a cada celda (como si fuese un join por el país), pero **todo esto debe hacerse exclusivamente con ventanas y sin usar JOIN**.
* Una vez hecho esto, añade una segunda columna nueva llamada `Porcentaje Pais` (sin tilde) que contenga el **porcentaje** que ha supuesto cada proyecto sobre el total destinado a ese país (dicho total ha sido calculado en el punto anterior). El porcentaje no debe ir en tanto por 1 sino en tanto por 100.
* Añadir una tercera columna llamada `Media Pais` (sin tilde) que contenga en cada fila **el importe medio destinado a los proyectos del país al que corresponde el proyecto**. Debe utilizarse la misma ventana definida en el primer apartado, cambiando solo la función de agregación aplicada.
* Añadir una cuarta columna llamada `Diff Porcentaje` que sea la diferencia, medida en porcentaje, entre el importe destinado al proyecto y el importe medio de un proyecto en ese país. Debe calcularse operando con las columnas existentes, restando a la columna `Total Contract Amount (USD)` el importe de `Media Pais`, dividiendo entre esta última y multiplicando por 100, **sin utilizar** en ningún caso la función `when`. Una diferencia positiva indicará que ese proyecto ha recibido más fondos que la media de los proyectos de ese país, y una diferencia negativa indicará lo contrario.
* El DF resultante debe almacenarse en una variable `porcentajesDF` y debe tener el mismo número de filas que `contractsDFenteros`.

In [11]:
# LÍNEA EVALUABLE, NO RENOMBRAR VARIABLES
# imports necesarios..........
from pyspark.sql import Window

paisWindow = Window().partitionBy("Borrower Country Code")

# nueva columna 1/4
porcentajesDF = contractsDFenteros.withColumn("Total Pais",\
                F.sum("Total Contract Amount (USD)").over(paisWindow))   

# nueva columna 2/4
porcentajesDF = porcentajesDF.withColumn("Porcentaje Pais",\
                100*F.col("Total Contract Amount (USD)")/F.col("Total Pais") )

# nueva columna 3/4
porcentajesDF = porcentajesDF.withColumn("Media Pais",\
                F.mean("Total Contract Amount (USD)").over(paisWindow)) 

# nueva columna 4/4
porcentajesDF = porcentajesDF.withColumn("Diff Porcentaje",\
         100*(F.col("Total Contract Amount (USD)") - F.col("Media Pais"))/ F.col("Media Pais"))

# YOUR CODE HERE
#raise NotImplementedError

In [12]:
assert("Total Pais" in porcentajesDF.columns)
assert("Porcentaje Pais" in porcentajesDF.columns)
r = porcentajesDF.where("`Project ID` = 'P069947'").head()
assert(r["Total Pais"] == 70485209)
assert(r["Porcentaje Pais"] - 25.985824912571374 < 0.001)
assert(r["Media Pais"] - 597332.279661017 < 0.001)
assert(r["Diff Porcentaje"] - 2966.3273396834217 < 0.001)