# MVP-Pipeline: Dataland ‚Üí Transform ‚Üí Mock-Rooms ‚Üí AI & Visualisierung

In diesem Notebook bauen wir Schritt f√ºr Schritt eine Daten-Pipeline, die:
1. **Nachhaltigkeitsdaten** von Unternehmen aus der Dataland-API holt
2. Diese Daten **transformiert** und in ein einheitliches Format bringt
3. Die Daten in **Mock-Rooms** (lokale Dateien) speichert
4. **KI-basierte Analysen** und **Visualisierungen** erm√∂glicht

---

### Was ist Dataland?

Dataland ist eine Plattform, die ESG-Daten (Environmental, Social, Governance) von Unternehmen sammelt und √ºber eine API bereitstellt. ESG-Daten umfassen z.B.:
- CO‚ÇÇ-Emissionen
- Energieverbrauch
- Mitarbeiter-Kennzahlen
- Governance-Strukturen

### Was ist ein Connector?

Ein Connector ist ein Programmteil, der:
- Eine **Verbindung** zu einer externen Datenquelle aufbaut
- Die Daten **abruft** (meist √ºber HTTP-Requests)
- Die Rohdaten f√ºr die weitere Verarbeitung **bereitstellt**

---

## 1. Setup & Konfiguration

Zuerst installieren und importieren wir alle ben√∂tigten Python-Bibliotheken.

### Was machen diese Bibliotheken?

- **requests**: Erm√∂glicht HTTP-Anfragen an APIs (wie Dataland)
- **json**: Verarbeitet JSON-Daten (das Standard-Datenformat von APIs)
- **datetime**: Hilft beim Arbeiten mit Datum und Zeit
- **uuid**: Erzeugt eindeutige IDs f√ºr unsere Events
- **os**: Erm√∂glicht Zugriff auf das Dateisystem
- **pathlib**: Moderner Weg, um mit Dateipfaden zu arbeiten

In [3]:
# Bibliotheken importieren
import requests
import json
from datetime import datetime
import uuid
import os
from dotenv import load_dotenv
from pathlib import Path
from typing import List, Dict, Any, Optional
import time
import hashlib

# Lade Umgebungsvariablen aus .env-Datei
load_dotenv()

print("‚úÖ Alle Bibliotheken erfolgreich importiert!")
print("‚úÖ Umgebungsvariablen aus .env geladen!")

‚úÖ Alle Bibliotheken erfolgreich importiert!
‚úÖ Umgebungsvariablen aus .env geladen!


---

## 2. Konfiguration der Dataland-API

Bevor wir Daten abrufen k√∂nnen, m√ºssen wir festlegen:
- Wo ist die API? (Base URL)
- Wie authentifizieren wir uns? (API Key)
- Welches Unternehmen interessiert uns?

### Wichtig: API-Zugang

In diesem MVP verwenden wir zun√§chst **Mock-Daten** oder √∂ffentlich zug√§ngliche Endpoints. F√ºr den Produktivbetrieb w√ºrden Sie:
1. Einen Account bei Dataland erstellen
2. Einen API-Key erhalten
3. Diesen sicher in einer Umgebungsvariable speichern

In [1]:
# Konfiguration (alles an einem Ort)
CONFIG = {
    # API Base URLs (getrennte Services!)
    "base_url_api": "https://dataland.com/api",
    "base_url_documents": "https://dataland.com/documents",
    
    # API Token aus Environment
    "api_token": os.getenv("DATALAND_TOKEN"),
    
    # Unternehmen zum Testen
    "company_query": "Siemens",
    
    # Company-Lookup Kandidaten (API-Service)
    # Probiere verschiedene Endpunkte, da √∂ffentliche Swagger nicht eindeutig ist
    "company_lookup_api_candidates": [
        # {"path": "/metadata/companies/search", "params": {"q": None}},
        # {"path": "/metadata/companies", "params": {"query": None}},
        # {"path": "/companies/search", "params": {"q": None}},
        # {"path": "/companies", "params": {"name": None}},
        # {"path": "/entities/search", "params": {"q": None}},
        # {"path": "/api/companies/search", "params": {"q": None}},
        {"path": "/companies/names", "params": {"searchString": None}},
    ],
    
    # Company-Lookup Kandidaten (Documents-Service) - Fallback
    "company_lookup_doc_candidates": [
        {"path": "/", "params": {"q": None}},
        {"path": "/search", "params": {"q": None}},
        {"path": "/", "params": {"query": None}},
        {"path": "/search", "params": {"query": None}},
        {"path": "/companies/search", "params": {"q": None}},
        {"path": "/documents/search", "params": {"q": None}},
    ],
    
    # Timeouts
    "timeout_search": 30,
    "timeout_data": 20,
    
    # Retry-Konfiguration
    "max_retries": 3,
    "backoff_base": 2,        # Exponential Backoff Basis (2^retry_count)
    "backoff_max": 60,        # Maximale Wartezeit in Sekunden
    "rate_limit_wait": 60,    # Wartezeit bei Rate Limit (429)
    
    # Raw Data Directory
    "raw_dir": Path("raw")
}

# Erstelle raw/ Verzeichnis
CONFIG["raw_dir"].mkdir(exist_ok=True)

print("‚úÖ Konfiguration geladen:")
print(f"   API Service: {CONFIG['base_url_api']}")
print(f"   Documents Service: {CONFIG['base_url_documents']}")
print(f"   Company Query: {CONFIG['company_query']}")
print(f"   API-Lookup Kandidaten: {len(CONFIG['company_lookup_api_candidates'])}")
print(f"   Docs-Lookup Kandidaten: {len(CONFIG['company_lookup_doc_candidates'])}")
print(f"   Token verf√ºgbar: {'‚úÖ Ja' if CONFIG['api_token'] else '‚ùå Nein (.env fehlt)'}")
print(f"   Retry-Config: max={CONFIG['max_retries']}, backoff={CONFIG['backoff_base']}^n (max {CONFIG['backoff_max']}s)")

NameError: name 'os' is not defined

---

## 3. Helper-Funktionen f√ºr die API-Kommunikation

Bevor wir Daten abrufen, erstellen wir wiederverwendbare Funktionen.

### Was macht ein guter Connector?

Ein professioneller Connector sollte:
- **Fehler behandeln**: Was passiert, wenn die API nicht antwortet?
- **Authentifizierung**: Korrektes Senden von API-Keys
- **Logging**: Was wird gerade gemacht? Gab es Probleme?
- **Retry-Logik**: Bei tempor√§ren Fehlern automatisch nochmal versuchen
- **Pagination**: Gro√üe Datenmengen in Teilen abrufen

---

## 3. JSONL Raw-Persistenz Funktionen

Bevor wir Daten von der API abrufen, erstellen wir Funktionen zum Speichern der Rohdaten.

### Was ist Raw-Persistenz?

**Raw-Persistenz** bedeutet, dass wir die Daten **exakt so** speichern, wie wir sie von der API bekommen:
- ‚úÖ Keine Transformation
- ‚úÖ Keine Normalisierung  
- ‚úÖ Komplette API-Response
- ‚úÖ Metadaten zum Request (Zeitstempel, Endpoint, Status)

### Warum JSONL?

**JSONL** (JSON Lines) ist perfekt f√ºr unseren Use-Case:
- Eine Zeile = ein API-Response
- Einfach zu erweitern (append-only)
- Sp√§ter leicht zu verarbeiten
- Fehlertoleranz (eine kaputte Zeile ‚â† kaputte Datei)

### Envelope-Format

Jede Zeile enth√§lt ein "Envelope" mit Metadaten:

```json
{
  "ts": "2025-11-05T13:45:12Z",
  "endpoint": "/api/metadata/companies/search",
  "status": 200,
  "request": {"params": {"q": "Siemens"}},
  "payload": { /* Original API-Response */ }
}
```

In [27]:
from datetime import datetime, timezone

def nowz() -> str:
    """
    Gibt aktuellen UTC-Zeitstempel im ISO-Format zur√ºck.
    
    Returns:
        String wie "2025-11-05T13:45:12Z"
    """
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def append_jsonl(path: Path, obj: Dict[str, Any]) -> None:
    """
    F√ºgt ein JSON-Objekt als neue Zeile zu einer JSONL-Datei hinzu.
    
    Args:
        path: Pfad zur JSONL-Datei
        obj: Dictionary, das gespeichert werden soll
    """
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(obj, ensure_ascii=False) + "\n")


def create_envelope(endpoint: str, status: int, request_params: Dict[str, Any], 
                    payload: Any, error: Optional[Any] = None) -> Dict[str, Any]:
    """
    Envelope mit Metadaten + optionalem Fehlerfeld erstellen.
    - endpoint: aufgerufener API-Pfad (ohne Base-URL)
    - status: HTTP-Statuscode (oder -1 bei Ausnahme)
    - request_params: Query-Parameter
    - payload: Response-Body (roh)
    - error: optionaler Fehlertext/-objekt
    """
    env = {
        "ts": nowz(),
        "endpoint": endpoint,
        "status": status,
        "request": {"params": request_params},
        "payload": payload
    }
    if error is not None:
        env["error"] = error
    return env


def generate_hash(company_id: str, data_point_id: str = None, 
                  period: str = None) -> str:
    """
    Generiert einen SHA256-Hash f√ºr Idempotenz-Checks.
    
    Verhindert, dass dieselben Daten mehrfach gespeichert werden.
    
    Args:
        company_id: Die Unternehmens-ID
        data_point_id: Optional - Datenpunkt-ID
        period: Optional - Periode
        
    Returns:
        Hexadezimaler Hash-String
    """
    key = f"{company_id}:{data_point_id or ''}:{period or ''}"
    return hashlib.sha256(key.encode()).hexdigest()


print("‚úÖ JSONL-Persistenz-Funktionen definiert")
print(f"   - nowz(): {nowz()}")
print(f"   - append_jsonl(): Bereit zum Schreiben")
print(f"   - create_envelope(): Bereit f√ºr API-Responses")
print(f"   - generate_hash(): Bereit f√ºr Idempotenz-Checks")

‚úÖ JSONL-Persistenz-Funktionen definiert
   - nowz(): 2025-11-10T10:02:00Z
   - append_jsonl(): Bereit zum Schreiben
   - create_envelope(): Bereit f√ºr API-Responses
   - generate_hash(): Bereit f√ºr Idempotenz-Checks


---

## 4. HTTP-Session mit Retry-Logik & Authentifizierung

Jetzt erstellen wir eine robuste HTTP-Session f√ºr die API-Kommunikation.

### Was macht diese Session besonders?

1. **Authentifizierung**: Bearer-Token automatisch in jedem Request
2. **Retry-Logik**: Bei tempor√§ren Fehlern automatisch wiederholen
3. **Exponential Backoff**: Bei Rate Limits (HTTP 429) intelligente Wartezeiten
4. **Fehlerbehandlung**: Unterscheidung zwischen permanenten und tempor√§ren Fehlern
5. **Logging**: Detaillierte Ausgaben f√ºr Debugging

### Welche Fehler werden behandelt?

- **HTTP 429** (Too Many Requests): Rate Limit erreicht ‚Üí Exponential Backoff
- **HTTP 5xx** (Server-Fehler): Tempor√§rer Fehler ‚Üí Retry
- **Timeout**: Netzwerk-Problem ‚Üí Retry
- **HTTP 4xx** (au√üer 429): Client-Fehler ‚Üí Kein Retry (z.B. 404, 401)

