# Examen ETL: SPARK 12+1/02

## José María Álvarez Silva

Se podrá utilizar toda la información que se encuentra en el campus. 

Se va a trabajar sobre varios ficheros de datos:

Usuarios: id_usuario::sexo::edad::id_profesion::codigo_postal

Peliculas: id_pelicula::titulo (año)::tipo1|tipo2|tipo3

Ratings: id_pelicula::id_usuario::puntuacion::fecha_timestamp

A cada una de las preguntas hay que responder explicando brevemente que se pretende hacer antes de lanzar el código.

Al documento lo llamareís con vuestro nombre y apellido. Debeís enviarlo a mi correo de CUNEF antes del final del examen.

El lenguaje para trabajar con Spark podrá ser python o R indistintamente.

In [126]:
# Nota: Carga de las puntuaciones
# Función para parsear la fecha

from datetime import datetime
dateparse = lambda x: datetime.fromtimestamp(float(x))

In [185]:
import pandas as pd

## Primera tarea: Inicializar spark context y cargar los datos desde los ficheros.

In [127]:
from pyspark import SparkContext


SQL

In [128]:
sc = SparkContext()

In [129]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
from pyspark.sql import Row

Ratings

In [130]:
ratings_data_file = "./ratings.dat.txt"
ratings_raw_data = sc.textFile(ratings_data_file)

In [131]:
ratings_split_data = ratings_raw_data.map(lambda x : x.split("::")).map(lambda x : [x[0], x[1], float(x[2]), dateparse(x[3]).year])
ratings_split_data.take(5)

[['1', '1193', 5.0, 2000],
 ['1', '661', 3.0, 2000],
 ['1', '914', 3.0, 2000],
 ['1', '3408', 4.0, 2000],
 ['1', '2355', 5.0, 2001]]

In [132]:
ratings_row_data = ratings_split_data.map(lambda p: Row(
    id_pelicula_r = p[0], 
    id_usuario_r = p[1],
    puntuacion = float(p[2]),
    year_r = p[3]
    )
)

In [133]:
ratings_df = sqlContext.createDataFrame(ratings_row_data)
ratings_df.registerTempTable("ratings")

Users

In [134]:
users_data_file = "./users.dat.txt"
users_raw_data = sc.textFile(users_data_file)

In [135]:
users_split_data = users_raw_data.map(lambda x : x.split("::"))
users_split_data.take(5)

[['1', 'F', '1', '10', '48067'],
 ['2', 'M', '56', '16', '70072'],
 ['3', 'M', '25', '15', '55117'],
 ['4', 'M', '45', '7', '02460'],
 ['5', 'M', '25', '20', '55455']]

In [136]:
users_row_data = users_split_data.map(lambda p: Row(
    id_usuario = p[0], 
    sexo = p[1],
    edad = int(p[2]),
    id_profesion = p[3],
    codigo_postal = p[4]
    )
)

In [137]:
users_df = sqlContext.createDataFrame(users_row_data)
users_df.registerTempTable("users")

Movies

In [138]:
movies_data_file = "./movies.dat.txt"
movies_raw_data = sc.textFile(movies_data_file)

In [139]:
movies_split_data = movies_raw_data.map(lambda x : x.split("::")).map(lambda x : [x[0], x[1][0:-7], x[1][-5:-1], (x[2].split("|"))])
movies_split_data.take(5)

[['1', 'Toy Story', '1995', ['Animation', "Children's", 'Comedy']],
 ['2', 'Jumanji', '1995', ['Adventure', "Children's", 'Fantasy']],
 ['3', 'Grumpier Old Men', '1995', ['Comedy', 'Romance']],
 ['4', 'Waiting to Exhale', '1995', ['Comedy', 'Drama']],
 ['5', 'Father of the Bride Part II', '1995', ['Comedy']]]

In [140]:
## Probar
ejemplo = 'Toy Story (1995)'
ejemplo[-5:-1], ejemplo[0:-7]

