# La colección del MoMA (Museo de Arte Moderno) de Nueva York

### Disponible en Kaggle en:
https://www.kaggle.com/momanyc/museum-collection

(no obstante, debes usar exclusivamente el dataset que has recibido adjunto al email)


### Contexto

El Museo de Arte Moderno (MoMA) adquirió sus primeras obras de arte en 1929, año de su fundación. Hoy, la colección en evolución del Museo contiene casi 200.000 obras de todo el mundo que abarcan los últimos 150 años. La colección incluye una gama cada vez mayor de expresión visual, que incluye pintura, escultura, grabado, dibujo, fotografía, arquitectura, diseño, cine y medios y artes escénicas.

### Contenido

MoMA se compromete a ayudar a todos a comprender, disfrutar y utilizar nuestra colección. El sitio web del Museo presenta 72.706 obras de arte de 20.956 artistas. El conjunto de datos de obras de arte contiene 130.262 registros, que representan todas las obras que se han incorporado a la colección del MoMA y están catalogadas en nuestra base de datos. Incluye metadatos básicos para cada obra, incluyendo título, artista, fecha, medio, dimensiones y fecha de adquisición por parte del Museo. Algunos de estos registros tienen información incompleta y se indican como "no aprobados por el curador". El conjunto de datos de artistas contiene 15.091 registros, que representan a todos los artistas que tienen obras en la colección del MoMA y han sido catalogados en nuestra base de datos. Incluye metadatos básicos para cada artista, incluido el nombre, la nacionalidad, el sexo, el año de nacimiento y el año de fallecimiento.


### Variables y significado (sólo aquellas que se utilizarán)


* Title: string - título de la obra
* Artist: string - nombre del autor o autores
* ConstituentID: string - identificador no usado
* ArtistBio: string - biografía del autor
* Nationality: string - nacionalidad del autor(es)
* BeginDate: string - fecha(s) de nacimiento del autor(es)
* EndDate: string - fecha(s) de fallecimiento del autor(es)
* Gender: string - género del autor(es)
* Date: string - fecha de creación de la obra
* Medium: string - soporte físico en el que se creó la obra
* Dimensions: string - dimensiones (todas)
* CreditLine: string - forma de obtención de esa obra de arte por parte del museo
* AccessionNumber: string - identificador no usado
* Classification: string - tipología de la obra de arte
* Department: string - departamento del museo al que pertenece
* DateAcquired: string - fecha de adquisición
* Cataloged: string - si está catalogada o no
* ObjectID: string - identificador no usado
* URL: string - enlace a la web del museo
* ThumbnailURL: string - enlace a una imagen en miniatura
* Circumference (cm): string - perímetro de la obra en cm
* Depth (cm): string - profundidad en cm
* Diameter (cm): string - diámetro en cm
* Height (cm): string - altura en cm
* Length (cm): string - longitud en cm
* Weight (kg): string - peso de la obra en kg
* Width (cm): string - anchura de la obra en cm
* Seat Height (cm): string - altura del soporte donde se expone
* Duration (sec.): string - duración de la obra en caso de ser audiovisual


**Nombre completo del alumno: Bruno Urban Alfaro

**INSTRUCCIONES**: en cada celda debes responder a la pregunta formulada, asegurándote de que el resultado queda guardado en la(s) variable(s) que por defecto vienen inicializadas a None. No se necesita usar variables intermedias, pero puedes hacerlo siempre que el resultado final del cálculo quede guardado exactamente en la variable que venía inicializada a None (debes reemplazar None por la secuencia de transformaciones necesarias, pero nunca cambiar el nombre de esa variable). 

**No olvides borrar la línea raise NotImplementedError() de cada celda cuando hayas completado la solución de esa celda y quieras probarla.**

Después de cada celda evaluable verás una celda con código. Ejecútala (no modifiques su código) y te dirá si tu solución es correcta o no. Además de esas pruebas, se realizarán algunas más (ocultas) a la hora de puntuar el ejercicio, pero evaluar dicha celda es un indicador bastante fiable acerca de si realmente has implementado la solución correcta o no. Asegúrate de que, al menos, todas las celdas indican que el código es correcto antes de enviar el notebook terminado.

