In [None]:
#pip install pymongo pyspark flask findspark

In [1]:
# IMPORTACION DE LAS BIBLIOTECAS NECESARIAS
from pymongo import MongoClient
import os, re
from bson.objectid import ObjectId
from pyspark.sql import SparkSession
from flask import Flask, jsonify, request

#### Punto 1
Se le ha asignado la tarea de construir una tabla de recomendación de películas según las 1.000.209 valoraciones anónimas de aproximadamente 3.900 películas creado por 6.040 usuarios de MovieLens en el año 2000. Descargue el conjunto de datos desde el siguiente link: https://grouplens.org/datasets/movielens/1m/. En el archivo README encontrará la descripción de cada variable de cada archivo, tenga en cuenta que necesita los tres archivos.

####
i).  En una base de datos de Mongo cree tres colecciones llamadas ratings, movies y users. Cada una contendrá los datos de los archivos con mismo nombre.

In [2]:
# importing MongoClient from pymongo
from pymongo import MongoClient 

# Hacer la conexión a MongoDB
client = MongoClient("mongodb://localhost:27017/")

# Crear base de datos
db = client["Movielens"]

# Crear colecciones
ratings_collection = db["ratings"]
movies_collection = db["movies"]
users_collection = db["users"]

In [3]:
print(client.list_database_names())

['admin', 'config', 'local', 'miBD']


##### .
ii) Cree un script en python que sea capaz conectarse a mongo, lea cada archivo y lo ingeste línea a línea (cada línea como un documento) a su respectiva colección en Mongo. Eg: en movies, tenga en cuenta que cada fila del archivo se considera como un único documento dentro de mongo, por lo tanto, si el archivo movies tiene n elementos (filas) se van a ingestarn documentos a la colección movies.

In [4]:
def load_data_to_mongo():

    # Mapeo de ocupaciones
     
    user_occupation_mapper = {
        '0': "other or not specified",
        '1': "academic/educator",
        '2': "artist",
        '3': "clerical/admin",
        '4': "college/grad student",
        '5': "customer service",
        '6': "doctor/health care",
        '7': "executive/managerial",
        '8': "farmer",
        '9': "homemaker",
        '10': "K-12 student",
        '11': "lawyer",
        '12': "programmer",
        '13': "retired",
        '14': "sales/marketing",
        '15': "scientist",
        '16': "self-employed",
        '17': "technician/engineer",
        '18': "tradesman/craftsman",
        '19': "unemployed",
        '20': "writer"
    }
    
    # ==========================================
    # 1. Cargar ratings.dat
    # ==========================================
    with open('./DataLake/Raw/Fuente1_ml-1m/ratings.dat') as file:
        ratings_batch = []
        
        for line in file.readlines():
            item = line.strip().split("::")
            
            record = {
                "user_id": int(item[0]),
                "movie_id": int(item[1]),
                "rating": int(item[2])
            }
            
            ratings_batch.append(record)
            
            # Insertar por lotes de 1000 documentos
            if len(ratings_batch) == 1000:
                ratings_collection.insert_many(ratings_batch)
                ratings_batch = []  # Limpiar el lote
        
        # Insertar los datos restantes si el número de registros no es múltiplo de 1000
        if ratings_batch:
            ratings_collection.insert_many(ratings_batch)

    # ==========================================
    # 2. Cargar users.dat
    # ==========================================
    with open('./DataLake/Raw/Fuente1_ml-1m/users.dat') as file:
        users_batch = []
        
        for line in file.readlines():
            item = line.strip().split("::")
            
            record = {
                "user_id": int(item[0]),
                "gender": item[1],
                "age": item[2],
                "occupation": user_occupation_mapper.get(item[3], "Unknown")
            }
            
            users_batch.append(record)
            
            # Insertar por lotes de 1000 documentos
            if len(users_batch) == 1000:
                users_collection.insert_many(users_batch)
                users_batch = []  # Limpiar el lote
        
        # Insertar los datos restantes si el número de registros no es múltiplo de 1000
        if users_batch:
            users_collection.insert_many(users_batch)

    # ==========================================
    # 3. Cargar movies.dat
    # ==========================================
    with open('./DataLake/Raw/Fuente1_ml-1m/movies.dat') as file:
        movies_batch = []
        
        for line in file.readlines():
            item = line.strip().split("::")
            
            # Extraer el año del título
            match = re.search(r'\((\d{4})\)', item[1])  # Buscar un número de 4 dígitos entre paréntesis
            year = int(match.group(1)) if match else None  # Si encuentra el año, lo extrae
            
            record = {
                "movie_id": int(item[0]),
                "title": item[1],
                "genres": item[2].split("|"),
                "year": year
            }
            
            movies_batch.append(record)
            
            # Insertar por lotes de 1000 documentos
            if len(movies_batch) == 1000:
                movies_collection.insert_many(movies_batch)
                movies_batch = []  # Limpiar el lote
        
        # Insertar los datos restantes si el número de registros no es múltiplo de 1000
        if movies_batch:
            movies_collection.insert_many(movies_batch)

    print("Datos cargados exitosamente a MongoDB.")