('1995', 'Toy Story')

In [141]:
movies_row_data = movies_split_data.map(lambda p: Row(
    id_pelicula = p[0], 
    titulo = p[1],
    year = p[2],
    )
)

In [142]:
movies_df = sqlContext.createDataFrame(movies_row_data)
movies_df.registerTempTable("movies")

## Segunda tarea: Media de puntuaciones globales por año. ¿Hay algún año significativamente distinto?

In [95]:
media_puntuaciones_por_año = sqlContext.sql("""
    SELECT year_r, AVG(puntuacion) FROM ratings GROUP BY year_r
""")
media_puntuaciones_por_año.show()

+----+------------------+
|year|   avg(puntuacion)|
+----+------------------+
|2003| 3.486559139784946|
|2002| 3.458828911253431|
|2000|3.5903916742285498|
|2001|3.5122542537247643|
+----+------------------+



Obtener dos rdd's: uno que contenga el número de puntuaciones y otro que contenga la suma de puntuaciones por año. Otro rdd que sea el join de ambas y mapearlo para dividir la suma entre n; obteniendo asi la media por año.

In [96]:
n_ratings_por_año = ratings_split_data.map(lambda line: (line[3], 1)).reduceByKey(lambda a, b: a + b)
n_ratings_por_año.take(5)

[(2000, 904757), (2001, 68058), (2002, 24046), (2003, 3348)]

In [97]:
suma_ratings_por_año = ratings_split_data.map(lambda line: (line[3], line[2])).reduceByKey(lambda a, b: a + b)
suma_ratings_por_año.take(5)

[(2000, 3248432.0), (2001, 239037.0), (2002, 83171.0), (2003, 11673.0)]

In [98]:
n_ratings_por_año.join(suma_ratings_por_año).collect()

[(2000, (904757, 3248432.0)),
 (2001, (68058, 239037.0)),
 (2002, (24046, 83171.0)),
 (2003, (3348, 11673.0))]

In [99]:
n_ratings_por_año.join(suma_ratings_por_año).map(lambda year : (year[0], (int(year[1][1]) / year[1][0]))).collect()

[(2000, 3.5903916742285498),
 (2001, 3.5122542537247643),
 (2002, 3.458828911253431),
 (2003, 3.486559139784946)]

## Tercera pregunta: ¿Cuál es la película más votada por los mayores de 60? 

In [108]:
mayores_60 = sqlContext.sql("""
    SELECT * FROM users WHERE edad > 59
""")
mayores_60.show()

+-------------+----+------------+----------+----+
|codigo_postal|edad|id_profesion|id_usuario|sexo|
+-------------+----+------------+----------+----+
+-------------+----+------------+----------+----+



In [144]:
mayores_60 = sqlContext.sql("""
    SELECT DISTINCT(edad) FROM users
""")
mayores_60.show()

+----+
|edad|
+----+
|  50|
|  25|
|  56|
|   1|
|  35|
|  18|
|  45|
+----+



### MODIFICACIÓN Tercera pregunta: ¿Cuál es la película más votada por los mayores de 50? 

In [241]:
prueba = sqlContext.sql("""
    SELECT MAX(votos)
    FROM (SELECT titulo, COUNT(titulo) as votos
    FROM ((SELECT * FROM users WHERE edad > 49) JOIN ( movies JOIN ratings ON id_pelicula = id_pelicula_r) ON id_usuario = id_usuario_r  )
    GROUP BY titulo)
""")
prueba.show()

+----------+
|max(votos)|
+----------+
|       429|
+----------+



Hacer un join de las tres bases de datos para tener en un mismo rdd el título de la pelicula y las calificaciones. Antes de unir las bases de datos necesito hacer un filtro para que contenga solo mayores de 50 (tabla users). las conecciones de las tablas se establecen atravez de las variables  id user y id pelicula, las cuales utilizaremos como key para el join. Depués contaremos las veces que cada titulo fue votado para encontrar la pelucila con más votos.

