In [None]:
import os
import json
import numpy as np
from PIL import Image
import piexif
import pyexiv2
import matplotlib.pyplot as plt
from pathlib import Path
import tempfile
import warnings
import io
import hashlib
from typing import Dict, Any, Tuple
import importlib

# Import actual Gen5 classes from PyPI package
from gen5 import Gen5FileHandler
from gen5.core.exceptions import Gen5ImageError

# Suppress non-critical warnings
warnings.filterwarnings("ignore", category=UserWarning, module="pyexiv2")
warnings.filterwarnings("ignore", category=RuntimeWarning, module="gen5")

# -------------------------------
# 1. Test Image Generator
# -------------------------------

def create_test_image(seed: int = 0, size: Tuple[int, int] = (1024, 1024)) -> Image.Image:
    """Generate a deterministic RGB test image: a smooth sinusoidal pattern plus noise.

    Args:
        seed: Seed for the NumPy random generator; equal seeds give equal images.
        size: Image size as ``(width, height)`` in pixels.

    Returns:
        An 8-bit RGB ``PIL.Image.Image`` of the requested size.
    """
    width, height = size
    rng = np.random.default_rng(seed)

    # Open coordinate grids: y has shape (height, 1), x has shape (1, width).
    y, x = np.ogrid[:height, :width]

    # Noise is laid out as (rows, cols, channels) so that it broadcasts against
    # the (height, width) pattern. A (width, height, 3) layout only lines up
    # when the image is square.
    noise = 0.1 * rng.random((height, width, 3), dtype=np.float32)
    pattern = 0.3 * np.sin(0.01 * x + 0.02 * y) + 0.3 * np.cos(0.03 * x - 0.01 * y)
    img_arr = np.clip(128 + 100 * pattern[..., np.newaxis] + 128 * noise, 0, 255).astype(np.uint8)

    # The mode is inferred as RGB from the uint8 (H, W, 3) array, so no
    # explicit ``mode=`` argument is needed.
    return Image.fromarray(img_arr)

def image_to_png_bytes(img: Image.Image) -> bytes:
    """Serialise ``img`` to an in-memory PNG (lossless, maximum compression level)."""
    with io.BytesIO() as stream:
        img.save(stream, format="PNG", compress_level=9)
        return stream.getvalue()

# -------------------------------
# 2. Rich Metadata Generator
# -------------------------------

def generate_rich_metadata(seed: int = 0) -> Dict[str, Any]:
    """Build a sample metadata record for one generated image.

    The record holds fixed model, prompt, tag and hardware descriptions plus two
    seed-dependent values: a random float32 latent array of shape
    ``(1, 4, 64, 64)`` and a random integer generation seed.

    Args:
        seed: Seed for the NumPy random generator.

    Returns:
        A dictionary with the keys ``model_name``, ``model_version``,
        ``prompt``, ``negative_prompt``, ``tags``, ``generation_settings``,
        ``hardware_info`` and ``latent``.
    """
    rng = np.random.default_rng(seed)

    # Draw order matters for reproducibility: latent first, then the seed value.
    latent_sample = rng.standard_normal((1, 4, 64, 64), dtype=np.float32)
    generation_seed = int(rng.integers(1e9))

    generation_settings = {
        "width": 1024,
        "height": 1024,
        "steps": 30,
        "cfg_scale": 7.5,
        "sampler": "DPM++ 2M Karras",
        "seed": generation_seed,
        "denoising_strength": 1.0,
        "clip_skip": 2,
    }

    gpu_entry = {
        "name": "NVIDIA RTX 4090",
        "memory_gb": 24,
        "driver": "550.54.15",
        "cuda_version": "12.4",
    }
    library_versions = {
        "torch": "2.3.0",
        "diffusers": "0.28.0",
        "transformers": "4.40.0",
    }
    hardware_info = {
        "gpu": [gpu_entry],
        "cpu": "AMD Ryzen 9 7950X3D",
        # os.cpu_count() may return None; fall back to 8 so the value is always an int
        "cpu_cores": os.cpu_count() or 8,
        "ram_gb": 64,
        "os": "Ubuntu 24.04 LTS",
        "python_version": "3.11.9",
        "libraries": library_versions,
    }

    record: Dict[str, Any] = {}
    record["model_name"] = "stable-diffusion-xl-base-1.0"
    record["model_version"] = "1.0.0"
    record["prompt"] = "A photorealistic cyberpunk cityscape at dusk, neon lights reflecting on wet streets, cinematic lighting"
    record["negative_prompt"] = "blurry, low quality, text, watermark, deformed"
    record["tags"] = ["cyberpunk", "night", "cityscape", "neon", "photorealistic", "8k"]
    record["generation_settings"] = generation_settings
    record["hardware_info"] = hardware_info
    record["latent"] = {"sample": latent_sample}
    return record

