In [None]:
# -----------------------------------------------------------------------------
# 📘 Notebook: 03B_neo4j_ingestion.ipynb
#
# Purpose:
#   Import cleaned per-run summaries into a Neo4j graph for interactive
#   exploration and quality auditing. Each run becomes a :Run node that
#   stores its metric values as properties and is linked to a :Date node.
#
# Steps:
#   1. Load cleaned dataset (run_summary_cleaned.parquet)
#   2. Connect securely to Neo4j (via .env)
#   3. Create constraints and indexes
#   4. Import runs + date relationships
#   5. Optionally attach metric nodes
#   6. Validate import with diagnostic Cypher queries
#
# Input : ../data/strava/processed/run_summary_cleaned.parquet
# Output: Populated Neo4j graph
# Next  : Stage 4 – Feature Engineering & Clustering
# -----------------------------------------------------------------------------


In [None]:
# 📘 03B_neo4j_ingestion.ipynb
# Import cleaned running data into Neo4j for interactive exploration.

from neo4j import GraphDatabase
from dotenv import load_dotenv
from pathlib import Path
import pandas as pd
import os
from tqdm import tqdm

# --- 1. Load environment variables and connect securely ---
# Credentials are read from the environment (populated from .env by
# python-dotenv) so that no secret is hardcoded in the notebook.
load_dotenv()
NEO4J_URI = os.getenv("NEO4J_URI")
NEO4J_USER = os.getenv("NEO4J_USER")
NEO4J_PASS = os.getenv("NEO4J_PASS")

# Fail fast with an actionable message instead of handing None to the driver.
_missing_settings = [
    name
    for name, value in (
        ("NEO4J_URI", NEO4J_URI),
        ("NEO4J_USER", NEO4J_USER),
        ("NEO4J_PASS", NEO4J_PASS),
    )
    if value is None
]
if _missing_settings:
    raise RuntimeError(
        "❌ Missing Neo4j settings: "
        + ", ".join(_missing_settings)
        + " (define them in .env or the environment)"
    )

driver = None
try:
    driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASS))
    # A trivial round-trip proves that the URI and credentials really work
    # before any later cell depends on `driver`.
    with driver.session() as s:
        s.run("RETURN 1 AS ok").single()
    print(f"✅ Connected to Neo4j at {NEO4J_URI}")
except Exception as e:
    # Release whatever the half-initialised driver holds, then re-raise with
    # the original exception chained so the full traceback is preserved.
    if driver is not None:
        driver.close()
    raise RuntimeError(f"❌ Neo4j connection failed: {e}") from e


In [None]:

# --- 2. Load cleaned running dataset -----------------------------------------
df_path = Path("../data/strava/processed/run_summary_cleaned.parquet")

# The path is relative to the current working directory, so give an
# actionable hint (with the resolved location) when the file is not there
# instead of surfacing a bare parquet-engine error.
if not df_path.is_file():
    raise FileNotFoundError(
        f"Cleaned run summary not found at {df_path.resolve()} - "
        "generate it first or check the notebook's working directory."
    )

df = pd.read_parquet(df_path)
print(f"Loaded {len(df):,} runs for graph import")

# An empty frame is not an error, but every later step would be a no-op.
if df.empty:
    print("⚠️ Dataset is empty - nothing will be imported into Neo4j.")

In [None]:
# --- 3. Ensure constraints & indexes ----------------------------------------
# Every statement carries IF NOT EXISTS, and they are issued in this order
# within a single session.
schema_statements = (
    "CREATE CONSTRAINT IF NOT EXISTS FOR (r:Run) REQUIRE r.run_id IS UNIQUE",
    "CREATE CONSTRAINT IF NOT EXISTS FOR (d:Date) REQUIRE d.date IS UNIQUE",
    "CREATE INDEX run_metrics_idx IF NOT EXISTS FOR (r:Run) ON (r.avg_pace, r.total_distance_km)",
)
with driver.session() as session:
    for statement in schema_statements:
        session.run(statement)
print("✅ Constraints and index ensured.")


In [None]:
# --- 4. Define batch import function ----------------------------------------
# Cypher executed once per batch: each element of $rows upserts one Run node
# (keyed by run_id), one Date node (keyed by date), and the ON_DATE
# relationship between them.
_RUN_BATCH_CYPHER = """
    UNWIND $rows AS row
    MERGE (r:Run {run_id: row.run_id})
      SET r.total_distance_km = row.total_distance_km,
          r.avg_pace          = row.avg_pace,
          r.avg_speed         = row.avg_speed,
          r.avg_cadence       = row.avg_cadence,
          r.avg_hr            = row.avg_hr,
          r.elevation_gain    = row.elevation_gain,
          r.duration_min      = row.duration_min,
          r.missing_pct       = row.missing_pct
    MERGE (d:Date {date: date(row.date)})
    MERGE (r)-[:ON_DATE]->(d)
    """


def import_runs_batch(tx, rows):
    """
    Upsert one batch of runs together with their Date nodes.

    Args:
        tx: Transaction-like object; only ``tx.run(query, rows=...)`` is used.
        rows: Value bound to the ``$rows`` query parameter. The query reads
            ``run_id``, ``date`` and the metric fields from each element.

    The whole batch is sent as a single UNWIND statement for speed, and the
    query relies on MERGE/SET only, which keeps the import idempotent.
    """
    tx.run(_RUN_BATCH_CYPHER, rows=rows)

In [None]:
# --- 6. Validate import -----------------------------------------------------
# NOTE(review): no cell that actually calls import_runs_batch is visible in
# this notebook - confirm the import step ran before trusting these numbers.
with driver.session() as s:
    # The pattern yields one row per ON_DATE relationship, so DISTINCT is
    # needed on the Run side as well; otherwise a Run linked to more than one
    # Date would be counted once per link.
    stats = s.run("""
        MATCH (r:Run)-[:ON_DATE]->(d:Date)
        RETURN count(DISTINCT r) AS runs, count(DISTINCT d) AS dates
    """).data()[0]
print(f"📈 Graph now contains {stats['runs']} runs across {stats['dates']} unique dates.")

# Turn the printout into an actual check against the dataset loaded earlier.
if stats["runs"] < len(df):
    print(
        f"⚠️ Graph holds {stats['runs']} dated runs but the loaded dataset has "
        f"{len(df):,} rows - has the import step been executed?"
    )

In [None]:

# --- 7. Validate & close ----------------------------------------------------
# (Run this LAST)

try:
    # Quick sanity check before closing
    with driver.session() as s:
        # One row is produced per ON_DATE relationship, hence DISTINCT on the
        # Run side too, so each run is counted once.
        stats = s.run("""
            MATCH (r:Run)-[:ON_DATE]->(d:Date)
            RETURN count(DISTINCT r) AS runs, count(DISTINCT d) AS dates
        """).data()[0]
    print(f"📈 Graph now contains {stats['runs']} runs across {stats['dates']} unique dates.")
finally:
    # Close the driver once you're done - also when the sanity check above
    # raised, so the connection is not left open.
    try:
        driver.close()
        print("🔒 Connection closed.")
    except Exception as e:
        # Best effort by design: a failed close is reported, not raised.
        print(f"⚠️ Skipping close (driver was already closed?): {e}")
