# Revoke Microsoft Entra ID User Sessions (Python)

This notebook replicates the Logic App playbook flow in Python: it accepts a list of account entities, resolves each to a Microsoft Entra ID user (by AAD user ID or constructed UPN), and calls Microsoft Graph to revoke sign-in sessions.

Allowed services only: Microsoft 365, Entra ID (Microsoft Graph), Defender XDR, Purview, Exchange Online, SharePoint/OneDrive. No Sentinel references are used.

## Prerequisites
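- An Entra ID app registration with application permissions: `User.ReadWrite.All` on Microsoft Graph, and admin consent granted. The optional log-retrieval cells at the end of the notebook additionally require `AuditLog.Read.All` and `Directory.Read.All`.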
- Client credentials for the app (tenant ID, client ID, client secret) available as environment variables.
- Network egress to `login.microsoftonline.com` and `graph.microsoft.com`.

## Input schema (array of accounts)
Each account object supports:
- `aadUserId` (string | optional) — Entra ID user object ID (preferred if provided)
- `accountName` (string | optional) — the part of the UPN before the `@`, e.g., a samAccountName such as `adelev`
- `upnSuffix` (string | optional) — the part of the UPN after the `@`, e.g., `contoso.com`
- `Name` (string | optional) — friendly display for reporting

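Resolution logic: if `aadUserId` is present and non-empty, it is used as-is. Otherwise, the UPN is built as `accountName@upnSuffix`. If neither an `aadUserId` nor both `accountName` and `upnSuffix` are available, the account is recorded as a failure and processing continues with the next account.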

In [None]:
# Imports and configuration
import os
import json
from typing import Dict, List, Tuple
import time
import requests

# Credentials are taken from the environment so no secret is stored in the notebook
TENANT_ID = os.environ.get('TENANT_ID')
CLIENT_ID = os.environ.get('CLIENT_ID')
CLIENT_SECRET = os.environ.get('CLIENT_SECRET')

# Dry-run guard: while True, the helpers below short-circuit instead of calling out
DRY_RUN = True  # set to False to perform real calls

# Fail fast with a readable message when a real run is missing credentials
if not DRY_RUN:
    missing = [
        name
        for name, value in (
            ('TENANT_ID', TENANT_ID),
            ('CLIENT_ID', CLIENT_ID),
            ('CLIENT_SECRET', CLIENT_SECRET),
        )
        if not value
    ]
    if missing:
        raise ValueError(f"Missing required environment variables: {', '.join(missing)}")

# OAuth scope requested for the token, and the Graph API root used to build URLs
GRAPH_SCOPE = 'https://graph.microsoft.com/.default'
GRAPH_BASE = 'https://graph.microsoft.com/v1.0'

In [None]:
# Token acquisition using client credentials (no external libraries required)
def get_graph_token(tenant_id: str, client_id: str, client_secret: str) -> Tuple[str, int]:
    """Acquire a Microsoft Graph token with the OAuth2 client-credentials grant.

    Posts the app credentials to the tenant's v2.0 token endpoint.

    Returns:
        ``(access_token, expires_at_epoch)`` where the expiry is the current
        epoch second plus the response's ``expires_in`` (0 when absent).

    Raises:
        RuntimeError: if the endpoint answers with a non-success status, or
            the response body carries no ``access_token``.
    """
    endpoint = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    form = dict(
        grant_type='client_credentials',
        client_id=client_id,
        client_secret=client_secret,
        scope=GRAPH_SCOPE,
    )
    response = requests.post(endpoint, data=form, timeout=30)
    if not response.ok:
        raise RuntimeError(f"Token request failed: {response.status_code} {response.text}")
    body = response.json()
    access_token = body.get('access_token')
    if not access_token:
        # Only the first 400 characters of the body are echoed into the error
        raise RuntimeError(f"No access_token in response: {json.dumps(body)[:400]}")
    lifetime = int(body.get('expires_in', 0))
    return access_token, int(time.time()) + lifetime

# Module-level cache so repeated calls within the notebook reuse one token
_TOKEN_CACHE = {'token': None, 'exp': 0}

def ensure_token() -> str:
    """Return a bearer token for Graph, reusing the cached one while it is fresh.

    In dry-run mode a placeholder string is returned and nothing is requested.
    A cached token is reused only while it has more than 120 seconds left;
    otherwise a new token is fetched and stored in ``_TOKEN_CACHE``.
    """
    if DRY_RUN:
        return 'dry-run-token'
    cached = _TOKEN_CACHE['token']
    seconds_left = _TOKEN_CACHE['exp'] - int(time.time())
    if cached and seconds_left > 120:
        return cached
    fresh, expiry = get_graph_token(TENANT_ID, CLIENT_ID, CLIENT_SECRET)
    _TOKEN_CACHE.update(token=fresh, exp=expiry)
    return fresh

In [None]:
# Helpers: resolve user identifier and revoke sessions
def resolve_user_identifier(account: Dict) -> str:
    """Pick the identifier to send to Graph for one account entry.

    A non-blank ``aadUserId`` wins. Otherwise ``accountName`` and ``upnSuffix``
    are joined with ``@`` to form a UPN. Surrounding whitespace is stripped
    from every field, and missing or ``None`` fields count as blank.

    Raises:
        ValueError: if there is no ``aadUserId`` and either UPN part is blank.
    """
    def _clean(key: str) -> str:
        # Missing keys and None both collapse to the empty string
        return (account.get(key) or '').strip()

    object_id = _clean('aadUserId')
    if object_id:
        return object_id
    local_part, domain = _clean('accountName'), _clean('upnSuffix')
    if not (local_part and domain):
        raise ValueError('Account requires either aadUserId or accountName+upnSuffix')
    return f"{local_part}@{domain}"

def revoke_signin_sessions(user_identifier: str) -> Tuple[bool, Dict]:
    """Call POST /users/{id-or-upn}/revokeSignInSessions for one user.

    Args:
        user_identifier: Entra ID object ID or UPN of the target user.

    Returns:
        ``(success, payload)``. ``success`` is True for any 2xx status.
        ``payload`` is the decoded JSON body when there is one; otherwise
        ``{'raw': <text>}`` on success or ``{'status': <code>, 'raw': <text>}``
        on failure. In dry-run mode no request is made and the payload is
        ``{'simulated': True, 'url': <url>}``.
    """
    from urllib.parse import quote  # local import keeps this cell self-contained

    # Encode the identifier as a single path segment. A guest UPN contains
    # '#EXT#', and an unescaped '#' would start a URL fragment and cut the
    # path short. '@' is left as-is so ordinary UPNs produce the same URL.
    url = f"{GRAPH_BASE}/users/{quote(user_identifier, safe='@')}/revokeSignInSessions"
    if DRY_RUN:
        # Simulate a success response without calling the network
        return True, {'simulated': True, 'url': url}
    token = ensure_token()
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }
    resp = requests.post(url, headers=headers, timeout=30)
    # Accept every 2xx: a 200 with {"value": true} is what this notebook was
    # written against, but a 204 with an empty body must not be reported as a
    # failed revocation either.
    succeeded = 200 <= resp.status_code < 300
    try:
        return succeeded, resp.json()
    except ValueError:
        # Body was empty or not JSON; fall back to the raw text
        if succeeded:
            return True, {'raw': resp.text}
        return False, {'status': resp.status_code, 'raw': resp.text}

In [None]:
# Batch runner: iterate accounts and aggregate affected users
def run_revocations(accounts: List[Dict]) -> Dict:
    """Revoke sign-in sessions for every account and collect a per-account report.

    Each account is resolved to an identifier and passed to
    ``revoke_signin_sessions``. A failure on one account, whether during
    resolution or during the revoke call, is recorded in ``results`` and does
    not stop the remaining accounts from being processed.

    Args:
        accounts: Account objects as described in the notebook's input schema.

    Returns:
        A dict with:
        - ``affected``: one ``"<display> [<identifier>]"`` line per successful revocation
        - ``summary``: the ``affected`` lines joined with newlines
        - ``results``: one entry per input account with ``account``, ``success``
          and either ``identifier`` + ``response`` or an ``error`` message
    """
    affected_lines: List[str] = []
    results: List[Dict] = []
    for acct in accounts:
        try:
            ident = resolve_user_identifier(acct)
        except Exception as e:
            results.append({'account': acct, 'success': False, 'error': str(e)})
            continue
        try:
            ok, payload = revoke_signin_sessions(ident)
        except Exception as e:
            # Per-account boundary: a token or network error on one user must
            # not abort the batch and throw away the results gathered so far.
            results.append({'account': acct, 'identifier': ident, 'success': False, 'error': str(e)})
            continue
        display = acct.get('Name') or ident
        if ok:
            affected_lines.append(f"{display} [{ident}]")
        results.append({'account': acct, 'identifier': ident, 'success': ok, 'response': payload})
    summary = '\n'.join(affected_lines)
    return {
        'affected': affected_lines,
        'summary': summary,
        'results': results
    }

In [None]:
# Example run against mock accounts. To perform real calls, set DRY_RUN=False and provide the env vars.
example_accounts = [
    dict(Name='Adele Vance', aadUserId='', accountName='adelev', upnSuffix='contoso.com'),
    dict(Name='MOD Administrator', aadUserId='00000000-0000-0000-0000-000000000001', accountName='', upnSuffix=''),
]
out = run_revocations(example_accounts)
# Heading first, then the affected users (or a placeholder when there are none)
print('Affected users:', out['summary'] or '(none)', sep='\n')
# Other allowed services could be integrated here through their own APIs (e.g., adding a comment
# to a Defender XDR incident). This notebook intentionally avoids Sentinel-specific actions.

In [None]:
# Dry-run self-test: exercises the batch logic without tokens or network access.
# TENANT_ID/CLIENT_ID/CLIENT_SECRET are not needed for this cell.
# DRY_RUN is forced to True and mock inputs are fed through run_revocations.

DRY_RUN = True  # ensure no network calls

_test_accounts = [
    # 1) UPN constructed from accountName + upnSuffix
    dict(Name="User One", aadUserId="", accountName="userone", upnSuffix="contoso.com"),
    # 2) AAD object ID provided
    dict(Name="User Two", aadUserId="00000000-0000-0000-0000-000000000002", accountName="", upnSuffix=""),
    # 3) Missing identifier data should be captured as an error in results
    dict(Name="Broken", accountName="no-suffix"),
]

out = run_revocations(_test_accounts)

# Expected outcome: the first two succeed, the third is reported as a failure
all_results = out.get("results", [])
successes = len([entry for entry in all_results if entry.get("success")])
failures = len(all_results) - successes

print(f"Successes: {successes}, Failures: {failures}")
print("Summary (dry-run):\n" + (out.get("summary") or "(none)"))

# Quick-feedback checks on the counts above
assert len(all_results) == 3, "Expected 3 results"
assert successes == 2, "Expected 2 successes in dry-run"
assert failures == 1, "Expected 1 failure for missing identifiers"

print("Dry-run tests passed ✅")

In [None]:
# Retrieve Entra ID sign-in logs and directory audit logs for a user via Microsoft Graph
# Requirements (app permissions): AuditLog.Read.All, Directory.Read.All (admin consent required)
# Optionally use `start` and `end` (ISO 8601 strings) to narrow results.

import re
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode

ISO8601 = "%Y-%m-%dT%H:%M:%SZ"

def _is_guid(value: str) -> bool:
    """Tell whether ``value`` is exactly one 8-4-4-4-12 hexadecimal GUID.

    ``None`` and the empty string are treated as "not a GUID".
    """
    candidate = value or ""
    matched = re.fullmatch(
        r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}",
        candidate,
    )
    return matched is not None


def _graph_get(path: str, params: dict | None = None) -> dict:
    """Issue an authenticated GET against Microsoft Graph and return the JSON body.

    Args:
        path: Path appended to ``GRAPH_BASE`` (e.g. ``"/auditLogs/signIns"``).
        params: Optional query parameters passed through to ``requests.get``.

    Returns:
        The decoded JSON dict (may include ``'@odata.nextLink'``). In dry-run
        mode no request is made and a simulated empty page echoing ``path``
        and ``params`` is returned instead.

    Raises:
        RuntimeError: if Graph answers with a non-success status.
    """
    if DRY_RUN:
        return {"value": [], "simulated": True, "path": path, "params": params or {}}
    token = ensure_token()
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json"
    }
    url = f"{GRAPH_BASE}{path}"
    resp = requests.get(url, headers=headers, params=params, timeout=60)
    if not resp.ok:
        # Raised directly; the previous try/finally wrapper did nothing
        raise RuntimeError(f"GET {path} failed: {resp.status_code} {resp.text}")
    return resp.json()


def _graph_paged(path: str, params: dict | None = None, page_limit: int = 5) -> list:
    """Fetch a Graph collection, following ``@odata.nextLink`` up to ``page_limit`` pages.

    Args:
        path: Path appended to ``GRAPH_BASE`` for the first page.
        params: Optional query parameters for the first page; follow-up pages
            are fetched from the ``@odata.nextLink`` URL as returned.
        page_limit: Maximum number of pages to download, first page included.

    Returns:
        The concatenated ``value`` entries of all fetched pages. If a follow-up
        page fails, the entries gathered so far are returned and a
        ``RuntimeWarning`` reports the truncation.

    Raises:
        RuntimeError: if the first page request fails (raised by ``_graph_get``).
    """
    import warnings  # local import keeps this cell self-contained

    results = []
    page = _graph_get(path, params)
    results.extend(page.get("value", []))
    next_link = page.get("@odata.nextLink")
    count = 1
    while next_link and count < page_limit and not DRY_RUN:
        resp = requests.get(next_link, headers={"Authorization": f"Bearer {ensure_token()}"}, timeout=60)
        if not resp.ok:
            # Keep the best-effort behaviour (return what we have) but make the
            # truncation visible instead of silently dropping later pages.
            warnings.warn(
                f"Paging of {path} stopped early on HTTP {resp.status_code}; "
                f"returning {len(results)} partial result(s)",
                RuntimeWarning,
                stacklevel=2,
            )
            break
        page = resp.json()
        results.extend(page.get("value", []))
        next_link = page.get("@odata.nextLink")
        count += 1
    return results


def _ensure_user_id(identifier: str) -> str:
    """Return the user's object ID for a GUID or UPN identifier.

    A GUID is returned unchanged. Any other value is treated as a UPN and
    looked up with ``GET /users/{upn}?$select=id,userPrincipalName``. In
    dry-run mode a fixed simulated ID is returned without any request.

    Raises:
        RuntimeError: if the lookup fails or the response carries no ``id``.
    """
    from urllib.parse import quote  # local import keeps this cell self-contained

    if _is_guid(identifier):
        return identifier
    if DRY_RUN:
        return "00000000-0000-0000-0000-0000000000AB"  # simulated user id
    # Encode the UPN as a single path segment: a guest UPN contains '#EXT#',
    # and an unescaped '#' would be parsed as the start of a URL fragment.
    # '@' is left as-is so ordinary UPNs produce the same path as before.
    data = _graph_get(f"/users/{quote(identifier, safe='@')}", params={"$select": "id,userPrincipalName"})
    uid = data.get("id")
    if not uid:
        raise RuntimeError(f"Could not resolve user id for '{identifier}'")
    return uid


def get_signins_for_user(identifier: str, top: int = 50, start: str | None = None, end: str | None = None) -> list:
    """Return sign-in log entries from ``/auditLogs/signIns`` for one user.

    Args:
        identifier: User object ID (GUID) or UPN. A GUID filters on ``userId``;
            anything else filters on ``userPrincipalName``.
        top: Value sent as ``$top``.
        start: Optional lower bound for ``createdDateTime`` (ISO 8601 string).
        end: Optional upper bound for ``createdDateTime`` (ISO 8601 string).

    Returns:
        The entries from up to three result pages, newest first. In dry-run
        mode a single simulated entry is returned and no request is made.
    """
    if DRY_RUN:
        return [{
            "simulated": True,
            "category": "signIn",
            "userDisplayName": "Simulated User",
            "userPrincipalName": identifier if not _is_guid(identifier) else "user@contoso.com",
            "userId": identifier if _is_guid(identifier) else "00000000-0000-0000-0000-0000000000AB",
            "createdDateTime": datetime.now(timezone.utc).strftime(ISO8601),
            "status": {"errorCode": 0, "additionalDetails": None}
        }]
    # Build filter
    if _is_guid(identifier):
        filter_expr = f"userId eq '{identifier}'"
    else:
        # A single quote inside an OData string literal is escaped by doubling
        # it; without this a UPN such as o'brien@contoso.com ends the literal
        # early and yields a malformed filter.
        escaped_upn = identifier.replace("'", "''")
        filter_expr = f"userPrincipalName eq '{escaped_upn}'"
    if start:
        filter_expr += f" and createdDateTime ge {start}"
    if end:
        filter_expr += f" and createdDateTime le {end}"
    params = {
        "$filter": filter_expr,
        "$orderby": "createdDateTime desc",
        "$top": top
    }
    return _graph_paged("/auditLogs/signIns", params=params, page_limit=3)


def get_directory_audits_for_user(identifier: str, top: int = 50, start: str | None = None, end: str | None = None) -> list:
    """Return directory audit entries from ``/auditLogs/directoryAudits`` initiated by a user.

    The identifier (GUID or UPN) is first resolved to a user ID, and the query
    filters on ``initiatedBy/user/id``. Optional ``start`` / ``end`` strings
    bound ``activityDateTime``. In dry-run mode one simulated entry is
    returned and no request is made.
    """
    if DRY_RUN:
        simulated_actor = {"id": "00000000-0000-0000-0000-0000000000AB", "userPrincipalName": "user@contoso.com"}
        simulated_entry = {
            "simulated": True,
            "category": "UserManagement",
            "activityDisplayName": "Update user",
            "initiatedBy": {"user": simulated_actor},
            "activityDateTime": datetime.now(timezone.utc).strftime(ISO8601),
        }
        return [simulated_entry]
    user_id = _ensure_user_id(identifier)
    # Only the initiating user is matched here; entries where the user appears
    # solely in targetResources are not covered by this filter.
    filter_expr = f"initiatedBy/user/id eq '{user_id}'"
    if start:
        filter_expr += f" and activityDateTime ge {start}"
    if end:
        filter_expr += f" and activityDateTime le {end}"
    query = {
        "$filter": filter_expr,
        "$orderby": "activityDateTime desc",
        "$top": top,
    }
    return _graph_paged("/auditLogs/directoryAudits", params=query, page_limit=3)


def default_time_range(hours: int = 24) -> tuple[str, str]:
    """Return ``(start, end)`` strings covering the last ``hours`` hours up to now (UTC).

    Both values are formatted with the module's ``ISO8601`` pattern.
    """
    now_utc = datetime.now(timezone.utc)
    window = timedelta(hours=hours)
    return (now_utc - window).strftime(ISO8601), now_utc.strftime(ISO8601)


In [None]:
# Example: pull the last 24 hours of logs for one user (safe to run in dry-run)
# The identifier may be a UPN (user@contoso.com) or an AAD user id (GUID).

identifier = "user@contoso.com"  # change to your target user UPN or objectId
start, end = default_time_range(24)

print("Time range:", start, "to", end)

signins = get_signins_for_user(identifier, top=20, start=start, end=end)
diraudits = get_directory_audits_for_user(identifier, top=20, start=start, end=end)

print(f"Sign-ins returned: {len(signins)}")
print(f"Directory audits returned: {len(diraudits)}")

# Print a compact preview: at most three records per log type, selected fields only
from itertools import islice

for heading, records, fields in (
    ("\nSample sign-ins:", signins, ("createdDateTime", "userPrincipalName", "userId", "status")),
    ("\nSample directory audits:", diraudits, ("activityDateTime", "activityDisplayName", "category")),
):
    print(heading)
    for item in islice(records, 3):
        print({k: item.get(k) for k in fields})
