## 1. Import Required Libraries

In [None]:
import asyncio
import json
import logging
import os
import re
import uuid
import pandas as pd
import numpy as np
from src.memory.neo4j_store import Neo4jStore

# Setup logging and reproducible seed
# Root logger is configured at INFO level; NumPy's global RNG is seeded with a
# fixed value so any NumPy-based random sampling repeats between runs.
logging.basicConfig(level=logging.INFO)
np.random.seed(1234)

# Visible confirmation that the import/setup cell ran to completion.
print('Imports complete')

## 2. Configure Environment, Paths & Backups

In [None]:
# BASE_DIR is the parent of the current working directory.
# NOTE(review): presumably the notebook is launched from a direct subdirectory
# of the project root — confirm before relying on the derived paths.
BASE_DIR = os.path.abspath(os.path.join(os.getcwd(), '..'))  # adjust as necessary
# Default target directory for backup_original_df(); created here if absent.
NOTEBOOK_BACKUP_DIR = os.path.join(BASE_DIR, 'notebooks', 'backups')
os.makedirs(NOTEBOOK_BACKUP_DIR, exist_ok=True)
# NOTE(review): no visible cell reads DRY_RUN; the apply calls further down pass
# dry_run explicitly, so changing this flag alone does not change behavior.
DRY_RUN = True  # set to False to apply changes
# Default `limit` of list_missing_app_id().
BATCH_LIMIT = 50  # number of nodes to test in notebook runs (for safety)
print(f'Backups will be stored in: {NOTEBOOK_BACKUP_DIR}')

## 3. Load Dataset(s) into a Pandas DataFrame (from DB)

In [None]:
async def list_missing_app_id(limit=BATCH_LIMIT):
    """Fetch Memory nodes whose ``app_id`` property is null or empty.

    Args:
        limit: Maximum number of nodes to return (bound to ``$limit``).

    Returns:
        A DataFrame with the columns ``eid``, ``metadata``, ``content`` and
        ``created_at``; when the query yields no rows, an empty DataFrame
        carrying those same columns.

    Raises:
        RuntimeError: If the store has no Neo4j driver after initialization.
    """
    columns = ['eid', 'metadata', 'content', 'created_at']
    # NOTE(review): the empty-string literal in the WHERE clause was lost in
    # the source text and is reconstructed here as "" — confirm.
    query = (
        'MATCH (m:Memory) WHERE (m.app_id IS NULL OR m.app_id = "") '
        'RETURN elementId(m) as eid, m.metadata as metadata, '
        'm.content as content, m.created_at as created_at LIMIT $limit'
    )
    store = Neo4jStore()
    await store.initialize()
    if not store.neo4j_driver:
        raise RuntimeError('Neo4j driver not available')
    try:
        rows = await store.execute_cypher(query, {'limit': limit})
    finally:
        # Close the store even when the query fails, so the connection is not leaked.
        await store.close()
    if not rows:
        return pd.DataFrame(columns=columns)
    return pd.DataFrame(rows)

# Run the query once and keep the result for the cells below.
# NOTE(review): asyncio.run() raises RuntimeError when an event loop is already
# running (as in Jupyter kernels); a top-level `await` may be required there — confirm.
df = asyncio.run(list_missing_app_id())
# Trailing expression: shows the first ten rows as the cell output.
df.head(10)


## 4. Quick Data Inspection and Sampling

In [None]:
def sample_inspect(df: pd.DataFrame):
    """Parse the ``metadata`` column and summarise the app_id values inside it.

    Two columns are added to ``df`` in place:

    * ``parsed_meta`` - the metadata as a dict (``{}`` when it is missing,
      not valid JSON, or valid JSON that is not an object);
    * ``meta_app_id`` - ``parsed_meta['app_id']`` or ``None``.

    The row count and the ten most frequent ``meta_app_id`` values are printed.

    Args:
        df: Frame with a ``metadata`` column holding dicts or JSON strings.

    Returns:
        The same ``df`` object, with the two columns added.
    """
    def parse_meta(raw):
        """Coerce one metadata cell to a dict; anything unusable becomes ``{}``."""
        if isinstance(raw, dict):
            return raw
        if isinstance(raw, str) and raw.strip():
            try:
                decoded = json.loads(raw)
            except (ValueError, RecursionError):
                return {}
            # Valid JSON need not be an object (e.g. '[1, 2]' or '"text"');
            # only a dict supports the .get('app_id') lookup below.
            return decoded if isinstance(decoded, dict) else {}
        return {}

    print('Total missing:', len(df))
    df['parsed_meta'] = df['metadata'].apply(parse_meta)
    df['meta_app_id'] = df['parsed_meta'].apply(lambda meta: meta.get('app_id'))
    print(df['meta_app_id'].value_counts(dropna=False).head(10))
    return df

