# Enhanced AI Text Generator for Google Colab

A streamlined AI text generator with quality controls for scientific papers.

**Features:**
- Length control (±20% of original)
- Topic consistency validation
- Real-time quality metrics
- Resume capability
- GPU acceleration

**Instructions:**
1. Upload your `human_text_50k.jsonl` file when the upload cell prompts for it
2. Run all cells in order
3. Download the generated `ai_generated_colab.jsonl`

In [None]:
# Install dependencies
!pip install transformers torch accelerate datasets tqdm

import json
import time
import re
import random
import os
from datetime import datetime
from tqdm import tqdm
from typing import List, Dict, Tuple, Optional
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, set_seed
import gc

# Report the runtime environment up front so a missing GPU is noticed early.
print(f"PyTorch version: {torch.__version__}")
cuda_ready = torch.cuda.is_available()
print(f"CUDA available: {cuda_ready}")
if cuda_ready:
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    gpu_total_bytes = torch.cuda.get_device_properties(0).total_memory
    print(f"CUDA memory: {gpu_total_bytes / 1024**3:.1f} GB")

In [None]:
# Upload the human-written input data through the Colab file picker
from google.colab import files

print("Please upload your human_text_50k.jsonl file:")
uploaded = files.upload()

# Verify upload: abort right away when the expected file name is absent
input_file = 'human_text_50k.jsonl'
if input_file not in uploaded:
    print("Please upload the human_text_50k.jsonl file")
    raise FileNotFoundError("Data file not uploaded")

print(f"File uploaded: {input_file}")
with open(input_file, 'r') as f:
    lines = list(f)  # one element per line of the uploaded file
print(f"Total texts to process: {len(lines):,}")

In [None]:
# Quality Controller Class
class QualityController:
    """Heuristic quality checks used while generating AI text.

    Every helper is a static method, so the class works both as a namespace
    and through an instance.
    """

    @staticmethod
    def extract_keywords(text: str, max_keywords: int = 10) -> List[str]:
        """Return the most frequent content words in ``text``.

        Words are runs of at least four ASCII letters, compared in lower
        case. A small stop-word list is ignored. Results are ordered by
        descending frequency; ties keep first-occurrence order.
        """
        ignored = {'this', 'that', 'with', 'from', 'they', 'have', 'been',
                   'will', 'were', 'said', 'using', 'approach', 'method'}

        counts: Dict[str, int] = {}
        for token in re.findall(r'\b[a-zA-Z]{4,}\b', text.lower()):
            if token in ignored:
                continue
            counts[token] = counts.get(token, 0) + 1

        # sorted() is stable, so equally frequent words stay in insertion order.
        ranked = sorted(counts, key=counts.get, reverse=True)
        return ranked[:max_keywords]

    @staticmethod
    def check_topic_consistency(original: str, generated: str, title: str, threshold: float = 0.3) -> bool:
        """Tell whether ``generated`` stays on the topic of the source.

        The generated keywords are compared with the keywords of the
        original text and of the title; reaching ``threshold`` on either
        overlap ratio is enough. An original without keywords always passes.
        """
        keywords_of = QualityController.extract_keywords
        source_terms = set(keywords_of(original))
        output_terms = set(keywords_of(generated))
        title_terms = set(keywords_of(title))

        if not source_terms:
            return True

        # Share of the original's keywords that reappear in the output.
        source_score = len(source_terms & output_terms) / len(source_terms)
        # Share of the title's keywords that reappear (guarded against an empty title).
        title_score = len(title_terms & output_terms) / max(len(title_terms), 1)

        return source_score >= threshold or title_score >= threshold

    @staticmethod
    def calculate_target_length(original_length: int, tolerance: float = 0.2) -> Tuple[int, int]:
        """Return the ``(min, max)`` length allowed around ``original_length``.

        Both bounds are truncated to integers.
        """
        lower_bound = int(original_length * (1 - tolerance))
        upper_bound = int(original_length * (1 + tolerance))
        return lower_bound, upper_bound


print("Quality Controller loaded")