In [34]:
class DatalandHTTPSession:
    """
    HTTP-Session mit Retry-Logik, Exponential Backoff und Authentifizierung.
    
    Diese Klasse kapselt alle HTTP-Kommunikation mit der Dataland API.
    """
    
    def __init__(self, base_url: str, api_token: Optional[str], config: Dict[str, Any]):
        """
        Initialisiert die HTTP-Session.
        
        Args:
            base_url: Die Base-URL f√ºr diese Session (z.B. /api oder /documents)
            api_token: Bearer-Token f√ºr Authentifizierung
            config: Konfigurationsdictionary mit Timeouts, Retries, etc.
        """
        self.base_url = base_url
        self.max_retries = config["max_retries"]
        self.backoff_base = config["backoff_base"]
        self.backoff_max = config["backoff_max"]
        
        # Session erstellen
        self.session = requests.Session()
        
        # Headers setzen
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
        
        # Bearer-Token hinzuf√ºgen (falls vorhanden)
        if api_token:
            # print(api_token)
            headers["Authorization"] = f"{api_token}"
            # Zeige nur erste/letzte 4 Zeichen f√ºr Debugging
            token_preview = f"{api_token[:4]}...{api_token[-4:]}" if len(api_token) > 8 else "***"
            print(f"‚úÖ Session f√ºr {base_url} mit Token {token_preview} initialisiert")
        else:
            print(f"‚ö†Ô∏è  Session f√ºr {base_url} ohne Token (Mock-Modus)")
        
        self.session.headers.update(headers)
        
        # Statistiken
        self.stats = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "retries": 0,
            "rate_limits": 0
        }
    
    def _calculate_backoff(self, attempt: int) -> float:
        """
        Berechnet Wartezeit f√ºr Exponential Backoff.
        
        Args:
            attempt: Versuch-Nummer (1-basiert)
            
        Returns:
            Wartezeit in Sekunden
        """
        wait = min(self.backoff_base ** attempt, self.backoff_max)
        return wait
    
    def get(self, endpoint: str, params: Optional[Dict] = None, 
            timeout: int = 30) -> Dict[str, Any]:
        """
        F√ºhrt GET-Request mit Retry-Logik aus.
        
        Args:
            endpoint: API-Endpoint (z.B. "/" oder "/search")
            params: Query-Parameter
            timeout: Timeout in Sekunden
            
        Returns:
            Dictionary mit {"status": int, "data": Any, "error": str|None}
        """
        url = f"{self.base_url}{endpoint}"
        params = params or {}
        
        self.stats["total_requests"] += 1
        
        for attempt in range(1, self.max_retries + 1):
            try:
                print(f"üîÑ GET {endpoint} (Versuch {attempt}/{self.max_retries})")
                print(f"   URL: {url}")
                if params:
                    print(f"   Params: {params}")
                
                response = self.session.get(url, params=params, timeout=timeout)
                
                # Erfolg!
                if response.status_code == 200:
                    self.stats["successful_requests"] += 1
                    print(f"‚úÖ Status 200 - Erfolg")
                    return {
                        "status": 200,
                        "data": response.json(),
                        "error": None
                    }
                
                # Rate Limit (429)
                elif response.status_code == 429:
                    self.stats["rate_limits"] += 1
                    wait = self._calculate_backoff(attempt)
                    print(f"‚è±Ô∏è  Rate Limit (429) - Warte {wait}s...")
                    time.sleep(wait)
                    self.stats["retries"] += 1
                    continue
                
                # Server-Fehler (5xx) - Retry
                elif 500 <= response.status_code < 600:
                    print(f"‚ö†Ô∏è  Server-Fehler ({response.status_code})")
                    if attempt < self.max_retries:
                        wait = self._calculate_backoff(attempt)
                        print(f"   Retry in {wait}s...")
                        time.sleep(wait)
                        self.stats["retries"] += 1
                        continue
                    else:
                        self.stats["failed_requests"] += 1
                        return {
                            "status": response.status_code,
                            "data": None,
                            "error": f"Server-Fehler nach {self.max_retries} Versuchen"
                        }
                
                # Client-Fehler (4xx) - Kein Retry
                else:
                    self.stats["failed_requests"] += 1
                    print(f"‚ùå Client-Fehler ({response.status_code})")
                    try:
                        error_data = response.json()
                    except:
                        error_data = response.text
                    return {
                        "status": response.status_code,
                        "data": None,
                        "error": f"HTTP {response.status_code}: {error_data}"
                    }
            
            except requests.exceptions.Timeout:
                print(f"‚è±Ô∏è  Timeout bei Versuch {attempt}")
                if attempt < self.max_retries:
                    wait = self._calculate_backoff(attempt)
                    print(f"   Retry in {wait}s...")
                    time.sleep(wait)
                    self.stats["retries"] += 1
                else:
                    self.stats["failed_requests"] += 1
                    return {
                        "status": 0,
                        "data": None,
                        "error": f"Timeout nach {self.max_retries} Versuchen"
                    }
            
            except requests.exceptions.RequestException as e:
                print(f"‚ùå Netzwerk-Fehler: {e}")
                if attempt < self.max_retries:
                    wait = self._calculate_backoff(attempt)
                    print(f"   Retry in {wait}s...")
                    time.sleep(wait)
                    self.stats["retries"] += 1
                else:
                    self.stats["failed_requests"] += 1
                    return {
                        "status": 0,
                        "data": None,
                        "error": f"Netzwerk-Fehler: {str(e)}"
                    }
        
        # Sollte nie erreicht werden
        self.stats["failed_requests"] += 1
        return {
            "status": 0,
            "data": None,
            "error": "Maximale Retries erreicht"
        }
    
    def print_stats(self):
        """Gibt Statistiken aus."""
        print("\n" + "="*60)
        print(f"üìä HTTP-SESSION STATISTIKEN ({self.base_url})")
        print("="*60)
        for key, value in self.stats.items():
            print(f"  {key}: {value}")
        print("="*60)


# ‚ö†Ô∏è WICHTIG: ZWEI Sessions erstellen - eine f√ºr jeden Service!
print("\n" + "="*70)
print("üîß ERSTELLE ZWEI GETRENNTE SESSIONS")
print("="*70)

# Session 1: API Service (Metadaten, Dimensions, Values)
api_session = DatalandHTTPSession(
    base_url=CONFIG["base_url_api"],
    api_token=CONFIG["api_token"],
    config=CONFIG
)

# Session 2: Documents Service (Reports, Sustainability Documents)
doc_session = DatalandHTTPSession(
    base_url=CONFIG["base_url_documents"],
    api_token=CONFIG["api_token"],
    config=CONFIG
)

print("\n‚úÖ Beide Sessions bereit f√ºr API-Calls!")


üîß ERSTELLE ZWEI GETRENNTE SESSIONS
‚úÖ Session f√ºr https://dataland.com/api mit Token Bear...7586 initialisiert
‚úÖ Session f√ºr https://dataland.com/documents mit Token Bear...7586 initialisiert

‚úÖ Beide Sessions bereit f√ºr API-Calls!


---

## üîç SCHRITT 1a: COMPANY-LOOKUP VIA API SERVICE (Prim√§rweg)

**Ziel:** Finde die `company_id` √ºber den API-Service (nicht Documents).

**Problem:** Die Swagger-Dokumentation zeigt keinen eindeutigen `/metadata/companies/search` Endpoint. Daher implementieren wir einen **systematischen Probing-Ansatz**:

### Strategie:

1. **Probe-Liste:** Teste verschiedene Endpoint-Kandidaten aus der Konfiguration
2. **Persistenz:** Speichere alle Versuche in `raw/company_lookup_api.jsonl`
3. **Fallback:** Wenn API-Lookup fehlschl√§gt ‚Üí Documents-Service nutzen

### Warum API-Service bevorzugen?

- Direkte `company_id` ohne Umweg √ºber Documents
- Konsistent mit `/metadata/available-data-dimensions` Flow
- Bessere Performance (keine Document-Metadaten laden)

### Typische Response:

```json
{
  "companies": [
    {
      "companyId": "dataland_siemens_ag_001",
      "name": "Siemens AG",
      "sector": "Industrials",
      "country": "DE"
    }
  ]
}
```

---

In [53]:
def search_company_via_api(api_session: DatalandHTTPSession, query: str, 
                           raw_dir: Path) -> Optional[Dict[str, Any]]:
    """
    Sucht nach einem Unternehmen √ºber API-Service (Prim√§rweg).
    
    ‚ö†Ô∏è WICHTIG: Nutzt api_session, nicht doc_session!
    
    Probiert systematisch verschiedene Endpoint-Kandidaten aus der Config,
    da √∂ffentliche Swagger keinen eindeutigen Company-Search dokumentiert.
    
    Args:
        api_session: DatalandHTTPSession f√ºr API-Service
        query: Suchbegriff (z.B. "Siemens")
        raw_dir: Verzeichnis f√ºr Raw-Daten
        
    Returns:
        Dictionary mit company_id, name, sector, country oder None
    """
    print("\n" + "="*70)
    print("üîç SCHRITT 1a: COMPANY-LOOKUP VIA API SERVICE (Prim√§rweg)")
    print("="*70)
    print(f"Suche nach: '{query}' √ºber API-Service")
    print(f"Probe {len(CONFIG['company_lookup_api_candidates'])} Endpoint-Kandidaten...")
    
    candidates = CONFIG["company_lookup_api_candidates"]
    successful_result = None
    
    for idx, candidate in enumerate(candidates, 1):
        endpoint = candidate["path"]
        param_key = list(candidate["params"].keys())[0]
        params = {param_key: query}
        
        print(f"\nüß™ Probe {idx}/{len(candidates)}: {endpoint}")
        print(f"   Params: {params}")
        
        result = api_session.get(endpoint, params=params, timeout=CONFIG["timeout_search"])
        
        # Raw-Persistenz
        envelope = create_envelope(endpoint, result["status"], params, result["data"], 
                                   error=result.get("error"))
        append_jsonl(raw_dir / "company_lookup_api.jsonl", envelope)
        
        if result["status"] == 200 and result["data"]:
            print(f"   ‚úÖ HTTP 200 - Daten erhalten!")
            successful_result = result
            break
        elif result["status"] == 404:
            print(f"   ‚ùå HTTP 404 - Endpoint existiert nicht")
        elif result["status"] == 401:
            print(f"   ‚ùå HTTP 401 - Auth-Problem")
        else:
            print(f"   ‚ùå HTTP {result['status']}")
    
    if not successful_result:
        print(f"\n‚ùå Alle {len(candidates)} API-Probes fehlgeschlagen")
        print(f"üíæ Alle Versuche gespeichert in: company_lookup_api.jsonl")
        print(f"üîÑ Fallback: Wechsle zu Documents-Service...")
        return None
    
    # Analysiere Response
    response_data = successful_result["data"]
    
    # Finde Company-Array (verschiedene Feldnamen m√∂glich)
    companies = None
    for possible_key in ["companies", "results", "data", "items", "entities"]:
        if possible_key in response_data:
            companies = response_data[possible_key]
            break
    
    # Falls direkt ein Array zur√ºckkommt
    if companies is None and isinstance(response_data, list):
        companies = response_data
    
    # Falls einzelnes Objekt (kein Array)
    if companies is None and isinstance(response_data, dict):
        companies = [response_data]
    
    if not companies:
        print(f"‚ö†Ô∏è  Keine Unternehmen gefunden (leere Response)")
        return None
    
    print(f"\n‚úÖ {len(companies)} Unternehmen gefunden:")
    
    # Zeige ersten 5
    for i, comp in enumerate(companies[:5], 1):
        name = comp.get("name") or comp.get("companyName") or comp.get("company") or "?"
        cid = comp.get("companyId") or comp.get("id") or comp.get("dataId") or "?"
        sector = comp.get("sector") or "N/A"
        print(f"   {i}. {name} (ID: {cid}, Sektor: {sector})")
    
    if len(companies) > 5:
        print(f"   ... und {len(companies) - 5} weitere")
    
    # Nimm das erste Ergebnis (bei mehreren Treffern)
    selected = companies[0]
    
    if len(companies) > 1:
        print(f"\n‚ö†Ô∏è  Mehrere Treffer - nutze ersten: {selected.get('name') or selected.get('companyName')}")
    
    # Extrahiere relevante Felder (flexibel)
    company_info = {
        "company_id": selected.get("companyId") or selected.get("id") or selected.get("dataId"),
        "name": selected.get("companyName") or selected.get("name") or selected.get("company"),
        "sector": selected.get("sector"),
        "country": selected.get("countryCode") or selected.get("country"),
        "source": "api",  # Markiere als API-Lookup
        "raw": selected
    }
    
    print(f"\nüìå Ausgew√§hltes Unternehmen (via API):")
    print(f"   Name: {company_info['name']}")
    print(f"   ID: {company_info['company_id']}")
    print(f"   Sektor: {company_info.get('sector', 'N/A')}")
    print(f"   Land: {company_info.get('country', 'N/A')}")
    
    return company_info