In [191]:
## id user, id pelicula 
temp_ratings = ratings_split_data.map(lambda line: (line[0], line[1]))
temp_ratings.take(5)

[('1', '1193'), ('1', '661'), ('1', '914'), ('1', '3408'), ('1', '2355')]

In [192]:
## id pelicula, titulo 
temp_movies = movies_split_data.map(lambda line: (line[0], line[1]))
temp_movies.take(5)

[('1', 'Toy Story'),
 ('2', 'Jumanji'),
 ('3', 'Grumpier Old Men'),
 ('4', 'Waiting to Exhale'),
 ('5', 'Father of the Bride Part II')]

In [193]:
## id user, edad
temp_users = users_split_data.filter(lambda user : int(user[2])>49).map(lambda line: (line[0], line[2]))
temp_users.take(5)

[('2', '56'), ('6', '50'), ('17', '50'), ('31', '56'), ('54', '50')]

In [194]:
## id user, (id pelicula,edad)
ratings_users = temp_ratings.join(temp_users)
ratings_users.take(5)

[('2265', ('1249', '56')),
 ('2265', ('3936', '56')),
 ('2265', ('3937', '56')),
 ('2265', ('2054', '56')),
 ('2265', ('1254', '56'))]

In [195]:
## id_pelicula, id users
ratings_users = ratings_users.map(lambda x : (x[1][0], x[0]))
ratings_users.take(5)

[('1249', '2265'),
 ('3936', '2265'),
 ('3937', '2265'),
 ('2054', '2265'),
 ('1254', '2265')]

In [196]:
## id_pelicula, (id_users, titulo)
ratings_users_movies = ratings_users.join(temp_movies)
ratings_users_movies.take(5)

[('1805', ('571', 'Wild Things')),
 ('1805', ('2025', 'Wild Things')),
 ('1805', ('4497', 'Wild Things')),
 ('1805', ('1925', 'Wild Things')),
 ('1805', ('4156', 'Wild Things'))]

In [197]:
votos_movies = ratings_users_movies.map(lambda movie : (movie[1][1],1)).reduceByKey(lambda a, b: a + b)
votos_movies.take(5)

[('City, The', 1),
 ('Six Days Seven Nights', 50),
 ('Tora! Tora! Tora!', 44),
 ('Army of Darkness', 33),
 ('Different for Girls', 4)]

In [198]:
pd.DataFrame(votos_movies.collect()).sort(1, ascending = False)

  if __name__ == '__main__':


Unnamed: 0,0,1
763,American Beauty,432
1725,Star Wars: Episode IV - A New Hope,331
1030,Shakespeare in Love,326
396,"Godfather, The",320
2812,Star Wars: Episode V - The Empire Strikes Back,319
1706,Fargo,317
686,L.A. Confidential,307
544,Jurassic Park,306
1937,Schindler's List,305
177,Star Wars: Episode VI - Return of the Jedi,300


In [270]:
votos_movies.max(lambda x : x[1])

('American Beauty', 432)

## Cuarta pregunta: ¿Cuál es la puntuación media de las peliculas de acción del año 2000?

Filtraremos por Tipo accion el rdd de movies y por año 2000 el rdd ratings. Haremos un join entre las tablas a partir del key (id pelicula). Contaremos las calificaciones y sumaremos las puntuaciones para despues dividir y obtener la media de puntuaciones de las peliculas de accion para le año 2000.

In [208]:
def f_tipo(x):
    y = ""
    for i in x:
        y = y+i
    return(y)

In [225]:
def accion(x):
    y = 0
    for i in x:
        if(i == "Action"):
            y = 1
    return(y)

In [226]:
movies_row_data2 = movies_split_data.map(lambda p: Row(
    id_pelicula = p[0], 
    titulo = p[1],
    year = p[2],
    action = accion(p[3])
    )
)

In [227]:
movies_df2 = sqlContext.createDataFrame(movies_row_data2)
movies_df2.registerTempTable("movies2")

