# Practica Final
Modulo 6 - Máster Data Science y Business Analytics
Sergio Hervás Aragón

Importaciones

In [None]:
from pyspark.sql import SparkSession
import os
from pyspark.sql.functions import split, to_date

Al trabajar con pyspark, configuraremos el entorno creando una sesion de Spark

In [None]:
# master(String master): Establece la dirección URL maestra de Spark a la que se va a conectar
# appName(String name): Establece un nombre para la aplicación, que se mostrará en la interfaz de usuario web de Spark.
# config(String key, double value): Establece una opción de configuración.
# getOrCreate(): Obtiene una SparkSession existente o, si no hay ninguna, crea una nueva uno basado en las opciones establecidas en este constructor.

try:
    spark = SparkSession.builder\
        .master('local')\
        .appName('netflix_titles')\
        .config('spark.ui.port', '4050')\
        .config("spark.sql.legacy.timeParserPolicy", "LEGACY")\
        .getOrCreate()
except Exception as e:
    print(f'Ha ocurrido un error: {str(e)}')

1.	Leer todos los csv descomprimidos guardados en la ruta de vuestro tmp en una sola línea de código (pista, usar wildcards para leer más de un fichero a la vez)

En primer lugar, procederemos a descarganos los archivos mediante una linea de comandos linux

In [None]:
# Declaro una lista con los archivos de los cuales voy ha hacer uso y me voy a descargar
csv_files = [
    '/netflix_titles_dirty_01.csv',
    '/netflix_titles_dirty_02.csv',
    '/netflix_titles_dirty_03.csv',
    '/netflix_titles_dirty_04.csv',
    '/netflix_titles_dirty_05.csv',
    '/netflix_titles_dirty_06.csv',
    '/netflix_titles_dirty_07.csv'
]

# Variable la cual usaremos en un futuro para indicar donde almacenar los archivos de la lista
first_ending = '/tmp'
# Variable la cual usaremos en un futuro para indicar la manera en la que nos vamos a descargar los archivos de la lista
ultimate_termination = '.gz'
# Bucle con el que recorreremos la lista
for file in csv_files:    
    # Declararemos dos variables que nos serviran para validar, si: 1º - Esta descargado el archivo, y 2º - Si el archivo esta descomprimido
    compressed_path = first_ending + file + ultimate_termination
    decompressed_path = first_ending + file    
    # Buscamos el path, y si no existe tanto el comprimido como el descomprimido, lo descargaremos y los descomprimiremos
    if not os.path.exists(compressed_path and decompressed_path):
        url = f'https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data{file}.gz?raw=True'
        try:
            # Descargamos
            ! wget -O $compressed_path $url
            # Descomprimimos
            ! gunzip $compressed_path
        except Exception as e:
            print(f'Ha ocurrido un error: {str(e)}')
        finally:
            print(f'Archivo {file} descargado y descomprimido ')
    else:
        print(f'El fichero {file} ya existe')

In [None]:
df = spark.read.csv('./../../../tmp/*csv', sep='\t', header=False)
df.show(1, truncate=False)

2.	Analiza las columnas y renómbralos con un nombre que tenga sentido para cada una

In [None]:
# Declaro una lista que contendran los nombres de las columnas nuevas las cuales van a ser las que van a dar nombre a cada columna
columsNames = ['id','type','movie_name','director','actors','country','release_date','year','age_classification','duration','gender','description']

for item in range(len(columsNames)):
    # Reemplazo con el bucle, en primera posicion, el nombre de la columba original, y en segunda posicion el nuevo nombre
    df = df.withColumnRenamed(f'_c{item}', columsNames[item])
    
df.show(1)

3.	Limpia el dataframe para que no existan nulos, adicionalmente elimina todos los valores que no se correspondan con el resto de datos de la columna

In [None]:
# Declaremos un metodo que nos servira para ver cuantos nulos hay por columnas en nuestro df
def counts_off_nulls_spark(columsNames, df):
    print('Conteo de nulos por columnas:')
    for columns_name in columsNames:
        # Recorro el array de mis columnas filtrando por nombre de las columnas los valores que son nulos, y haciendo un conteo de estos
        number_nulls_columns = df.filter(df[columns_name].isNull()).count()
        print(f'\t{columns_name}: {number_nulls_columns} nulls number')
    