def search_company_via_documents(doc_session: DatalandHTTPSession, query: str, 
                                  raw_dir: Path) -> Optional[Dict[str, Any]]:
    """
    Sucht nach einem Unternehmen √ºber Documents-Service (Fallback).
    
    ‚ö†Ô∏è WICHTIG: Nutzt doc_session, nicht api_session!
    
    Args:
        doc_session: DatalandHTTPSession f√ºr Documents-Service
        query: Suchbegriff (z.B. "Siemens")
        raw_dir: Verzeichnis f√ºr Raw-Daten
        
    Returns:
        Dictionary mit company_id, name, sector, country oder None
    """
    print("\n" + "="*70)
    print("üîç SCHRITT 1b: COMPANY-LOOKUP VIA DOCUMENTS SERVICE (Fallback)")
    print("="*70)
    print(f"Suche nach: '{query}' √ºber Documents-Service")
    
    candidates = CONFIG["company_lookup_doc_candidates"]
    successful_result = None
    
    for idx, candidate in enumerate(candidates, 1):
        endpoint = candidate["path"]
        param_key = list(candidate["params"].keys())[0]
        params = {param_key: query}
        
        print(f"\nüß™ Probe {idx}/{len(candidates)}: {endpoint}")
        print(f"   Params: {params}")
        
        result = doc_session.get(endpoint, params=params, timeout=CONFIG["timeout_search"])
        
        # Raw-Persistenz
        envelope = create_envelope(endpoint, result["status"], params, result["data"],
                                   error=result.get("error"))
        append_jsonl(raw_dir / "company_lookup_documents.jsonl", envelope)
        
        if result["status"] == 200 and result["data"]:
            print(f"   ‚úÖ HTTP 200 - Daten erhalten!")
            successful_result = result
            break
        elif result["status"] == 404:
            print(f"   ‚ùå HTTP 404 - Endpoint existiert nicht")
        else:
            print(f"   ‚ùå HTTP {result['status']}")
    
    if not successful_result:
        print(f"\n‚ùå Alle {len(candidates)} Documents-Probes fehlgeschlagen")
        print(f"üíæ Alle Versuche gespeichert in: company_lookup_documents.jsonl")
        return None
    
    # Analysiere Response (gleiche Logik wie API)
    response_data = successful_result["data"]
    
    companies = None
    for possible_key in ["companies", "results", "data", "items", "documents"]:
        if possible_key in response_data:
            companies = response_data[possible_key]
            break
    
    if companies is None and isinstance(response_data, list):
        companies = response_data
    
    if companies is None and isinstance(response_data, dict):
        companies = [response_data]
    
    if not companies:
        print(f"‚ö†Ô∏è  Keine Unternehmen gefunden")
        return None
    
    print(f"\n‚úÖ {len(companies)} Unternehmen gefunden (via Documents):")
    
    for i, comp in enumerate(companies[:5], 1):
        name = comp.get("name") or comp.get("companyName") or "?"
        cid = comp.get("companyId") or comp.get("id") or "?"
        print(f"   {i}. {name} (ID: {cid})")
    
    selected = companies[0]
    
    company_info = {
        "company_id": selected.get("companyId") or selected.get("id") or selected.get("dataId"),
        "name": selected.get("companyName") or selected.get("name"),
        "sector": selected.get("sector"),
        "country": selected.get("countryCode") or selected.get("country"),
        "source": "documents",  # Markiere als Documents-Lookup
        "raw": selected
    }
    
    print(f"\nüìå Ausgew√§hltes Unternehmen (via Documents):")
    print(f"   Name: {company_info['name']}")
    print(f"   ID: {company_info['company_id']}")
    print(f"   Source: Documents-Service (Fallback)")
    
    return company_info


# Company-Lookup mit Fallback-Strategie
company_info = None

# 1. Versuch: API-Service (Prim√§rweg)
company_info = search_company_via_api(api_session, CONFIG["company_query"], CONFIG["raw_dir"])

# 2. Versuch: Documents-Service (Fallback)
if company_info is None:
    print("\n" + "‚ö†Ô∏è "*35)
    print("‚ö†Ô∏è  API-Lookup fehlgeschlagen - aktiviere Documents-Fallback")
    print("‚ö†Ô∏è "*35)
    company_info = search_company_via_documents(doc_session, CONFIG["company_query"], CONFIG["raw_dir"])

# Finale Auswertung
if company_info:
    print("\n" + "="*70)
    print("‚úÖ COMPANY-LOOKUP ERFOLGREICH")
    print("="*70)
    print(f"   Methode: {company_info.get('source', 'unknown').upper()}")
    print(f"   Company ID: {company_info['company_id']}")
    print(f"   Name: {company_info['name']}")

    # üîΩüîΩüîΩ NEU: Finale Company-ID als Variable f√ºr den n√§chsten Schritt speichern
    # ‚úÖ Finale Company-ID global speichern
    global final_company_id
    final_company_id = company_info["company_id"]
    print(f"\nüîó Finale Company ID (Variable): {final_company_id}")

else:
    print("\n" + "="*70)
    print("‚ùå COMPANY-LOOKUP KOMPLETT FEHLGESCHLAGEN")
    print("="*70)
    print("‚ö†Ô∏è  Weder API noch Documents lieferten Ergebnisse")
    print("‚ö†Ô∏è  Fahre mit Mock-Daten fort...")
    print("\nüí° Tipp:")
    print("   - Pr√ºfe .env (DATALAND_TOKEN vorhanden?)")
    print("   - Pr√ºfe Netzwerk (Firewall, VPN?)")
    print("   - Pr√ºfe raw/*.jsonl Logs f√ºr Details")


üîç SCHRITT 1a: COMPANY-LOOKUP VIA API SERVICE (Prim√§rweg)
Suche nach: 'Siemens' √ºber API-Service
Probe 1 Endpoint-Kandidaten...

üß™ Probe 1/1: /companies/names
   Params: {'searchString': 'Siemens'}
üîÑ GET /companies/names (Versuch 1/3)
   URL: https://dataland.com/api/companies/names
   Params: {'searchString': 'Siemens'}
‚úÖ Status 200 - Erfolg
   ‚úÖ HTTP 200 - Daten erhalten!

‚úÖ 100 Unternehmen gefunden:
   1. Siemens Aktiengesellschaft (ID: f16a12ff-714c-4dd1-b141-eb8b0355c833, Sektor: N/A)
   2. Siemens Energy AG (ID: 39260625-315b-4158-97aa-64b69eb331db, Sektor: N/A)
   3. SIEMENS FINANCIERINGSMAATSCHAPPIJ N.V. (ID: adee0944-7d21-46cf-977a-6ec988b782a1, Sektor: N/A)
   4. Siemens Healthineers AG (ID: cb57b6f0-8523-4c74-9dde-ff20e727a55c, Sektor: N/A)
   5. Siemens Proprietary Limited (ID: 2be341ba-1336-4113-aa51-59f4a742726f, Sektor: N/A)
   ... und 95 weitere

‚ö†Ô∏è  Mehrere Treffer - nutze ersten: Siemens Aktiengesellschaft

üìå Ausgew√§hltes Unternehmen (via API)

---

## üìä SCHRITT 2: VERF√úGBARE DIMENSIONEN ABRUFEN

**Jetzt, wo wir eine `company_id` haben,** fragen wir die API: **Welche Daten sind f√ºr dieses Unternehmen verf√ºgbar?**

### Was sind "Dimensions"?

**Available Data Dimensions** (`/metadata/available-data-dimensions`) ist ein Metadaten-Endpoint, der zur√ºckgibt:
- Welche **Datenpunkte** (Indicators) verf√ºgbar sind
- F√ºr welche **Perioden** (Jahre, Quartale) Daten vorliegen
- Welche **Dimensionen** (z.B. Scope 1/2/3 bei CO‚ÇÇ) existieren
- **IDs**, die wir f√ºr den Datenabruf brauchen

### Swagger-Dokumentation:

Dieser Endpoint ist offiziell dokumentiert:
- **URL:** `https://dataland.com/api/swagger-ui/index.html`
- **Ressource:** `/metadata/available-data-dimensions`
- **Parameter:** `companyId` (Pflicht)

### Beispiel-Response:

```json
{
  "availableDataPoints": [
    {
      "dataPointId": "co2_scope1_2023",
      "period": "2023",
      "dimension": "Scope1",
      "indicator": "CO2_Emissions",
      "unit": "Mt"
    },
    {
      "dataPointId": "energy_total_2023",
      "period": "2023",
      "indicator": "Energy_Consumption",
      "unit": "TWh"
    }
  ]
}
```

### Warum wichtig?

Ohne Dimensions wissen wir nicht, **welche Daten √ºberhaupt abrufbar sind**. Jede Dimension liefert eine `dataPointId`, die wir im n√§chsten Schritt f√ºr den Value-Abruf brauchen.

---

In [None]:
from typing import Any, Dict, List, Optional
from pathlib import Path
from datetime import datetime

# Direkt vor dem Aufruf sicherstellen, dass final_company_id existiert
print(f"üìå Verwende finale Company ID: {final_company_id}")

def list_available_dimensions(api_session: DatalandHTTPSession, company_id: str,
                              raw_dir: Path) -> Optional[List[Dict[str, Any]]]:
    """
    Fragt verf√ºgbare Datendimensionen f√ºr ein Unternehmen ab.
    Verwendet die von Dataland erwarteten Filter:
      - companyIds: Liste mit genau einer ID
      - reportingPeriodFrom / reportingPeriodTo: sinnvoller Zeitraum
    Persistiert Raw-Response in raw/available_dimensions.jsonl.
    """

