In [5]:
import time
# Wall-clock timestamp taken when the notebook starts; the last cell subtracts
# it from a second timestamp to report the total execution time.
notebook_start = time.time()

In [6]:
# Cell 1: Complete Environment Setup
# Uninstall the preinstalled ML stack so the next cell can install pinned versions.
# stderr is discarded (2>/dev/null); the echo only runs when `pip uninstall`
# exits with a non-zero status.
!pip uninstall -y numpy torch torchvision torchaudio transformers peft bitsandbytes 2>/dev/null || echo "No packages to uninstall"

# Clear pip cache (removes every cached wheel/HTTP response pip has stored)
!pip cache purge

Found existing installation: numpy 1.26.4
Uninstalling numpy-1.26.4:
  Successfully uninstalled numpy-1.26.4
Found existing installation: torch 2.2.1+cu121
Uninstalling torch-2.2.1+cu121:
  Successfully uninstalled torch-2.2.1+cu121
Found existing installation: torchvision 0.17.1+cu121
Uninstalling torchvision-0.17.1+cu121:
  Successfully uninstalled torchvision-0.17.1+cu121
Found existing installation: torchaudio 2.2.1+cu121
Uninstalling torchaudio-2.2.1+cu121:
  Successfully uninstalled torchaudio-2.2.1+cu121
Found existing installation: transformers 4.41.2
Uninstalling transformers-4.41.2:
  Successfully uninstalled transformers-4.41.2
Found existing installation: peft 0.10.0
Uninstalling peft-0.10.0:
  Successfully uninstalled peft-0.10.0
Found existing installation: bitsandbytes 0.43.0
Uninstalling bitsandbytes-0.43.0:
  Successfully uninstalled bitsandbytes-0.43.0
Files removed: 84


In [7]:
# Install NumPy FIRST with clean environment
!pip install -q --ignore-installed numpy==1.26.4

# Install PyTorch with CUDA 12.1 (Kaggle's version)
!pip install -q torch==2.2.1 torchvision==0.17.1 torchaudio==2.2.1 --index-url https://download.pytorch.org/whl/cu121

# Install transformer-related packages with compatible versions
!pip install -q transformers==4.41.2 peft==0.10.0 datasets==2.18.0 accelerate==0.29.1
!pip install -q bitsandbytes==0.43.0 einops==0.7.0

# Handle gymnasium separately to avoid conflicts
!pip install -q gymnasium==0.29.0 --no-deps
!pip install -U bitsandbytes  # Ensure latest version
!pip install -U transformers accelerate peft

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.3/18.3 MB[0m [31m49.9 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[0m[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
accelerate 0.29.1 requires torch>=1.10.0, which is not installed.
easyocr 1.7.2 requires torch, which is not installed.
easyocr 1.7.2 requires torchvision>=0.5, which is not installed.
torchmetrics 1.7.1 requires torch>=2.0.0, which is not installed.
pytorch-lightning 2.5.1.post0 requires torch>=2.1.0, which is not installed.
kaggle-environments 1.16.11 requires transformers>=4.33.1, which is not installed.
stable-baselines3 2.1.0 requires torch>=1.13, which is not installed.
sentence-transformers 3.4.1 requires torch>=1.11.0, which is not installed.
s

In [8]:
import json
import os
import re
import sys
from collections import defaultdict
from typing import Optional, Dict, List, Tuple

import numpy as np
import psutil
import torch
import torch.nn as nn
from datasets import Dataset, load_dataset
from peft import LoraConfig, PeftModel, get_peft_model, prepare_model_for_kbit_training
from sentence_transformers import util as semantic_util
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    DataCollatorForLanguageModeling,
    GenerationConfig,
    Trainer,
    TrainingArguments,
    pipeline,
)

In [9]:
# Model Loading

# Checkpoint id passed to `load_model` (and `load_tokenizer`) in the cells below.
MODEL_NAME = "gpt2"  # Default to GPT-2 for Kaggle compatibility

def load_model(model_name: str):
    """Load a causal language model and its tokenizer, with a CPU fallback.

    On a CUDA machine the model is requested with 4-bit NF4 quantization and
    float16 compute; otherwise it is loaded unquantized in float32. If that
    first attempt raises, the model is loaded again on CPU in float32.

    Args:
        model_name: Model id or path handed to ``from_pretrained``.

    Returns:
        Tuple ``(model, tokenizer)``. The tokenizer's pad token is set to its
        EOS token.

    Raises:
        Exception: whatever the CPU fallback ``from_pretrained`` call raises
            when the primary attempt has also failed.
    """
    print(f"\n=== Loading {model_name} ===")

    # Configure tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token

    # Handle quantization based on GPU availability
    use_gpu = torch.cuda.is_available()
    if use_gpu:
        print("Configuring for GPU with 4-bit quantization")
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.float16
        )
    else:
        print("Configuring for CPU without quantization")
        bnb_config = None

    # Only the load itself is guarded. Anything that runs after a successful
    # load must stay outside the `try`, otherwise an unrelated error there would
    # be reported as a load failure and trigger a second, CPU-only load.
    try:
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            quantization_config=bnb_config,
            device_map="auto",
            torch_dtype=torch.float16 if use_gpu else torch.float32
        )
    except Exception as e:
        print(f"❌ Error loading model: {e}")
        print("Attempting CPU fallback...")
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            device_map="cpu",
            torch_dtype=torch.float32
        )
        print("✅ Model loaded on CPU")
        return model, tokenizer

    print("✅ Model loaded successfully")
    # `print_memory` is not defined in this notebook's visible cells; call it
    # only when some other cell has provided it.
    report_memory = globals().get("print_memory")
    if callable(report_memory):
        report_memory()
    return model, tokenizer

