# Stratified Batch Cognitive Action Data Generator

Generate any number of examples with **consistent stratified distribution** across all 45 cognitive actions.

**Features:**
- 🎯 Stratified sampling: equal examples per cognitive action
- 📊 Generate any batch size (100, 1000, 10000, etc.)
- 👤 First-person perspective only
- 📝 Simple complexity examples
- 🎨 Rich variation: domains, triggers, emotional states, language styles, sentence starters
- 💾 Saves every run to Google Drive as a timestamped JSONL file
- 🚀 Optimized for 16GB VRAM (uses gemma2:9b)
- ⚡ 8 concurrent requests for speed

**Example:** Request 4,500 examples → Get 100 examples per action (45 × 100 = 4,500)

## 1️⃣ Install Dependencies

In [None]:
# Install required packages
!pip install -q requests pandas numpy tqdm aiohttp nest-asyncio

# Clone the repository
import os
if not os.path.exists('datagen'):
    print("📥 Cloning datagen repository...")
    !git clone https://github.com/ChuloIva/datagen.git
    print("✅ Repository cloned successfully!")
else:
    print("✅ Repository already exists")

# Import libraries
import json
import time
import random
import asyncio
import aiohttp
import nest_asyncio
import requests
import subprocess
from typing import List, Dict, Any
from dataclasses import dataclass, asdict
from tqdm.notebook import tqdm
from datetime import datetime
import math

# Apply nest_asyncio for Jupyter/Colab compatibility
# (the generator later drives its coroutines with asyncio.run() from notebook cells)
nest_asyncio.apply()

# Set random seeds
# NOTE: this seeds only Python's `random` module, which is what the generator
# uses to pick domains, triggers, emotional states, styles and starters.
random.seed(42)

# Printed unconditionally once the imports above have succeeded.
print("✅ Dependencies installed successfully!")

## 2️⃣ Install & Configure Ollama

In [None]:
# Install Ollama
!curl -fsSL https://ollama.ai/install.sh | sh

# Stop any existing Ollama processes
print("🛑 Stopping any existing Ollama processes...")
subprocess.run(['pkill', '-9', 'ollama'], stderr=subprocess.DEVNULL)
time.sleep(2)

# Set environment variables for 16GB VRAM
print("\n⚙️  Configuring Ollama for 16GB VRAM...")
os.environ['OLLAMA_HOST'] = '0.0.0.0:11434'
os.environ['OLLAMA_ORIGINS'] = '*'
os.environ['OLLAMA_NUM_PARALLEL'] = '8'
os.environ['OLLAMA_MAX_QUEUE'] = '256'
os.environ['OLLAMA_MAX_LOADED_MODELS'] = '1'
os.environ['LD_LIBRARY_PATH'] = '/usr/lib64-nvidia'

print("Configuration:")
print(f"  Model: gemma2:9b (~5GB VRAM)")
print(f"  Parallel requests: 8")
print(f"  Expected VRAM: 12-14GB")

# Start Ollama server
print("\n🚀 Starting Ollama server...")
subprocess.Popen(['ollama', 'serve'], 
                 env=os.environ.copy(),
                 stdout=subprocess.DEVNULL,
                 stderr=subprocess.DEVNULL)

print("⏳ Waiting for Ollama to start...")
time.sleep(10)

# Verify Ollama is running
try:
    response = requests.get('http://localhost:11434/api/tags', timeout=5)
    if response.status_code == 200:
        print("✅ Ollama is running!")
    else:
        print("❌ Ollama error")
except Exception as e:
    print(f"❌ Connection error: {e}")

## 3️⃣ Pull the Model

In [None]:
# Download the gemma2:9b model through the Ollama CLI (shell magic).
print("📥 Pulling gemma2:9b model (this may take 3-5 minutes)...")
!ollama pull gemma2:9b
# NOTE: this message is printed unconditionally; the result of the pull
# command above is not checked.
print("\n✅ Model ready!")

## 4️⃣ Load Cognitive Actions and Variation Pools

In [None]:
# Add datagen to Python path so `variable_pools` can be imported from the clone
import sys
datagen_dir = os.path.abspath('datagen')
if datagen_dir not in sys.path:
    sys.path.insert(0, datagen_dir)

# Import cognitive actions and variation pools
from variable_pools import (
    COGNITIVE_ACTIONS,
    DOMAINS,
    TRIGGERS,
    EMOTIONAL_STATES,
    LANGUAGE_STYLES
)

# Load sentence starters.
# The file is located through `datagen_dir` (the same absolute path that was put
# on sys.path) and decoded explicitly as UTF-8 rather than with the platform
# default encoding. Empty entries and entries of 2 characters or fewer are skipped.
with open(os.path.join(datagen_dir, 'all_truncated_outputs.json'), 'r', encoding='utf-8') as f:
    SENTENCE_STARTERS = [s for s in json.load(f) if s and len(s) > 2]

print(f"✅ Loaded {len(COGNITIVE_ACTIONS)} cognitive actions")
print(f"✅ Loaded {len(DOMAINS)} domains")
print(f"✅ Loaded {len(TRIGGERS)} triggers")
print(f"✅ Loaded {len(EMOTIONAL_STATES)} emotional states")
print(f"✅ Loaded {len(LANGUAGE_STYLES)} language styles")
print(f"✅ Loaded {len(SENTENCE_STARTERS)} sentence starters\n")

# COGNITIVE_ACTIONS maps each action name to its description
print("Cognitive actions (all will have equal representation):")
for idx, (action, desc) in enumerate(COGNITIVE_ACTIONS.items(), 1):
    print(f"{idx:2d}. {action:30s} - {desc}")

## 5️⃣ Mount Google Drive (for checkpoints)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Each run gets its own output folder, named after the moment this cell ran
timestamp = format(datetime.now(), '%Y%m%d_%H%M%S')
checkpoint_dir = f'/content/drive/MyDrive/cognitive_stratified_data_{timestamp}'
os.makedirs(checkpoint_dir, exist_ok=True)
print(f"✅ Checkpoints will be saved to: {checkpoint_dir}")

## 6️⃣ Stratified Data Generator

In [None]:
from typing import Optional


@dataclass
class VariedExample:
    """A generated example plus the variation values sampled for its prompt.

    Note: ``sentence_starter`` stores the starter that was *sampled*; the prompt
    only asks the model to use it about half of the time (see
    ``StratifiedDataGenerator.create_prompt``).
    """
    text: str
    cognitive_action: str
    domain: str
    trigger: str
    emotional_state: str
    language_style: str
    sentence_starter: str


class StratifiedDataGenerator:
    """Generate first-person examples spread evenly over the cognitive actions.

    Prompts are sent to ``{base_url}/api/generate`` with at most
    ``max_parallel`` requests in flight. Requests that fail, or that return
    fewer than 20 characters, are dropped; they are counted in
    ``failed_requests`` and the most recent reason is kept in ``last_error``.
    """

    def __init__(self, base_url="http://localhost:11434", max_parallel=8):
        self.base_url = base_url
        self.max_parallel = max_parallel
        self.semaphore = asyncio.Semaphore(max_parallel)

        # Bookkeeping for dropped generations (previously lost silently)
        self.failed_requests = 0
        self.last_error = None

        # Store variation pools
        self.cognitive_actions = COGNITIVE_ACTIONS
        self.domains = DOMAINS
        self.triggers = TRIGGERS
        self.emotional_states = EMOTIONAL_STATES
        self.language_styles = LANGUAGE_STYLES
        self.sentence_starters = SENTENCE_STARTERS

    def _record_failure(self, reason: str) -> None:
        """Count one dropped generation and remember why it was dropped."""
        # getattr keeps this safe for instances built without __init__
        self.failed_requests = getattr(self, 'failed_requests', 0) + 1
        self.last_error = reason

    def calculate_stratified_distribution(self, total_examples: int) -> Dict[str, int]:
        """Return how many examples each cognitive action should receive.

        Every action gets ``total_examples // num_actions``; the remainder is
        handed out one apiece to the first actions in iteration order, so the
        values always sum to ``total_examples``.

        Raises:
            ValueError: if ``total_examples`` is negative or no cognitive
                actions are loaded.
        """
        if total_examples < 0:
            raise ValueError(f"total_examples must be >= 0, got {total_examples}")
        num_actions = len(self.cognitive_actions)
        if num_actions == 0:
            raise ValueError("No cognitive actions are loaded")

        base_per_action, remainder = divmod(total_examples, num_actions)
        return {
            action: base_per_action + (1 if idx < remainder else 0)
            for idx, action in enumerate(self.cognitive_actions)
        }

    def create_prompt(self, action: str, action_desc: str, domain: str,
                     trigger: str, emotional_state: str, language_style: str,
                     sentence_starter: str) -> str:
        """Build the generation prompt for one example.

        The sentence-starter requirement is included only about half of the
        time (decided with ``random.random()``); every other field is always
        part of the prompt.
        """
        # Randomly decide whether to use sentence starter (50% of the time)
        use_starter = random.random() < 0.5

        starter_instruction = ""
        if use_starter:
            starter_instruction = f"\n- Start the example with: '{sentence_starter}'"

        return f"""Generate a simple, first-person example of someone {action}.

Action: {action}
Description: {action_desc}
Domain: {domain}
Trigger: {trigger}
Emotional state: {emotional_state}
Language style: {language_style}

Requirements:
- Write in first person (I, my, me)
- Keep it simple and realistic
- 2-5 sentences maximum
- Focus on the {action} cognitive action
- Use the {language_style} language style
- Incorporate the trigger and emotional state naturally{starter_instruction}
- Make it feel natural and relatable
- Show the cognitive process, not just state it

Example only (no explanation):"""

    async def generate_one(self, session: "aiohttp.ClientSession", action: str,
                          action_desc: str, domain: str, trigger: str,
                          emotional_state: str, language_style: str,
                          sentence_starter: str, model: str) -> Optional[VariedExample]:
        """Request one example from the model.

        Returns:
            The parsed ``VariedExample``, or ``None`` when the request fails,
            the server answers with a non-200 status, or the cleaned text is
            shorter than 20 characters. Each ``None`` is recorded through
            ``_record_failure``.
        """
        async with self.semaphore:
            prompt = self.create_prompt(action, action_desc, domain, trigger,
                                       emotional_state, language_style, sentence_starter)

            try:
                async with session.post(
                    f"{self.base_url}/api/generate",
                    json={"model": model, "prompt": prompt, "stream": False},
                    timeout=aiohttp.ClientTimeout(total=60)
                ) as response:
                    if response.status != 200:
                        self._record_failure(f"HTTP {response.status}")
                        return None

                    result = await response.json()
                    text = result.get('response', '').strip()

                    # Clean up the text (all double quotes are removed)
                    text = text.replace('"', '').strip()
                    if not text or len(text) < 20:
                        self._record_failure("response empty or shorter than 20 characters")
                        return None

                    return VariedExample(
                        text=text,
                        cognitive_action=action,
                        domain=domain,
                        trigger=trigger,
                        emotional_state=emotional_state,
                        language_style=language_style,
                        sentence_starter=sentence_starter
                    )
            except Exception as e:
                # Best effort: one bad request must not abort the whole batch,
                # but keep the reason so the caller can report it.
                self._record_failure(repr(e))
                return None

    async def generate_batch_async(self, count: int, action: str,
                                  action_desc: str, model: str, pbar=None) -> List[VariedExample]:
        """Generate up to ``count`` examples for one action concurrently.

        Each example gets its own randomly chosen domain, trigger, emotional
        state, language style and sentence starter. Dropped generations are
        left out, so the returned list may be shorter than ``count``.
        """
        # asyncio primitives are tied to an event loop, and generate_batch()
        # launches this coroutine through asyncio.run(). Build the semaphore
        # inside the running loop rather than reusing the one from __init__.
        self.semaphore = asyncio.Semaphore(self.max_parallel)

        async with aiohttp.ClientSession() as session:
            tasks = []
            for _ in range(count):
                # Randomly select variations for each example
                domain = random.choice(self.domains)
                trigger = random.choice(self.triggers)
                emotional_state = random.choice(self.emotional_states)
                language_style = random.choice(self.language_styles)
                sentence_starter = random.choice(self.sentence_starters)

                task = self.generate_one(session, action, action_desc, domain,
                                       trigger, emotional_state, language_style,
                                       sentence_starter, model)
                tasks.append(task)

            results = await asyncio.gather(*tasks)
            valid_results = [r for r in results if r is not None]

            # Update progress bar after batch completes.
            # `is not None` instead of truthiness: a progress bar object may
            # define its own boolean value.
            if pbar is not None:
                pbar.update(len(valid_results))

            return valid_results

    def generate_batch(self, count: int, action: str, action_desc: str,
                      model: str, pbar=None) -> List[VariedExample]:
        """Synchronous wrapper for batch generation."""
        return asyncio.run(self.generate_batch_async(count, action, action_desc, model, pbar))

    def generate_stratified(self, total_examples: int, model: str,
                           checkpoint_dir: str) -> List[VariedExample]:
        """Generate examples for every cognitive action and save them as JSONL.

        The output file ``stratified_{total_examples}_{timestamp}.jsonl`` is
        created in ``checkpoint_dir`` before generation starts and is appended
        to after every batch, so an interrupted run keeps what it produced.

        Returns:
            All successfully generated examples. The list is shorter than
            ``total_examples`` when generations were dropped; a warning is
            printed in that case.
        """
        # Calculate distribution
        distribution = self.calculate_stratified_distribution(total_examples)
        num_actions = len(self.cognitive_actions)

        print("\n📊 STRATIFIED DISTRIBUTION")
        print("="*60)
        print(f"Total examples to generate: {total_examples:,}")
        print(f"Cognitive actions: {num_actions}")
        print(f"Examples per action: {min(distribution.values())} - {max(distribution.values())}")
        print("="*60 + "\n")

        all_examples = []
        failures_before = getattr(self, 'failed_requests', 0)
        start_time = time.time()

        # Resolve the output file up front so a bad directory fails immediately
        # instead of after the whole generation has finished.
        os.makedirs(checkpoint_dir, exist_ok=True)
        checkpoint_file = os.path.join(
            checkpoint_dir,
            f"stratified_{total_examples}_{int(time.time())}.jsonl"
        )

        # Generate in smaller batches for better progress tracking
        batch_size = 25  # Generate 25 examples at a time for smooth progress updates

        # Progress bar that updates after each batch
        pbar = tqdm(total=total_examples, desc="Generating", unit="examples")

        try:
            with open(checkpoint_file, 'w', encoding='utf-8') as f:
                for action_idx, (action, action_desc) in enumerate(self.cognitive_actions.items(), 1):
                    count = distribution[action]

                    # Update progress bar description with current action
                    pbar.set_description(f"[{action_idx}/{num_actions}] {action[:20]}")

                    # Generate in smaller batches
                    num_batches = (count + batch_size - 1) // batch_size

                    for batch_idx in range(num_batches):
                        batch_count = min(batch_size, count - (batch_idx * batch_size))

                        # Generate batch (progress bar updates inside)
                        batch_examples = self.generate_batch(
                            count=batch_count,
                            action=action,
                            action_desc=action_desc,
                            model=model,
                            pbar=pbar
                        )

                        all_examples.extend(batch_examples)

                        # Persist right away so progress survives an interruption
                        for ex in batch_examples:
                            f.write(json.dumps(asdict(ex)) + '\n')
                        f.flush()

                        # Update progress info
                        elapsed = time.time() - start_time
                        rate = len(all_examples) / elapsed if elapsed > 0 else 0
                        remaining = total_examples - len(all_examples)
                        eta = remaining / rate if rate > 0 else 0

                        pbar.set_postfix({
                            'rate': f'{rate:.1f}/s',
                            'ETA': f'{eta/60:.0f}m'
                        })
        finally:
            pbar.close()

        print(f"\n✅ Saved to: {checkpoint_file}")

        # Dropped generations make the result smaller than requested; say so
        shortfall = total_examples - len(all_examples)
        if shortfall > 0:
            dropped = getattr(self, 'failed_requests', 0) - failures_before
            print(f"⚠️  {shortfall:,} of {total_examples:,} requested examples are missing "
                  f"({dropped:,} generations dropped; last reason: {getattr(self, 'last_error', None)})")

        return all_examples

print("✅ Stratified data generator ready")

## 7️⃣ Configure Generation

**Set the number of examples you want to generate.**

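The generator distributes them as evenly as possible across all 45 cognitive actions: every action gets the same base count, and when the total is not an exact multiple of the number of actions, the remainder is assigned one extra example each to the first actions in the list. Responses that fail or come back shorter than 20 characters are dropped, so the final counts can end up slightly below the target.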

In [None]:
# ============================================================
# CONFIGURE THIS: Set your desired total examples
# ============================================================
TOTAL_EXAMPLES = 4500  # Will generate 100 per action (4500 / 45 = 100)
# Try: 450 (10 each), 2250 (50 each), 4500 (100 each), 9000 (200 each), etc.
# ============================================================

# Settings consumed by the generation cells below
CONFIG = dict(
    total_examples=TOTAL_EXAMPLES,
    model='gemma2:9b',
    max_parallel=8,
    checkpoint_dir=checkpoint_dir,
)

# Derived figures shown in the summary
num_actions = len(COGNITIVE_ACTIONS)
examples_per_action = TOTAL_EXAMPLES // num_actions
# Rough estimate: 20 seconds per request, max_parallel requests at a time
estimated_minutes = TOTAL_EXAMPLES / CONFIG['max_parallel'] * 20 / 60

# Emit the whole summary in one go, one entry per output line
print("\n".join([
    "="*60,
    "STRATIFIED GENERATION CONFIGURATION",
    "="*60,
    f"Total examples: {CONFIG['total_examples']:,}",
    f"Cognitive actions: {num_actions}",
    f"Examples per action: ~{examples_per_action}",
    f"Model: {CONFIG['model']} (~5GB VRAM)",
    f"Parallel requests: {CONFIG['max_parallel']}",
    f"\nVariation dimensions:",
    f"  - Domains: {len(DOMAINS)}",
    f"  - Triggers: {len(TRIGGERS)}",
    f"  - Emotional states: {len(EMOTIONAL_STATES)}",
    f"  - Language styles: {len(LANGUAGE_STYLES)}",
    f"  - Sentence starters: {len(SENTENCE_STARTERS)}",
    f"\nPerspective: First-person only",
    f"Complexity: Simple only",
    f"\nEstimated time: {estimated_minutes:.0f} minutes ({estimated_minutes/60:.1f} hours)",
    f"Expected VRAM: 12-14GB",
    "="*60,
]))

## 8️⃣ Generate Stratified Data

Run this cell to generate your stratified dataset. You can run it multiple times with different totals!

In [None]:
# Initialize generator
generator = StratifiedDataGenerator(max_parallel=CONFIG['max_parallel'])

print("="*60)
print(f"🚀 GENERATING {CONFIG['total_examples']:,} STRATIFIED EXAMPLES")
print("="*60)

start_time = time.time()

# Generate
examples = generator.generate_stratified(
    total_examples=CONFIG['total_examples'],
    model=CONFIG['model'],
    checkpoint_dir=CONFIG['checkpoint_dir']
)

# Calculate statistics
elapsed = time.time() - start_time

print("\n" + "="*60)
print("🎉 GENERATION COMPLETE!")
print("="*60)
print(f"Total examples generated: {len(examples):,}")
print(f"Time elapsed: {elapsed/60:.1f} minutes ({elapsed/3600:.2f} hours)")
print(f"Average rate: {(len(examples)/elapsed if elapsed > 0 else 0.0):.1f} examples/sec")
print(f"\nData saved to: {CONFIG['checkpoint_dir']}")
print("="*60)

# Verify stratification
import pandas as pd
df = pd.DataFrame([asdict(ex) for ex in examples])
if df.empty:
    # Nothing was generated (e.g. the server was unreachable): the frame has no
    # 'cognitive_action' column, so report every action with a count of zero.
    distribution = pd.Series(0, index=list(COGNITIVE_ACTIONS)).sort_index()
else:
    # value_counts() leaves out actions with no examples at all; reindex against
    # the full action list so they show up as 0 in the min/mean figures.
    distribution = (
        df['cognitive_action']
        .value_counts()
        .reindex(list(COGNITIVE_ACTIONS), fill_value=0)
        .sort_index()
    )

print("\n📊 DISTRIBUTION VERIFICATION")
print("="*60)
print(f"Min examples per action: {distribution.min()}")
print(f"Max examples per action: {distribution.max()}")
print(f"Mean examples per action: {distribution.mean():.1f}")
# The series is sorted by action name, so these are the first ten alphabetically
print(f"\nFirst 10 cognitive actions (alphabetical):")
print(distribution.head(10))
print("="*60)

## 9️⃣ Preview Examples

In [None]:
import pandas as pd

# Tabulate the generated examples, one row per example
df = pd.DataFrame([asdict(ex) for ex in examples])

print("📊 DATASET PREVIEW\n")
print(f"Shape: {df.shape}")
print(f"\nColumns: {list(df.columns)}")

# Frequency tables for the main categorical columns
print(f"\n📈 Cognitive actions distribution:")
print(df['cognitive_action'].value_counts().sort_values(ascending=False))

print(f"\n🌍 Domain distribution (top 10):")
print(df['domain'].value_counts().head(10))

print(f"\n🎨 Language style distribution:")
print(df['language_style'].value_counts())

print("\n" + "="*60, "Random examples:", "="*60, sep="\n")

# Show up to five randomly drawn rows
for _, row in df.sample(min(5, len(df))).iterrows():
    print(f"\n[{row['cognitive_action']}]")
    print(f"Domain: {row['domain']}")
    # Triggers longer than 50 characters are cut and marked with an ellipsis
    if len(row['trigger']) > 50:
        print(f"Trigger: {row['trigger'][:50]}...")
    else:
        print(f"Trigger: {row['trigger']}")
    print(f"Style: {row['language_style']}")
    print(f"Text: {row['text']}")

## 🔟 Download Dataset

In [None]:
from google.colab import files
import glob

# Collect every stratified output file in the checkpoint folder
stratified_files = glob.glob(os.path.join(CONFIG['checkpoint_dir'], 'stratified_*.jsonl'))

if not stratified_files:
    print("❌ No stratified file found")
else:
    # Pick the file with the greatest ctime and hand it to the browser
    latest_file = max(stratified_files, key=os.path.getctime)
    print(f"Downloading: {os.path.basename(latest_file)}")
    print(f"Size: {os.path.getsize(latest_file) / (1024*1024):.1f} MB")
    files.download(latest_file)
    print("✅ Download started!")

## 1️⃣1️⃣ Generate Multiple Batches (Optional)

Run this if you want to generate multiple batches with different sizes.

In [None]:
# Totals to generate, one stratified run (and one output file) per entry
BATCH_SIZES = [450, 2250, 4500]  # 10, 50, 100 per action

generator = StratifiedDataGenerator(max_parallel=CONFIG['max_parallel'])

for batch_size in BATCH_SIZES:
    # Banner for this run
    print(
        f"\n{'='*60}",
        f"Generating batch of {batch_size:,} examples...",
        f"{'='*60}",
        sep="\n",
    )

    examples = generator.generate_stratified(
        total_examples=batch_size,
        model=CONFIG['model'],
        checkpoint_dir=CONFIG['checkpoint_dir'],
    )

    print(f"✅ Completed: {len(examples):,} examples")

print("\n🎉 All batches complete!")

## 🎉 Complete!

You now have a **stratified dataset** with equal representation across all 45 cognitive actions!

### Key benefits:
- ✅ **Consistent distribution**: Every batch has the same proportions
- ✅ **Flexible batch sizes**: Generate 100, 1000, 10000, or any number
- ✅ **Reproducible**: Same distribution every time
- ✅ **Rich variation**: 5 dimensions of variation per example

### Example distributions:
- **450 examples** = 10 per action
- **2,250 examples** = 50 per action
- **4,500 examples** = 100 per action
- **9,000 examples** = 200 per action
- **45,000 examples** = 1,000 per action

### Files created:
- `stratified_{count}_{timestamp}.jsonl` for each generation

### Example format:
```json
{
  "text": "I need to analyze my spending patterns...",
  "cognitive_action": "analyzing",
  "domain": "financial planning",
  "trigger": "reviewing monthly bank statement",
  "emotional_state": "feeling anxious about the implications",
  "language_style": "straightforward and direct",
  "sentence_starter": "I can see that"
}
```

### To generate more:
1. Change `TOTAL_EXAMPLES` in cell 7
2. Re-run cells 7 and 8
3. Download the new file