# Práctica Spark Adrián García Bruzón

## 1. Introduccion

En el presente "notebook" se incluyen las soluciones implementadas a la práctica de la asignatura Sistemas Distribuidos 2

## 2. Configuración inicial

En esta sección se ejecutan las configuraciones iniciales de la API de Spark

In [2]:
# Este chunk es para ejecutarlo si tenemos dos versiones de Java instaladas en el sistema
# Load external packages programatically
import os
# packages = "com.databricks:spark-xml_2.11:0.5.0"

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["JAVA_HOME"]

# os.environ["PYSPARK_SUBMIT_ARGS"] = (
#     "--packages {0} pyspark-shell".format(packages)
# )

'/usr/lib/jvm/java-8-openjdk-amd64'

Iniciamos la sesión Spark. Incluyendo el número de núcleos con el que vamos a trabajar y el nombre de la aplicación

In [96]:
import pyspark
from pyspark.sql import SparkSession
spark = (SparkSession.builder
    .master("local[*]")
    .config("spark.driver.cores", 1)
    .appName("Practica TMDB-5000")
    .getOrCreate() )
sc = spark.sparkContext

spark

## 3. Preprocesado

In [97]:
# Importamos las librerias necesarias para la práctica
from pyspark.sql import Row
from pyspark.sql import Window
from pyspark.sql import functions
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql.functions import * # from_json, col, collect_list, getItem, desc, datediff


from pprint import pprint
import rapidjson
import json

In [98]:
# Define el data scheme para el archivo de créditos
creditsSchema = StructType([StructField("movie_id", LongType(), True), 
                           StructField("title", StringType(), True),
                           StructField("cast", StringType(), True),
                           StructField("crew", StringType(), True)])

In [99]:
# Define el data scheme para el archivo de peliculas
moviesSchema = StructType([StructField("budget", LongType(), True), 
                           StructField("genres", StringType(), True),
                           StructField("homepage", StringType(), True),
                           StructField("id", LongType(), True),
                           StructField("keywords", StringType(), True),
                           StructField("original_language", StringType(), True),
                           StructField("original_title", StringType(), True),
                           StructField("overview", StringType(), True),
                           StructField("popularity", FloatType(), True),
                           StructField("production_companies", StringType(), True),
                           StructField("production_countries", StringType(), True),
                           StructField("release_date", DateType(), True),
                           StructField("revenue", LongType(), True), 
                           StructField("runtime", LongType(), True), 
                           StructField("spoken_langugaes", StringType(), True),
                           StructField("status", StringType(), True),
                           StructField("tagline", StringType(), True),
                           StructField("title", StringType(), True),
                           StructField("vote_average", FloatType(), True),
                           StructField("vote_count", FloatType(), True)])

In [100]:
#Leemos el archivo créditos según el esquema definido
credits = (spark.read.option("header", "true")
           .option("quote", "\"")  # Tuning escape double quotes
           .option("escape", "\"") # Tuning escape double quotes
           .csv("./tmdb_5000_credits.csv", schema=creditsSchema)
          )

In [101]:
credits.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)



In [102]:
#Leemos el archivo películas según el esquema definido
movies = (spark.read.option("header", "true")
           .option("dateFormat", "yyyy-MM-dd") 
           .option("quote", "\"")  # Tuning escape double quotes
           .option("escape", "\"") # Tuning escape double quotes
           .csv("./tmdb_5000_movies.csv", schema=moviesSchema)
          )

In [103]:
movies.printSchema()

