<a href="https://colab.research.google.com/github/Nacho2904/orga_de_datos/blob/main/tp_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Trabajo Práctico II: Spark

## Setup

In [5]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
openjdk-8-jdk-headless is already the newest version (8u342-b07-0ubuntu1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 12 not upgraded.


In [6]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext

In [7]:
import pandas as pd
import numpy as np
from google.colab import drive 

drive.mount('/content/gdrive')
path_a_cast = 'gdrive/MyDrive/Parquets/cast.parquet'
path_a_companies = 'gdrive/MyDrive/Parquets/companies.parquet'
path_a_crew = 'gdrive/MyDrive/Parquets/crew.parquet'
path_a_actores = 'gdrive/MyDrive/Parquets/imdb_actors.parquet'
path_a_keywords = 'gdrive/MyDrive/Parquets/keywords.parquet'
path_a_links = 'gdrive/MyDrive/Parquets/links.parquet'
path_a_movies = 'gdrive/MyDrive/Parquets/movies.parquet'
path_a_ratings = 'gdrive/MyDrive/Parquets/ratings.parquet'


Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [8]:
spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)


## S7

¿Cuál fue el año más mortal para los actores y actrices?

In [None]:
AÑO_FALLECIMIENTO = 3
CONTEO_MUERTES = 1
rdd_actores = sqlContext.read.parquet(path_a_actores).rdd

In [None]:
rdd_años_de_fallecimiento = rdd_actores.map(lambda row: row[AÑO_FALLECIMIENTO])
rdd_años_de_fallecimiento.first()

'1987'

In [None]:
rdd_años_de_fallecimiento = rdd_años_de_fallecimiento.filter(str.isnumeric)
rdd_años_de_fallecimiento = rdd_años_de_fallecimiento.map(int)
dict_con_conteo = dict(rdd_años_de_fallecimiento.countByValue())
año_mas_mortal = sorted([[llave, dict_con_conteo[llave]] for llave in dict_con_conteo], 
                        key = lambda año_y_conteo: año_y_conteo[CONTEO_MUERTES])[-1]
print(año_mas_mortal)

[2021, 3409]


## S23

¿Cuál es el usuario más pesimista (menor rating promedio) para cada género de películas? Considere solo usuarios con más de 20 reviews

In [None]:
rdd_ratings = sqlContext.read.parquet(path_a_ratings).rdd
rdd_ratings.take(5)

[Row(userId=1, movieId=110, rating=1.0, timestamp=1425941529),
 Row(userId=1, movieId=147, rating=4.5, timestamp=1425942435),
 Row(userId=1, movieId=858, rating=5.0, timestamp=1425941523),
 Row(userId=1, movieId=1221, rating=5.0, timestamp=1425941546),
 Row(userId=1, movieId=1246, rating=5.0, timestamp=1425941556)]

In [None]:
rdd_users_with_more_than_20_reviews = rdd_ratings.map(lambda rating: (rating[0], 1)).countByKey().filter(lambda user: user[1] > 20)

In [None]:
rdd_ratings_as_key_value = rdd_ratings.map(lambda rating: (rating[0], rating[1:]))
rdd_ratings_filtered = rdd_ratings_as_key_value.join(rdd_users_with_more_than_20_reviews).map(
    lambda row: (np.int64(row[1][0][0]), (row[0], row[1][0][1])))

movieId, (userId, rating)

=> movieId, ((userId, rating), listOfGenres) 

=> userId, (rating, listOfGenres, 1)

In [None]:
rdd_movies = sqlContext.read.parquet(path_a_movies).rdd
rdd_movies_id_and_genre = rdd_movies.map(lambda movie: (movie[5], movie[3])).filter(lambda row: row[1]).map(
    lambda movie: (np.int64(movie[0]), movie[1].split(',')))
rdd_ratings_filtered_with_genres = rdd_ratings_filtered.join(rdd_movies_id_and_genre)
rdd_ratings_filtered_with_genres = rdd_ratings_filtered_with_genres.map(lambda row: (row[1][0][0], (row[1][0][1], row[1][1])))