# Add the parsed_meta / meta_app_id columns, then preview ten rows.
df = sample_inspect(df)
df[['eid','meta_app_id','content']].head(10)


## 5. Define app_id Validation Rules

In [None]:
# Canonical hyphenated UUID text (8-4-4-4-12 hex digits, either letter case).
# The groups are spelled out so that strings such as 36 hyphens or 36 hex
# digits are rejected, and \Z (unlike $) does not tolerate a trailing newline.
APP_ID_REGEX = re.compile(
    r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\Z'
)

def is_valid_app_id(aid):
    """Return True when ``aid`` is a string in canonical UUID text form.

    Args:
        aid: Candidate value; anything that is not a ``str`` (None, NaN,
            numbers, ...) is reported as invalid rather than raising.

    Returns:
        ``True`` if ``aid`` matches ``APP_ID_REGEX``, otherwise ``False``.
    """
    if not isinstance(aid, str):
        return False
    return APP_ID_REGEX.match(aid) is not None

# Quick check on our sample: count the entries that fail validation
# (missing values fail validation too, so they are included in the count).
bad_count = sum(1 for value in df['meta_app_id'] if not is_valid_app_id(value))
print(f'Invalid or missing app_id values in sample: {bad_count}/{len(df)}')


## 6. Detect Missing, Duplicate, and Invalid app_id

In [None]:
def detect_issues(df):
    """Flag each row's ``meta_app_id`` as valid, missing and/or duplicated.

    Adds three boolean columns to ``df`` in place and returns the same object:

    * ``is_valid``   - the value passes ``is_valid_app_id``;
    * ``is_missing`` - the value is null;
    * ``is_dup``     - the value is non-null and occurs on more than one row.
    """
    app_ids = df['meta_app_id']
    df['is_valid'] = app_ids.apply(is_valid_app_id)
    df['is_missing'] = app_ids.isnull()
    # Every member of a duplicate group is marked (keep=False); nulls never count.
    repeated = df.duplicated(subset='meta_app_id', keep=False)
    df['is_dup'] = repeated & app_ids.notnull()
    return df

# Add the is_valid / is_missing / is_dup flags, then preview ten rows with them.
df = detect_issues(df)
df[['eid','meta_app_id','is_missing','is_valid','is_dup']].head(10)


## 7. Summarize Issues and Visualize Counts

In [None]:
def report(df):
    """Summarise the issue flags of ``df`` as plain-int counts.

    Expects the boolean columns ``is_missing``, ``is_valid`` and ``is_dup``.

    Returns:
        A dict with the keys ``total`` (row count), ``missing``, ``invalid``
        (rows where ``is_valid`` is False) and ``dup``.
    """
    summary = {'total': len(df)}
    summary['missing'] = int(df['is_missing'].sum())
    summary['invalid'] = int((~df['is_valid']).sum())
    summary['dup'] = int(df['is_dup'].sum())
    return summary

# Print the summary dict (total / missing / invalid / dup counts).
print(report(df))


## 8. Normalize and Clean app_id

In [None]:
def normalize_app_id(aid):
    """Return ``aid`` as a lowercase app_id, or ``None`` if it cannot be recovered.

    Every character that is not a hex digit or a hyphen (surrounding
    whitespace, quotes, other noise) is dropped; the remainder must match
    ``APP_ID_REGEX``.

    Args:
        aid: Candidate value; non-strings yield ``None``.

    Returns:
        The cleaned, lower-cased id, or ``None``.
    """
    if not isinstance(aid, str):
        return None
    # Raw string: in a plain literal '\-' is an invalid escape sequence.
    # A value that is already well-formed passes through this substitution
    # unchanged, so no separate "already valid" branch is needed.
    cleaned = re.sub(r'[^0-9a-fA-F-]', '', aid)
    if APP_ID_REGEX.match(cleaned):
        return cleaned.lower()
    return None

# Apply normalization
# Values that normalize_app_id cannot recover get a null entry in the new column.
df['normalized_app_id'] = df['meta_app_id'].apply(normalize_app_id)
df['normalized_app_id'].head(10)


## 9. Map External IDs to Canonical app_id (lookup table)

In [None]:
def apply_mapping(df, mapping):
    """Translate external ids to canonical ones via a lookup table.

    Args:
        df: Frame with a ``normalized_app_id`` column.
        mapping: Dict of external id -> canonical id.

    Returns:
        The same ``df``, with a ``mapped_app_id`` column added in place. Ids
        that have no entry in ``mapping`` keep their normalized value.
    """
    normalized = df['normalized_app_id']
    translated = normalized.map(mapping)
    # Unmapped ids come back null from .map(); fall back to the normalized id.
    df['mapped_app_id'] = translated.fillna(normalized)
    return df