**Nunca se debe redondear ninguna cantidad si no lo pide explícitamente el enunciado**

## Sobre el dataset anterior (MoMA_Artworks.csv) se pide:

**(1 punto)** Ejercicio 1

* Leerlo tratando de que Spark infiera el tipo de dato de cada columna, y **cachearlo**. Debe guardarse en una variable llamada `artworks`.
* Puesto que existen columnas que contienen una coma enmedio del valor, en esos casos los valores vienen entre comillas dobles. Spark ya contempla esta posibilidad y puede leerlas adecuadamente **si al leer le indicamos las siguientes opciones adicionales** además de las que ya sueles usar: `.option("quote", "\"").option("escape", "\"")`.
* Asegúrate de que las **filas que no tienen el formato correcto sean descartadas**, indicando también la opción `mode` con el valor `DROPMALFORMED` como vimos en clase.

In [1]:
from pyspark.sql import functions as F
artworks = spark.read\
                 .option("header", "true")\
                 .option("inferSchema", "true")\
                 .option("quote", "\"")\
                 .option("escape", "\"")\
                 .option("mode", "DROPMALFORMED")\
                .csv("gs://tarea--master/data/MoMA_Artworks.csv") 
# LÍNEA EVALUABLE, NO RENOMBRAR LAS VARIABLES

artworks.printSchema()



