# IoS-001 QUARANTINED Asset Backfill V2

**FjordHQ - STIG-2025-001 Compliant**

Denne notebook henter prisdata for **kun QUARANTINED assets** (45 stk).

## Korrekt Schema
- UNIQUE: `(listing_id, date, resolution)`
- Idempotent: `ON CONFLICT DO UPDATE`

## QUARANTINED Assets:
- **ETFs (15):** DIA, IWM, VOO, VTI, XLB-XLY
- **Mag7 (4):** AMZN, GOOGL, META, TSLA
- **US Large Cap (7):** JNJ, MA, V, UNH, BRK.B, HES, SQ
- **Oslo Børs (12):** BELCO, CRAYN, FLNG, GOGL, etc.
- **Andre (7):** ANSS, ASML, GOOG, PARA, SPLK, BT.A.L, STM.PA

## 1. Setup

In [None]:
# Install the notebook's third-party dependencies (IPython shell escape; Colab/Jupyter only).
# NOTE(review): versions are unpinned, so the yfinance download() output format may
# differ between runs — consider pinning a known-good version.
!pip install yfinance pandas tqdm -q

# Mount Google Drive; CHECKPOINT_DIR and all CSV output live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

# Pre-create the CSV output directory (CHECKPOINT_DIR + "/data").
!mkdir -p "/content/drive/MyDrive/FjordHQ/ios001_backfill/data"

print("Setup fullført!")

## 2. Konfigurasjon og Imports

In [None]:
import os
import json
import time
from datetime import datetime, timedelta, date
from pathlib import Path
from typing import Optional

import pandas as pd
import yfinance as yf
from tqdm import tqdm

# --- Storage -----------------------------------------------------------------
# Root directory (on the mounted Drive) for the checkpoint JSON, per-asset CSVs
# and the generated import script.
CHECKPOINT_DIR = "/content/drive/MyDrive/FjordHQ/ios001_backfill"

# --- Throttling (Colab-tuned) ------------------------------------------------
BATCH_SIZE = 5                  # assets processed before the long batch pause
DELAY_BETWEEN_ASSETS = 8.0      # seconds slept after every asset
DELAY_BETWEEN_BATCHES = 180.0   # seconds slept after each full batch
MAX_RETRIES = 5                 # download attempts per asset
RETRY_BASE_DELAY = 30.0         # seconds before the 2nd attempt; doubles afterwards
RATE_LIMIT_BACKOFF = 600.0      # seconds slept when an error looks like a rate limit

# --- History window ----------------------------------------------------------
MAX_HISTORY_YEARS = 10

# --- Iron Curtain thresholds (IoS-001 §4.1), measured in daily rows ----------
EQUITY_FX_QUARANTINE = 252      # fewer rows than this -> QUARANTINED
EQUITY_FX_FULL_HISTORY = 1260   # at least this many -> FULL_HISTORY (else SHORT_HISTORY)

_rate_summary = (
    f"Rate: {DELAY_BETWEEN_ASSETS}s mellom assets, "
    f"{DELAY_BETWEEN_BATCHES}s mellom batches"
)
print(f"Checkpoint: {CHECKPOINT_DIR}")
print(_rate_summary)

## 3. QUARANTINED Assets (45 stk fra database)

In [None]:
# Only the assets flagged QUARANTINED in fhq_meta.assets, grouped by exchange
# (MIC code). A canonical ID is used verbatim as its Yahoo ticker unless it
# appears in _YF_TICKER_OVERRIDES (Yahoo uses "-" where the ID uses ".").
# NOTE(review): some of these tickers (e.g. SPLK, SQ, HES, ANSS, PARA) may have
# been delisted or renamed on Yahoo after corporate actions; empty downloads
# for them would be expected — verify against fhq_meta.
_QUARANTINED_BY_EXCHANGE = {
    # ETFs (15)
    "ARCX": [
        "DIA", "IWM", "VOO", "VTI",
        "XLB", "XLC", "XLE", "XLF", "XLI", "XLK",
        "XLP", "XLRE", "XLU", "XLV", "XLY",
    ],
    # (9)
    "XNAS": ["AMZN", "ANSS", "ASML", "GOOG", "GOOGL", "META", "PARA", "SPLK", "TSLA"],
    # (7)
    "XNYS": ["BRK.B", "HES", "JNJ", "MA", "SQ", "UNH", "V"],
    # Oslo Børs (12)
    "XOSL": [
        "BELCO.OL", "CRAYN.OL", "FLNG.OL", "GJFAH.OL", "GOGL.OL", "KAHOT.OL",
        "PROTCT.OL", "REC.OL", "SCHA.OL", "SCHB.OL", "SRBNK.OL", "XXL.OL",
    ],
    # (1)
    "XLON": ["BT.A.L"],
    # (1)
    "XPAR": ["STM.PA"],
}

