# Test: graphfw.core.auth.TokenProvider

Dieses Notebook testet die Authentifizierung gegen Microsoft Graph via **Client Credentials**.

## Voraussetzungen
- `pip install msal`
- Gültige App-Registrierung (Application Permissions) in Azure AD
- Konfiguration via `config.json` **oder** Umgebungsvariablen:
  - `GRAPH_TENANT_ID`, `GRAPH_CLIENT_ID`, `GRAPH_CLIENT_SECRET`

⚠️ Dieses Notebook druckt **keine** Secrets; Tokens werden maskiert angezeigt.

In [30]:
# Imports & Setup
from pathlib import Path
import pandas as pd
import sys

import sys
sys.path.insert(0, "..")  # eine Ebene hoch zum Repo-Root

from graphfw.core.auth import TokenProvider

TokenProvider.__version__

'1.3.0'

In [55]:
# Wenn man Änderungen am Modul auth.py vorgenommen hat, kann man es folgendermaßen neu laden
import sys, importlib
from pathlib import Path

# optional: sicherstellen, dass dein Repo-Root vorne steht
sys.path.insert(0, str(Path("..").resolve()))

import graphfw.core.auth as auth_mod
importlib.invalidate_caches()          # falls Dateien gerade geändert wurden
auth_mod = importlib.reload(auth_mod)  # Modul neu laden

# Symbole neu importieren, damit du die aktualisierte Klasse nutzt
from graphfw.core.auth import TokenProvider
TokenProvider.__version__


'1.4.1'

## Möglichkeit 1: TENANT_ID, CLIENT_ID, CLIENT_SECRET aus einer JSON Datei

### Möglichkeit 1.1. einfaches auslesen des Einstellunge aus einer config.json ohne Fehlerhandling.
Diese Methode ist für Testsystem gut geeignet bei dem man sicher ist, dass die config.json dort richtig ist
Hierbei ist das Attribut return_status=False, dh. es kommt zu einer Fehlermeldung, wenn irgendwa nicht stimmt.

In [31]:
# 1) Aus JSON
tp = TokenProvider.from_json(r"C:\python\Scripts\config.json")
token = tp.get_access_token()  # verwendet Graph .default
print("OK, Token-Länge:", len(token))

OK, Token-Länge: 1854


### Möglichkeit 1.2. Auslesen der config.json jedoch mit Fehlerhandling
Will man keine Fehlermeldung haben, wenn die config.json entweder nicht existiert oder die Einträge nicht stimmen, dann kann man auch return_status = True setzten und dann sich seine eigene Logik zusammenbauen

In [None]:
from graphfw.core.auth import TokenProvider

CONFIG = r"C:\python\Scripts\config.json"

# 1) Provider aus JSON laden (ohne Exception)
tp, ok_load, err_load = TokenProvider.from_json(CONFIG, return_status=True)

if not ok_load or tp is None:
    print(f"Konfiguration konnte nicht geladen werden: {err_load}")
else:
    # 2) Token holen (ohne Exception; nutzt Graph .default)
    token, ok_tok, err_tok = tp.get_access_token(return_status=True)
    if ok_tok:
        print("OK, Token-Länge:", len(token))
    else:
        print("Token-Erwerb fehlgeschlagen:", err_tok)


Hier jetzt noch ein Beispiel - wenn man auf unterschiedlichen Systemen die Config immer irgendwo anders liegen hat und das Script über die unterschiedlichen Pfade iterieren soll, um eine funktionierende config.json zu finden

In [42]:
from pathlib import Path

# Kandidaten in Reihenfolge der Priorität
candidates = [
    "./config.json",
    "./secrets/graph.json",
    str(Path.home() / ".config" / "graphfw" / "config.json"),
    r"C:\python\Scripts\config.json"
]

def mask(tok: str) -> str:
    return (tok[:12] + "…" + tok[-8:]) if tok and len(tok) > 24 else (tok or "")

tp = None
used = None
last_err = ""