userId, (rating, listOfGenres)

=> userId, (genresDict)

genresDict <- {genre: [ratingsOfGenre] for genre in genres}

In [None]:
rdd_grouped_by_user = rdd_ratings_filtered_with_genres.groupByKey().map(lambda row: (row[0], list(row[1])))

acá en algún punto debería copiar y pegar la forma en la que saco todos los géneros

In [None]:
def get_mean_ratings_per_genre(ratings_and_genres):
  all_genres = ['Action', 'Adventure', 'Animation', 'Comedy', 'Crime', 'Documentary', 'Drama',
  'Family', 'Fantasy', 'Foreign', 'History', 'Horror', 'Music', 'Mystery', 'Romance', 'Science Fiction',
   'TV Movie', 'Thriller', 'War', 'Western']

  ratings = {genre: [] for genre in all_genres}
  for (rating, genres) in ratings_and_genres:
    for genre in genres:
      ratings[genre].append(rating)
  return {genre: np.sum(np.array(ratings[genre]))/len(ratings[genre]) if ratings[genre] else np.inf for genre in all_genres}

rdd_grouped_by_user_mean_ratings = rdd_grouped_by_user.map(lambda user: (user[0], get_mean_ratings_per_genre(user[1])))

In [None]:
genres = ['Action', 'Adventure', 'Animation', 'Comedy', 'Crime', 'Documentary', 'Drama',
  'Family', 'Fantasy', 'Foreign', 'History', 'Horror', 'Music', 'Mystery', 'Romance', 'Science Fiction',
   'TV Movie', 'Thriller', 'War', 'Western']

for genre in genres:
  most_pesimistic_user = rdd_grouped_by_user_mean_ratings.reduce(
      lambda user_1, user_2: user_1 if user_1[1][genre] < user_2[1][genre] else user_2)[0]
  print(f"El usuario más pesimista con películas de {genre} es {most_pesimistic_user}")

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.7/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored

In [None]:
"""
dejo los resultados acá para no tener que correrlo de vuelta
El usuario más pesimista con películas de Action es 232007
El usuario más pesimista con películas de Adventure es 232007
El usuario más pesimista con películas de Animation es 158483
El usuario más pesimista con películas de Comedy es 232007
El usuario más pesimista con películas de Crime es 252245
El usuario más pesimista con películas de Documentary es 219341
El usuario más pesimista con películas de Drama es 76193
El usuario más pesimista con películas de Family es 179759
El usuario más pesimista con películas de Fantasy es 205283
El usuario más pesimista con películas de Foreign es 23789
El usuario más pesimista con películas de History es 154619
El usuario más pesimista con películas de Horror es 205283
El usuario más pesimista con películas de Music es 93275
El usuario más pesimista con películas de Mystery es 222785
El usuario más pesimista con películas de Romance es 93275
El usuario más pesimista con películas de Science Fiction es 76193
El usuario más pesimista con películas de TV Movie es 21071
El usuario más pesimista con películas de Thriller es 76193
El usuario más pesimista con películas de War es 191147
El usuario más pesimista con películas de Western es 154247
"""

## S25

¿Cuál es el percentil 95 de la popularidad?

In [None]:
POPULARITY = 10
rdd_movies = sqlContext.read.parquet(path_a_movies).rdd

rdd_popularities = rdd_movies.map(lambda row: row[POPULARITY]).filter(lambda row: row)
number_of_movies = rdd_popularities.count()
top_5_per_cent = rdd_popularities.takeOrdered(int(number_of_movies*0.05), key = lambda x: -x)
percentile_95th = top_5_per_cent[-1]
print(percentile_95th)

11.063822


## S11

 Según los cinéfilos, ¿cuál es el rating promedio de las 5 películas más populares? Un usuario es cinéfilo cuando puntuó más de 50 películas y todas las puntuaciones son de más de 2.5 estrellas. Indicar id, título, popularidad y rating promedio de la película.

