# La organización Kiva de préstamos entre particulares

### Disponible en Kaggle en:
https://www.kaggle.com/kiva/data-science-for-good-kiva-crowdfunding/version/5

El conjunto de datos elegido recoge estadísticas de Kiva entre 2014 y 2017. Kiva abre un nuevo mundo de oportunidades para los menos favorecidos y nos permite a cualquiera de nosotros convertirnos en superhéroes. Se trata de una organización sin ánimo de lucro que ofrece pequeños préstamos para ayudar a las comunidades desatendidas que no tienen acceso a los servicios bancarios normales. Proporciona una plataforma y une a personas que estén dispuestas a prestar, un mínimo de 25 dólares, y prestatarios que expongan sus necesidades, la finalidad del dinero y las condiciones de devolución,  porque no es una donación, es un préstamo.

Al crear este servicio, Kiva habilita una solución que desbloquea capital para todos y mantiene un interés financiero muy bajo para los prestatarios. Por otro lado, permite que cualquiera sea parte de la solución y brinda a las personas un amplio abanico de opciones para elegir quién, dónde, cuánto y para qué sector desea ayudar.

### Variables y significado

Las variables utilizadas para describir cada préstamo son:


* id: ID único para préstamo
* funding_amount: la cantidad desembolsada por Kiva al agente de campo (USD)
* loan_amount: la cantidad desembolsada por el agente de campo al prestatario (USD)
* country_code: código ISO del país en el que se desembolsó el préstamo
* activity: Categoría más granular
* sector: Categoría de alto nivel
* use: Uso exacto del monto del préstamo
* country: Nombre completo del país en el que se desembolsó el préstamo
* region: Nombre completo de la región dentro del país
* currency: La moneda en que se desembolsó el préstamo
* partner_id: ID de la organización asociada
* posted_time: Hora a la que el agente de campo (intermediario) publica el préstamo en Kiva
* disbursed_time: Hora en que el agente de campo (intermediario) entrega el préstamo al beneficiario
* funded_time: El momento en que el préstamo publicado en Kiva es financiado por los prestamistas por completo
* term_in_months: La duración por la cual el préstamo se desembolsó en meses
* lender_count: El número total de prestamistas que contribuyeron a este préstamo.
* tags: Etiquetas para describir el caso específico
* borrower_genders: letras M, F separadas por comas, donde cada instancia representa un solo hombre / mujer en el grupo
* repayment_interval: Estado de pago
* date: fecha en la base de datos de esta operación.

**Nombre completo del alumno:**  

**INSTRUCCIONES**: en cada celda debes responder a la pregunta formulada, asegurándote de que el resultado queda guardado en la(s) variable(s) que por defecto vienen inicializadas a `None`. No se necesita usar variables intermedias, pero puedes hacerlo siempre que el resultado final del cálculo quede guardado exactamente en la variable que venía inicializada a None (debes reemplazar None por la secuencia de transformaciones necesarias, pero nunca cambiar el nombre de esa variable). **No olvides borrar la línea *raise NotImplementedError()* de cada celda cuando hayas completado la solución de esa celda y quieras probarla**.

Después de cada celda evaluable verás una celda con código. Ejecútala (no modifiques su código) y te dirá si tu solución es correcta o no. En caso de ser correcta, se ejecutará correctamente y no mostrará nada, pero si no lo es mostrará un error. Además de esas pruebas, se realizarán algunas más (ocultas) a la hora de puntuar el ejercicio, pero evaluar dicha celda es un indicador bastante fiable acerca de si realmente has implementado la solución correcta o no. Asegúrate de que, al menos, todas las celdas indican que el código es correcto antes de enviar el notebook terminado.

### Sobre el dataset kiva_loans.csv se pide:

**(1 punto)** Ejercicio 1

* Leerlo **sin intentar** que Spark infiera el tipo de dato de cada columna
* Puesto que existen columnas que contienen una coma enmedio del valor, en esos casos los valores vienen entre comillas dobles. Spark ya contempla esta posibilidad y puede leerlas adecuadamente **si al leer le indicamos las siguientes opciones adicionales** además de las que ya sueles usar: `.option("quote", "\"").option("escape", "\"")`.
* Asegúrate de que las **filas que no tienen el formato correcto sean descartadas**, indicando también la opción `mode` con el valor `DROPMALFORMED` como vimos en clase.
* Crear un nuevo DF `kivaRawNoNullDF` en el que se hayan eliminado todas las filas que tengan algún valor nulo en cualquier columna **excepto en la columna tags**, que no será relevante para el análisis y por tanto podemos ignorar sus valores nulos y mantener dichas filas.

