# Download Data


In [4]:
# Global parameter - set this to the version you want to compare against
OLD_VERSION = "3.2.4.34"  # Set to None to always download, or specify version to compare

# API configuration
VERSION_URL = "https://api.beta.dofusdb.fr/version"
BASE_URL = "https://api.beta.dofusdb.fr"

# Endpoints to download
ENDPOINTS = [
    "achievement-categories",
    "achievement-objectives",
    "achievements",
    "achievement-rewards",
    "characteristics",
    "item-sets",
    "items",
    "monsters",
    "mounts",
    "spells",
    "spell-levels",
    "spell-pairs",
    "spell-states",
    "spell-types",
    "spell-variants"
]

In [5]:
import requests
import json
import os
from datetime import datetime
from typing import Dict, List, Any
import time

LIMIT = 50

# Create a session with default headers
session = requests.Session()
session.headers.update({
    "Referer": "NelsonJQ_ChangelogMaker",
    "User-Agent": "NelsonJQ_ChangelogMaker/1.0"
})

def get_current_version() -> str:
    """Fetch the current version from the API."""
    try:
        response = session.get(VERSION_URL)
        response.raise_for_status()
        version = response.text.strip().strip('"')
        print(f"Current API version: {version}")
        return version
    except requests.RequestException as e:
        print(f"Error fetching version: {e}")
        return None

def download_endpoint_data(endpoint: str) -> List[Dict[str, Any]]:
    """Download all data from a specific endpoint with pagination."""
    print(f"Downloading data from endpoint: {endpoint}")

    url = f"{BASE_URL}/{endpoint}"
    skip = 0
    all_entries = []

    while True:
        params = {
            "$limit": LIMIT,
            "$skip": skip
        }

        try:
            response = session.get(url, params=params)
            response.raise_for_status()

            data = response.json()

            # Handle different response formats
            if isinstance(data, dict):
                if 'data' in data and isinstance(data['data'], list):
                    batch_entries = data['data']
                elif 'total' in data:
                    # Some endpoints might return the data directly
                    batch_entries = data.get('data', [])
                else:
                    batch_entries = [data] if data else []
            elif isinstance(data, list):
                batch_entries = data
            else:
                batch_entries = []

            if not batch_entries:
                break

            all_entries.extend(batch_entries)
            skip += len(batch_entries)

            print(f"  Downloaded {len(all_entries)} entries so far...")

            # If we got fewer entries than the limit, we're done
            if len(batch_entries) < LIMIT:
                break

        except requests.RequestException as e:
            print(f"Error downloading from {endpoint}: {e}")
            break

        # Small delay to be respectful to the API
        time.sleep(0.1)

    print(f"  Total entries downloaded: {len(all_entries)}")
    return all_entries

def save_raw_json(data: Dict[str, Any], version: str, base_dir: str = "dofus_data"):
    """Save raw JSON data as backup."""
    raw_dir = os.path.join(base_dir, f"version_{version}", "raw_json")
    os.makedirs(raw_dir, exist_ok=True)

    for endpoint, entries in data.items():
        if endpoint == 'metadata':
            continue

        filename = f"{endpoint}.json"
        filepath = os.path.join(raw_dir, filename)

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(entries, f, indent=2, ensure_ascii=False)

        print(f"Saved raw JSON: {filepath}")

def save_consolidated_dataset(data: Dict[str, Any], version: str, base_dir: str = "dofus_data"):
    """Save consolidated dataset with metadata."""
    version_dir = os.path.join(base_dir, f"version_{version}")
    os.makedirs(version_dir, exist_ok=True)

    # Save consolidated dataset
    dataset_path = os.path.join(version_dir, "consolidated_dataset.json")
    with open(dataset_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

    print(f"Saved consolidated dataset: {dataset_path}")

    # Save metadata separately for easy access
    metadata_path = os.path.join(version_dir, "metadata.json")
    with open(metadata_path, 'w', encoding='utf-8') as f:
        json.dump(data['metadata'], f, indent=2, ensure_ascii=False)

    print(f"Saved metadata: {metadata_path}")

def create_dataset_summary(data: Dict[str, Any], version: str, base_dir: str = "dofus_data"):
    """Create a summary of the dataset for quick reference."""
    version_dir = os.path.join(base_dir, f"version_{version}")

    summary = {
        "version": version,
        "download_timestamp": data['metadata']['download_timestamp'],
        "endpoints": {}
    }

    for endpoint, entries in data.items():
        if endpoint == 'metadata':
            continue

        summary["endpoints"][endpoint] = {
            "total_entries": len(entries),
            "sample_keys": list(entries[0].keys()) if entries else []
        }

    summary_path = os.path.join(version_dir, "summary.json")
    with open(summary_path, 'w', encoding='utf-8') as f:
        json.dump(summary, f, indent=2, ensure_ascii=False)

    print(f"Saved dataset summary: {summary_path}")

def main():
    print("Starting Dofus DB data download...")

    # Get current version
    current_version = get_current_version()
    if not current_version:
        print("Failed to get current version. Aborting.")
        return

    # Check if we should skip download based on version
    if OLD_VERSION and OLD_VERSION == current_version:
        print(f"Current version ({current_version}) matches OLD_VERSION ({OLD_VERSION}). Aborting download.")
        return

    print(f"Proceeding with download for version: {current_version}")

    # Initialize consolidated dataset
    consolidated_data = {
        "metadata": {
            "version": current_version,
            "download_timestamp": datetime.now().isoformat(),
            "endpoints_downloaded": [],
            "total_entries": 0
        }
    }

    # Download data from all endpoints
    total_entries = 0
    for endpoint in ENDPOINTS:
        try:
            entries = download_endpoint_data(endpoint)
            consolidated_data[endpoint] = entries
            consolidated_data["metadata"]["endpoints_downloaded"].append(endpoint)
            total_entries += len(entries)
            print(f"Successfully downloaded {len(entries)} entries from {endpoint}")
        except Exception as e:
            print(f"Failed to download from {endpoint}: {e}")
            consolidated_data[endpoint] = []

    consolidated_data["metadata"]["total_entries"] = total_entries

    # Save all data
    print("\nSaving data...")
    save_raw_json(consolidated_data, current_version)
    save_consolidated_dataset(consolidated_data, current_version)
    create_dataset_summary(consolidated_data, current_version)

    print(f"\nDownload completed!")
    print(f"Version: {current_version}")
    print(f"Total entries downloaded: {total_entries}")
    print(f"Endpoints processed: {len(consolidated_data['metadata']['endpoints_downloaded'])}")

    # Create a latest version symlink/copy for easy access
    base_dir = "dofus_data"
    latest_path = os.path.join(base_dir, "latest")
    if os.path.exists(latest_path):
        if os.path.islink(latest_path):
            os.unlink(latest_path)
        else:
            import shutil
            shutil.rmtree(latest_path)

    # On Windows, create a copy instead of symlink
    import shutil
    version_dir = os.path.join(base_dir, f"version_{current_version}")
    shutil.copytree(version_dir, latest_path)
    print(f"Created latest version copy at: {latest_path}")

if __name__ == "__main__":
    main()

Starting Dofus DB data download...
Current API version: 3.2.4.35
Proceeding with download for version: 3.2.4.35
Downloading data from endpoint: achievement-categories
  Downloaded 50 entries so far...
  Downloaded 100 entries so far...
  Downloaded 126 entries so far...
  Total entries downloaded: 126
Successfully downloaded 126 entries from achievement-categories
Downloading data from endpoint: achievement-objectives
  Downloaded 50 entries so far...
  Downloaded 100 entries so far...
  Downloaded 150 entries so far...
  Downloaded 200 entries so far...
  Downloaded 250 entries so far...
  Downloaded 300 entries so far...
  Downloaded 350 entries so far...
  Downloaded 400 entries so far...
  Downloaded 450 entries so far...
  Downloaded 500 entries so far...
  Downloaded 550 entries so far...
  Downloaded 600 entries so far...
  Downloaded 650 entries so far...
  Downloaded 700 entries so far...
  Downloaded 750 entries so far...
  Downloaded 800 entries so far...
  Downloaded 850 en

# Batched faster DofusDB download

In [6]:
!pip install tqdm



In [None]:
import requests
import json
import os
from datetime import datetime
from typing import Dict, List, Any, Tuple
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from urllib.parse import urljoin
from tqdm.notebook import tqdm
import sys

LIMIT = 100  # Increased batch size
MAX_WORKERS = 8  # Adjust based on your system and API rate limits

# Thread-safe session creation
def create_session() -> requests.Session:
    """Create a session with default headers."""
    session = requests.Session()
    session.headers.update({
        "Referer": "NelsonJQ_ChangelogMaker",
        "User-Agent": "NelsonJQ_ChangelogMaker/1.0"
    })
    # Add connection pooling and timeout settings
    adapter = requests.adapters.HTTPAdapter(
        pool_connections=20,
        pool_maxsize=20,
        max_retries=3
    )
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

# Thread-local storage for sessions
thread_local = threading.local()

def get_session() -> requests.Session:
    """Get thread-local session."""
    if not hasattr(thread_local, 'session'):
        thread_local.session = create_session()
    return thread_local.session

def get_current_version() -> str:
    """Fetch the current version from the API."""
    try:
        session = get_session()
        response = session.get(VERSION_URL, timeout=10)
        response.raise_for_status()
        version = response.text.strip().strip('"')
        tqdm.write(f"✅ Current API version: {version}")
        return version
    except requests.RequestException as e:
        tqdm.write(f"❌ Error fetching version: {e}")
        return None

def download_batch(endpoint: str, skip: int, limit: int) -> Tuple[str, int, List[Dict[str, Any]]]:
    """Download a single batch of data from an endpoint."""
    session = get_session()
    url = f"{BASE_URL}/{endpoint}"

    params = {
        "$limit": limit,
        "$skip": skip
    }

    try:
        response = session.get(url, params=params, timeout=30)
        response.raise_for_status()

        data = response.json()

        # Handle different response formats
        if isinstance(data, dict):
            if 'data' in data and isinstance(data['data'], list):
                batch_entries = data['data']
            elif 'total' in data:
                batch_entries = data.get('data', [])
            else:
                batch_entries = [data] if data else []
        elif isinstance(data, list):
            batch_entries = data
        else:
            batch_entries = []

        return endpoint, skip, batch_entries

    except requests.RequestException as e:
        tqdm.write(f"⚠️ Error downloading batch from {endpoint} (skip={skip}): {e}")
        return endpoint, skip, []

def download_endpoint_data_parallel(endpoint: str, endpoint_pbar: tqdm = None) -> List[Dict[str, Any]]:
    """Download all data from a specific endpoint using parallel requests."""
    # First, get the first batch to determine total size if possible
    first_batch_endpoint, _, first_batch = download_batch(endpoint, 0, LIMIT)

    if not first_batch:
        if endpoint_pbar:
            endpoint_pbar.set_description(f"{endpoint}: No data found")
            endpoint_pbar.update(1)
        return []

    all_entries = first_batch.copy()

    # If we got fewer entries than the limit, we're done
    if len(first_batch) < LIMIT:
        if endpoint_pbar:
            endpoint_pbar.set_description(f"{endpoint}: {len(all_entries)} entries")
            endpoint_pbar.update(1)
        return all_entries

    # Calculate additional batches needed with progress tracking
    batch_pbar = tqdm(desc=f"Batches for {endpoint}", unit="batch", leave=False)

    futures = []
    skip = LIMIT

    with ThreadPoolExecutor(max_workers=min(MAX_WORKERS, 4)) as executor:
        # Submit initial batch of requests
        for i in range(4):
            futures.append(executor.submit(download_batch, endpoint, skip, LIMIT))
            skip += LIMIT

        while futures:
            completed_futures = []
            for future in as_completed(futures, timeout=60):
                completed_futures.append(future)
                try:
                    endpoint_name, batch_skip, batch_entries = future.result()
                    batch_pbar.update(1)

                    if batch_entries:
                        all_entries.extend(batch_entries)
                        batch_pbar.set_description(f"Batches for {endpoint} ({len(all_entries)} entries)")

                        # If this batch was full, submit another request
                        if len(batch_entries) == LIMIT:
                            new_future = executor.submit(download_batch, endpoint, skip, LIMIT)
                            futures.append(new_future)
                            skip += LIMIT

                except Exception as e:
                    tqdm.write(f"⚠️ Error processing batch for {endpoint}: {e}")

            # Remove completed futures
            for future in completed_futures:
                futures.remove(future)

            # If no new batches were submitted and all are complete, we're done
            if not any(len(batch_entries) == LIMIT for _, _, batch_entries in
                      [f.result() for f in completed_futures if not f.exception()]):
                break

    batch_pbar.close()

    if endpoint_pbar:
        endpoint_pbar.set_description(f"{endpoint}: {len(all_entries)} entries ✅")
        endpoint_pbar.update(1)

    return all_entries

def download_all_endpoints_parallel(endpoints: List[str]) -> Dict[str, List[Dict[str, Any]]]:
    """Download data from all endpoints in parallel."""
    tqdm.write("🚀 Starting parallel download of all endpoints...")

    results = {}

    # Main progress bar for endpoints
    endpoint_pbar = tqdm(total=len(endpoints), desc="Endpoints", unit="endpoint")

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Submit all endpoint downloads
        future_to_endpoint = {
            executor.submit(download_endpoint_data_parallel, endpoint, None): endpoint
            for endpoint in endpoints
        }

        # Collect results as they complete
        for future in as_completed(future_to_endpoint, timeout=300):
            endpoint = future_to_endpoint[future]
            try:
                entries = future.result()
                results[endpoint] = entries
                endpoint_pbar.set_description(f"Completed {endpoint}: {len(entries)} entries")
                endpoint_pbar.update(1)
            except Exception as e:
                tqdm.write(f"❌ Failed to download from {endpoint}: {e}")
                results[endpoint] = []
                endpoint_pbar.update(1)

    endpoint_pbar.close()
    return results

def save_raw_json_parallel(data: Dict[str, Any], version: str, base_dir: str = "dofus_data"):
    """Save raw JSON data as backup using parallel writes."""
    raw_dir = os.path.join(base_dir, f"version_{version}", "raw_json")
    os.makedirs(raw_dir, exist_ok=True)

    def save_endpoint_json(endpoint_data):
        endpoint, entries = endpoint_data
        if endpoint == 'metadata':
            return endpoint

        filename = f"{endpoint}.json"
        filepath = os.path.join(raw_dir, filename)

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(entries, f, indent=2, ensure_ascii=False)

        return endpoint

    # Filter out metadata for saving
    save_items = [(k, v) for k, v in data.items() if k != 'metadata']

    # Save files in parallel with progress bar
    save_pbar = tqdm(total=len(save_items), desc="Saving raw JSON files", unit="file")

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(save_endpoint_json, item)
            for item in save_items
        ]

        for future in as_completed(futures):
            try:
                endpoint = future.result()
                if endpoint:
                    save_pbar.set_description(f"Saved {endpoint}.json")
                save_pbar.update(1)
            except Exception as e:
                tqdm.write(f"❌ Error saving file: {e}")
                save_pbar.update(1)

    save_pbar.close()

