# Semantic Gravity: Research Project Implementation

This notebook implements the research methodology for "Semantic Gravity: Quantifying the Efficiency Gap in Negative Constraint Adherence".

**Steps:**
1.  **Prompt Generation:** Load prompts generated by batch system (see README_PROMPTS.md).
2.  **White Box Analysis:** Use Qwen-2.5-7B-Instruct to calculate Semantic Pressure ($P_{sem}$), test constraint adherence, and generate synonyms.
3.  **Black Box Experiment:** Test GPT-5 models (Nano, Mini, Base) on adherence failure using Batch API.
4.  **Experiment 3 (Mitigation):** Test Anchor Displacement on high-gravity prompts.
5.  **Analysis:** Plot Collapse Curves and analyze the Efficiency Gap.

In [1]:
# Setup & Imports
%pip install -q transformers accelerate bitsandbytes scipy pandas matplotlib openai

import os
import json
import time
import torch
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit
from transformers import AutoModelForCausalLM, AutoTokenizer
from openai import OpenAI

# Configuration
OPENAI_API_KEY = "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"
GPT_MODELS = ["gpt-5-nano-2025-08-07", "gpt-5-mini-2025-08-07", "gpt-5-2025-08-07"]
QWEN_MODEL_ID = "Qwen/Qwen2.5-7B-Instruct"

client = OpenAI(api_key=OPENAI_API_KEY)