# üí° √úberschreibe sicherheitshalber company_id mit der globalen final_company_id
    company_id = globals().get("final_company_id", company_id)

    print("\n" + "="*70)
    print("SCHRITT 2: AVAILABLE DIMENSIONS (via API-Service)")
    print("="*70)
    print(f"Hole verf√ºgbare Daten f√ºr company_id: {company_id}")

    endpoint = "/metadata/available-data-dimensions"

    # Zeitraumfilter: z.B. die letzten 5 vollen Jahre bis inkl. aktuelles Jahr
    year_now = datetime.now().year
    params = {
        "companyIds": [company_id],          # <- wichtig: Plural + Array
        "reportingPeriodFrom": year_now - 5, # anpassbar
        "reportingPeriodTo": year_now        # anpassbar
        # Optional (sp√§ter): categories, indicatorIds etc., wenn ihr weiter filtern wollt
    }

    print(f"‚û°Ô∏è API-Request an {endpoint}")
    print(json.dumps(params, indent=2))

    result = api_session.get(endpoint, params=params, timeout=CONFIG["timeout_data"])

    # Raw-Persistenz
    envelope = create_envelope(endpoint, result["status"], params, result.get("data"), error=result.get("error"))
    append_jsonl(raw_dir / "available_dimensions.jsonl", envelope)
    print("Dimensions gespeichert in: available_dimensions.jsonl")

    # Status pr√ºfen
    if result["status"] != 200:
        print(f"Fehler beim Abruf: HTTP {result['status']}")
        if result.get("error"):
            print(f"   Error: {result['error']}")
            # Typische Hilfe bei 400:
            if result["status"] == 400:
                print("   Hinweis: Pr√ºfe, dass 'companyIds' (Plural) eine Liste ist "
                      "und dass ein Zeitraum (reportingPeriodFrom/To) gesetzt ist.")
        return None

    # Response extrahieren (flexibel je nach Schema)
    data = result.get("data")
    dims = None
    if isinstance(data, dict):
        for key in ("availableDataPoints", "dimensions", "dataPoints", "data", "results", "items"):
            if key in data and isinstance(data[key], list):
                dims = data[key]
                break
    if dims is None and isinstance(data, list):
        dims = data

    if not dims:
        print("Keine Dimensionen gefunden (leere Response)")
        return None

    print(f"\n{len(dims)} Dimensionen gefunden:")
    for i, d in enumerate(dims[:10], 1):
        dp_id = d.get("dataPointId") or d.get("id") or "?"
        indicator = d.get("indicator") or d.get("metric") or d.get("code") or "?"
        period = d.get("period") or d.get("year") or "?"
        print(f"   {i}. {indicator} ({period}) - ID: {dp_id}")
    if len(dims) > 10:
        print(f"   ... und {len(dims) - 10} weitere")

    return dims


# # Dimensions abrufen (via api_session!)
# # ‚ö†Ô∏è WICHTIG: Nutzt api_session, NICHT doc_session!
# dimensions = None
# # if company_info and company_info.get("company_id"):
# #     dimensions = list_available_dimensions(
# #         api_session,  # <- API Service nutzen!
# #         company_info["company_id"],
# #         CONFIG["raw_dir"]
# #     )

# # Verwendet direkt die finale Company-ID Variable
# if 'final_company_id' in locals() and final_company_id:
#     dimensions = list_available_dimensions(
#         api_session,          # <- API Service nutzen!
#         final_company_id,     # <- hier wird die finale ID verwendet!
#         CONFIG["raw_dir"]
#     )

#     if dimensions:
#         print("\n" + "="*70)
#         print("‚úÖ DIMENSIONS ERFOLGREICH ABGERUFEN")
#         print("="*70)
#     else:
#         print("\n" + "="*70)
#         print("‚ùå DIMENSIONS FEHLER ODER LEER")
#         print("="*70)
#         print("‚ö†Ô∏è  Fahre mit Mock-Daten fort...")
# else:
#     print("\n" + "="*70)
#     print("‚ö†Ô∏è  √úberspringe Dimensions (keine company_id)")
#     print("="*70)


# ======================================================================
# AUSF√úHRUNG
# ======================================================================

# 1Ô∏è‚É£ Company Lookup
# company_info = search_company_via_api(api_session, CONFIG["company_query"], CONFIG["raw_dir"])

# 2Ô∏è‚É£ Available Dimensions abrufen ‚Äì NUR mit final_company_id
if "final_company_id" in globals() and final_company_id:
    print(f"\nüìé Verwende finale Company ID f√ºr Dimensions: {final_company_id}")
    dimensions = list_available_dimensions(api_session, final_company_id, CONFIG["raw_dir"])

    if dimensions:
        print("\n" + "="*70)
        print("‚úÖ DIMENSIONS ERFOLGREICH ABGERUFEN")
        print("="*70)
    else:
        print("\n" + "="*70)
        print("‚ùå DIMENSIONS FEHLER ODER LEER")
        print("="*70)
else:
    print("\n" + "="*70)
    print("‚ö†Ô∏è Keine finale Company-ID gefunden ‚Äì √úberspringe Dimensions")
    print("="*70)

üìå Verwende finale Company ID: f16a12ff-714c-4dd1-b141-eb8b0355c833

üìé Verwende finale Company ID f√ºr Dimensions: f16a12ff-714c-4dd1-b141-eb8b0355c833

SCHRITT 2: AVAILABLE DIMENSIONS (via API-Service)
Hole verf√ºgbare Daten f√ºr company_id: f16a12ff-714c-4dd1-b141-eb8b0355c833
‚û°Ô∏è API-Request an /metadata/available-data-dimensions
{
  "companyIds": [
    "f16a12ff-714c-4dd1-b141-eb8b0355c833"
  ],
  "reportingPeriodFrom": 2020,
  "reportingPeriodTo": 2025
}
üîÑ GET /metadata/available-data-dimensions (Versuch 1/3)
   URL: https://dataland.com/api/metadata/available-data-dimensions
   Params: {'companyIds': ['f16a12ff-714c-4dd1-b141-eb8b0355c833'], 'reportingPeriodFrom': 2020, 'reportingPeriodTo': 2025}
‚úÖ Status 200 - Erfolg
Dimensions gespeichert in: available_dimensions.jsonl

421 Dimensionen gefunden:
   1. ? (?) - ID: ?
   2. ? (?) - ID: ?
   3. ? (?) - ID: ?
   4. ? (?) - ID: ?
   5. ? (?) - ID: ?
   6. ? (?) - ID: ?
   7. ? (?) - ID: ?
   8. ? (?) - ID: ?
   9. ? (?) 

---

## 7. Values ziehen: Alle Daten systematisch abrufen

Jetzt kommt der Hauptteil: F√ºr **jeden** Datenpunkt aus den Dimensions rufen wir die tats√§chlichen **Werte** ab.

### Strategie: "Alles Verf√ºgbare"

Wir iterieren √ºber alle Dimensions-Eintr√§ge und rufen f√ºr jeden:
- Den **Wert** (Value)
- Die **Metadaten** (Einheit, Quelle, Qualit√§t)
- Weitere **Kontexte** (Scope, Kategorie, etc.)

### Idempotenz & Duplicate-Vermeidung

Um zu verhindern, dass wir denselben Datenpunkt mehrfach ziehen:
- Generieren wir einen **Hash** aus `(company_id, data_point_id, period)`
- Pr√ºfen, ob dieser Hash schon in einer Set-Variable existiert
- Speichern nur, wenn neu

### Fehlertoleranz

- Einzelne Fehler (z.B. 404 f√ºr einen Datenpunkt) stoppen nicht die gesamte Pipeline
- Jeder Fehler wird geloggt
- Am Ende: Zusammenfassung von Erfolgen und Fehlern

### Progress-Tracking

Bei vielen Datenpunkten (>100):
- Zeigen wir alle 10% den Fortschritt an
- Geben Zwischenstatistiken aus