root
 |-- budget: long (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: long (nullable = true)
 |-- keywords: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: long (nullable = true)
 |-- runtime: long (nullable = true)
 |-- spoken_langugaes: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- vote_count: float (nullable = true)



In [104]:
# Creamos el esquema para las variables json que hemos encontrado en el archivo créditos
cast_json_schema = ArrayType(StructType([StructField("cast_id", IntegerType()),
                                         StructField("character", StringType()),
                                         StructField("credit_id", StringType()),
                                         StructField("gender", StringType()),
                                         StructField("id", IntegerType()),
                                         StructField("name", StringType()),
                                         StructField("order", IntegerType())]))

crew_json_schema = ArrayType(StructType([StructField("credit_id", StringType()),
                                         StructField("department", StringType()),
                                         StructField("gender", StringType()),
                                         StructField("id", IntegerType()),
                                         StructField("job", StringType()),
                                         StructField("name", StringType())]))

In [105]:
# Creamos el esquema para las variables json que hemos encontrado en el archivo películas
genres_json_schema = ArrayType(StructType([StructField("id", IntegerType()),
                                         StructField("name", StringType())]))

keywords_json_schema = ArrayType(StructType([StructField("id", IntegerType()),
                                         StructField("name", StringType())]))

pro_companies_json_schema = ArrayType(StructType([StructField("id", IntegerType()),
                                         StructField("name", StringType())]))

pro_countries_json_schema = ArrayType(StructType([StructField("iso_3166_1", StringType()),
                                         StructField("name", StringType())]))

spoken_json_schema = ArrayType(StructType([StructField("iso_3166_1", StringType()),
                                         StructField("name", StringType())]))

In [106]:
# Usamos la función "from json" para incluir las columnas enbebidas dentro de las columnas json
# Manteniendo el esquema general del dataframe del archivo créditos
credits = (credits.withColumn('cast', from_json(col('cast'), cast_json_schema))
           .withColumn('crew', from_json(col('crew'), crew_json_schema)) )

In [107]:
credits.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- cast: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cast_id: integer (nullable = true)
 |    |    |-- character: string (nullable = true)
 |    |    |-- credit_id: string (nullable = true)
 |    |    |-- gender: string (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- order: integer (nullable = true)
 |-- crew: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- credit_id: string (nullable = true)
 |    |    |-- department: string (nullable = true)
 |    |    |-- gender: string (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- job: string (nullable = true)
 |    |    |-- name: string (nullable = true)



In [108]:
# De la misma forma para el archivo películas
movies = (movies.withColumn('genres', from_json(col('genres'), genres_json_schema))
           .withColumn('keywords', from_json(col('keywords'), keywords_json_schema))
           .withColumn('production_companies', from_json(col('production_companies'), pro_companies_json_schema)) 
           .withColumn('production_countries', from_json(col('production_countries'), pro_countries_json_schema)) 
           .withColumn('spoken_langugaes', from_json(col('spoken_langugaes'), spoken_json_schema)))       

In [109]:
movies.printSchema()

root
 |-- budget: long (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: long (nullable = true)
 |-- keywords: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- production_companies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- production_countries: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- iso_3166_1: string (nullable = true)
 |    |    |-- n

In [110]:
#En este chunk vamos a filtrar el dataframe eliminando los registros nulos de las variables que vamos a utilizar
movies = movies.filter("title != '0'")
movies = movies.filter("release_date != 'null'")

## 4. Resultados de las consultas

### 4.1 Consulta 1
Obtener el ranking de las 10 mejores películas de toda la lista según el promedio de puntuación de votación (vote_average) obtenido.

In [111]:
ran_vote =(movies
 .select(col('title').alias('Titulo'), col('vote_average').alias('Puntuación')).sort(desc("vote_average"))
 .limit(10))
ran_vote.show()

+--------------------+----------+
|              Titulo|Puntuación|
+--------------------+----------+
|    Stiff Upper Lips|      10.0|
|Dancer, Texas Pop...|      10.0|
|Me You and Five B...|      10.0|
|      Little Big Top|      10.0|
|           Sardaarji|       9.5|
|      One Man's Hero|       9.3|
|The Shawshank Red...|       8.5|
|  There Goes My Baby|       8.5|
|The Prisoner of Z...|       8.4|
|       The Godfather|       8.4|
+--------------------+----------+



### 4.2 Consulta 2
Obtener el ranking de los 10 géneros de películas que han tenido mayor presupuesto promedio por género.

In [112]:
gen_bud = (movies.select(col('title'),
               explode(col('genres')).alias('genres'), col('budget'))
 .select(col('title'), col('genres').getItem('name').alias('Genero'), col('budget').alias('Presupuesto'))
 .groupBy(col('Genero'))
 .sum("Presupuesto")
 .withColumnRenamed("sum(Presupuesto)", "Presupuesto")
 .sort(desc("Presupuesto"))
 .limit(10))
gen_bud.show()

+---------------+-----------+
|         Genero|Presupuesto|
+---------------+-----------+
|         Action|59443406599|
|      Adventure|52391220463|
|          Drama|47498112157|
|         Comedy|43582711809|
|       Thriller|40727495424|
|Science Fiction|27748069865|
|        Fantasy|26942696595|
|         Family|26019109625|
|          Crime|19383466474|
|        Romance|18158357743|
+---------------+-----------+



### 4.3 Consulta 3
Encuentra a la persona (miembro del casting o del equipo) que ha participado en más películas. Después, encuentra al director que ha participado en más películas.

In [113]:
#Creamos un dataframe con las veces que una persona del casting a trabajado en una pelicula

casting = (credits.select(col('title'),explode(col('cast')).alias('cast')) 
 .select(col('title').alias('Pelicula'),col('cast').getItem('name').alias('Persona'))
         .drop_duplicates()
         .groupBy('Persona')
         .count()
          )

#Creamos un dataframe con las veces que una persona del equipo a trabajado en una pelicula
crew = (credits.select(col('title'), explode(col('crew')).alias('crew')) 
 .select(col('title'), col('crew').getItem('name').alias('Persona'))
        .drop_duplicates()
        .groupBy('Persona')
        .count()
        )

#Unimos por filas los dos dataframe.

casting_crew = casting.union(crew).orderBy('count', ascending=False)
casting_crew.show(10)

+-------------------+-----+
|            Persona|count|
+-------------------+-----+
|        Avy Kaufman|   83|
|       Mary Vernieu|   81|
|     Deborah Aquila|   74|
|        Hans Zimmer|   71|
|James Newton Howard|   68|
|   Harvey Weinstein|   68|
|        Tricia Wood|   67|
|      Bob Weinstein|   67|
|  Samuel L. Jackson|   67|
|   Steven Spielberg|   65|
+-------------------+-----+
only showing top 10 rows



In [114]:
# Mostramos el director que ha participado en más peliculas
best_dir =(credits.select(explode(col('crew')).alias('crew')) 
 .select(col('crew').getItem('name').alias('Persona'), col('crew').getItem('job').alias('Trabajo'))
 .filter("Trabajo == 'Director'")
 .groupBy('Persona')
 .count()            
 .orderBy('count', ascending=False)            
 .limit(1))
best_dir.show()

+----------------+-----+
|         Persona|count|
+----------------+-----+
|Steven Spielberg|   27|
+----------------+-----+



Se puede ver como Steven Spielberg aparece en la primera subconulta con un count de 65 mientras que solo a dirigido 27 peliculas. Esto es que este director también ha trabajado en otras peliculas como guionista etc. 

### 4.4 Consulta 4
Utilizando como referencia la fecha de lanzamiento de la película, calcula para cada director el tiempo medio que transcurre entre la dirección de una película y la dirección de la siguiente. Calcula el ranking de los 10 directores cuyo tiempo medio entre películas sea el más bajo.

In [115]:
#En esta consulta necesitamos información de ambos datasets por lo que vamos a extraer las columnas necesarias en variables
directores = (credits.select(col('title'),
             explode(col('crew')).alias('crew')) 
             .select(col('title').alias('Pelicula'), col('crew').getItem('name').alias('Director'), 
             col('crew').getItem('job').alias('Trabajo'))
             .filter("Trabajo == 'Director'"))

estrenos = movies.select(col('title').alias('Pelicula'), col('release_date').alias('Estreno'))

#Posterirmente vamos a unirlas en un unico dataframe de trabajo
dir_est= (directores.join(estrenos, directores.Pelicula == estrenos.Pelicula)
         .select(directores.Director, estrenos.Estreno))
dir_est.show(10)

+-----------------+----------+
|         Director|   Estreno|
+-----------------+----------+
|    James Cameron|2009-12-10|
|   Gore Verbinski|2007-05-19|
|       Sam Mendes|2015-10-26|
|Christopher Nolan|2012-07-16|
|   Andrew Stanton|2012-03-07|
|        Sam Raimi|2007-05-01|
|     Byron Howard|2010-11-24|
|     Nathan Greno|2010-11-24|
|      Joss Whedon|2015-04-22|
|      David Yates|2009-07-07|
+-----------------+----------+
only showing top 10 rows



In [116]:
#Creamos una ventana para poder particionar el datafreme por Director, ordenandolas por fechas de estreno. 
overCategory = Window.partitionBy("Director").orderBy("Estreno")

#Con esta ventana creamos una nueva columna que nos da el siguiente estreno de cada director.
dir_est = dir_est.withColumn(
  "lead", lead("Estreno", 1).over(overCategory)).select("Director", "Estreno", "lead")

#Calculamos los días entre estrenos, sumando cero días cuando no hay estrenos posteriores
dif_dir = dir_est.withColumn(
    "tiempo_trabajos", when(col("lead").isNull(), 0).otherwise(datediff('lead', "Estreno")))

#Finalmente calculamos el tiempo medio en dias que transcurre entre la dirección de una película y la dirección de la siguiente.
# Los tiempos ceros son para directores que solo han dirigido una pelicula. 
avg_dir = (dif_dir.groupBy('Director')
         .agg(functions.avg('tiempo_trabajos'))
         .withColumnRenamed("avg(tiempo_trabajos)", "Tiempo_medio")
         .show(10))

+-----------------+------------+
|         Director|Tiempo_medio|
+-----------------+------------+
|   Aleksey German|         0.0|
|   Allison Anders|       689.0|
|       Bobby Roth|         0.0|
|     Jim Jarmusch|         0.0|
|      John Milius|      1071.0|
|       John Wells|       322.5|
|       Rob Bowman|      1034.0|
|  Thomas Langmann|         0.0|
|Wash Westmoreland|      1552.5|
|     Wayne Kramer|       542.5|
+-----------------+------------+
only showing top 10 rows



In [117]:
#Por último mostramos el ranking de los 10 directores cuyo tiempo medio entre películas es el más bajo.
ran10dir = (dif_dir.groupBy('Director')
         .agg(functions.avg('tiempo_trabajos'))
         .withColumnRenamed("avg(tiempo_trabajos)", "Tiempo_medio")
         .filter('Tiempo_medio > 0')
         .orderBy("Tiempo_medio"))

ran10dir.show(10)


+------------------+------------+
|          Director|Tiempo_medio|
+------------------+------------+
| Michael O. Sajbel|         3.5|
|  Christian Alvart|        13.0|
|    Youssef Delara|        19.5|
|      Darren Stein|        24.0|
|  Robert Greenwald|        29.0|
|Michael Landon Jr.|        63.0|
|       J.B. Rogers|        75.5|
| Richard Schenkman|        80.0|
|         Fred Wolf|       101.5|
|       Jason Trost|       113.5|
+------------------+------------+
only showing top 10 rows



## 6. Conversión de resultados a Pandas DataFrames

In [118]:
#Consulta 1
ran_vote = gen_bud.toPandas()

#Consulta 2
gen_bud = gen_bud.toPandas()

#Consulta 3
casting_crew = casting_crew.toPandas()
best_dir = best_dir.toPandas()

#Consulta 4
ran10dir= ran10dir.toPandas()

## 6. Cerrar la sesión Spark

In [129]:
spark.stop()