load_data_to_mongo()

Datos cargados exitosamente a MongoDB.


In [5]:
# ============================
# CONSULTAS EN MONGODB CON PYSPARK (punto iii)
# ============================

from flask import Flask, jsonify, request
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pymongo import MongoClient
from pyspark.sql.functions import col
import findspark
import json
import re

# Iniciar la sesión de Spark
findspark.init()
spark = SparkSession.builder.appName("TFinal")\
        .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1')\
        .getOrCreate()

# Configurar el nivel de logs a ERROR para evitar los warnings innecesarios
spark.sparkContext.setLogLevel("ERROR")

# Conectar a MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["Movielens"]

# Verificar y listar las colecciones existentes en la base de datos de MongoDB
print("Colecciones disponibles en la base de datos 'Movielens':")
print(db.list_collection_names())

# Cargar las colecciones desde MongoDB
movies_df = spark.read.format("mongo") \
    .option("uri", "mongodb://localhost:27017/") \
    .option("database", "Movielens") \
    .option("collection", "movies") \
    .load().drop("_id")
    
ratings_df = spark.read.format("mongo") \
    .option("uri", "mongodb://localhost:27017/") \
    .option("database", "Movielens") \
    .option("collection", "ratings") \
    .load().drop("_id")

users_df = spark.read.format("mongo") \
    .option("uri", "mongodb://localhost:27017/") \
    .option("database", "Movielens") \
    .option("collection", "users") \
    .load().drop("_id")

# Convertir el DataFrame a JSON (como un RDD de Strings en formato JSON)
movies_rdd= movies_df.toJSON()
ratings_rdd= ratings_df.toJSON()
users_rdd= users_df.toJSON()

print("Primeras 3 filas de 'movies':")
display(movies_rdd.take(3))
print("Primeras 3 filas de 'ratings':")
display(ratings_rdd.take(3))
print("Primeras 3 filas de 'users':")
display(users_rdd.take(3))

spark.stop()

Colecciones disponibles en la base de datos 'Movielens':
['movies', 'ratings', 'users']
Primeras 3 filas de 'movies':


['{"genres":["Animation","Children\'s","Comedy"],"movie_id":1,"title":"Toy Story (1995)","year":1995}',
 '{"genres":["Adventure","Children\'s","Fantasy"],"movie_id":2,"title":"Jumanji (1995)","year":1995}',
 '{"genres":["Comedy","Romance"],"movie_id":3,"title":"Grumpier Old Men (1995)","year":1995}']

Primeras 3 filas de 'ratings':


['{"movie_id":1193,"rating":5,"user_id":1}',
 '{"movie_id":661,"rating":3,"user_id":1}',
 '{"movie_id":914,"rating":3,"user_id":1}']

Primeras 3 filas de 'users':


['{"age":"1","gender":"F","occupation":"K-12 student","user_id":1}',
 '{"age":"56","gender":"M","occupation":"self-employed","user_id":2}',
 '{"age":"25","gender":"M","occupation":"scientist","user_id":3}']

In [None]:
# ============================
# CONSULTAS EN MONGODB CON PYSPARK (punto iii)
# ============================

from flask import Flask, jsonify, request
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import findspark

# Inicialización de Flask
app = Flask("Final")

findspark.init()
conf = SparkConf().setMaster("local[*]").setAppName("MiSpark")
sc = SparkContext(conf = conf)

from flask import Flask, jsonify, request
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pymongo import MongoClient
from pyspark.sql.functions import col
import findspark
import json
import re

# Iniciar la sesión de Spark
findspark.init()
spark = SparkSession.builder.appName("TFinal")\
        .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1')\
        .getOrCreate()

# Configurar el nivel de logs a ERROR para evitar los warnings innecesarios
spark.sparkContext.setLogLevel("ERROR")

# Conectar a MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["Movielens2"]