# -------------------------------
# 3. EXIF/XMP Embedding Functions
# -------------------------------

def embed_exif_xmp_into_png(img: Image.Image, metadata: Dict[str, Any], output_path: str):
    """Save ``img`` as a PNG with metadata embedded as EXIF and XMP.

    The EXIF block carries software/date tags and a JSON ``UserComment`` with
    the prompt, model identifier and tags. The full metadata is serialised to
    JSON and stored as an XMP packet in an ``XML:com.adobe.xmp`` iTXt chunk.

    Args:
        img: Image to save; ``RGBA`` and ``P`` images are converted to ``RGB``.
        metadata: Metadata record; must provide ``prompt``, ``model_name``,
            ``model_version`` and ``tags``. It is not modified.
        output_path: Destination path of the PNG file.
    """
    # ``UserComment`` lives in the ``piexif.helper`` submodule, which a plain
    # ``import piexif`` does not load.
    import piexif.helper
    from PIL.PngImagePlugin import PngInfo

    # Convert to RGB (EXIF has limited RGBA support)
    if img.mode in ("RGBA", "P"):
        img = img.convert("RGB")

    # Create EXIF data with basic info
    exif_dict = {"0th": {}, "Exif": {}, "GPS": {}, "1st": {}}
    exif_dict["0th"][piexif.ImageIFD.Software] = "Gen5Benchmark/1.0".encode()
    exif_dict["0th"][piexif.ImageIFD.DateTime] = "2026:01:07 15:30:00".encode()
    exif_dict["Exif"][piexif.ExifIFD.UserComment] = piexif.helper.UserComment.dump(
        json.dumps({
            "prompt": metadata["prompt"],
            "model": f"{metadata['model_name']}@{metadata['model_version']}",
            "tags": metadata["tags"]
        }),
        encoding="unicode"
    )
    exif_bytes = piexif.dump(exif_dict)

    # Prepare full metadata for XMP. The top level and the nested "latent"
    # dict are copied separately so the caller's metadata stays untouched.
    # NOTE(review): the latent array is represented by a truncated SHA-256
    # digest plus its byte count, mirroring save_png_with_sidecar; store the
    # real payload instead if the size comparison should include it.
    xmp_metadata = dict(metadata)
    if "latent" in xmp_metadata and "sample" in xmp_metadata["latent"]:
        latent_bytes = xmp_metadata["latent"]["sample"].tobytes()
        latent_entry = {k: v for k, v in xmp_metadata["latent"].items() if k != "sample"}
        latent_entry["sample_b64"] = hashlib.sha256(latent_bytes).hexdigest()[:16] + f"...({len(latent_bytes)} bytes)"
        xmp_metadata["latent"] = latent_entry

    xmp_data = json.dumps(xmp_metadata, indent=2, sort_keys=True, ensure_ascii=False)
    # A literal "]]>" inside the JSON would end the CDATA section early.
    cdata = xmp_data.replace("]]>", "]]]]><![CDATA[>")

    xmp_packet = (
        '<?xpacket begin="\ufeff" id="W5M0MpCehiHzreSzNTczkc9d"?>\n'
        '<x:xmpmeta xmlns:x="adobe:ns:meta/" x:xmptk="Gen5Benchmark">\n'
        ' <rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#">\n'
        '  <rdf:Description rdf:about=""\n'
        '    xmlns:gen5="http://ns.gen5.ai/1.0/">\n'
        f'   <gen5:FullMetadata><![CDATA[{cdata}]]></gen5:FullMetadata>\n'
        '  </rdf:Description>\n'
        ' </rdf:RDF>\n'
        '</x:xmpmeta>\n'
        '<?xpacket end="w"?>'
    )

    # PNG carries XMP in an iTXt chunk keyed "XML:com.adobe.xmp".
    png_info = PngInfo()
    png_info.add_itxt("XML:com.adobe.xmp", xmp_packet)

    img.save(output_path, "PNG", compress_level=9, exif=exif_bytes, pnginfo=png_info)