_YF_TICKER_OVERRIDES = {"BRK.B": "BRK-B", "BT.A.L": "BT-A.L"}

# Flatten to {canonical_id: {"exchange": MIC, "yf_ticker": ...}}, preserving
# the per-exchange listing order above (the backfill processes keys in order).
QUARANTINED_ASSETS = {
    canonical: {"exchange": mic, "yf_ticker": _YF_TICKER_OVERRIDES.get(canonical, canonical)}
    for mic, canonicals in _QUARANTINED_BY_EXCHANGE.items()
    for canonical in canonicals
}

# Tally assets per exchange for the sanity printout.
exchanges = {}
for meta in QUARANTINED_ASSETS.values():
    exchanges[meta["exchange"]] = exchanges.get(meta["exchange"], 0) + 1

print(f"QUARANTINED assets: {len(QUARANTINED_ASSETS)}")
print("\nPer exchange:")
for mic in sorted(exchanges):
    print(f"  {mic}: {exchanges[mic]}")

## 4. Checkpoint System

In [None]:
def load_checkpoint(checkpoint_dir: Optional[str] = None) -> dict:
    """Load the backfill checkpoint from ``checkpoint_v2.json``.

    Args:
        checkpoint_dir: Directory holding the checkpoint file. Defaults to
            ``CHECKPOINT_DIR`` when omitted.

    Returns:
        A dict that always has ``completed``, ``failed`` and ``last_update``
        keys. When no file exists yet, these hold ``[]``, ``[]`` and ``None``.

    Raises:
        json.JSONDecodeError: If the checkpoint file exists but is not valid JSON.
    """
    base = CHECKPOINT_DIR if checkpoint_dir is None else checkpoint_dir
    checkpoint_file = Path(base) / "checkpoint_v2.json"

    state = {"completed": [], "failed": [], "last_update": None}
    if checkpoint_file.exists():
        with open(checkpoint_file, "r", encoding="utf-8") as f:
            # Overlay the stored values on the defaults so an older or
            # hand-edited file missing a key cannot cause KeyErrors later.
            state.update(json.load(f))
    return state

def save_checkpoint(checkpoint, checkpoint_dir: Optional[str] = None):
    """Stamp ``checkpoint`` with the current time and persist it to disk.

    Mutates ``checkpoint["last_update"]`` in place with an ISO-8601 timestamp.

    Args:
        checkpoint: Checkpoint dict (``completed``/``failed``/``last_update``).
        checkpoint_dir: Target directory. Defaults to ``CHECKPOINT_DIR`` and is
            created if missing.
    """
    checkpoint["last_update"] = datetime.now().isoformat()

    base = Path(CHECKPOINT_DIR if checkpoint_dir is None else checkpoint_dir)
    base.mkdir(parents=True, exist_ok=True)
    checkpoint_file = base / "checkpoint_v2.json"
    tmp_file = checkpoint_file.with_name(checkpoint_file.name + ".tmp")

    # Write the complete document to a side file first, then swap it into place.
    # A disconnect mid-write therefore can't leave a truncated JSON under the
    # real name, which would make load_checkpoint() fail and break resume.
    with open(tmp_file, "w", encoding="utf-8") as f:
        json.dump(checkpoint, f, indent=2)
    os.replace(tmp_file, checkpoint_file)