In [55]:
def fetch_all_values(api_session: DatalandHTTPSession, company_id: str,
                     dimensions: List[Dict[str, Any]], raw_dir: Path) -> Dict[str, Any]:
    """
    Ruft alle Werte f√ºr die gegebenen Dimensions ab.
    
    ‚ö†Ô∏è WICHTIG: Nutzt api_session (API Service), nicht doc_session!
    
    Speichert jede Response in raw/values_<timestamp>.jsonl
    
    Args:
        api_session: DatalandHTTPSession f√ºr API-Service
        company_id: Die Unternehmens-ID
        dimensions: Liste der Dimensions (aus list_available_dimensions)
        raw_dir: Verzeichnis f√ºr Raw-Daten
        
    Returns:
        Dictionary mit Statistiken
    """
    print("\n" + "="*70)
    print("üíé SCHRITT 3: VALUES ZIEHEN (via API-Service)")
    print("="*70)
    print(f"Ziehe Werte f√ºr {len(dimensions)} Datenpunkte...")
    
    # Zeitstempel f√ºr Dateiname
    timestamp = nowz().replace(":", "").replace("-", "").replace("Z", "")
    values_file = raw_dir / f"values_{timestamp}.jsonl"
    
    # Statistiken
    stats = {
        "total": len(dimensions),
        "success": 0,
        "failed": 0,
        "skipped_duplicates": 0,
        "errors": []
    }
    
    # Duplicate-Tracking
    seen_hashes = set()
    
    # API-Endpoint (aus Swagger √ºbernehmen!)
    endpoint = "/data-points/values"
    
    # Fortschrittsbalken-Schritte (alle 10%)
    progress_step = max(1, len(dimensions) // 10)
    
    print(f"\nüîÑ Starte Datenabruf...")
    start_time = time.time()
    
    for i, dim in enumerate(dimensions, 1):
        # Extrahiere IDs (Feldnamen aus Swagger anpassen!)
        data_point_id = dim.get("dataPointId") or dim.get("id")
        period = dim.get("period") or dim.get("year")
        indicator = dim.get("indicator") or dim.get("name") or "Unknown"
        
        if not data_point_id:
            print(f"‚ö†Ô∏è  [{i}/{len(dimensions)}] √úberspringe: Keine data_point_id")
            stats["failed"] += 1
            continue
        
        # Duplicate-Check
        hash_key = generate_hash(company_id, data_point_id, str(period) if period else None)
        if hash_key in seen_hashes:
            stats["skipped_duplicates"] += 1
            continue
        seen_hashes.add(hash_key)
        
        # Progress
        if i % progress_step == 0 or i == 1:
            progress_pct = (i / len(dimensions)) * 100
            elapsed = time.time() - start_time
            rate = i / elapsed if elapsed > 0 else 0
            eta = (len(dimensions) - i) / rate if rate > 0 else 0
            print(f"\nüìä Fortschritt: {i}/{len(dimensions)} ({progress_pct:.1f}%)")
            print(f"   Erfolg: {stats['success']}, Fehler: {stats['failed']}")
            print(f"   Rate: {rate:.1f}/s, ETA: {eta:.0f}s")
        
        # Parameter f√ºr Request
        params = {
            "companyId": company_id,
            "dataPointId": data_point_id
        }
        if period:
            params["period"] = period
        
        # API-Call (mit api_session!)
        result = api_session.get(endpoint, params=params, timeout=CONFIG["timeout_values"])
        
        # Raw-Persistenz (auch Fehler speichern!)
        envelope = create_envelope(endpoint, result["status"], params, result["data"])
        append_jsonl(values_file, envelope)
        
        # Statistiken
        if result["status"] == 200:
            stats["success"] += 1
            print(f"   ‚úÖ [{i}/{len(dimensions)}] {indicator} ({period})")
        else:
            stats["failed"] += 1
            error_msg = f"{indicator} ({period}): {result['error']}"
            stats["errors"].append(error_msg)
            print(f"   ‚ùå [{i}/{len(dimensions)}] {error_msg}")
    
    elapsed_total = time.time() - start_time
    
    print("\n" + "="*70)
    print("üìä VALUES-ABRUF ABGESCHLOSSEN")
    print("="*70)
    print(f"üíæ Daten gespeichert in: {values_file.name}")
    print(f"\nüìà Statistiken:")
    print(f"   Total: {stats['total']}")
    print(f"   Erfolgreich: {stats['success']}")
    print(f"   Fehlgeschlagen: {stats['failed']}")
    print(f"   Duplikate √ºbersprungen: {stats['skipped_duplicates']}")
    print(f"   Dauer: {elapsed_total:.1f}s")
    print(f"   Rate: {stats['total']/elapsed_total:.1f} Datenpunkte/s")
    
    if stats["errors"] and len(stats["errors"]) <= 10:
        print(f"\n‚ùå Fehler-Details:")
        for error in stats["errors"][:10]:
            print(f"   - {error}")
    elif stats["errors"]:
        print(f"\n‚ùå {len(stats['errors'])} Fehler (erste 10 gezeigt)")
        for error in stats["errors"][:10]:
            print(f"   - {error}")
    
    return stats


# Values abrufen (nur wenn dimensions vorhanden)
# ‚ö†Ô∏è WICHTIG: Nutzt api_session!
values_stats = None
if dimensions and company_info:
    values_stats = fetch_all_values(
        api_session,  # <- API Service nutzen!
        company_info["company_id"],
        dimensions,
        CONFIG["raw_dir"]
    )
    
    if values_stats["success"] > 0:
        print("\n‚úÖ VALUES ERFOLGREICH ABGERUFEN")
    else:
        print("\n‚ùå KEINE VALUES ABGERUFEN")
else:
    print("\n" + "="*70)
    print("‚ö†Ô∏è  √úberspringe Values (keine dimensions)")
    print("="*70)


üíé SCHRITT 3: VALUES ZIEHEN (via API-Service)
Ziehe Werte f√ºr 421 Datenpunkte...

üîÑ Starte Datenabruf...
‚ö†Ô∏è  [1/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [2/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [3/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [4/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [5/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [6/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [7/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [8/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [9/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [10/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [11/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [12/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [13/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [14/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [15/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [16/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [17/421] √úberspringe: Keine data_point_id
‚ö†Ô∏è  [18/421] √úberspringe:

---

## 8. (Optional) Dokumente & Sustainability Reports

Zus√§tzlich zu den strukturierten Daten (Metriken) k√∂nnen wir auch **Dokumente** abrufen:
- Sustainability Reports (Nachhaltigkeitsberichte)
- Annual Reports (Gesch√§ftsberichte)
- Weitere PDF/Text-Dokumente

### Warum Dokumente?

Dokumente enthalten:
- ‚úÖ **Textuelle Kontexte** f√ºr AI/Q&A
- ‚úÖ **Strategien & Narrative** (nicht nur Zahlen)
- ‚úÖ **Qualitative Informationen**
- ‚úÖ **Erkl√§rungen** zu den Metriken

### Ablauf:

1. **Suche** nach Dokumenten f√ºr das Unternehmen
2. **Metadaten** abrufen (Titel, Jahr, Typ, URL)
3. **Speichern** in `raw/document_search.jsonl`

### Hinweis:

Der vollst√§ndige Download der PDF-Dateien ist **optional** und nicht Teil dieses MVPs. Wir speichern zun√§chst nur die Metadaten und URLs.

In [32]:
def search_documents(doc_session: DatalandHTTPSession, query: str, 
                     company_id: Optional[str], raw_dir: Path) -> Optional[List[Dict[str, Any]]]:
    """
    Sucht nach Dokumenten f√ºr ein Unternehmen.
    
    ‚ö†Ô∏è WICHTIG: Nutzt doc_session (Documents Service), nicht api_session!
    
    Speichert Suchergebnisse in raw/document_search.jsonl
    
    Args:
        doc_session: DatalandHTTPSession f√ºr Documents-Service
        query: Suchbegriff (z.B. "Siemens")
        company_id: Optional - Unternehmens-ID f√ºr gezielte Suche
        raw_dir: Verzeichnis f√ºr Raw-Daten
        
    Returns:
        Liste von Dokument-Metadaten, oder None bei Fehler
    """
    print("\n" + "="*70)
    print("üìÑ SCHRITT 4 (Optional): DOKUMENTE SUCHEN (via Documents-Service)")
    print("="*70)
    print(f"Suche nach Dokumenten f√ºr: '{query}'")
    
    # Probing-Liste f√ºr Documents
    # Da doc_session base_url="/documents" hat, ist "/" oder "/search" korrekt
    candidates = [
        ("/",       {"q": query}),
        ("/search", {"q": query}),
        ("/",       {"query": query}),
        ("/search", {"query": query}),
    ]
    
    # Falls company_id vorhanden, erweitere Parameter
    if company_id:
        candidates.extend([
            ("/",       {"q": query, "companyId": company_id}),
            ("/search", {"q": query, "companyId": company_id}),
        ])
    
    successful_result = None
    
    for endpoint, params in candidates:
        print(f"\nüß™ Probe: {endpoint} mit {params}")
        
        result = doc_session.get(endpoint, params=params, timeout=CONFIG["timeout_search"])
        
        # Raw-Persistenz
        envelope = create_envelope(endpoint, result["status"], params, result["data"])
        append_jsonl(raw_dir / "document_search.jsonl", envelope)
        
        if result["status"] == 200 and result["data"]:
            print(f"   ‚úÖ Erfolg! Nutze diesen Endpoint")
            successful_result = result
            break
        else:
            print(f"   ‚ùå Fehler: {result['status']}")
    
    if not successful_result:
        print(f"\n‚ùå Alle Probing-Versuche fehlgeschlagen")
        print(f"üíæ Alle Versuche gespeichert in: document_search.jsonl")
        return None
    
    docs_response = successful_result["data"]
    
    # Extrahiere Dokumente-Array (Feldname anpassen!)
    documents = None
    for possible_key in ["documents", "results", "data", "items"]:
        if possible_key in docs_response:
            documents = docs_response[possible_key]
            break
    
    if documents is None and isinstance(docs_response, list):
        documents = docs_response
    
    if not documents:
        print(f"‚ö†Ô∏è  Keine Dokumente gefunden")
        return None
    
    print(f"\n‚úÖ {len(documents)} Dokumente gefunden:")
    
    # Zeige erste 10
    for i, doc in enumerate(documents[:10], 1):
        title = doc.get("title") or doc.get("name") or "Untitled"
        year = doc.get("year") or doc.get("reportingPeriod") or "?"
        doc_type = doc.get("documentType") or doc.get("type") or "?"
        print(f"   {i}. {title} ({year}) - Typ: {doc_type}")
    
    if len(documents) > 10:
        print(f"   ... und {len(documents) - 10} weitere")
    
    # Statistiken
    types = {}
    years = set()
    for doc in documents:
        doc_type = doc.get("documentType") or doc.get("type") or "Unknown"
        types[doc_type] = types.get(doc_type, 0) + 1
        if "year" in doc:
            years.add(doc["year"])
    
    print(f"\nüìä Statistiken:")
    print(f"   Dokumenttypen: {dict(types)}")
    print(f"   Jahre: {sorted(years) if years else 'N/A'}")
    
    return documents


# Dokumente suchen (optional)
# ‚ö†Ô∏è WICHTIG: Nutzt doc_session!
documents = None
if company_info:
    documents = search_documents(
        doc_session,  # <- Documents Service nutzen!
        CONFIG["company_query"],
        company_info.get("company_id"),
        CONFIG["raw_dir"]
    )
    
    if documents:
        print("\n‚úÖ DOKUMENTE GEFUNDEN")
    else:
        print("\n‚ö†Ô∏è  KEINE DOKUMENTE ODER FEHLER")
else:
    print("\n" + "="*70)
    print("‚ö†Ô∏è  √úberspringe Dokumente (keine company_info)")
    print("="*70)


üìÑ SCHRITT 4 (Optional): DOKUMENTE SUCHEN (via Documents-Service)
Suche nach Dokumenten f√ºr: 'Siemens'

üß™ Probe: / mit {'q': 'Siemens'}
üîÑ GET / (Versuch 1/3)
   URL: https://dataland.com/documents/
   Params: {'q': 'Siemens'}
‚ùå Client-Fehler (400)
   ‚ùå Fehler: 400

üß™ Probe: /search mit {'q': 'Siemens'}
üîÑ GET /search (Versuch 1/3)
   URL: https://dataland.com/documents/search
   Params: {'q': 'Siemens'}
‚ùå Client-Fehler (404)
   ‚ùå Fehler: 404

üß™ Probe: / mit {'query': 'Siemens'}
üîÑ GET / (Versuch 1/3)
   URL: https://dataland.com/documents/
   Params: {'query': 'Siemens'}
‚ùå Client-Fehler (400)
   ‚ùå Fehler: 400

üß™ Probe: /search mit {'query': 'Siemens'}
üîÑ GET /search (Versuch 1/3)
   URL: https://dataland.com/documents/search
   Params: {'query': 'Siemens'}
‚ùå Client-Fehler (404)
   ‚ùå Fehler: 404

üß™ Probe: / mit {'q': 'Siemens', 'companyId': 'f16a12ff-714c-4dd1-b141-eb8b0355c833'}
üîÑ GET / (Versuch 1/3)
   URL: https://dataland.com/documents/

---

## 9. Run-Log & Gesamt-Statistiken

Zum Abschluss erstellen wir ein **Run-Log**, das alle wichtigen Informationen √ºber diesen Durchlauf speichert.

### Was enth√§lt das Run-Log?

- ‚úÖ **Zeitstempel** (Start, Ende, Dauer)
- ‚úÖ **Konfiguration** (Company Query, Base URL)
- ‚úÖ **Statistiken** (Erfolge, Fehler, Records)
- ‚úÖ **HTTP-Session-Stats** (Requests, Retries, Rate Limits)
- ‚úÖ **Fehler-Zusammenfassung**
- ‚úÖ **Compliance-Info** (z.B. API-Nutzungsbedingungen)

### Warum ist das wichtig?

Das Run-Log erm√∂glicht:
- üìä **Monitoring**: Wie gut l√§uft die Pipeline?
- üêõ **Debugging**: Was lief schief?
- üìù **Audit**: Wann wurden welche Daten gezogen?
- ‚öñÔ∏è **Compliance**: Nachweis der regelkonformen Nutzung

In [33]:
def create_run_log(api_session: DatalandHTTPSession, doc_session: DatalandHTTPSession, 
                   raw_dir: Path, company_info: Optional[Dict], dimensions: Optional[List],
                   values: Optional[List], documents: Optional[List]) -> Dict[str, Any]:
    """
    Erzeugt Run-Log mit Statistiken von BEIDEN Sessions.
    
    Args:
        api_session: DatalandHTTPSession f√ºr API-Service
        doc_session: DatalandHTTPSession f√ºr Documents-Service
        raw_dir: Verzeichnis f√ºr Raw-Daten
        company_info: Unternehmensdaten
        dimensions: Dimensionen
        values: Datenpunkte
        documents: Dokumente
        
    Returns:
        Run-Log als Dictionary
    """
    print("\n" + "="*70)
    print("üìä SCHRITT 5: RUN-LOG ERZEUGEN")
    print("="*70)
    
    # Statistiken von beiden Sessions kombinieren
    api_stats = api_session.get_stats()
    doc_stats = doc_session.get_stats()
    
    run_log = {
        "timestamp": nowz(),
        "company_query": CONFIG["company_query"],
        "company_found": bool(company_info),
        "company_id": company_info.get("company_id") if company_info else None,
        
        # Gesammelte Daten
        "dimensions_count": len(dimensions) if dimensions else 0,
        "values_count": len(values) if values else 0,
        "documents_count": len(documents) if documents else 0,
        
        # API Service Statistiken
        "api_service": {
            "total_requests": api_stats["total_requests"],
            "successful_requests": api_stats["successful_requests"],
            "failed_requests": api_stats["failed_requests"],
            "total_retries": api_stats["total_retries"],
            "rate_limits_hit": api_stats["rate_limits_hit"],
        },
        
        # Documents Service Statistiken
        "documents_service": {
            "total_requests": doc_stats["total_requests"],
            "successful_requests": doc_stats["successful_requests"],
            "failed_requests": doc_stats["failed_requests"],
            "total_retries": doc_stats["total_retries"],
            "rate_limits_hit": doc_stats["rate_limits_hit"],
        },
        
        # Gesamt-Statistiken
        "total_statistics": {
            "total_requests": api_stats["total_requests"] + doc_stats["total_requests"],
            "successful_requests": api_stats["successful_requests"] + doc_stats["successful_requests"],
            "failed_requests": api_stats["failed_requests"] + doc_stats["failed_requests"],
            "total_retries": api_stats["total_retries"] + doc_stats["total_retries"],
            "rate_limits_hit": api_stats["rate_limits_hit"] + doc_stats["rate_limits_hit"],
        }
    }
    
    # Persistiere
    envelope = create_envelope("run_log", 200, {}, run_log)
    append_jsonl(raw_dir / "run_log.jsonl", envelope)
    
    print("\n‚úÖ Run-Log gespeichert")
    print(f"   Dimensionen: {run_log['dimensions_count']}")
    print(f"   Values: {run_log['values_count']}")
    print(f"   Dokumente: {run_log['documents_count']}")
    print(f"\n   API Service:")
    print(f"      Requests: {run_log['api_service']['total_requests']} " +
          f"(‚úÖ {run_log['api_service']['successful_requests']}, " +
          f"‚ùå {run_log['api_service']['failed_requests']})")
    print(f"      Retries: {run_log['api_service']['total_retries']}")
    print(f"      Rate Limits: {run_log['api_service']['rate_limits_hit']}")
    print(f"\n   Documents Service:")
    print(f"      Requests: {run_log['documents_service']['total_requests']} " +
          f"(‚úÖ {run_log['documents_service']['successful_requests']}, " +
          f"‚ùå {run_log['documents_service']['failed_requests']})")
    print(f"      Retries: {run_log['documents_service']['total_retries']}")
    print(f"      Rate Limits: {run_log['documents_service']['rate_limits_hit']}")
    print(f"\n   GESAMT:")
    print(f"      Total Requests: {run_log['total_statistics']['total_requests']}")
    print(f"      Retries: {run_log['total_statistics']['total_retries']}")
    
    return run_log


# Run-Log mit BEIDEN Sessions erzeugen
run_log = create_run_log(
    api_session,  # <- API Service
    doc_session,  # <- Documents Service
    CONFIG["raw_dir"],
    company_info,
    dimensions,
    values,
    documents
)

NameError: name 'values' is not defined

---

## 10. HTTP-Session Statistiken & Abschluss

Zum Abschluss zeigen wir alle HTTP-Statistiken und eine finale Zusammenfassung.

### Was haben wir erreicht? ‚úÖ

1. **Company-Resolve**: Aus "Siemens" wurde eine eindeutige `company_id`
2. **Available Dimensions**: Liste aller verf√ºgbaren Datenpunkte geholt
3. **Values**: Systematisch alle Werte abgerufen
4. **Documents** (optional): Dokument-Metadaten gesammelt
5. **Raw-Persistenz**: Alles in JSONL gespeichert
6. **Run-Log**: Vollst√§ndige Dokumentation des Durchlaufs

### N√§chste Schritte

Im n√§chsten Teil der Pipeline:
- **Transform Layer**: Rohdaten ins kanonische Schema √ºberf√ºhren
- **Mock-Rooms**: Events strukturiert speichern
- **AI & Visualisierung**: Use-Cases implementieren

In [None]:
# Finale Statistiken f√ºr BEIDE Sessions
print("\n" + "="*70)
print("üìä FINALE HTTP STATISTIKEN")
print("="*70)

print("\n? API SERVICE (https://dataland.com/api)")
print("-" * 70)
api_session.print_stats()

print("\n? DOCUMENTS SERVICE (https://dataland.com/documents)")
print("-" * 70)
doc_session.print_stats()

# Gesamt-Statistiken
api_stats = api_session.get_stats()
doc_stats = doc_session.get_stats()
total_requests = api_stats["total_requests"] + doc_stats["total_requests"]
total_retries = api_stats["total_retries"] + doc_stats["total_retries"]
total_rate_limits = api_stats["rate_limits_hit"] + doc_stats["rate_limits_hit"]

print("\nüìà GESAMT-STATISTIKEN (beide Services)")
print("-" * 70)
print(f"Total Requests: {total_requests}")
print(f"Total Retries: {total_retries}")
print(f"Total Rate Limits: {total_rate_limits}")
print("="*70)

---

## üé≠ MOCK-DATEN (Entwicklungs-Fallback)

Falls **weder API noch Documents** erfolgreich waren, erstellen wir Mock-Daten f√ºr die Entwicklung.

### Wann wird Mock-Modus aktiviert?

- Kein `DATALAND_TOKEN` in `.env`
- Alle Company-Lookup Endpoints fehlgeschlagen (HTTP 404/401)
- Network-Probleme (Timeout, Firewall)
- Dataland-API offline

### Was wird gemockt?

1. **Company Info:** Mock-Unternehmen mit fester `company_id`
2. **Dimensions:** 6 typische ESG-Dimensionen (CO‚ÇÇ, Energie, Wasser, etc.)
3. **Values:** Realistische Werte f√ºr Testdaten
4. **Documents:** 2 Mock-Sustainability-Reports

### Vorteil:

Du kannst die **gesamte Pipeline testen**, ohne Zugang zur echten API. Ideal f√ºr:
- Entwicklung ohne API-Zugang
- Offline-Arbeit
- CI/CD Tests
- Demo-Pr√§sentationen

---

In [None]:
# Mock-Daten f√ºr Entwicklung (falls echte API nicht verf√ºgbar)
def create_mock_data_if_needed():
    """
    Erstellt Mock-Daten, falls keine echten Daten abgerufen wurden.
    """
    global company_info, dimensions, values_stats, documents
    
    # Pr√ºfe, ob wir Mock-Daten brauchen
    need_mock = (
        company_info is None or
        dimensions is None or
        (values_stats and values_stats["success"] == 0)
    )
    
    if not need_mock:
        print("‚úÖ Echte Daten vorhanden - keine Mock-Daten n√∂tig")
        return
    
    print("\n" + "="*70)
    print("üé≠ MOCK-MODUS: Erstelle Testdaten")
    print("="*70)
    
    # Mock Company Info
    if company_info is None:
        company_info = {
            "company_id": "mock_siemens_001",
            "name": "Siemens AG (Mock)",
            "sector": "Industrials",
            "country": "Germany"
        }
        print("‚úÖ Mock Company Info erstellt")
    
    # Mock Dimensions
    if dimensions is None:
        dimensions = [
            {"dataPointId": "co2_scope1_2023", "indicator": "CO2_Scope1", 
             "period": "2023", "unit": "Mt"},
            {"dataPointId": "co2_scope2_2023", "indicator": "CO2_Scope2",
             "period": "2023", "unit": "Mt"},
            {"dataPointId": "energy_2023", "indicator": "Energy_Consumption",
             "period": "2023", "unit": "TWh"},
            {"dataPointId": "renewable_2023", "indicator": "Renewable_Energy_Share",
             "period": "2023", "unit": "%"},
            {"dataPointId": "water_2023", "indicator": "Water_Consumption",
             "period": "2023", "unit": "Mio. m¬≥"},
            {"dataPointId": "waste_2023", "indicator": "Waste_Recycling_Rate",
             "period": "2023", "unit": "%"}
        ]
        print(f"‚úÖ Mock Dimensions erstellt ({len(dimensions)} Datenpunkte)")
    
    # Mock Values - simuliere erfolgreichen Abruf
    if values_stats is None or values_stats["success"] == 0:
        # Erstelle Mock-Values-Datei
        timestamp = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
        values_file = CONFIG["raw_dir"] / f"values_mock_{timestamp}.jsonl"
        
        mock_values = [
            {"dataPointId": "co2_scope1_2023", "value": 1.24, "unit": "Mt", 
             "quality": "verified"},
            {"dataPointId": "co2_scope2_2023", "value": 0.86, "unit": "Mt",
             "quality": "verified"},
            {"dataPointId": "energy_2023", "value": 4.5, "unit": "TWh",
             "quality": "verified"},
            {"dataPointId": "renewable_2023", "value": 70, "unit": "%",
             "quality": "estimated"},
            {"dataPointId": "water_2023", "value": 3.2, "unit": "Mio. m¬≥",
             "quality": "verified"},
            {"dataPointId": "waste_2023", "value": 78, "unit": "%",
             "quality": "verified"}
        ]
        
        for val in mock_values:
            envelope = create_envelope(
                "/data-points/values",
                200,
                {"dataPointId": val["dataPointId"]},
                val
            )
            append_jsonl(values_file, envelope)
        
        values_stats = {
            "total": len(mock_values),
            "success": len(mock_values),
            "failed": 0,
            "skipped_duplicates": 0,
            "errors": []
        }
        print(f"‚úÖ Mock Values erstellt ({len(mock_values)} Werte)")
    
    # Mock Documents
    if documents is None:
        documents = [
            {
                "documentId": "doc_2023_sus",
                "title": "Sustainability Report 2023",
                "year": 2023,
                "documentType": "SustainabilityReport",
                "url": "https://example.com/siemens_sus_2023.pdf"
            },
            {
                "documentId": "doc_2022_sus",
                "title": "Sustainability Report 2022",
                "year": 2022,
                "documentType": "SustainabilityReport",
                "url": "https://example.com/siemens_sus_2022.pdf"
            }
        ]
        
        # Speichere in document_search.jsonl
        envelope = create_envelope(
            "/documents/search",
            200,
            {"q": CONFIG["company_query"]},
            {"documents": documents}
        )
        append_jsonl(CONFIG["raw_dir"] / "document_search.jsonl", envelope)
        print(f"‚úÖ Mock Documents erstellt ({len(documents)} Dokumente)")
    
    print("\n‚úÖ Mock-Daten-Setup abgeschlossen")
    print("   ‚Üí Sie k√∂nnen jetzt mit der Transform-Phase fortfahren")


# Mock-Daten erstellen falls n√∂tig
create_mock_data_if_needed()

In [None]:
class DatalandConnector:
    """
    Connector-Klasse f√ºr die Dataland API.
    
    Diese Klasse kapselt alle Funktionen, die wir brauchen, um:
    - Mit der Dataland API zu kommunizieren
    - Daten abzurufen
    - Fehler zu behandeln
    """
    
    def __init__(self, config: Dict[str, Any]):
        """
        Initialisiert den Connector mit der Konfiguration.
        
        Args:
            config: Dictionary mit API-Konfiguration (URL, Keys, etc.)
        """
        self.base_url = config["base_url"]
        self.api_key = config["api_key"]
        self.timeout = config["timeout"]
        self.max_retries = config["max_retries"]
        
        # Session f√ºr effizientere HTTP-Requests (Connection Pooling)
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        })
        
        print("‚úÖ DatalandConnector initialisiert")
    
    def _make_request(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """
        F√ºhrt einen HTTP-GET Request an einen API-Endpoint aus.
        
        Diese interne Methode (Pr√§fix _) sollte nicht direkt aufgerufen werden.
        Sie wird von den √∂ffentlichen Methoden verwendet.
        
        Args:
            endpoint: Der API-Endpoint (z.B. "/companies/12345")
            params: Optional - Query-Parameter als Dictionary
            
        Returns:
            Dictionary mit den API-Response-Daten
            
        Raises:
            Exception: Bei Netzwerk- oder API-Fehlern
        """
        url = f"{self.base_url}{endpoint}"
        
        for attempt in range(1, self.max_retries + 1):
            try:
                print(f"üîÑ Request an {endpoint} (Versuch {attempt}/{self.max_retries})...")
                
                response = self.session.get(
                    url,
                    params=params,
                    timeout=self.timeout
                )
                
                # HTTP-Fehler pr√ºfen (4xx, 5xx Status Codes)
                response.raise_for_status()
                
                print(f"‚úÖ Erfolgreiche Antwort (Status {response.status_code})")
                return response.json()
                
            except requests.exceptions.Timeout:
                print(f"‚è±Ô∏è Timeout bei Versuch {attempt}")
                if attempt == self.max_retries:
                    raise Exception(f"Timeout nach {self.max_retries} Versuchen")
                    
            except requests.exceptions.HTTPError as e:
                print(f"‚ùå HTTP-Fehler: {e}")
                if response.status_code >= 500 and attempt < self.max_retries:
                    print(f"   Server-Fehler, versuche erneut...")
                    continue
                raise
                
            except requests.exceptions.RequestException as e:
                print(f"‚ùå Netzwerk-Fehler: {e}")
                if attempt == self.max_retries:
                    raise
    
    def get_company_data(self, company_id: str) -> Dict[str, Any]:
        """
        Ruft alle verf√ºgbaren Daten f√ºr ein Unternehmen ab.
        
        Args:
            company_id: Die eindeutige ID des Unternehmens (z.B. ISIN)
            
        Returns:
            Dictionary mit allen Unternehmensdaten
        """
        print(f"\nüìä Hole Daten f√ºr Unternehmen: {company_id}")
        endpoint = f"/companies/{company_id}"
        return self._make_request(endpoint)
    
    def get_company_reports(self, company_id: str, year: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Ruft Nachhaltigkeitsberichte f√ºr ein Unternehmen ab.
        
        Args:
            company_id: Die eindeutige ID des Unternehmens
            year: Optional - Filtert nach einem bestimmten Jahr
            
        Returns:
            Liste von Berichten als Dictionaries
        """
        print(f"\nüìÑ Hole Berichte f√ºr Unternehmen: {company_id}")
        if year:
            print(f"   Gefiltert nach Jahr: {year}")
        
        endpoint = f"/companies/{company_id}/reports"
        params = {"year": year} if year else None
        
        response = self._make_request(endpoint, params)
        return response.get("reports", [])
    
    def get_company_metrics(self, company_id: str) -> List[Dict[str, Any]]:
        """
        Ruft Nachhaltigkeits-Metriken f√ºr ein Unternehmen ab.
        
        Metriken sind quantitative Werte wie:
        - CO‚ÇÇ-Emissionen
        - Energieverbrauch
        - Wasserverbrauch
        - etc.
        
        Args:
            company_id: Die eindeutige ID des Unternehmens
            
        Returns:
            Liste von Metriken als Dictionaries
        """
        print(f"\nüìà Hole Metriken f√ºr Unternehmen: {company_id}")
        endpoint = f"/companies/{company_id}/metrics"
        
        response = self._make_request(endpoint)
        return response.get("metrics", [])

print("‚úÖ DatalandConnector-Klasse definiert")

---

## 4. Connector initialisieren und testen

Jetzt erstellen wir eine Instanz unseres Connectors und testen die Verbindung.

In [None]:
# Connector-Instanz erstellen
connector = DatalandConnector(CONFIG)

print("\n" + "="*60)
print("üöÄ Connector ist bereit f√ºr API-Calls!")
print("="*60)

---

## 5. Mock-Daten f√ºr Entwicklung und Tests

**Wichtiger Hinweis:** Da wir m√∂glicherweise noch keinen echten Dataland-API-Zugang haben, erstellen wir Mock-Daten, die realistisch sind.

### Warum Mock-Daten?

Mock-Daten erlauben es uns:
- **Ohne API-Zugang** zu entwickeln
- **Reproduzierbare Tests** durchzuf√ºhren
- Die **Pipeline-Logik** zu testen, ohne auf externe Systeme angewiesen zu sein
- **Kosten zu sparen** (viele APIs sind kostenpflichtig)

Sp√§ter k√∂nnen wir die Mock-Daten einfach durch echte API-Calls ersetzen.

In [None]:
def create_mock_dataland_response() -> Dict[str, Any]:
    """
    Erstellt Mock-Daten, die einer echten Dataland-API-Antwort √§hneln.
    
    Diese Funktion simuliert, was die Dataland API zur√ºckgeben w√ºrde.
    
    Returns:
        Dictionary mit simulierten Unternehmensdaten
    """
    return {
        "company": {
            "id": "DE0007236101",
            "name": "Siemens AG",
            "sector": "Industrials",
            "country": "Germany",
            "website": "https://www.siemens.com"
        },
        "reports": [
            {
                "id": "rep_2023_001",
                "year": 2023,
                "title": "Sustainability Report 2023",
                "url": "https://dataland.com/reports/siemens-2023",
                "sections": [
                    {
                        "section": "Klimastrategie",
                        "text": "Siemens hat sich verpflichtet, bis 2030 klimaneutral zu werden. Das Unternehmen reduziert kontinuierlich seine CO‚ÇÇ-Emissionen und investiert massiv in erneuerbare Energien."
                    },
                    {
                        "section": "Emissionsreduktion",
                        "text": "Im Gesch√§ftsjahr 2023 konnte Siemens seine Scope-1- und Scope-2-Emissionen um 15% im Vergleich zum Vorjahr reduzieren. Dies wurde durch Effizienzsteigerungen und den Wechsel zu erneuerbaren Energien erreicht."
                    },
                    {
                        "section": "Circular Economy",
                        "text": "Siemens f√∂rdert die Kreislaufwirtschaft durch Produktdesign, das Recycling und Wiederverwendung erleichtert. 78% der Produktionsabf√§lle werden recycelt."
                    }
                ]
            },
            {
                "id": "rep_2022_001",
                "year": 2022,
                "title": "Sustainability Report 2022",
                "url": "https://dataland.com/reports/siemens-2022",
                "sections": [
                    {
                        "section": "Energieverbrauch",
                        "text": "Der Energieverbrauch von Siemens betrug 2022 insgesamt 4,2 TWh, wovon 65% aus erneuerbaren Quellen stammten."
                    }
                ]
            }
        ],
        "metrics": [
            {
                "id": "met_001",
                "indicator": "CO2_Scope1",
                "name": "CO‚ÇÇ-Emissionen Scope 1",
                "value": 1.24,
                "unit": "Mt",
                "period": "2023",
                "description": "Direkte Emissionen aus eigenen oder kontrollierten Quellen"
            },
            {
                "id": "met_002",
                "indicator": "CO2_Scope2",
                "name": "CO‚ÇÇ-Emissionen Scope 2",
                "value": 0.86,
                "unit": "Mt",
                "period": "2023",
                "description": "Indirekte Emissionen aus eingekaufter Energie"
            },
            {
                "id": "met_003",
                "indicator": "Energy_Consumption",
                "name": "Gesamtenergieverbrauch",
                "value": 4.5,
                "unit": "TWh",
                "period": "2023",
                "description": "Gesamter Energieverbrauch aller Standorte"
            },
            {
                "id": "met_004",
                "indicator": "Renewable_Energy_Share",
                "name": "Anteil erneuerbarer Energien",
                "value": 70,
                "unit": "%",
                "period": "2023",
                "description": "Anteil erneuerbarer Energien am Gesamtverbrauch"
            },
            {
                "id": "met_005",
                "indicator": "Water_Consumption",
                "name": "Wasserverbrauch",
                "value": 3.2,
                "unit": "Mio. m¬≥",
                "period": "2023",
                "description": "Gesamter Frischwasserverbrauch"
            },
            {
                "id": "met_006",
                "indicator": "Waste_Recycling_Rate",
                "name": "Recyclingquote Abfall",
                "value": 78,
                "unit": "%",
                "period": "2023",
                "description": "Anteil recycelter Produktionsabf√§lle"
            }
        ],
        "metadata": {
            "source": "Dataland",
            "retrieved_at": datetime.now().isoformat(),
            "api_version": "v1",
            "data_quality": "verified"
        }
    }

# Mock-Daten erstellen
mock_data = create_mock_dataland_response()

print("‚úÖ Mock-Daten erstellt")
print(f"\nüìä √úbersicht der Mock-Daten:")
print(f"  - Unternehmen: {mock_data['company']['name']}")
print(f"  - Anzahl Berichte: {len(mock_data['reports'])}")
print(f"  - Anzahl Metriken: {len(mock_data['metrics'])}")
print(f"  - Datenquelle: {mock_data['metadata']['source']}")

---

## 6. Rohdaten inspizieren

Schauen wir uns die Struktur der abgerufenen Daten genauer an. Das ist wichtig, um zu verstehen:
- Welche Felder vorhanden sind
- Wie die Daten strukturiert sind
- Was wir im Transform-Schritt verarbeiten m√ºssen

In [None]:
# Sch√∂ner formatierter Output der Rohdaten
print("="*80)
print("üì¶ ROHDATEN VON DATALAND (JSON-Format)")
print("="*80)
print(json.dumps(mock_data, indent=2, ensure_ascii=False))
print("\n" + "="*80)

---

## 7. Datenqualit√§t pr√ºfen

Bevor wir die Daten weiterverarbeiten, sollten wir einige grundlegende Qualit√§tspr√ºfungen durchf√ºhren.

### Was pr√ºfen wir?

- Sind alle erwarteten Felder vorhanden?
- Sind die Datentypen korrekt?
- Gibt es fehlende Werte?
- Sind die Werte plausibel?

In [None]:
def validate_dataland_response(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    F√ºhrt grundlegende Validierungen der API-Response durch.
    
    Args:
        data: Die zu validierende API-Response
        
    Returns:
        Dictionary mit Validierungsergebnissen
    """
    results = {
        "valid": True,
        "errors": [],
        "warnings": [],
        "summary": {}
    }
    
    # 1. Pr√ºfe Hauptstruktur
    required_keys = ["company", "reports", "metrics", "metadata"]
    for key in required_keys:
        if key not in data:
            results["errors"].append(f"Fehlendes Feld: {key}")
            results["valid"] = False
    
    # 2. Pr√ºfe Company-Daten
    if "company" in data:
        company = data["company"]
        if "name" not in company or not company["name"]:
            results["errors"].append("Firmenname fehlt")
            results["valid"] = False
        results["summary"]["company_name"] = company.get("name", "N/A")
    
    # 3. Pr√ºfe Berichte
    if "reports" in data:
        reports = data["reports"]
        results["summary"]["report_count"] = len(reports)
        
        if len(reports) == 0:
            results["warnings"].append("Keine Berichte vorhanden")
        
        # Pr√ºfe jeden Bericht
        for i, report in enumerate(reports):
            if "year" not in report:
                results["warnings"].append(f"Bericht {i}: Jahr fehlt")
            if "sections" not in report or len(report["sections"]) == 0:
                results["warnings"].append(f"Bericht {i}: Keine Sections vorhanden")
    
    # 4. Pr√ºfe Metriken
    if "metrics" in data:
        metrics = data["metrics"]
        results["summary"]["metric_count"] = len(metrics)
        
        if len(metrics) == 0:
            results["warnings"].append("Keine Metriken vorhanden")
        
        # Pr√ºfe jede Metrik
        for i, metric in enumerate(metrics):
            required_metric_fields = ["indicator", "value", "unit", "period"]
            for field in required_metric_fields:
                if field not in metric:
                    results["warnings"].append(f"Metrik {i}: Feld '{field}' fehlt")
            
            # Pr√ºfe, ob value numerisch ist
            if "value" in metric:
                try:
                    float(metric["value"])
                except (ValueError, TypeError):
                    results["errors"].append(f"Metrik {i}: value ist nicht numerisch")
                    results["valid"] = False
    
    return results

# Validierung durchf√ºhren
validation_results = validate_dataland_response(mock_data)

print("\n" + "="*80)
print("üîç DATENQUALIT√ÑTS-PR√úFUNG")
print("="*80)

if validation_results["valid"]:
    print("‚úÖ Daten sind g√ºltig!\n")
else:
    print("‚ùå Daten haben Fehler!\n")

print("üìä Zusammenfassung:")
for key, value in validation_results["summary"].items():
    print(f"  - {key}: {value}")

if validation_results["errors"]:
    print("\n‚ùå Fehler:")
    for error in validation_results["errors"]:
        print(f"  - {error}")

if validation_results["warnings"]:
    print("\n‚ö†Ô∏è Warnungen:")
    for warning in validation_results["warnings"]:
        print(f"  - {warning}")

print("\n" + "="*80)

---

## 8. Zusammenfassung: Connector/Ingest

### Was haben wir erreicht? ‚úÖ

1. **Connector-Klasse erstellt**: Eine professionelle, wiederverwendbare Klasse f√ºr API-Kommunikation
2. **Fehlerbehandlung**: Retry-Logik, Timeouts, HTTP-Error-Handling
3. **Mock-Daten**: Realistische Testdaten f√ºr die Entwicklung
4. **Validierung**: Qualit√§tspr√ºfung der eingehenden Daten

### Daten√ºbersicht

Wir haben folgende Rohdaten gesammelt:

In [None]:
# Sch√∂ne Zusammenfassung ausgeben
print("\n" + "="*80)
print("üìã ZUSAMMENFASSUNG DER GESAMMELTEN DATEN")
print("="*80)

print(f"\nüè¢ Unternehmen: {mock_data['company']['name']}")
print(f"   Sektor: {mock_data['company']['sector']}")
print(f"   Land: {mock_data['company']['country']}")

print(f"\nüìÑ Berichte: {len(mock_data['reports'])} St√ºck")
for report in mock_data['reports']:
    print(f"   - {report['year']}: {report['title']}")
    print(f"     Sections: {len(report['sections'])}")

print(f"\nüìà Metriken: {len(mock_data['metrics'])} St√ºck")
for metric in mock_data['metrics']:
    print(f"   - {metric['name']}: {metric['value']} {metric['unit']} ({metric['period']})")

print(f"\nüîñ Metadaten:")
print(f"   Quelle: {mock_data['metadata']['source']}")
print(f"   Abgerufen: {mock_data['metadata']['retrieved_at']}")
print(f"   API-Version: {mock_data['metadata']['api_version']}")

print("\n" + "="*80)
print("‚úÖ CONNECTOR/INGEST ABGESCHLOSSEN")
print("="*80)
print("\n‚û°Ô∏è  N√§chster Schritt: Transform Layer (Daten ins kanonische Schema √ºberf√ºhren)")

---

## ‚úÖ Company-Lookup: Robuste Fallback-Strategie implementiert!

### üéØ Implementierte L√∂sung

Da die Dataland-Swagger **keinen eindeutigen Company-Search-Endpoint** dokumentiert, haben wir eine **dreistufige Fallback-Strategie** implementiert:

#### 1Ô∏è‚É£ **Prim√§rweg: API-Service Probing**
```python
search_company_via_api(api_session, query, raw_dir)
```

**Probiert systematisch:**
- `/metadata/companies/search?q=Siemens`
- `/metadata/companies?query=Siemens`
- `/companies/search?q=Siemens`
- `/companies?name=Siemens`
- `/entities/search?q=Siemens`
- `/api/companies/search?q=Siemens`

**Persistenz:** Alle Versuche ‚Üí `raw/company_lookup_api.jsonl`

#### 2Ô∏è‚É£ **Fallback: Documents-Service Probing**
```python
search_company_via_documents(doc_session, query, raw_dir)
```

**Probiert systematisch:**
- `/?q=Siemens`
- `/search?q=Siemens`
- `/?query=Siemens`
- `/search?query=Siemens`
- `/companies/search?q=Siemens`
- `/documents/search?q=Siemens`

**Persistenz:** Alle Versuche ‚Üí `raw/company_lookup_documents.jsonl`

#### 3Ô∏è‚É£ **Last-Resort: Mock-Daten**
```python
create_mock_data_if_needed()
```

Falls beide Wege fehlschlagen, aktiviert sich automatisch der Mock-Modus.

---

### üìä Vorteile dieser Architektur

| Feature | Beschreibung |
|---------|--------------|
| **Resilience** | System funktioniert auch bei API-√Ñnderungen |
| **Debugging** | Alle Probes in JSONL ‚Üí einfache Fehleranalyse |
| **Flexibility** | Neue Endpoints via CONFIG hinzuf√ºgen |
| **Transparency** | Klare Logs zeigen, welcher Weg erfolgreich war |
| **Offline-Ready** | Mock-Modus f√ºr Entwicklung ohne API |

---

### üîç Swagger-Recherche Ergebnis

**API-Swagger:** `https://dataland.com/api/swagger-ui/index.html`
- ‚úÖ `/metadata/available-data-dimensions` (dokumentiert)
- ‚ùå `/metadata/companies/search` (nicht gefunden)
- ‚ùì Company-Lookup-Endpoint unklar

**Documents-Swagger:** `https://dataland.com/documents/swagger-ui/index.html`
- ‚úÖ Document-Operationen (exists, update, extend)
- ‚ùå Expliziter Company-Search (nicht gefunden)

**‚Üí Daher: Probing-Ansatz ist die robusteste L√∂sung!**

---

### üß™ N√§chste Schritte

1. **Teste das Notebook:**
   ```bash
   # Erstelle .env mit Token
   echo "DATALAND_TOKEN=your_token_here" > .env
   
   # F√ºhre Notebook aus
   jupyter notebook mvp_pipeline.ipynb
   ```

2. **Analysiere Logs:**
   ```bash
   # Welcher Endpoint war erfolgreich?
   cat raw/company_lookup_api.jsonl | jq '.status'
   cat raw/company_lookup_documents.jsonl | jq '.status'
   ```

3. **Update CONFIG:**
   - Falls ein Endpoint erfolgreich ist ‚Üí nur diesen in CONFIG behalten
   - Reduziere Probing-Liste auf funktionierende Endpoints

4. **Transform-Layer:**
   - JSONL ‚Üí MetricEvents & TextEvents
   - Mock-Rooms Schema
   - Visualisierungen

---

### üí° F√ºr Produktiv-Umgebung

Sobald du den **exakten funktionierenden Endpoint** identifiziert hast:

```python
# Anstatt 6 Probes:
CONFIG["company_lookup_api_candidates"] = [
    {"path": "/metadata/companies/search", "params": {"q": None}}  # ‚Üê nur dieser
]
```

Das reduziert API-Calls und verbessert Performance!

---

---

## üß™ Quick-Test: Welcher Endpoint funktioniert?

Nach dem ersten Run kannst du schnell pr√ºfen, welcher Endpoint erfolgreich war:

In [None]:
# Quick-Check: Welcher Endpoint war erfolgreich?
import json

print("="*70)
print("üîç ENDPOINT-ANALYSE")
print("="*70)

# Pr√ºfe API-Lookup
api_lookup_file = CONFIG["raw_dir"] / "company_lookup_api.jsonl"
if api_lookup_file.exists():
    print("\nüìÇ API-Service Probes:")
    with open(api_lookup_file, 'r') as f:
        for line in f:
            entry = json.loads(line)
            status = entry.get("status", "?")
            endpoint = entry.get("endpoint", "?")
            symbol = "‚úÖ" if status == 200 else "‚ùå"
            print(f"   {symbol} {endpoint} ‚Üí HTTP {status}")
else:
    print("\n‚ö†Ô∏è  Keine API-Lookup Versuche gefunden")

# Pr√ºfe Documents-Lookup
doc_lookup_file = CONFIG["raw_dir"] / "company_lookup_documents.jsonl"
if doc_lookup_file.exists():
    print("\nüìÇ Documents-Service Probes:")
    with open(doc_lookup_file, 'r') as f:
        for line in f:
            entry = json.loads(line)
            status = entry.get("status", "?")
            endpoint = entry.get("endpoint", "?")
            symbol = "‚úÖ" if status == 200 else "‚ùå"
            print(f"   {symbol} {endpoint} ‚Üí HTTP {status}")
else:
    print("\n‚ö†Ô∏è  Keine Documents-Lookup Versuche gefunden")

print("\n" + "="*70)
print("üí° Tipp: Erfolgreiche Endpoints (‚úÖ) in CONFIG √ºbernehmen!")
print("="*70)

---

## üèóÔ∏è Architektur-√úbersicht: Company-Lookup mit Fallback

```mermaid
graph TD
    A[Start: Company Query 'Siemens'] --> B{API-Service Probing}
    
    B -->|6 Endpoint-Kandidaten| C[/metadata/companies/search]
    B --> D[/metadata/companies]
    B --> E[/companies/search]
    B --> F[/companies]
    B --> G[/entities/search]
    B --> H[/api/companies/search]
    
    C -->|HTTP 200| SUCCESS1[‚úÖ Company ID gefunden]
    D -->|HTTP 200| SUCCESS1
    E -->|HTTP 200| SUCCESS1
    F -->|HTTP 200| SUCCESS1
    G -->|HTTP 200| SUCCESS1
    H -->|HTTP 200| SUCCESS1
    
    C -->|HTTP 404/401| FALLBACK{Documents-Service Probing}
    D -->|HTTP 404/401| FALLBACK
    E -->|HTTP 404/401| FALLBACK
    F -->|HTTP 404/401| FALLBACK
    G -->|HTTP 404/401| FALLBACK
    H -->|HTTP 404/401| FALLBACK
    
    FALLBACK -->|6 Endpoint-Kandidaten| I[/]
    FALLBACK --> J[/search]
    FALLBACK --> K[/companies/search]
    
    I -->|HTTP 200| SUCCESS2[‚úÖ Company ID gefunden]
    J -->|HTTP 200| SUCCESS2
    K -->|HTTP 200| SUCCESS2
    
    I -->|HTTP 404/401| MOCK[üé≠ Mock-Modus aktivieren]
    J -->|HTTP 404/401| MOCK
    K -->|HTTP 404/401| MOCK
    
    SUCCESS1 --> NEXT[Weiter zu Dimensions]
    SUCCESS2 --> NEXT
    MOCK --> NEXT
    
    NEXT --> DIMS[/metadata/available-data-dimensions]
    DIMS --> VALUES[/data-points/values]
    VALUES --> DOCS[/documents/search]
    DOCS --> END[‚úÖ Ingest Complete]
    
    style SUCCESS1 fill:#90EE90
    style SUCCESS2 fill:#90EE90
    style MOCK fill:#FFD700
    style END fill:#90EE90
```

**Legende:**
- üü¢ **Gr√ºn:** Erfolgreicher Pfad (Company ID gefunden)
- üü° **Gelb:** Fallback-Modus (Mock-Daten)
- **Alle Probes:** Werden in JSONL persistiert f√ºr Debugging

---