def save_png_with_sidecar(img: Image.Image, metadata: Dict[str, Any], png_path: str, xmp_path: str):
    """Write ``img`` as a metadata-free PNG plus a separate XMP sidecar file.

    The sidecar wraps the JSON-serialised metadata in a CDATA section inside a
    minimal XMP packet. The latent array itself is not written: a
    ``sample_b64`` entry holding a truncated SHA-256 digest and the byte count
    takes its place.

    Args:
        img: Image to save; ``RGBA`` and ``P`` images are converted to ``RGB``.
        metadata: Metadata record to serialise. It is not modified.
        png_path: Destination path of the PNG file.
        xmp_path: Destination path of the XMP sidecar file.
    """
    # Save clean PNG
    if img.mode in ("RGBA", "P"):
        img = img.convert("RGB")
    img.save(png_path, "PNG", compress_level=9)

    # Copy the top level and the nested "latent" dict separately. A shallow
    # copy alone shares "latent" with the caller, so removing "sample" there
    # would strip the latent from the caller's metadata as well.
    sidecar_meta = dict(metadata)
    if "latent" in sidecar_meta and "sample" in sidecar_meta["latent"]:
        latent_bytes = sidecar_meta["latent"]["sample"].tobytes()
        latent_entry = {k: v for k, v in sidecar_meta["latent"].items() if k != "sample"}
        latent_entry["sample_b64"] = hashlib.sha256(latent_bytes).hexdigest()[:16] + f"...({len(latent_bytes)} bytes)"
        sidecar_meta["latent"] = latent_entry

    xmp_data = json.dumps(sidecar_meta, indent=2, sort_keys=True, ensure_ascii=False)
    # A literal "]]>" inside the JSON would end the CDATA section early, so
    # split it across two adjacent CDATA sections.
    cdata = xmp_data.replace("]]>", "]]]]><![CDATA[>")

    # The xpacket header's begin attribute must be a quoted byte-order mark.
    xmp_packet = (
        '<?xpacket begin="\ufeff" id="W5M0MpCehiHzreSzNTczkc9d"?>\n'
        '<x:xmpmeta xmlns:x="adobe:ns:meta/" x:xmptk="Gen5Benchmark">\n'
        ' <rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#">\n'
        '  <rdf:Description rdf:about=""\n'
        '    xmlns:gen5="http://ns.gen5.ai/1.0/">\n'
        f'   <gen5:FullMetadata><![CDATA[{cdata}]]></gen5:FullMetadata>\n'
        '  </rdf:Description>\n'
        ' </rdf:RDF>\n'
        '</x:xmpmeta>\n'
        '<?xpacket end="w"?>'
    )

    # Create sidecar XMP
    with open(xmp_path, "w", encoding="utf-8") as f:
        f.write(xmp_packet)

# -------------------------------
# 4. Gen5 Encoder Wrapper
# -------------------------------

def create_gen5_file(
    img: Image.Image,
    metadata: Dict[str, Any],
    output_path: str,
    should_compress: bool = True,
    convert_float16: bool = True
) -> int:
    """Encode ``img`` and ``metadata`` into a ``.gen5`` file and return its size.

    The encoder is first called with the full metadata (including
    ``generation_settings`` and ``hardware_info``). If that call raises, a
    message is printed and the call is retried once without those two fields;
    an exception from the retry propagates to the caller.

    Args:
        img: Image to store; it is serialised to PNG bytes first.
        metadata: Record providing ``latent``, ``model_name``,
            ``model_version``, ``prompt``, ``tags``, ``generation_settings``
            and ``hardware_info``.
        output_path: Destination path of the ``.gen5`` file.
        should_compress: Forwarded to the encoder.
        convert_float16: Forwarded to the encoder.

    Returns:
        Size of the written file in bytes.
    """
    encoder = Gen5FileHandler()

    # Arguments shared by the full and the minimal encoding attempts. The
    # chunk-record list is created once and handed to both attempts.
    core_args = dict(
        filename=output_path,
        latent=metadata["latent"],
        chunk_records=[],
        model_name=metadata["model_name"],
        model_version=metadata["model_version"],
        prompt=metadata["prompt"],
        tags=metadata["tags"],
        img_binary=image_to_png_bytes(img),
        should_compress=should_compress,
        convert_float16=convert_float16,
    )
    # Optional fields that are dropped if the full encoding fails.
    extended_args = dict(
        generation_settings=metadata["generation_settings"],
        hardware_info=metadata["hardware_info"],
    )

    try:
        encoder.file_encoder(**core_args, **extended_args)
    except Exception as exc:
        # Fallback to minimal encoding if full metadata fails
        print(f"‚ö†Ô∏è Full metadata encoding failed: {exc!s}")
        print("Attempting minimal encoding...")
        encoder.file_encoder(**core_args)

    return os.path.getsize(output_path)