In [None]:
# Funcion que nos realiza un conteo de nulos antes de eliminarlos
counts_off_nulls_spark(columsNames,df)
# Eliminamos los nulos
df = df.dropna()
# Funcion que nos realiza un conteo de nulos despues de eliminarlos
counts_off_nulls_spark(columsNames,df)

4.	Revisa el tipo de dato de cada columna y parsealo según corresponda (la columna duración debe ser numérico)

In [None]:
# Realizamos primeramente un arreglo de la columna duracion, donde separaremos el contenido mediente el espacio formando dos
# columnas con la respectiva información, y elimaremos la columna original
if 'duration' in df.columns:
    df = df.\
        withColumn('time_duration', split(df['duration'], ' ')[0]).\
        withColumn('type_duration', split(df['duration'], ' ')[1]).\
        drop('duration')
    
# Una vez todas las columnas como queremos, procedemos al parseo de la información, emprezando por los enteros (id, year, time_duration)
df = df.\
    withColumn('id', df['id'].cast('int')).\
    withColumn('year', df['year'].cast('int')).\
    withColumn('time_duration', df['time_duration'].cast('int'))
    
# Seguidamente procederemos al parseo de la información en tipo Date (release_date)
df = df.withColumn("release_date", to_date(df['release_date'], 'MMMM dd, yyyy'))
    
df.printSchema()
df.show(1, truncate=False)

5.	Calcula la duración media en función del país

2.	Analiza las columnas y renómbralos con un nombre que tenga sentido para cada una

In [5]:
# Declaro una lista que contendran los nombres de las columnas nuevas las cuales van a ser las que van a dar nombre a cada columna
columsNames = ['id','type','movie_name','director','actors','country','release_date','year','age_classification','duration','gender','description']

for item in range(len(columsNames)):
    # Reemplazo con el bucle, en primera posicion, el nombre de la columba original, y en segunda posicion el nuevo nombre
    df = df.withColumnRenamed(f'_c{item}', columsNames[item])
    
df.show(1)

+--------+-----+-------------------+-----------+------------+-------------+-------------+----+------------------+--------+---------------+--------------------+
|      id| type|         movie_name|   director|      actors|      country| release_date|year|age_classification|duration|         gender|         description|
+--------+-----+-------------------+-----------+------------+-------------+-------------+----+------------------+--------+---------------+--------------------+
|80044126|Movie|D.L. Hughley: Clear|Jay Chapman|D.L. Hughley|United States|July 13, 2017|2014|             TV-MA|  59 min|Stand-Up Comedy|In this 2014 stan...|
+--------+-----+-------------------+-----------+------------+-------------+-------------+----+------------------+--------+---------------+--------------------+
only showing top 1 row



3.	Limpia el dataframe para que no existan nulos, adicionalmente elimina todos los valores que no se correspondan con el resto de datos de la columna

In [6]:
# Declaremos un metodo que nos servira para ver cuantos nulos hay por columnas en nuestro df
def counts_off_nulls_spark(columsNames, df):
    print('Conteo de nulos por columnas:')
    for columns_name in columsNames:
        # Recorro el array de mis columnas filtrando por nombre de las columnas los valores que son nulos, y haciendo un conteo de estos
        number_nulls_columns = df.filter(df[columns_name].isNull()).count()
        print(f'\t{columns_name}: {number_nulls_columns} nulls number')
    

In [7]:
# Funcion que nos realiza un conteo de nulos antes de eliminarlos
counts_off_nulls_spark(columsNames,df)
# Eliminamos los nulos
df = df.dropna()
# Funcion que nos realiza un conteo de nulos despues de eliminarlos
counts_off_nulls_spark(columsNames,df)

Conteo de nulos por columnas:


                                                                                

	id: 2 nulls number
	type: 59 nulls number
	movie_name: 64 nulls number
	director: 2012 nulls number
	actors: 629 nulls number
	country: 537 nulls number
	release_date: 80 nulls number
	year: 74 nulls number
	age_classification: 84 nulls number
	duration: 76 nulls number
	gender: 83 nulls number
	description: 86 nulls number
