# 1.- Carga y Preprocesamiento de Datos (1 punto)
# • Cargar el dataset en un RDD y asegurarse de eliminar la primera fila (encabezado).
# • Convertir los datos a una estructura de tuplas adecuadas: (usuario, película, minutos_vistos, rating, género).

In [7]:
# Importaciones necesarias
from pyspark.sql import SparkSession

# Inicializar SparkSession
spark = SparkSession.builder \
    .appName("AnalisisPeliculasRDD") \
    .master("local[*]") \
    .getOrCreate()

# Obtener el SparkContext
sc = spark.sparkContext

# 1. Carga y Preparación de los Datos
# Cargar el archivo como un RDD
raw_rdd = sc.textFile("peliculas_mas_vistas.csv")

# Obtener y remover el encabezado
header = raw_rdd.first()
data_rdd = raw_rdd.filter(lambda line: line != header)

# Parsear el RDD para obtener tuplas (usuario, pelicula, minutos, rating, genero)
# Los índices son: 0-usuario, 1-pelicula, 2-minutos, 3-rating, 4-genero
parsed_rdd = data_rdd.map(lambda line: line.split(';')) \
                     .map(lambda parts: (
                         parts[0],
                         parts[1],
                         int(parts[2]),
                         float(parts[3]),
                         parts[4]
                     ))

print("Primeros 5 registros del RDD parseado:")
print(parsed_rdd.take(5))

Primeros 5 registros del RDD parseado:
[('Juan', 'La Gran Aventura', 120, 4.5, 'Animación'), ('Ana', 'Acción Extrema', 90, 3.8, 'Acción'), ('Pedro', 'La Gran Aventura', 110, 4.2, 'Animación'), ('Carla', 'Drama Profundo', 150, 4.9, 'Drama'), ('Juan', 'Acción Extrema', 85, 4.0, 'Acción')]


# 2.- Cantidad de Visualizaciones por Película (1 punto)
# • Contar cuántas veces ha sido vista cada película.


In [8]:
# 2. Cantidad de Visualizaciones por Película
# Crear pares (pelicula, 1)
views_rdd = parsed_rdd.map(lambda x: (x[1], 1))

# Sumar el conteo por película
views_by_movie = views_rdd.reduceByKey(lambda a, b: a + b)

print("\nCantidad de visualizaciones por película:")
print(views_by_movie.collect())


Cantidad de visualizaciones por película:
[('Acción Extrema', 3), ('Drama Profundo', 2), ('Documental de la Naturaleza', 1), ('La Gran Aventura', 2), ('Romance Inesperado', 2)]


# 3.- Tiempo Total de Visualización por Película (1 punto)
# • Sumar el total de minutos vistos por cada película y mostrar el top 3 de las más vistas.

In [9]:
# 3. Tiempo Total de Visualización por Película
# Crear pares (pelicula, minutos_vistos)
time_rdd = parsed_rdd.map(lambda x: (x[1], x[2]))

# Sumar minutos por película
total_time_by_movie = time_rdd.reduceByKey(lambda a, b: a + b)

# Obtener el top 3 de películas por tiempo de visualización
top3_time_movies = total_time_by_movie.sortBy(lambda x: x[1], ascending=False).take(3)

print("\nTop 3 películas por tiempo total de visualización:")
print(top3_time_movies)


Top 3 películas por tiempo total de visualización:
[('Drama Profundo', 290), ('Acción Extrema', 275), ('Romance Inesperado', 250)]


# 4.- Películas con un Rating Promedio Mayor a 4.5 (1 punto)
# • Calcular el rating promedio de cada película y filtrar las que tengan más de 4.5.

In [10]:
# 4. Películas con un Rating Promedio Mayor a 4.5
# Crear pares (pelicula, (rating, 1)) para sumar los ratings y contar las ocurrencias
rating_sum_count_rdd = parsed_rdd.map(lambda x: (x[1], (x[3], 1)))

# Sumar ratings y contar visualizaciones por película
rating_aggregated = rating_sum_count_rdd.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Calcular el rating promedio y filtrar las que superan 4.5
avg_rating_filtered = rating_aggregated.map(lambda x: (x[0], x[1][0] / x[1][1])) \
                                       .filter(lambda x: x[1] > 4.5)

print("\nPelículas con un Rating Promedio Mayor a 4.5:")
print(avg_rating_filtered.collect())


Películas con un Rating Promedio Mayor a 4.5:
[('Drama Profundo', 4.85), ('Romance Inesperado', 4.550000000000001)]


# 5.- Promedio de Minutos Vistos por Género (1 punto)
# • Obtener el tiempo promedio de visualización por cada género.

In [11]:
# 5. Promedio de Minutos Vistos por Género
# Crear pares (genero, (minutos, 1))
minutes_sum_count_rdd = parsed_rdd.map(lambda x: (x[4], (x[2], 1)))

# Sumar minutos y contar visualizaciones por género
minutes_aggregated = minutes_sum_count_rdd.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Calcular el promedio de minutos por género
avg_minutes_by_genre = minutes_aggregated.map(lambda x: (x[0], x[1][0] / x[1][1]))

print("\nPromedio de Minutos Vistos por Género:")
print(avg_minutes_by_genre.collect())


Promedio de Minutos Vistos por Género:
[('Animación', 115.0), ('Acción', 91.66666666666667), ('Drama', 145.0), ('Romance', 125.0), ('Documental', 95.0)]


# 6.- Usuarios con Mayor Tiempo de Visualización Acumulado (1 punto)
# • Calcular el tiempo total visto por usuario y mostrar los 3 usuarios con más tiempo de visualización.

