# Examen ETL: SPARK 12+1/02

## Realizado por: Armando Torner Marchesi

Se podrá utilizar toda la información que se encuentra en el campus. 

Se va a trabajar sobre varios ficheros de datos:

Usuarios: id_usuario::sexo::edad::id_profesion::codigo_postal

Peliculas: id_pelicula::titulo (año)::tipo1|tipo2|tipo3

Ratings: id_pelicula::id_usuario::puntuacion::fecha_timestamp

A cada una de las preguntas hay que responder explicando brevemente que se pretende hacer antes de lanzar el código.

Al documento lo llamareís con vuestro nombre y apellido. Debeís enviarlo a mi correo de CUNEF antes del final del examen.

El lenguaje para trabajar con Spark podrá ser python o R indistintamente.

In [1]:
# Nota: Carga de las puntuaciones
# Función para parsear la fecha

from datetime import datetime
dateparse = lambda x: datetime.fromtimestamp(float(x))

## Primera tarea: Inicializar spark context y cargar los datos desde los ficheros.

In [3]:
sc.stop()

In [4]:
from pyspark import SparkContext
sc = SparkContext()

In [12]:
data_file = "movies.dat.txt"
movies = sc.textFile(data_file)
movies.take(5)

["1::Toy Story (1995)::Animation|Children's|Comedy",
 "2::Jumanji (1995)::Adventure|Children's|Fantasy",
 '3::Grumpier Old Men (1995)::Comedy|Romance',
 '4::Waiting to Exhale (1995)::Comedy|Drama',
 '5::Father of the Bride Part II (1995)::Comedy']

In [13]:
data_file2 = "users.dat.txt"
users = sc.textFile(data_file2)
users.take(5)

['1::F::1::10::48067',
 '2::M::56::16::70072',
 '3::M::25::15::55117',
 '4::M::45::7::02460',
 '5::M::25::20::55455']

In [14]:
data_file3 = "ratings.dat.txt"
ratings = sc.textFile(data_file3)
ratings.take(5)

['1::1193::5::978300760',
 '1::661::3::978302109',
 '1::914::3::978301968',
 '1::3408::4::978300275',
 '1::2355::5::978824291']

In [15]:
movies2 = movies.map(lambda x: x.split("::"))
users2 = users.map(lambda x: x.split("::"))
ratings2 = ratings.map(lambda x: x.split("::")).map(lambda x: [x[0], x[1], int(x[2]), datetime.fromtimestamp(float(x[3])).year])
movies2.take(1), users2.take(1), ratings2.take(1)

([['1', 'Toy Story (1995)', "Animation|Children's|Comedy"]],
 [['1', 'F', '1', '10', '48067']],
 [['1', '1193', 5, 2000]])

## Segunda tarea: Media de puntuaciones globales por año. ¿Hay algún año significativamente distinto?

In [16]:
rating_year = ratings2.map(lambda x: (x[3], x[2]))
rating_year.take(5)

[(2000, 5), (2000, 3), (2000, 3), (2000, 4), (2001, 5)]

In [17]:
rating_year.map(lambda x: x[0]).distinct().collect()

[2000, 2002, 2001, 2003]

In [18]:
suma_ratings = rating_year.reduceByKey(lambda x, y: x + y)
suma_ratings.take(4)

[(2000, 3248314), (2002, 83171), (2001, 239155), (2003, 11673)]

In [19]:
cantidad = rating_year.map(lambda x: (x[0], 1))
cantidad_key = cantidad.reduceByKey(lambda x, y: x + y)
cantidad_key.take(4)

[(2000, 904721), (2002, 24046), (2001, 68094), (2003, 3348)]

In [20]:
juntos = suma_ratings.join(cantidad_key)
juntos.take(5)

[(2000, (3248314, 904721)),
 (2002, (83171, 24046)),
 (2001, (239155, 68094)),
 (2003, (11673, 3348))]

In [21]:
media = juntos.map(lambda x: (x[0], round(x[1][0]/x[1][1], 2)))
media.take(4)

[(2000, 3.59), (2002, 3.46), (2001, 3.51), (2003, 3.49)]

## Tercera pregunta: ¿Cuál es la película más votada por los mayores de 60? 

In [22]:
users2.take(1)

[['1', 'F', '1', '10', '48067']]

In [23]:
users_edad = users2.map(lambda x: (x[0], int(x[2])))
users_edad.take(5)

[('1', 1), ('2', 56), ('3', 25), ('4', 45), ('5', 25)]

In [24]:
mayores_60 = users_edad.filter(lambda x: x[1] > 60)
mayores_60.take(5)

[]

In [25]:
users_edad.max(lambda x: x[1])

('2', 56)

Vemos que no hay usuarios mayores de 60 años; el mayor tiene 56 años

In [26]:
ratings2.take(1)

[['1', '1193', 5, 2000]]

## Cuarta pregunta: ¿Cuál es la puntuación media de las peliculas de acción del año 2000?