def save_csv(canonical_id: str, df: pd.DataFrame, checkpoint_dir: Optional[str] = None) -> Path:
    """Write one asset's daily bars as a CSV shaped for ``fhq_data.price_series``.

    Args:
        canonical_id: Asset ID written to the ``listing_id`` column. Dots are
            replaced by underscores in the file name.
        df: Daily bars as returned by ``yf.download``, indexed by date. It needs
            ``Open``/``High``/``Low``/``Close``/``Volume`` columns and may have
            ``Adj Close``. A (Price, Ticker) column MultiIndex is also accepted.
        checkpoint_dir: Base directory; the CSV goes to ``<checkpoint_dir>/data``.
            Defaults to ``CHECKPOINT_DIR``.

    Returns:
        Path of the written CSV file.
    """
    base = CHECKPOINT_DIR if checkpoint_dir is None else checkpoint_dir
    data_dir = Path(base) / "data"
    data_dir.mkdir(parents=True, exist_ok=True)

    # Newer yfinance releases return (Price, Ticker) MultiIndex columns even for
    # a single ticker. In that case df['Open'] would be a DataFrame, not a Series.
    # Keep only the price level; copy first so the caller's frame is untouched.
    if isinstance(df.columns, pd.MultiIndex):
        df = df.copy()
        df.columns = df.columns.get_level_values(0)

    close = df['Close']
    export_df = pd.DataFrame({
        'listing_id': canonical_id,
        'date': df.index,
        'open': df['Open'],
        'high': df['High'],
        'low': df['Low'],
        'close': close,
        # Fall back to the raw close when no adjusted close was downloaded.
        'adj_close': df['Adj Close'] if 'Adj Close' in df.columns else close,
        # Nullable integer dtype so missing volumes stay empty instead of NaN floats.
        'volume': df['Volume'].astype('Int64'),
        'price_type': 'RAW',
        'resolution': '1d',
        'data_source': 'yfinance_colab',
        'adr_epoch': 'COLAB_2024'
    })

    filepath = data_dir / f"{canonical_id.replace('.', '_')}.csv"
    export_df.to_csv(filepath, index=False)
    return filepath

# Load the existing checkpoint (if any) so a resumed session continues from it.
checkpoint = load_checkpoint()
print("Checkpoint status:")
print(f"  Fullført: {len(checkpoint['completed'])}")
print(f"  Feilet: {len(checkpoint['failed'])}")
# 'last_update' is present but None until the first save, so .get()'s default
# would never apply; fall back explicitly when it is unset.
print(f"  Sist: {checkpoint.get('last_update') or 'Aldri'}")

## 5. Fetch med Rate Limiting

In [None]:
def fetch_with_backoff(yf_ticker: str, start_date, end_date) -> Optional[pd.DataFrame]:
    """Download daily unadjusted bars for one ticker, retrying with exponential backoff.

    Args:
        yf_ticker: Yahoo Finance ticker symbol.
        start_date: First date to fetch (``date``/``datetime``).
        end_date: Last date to fetch, inclusive. One day is added because the
            download ``end`` bound is passed as the exclusive end.

    Returns:
        A DataFrame with flat ``Open``/``High``/``Low``/``Close``/``Adj Close``/
        ``Volume`` columns and rows with a missing ``Close`` removed. Returns
        ``None`` if every attempt produced no usable data.

    Retries wait ``RETRY_BASE_DELAY * 2**(attempt-1)`` seconds. Errors whose text
    mentions a rate limit additionally wait ``RATE_LIMIT_BACKOFF`` seconds.
    """
    for attempt in range(MAX_RETRIES):
        try:
            if attempt > 0:
                delay = RETRY_BASE_DELAY * (2 ** (attempt - 1))
                print(f"  Retry {attempt + 1}/{MAX_RETRIES}, venter {delay:.0f}s...")
                time.sleep(delay)

            df = yf.download(
                yf_ticker,
                start=start_date.strftime('%Y-%m-%d'),
                end=(end_date + timedelta(days=1)).strftime('%Y-%m-%d'),
                interval='1d',
                auto_adjust=False,
                progress=False,
                threads=False
            )

            if df is None or df.empty:
                continue

            # Newer yfinance releases return (Price, Ticker) MultiIndex columns
            # even for a single ticker. dropna(subset=['Close']) would then raise
            # KeyError on every attempt, so flatten to the price level first.
            if isinstance(df.columns, pd.MultiIndex):
                df.columns = df.columns.get_level_values(0)

            df = df.dropna(subset=['Close'])
            if not df.empty:
                return df

        except Exception as e:
            error_str = str(e).lower()
            if any(x in error_str for x in ["rate limit", "429", "too many"]):
                print(f"  RATE LIMITED! Venter {RATE_LIMIT_BACKOFF}s...")
                time.sleep(RATE_LIMIT_BACKOFF)
            else:
                print(f"  Feil: {e}")

    return None