In [12]:
# 6. Usuarios con Mayor Tiempo de Visualización Acumulado
# Crear pares (usuario, minutos)
user_time_rdd = parsed_rdd.map(lambda x: (x[0], x[2]))

# Sumar minutos por usuario
total_time_by_user = user_time_rdd.reduceByKey(lambda a, b: a + b)

# Obtener el top 3 de usuarios por tiempo de visualización
top3_time_users = total_time_by_user.sortBy(lambda x: x[1], ascending=False).take(3)

print("\nTop 3 usuarios por tiempo de visualización:")
print(top3_time_users)


Top 3 usuarios por tiempo de visualización:
[('Carla', 250), ('Luis', 235), ('Pedro', 230)]


# 7.- Género Más Popular (1 punto)
# • Determinar cuál es el género con más visualizaciones en total.


In [13]:
# 7. Género Más Popular
# Crear pares (genero, 1)
genre_views_rdd = parsed_rdd.map(lambda x: (x[4], 1))

# Sumar visualizaciones por género
genre_counts = genre_views_rdd.reduceByKey(lambda a, b: a + b)

# Encontrar el género con el mayor conteo
most_popular_genre = genre_counts.max(key=lambda x: x[1])

print("\nGénero más popular:")
print(most_popular_genre)


Género más popular:
('Acción', 3)


# 8.- Película con Mayor Rating en Cada Género (1 punto)
# • Para cada género, obtener la película con mayor rating promedio.

In [14]:
# 8. Película con Mayor Rating en Cada Género
# Primero, calcular el rating promedio por película en cada género
movie_rating_by_genre_rdd = parsed_rdd.map(lambda x: ((x[4], x[1]), (x[3], 1))) \
                                      .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
                                      .map(lambda x: (x[0][0], (x[0][1], x[1][0] / x[1][1])))

# Agrupar por género y encontrar la película con el rating más alto
best_movie_per_genre = movie_rating_by_genre_rdd.reduceByKey(lambda a, b: a if a[1] > b[1] else b)

print("\nPelícula con mayor rating en cada género:")
print(best_movie_per_genre.collect())


Película con mayor rating en cada género:
[('Acción', ('Acción Extrema', 4.0)), ('Drama', ('Drama Profundo', 4.85)), ('Documental', ('Documental de la Naturaleza', 4.1)), ('Animación', ('La Gran Aventura', 4.35)), ('Romance', ('Romance Inesperado', 4.550000000000001))]


# 9.- Distribución de Ratings (1 punto)
# • Contar cuántas películas tienen un rating entre 1-2, 2-3, 3-4 y 4-5.


In [15]:
# 9. Distribución de Ratings
# Mapear ratings a rangos
rating_bins_rdd = parsed_rdd.map(lambda x: x[3]) \
                           .map(lambda rating:
                                "1-2" if rating >= 1.0 and rating <= 2.0 else
                                "2-3" if rating > 2.0 and rating <= 3.0 else
                                "3-4" if rating > 3.0 and rating <= 4.0 else
                                "4-5" if rating > 4.0 and rating <= 5.0 else
                                "Otros"
                            ) \
                           .map(lambda bin: (bin, 1))

# Contar el número de visualizaciones en cada rango
rating_distribution = rating_bins_rdd.reduceByKey(lambda a, b: a + b)

print("\nDistribución de Ratings:")
print(rating_distribution.collect())


Distribución de Ratings:
[('4-5', 8), ('3-4', 2)]


# 10.- Optimización y Explicación del Código (1 punto)
# • Explicar brevemente el uso de lazy evaluation en Spark y cómo optimizar el código usando persist() o cache().

Lazy Evaluation (Evaluación Perezosa)
El código de Spark no se ejecuta línea por línea de inmediato. Las transformaciones (map(), filter(), reduceByKey()) solo construyen un plan de ejecución de operaciones. La computación real ocurre solo cuando se invoca una acción (collect(), count(), take()). Esta estrategia permite a Spark optimizar la ejecución del plan en su totalidad antes de correrlo, lo que resulta en un rendimiento mucho mayor. Por ejemplo, si tienes una cadena de 10 transformaciones, Spark puede combinarlas en menos etapas para minimizar el costoso "shuffling" de datos entre los nodos.

Optimización con persist() y cache()
Cuando un RDD es el resultado de operaciones costosas y se va a usar varias veces, cache() o persist() lo guardan en memoria.

cache() almacena el RDD en la RAM de los nodos.

persist() permite especificar el nivel de almacenamiento (RAM, disco, o ambos).

Uso: La primera vez que se ejecuta una acción sobre un RDD cacheado, Spark lo computa y lo almacena. Las siguientes acciones no lo recalcularán, sino que lo leerán del caché, lo que ahorra tiempo y recursos significativamente. Esto es vital para flujos de trabajo iterativos.

In [16]:
# Ejemplo conceptual de optimización con persist()
# Supongamos que este RDD se va a usar para varios análisis.
# Es el resultado de operaciones previas costosas.
filtered_rdd = parsed_rdd.filter(lambda x: x[4] == 'Acción').cache()

# Primera acción: se ejecuta el filtro y se cachea el resultado.
count_accion = filtered_rdd.count()
print(f"\nNúmero de películas de acción (primera vez): {count_accion}")

# Segunda acción: se usa el RDD cacheado, sin re-computar el filtro.
avg_rating_accion = filtered_rdd.map(lambda x: (x[3], 1)) \
                                .reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
avg_rating = avg_rating_accion[0] / avg_rating_accion[1]
print(f"Rating promedio de películas de acción (segunda vez, desde caché): {avg_rating:.2f}")

# Detener la sesión de Spark
spark.stop()


Número de películas de acción (primera vez): 3
Rating promedio de películas de acción (segunda vez, desde caché): 4.00
