In [1]:
# Analísis de películas más vistas en Chile con Spark RDDs

from pyspark import SparkContext

import sys
import os
os.environ["PYSPARK_PYTHON"] = sys.executable

# 1. Carga y preprocesamiento de datos
# Check if SparkContext is already initialized
if 'sc' not in globals() or sc is None:
    sc = SparkContext(appName="PeliculasMasVistasChile")

# Cambia la ruta si es necesario
data_path = "peliculas_mas_vistas.csv"
rdd = sc.textFile(data_path)

# Eliminar encabezado y convertir a tuplas
header = rdd.first()
rdd = rdd.filter(lambda x: x != header)
def parse_row(row):
    usuario, pelicula, minutos, rating, genero = row.split(",")
    return (usuario, pelicula, int(minutos), float(rating), genero)
rdd = rdd.map(parse_row)

In [2]:
# 2. Cantidad de visualizaciones por película
vis_por_pelicula = rdd.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b) # a = a + b
print("Visualizaciones por película:")
for pelicula, count in vis_por_pelicula.collect():
    print(f"{pelicula}: {count}")


Visualizaciones por película:
Acción Extrema: 3
Drama Profundo: 2
Documental de la Naturaleza: 1
La Gran Aventura: 2
Romance Inesperado: 2


In [4]:
# 3. Tiempo total de visualización por película (top 3)
tiempo_por_pelicula = rdd.map(lambda x: (x[1], x[2])).reduceByKey(lambda a, b: a + b)
top3 = tiempo_por_pelicula.takeOrdered(3, key=lambda x: -x[1])
print("Top 3 películas por minutos vistos:")
for pelicula, minutos in top3:
    print(f"{pelicula}: {minutos} min")

Top 3 películas por minutos vistos:
Drama Profundo: 290 min
Acción Extrema: 275 min
Romance Inesperado: 250 min


In [6]:
# 4. Películas con rating promedio > 4.5
rating_sum_count = rdd.map(lambda x: (x[1], (x[3], 1))).reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))
rating_promedio = rating_sum_count.mapValues(lambda x: x[0]/x[1])
peliculas_rating_alto = rating_promedio.filter(lambda x: x[1] > 4.5)
print("Películas con rating promedio > 4.5:")
for pelicula, rating in peliculas_rating_alto.collect():
    print(f"{pelicula}: {rating:.2f}")

Películas con rating promedio > 4.5:
Drama Profundo: 4.85
Romance Inesperado: 4.55


In [8]:
# 5. Promedio de minutos vistos por género
genero_minutos = rdd.map(lambda x: (x[4], (x[2], 1))).reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))
promedio_minutos_genero = genero_minutos.mapValues(lambda x: x[0]/x[1])
print("Promedio de minutos vistos por género:")
for genero, prom in promedio_minutos_genero.collect():
    print(f"{genero}: {prom:.2f} min")


Promedio de minutos vistos por género:
Animación: 115.00 min
Acción: 91.67 min
Drama: 145.00 min
Romance: 125.00 min
Documental: 95.00 min


In [10]:
# 6. Usuarios con mayor tiempo de visualización acumulado (top 3)
tiempo_usuario = rdd.map(lambda x: (x[0], x[2])).reduceByKey(lambda a, b: a + b)
top3_usuarios = tiempo_usuario.takeOrdered(3, key=lambda x: -x[1])
print("Top 3 usuarios por minutos vistos:")
for usuario, minutos in top3_usuarios:
    print(f"{usuario}: {minutos} min")

Top 3 usuarios por minutos vistos:
Carla: 250 min
Luis: 235 min
Pedro: 230 min


In [12]:
# 7. Género más popular (más visualizaciones)
genero_vis = rdd.map(lambda x: (x[4], 1)).reduceByKey(lambda a, b: a + b)
genero_popular = genero_vis.takeOrdered(1, key=lambda x: -x[1])[0]
print(f"Género más popular: {genero_popular[0]} con {genero_popular[1]} visualizaciones")

Género más popular: Acción con 3 visualizaciones


In [14]:
# 8. Película con mayor rating en cada género
# (película, (rating, 1, género))
genero_pelicula_rating = rdd.map(lambda x: ((x[4], x[1]), (x[3], 1)))
sum_count = genero_pelicula_rating.reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))
promedio = sum_count.mapValues(lambda x: x[0]/x[1])
# (género, (película, promedio))
genero_peliculas = promedio.map(lambda x: (x[0][0], (x[0][1], x[1])))
from operator import itemgetter
def max_rating(a, b):
    return a if a[1] > b[1] else b
max_por_genero = genero_peliculas.reduceByKey(max_rating)
print("Película con mayor rating promedio por género:")
for genero, (pelicula, rating) in max_por_genero.collect():
    print(f"{genero}: {pelicula} ({rating:.2f})")

Película con mayor rating promedio por género:
Acción: Acción Extrema (4.00)
Drama: Drama Profundo (4.85)
Documental: Documental de la Naturaleza (4.10)
Animación: La Gran Aventura (4.35)
Romance: Romance Inesperado (4.55)


In [16]:
# 9. Distribución de ratings
# (película, (rating, 1))
peliculas_rating = rdd.map(lambda x: (x[1], (x[3], 1))).reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))
promedios = peliculas_rating.mapValues(lambda x: x[0]/x[1])
def rating_bin(r):
    if 1 <= r < 2:
        return '1-2'
    elif 2 <= r < 3:
        return '2-3'
    elif 3 <= r < 4:
        return '3-4'
    elif 4 <= r <= 5:
        return '4-5'
    else:
        return 'otro'
dist = promedios.map(lambda x: (rating_bin(x[1]), 1)).reduceByKey(lambda a, b: a + b)
print("Distribución de ratings promedio de películas:")
for rango, count in dist.collect():
    print(f"{rango}: {count}")

Distribución de ratings promedio de películas:
4-5: 5


In [17]:
# 10. Explicación de optimización
print("""
Lazy evaluation en Spark significa que las transformaciones no se ejecutan hasta que se llama a una acción (como collect, count, take). Esto permite a Spark optimizar el plan de ejecución.
Para optimizar, podemos usar persist() o cache() en RDDs reutilizados varias veces, por ejemplo:
    rdd.persist()
Esto evita recalcular el RDD en cada acción y mejora el rendimiento si la memoria lo permite.
""")

sc.stop()


Lazy evaluation en Spark significa que las transformaciones no se ejecutan hasta que se llama a una acción (como collect, count, take). Esto permite a Spark optimizar el plan de ejecución.
Para optimizar, podemos usar persist() o cache() en RDDs reutilizados varias veces, por ejemplo:
    rdd.persist()
Esto evita recalcular el RDD en cada acción y mejora el rendimiento si la memoria lo permite.