def save_consolidated_dataset(data: Dict[str, Any], version: str, base_dir: str = "dofus_data"):
    """Save consolidated dataset with metadata."""
    version_dir = os.path.join(base_dir, f"version_{version}")
    os.makedirs(version_dir, exist_ok=True)

    save_pbar = tqdm(total=2, desc="Saving consolidated files", unit="file")

    # Save consolidated dataset
    dataset_path = os.path.join(version_dir, "consolidated_dataset.json")
    with open(dataset_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

    save_pbar.set_description("Saved consolidated_dataset.json")
    save_pbar.update(1)

    # Save metadata separately for easy access
    metadata_path = os.path.join(version_dir, "metadata.json")
    with open(metadata_path, 'w', encoding='utf-8') as f:
        json.dump(data['metadata'], f, indent=2, ensure_ascii=False)

    save_pbar.set_description("Saved metadata.json")
    save_pbar.update(1)
    save_pbar.close()

def create_dataset_summary(data: Dict[str, Any], version: str, base_dir: str = "dofus_data"):
    """Create a summary of the dataset for quick reference."""
    version_dir = os.path.join(base_dir, f"version_{version}")

    summary = {
        "version": version,
        "download_timestamp": data['metadata']['download_timestamp'],
        "endpoints": {}
    }

    for endpoint, entries in data.items():
        if endpoint == 'metadata':
            continue

        summary["endpoints"][endpoint] = {
            "total_entries": len(entries),
            "sample_keys": list(entries[0].keys()) if entries else []
        }

    summary_path = os.path.join(version_dir, "summary.json")
    with open(summary_path, 'w', encoding='utf-8') as f:
        json.dump(summary, f, indent=2, ensure_ascii=False)

    tqdm.write(f"📊 Saved dataset summary: {summary_path}")

def main():
    tqdm.write("🚀 Starting Dofus DB data download with parallel processing...")
    start_time = time.time()

    # Get current version
    current_version = get_current_version()
    if not current_version:
        tqdm.write("❌ Failed to get current version. Aborting.")
        return

    # Check if we should skip download based on version
    if OLD_VERSION and OLD_VERSION == current_version:
        tqdm.write(f"⏭️ Current version ({current_version}) matches OLD_VERSION ({OLD_VERSION}). Aborting download.")
        return

    tqdm.write(f"✅ Proceeding with download for version: {current_version}")

    # Download data from all endpoints in parallel
    endpoint_data = download_all_endpoints_parallel(ENDPOINTS)

    # Initialize consolidated dataset
    consolidated_data = {
        "metadata": {
            "version": current_version,
            "download_timestamp": datetime.now().isoformat(),
            "endpoints_downloaded": list(endpoint_data.keys()),
            "total_entries": sum(len(entries) for entries in endpoint_data.values())
        }
    }

    # Add endpoint data to consolidated dataset
    consolidated_data.update(endpoint_data)

    # Save all data
    tqdm.write("💾 Saving data...")
    save_time = time.time()

    # Create overall saving progress
    save_progress = tqdm(total=3, desc="Save operations", unit="operation")

    # Use parallel saving for raw JSON
    save_progress.set_description("Saving raw JSON files")
    save_raw_json_parallel(consolidated_data, current_version)
    save_progress.update(1)

    save_progress.set_description("Saving consolidated dataset")
    save_consolidated_dataset(consolidated_data, current_version)
    save_progress.update(1)

    save_progress.set_description("Creating dataset summary")
    create_dataset_summary(consolidated_data, current_version)
    save_progress.update(1)

    save_progress.close()

    end_time = time.time()

    # Final summary
    tqdm.write("\n" + "="*60)
    tqdm.write("✅ Download completed!")
    tqdm.write(f"📦 Version: {current_version}")
    tqdm.write(f"📊 Total entries downloaded: {consolidated_data['metadata']['total_entries']:,}")
    tqdm.write(f"🔗 Endpoints processed: {len(consolidated_data['metadata']['endpoints_downloaded'])}")
    tqdm.write(f"⏱️ Total time: {end_time - start_time:.2f} seconds")
    tqdm.write(f"⬇️ Download time: {save_time - start_time:.2f} seconds")
    tqdm.write(f"💾 Save time: {end_time - save_time:.2f} seconds")

    # Create a latest version copy
    copy_pbar = tqdm(total=1, desc="Creating latest version copy", unit="operation")

    base_dir = "dofus_data"
    latest_path = os.path.join(base_dir, "latest")
    if os.path.exists(latest_path):
        import shutil
        shutil.rmtree(latest_path)

    import shutil
    version_dir = os.path.join(base_dir, f"version_{current_version}")
    shutil.copytree(version_dir, latest_path)

    copy_pbar.set_description("Latest version copy created ✅")
    copy_pbar.update(1)
    copy_pbar.close()

    tqdm.write(f"📁 Created latest version copy at: {latest_path}")
    tqdm.write("="*60)

if __name__ == "__main__":
    main()

# Search string in consolidated_data.json

In [8]:
import json

file_path = '/content/dofus_data/version_3.2.4.35/raw_json/items.json'
search_string = "Valentía de la Señorita Jhessica"
found_item = None

try:
    with open(file_path, 'r', encoding='utf-8') as f:
        items_data = json.load(f)

    for item in items_data:
        # Check if the item has a "name" field and if the search string is in any of the language values
        if "name" in item and isinstance(item["name"], dict):
            if any(isinstance(value, str) and search_string in value for value in item["name"].values()):
                found_item = item
                break

    if found_item:
        print(f"Found item containing '{search_string}' in the 'name' field:")
        display(found_item)
    else:
        print(f"No item found containing '{search_string}' in the 'name' field.")

except FileNotFoundError:
    print(f"Error: File not found at {file_path}")
except json.JSONDecodeError:
    print(f"Error: Could not decode JSON from {file_path}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

Found item containing 'Valentía de la Señorita Jhessica' in the 'name' field:


{'_id': '67ecf439be5632587026c966',
 'm_flags': 57480,
 'id': 20366,
 'typeId': 10,
 'iconId': 10336,
 'level': 200,
 'realWeight': 10,
 'useAnimationId': 0,
 'price': 20000,
 'itemSetId': -1,
 'criteria': '',
 'criteriaTarget': '',
 'appearanceId': 0,
 'isColorable': False,
 'recipeSlots': 8,
 'recipeIds': [],
 'dropMonsterIds': [],
 'dropTemporisMonsterIds': [],
 'possibleEffects': [{'m_flags': 15,
   'effectUid': 0,
   'baseEffectId': 5,
   'effectId': 125,
   'order': 0,
   'targetId': 0,
   'targetMask': '',
   'duration': 0,
   'random': 0,
   'group': 0,
   'modificator': 0,
   'dispellable': 1,
   'delay': 0,
   'triggers': '',
   'effectElement': -1,
   'spellId': -1,
   'effectTriggerDuration': 0,
   'zoneDescr': {'cellIds': [],
    'shape': 67,
    'param1': 1,
    'param2': 0,
    'damageDecreaseStepPercent': 10,
    'maxDamageDecreaseApplyCount': 4,
    'isStopAtTarget': False},
   'value': 0,
   'diceNum': 401,
   'diceSide': 500,
   'displayZero': False,
   'visibleInToo

# Random modification of json items

In [9]:
import json
import random
import copy
from typing import Dict, List, Any
import os

def modify_effects(effects: List[Dict], num_changes: int = None) -> List[Dict]:
    """Randomly modify effects in an item."""
    if not effects:
        return effects

    modified_effects = copy.deepcopy(effects)

    if num_changes is None:
        num_changes = random.randint(1, min(3, len(effects)))

    for _ in range(num_changes):
        if not modified_effects:
            break

        effect_idx = random.randint(0, len(modified_effects) - 1)
        effect = modified_effects[effect_idx]

        # Choose what to modify
        modification_type = random.choice(['from_to', 'characteristic', 'effectId', 'add_effect', 'remove_effect'])

        if modification_type == 'from_to' and 'from' in effect and 'to' in effect:
            # Modify from/to values
            old_from = effect.get('from', 0)
            old_to = effect.get('to', 0)

            # Apply random changes
            change_percent = random.uniform(0.8, 1.2)  # ±20%
            effect['from'] = max(0, int(old_from * change_percent))
            effect['to'] = max(effect['from'], int(old_to * change_percent))

        elif modification_type == 'characteristic' and 'characteristic' in effect:
            # Change characteristic ID
            effect['characteristic'] = random.randint(1, 150)

        elif modification_type == 'effectId' and 'effectId' in effect:
            # Change effect ID
            effect['effectId'] = random.randint(100, 3000)

        elif modification_type == 'add_effect' and len(modified_effects) < 10:
            # Add a new effect
            new_effect = {
                "from": random.randint(1, 50),
                "to": random.randint(51, 100),
                "characteristic": random.randint(1, 150),
                "category": random.randint(0, 2),
                "elementId": random.choice([-1, 1, 2, 3, 4, 5]),
                "effectId": random.randint(100, 3000)
            }
            modified_effects.append(new_effect)

        elif modification_type == 'remove_effect' and len(modified_effects) > 1:
            # Remove an effect
            modified_effects.pop(effect_idx)

    return modified_effects

def modify_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """Randomly modify an item."""
    modified_item = copy.deepcopy(item)

    # List of fields that can be safely modified
    modifiable_fields = {
        'level': lambda x: max(1, x + random.randint(-10, 10)) if isinstance(x, int) else x,
        'price': lambda x: max(0, int(x * random.uniform(0.5, 2.0))) if isinstance(x, int) else x,
        'realWeight': lambda x: max(1, x + random.randint(-5, 5)) if isinstance(x, int) else x,
        'apCost': lambda x: max(1, min(6, x + random.randint(-1, 1))) if isinstance(x, int) else x,
        'range': lambda x: max(0, min(20, x + random.randint(-2, 2))) if isinstance(x, int) else x,
        'minRange': lambda x: max(0, min(10, x + random.randint(-1, 1))) if isinstance(x, int) else x,
        'craftXpRatio': lambda x: x + random.randint(-50, 50) if isinstance(x, int) else x,
        'recyclingNuggets': lambda x: max(0, x + random.randint(-5, 10)) if isinstance(x, int) else x,
        'criticalHitProbability': lambda x: max(0, min(100, x + random.randint(-5, 5))) if isinstance(x, int) else x,
        'maxCastPerTurn': lambda x: max(1, min(10, x + random.randint(-1, 2))) if isinstance(x, int) else x,
    }

    # Modify some basic fields
    num_field_changes = random.randint(1, 3)
    fields_to_modify = random.sample(list(modifiable_fields.keys()),
                                   min(num_field_changes, len(modifiable_fields)))

    for field in fields_to_modify:
        if field in modified_item and modified_item[field] is not None:
            try:
                modified_item[field] = modifiable_fields[field](modified_item[field])
            except:
                pass  # Skip if modification fails

    # Modify boolean fields occasionally
    boolean_fields = ['cursed', 'usable', 'targetable', 'exchangeable', 'twoHanded',
                     'etheral', 'enhanceable', 'isDestructible', 'isSaleable', 'isLegendary']

    if random.random() < 0.3:  # 30% chance to modify a boolean
        bool_field = random.choice([f for f in boolean_fields if f in modified_item])
        if bool_field in modified_item:
            modified_item[bool_field] = not modified_item[bool_field]

    # Modify effects (most important for testing)
    if 'effects' in modified_item and modified_item['effects']:
        if random.random() < 0.8:  # 80% chance to modify effects
            modified_item['effects'] = modify_effects(modified_item['effects'])

    # Modify criteria string occasionally
    if 'criteria' in modified_item and isinstance(modified_item['criteria'], str):
        if random.random() < 0.2 and modified_item['criteria']:  # 20% chance
            # Simple modification: change some numbers in criteria
            criteria = modified_item['criteria']
            import re
            numbers = re.findall(r'\d+', criteria)
            if numbers:
                old_num = random.choice(numbers)
                new_num = str(max(1, int(old_num) + random.randint(-10, 10)))
                modified_item['criteria'] = criteria.replace(old_num, new_num, 1)

    return modified_item

def modify_other_endpoint_item(item: Dict[str, Any], endpoint: str) -> Dict[str, Any]:
    """Modify items from other endpoints (spells, monsters, etc.)."""
    modified_item = copy.deepcopy(item)

    # Common modifications based on endpoint type
    if endpoint == 'spells':
        if 'name' in modified_item and isinstance(modified_item['name'], dict):
            # Modify spell name slightly
            for lang in modified_item['name']:
                if lang != 'id' and isinstance(modified_item['name'][lang], str):
                    name = modified_item['name'][lang]
                    modified_item['name'][lang] = name + " (Modified)"

        # Modify spell-specific fields
        modifiable = ['iconId', 'order']
        for field in modifiable:
            if field in modified_item and isinstance(modified_item[field], int):
                modified_item[field] += random.randint(-5, 5)

    elif endpoint == 'monsters':
        # Modify monster-specific fields
        if 'aggressiveZoneSize' in modified_item and isinstance(modified_item['aggressiveZoneSize'], int):
            modified_item['aggressiveZoneSize'] = max(0, modified_item['aggressiveZoneSize'] + random.randint(-2, 3))

        if 'speedAdjust' in modified_item and isinstance(modified_item['speedAdjust'], int):
            modified_item['speedAdjust'] += random.randint(-10, 10)

        # Toggle some boolean flags
        bool_fields = ['isBoss', 'isMiniBoss', 'canPlay', 'canTackle']
        if random.random() < 0.3:
            field = random.choice([f for f in bool_fields if f in modified_item])
            if field in modified_item:
                modified_item[field] = not modified_item[field]

    elif endpoint == 'achievements':
        # Modify achievement points
        if 'points' in modified_item and isinstance(modified_item['points'], int):
            modified_item['points'] = max(1, modified_item['points'] + random.randint(-5, 15))

        if 'level' in modified_item and isinstance(modified_item['level'], int):
            modified_item['level'] = max(1, modified_item['level'] + random.randint(-5, 5))

    elif endpoint in ['spell-levels', 'spell-states', 'characteristics']:
        # Modify numerical fields
        for key, value in modified_item.items():
            if isinstance(value, int) and key not in ['id', '_id', 'm_id'] and random.random() < 0.2:
                modified_item[key] = max(0, value + random.randint(-5, 5))

    return modified_item

def create_test_dataset(input_path: str, output_path: str, items_to_modify: int = 200, other_endpoints_to_modify: int = 5):
    """Create a test dataset with random modifications."""
    print(f"Loading dataset from: {input_path}")

    # Load the original dataset
    with open(input_path, 'r', encoding='utf-8') as f:
        dataset = json.load(f)

    modified_dataset = copy.deepcopy(dataset)
    changes_made = []

    # Modify items
    if 'items' in dataset and dataset['items']:
        print(f"Modifying {items_to_modify} items...")
        items = modified_dataset['items']

        # Randomly select items to modify
        items_to_change = random.sample(range(len(items)), min(items_to_modify, len(items)))

        for idx in items_to_change:
            original_item = items[idx]
            modified_item = modify_item(original_item)
            modified_dataset['items'][idx] = modified_item

            changes_made.append({
                'endpoint': 'items',
                'item_id': original_item.get('id', 'unknown'),
                'item_name': original_item.get('name', {}).get('en', 'Unknown') if isinstance(original_item.get('name'), dict) else str(original_item.get('name', 'Unknown')),
                'changes': 'Multiple fields modified'
            })

    # Modify other endpoints
    other_endpoints = [ep for ep in dataset.keys() if ep not in ['metadata', 'items'] and isinstance(dataset[ep], list)]
    endpoints_to_modify = random.sample(other_endpoints, min(other_endpoints_to_modify, len(other_endpoints)))

    print(f"Modifying items from endpoints: {', '.join(endpoints_to_modify)}")

    for endpoint in endpoints_to_modify:
        if endpoint in dataset and dataset[endpoint]:
            items = modified_dataset[endpoint]
            num_to_modify = min(10, len(items))  # Modify up to 10 items per endpoint
            items_to_change = random.sample(range(len(items)), num_to_modify)

            for idx in items_to_change:
                original_item = items[idx]
                modified_item = modify_other_endpoint_item(original_item, endpoint)
                modified_dataset[endpoint][idx] = modified_item

                changes_made.append({
                    'endpoint': endpoint,
                    'item_id': original_item.get('id', 'unknown'),
                    'item_name': original_item.get('name', {}).get('en', 'Unknown') if isinstance(original_item.get('name'), dict) else str(original_item.get('name', 'Unknown')),
                    'changes': 'Modified for testing'
                })

    # Update metadata
    if 'metadata' in modified_dataset:
        modified_dataset['metadata']['version'] = modified_dataset['metadata'].get('version', '0.0.0') + '-test'
        modified_dataset['metadata']['download_timestamp'] = modified_dataset['metadata'].get('download_timestamp', '') + ' (TEST VERSION)'

    # Save the modified dataset
    print(f"Saving modified dataset to: {output_path}")
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(modified_dataset, f, indent=2, ensure_ascii=False)

    # Save change log
    change_log_path = output_path.replace('.json', '_changes.json')
    with open(change_log_path, 'w', encoding='utf-8') as f:
        json.dump({
            'total_changes': len(changes_made),
            'items_modified': items_to_modify,
            'other_endpoints_modified': len(endpoints_to_modify),
            'changes': changes_made
        }, f, indent=2, ensure_ascii=False)

    print(f"Test dataset created successfully!")
    print(f"Total changes made: {len(changes_made)}")
    print(f"Change log saved to: {change_log_path}")

    return modified_dataset, changes_made

def main():
    # Paths
    input_path = "/content/dofus_data/latest/consolidated_dataset.json"
    output_path = "/content/dofus_data/latest/consolidated_dataset_modified.json"

    # Check if input file exists
    if not os.path.exists(input_path):
        print(f"Error: Input file not found: {input_path}")
        print("Please make sure the file exists or update the path.")
        return

    try:
        # Create test dataset
        modified_dataset, changes = create_test_dataset(
            input_path=input_path,
            output_path=output_path,
            items_to_modify=200,
            other_endpoints_to_modify=5
        )

        print("\n" + "="*60)
        print("TEST DATASET CREATION SUMMARY")
        print("="*60)
        print(f"Original file: {input_path}")
        print(f"Modified file: {output_path}")
        print(f"Total modifications: {len(changes)}")
        print("\nBreakdown by endpoint:")

        endpoint_counts = {}
        for change in changes:
            endpoint = change['endpoint']
            endpoint_counts[endpoint] = endpoint_counts.get(endpoint, 0) + 1

        for endpoint, count in endpoint_counts.items():
            print(f"  {endpoint}: {count} items modified")

        print("\nYou can now use the diff analyzer to compare:")
        print(f"  Old: {input_path}")
        print(f"  New: {output_path}")

    except Exception as e:
        print(f"Error creating test dataset: {str(e)}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    main()

Loading dataset from: /content/dofus_data/latest/consolidated_dataset.json
Modifying 200 items...
Modifying items from endpoints: achievement-rewards, characteristics, spell-levels, spell-pairs, monsters
Saving modified dataset to: /content/dofus_data/latest/consolidated_dataset_modified.json
Test dataset created successfully!
Total changes made: 250
Change log saved to: /content/dofus_data/latest/consolidated_dataset_modified_changes.json

TEST DATASET CREATION SUMMARY
Original file: /content/dofus_data/latest/consolidated_dataset.json
Modified file: /content/dofus_data/latest/consolidated_dataset_modified.json
Total modifications: 250

Breakdown by endpoint:
  items: 200 items modified
  achievement-rewards: 10 items modified
  characteristics: 10 items modified
  spell-levels: 10 items modified
  spell-pairs: 10 items modified
  monsters: 10 items modified

You can now use the diff analyzer to compare:
  Old: /content/dofus_data/latest/consolidated_dataset.json
  New: /content/dofus

# Diff tool of consolidate_dataset.json

In [10]:
import json
import os
from typing import Dict, List, Any, Tuple, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
import pandas as pd
from pathlib import Path
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import webbrowser
import tempfile

@dataclass
class ChangeRecord:
    """Represents a single change in the dataset."""
    endpoint: str
    item_id: str
    item_name: str
    field_path: str
    change_type: str  # 'added', 'removed', 'modified', 'value_changed'
    old_value: Any
    new_value: Any
    description: str
    severity: str = 'info'  # 'info', 'minor', 'major', 'critical'

class DatasetDiffAnalyzer:
    """Analyzes differences between two consolidated dataset files."""

    def __init__(self):
        self.changes: List[ChangeRecord] = []
        self.reference_mapping = self._load_reference_mappings()

    def _load_reference_mappings(self) -> Dict[str, Dict[int, str]]:
        """Load reference mappings for IDs to names (characteristics, effects, etc.)."""
        # This will be populated from the datasets themselves
        return {
            'characteristics': {},
            'effects': {},
            'spell_types': {},
            'item_types': {}
        }

    def _build_reference_mappings(self, dataset: Dict[str, Any]) -> None:
        """Build reference mappings from the dataset."""
        # Build characteristics mapping
        if 'characteristics' in dataset:
            for char in dataset['characteristics']:
                self.reference_mapping['characteristics'][char['id']] = char.get('name', {}).get('en', f"Characteristic {char['id']}")

        # Build spell types mapping
        if 'spell-types' in dataset:
            for spell_type in dataset['spell-types']:
                self.reference_mapping['spell_types'][spell_type['id']] = spell_type.get('longName', f"SpellType {spell_type['id']}")

        # Build item types mapping
        if 'items' in dataset:
            for item in dataset['items']:
                if 'type' in item and item['type']:
                    type_id = item['type']['id']
                    type_name = item['type'].get('name', {}).get('en', f"ItemType {type_id}")
                    self.reference_mapping['item_types'][type_id] = type_name

    def _get_item_name(self, item: Dict[str, Any]) -> str:
        """Extract item name in English or fallback."""
        if 'name' in item:
            if isinstance(item['name'], dict):
                return item['name'].get('en', item['name'].get('fr', item['name'].get('es', str(item.get('id', 'Unknown')))))
            return str(item['name'])
        return f"Item {item.get('id', 'Unknown')}"

    def _get_nested_value(self, obj: Any, path: str) -> Any:
        """Get value from nested object using dot notation path."""
        keys = path.split('.')
        current = obj
        for key in keys:
            if isinstance(current, dict):
                current = current.get(key)
            elif isinstance(current, list) and key.isdigit():
                idx = int(key)
                current = current[idx] if 0 <= idx < len(current) else None
            else:
                return None
            if current is None:
                break
        return current

    def _compare_effects(self, old_effects: List[Dict], new_effects: List[Dict], item_id: str, item_name: str, endpoint: str) -> None:
        """Compare effects arrays and record changes."""
        # Convert to dictionaries for easier comparison
        old_effects_dict = {i: effect for i, effect in enumerate(old_effects)}
        new_effects_dict = {i: effect for i, effect in enumerate(new_effects)}

        # Find added effects
        if len(new_effects) > len(old_effects):
            for i in range(len(old_effects), len(new_effects)):
                effect = new_effects[i]
                char_name = self.reference_mapping['characteristics'].get(effect.get('characteristic'), f"Characteristic {effect.get('characteristic')}")
                self.changes.append(ChangeRecord(
                    endpoint=endpoint,
                    item_id=item_id,
                    item_name=item_name,
                    field_path=f"effects.{i}",
                    change_type='added',
                    old_value=None,
                    new_value=effect,
                    description=f"Added effect: {char_name} ({effect.get('from', 0)} to {effect.get('to', 0)})",
                    severity='minor'
                ))

        # Find removed effects
        if len(old_effects) > len(new_effects):
            for i in range(len(new_effects), len(old_effects)):
                effect = old_effects[i]
                char_name = self.reference_mapping['characteristics'].get(effect.get('characteristic'), f"Characteristic {effect.get('characteristic')}")
                self.changes.append(ChangeRecord(
                    endpoint=endpoint,
                    item_id=item_id,
                    item_name=item_name,
                    field_path=f"effects.{i}",
                    change_type='removed',
                    old_value=effect,
                    new_value=None,
                    description=f"Removed effect: {char_name} ({effect.get('from', 0)} to {effect.get('to', 0)})",
                    severity='minor'
                ))

        # Compare existing effects
        min_length = min(len(old_effects), len(new_effects))
        for i in range(min_length):
            old_effect = old_effects[i]
            new_effect = new_effects[i]

            if old_effect != new_effect:
                char_name = self.reference_mapping['characteristics'].get(
                    new_effect.get('characteristic', old_effect.get('characteristic')),
                    f"Characteristic {new_effect.get('characteristic', old_effect.get('characteristic'))}"
                )

                # Detailed comparison of effect properties
                for key in ['from', 'to', 'characteristic', 'effectId']:
                    if old_effect.get(key) != new_effect.get(key):
                        old_val = old_effect.get(key)
                        new_val = new_effect.get(key)

                        if key in ['from', 'to']:
                            description = f"{char_name}: {key.capitalize()} value changed {old_val} → {new_val}"
                            severity = 'major' if abs((new_val or 0) - (old_val or 0)) > 10 else 'minor'
                        else:
                            description = f"{char_name}: {key} changed {old_val} → {new_val}"
                            severity = 'minor'

                        self.changes.append(ChangeRecord(
                            endpoint=endpoint,
                            item_id=item_id,
                            item_name=item_name,
                            field_path=f"effects.{i}.{key}",
                            change_type='value_changed',
                            old_value=old_val,
                            new_value=new_val,
                            description=description,
                            severity=severity
                        ))

    def _compare_items(self, old_item: Dict[str, Any], new_item: Dict[str, Any], endpoint: str) -> None:
        """Compare two items and record differences."""
        item_id = str(old_item.get('id', new_item.get('id', 'Unknown')))
        item_name = self._get_item_name(new_item if new_item else old_item)

        # Special handling for effects
        if 'effects' in old_item or 'effects' in new_item:
            old_effects = old_item.get('effects', [])
            new_effects = new_item.get('effects', [])
            if old_effects != new_effects:
                self._compare_effects(old_effects, new_effects, item_id, item_name, endpoint)

        # Compare other important fields
        important_fields = ['level', 'price', 'typeId', 'itemSetId', 'criteria', 'apCost', 'range', 'minRange']

        for field in important_fields:
            old_value = old_item.get(field)
            new_value = new_item.get(field)

            if old_value != new_value:
                severity = 'major' if field in ['level', 'apCost', 'range'] else 'minor'
                description = f"{field.replace('Id', ' ID')} changed: {old_value} → {new_value}"

                self.changes.append(ChangeRecord(
                    endpoint=endpoint,
                    item_id=item_id,
                    item_name=item_name,
                    field_path=field,
                    change_type='value_changed',
                    old_value=old_value,
                    new_value=new_value,
                    description=description,
                    severity=severity
                ))

    def compare_datasets(self, old_dataset: Dict[str, Any], new_dataset: Dict[str, Any]) -> List[ChangeRecord]:
        """Compare two datasets and return list of changes."""
        self.changes = []

        # Build reference mappings from both datasets
        self._build_reference_mappings(old_dataset)
        self._build_reference_mappings(new_dataset)

        # Get all endpoints from both datasets
        old_endpoints = set(k for k in old_dataset.keys() if k != 'metadata')
        new_endpoints = set(k for k in new_dataset.keys() if k != 'metadata')
        all_endpoints = old_endpoints.union(new_endpoints)

        for endpoint in all_endpoints:
            old_items = old_dataset.get(endpoint, [])
            new_items = new_dataset.get(endpoint, [])

            # Create ID-based mappings for comparison
            old_items_dict = {item['id']: item for item in old_items if 'id' in item}
            new_items_dict = {item['id']: item for item in new_items if 'id' in item}

            all_ids = set(old_items_dict.keys()).union(set(new_items_dict.keys()))

            for item_id in all_ids:
                old_item = old_items_dict.get(item_id)
                new_item = new_items_dict.get(item_id)

                if old_item and new_item:
                    # Item exists in both - compare
                    self._compare_items(old_item, new_item, endpoint)
                elif old_item and not new_item:
                    # Item removed
                    item_name = self._get_item_name(old_item)
                    self.changes.append(ChangeRecord(
                        endpoint=endpoint,
                        item_id=str(item_id),
                        item_name=item_name,
                        field_path='*',
                        change_type='removed',
                        old_value=old_item,
                        new_value=None,
                        description=f"Item removed: {item_name}",
                        severity='major'
                    ))
                elif new_item and not old_item:
                    # Item added
                    item_name = self._get_item_name(new_item)
                    self.changes.append(ChangeRecord(
                        endpoint=endpoint,
                        item_id=str(item_id),
                        item_name=item_name,
                        field_path='*',
                        change_type='added',
                        old_value=None,
                        new_value=new_item,
                        description=f"Item added: {item_name}",
                        severity='minor'
                    ))

        return self.changes

class DiffReportGenerator:
    """Generates various types of diff reports."""

    def __init__(self, changes: List[ChangeRecord]):
        self.changes = changes

    def to_dataframe(self) -> pd.DataFrame:
        """Convert changes to a pandas DataFrame."""
        data = []
        for change in self.changes:
            data.append({
                'Endpoint': change.endpoint,
                'Item ID': change.item_id,
                'Item Name': change.item_name,
                'Field Path': change.field_path,
                'Change Type': change.change_type,
                'Old Value': str(change.old_value) if change.old_value is not None else '',
                'New Value': str(change.new_value) if change.new_value is not None else '',
                'Description': change.description,
                'Severity': change.severity
            })
        return pd.DataFrame(data)

    def to_html(self, title: str = "Dataset Diff Report") -> str:
        """Generate an interactive HTML report."""
        df = self.to_dataframe()

        # Group changes by endpoint and severity
        endpoint_stats = df.groupby(['Endpoint', 'Severity']).size().unstack(fill_value=0)

        html_template = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>{title}</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }}
                .header {{ background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 20px; border-radius: 10px; margin-bottom: 20px; }}
                .summary {{ background: white; padding: 20px; border-radius: 10px; margin-bottom: 20px; box-shadow: 0 2px 5px rgba(0,0,0,0.1); }}
                .endpoint-section {{ background: white; margin-bottom: 20px; border-radius: 10px; overflow: hidden; box-shadow: 0 2px 5px rgba(0,0,0,0.1); }}
                .endpoint-header {{ background: #f8f9fa; padding: 15px; border-bottom: 1px solid #dee2e6; cursor: pointer; }}
                .endpoint-header:hover {{ background: #e9ecef; }}
                .endpoint-content {{ padding: 20px; display: none; }}
                .change-item {{ margin-bottom: 15px; padding: 15px; border-radius: 5px; border-left: 4px solid #ccc; }}
                .severity-critical {{ border-left-color: #dc3545; background: #f8d7da; }}
                .severity-major {{ border-left-color: #fd7e14; background: #fff3cd; }}
                .severity-minor {{ border-left-color: #ffc107; background: #fff3cd; }}
                .severity-info {{ border-left-color: #17a2b8; background: #d1ecf1; }}
                .change-type {{ font-weight: bold; text-transform: uppercase; font-size: 0.8em; }}
                .change-added {{ color: #28a745; }}
                .change-removed {{ color: #dc3545; }}
                .change-modified {{ color: #fd7e14; }}
                .change-value_changed {{ color: #6f42c1; }}
                .field-path {{ font-family: monospace; background: #f8f9fa; padding: 2px 6px; border-radius: 3px; }}
                .value {{ font-family: monospace; background: #f8f9fa; padding: 2px 6px; border-radius: 3px; }}
                .stats {{ display: flex; gap: 15px; flex-wrap: wrap; }}
                .stat-box {{ background: white; padding: 15px; border-radius: 8px; text-align: center; min-width: 120px; }}
                .filter-controls {{ background: white; padding: 15px; border-radius: 10px; margin-bottom: 20px; }}
                .filter-controls input, .filter-controls select {{ margin: 5px; padding: 8px; border: 1px solid #ddd; border-radius: 4px; }}
            </style>
            <script>
                function toggleEndpoint(endpointId) {{
                    const content = document.getElementById(endpointId + '-content');
                    const header = document.getElementById(endpointId + '-header');
                    if (content.style.display === 'none' || content.style.display === '') {{
                        content.style.display = 'block';
                        header.style.background = '#e9ecef';
                    }} else {{
                        content.style.display = 'none';
                        header.style.background = '#f8f9fa';
                    }}
                }}

                function filterChanges() {{
                    const searchTerm = document.getElementById('search').value.toLowerCase();
                    const severityFilter = document.getElementById('severity-filter').value;
                    const typeFilter = document.getElementById('type-filter').value;

                    const changeItems = document.querySelectorAll('.change-item');
                    changeItems.forEach(item => {{
                        const text = item.textContent.toLowerCase();
                        const severity = item.className.includes('severity-' + severityFilter) || severityFilter === '';
                        const type = item.className.includes('change-' + typeFilter) || typeFilter === '';
                        const search = text.includes(searchTerm) || searchTerm === '';

                        item.style.display = (severity && type && search) ? 'block' : 'none';
                    }});
                }}

                function expandAll() {{
                    const contents = document.querySelectorAll('.endpoint-content');
                    contents.forEach(content => {{
                        content.style.display = 'block';
                    }});
                }}

                function collapseAll() {{
                    const contents = document.querySelectorAll('.endpoint-content');
                    contents.forEach(content => {{
                        content.style.display = 'none';
                    }});
                }}
            </script>
        </head>
        <body>
            <div class="header">
                <h1>{title}</h1>
                <p>Generated on {timestamp}</p>
            </div>

            <div class="summary">
                <h2>Summary</h2>
                <div class="stats">
                    <div class="stat-box">
                        <h3>{total_changes}</h3>
                        <p>Total Changes</p>
                    </div>
                    <div class="stat-box">
                        <h3>{endpoints_affected}</h3>
                        <p>Endpoints Affected</p>
                    </div>
                    <div class="stat-box">
                        <h3>{critical_changes}</h3>
                        <p>Critical Changes</p>
                    </div>
                    <div class="stat-box">
                        <h3>{major_changes}</h3>
                        <p>Major Changes</p>
                    </div>
                </div>
            </div>

            <div class="filter-controls">
                <input type="text" id="search" placeholder="Search changes..." onkeyup="filterChanges()">
                <select id="severity-filter" onchange="filterChanges()">
                    <option value="">All Severities</option>
                    <option value="critical">Critical</option>
                    <option value="major">Major</option>
                    <option value="minor">Minor</option>
                    <option value="info">Info</option>
                </select>
                <select id="type-filter" onchange="filterChanges()">
                    <option value="">All Types</option>
                    <option value="added">Added</option>
                    <option value="removed">Removed</option>
                    <option value="modified">Modified</option>
                    <option value="value_changed">Value Changed</option>
                </select>
                <button onclick="expandAll()">Expand All</button>
                <button onclick="collapseAll()">Collapse All</button>
            </div>

            {endpoint_sections}
        </body>
        </html>
        """

        # Generate endpoint sections
        endpoint_sections = ""
        grouped = df.groupby('Endpoint')

        for endpoint, group in grouped:
            endpoint_id = endpoint.replace('-', '_').replace(' ', '_')
            changes_html = ""

            for _, row in group.iterrows():
                severity_class = f"severity-{row['Severity']}"
                type_class = f"change-{row['Change Type']}"

                changes_html += f"""
                <div class="change-item {severity_class} {type_class}">
                    <div class="change-type change-{row['Change Type']}">{row['Change Type']}</div>
                    <strong>{row['Item Name']}</strong> (ID: {row['Item ID']})
                    <br><span class="field-path">{row['Field Path']}</span>
                    <br>{row['Description']}
                    {f'<br>Old: <span class="value">{row["Old Value"]}</span>' if row['Old Value'] else ''}
                    {f'<br>New: <span class="value">{row["New Value"]}</span>' if row['New Value'] else ''}
                </div>
                """

            endpoint_sections += f"""
            <div class="endpoint-section">
                <div class="endpoint-header" id="{endpoint_id}-header" onclick="toggleEndpoint('{endpoint_id}')">
                    <h3>{endpoint} ({len(group)} changes)</h3>
                </div>
                <div class="endpoint-content" id="{endpoint_id}-content">
                    {changes_html}
                </div>
            </div>
            """

        # Calculate statistics
        total_changes = len(df)
        endpoints_affected = df['Endpoint'].nunique()
        critical_changes = len(df[df['Severity'] == 'critical'])
        major_changes = len(df[df['Severity'] == 'major'])

        return html_template.format(
            title=title,
            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            total_changes=total_changes,
            endpoints_affected=endpoints_affected,
            critical_changes=critical_changes,
            major_changes=major_changes,
            endpoint_sections=endpoint_sections
        )

    def to_json(self) -> str:
        """Convert changes to JSON format."""
        return json.dumps([asdict(change) for change in self.changes], indent=2, default=str)

class DiffAnalyzerGUI:
    """GUI application for the diff analyzer."""

    def __init__(self):
        self.root = tk.Tk()
        self.root.title("Dataset Diff Analyzer")
        self.root.geometry("800x600")

        self.old_dataset_path = tk.StringVar()
        self.new_dataset_path = tk.StringVar()
        self.output_dir = tk.StringVar(value=os.getcwd())

        self.setup_ui()

    def setup_ui(self):
        """Setup the GUI components."""
        # Main frame
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        # File selection
        ttk.Label(main_frame, text="Old Dataset:").grid(row=0, column=0, sticky=tk.W, pady=5)
        ttk.Entry(main_frame, textvariable=self.old_dataset_path, width=60).grid(row=0, column=1, padx=5)
        ttk.Button(main_frame, text="Browse", command=lambda: self.browse_file(self.old_dataset_path)).grid(row=0, column=2)

        ttk.Label(main_frame, text="New Dataset:").grid(row=1, column=0, sticky=tk.W, pady=5)
        ttk.Entry(main_frame, textvariable=self.new_dataset_path, width=60).grid(row=1, column=1, padx=5)
        ttk.Button(main_frame, text="Browse", command=lambda: self.browse_file(self.new_dataset_path)).grid(row=1, column=2)

        ttk.Label(main_frame, text="Output Directory:").grid(row=2, column=0, sticky=tk.W, pady=5)
        ttk.Entry(main_frame, textvariable=self.output_dir, width=60).grid(row=2, column=1, padx=5)
        ttk.Button(main_frame, text="Browse", command=self.browse_output_dir).grid(row=2, column=2)

        # Buttons
        button_frame = ttk.Frame(main_frame)
        button_frame.grid(row=3, column=0, columnspan=3, pady=20)

        ttk.Button(button_frame, text="Analyze Differences", command=self.analyze_differences).pack(side=tk.LEFT, padx=5)
        ttk.Button(button_frame, text="Exit", command=self.root.quit).pack(side=tk.RIGHT, padx=5)

        # Progress bar
        self.progress = ttk.Progressbar(main_frame, mode='indeterminate')
        self.progress.grid(row=4, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=10)

        # Results text area
        self.results_text = tk.Text(main_frame, height=20, width=80)
        self.results_text.grid(row=5, column=0, columnspan=3, pady=10)

        # Scrollbar for text area
        scrollbar = ttk.Scrollbar(main_frame, orient=tk.VERTICAL, command=self.results_text.yview)
        scrollbar.grid(row=5, column=3, sticky=(tk.N, tk.S))
        self.results_text.configure(yscrollcommand=scrollbar.set)

    def browse_file(self, var):
        """Browse for a JSON file."""
        filename = filedialog.askopenfilename(
            title="Select Dataset File",
            filetypes=[("JSON files", "*.json"), ("All files", "*.*")]
        )
        if filename:
            var.set(filename)

    def browse_output_dir(self):
        """Browse for output directory."""
        dirname = filedialog.askdirectory(title="Select Output Directory")
        if dirname:
            self.output_dir.set(dirname)

    def analyze_differences(self):
        """Perform the diff analysis."""
        if not self.old_dataset_path.get() or not self.new_dataset_path.get():
            messagebox.showerror("Error", "Please select both dataset files.")
            return

        try:
            self.progress.start()
            self.results_text.delete(1.0, tk.END)
            self.results_text.insert(tk.END, "Loading datasets...\n")
            self.root.update()

            # Load datasets
            with open(self.old_dataset_path.get(), 'r', encoding='utf-8') as f:
                old_dataset = json.load(f)

            with open(self.new_dataset_path.get(), 'r', encoding='utf-8') as f:
                new_dataset = json.load(f)

            self.results_text.insert(tk.END, "Analyzing differences...\n")
            self.root.update()

            # Analyze differences
            analyzer = DatasetDiffAnalyzer()
            changes = analyzer.compare_datasets(old_dataset, new_dataset)

            self.results_text.insert(tk.END, f"Found {len(changes)} changes.\n")
            self.root.update()

            # Generate reports
            report_generator = DiffReportGenerator(changes)

            # Save HTML report
            html_report = report_generator.to_html("Dataset Comparison Report")
            html_path = os.path.join(self.output_dir.get(), "diff_report.html")
            with open(html_path, 'w', encoding='utf-8') as f:
                f.write(html_report)

            # Save JSON report
            json_report = report_generator.to_json()
            json_path = os.path.join(self.output_dir.get(), "diff_report.json")
            with open(json_path, 'w', encoding='utf-8') as f:
                f.write(json_report)

            # Save CSV report
            df = report_generator.to_dataframe()
            csv_path = os.path.join(self.output_dir.get(), "diff_report.csv")
            df.to_csv(csv_path, index=False)

            self.results_text.insert(tk.END, f"\nReports generated:\n")
            self.results_text.insert(tk.END, f"- HTML: {html_path}\n")
            self.results_text.insert(tk.END, f"- JSON: {json_path}\n")
            self.results_text.insert(tk.END, f"- CSV: {csv_path}\n")

            # Show summary
            severity_counts = df['Severity'].value_counts()
            self.results_text.insert(tk.END, f"\nSummary by severity:\n")
            for severity, count in severity_counts.items():
                self.results_text.insert(tk.END, f"- {severity.capitalize()}: {count}\n")

            # Ask to open HTML report
            if messagebox.askyesno("Success", "Analysis complete! Would you like to open the HTML report?"):
                webbrowser.open(f"file://{os.path.abspath(html_path)}")

        except Exception as e:
            messagebox.showerror("Error", f"An error occurred: {str(e)}")
            self.results_text.insert(tk.END, f"Error: {str(e)}\n")
        finally:
            self.progress.stop()

    def run(self):
        """Run the GUI application."""
        self.root.mainloop()

def main():
    """Main function to run the GUI application."""
    app = DiffAnalyzerGUI()
    app.run()

if __name__ == "__main__":
    main()

TclError: no display name and no $DISPLAY environment variable

# No-GUI diff tool (colab version)

In [11]:
import json
import os
from typing import Dict, List, Any, Tuple, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
import pandas as pd
from pathlib import Path
from tqdm.notebook import tqdm
import IPython.display as display

@dataclass
class ChangeRecord:
    """Represents a single change in the dataset."""
    endpoint: str
    item_id: str
    item_name: str
    field_path: str
    change_type: str  # 'added', 'removed', 'modified', 'value_changed'
    old_value: Any
    new_value: Any
    description: str
    severity: str = 'info'  # 'info', 'minor', 'major', 'critical'

class DatasetDiffAnalyzer:
    """Analyzes differences between two consolidated dataset files."""

    def __init__(self):
        self.changes: List[ChangeRecord] = []
        self.reference_mapping = self._load_reference_mappings()

    def _load_reference_mappings(self) -> Dict[str, Dict[int, str]]:
        """Load reference mappings for IDs to names (characteristics, effects, etc.)."""
        return {
            'characteristics': {},
            'effects': {},
            'spell_types': {},
            'item_types': {}
        }

    def _build_reference_mappings(self, dataset: Dict[str, Any]) -> None:
        """Build reference mappings from the dataset."""
        # Build characteristics mapping
        if 'characteristics' in dataset:
            for char in dataset['characteristics']:
                self.reference_mapping['characteristics'][char['id']] = char.get('name', {}).get('en', f"Characteristic {char['id']}")

        # Build spell types mapping
        if 'spell-types' in dataset:
            for spell_type in dataset['spell-types']:
                self.reference_mapping['spell_types'][spell_type['id']] = spell_type.get('longName', f"SpellType {spell_type['id']}")

        # Build item types mapping
        if 'items' in dataset:
            for item in dataset['items']:
                if 'type' in item and item['type']:
                    type_id = item['type']['id']
                    type_name = item['type'].get('name', {}).get('en', f"ItemType {type_id}")
                    self.reference_mapping['item_types'][type_id] = type_name

    def _get_item_name(self, item: Dict[str, Any]) -> str:
        """Extract item name in English or fallback."""
        if 'name' in item:
            if isinstance(item['name'], dict):
                return item['name'].get('en', item['name'].get('fr', item['name'].get('es', str(item.get('id', 'Unknown')))))
            return str(item['name'])
        return f"Item {item.get('id', 'Unknown')}"

    def _compare_effects(self, old_effects: List[Dict], new_effects: List[Dict], item_id: str, item_name: str, endpoint: str) -> None:
        """Compare effects arrays and record changes."""
        # Find added effects
        if len(new_effects) > len(old_effects):
            for i in range(len(old_effects), len(new_effects)):
                effect = new_effects[i]
                char_name = self.reference_mapping['characteristics'].get(effect.get('characteristic'), f"Characteristic {effect.get('characteristic')}")
                self.changes.append(ChangeRecord(
                    endpoint=endpoint,
                    item_id=item_id,
                    item_name=item_name,
                    field_path=f"effects.{i}",
                    change_type='added',
                    old_value=None,
                    new_value=effect,
                    description=f"Added effect: {char_name} ({effect.get('from', 0)} to {effect.get('to', 0)})",
                    severity='minor'
                ))

        # Find removed effects
        if len(old_effects) > len(new_effects):
            for i in range(len(new_effects), len(old_effects)):
                effect = old_effects[i]
                char_name = self.reference_mapping['characteristics'].get(effect.get('characteristic'), f"Characteristic {effect.get('characteristic')}")
                self.changes.append(ChangeRecord(
                    endpoint=endpoint,
                    item_id=item_id,
                    item_name=item_name,
                    field_path=f"effects.{i}",
                    change_type='removed',
                    old_value=effect,
                    new_value=None,
                    description=f"Removed effect: {char_name} ({effect.get('from', 0)} to {effect.get('to', 0)})",
                    severity='minor'
                ))

        # Compare existing effects
        min_length = min(len(old_effects), len(new_effects))
        for i in range(min_length):
            old_effect = old_effects[i]
            new_effect = new_effects[i]

            if old_effect != new_effect:
                char_name = self.reference_mapping['characteristics'].get(
                    new_effect.get('characteristic', old_effect.get('characteristic')),
                    f"Characteristic {new_effect.get('characteristic', old_effect.get('characteristic'))}"
                )

                # Detailed comparison of effect properties
                for key in ['from', 'to', 'characteristic', 'effectId']:
                    if old_effect.get(key) != new_effect.get(key):
                        old_val = old_effect.get(key)
                        new_val = new_effect.get(key)

                        if key in ['from', 'to']:
                            description = f"{char_name}: {key.capitalize()} value changed {old_val} → {new_val}"
                            severity = 'major' if abs((new_val or 0) - (old_val or 0)) > 10 else 'minor'
                        else:
                            description = f"{char_name}: {key} changed {old_val} → {new_val}"
                            severity = 'minor'

                        self.changes.append(ChangeRecord(
                            endpoint=endpoint,
                            item_id=item_id,
                            item_name=item_name,
                            field_path=f"effects.{i}.{key}",
                            change_type='value_changed',
                            old_value=old_val,
                            new_value=new_val,
                            description=description,
                            severity=severity
                        ))

    def _compare_items(self, old_item: Dict[str, Any], new_item: Dict[str, Any], endpoint: str) -> None:
        """Compare two items and record differences."""
        item_id = str(old_item.get('id', new_item.get('id', 'Unknown')))
        item_name = self._get_item_name(new_item if new_item else old_item)

        # Special handling for effects
        if 'effects' in old_item or 'effects' in new_item:
            old_effects = old_item.get('effects', [])
            new_effects = new_item.get('effects', [])
            if old_effects != new_effects:
                self._compare_effects(old_effects, new_effects, item_id, item_name, endpoint)

        # Compare other important fields
        important_fields = ['level', 'price', 'typeId', 'itemSetId', 'criteria', 'apCost', 'range', 'minRange']

        for field in important_fields:
            old_value = old_item.get(field)
            new_value = new_item.get(field)

            if old_value != new_value:
                severity = 'major' if field in ['level', 'apCost', 'range'] else 'minor'
                description = f"{field.replace('Id', ' ID')} changed: {old_value} → {new_value}"

                self.changes.append(ChangeRecord(
                    endpoint=endpoint,
                    item_id=item_id,
                    item_name=item_name,
                    field_path=field,
                    change_type='value_changed',
                    old_value=old_value,
                    new_value=new_value,
                    description=description,
                    severity=severity
                ))

    def compare_datasets(self, old_dataset: Dict[str, Any], new_dataset: Dict[str, Any]) -> List[ChangeRecord]:
        """Compare two datasets and return list of changes."""
        self.changes = []

        # Build reference mappings from both datasets
        self._build_reference_mappings(old_dataset)
        self._build_reference_mappings(new_dataset)

        # Get all endpoints from both datasets
        old_endpoints = set(k for k in old_dataset.keys() if k != 'metadata')
        new_endpoints = set(k for k in new_dataset.keys() if k != 'metadata')
        all_endpoints = old_endpoints.union(new_endpoints)

        # Progress bar for endpoints
        with tqdm(total=len(all_endpoints), desc="Analyzing endpoints", unit="endpoint") as pbar:
            for endpoint in all_endpoints:
                pbar.set_description(f"Analyzing {endpoint}")

                old_items = old_dataset.get(endpoint, [])
                new_items = new_dataset.get(endpoint, [])

                # Create ID-based mappings for comparison
                old_items_dict = {item['id']: item for item in old_items if 'id' in item}
                new_items_dict = {item['id']: item for item in new_items if 'id' in item}

                all_ids = set(old_items_dict.keys()).union(set(new_items_dict.keys()))

                for item_id in all_ids:
                    old_item = old_items_dict.get(item_id)
                    new_item = new_items_dict.get(item_id)

                    if old_item and new_item:
                        # Item exists in both - compare
                        self._compare_items(old_item, new_item, endpoint)
                    elif old_item and not new_item:
                        # Item removed
                        item_name = self._get_item_name(old_item)
                        self.changes.append(ChangeRecord(
                            endpoint=endpoint,
                            item_id=str(item_id),
                            item_name=item_name,
                            field_path='*',
                            change_type='removed',
                            old_value=old_item,
                            new_value=None,
                            description=f"Item removed: {item_name}",
                            severity='major'
                        ))
                    elif new_item and not old_item:
                        # Item added
                        item_name = self._get_item_name(new_item)
                        self.changes.append(ChangeRecord(
                            endpoint=endpoint,
                            item_id=str(item_id),
                            item_name=item_name,
                            field_path='*',
                            change_type='added',
                            old_value=None,
                            new_value=new_item,
                            description=f"Item added: {item_name}",
                            severity='minor'
                        ))

                pbar.update(1)

        return self.changes

class DiffReportGenerator:
    """Generates various types of diff reports."""

    def __init__(self, changes: List[ChangeRecord]):
        self.changes = changes

    def to_dataframe(self) -> pd.DataFrame:
        """Convert changes to a pandas DataFrame."""
        data = []
        for change in self.changes:
            data.append({
                'Endpoint': change.endpoint,
                'Item ID': change.item_id,
                'Item Name': change.item_name,
                'Field Path': change.field_path,
                'Change Type': change.change_type,
                'Old Value': str(change.old_value) if change.old_value is not None else '',
                'New Value': str(change.new_value) if change.new_value is not None else '',
                'Description': change.description,
                'Severity': change.severity
            })
        return pd.DataFrame(data)

    def to_html(self, title: str = "Dataset Diff Report") -> str:
        """Generate an interactive HTML report."""
        df = self.to_dataframe()

        html_template = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>{title}</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }}
                .header {{ background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 20px; border-radius: 10px; margin-bottom: 20px; }}
                .summary {{ background: white; padding: 20px; border-radius: 10px; margin-bottom: 20px; box-shadow: 0 2px 5px rgba(0,0,0,0.1); }}
                .endpoint-section {{ background: white; margin-bottom: 20px; border-radius: 10px; overflow: hidden; box-shadow: 0 2px 5px rgba(0,0,0,0.1); }}
                .endpoint-header {{ background: #f8f9fa; padding: 15px; border-bottom: 1px solid #dee2e6; cursor: pointer; }}
                .endpoint-header:hover {{ background: #e9ecef; }}
                .endpoint-content {{ padding: 20px; display: none; }}
                .change-item {{ margin-bottom: 15px; padding: 15px; border-radius: 5px; border-left: 4px solid #ccc; }}
                .severity-critical {{ border-left-color: #dc3545; background: #f8d7da; }}
                .severity-major {{ border-left-color: #fd7e14; background: #fff3cd; }}
                .severity-minor {{ border-left-color: #ffc107; background: #fff3cd; }}
                .severity-info {{ border-left-color: #17a2b8; background: #d1ecf1; }}
                .change-type {{ font-weight: bold; text-transform: uppercase; font-size: 0.8em; }}
                .change-added {{ color: #28a745; }}
                .change-removed {{ color: #dc3545; }}
                .change-modified {{ color: #fd7e14; }}
                .change-value_changed {{ color: #6f42c1; }}
                .field-path {{ font-family: monospace; background: #f8f9fa; padding: 2px 6px; border-radius: 3px; }}
                .value {{ font-family: monospace; background: #f8f9fa; padding: 2px 6px; border-radius: 3px; }}
                .stats {{ display: flex; gap: 15px; flex-wrap: wrap; }}
                .stat-box {{ background: white; padding: 15px; border-radius: 8px; text-align: center; min-width: 120px; }}
            </style>
            <script>
                function toggleEndpoint(endpointId) {{
                    const content = document.getElementById(endpointId + '-content');
                    if (content.style.display === 'none' || content.style.display === '') {{
                        content.style.display = 'block';
                    }} else {{
                        content.style.display = 'none';
                    }}
                }}
            </script>
        </head>
        <body>
            <div class="header">
                <h1>{title}</h1>
                <p>Generated on {timestamp}</p>
            </div>

            <div class="summary">
                <h2>Summary</h2>
                <div class="stats">
                    <div class="stat-box">
                        <h3>{total_changes}</h3>
                        <p>Total Changes</p>
                    </div>
                    <div class="stat-box">
                        <h3>{endpoints_affected}</h3>
                        <p>Endpoints Affected</p>
                    </div>
                    <div class="stat-box">
                        <h3>{critical_changes}</h3>
                        <p>Critical Changes</p>
                    </div>
                    <div class="stat-box">
                        <h3>{major_changes}</h3>
                        <p>Major Changes</p>
                    </div>
                </div>
            </div>

            {endpoint_sections}
        </body>
        </html>
        """

        # Generate endpoint sections
        endpoint_sections = ""
        grouped = df.groupby('Endpoint')

        for endpoint, group in grouped:
            endpoint_id = endpoint.replace('-', '_').replace(' ', '_')
            changes_html = ""

            for _, row in group.iterrows():
                severity_class = f"severity-{row['Severity']}"
                type_class = f"change-{row['Change Type']}"

                changes_html += f"""
                <div class="change-item {severity_class} {type_class}">
                    <div class="change-type change-{row['Change Type']}">{row['Change Type']}</div>
                    <strong>{row['Item Name']}</strong> (ID: {row['Item ID']})
                    <br><span class="field-path">{row['Field Path']}</span>
                    <br>{row['Description']}
                    {f'<br>Old: <span class="value">{row["Old Value"]}</span>' if row['Old Value'] else ''}
                    {f'<br>New: <span class="value">{row["New Value"]}</span>' if row['New Value'] else ''}
                </div>
                """

            endpoint_sections += f"""
            <div class="endpoint-section">
                <div class="endpoint-header" onclick="toggleEndpoint('{endpoint_id}')">
                    <h3>{endpoint} ({len(group)} changes)</h3>
                </div>
                <div class="endpoint-content" id="{endpoint_id}-content">
                    {changes_html}
                </div>
            </div>
            """

        # Calculate statistics
        total_changes = len(df)
        endpoints_affected = df['Endpoint'].nunique()
        critical_changes = len(df[df['Severity'] == 'critical'])
        major_changes = len(df[df['Severity'] == 'major'])

        return html_template.format(
            title=title,
            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            total_changes=total_changes,
            endpoints_affected=endpoints_affected,
            critical_changes=critical_changes,
            major_changes=major_changes,
            endpoint_sections=endpoint_sections
        )

def analyze_datasets_colab(old_path: str, new_path: str, output_dir: str = "/content/diff_reports"):
    """Analyze datasets in Google Colab environment."""

    print("🚀 Starting dataset analysis...")

    # Create output directory
    os.makedirs(output_dir, exist_ok=True)

    # Load datasets
    print("📖 Loading datasets...")
    with open(old_path, 'r', encoding='utf-8') as f:
        old_dataset = json.load(f)

    with open(new_path, 'r', encoding='utf-8') as f:
        new_dataset = json.load(f)

    print(f"✅ Loaded old dataset: {len([k for k in old_dataset.keys() if k != 'metadata'])} endpoints")
    print(f"✅ Loaded new dataset: {len([k for k in new_dataset.keys() if k != 'metadata'])} endpoints")

    # Analyze differences
    print("\n🔍 Analyzing differences...")
    analyzer = DatasetDiffAnalyzer()
    changes = analyzer.compare_datasets(old_dataset, new_dataset)

    print(f"✅ Analysis complete! Found {len(changes)} changes.")

    # Generate reports
    print("\n📊 Generating reports...")
    report_generator = DiffReportGenerator(changes)

    # Generate DataFrame and display summary
    df = report_generator.to_dataframe()

    print("\n" + "="*60)
    print("📈 ANALYSIS SUMMARY")
    print("="*60)
    print(f"Total changes found: {len(changes)}")
    print(f"Endpoints affected: {df['Endpoint'].nunique()}")

    # Show severity breakdown
    severity_counts = df['Severity'].value_counts()
    print("\nSeverity breakdown:")
    for severity, count in severity_counts.items():
        print(f"  {severity.capitalize()}: {count}")

    # Show endpoint breakdown
    endpoint_counts = df['Endpoint'].value_counts()
    print(f"\nTop 10 endpoints by changes:")
    for endpoint, count in endpoint_counts.head(10).items():
        print(f"  {endpoint}: {count}")

    # Save reports
    print("\n💾 Saving reports...")

    # Save HTML report
    html_report = report_generator.to_html("Dataset Comparison Report")
    html_path = os.path.join(output_dir, "diff_report.html")
    with open(html_path, 'w', encoding='utf-8') as f:
        f.write(html_report)
    print(f"✅ HTML report saved: {html_path}")

    # Save CSV report
    csv_path = os.path.join(output_dir, "diff_report.csv")
    df.to_csv(csv_path, index=False)
    print(f"✅ CSV report saved: {csv_path}")

    # Save JSON report
    json_path = os.path.join(output_dir, "diff_report.json")
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump([asdict(change) for change in changes], f, indent=2, default=str)
    print(f"✅ JSON report saved: {json_path}")

    # Display sample changes
    print(f"\n🔍 Sample changes (showing first 10):")
    for i, change in enumerate(changes[:10]):
        print(f"{i+1}. [{change.severity.upper()}] {change.endpoint} - {change.item_name}")
        print(f"   {change.description}")

    print(f"\n🎉 Analysis complete! Reports saved to: {output_dir}")

    # Return the DataFrame for further analysis in Colab
    return df, changes

# Example usage function for Colab
def run_analysis():
    """Run the analysis with predefined paths - modify these for your files."""

    old_path = "/content/dofus_data/latest/consolidated_dataset.json"
    new_path = "/content/dofus_data/latest/consolidated_dataset_modified.json"
    output_dir = "/content/diff_reports"

    # Check if files exist
    if not os.path.exists(old_path):
        print(f"❌ Old dataset file not found: {old_path}")
        return None, None

    if not os.path.exists(new_path):
        print(f"❌ New dataset file not found: {new_path}")
        print("💡 Run the test dataset creation script first!")
        return None, None

    # Run analysis
    return analyze_datasets_colab(old_path, new_path, output_dir)

# If running in Colab, use this instead of the GUI
if __name__ == "__main__":
    print("🔧 This is the Colab-friendly version of the diff analyzer")
    print("📝 To run analysis, use: df, changes = run_analysis()")

🔧 This is the Colab-friendly version of the diff analyzer
📝 To run analysis, use: df, changes = run_analysis()


In [13]:
# Then run the diff analysis
df, changes = run_analysis()

# View the results
print(f"Found {len(changes)} changes")
df.head(10)  # Show first 10 changes in a nice table

# View HTML report
from IPython.display import HTML
with open('/content/diff_reports/diff_report.html', 'r') as f:
    HTML(f.read())

🚀 Starting dataset analysis...
📖 Loading datasets...
✅ Loaded old dataset: 15 endpoints
✅ Loaded new dataset: 15 endpoints

🔍 Analyzing differences...


Analyzing endpoints:   0%|          | 0/15 [00:00<?, ?endpoint/s]

✅ Analysis complete! Found 197 changes.

📊 Generating reports...

📈 ANALYSIS SUMMARY
Total changes found: 197
Endpoints affected: 2

Severity breakdown:
  Minor: 151
  Major: 46

Top 10 endpoints by changes:
  items: 193
  spell-levels: 4

💾 Saving reports...
✅ HTML report saved: /content/diff_reports/diff_report.html
✅ CSV report saved: /content/diff_reports/diff_report.csv
✅ JSON report saved: /content/diff_reports/diff_report.json

🔍 Sample changes (showing first 10):
1. [MAJOR] spell-levels - Item 1351
   apCost changed: 3 → 0
2. [MINOR] spell-levels - Item 21472
   minRange changed: 1 → 2
3. [MAJOR] spell-levels - Item 24221
   apCost changed: 4 → 0
4. [MAJOR] spell-levels - Item 56596
   apCost changed: 1 → 3
5. [MINOR] items - Ivory Hammer
   Added effect: Intelligence (41 to 68)
6. [MAJOR] items - Ivory Hammer
   apCost changed: 4 → 3
7. [MINOR] items - Autumn Leaf
   Added effect: Erosion (20 to 98)
8. [MINOR] items - Autumn Leaf
   Vitality: effectId changed 125 → 928
9. [MIN