In [30]:
movies2.take(5)

[['1', 'Toy Story (1995)', "Animation|Children's|Comedy"],
 ['2', 'Jumanji (1995)', "Adventure|Children's|Fantasy"],
 ['3', 'Grumpier Old Men (1995)', 'Comedy|Romance'],
 ['4', 'Waiting to Exhale (1995)', 'Comedy|Drama'],
 ['5', 'Father of the Bride Part II (1995)', 'Comedy']]

In [31]:
pelis_action = movies2.filter(lambda x: 'Action' in x[2])
pelis_action.take(5)

[['6', 'Heat (1995)', 'Action|Crime|Thriller'],
 ['9', 'Sudden Death (1995)', 'Action'],
 ['10', 'GoldenEye (1995)', 'Action|Adventure|Thriller'],
 ['15', 'Cutthroat Island (1995)', 'Action|Adventure|Romance'],
 ['20', 'Money Train (1995)', 'Action']]

In [32]:
ratings_2000 = ratings2.filter(lambda x: x[3] == 2000)
ratings_2000.take(5)

[['1', '1193', 5, 2000],
 ['1', '661', 3, 2000],
 ['1', '914', 3, 2000],
 ['1', '3408', 4, 2000],
 ['1', '1197', 3, 2000]]

In [33]:
ratings_2000_red = ratings_2000.map(lambda x: (x[0], x[2]))
ratings_2000_key = ratings_2000_red.reduceByKey(lambda x, y: x + y)
ratings_2000_key.take(5)

[('1', 167), ('4', 88), ('8', 540), ('9', 396), ('10', 1216)]

In [34]:
numero_ratings = ratings_2000.map(lambda x: (x[0], 1))
numero_rating_key = numero_ratings.reduceByKey(lambda x, y: x + y)
numero_rating_key.take(5)

[('1', 40), ('4', 21), ('8', 139), ('9', 106), ('10', 299)]

In [35]:
total = ratings_2000_key.join(numero_rating_key)
total.take(5)

[('1', (167, 40)),
 ('4', (88, 21)),
 ('8', (540, 139)),
 ('9', (396, 106)),
 ('10', (1216, 299))]

In [36]:
pelis_action_2000 = total.join(pelis_action)
pelis_action_2000.take(5)

[('10', ((1216, 299), 'GoldenEye (1995)')),
 ('20', ((57, 13), 'Money Train (1995)')),
 ('44', ((454, 116), 'Mortal Kombat (1995)')),
 ('70', ((200, 54), 'From Dusk Till Dawn (1996)')),
 ('110', ((260, 80), 'Braveheart (1995)'))]

In [37]:
pelis_final = pelis_action_2000.map(lambda x: (x[0], x[1][0]))
pelis_final.take(5)

[('10', (1216, 299)),
 ('20', (57, 13)),
 ('44', (454, 116)),
 ('70', (200, 54)),
 ('110', (260, 80))]

In [38]:
pelis_final_ratings = pelis_final.map(lambda x: ("Comun", x[1][0]))
pelis_final_ratings.take(5)

[('Comun', 1216),
 ('Comun', 57),
 ('Comun', 454),
 ('Comun', 200),
 ('Comun', 260)]

In [39]:
pelis_final_ratings_key = pelis_final_ratings.reduceByKey(lambda x, y: x + y)
pelis_final_ratings_key.take(1)

[('Comun', 257794)]

In [40]:
pelis_final_count = pelis_final.map(lambda x: ("Comun", x[1][1]))
pelis_final_count_key = pelis_final_count.reduceByKey(lambda x, y: x + y)
pelis_final_count_key.take(1)

[('Comun', 72834)]

In [41]:
media2 = pelis_final_ratings_key.join(pelis_final_count_key)
media2.take(1)

[('Comun', (257794, 72834))]

In [42]:
media_2000_accion = media2.map(lambda x: round(x[1][0]/x[1][1],2))
media_2000_accion.take(1)

[3.54]

## Quinta pregunta: ¿ Cuál es el año en que mayor número de usuarios votaron?

In [45]:
ratings2.take(5)

[['1', '1193', 5, 2000],
 ['1', '661', 3, 2000],
 ['1', '914', 3, 2000],
 ['1', '3408', 4, 2000],
 ['1', '2355', 5, 2001]]

In [46]:
rating_user = ratings2.map(lambda x: (x[3], x[1])).distinct()
rating_user.take(5)

[(2000, '914'), (2000, '3408'), (2000, '1197'), (2000, '2804'), (2000, '2791')]

In [47]:
rating_user_map = rating_user.map(lambda x: (x[0], 1))
rating_user_key = rating_user_map.reduceByKey(lambda x, y: x + y)

In [48]:
rating_user_key.max(lambda x: x[1])

(2000, 3678)

## Sexta pregunta: ¿ Cuál es la película con mejor puntación media?

In [49]:
sc.stop()