# Example mapping: empty for now
# With an empty mapping no id is translated, so mapped_app_id falls back to
# normalized_app_id on every row.
mapping = {}
df = apply_mapping(df, mapping)
df[['eid','normalized_app_id','mapped_app_id']].head(10)


## 10. Generate New app_id for Missing or Invalid Entries

In [None]:
# Fixed namespace for every uuid5-derived app_id, so reruns yield the same ids.
_APP_ID_NAMESPACE = uuid.UUID('f8bd0f6e-0c4c-4654-9201-12c4f2b4b5ef')


def generate_app_id(row):
    """Derive a deterministic app_id for one row.

    Preference order:

    1. the app_id already present in ``parsed_meta``, but only if it is valid
       (an invalid one is exactly what this function is meant to replace);
    2. uuid5 of ``"<source>:<chunk_index>"`` when both are in the metadata;
    3. uuid5 of the first 4096 characters of ``content`` (empty string when
       the content is missing or not a string).

    Args:
        row: Mapping-like row (e.g. a DataFrame row) with optional
            ``parsed_meta`` and ``content`` entries.

    Returns:
        The app_id as a string.
    """
    parsed = row.get('parsed_meta')
    if not isinstance(parsed, dict):
        parsed = {}

    existing = parsed.get('app_id')
    if isinstance(existing, str) and is_valid_app_id(existing):
        return existing

    source = parsed.get('source')
    chunk_index = parsed.get('chunk_index')
    if source and chunk_index is not None:
        # NOTE(review): the original name template was truncated in the source;
        # "<source>:<chunk_index>" is a reconstruction — confirm it matches any
        # ids that were generated before.
        return str(uuid.uuid5(_APP_ID_NAMESPACE, f'{source}:{chunk_index}'))

    content = row.get('content')
    if not isinstance(content, str):
        # Missing content arrives as None/NaN, which cannot be sliced.
        content = ''
    return str(uuid.uuid5(_APP_ID_NAMESPACE, content[:4096]))

# Apply only to rows missing or invalid mapped_app_id
# (is_valid_app_id already returns False for null values, so one test covers both)
mask = ~df['mapped_app_id'].apply(is_valid_app_id).astype(bool)
# Create the column up front so it exists even when no row needs a new id;
# otherwise the column selection below raises KeyError.
df['new_app_id'] = None
if mask.any():
    df.loc[mask, 'new_app_id'] = df.loc[mask].apply(generate_app_id, axis=1)
df[['eid','meta_app_id','normalized_app_id','mapped_app_id','new_app_id']].head(10)


## 11. Apply Repairs with Dry Run and Change Log

In [None]:
_CHANGE_LOG_COLUMNS = ['eid', 'old_app_id', 'new_app_id', 'reason']


def _present_or_none(value):
    """Return ``value`` unless it is null (None/NaN/NA) or empty; then ``None``."""
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    return value if value else None


def build_change_log(df):
    """List the rows whose app_id would change.

    For each row the replacement is ``new_app_id`` when present, otherwise
    ``mapped_app_id``; a change is recorded when that replacement exists and
    differs from ``meta_app_id``.

    Args:
        df: Frame with ``eid`` and, optionally, ``meta_app_id``,
            ``mapped_app_id`` and ``new_app_id`` columns.

    Returns:
        A DataFrame with the columns ``eid``, ``old_app_id``, ``new_app_id``
        and ``reason``; the columns are present even when there are no changes.
    """
    # NOTE(review): old_app_id comes from the metadata, not from the node
    # property the rows were selected on; rows whose metadata id is already
    # valid are skipped here — confirm that is intended.
    changes = []
    for _, row in df.iterrows():
        old = _present_or_none(row.get('meta_app_id'))
        # NaN is truthy, so a plain truthiness test would treat an unset
        # new_app_id as a real replacement value.
        new = _present_or_none(row.get('new_app_id')) or _present_or_none(row.get('mapped_app_id'))
        if new and old != new:
            changes.append({'eid': row['eid'], 'old_app_id': old, 'new_app_id': new, 'reason': 'missing/invalid/dupe'})
    return pd.DataFrame(changes, columns=_CHANGE_LOG_COLUMNS)

# Build the proposed changes and preview the first 20 rows.
changes = build_change_log(df)
print(changes.head(20))

# If DRY_RUN, do not apply changes to DB. Otherwise, update nodes in Neo4j with new app_id and metadata
def _metadata_as_dict(raw):
    """Return stored node metadata as a fresh dict; unusable values become ``{}``."""
    if isinstance(raw, dict):
        return dict(raw)
    if isinstance(raw, str):
        try:
            decoded = json.loads(raw)
        except ValueError:
            return {}
        # Valid JSON that is not an object cannot take an 'app_id' key.
        return decoded if isinstance(decoded, dict) else {}
    return {}


