In [1]:
import os
import time
import base64
import secrets
from typing import Dict, Any, Optional
from urllib.parse import urlencode

import requests
from dotenv import load_dotenv
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import RedirectResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from jose import jwt

load_dotenv()


True

In [2]:
# --- Required env (keep names EXACTLY as requested) ---
XERO_CLIENT_ID = os.environ["XERO_CLIENT_ID"]
XERO_CLIENT_SECRET = os.environ["XERO_CLIENT_SECRET"]
XERO_REDIRECT_URI = os.environ["XERO_REDIRECT_URI"]

# --- OpenID Discovery (once at startup) ---
DISCOVERY_URL = "https://identity.xero.com/.well-known/openid-configuration"
_disc = requests.get(DISCOVERY_URL, timeout=10).json()
AUTHORIZATION_ENDPOINT = _disc["authorization_endpoint"]
TOKEN_ENDPOINT = _disc["token_endpoint"]
JWKS_URI = _disc["jwks_uri"]
ISSUER = _disc["issuer"]

# --- Minimal in-memory stores (replace later with Redis/DB if needed) ---
SESS: Dict[str, Dict[str, Any]] = {}   # sid -> {state, nonce, ts}
JWKS_CACHE: Dict[str, Any] = {}
JWKS_TS = 0

In [3]:
SCOPES = "openid profile email"


def _basic_auth() -> str:
    return base64.b64encode(f"{XERO_CLIENT_ID}:{XERO_CLIENT_SECRET}".encode()).decode()


def _get_jwks() -> Dict[str, Any]:
    global JWKS_CACHE, JWKS_TS
    if not JWKS_CACHE or (time.time() - JWKS_TS) > 6 * 3600:
        JWKS_CACHE = requests.get(JWKS_URI, timeout=10).json()
        JWKS_TS = time.time()
    return JWKS_CACHE


def _pick_jwk_for_kid(jwks: Dict[str, Any], kid: Optional[str]) -> Dict[str, Any]:
    keys = jwks.get("keys", [])
    for k in keys:
        if k.get("kid") == kid:
            return k
    raise HTTPException(401, "Signing key not found for token kid")

In [7]:
def signin_xero():
    # Create short-lived state in a cookie-backed session
    sid = secrets.token_urlsafe(32)
    state = secrets.token_urlsafe(24)
    nonce = secrets.token_urlsafe(24)
    SESS[sid] = {"state": state, "nonce": nonce, "ts": time.time()}

    params = {
        "response_type": "code",
        "client_id": XERO_CLIENT_ID,
        "redirect_uri": XERO_REDIRECT_URI,
        "scope": SCOPES,
        "state": state,
        "nonce": nonce,
    }
    url = f"{AUTHORIZATION_ENDPOINT}?{urlencode(params)}"
    print(url)
    resp = RedirectResponse(url, status_code=302)
    # Basic cookie for correlating callback (use Secure/HttpOnly/SameSite in prod)
    resp.set_cookie("sid", sid, httponly=True, secure=True, samesite="lax", max_age=900)
    return resp

In [8]:
redirect = signin_xero()

https://login.xero.com/identity/connect/authorize?response_type=code&client_id=6A9419DCCD29452A922631458D1B4420&redirect_uri=http%3A%2F%2F127.0.0.1%3A8765%2Fcallback&scope=openid+profile+email&state=frkjRolnL0qoHCd6HJOcYQLtrNGNFCJj&nonce=Xbcqt7qiwa25-dxHe6VAYCkrGib363HF


In [5]:
def callback_xero(request: Request, code: str = "", state: str = ""):
    # 1) Validate state/session
    sid = request.cookies.get("sid")
    sess = SESS.get(sid or "", {})
    if not sid or not sess or state != sess.get("state"):
        raise HTTPException(400, "Invalid session/state")

    # 2) Exchange code for tokens
    tok = requests.post(
        TOKEN_ENDPOINT,
        headers={
            "Authorization": f"Basic {_basic_auth()}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        data={
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": XERO_REDIRECT_URI,
        },
        timeout=10,
    )
    if tok.status_code != 200:
        raise HTTPException(400, f"Token exchange failed: {tok.text}")
    token_set = tok.json()
    id_token = token_set.get("id_token")
    if not id_token:
        raise HTTPException(400, "No ID token in response")

    # 3) Verify ID token (RS256 using JWKS)
    #    - Parse header to get kid, select corresponding JWK
    try:
        unverified_header = jwt.get_unverified_header(id_token)
        kid = unverified_header.get("kid")
        jwk = _pick_jwk_for_kid(_get_jwks(), kid)

        claims = jwt.decode(
            id_token,
            jwk,                       # pass the selected JWK
            algorithms=["RS256"],
            audience=XERO_CLIENT_ID,
            issuer=ISSUER,
            options={"verify_at_hash": False},
        )
    except Exception as e:
        raise HTTPException(401, f"Invalid ID token: {e}")

    # 4) Verify nonce
    if claims.get("nonce") != sess.get("nonce"):
        raise HTTPException(401, "Nonce mismatch")

    # 5) Extract identity (you can now create/find your user)
    identity = {
        "sub": claims.get("sub"),
        "email": claims.get("email"),
        "given_name": claims.get("given_name"),
        "family_name": claims.get("family_name"),
        "iat": claims.get("iat"),
        "exp": claims.get("exp"),
        "iss": claims.get("iss"),
        "aud": claims.get("aud"),
    }

    # Clear transient session
    SESS.pop(sid, None)

    # For minimal SSO, just return identity JSON (or redirect to your frontend)
    return JSONResponse({"status": "sso_ok", "provider": "xero", "identity": identity})