root
 |-- Title: string (nullable = true)
 |-- Artist: string (nullable = true)
 |-- ConstituentID: string (nullable = true)
 |-- ArtistBio: string (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- BeginDate: string (nullable = true)
 |-- EndDate: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Medium: string (nullable = true)
 |-- Dimensions: string (nullable = true)
 |-- CreditLine: string (nullable = true)
 |-- AccessionNumber: string (nullable = true)
 |-- Classification: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- DateAcquired: string (nullable = true)
 |-- Cataloged: string (nullable = true)
 |-- ObjectID: string (nullable = true)
 |-- URL: string (nullable = true)
 |-- ThumbnailURL: string (nullable = true)
 |-- Circumference (cm): string (nullable = true)
 |-- Depth (cm): string (nullable = true)
 |-- Diameter (cm): string (nullable = true)
 |-- Height (cm): string (nullable = tr

                                                                                

In [2]:
# Cachear el DataFrame
#nos aseguramos de que al preguntar si esta cacheado sale true una vez cacheamos
 
artworks.cache()
#artworks.is_cached  

23/06/24 17:03:33 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[Title: string, Artist: string, ConstituentID: string, ArtistBio: string, Nationality: string, BeginDate: string, EndDate: string, Gender: string, Date: string, Medium: string, Dimensions: string, CreditLine: string, AccessionNumber: string, Classification: string, Department: string, DateAcquired: string, Cataloged: string, ObjectID: string, URL: string, ThumbnailURL: string, Circumference (cm): string, Depth (cm): string, Diameter (cm): string, Height (cm): string, Length (cm): string, Weight (kg): string, Width (cm): string, Seat Height (cm): string, Duration (sec.): string]

In [3]:
assert(artworks.count() == 128234)

                                                                                

**(1 punto)** Ejercicio 2

Puesto que vamos a hacer cálculos relativos a las fechas de nacimiento y muerte de los autores de las obras, es necesario tenerlas como número. Actualmente están como años entre paréntesis, ej. (1924) o a veces como varios paréntesis, 
como "(1924) (1931) (1918)" cuando son varios autores. Vamos a limpiar esto para obtener en BeginDate y en EndDate una columna numérica.

Partiendo del DF almacenado en la variable `artworks`, haciendo una única secuencia de transformaciones encadenadas, se pide:

* Quedarse sólo con aquellas obras en las que `BeginDate` no es null y `EndDate` tampoco.
* Sobre el DF resultante, reemplazar la columna `BeginDate` por el resultado de quitar los paréntesis. Consulta la documentación de la función `F.regexp_replace(nombreCol, stringBuscar, stringReemplazo)` donde el segundo argumento debe ser `"\(|\)"` que indica buscar tanto el ( como el ), y el tercer argumento debe ser la cadena vacía `""` para que los elimine.
* **Después de hacer esto**, la columna `BeginDate` debe ser reemplazada de nuevo por una columna de tipo vector, que es el resultado de invocar a `F.split(nombreCol, separador)` (función de `pyspark.sql.functions` que se aplica a una columna y devuelve otra columna de tipo vector tras haber cortado cada fila de la columna original por el carácter separador indicado). En nuestro caso el separador debe ser el carácter `" "` (un espacio en blanco).
* Fíjate que estos dos pasos se pueden hacer en una sola línea, ya que es posible invocar a `F.split(F.regexp_replace(...))`.
* Repetir los dos pasos anteriores en el mismo orden con la columna `EndDate`.
* No te preocupes por las columnas que tengan una sola fecha: la función split las convertirá en vectores de un solo elemento.
* Aplicar `F.split` para reemplazar la columna `Artist` por el resultado de cortar por el string  `", "` (es decir, la coma seguida de un espacio en blanco).
* Por último, crear la coluna `n_autores` como la longitud de cada vector de la columna `Artist`, usando la función `F.size(nombreCol)` que se aplica a columnas de tipo colección.
* El resultado debe guardarse en una variable llamada `obras_splitted_df`.

In [4]:
# LÍNEA EVALUABLE. NO RENOMBRAR LAS VARIABLES
#obras_splitted_df = None   # Sustituye None por las operaciones adecuadas
# YOUR CODE HERE

# Quedarse sólo con aquellas obras en las que BeginDate no es null y EndDate tampoco
obras_filtradas_df = artworks.filter((F.col("BeginDate").isNotNull()) & (F.col("EndDate").isNotNull()))

# Reemplazar la columna BeginDate por el resultado de quitar los paréntesis
obras_sin_parentesis_df = obras_filtradas_df.withColumn("BeginDate", F.split(F.regexp_replace(F.col("BeginDate"), "\(|\)", ""), " "))

# Reemplazar la columna EndDate por el resultado de quitar los paréntesis
obras_splitted_df = obras_sin_parentesis_df.withColumn("EndDate", F.split(F.regexp_replace(F.col("EndDate"), "\(|\)", ""), " "))

# Aplicar F.split para reemplazar la columna Artist por el resultado de cortar por el string ", "
obras_splitted_df = obras_splitted_df.withColumn("Artist", F.split(F.col("Artist"), ", "))

# Crear la coluna n_autores como la longitud de cada vector de la columna Artist, usando la función F.size(nombreCol)
obras_splitted_df = obras_splitted_df.withColumn("n_autores", F.size(F.col("Artist")))

In [5]:
assert(obras_splitted_df.count() == 126970)   # número de filas tras quitar las que tienen BeginDate y EndDate a null
tipos = dict(obras_splitted_df.dtypes)
assert(tipos["Artist"] == "array<string>")  # la columna Artist ahora debe ser una columna de vectores de string
assert(tipos["BeginDate"] == "array<string>") # la columna BeginDate ahora debe ser una columna de vectores de string
assert(tipos["EndDate"] == "array<string>") # la columna EndDate debe ahora ser una columna de vectores de string
assert(tipos["n_autores"] == "int") # la columna EndDate debe ahora ser una columna de vectores de string

                                                                                

**(2 puntos)** Ejercicio 3

En este ejercicio vamos a convertir todos los elementos de los vectores en números enteros. Además, a pesar de cómo hemos tratado de separar los elementos, existen algunas filas donde las columnas `BeginDate` y `EndDate` traen un texto que no son números entre paréntesis como nos gustaría, así que la separación no habrá funcionado muy bien. Vamos a quedarnos solo con aquellas filas que contienen exclusivamente enteros en las columnas.

Partiendo de `obras_splitted_df` se pide:

* Reemplazar la coluna `BeginDate` por el resultado de aplicar, en cada fila, una función definida por el usuario a **cada elemento del vector**. Existe en Pyspark una función llamada `transform` que hace justamente eso, pero en la versión 2.4 todavía no se puede invocar con la API de columnas sino que *existe solamente en la API de SQL puro* (esto es frecuente cuando los creadores de Spark introducen funciones nuevas: empieza estando disponible sólo para SQL puro y no para la API estructurada, que suele llegar en versiones posteriores).
  * Para poder usarla, escribe `F.expr("transform(BeginDate, x -> int(x))"))` como segundo argumento de `withColumn(...)`, donde la función que estamos aplicando a cada elemento del vector es simplemente transformarlo a entero.
* Haz lo mismo con la columna `EndDate`.
* A continuación, escribe una UDF que se aplique a una columna de tipo vector y compruebe si todos los elementos son enteros, en cuyo caso debe devolver `True`, y en caso contrario debe devolver `False`. Para ello:
  * Rellena el esqueleto de la función de Python `todos_enteros` que tienes esbozada. El argumento que va a recibir siempre será una lista de Python por la que debes iterar, comprobando si cada elemento es de tipo entero. En el momento en el que encuentres uno que no es entero, ya puedes directamente retornar False. Si el bucle finaliza sin haber retornado en ninguna iteración, significa que todos los elementos son enteros, así que debe devolver True. Utiliza la función de Python `isinstance(x, int)` para comprobar si un elemento es entero.
  * Crea en una variable `todos_enteros_udf` el objeto UDF que envuelve a la función anterior. Recuerda indicar que el valor devuelto por la función es BooleanType(), el cual debes importar adecuadamente para poder usarlo.

Cuando tengas hecha la UDF:
* Aplícala dentro de `withColumn(...)` para crear una nueva columna de booleanos llamada `enteros_begin` donde se indique si todos los elementos de cada fila de la columna `BeginDate` son enteros. Haz lo mismo para crear otra columna `enteros_end` que indique si todos los elementos de cada fila de la columna `EndDate` son enteros.
* **Después de haber hecho lo anterior**, filtra las filas para quedarte solamente con aquellas en las que la longitud de cada vector de la columna `BeginDate` es igual a la longitud del vector de la columna `Artist` de esa misma fila, y además la columna `enteros_begin` es True en esa fila y además la columna `enteros_end` es True en esa fila (condición booleana compuesta por 3 condiciones simples).
  * Consulta la documentación de la función `F.size(...)` que se aplica a una columna de tipo vector y devuelve una columna de enteros que son sus longitudes).
* Primero debes escribir la UDF, y después una única secuencia de *cinco* transformaciones encadenadas que resuelva todo lo que pide este ejercicio. El resultado debe guardarse en la variable `obras_preprocesado_df`.

In [6]:

from pyspark.sql.types import BooleanType

def todos_enteros(lista):
    for x in lista:
        if not isinstance(x, int):
            return False
    return True

todos_enteros_udf = F.udf(todos_enteros, BooleanType())

obras_preprocesado_df = obras_splitted_df\
    .withColumn("BeginDate", F.expr("transform(BeginDate, x -> int(x))")) \
    .withColumn("EndDate", F.expr("transform(EndDate, x -> int(x))")) \
    .withColumn("enteros_begin", todos_enteros_udf("BeginDate")) \
    .withColumn("enteros_end", todos_enteros_udf("EndDate")) \
    .filter((F.size("BeginDate") == F.size("Artist")) &
            F.col("enteros_begin")  & F.col("enteros_end")) 


In [7]:
assert(obras_preprocesado_df.count() == 126224)

from pyspark.sql import types as T
assert(todos_enteros([4, -123, 0, 23]) and not todos_enteros([23, 12, "12"]))  # probamos la función todos_enteros
assert(todos_enteros_udf.returnType == T.BooleanType())  # la UDF debe devolver un tipo booleano

r = obras_preprocesado_df.where("Title = 'House IV Project, Falls Village, Connecticut (Multiple axonometrics)'")\
.select("Title", "Artist", "BeginDate", "EndDate").first()
assert(r.Artist == ["Peter Eisenman", "Robert Cole"] and r.BeginDate == [1932, 0] and r.EndDate == [0, 0])

tipos = dict(obras_preprocesado_df.dtypes)
assert(tipos["BeginDate"] == "array<int>")
assert(tipos["EndDate"] == "array<int>")
assert(tipos["enteros_begin"] == "boolean")
assert(tipos["enteros_end"] == "boolean")

                                                                                

**(2 puntos)** Ejercicio 4

Ya tenemos como vectores de enteros las columnas `BeginDate`, `EndDate` y `Artist`. Si todo ha ido bien, las longitudes de los vectores de esas tres columnas deben coincidir: si en determinada fila `BeginDate` contiene un vector de longitud 2 (por ejemplo), también deberían tener longitud 2 el vector de la columna `EndDate` de esa misma fila y el de la columna `Artist`de esa misma fila. 

Ahora queremos **explotar** esas columnas, es decir, si en `BeginDate` hay un vector de **n** elementos, es porque la obra tiene **n** autores distintos, pero nos gustaría que aparezca como **n filas distintas** en nuestro DF, en lugar de venir comprimido en una única fila con vectores. En cada una de esas filas separadas, nos gustaría ver a un autor concreto de la  obra, con su nombre, su fecha de nacimiento y de fallecimiento. Los valores del resto de columnas serán idénticos en esas **n** filas, y sólo difieren las columnas Artist, BeginDate y EndDate. 

¿Pero cuál de las tres columnas de tipo vector (`Artist, BeginDate, EndDate`) debemos usar para explotar? Utilizaremos *las tres conjuntamente*.

Partiendo del DF `obras_preprocesado_df` construido en el ejercicio anterior, se pide:

* Crear mediante `withColumn(...)` una nueva columna llamada `tripletas` que en cada fila tenga un **vector de estructuras**, fusionando para ello las columnas `Artist`, `BeginDate` y `EndDate`, invocando a la función `F.arrys_zip(nombreCol1, nombreCol2, nombreCol3)` que devuelve un objeto Column de tipo vector de estructuras (en nuestro caso, cada estructura será una tripleta con tres campos llamados `Artist`, `BeginDate`, `EndDate`). 
  * La función `F.arrays_zip` funciona adecuadamente porque los vectores que fusionamos tienen siempre el mismo tamaño entre sí (aunque el tamaño puede ser distinto en cada fila, pero coincide para esas columnas en cada fila concreta).
  * Lo que hace la función es fusionar en una estructura elemento i-ésimo de cada uno de los vectores. La primera estructura estará formada por el primer elemento de Artist, de BeginDate y de EndDate; la segunda estructura será el segundo elemento de Artist, de BeginDate y de EndDate, y así sucesivamente. Todas estas estructuras se almacenan en un vector de estructuras en esa fila en la columna `tripletas`.
* **Después de haber hecho esto**, se pide reemplazar la columna `tripletas` por el resultado de **explotar** dicha columna mediante la función `F.explode(nombreCol)`. El resultado será una columna de tripletas, donde ya no hay vectores puesto que cada vector de tripletas ha sido explotado, dando lugar a **varias filas independientes, en las cuales hay una tripleta distinta (Artist, BeginDate, EndDate) del vector explotado**. En el resto de columnas todas las filas son idénticas entre sí.
* Por último, vamos a extraer como columnas cada uno de los campos de las estructuras. Se pide:
  * Reemplazar la columna `BeginDate` por el campo `tripletas.BeginDate` (se puede indicar así utilizando el operador . (punto) en el nombre de columna para `F.col(...)`), y hacer lo mismo con la columna `EndDate` y con la columna `Artist`.
* El ejercicio completo se resuelve con solamente cinco transformaciones (y por tanto, cinco líneas de código) que deben estar todas encadenadas.
* Guardar el resultado de las cinco transformaciones anteriores en la variable `obras_limpias_df` y **cachearla** puesto que a partir de ahora trabajaremos con este DF para extraer insights.
* Guardar el número de filas de `obras_limpias_df` en la variable `obras_autores`. Puedes comprobar que ha aumentado respecto a los ejercicios anteriores ya que hemos explotado las obras que tenían múltiples autores y se han convertido en filas independientes.

In [8]:

obras_limpias_df = obras_preprocesado_df\
.withColumn("tripletas", F.arrays_zip(F.col("Artist"), F.col("BeginDate"), F.col("EndDate"))) \
    .withColumn("tripletas", F.explode("tripletas")) \
    .withColumn("BeginDate", F.col("tripletas.BeginDate")) \
    .withColumn("EndDate", F.col("tripletas.EndDate")) \
    .withColumn("Artist", F.col("tripletas.Artist"))

obras_limpias_df.cache()

obras_autores = obras_limpias_df.count()

                                                                                

In [9]:
assert(obras_limpias_df.count() == 139202)

tipos = dict(obras_limpias_df.dtypes)
# Comprobamos los tipos de datos de las columnas resultantes tras explotar la columna tripletas original
assert(tipos["BeginDate"] == "int")
assert(tipos["EndDate"] == "int")
assert(tipos["Artist"] == "string")
assert(tipos["tripletas"] == "struct<Artist:string,BeginDate:int,EndDate:int>")


**(1 punto)** Ejercicio 5

Partiendo del DF `obras_limpias_df` construido y cacheado en el ejercicio anterior, se pide:

* Reemplazar la columna `Date` (fecha de creación de la obra) por el resultado de *extraer la primera ocurrencia de 4 dígitos seguidos de la columna Date* (el año de la obra) y convertirlo a tipo entero. Actualmente esa columna es un campo de texto libre donde generalmente se mencionan varios años (a veces es el año que empezó esa obra, o bien el año que se dio a conocer, ...) , con lo que esta extracción no será totalmente perfecta pero al menos nos dará un año de referencia. 
  * PISTA: utiliza dentro de `withColumn(...)` la función `F.regexp_extract(nombreCol, "(\d\d\d\d)", 1)` para indicar que queremos extraer la *primera* ocurrencia de 4 dígitos seguidos, ya que la expresión regular `\d` significa "cualquier dígito".
  * PISTA: `regexp_extract(...)` devuelve un objeto de tipo Column sobre el que directamente podemos encadenar la llamada para convertirlo a columna de enteros.
* Utilizando `F.when(...)`, crear una nueva columna `edad_autor` con: 
  * Si `BeginDate` es igual a 0, `edad_autor` debe ser igual a 41 (recuerda usar `F.lit(...)`), que es la mediana de la edad de las filas donde BeginDate sí tiene un valor positivo.
  * En cualquier otro caso, la edad que tenía el autor cuando creó esa obra es la diferencia entre el año de creación menos el año de nacimiento.
* Crear una nueva columna `mediana_edad` que contenga, **para cada autor**, la mediana de la edad con la que creó sus obras (columna `edad_autor`). Dicho valor será igual para todas las filas de un mismo autor, pero distinto para diferentes autores. 
  * PISTA: deben utilizarse *funciones de ventana* particionadas por el nombre del autor (guardar la ventana creada en la variable `ventana_autor`). **No está permitido utilizar JOIN**.
  * PISTA: la mediana se calcula con la función `percentile_approximate` que en Spark 2.4 todavía no existe en la API estructurada pero sí en SQL puro, por lo que la aplicaremos con `F.expr('percentile_approx(nombreCol, 0.5)')` ya que por definición, es el percentil 0.5.
* Filtrar el DF resultante para quedarnos sólo con aquellas filas en las que la edad del autor sea estrictamente positiva.
* Guardar el resultado de estas cuatro transformaciones en la variable `edades_df`.

In [10]:
from pyspark.sql.window import Window

edades_df = obras_limpias_df.withColumn("Date", F.regexp_extract("Date", "(\d\d\d\d)", 1).cast("integer")) \
    .withColumn("edad_autor", F.when(F.col("BeginDate") == 0, 41).otherwise(F.col("Date") - F.col("BeginDate"))) \
    .withColumn("mediana_edad", F.expr('percentile_approx(edad_autor, 0.5)').over(Window.partitionBy("Artist"))) \
    .filter(F.col("edad_autor") > 0)

ventana_autor = Window.partitionBy("Artist")

In [11]:
cc = edades_df.columns
assert("Date" in cc and "edad_autor" in cc and "mediana_edad" in cc)
assert(round(edades_df.select(F.mean("edad_autor").alias("edad_media")).first().edad_media, 2) == 43.53)
assert(round(edades_df.select(F.mean("Date").alias("date_media")).first().date_media, 2) == 1955.92)
assert(round(edades_df.select(F.mean("mediana_edad").alias("mediana_media")).first().mediana_media, 1) == 43.1)

                                                                                

**(1 punto)** Ejercicio 6

A continuación: 

* Crear en la variable `bucketizer_nacimiento` un objeto Bucketizer para discretizar la variable `BeginDate` con los puntos de corte `[-float("inf"), 1900, 1920, 1940, 1960, 1980, 2000, 2022]`, estableciendo como columna de salida una nueva columna llamada `decada_nacimiento`. Existen obras y autores anteriores a 1900, pero no los tendremos en cuenta.
* Crear en la variable `bucketizer_edad` otro objeto Bucketizer para discretizar la variable `edad_autor` en décadas con los puntos de corte [0, 20, 30, 40, 50, 60, float("inf")], estableciendo como columna de salida una nueva columna llamada `decada_creacion`.
* Crear en la variable `pipeline_bucketizers` un pipeline con ambos bucketizers. El pipeline debe contener **exclusivamente** estas dos etapas, y **ninguna más**, ni tampoco ningún algoritmo predictivo ni nada parecido.
* Entrenar el pipeline con el DF `edades_df` y a continuación transformar dicho dataframe, guardando el resultado en la variable `edades_discretizadas_df`.
  * No es necesario hacer (será penalizado) ninguna división en entrenamiento y test, puesto que no estamos entrenando ningún modelo predictivo.

In [12]:
from pyspark.ml.feature import Bucketizer
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

bucketizer_nacimiento = Bucketizer(splits=[-float("inf"), 1900, 1920, 1940, 1960, 1980, 2000, 2022],
                                   inputCol="BeginDate", outputCol="decada_nacimiento")
bucketizer_edad = Bucketizer(splits=[0, 20, 30, 40, 50, 60, float("inf")],
                             inputCol="edad_autor", outputCol="decada_creacion")
pipeline_bucketizers = Pipeline(stages=[bucketizer_nacimiento, bucketizer_edad])

edades_discretizadas_df = pipeline_bucketizers.fit(edades_df).transform(edades_df)



In [13]:
from pyspark.ml.feature import Bucketizer
assert(isinstance(bucketizer_nacimiento, Bucketizer))
assert(isinstance(bucketizer_edad, Bucketizer))
assert(bucketizer_nacimiento.getSplits() == [-float("inf"), 1900, 1920, 1940, 1960, 1980, 2000, 2022] and
       bucketizer_nacimiento.getInputCol() == "BeginDate" and
       bucketizer_nacimiento.getOutputCol() == "decada_nacimiento")

assert(bucketizer_edad.getSplits() == [0, 20, 30, 40, 50, 60, float("inf")] and
       bucketizer_edad.getInputCol() == "edad_autor" and
       bucketizer_edad.getOutputCol() == "decada_creacion")

tipos = dict(edades_discretizadas_df.dtypes)
assert("decada_nacimiento" in edades_discretizadas_df.columns and "decada_creacion" in edades_discretizadas_df.columns)
assert(tipos["decada_nacimiento"] == "double" and tipos["decada_creacion"] == "double")

**(2 puntos)** Ejercicio 7

Partiendo de `edades_discretizadas_df` se pide:

* Reemplazar la columna `decada_creacion` por el resultado de recategorizar sus valores a string de esta forma: 
0.0 -> `"[0, 20)"`, 1.0 -> `"[20, 30)"`, 2.0 -> `"[30, 40)"`, 3.0 -> `"[40, 50)"`, 4.0 -> `"[50, 60)"`, 5.0 -> `"[60+"`.
Una opción es utilizar la función `F.when(...)`. Las etiquetas han de ser exactmaente estas o no se validará la solución.
* Reemplazar la columna `decada_nacimiento` por el resultado de recategorizar sus valores a string de esta forma: 
0.0 -> `0-1900"`, 1.0 -> `"1900-20"`, 2.0 -> `"1920-40"`, 3.0 -> `"1940-60"`, 4.0 -> `"1960-80"`, 5.0 -> `"1980-2000"`, 6.0 -> `"2000-22`.
* Crear un nuevo DF `obras_decada_df` que tenga tantas filas como décadas de nacimiento existen (es decir, 7) y tantas columnas como décadas contemplamos en la vida de un artista (es decir, 6) más una (la década de nacimiento del artista, que debe ser la primera columna de todas). En cada casilla debe contener **el recuento** del número total de obras que han creado durante esa década de su vida (correspondiente a la columna) los artistas que han nacido en la década correspondiente a esa fila. El DF resultante debe estar ordenado de menor a mayor en base a la columna de la década de nacimiento. Los valores nulos generados debido a combinaciones inexistentes deben rellenarse por 0.
  * PISTA: utiliza `groupBy(...).pivot(...)` y también la función `fillna(0)` después de la ordenación.
* Crear una nueva columna llamada `obras_totales` que contenga, en cada fila, el número total de obras que han sido creadas por artistas que nacieron en la década indicada por la fila. PISTA: utiliza **aritmética de columnas**, es decir, una operación aritmética con los seis objetos columna involucrados. No hay que agrupar nada.
* Reemplazar una a una cada una de las columnas `[0, 20), [20, 30) ..., [60+"` por su equivalente en porcentaje (esto es, dividiendo la columna entre la columna `obras_totales` y multiplicando por 100). El resultado debe estar redondeado a 2 dígitos decimales, para lo cual debes aplicar `F.round(objetoColumna, 2)` al objeto columna resultante.
* Guardar el resultado de las transformaciones anteriores en la variable `porcentajes_decadas_df`.

In [14]:
# reemplazar la columna decada_creacion por el resultado de recategorizar sus valores a string 
# reemplazar la columna decada_nacimiento por el resultado de recategorizar sus valores a string 

edades_discretizadas_df = edades_discretizadas_df \
    .withColumn("decada_creacion", F.when(F.col("decada_creacion") == 0.0, "[0, 20)") \
                                   .when(F.col("decada_creacion") == 1.0, "[20, 30)") \
                                   .when(F.col("decada_creacion") == 2.0, "[30, 40)") \
                                   .when(F.col("decada_creacion") == 3.0, "[40, 50)") \
                                   .when(F.col("decada_creacion") == 4.0, "[50, 60)") \
                                   .when(F.col("decada_creacion") == 5.0, "[60+")) \
    .withColumn("decada_nacimiento", F.when(F.col("decada_nacimiento") == 0.0, "0-1900") \
                                       .when(F.col("decada_nacimiento") == 1.0, "1900-20") \
                                       .when(F.col("decada_nacimiento") == 2.0, "1920-40") \
                                       .when(F.col("decada_nacimiento") == 3.0, "1940-60") \
                                       .when(F.col("decada_nacimiento") == 4.0, "1960-80") \
                                       .when(F.col("decada_nacimiento") == 5.0, "1980-2000") \
                                       .when(F.col("decada_nacimiento") == 6.0, "2000-22"))

In [17]:
# Crear un nuevo DF obras_decada_df que tenga tantas filas como décadas de nacimiento existen

obras_decada_df = edades_discretizadas_df.groupBy("decada_nacimiento").pivot("decada_creacion").count().fillna(0).orderBy("decada_nacimiento")

# Crear una nueva columna llamada obras_totales

obras_decada_df = obras_decada_df.withColumn("obras_totales", sum([F.col(col) for col in obras_decada_df.columns[1:]]))


# Reemplazar una a una cada una de las columnas por su equivalente en porcentaje
# Guardar el resultado de las transformaciones anteriores en la variable porcentajes_decadas_df.

for decade in obras_decada_df.columns[1:]:
    obras_decada_df = obras_decada_df.withColumn(decade, F.round((F.col(decade) / F.col("obras_totales")) * 100, 2))
    
porcentajes_decada_df = obras_decada_df  

In [20]:
lista = porcentajes_decada_df.collect()
assert(len(porcentajes_decada_df.columns) == 8) # debe tener 8 columnas que son las 6 de la década de la vida, más la decada_nacimeinto más obras_totales
assert(len(lista) == 7)               # el DF debe tener 7 filas porque hay 7 posibles categorías en decada_nacimiento
assert(lista[0].decada_nacimiento == "0-1900"  and round(lista[0]["[50, 60)"]) == 15)
assert(lista[1].decada_nacimiento == "1900-20" and round(lista[1]["[30, 40)"]) == 21)
assert(lista[2].decada_nacimiento == "1920-40" and round(lista[2]["[60+"]) == 8)
assert(lista[3].decada_nacimiento == "1940-60" and round(lista[3]["[20, 30)"]) == 23)