print("Setup Complete.")

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.4/59.4 MB[0m [31m29.6 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hSetup Complete.


## Step 1: Load Prompts

**IMPORTANT:** Before running this notebook, generate prompts using the batch system:
```bash
python generate_prompts_batch.py
python check_batch_status.py --monitor
python deduplicate_prompts.py
```

See `README_PROMPTS.md` for details.

In [2]:
# Load prompts generated by batch system
import os
import pandas as pd

# Priority: Load prompts_with_synonyms.csv (most complete), then qwen results, then psem, then raw.
if os.path.exists("prompts_with_synonyms.csv"):
    print("Loading existing prompts_with_synonyms.csv...")
    prompts_df = pd.read_csv("prompts_with_synonyms.csv")
elif os.path.exists("prompts_with_qwen_results.csv"):
    print("Loading existing prompts_with_qwen_results.csv...")
    prompts_df = pd.read_csv("prompts_with_qwen_results.csv")
elif os.path.exists("prompts_with_psem.csv"):
    print("Loading existing prompts_with_psem.csv...")
    prompts_df = pd.read_csv("prompts_with_psem.csv")
elif os.path.exists("prompts.csv"):
    print("Loading prompts.csv...")
    prompts_df = pd.read_csv("prompts.csv")
else:
    print("prompts.csv not found. Downloading from temporary backup...")
    # Download from termbin (uploaded from local machine)
    !wget https://termbin.com/e4wu -O prompts.csv
    if os.path.exists("prompts.csv"):
        prompts_df = pd.read_csv("prompts.csv")
    else:
        raise FileNotFoundError(
            "prompts.csv not found. Please run the batch generation system first.\n"
            "See README_PROMPTS.md for instructions."
        )

print(f"Loaded {len(prompts_df)} prompts")
print(f"\nDistribution by bucket:")
print(prompts_df['bucket'].value_counts().sort_index())

prompts.csv not found. Downloading from temporary backup...
--2025-11-25 20:18:24--  https://termbin.com/e4wu
Resolving termbin.com (termbin.com)... 162.19.243.32
Connecting to termbin.com (termbin.com)|162.19.243.32|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 80011 (78K) [text/plain]
Saving to: ‘prompts.csv’


2025-11-25 20:18:25 (685 KB/s) - ‘prompts.csv’ saved [80011/80011]

Loaded 500 prompts

Distribution by bucket:
bucket
A_Idioms         100
B_Facts          100
C_CommonSense    100
D_Creative       100
E_OOD            100
Name: count, dtype: int64


## Step 2: White Box Analysis (Qwen)

### Step 2a: Calculate Semantic Pressure ($P_{sem}$)
Calculate the probability of the target word appearing in the unconstrained completion.

In [3]:
# Load Qwen
print("Loading Qwen Model...")
tokenizer = AutoTokenizer.from_pretrained(QWEN_MODEL_ID, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    QWEN_MODEL_ID,
    device_map="auto",
    trust_remote_code=True,
    torch_dtype=torch.float16
)
print("Model Loaded.")

def get_p_sem(prompt_text, target_word):
    inputs = tokenizer(prompt_text, return_tensors="pt").to(model.device)
    
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits[0, -1, :]
        probs = torch.softmax(logits, dim=-1)
        
    # Get top 100 tokens to search for variations of target
    top_probs, top_indices = torch.topk(probs, 100)
    
    p_sem = 0.0
    target_clean = target_word.strip().lower()
    
    for prob, idx in zip(top_probs, top_indices):
        token_str = tokenizer.decode([idx])
        if target_clean in token_str.lower():
            p_sem += prob.item()
            
    return p_sem

# Run Analysis
if "p_sem" not in prompts_df.columns:
    print("Calculating P_sem...")
    p_sems = []
    for index, row in prompts_df.iterrows():
        p = get_p_sem(row['prompt'], row['target_word'])
        p_sems.append(p)
        torch.cuda.empty_cache()
        if index % 10 == 0:
            print(f"Processed {index}/{len(prompts_df)}")
            
    prompts_df['p_sem'] = p_sems
    prompts_df.to_csv("prompts_with_psem.csv", index=False)
    print("Saved prompts_with_psem.csv")
else:
    print("P_sem already calculated.")

Loading Qwen Model...


Error while fetching `HF_TOKEN` secret value from your vault: 'Requesting secret HF_TOKEN timed out. Secrets can only be fetched when running from the Colab UI.'.
You are not authenticated with the Hugging Face Hub in this notebook.
If the error persists, please let us know by opening an issue on GitHub (https://github.com/huggingface/huggingface_hub/issues/new).


tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

config.json:   0%|          | 0.00/663 [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors.index.json: 0.00B [00:00, ?B/s]

Fetching 4 files:   0%|          | 0/4 [00:00<?, ?it/s]

model-00004-of-00004.safetensors:   0%|          | 0.00/3.56G [00:00<?, ?B/s]

model-00002-of-00004.safetensors:   0%|          | 0.00/3.86G [00:00<?, ?B/s]

model-00001-of-00004.safetensors:   0%|          | 0.00/3.95G [00:00<?, ?B/s]

model-00003-of-00004.safetensors:   0%|          | 0.00/3.86G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/243 [00:00<?, ?B/s]

Model Loaded.
Calculating P_sem...
Processed 0/500
Processed 10/500
Processed 20/500
Processed 30/500
Processed 40/500
Processed 50/500
Processed 60/500
Processed 70/500
Processed 80/500
Processed 90/500
Processed 100/500
Processed 110/500
Processed 120/500
Processed 130/500
Processed 140/500
Processed 150/500
Processed 160/500
Processed 170/500
Processed 180/500
Processed 190/500
Processed 200/500
Processed 210/500
Processed 220/500
Processed 230/500
Processed 240/500
Processed 250/500
Processed 260/500
Processed 270/500
Processed 280/500
Processed 290/500
Processed 300/500
Processed 310/500
Processed 320/500
Processed 330/500
Processed 340/500
Processed 350/500
Processed 360/500
Processed 370/500
Processed 380/500
Processed 390/500
Processed 400/500
Processed 410/500
Processed 420/500
Processed 430/500
Processed 440/500
Processed 450/500
Processed 460/500
Processed 470/500
Processed 480/500
Processed 490/500
Saved prompts_with_psem.csv


### Step 2b: Qwen Constraint Verification (Condition B)

Run Qwen *with* the negative constraint to measure its own failure rate.

In [4]:
# Step 2b: Run Qwen with Constraints
print("Running Qwen Constraint Verification...")

qwen_outputs = []
qwen_failed = []

if "qwen_output" in prompts_df.columns:
    print("Qwen results already present. Skipping generation.")
else:
    for index, row in prompts_df.iterrows():
        prompt = row['prompt']
        forbidden = row['target_word']
        
        messages = [
            {"role": "system", "content": "You are a completion engine. Do not output conversational filler. Just output the completion."},
            {"role": "user", "content": f"{prompt}\n\nConstraint: Do not use the word '{forbidden}'."}
        ]
        
        text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        model_inputs = tokenizer([text], return_tensors="pt").to(model.device)

        with torch.no_grad():
            generated_ids = model.generate(
                model_inputs.input_ids,
                max_new_tokens=100,
                do_sample=False
            )
        
        generated_ids = [
            output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
        ]
        response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
        
        failed = forbidden.lower() in response.lower()
        
        qwen_outputs.append(response)
        qwen_failed.append(failed)
        
        if index % 10 == 0:
            print(f"Generated {index}/{len(prompts_df)}")

    prompts_df['qwen_output'] = qwen_outputs
    prompts_df['qwen_failed'] = qwen_failed
    
    prompts_df.to_csv("prompts_with_qwen_results.csv", index=False)
    print("Saved prompts_with_qwen_results.csv")

The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


Running Qwen Constraint Verification...
Generated 0/500
Generated 10/500
Generated 20/500
Generated 30/500
Generated 40/500
Generated 50/500
Generated 60/500
Generated 70/500
Generated 80/500
Generated 90/500
Generated 100/500
Generated 110/500
Generated 120/500
Generated 130/500
Generated 140/500
Generated 150/500
Generated 160/500
Generated 170/500
Generated 180/500
Generated 190/500
Generated 200/500
Generated 210/500
Generated 220/500
Generated 230/500
Generated 240/500
Generated 250/500
Generated 260/500
Generated 270/500
Generated 280/500
Generated 290/500
Generated 300/500
Generated 310/500
Generated 320/500
Generated 330/500
Generated 340/500
Generated 350/500
Generated 360/500
Generated 370/500
Generated 380/500
Generated 390/500
Generated 400/500
Generated 410/500
Generated 420/500
Generated 430/500
Generated 440/500
Generated 450/500
Generated 460/500
Generated 470/500
Generated 480/500
Generated 490/500
Saved prompts_with_qwen_results.csv


### Step 2c: Generate Synonyms (For Experiment 3)

Generate a synonym for each target word using Qwen. This is required for Experiment 3 (Anchor Displacement).

In [5]:
# Step 2c: Generate Synonyms
print("Generating Synonyms for Experiment 3...")

synonyms = []

if "synonym" in prompts_df.columns:
    print("Synonyms already present. Skipping generation.")
else:
    for index, row in prompts_df.iterrows():
        target = row['target_word']
        
        messages = [
            {"role": "system", "content": "You are a helpful assistant. Output only a single word synonym."},
            {"role": "user", "content": f"Give me a single synonym for the word '{target}'. Do not explain. Just the word."}
        ]
        
        text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        model_inputs = tokenizer([text], return_tensors="pt").to(model.device)

        with torch.no_grad():
            generated_ids = model.generate(
                model_inputs.input_ids,
                max_new_tokens=10,
                do_sample=False
            )
        
        generated_ids = [
            output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
        ]
        synonym = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0].strip().split()[0] # Take first word
        # Clean punctuation
        synonym = synonym.strip(".,!?'")
        
        synonyms.append(synonym)
        
        if index % 20 == 0:
            print(f"Generated Synonym {index}/{len(prompts_df)}: {target} -> {synonym}")

    prompts_df['synonym'] = synonyms
    prompts_df.to_csv("prompts_with_synonyms.csv", index=False)
    print("Saved prompts_with_synonyms.csv")

Generating Synonyms for Experiment 3...
Generated Synonym 0/500: disguise -> costume
Generated Synonym 20/500: lie -> mislead
Generated Synonym 40/500: change -> alter
Generated Synonym 60/500: time -> duration
Generated Synonym 80/500: disguise -> costume
Generated Synonym 100/500: Jefferson -> Monticello
Generated Synonym 120/500: photosynthesis -> synthesis
Generated Synonym 140/500: hemoglobin -> oxyhemoglobin
Generated Synonym 160/500: gravity -> gravitation
Generated Synonym 180/500: photosynthesis -> synthesis
Generated Synonym 200/500: second -> next
Generated Synonym 220/500: White House -> Presidency
Generated Synonym 240/500: square -> rectangular
Generated Synonym 260/500: water -> H2O
Generated Synonym 280/500: Oxygen -> Air
Generated Synonym 300/500: smiling -> grinning
Generated Synonym 320/500: phantasm -> Vision
Generated Synonym 340/500: destruction -> Ruination
Generated Synonym 360/500: evacuate -> Clears
Generated Synonym 380/500: computers -> machines
Generated Sy

In [6]:
# Emergency Data Rescue (Updated)
import os
import socket

# Prefer the most complete file
if os.path.exists("prompts_with_synonyms.csv"):
    file_to_rescue = "prompts_with_synonyms.csv"
elif os.path.exists("prompts_with_qwen_results.csv"):
    file_to_rescue = "prompts_with_qwen_results.csv"
else:
    file_to_rescue = "prompts_with_psem.csv"

if os.path.exists(file_to_rescue):
    print(f"Found {file_to_rescue}. Uploading to termbin.com...")
    try:
        with socket.create_connection(("termbin.com", 9999)) as sock:
            with open(file_to_rescue, "rb") as f:
                sock.sendall(f.read())
            url = sock.recv(1024).decode("utf-8").strip()
        
        print(f"\nDATA RESCUE URL: {url}")
        print("Save this URL! You can download your data with:")
        print(f"wget {url} -O {file_to_rescue}")
    except Exception as e:
        print(f"Upload failed: {e}")
else:
    print("No results file found. Did Step 2 finish?")

Found prompts_with_synonyms.csv. Uploading to termbin.com...

DATA RESCUE URL: https://termbin.com/tkgu
 
Save this URL! You can download your data with:
wget https://termbin.com/tkgu
  -O prompts_with_synonyms.csv


## Step 3a: Submit Batch Experiment (GPT-5)

Submit the experiment to OpenAI Batch API. **Note:** We create one batch per model.

In [7]:
# Step 3a: Submit Batch Experiment
import json

batch_ids = {}

for model_name in GPT_MODELS:
    batch_file_name = f"batch_requests_{model_name}.jsonl"
    print(f"\nPreparing {batch_file_name}...")

    with open(batch_file_name, 'w') as f:
        for index, row in prompts_df.iterrows():
            prompt = row['prompt']
            forbidden = row['target_word']
            
            request = {
                "custom_id": f"{model_name}-{index}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": model_name,
                    "messages": [
                        {"role": "system", "content": "You are a completion engine. Do not output conversational filler. Just output the completion."},
                        {"role": "user", "content": f"{prompt}\n\nConstraint: Do not use the word '{forbidden}'."}
                    ],
                    "temperature": 1.0,
                    "max_tokens": 100
                }
            }
            f.write(json.dumps(request) + "\n")

    print(f"Uploading {batch_file_name}...")
    batch_input_file = client.files.create(
      file=open(batch_file_name, "rb"),
      purpose="batch"
    )
    
    print(f"Creating Batch Job for {model_name}...")
    batch_job = client.batches.create(
      input_file_id=batch_input_file.id,
      endpoint="/v1/chat/completions",
      completion_window="24h",
      metadata={"description": f"Semantic Gravity Exp 2 - {model_name}"}
    )
    
    batch_ids[model_name] = batch_job.id
    print(f"Batch ID ({model_name}): {batch_job.id}")

print("\nALL BATCHES SUBMITTED!")
print("Copy these IDs for Step 3b:")
print(json.dumps(batch_ids, indent=2))


Preparing batch_requests_gpt-5-nano-2025-08-07.jsonl...
Uploading batch_requests_gpt-5-nano-2025-08-07.jsonl...
Creating Batch Job for gpt-5-nano-2025-08-07...
Batch ID (gpt-5-nano-2025-08-07): batch_692610fcc0488190aec0958f7ded830f

Preparing batch_requests_gpt-5-mini-2025-08-07.jsonl...
Uploading batch_requests_gpt-5-mini-2025-08-07.jsonl...
Creating Batch Job for gpt-5-mini-2025-08-07...
Batch ID (gpt-5-mini-2025-08-07): batch_692610fddbb48190815e08c7b25e6d19

Preparing batch_requests_gpt-5-2025-08-07.jsonl...
Uploading batch_requests_gpt-5-2025-08-07.jsonl...
Creating Batch Job for gpt-5-2025-08-07...
Batch ID (gpt-5-2025-08-07): batch_692610ff3bbc8190b20db5aff114b4a1

ALL BATCHES SUBMITTED!
Copy these IDs for Step 3b:
{
  "gpt-5-nano-2025-08-07": "batch_692610fcc0488190aec0958f7ded830f",
  "gpt-5-mini-2025-08-07": "batch_692610fddbb48190815e08c7b25e6d19",
  "gpt-5-2025-08-07": "batch_692610ff3bbc8190b20db5aff114b4a1"
}


## Step 3b: Retrieve Batch Results (Exp 2)

Retrieve the results once the batches are complete.

In [None]:
# Step 3b: Retrieve Batch Results
# ENTER YOUR BATCH IDS HERE
BATCH_IDS = {
  "gpt-5-nano-2025-08-07": "batch_692610fcc0488190aec0958f7ded830f",
  "gpt-5-mini-2025-08-07": "batch_692610fddbb48190815e08c7b25e6d19",
  "gpt-5-2025-08-07": "batch_692610ff3bbc8190b20db5aff114b4a1"
}

for model_name, batch_id in BATCH_IDS.items():
    print(f"\nChecking status for {model_name} ({batch_id})...")
    try:
        batch_job = client.batches.retrieve(batch_id)
        print(f"Status: {batch_job.status}")

        if batch_job.status == 'completed':
            print("Batch complete! Downloading results...")
            output_file_id = batch_job.output_file_id
            content = client.files.content(output_file_id).text
            
            results_data = []
            
            for line in content.split('\n'):
                if not line.strip(): continue
                
                response = json.loads(line)
                custom_id = response['custom_id']
                # custom_id format: {model_name}-{index}
                last_hyphen_idx = custom_id.rfind('-')
                index = int(custom_id[last_hyphen_idx+1:])
                
                output = response['response']['body']['choices'][0]['message']['content']
                
                original_row = prompts_df.loc[index]
                forbidden = original_row['target_word']
                failed = forbidden.lower() in output.lower()
                
                result_entry = {
                    'index': index,
                    'prompt': original_row['prompt'],
                    'target_word': forbidden,
                    'p_sem': original_row['p_sem'],
                    'bucket': original_row['bucket'],
                    'output': output,
                    'failed': failed
                }
                results_data.append(result_entry)
                
            df = pd.DataFrame(results_data)
            filename = f"results_{model_name}.csv"
            df.to_csv(filename, index=False)
            print(f"Saved {filename} ({len(df)} rows)")
            
        elif batch_job.status == 'failed':
            print("Batch FAILED.")
            print(batch_job.errors)
    except Exception as e:
        print(f"Error retrieving batch {batch_id}: {e}")

## Step 3c: Experiment 3 (Anchor Displacement)

Select top 250 failed/high-pressure prompts and run with synonym constraint.

In [None]:
# Step 3c: Submit Experiment 3 (Anchor Displacement)

# Select Top 250 High Gravity Prompts (Highest P_sem)
high_gravity_df = prompts_df.sort_values(by='p_sem', ascending=False).head(250)
print(f"Selected {len(high_gravity_df)} High Gravity prompts for Exp 3.")

exp3_batch_ids = {}

for model_name in GPT_MODELS:
    batch_file_name = f"batch_requests_exp3_{model_name}.jsonl"
    print(f"\nPreparing {batch_file_name}...")

    with open(batch_file_name, 'w') as f:
        for index, row in high_gravity_df.iterrows():
            prompt = row['prompt']
            forbidden = row['target_word']
            synonym = row['synonym']
            
            request = {
                "custom_id": f"{model_name}-{index}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": model_name,
                    "messages": [
                        {"role": "system", "content": "You are a completion engine. Do not output conversational filler. Just output the completion."},
                        # ANCHOR DISPLACEMENT PROMPT
                        {"role": "user", "content": f"{prompt}\n\nConstraint: Do not use the word '{forbidden}'. Use '{synonym}' instead."}
                    ],
                    "temperature": 1.0,
                    "max_tokens": 100
                }
            }
            f.write(json.dumps(request) + "\n")

    print(f"Uploading {batch_file_name}...")
    batch_input_file = client.files.create(
      file=open(batch_file_name, "rb"),
      purpose="batch"
    )
    
    print(f"Creating Exp 3 Batch Job for {model_name}...")
    batch_job = client.batches.create(
      input_file_id=batch_input_file.id,
      endpoint="/v1/chat/completions",
      completion_window="24h",
      metadata={"description": f"Semantic Gravity Exp 3 - {model_name}"}
    )
    
    exp3_batch_ids[model_name] = batch_job.id
    print(f"Batch ID ({model_name}): {batch_job.id}")

print("\nALL EXP 3 BATCHES SUBMITTED!")
print("Copy these IDs for Step 3d:")
print(json.dumps(exp3_batch_ids, indent=2))

In [None]:
# Step 3d: Retrieve Exp 3 Results
# ENTER YOUR EXP 3 BATCH IDS HERE
EXP3_BATCH_IDS = {
    "gpt-5-nano-2025-08-07": "batch_...",
    "gpt-5-mini-2025-08-07": "batch_...",
    "gpt-5-2025-08-07": "batch_..."
}

for model_name, batch_id in EXP3_BATCH_IDS.items():
    print(f"\nChecking status for {model_name} ({batch_id})...")
    try:
        batch_job = client.batches.retrieve(batch_id)
        print(f"Status: {batch_job.status}")

        if batch_job.status == 'completed':
            print("Batch complete! Downloading results...")
            output_file_id = batch_job.output_file_id
            content = client.files.content(output_file_id).text
            
            results_data = []
            
            for line in content.split('\n'):
                if not line.strip(): continue
                
                response = json.loads(line)
                custom_id = response['custom_id']
                last_hyphen_idx = custom_id.rfind('-')
                index = int(custom_id[last_hyphen_idx+1:])
                
                output = response['response']['body']['choices'][0]['message']['content']
                
                original_row = prompts_df.loc[index]
                forbidden = original_row['target_word']
                failed = forbidden.lower() in output.lower()
                
                result_entry = {
                    'index': index,
                    'prompt': original_row['prompt'],
                    'target_word': forbidden,
                    'p_sem': original_row['p_sem'],
                    'bucket': original_row['bucket'],
                    'output': output,
                    'failed': failed
                }
                results_data.append(result_entry)
                
            df = pd.DataFrame(results_data)
            filename = f"results_exp3_{model_name}.csv"
            df.to_csv(filename, index=False)
            print(f"Saved {filename} ({len(df)} rows)")
            
        elif batch_job.status == 'failed':
            print("Batch FAILED.")
            print(batch_job.errors)
    except Exception as e:
        print(f"Error retrieving batch {batch_id}: {e}")

## Step 4: Analysis & Visualization

Plot the Collapse Curves.

In [None]:
def sigmoid(x, L, x0, k, b):
    return L / (1 + np.exp(-k * (x - x0))) + b

plt.figure(figsize=(10, 6))

for model_name in GPT_MODELS:
    results_file = f"results_{model_name}.csv"
    if not os.path.exists(results_file):
        continue
        
    df = pd.read_csv(results_file, names=['index', 'prompt', 'target_word', 'p_sem', 'bucket', 'output', 'failed'], header=0)
    
    # Binning
    bins = np.linspace(0, 1, 11)
    df['bin'] = pd.cut(df['p_sem'], bins, labels=bins[:-1])
    binned_data = df.groupby('bin')['failed'].mean().reset_index()
    binned_data['p_sem_mid'] = binned_data['bin'].astype(float) + 0.05
    
    # Plot Points
    plt.scatter(binned_data['p_sem_mid'], binned_data['failed'], label=f"{model_name} (Data)", alpha=0.5)
    
    # Fit Curve
    try:
        p0 = [max(binned_data['failed']), np.median(binned_data['p_sem_mid']), 1, min(binned_data['failed'])]
        popt, _ = curve_fit(sigmoid, binned_data['p_sem_mid'], binned_data['failed'], p0=p0, maxfev=5000)
        x_model = np.linspace(0, 1, 100)
        y_model = sigmoid(x_model, *popt)
        plt.plot(x_model, y_model, label=f"{model_name} (Fit)")
    except:
        print(f"Could not fit curve for {model_name}")

plt.title("Semantic Gravity: Collapse Curves")
plt.xlabel("Semantic Pressure ($P_{sem}$)")
plt.ylabel("Failure Rate ($R_{fail}$)")
plt.legend()
plt.grid(True)
plt.savefig("collapse_curves.png")
plt.show()

In [None]:
# Step 5: Export & Download
import json
from datetime import datetime
from google.colab import files
import shutil

# Create metadata
metadata = {
    'timestamp': datetime.now().isoformat(),
    'models': GPT_MODELS,
    'qwen_model': QWEN_MODEL_ID,
    'num_prompts': len(prompts_df)
}
with open('experiment_details.json', 'w') as f:
    json.dump(metadata, f, indent=2)

# List of files to zip
files_to_zip = [
    'prompts.csv',
    'prompts_with_psem.csv',
    'prompts_with_qwen_results.csv',
    'prompts_with_synonyms.csv',
    'experiment_details.json',
    'collapse_curves.png'
]

# Add result files
for model in GPT_MODELS:
    files_to_zip.append(f'results_{model}.csv')
    files_to_zip.append(f'results_exp3_{model}.csv')

# Zip everything
output_filename = 'experiment_results.zip'
with shutil.ZipFile(output_filename, 'w') as zipf:
    for file in files_to_zip:
        if os.path.exists(file):
            zipf.write(file)
        else:
            print(f'Warning: {file} not found')

# Download
print(f'Downloading {output_filename}...')
files.download(output_filename)