for p in candidates:
    # 1) Provider aus JSON laden (nicht-wurfend)
    provider, ok_load, err_load = TokenProvider.from_json(p, return_status=True)
    if not ok_load or provider is None:
        print(f"[-] {p}: {err_load}")   # genau die Fehlermeldung, die du wolltest
        last_err = f"{p}: {err_load}"
        continue

    # 2) Token holen (nicht-wurfend)
    token, ok_tok, err_tok = provider.get_token(return_status=True)  # nutzt .default
    if not ok_tok:
        print(f"[-] {p}: Token-Fehler: {err_tok}")
        last_err = f"{p}: {err_tok}"
        continue

    # Erfolg -> merken und abbrechen
    tp, used = provider, p
    print(f"[+] Erfolg mit {p} | Token: {mask(token)}")
    break

if tp is None:
    print("Kein funktionierender Eintrag gefunden.")
    print("Letzter Fehler:", last_err)
else:
    print("Verwende Provider aus:", used)


[-] ./config.json: FileNotFoundError: Config file not found: config.json
[-] ./secrets/graph.json: FileNotFoundError: Config file not found: secrets\graph.json
[-] C:\Users\erhard.rainer\.config\graphfw\config.json: FileNotFoundError: Config file not found: C:\Users\erhard.rainer\.config\graphfw\config.json
[+] Erfolg mit C:\python\Scripts\config.json | Token: eyJ0eXAiOiJK…NiQeJLtw
Verwende Provider aus: C:\python\Scripts\config.json


## Möglichkeit 2:  TENANT_ID, CLIENT_ID, CLIENT_SECRET aus den Umgebungsvariablen
Für Produktivsysteme ist es sinnvoller die IDs und das Secret in einer Umgebungsvariable zu speichern.

### Möglichkeit 2.1: In dieser Methode wird genau festgelegt, welche Environment Variable ausgelesen werden sollen.
Diese Methode führt zu einem Fehler "KeyError:"- wenn die entsprechende Environment Variable nicht existiert


In [35]:
# 2.1 bestimmte Enviroment Schlüssel mit Error
tp_env = TokenProvider.from_values(
    tenant_id=os.environ["GRAPH_TENANT_ID"],
    client_id=os.environ["GRAPH_CLIENT_ID"],
    client_secret=os.environ["GRAPH_CLIENT_SECRET"],
)
token2 = tp_env.get_access_token("https://graph.microsoft.com/.default")

KeyError: 'GRAPH_TENANT_ID'

### Möglichkeit 2.2: In dieser Methode wird der Standard-Pfad der Enviroment-Variablen verwendet
Über den Parameter return_status kann man steuern, ob es zu einer Fehlermeldung kommt, wenn der Schlüssel nicht gefunden wird oder nicht. Ist return_status=True, kommt es zu keiner Fehlermeldung.
Man kann aber über die Variable err auslesen, zu welchem Fehler es gekommen ist zB: ValueError: Environment variables GRAPH_TENANT_ID/_CLIENT_ID/_CLIENT_SECRET required.

In [40]:
# 2.2 Aus ENV (GRAPH_TENANT_ID / GRAPH_CLIENT_ID / GRAPH_CLIENT_SECRET) - mit Error
# Nicht-werfend laden (ENV):
tp_env, ok_env, err_env = TokenProvider.from_env(return_status=True)

if ok_env and tp_env:
    token, ok, err = tp_env.get_token(return_status=True)  # Scope .default automatisch
    print("Token ok?", ok, "| err:", err)
else:
    print(err_env)

ValueError: Environment variables GRAPH_TENANT_ID/_CLIENT_ID/_CLIENT_SECRET required.


Unter Verwendung von return_status=True kann man dann auch komplexe Lösungen bauen, die zB 
(1) am Anfang bei den Standardwerten nachsieht,
(2) dann an einem bestimmten Pfad
(3) in einem JSON