In [None]:
rdd_ratings = sqlContext.read.parquet(path_a_ratings).rdd
rdd_movies = sqlContext.read.parquet(path_a_movies).rdd

In [None]:
rdd_ratings_key_value = rdd_ratings.map(lambda row: (row[0], [row[1], row[2]]))

In [None]:
user_has_no_ratings_below = lambda user, threshold: all([rating[1] > threshold for rating in user[1]]) 
user_has_more_than_25_ratings = lambda user: len(user[1]) > 25
rdd_ratings_grouped_per_user = rdd_ratings_key_value.groupByKey().map(lambda row: (row[0], list(row[1])))
rdd_usuarios_cinefilos = rdd_ratings_grouped_per_user.filter(lambda user: user_has_no_ratings_below(user, 2.5) and 
                                                         user_has_more_than_25_ratings(user)).map(lambda user: user[0])

In [None]:
rdd_ratings_key_value_cinefilos = rdd_ratings_key_value.join(rdd_usuarios_cinefilos)

In [None]:
rdd_ratings_key_value_cinefilos.take(1)

[(8020, [168, 4.0])]

In [None]:
popularity = 10
rdd_top_5_most_popular_movies = rdd_movies.filter(lambda row: row[10])
rdd_top_5_most_popular_movies = rdd_top_5_most_popular_movies.takeOrdered(5, key = lambda row: -row[popularity])

In [None]:
rdd_top_5_most_popular_movies[0][10]

547.488298

## S34

 ¿Cuál es la probabilidad de que una película de un género en particular tenga una calificación promedio mayor a 3 (utilizando la tabla de ratings)? Obtenga las probabilidades de cada género posible.

In [9]:
rdd_ratings = sqlContext.read.parquet(path_a_ratings).rdd.cache()
rdd_movies = sqlContext.read.parquet(path_a_movies).rdd

In [55]:
rdd_movies_id_and_genre = rdd_movies.map(lambda row: (row[5], set(row[3].split(',') if row[3] else {})))

genres = rdd_movies_id_and_genre.map(lambda row: row[1]).reduce(
    lambda genres_1, genres_2: genres_1.union(genres_2))

In [53]:
rdd_ratings_filtered = rdd_ratings.map(lambda row: (str(row[1]), row[2]))
rdd_ratings_and_genres = rdd_ratings_filtered.join(rdd_movies_id_and_genre).map(lambda row: (row[1][1], row[1][0]))

In [54]:
get_count_of_movies_of_genre = lambda genre: rdd_ratings_and_genres.filter(lambda row: genre in row[0]).count()
get_count_of_good_movies_of_genre = lambda genre: rdd_ratings_and_genres.filter(lambda row: genre in row[0] and row[1] > 3).count()
get_probability_of_good_movie_of_genre = lambda genre: get_count_of_good_movies_of_genre(genre)/get_count_of_movies_of_genre(genre)

genre_with_probabilities = {genre: get_probability_of_good_movie_of_genre(genre) for genre in genres}
genre_with_probabilities

{'Comedy': 0.6066197209119013,
 'History': 0.5608738726958001,
 'Adventure': 0.5899834610810757,
 'Family': 0.5477754782192815,
 'Action': 0.6138275652380827,
 'Thriller': 0.6129063690134363,
 'Crime': 0.6156749695776628,
 'Fantasy': 0.5871750819809239,
 'Music': 0.5634004372358573,
 'Romance': 0.6080044469149527,
 'Science Fiction': 0.598594724310434,
 'Documentary': 0.5946180723747522,
 'Animation': 0.6312684971205037,
 'Western': 0.6731553749288013,
 'War': 0.5968992886651866,
 'Mystery': 0.6458909037465415,
 'Foreign': 0.6299020577108132,
 'Horror': 0.5949641298241065,
 'Drama': 0.602679729460368,
 'TV Movie': 0.6765014812819822}