# Practice exercises

[Curso Big Data con Python y Spark - De Cero a Heroe](https://www.youtube.com/watch?v=df021jGEmPM&ab_channel=SoloPython)

Dataset: [MovieLens](https://grouplens.org/datasets/movielens/). Download `ml-100k.zip` and extract it into a folder named `files2`.

In [2]:
from pyspark import SparkContext

sc = SparkContext(master='local', appName='PeliculaMasPopular')

The `u.data` file contains one rating per line with four fields: userID, movieID, rating, timestamp.

The goal is to find the **movieID** of the most popular movie, that is, the one with the most ratings.

In [3]:
movies = sc.textFile('files2/ml-100k/u.data').map(lambda line: line.split())
movies.take(5)

[['196', '242', '3', '881250949'],
 ['186', '302', '3', '891717742'],
 ['22', '377', '1', '878887116'],
 ['244', '51', '2', '880606923'],
 ['166', '346', '1', '886397596']]

In [4]:
movies = movies.map(lambda x: (int(x[1]), 1))
movies.take(5)

[(242, 1), (302, 1), (377, 1), (51, 1), (346, 1)]

In [5]:
movie_counts = movies.reduceByKey(lambda x, y: x + y)
movie_counts.take(5)

# reduceByKey groups the pairs by key and combines the values of each key with the given lambda.

[(242, 117), (302, 297), (377, 13), (51, 81), (346, 126)]
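
To make the semantics of `reduceByKey` concrete, here is a minimal plain-Python sketch (no Spark required). The helper `reduce_by_key` and the sample pairs are hypothetical and exist only for illustration; Spark performs the same logical operation in a distributed way, which is why the function should be associative and commutative.

```python
def reduce_by_key(pairs, func):
    """Combine the values of each key with func (logical analogue of reduceByKey)."""
    result = {}
    for key, value in pairs:
        if key in result:
            # Key already seen: merge the running value with the new one.
            result[key] = func(result[key], value)
        else:
            # First occurrence of the key: store the value as-is.
            result[key] = value
    return result


# Hypothetical (movieID, 1) pairs, made up for illustration.
pairs = [(242, 1), (302, 1), (242, 1), (51, 1), (302, 1), (242, 1)]

counts = reduce_by_key(pairs, lambda x, y: x + y)
print(counts)  # {242: 3, 302: 2, 51: 1}
```

Since every value is `1` and the lambda adds, the result for each movieID is the number of ratings it received.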

In [6]:
flipped = movie_counts.map(lambda x: (x[1], x[0]))
flipped.take(5)

[(117, 242), (297, 302), (13, 377), (81, 51), (126, 346)]

In [7]:
sorted_movies = flipped.sortByKey(ascending=False)
sorted_movies.take(5)

[(583, 50), (509, 258), (508, 100), (507, 181), (485, 294)]
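
The flip-then-sort step can be compared with sorting directly on the count. The sketch below uses plain Python lists and the five sample pairs shown in the `movie_counts` output; the variable names are assumptions made for this example.

```python
# Sample (movieID, count) pairs taken from the movie_counts output shown earlier.
movie_counts_sample = [(242, 117), (302, 297), (377, 13), (51, 81), (346, 126)]

# Approach used in the notebook: flip to (count, movieID), then sort descending.
flipped_sorted = sorted(
    ((count, movie_id) for movie_id, count in movie_counts_sample),
    reverse=True,
)

# Alternative: keep (movieID, count) and sort on the second element.
sorted_by_count = sorted(movie_counts_sample, key=lambda kv: kv[1], reverse=True)

print(flipped_sorted[0])   # (297, 302)
print(sorted_by_count[0])  # (302, 297)
```

On an RDD, a similar effect should be achievable without the flip through `movie_counts.sortBy(lambda kv: kv[1], ascending=False)` or `movie_counts.takeOrdered(5, key=lambda kv: -kv[1])`; those calls are not run here.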

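## Broadcast variables

Broadcast variables are read-only values shared with the executors: Spark ships a copy to each executor, and the tasks running there read it locally.

`variable = sc.broadcast(function())` creates a broadcast variable from the value returned by `function()`.

`variable.value` returns the broadcast value; index it (for example `variable.value[key]`) to look up an entry.

The `u.item` file contains the movie titles.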

In [8]:
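def load_movie_names():
    # Build a dict that maps each movieID to its title.
    movie_names = {}
    # u.item contains non-UTF-8 bytes (it is commonly read as ISO-8859-1).
    # Decoding it as UTF-8 and swallowing UnicodeDecodeError would stop the
    # loop at the first bad line and silently drop every title after it.
    with open('files2/ml-100k/u.item', encoding='ISO-8859-1') as f:
        for line in f:
            fields = line.split('|')
            movie_names[int(fields[0])] = fields[1]

    return movie_names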

In [9]:
name_dict = sc.broadcast(load_movie_names())

In [10]:
name_dict.value[1]

'Toy Story (1995)'

In [11]:
sorted_movies_with_names = sorted_movies.map(
    lambda count : (name_dict.value[count[1]], count[0])
)

In [12]:
sorted_movies_with_names.take(5)

[('Star Wars (1977)', 583),
 ('Contact (1997)', 509),
 ('Fargo (1996)', 508),
 ('Return of the Jedi (1983)', 507),
 ('Liar Liar (1997)', 485)]
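
The lookup performed inside that `map` can be sketched in plain Python. The dictionary below holds only the five titles visible in the output above; the name `name_lookup` and the ID `999999` are hypothetical. In Spark the dictionary would be wrapped with `sc.broadcast(...)` and read through `.value`.

```python
# Titles copied from the notebook output; a stand-in for name_dict.value.
name_lookup = {
    50: 'Star Wars (1977)',
    258: 'Contact (1997)',
    100: 'Fargo (1996)',
    181: 'Return of the Jedi (1983)',
    294: 'Liar Liar (1997)',
}

# (count, movieID) pairs as produced by the sorting step.
sorted_sample = [(583, 50), (509, 258), (508, 100), (507, 181), (485, 294)]

# Same transformation as the map step: (count, movieID) -> (title, count).
with_names = [(name_lookup[movie_id], count) for count, movie_id in sorted_sample]
print(with_names[0])  # ('Star Wars (1977)', 583)

# dict.get avoids a KeyError when an ID is missing from the lookup table
# (999999 is a hypothetical ID used only for illustration).
fallback = name_lookup.get(999999, 'unknown title')
print(fallback)  # unknown title
```

Indexing with `[]` fails the whole job on a missing ID, so `.get` with a default may be the safer choice if the title file could be incomplete.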

---

The most popular movie can also be found without using broadcast variables.

In [19]:
nombre_peliculas = (
    sc.textFile('files2/ml-100k/u.item')
    .map(lambda line : line.split('|'))
    .map(lambda line : (int(line[0]), line[1]) )
    )

nombre_peliculas.take(5)

[(1, 'Toy Story (1995)'),
 (2, 'GoldenEye (1995)'),
 (3, 'Four Rooms (1995)'),
 (4, 'Get Shorty (1995)'),
 (5, 'Copycat (1995)')]

In [22]:
most_popular = flipped.max()
most_popular
# (number of ratings, movieID)

(583, 50)
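
Calling `max()` on the flipped pairs works because tuples compare element by element, so the count is compared first and the movieID only breaks ties. A plain-Python sketch with the five flipped pairs shown earlier (variable names are assumptions made for this example):

```python
# Sample (count, movieID) pairs taken from the flipped output shown earlier.
flipped_sample = [(117, 242), (297, 302), (13, 377), (81, 51), (126, 346)]

# Tuples compare lexicographically: the count decides, the movieID breaks ties.
most_rated = max(flipped_sample)
print(most_rated)  # (297, 302)

# Equivalent without flipping: compare (movieID, count) pairs on the count.
unflipped_sample = [(movie_id, count) for count, movie_id in flipped_sample]
most_rated_unflipped = max(unflipped_sample, key=lambda kv: kv[1])
print(most_rated_unflipped)  # (302, 297)
```

PySpark's `RDD.max` also accepts a `key` argument, so `movie_counts.max(key=lambda kv: kv[1])` should give the same movie without the flip; that call is not run here.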

In [21]:
nombre_peliculas.lookup(most_popular[1])[0]

'Star Wars (1977)'

In [28]:
sc.stop()