# Películas de Netflix - Análisis con pyspark

Jorge Galeano Maté

### Instrucciones

1. Leer todos los csv descomprimidos guardados en la ruta de vuestro tmp en una sola línea de código (pista, usar wildcards para leer más de un fichero a la vez).
2. Analiza las columnas y renómbralos con un nombre que tenga sentido para cada una.
3. Limpia el dataframe para que no existan nulos, adicionalmente elimina todos los valores que no se correspondan con el resto de datos de la columna.
4. Revisa el tipo de dato de cada columna y parséalo según corresponda (la columna duración debe ser numérica).
5. Calcula la duración media en función del país.
6. Filtra las películas que contengan la palabra _music_ en su descripción y que su duración sea mayor a 90 minutos, ¿cuál es el actor que más películas ha realizado bajo estas condiciones?
7. Para el actor que más producciones ha realizado calcula cuántas semanas han pasado desde su primera producción hasta su última.
8. Transforma la columna de géneros para que su contenido sea un array con los valores de cada género por registro.
9. ¿Cuántas producciones se han realizado en un único país y cuántas tienen 2 o más países?
10. Escribe el dataframe final como un fichero parquet.

----------

Antes de comenzar, descargamos los datos según las instrucciones y descomprimimos los csv.

In [None]:
!wget -O /tmp/netflix_titles_dirty_01.csv.gz 'https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_01.csv.gz?raw=True'
!wget -O /tmp/netflix_titles_dirty_02.csv.gz 'https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_02.csv.gz?raw=True'
!wget -O /tmp/netflix_titles_dirty_03.csv.gz 'https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_03.csv.gz?raw=True'
!wget -O /tmp/netflix_titles_dirty_04.csv.gz 'https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_04.csv.gz?raw=True'
!wget -O /tmp/netflix_titles_dirty_05.csv.gz 'https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_05.csv.gz?raw=True'
!wget -O /tmp/netflix_titles_dirty_06.csv.gz 'https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_06.csv.gz?raw=True'
!wget -O /tmp/netflix_titles_dirty_07.csv.gz 'https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_07.csv.gz?raw=True'

In [None]:
!ls /tmp/netflix_titles* # Comprobación de descarga correcta.

In [None]:
# Descomprimimos los archivos e indicamos que se guarden los datos en un csv.
!gunzip -c /tmp/netflix_titles_dirty_01.csv.gz > /tmp/netflix_titles_dirty_01.csv
!gunzip -c /tmp/netflix_titles_dirty_02.csv.gz > /tmp/netflix_titles_dirty_02.csv
!gunzip -c /tmp/netflix_titles_dirty_03.csv.gz > /tmp/netflix_titles_dirty_03.csv
!gunzip -c /tmp/netflix_titles_dirty_04.csv.gz > /tmp/netflix_titles_dirty_04.csv
!gunzip -c /tmp/netflix_titles_dirty_05.csv.gz > /tmp/netflix_titles_dirty_05.csv
!gunzip -c /tmp/netflix_titles_dirty_06.csv.gz > /tmp/netflix_titles_dirty_06.csv
!gunzip -c /tmp/netflix_titles_dirty_07.csv.gz > /tmp/netflix_titles_dirty_07.csv

### 1. Leer todos los csv descomprimidos guardados en la ruta de vuestro tmp en una sola línea de código.

Creo la sesión de Spark y leo todos los csv a la vez aprovechando el uso de wildcards.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("trabajo").getOrCreate()

# Para leer todos los csv de una vez, uso wildcards (*) para sustituir los números y así seleccionar todos.
df = spark.read.csv('/tmp/netflix_titles_dirty_0*.csv', sep='\t', inferSchema=True)

df.show()

### 2. Analiza las columnas y renómbralos con un nombre que tenga sentido para cada una.

Para ello, utilizo la función `withColumnsRenamed()`, que permite editar varias columnas a la vez.



In [None]:
df = df.withColumnsRenamed({
    '_c0' : 'id',
    '_c1' : 'tipo',
    '_c2' : 'titulo',
    '_c3' : 'direccion',
    '_c4' : 'elenco',
    '_c5' : 'pais',
    '_c6' : 'fecha_netflix', # Es la fecha en la que Netflix publicó la serie o película en su plataforma.
    '_c7' : 'estreno', # Es la fecha en la que se estrenó la serie o película.
    '_c8' : 'clasificacion',
    '_c9' : 'duracion',
    '_c10' : 'genero',
    '_c11' : 'sinopsis'
})

df.show()

### 3. Limpia el dataframe para que no existan nulos, adicionalmente elimina todos los valores que no se correspondan con el resto de datos de la columna. _Eliminar las filas que sean de series para trabajar solo con películas._