In [230]:
prueba = sqlContext.sql("""
    SELECT * FROM movies2 WHERE action = 1
""")
prueba.show()

+------+-----------+-------------------+----+
|action|id_pelicula|             titulo|year|
+------+-----------+-------------------+----+
|     1|          6|               Heat|1995|
|     1|          9|       Sudden Death|1995|
|     1|         10|          GoldenEye|1995|
|     1|         15|   Cutthroat Island|1995|
|     1|         20|        Money Train|1995|
|     1|         21|         Get Shorty|1995|
|     1|         42|    Dead Presidents|1995|
|     1|         44|      Mortal Kombat|1995|
|     1|         51|     Guardian Angel|1994|
|     1|         70|From Dusk Till Dawn|1996|
|     1|         71|          Fair Game|1995|
|     1|         89|       Nick of Time|1995|
|     1|         95|       Broken Arrow|1996|
|     1|         98|           Shopping|1994|
|     1|        110|         Braveheart|1995|
|     1|        112|Rumble in the Bronx|1995|
|     1|        139|             Target|1995|
|     1|        145|           Bad Boys|1995|
|     1|        153|     Batman Fo

In [231]:
prueba = sqlContext.sql("""
    SELECT * FROM ratings WHERE year_r = 2000
""")
prueba.show()

+-------------+------------+----------+------+
|id_pelicula_r|id_usuario_r|puntuacion|year_r|
+-------------+------------+----------+------+
|            1|        1193|       5.0|  2000|
|            1|         661|       3.0|  2000|
|            1|         914|       3.0|  2000|
|            1|        3408|       4.0|  2000|
|            1|        1197|       3.0|  2000|
|            1|        1287|       5.0|  2000|
|            1|        2804|       5.0|  2000|
|            1|         594|       4.0|  2000|
|            1|         919|       4.0|  2000|
|            1|         938|       4.0|  2000|
|            1|        2398|       4.0|  2000|
|            1|        2918|       4.0|  2000|
|            1|        1035|       5.0|  2000|
|            1|        2791|       4.0|  2000|
|            1|        2018|       4.0|  2000|
|            1|        3105|       5.0|  2000|
|            1|        2797|       4.0|  2000|
|            1|        2321|       3.0|  2000|
|            

In [237]:
prueba = sqlContext.sql("""
    SELECT AVG(puntuacion) FROM ((SELECT * FROM movies2 WHERE action = 1) JOIN (SELECT * FROM ratings WHERE year_r = 2000) ON id_pelicula = id_pelicula_r)
""")
prueba.show()

+------------------+
|   avg(puntuacion)|
+------------------+
|3.5394796457746964|
+------------------+



In [242]:
movies_action = movies_raw_data.map(lambda x : x.split("::")).map(lambda x : [x[0], x[1][0:-7], x[1][-5:-1], accion(x[2].split("|"))])
movies_action.take(5)

[['1', 'Toy Story', '1995', 0],
 ['2', 'Jumanji', '1995', 0],
 ['3', 'Grumpier Old Men', '1995', 0],
 ['4', 'Waiting to Exhale', '1995', 0],
 ['5', 'Father of the Bride Part II', '1995', 0]]

In [244]:
ratings_2000 = ratings_split_data.filter(lambda rate : int(rate[3]) == 2000)
ratings_2000.take(5)

[['1', '1193', 5.0, 2000],
 ['1', '661', 3.0, 2000],
 ['1', '914', 3.0, 2000],
 ['1', '3408', 4.0, 2000],
 ['1', '1197', 3.0, 2000]]

In [246]:
action_2000 = movies_action.map(lambda x : (x[0], x[3])).join(ratings_2000.map(lambda x : (x[0], x[2])))
action_2000.take(5)

[('1440', (0, 3.0)),
 ('1440', (0, 3.0)),
 ('1440', (0, 3.0)),
 ('1440', (0, 4.0)),
 ('1440', (0, 3.0))]