In [None]:
# Model setup: load tokenizer + causal LM and wrap them in a text-generation pipeline
MODEL_NAME = "gpt2-large"  # High quality generation, better length matching
# Alternative: "gpt2-medium" (faster) or "microsoft/DialoGPT-large" (conversational)

print(f"Loading model: {MODEL_NAME}")

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

if not torch.cuda.is_available():
    # CPU path: default precision, pipeline pinned to device -1
    model = AutoModelForCausalLM.from_pretrained(MODEL_NAME)
    generator = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        device=-1
    )
else:
    # GPU path: half precision with automatic device placement
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        dtype=torch.float16,
        device_map="auto"
    )
    # No device argument here because device_map already placed the model
    generator = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer
    )

# Reuse the end-of-sequence token for padding when the tokenizer defines none
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

print("Model loaded successfully!")

# Report where the model ended up
if hasattr(model, 'device'):
    print(f"Model device: {model.device}")
elif torch.cuda.is_available():
    print("Model device: GPU (auto-mapped)")
else:
    print("Model device: CPU")


In [None]:
# AI Generator Class
class ColabAIGenerator:
    """Colab-optimized AI text generator.

    Relies on the notebook-level ``tokenizer`` and ``generator`` objects and
    on ``QualityController`` for length targets and topic checks.
    """

    def __init__(self):
        self.quality_controller = QualityController()
        # Private RNG for prompt choice, seeds and sampling parameters.
        # transformers' set_seed() reseeds Python's *global* ``random``, so
        # drawing the next seed from the global RNG would make every seed a
        # deterministic function of the previous one (a short cycle over at
        # most 10,000 states). A dedicated instance is unaffected.
        self._rng = random.Random()

    @staticmethod
    def _token_budget(orig_tokens: int, prompt_tokens: int,
                      context_limit: int = 1020) -> Optional[Tuple[int, int]]:
        """Compute ``(min_new_tokens, max_new_tokens)`` for one generation call.

        Args:
            orig_tokens: Token count of the original abstract.
            prompt_tokens: Token count of the full prompt.
            context_limit: Usable context size (the model's 1024-token limit
                minus a small buffer).

        Returns:
            The token bounds, or ``None`` when the prompt leaves no room for
            even a single new token (generation would only raise).
        """
        available_tokens = context_limit - prompt_tokens
        target_tokens = min(orig_tokens, available_tokens - 20)
        target_tokens = max(80, min(500, target_tokens))  # Higher bounds for gpt2-large

        min_new_tokens = max(60, int(target_tokens * 0.9))  # Target 90% length
        max_new_tokens = min(available_tokens - 5, int(target_tokens * 1.1))  # Allow 10% over

        if max_new_tokens < 1:
            return None
        # The 60-token floor must never exceed what the context can still hold.
        return min(min_new_tokens, max_new_tokens), max_new_tokens

    def create_prompts(self, title: str, abstract: str) -> List[str]:
        """Build the candidate prompts for rewriting ``abstract``.

        Each prompt embeds the title, the original abstract and the target
        character range derived from the abstract's length.
        """
        original_length = len(abstract)
        min_length, max_length = self.quality_controller.calculate_target_length(original_length)

        prompts = [
            f"""Rewrite this scientific abstract using different academic language while maintaining the same topic and approximate length ({min_length}-{max_length} characters):

Title: {title}
Original Abstract: {abstract}

Rewritten Abstract:""",

            f"""Create an alternative version of this scientific abstract. Keep the same research topic and maintain similar length ({min_length}-{max_length} characters):

Title: {title}
Original: {abstract}

Alternative Version:""",

            f"""Generate a new version of this scientific abstract using different wording but the same research focus. Target length: {min_length}-{max_length} characters.

Title: {title}
Reference: {abstract}

New Abstract:"""
        ]

        return prompts

    def generate_ai_version(self, title: str, abstract: str, max_attempts: int = 3) -> str:
        """Generate an AI rewrite of ``abstract`` with quality controls.

        Up to ``max_attempts`` samples are drawn with randomized prompt and
        sampling parameters. A sample is accepted when it is not extremely
        short and passes the topic-consistency check; overly long samples
        are truncated to 1.5x the original length first.

        Returns:
            The accepted text, or a templated fallback (tagged with
            ``[Enhanced AI-generated ...]``) when every attempt fails.
        """
        original_length = len(abstract)
        prompts = self.create_prompts(title, abstract)
        rng = self._rng

        for attempt in range(max_attempts):
            try:
                prompt = rng.choice(prompts)

                # Token counts come from the tokenizer, not from characters.
                orig_tokens = len(tokenizer.encode(abstract, add_special_tokens=False))
                prompt_tokens = len(tokenizer.encode(prompt, add_special_tokens=False))

                budget = self._token_budget(orig_tokens, prompt_tokens)
                if budget is None:
                    # Prompt already fills the context window; calling the
                    # model would only raise, so skip this attempt cheaply.
                    continue
                min_new_tokens, max_new_tokens = budget

                set_seed(rng.randint(1, 10000))

                result = generator(
                    prompt,
                    max_new_tokens=max_new_tokens,
                    min_new_tokens=min_new_tokens,
                    temperature=rng.uniform(0.7, 1.0),
                    top_p=rng.uniform(0.85, 0.95),
                    top_k=rng.randint(40, 80),
                    do_sample=True,
                    pad_token_id=tokenizer.eos_token_id,
                    num_return_sequences=1,
                    repetition_penalty=1.2
                )

                generated_text = result[0]['generated_text']

                # Extract generated part (after prompt)
                ai_text = generated_text[len(prompt):].strip()

                # Clean up artifacts
                ai_text = re.sub(r'^(rewritten abstract:|alternative version:|new abstract:)\s*',
                                 '', ai_text, flags=re.IGNORECASE)
                ai_text = re.sub(r'\[ai-generated.*?\]', '', ai_text, flags=re.IGNORECASE)
                ai_text = ai_text.replace('\n', ' ').strip()

                # Length adjustment
                if len(ai_text) > original_length * 2.0:
                    ai_text = ai_text[:int(original_length * 1.5)]
                elif len(ai_text) < max(50, original_length * 0.3):
                    continue  # Only reject if extremely short

                # Quality check
                if self.quality_controller.check_topic_consistency(abstract, ai_text, title):
                    return ai_text

            except Exception as e:
                # Best effort: a failed attempt must not abort the whole run.
                print(f"Generation attempt {attempt + 1} failed: {e}")
                continue

        # Enhanced fallback (the bracketed tag is how callers detect it)
        fallback_length = min(300, original_length)
        title_clean = title.lower().replace(':', '').strip()
        abstract_excerpt = abstract[:fallback_length]

        return f"This research on {title_clean} investigates {abstract_excerpt}... [Enhanced AI-generated summary maintaining original research focus]"