In [1]:
import sys
from pyspark.sql import SparkSession
spark = (SparkSession
 .builder
 .appName("ModeloD")
 .getOrCreate())

In [2]:
# LÍNEA EVALUABLE, NO RENOMBRAR LAS VARIABLES
kivaRawDF = spark.read\
                 .option("header", "true")\
                 .option("quote", "\"").option("escape", "\"")\
                 .option("mode", "DROPMALFORMED")\
                 .csv("ModeloD kiva_loans.csv")\
                 .cache()
# Descomentar estas líneas para calcular la lista de columnas que sí deben tenerse en cuenta para quitar nulos. Después
# tendrás que utilizar dicha lista en la operación que elimina los nulos
columnasExceptoTags = kivaRawDF.columns
columnasExceptoTags.remove("tags")
print(columnasExceptoTags)
kivaRawNoNullDF = kivaRawDF.na.drop(subset = columnasExceptoTags)
# YOUR CODE HERE
#raise NotImplementedError

['id', 'funded_amount', 'loan_amount', 'activity', 'sector', 'use', 'country_code', 'country', 'region', 'currency', 'partner_id', 'posted_time', 'disbursed_time', 'funded_time', 'term_in_months', 'lender_count', 'borrower_genders', 'repayment_interval', 'date']


In [3]:
from pyspark.sql.types import DoubleType
assert(kivaRawNoNullDF.count() == 574115)

**(1 punto)** Ejercicio 2

* Las columnas `posted_time` y `disbursed_time` son en realidad instantes de tiempo que Spark debería procesar como timestamp. Partiendo de `kivaRawNoNullDF`, reemplaza **ambas columnas** por su versión convertida a timestamp, utilizando `withColumn` con el mismo nombre de cada columna, y donde el nuevo valor de la columna viene dado por el siguiente código:

        F.from_unixtime(F.unix_timestamp('nombreColumna', 'yyyy-MM-dd HH:mm:ssXXX')).cast("timestamp"))

* Además, convierte a `DoubleType` la columna `loan_amount` y a `IntegerType` la columna `term_in_months`.

* El DF resultante de todas estas operaciones debe quedar almacenado en la variable `kivaDF`, **cacheado**.


In [4]:
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.types import IntegerType
# LÍNEAS EVALUABLES, NO RENOMBRAR LAS VARIABLES
kivaDF = kivaRawNoNullDF.withColumn("posted_time", 
                                    F.from_unixtime(F.unix_timestamp('posted_time', 'yyyy-MM-dd HH:mm:ssXXX')).cast("timestamp"))\
                        .withColumn("disbursed_time", 
                                    F.from_unixtime(F.unix_timestamp('disbursed_time', 'yyyy-MM-dd HH:mm:ssXXX')).cast("timestamp"))\
                        .withColumn("term_in_months",
                                            F.col("term_in_months").cast(IntegerType()))\
                        .withColumn("loan_amount",
                                            F.col("loan_amount").cast(DoubleType()))\
                        .cache()
# YOUR CODE HERE
#raise NotImplementedError

In [5]:
typesDict = dict(kivaDF.dtypes)
assert(typesDict["posted_time"] == "timestamp") 
assert(typesDict["disbursed_time"] == "timestamp") 
assert(typesDict["loan_amount"] == "double") 
assert(typesDict["term_in_months"] == "int")
cnt_cond = lambda cond: F.sum(F.when(cond, 1).otherwise(0))
nullsRow = kivaDF.select(cnt_cond(F.col("posted_time").isNull()).alias("posted_nulls"),
              cnt_cond(F.col("disbursed_time").isNull()).alias("disbursed_nulls")).head()
assert(nullsRow.posted_nulls == 0)
assert(nullsRow.disbursed_nulls == 0)

**(2 puntos)** Ejercicio 3

Partiendo de `kivaDF`:

* Primero, añade una nueva columna `dias_desembolso` que contenga el número de días que han pasado entre la fecha en que los prestamistas aceptaron financiar un proyecto, y la fecha en que el agente de campo entregó los fondos al beneficiario. Para ello, utiliza `withColumn` en combinación con la función `F.datediff("colFuturo", "colPasado")`
* De manera análoga, añade otra nueva columna `dias_aceptacion` que contenga el número de días entre el anuncio de la necesidad de préstamo y la aceptación de financiarlo por parte de algún prestamista.
* Reemplazar la columna `sector` por otra en la que se hayan traducido las categorías "Education" por "Educacion" (sin tilde para evitar posibles problemas) y "Agriculture" por "Agricultura", dejando como están el resto de categorías. **La sustitución no debe tener más que tres casos**: uno para cada categoría que vamos a reemplazar, y un tercero para el resto de categorías, que deben quedarse como estaban.
* El resultado debe quedar guardado en la variable `kivaTiemposDF`.