# -------------------------------
# 5. Benchmark Runner
# -------------------------------

def benchmark_single_image(img: Image.Image, metadata: Dict[str, Any], tmp_dir: Path) -> Dict[str, int]:
    """Store one image in every benchmarked format and collect the file sizes.

    Files are written into ``tmp_dir`` under a name derived from the first
    eight hex digits of the SHA-256 of the image's PNG bytes.

    Args:
        img: Image to benchmark.
        metadata: Metadata record stored alongside the image.
        tmp_dir: Directory that receives the generated files.

    Returns:
        Sizes in bytes keyed by ``raw_png``, ``gen5``, ``exif_xmp``,
        ``sidecar_total``, ``sidecar_png`` and ``sidecar_xmp``.
    """
    stem = hashlib.sha256(image_to_png_bytes(img)).hexdigest()[:8]

    def target(suffix: str) -> Path:
        """Return the path in ``tmp_dir`` for this image with ``suffix`` appended."""
        return tmp_dir / f"{stem}{suffix}"

    def size_of(path: Path) -> int:
        """Return the on-disk size of ``path`` in bytes."""
        return path.stat().st_size

    # 1. .gen5 container
    gen5_bytes = create_gen5_file(
        img=img,
        metadata=metadata,
        output_path=str(target(".gen5")),
        should_compress=True,
        convert_float16=True
    )

    # 2. PNG with embedded EXIF/XMP
    embedded_png = target("_exif_xmp.png")
    embed_exif_xmp_into_png(img.copy(), metadata, str(embedded_png))
    embedded_bytes = size_of(embedded_png)

    # 3. Clean PNG plus XMP sidecar
    sidecar_png = target("_sidecar.png")
    sidecar_xmp = target("_sidecar.xmp")
    save_png_with_sidecar(img.copy(), metadata, str(sidecar_png), str(sidecar_xmp))
    sidecar_png_bytes = size_of(sidecar_png)
    sidecar_xmp_bytes = size_of(sidecar_xmp)

    # 4. Plain PNG as the baseline
    baseline_png = target("_raw.png")
    img.save(baseline_png, "PNG", compress_level=9)
    baseline_bytes = size_of(baseline_png)

    return {
        "raw_png": baseline_bytes,
        "gen5": gen5_bytes,
        "exif_xmp": embedded_bytes,
        "sidecar_total": sidecar_png_bytes + sidecar_xmp_bytes,
        "sidecar_png": sidecar_png_bytes,
        "sidecar_xmp": sidecar_xmp_bytes,
    }

def run_benchmark(num_images: int = 5) -> list:
    """Benchmark ``num_images`` generated images and return the per-image sizes.

    Image ``k`` (0-based) is built from seed ``k``. All files go into a
    temporary directory that is removed when the run ends. An image whose
    benchmark raises is reported with a traceback and skipped.

    Args:
        num_images: Number of test images to generate and benchmark.

    Returns:
        One size dictionary per processed image, in processing order.
    """
    collected = []

    with tempfile.TemporaryDirectory() as scratch:
        tmp_dir = Path(scratch)
        print(f"üìÅ Using temporary directory: {tmp_dir}")

        for seed in range(num_images):
            ordinal = seed + 1
            print(f"\nüîÑ Processing image {ordinal}/{num_images}...")
            img = create_test_image(seed=seed)
            metadata = generate_rich_metadata(seed=seed)

            try:
                sizes = benchmark_single_image(img, metadata, tmp_dir)
                collected.append(sizes)

                def overhead(key):
                    """Percentage size increase of ``key`` relative to the raw PNG."""
                    return ((sizes[key]/sizes['raw_png'])-1)*100

                print(f"‚úÖ Image {ordinal} complete. Sizes:")
                print(f"   Raw PNG:       {sizes['raw_png']:,} bytes")
                print(f"   .gen5:         {sizes['gen5']:,} bytes (+{overhead('gen5'):.1f}%)")
                print(f"   EXIF/XMP PNG:  {sizes['exif_xmp']:,} bytes (+{overhead('exif_xmp'):.1f}%)")
                print(f"   Sidecar Total: {sizes['sidecar_total']:,} bytes (+{overhead('sidecar_total'):.1f}%)")
            except Exception as exc:
                # Report the failure and move on to the next image
                print(f"‚ùå Failed on image {ordinal}: {exc!s}")
                import traceback
                traceback.print_exc()

    return collected