In [None]:
# -*- coding: utf-8 -*-
"""
Beispiel: Auth-Fallback mit return_status=True

Strategie:
(1) ENV-Standardwerte prüfen (GRAPH_TENANT_ID / GRAPH_CLIENT_ID / GRAPH_CLIENT_SECRET)
(2) Fixen Pfad versuchen (AUTH_CONFIG_PATH oder vordefinierte Kandidaten)
(3) JSON-Quelle versuchen (z. B. aus ENV-Variable mit JSON-Inhalt)

Alle Schritte nutzen return_status=True, werfen also keine Exceptions, sondern
liefern (obj|None, succeeded: bool, error_message: str).

Aufruf:
    python scripts/example_auth_fallback.py
oder in Jupyter eine Zelle mit:
    %run -m scripts.example_auth_fallback
"""
from __future__ import annotations

from pathlib import Path
import json
import os
import sys
from typing import Optional, Tuple

# Repo-Root einhängen (falls dieses Skript aus /scripts/ läuft)
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))

from graphfw.core.auth import TokenProvider  # noqa: E402


SCOPE = os.getenv("GRAPH_SCOPE", "https://graph.microsoft.com/.default")

# Kandidaten für Schritt (2)
PREFERRED_PATHS = [
    os.getenv("AUTH_CONFIG_PATH", "").strip() or None,                 # expliziter Pfad via ENV
    str(Path.cwd() / "config.json"),                                   # ./config.json
    str(Path.cwd() / "secrets" / "graph.json"),                        # ./secrets/graph.json
    str(Path.home() / ".config" / "graphfw" / "config.json"),          # ~/.config/graphfw/config.json
]
PREFERRED_PATHS = [p for p in PREFERRED_PATHS if p]  # None entfernen

# JSON-Quelle für Schritt (3) – z. B. Inhalt einer Secret-Variable in CI
JSON_ENV_KEYS = ["GRAPHFW_AZUREAD_JSON", "GRAPH_JSON", "AZUREAD_JSON"]


def _mask(token: str) -> str:
    if not token:
        return ""
    return (token[:12] + "…" + token[-8:]) if len(token) > 24 else token[:6] + "…"


def try_env() -> Tuple[Optional[TokenProvider], bool, str]:
    """Schritt (1): ENV-Standardwerte."""
    tp, ok, err = TokenProvider.from_env(return_status=True)
    return tp, bool(ok and tp is not None), err


def try_paths() -> Tuple[Optional[TokenProvider], bool, str, Optional[str]]:
    """Schritt (2): feste Pfade der Reihe nach testen."""
    last_err = ""
    for p in PREFERRED_PATHS:
        path = Path(p)
        if not path.exists():
            continue
        tp, ok, err = TokenProvider.from_json(path, return_status=True)
        if ok and tp:
            return tp, True, "", str(path)
        last_err = f"{path}: {err}"
    return None, False, last_err or "no candidate path succeeded", None


def try_json_blob() -> Tuple[Optional[TokenProvider], bool, str, Optional[str]]:
    """Schritt (3): JSON aus ENV-Variable(n). Erlaubt zwei Schemata:
       a) {"azuread": {"tenant_id": "...", "client_id": "...", "client_secret": "..."}}
       b) {"tenant_id": "...", "client_id": "...", "client_secret": "..."}
    """
    for key in JSON_ENV_KEYS:
        blob = os.getenv(key, "").strip()
        if not blob:
            continue
        try:
            data = json.loads(blob)
        except Exception as ex:
            return None, False, f"{key} JSON parse error: {ex}", key

        section = data.get("azuread") if isinstance(data, dict) else None
        payload = section if section else data
        tp, ok, err = TokenProvider.from_dict(payload, return_status=True)
        if ok and tp:
            return tp, True, "", key
        return None, False, f"{key}: {err}", key
    return None, False, "no JSON env var provided", None


