In [1]:
import requests, datetime as dt, time, random, re
from typing import List, Dict, Any, Optional, Iterator
import pandas as pd
from google.colab import userdata

# === REQUIRED CONFIG ===
API_KEY = (userdata.get('compliance') or '').strip()
WS_ID   = (userdata.get('WS_ID') or '').strip()
assert API_KEY, "Missing userdata['compliance']"
assert WS_ID,   "Missing userdata['WS_ID']"

BASE = f"https://api.chatgpt.com/v1/compliance/workspaces/{WS_ID}"
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Accept": "application/json",
    "Content-Type": "application/json",
}

SESSION = requests.Session()
SESSION.headers.update(HEADERS)

# === INVESTIGATION INPUTS ===
TARGET        = "justin.dizon@adventhealth.com"   # email or a user-… id
END_UTC       = dt.datetime.now(dt.timezone.utc)
START_UTC     = END_UTC - dt.timedelta(days=7)

# If you already know the user id, put it here to skip /users lookup entirely
USER_ID_OVERRIDE: Optional[str] = None  # e.g., "user-XXXXXXXXXXXXXXXXXXXXXXXX"

# === PERFORMANCE / RESILIENCE SETTINGS ===
CONV_LIMIT      = 80        # <=500 per spec; drop if 522s persist
USERS_LIMIT     = 200       # default = 200 per spec
MAX_RETRIES     = 6
BASE_BACKOFF    = 0.6
TIMEOUT_CONNECT = 10
TIMEOUT_READ    = 90
PROBE_MIN_SLICE = dt.timedelta(hours=3)

print("Workspace:", WS_ID)
print("Window UTC:", START_UTC.isoformat(), "→", END_UTC.isoformat())


Workspace: 5cce4c27-054b-43f3-b365-4f25fe4619a6
Window UTC: 2025-10-28T00:50:37.338073+00:00 → 2025-11-04T00:50:37.338073+00:00


In [2]:
def _sleep_backoff(attempt: int, base: float = BASE_BACKOFF):
    delay = base * (2 ** attempt) + random.uniform(0, base)
    time.sleep(min(delay, 10))