In [6]:
# LÍNEA EVALUABLE, NO RENOMBRAR VARIABLES
# imports......
kivaTiemposDF = kivaDF.withColumn("dias_desembolso", F.days(F.datediff("disbursed_time", "funded_time")))\
                      .withColumn("dias_aceptacion", F.days(F.datediff("funded_time", "posted_time")))\
                      .withColumn("sector", F.when(F.col("sector") == "Education", "Educacion")\
                                             .when(F.col("sector") == "Agriculture", "Agricultura")\
                                             .otherwise(F.col("sector")))
# YOUR CODE HERE
#raise NotImplementedError

In [7]:
assert(kivaTiemposDF.where("sector == 'Agricultura'").count() == 157003)
assert(kivaTiemposDF.where("sector == 'Educacion'").count() == 28417)
# Comprobamos que las 13 restantes se mantienen sin cambios
assert(kivaTiemposDF.groupBy("sector").count().join(kivaDF.groupBy("sector").count(), ["sector", "count"]).count() == 13)

**(3 puntos)** Ejercicio 4

Partiendo de `kivaTiemposDF`, crear un nuevo DataFrame llamado `kivaAgrupadoDF` que tenga:

* Tantas filas como **países (`country`; no usar el código de país)**, y tantas columnas como **sectores** (cada una llamada como el sector) más una (la columna del país, que debe aparecer en primer lugar). En cada celda deberá ir el número **medio (redondeado a 2 cifras decimales)** de días transcurridos en ese país y sector *entre la fecha en que se anuncia la necesidad de préstamo y la fecha en la que un prestamista acepta financiarlo*. Esta columna ha sido calculada en la celda precedente.
* Después de esto, añadir una columna adicional `transcurrido_global` con el número **medio (redondeado a 2 cifras decimales) de días transcurridos en cada país** entre ambas fechas *sin tener en cuenta el sector*. Cada fila tendrá la media de las 15 columnas del apartado anterior.
* Por último, ordenar el DF resultante **descendentemente** en base al tiempo medio global, `transcurrido_global`. El DF resultado de la ordenación debe ser almacenado en la variable `kivaAgrupadoDF`. 

PISTA: utiliza el método `pivot` con el sector para el primer apartado, envolviendo a la función de agregación con la función `F.round`, es decir, `F.round(F.funcionAgregacion(....), 2)`, y `withColumn` con una operación aritmética entre columnas en el segundo. **No debe utilizarse la función `when`** ya que Spark es capaz de hacer directamente aritmética entre objetos columna. La operación aritmética también debe estar envuelta por round: `F.round(op. aritmética entre objetos columna, 2)`.

In [9]:
# LÍNEA EVALUABLE, NO RENOMBRAR VARIABLES
kivaAgrupadoDF = kivaTiemposDF.groupBy(F.col("country")).agg(F.mean("dias_aceptacion"))
# YOUR CODE HERE
#raise NotImplementedError

In [10]:
r1 = kivaAgrupadoDF.head()
assert(r1.country == "United States")
assert((r1.Agricultura - 12.0 < 0.01) | (r1.Agricultura - 12.17 < 0.01))
assert((r1.Educacion - 15.21 < 0.01) | (r1.Educacion - 15.33 < 0.01))
assert(r1.Wholesale - 27.5 < 0.01)
assert((r1.transcurrido_global - 20.94 < 0.01) | (r1.transcurrido_global - 21.04 < 0.01))