# Initialize generator
# One shared instance; the main generation loop calls ai_gen.generate_ai_version().
ai_gen = ColabAIGenerator()
print("AI Generator initialized")

In [None]:
# Data Processing Functions
class DataProcessor:
    """Handles data loading and processing"""

    @staticmethod
    def _metadata_of(entry: Dict) -> Dict:
        """Return the entry's ``metadata`` mapping, or ``{}`` if absent/invalid."""
        metadata = entry.get('metadata')
        return metadata if isinstance(metadata, dict) else {}

    @staticmethod
    def load_human_texts(file_path: str) -> List[Dict]:
        """Load human texts from a JSONL file.

        Blank lines are ignored silently. Lines that are not valid JSON, or
        whose JSON value is not an object, are reported and skipped.

        Args:
            file_path: Path of the UTF-8 encoded JSONL file.

        Returns:
            The parsed JSON objects in file order.
        """
        texts = []
        with open(file_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    # A blank line (e.g. a trailing newline) is not an error.
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError as e:
                    print(f"Skipping invalid JSON on line {line_num}: {e}")
                    continue
                if not isinstance(entry, dict):
                    # Later processing calls entry.get(...), so only objects are usable.
                    print(f"Skipping non-object JSON on line {line_num}")
                    continue
                texts.append(entry)
        return texts

    @staticmethod
    def extract_title_and_text(entry: Dict) -> Tuple[Optional[str], Optional[str]]:
        """Extract title and text from an entry.

        The title is taken from ``metadata.title`` and falls back to the
        top-level ``title`` when the metadata is missing, null, or has no
        usable title. The text defaults to ``''``.
        """
        metadata = DataProcessor._metadata_of(entry)
        title = metadata.get('title') or entry.get('title')
        text = entry.get('text', '')
        return title, text

    @staticmethod
    def save_ai_entry(f, ai_text: str, original_entry: Dict,
                      original_length: int, generated_length: int) -> None:
        """Append one AI-generated record to ``f`` as a JSON line and flush.

        Args:
            f: Writable text file object.
            ai_text: The generated text.
            original_entry: Source entry; ``metadata.arxiv_id`` is recorded
                as ``original_id`` (``'unknown'`` when unavailable).
            original_length: Character length of the source text.
            generated_length: Character length of ``ai_text``.
        """
        length_ratio = generated_length / original_length if original_length > 0 else 0
        metadata = DataProcessor._metadata_of(original_entry)

        ai_entry = {
            'text': ai_text,
            'label': 'ai',
            'source': 'enhanced_colab_generation',
            'original_id': metadata.get('arxiv_id', 'unknown'),
            'original_length': original_length,
            'generated_length': generated_length,
            'length_ratio': round(length_ratio, 2),
            'generated_at': datetime.now().isoformat()
        }

        f.write(json.dumps(ai_entry, ensure_ascii=False) + '\n')
        # Flush per record so an interrupted session keeps what was generated.
        f.flush()


processor = DataProcessor()
print("Data Processor initialized")

In [None]:
# Load and analyze human texts
print("Loading human texts...")
human_texts = processor.load_human_texts('human_text_50k.jsonl')
print(f"Loaded {len(human_texts):,} human texts")

# Analyze data structure using the first entry as a representative sample
if human_texts:
    sample = human_texts[0]
    print(f"\nData Structure Analysis:")
    print(f"   Entry keys: {list(sample.keys())}")

    # Only inspect metadata when it is an actual mapping ("metadata": null would crash).
    sample_metadata = sample.get('metadata')
    if isinstance(sample_metadata, dict):
        print(f"   Metadata keys: {list(sample_metadata.keys())}")
        # Fall back to 'N/A' for a missing *or* null title so slicing below is safe.
        title = sample_metadata.get('title') or 'N/A'
        text_length = len(sample.get('text') or '')

        print(f"\nSample Entry:")
        print(f"   Title: {str(title)[:80]}...")
        print(f"   Text length: {text_length:,} characters")
        print(f"   Source: {sample.get('source', 'N/A')}")

# Calculate average length over (at most) the first 1000 entries
lengths = [len(entry.get('text') or '') for entry in human_texts[:1000]]
if lengths:
    avg_length = sum(lengths) / len(lengths)
    print(f"Average text length (sample): {avg_length:.0f} characters")
else:
    # Nothing was loaded: avoid dividing by zero and say so explicitly.
    avg_length = 0.0
    print("No texts loaded; average text length unavailable")

In [None]:
# Generation Settings - Parallel Processing Friendly

# PARALLEL PROCESSING CONFIGURATION
# Set these values for each Colab instance:
INSTANCE_ID = 1        # Change this: 1, 2, 3, etc. for each instance
TOTAL_INSTANCES = 3    # Total number of parallel instances
TARGET_TOTAL = 15000   # Total texts to generate across all instances (5k each)

# An out-of-range ID would silently select the wrong slice of the dataset
# (ID 0 even yields negative indices), so reject it up front.
if not 1 <= INSTANCE_ID <= TOTAL_INSTANCES:
    raise ValueError(
        f"INSTANCE_ID must be between 1 and {TOTAL_INSTANCES}, got {INSTANCE_ID}"
    )

# Calculate range for this instance, clamped to the data actually loaded
texts_per_instance = TARGET_TOTAL // TOTAL_INSTANCES
start_offset = min((INSTANCE_ID - 1) * texts_per_instance, len(human_texts))
end_offset = min(start_offset + texts_per_instance, len(human_texts))
instance_total = end_offset - start_offset  # entries this instance really owns

# Override batch settings for parallel processing
BATCH_SIZE_ENTRIES = texts_per_instance
MAX_GENERATE = texts_per_instance

print(f"Parallel Processing Info:")
print(f"   Instance ID: {INSTANCE_ID} of {TOTAL_INSTANCES}")
print(f"   Total dataset: {len(human_texts):,} entries")
print(f"   This instance range: {start_offset:,} to {end_offset:,}")
print(f"   This instance will generate: {instance_total:,} texts")
print(f"   Estimated time: {instance_total/2.5/60:.1f} hours")
# NOTE(review): GPU_BATCH_SIZE is only printed in the visible cells; the
# generation loop processes one entry at a time.
GPU_BATCH_SIZE = 4 if torch.cuda.is_available() else 2
SAVE_EVERY = 50  # Save more frequently for better resume
OUTPUT_FILE = 'ai_generated_colab.jsonl'

print(f"\nGeneration Settings:")
print(f"   This session will generate: {MAX_GENERATE:,} entries")
print(f"   GPU batch size: {GPU_BATCH_SIZE}")
print(f"   Save progress every: {SAVE_EVERY} entries")
print(f"   Output file: {OUTPUT_FILE}")

# Check for existing progress: one output line per generated entry.
# NOTE: entries that were skipped or failed write no line, so a resumed run
# may re-process a few entries near the previous stopping point.
start_idx = 0
if os.path.exists(OUTPUT_FILE):
    with open(OUTPUT_FILE, 'r', encoding='utf-8') as f:
        start_idx = sum(1 for _ in f)
    print(f"Resuming from index {start_idx:,}")

# Remaining work is measured against this instance's slice, not the whole
# dataset; otherwise a finished instance would never report completion.
total_to_generate = max(0, min(MAX_GENERATE, instance_total - start_idx))
print(f"Will generate: {total_to_generate:,} new texts")

In [None]:
# Main Generation Loop
# Index window of human_texts handled by this run: the instance's slice,
# shifted by the number of entries already present in the output file.
actual_start = start_offset + start_idx
actual_end = min(actual_start + total_to_generate, end_offset)
# Size of this instance's slice; progress is reported against it because the
# output file (and therefore start_idx) only contains this instance's work.
instance_total = end_offset - start_offset

if total_to_generate <= 0 or actual_start >= actual_end:
    print("Generation already complete!")
else:
    session_total = actual_end - actual_start  # entries this session will visit
    print(f"Starting AI text generation...")
    print(f"Expected time: ~{session_total/20:.0f} minutes with GPU")

    start_time = time.time()
    generated_count = 0
    skipped_count = 0  # entries that produced no output line (missing data or error)
    quality_stats = {'length_matches': 0, 'topic_consistent': 0, 'fallback_used': 0}

    mode = 'a' if start_idx > 0 else 'w'
    with open(OUTPUT_FILE, mode, encoding='utf-8') as f:
        for i in tqdm(range(actual_start, actual_end), desc=f"Instance {INSTANCE_ID} generating"):
            entry = human_texts[i]

            try:
                title, text = processor.extract_title_and_text(entry)

                if not title or not text:
                    print(f"\nMissing title or text for entry {i}, skipping...")
                    skipped_count += 1
                    continue

                # Generate AI version
                ai_text = ai_gen.generate_ai_version(title, text)

                # Quality assessment
                original_length = len(text)
                ai_length = len(ai_text)
                length_ratio = ai_length / original_length if original_length > 0 else 0

                if 0.8 <= length_ratio <= 1.2:
                    quality_stats['length_matches'] += 1

                if ai_gen.quality_controller.check_topic_consistency(text, ai_text, title):
                    quality_stats['topic_consistent'] += 1

                # The generator tags its templated fallback with this marker.
                if '[Enhanced AI-generated' in ai_text:
                    quality_stats['fallback_used'] += 1

                # Save entry
                processor.save_ai_entry(f, ai_text, entry, original_length, ai_length)
                generated_count += 1

                # Progress update
                if generated_count % SAVE_EVERY == 0:
                    elapsed = time.time() - start_time
                    rate = generated_count / elapsed * 60
                    length_match_pct = quality_stats['length_matches'] / generated_count * 100
                    topic_match_pct = quality_stats['topic_consistent'] / generated_count * 100

                    # Progress of this instance including previous sessions
                    total_completed = start_idx + generated_count
                    overall_progress = total_completed / instance_total * 100

                    print(f"\nSession Progress: {generated_count}/{session_total}")
                    print(f"Overall Progress: {total_completed:,}/{instance_total:,} ({overall_progress:.1f}%)")
                    print(f"Rate: {rate:.1f} texts/minute")
                    print(f"Quality: {length_match_pct:.1f}% length match, {topic_match_pct:.1f}% topic consistent")

                    # Memory cleanup
                    gc.collect()
                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()

            except Exception as e:
                # Best effort: one bad entry must not stop a multi-hour run.
                print(f"\nError with entry {i}: {e}")
                skipped_count += 1
                continue

    # Final statistics
    total_time = time.time() - start_time
    final_rate = generated_count / total_time * 60 if total_time > 0 else 0.0

    print(f"\nSession Complete!")
    print(f"Generated this session: {generated_count:,} texts")
    print(f"Skipped or failed this session: {skipped_count:,} entries")
    print(f"Final rate: {final_rate:.1f} texts/minute")
    print(f"Session time: {total_time/60:.1f} minutes")

    # Calculate this instance's overall progress
    total_completed = start_idx + generated_count
    overall_progress = total_completed / instance_total * 100
    remaining = max(0, instance_total - total_completed)
    sessions_left = (remaining + BATCH_SIZE_ENTRIES - 1) // BATCH_SIZE_ENTRIES

    print(f"\nOverall Progress:")
    print(f"   Completed: {total_completed:,}/{instance_total:,} ({overall_progress:.1f}%)")
    print(f"   Remaining: {remaining:,} texts")
    if sessions_left > 0:
        print(f"   Sessions left: {sessions_left}")
    else:
        print(f"   ALL SESSIONS COMPLETE!")
    print(f"   Saved to: {OUTPUT_FILE}")

    # Quality summary
    if generated_count > 0:
        length_match_pct = quality_stats['length_matches'] / generated_count * 100
        topic_consistent_pct = quality_stats['topic_consistent'] / generated_count * 100
        fallback_pct = quality_stats['fallback_used'] / generated_count * 100

        print(f"\nFinal Quality Summary:")
        print(f"   Length matching (±20%): {length_match_pct:.1f}%")
        print(f"   Topic consistency: {topic_consistent_pct:.1f}%")
        print(f"   Fallback used: {fallback_pct:.1f}%")

In [None]:
# Download Results: summarize the output file, then hand it to the browser
from google.colab import files

if not os.path.exists(OUTPUT_FILE):
    print(f"Output file not found: {OUTPUT_FILE}")
else:
    # Show final stats
    with open(OUTPUT_FILE, 'r') as f:
        ai_texts = f.readlines()
    output_size_mb = os.path.getsize(OUTPUT_FILE) / 1024 / 1024

    print(f"Final Statistics:")
    print(f"   Human texts: {len(human_texts):,}")
    print(f"   AI texts generated: {len(ai_texts):,}")
    print(f"   File size: {output_size_mb:.1f} MB")

    # Preview the first generated record, if there is one
    if ai_texts:
        sample_ai = json.loads(ai_texts[0])
        preview = sample_ai['text'][:200]
        print(f"\nSample AI text:")
        print(f"   {preview}...")
        print(f"   Length ratio: {sample_ai['length_ratio']}")

    print(f"\nDownloading {OUTPUT_FILE}...")
    files.download(OUTPUT_FILE)
    print("Download complete!")

    print(f"\nNext Steps:")
    print(f"   1. Download completed: {OUTPUT_FILE}")
    print(f"   2. Use this file for your training dataset")
    print(f"   3. Combine with human texts for balanced dataset")