def _get(url: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    timeout = (TIMEOUT_CONNECT, TIMEOUT_READ)
    for attempt in range(MAX_RETRIES):
        try:
            r = SESSION.get(url, params=params or {}, timeout=timeout)
            if r.status_code == 429:
                print("429 rate-limit — backing off")
                _sleep_backoff(attempt); continue
            if r.status_code in (522, 524) or 500 <= r.status_code < 600:
                print(f"Server/edge error {r.status_code} — retrying")
                _sleep_backoff(attempt); continue
            if r.status_code >= 400:
                print("Compliance API error:", r.status_code, r.text[:300])
                r.raise_for_status()
            return r.json()
        except requests.RequestException as e:
            if attempt == MAX_RETRIES - 1: raise
            _sleep_backoff(attempt)
    raise RuntimeError("Unreachable after retries")

def parse_ts_unix(ts) -> Optional[dt.datetime]:
    if ts is None: return None
    try:
        return dt.datetime.fromtimestamp(float(ts), tz=dt.timezone.utc)
    except Exception:
        return None

def to_unix_seconds(d: dt.datetime) -> int:
    if not d.tzinfo: d = d.replace(tzinfo=dt.timezone.utc)
    return int(d.timestamp())


In [3]:
def _first_page_conversations(start_utc: dt.datetime, limit: int) -> Dict[str, Any]:
    url = f"{BASE}/conversations"
    params = {"since_timestamp": to_unix_seconds(start_utc), "limit": min(limit, 500)}
    return _get(url, params=params)

def _stream_conversations_from_first_page(first_page: Dict[str, Any],
                                          start_utc: dt.datetime,
                                          end_utc: dt.datetime,
                                          limit: int) -> Iterator[Dict[str, Any]]:
    url = f"{BASE}/conversations"
    for conv in first_page.get("data", []): yield conv
    after = first_page.get("last_id") if first_page.get("has_more") else None
    params = {"since_timestamp": to_unix_seconds(start_utc)}
    while after:
        page = _get(url, params={**params, "limit": limit, "after": after})
        for conv in page.get("data", []): yield conv
        after = page.get("last_id") if page.get("has_more") else None

def iter_conversations_adaptive(start_utc: dt.datetime,
                                end_utc: dt.datetime,
                                limit: int,
                                min_slice: dt.timedelta = PROBE_MIN_SLICE) -> Iterator[Dict[str, Any]]:
    """Bisects only on 522/5xx to narrow failing intervals."""
    stack = [(start_utc, end_utc)]
    while stack:
        s, e = stack.pop()
        if s > e: continue
        try:
            first = _first_page_conversations(s, limit)
            yield from _stream_conversations_from_first_page(first, s, e, limit)
        except requests.HTTPError as err:
            code = getattr(err.response, "status_code", None)
            if code and (code in (522, 524) or 500 <= code < 600):
                if (e - s) <= min_slice: raise
                mid = s + (e - s) / 2
                stack.append((s, mid)); stack.append((mid, e))
            else: raise


In [4]:
USER_ID_RE = re.compile(r"^user-[A-Za-z0-9]{8,}$")

def resolve_user_id(identifier: str,
                    start_utc: dt.datetime,
                    end_utc: dt.datetime) -> str:
    if USER_ID_OVERRIDE:
        print("Using USER_ID_OVERRIDE."); return USER_ID_OVERRIDE
    ident = (identifier or "").strip()
    if USER_ID_RE.match(ident):
        print("Identifier is already a user_id."); return ident
    assert "@" in ident, "Identifier must be an email or a user-… id"
    email = ident.lower()

    # === Spec-accurate /users crawl ===
    print(f"Resolving user ID via /users for {email}")
    url = f"{BASE}/users"
    after = None
    limit = min(max(USERS_LIMIT, 1), 200)
    last_call = 0.0
    pages = 0
    while True:
        # pace to ≤ 50 requests/min
        elapsed = time.time() - last_call
        if elapsed < 1.25: time.sleep(1.25 - elapsed)
        params = {"limit": limit}
        if after: params["after"] = after
        data = _get(url, params=params)
        last_call = time.time(); pages += 1
        for u in data.get("data", []):
            if (u.get("email") or "").lower() == email:
                print(f"Found via /users on page {pages}."); return u.get("id")
        if not data.get("has_more"):
            print("End of /users — no match, falling back to probe."); break
        after = data.get("last_id")
        if pages % 10 == 0: print(f"…/users pages scanned: {pages}")

    # === Conversation probe fallback ===
    print("Probing conversations to infer user id…")
    scanned = 0
    for conv in iter_conversations_adaptive(start_utc, end_utc, limit=min(40, CONV_LIMIT)):
        scanned += 1
        if (conv.get("user_email") or "").lower() == email and conv.get("user_id"):
            print("Found via conversation.user_id."); return conv["user_id"]
        for m in ((conv.get("messages") or {}).get("data") or []):
            a = m.get("author") or {}
            if (a.get("email") or "").lower() == email and a.get("id"):
                print("Found via message.author.id."); return a["id"]
        if scanned % 20 == 0: print(f"…probed {scanned} conversations")
    raise RuntimeError(f"Could not resolve user_id for {email}")


In [5]:
def iter_conversations_for_user_window(user_id: str,
                                       start_utc: dt.datetime,
                                       end_utc: dt.datetime,
                                       limit: int = CONV_LIMIT) -> Iterator[Dict[str, Any]]:
    url = f"{BASE}/conversations"
    first = _get(url, params={
        "since_timestamp": to_unix_seconds(start_utc),
        "limit": min(limit, 500),
        "users": user_id
    })
    for conv in first.get("data", []):
        la = parse_ts_unix(conv.get("last_active_at"))
        if la and la > end_utc: continue
        yield conv
    after = first.get("last_id") if first.get("has_more") else None
    while after:
        page = _get(url, params={"after": after, "limit": min(limit, 500), "users": user_id})
        for conv in page.get("data", []):
            la = parse_ts_unix(conv.get("last_active_at"))
            if la and la > end_utc: continue
            yield conv
        after = page.get("last_id") if page.get("has_more") else None

def flatten_messages_from_conversation(conv: Dict[str, Any],
                                       start_utc: dt.datetime,
                                       end_utc: dt.datetime) -> List[Dict[str, Any]]:
    out = []
    for m in ((conv.get("messages") or {}).get("data") or []):
        ts = parse_ts_unix(m.get("created_at"))
        if not ts or ts < start_utc or ts > end_utc: continue
        a = m.get("author") or {}; c = m.get("content") or {}
        files = ((m.get("files") or {}).get("data") or [])
        f_summary = "; ".join(f"{f.get('name','(no name)')}|{f.get('id','')}" for f in files)
        out.append({
            "conversation_id": conv.get("id"),
            "conversation_title": conv.get("title"),
            "message_id": m.get("id"),
            "message_created_at": ts.isoformat(),
            "author_role": a.get("role"),
            "author_id": a.get("id"),
            "author_email": a.get("email"),
            "content_type": c.get("type"),
            "content_value": c.get("value"),
            "file_summary": f_summary,
        })
    return out

def collect_user_messages_window(identifier: str,
                                 start_utc: dt.datetime,
                                 end_utc: dt.datetime) -> List[Dict[str, Any]]:
    user_id = resolve_user_id(identifier, start_utc, end_utc)
    all_rows = []
    scanned = 0
    for conv in iter_conversations_for_user_window(user_id, start_utc, end_utc, limit=CONV_LIMIT):
        scanned += 1
        rows = flatten_messages_from_conversation(conv, start_utc, end_utc)
        all_rows.extend(rows)
        if scanned % 25 == 0:
            print(f"Processed {scanned} conversations; {len(all_rows)} messages so far.")
    print(f"Conversations considered: {scanned}")
    print(f"Messages in window: {len(all_rows)}")
    return all_rows


In [6]:
rows = collect_user_messages_window(TARGET, START_UTC, END_UTC)

if rows:
    df = pd.DataFrame(rows).sort_values("message_created_at")
    display(df.head(20))
    out_path = "/content/user_messages_window.csv"
    df.to_csv(out_path, index=False)
    print(f"Saved: {out_path}")
else:
    print("No messages in that window.")


Resolving user ID via /users for justin.dizon@adventhealth.com
Found via /users on page 3.
Server/edge error 500 — retrying
Server/edge error 500 — retrying
Server/edge error 500 — retrying
Server/edge error 500 — retrying
Server/edge error 500 — retrying
Server/edge error 500 — retrying


RuntimeError: Unreachable after retries

In [None]:
import pytz

eastern = pytz.timezone("US/Eastern")

# Convert and store in a new column
df["message_created_at_et"] = (
    pd.to_datetime(df["message_created_at"], utc=True)
      .dt.tz_convert(eastern)
)

# Preview side-by-side
display(df[["message_created_at", "message_created_at_et"]].head(10))

In [None]:
# ==== Drill-Down Utilities (works on the in-memory df from Cell 6) ====
import json
from typing import Optional

# safety: ensure df exists
try:
    _ = df.head(1)
except NameError:
    raise RuntimeError("No dataframe named `df` found. Run the previous cell that builds `df` first.")

def drill_by_conversation(conversation_id: str,
                          save_csv_path: Optional[str] = "/content/drill_conversation.csv"):
    """
    Show all messages in a single conversation (sorted by time) and optionally save to CSV.
    """
    sub = df[df["conversation_id"] == conversation_id].copy()
    if sub.empty:
        print("No rows found for conversation:", conversation_id)
        return sub
    sub = sub.sort_values("message_created_at")
    display(sub.head(50))
    if save_csv_path:
        sub.to_csv(save_csv_path, index=False)
        print(f"Saved conversation slice to: {save_csv_path}")
    return sub

def drill_by_keyword(pattern: str,
                     case: bool = False,
                     regex: bool = False,
                     save_csv_path: Optional[str] = "/content/drill_keyword.csv"):
    """
    Filter messages where content_value contains `pattern`.
    regex=True enables full regex search.
    """
    col = df["content_value"].fillna("")
    mask = col.str.contains(pattern, case=case, regex=regex, na=False)
    sub = df[mask].copy().sort_values(["conversation_id", "message_created_at"])
    if sub.empty:
        print("No matches found for pattern:", pattern)
        return sub
    display(sub.head(50))
    if save_csv_path:
        sub.to_csv(save_csv_path, index=False)
        print(f"Saved keyword slice to: {save_csv_path}")
    return sub

def drill_thread(message_id: str, before: int = 3, after: int = 3,
                 save_csv_path: Optional[str] = "/content/drill_thread.csv"):
    """
    Show a window of messages around a given message_id within the same conversation.
    """
    # locate the anchor row
    row = df[df["message_id"] == message_id]
    if row.empty:
        print("Message not found:", message_id)
        return pd.DataFrame()
    conv_id = row.iloc[0]["conversation_id"]

    # get full convo slice ordered by time
    conv = df[df["conversation_id"] == conv_id].copy().sort_values("message_created_at").reset_index(drop=True)

    # find index of message within conversation
    idx = conv.index[conv["message_id"] == message_id]
    if len(idx) == 0:
        print("Message not found within its conversation (unexpected).")
        return pd.DataFrame()
    i = int(idx[0])

    start_i = max(0, i - before)
    end_i   = min(len(conv), i + after + 1)
    window = conv.iloc[start_i:end_i].copy()

    display(window)
    if save_csv_path:
        window.to_csv(save_csv_path, index=False)
        print(f"Saved thread window to: {save_csv_path}")
    return window

def export_conversation_json(conversation_id: str,
                             path: str = "/content/conversation_export.json"):
    """
    Export all rows for a conversation to a JSON file (records format).
    """
    sub = df[df["conversation_id"] == conversation_id].copy().sort_values("message_created_at")
    if sub.empty:
        print("No rows found for conversation:", conversation_id)
        return None
    records = sub.to_dict(orient="records")
    with open(path, "w") as f:
        json.dump(records, f, indent=2)
    print(f"Saved JSON export to: {path}  (records: {len(records)})")
    return path

# ---------- quick-use examples (uncomment and adjust) ----------
# 1) Drill the top conversation visible in df
example_conv = df.iloc[0]["conversation_id"]
drill_by_conversation(example_conv)

# 2) Find messages mentioning a keyword (case-insensitive)
# drill_by_keyword("PHI", case=False)

# 3) Show +/- 5 messages around a specific message_id
# example_msg = df.iloc[0]["message_id"]
# drill_thread(example_msg, before=5, after=5)

# 4) Export one conversation’s messages to JSON
# export_conversation_json(example_conv)