def resolve_provider() -> Tuple[Optional[TokenProvider], str]:
    """
    Führt die drei Schritte der Reihe nach aus und gibt (provider, source) zurück.
    source ∈ {"env", "path:<pfad>", "json:<envkey>", ""} – leer bei Fehlschlag.
    """
    # (1) ENV
    tp, ok, err = try_env()
    if ok and tp:
        return tp, "env"

    # (2) PATH
    tp, ok, err, used_path = try_paths()
    if ok and tp:
        return tp, f"path:{used_path}"

    # (3) JSON
    tp, ok, err, used_key = try_json_blob()
    if ok and tp:
        return tp, f"json:{used_key}"

    return None, ""


def main() -> int:
    print("TokenProvider Version:", getattr(TokenProvider, "__version__", "<unknown>"))
    tp, source = resolve_provider()
    if not tp:
        print("Kein TokenProvider gefunden. Prüfe ENV/Pfade/JSON.")
        print("Hinweis ENV: GRAPH_TENANT_ID / GRAPH_CLIENT_ID / GRAPH_CLIENT_SECRET")
        print("Hinweis Pfade:", PREFERRED_PATHS)
        print("Hinweis JSON-ENV-Keys:", JSON_ENV_KEYS)
        return 2

    print("Quelle:", source)
    token, ok, err = tp.get_token(return_status=True)  # nutzt .default
    if not ok:
        print("Token-Erwerb fehlgeschlagen:", err)
        return 1

    print("Token OK:", ok, "| Länge:", len(token), "| maskiert:", _mask(token))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())


In [None]:
Das Skript lädt einen TokenProvider auf zwei mögliche Arten – aus einer JSON-Datei oder aus Umgebungsvariablen – und gibt am Ende einfach das Objekt tp zurück

In [None]:
from pathlib import Path
import os, time
from graphfw.core.auth import TokenProvider

# Wähle einen Modus: 'json' oder 'env'
MODE = os.getenv("AUTH_TEST_MODE", "env")  # 'json' | 'env'
CONFIG_PATH = os.getenv("AUTH_CONFIG_PATH", "config.json")

if MODE == "json" and not Path(CONFIG_PATH).exists():
    raise FileNotFoundError(f"Config-Datei nicht gefunden: {CONFIG_PATH}")

if MODE == "json":
    tp = TokenProvider.from_json(CONFIG_PATH)
else:
    # Erwartet: GRAPH_TENANT_ID / GRAPH_CLIENT_ID / GRAPH_CLIENT_SECRET
    tp = TokenProvider.from_env()

tp

ValueError: Environment variables GRAPH_TENANT_ID/_CLIENT_ID/_CLIENT_SECRET required.

## Möglichkeit 3: Zugangsdaten Hardcoded
Das ist die unsicherste Variante, da in diesem Fall die IDs und das Secret Hardcoded sind. Diese Methode eigenet sich nur dann, wenn man neue Zugangsdaten testen möchte, bevor man die config.json erstellt oder die Umgebungsvariablen setzt. 

In [None]:
# 3) Direkt aus Werten (alias früher: from_client_credentials)
tp_val = TokenProvider.from_client_credentials(
    tenant_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    client_id="aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
    client_secret="***"
)
token3 = tp_val.get_access_token(force_refresh=False)

In [56]:
SCOPE = "https://graph.microsoft.com/.default"

t0 = time.time()
token = tp.get_access_token(SCOPE)
t1 = time.time()

masked = token[:16] + "…" + token[-8:]
print("Token erhalten (maskiert):", masked)
print("Dauer (s):", round(t1 - t0, 3))

Token erhalten (maskiert): eyJ0eXAiOiJKV1Qi…NiQeJLtw
Dauer (s): 0.0


## Optional: Force-Refresh
Erzwingt das Umgehen des Caches.

In [57]:
t0 = time.time()
token2, ok, err = tp.get_access_token(SCOPE, force_refresh=True, return_status=True)
t1 = time.time()
print("OK?", ok, "| err:", err)
if ok:
    print("Token (neu, maskiert):", token2[:16] + "…" + token2[-8:])
    print("Dauer (s):", round(t1 - t0, 3))
    print("Gleiches Token wie zuvor?", token2 == token)


OK? False | err: ValueError: Historically, this method does not support force_refresh behavior. 