# -------------------------------
# 6. Visualization
# -------------------------------

def plot_results(results: list):
    """Plot and summarise the benchmark results.

    Draws a grouped bar chart of absolute file sizes and a line chart of the
    overhead relative to the raw PNG, saves the figure to
    ``storage_benchmark.png`` and prints a summary table. Prints a notice and
    returns without plotting when ``results`` is empty.

    Args:
        results: Size dictionaries with the keys ``raw_png``, ``gen5``,
            ``exif_xmp`` and ``sidecar_total``.
    """
    if not results:
        print("‚ö†Ô∏è No results to plot!")
        return

    count = len(results)
    positions = np.arange(count)
    bar_width = 0.18
    tick_labels = [f"Image {k+1}" for k in range(count)]

    def column(key):
        """Collect one size field across all results."""
        return [entry[key] for entry in results]

    # Prepare data
    raw_png = column("raw_png")
    gen5 = column("gen5")
    exif_xmp = column("exif_xmp")
    sidecar_total = column("sidecar_total")

    # Two stacked axes: absolute sizes on top, relative overhead below
    fig, (size_ax, overhead_ax) = plt.subplots(2, 1, figsize=(12, 10), height_ratios=[3, 1])
    fig.suptitle("Storage Efficiency Benchmark: .gen5 vs EXIF/XMP vs Sidecar", fontsize=16, fontweight='bold')

    # Grouped bars: (offset in bar widths, sizes, legend label, colour)
    bar_specs = (
        (-1.5, raw_png, "Raw PNG", "#1f77b4"),
        (-0.5, gen5, ".gen5 Format", "#ff7f0e"),
        (0.5, exif_xmp, "PNG + EXIF/XMP", "#2ca02c"),
        (1.5, sidecar_total, "PNG + XMP Sidecar", "#d62728"),
    )
    bar_groups = [
        size_ax.bar(positions + offset*bar_width, sizes, bar_width, label=label,
                    color=colour, alpha=0.85, edgecolor='black')
        for offset, sizes, label, colour in bar_specs
    ]

    # Label every bar with its size in KB
    for group in bar_groups:
        for rect in group:
            top = rect.get_height()
            size_ax.annotate(f'{top/1024:.1f}KB',
                             xy=(rect.get_x() + rect.get_width() / 2, top),
                             xytext=(0, 3),  # 3 points vertical offset
                             textcoords="offset points",
                             ha='center', va='bottom',
                             fontsize=8,
                             rotation=45)

    size_ax.set_ylabel("File Size (bytes)", fontsize=12)
    size_ax.set_title("Absolute File Sizes", fontsize=14)
    size_ax.legend(loc='best', fontsize=10)
    size_ax.grid(axis='y', linestyle='--', alpha=0.7)
    size_ax.set_xticks(positions)
    size_ax.set_xticklabels(tick_labels, fontsize=10)

    def overhead_pct(sizes):
        """Percentage size increase of each entry over the matching raw PNG."""
        return [(value/base - 1)*100 for value, base in zip(sizes, raw_png)]

    gen5_overhead = overhead_pct(gen5)
    exif_overhead = overhead_pct(exif_xmp)
    sidecar_overhead = overhead_pct(sidecar_total)

    # Overhead lines: (values, line format, legend label, colour)
    line_specs = (
        (gen5_overhead, 'o-', ".gen5 Overhead", "#ff7f0e"),
        (exif_overhead, 's--', "EXIF/XMP Overhead", "#2ca02c"),
        (sidecar_overhead, 'd-.', "Sidecar Overhead", "#d62728"),
    )
    for values, fmt, label, colour in line_specs:
        overhead_ax.plot(positions, values, fmt, linewidth=2, markersize=8,
                         label=label, color=colour)

    overhead_ax.axhline(0, color='gray', linestyle='-', alpha=0.3)
    overhead_ax.set_ylabel("Storage Overhead (%)", fontsize=12)
    overhead_ax.set_title("Relative Metadata Overhead", fontsize=14)
    overhead_ax.legend(loc='best', fontsize=10)
    overhead_ax.grid(linestyle='--', alpha=0.7)
    overhead_ax.set_xticks(positions)
    overhead_ax.set_xticklabels(tick_labels, fontsize=10)

    # Summary statistics shown in a text box on the overhead plot
    avg_gen5 = np.mean(gen5_overhead)
    avg_exif = np.mean(exif_overhead)
    avg_sidecar = np.mean(sidecar_overhead)

    stats_text = "\n".join([
        "Average Overhead:",
        f".gen5:       {avg_gen5:.1f}%",
        f"EXIF/XMP:    {avg_exif:.1f}%",
        f"Sidecar:     {avg_sidecar:.1f}%",
    ])

    box_style = dict(boxstyle='round', facecolor='wheat', alpha=0.9)
    overhead_ax.text(0.95, 0.05, stats_text, transform=overhead_ax.transAxes,
                     fontsize=10, verticalalignment='bottom', horizontalalignment='right',
                     bbox=box_style)

    plt.tight_layout(rect=[0, 0, 1, 0.96])
    plt.savefig("storage_benchmark.png", dpi=300, bbox_inches='tight')
    print("\nüìà Graph saved to: storage_benchmark.png")

    # Print detailed summary
    heavy_rule = "="*60
    print("\n" + heavy_rule)
    print("üìä BENCHMARK SUMMARY")
    print(heavy_rule)
    print(f"{'Format':<20} {'Avg Size (KB)':>15} {'Avg Overhead (%)':>20}")
    print("-"*60)
    print(f"{'Raw PNG':<20} {np.mean(raw_png)/1024:15.1f} {'-':>20}")
    summary_rows = (
        (".gen5", gen5, avg_gen5),
        ("EXIF/XMP PNG", exif_xmp, avg_exif),
        ("Sidecar (total)", sidecar_total, avg_sidecar),
    )
    for name, sizes, avg_overhead in summary_rows:
        print(f"{name:<20} {np.mean(sizes)/1024:15.1f} {avg_overhead:20.1f}")
    print(heavy_rule)

    # Technical analysis
    verdict = 'better' if avg_gen5 < avg_exif else 'worse'
    best_embedded = min(avg_gen5, avg_exif)
    print("\nüí° KEY INSIGHTS:")
    print(f"‚Ä¢ .gen5 shows {abs(avg_gen5-avg_exif):.1f}% {verdict} efficiency than EXIF/XMP")
    print(f"‚Ä¢ Sidecar approach adds {avg_sidecar - best_embedded:.1f}% more overhead than the best embedded format")
    print(f"‚Ä¢ .gen5 maintains consistent overhead ({np.std(gen5_overhead):.1f}% std dev) across different images")
    print("\n‚úÖ Benchmark completed successfully!")

