In [286]:
import json
from typing import Any, Dict, Optional


def parse_entry(entry: Dict[str, Any]) -> Dict[str, Any]:
    """
    Extract all timing, size, metadata fields, and raw content text from a single HAR entry.
    """
    metrics: Dict[str, Any] = {}

    # Top-level metadata
    metrics['priority']           = entry.get('_priority')
    metrics['resourceType']       = entry.get('_resourceType')
    metrics['pageref']            = entry.get('pageref')
    metrics['connection_id']      = entry.get('connection')
    metrics['server_ip_address']  = entry.get('serverIPAddress')
    metrics['startedDateTime']    = entry.get('startedDateTime')
    metrics['time_total_ms']      = entry.get('time')

    # — Request block —
    req = entry.get('request', {})
    metrics['request_method']         = req.get('method')
    metrics['request_url']            = req.get('url')
    metrics['request_httpVersion']    = req.get('httpVersion')
    metrics['request_headers_count']  = len(req.get('headers', []))
    metrics['request_query_count']    = len(req.get('queryString', []))
    metrics['request_cookies_count']  = len(req.get('cookies', []))
    metrics['request_headers_size']   = req.get('headersSize')
    metrics['request_body_size']      = req.get('bodySize')

    # Post-data
    post = req.get('postData')
    if post:
        metrics['postData_mimeType']     = post.get('mimeType')
        text = post.get('text')
        if text is not None:
            metrics['postData_text_length'] = len(text)

    # — Response block —
    res = entry.get('response', {})
    metrics['response_status']         = res.get('status')
    metrics['response_httpVersion']    = res.get('httpVersion')
    metrics['response_headers_count']  = len(res.get('headers', []))
    metrics['response_cookies_count']  = len(res.get('cookies', []))
    metrics['response_headers_size']   = res.get('headersSize')
    metrics['response_body_size']      = res.get('bodySize')

    # Content sub-block
    content = res.get('content', {})
    metrics['content_size']            = content.get('size')
    metrics['content_mimeType']        = content.get('mimeType')
    metrics['content_text']            = content.get('text')  # raw response text
    # (_transferSize is the true on-the-wire bytes including headers)
    metrics['transfer_size']           = res.get('_transferSize')

    # — Cache info —
    cache = entry.get('cache', {})
    metrics['cache_beforeRequest']     = cache.get('beforeRequest')
    metrics['cache_afterRequest']      = cache.get('afterRequest')

    # — Detailed timings —
    timings = entry.get('timings', {})
    for phase, t in timings.items():
        metrics[f'time_{phase}_ms']     = t

    return metrics


def find_and_parse_entry_by_url(har_path: str, target_url: str) -> Optional[Dict[str, Any]]:
    """
    Load a HAR file, find the first entry matching target_url, and parse it.
    Returns a dict of metrics including raw content text, or raises ValueError if not found.
    """
    with open(har_path, 'r', encoding='utf-8') as f:
        har = json.load(f)

    entries = har.get('entries') or har.get('log', {}).get('entries', [])
    for idx, entry in enumerate(entries):
        req = entry.get('request', {})
        if req.get('url') == target_url:
            metrics = parse_entry(entry)
            metrics['entry_index'] = idx
            return metrics

    raise ValueError(f"No entry with URL '{target_url}' found in HAR file.")

import json
from typing import Any, List, Dict