Py4JJavaError: An error occurred while calling o132.collectToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
HashAggregate(keys=[country#23], functions=[pivotfirst(sector#1660, round(avg(`dias_aceptacion`), 2)#4149, Agricultura, Arts, Clothing, Construction, Educacion, Entertainment, Food, Health, Housing, Manufacturing, Personal Use, Retail, Services, Transportation, Wholesale, 0, 0)], output=[country#23, Agricultura#4182, Arts#4183, Clothing#4184, Construction#4185, Educacion#4186, Entertainment#4187, Food#4188, Health#4189, Housing#4190, Manufacturing#4191, Personal Use#4192, Retail#4193, Services#4194, Transportation#4195, Wholesale#4196])
+- Exchange hashpartitioning(country#23, 200), ENSURE_REQUIREMENTS, [id=#350]
   +- HashAggregate(keys=[country#23], functions=[partial_pivotfirst(sector#1660, round(avg(`dias_aceptacion`), 2)#4149, Agricultura, Arts, Clothing, Construction, Educacion, Entertainment, Food, Health, Housing, Manufacturing, Personal Use, Retail, Services, Transportation, Wholesale, 0, 0)], output=[country#23, Agricultura#4165, Arts#4166, Clothing#4167, Construction#4168, Educacion#4169, Entertainment#4170, Food#4171, Health#4172, Housing#4173, Manufacturing#4174, Personal Use#4175, Retail#4176, Services#4177, Transportation#4178, Wholesale#4179])
      +- *(2) HashAggregate(keys=[country#23, sector#1660], functions=[avg(cast(dias_aceptacion#1637 as bigint))], output=[country#23, sector#1660, round(avg(`dias_aceptacion`), 2)#4149])
         +- Exchange hashpartitioning(country#23, sector#1660, 200), ENSURE_REQUIREMENTS, [id=#345]
            +- *(1) HashAggregate(keys=[country#23, sector#1660], functions=[partial_avg(cast(dias_aceptacion#1637 as bigint))], output=[country#23, sector#1660, sum#4605, count#4606L])
               +- *(1) Project [CASE WHEN (sector#20 = Education) THEN Educacion WHEN (sector#20 = Agriculture) THEN Agricultura ELSE sector#20 END AS sector#1660, country#23, days(datediff(cast(funded_time#29 as date), cast(posted_time#621 as date))) AS dias_aceptacion#1637]
                  +- InMemoryTableScan [country#23, funded_time#29, posted_time#621, sector#20]
                        +- InMemoryRelation [id#16, funded_amount#17, loan_amount#684, activity#19, sector#20, use#21, country_code#22, country#23, region#24, currency#25, partner_id#26, posted_time#621, disbursed_time#642, funded_time#29, term_in_months#663, lender_count#31, tags#32, borrower_genders#33, repayment_interval#34, date#35], StorageLevel(disk, memory, deserialized, 1 replicas)
                              +- *(1) Project [id#16, funded_amount#17, cast(loan_amount#18 as double) AS loan_amount#684, activity#19, sector#20, use#21, country_code#22, country#23, region#24, currency#25, partner_id#26, cast(from_unixtime(unix_timestamp(posted_time#27, yyyy-MM-dd HH:mm:ssXXX, Some(Europe/Paris), false), yyyy-MM-dd HH:mm:ss, Some(Europe/Paris)) as timestamp) AS posted_time#621, cast(from_unixtime(unix_timestamp(disbursed_time#28, yyyy-MM-dd HH:mm:ssXXX, Some(Europe/Paris), false), yyyy-MM-dd HH:mm:ss, Some(Europe/Paris)) as timestamp) AS disbursed_time#642, funded_time#29, cast(term_in_months#30 as int) AS term_in_months#663, lender_count#31, tags#32, borrower_genders#33, repayment_interval#34, date#35]
                                 +- *(1) Filter AtLeastNNulls(n, id#16,funded_amount#17,loan_amount#18,activity#19,sector#20,use#21,country_code#22,country#23,region#24,currency#25,partner_id#26,posted_time#27,disbursed_time#28,funded_time#29,term_in_months#30,lender_count#31,borrower_genders#33,repayment_interval#34,date#35)
                                    +- InMemoryTableScan [activity#19, borrower_genders#33, country#23, country_code#22, currency#25, date#35, disbursed_time#28, funded_amount#17, funded_time#29, id#16, lender_count#31, loan_amount#18, partner_id#26, posted_time#27, region#24, repayment_interval#34, sector#20, tags#32, term_in_months#30, use#21], [AtLeastNNulls(n, id#16,funded_amount#17,loan_amount#18,activity#19,sector#20,use#21,country_code#22,country#23,region#24,currency#25,partner_id#26,posted_time#27,disbursed_time#28,funded_time#29,term_in_months#30,lender_count#31,borrower_genders#33,repayment_interval#34,date#35)]
                                          +- InMemoryRelation [id#16, funded_amount#17, loan_amount#18, activity#19, sector#20, use#21, country_code#22, country#23, region#24, currency#25, partner_id#26, posted_time#27, disbursed_time#28, funded_time#29, term_in_months#30, lender_count#31, tags#32, borrower_genders#33, repayment_interval#34, date#35], StorageLevel(disk, memory, deserialized, 1 replicas)
                                                +- FileScan csv [id#16,funded_amount#17,loan_amount#18,activity#19,sector#20,use#21,country_code#22,country#23,region#24,currency#25,partner_id#26,posted_time#27,disbursed_time#28,funded_time#29,term_in_months#30,lender_count#31,tags#32,borrower_genders#33,repayment_interval#34,date#35] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/alejandro.perez/Documents/NotebooksGithub/Tarea/ModeloD kiva_loa..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,funded_amount:string,loan_amount:string,activity:string,sector:string,use:string...

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doExecute(HashAggregateExec.scala:83)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:321)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:439)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3532)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3700)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3698)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3529)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(country#23, 200), ENSURE_REQUIREMENTS, [id=#350]
+- HashAggregate(keys=[country#23], functions=[partial_pivotfirst(sector#1660, round(avg(`dias_aceptacion`), 2)#4149, Agricultura, Arts, Clothing, Construction, Educacion, Entertainment, Food, Health, Housing, Manufacturing, Personal Use, Retail, Services, Transportation, Wholesale, 0, 0)], output=[country#23, Agricultura#4165, Arts#4166, Clothing#4167, Construction#4168, Educacion#4169, Entertainment#4170, Food#4171, Health#4172, Housing#4173, Manufacturing#4174, Personal Use#4175, Retail#4176, Services#4177, Transportation#4178, Wholesale#4179])
   +- *(2) HashAggregate(keys=[country#23, sector#1660], functions=[avg(cast(dias_aceptacion#1637 as bigint))], output=[country#23, sector#1660, round(avg(`dias_aceptacion`), 2)#4149])
      +- Exchange hashpartitioning(country#23, sector#1660, 200), ENSURE_REQUIREMENTS, [id=#345]
         +- *(1) HashAggregate(keys=[country#23, sector#1660], functions=[partial_avg(cast(dias_aceptacion#1637 as bigint))], output=[country#23, sector#1660, sum#4605, count#4606L])
            +- *(1) Project [CASE WHEN (sector#20 = Education) THEN Educacion WHEN (sector#20 = Agriculture) THEN Agricultura ELSE sector#20 END AS sector#1660, country#23, days(datediff(cast(funded_time#29 as date), cast(posted_time#621 as date))) AS dias_aceptacion#1637]
               +- InMemoryTableScan [country#23, funded_time#29, posted_time#621, sector#20]
                     +- InMemoryRelation [id#16, funded_amount#17, loan_amount#684, activity#19, sector#20, use#21, country_code#22, country#23, region#24, currency#25, partner_id#26, posted_time#621, disbursed_time#642, funded_time#29, term_in_months#663, lender_count#31, tags#32, borrower_genders#33, repayment_interval#34, date#35], StorageLevel(disk, memory, deserialized, 1 replicas)
                           +- *(1) Project [id#16, funded_amount#17, cast(loan_amount#18 as double) AS loan_amount#684, activity#19, sector#20, use#21, country_code#22, country#23, region#24, currency#25, partner_id#26, cast(from_unixtime(unix_timestamp(posted_time#27, yyyy-MM-dd HH:mm:ssXXX, Some(Europe/Paris), false), yyyy-MM-dd HH:mm:ss, Some(Europe/Paris)) as timestamp) AS posted_time#621, cast(from_unixtime(unix_timestamp(disbursed_time#28, yyyy-MM-dd HH:mm:ssXXX, Some(Europe/Paris), false), yyyy-MM-dd HH:mm:ss, Some(Europe/Paris)) as timestamp) AS disbursed_time#642, funded_time#29, cast(term_in_months#30 as int) AS term_in_months#663, lender_count#31, tags#32, borrower_genders#33, repayment_interval#34, date#35]
                              +- *(1) Filter AtLeastNNulls(n, id#16,funded_amount#17,loan_amount#18,activity#19,sector#20,use#21,country_code#22,country#23,region#24,currency#25,partner_id#26,posted_time#27,disbursed_time#28,funded_time#29,term_in_months#30,lender_count#31,borrower_genders#33,repayment_interval#34,date#35)
                                 +- InMemoryTableScan [activity#19, borrower_genders#33, country#23, country_code#22, currency#25, date#35, disbursed_time#28, funded_amount#17, funded_time#29, id#16, lender_count#31, loan_amount#18, partner_id#26, posted_time#27, region#24, repayment_interval#34, sector#20, tags#32, term_in_months#30, use#21], [AtLeastNNulls(n, id#16,funded_amount#17,loan_amount#18,activity#19,sector#20,use#21,country_code#22,country#23,region#24,currency#25,partner_id#26,posted_time#27,disbursed_time#28,funded_time#29,term_in_months#30,lender_count#31,borrower_genders#33,repayment_interval#34,date#35)]
                                       +- InMemoryRelation [id#16, funded_amount#17, loan_amount#18, activity#19, sector#20, use#21, country_code#22, country#23, region#24, currency#25, partner_id#26, posted_time#27, disbursed_time#28, funded_time#29, term_in_months#30, lender_count#31, tags#32, borrower_genders#33, repayment_interval#34, date#35], StorageLevel(disk, memory, deserialized, 1 replicas)
                                             +- FileScan csv [id#16,funded_amount#17,loan_amount#18,activity#19,sector#20,use#21,country_code#22,country#23,region#24,currency#25,partner_id#26,posted_time#27,disbursed_time#28,funded_time#29,term_in_months#30,lender_count#31,tags#32,borrower_genders#33,repayment_interval#34,date#35] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/alejandro.perez/Documents/NotebooksGithub/Tarea/ModeloD kiva_loa..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,funded_amount:string,loan_amount:string,activity:string,sector:string,use:string...

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:163)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1(HashAggregateExec.scala:90)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 30 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
HashAggregate(keys=[country#23], functions=[partial_pivotfirst(sector#1660, round(avg(`dias_aceptacion`), 2)#4149, Agricultura, Arts, Clothing, Construction, Educacion, Entertainment, Food, Health, Housing, Manufacturing, Personal Use, Retail, Services, Transportation, Wholesale, 0, 0)], output=[country#23, Agricultura#4165, Arts#4166, Clothing#4167, Construction#4168, Educacion#4169, Entertainment#4170, Food#4171, Health#4172, Housing#4173, Manufacturing#4174, Personal Use#4175, Retail#4176, Services#4177, Transportation#4178, Wholesale#4179])
+- *(2) HashAggregate(keys=[country#23, sector#1660], functions=[avg(cast(dias_aceptacion#1637 as bigint))], output=[country#23, sector#1660, round(avg(`dias_aceptacion`), 2)#4149])
   +- Exchange hashpartitioning(country#23, sector#1660, 200), ENSURE_REQUIREMENTS, [id=#345]
      +- *(1) HashAggregate(keys=[country#23, sector#1660], functions=[partial_avg(cast(dias_aceptacion#1637 as bigint))], output=[country#23, sector#1660, sum#4605, count#4606L])
         +- *(1) Project [CASE WHEN (sector#20 = Education) THEN Educacion WHEN (sector#20 = Agriculture) THEN Agricultura ELSE sector#20 END AS sector#1660, country#23, days(datediff(cast(funded_time#29 as date), cast(posted_time#621 as date))) AS dias_aceptacion#1637]
            +- InMemoryTableScan [country#23, funded_time#29, posted_time#621, sector#20]
                  +- InMemoryRelation [id#16, funded_amount#17, loan_amount#684, activity#19, sector#20, use#21, country_code#22, country#23, region#24, currency#25, partner_id#26, posted_time#621, disbursed_time#642, funded_time#29, term_in_months#663, lender_count#31, tags#32, borrower_genders#33, repayment_interval#34, date#35], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *(1) Project [id#16, funded_amount#17, cast(loan_amount#18 as double) AS loan_amount#684, activity#19, sector#20, use#21, country_code#22, country#23, region#24, currency#25, partner_id#26, cast(from_unixtime(unix_timestamp(posted_time#27, yyyy-MM-dd HH:mm:ssXXX, Some(Europe/Paris), false), yyyy-MM-dd HH:mm:ss, Some(Europe/Paris)) as timestamp) AS posted_time#621, cast(from_unixtime(unix_timestamp(disbursed_time#28, yyyy-MM-dd HH:mm:ssXXX, Some(Europe/Paris), false), yyyy-MM-dd HH:mm:ss, Some(Europe/Paris)) as timestamp) AS disbursed_time#642, funded_time#29, cast(term_in_months#30 as int) AS term_in_months#663, lender_count#31, tags#32, borrower_genders#33, repayment_interval#34, date#35]
                           +- *(1) Filter AtLeastNNulls(n, id#16,funded_amount#17,loan_amount#18,activity#19,sector#20,use#21,country_code#22,country#23,region#24,currency#25,partner_id#26,posted_time#27,disbursed_time#28,funded_time#29,term_in_months#30,lender_count#31,borrower_genders#33,repayment_interval#34,date#35)
                              +- InMemoryTableScan [activity#19, borrower_genders#33, country#23, country_code#22, currency#25, date#35, disbursed_time#28, funded_amount#17, funded_time#29, id#16, lender_count#31, loan_amount#18, partner_id#26, posted_time#27, region#24, repayment_interval#34, sector#20, tags#32, term_in_months#30, use#21], [AtLeastNNulls(n, id#16,funded_amount#17,loan_amount#18,activity#19,sector#20,use#21,country_code#22,country#23,region#24,currency#25,partner_id#26,posted_time#27,disbursed_time#28,funded_time#29,term_in_months#30,lender_count#31,borrower_genders#33,repayment_interval#34,date#35)]
                                    +- InMemoryRelation [id#16, funded_amount#17, loan_amount#18, activity#19, sector#20, use#21, country_code#22, country#23, region#24, currency#25, partner_id#26, posted_time#27, disbursed_time#28, funded_time#29, term_in_months#30, lender_count#31, tags#32, borrower_genders#33, repayment_interval#34, date#35], StorageLevel(disk, memory, deserialized, 1 replicas)
                                          +- FileScan csv [id#16,funded_amount#17,loan_amount#18,activity#19,sector#20,use#21,country_code#22,country#23,region#24,currency#25,partner_id#26,posted_time#27,disbursed_time#28,funded_time#29,term_in_months#30,lender_count#31,tags#32,borrower_genders#33,repayment_interval#34,date#35] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/alejandro.perez/Documents/NotebooksGithub/Tarea/ModeloD kiva_loa..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,funded_amount:string,loan_amount:string,activity:string,sector:string,use:string...

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doExecute(HashAggregateExec.scala:83)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:151)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:149)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:166)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 38 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(country#23, sector#1660, 200), ENSURE_REQUIREMENTS, [id=#345]
+- *(1) HashAggregate(keys=[country#23, sector#1660], functions=[partial_avg(cast(dias_aceptacion#1637 as bigint))], output=[country#23, sector#1660, sum#4605, count#4606L])
   +- *(1) Project [CASE WHEN (sector#20 = Education) THEN Educacion WHEN (sector#20 = Agriculture) THEN Agricultura ELSE sector#20 END AS sector#1660, country#23, days(datediff(cast(funded_time#29 as date), cast(posted_time#621 as date))) AS dias_aceptacion#1637]
      +- InMemoryTableScan [country#23, funded_time#29, posted_time#621, sector#20]
            +- InMemoryRelation [id#16, funded_amount#17, loan_amount#684, activity#19, sector#20, use#21, country_code#22, country#23, region#24, currency#25, partner_id#26, posted_time#621, disbursed_time#642, funded_time#29, term_in_months#663, lender_count#31, tags#32, borrower_genders#33, repayment_interval#34, date#35], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *(1) Project [id#16, funded_amount#17, cast(loan_amount#18 as double) AS loan_amount#684, activity#19, sector#20, use#21, country_code#22, country#23, region#24, currency#25, partner_id#26, cast(from_unixtime(unix_timestamp(posted_time#27, yyyy-MM-dd HH:mm:ssXXX, Some(Europe/Paris), false), yyyy-MM-dd HH:mm:ss, Some(Europe/Paris)) as timestamp) AS posted_time#621, cast(from_unixtime(unix_timestamp(disbursed_time#28, yyyy-MM-dd HH:mm:ssXXX, Some(Europe/Paris), false), yyyy-MM-dd HH:mm:ss, Some(Europe/Paris)) as timestamp) AS disbursed_time#642, funded_time#29, cast(term_in_months#30 as int) AS term_in_months#663, lender_count#31, tags#32, borrower_genders#33, repayment_interval#34, date#35]
                     +- *(1) Filter AtLeastNNulls(n, id#16,funded_amount#17,loan_amount#18,activity#19,sector#20,use#21,country_code#22,country#23,region#24,currency#25,partner_id#26,posted_time#27,disbursed_time#28,funded_time#29,term_in_months#30,lender_count#31,borrower_genders#33,repayment_interval#34,date#35)
                        +- InMemoryTableScan [activity#19, borrower_genders#33, country#23, country_code#22, currency#25, date#35, disbursed_time#28, funded_amount#17, funded_time#29, id#16, lender_count#31, loan_amount#18, partner_id#26, posted_time#27, region#24, repayment_interval#34, sector#20, tags#32, term_in_months#30, use#21], [AtLeastNNulls(n, id#16,funded_amount#17,loan_amount#18,activity#19,sector#20,use#21,country_code#22,country#23,region#24,currency#25,partner_id#26,posted_time#27,disbursed_time#28,funded_time#29,term_in_months#30,lender_count#31,borrower_genders#33,repayment_interval#34,date#35)]
                              +- InMemoryRelation [id#16, funded_amount#17, loan_amount#18, activity#19, sector#20, use#21, country_code#22, country#23, region#24, currency#25, partner_id#26, posted_time#27, disbursed_time#28, funded_time#29, term_in_months#30, lender_count#31, tags#32, borrower_genders#33, repayment_interval#34, date#35], StorageLevel(disk, memory, deserialized, 1 replicas)
                                    +- FileScan csv [id#16,funded_amount#17,loan_amount#18,activity#19,sector#20,use#21,country_code#22,country#23,region#24,currency#25,partner_id#26,posted_time#27,disbursed_time#28,funded_time#29,term_in_months#30,lender_count#31,tags#32,borrower_genders#33,repayment_interval#34,date#35] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/alejandro.perez/Documents/NotebooksGithub/Tarea/ModeloD kiva_loa..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,funded_amount:string,loan_amount:string,activity:string,sector:string,use:string...

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:163)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:746)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1(HashAggregateExec.scala:90)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 50 more
Caused by: java.lang.UnsupportedOperationException: Cannot generate code for expression: days(datediff(cast(input[1, string, true] as date), cast(input[2, timestamp, true] as date)))
	at org.apache.spark.sql.catalyst.expressions.Unevaluable.doGenCode(Expression.scala:307)
	at org.apache.spark.sql.catalyst.expressions.Unevaluable.doGenCode$(Expression.scala:306)
	at org.apache.spark.sql.catalyst.expressions.PartitionTransformExpression.doGenCode(PartitionTransforms.scala:35)
	at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:146)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:141)
	at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:163)
	at org.apache.spark.sql.execution.ProjectExec.$anonfun$doConsume$2(basicPhysicalOperators.scala:73)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.spark.sql.execution.ProjectExec.$anonfun$doConsume$1(basicPhysicalOperators.scala:73)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.withSubExprEliminationExprs(CodeGenerator.scala:1026)
	at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:73)
	at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:194)
	at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:149)
	at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.InputRDDCodegen.doProduce(WholeStageCodegenExec.scala:483)
	at org.apache.spark.sql.execution.InputRDDCodegen.doProduce$(WholeStageCodegenExec.scala:456)
	at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:54)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:733)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:148)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:47)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:655)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:718)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:151)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:149)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:166)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 69 more


**(3 puntos)** Ejercicio 5

Partiendo de nuevo de `kivaTiemposDF`, añadir las siguientes columnas:

* Primero, tres columnas adicionales llamadas `transc_medio`, `transc_min`, `transc_max` que contengan, respectivamente, **el número de días medio, mínimo y máximo transcurrido para proyectos de ese mismo país y ese mismo sector** entre la fecha en que se postea la necesidad de préstamo y la fecha en la que alguien acepta financiarlo (es decir, la columna `dias_aceptacion` calculada antes y utilizada también en la celda anterior). Es decir, queremos una columna extra para que podamos tener, junto a cada préstamo, información agregada de los préstamos similares, entendidos como aquellos del mismo país y del mismo sector. **No se debe utilizar JOIN sino solo funciones de ventana**.
* Finalmente, crear otra columna adicional `diff_dias` que contenga la **diferencia en días entre los días que transcurrieron en este proyecto y la media de días de los proyectos similares** (calculada en el apartado anterior). Debería ser lo primero menos lo segundo, de forma que un número positivo indica que este préstamo tardó más días en ser aceptado que la media de días de este país y sector, y un número negativo indica lo contrario. El resultado debe obtenerse aplicando operaciones aritméticas con columnas existentes, **sin utilizar `when`**.
* El DF resultante con las 4 columnas nuevas que hemos añadido debe quedar almacenado en la variable `kivaExtraInfoDF`.

In [None]:
# LÍNEA EVALUABLE, NO RENOMBRAR VARIABLES
# imports necesarios..........
windowPaisSector = None
kivaExtraInfoDF = None
# YOUR CODE HERE
raise NotImplementedError

In [None]:
r = kivaExtraInfoDF.where("id = '658540'").head()
assert(r.country == 'Burkina Faso')
assert(r.transc_medio - 11.02 < 0.05)
assert(r.dias_aceptacion == 35)
assert(r.diff_dias - 24.0 < 0.001)