if __name__ == "__main__":
    # ``import importlib`` does not by itself load the ``metadata`` submodule,
    # so import it explicitly instead of relying on another package having
    # loaded it first.
    import importlib.metadata
    from datetime import datetime

    banner = "="*50

    # Use an externally supplied timestamp when CURRENT_TIME is set; otherwise
    # report the actual start time rather than a placeholder.
    started_at = os.environ.get('CURRENT_TIME') or datetime.now().isoformat(sep=" ", timespec="seconds")

    print(banner)
    print("üöÄ GEN5 STORAGE EFFICIENCY BENCHMARK")
    print(banner)
    print(f"üïí Starting at: {started_at}")
    print(f"üì¶ Using gen5 package version: {importlib.metadata.version('gen5')}")
    print(banner)

    # Run benchmark with 5 test images
    results = run_benchmark(num_images=5)

    if results:
        plot_results(results)
    else:
        print("‚ùå No valid results collected. Check error messages above.")

    print("\n" + banner)
    print("üéâ BENCHMARK COMPLETE")
    print(banner)

üöÄ GEN5 STORAGE EFFICIENCY BENCHMARK
üïí Starting at: Current time not set
üì¶ Using gen5 package version: 0.1.0
üìÅ Using temporary directory: /tmp/tmpfx7wp7xf

