In [2]:
%load_ext autoreload
%autoreload 2

# Neo4j v2.0.1

**Descarga**
- [Descargar Neo4j Desktop v2.0.1 (macOS)](https://neo4j.com/download/neo4j-desktop/?edition=desktop&flavour=osx&release=2.0.1&offline=false)

**Configuración**
1. Instalar Neo4j Desktop.
2. Abrir la app y crear un nuevo proyecto con "Add".
3. Dentro del proyecto, seleccionar "Add Graph" → "Local DBMS".
4. Configurar:
   - Name: `BDNR_ml-25m`
   - Password: `bdnr2025`
   - Versión usada: `2025.05.0`
5. Hacer clic en "Create".

**Conexión**
```
neo4j://127.0.0.1:7687
```

In [3]:
import json
from src.benchmark.utils import get_system_info
import py2neo

print("py2neo Version:", py2neo.__version__)
print("System Info:\n", json.dumps(get_system_info(), indent=3))

py2neo Version: 2021.2.4
System Info:
 {
   "python_version": "3.12.1",
   "system": {
      "os": "Darwin",
      "release": "24.5.0",
      "machine": "x86_64",
      "processor": "i386"
   },
   "cpu": {
      "physical_cores": 8,
      "total_cores": 8,
      "frequency_mhz": 2400
   },
   "memory": {
      "total_ram_gb": 16.0,
      "available_ram_gb": 0.72
   }
}


# Data Load

In [4]:
from py2neo import Graph

graph = Graph("bolt://localhost:7687", auth=("neo4j", "bdnr2025"))

In [5]:
try:
    graph.run("RETURN 1").evaluate()
    print("Successfully connected to Neo4j")
except Exception as e:
    print(f"Connection failed: {e}")

Successfully connected to Neo4j


In [6]:
databases = graph.run("SHOW DATABASES").data()
print("Databases:", [db["name"] for db in databases])

Databases: ['neo4j', 'system']


In [7]:
from pprint import pprint

result = graph.run("SHOW DATABASES").data()
for db in result:
    print(f"\n{db['name']}")
    pprint(db, indent=2, width=100, depth=3, sort_dicts=False)


neo4j
{ 'name': 'neo4j',
  'type': 'standard',
  'aliases': [],
  'access': 'read-write',
  'address': 'localhost:7687',
  'role': 'primary',
  'writer': True,
  'requestedStatus': 'online',
  'currentStatus': 'online',
  'statusMessage': '',
  'default': True,
  'home': True,
  'constituents': []}

system
{ 'name': 'system',
  'type': 'system',
  'aliases': [],
  'access': 'read-write',
  'address': 'localhost:7687',
  'role': 'primary',
  'writer': True,
  'requestedStatus': 'online',
  'currentStatus': 'online',
  'statusMessage': '',
  'default': False,
  'home': False,
  'constituents': []}


### Batch Load: UNWIND & MERGE

In [8]:
import pandas as pd

# Read CSV files
base_path = "/Users/efaliveni//Downloads/ml-25m"

ratings = pd.read_csv(f"{base_path}/ratings.csv")
movies = pd.read_csv(f"{base_path}/movies.csv")
tags = pd.read_csv(f"{base_path}/tags.csv")
genome_tags = pd.read_csv(f"{base_path}/genome-tags.csv")
genome_scores = pd.read_csv(f"{base_path}/genome-scores.csv")

In [9]:
# Filter Data
SAMPLE_SIZE = 500

selected_users = ratings['userId'].unique()[:SAMPLE_SIZE]
filtered_ratings = ratings[ratings['userId'].isin(selected_users)]
filtered_movie_ids = filtered_ratings['movieId'].unique()
filtered_movies = movies[movies['movieId'].isin(filtered_movie_ids)].copy()
filtered_tags = tags[tags['movieId'].isin(filtered_movie_ids)]
filtered_scores = genome_scores[genome_scores['movieId'].isin(filtered_movie_ids)]

In [10]:
from collections import defaultdict
# Create a dictionary of genome tags for quick lookup
genome_tags_dict = {row['tagId']: row['tag'] for _, row in genome_tags.iterrows()}

# Build dictionaries to accumulate data
ratings_by_movie = defaultdict(list)
scores_by_movie = defaultdict(list)

ratings_by_user = defaultdict(list)
tags_by_user = defaultdict(list)

In [11]:
# Process ratings and tags to build both movie and user views
RELEVANCE_THRESHOLD = 0.8

# Ratings
for _, r in filtered_ratings.iterrows():
    movie_id = int(r.movieId)
    user_id = int(r.userId)
    
    # Add to movie's ratings
    ratings_by_movie[movie_id].append({
        "userId": user_id,
        "rating": float(r.rating),
        "timestamp": int(r.timestamp)
    })
    
    # Add to user's ratings
    ratings_by_user[user_id].append({
        "movieId": movie_id,
        "rating": float(r.rating),
        "timestamp": int(r.timestamp)
    })
    
# Tags (only for users), genome tag is the cleaned infered tag
for _, t in filtered_tags.dropna(subset=["tag"]).iterrows():
    movie_id = int(t.movieId)
    user_id = int(t.userId)
    
    tags_by_user[user_id].append({
        "movieId": movie_id,
        "tag": t.tag,
        "timestamp": int(t.timestamp)
    })
    
# Genome scores (only for movies), with 0.8 threshold
for _, s in filtered_scores[filtered_scores.relevance > RELEVANCE_THRESHOLD].iterrows():
    movie_id = int(s.movieId)
    tag_id = int(s.tagId)
    
    scores_by_movie[movie_id].append({
        "tagId": tag_id,
        "tag": genome_tags_dict.get(tag_id, ""),
        "relevance": float(s.relevance)
    })

In [12]:
import re

# Build movie documents
movie_docs = []
for _, m in filtered_movies.iterrows():
    movie_id = int(m.movieId)
   
    # Extract year from title
    title = m.title
    year_match = re.search(r"\((\d{4})\)$", title)
    year = int(year_match.group(1)) if year_match else None
    
    # Get ratings and tags for this movie
    movie_ratings = ratings_by_movie.get(movie_id, [])
    tag_entries = scores_by_movie.get(movie_id, [])
    
    # Calculate statistics
    rating_count = len(movie_ratings)
    if rating_count > 0:
        avg_rating = sum(r['rating'] for r in movie_ratings) / rating_count
        # Calculate rating distribution (count of each star rating)
        rating_dist = {"0.5":0, "1.0":0, "1.5":0, "2.0":0, "2.5":0, 
                      "3.0":0, "3.5":0, "4.0":0, "4.5":0, "5.0":0}
        for r in movie_ratings:
            rating_key = f"{r['rating']}"
            rating_dist[rating_key] += 1 
    else:
        avg_rating = 0.0
        rating_dist = {}  # will result in empty arrays below

    # Neo4j only allows:
    # - Single values: string, int, float, boolean, datetime
    # - Arrays of primitive types: [int], [string], etc.
    dist_keys = list(rating_dist.keys()) if rating_dist else []
    dist_values = list(rating_dist.values()) if rating_dist else []
    
    tag_ids = [int(e["tagId"]) for e in tag_entries]
    tag_relevances = [float(e["relevance"]) for e in tag_entries]
    tag_names = [str(e["tag"]) for e in tag_entries]
    
    movie_docs.append({
        "movieId": movie_id,
        "title": title,
        "year": year,
        "genres": m.genres if isinstance(m.genres, list) else [],
        "ratingCount": rating_count,
        "avgRating": avg_rating,
        "ratingDistKeys": dist_keys,
        "ratingDistValues": dist_values,
        "tagIds": tag_ids,
        "tagRelevances": tag_relevances,
        "tagNames": tag_names
    })

In [13]:
user_docs = []

for user_id in selected_users:
    user_id = int(user_id)

    user_ratings = ratings_by_user.get(user_id, [])
    user_tags = tags_by_user.get(user_id, [])
    
    # Rating stats
    rating_count = len(user_ratings)
    avg_rating = sum(r['rating'] for r in user_ratings) / rating_count if rating_count > 0 else 0.0
    
    # Sort ratings by timestamp
    user_ratings.sort(key=lambda r: r['timestamp'])

    # Flatten ratings for Neo4j
    rated_movie_ids = [r['movieId'] for r in user_ratings]
    rated_scores = [r['rating'] for r in user_ratings]
    rated_timestamps = [r['timestamp'] for r in user_ratings]

    # (Optional future support for tags)
    # tag_movie_ids = [t['movieId'] for t in user_tags]
    # tag_labels = [t['tag'] for t in user_tags]
    # tag_timestamps = [t['timestamp'] for t in user_tags]

    user_docs.append({
        "userId": user_id,
        "ratingCount": rating_count,
        "avgRating": round(avg_rating, 2),
        "ratedMovieIds": rated_movie_ids,
        "ratedScores": rated_scores,
        "ratedTimestamps": rated_timestamps,
        # "tagMovieIds": tag_movie_ids,
        # "tagLabels": tag_labels,
        # "tagTimestamps": tag_timestamps
    })


In [21]:
def load_users(user_data: list, batch_size: int = 1000) -> None:
    """Creates User nodes with rating history embedded as arrays."""
    
    query = """
    UNWIND $batch AS user
    MERGE (u:User {userId: user.userId})
    SET u.ratingCount = user.ratingCount,
        u.avgRating = user.avgRating,
        u.ratedMovieIds = user.ratedMovieIds,
        u.ratedScores = user.ratedScores,
        u.ratedTimestamps = user.ratedTimestamps
    """

    if batch_size is None:
        batch_size = len(user_data)

    for i in range(0, len(user_data), batch_size):
        batch = user_data[i:i + batch_size]
        graph.run(query, batch=batch)


def load_movies(movie_data: list, batch_size: int = 1000) -> None:
    """Creates Movie nodes with embedded genome tags (tagGenome)."""
         
    # Cypher query to store Movie nodes with embedded data
    query = """
    UNWIND $batch AS movie
    MERGE (m:Movie {movieId: movie.movieId})
    SET m.title = movie.title,
        m.year = movie.year,
        m.genres = movie.genres,
        m.ratingCount = movie.ratingCount,
        m.avgRating = movie.avgRating,
        m.ratingDistKeys = movie.ratingDistKeys,
        m.ratingDistValues = movie.ratingDistValues,        
        m.tagIds = movie.tagIds,
        m.tagRelevances = movie.tagRelevances,
        m.tagNames = movie.tagNames
    """
    
    if batch_size is None:
        batch_size = len(movie_data)

    for i in range(0, len(movie_data), batch_size):
        batch = movie_data[i:i + batch_size]
        graph.run(query, batch=batch)


def load_ratings(ratings_df: pd.DataFrame, batch_size: int = 1000) -> None:
    """Creates User and Movie nodes with RATED relationships."""
    rating_data = ratings_df.to_dict("records")
    
    query = """
    UNWIND $batch AS row
    MERGE (u:User {userId: row.userId})
    MERGE (m:Movie {movieId: row.movieId})
    MERGE (u)-[r:RATED]->(m)
    SET r.rating = row.rating, 
        r.timestamp = datetime({epochSeconds: row.timestamp})
    """
   
    if batch_size is None:
        batch_size = len(ratings_df)
    
    # Process in batches to avoid memory issues
    for i in range(0, len(rating_data), batch_size):
        batch = rating_data[i:i + batch_size]
        graph.run(query, batch=batch)

def load_tags(tags_df: pd.DataFrame, batch_size: int = 1000) -> None:
    """Creates TAGGED relationships between existing Users and Movies"""
    # Clean data - remove null tags
    tag_data = tags_df.dropna(subset=["tag"]).to_dict("records")
    
    query = """
    UNWIND $batch AS row
    MATCH (u:User {userId: row.userId})
    MATCH (m:Movie {movieId: row.movieId})
    MERGE (u)-[t:TAGGED]->(m)
    SET t.tag = row.tag
    """
       
    if batch_size is None:
        batch_size = len(tags_df)
        
    for i in range(0, len(tag_data), batch_size):
        batch = tag_data[i:i + batch_size]
        graph.run(query, batch=batch)

In [22]:
graph.run("MATCH (n) DETACH DELETE n")

In [23]:
import time

start_time = time.time()

stats = {
    'movies_loaded': 0,
    'users_loaded': 0,
    'rates_created': 0,
    'tags_created': 0,
    'execution_time': 0
}

BATCH_SIZE = None

# Insert data
try:
    print("Starting data loading process...")
    
    # Load users (foundational nodes)
    print("\nLoading users...")
    user_start = time.time()
    load_users(user_docs, BATCH_SIZE)
    stats['users_loaded'] = len(user_docs)
    print(f"Users loaded in {time.time() - user_start:.2f}s")
    
    # Load movies (foundational nodes)
    print("\nLoading movies...")
    movie_start = time.time()
    load_movies(movie_docs, BATCH_SIZE)
    stats['movies_loaded'] = len(movie_docs)
    print(f"Movies loaded in {time.time() - movie_start:.2f}s")
    
    # Load users through ratings
    print("\nLoading ratings...")
    ratings_start = time.time()
    load_ratings(filtered_ratings, BATCH_SIZE)
    stats['rates_created'] = len(filtered_ratings)
    print(f"Ratings loaded in {time.time() - ratings_start:.2f}s")
    
    # Load tags
    print("\nLoading tags...")
    tags_start = time.time()
    load_tags(filtered_tags, BATCH_SIZE)
    stats['tags_created'] = len(filtered_tags)
    print(f"Tags loaded in {time.time() - tags_start:.2f}s")

except Exception as e:
    print(f"\nError during data loading: {str(e)}")
    raise
finally:
    stats['execution_time'] = time.time() - start_time
    print(f"\nTotal processing time: {stats['execution_time']:.2f} seconds")
    print(f"Summary: {stats}")

Starting data loading process...

Loading users...
Users loaded in 0.23s

Loading movies...
Movies loaded in 4.68s

Loading ratings...
Ratings loaded in 77.77s

Loading tags...
Tags loaded in 68.89s

Total processing time: 151.58 seconds
Summary: {'movies_loaded': 7141, 'users_loaded': 500, 'rates_created': 62834, 'tags_created': 811156, 'execution_time': 151.57688093185425}


### Sample Movie

In [17]:
query = """
MATCH (m:Movie)
RETURN m
LIMIT 1
"""
graph.run(query)

m
"(_0:Movie {avgRating: 2.5, genres: [], movieId: 8977, ratingCount: 6, ratingDistKeys: ['0.5', '1.0', '1.5', '2.0', '2.5', '3.0', '3.5', '4.0', '4.5', '5.0'], ratingDistValues: [0, 0, 2, 0, 1, 2, 1, 0, 0, 0], tagIds: [132, 359, 508, 509, 564], tagNames: ['big budget', 'epic', 'historical', 'history', 'irish accent'], tagRelevances: [0.8362499999999999, 0.92325, 0.9425, 0.89625, 0.8847499999999999], title: 'Alexander (2004)', year: 2004})"


### Sample User

In [18]:
query = """
MATCH (u:User)
RETURN u
LIMIT 1
"""
graph.run(query).to_table()

u
"(_3039:User {avgRating: 3.81, ratedMovieIds: [5952, 2012, 2011, 1653, 1250, 6539, 6377, 3448, 1088, 899, 4308, 2161, 6711, 3949, 8360, 5878, 306, 1175, 307, 1237, 7327, 8154, 7234, 2843, 4144, 7365, 2068, 4422, 4973, 6016, 8873, 2692, 27721, 7323, 6954, 8014, 7939, 6370, 8973, 4703, 31956, 5147, 8786, 1260, 2351, 7940, 7209, 8685, 7820, 7937, 7938, 8405, 4325, 2632, 1217, 8729, 5912, 5767, 665, 2573, 27266, 8327, 32591, 5269, 3569, 27193, 5684, 7318, 296, 7361], ratedScores: [4.0, 2.5, 2.5, 4.0, 4.0, 3.5, 4.0, 4.0, 4.0, 3.5, 3.0, 3.5, 5.0, 5.0, 4.0, 4.0, 3.5, 3.5, 5.0, 5.0, 3.5, 5.0, 4.5, 4.5, 5.0, 4.0, 2.5, 3.0, 4.5, 5.0, 3.0, 5.0, 3.0, 3.5, 3.5, 3.5, 2.5, 4.5, 4.0, 4.0, 3.5, 4.0, 4.0, 3.5, 4.5, 4.5, 4.0, 1.0, 2.5, 3.0, 2.5, 3.5, 5.0, 5.0, 3.5, 3.5, 3.0, 5.0, 5.0, 4.0, 4.5, 5.0, 5.0, 0.5, 5.0, 3.0, 2.0, 2.0, 5.0, 5.0], ratedTimestamps: [1147868053, 1147868068, 1147868079, 1147868097, 1147868414, 1147868461, 1147868469, 1147868480, 1147868495, 1147868510, 1147868534, 1147868609, 1147868622, 1147868678, 1147868682, 1147868807, 1147868817, 1147868826, 1147868828, 1147868839, 1147868855, 1147868865, 1147868869, 1147868891, 1147868898, 1147869033, 1147869044, 1147869048, 1147869080, 1147869090, 1147869094, 1147869100, 1147869115, 1147869119, 1147869150, 1147869155, 1147869183, 1147869191, 1147869211, 1147869223, 1147877610, 1147877654, 1147877853, 1147877857, 1147877957, 1147877967, 1147877986, 1147878023, 1147878050, 1147878055, 1147878063, 1147878095, 1147878122, 1147878248, 1147878326, 1147878452, 1147878698, 1147878729, 1147878820, 1147878923, 1147879365, 1147879375, 1147879538, 1147879571, 1147879603, 1147879774, 1147879797, 1147879850, 1147880044, 1147880055], ratingCount: 70, userId: 1})"


# Single-Threaded Benchmark
Simulate User Load, real time latency stress

In [19]:
Explore

NameError: name 'Explore' is not defined

In [None]:
# Total de nodos por tipo
def contar_nodos():
    query = """
    MATCH (n)
    RETURN labels(n)[0] AS tipo, count(*) AS cantidad
    """
    return graph.run(query).to_table()

# Total de relaciones por tipo
def contar_relaciones():
    query = """
    MATCH ()-[r]->()
    RETURN type(r) AS relacion, count(*) AS cantidad
    """
    return graph.run(query).to_table()

# Usuarios sin ratings
def usuarios_sin_rating():
    query = """
    MATCH (u:User)
    WHERE NOT (u)-[:RATED]->()
    RETURN count(u) AS usuarios_sin_rating
    """
    return graph.run(query).to_table()

# Películas sin género ni rating ni genome tags
def peliculas_huerfanas():
    query = """
    MATCH (m:Movie)
    WHERE NOT (m)-[:HAS_GENRE]->()
      AND NOT (m)<-[:RATED]-()
      AND NOT (m)-[:HAS_RELEVANCE]->()
    RETURN count(m) AS peliculas_sin_relaciones
    """
    return graph.run(query).to_table()


In [None]:
print("Nodos por tipo:")
print(contar_nodos())

print("\nRelaciones por tipo:")
print(contar_relaciones())

print("\nUsuarios sin ratings:")
print(usuarios_sin_rating())

print("\nPelículas sin ningún vínculo:")
print(peliculas_huerfanas())