# Load model and tokenizer; later cells read these module-level `model` and
# `tokenizer` names directly.
model, tokenizer = load_model(MODEL_NAME)


=== Loading gpt2 ===


tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]



config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

Configuring for CPU without quantization
❌ Error loading model: Failed to import transformers.models.mega.configuration_mega because of the following error (look up to see its traceback):
No module named 'transformers.models.mega.configuration_mega'
Attempting CPU fallback...


RuntimeError: Failed to import transformers.models.mega.configuration_mega because of the following error (look up to see its traceback):
No module named 'transformers.models.mega.configuration_mega'

In [None]:
# Tokenizer Setup
# =====================
def load_tokenizer(model_name):
    """Load the tokenizer for ``model_name`` and make sure it has a pad token.

    The tokenizer is requested with remote code trusted and right-side padding.
    When it defines no pad token, the EOS token is reused for padding.

    Args:
        model_name: Model id or path handed to ``AutoTokenizer.from_pretrained``.

    Returns:
        The configured tokenizer.

    Raises:
        Exception: any error raised while loading is logged and re-raised.
    """
    tokenizer_options = {"trust_remote_code": True, "padding_side": "right"}
    try:
        loaded = AutoTokenizer.from_pretrained(model_name, **tokenizer_options)
        # Fall back to EOS so padding works even without a dedicated pad token.
        if loaded.pad_token is None:
            loaded.pad_token = loaded.eos_token
        print("Tokenizer loaded successfully")
    except Exception as e:
        print(f"Tokenizer loading failed: {str(e)}")
        raise
    return loaded

# Rebinds the module-level `tokenizer` with the one configured by `load_tokenizer`.
tokenizer = load_tokenizer(MODEL_NAME)

In [None]:
# === Loading Tokenizer ===
# Load from MODEL_NAME instead of a hard-coded id so this cell follows the
# configured model, and restore the pad token: rebinding `tokenizer` here
# replaces the previously configured instance, and the tokenization step in
# main() pads to a fixed length, which needs a pad token.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# NEW: Analyze vocabulary coverage
# NOTE(review): `data_quality` is not defined in this notebook; it must be
# importable from the working directory for this cell to run.
from data_quality import analyze_vocab_coverage
vocab_report = analyze_vocab_coverage(tokenizer)
print(f"Tokenizer vocabulary coverage: {vocab_report['coverage']:.1%}")

In [None]:
# Data Preparation

def analyze_dataset(dataset, tokenizer) -> Dict:
    """Summarise vocabulary coverage, token lengths and topic mentions of a dataset.

    Args:
        dataset: Iterable of records, each exposing a ``'text'`` string.
        tokenizer: Object providing ``get_vocab()`` (mapping of tokens) and
            ``tokenize(text)`` (list of tokens).

    Returns:
        Dict with:
          * ``vocab_coverage``: share of the tokenizer vocabulary that occurs in
            the dataset, rounded to 4 places (0.0 for an empty vocabulary).
          * ``length_stats``: mean/std/min/max and 25/50/75th percentiles of the
            per-example token counts (all zeros for an empty dataset).
          * ``topic_distribution``: number of texts mentioning each topic term.
          * ``total_samples``: number of records analysed.
    """
    topic_terms = ('blockchain', 'wallet', 'mining', 'crypto', 'token')

    vocab = set(tokenizer.get_vocab().keys())
    dataset_tokens = set()
    text_lengths = []
    topics = defaultdict(int)

    # Single pass: tokenization stats and the topic heuristic both need the text.
    for example in dataset:
        text = example['text']
        tokens = tokenizer.tokenize(text)
        dataset_tokens.update(tokens)
        text_lengths.append(len(tokens))

        lowered = text.lower()
        for term in topic_terms:
            if term in lowered:
                topics[term] += 1

    # Guard against an empty vocabulary instead of dividing by zero.
    coverage = len(dataset_tokens & vocab) / len(vocab) if vocab else 0.0

    if text_lengths:
        length_stats = {
            'mean': np.mean(text_lengths),
            'std': np.std(text_lengths),
            'min': min(text_lengths),
            'max': max(text_lengths),
            'percentiles': np.percentile(text_lengths, [25, 50, 75])
        }
    else:
        # min()/max() raise on an empty sequence; report neutral values instead.
        length_stats = {
            'mean': 0.0,
            'std': 0.0,
            'min': 0,
            'max': 0,
            'percentiles': np.zeros(3)
        }

    return {
        'vocab_coverage': round(coverage, 4),
        'length_stats': length_stats,
        'topic_distribution': dict(topics),
        'total_samples': len(text_lengths)
    }