üîÑ Processing image 1/5...


  return Image.fromarray(img_arr, mode="RGB")


ENCODER STORED FLAG: 0000
‚ùå Failed on image 1: module 'piexif' has no attribute 'UserComment'

üîÑ Processing image 2/5...


Traceback (most recent call last):
  File "/tmp/ipython-input-1793139977.py", line 278, in run_benchmark
    res = benchmark_single_image(img, metadata, tmp_dir)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipython-input-1793139977.py", line 241, in benchmark_single_image
    embed_exif_xmp_into_png(img.copy(), metadata, str(png_embed_path))
  File "/tmp/ipython-input-1793139977.py", line 105, in embed_exif_xmp_into_png
    exif_dict["Exif"][piexif.ExifIFD.UserComment] = piexif.UserComment.dump(
                                                    ^^^^^^^^^^^^^^^^^^
AttributeError: module 'piexif' has no attribute 'UserComment'


ENCODER STORED FLAG: 0000
‚ùå Failed on image 2: module 'piexif' has no attribute 'UserComment'

üîÑ Processing image 3/5...


Traceback (most recent call last):
  File "/tmp/ipython-input-1793139977.py", line 278, in run_benchmark
    res = benchmark_single_image(img, metadata, tmp_dir)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipython-input-1793139977.py", line 241, in benchmark_single_image
    embed_exif_xmp_into_png(img.copy(), metadata, str(png_embed_path))
  File "/tmp/ipython-input-1793139977.py", line 105, in embed_exif_xmp_into_png
    exif_dict["Exif"][piexif.ExifIFD.UserComment] = piexif.UserComment.dump(
                                                    ^^^^^^^^^^^^^^^^^^
AttributeError: module 'piexif' has no attribute 'UserComment'


ENCODER STORED FLAG: 0000
‚ùå Failed on image 3: module 'piexif' has no attribute 'UserComment'

üîÑ Processing image 4/5...


Traceback (most recent call last):
  File "/tmp/ipython-input-1793139977.py", line 278, in run_benchmark
    res = benchmark_single_image(img, metadata, tmp_dir)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipython-input-1793139977.py", line 241, in benchmark_single_image
    embed_exif_xmp_into_png(img.copy(), metadata, str(png_embed_path))
  File "/tmp/ipython-input-1793139977.py", line 105, in embed_exif_xmp_into_png
    exif_dict["Exif"][piexif.ExifIFD.UserComment] = piexif.UserComment.dump(
                                                    ^^^^^^^^^^^^^^^^^^
AttributeError: module 'piexif' has no attribute 'UserComment'


ENCODER STORED FLAG: 0000
‚ùå Failed on image 4: module 'piexif' has no attribute 'UserComment'

üîÑ Processing image 5/5...


Traceback (most recent call last):
  File "/tmp/ipython-input-1793139977.py", line 278, in run_benchmark
    res = benchmark_single_image(img, metadata, tmp_dir)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipython-input-1793139977.py", line 241, in benchmark_single_image
    embed_exif_xmp_into_png(img.copy(), metadata, str(png_embed_path))
  File "/tmp/ipython-input-1793139977.py", line 105, in embed_exif_xmp_into_png
    exif_dict["Exif"][piexif.ExifIFD.UserComment] = piexif.UserComment.dump(
                                                    ^^^^^^^^^^^^^^^^^^
AttributeError: module 'piexif' has no attribute 'UserComment'


ENCODER STORED FLAG: 0000
‚ùå Failed on image 5: module 'piexif' has no attribute 'UserComment'
‚ùå No valid results collected. Check error messages above.

üéâ BENCHMARK COMPLETE


Traceback (most recent call last):
  File "/tmp/ipython-input-1793139977.py", line 278, in run_benchmark
    res = benchmark_single_image(img, metadata, tmp_dir)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipython-input-1793139977.py", line 241, in benchmark_single_image
    embed_exif_xmp_into_png(img.copy(), metadata, str(png_embed_path))
  File "/tmp/ipython-input-1793139977.py", line 105, in embed_exif_xmp_into_png
    exif_dict["Exif"][piexif.ExifIFD.UserComment] = piexif.UserComment.dump(
                                                    ^^^^^^^^^^^^^^^^^^
AttributeError: module 'piexif' has no attribute 'UserComment'


In [None]:
pip install gen5 piexif pyexiv2 numpy matplotlib pillow