In [None]:
from pyspark.sql.functions import when, isnan, count, col

df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show() # Muestro el número de nulos en cada columna.

In [None]:
df = df.dropna() # Con esta función elimino todas las filas que contengan nulos.

df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show() # Muestro de nuevo el número de nulos en cada columna.

In [None]:
from pyspark.sql.functions import col

df = (
    df.filter(col('id').rlike('^\\d{8}$')) # Uso rlike() para definir la expresión regular y eliminar todos los id que no sean exactamente 8 números.
    .filter('tipo == "Movie"') # Filtro para dejar solo tipo Movie.
)

df.show()

### 4. Revisa el tipo de dato de cada columna y parséalo según corresponda (la columna duración debe ser numérica).

Para comprobar el esquema del dataframe, se puede usar el comando `printSchema()`.

In [None]:
df.printSchema()

Esto nos muestra que todos están parseados como _string_. Cambio las siguientes columnas:

- __id:__ int. _Porque solo son números enteros._
- __direccion:__ string array. _Porque puede contener uno o varios directores._
- __elenco:__ string array. _Porque puede contener uno o varios actores._
- __pais:__ string array. _Porque puede contener uno o varios países._
- __fecha_netflix:__ fecha. _Porque es una fecha._
- __estreno:__ int. _Porque es solo un año (un número entero)._
- __duracion:__ int. _Porque son solo números enteros._
- __genero:__ string array. _Porque puede contener uno o varios géneros._


Antes de continuar, la columna de "fecha_netflix" tiene un formato especial y variado. Tenemos algunos espacios antes o después del texto, los cuales hay que eliminar; números de día que a veces tienen solo uno y a veces dos; y en formato con el mes completo, espacios y comas.

Para gestionarlo, elimino los espacios con `trim()` (esto lo hago con todas las columnas, ya de paso) y decido separar la cadena en 3 columnas, para mes, día y año. Luego, elimino la coma que hay en el día y obligo a que sean dos números en vez de uno, añadiendo un 0 si solo hubiese uno. Por último, vuelvo a unir los tres en la columna original para que tenga todo el mismo formato y poder parsearlo a fecha.

In [None]:
from pyspark.sql.functions import trim, split, regexp_replace, lpad, concat, lit, StringType

df = df.select([trim(col(x)).alias(x) for x in df.columns]) # Elimino espacios al principio y al final en todas las columnas.

df = (
    df.withColumn('mes', split(col('fecha_netflix'), ' ').getItem(0)) # Obtengo el mes.
    .withColumn('dia', split(col('fecha_netflix'), ' ').getItem(1)) # Obtengo el día.
    .withColumn('año', split(col('fecha_netflix'), ' ').getItem(2)) # Obtengo el año.
)

df = df.withColumn('dia', regexp_replace(col('dia'), ',', '')) # Elimino la coma de día.

df = df.withColumn('dia', lpad(col('dia'), 2, '0')) # Obligo a que haya 2 caracteres, y que si no los hay, añada un 0 a la izquierda.

df = (
    df.withColumn('fecha_netflix', concat(col('mes'), lit(' '), col('dia'), lit(', '), col('año'))) # Uno los datos de nuevo.
    .drop('mes', 'dia', 'año') # Elimino las columnas que había creado temporalmente.
)

Una vez hecho esto, procedo a cambiar el tipo de dato de todas las columnas al correspondiente, usando `withColumns()`. También elimino la parte de "min" de la columna de duración, para ello lo hago fuera individualmente con `withColumn()` porque primero necesito que se elimine la parte de texto para poder pasarlo correctamente a int.

In [None]:
from pyspark.sql.functions import to_date

df = (
    df.withColumns({
        'id' : col('id').cast('int'),
        'direccion' : split(df['direccion'], ', '),
        'elenco' : split(df['elenco'], ', '),
        'pais' : split(df['pais'], ', '),
        'fecha_netflix' : to_date(col('fecha_netflix'), 'MMMM dd, yyyy'),
        'estreno' : col('estreno').cast('int'),
        'duracion' : regexp_replace(col('duracion'), ' min', ''),
        'genero' : split(df['genero'], ', ')
    })
    .withColumn('duracion', col('duracion').cast('int'))
)

df.show()

In [None]:
df.printSchema()

### 5. Calcula la duración media en función del país.

Para hacer esto, es necesario agrupar por país, usando la función `groupBy()`. No obstante, la columna es un array de strings, ya que a veces hay varios países. Por tanto, lo que hago será crear una fila con los mismos datos pero para cada país, es decir, si tengo Estados Unidos y Canadá en una película, creará dos filas con los mismos datos, pero una con Estados Unidos y otra con Canadá. Para ello uso la función `explode()`.