# Cargar las colecciones desde MongoDB
movies_df = spark.read.format("mongo") \
    .option("uri", "mongodb://localhost:27017/") \
    .option("database", "Movielens2") \
    .option("collection", "movies") \
    .load().drop("_id")
    
ratings_df = spark.read.format("mongo") \
    .option("uri", "mongodb://localhost:27017/") \
    .option("database", "Movielens2") \
    .option("collection", "ratings") \
    .load().drop("_id")

users_df = spark.read.format("mongo") \
    .option("uri", "mongodb://localhost:27017/") \
    .option("database", "Movielens2") \
    .option("collection", "users") \
    .load().drop("_id")

# Convertir el DataFrame a JSON (como un RDD de Strings en formato JSON)
movies_rdd= movies_df.toJSON()
ratings_rdd= ratings_df.toJSON()
users_rdd= users_df.toJSON()


# Construir RDD con el rating promedio de cada película
def calculate_average_ratings():
     # (movie_id, (rating, 1)) -> (movie_id, (sum(ratings), count(ratings)))
    ratings_avg_rdd = ratings_rdd \
        .map(lambda r: (r['movie_id'], (r['rating'], 1)))\
        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))\
        .mapValues(lambda v: v[0] / v[1])
    return ratings_avg_rdd


@app.route('/rate_top20', methods=['GET'])
def rate_top20():
    # Obtener los parámetros de la solicitud (si existen)
    year = request.args.get('year', default=None, type=int)
    genre = request.args.get('genre', default=None, type=str)

    # Filtrar películas por género y año (usando el RDD de movies)
    filtered_movies = movies_rdd
    if year:
        filtered_movies = filtered_movies.filter(lambda m: m['year'] == year)  # Filtrar por año
    if genre:
        filtered_movies = filtered_movies.filter(lambda m: genre in m['genres'])

    # Calcular el rating promedio de cada película
    ratings_avg_rdd = calculate_average_ratings()

    # Unir los RDDs de ratings promedio con las películas filtradas
    filtered_ratings = filtered_movies.map(lambda m: (m['movie_id'], m['title'])) \
        .join(ratings_avg_rdd)  # Une (movie_id, title) con (movie_id, average_rating)

    # Ordenar las películas por rating promedio de manera descendente
    top_20 = filtered_ratings.takeOrdered(20, key=lambda x: -x[1][1])  # Ordenar por rating descendente

    # Devolver los resultados
    top_20_titles = [(movie[1][0], movie[1][1]) for movie in top_20]  # [(title, avg_rating)]
    return jsonify(top_20_titles)


@app.route('/rate_bottom20', methods=['GET'])
def rate_bottom20():
    year = request.args.get('year', default=None, type=int)
    genre = request.args.get('genre', default=None, type=str)

    filtered_movies = movies_rdd
    if year:
        filtered_movies = filtered_movies.filter(lambda m: m['year'] == year)
    if genre:
        filtered_movies = filtered_movies.filter(lambda m: genre in m['genres'])

    ratings_avg_rdd = calculate_average_ratings()

    filtered_ratings = filtered_movies.map(lambda m: (m['movie_id'], m['title'])) \
        .join(ratings_avg_rdd)

    bottom_20 = filtered_ratings.takeOrdered(20, key=lambda x: x[1][1])  # Ordenar por rating ascendente

    bottom_20_titles = [(movie[1][0], movie[1][1]) for movie in bottom_20]
    return jsonify(bottom_20_titles)


@app.route('/count_top20', methods=['GET'])
def count_top20():
    year = request.args.get('year', default=None, type=int)
    genre = request.args.get('genre', default=None, type=str)

    filtered_movies = movies_rdd
    if year:
        filtered_movies = filtered_movies.filter(lambda m: m['year'] == year)
    if genre:
        filtered_movies = filtered_movies.filter(lambda m: genre in m['genres'])

    # Contar las valoraciones de cada película
    movie_count_rdd = ratings_rdd.map(lambda r: (r['movie_id'], 1)) \
                                 .reduceByKey(lambda a, b: a + b)

    filtered_count = filtered_movies.map(lambda m: (m['movie_id'], m['title'])) \
                                    .join(movie_count_rdd)

    top_20_most_viewed = filtered_count.takeOrdered(20, key=lambda x: -x[1][1])  # Ordenar por el número de valoraciones

    top_20_movies_info = [(movie[1][0], movie[1][1]) for movie in top_20_most_viewed]
    return jsonify(top_20_movies_info)

# http://127.0.0.1:5000/recomendaciones?userAge=25&movieYear=1999
app.run(debug=True)