def parse_sse_stream(content_text: str) -> List[Dict[str, Any]]:
    """
    Parse a Server-Sent Events (SSE) stream into a list of events.
    Each event is a dict with:
      - 'eventType': the SSE event type (e.g., 'delta', 'delta_encoding')
      - 'payload': the JSON-decoded data or raw string if not valid JSON
    """
    entries: List[Dict[str, Any]] = []
    last_event_type: str = None

    # Split on double newlines to separate SSE blocks
    for chunk in content_text.strip().split("\n\n"):
        lines = chunk.splitlines()
        event_type = None
        data_parts: List[str] = []

        for line in lines:
            if line.startswith("event:"):
                event_type = line[len("event:"):].strip()
            elif line.startswith("data:"):
                # collect the data payload lines
                data_parts.append(line[len("data:"):].strip())

        # If no event: line, reuse the last seen event type
        if event_type is not None:
            last_event_type = event_type
        event_type = event_type or last_event_type

        # Combine all data parts into one payload string
        data_str = "".join(data_parts)

        # Try to JSON-decode; fallback to raw string on failure
        try:
            payload: Any = json.loads(data_str)
        except json.JSONDecodeError:
            payload = data_str

        entries.append({
            "eventType": event_type,
            "payload": payload
        })

    return entries

from typing import Any, List, Dict

def extract_search_queries(parsed_events: List[Dict[str, Any]]) -> List[str]:
    queries: List[str] = []

    for i, ev in enumerate(parsed_events):
        if ev.get("eventType") != "delta":
            continue
        d = ev["payload"]
        if not isinstance(d, dict):
            continue

        # look for a batch-patch that appends to /message/metadata
        if d.get("o") == "patch" and isinstance(d.get("v"), list):
            for op in d["v"]:
                if op.get("p") == "/message/metadata" and op.get("o") == "append":
                    meta = op.get("v")
                    if isinstance(meta, dict):
                        sqs = meta.get("search_queries")
                        if isinstance(sqs, list):
                            for sq in sqs:
                                q = sq.get("q")
                                if isinstance(q, str):
                                    queries.append(q)

    return queries

from typing import Any, List, Dict

def count_urls(parsed_events: List[Dict[str, Any]]) -> None:
    """
    Given a list of SSE events as returned by parse_sse_stream(),
    prints how many URLs GPT accessed (during the search phase)
    vs. how many it actually returned in its final response.

    We start collecting “accessed” URLs once we see the assistant
    append "Searching" to its thoughts, and we stop (and begin collecting
    “given” URLs) once we hit the finished_successfully separator.
    """
    accessed: List[str] = []
    given:    List[str] = []
    seen_search_marker = False
    after_sep = False
    after_sep_counter = 0

    for ev in parsed_events:
        if ev.get("eventType") != "delta":
            continue

        d = ev["payload"]
        if not isinstance(d, dict):
            continue

        # # 1) detect search kickoff
        # if d.get("p") == "/message/content/thoughts/0/summary" and d.get("v") == "Searching":
        #     seen_search_marker = True
        #     continue

        # 2) detect separator (end of search phase)
        if (d.get("p") == "/message/status"
            and d.get("o") == "replace"
            and d.get("v") == "finished_successfully"):
            after_sep_counter += 1
            if after_sep_counter == 2:
                after_sep = True
            continue

        # 3a) pre-separator: collect any search_result_group URLs
        if not after_sep:
            # Case A: delta contains a list of search_result_group objects
            if isinstance(d.get("v"), list):
                for item in d["v"]:
                    if isinstance(item, dict) and item.get("type") == "search_result_group":
                        for entry in item.get("entries", []):
                            url = entry.get("url")
                            if url:
                                accessed.append(url)

            # Case B: delta is metadata/search_result_groups/.../entries
            #    where d["v"] is a list of plain search_result dicts
            if isinstance(d.get("p"), str) and "/search_result_groups" in d["p"] and d["p"].endswith("/entries"):
                for entry in d["v"]:
                    if isinstance(entry, dict):
                        url = entry.get("url")
                        if url:
                            accessed.append(url)

        # # 3b) post-separator: collect only url_moderation URLs
        # if after_sep and d.get("type") == "url_moderation":
        #     um = d.get("url_moderation_result", {})
        #     url = um.get("full_url")
        #     if url:
        #         given.append(url)

        # 3b) post-separator: collect only url_moderation URLs
        if after_sep and d.get("type") == "url_moderation":
            um = d.get("url_moderation_result", {})
            url = um.get("full_url")
            if url:
                given.append(url)



    # print counts
    # print(f"GPT accessed URLs ({len(accessed)}):")
    # for u in accessed:
    #     print("  ", u)

    # print(f"\nURLs in assistant response ({len(given)}):")
    # for u in given:
    #     print("  ", u)
    
    return accessed, given