print("Fetch-funksjon klar!")

## 6. Kjør Backfill

In [None]:
def _history_status(rows: int) -> str:
    """Classify a daily row count under the Iron Curtain thresholds (IoS-001 §4.1)."""
    if rows < EQUITY_FX_QUARANTINE:
        return "QUARANTINED"
    if rows < EQUITY_FX_FULL_HISTORY:
        return "SHORT_HISTORY"
    return "FULL_HISTORY"


def run_backfill(resume=True):
    """Backfill every QUARANTINED asset into per-asset CSV files.

    Works on the module-level ``checkpoint`` dict and saves it after each asset,
    so an interrupted run can be resumed.

    Args:
        resume: When True, skip assets already in ``checkpoint["completed"]``
            (failed assets are retried). When False, start from an empty checkpoint.

    Returns:
        Counters dict with ``processed``, ``success``, ``failed`` and
        ``total_rows``. It is also returned when nothing is pending or the run
        is interrupted with Ctrl-C.
    """
    global checkpoint

    if not resume:
        checkpoint = {"completed": [], "failed": [], "last_update": None}

    completed = set(checkpoint["completed"])
    pending = [cid for cid in QUARANTINED_ASSETS if cid not in completed]
    results = {"processed": 0, "success": 0, "failed": 0, "total_rows": 0}

    print("=" * 60)
    print("IoS-001 QUARANTINED BACKFILL")
    print("=" * 60)
    print(f"Total: {len(QUARANTINED_ASSETS)}")
    print(f"Fullført: {len(checkpoint['completed'])}")
    print(f"Gjenstår: {len(pending)}")
    print("=" * 60)

    if not pending:
        print("Alle assets prosessert!")
        return results

    end_date = date.today() - timedelta(days=1)
    start_date = date.today() - timedelta(days=MAX_HISTORY_YEARS * 365)
    last_index = len(pending) - 1
    batch_count = 0

    # The interrupt handler wraps the whole loop. Most wall-clock time is spent
    # in the throttling sleeps, not in the fetch itself.
    try:
        for i, canonical_id in enumerate(tqdm(pending, desc="Backfill")):
            yf_ticker = QUARANTINED_ASSETS[canonical_id]["yf_ticker"]

            results["processed"] += 1
            print(f"\n[{i+1}/{len(pending)}] {canonical_id} ({yf_ticker})...")

            try:
                df = fetch_with_backoff(yf_ticker, start_date, end_date)

                if df is None or df.empty:
                    print("  FEILET: Ingen data")
                    results["failed"] += 1
                    if canonical_id not in checkpoint["failed"]:
                        checkpoint["failed"].append(canonical_id)
                else:
                    save_csv(canonical_id, df)
                    rows = len(df)
                    results["total_rows"] += rows
                    print(f"  OK: {rows} rader -> {_history_status(rows)}")
                    results["success"] += 1

                    if canonical_id not in checkpoint["completed"]:
                        checkpoint["completed"].append(canonical_id)
                    if canonical_id in checkpoint["failed"]:
                        checkpoint["failed"].remove(canonical_id)

                save_checkpoint(checkpoint)

            except Exception as e:
                print(f"  ERROR: {e}")
                results["failed"] += 1
                # Record unexpected errors too, unless the asset was already
                # marked complete (e.g. only the checkpoint save failed). The
                # next successful save or the interrupt handler persists this.
                if (canonical_id not in checkpoint["completed"]
                        and canonical_id not in checkpoint["failed"]):
                    checkpoint["failed"].append(canonical_id)

            if i == last_index:
                continue  # final asset: no further requests to throttle for

            batch_count += 1
            time.sleep(DELAY_BETWEEN_ASSETS)

            if batch_count >= BATCH_SIZE:
                batch_count = 0
                print(f"\nBatch pause: {DELAY_BETWEEN_BATCHES}s...")
                time.sleep(DELAY_BETWEEN_BATCHES)

    except KeyboardInterrupt:
        print("\n\nAvbrutt! Checkpoint lagret.")
        save_checkpoint(checkpoint)
        return results

    print("\n" + "=" * 60)
    print("FULLFØRT")
    print(f"Suksess: {results['success']}/{results['processed']}")
    print(f"Totalt rader: {results['total_rows']}")
    print("=" * 60)

    return results

