# Apache Spark


En esta actividad vamos a realizar un procesamiento de datos de la manera tradicional (con Python+Pandas) y usando programación distribuida con Apache Spark. Para ello se deben realizar los siguientes pasos.

#### Parte A

1. Descargar el dataset "Movielens" con información de valoraciones de películas del siguiente enlace: https://files.grouplens.org/datasets/movielens/ml-latest.zipLinks to an external site. Los archivos con los que vamos a trabajar del zip son "movies.csv" y "ratings.csv".
2. Crear dos programas en python con pandas (pueden ser notebooks) que realicen las siguientes tareas:
- Contar cuántas veces aparece cada tipo de valoración (cuantas veces se ha dado una estrella, cuantas se han dado 1.5, cuantas 2, etc.) y que se muestre por pantalla de forma ordenada. Imprimir el tiempo que se tarda en hacer el proceso.
- Calcular la valoración media de cada película, lo añada como nueva columna a los datos del dataset "movies.csv" y lo guarde como un nuevo archivo "movies_rating.csv".Para ambos casos, imprimir el tiempo que se tarda en hacer el proceso. Estos programas los debéis desarrollar en vuestros ordenadores.

#### Parte B

1. Conectarse al clúster de la universidad para ejecutar tareas con Spark sobre el clúster.
2. Utilizando los datos de "movies.csv" y "ratings.csv" que se encuentran en la carpeta /data/movielens de HDFS del clúster, escribir dos programas en python (también pueden ser notebooks) con Spark (también pueden ser notebooks) que realicen las mismas tareas que las del apartado A. Calcular el tiempo que tarda cada tarea con Spark.

Escribir una memoria que describa el trabajo realizado, explicando el código, los problemas que habéis encontrado, los resultados con los tiempos, y una comparativa de ellos. Incluir un apartado de conclusiones comentando vuestra opinión sobre el trabajo.


### 2. Librerías

In [2]:
# importamos las librerias necesarias e iniciamos spark
import pandas as pd
from datetime import datetime

import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

### 3. Parte A

#### Python

A continuación se realiza el apartado de la práctica en python

In [3]:
# cargamos los datos de ratings
ratings = pd.read_csv("./ml-latest/ratings.csv")
ratings.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,307,3.5,1256677221
1,1,481,3.5,1256677456
2,1,1091,1.5,1256677471
3,1,1257,4.5,1256677460
4,1,1449,4.5,1256677264


In [4]:
# cargamos los datos de las peliculas
movies = pd.read_csv("./ml-latest/movies.csv")
movies.head()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [5]:
# iniciamos el tiempo
start_time = datetime.now()
# contamos los ratings y los ordenamos
count = ratings['rating'].value_counts().sort_index()
# terminamos el tiempo
end_time = datetime.now()
# imprimimos el resultado
print(count)
# imprimimos el tiempo
print('Duration: {}'.format(end_time - start_time))

rating
0.5     442388
1.0     886233
1.5     441354
2.0    1850627
2.5    1373419
3.0    5515668
3.5    3404360
4.0    7394710
4.5    2373550
5.0    4071135
Name: count, dtype: int64
Duration: 0:00:00.272542


In [6]:
# iniciamos el tiempo
start_time2 = datetime.now()
# dropeamos las columnas que no necesitamos
ratings2=ratings.drop(columns=['userId', 'timestamp'])
# agrupamos por pelicula y calculamos la media
mean=ratings2.groupby(['movieId']).mean()
# union de las dos tablas
movies_rating = pd.merge(movies, mean, on="movieId", how='left')
# transformamos el dataframe a csv
movies_rating.to_csv('movies_rating.csv')
# terminamos el tiempo
end_time2 = datetime.now()
# imprimimos el tiempo
print('Duration: {}'.format(end_time2 - start_time2))

Duration: 0:00:00.978666


### 4. Parte B

#### Spark

In [7]:
# lectura de csv en spark de peliculas
movies = spark.read.csv('./ml-latest/movies.csv', header = True, inferSchema = 'true')
movies.show(10)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
+-------+--------------------+--------------------+
only showing top 10 rows



In [8]:
# lectura de csv en spark de ratings
ratings = spark.read.csv('./ml-latest/ratings.csv', header = True, inferSchema = 'true') 
ratings.show(10)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
|     1|   1590|   2.5|1256677236|
|     1|   1591|   1.5|1256677475|
|     1|   2134|   4.5|1256677464|
|     1|   2478|   4.0|1256677239|
|     1|   2840|   3.0|1256677500|
+------+-------+------+----------+
only showing top 10 rows



In [9]:
# iniciamos el tiempo
start_time = datetime.now()
# Contar cuántas veces aparece cada tipo de valoración y que se muestre por pantalla de forma ordenada
count = ratings.groupBy('rating').count().orderBy('rating', ascending=False).show()
# terminamos el tiempo
end_time = datetime.now()
# imprimimos el tiempo
print('Duration: {}'.format(end_time - start_time))

+------+-------+
|rating|  count|
+------+-------+
|   5.0|4071135|
|   4.5|2373550|
|   4.0|7394710|
|   3.5|3404360|
|   3.0|5515668|
|   2.5|1373419|
|   2.0|1850627|
|   1.5| 441354|
|   1.0| 886233|
|   0.5| 442388|
+------+-------+

Duration: 0:00:03.752703


In [10]:
# iniciamos el tiempo
start_time = datetime.now()
# agrupamos por pelicula y calculamos la media
ratings_mean = ratings.groupBy('movieID').agg({'rating':'mean'})
# union de las dos tablas
movie_ratings = movies.join(ratings_mean, on = 'movieID', how = 'left')
# guardar el dataset en un csv
movie_ratings.toPandas().to_csv("./movies_ratings_spark.csv", header=True)
# terminamos el tiempo
end_time = datetime.now()
# imprimimos el dataframe
movie_ratings.show(5)
# imprimimos el tiempo
print('Duration: {}'.format(end_time - start_time))

+-------+--------------------+--------------------+------------------+
|movieId|               title|              genres|       avg(rating)|
+-------+--------------------+--------------------+------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|3.8866494325899312|
|      3|Grumpier Old Men ...|      Comedy|Romance| 3.173981392364453|
|      5|Father of the Bri...|              Comedy|3.0772909396406876|
|      6|         Heat (1995)|Action|Crime|Thri...|3.8442108566049575|
|      4|Waiting to Exhale...|Comedy|Drama|Romance| 2.874539979926397|
+-------+--------------------+--------------------+------------------+
only showing top 5 rows

Duration: 0:00:00.047635