async def apply_changes_to_db(changes_df, dry_run=True):
    """Write the new app_id of each change onto its Memory node.

    For every row the node's current metadata is fetched, ``app_id`` is set in
    it, and both the ``app_id`` property and the JSON-encoded ``metadata``
    property are updated.

    Args:
        changes_df: Frame with ``eid`` and ``new_app_id`` columns.
        dry_run: When True (the default) nothing is written and no database
            connection is opened; only the number of pending updates is
            printed.

    Returns:
        None. Progress is reported via ``print``.
    """
    if changes_df.empty:
        print('No changes to apply')
        return
    if dry_run:
        print(f'Dry run: {len(changes_df)} updates would be applied')
        return

    fetch_q = 'MATCH (m:Memory) WHERE elementId(m) = $eid RETURN m.metadata as metadata'
    update_q = 'MATCH (m:Memory) WHERE elementId(m) = $eid SET m.app_id = $app_id, m.metadata = $meta RETURN elementId(m) as eid'

    store = Neo4jStore()
    await store.initialize()
    applied = []
    try:
        for _, change in changes_df.iterrows():
            eid = change['eid']
            new_app_id = change['new_app_id']
            if not isinstance(new_app_id, str) or not new_app_id:
                # Never write a null/NaN placeholder as an app_id.
                continue
            res = await store.execute_cypher(fetch_q, {'eid': eid})
            meta = _metadata_as_dict(res[0].get('metadata') if res else None)
            meta['app_id'] = new_app_id
            await store.execute_cypher(update_q, {'eid': eid, 'app_id': new_app_id, 'meta': json.dumps(meta)})
            applied.append(eid)
    finally:
        # Close the store even if an update fails part-way through.
        await store.close()
    print(f'Applied {len(applied)} updates')

# Run as dry run here
# Keep only rows that carry a replacement id. (Filling the column with itself,
# as before, changed nothing and failed on a change log without columns.)
changes_df = changes.copy()
if 'new_app_id' in changes_df.columns:
    changes_df = changes_df.dropna(subset=['new_app_id'])
print('Preview of changes:')
print(changes_df.head(20))

# To apply changes, toggle dry_run to False and run:
# asyncio.run(apply_changes_to_db(changes_df, dry_run=False))


## 12. Persist Repairs and Create Versioned Backup

In [None]:
def backup_original_df(df, backup_dir=NOTEBOOK_BACKUP_DIR):
    """Write ``df`` to a timestamped parquet file and return its path.

    Args:
        df: Frame to back up.
        backup_dir: Target directory; created if it does not exist.

    Returns:
        The path of the written file, or ``None`` when ``df`` is empty and
        nothing was written.
    """
    if df.empty:
        return None
    # The default directory is created at notebook start, but a caller-supplied
    # one may not exist yet.
    os.makedirs(backup_dir, exist_ok=True)
    # Second resolution: two backups within the same second share one file name.
    stamp = pd.Timestamp.now().strftime("%Y%m%d%H%M%S")
    outpath = os.path.join(backup_dir, f'missing_appid_backup_{stamp}.parquet')
    # NOTE(review): columns holding dicts (e.g. parsed_meta) may not be
    # serializable by every parquet engine — confirm with real data.
    df.to_parquet(outpath, index=False)
    return outpath

# Write the backup; bk is the file path, or None when df was empty.
bk = backup_original_df(df)
print('Backup saved to:', bk)


## 13. Unit Tests and Validation Functions

In [None]:
# Basic sanity tests (not using pytest in notebook)
sample_id = str(uuid.uuid4())
assert is_valid_app_id(sample_id)
assert not is_valid_app_id(None)
assert not is_valid_app_id('')
assert not is_valid_app_id('not-a-uuid')
# Normalization must strip the padding and lower-case the id, not merely
# return something non-null.
assert normalize_app_id(' ' + sample_id.upper() + ' ') == sample_id
assert normalize_app_id(None) is None
print('Basic tests passed')

## 14. End-to-end Example: Fix a Sample Subset

In [None]:
# Demo run: build the change log and run in dry-run mode, optionally apply
# Work on a copy so the preview cannot alter `changes`.
changes_preview = changes.copy()
print(changes_preview.head(20))
# To apply changes in the real DB (on a small subset) set DRY_RUN=False and run the apply function
# NOTE(review): the call below passes dry_run=False explicitly; the DRY_RUN
# constant itself is not consulted.
# asyncio.run(apply_changes_to_db(changes_preview, dry_run=False))

print('End of notebook')