## Directory

In [287]:
import pandas as pd
from pathlib import Path
import json

url = "https://chatgpt.com/backend-api/f/conversation"

rows = []
har_dir = Path("transactional_hars")
pattern = "network-logs-prompt-*"

for har_path in sorted(har_dir.glob(pattern)):
    har_filename = har_path.name
    # print(har_filename)
    try:
        # 1) parse the main entry
        m = find_and_parse_entry_by_url(str(har_path), url)

        # 2) parse SSE and extract queries + URL counts
        content_text = m.get("content_text", "")
        parsed_events = parse_sse_stream(content_text)
        queries = extract_search_queries(parsed_events)
        accessed, given = count_urls(parsed_events)

        # 3) collect the row
        rows.append({
            "file": har_filename,
            "total_time_ms": m["time_total_ms"],
            "transfer_size": m["transfer_size"],
            "queries": queries,
            "n_accessed": len(accessed),
            "n_given": len(given),
        })

    except ValueError:
        # URL not found → record NaNs/empty
        rows.append({
            "file": har_filename,
            "total_time_ms": None,
            "transfer_size": None,
            "queries": [],
            "n_accessed": 0,
            "n_given": 0,
        })

# build DataFrame
df = pd.DataFrame(rows).set_index("file")

# show it
df