A partir de ahí, aplico la media con `mean()`.

In [None]:
from pyspark.sql.functions import explode, mean

df_paises = df.withColumn('paises_exp', explode(col('pais')))

df_paises = df_paises.groupBy(col('paises_exp')).mean().select('paises_exp', 'avg(duracion)').orderBy('paises_exp').show()

### 6. Filtra las películas que contengan la palabra music en su descripción y que su duración sea mayor a 90 minutos, ¿cuál es el actor que más películas ha realizado bajo estas condiciones?

Para realizar esta parte, debo filtrar el dataframe por sinopsis que contenga la palabra "music", filtrar por las que la duración sea mayor de 90, y separar el elenco, ya que son arrays. Después, agrupo por elenco y hago un conteo para saber cuántas veces sale ese actor o actriz.

In [None]:
from pyspark.sql.functions import contains, count

df_music = (
    df.filter(col('sinopsis').contains('music'))
    .filter('duracion > 90')
    .withColumn('elenco', explode(col('elenco')))
)

actor_music = df_music.groupBy(col('elenco')).count().orderBy('count', ascending=False).first()[0]
cantidad_music = df_music.groupBy(col('elenco')).count().orderBy('count', ascending=False).first()[1]

print(f'El/la actor/actriz que más ha trabajado bajo estas condiciones es {actor_music} con {cantidad_music} películas.')

### 7. Para el actor que más producciones ha realizado calcula cuántas semanas han pasado desde su primera producción hasta su última.

Primero, extraigo los actores del elenco para que estén cada uno en una fila con su producción (`df_producciones`). Después, agrupo por elenco y hago el conteo para sacar el que más producciones ha realizado (`actor_producciones`).

Con este dato, hago un filtro en el que selecciono solo las producciones de ese actor (`primer_actor`).

Las fechas de estreno solo están por año, por lo que paso los años a fecha (se considera que es el 1 de enero para cada uno) y de ahí obtengo la fecha menor y la mayor. Con `.days` obtengo el número de días, y al dividirlo entre 7, el número exacto de semanas.

In [None]:
from pyspark.sql.functions import min, max

df_producciones = df.withColumn('elenco', explode(col('elenco')))

primer_actor = df_producciones.groupBy(col('elenco')).count().orderBy('count', ascending=False).first()[0] # Nombre de actor/actriz con más producciones.

producciones_actor = df_producciones.filter(f'elenco == "{primer_actor}"').withColumn('estreno', to_date('estreno')) # Lista de producciones de ese actor/actriz.

primera_prod = producciones_actor.select(min('estreno')).collect()[0][0] # Fecha de primera producción.
ultima_prod = producciones_actor.select(max('estreno')).collect()[0][0] # Fecha de última producción.

semanas = round((ultima_prod - primera_prod).days / 7, 2)

print(f'El/la actor/actriz con más producciones es {primer_actor}, y han pasado {semanas} semanas desde su primera producción hasta la última.')

### 8. Transforma la columna de géneros para que su contenido sea un array con los valores de cada género por registro.

Esto ya lo realicé anteriormente en el apartado 4 al ajustar los tipos de cada columna.

Individualmente sería con: `df = df.withColumn('genero', split(df['genero'], ', '))`, lo cual divide los géneros en base al separador de la coma y el espacio; y los añade a un array, de forma que cada registro tiene un array con los géneros.

Adjunto el esquema para comprobar que está realizado.

In [None]:
df.printSchema()

### 9. ¿Cuántas producciones se han realizado en un único país y cuántas tienen 2 o más países?

Para obtener las producciones por países, como es una columna con arrays, solamente tengo que filtrar por la columna, definiendo los que cuyo tamaño sea de 1 (es decir, un país) y los que cuyo tamaño sea de 2 o más (es decir, dos países o más).

Hago un conteo con `count()` para obtener el número total.

In [None]:
from pyspark.sql.functions import size

prod_un_pais = df.filter(size(col('pais')) == 1).count()
prod_varios_paises = df.filter(size(col('pais')) >= 2).count()

print(f'''Hay {prod_un_pais} producciones realizadas por un único país.
Hay {prod_varios_paises} producciones realizadas por dos o más países.''')

### 10. Escribe el dataframe final como un fichero parquet.

Para pasar el dataframe (`df`) a un archivo parquet, uso la función `write.parquet()`. Apunto la ruta y su nombre y elijo el modo sobreescribir.

In [None]:
df.write.parquet(path='/tmp/dataframe_netflix.parquet', mode='overwrite')

Para comprobar que se guardó correctamente, pruebo a leer el archivo y mostrarlo.

In [None]:
prueba_parquet = spark.read.parquet('/tmp/dataframe_netflix.parquet')

prueba_parquet.show()