In [None]:
!pip install neo4j

In [None]:
import pandas as pd
import json
from tqdm import tqdm
from neo4j import GraphDatabase

# ========================
# Neo4j Connection Setup
# ========================
neo4j_uri = "bolt://neo4j:7687"  # Using service DNS in your Kubernetes cluster
neo4j_username = "neo4j"
neo4j_password = ""              # No password as configured
driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_username, neo4j_password))

# ==============================================
# STEP 1: Import Movies and Genre Relationships
# ==============================================

# Retrieve existing movie IDs from Neo4j to avoid duplicates
existing_movie_ids = set()
with driver.session() as session:
    result = session.run("MATCH (m:Movie) RETURN m.id AS id")
    for record in result:
        existing_movie_ids.add(record["id"])
print("Already imported movies:", len(existing_movie_ids))

# Define file path and count total rows in movies-master.csv
movies_csv = "/home/jovyan/work/cinescope/data/csv-files/movies-master.csv"  # Adjust path if necessary
with open(movies_csv, "r", encoding="utf-8") as f:
    total_movie_rows = sum(1 for _ in f) - 1  # subtract header
print("Total rows in movies CSV:", total_movie_rows)

movies_chunksize = 10000

# Process movies in chunks with progress bar
with pd.read_csv(movies_csv, chunksize=movies_chunksize, encoding="utf-8") as reader:
    for df_chunk in tqdm(reader, total=(total_movie_rows // movies_chunksize) + 1, desc="Importing Movies"):
        # Filter out movies that are already imported based on the "id" column
        df_remaining = df_chunk[~df_chunk["id"].isin(existing_movie_ids)]
        
        with driver.session() as session:
            for _, row in df_remaining.iterrows():
                try:
                    movie_id = int(row["id"])
                except Exception as e:
                    continue  # Skip rows with invalid IDs

                title = row["title"]
                popularity = row["popularity"]
                vote_average = row["vote_average"] if pd.notnull(row["vote_average"]) else None

                # Merge the Movie node (creates or updates)
                session.run(
                    """
                    MERGE (m:Movie {id: $id})
                    ON CREATE SET m.title = $title, m.popularity = $pop, m.vote_average = $vote_average
                    ON MATCH SET m.title = $title, m.popularity = $pop, m.vote_average = $vote_average
                    """,
                    {"id": movie_id, "title": title, "pop": popularity, "vote_average": vote_average}
                )

                # Process the "genres" column (expects a JSON string)
                try:
                    genres_json = json.loads(row["genres"])
                except Exception as e:
                    genres_json = []
                for g in genres_json:
                    genre_id = g.get("id")
                    genre_name = g.get("name")
                    if genre_id is not None:
                        session.run(
                            """
                            MATCH (m:Movie {id: $movie_id})
                            MERGE (g:Genre {id: $genre_id})
                            ON CREATE SET g.name = $genre_name
                            MERGE (m)-[:HAS_GENRE]->(g)
                            """,
                            {"movie_id": movie_id, "genre_id": int(genre_id), "genre_name": genre_name}
                        )
                # Add the movie_id to the set so it isn't re-processed in later chunks
                existing_movie_ids.add(movie_id)

print("Movies import complete.")

# ==========================================================
# STEP 2: Import People and Actor–Movie Relationships
# ==========================================================

# Load person_ids.csv into a dictionary for quick lookup
df_person = pd.read_csv("/home/jovyan/work/cinescope/data/csv-files/person_ids.csv", encoding="utf-8")
person_dict = {}
for _, row in df_person.iterrows():
    try:
        person_id = int(row["id"])
    except Exception:
        continue
    person_dict[person_id] = {
        "name": row["name"],
        "popularity": row["popularity"],
        "adult": row["adult"]
    }
print("Loaded", len(person_dict), "persons from /home/jovyan/work/cinescope/data/csv-files/person_ids.csv")

# Retrieve existing person IDs from Neo4j
existing_person_ids = set()
with driver.session() as session:
    result = session.run("MATCH (p:Person) RETURN p.id AS id")
    for record in result:
        existing_person_ids.add(record["id"])
print("Already imported persons:", len(existing_person_ids))

# Process actor-movie-ids-master.csv in chunks
actor_movie_csv = "/home/jovyan/work/cinescope/data/csv-files/actor-movie-ids-master.csv"  # Adjust path if necessary
with open(actor_movie_csv, "r", encoding="utf-8") as f:
    total_actor_movie_rows = sum(1 for _ in f) - 1
print("Total rows in actor-movie CSV:", total_actor_movie_rows)

actor_chunksize = 10000
with pd.read_csv(actor_movie_csv, chunksize=actor_chunksize, encoding="utf-8") as reader:
    for df_chunk in tqdm(reader, total=(total_actor_movie_rows // actor_chunksize) + 1, desc="Importing Actor-Movie Relationships"):
        with driver.session() as session:
            for _, row in df_chunk.iterrows():
                try:
                    actor_id = int(row["actor_id"])
                    movie_id = int(row["movie_id"])
                except Exception as e:
                    continue

                # Skip relationships for movies that haven't been imported
                if movie_id not in existing_movie_ids:
                    continue

                # Merge the Person node if not already present
                if actor_id not in existing_person_ids:
                    person_info = person_dict.get(actor_id, None)
                    if person_info:
                        session.run(
                            """
                            MERGE (p:Person {id: $actor_id})
                            ON CREATE SET p.name = $name, p.popularity = $pop, p.adult = $adult
                            """,
                            {"actor_id": actor_id, "name": person_info["name"],
                             "pop": person_info["popularity"], "adult": person_info["adult"]}
                        )
                    else:
                        # If actor info is missing, create a minimal node
                        session.run(
                            """
                            MERGE (p:Person {id: $actor_id})
                            """,
                            {"actor_id": actor_id}
                        )
                    existing_person_ids.add(actor_id)
                
                # Create the ACTED_IN relationship between the Person and the Movie
                session.run(
                    """
                    MATCH (p:Person {id: $actor_id})
                    MATCH (m:Movie {id: $movie_id})
                    MERGE (p)-[:ACTED_IN]->(m)
                    """,
                    {"actor_id": actor_id, "movie_id": movie_id}
                )

print("Actor-Movie relationships import complete.")

driver.close()
print("ETL Process Completed Successfully.")

Already imported movies: 589512
Total rows in movies CSV: 1139406


Importing Movies:  54% 62/114 [4:16:06<48:51:33, 3382.57s/it]

In [8]:
import os

# Print the current working directory
print("Current working directory:", os.getcwd())

# Expand the tilde and list files in the expected directory
expected_dir = os.path.expanduser("~/work/cinescope/data/csv-files/")
print("Files in {}: {}".format(expected_dir, os.listdir(expected_dir)))

Current working directory: /home/jovyan/work/cinescope/jupyter-code
Files in /home/jovyan/work/cinescope/data/csv-files/: ['person_ids.csv', 'movies-master.csv', 'actor-movie-ids-master.csv', 'genre.csv', 'movie_ids.csv', '.ipynb_checkpoints']