In [248]:
action_2000.map(lambda x : x[1][1]).sum() / action_2000.map(lambda x : x[1][1]).count()

3.5816669721007757

## Quinta pregunta: ¿ Cuál es el año en que mayor número de usuarios votaron?

Agrupamos por año y contamos el número de usuarios diferentes que votaron (unicos; i.e. sin duplicar)

In [256]:
prueba = sqlContext.sql("""
    SELECT year_r, COUNT(DISTINCT(id_usuario_r)) FROM ratings GROUP BY year_r
""")
prueba.show()

+------+----------------------------+
|year_r|count(DISTINCT id_usuario_r)|
+------+----------------------------+
|  2003|                        1601|
|  2002|                        2971|
|  2000|                        3678|
|  2001|                        3289|
+------+----------------------------+



In [262]:
ratings_year = ratings_split_data.map(lambda x : (str(x[3])+x[1])).distinct().map(lambda x :(x[0:4],1)).reduceByKey(lambda a, b: a + b)
pd.DataFrame(ratings_year.collect())

[('2003', 1601), ('2001', 3289), ('2002', 2971), ('2000', 3678)]

## Sexta pregunta: ¿ Cuál es la película con mejor puntación media?

Join entre los rdd ratings y movies, sumamos las puntuaciones por pelicula, contamos los votos por pelicula. Mapeamos para dividir la suma entre el número de votos por pelicula y obtenemos el máximo.

In [263]:
ratings_movies_join = ratings_split_data.map(lambda x : (x[0],x[2])).join(movies_split_data.map(lambda x : (x[0],x[1])))
ratings_movies_join.take(5)

[('2250', (4.0, "Men Don't Leave")),
 ('2250', (3.0, "Men Don't Leave")),
 ('2250', (1.0, "Men Don't Leave")),
 ('2250', (5.0, "Men Don't Leave")),
 ('2250', (4.0, "Men Don't Leave"))]

In [264]:
title_ratings = ratings_movies_join.map(lambda x : (x[1][1],x[1][0]))
title_ratings.take(5)

[("Men Don't Leave", 4.0),
 ("Men Don't Leave", 3.0),
 ("Men Don't Leave", 1.0),
 ("Men Don't Leave", 5.0),
 ("Men Don't Leave", 4.0)]

In [265]:
title_sum_rating = title_ratings.reduceByKey(lambda a, b: a + b)
title_sum_rating.take(5)

[('City, The', 1228.0),
 ('How to Make an American Quilt', 173.0),
 ('Weird Science', 212.0),
 ('Surviving Picasso', 542.0),
 ('Outrageous Fortune', 1246.0)]

In [266]:
n_title_rating = title_ratings.map(lambda x : (x[0],1)).reduceByKey(lambda a, b: a + b)
n_title_rating.take(5)

[('City, The', 377),
 ('How to Make an American Quilt', 41),
 ('Weird Science', 59),
 ('Surviving Picasso', 144),
 ('Outrageous Fortune', 378)]

In [269]:
pd.DataFrame(title_sum_rating.join(n_title_rating).map(lambda x : (x[0], x[1][0]/x[1][1])).collect()).sort(1, ascending = False)

  if __name__ == '__main__':


Unnamed: 0,0,1
3158,New Jersey Drive,4.962963
324,I'll Be Home For Christmas,4.956522
390,Drowning Mona,4.904762
3651,Goya in Bordeaux (Goya en Bodeos),4.890909
1139,Farewell My Concubine,4.843137
2196,"Favor, The",4.837838
1418,Jean de Florette,4.796117
1589,Tigrero: A Film That Was Never Made,4.733333
3832,Welcome To Sarajevo,4.714286
663,Lord of the Flies,4.702703


In [272]:
title_sum_rating.join(n_title_rating).map(lambda x : (x[0], x[1][0]/x[1][1])).max(lambda x : x[1])

('New Jersey Drive', 4.962962962962963)

In [125]:
sc.stop()