def stratified_sample(dataset, stratify_by: str = 'label', n_samples: int = None) -> Dataset:
    """Draw ``n_samples`` rows, preserving the class balance of ``stratify_by``.

    Args:
        dataset: Dataset exposing ``features``, ``select`` and ``len()``.
        stratify_by: Column whose value distribution should be preserved. When
            the column is absent, the first ``n_samples`` rows are returned.
        n_samples: Number of rows wanted. Defaults to ``min(1000, len(dataset))``
            and is clamped to ``[0, len(dataset)]``.

    Returns:
        A dataset with ``n_samples`` rows (the input itself when every row is
        requested and stratification is possible).

    Raises:
        ValueError: propagated from ``train_test_split`` when a stratified split
            of the requested size is impossible (e.g. a class with one member).
    """
    total = len(dataset)
    if n_samples is None:
        n_samples = min(1000, total)
    # Clamp so neither `select` nor `train_test_split` is asked for more rows
    # than exist (or for a negative count).
    n_samples = max(0, min(n_samples, total))

    if stratify_by not in dataset.features or n_samples == 0:
        return dataset.select(range(n_samples))

    # Nothing to sample away; train_test_split also rejects train_size == len.
    if n_samples >= total:
        return dataset

    from sklearn.model_selection import train_test_split
    import pandas as pd

    df = pd.DataFrame(dataset)
    # train_test_split returns (train, test) and `train_size` sizes the FIRST
    # element, so the sample we want is the first return value.
    sample, _ = train_test_split(
        df,
        train_size=n_samples,
        stratify=df[stratify_by],
        random_state=42
    )
    # preserve_index=False keeps the shuffled pandas index out of the columns.
    return Dataset.from_pandas(sample, preserve_index=False)

def prepare_dataset(file_path: str, max_samples: int = 1000) -> Dataset:
    """Load a JSON dataset, normalise its text column and print a quality report.

    Args:
        file_path: Path to the JSON data file.
        max_samples: Upper bound on the number of training rows to load.

    Returns:
        The loaded dataset. If anything fails along the way (missing path, load
        error, analysis error) the error is printed and a four-sentence
        fallback dataset is returned instead.

    Note:
        The quality report uses the module-level ``tokenizer``.
    """
    try:
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Dataset path not found: {file_path}")

        split_spec = f'train[:{max_samples}]'
        dataset = load_dataset('json', data_files=file_path, split=split_spec)

        # Standardize text column: adopt the first column whose name contains "text".
        if 'text' not in dataset.features:
            candidates = [name for name in dataset.features if 'text' in name.lower()]
            if candidates:
                dataset = dataset.rename_column(candidates[0], 'text')

        # Quality analysis
        report = analyze_dataset(dataset, tokenizer)
        print("Dataset Quality Report:")
        for metric, value in report.items():
            print(f"- {metric}: {value}")

    except Exception as e:
        print(f"Error preparing dataset: {e}")
        print("Creating minimal fallback dataset...")
        fallback_texts = [
            "Blockchain is a decentralized ledger technology.",
            "Cryptocurrencies use public-key cryptography.",
            "Proof of Work requires computational effort.",
            "Hardware wallets provide secure key storage."
        ]
        return Dataset.from_dict({"text": fallback_texts})

    return dataset

In [None]:
# Training Configuration