## 7. START BACKFILL HER

In [None]:
# Backfill all 45 QUARANTINED assets.
# With resume enabled, assets already marked completed in the saved checkpoint
# are skipped, so re-running this cell after a disconnect picks up where it
# stopped. run_backfill's return value is kept in `results`.
resume_from_checkpoint = True
results = run_backfill(resume=resume_from_checkpoint)

## 8. Sjekk Status

In [None]:
# Reload the checkpoint from disk and summarise what the CSV output folder holds.
checkpoint = load_checkpoint()
data_dir = Path(CHECKPOINT_DIR) / "data"
csv_files = list(data_dir.glob("*.csv")) if data_dir.exists() else []

# Each CSV row is one daily bar; bucket every file by its row count.
row_counts = [len(pd.read_csv(path)) for path in csv_files]
total_rows = sum(row_counts)

status_counts = {"FULL_HISTORY": 0, "SHORT_HISTORY": 0, "QUARANTINED": 0}
for n_rows in row_counts:
    if n_rows < EQUITY_FX_QUARANTINE:
        bucket = "QUARANTINED"
    elif n_rows < EQUITY_FX_FULL_HISTORY:
        bucket = "SHORT_HISTORY"
    else:
        bucket = "FULL_HISTORY"
    status_counts[bucket] += 1

rule = "=" * 60
summary_lines = [
    rule,
    "IoS-001 BACKFILL STATUS",
    rule,
    f"Checkpoint: {len(checkpoint['completed'])}/{len(QUARANTINED_ASSETS)} fullført",
    f"Feilet: {len(checkpoint['failed'])}",
    f"CSV-filer: {len(csv_files)}",
    f"Totalt rader: {total_rows:,}",
    "",
    "Iron Curtain Status:",
    f"  FULL_HISTORY (5+ år): {status_counts['FULL_HISTORY']}",
    f"  SHORT_HISTORY (1-5 år): {status_counts['SHORT_HISTORY']}",
    f"  QUARANTINED (<1 år): {status_counts['QUARANTINED']}",
    rule,
]
for line in summary_lines:
    print(line)

## 9. Generer Import Script