Conteo de nulos por columnas:
	id: 0 nulls number
	type: 0 nulls number
	movie_name: 0 nulls number
	director: 0 nulls number
	actors: 0 nulls number
	country: 0 nulls number
	release_date: 0 nulls number
	year: 0 nulls number
	age_classification: 0 nulls number
	duration: 0 nulls number
	gender: 0 nulls number
	description: 0 nulls number


4.	Revisa el tipo de dato de cada columna y parsealo según corresponda (la columna duración debe ser numérico)

In [8]:
try:
    # Realizamos primeramente un arreglo de la columna duracion, donde separaremos el contenido mediente el espacio formando dos
    # columnas con la respectiva información, y elimaremos la columna original
    if 'duration' in df.columns:
        df = df.\
            withColumn('time_duration', split(df['duration'], ' ')[0]).\
            withColumn('type_duration', split(df['duration'], ' ')[1]).\
            drop('duration')
except Exception as e:
    print(f'Ha ocurrido un error: {str(e)}')
finally:
    print(f'Parseo de fecha finalizado')
    
try:
# Una vez todas las columnas como queremos, procedemos al parseo de la información, emprezando por los enteros (id, year, time_duration)
    df = df.\
        withColumn('id', df['id'].cast('int')).\
        withColumn('year', df['year'].cast('int')).\
        withColumn('time_duration', df['time_duration'].cast('int'))
except Exception as e:
    print(f'Ha ocurrido un error: {str(e)}')
finally:
    print(f'Parseo de enteros finalizado')
    
try:
    # Seguidamente procederemos al parseo de la información en tipo Date (release_date)
    df = df.withColumn("release_date", to_date(df['release_date'], 'MMMM dd, yyyy'))
except Exception as e:
    print(f'Ha ocurrido un error: {str(e)}')
finally:
    print(f'Parseo de fechas finalizado')
    
df.printSchema()
df.show(1, truncate=False)

Parseo de fecha finalizado
Parseo de enteros finalizado
Parseo de fechas finalizado
root
 |-- id: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- director: string (nullable = true)
 |-- actors: string (nullable = true)
 |-- country: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- age_classification: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- description: string (nullable = true)
 |-- time_duration: integer (nullable = true)
 |-- type_duration: string (nullable = true)

+--------+-----+-------------------+-----------+------------+-------------+------------+----+------------------+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------+-------------+-------------+
|id      |type |movie_name         |director   |actors      |country      |r

5.	Calcula la duración media en función del país

# Bibliografía 

 - [Como hacer hipervinculos](https://learn.microsoft.com/es-es/contribute/content/how-to-write-links)

#### Apartado 1:

 - [¿Que son los wildcards?](https://support.microsoft.com/en-us/office/examples-of-wildcard-characters-939e153f-bd30-47e4-a763-61897c87b3f4#:~:text=Wildcards%20are%20special%20characters%20that,named%20John%20on%20Park%20Street.)

 - [Visualización de la ejecución de mi aplicación netflix_titles en modo local (http://localhost:4050/)](http://localhost:4050/)

 - [Información del objeto builder](https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/SparkSession.Builder.html)

 - [Expansión del patrón de nombres de ruta de estilo Unix (glob)](https://docs.python.org/es/3/library/glob.html)

 - [path](https://www.guru99.com/es/python-check-if-file-exists.html)

 - [Ejecutar comandos de shell en Jupyter Notebook](https://blogs.upm.es/estudiaciencia/variables-en-bash/)

 #### Apartado 2:

 - [Cambiar el nombre de columnas usando 'withColumnRenamed'](https://www.machinelearningplus.com/pyspark/pyspark-rename-columns/?utm_content=cmp-true)

 #### Apartado 3:

 - [pyspark.sql.DataFrame.printSchema](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.printSchema.html)

 - [truncate (referencia apartado 11)](https://stackoverflow.com/questions/33742895/how-to-show-full-column-content-in-a-spark-dataframe)

 #### Apartado 4:

 - [Cast Column Type With Example](https://sparkbyexamples.com/pyspark/pyspark-cast-column-type/)

 - [Spark – Split DataFrame single column into multiple columns](https://sparkbyexamples.com/spark/spark-split-dataframe-column-into-multiple-columns/)

 - [Ver etiquetas de columnas](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.columns.html)

 - [INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER](https://community.databricks.com/t5/data-engineering/inconsistent-behavior-cross-version-parse-datetime-by-new-parser/td-p/43674)

 #### Apartado 5