Unnamed: 0_level_0,total_time_ms,transfer_size,queries,n_accessed,n_given
file,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
network-logs-prompt-1.har,39649.04,125025,"[how to order prescription eyeglasses online, ...",19,0
network-logs-prompt-10.har,43639.23,158396,[Download Outlook for Windows 10 official site...,13,7
network-logs-prompt-11.har,35028.976,103060,"[CSS full screen page loader spinner overlay, ...",24,0
network-logs-prompt-12.har,24939.454,104640,"[Microsoft Access query date function Date(), ...",17,0
network-logs-prompt-13.har,38400.007,112531,"[best apps to get free internet access, apps t...",29,12
network-logs-prompt-14.har,30012.687,75716,"[UPS TV uninterruptible power supply for tv, b...",13,10
network-logs-prompt-15.har,37590.292,93106,[Microsoft High Definition Audio Device driver...,11,0
network-logs-prompt-16.har,59085.14,131332,"[System Mechanic official download Iolo, Iolo ...",15,8
network-logs-prompt-17.har,46336.585,152116,[ASCVD risk calculator ACC AHA Pooled Cohort E...,18,13
network-logs-prompt-18.har,31427.887,131218,"[best currency converter online 2025, XE curre...",21,14


In [288]:
df['total_time_sec'] = df['total_time_ms'] / 1000

# Prepare nicely formatted summary using the correct units (time in sec, transfer size already in B)
average_time = df['total_time_sec'].mean()
average_transfer_size = df['transfer_size'].mean()
average_accessed = df['n_accessed'].mean()
average_given = df['n_given'].mean()

min_time = df['total_time_sec'].min()
max_time = df['total_time_sec'].max()

min_transfer_size = df['transfer_size'].min()
max_transfer_size = df['transfer_size'].max()

min_accessed = df['n_accessed'].min()
max_accessed = df['n_accessed'].max()

min_given = df['n_given'].min()
max_given = df['n_given'].max()

print(f"Average time: {average_time:.2f} seconds (min: {min_time:.2f}s, max: {max_time:.2f}s)")
print(f"Average transfer size: {average_transfer_size:.2f} B (min: {min_transfer_size}B, max: {max_transfer_size}B)")
print(f"Average URLs accessed: {average_accessed:.2f} (min: {min_accessed}, max: {max_accessed})")
# print(f"Average URLs given: {average_given:.2f} (min: {min_given}, max: {max_given})")

Average time: 38.42 seconds (min: 5.55s, max: 83.70s)
Average transfer size: 114060.88 B (min: 10045B, max: 207793B)
Average URLs accessed: 18.35 (min: 0, max: 39)


## For One File

In [289]:
har_file = "network-logs-prompt-1.har"

url = "https://chatgpt.com/backend-api/f/conversation"

metrics = find_and_parse_entry_by_url(har_file, url)

print(json.dumps(metrics, indent=2))

FileNotFoundError: [Errno 2] No such file or directory: 'network-logs-prompt-1.har'

In [None]:
content_text = metrics["content_text"]

parsed_events = parse_sse_stream(content_text)
for ev in parsed_events:
    print(f"{ev['eventType']}: {ev['payload']}\n")

delta_encoding: v1

delta_encoding: {'type': 'resume_conversation_token', 'token': 'eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJjb25kdWl0X3V1aWQiOiIyZjExMDM2ZDBkMGI0YjM1YmRjOGNkNDRmZDNhMDI3MiIsImNvbmR1aXRfbG9jYXRpb24iOiIxMC4xMjguNDIuMTc5OjgzMDgiLCJpYXQiOjE3NTQwNzY4MDUsImV4cCI6MTc1NDA3NzQwNX0.m-PTb19viSpiZqCzP0kBEqvqintFvNiobMIeyux4cv4s-NgtANtOieGQPkG8P3KRA2Qs8SZmonjiRMypozqNHQ', 'conversation_id': '688d1685-7f1c-8333-81ed-87c0a924704c'}

delta: {'p': '', 'o': 'add', 'v': {'message': {'id': 'fd915373-bde7-4d2f-ace2-dffb2af6afa3', 'author': {'role': 'system', 'name': None, 'metadata': {}}, 'create_time': None, 'update_time': None, 'content': {'content_type': 'text', 'parts': ['']}, 'status': 'finished_successfully', 'end_turn': True, 'weight': 0.0, 'metadata': {'is_visually_hidden_from_conversation': True, 'model_switcher_deny': []}, 'recipient': 'all', 'channel': None}, 'conversation_id': '688d1685-7f1c-8333-81ed-87c0a924704c', 'error': None}, 'c': 0}

delta: {'v': {'message': {'id': 'e2416

In [None]:
queries = extract_search_queries(parsed_events)
for q in queries:
    print("Found search query:", q)

In [None]:
accessed, given = count_urls(parsed_events)

GPT accessed URLs (31):
   https://www.espn.com/soccer/story/_/id/45522470/premier-league-fixtures-schedule-2025-26-full
   https://www.nbcsports.com/soccer/news/premier-league-2025-26-fixtures-released-dates-schedule-how-to-watch-live
   https://en.wikipedia.org/wiki/List_of_Premier_League_seasons
   https://www.usanetwork.com/usa-insider/premier-league-2025-2026-schedule-dates-times-how-to-watch
   https://talksport.com/football/3300096/community-shield-date-time-channel-live-stream-free-liverpool-vs-crystal-palace/
   https://www.youtube.com/watch?v=rW4oy6XoeIQ
   https://www.skysports.com/football/news/11661/13394008/premier-league-2025-26-fixtures-dates-schedule-liverpool-vs-arsenal-live-on-sky-sports-in-august
   https://www.reuters.com/sports/soccer/amorim-is-right-man-man-utd-says-ugarte-2025-07-29/
   https://www.p1travel.com/en/blog/start-football-season-2025-2026
   https://royalbluemersey.sbnation.com/2025/7/26/24466951/evertons-transfer-priorities-the-summer-of-2025-right-