def suggest_hyperparameters(model, dataset) -> Dict:
    """Auto-suggest training parameters based on model and data size.

    Args:
        model: Object whose ``parameters()`` yields tensors with ``numel()``.
        dataset: Sized collection of training examples.

    Returns:
        Dict with ``batch_size``, ``learning_rate``, ``epochs`` and
        ``grad_accum``. ``grad_accum`` is ``32 // batch_size`` (at least 1), so
        batch size times accumulation steps stays close to 32.
    """
    # One batch element per 100 samples, bounded to [1, 8].
    batch_size = max(1, min(8, len(dataset) // 100))
    learning_rate = 2e-5

    # Adjust for model size
    num_params = sum(p.numel() for p in model.parameters())
    if num_params > 1e9:  # Large model
        learning_rate /= 2
        batch_size = max(1, batch_size // 2)

    return {
        'batch_size': batch_size,
        'learning_rate': learning_rate,
        'epochs': 1 if len(dataset) < 1000 else 3,
        # Derived from the FINAL batch size so the large-model halving above is
        # compensated by more accumulation steps.
        'grad_accum': max(1, 32 // batch_size)
    }

# Candidate LoRA target-module name sets, tried in order.
_LORA_TARGET_CANDIDATES = (
    ["q_proj", "v_proj"],          # the original hard-coded default
    ["c_attn", "c_proj", "c_fc"],  # the names main() in this notebook uses
)


def _detect_lora_target_modules(model) -> List[str]:
    """Return the first candidate name set fully present among the model's modules."""
    leaf_names = {name.rsplit(".", 1)[-1] for name, _ in model.named_modules()}
    for candidate in _LORA_TARGET_CANDIDATES:
        if all(name in leaf_names for name in candidate):
            return list(candidate)
    # Nothing matched: keep the original default and let PEFT report the mismatch.
    return list(_LORA_TARGET_CANDIDATES[0])


def configure_training(model, target_modules: Optional[List[str]] = None) -> Tuple:
    """Wrap ``model`` with LoRA adapters and build the training arguments.

    Args:
        model: Base model to adapt.
        target_modules: Names of the sub-modules LoRA should wrap. When omitted
            they are detected from ``model.named_modules()``, so the
            configuration is not tied to one architecture's layer names.

    Returns:
        Tuple ``(peft_model, training_args)``.
    """
    if target_modules is None:
        target_modules = _detect_lora_target_modules(model)

    # LoRA setup
    peft_config = LoraConfig(
        r=16,
        lora_alpha=32,
        target_modules=target_modules,
        lora_dropout=0.05,
        bias="none",
        task_type="CAUSAL_LM"
    )

    # Training arguments
    training_args = TrainingArguments(
        output_dir="./results",
        per_device_train_batch_size=1,
        gradient_accumulation_steps=4,
        num_train_epochs=1,
        learning_rate=2e-5,
        fp16=torch.cuda.is_available(),
        logging_steps=10,
        save_strategy="steps",
        save_steps=500,
        report_to="none"
    )

    # Prepare model
    model = prepare_model_for_kbit_training(model)
    model = get_peft_model(model, peft_config)
    model.print_trainable_parameters()

    return model, training_args

class TrainingMonitor:
    """Collects per-step training metrics and renders a small live dashboard.

    NOTE(review): ``display_dashboard`` relies on ``clear_output``, ``plt`` and
    ``get_gpu_usage``, none of which are imported or defined in the visible
    cells of this notebook — confirm they are provided elsewhere.
    """

    def __init__(self):
        # Metric name -> list of recorded values, in the order they were reported.
        self.metrics = defaultdict(list)
        self.start_time = time.time()

    def update(self, **kwargs):
        """Append each keyword value to the history of the metric it names."""
        for name in kwargs:
            self.metrics[name].append(kwargs[name])

    def display_dashboard(self):
        """Redraw three panels: training loss, gradient norm and hardware usage."""
        clear_output(wait=True)
        fig, axes = plt.subplots(1, 3, figsize=(15, 4))

        # Line plots: (panel index, metric key, panel title)
        line_panels = (
            (0, 'loss', "Training Loss"),
            (1, 'grad_norm', "Gradient Norm"),
        )
        for index, metric_key, title in line_panels:
            axes[index].plot(self.metrics.get(metric_key, []))
            axes[index].set_title(title)

        # Hardware
        usage_labels = ['CPU', 'GPU', 'RAM']
        usage_values = [
            psutil.cpu_percent(),
            get_gpu_usage(),
            psutil.virtual_memory().percent,
        ]
        axes[2].bar(usage_labels, usage_values)
        axes[2].set_title("Hardware Usage")

        plt.tight_layout()
        plt.show()

    def should_stop_early(self, patience=3) -> bool:
        """Return whether the latest loss exceeds the mean of the earlier window.

        With fewer than ``2 * patience`` recorded losses the answer is always
        False. Otherwise the last loss is compared against the mean of the
        ``patience`` values that precede the most recent ``patience`` values.
        """
        history = self.metrics.get('loss', [])
        window = patience * 2
        if len(history) >= window:
            reference = np.mean(history[-window:-patience])
            return history[-1] > reference
        return False

In [None]:
# Training Execution
# =====================
def _collate_causal_lm_batch(features):
    """Stack per-example tensors into a batch with padding-masked labels."""
    input_ids = torch.stack([f['input_ids'] for f in features])
    attention_mask = torch.stack([f['attention_mask'] for f in features])
    # Labels start as a copy of the inputs; padded positions are set to -100
    # (the index the loss ignores) so the model is not trained to predict padding.
    labels = input_ids.clone()
    labels[attention_mask == 0] = -100
    return {
        'input_ids': input_ids,
        'attention_mask': attention_mask,
        'labels': labels,
    }


def train_model(model, tokenized_dataset, training_args):
    """Execute the training process.

    Args:
        model: Model to train.
        tokenized_dataset: Dataset whose items provide ``input_ids`` and
            ``attention_mask`` tensors of equal length.
        training_args: Training configuration; its ``gradient_checkpointing``
            flag decides whether the model's KV cache is disabled.

    Returns:
        The ``Trainer`` instance after ``train()`` has completed.
    """
    # Disable cache if gradient checkpointing is enabled
    if training_args.gradient_checkpointing:
        model.config.use_cache = False

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset,
        data_collator=_collate_causal_lm_batch
    )

    print("Starting training...")
    # `print_memory` is not defined in this notebook's visible cells; call it
    # only when some other cell has provided it, instead of failing before
    # training even starts.
    report_memory = globals().get("print_memory")
    if callable(report_memory):
        report_memory()
    trainer.train()
    print("Training completed!")
    return trainer

def generate_contrastive_examples(example):
    """Build an anchor/positive/negative triplet from one Q/A example.

    The anchor is the example's answer, the positive is that answer passed
    through ``augment_answer`` and the negative comes from
    ``get_negative_sample(example)``.

    NOTE(review): neither helper is defined in the visible cells. The original
    notes list the intended negative sources as a random Q/A from a different
    category, a GPT-generated incorrect answer, or a perturbed correct answer —
    confirm which of these ``get_negative_sample`` implements.
    """
    triplet = {}
    triplet['anchor'] = example['answer']
    triplet['positive'] = augment_answer(example['answer'])
    triplet['negative'] = get_negative_sample(example)
    return triplet

In [None]:
# Model Saving
# =====================
def save_model_artifacts(
    model, 
    tokenizer, 
    training_args: Optional[TrainingArguments] = None, 
    output_dir: str = "/kaggle/working/gpt2-lora-trained"
) -> str:
    """
    Persist the model, tokenizer and (optionally) training arguments.

    Steps, in order: create ``output_dir``; save the model with safetensors
    serialization, passing its current state dict; save the tokenizer; write
    ``training_args.json`` when arguments are given (best effort — failures are
    printed, not raised); check that the adapter files exist and retry the
    model save if they do not; finally list every saved file with its size.

    Args:
        model: Model to save; adapters are NOT merged beforehand.
        tokenizer: Tokenizer to save alongside the model.
        training_args: Optional training configuration to serialize as JSON.
        output_dir: Destination directory (created if missing).

    Returns:
        ``output_dir``.
    """
    os.makedirs(output_dir, exist_ok=True)
    print(f"\n💾 Saving model artifacts to: {output_dir}")

    # Adapters are intentionally left unmerged so they are stored separately.
    print("💽 Saving model and adapter...")
    model.save_pretrained(
        output_dir,
        safe_serialization=True,
        state_dict=model.state_dict()  # complete state, LoRA weights included
    )

    print("🔤 Saving tokenizer...")
    tokenizer.save_pretrained(output_dir)

    if training_args is not None:
        print("📝 Saving training arguments...")
        try:
            args_path = os.path.join(output_dir, "training_args.json")
            if hasattr(training_args, 'to_dict'):
                with open(args_path, "w") as handle:
                    json.dump(training_args.to_dict(), handle, indent=2)
            elif hasattr(training_args, 'to_json_string'):
                with open(args_path, "w") as handle:
                    handle.write(training_args.to_json_string())
            else:
                print("⚠️ Warning: TrainingArguments has no serialization method")
        except Exception as e:
            print(f"⚠️ Warning: Failed to save training args - {str(e)}")

    # Verify the adapter files were saved
    expected_adapter_files = ('adapter_config.json', 'adapter_model.safetensors')
    missing_files = [
        filename
        for filename in expected_adapter_files
        if not os.path.exists(os.path.join(output_dir, filename))
    ]

    if missing_files:
        print(f"\n⚠️ Warning: Missing adapter files: {missing_files}")
        print("Trying alternative save method...")
        # NOTE(review): `adapter_only` does not look like a documented
        # `save_pretrained` argument — confirm this retry has any effect.
        model.save_pretrained(
            output_dir,
            safe_serialization=True,
            adapter_only=True
        )

    print("\n🔍 Verifying saved files:")
    for entry in os.listdir(output_dir):
        size_kb = os.path.getsize(os.path.join(output_dir, entry)) / 1024
        print(f"- {entry} ({size_kb:.2f} KB)")

    return output_dir

In [None]:
# Model Loading and Testing
# =====================
def _resolve_base_model_source(model_path: str) -> str:
    """Return where the base weights for an adapter directory should come from.

    A directory that contains ``config.json`` is treated as holding the base
    model itself. Otherwise the ``base_model_name_or_path`` entry of
    ``adapter_config.json`` is used, falling back to ``model_path`` when that
    file is missing, unreadable or has no such entry.
    """
    if os.path.exists(os.path.join(model_path, "config.json")):
        return model_path
    adapter_config_path = os.path.join(model_path, "adapter_config.json")
    try:
        with open(adapter_config_path, "r", encoding="utf-8") as fh:
            base_source = json.load(fh).get("base_model_name_or_path")
    except (OSError, ValueError, AttributeError):
        base_source = None
    return base_source or model_path


def load_and_test_model(
    model_path: str = "/kaggle/working/gpt2-lora-trained", 
    max_length: int = 250,
    test_prompts: Optional[list] = None,
    is_peft_model: bool = True
):
    """
    Load a saved model (optionally a LoRA adapter) and run sample generations.

    Args:
        model_path: Directory holding the saved tokenizer and model/adapter.
        max_length: ``max_length`` passed to every test generation.
        test_prompts: Prompts to generate from; a built-in crypto Q&A list is
            used when omitted.
        is_peft_model: When True and adapter files are present, the base model
            is loaded first, the adapter is applied and then merged into it.

    Returns:
        Tuple ``(model, tokenizer)``.

    Raises:
        ValueError: if ``model_path`` does not exist.
        Exception: any loading/generation error is logged with debugging
            details and re-raised.
    """
    print(f"\n🔍 Preparing to load model from: {model_path}")

    # Verify model directory exists
    if not os.path.exists(model_path):
        raise ValueError(f"Model directory {model_path} does not exist")

    # Show directory contents for debugging
    print("\n📂 Model directory contents:")
    for f in sorted(os.listdir(model_path)):
        size = os.path.getsize(os.path.join(model_path, f)) / 1024
        print(f"- {f} ({size:.2f} KB)")

    dtype = torch.float16 if torch.cuda.is_available() else torch.float32

    try:
        print("\n🔄 Loading tokenizer...")
        tokenizer = AutoTokenizer.from_pretrained(
            model_path,
            local_files_only=True
        )

        print("\n🔄 Loading model...")
        adapter_files = []
        if is_peft_model:
            adapter_files = sorted(
                f for f in os.listdir(model_path) if f.startswith('adapter_')
            )
            if not adapter_files:
                print("⚠️ No adapter files found. Loading as regular model.")

        if adapter_files:
            print(f"Found adapter files: {adapter_files}")
            # An adapter-only directory has no base weights, so the base model
            # is resolved from adapter_config.json instead of being read from
            # the adapter directory itself.
            base_source = _resolve_base_model_source(model_path)
            base_model = AutoModelForCausalLM.from_pretrained(
                base_source,
                device_map="auto",
                torch_dtype=dtype,
                # Only force offline loading when reading from the local directory.
                local_files_only=(base_source == model_path)
            )

            # Then load the PEFT adapter
            model = PeftModel.from_pretrained(
                base_model,
                model_path,
                local_files_only=True
            )

            # Merge and unload for inference
            model = model.merge_and_unload()
        else:
            # Regular model, or PEFT requested but no adapter files present
            model = AutoModelForCausalLM.from_pretrained(
                model_path,
                device_map="auto",
                torch_dtype=dtype,
                local_files_only=True
            )

        print("\n🎉 Model loaded successfully!")

        # Default test prompts if none provided
        if test_prompts is None:
            test_prompts = [
                "What is hardware wallet?? ",
                "What is Proof of Work (PoW)?? ",
                "What is cryptography?? ",
                "What is Peer-to-Peer (P2P)?? ",
                "What is block chain?? ",
                "What is private key?? "
            ]

        # No `device` argument: placement was already decided by device_map="auto"
        print("\n🚀 Creating text generation pipeline...")
        pipe = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            torch_dtype=dtype
        )

        # Run tests
        print("\n🧪 Running generation tests...")
        for i, prompt in enumerate(test_prompts, 1):
            print(f"\n🔹 Test {i}: {prompt}")
            output = pipe(
                prompt,
                max_length=max_length,
                do_sample=True,
                temperature=0.7,
                top_p=0.9,
                num_return_sequences=1,
                repetition_penalty=1.2
            )
            print("💬 Response:", output[0]['generated_text'])

        return model, tokenizer

    except Exception as e:
        print(f"\n❌ Critical error loading model: {str(e)}")
        print("\n🛠️ Debugging info:")
        print(f"- Path: {os.path.abspath(model_path)}")
        print(f"- Directory exists: {os.path.exists(model_path)}")
        if os.path.exists(model_path):
            print("- Contents:", os.listdir(model_path))
        raise

In [None]:
class EnhancedModelWrapper:
    """Wrapper that post-processes and scores generations for technical Q&A.

    It generates text with the wrapped model, strips the prompt, optionally
    appends notes for missing required terms, trims to a complete sentence,
    removes banned phrases and returns validation scores next to the text.
    """

    # Characters accepted as the end of a complete sentence.
    _SENTENCE_ENDINGS = ('.', '!', '?')

    def __init__(self, model, tokenizer, knowledge_base: Optional[Dict] = None):
        """Store the model/tokenizer and the topic -> expected-terms mapping.

        Args:
            model: Model exposing ``generate`` and ``device``.
            tokenizer: Tokenizer matching ``model``.
            knowledge_base: Optional mapping of topic name to the list of terms
                a good answer on that topic should mention. A built-in crypto
                mapping is used when omitted (or empty).
        """
        self.model = model
        self.tokenizer = tokenizer
        self.required_terms = []
        self.complete_sentences = True
        self.technical_terms = knowledge_base or {
            'blockchain': ['decentralized', 'immutable', 'consensus', 'ledger'],
            'wallet': ['private key', 'public key', 'address', 'security'],
            'PoW': ['mining', 'difficulty', 'hash', 'computational'],
            'cryptography': ['encryption', 'signature', 'asymmetric', 'algorithm'],
            'P2P': ['network', 'nodes', 'direct', 'decentralized']
        }
        self.banned_phrases = [
            "I don't know", "as an AI", "I'm not sure",
            "I can't answer", "my training data"
        ]

    def set_constraints(self, 
                      required_terms: List[str] = None,
                      complete_sentences: bool = True,
                      technical_focus: str = None):
        """Configure generation constraints.

        Args:
            required_terms: Terms that must appear in processed output.
            complete_sentences: Whether output is trimmed/terminated to end on
                a sentence boundary.
            technical_focus: Topic key whose expected terms are added to the
                required terms.
        """
        # Work on a copy so the caller's list is never mutated by the extend below.
        terms = list(required_terms or [])
        if technical_focus:
            terms.extend(self.technical_terms.get(technical_focus, []))
        # Drop duplicates while keeping first-seen order.
        self.required_terms = list(dict.fromkeys(terms))
        self.complete_sentences = complete_sentences

    def generate(self, 
                prompt: str,
                max_length: int = 200,
                temperature: float = 0.7,
                **kwargs) -> Dict:
        """Generate a response and return it with processing and validation results.

        Args:
            prompt: Input text.
            max_length: ``max_length`` for the generation config.
            temperature: Sampling temperature.
            **kwargs: Extra fields forwarded to ``GenerationConfig``.

        Returns:
            Dict with ``raw`` (decoded output), ``processed`` (after
            ``_process_output``), ``validation`` and ``prompt_analysis``.
        """
        # Create generation config
        gen_config = GenerationConfig(
            max_length=max_length,
            temperature=temperature,
            do_sample=True,
            top_p=0.9,
            repetition_penalty=1.2,
            pad_token_id=self.tokenizer.eos_token_id,
            **kwargs
        )

        # Generate raw output
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
        outputs = self.model.generate(**inputs, generation_config=gen_config)
        raw_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

        # Apply processing pipeline
        processed_text = self._process_output(prompt, raw_text)

        # Validate and score
        validation = self._validate_response(prompt, processed_text)

        return {
            'raw': raw_text,
            'processed': processed_text,
            'validation': validation,
            'prompt_analysis': self._analyze_prompt(prompt)
        }

    def _process_output(self, prompt: str, text: str) -> str:
        """Apply all text processing constraints to a decoded generation."""
        # Remove prompt from output
        if text.startswith(prompt):
            text = text[len(prompt):].strip()

        # Apply term enforcement
        if self.required_terms:
            text = self._enforce_terms(text)

        # Complete sentences
        if self.complete_sentences:
            text = self._complete_sentences(text)

        # Remove banned phrases. Matching ignores case so removal agrees with
        # the case-insensitive check in `_validate_response`.
        for phrase in self.banned_phrases:
            text = re.sub(re.escape(phrase), "", text, flags=re.IGNORECASE)

        return text.strip()

    def _enforce_terms(self, text: str) -> str:
        """Append a sentence or a note for required terms missing from ``text``."""
        missing = [t for t in self.required_terms 
                  if not re.search(rf'\b{re.escape(t)}\b', text, re.IGNORECASE)]

        if missing:
            # Terms that are also topic keys get an explanatory sentence.
            additions = []
            for term in missing:
                if term in self.technical_terms:
                    addition = f" {term} is important because {self._explain_term(term)}."
                    additions.append(addition)

            text += ''.join(additions) if additions else f"\n\n[Missing terms: {', '.join(missing)}]"

        return text

    def _complete_sentences(self, text: str) -> str:
        """Ensure output ends with a complete sentence.

        A trailing fragment shorter than 50 characters after the last boundary
        ('.', '!', '?' or newline) is dropped; if the result still lacks
        terminal punctuation, trailing commas are removed and a period is added.
        """
        # Find last sentence boundary
        last_boundary = max(text.rfind(mark) for mark in ('.', '!', '?', '\n'))

        if last_boundary > 0 and len(text) - last_boundary < 50:
            text = text[:last_boundary+1]

        # Strip trailing whitespace/commas first so we never produce ",.." or a
        # period placed after a newline.
        text = text.rstrip().rstrip(',').rstrip()
        if text and text[-1] not in self._SENTENCE_ENDINGS:
            text += '.'

        return text

    def _validate_response(self, prompt: str, response: str) -> Dict:
        """Score a response for technical coverage, length and completeness.

        Returns:
            Dict with ``technical_score`` (share of the detected topic's terms
            present, 1.0 when no topic or no terms apply), ``clarity_score``
            (word count / 50, capped at 1), ``missing_terms``,
            ``has_hallucinations`` (a banned phrase occurs) and ``is_complete``
            (response ends with '.', '!' or '?').
        """
        # Detect topic from prompt
        topic = next((t for t in self.technical_terms 
                     if re.search(rf'\b{re.escape(t)}\b', prompt, re.IGNORECASE)), None)

        # Check technical terms
        expected_terms = self.technical_terms.get(topic, []) if topic is not None else []
        missing_terms = [t for t in expected_terms
                         if not re.search(rf'\b{re.escape(t)}\b', response, re.IGNORECASE)]

        # Check for hallucinations
        lowered_response = response.lower()
        hallucinations = any(
            phrase.lower() in lowered_response
            for phrase in self.banned_phrases
        )

        # Calculate scores (guarding against topics with no expected terms)
        if expected_terms:
            tech_score = 1 - (len(missing_terms) / len(expected_terms))
        else:
            tech_score = 1.0
        clarity_score = min(1, len(response.split()) / 50)  # Normalize to 0-1

        return {
            'technical_score': tech_score,
            'clarity_score': clarity_score,
            'missing_terms': missing_terms,
            'has_hallucinations': hallucinations,
            # An empty response is never complete (and must not be indexed).
            'is_complete': bool(response) and response[-1] in self._SENTENCE_ENDINGS
        }

    def _analyze_prompt(self, prompt: str) -> Dict:
        """Evaluate prompt quality: length, question mark, topic mention, word variety."""
        words = prompt.split()
        lowered_prompt = prompt.lower()
        return {
            'length': len(words),
            'has_question': '?' in prompt,
            # Compare lowercase to lowercase so mixed-case keys also match.
            'technical_focus': any(
                term.lower() in lowered_prompt
                for term in self.technical_terms
            ),
            # Unique words ratio; 0.0 for an empty prompt instead of dividing by zero
            'specificity': len(set(words)) / len(words) if words else 0.0
        }

    def _explain_term(self, term: str) -> str:
        """Return a short canned explanation for ``term`` (generic when unknown)."""
        explanations = {
            'blockchain': "it enables secure decentralized record-keeping",
            'private key': "it provides secure access to cryptocurrency funds",
            'mining': "it secures the network through computational work",
            'encryption': "it protects data through mathematical algorithms"
        }
        return explanations.get(term, "it's a fundamental concept in cryptocurrency")

In [None]:
# Enhanced Generation
# =====================
# Shared decoding settings used by `generate_with_validation`.
CRYPTO_GENERATION_CONFIG = GenerationConfig(
    # Length budget for newly generated tokens
    max_new_tokens=150,
    # Sampling
    do_sample=True,
    temperature=0.7,
    top_k=40,
    top_p=0.9,
    # Beam search
    num_beams=3,
    early_stopping=True,
    # Repetition control
    repetition_penalty=1.15,
    no_repeat_ngram_size=4,
)

def generate_with_validation(model, tokenizer, prompt, max_length=200):
    """Generate a response, validate it, and regenerate once if validation fails.

    Validation covers (1) presence of the technical terms expected for the
    prompt's topic and (2) absence of banned phrases. Both checks read optional
    attributes of ``model`` (``get_last_term``, ``technical_terms``,
    ``banned_sequences``); a check whose attribute is missing is skipped.

    Args:
        model: Object exposing ``generate`` and ``device``.
        tokenizer: Tokenizer matching ``model``.
        prompt: Input text.
        max_length: ``max_length`` forwarded to ``model.generate``.
            NOTE(review): the shared generation config also sets
            ``max_new_tokens`` — confirm which of the two limits should apply.

    Returns:
        Dict with ``response`` (final text), ``validation_passed`` and
        ``validation_notes``. The validation fields describe the FIRST pass;
        the regenerated text is not validated again.
    """
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    def _generate(**extra):
        outputs = model.generate(
            **inputs,
            max_length=max_length,
            generation_config=CRYPTO_GENERATION_CONFIG,
            **extra
        )
        return tokenizer.decode(outputs[0], skip_special_tokens=True)

    # First pass generation
    response = _generate()

    # Validation checks
    validation_notes = []

    # 1. Technical term check
    technical_terms = getattr(model, "technical_terms", None) or {}
    get_last_term = getattr(model, "get_last_term", None)
    if callable(get_last_term):
        last_term = get_last_term(prompt)
        if last_term in technical_terms:
            lowered_response = response.lower()
            missing = [t for t in technical_terms[last_term]
                       if t.lower() not in lowered_response]
            if missing:
                validation_notes.append(f"Missing technical terms: {missing}")

    # 2. Hallucination check
    banned_sequences = list(getattr(model, "banned_sequences", None) or [])
    if any(phrase in response for phrase in banned_sequences):
        validation_notes.append("Potential hallucination")

    validation_passed = not validation_notes

    # Generate final output
    if not validation_passed:
        print(f"⚠️ Validation issues: {validation_notes}")

        # Ban each phrase as a token SEQUENCE. Banning every individual token of
        # the phrases would also forbid common words such as "I" or "my". The
        # variant with a leading space is encoded too, since a phrase in the
        # middle of a sentence may tokenize differently from one at the start.
        bad_words_ids = []
        for phrase in banned_sequences:
            for variant in (phrase, " " + phrase):
                token_ids = tokenizer.encode(variant, add_special_tokens=False)
                if token_ids and token_ids not in bad_words_ids:
                    bad_words_ids.append(token_ids)

        # Only pass the constraint when there is something to ban.
        extra = {"bad_words_ids": bad_words_ids} if bad_words_ids else {}
        response = _generate(**extra)

    return {
        'response': response,
        'validation_passed': validation_passed,
        'validation_notes': validation_notes
    }

In [None]:
def main():
    """Run the end-to-end pipeline: load data, tokenize, LoRA-train, save, test.

    Reads the module-level ``model`` and ``tokenizer`` created by earlier
    cells. Any failure is printed and re-raised.
    """
    # Configuration
    DATASET_PATH = "/kaggle/input/database2"
    OUTPUT_DIR = "/kaggle/working/output"

    try:
        # 1. Data Preparation
        print("\n=== Preparing Dataset ===")

        def _load_training_data():
            """Load the JSON dataset, or a tiny built-in one if that fails."""
            try:
                dataset = load_dataset('json', data_files=DATASET_PATH)['train']
                print(f"Loaded {len(dataset)} samples")

                # Simple quality check
                if len(dataset) < 10:
                    raise ValueError("Dataset too small, using fallback")

                return dataset
            except Exception as e:
                print(f"Error loading dataset: {e}")
                return Dataset.from_dict({
                    "text": [
                        "Blockchain is a decentralized ledger technology.",
                        "Cryptocurrencies use cryptographic keys for security.",
                        "Proof of Work requires computational resources."
                    ]
                })

        dataset = _load_training_data()

        # Padding to a fixed length needs a pad token; reuse EOS when none is set.
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        # Tokenization
        def tokenize(examples):
            return tokenizer(
                examples["text"],
                truncation=True,
                max_length=128,
                padding="max_length"
            )

        tokenized_dataset = dataset.map(tokenize, batched=True)
        tokenized_dataset.set_format("torch", columns=["input_ids", "attention_mask"])

        # 2. Training Configuration
        print("\n=== Configuring Training ===")

        def _wrap_with_lora(base_model):
            """Attach LoRA adapters to ``base_model`` and build training arguments."""
            # LoRA configuration
            peft_config = LoraConfig(
                r=8,
                lora_alpha=16,
                target_modules=["c_attn", "c_proj", "c_fc"],
                lora_dropout=0.05,
                bias="none",
                task_type="CAUSAL_LM"
            )

            # Training arguments
            training_args = TrainingArguments(
                output_dir=OUTPUT_DIR,
                per_device_train_batch_size=2,
                gradient_accumulation_steps=4,
                num_train_epochs=1,
                learning_rate=2e-5,
                fp16=torch.cuda.is_available(),
                save_strategy="steps",
                save_steps=500,
                report_to="none"
            )

            wrapped = get_peft_model(base_model, peft_config)
            wrapped.print_trainable_parameters()
            return wrapped, training_args

        # Bind the result to a NEW local name. Assigning to `model` here would
        # make `model` local to main(), so reading the module-level model on the
        # right-hand side would raise UnboundLocalError.
        peft_model, training_args = _wrap_with_lora(model)

        # 3. Training Execution
        print("\n=== Starting Training ===")
        trainer = Trainer(
            model=peft_model,
            args=training_args,
            train_dataset=tokenized_dataset,
            data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
        )

        trainer.train()

        # 4. Saving Model
        print("\n=== Saving Model ===")
        trainer.save_model(OUTPUT_DIR)
        print(f"Model saved to {OUTPUT_DIR}")

        # 5. Testing
        print("\n=== Testing Model ===")
        test_prompts = [
            "Explain blockchain in simple terms:",
            "What is the difference between hardware and software wallets?"
        ]

        for prompt in test_prompts:
            inputs = tokenizer(prompt, return_tensors="pt").to(peft_model.device)
            outputs = peft_model.generate(**inputs, max_length=100)
            print(f"\nPrompt: {prompt}")
            print("Response:", tokenizer.decode(outputs[0], skip_special_tokens=True))

    except Exception as e:
        print(f"\n❌ Error in training: {e}")
        raise

# Run the pipeline when this cell is executed as the top-level script/kernel
# namespace, but not when the code is imported as a module.
if __name__ == "__main__":
    main()

In [None]:
# Report the wall-clock time elapsed since `notebook_start` was recorded in the
# first cell.
notebook_end = time.time()
print(f"Total notebook execution time: {notebook_end - notebook_start:.2f} seconds")