In [None]:
# Generate a standalone Python script that imports the downloaded CSVs into the
# local Postgres (fhq_data.price_series) and refreshes fhq_meta.assets status.
# The Iron Curtain thresholds are baked in from this notebook's constants so the
# script classifies assets exactly as the backfill did.
import_script = f'''#!/usr/bin/env python3
"""
IoS-001 IMPORT SCRIPT - fhq_data.price_series
Generert: {datetime.now().isoformat()}
UNIQUE: (listing_id, date, resolution)
"""
import pandas as pd
import psycopg2
from pathlib import Path

DB = {{
    "host": "127.0.0.1",
    "port": "54322",
    "database": "postgres",
    "user": "postgres",
    "password": "postgres"
}}

DATA_DIR = Path("./ios001_data")

# Iron Curtain thresholds (IoS-001 section 4.1), copied from the notebook.
EQUITY_FX_QUARANTINE = {EQUITY_FX_QUARANTINE}
EQUITY_FX_FULL_HISTORY = {EQUITY_FX_FULL_HISTORY}

def import_csv(conn, filepath):
    df = pd.read_csv(filepath)
    sql = """
        INSERT INTO fhq_data.price_series (
            listing_id, date, open, high, low, close, adj_close,
            volume, price_type, resolution, data_source, adr_epoch
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s::price_type, %s::resolution, %s, %s)
        ON CONFLICT (listing_id, date, resolution) DO UPDATE SET
            open = EXCLUDED.open, high = EXCLUDED.high, low = EXCLUDED.low,
            close = EXCLUDED.close, adj_close = EXCLUDED.adj_close,
            volume = EXCLUDED.volume, data_source = EXCLUDED.data_source
    """
    inserted = 0
    with conn.cursor() as cur:
        for _, row in df.iterrows():
            cur.execute(sql, (
                row["listing_id"], row["date"],
                row["open"] if pd.notna(row["open"]) else None,
                row["high"] if pd.notna(row["high"]) else None,
                row["low"] if pd.notna(row["low"]) else None,
                row["close"] if pd.notna(row["close"]) else None,
                row["adj_close"] if pd.notna(row["adj_close"]) else None,
                int(row["volume"]) if pd.notna(row["volume"]) else None,
                row["price_type"], row["resolution"],
                row["data_source"], row["adr_epoch"]
            ))
            inserted += 1
        conn.commit()
    return inserted

def update_status(conn, canonical_id):
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM fhq_data.price_series WHERE listing_id = %s AND close IS NOT NULL", (canonical_id,))
        count = cur.fetchone()[0]
        status = "QUARANTINED" if count < EQUITY_FX_QUARANTINE else "SHORT_HISTORY" if count < EQUITY_FX_FULL_HISTORY else "FULL_HISTORY"
        cur.execute("UPDATE fhq_meta.assets SET valid_row_count = %s, data_quality_status = %s::data_quality_status WHERE canonical_id = %s",
                    (count, status, canonical_id))
        conn.commit()
    return count, status

if __name__ == "__main__":
    conn = psycopg2.connect(**DB)
    try:
        files = sorted(DATA_DIR.glob("*.csv"))
        print(f"Importerer {{len(files)}} filer...")
        total = 0
        for f in files:
            # File names are canonical IDs with "." replaced by "_".
            cid = f.stem.replace("_", ".")
            try:
                rows = import_csv(conn, f)
                count, status = update_status(conn, cid)
                total += rows
                print(f"{{cid}}: {{rows}} -> {{status}} ({{count}} total)")
            except Exception as e:
                # A failed statement aborts the open transaction. Roll back so the
                # remaining files are not rejected as "current transaction is aborted".
                conn.rollback()
                print(f"{{cid}}: FEIL - {{e}}")
    finally:
        conn.close()
    print(f"Totalt: {{total}} rader")
'''

script_path = Path(CHECKPOINT_DIR) / "import_to_db_v2.py"
with open(script_path, 'w', encoding='utf-8') as f:
    f.write(import_script)

print(f"Import script: {script_path}")
print()
print("INSTRUKSJONER:")
print("1. Last ned /data/ mappen fra Google Drive")
print("2. Kopier CSV-filer til ./ios001_data/")
print("3. Kopier import_to_db_v2.py")
print("4. Kjør: python import_to_db_v2.py")

## 10. Retry Feilede Assets

In [None]:
# Show assets that failed in earlier runs, with instructions for retrying them.
# NOTE: run_backfill() only skips IDs listed in checkpoint["completed"], so
# failed assets are re-attempted by any resumed run even without clearing
# the failed list.
checkpoint = load_checkpoint()
failed_ids = checkpoint["failed"]

if not failed_ids:
    print("Ingen feilede assets!")
else:
    print(f"Feilede assets ({len(failed_ids)}):")
    for failed_id in failed_ids:
        print(f"  - {failed_id}")
    print()
    retry_hint = (
        "Fjern fra failed-listen for å retry:",
        "checkpoint['failed'] = []",
        "save_checkpoint(checkpoint)",
        "run_backfill(resume=True)",
    )
    for hint_line in retry_hint:
        print(hint_line)