In [1]:
import io
import os
import shutil

import pandas as pd
import polars as pl

import kaggle_evaluation.konwinski_prize_inference_server

# import warnings
# warnings.filterwarnings('ignore')

# # For specific transformers warnings
# from transformers import logging
# logging.set_verbosity_error()

# # For specific PyTorch warnings
# import torch
# torch.set_warn_always(False)

# # For deprecated numpy warnings
# import numpy as np
# np.seterr(all="ignore")

# # Ignore specific warning types
# warnings.filterwarnings('ignore', category=UserWarning)
# warnings.filterwarnings('ignore', category=FutureWarning)
# warnings.filterwarnings('ignore', category=DeprecationWarning)

# # Or ignore warnings from specific modules
# warnings.filterwarnings('ignore', module='transformers.*')
# warnings.filterwarnings('ignore', module='torch.*')

The evaluation API requires that you set up a server which will respond to inference requests. We have already defined the server; you just need write the predict function. When we evaluate your submission on the hidden test set the client defined in `konwinski_prize_gateway` will run in a different container with direct access to the hidden test set and hand off the data.

Your code will always have access to the published copies of the files.

In [2]:
# import tempfile
# from pathlib import Path
# from typing import Optional

# class GitHubIssueSolver:
#     def __init__(self):
#         self.setup_done = False
        
#     def setup_model(self):
#         if not self.setup_done:
#             # Initialize model here
#             self.setup_done = True
    
#     def analyze_codebase(self, repo_path: str) -> dict:
#         relevant_files = []
#         for root, _, files in os.walk(repo_path):
#             for file in files:
#                 if file.endswith(('.py', '.java', '.js', '.cpp', '.h')):
#                     relevant_files.append(os.path.join(root, file))
#         return {'relevant_files': relevant_files}

#     def generate_patch(self, problem_statement: str, repo_analysis: dict) -> Optional[str]:
#         try:
#             # TODO: Implement patch generation logic
#             return None
#         except Exception as e:
#             print(f"Error generating patch: {e}")
#             return None

# solver = GitHubIssueSolver()

In [3]:
# instance_count = None

# def get_number_of_instances(num_instances: int) -> None:
#     """ The very first message from the gateway will be the total number of instances to be served.
#     You don't need to edit this function.
#     """
#     global instance_count
#     instance_count = num_instances

In [4]:
# first_prediction = True


# def predict(problem_statement: str, repo_archive: io.BytesIO) -> str:
#     """ Replace this function with your inference code.
#     Args:
#         problem_statement: The text of the git issue.
#         repo_path: A BytesIO buffer path with a .tar containing the codebase that must be patched. The gateway will make this directory available immediately before this function runs.
#     """
#     global first_prediction
#     if not first_prediction:
#         return None  # Skip issue.

#     # Unpack
#     with open('repo_archive.tar', 'wb') as f:
#         f.write(repo_archive.read())
#     repo_path = 'repo'
#     if os.path.exists(repo_path):
#         shutil.rmtree(repo_path)
#     shutil.unpack_archive('repo_archive.tar', extract_dir=repo_path)
#     os.remove('repo_archive.tar')
#     first_prediction = False
#     # Instead of a valid diff, let's just submit a generic string. This will definitely fail.
#     return "Hello World"

When your notebook is run on the hidden test set, inference_server.serve must be called within 15 minutes of the notebook starting or the gateway will throw an error. If you need more than 15 minutes to load your model you can do so during the very first predict call, which does not have the usual 30 minute response deadline.

In [5]:
# inference_server = kaggle_evaluation.konwinski_prize_inference_server.KPrizeInferenceServer(
#     get_number_of_instances,   
#     predict
# )

# if os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
#     inference_server.serve()
# else:
#     inference_server.run_local_gateway(
#         data_paths=(
#             '/kaggle/input/konwinski-prize/',  # Path to the entire competition dataset
#             '/kaggle/tmp/konwinski-prize/',   # Path to a scratch directory for unpacking data.a_zip.
#         )
#     )

## Process and get the length of the Github Dataset

In [6]:
import zipfile
import pandas as pd
from typing import Dict, List
import re
import os

# First unzip the data
def unzip_data():
    """Unzip the data.a_zip file"""
    try:
        with zipfile.ZipFile('/kaggle/input/konwinski-prize/data.a_zip', 'r') as zip_ref:
            zip_ref.extractall('/kaggle/working/data')
        print("Data successfully unzipped!")
    except Exception as e:
        print(f"Error unzipping data: {e}")

class DataProcessor:
    def __init__(self, df: pd.DataFrame):
        self.df = df
        
    def extract_code_snippets(self, problem_statement: str) -> List[str]:
        """Extract code snippets between triple backticks"""
        try:
            code_pattern = r"```(?:py|python)?\n(.*?)\n```"
            return re.findall(code_pattern, str(problem_statement), re.DOTALL)
        except Exception as e:
            print(f"Error extracting code snippets: {e}")
            return []
    
    def parse_patch(self, patch: str) -> Dict:
        """Parse git diff to understand file changes"""
        try:
            files_changed = []
            for line in str(patch).split('\n'):
                if line.startswith('diff --git'):
                    # Extract file name: diff --git a/file.py b/file.py
                    files_changed.append(line.split()[-1][2:])
            return {
                'files_changed': files_changed,
                'raw_patch': patch
            }
        except Exception as e:
            print(f"Error parsing patch: {e}")
            return {'files_changed': [], 'raw_patch': ''}
    
    def process_single_issue(self, row) -> Dict:
        """Process a single GitHub issue"""
        try:
            # Safely convert PASS_TO_PASS and FAIL_TO_PASS
            try:
                pass_to_pass = eval(str(row['PASS_TO_PASS']).replace('\x00', ''))
            except:
                pass_to_pass = []
                
            try:
                fail_to_pass = eval(str(row['FAIL_TO_PASS']).replace('\x00', ''))
            except:
                fail_to_pass = []
                
            return {
                'id': str(row['instance_id']),
                'repo': str(row['repo']),
                'problem': {
                    'description': str(row['problem_statement']),
                    'code_snippets': self.extract_code_snippets(str(row['problem_statement'])),
                    'error_type': self.extract_error_type(str(row['problem_statement']))
                },
                'solution': self.parse_patch(str(row['patch'])),
                'tests': {
                    'pass_to_pass': pass_to_pass,
                    'fail_to_pass': fail_to_pass
                }
            }
        except Exception as e:
            print(f"Error processing single issue: {e}")
            return {}
    
    def extract_error_type(self, problem_statement: str) -> str:
        """Extract the type of error from problem statement"""
        try:
            error_pattern = r"([A-Za-z]+Error:)"
            matches = re.findall(error_pattern, str(problem_statement))
            return matches[0] if matches else "Unknown"
        except Exception as e:
            print(f"Error extracting error type: {e}")
            return "Unknown"
    
    def process_all_data(self) -> List[Dict]:
        """Process all issues in the dataset"""
        processed_data = []
        for _, row in self.df.iterrows():
            try:
                processed_issue = self.process_single_issue(row)
                if processed_issue:  # Only add if not empty
                    processed_data.append(processed_issue)
            except Exception as e:
                print(f"Error processing row: {e}")
                continue
        return processed_data

def load_and_process_data():
    """Main function to load and process the data"""
    try:
        # First unzip the data
        unzip_data()
        
        # Read the parquet file with handling for null bytes
        df = pd.read_parquet('/kaggle/working/data/data/data.parquet')
        
        # Clean null bytes from string columns
        string_columns = df.select_dtypes(include=['object']).columns
        for col in string_columns:
            df[col] = df[col].astype(str).str.replace('\x00', '')
        
        # Create processor instance
        processor = DataProcessor(df)
        
        # Process all data
        processed_data = processor.process_all_data()
        
        if processed_data:
            print("\nSample processed issue:")
            #print(processed_data[0])
            
            # Some basic statistics
            print("\nDataset Statistics:")
            error_types = [issue['problem']['error_type'] for issue in processed_data]
            print(f"Error types found: {set(error_types)}")
            
            # Print number of processed issues
            print(f"\nTotal issues processed: {len(processed_data)}")
        
        return processed_data
    
    except Exception as e:
        print(f"Error in data processing: {e}")
        return None

# Run the processing
if __name__ == "__main__":
    processed_data = load_and_process_data()

Data successfully unzipped!

Sample processed issue:

Dataset Statistics:
Error types found: {'Unknown', 'UnitsError:', 'TypeError:', 'ValueError:', 'InputParameterError:', 'KeyError:'}

Total issues processed: 6


## Download peft and bits and bytes

In [7]:
!pip install peft
!pip install bitsandbytes

In addition, using fork() with Python in general is a recipe for mysterious
deadlocks and crashes.

The most likely reason you are seeing this error is because you are using the
multiprocessing module on Linux, which uses fork() by default. This will be
fixed in Python 3.14. Until then, you want to use the "spawn" context instead.

See https://docs.pola.rs/user-guide/misc/multiprocessing/ for details.

  pid, fd = os.forkpty()


Collecting peft
  Downloading peft-0.14.0-py3-none-any.whl.metadata (13 kB)
Downloading peft-0.14.0-py3-none-any.whl (374 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m374.8/374.8 kB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: peft
Successfully installed peft-0.14.0
Collecting bitsandbytes
  Downloading bitsandbytes-0.45.0-py3-none-manylinux_2_24_x86_64.whl.metadata (2.9 kB)
Downloading bitsandbytes-0.45.0-py3-none-manylinux_2_24_x86_64.whl (69.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.1/69.1 MB[0m [31m24.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: bitsandbytes
Successfully installed bitsandbytes-0.45.0


In [8]:
len(processed_data)

6

In [9]:
for i in range(len(processed_data)):

    keys = list(processed_data[i].keys())
    print(processed_data[i])
    print('--------------------------- \n')
    break

--------------------------- 



## Create data cleaning functions and import transformer libraries

In [10]:
from sklearn.model_selection import train_test_split

import numpy as np
from typing import Dict, List
import re
import os
import json
from tqdm import tqdm

# PyTorch and related libraries
import torch
from torch.cuda import is_available
from torch.utils.data import Dataset as TorchDataset

# Hugging Face libraries
from transformers import (
    AutoTokenizer, 
    AutoModelForCausalLM, 
    TrainingArguments, 
    Trainer, 
    DataCollatorForLanguageModeling,
    AutoConfig,
    BitsAndBytesConfig
)
from datasets import Dataset
from peft import (
    LoraConfig,
    get_peft_model,
    prepare_model_for_kbit_training,
    TaskType
)

def extract_error_message(description):
    """Extract just the error message from the description"""
    # Look for common error message patterns
    patterns = [
        r'Error:.*?\n',
        r'Exception:.*?\n',
        r'Traceback.*?\n.*?\n',
    ]
    
    for pattern in patterns:
        match = re.search(pattern, description, re.DOTALL)
        if match:
            return match.group(0).strip()
    return description.split('\n')[0]

def extract_code_changes(patch):
    """Extract actual code changes from git patch format"""
    lines = patch.split('\n')
    code_lines = [line[1:] for line in lines if line.startswith('+') and not line.startswith('+++')]
    return '\n'.join(code_lines)

def clean_code_snippet(code):
    """Remove comments, docstrings, and normalize whitespace"""
    # Remove markdown code blocks if present
    code = re.sub(r'```.*?\n', '', code)
    code = re.sub(r'```', '', code)
    
    # Remove comments and normalize whitespace
    lines = [line.split('#')[0].strip() for line in code.split('\n')]
    return '\n'.join(line for line in lines if line)

def split_dataset(processed_data, test_size=0.2):
    """Split the dataset into train and test sets"""
    train_data, test_data = train_test_split(processed_data, test_size=test_size, random_state=42)
    print(f"Train set size: {len(train_data)}")
    print(f"Test set size: {len(test_data)}")
    return train_data, test_data

In [11]:
# def process_dataset(processed_data):
#     """Process the dataset for training"""
#     print("\nPreparing training data...")
#     training_data = []
#     error_types = set()
    
#     for issue in tqdm(processed_data, desc="Processing issues"):
#         if not issue['problem']['code_snippets'] or not issue['problem']['description']:
#             continue
            
#         # Extract just the actual code changes from the patch
#         patch = issue['solution']['raw_patch']
#         code_changes = extract_code_changes(patch)
        
#         if not code_changes:
#             continue
            
#         error_types.add(issue['problem']['error_type'])
#         training_data.append({
#             'code': clean_code_snippet(issue['problem']['code_snippets'][0]),
#             'error': extract_error_message(issue['problem']['description']),
#             'error_type': issue['problem']['error_type'],
#             'solution': code_changes
#         })
    
#     print(f"\nCollected {len(training_data)} training examples")
#     print("Error types found:", sorted(list(error_types)))
    
#     return training_data

In [12]:
def process_dataset(processed_data):
    """Process the dataset for training"""
    print("\nPreparing training data...")
    training_data = []
    error_types = set()
    
    for issue in tqdm(processed_data, desc="Processing issues"):
        # Use empty string if code snippets are missing
        code_snippet = clean_code_snippet(issue['problem']['code_snippets'][0]) if issue['problem']['code_snippets'] else ""
        
        # Extract just the actual code changes from the patch
        patch = issue['solution']['raw_patch']
        code_changes = extract_code_changes(patch) if patch else ""
        
        error_types.add(issue['problem']['error_type'])
        training_data.append({
            'code': code_snippet,
            'error': extract_error_message(issue['problem']['description']),
            'error_type': issue['problem']['error_type'],
            'solution': code_changes
        })
    
    print(f"\nCollected {len(training_data)} training examples")
    print("Error types found:", sorted(list(error_types)))
    
    return training_data

In [13]:
# import torch
# from transformers import (
#     AutoConfig,
#     AutoTokenizer,
#     AutoModelForCausalLM,
#     BitsAndBytesConfig,
#     TrainingArguments,
#     Trainer,
#     DataCollatorForLanguageModeling,
# )
# from peft import LoraConfig, TaskType, get_peft_model, prepare_model_for_kbit_training
# from datasets import Dataset
# from tqdm import tqdm
# import gc

# class ErrorSolver:
#     def __init__(self, data):
#         """
#         Initialize the ErrorSolver with a list of dictionaries containing repo, problem, and solution data.
#         """
#         self.data = data  # List of dictionaries
#         self.model_path = "/kaggle/input/qwen2.5/transformers/0.5b/1"
#         print("Loading tokenizer and model (this may take a few minutes)...")
        
#         try:
#             # Load model configuration
#             config = AutoConfig.from_pretrained(
#                 self.model_path,
#                 trust_remote_code=True,
#                 local_files_only=True
#             )
            
#             # Load tokenizer
#             self.tokenizer = AutoTokenizer.from_pretrained(
#                 self.model_path,
#                 local_files_only=True,
#                 use_fast=True,
#                 trust_remote_code=True
#             )
#             print("Tokenizer loaded successfully!")
            
#             # Configure quantization for efficient inference
#             quantization_config = BitsAndBytesConfig(
#                 load_in_4bit=True,
#                 bnb_4bit_compute_dtype=torch.float16,
#                 bnb_4bit_use_double_quant=True,
#                 bnb_4bit_quant_type='nf4'
#             )
            
#             # Load pre-trained model
#             self.model = AutoModelForCausalLM.from_pretrained(
#                 self.model_path,
#                 config=config,
#                 device_map="auto",
#                 local_files_only=True,
#                 trust_remote_code=True,
#                 use_safetensors=True,
#                 torch_dtype=torch.float16,
#                 quantization_config=quantization_config,
#                 low_cpu_mem_usage=True,
#                 offload_folder="offload",
#                 offload_state_dict=True,
#                 max_memory={0: "12GB", "cpu": "24GB"}
#             )
            
#             # Enable gradient checkpointing and prepare for training
#             self.model.gradient_checkpointing_enable()
#             self.model = prepare_model_for_kbit_training(self.model)
            
#             # Apply Parameter-Efficient Fine-Tuning (PEFT) using LoRA
#             peft_config = LoraConfig(
#                 task_type=TaskType.CAUSAL_LM,
#                 inference_mode=False,
#                 r=8,
#                 lora_alpha=32,
#                 lora_dropout=0.1,
#                 target_modules=[
#                     "self_attn.q_proj", 
#                     "self_attn.k_proj", 
#                     "self_attn.v_proj", 
#                     "self_attn.o_proj",
#                     "mlp.gate_proj", 
#                     "mlp.up_proj", 
#                     "mlp.down_proj"
#                 ]
#             )
            
#             self.model = get_peft_model(self.model, peft_config)
#             self.model.print_trainable_parameters()
            
#             print("Model loaded successfully with PEFT!")
            
#             # Clear GPU memory if available
#             if torch.cuda.is_available():
#                 torch.cuda.empty_cache()
#                 print(f"GPU memory after loading: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
            
#         except Exception as e:
#             print(f"Error during initialization: {str(e)}")
#             raise e

#     def get_solution(self, repo, problem_description, error_type):
#         """
#         Retrieve the solution for a given repo, problem description, and error type.
        
#         Args:
#             repo (str): The repository name.
#             problem_description (str): The problem description.
#             error_type (str): The type of error.
        
#         Returns:
#             dict: The solution if found, otherwise None.
#         """
#         for entry in self.data:
#             if (
#                 entry['repo'] == repo and
#                 entry['problem']['description'] == problem_description and
#                 entry['problem']['error_type'] == error_type
#             ):
#                 return entry['solution']
#         return None  # Return None if no matching entry is found

#     def create_prompt(self, code_snippet, error_message, error_type):
#         """
#         Generate a prompt for the model to fix the code.
        
#         Args:
#             code_snippet (str): The code causing the error.
#             error_message (str): The error message.
#             error_type (str): The type of error.
        
#         Returns:
#             str: A formatted prompt.
#         """
#         return f"""Task: Fix the Python code that's raising a {error_type}.\n\nCode:\n{code_snippet}\n\nError:\n{error_message}\n\nSolution:"""

#     def fine_tune(self, train_data, eval_data=None):
#         """
#         Fine-tune the model on GitHub issue data.
        
#         Args:
#             train_data (list): List of training examples.
#             eval_data (list): List of evaluation examples (optional).
#         """
#         # Define training arguments
#         training_args = TrainingArguments(
#             output_dir="/kaggle/working/",
#             num_train_epochs=50,
#             per_device_train_batch_size=4,
#             gradient_accumulation_steps=4,
#             warmup_ratio=0.1,
#             learning_rate=1e-5,
#             logging_dir="./logs",
#             fp16=True,
#             logging_steps=1,
#             save_strategy="epoch",
#             evaluation_strategy="epoch" if eval_data else "no",
#             load_best_model_at_end=True if eval_data else False,
#             metric_for_best_model="loss",
#             greater_is_better=False,
#             remove_unused_columns=True,
#             report_to="none"  # Disable W&B logging
#         )
        
#         # Prepare training dataset
#         train_dataset = Dataset.from_dict({
#             'code': [example['code'] for example in train_data],
#             'error': [example['error'] for example in train_data],
#             'error_type': [example['error_type'] for example in train_data],
#             'solution': [example['solution'] for example in train_data]
#         })
        
#         # Prepare evaluation dataset (if provided)
#         eval_dataset = None
#         if eval_data:
#             eval_dataset = Dataset.from_dict({
#                 'code': [example['code'] for example in eval_data],
#                 'error': [example['error'] for example in eval_data],
#                 'error_type': [example['error_type'] for example in eval_data],
#                 'solution': [example['solution'] for example in eval_data]
#             })
        
#         # Preprocess data
#         def preprocess_data(examples):
#             prompts = [
#                 self.create_prompt(code, error, error_type)
#                 for code, error, error_type in zip(
#                     examples['code'],
#                     examples['error'],
#                     examples['error_type']
#                 )
#             ]
#             return self.tokenizer(
#                 prompts,
#                 padding=True,
#                 truncation=True,
#                 max_length=512
#             )
        
#         print(f"Preparing dataset with {len(train_data)} examples...")
#         train_dataset = train_dataset.map(
#             preprocess_data,
#             batched=True,
#             remove_columns=train_dataset.column_names,
#             desc="Preprocessing dataset"
#         )
        
#         if eval_dataset:
#             eval_dataset = eval_dataset.map(
#                 preprocess_data,
#                 batched=True,
#                 remove_columns=eval_dataset.column_names,
#                 desc="Preprocessing eval dataset"
#             )
        
#         # Define data collator
#         data_collator = DataCollatorForLanguageModeling(
#             tokenizer=self.tokenizer,
#             mlm=False
#         )
        
#         # Initialize Trainer
#         trainer = Trainer(
#             model=self.model,
#             args=training_args,
#             train_dataset=train_dataset,
#             eval_dataset=eval_dataset,
#             data_collator=data_collator
#         )
        
#         try:
#             print("Starting training...")
#             trainer.train()
            
#             print("Saving model...")
#             self.model.save_pretrained("/kaggle/working/")
#             self.tokenizer.save_pretrained("/kaggle/working/")
#             print("Training completed successfully!")
            
#             # Clear memory
#             gc.collect()
#             if torch.cuda.is_available():
#                 torch.cuda.empty_cache()
        
#         except Exception as e:
#             print(f"Error during training: {str(e)}")
#             raise e

#     def evaluate(self, test_data):
#         """
#         Evaluate the fine-tuned model on the test set.
        
#         Args:
#             test_data (list): List of test examples.
        
#         Returns:
#             float: Test accuracy.
#         """
#         self.model.eval()
#         correct = 0
#         total = 0
        
#         for example in tqdm(test_data, desc="Evaluating"):
#             prompt = self.create_prompt(example['code'], example['error'], example['error_type'])
#             inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
            
#             outputs = self.model.generate(
#                 inputs.input_ids,
#                 max_new_tokens=512,
#                 temperature=0.7,
#                 top_p=0.9,
#                 do_sample=True,
#                 num_return_sequences=1,
#                 pad_token_id=self.tokenizer.eos_token_id,
#                 repetition_penalty=1.2
#             )
            
#             generated_solution = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            
#             actual_solution = example['solution']
            
#             if generated_solution.strip() == actual_solution.strip():
                
#                 correct += 1
#             total += 1
        
#         accuracy = correct / total
#         print(f"Test Accuracy: {accuracy * 100:.2f}%")
#         return accuracy, generated_solution

In [14]:
# import torch
# from transformers import (
#     AutoConfig,
#     AutoTokenizer,
#     AutoModelForCausalLM,
#     BitsAndBytesConfig,
#     TrainingArguments,
#     Trainer,
#     DataCollatorForLanguageModeling,
# )
# from peft import LoraConfig, TaskType, get_peft_model, prepare_model_for_kbit_training
# from datasets import Dataset
# from tqdm import tqdm
# import gc

# class ErrorSolver:
#     def __init__(self, data):
#         """
#         Initialize the ErrorSolver with a list of dictionaries containing repo, problem, and solution data.
#         """
#         self.data = data  # List of dictionaries
#         self.model_path = "/kaggle/input/qwen2.5/transformers/0.5b/1"
#         print("Loading tokenizer and model (this may take a few minutes)...")
        
#         try:
#             # Load model configuration
#             config = AutoConfig.from_pretrained(
#                 self.model_path,
#                 trust_remote_code=True,
#                 local_files_only=True
#             )
            
#             # Load tokenizer
#             self.tokenizer = AutoTokenizer.from_pretrained(
#                 self.model_path,
#                 local_files_only=True,
#                 use_fast=True,
#                 trust_remote_code=True
#             )
#             print("Tokenizer loaded successfully!")
            
#             # Configure quantization for efficient inference
#             quantization_config = BitsAndBytesConfig(
#                 load_in_4bit=True,
#                 bnb_4bit_compute_dtype=torch.float16,
#                 bnb_4bit_use_double_quant=True,
#                 bnb_4bit_quant_type='nf4'
#             )
            
#             # Load pre-trained model
#             self.model = AutoModelForCausalLM.from_pretrained(
#                 self.model_path,
#                 config=config,
#                 device_map="auto",
#                 local_files_only=True,
#                 trust_remote_code=True,
#                 use_safetensors=True,
#                 torch_dtype=torch.float16,
#                 quantization_config=quantization_config,
#                 low_cpu_mem_usage=True,
#                 offload_folder="offload",
#                 offload_state_dict=True,
#                 max_memory={0: "12GB", "cpu": "24GB"}
#             )
            
#             # Enable gradient checkpointing and prepare for training
#             self.model.gradient_checkpointing_enable()
#             self.model = prepare_model_for_kbit_training(self.model)
            
#             # Apply Parameter-Efficient Fine-Tuning (PEFT) using LoRA
#             peft_config = LoraConfig(
#                 task_type=TaskType.CAUSAL_LM,
#                 inference_mode=False,
#                 r=8,
#                 lora_alpha=32,
#                 lora_dropout=0.1,
#                 target_modules=[
#                     "self_attn.q_proj", 
#                     "self_attn.k_proj", 
#                     "self_attn.v_proj", 
#                     "self_attn.o_proj",
#                     "mlp.gate_proj", 
#                     "mlp.up_proj", 
#                     "mlp.down_proj"
#                 ]
#             )
            
#             self.model = get_peft_model(self.model, peft_config)
#             self.model.print_trainable_parameters()
            
#             print("Model loaded successfully with PEFT!")
            
#             # Clear GPU memory if available
#             if torch.cuda.is_available():
#                 torch.cuda.empty_cache()
#                 print(f"GPU memory after loading: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
            
#         except Exception as e:
#             print(f"Error during initialization: {str(e)}")
#             raise e

#     def get_solution(self, repo, problem_description, error_type):
#         """
#         Retrieve the solution for a given repo, problem description, and error type.
#         """
#         for entry in self.data:
#             if (
#                 entry['repo'] == repo and
#                 entry['problem']['description'] == problem_description and
#                 entry['problem']['error_type'] == error_type
#             ):
#                 return entry['solution']
#         return None

#     def create_prompt(self, code_snippet, error_message, error_type):
#         """
#         Generate a prompt for the model to fix the code.
#         """
#         return f"""Fix the following Python code that's raising a {error_type}.
# Original code:
# {code_snippet}

# Error message:
# {error_message}

# Provide ONLY the fixed code without any explanations or markdown formatting:
# """

#     def fine_tune(self, train_data, eval_data=None):
#         """
#         Fine-tune the model on GitHub issue data.
#         """
#         # Define training arguments
#         training_args = TrainingArguments(
#             output_dir="/kaggle/working/",
#             num_train_epochs=500,
#             per_device_train_batch_size=4,
#             gradient_accumulation_steps=4,
#             warmup_ratio=0.1,
#             learning_rate=1e-5,
#             logging_dir="./logs",
#             fp16=True,
#             logging_steps=1,
#             save_strategy="epoch",
#             save_total_limit=1,        # Only keep the most recent checkpoint
#             evaluation_strategy="epoch" if eval_data else "no",
#             load_best_model_at_end=True if eval_data else False,
#             metric_for_best_model="loss",
#             greater_is_better=False,
#             remove_unused_columns=True,
#             report_to="none"  # Disable W&B logging
#         )
        
#         # Prepare training dataset
#         train_dataset = Dataset.from_dict({
#             'code': [example['code'] for example in train_data],
#             'error': [example['error'] for example in train_data],
#             'error_type': [example['error_type'] for example in train_data],
#             'solution': [example['solution'] for example in train_data]
#         })
        
#         # Prepare evaluation dataset (if provided)
#         eval_dataset = None
#         if eval_data:
#             eval_dataset = Dataset.from_dict({
#                 'code': [example['code'] for example in eval_data],
#                 'error': [example['error'] for example in eval_data],
#                 'error_type': [example['error_type'] for example in eval_data],
#                 'solution': [example['solution'] for example in eval_data]
#             })
        
#         # Preprocess data
#         def preprocess_data(examples):
#             prompts = [
#                 self.create_prompt(code, error, error_type)
#                 for code, error, error_type in zip(
#                     examples['code'],
#                     examples['error'],
#                     examples['error_type']
#                 )
#             ]
#             return self.tokenizer(
#                 prompts,
#                 padding=True,
#                 truncation=True,
#                 max_length=512
#             )
        
#         print(f"Preparing dataset with {len(train_data)} examples...")
#         train_dataset = train_dataset.map(
#             preprocess_data,
#             batched=True,
#             remove_columns=train_dataset.column_names,
#             desc="Preprocessing dataset"
#         )
        
#         if eval_dataset:
#             eval_dataset = eval_dataset.map(
#                 preprocess_data,
#                 batched=True,
#                 remove_columns=eval_dataset.column_names,
#                 desc="Preprocessing eval dataset"
#             )
        
#         # Define data collator
#         data_collator = DataCollatorForLanguageModeling(
#             tokenizer=self.tokenizer,
#             mlm=False
#         )
        
#         # Initialize Trainer
#         trainer = Trainer(
#             model=self.model,
#             args=training_args,
#             train_dataset=train_dataset,
#             eval_dataset=eval_dataset,
#             data_collator=data_collator
#         )
        
#         try:
#             print("Starting training...")
#             trainer.train()
            
#             print("Saving model...")
#             self.model.save_pretrained("/kaggle/working/")
#             self.tokenizer.save_pretrained("/kaggle/working/")
#             print("Training completed successfully!")
            
#             # Clear memory
#             gc.collect()
#             if torch.cuda.is_available():
#                 torch.cuda.empty_cache()
        
#         except Exception as e:
#             print(f"Error during training: {str(e)}")
#             raise e

#     def evaluate(self, test_data):
#         """
#         Evaluate the fine-tuned model on the test set.
#         """
#         def extract_solution(generated_text: str) -> str:
#             """Extract the actual solution code from the generated text"""
#             try:
#                 # Remove any markdown code blocks
#                 code = generated_text.replace("```python", "").replace("```", "")
                
#                 # Remove explanatory text
#                 if "Fixed code:" in code:
#                     code = code.split("Fixed code:", 1)[1]
#                 elif "Solution:" in code:
#                     code = code.split("Solution:", 1)[1]
                    
#                 # Clean up the code
#                 code = code.strip()
                
#                 # Remove any explanatory text after the code
#                 if "Explanation:" in code:
#                     code = code.split("Explanation:", 1)[0]
                    
#                 return code
#             except Exception as e:
#                 print(f"Error extracting solution: {e}")
#                 return ""
                
#         self.model.eval()
#         correct = 0
#         total = 0
        
#         for example in tqdm(test_data, desc="Evaluating"):
#             try:
#                 prompt = self.create_prompt(example['code'], example['error'], example['error_type'])
#                 inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
                
#                 outputs = self.model.generate(
#                     inputs.input_ids,
#                     max_new_tokens=512,
#                     temperature=0.7,
#                     top_p=0.9,
#                     do_sample=True,
#                     num_return_sequences=1,
#                     pad_token_id=self.tokenizer.eos_token_id,
#                     repetition_penalty=1.2
#                 )
                
#                 generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
#                 generated_solution = extract_solution(generated_text)
#                 actual_solution = example['solution']
                
#                 # Debug prints
#                 print("\nGenerated Text:", generated_text)
#                 print("\nExtracted Solution:", generated_solution)
#                 print("\nActual Solution:", actual_solution)
                
#                 if generated_solution.strip() == actual_solution.strip():
#                     correct += 1
#                 total += 1
                
#             except Exception as e:
#                 print(f"Error evaluating example: {e}")
#                 continue
        
#         accuracy = correct / total if total > 0 else 0
#         print(f"Test Accuracy: {accuracy * 100:.2f}%")
#         return accuracy

In [15]:
# import torch
# from transformers import (
#     AutoConfig,
#     AutoTokenizer,
#     AutoModelForCausalLM,
#     BitsAndBytesConfig,
#     TrainingArguments,
#     Trainer,
#     DataCollatorForLanguageModeling,
# )
# from peft import LoraConfig, TaskType, get_peft_model, prepare_model_for_kbit_training
# from datasets import Dataset
# from tqdm import tqdm
# import gc

# class ErrorSolver:
#     def __init__(self, data):
#         """
#         Initialize the ErrorSolver with a list of dictionaries containing repo, problem, and solution data.
#         """
#         self.data = data  # List of dictionaries
#         self.model_path = "/kaggle/input/qwen2.5/transformers/0.5b/1"
#         print("Loading tokenizer and model (this may take a few minutes)...")
        
#         try:
#             # Load model configuration
#             config = AutoConfig.from_pretrained(
#                 self.model_path,
#                 trust_remote_code=True,
#                 local_files_only=True
#             )
            
#             # Load tokenizer
#             self.tokenizer = AutoTokenizer.from_pretrained(
#                 self.model_path,
#                 local_files_only=True,
#                 use_fast=True,
#                 trust_remote_code=True
#             )
#             print("Tokenizer loaded successfully!")
            
#             # Configure quantization for efficient inference
#             quantization_config = BitsAndBytesConfig(
#                 load_in_4bit=True,
#                 bnb_4bit_compute_dtype=torch.float16,
#                 bnb_4bit_use_double_quant=True,
#                 bnb_4bit_quant_type='nf4'
#             )
            
#             # Load pre-trained model
#             self.model = AutoModelForCausalLM.from_pretrained(
#                 self.model_path,
#                 config=config,
#                 device_map="auto",
#                 local_files_only=True,
#                 trust_remote_code=True,
#                 use_safetensors=True,
#                 torch_dtype=torch.float16,
#                 quantization_config=quantization_config,
#                 low_cpu_mem_usage=True,
#                 offload_folder="offload",
#                 offload_state_dict=True,
#                 max_memory={0: "12GB", "cpu": "24GB"}
#             )
            
#             # Enable gradient checkpointing and prepare for training
#             self.model.gradient_checkpointing_enable()
#             self.model = prepare_model_for_kbit_training(self.model)
            
#             # Apply Parameter-Efficient Fine-Tuning (PEFT) using LoRA
#             peft_config = LoraConfig(
#                 task_type=TaskType.CAUSAL_LM,
#                 inference_mode=False,
#                 r=8,
#                 lora_alpha=32,
#                 lora_dropout=0.1,
#                 target_modules=[
#                     "self_attn.q_proj", 
#                     "self_attn.k_proj", 
#                     "self_attn.v_proj", 
#                     "self_attn.o_proj",
#                     "mlp.gate_proj", 
#                     "mlp.up_proj", 
#                     "mlp.down_proj"
#                 ]
#             )
            
#             self.model = get_peft_model(self.model, peft_config)
#             self.model.print_trainable_parameters()
            
#             print("Model loaded successfully with PEFT!")
            
#             # Clear GPU memory if available
#             if torch.cuda.is_available():
#                 torch.cuda.empty_cache()
#                 print(f"GPU memory after loading: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
            
#         except Exception as e:
#             print(f"Error during initialization: {str(e)}")
#             raise e

#     def create_prompt(self, issue_data):
#         """
#         Generate a structured prompt for the model to fix the code.
        
#         Args:
#             issue_data: Dictionary containing processed issue data
#         """
#         code_snippet = issue_data['problem']['code_snippets'][0] if issue_data['problem']['code_snippets'] else 'No code snippet available'
        
#         # Join files changed into a readable format
#         files_changed = ', '.join(issue_data['solution']['files_changed'])
        
#         prompt = f"""You are a Python expert debugging assistant. Fix the following issue:

# REPOSITORY: {issue_data['repo']}

# PROBLEM DESCRIPTION:
# {issue_data['problem']['description']}

# ERROR TYPE: {issue_data['problem']['error_type']}

# FILES CHANGED: {files_changed}

# ORIGINAL CODE:
# ```python
# {code_snippet}
# ```

# TEST REQUIREMENTS:
# - Must maintain passing tests: {', '.join(map(str, issue_data['tests']['pass_to_pass']))}
# - Must fix failing tests: {', '.join(map(str, issue_data['tests']['fail_to_pass']))}

# Return a git patch that fixes this issue. Format your response as a git diff without any additional text or markdown:"""

#         return prompt

#     def prepare_training_data(self, issue_data):
#         """
#         Prepare a single issue for training.
        
#         Args:
#             issue_data: Dictionary containing the issue data
#         """
#         return {
#             'input_text': self.create_prompt(issue_data),
#             'output_text': issue_data['solution']['raw_patch']
#         }

#     def fine_tune(self, train_data, eval_data=None):
#         """
#         Fine-tune the model on GitHub issue data.
#         """
#         # Define training arguments
#         training_args = TrainingArguments(
#             output_dir="/kaggle/working/",
#             num_train_epochs=500,
#             per_device_train_batch_size=4,
#             gradient_accumulation_steps=4,
#             warmup_ratio=0.1,
#             learning_rate=1e-5,
#             logging_dir="./logs",
#             fp16=True,
#             logging_steps=1,
#             save_strategy="epoch",
#             save_total_limit=1,
#             evaluation_strategy="epoch" if eval_data else "no",
#             load_best_model_at_end=True if eval_data else False,
#             metric_for_best_model="loss",
#             greater_is_better=False,
#             remove_unused_columns=True,
#             report_to="none"
#         )
        
#         # Prepare training data
#         print("Preparing training data...")
#         prepared_train_data = [self.prepare_training_data(example) for example in train_data]
        
#         # Create datasets
#         train_dataset = Dataset.from_dict({
#             'input_text': [example['input_text'] for example in prepared_train_data],
#             'output_text': [example['output_text'] for example in prepared_train_data]
#         })
        
#         # Prepare evaluation dataset (if provided)
#         eval_dataset = None
#         if eval_data:
#             prepared_eval_data = [self.prepare_training_data(example) for example in eval_data]
#             eval_dataset = Dataset.from_dict({
#                 'input_text': [example['input_text'] for example in prepared_eval_data],
#                 'output_text': [example['output_text'] for example in prepared_eval_data]
#             })
        
#         # Preprocess data
#         def preprocess_function(examples):
#             # Tokenize inputs
#             model_inputs = self.tokenizer(
#                 examples['input_text'],
#                 padding=True,
#                 truncation=True,
#                 max_length=512,
#                 return_tensors="pt"
#             )
            
#             # Tokenize outputs
#             labels = self.tokenizer(
#                 examples['output_text'],
#                 padding=True,
#                 truncation=True,
#                 max_length=512,
#                 return_tensors="pt"
#             )
            
#             model_inputs["labels"] = labels["input_ids"]
#             return model_inputs
        
#         print(f"Processing {len(train_data)} training examples...")
#         train_dataset = train_dataset.map(
#             preprocess_function,
#             batched=True,
#             remove_columns=train_dataset.column_names,
#             desc="Preprocessing training data"
#         )
        
#         if eval_dataset:
#             eval_dataset = eval_dataset.map(
#                 preprocess_function,
#                 batched=True,
#                 remove_columns=eval_dataset.column_names,
#                 desc="Preprocessing evaluation data"
#             )
        
#         # Define data collator
#         data_collator = DataCollatorForLanguageModeling(
#             tokenizer=self.tokenizer,
#             mlm=False
#         )
        
#         # Initialize Trainer
#         trainer = Trainer(
#             model=self.model,
#             args=training_args,
#             train_dataset=train_dataset,
#             eval_dataset=eval_dataset,
#             data_collator=data_collator,
#         )
        
#         try:
#             print("Starting training...")
#             trainer.train()
            
#             print("Saving model...")
#             self.model.save_pretrained("/kaggle/working/")
#             self.tokenizer.save_pretrained("/kaggle/working/")
#             print("Training completed successfully!")
            
#             # Clear memory
#             gc.collect()
#             if torch.cuda.is_available():
#                 torch.cuda.empty_cache()
        
#         except Exception as e:
#             print(f"Error during training: {str(e)}")
#             raise e

#     def evaluate(self, test_data):
#         """
#         Evaluate the fine-tuned model on the test set.
#         """
#         def extract_solution(generated_text: str) -> str:
#             """Extract the actual solution code from the generated text"""
#             try:
#                 # If the text contains a git diff, return it as is
#                 if "diff --git" in generated_text:
#                     return generated_text.strip()
                
#                 # Otherwise, try to extract the code portion
#                 code = generated_text.strip()
                
#                 # Remove any markdown formatting if present
#                 code = code.replace("```python", "").replace("```", "").strip()
                
#                 return code
#             except Exception as e:
#                 print(f"Error extracting solution: {e}")
#                 return ""
        
#         self.model.eval()
#         correct = 0
#         total = 0
#         results = []
        
#         for example in tqdm(test_data, desc="Evaluating"):
#             try:
#                 prompt = self.create_prompt(example)
#                 inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
                
#                 outputs = self.model.generate(
#                     inputs.input_ids,
#                     max_new_tokens=512,
#                     temperature=0.7,
#                     top_p=0.9,
#                     do_sample=True,
#                     num_return_sequences=1,
#                     pad_token_id=self.tokenizer.eos_token_id,
#                     repetition_penalty=1.2
#                 )
                
#                 generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
#                 generated_solution = extract_solution(generated_text)
#                 actual_solution = example['solution']['raw_patch']
                
#                 # Store results
#                 result = {
#                     'repo': example['repo'],
#                     'error_type': example['problem']['error_type'],
#                     'prompt': prompt,
#                     'generated_solution': generated_solution,
#                     'actual_solution': actual_solution,
#                     'is_correct': generated_solution.strip() == actual_solution.strip()
#                 }
#                 results.append(result)
                
#                 # Update metrics
#                 if result['is_correct']:
#                     correct += 1
#                 total += 1
                
#                 # Print detailed debugging info
#                 print(f"\nEvaluating example {total}:")
#                 print(f"Repository: {example['repo']}")
#                 print(f"Error Type: {example['problem']['error_type']}")
#                 print("\nGenerated Solution:", generated_solution)
#                 print("\nActual Solution:", actual_solution)
#                 print(f"Correct: {result['is_correct']}")
                
#             except Exception as e:
#                 print(f"Error evaluating example: {e}")
#                 continue
        
#         # Calculate and print metrics
#         accuracy = correct / total if total > 0 else 0
#         print("\nEvaluation Results:")
#         print(f"Total examples: {total}")
#         print(f"Correct solutions: {correct}")
#         print(f"Accuracy: {accuracy * 100:.2f}%")
        
#         # Calculate accuracy by error type
#         error_type_results = {}
#         for result in results:
#             error_type = result['error_type']
#             if error_type not in error_type_results:
#                 error_type_results[error_type] = {'correct': 0, 'total': 0}
#             error_type_results[error_type]['total'] += 1
#             if result['is_correct']:
#                 error_type_results[error_type]['correct'] += 1
        
#         print("\nAccuracy by Error Type:")
#         for error_type, counts in error_type_results.items():
#             type_accuracy = counts['correct'] / counts['total'] * 100
#             print(f"{error_type}: {type_accuracy:.2f}% ({counts['correct']}/{counts['total']})")
        
#         return accuracy, results

In [16]:
import torch
from transformers import (
    AutoConfig,
    AutoTokenizer,
    AutoModelForCausalLM,
    BitsAndBytesConfig,
    TrainingArguments,
    Trainer,
    DataCollatorForLanguageModeling,
)
from peft import LoraConfig, TaskType, get_peft_model, prepare_model_for_kbit_training
from datasets import Dataset
from tqdm import tqdm
import gc

class ErrorSolver:
    def __init__(self, data):
        """
        Initialize the ErrorSolver with a list of dictionaries containing code, error, and solution data.
        """
        self.data = data  # List of dictionaries
        self.model_path = "/kaggle/input/qwen2.5/transformers/0.5b/1"
        print("Loading tokenizer and model (this may take a few minutes)...")
        
        try:
            # Load model configuration
            config = AutoConfig.from_pretrained(
                self.model_path,
                trust_remote_code=True,
                local_files_only=True
            )
            
            # Load tokenizer
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_path,
                local_files_only=True,
                use_fast=True,
                trust_remote_code=True
            )
            print("Tokenizer loaded successfully!")
            
            # Configure quantization for efficient inference
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type='nf4'
            )
            
            # Load pre-trained model
            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_path,
                config=config,
                device_map="auto",
                local_files_only=True,
                trust_remote_code=True,
                use_safetensors=True,
                torch_dtype=torch.float16,
                quantization_config=quantization_config,
                low_cpu_mem_usage=True,
                offload_folder="offload",
                offload_state_dict=True,
                max_memory={0: "12GB", "cpu": "24GB"}
            )
            
            # Enable gradient checkpointing and prepare for training
            self.model.gradient_checkpointing_enable()
            self.model = prepare_model_for_kbit_training(self.model)
            
            # Apply Parameter-Efficient Fine-Tuning (PEFT) using LoRA
            peft_config = LoraConfig(
                task_type=TaskType.CAUSAL_LM,
                inference_mode=False,
                r=8,
                lora_alpha=32,
                lora_dropout=0.1,
                target_modules=[
                    "self_attn.q_proj", 
                    "self_attn.k_proj", 
                    "self_attn.v_proj", 
                    "self_attn.o_proj",
                    "mlp.gate_proj", 
                    "mlp.up_proj", 
                    "mlp.down_proj"
                ]
            )
            
            self.model = get_peft_model(self.model, peft_config)
            self.model.print_trainable_parameters()
            
            print("Model loaded successfully with PEFT!")
            
            # Clear GPU memory if available
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                print(f"GPU memory after loading: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
            
        except Exception as e:
            print(f"Error during initialization: {str(e)}")
            raise e

    def create_prompt(self, issue_data):
        """
        Generate a structured prompt for the model to fix the code.
        
        Args:
            issue_data: Dictionary containing code, error, and solution data
        """
        prompt = f"""You are a Python expert debugging assistant. Fix the following issue:

ERROR TYPE: {issue_data['error_type']}

ERROR MESSAGE:
{issue_data['error']}

ORIGINAL CODE:
```python
{issue_data['code'] if issue_data['code'].strip() else '# No code provided'}
```

Return the fixed code without any additional text, explanations, or markdown formatting."""

        return prompt

    def prepare_training_data(self, issue_data):
        """
        Prepare a single issue for training.
        
        Args:
            issue_data: Dictionary containing code, error, and solution data
        """
        return {
            'input_text': self.create_prompt(issue_data),
            'output_text': issue_data['solution']
        }

    def fine_tune(self, train_data, eval_data=None):
        """
        Fine-tune the model on error correction data.
        """
        # Define training arguments
        training_args = TrainingArguments(
            output_dir="/kaggle/working/",
            num_train_epochs=500,
            per_device_train_batch_size=4,
            gradient_accumulation_steps=4,
            warmup_ratio=0.1,
            learning_rate=1e-5,
            logging_dir="./logs",
            fp16=True,
            logging_steps=1,
            save_strategy="epoch",
            save_total_limit=1,
            evaluation_strategy="epoch" if eval_data else "no",
            load_best_model_at_end=True if eval_data else False,
            metric_for_best_model="loss",
            greater_is_better=False,
            remove_unused_columns=True,
            report_to="none"
        )
        
        # Prepare training data
        print("Preparing training data...")
        prepared_train_data = [self.prepare_training_data(example) for example in train_data]
        
        # Create datasets
        train_dataset = Dataset.from_dict({
            'input_text': [example['input_text'] for example in prepared_train_data],
            'output_text': [example['output_text'] for example in prepared_train_data]
        })
        
        # Prepare evaluation dataset (if provided)
        eval_dataset = None
        if eval_data:
            prepared_eval_data = [self.prepare_training_data(example) for example in eval_data]
            eval_dataset = Dataset.from_dict({
                'input_text': [example['input_text'] for example in prepared_eval_data],
                'output_text': [example['output_text'] for example in prepared_eval_data]
            })
        
        # Preprocess data
        def preprocess_function(examples):
            # Tokenize inputs
            model_inputs = self.tokenizer(
                examples['input_text'],
                padding=True,
                truncation=True,
                max_length=512,
                return_tensors="pt"
            )
            
            # Tokenize outputs
            labels = self.tokenizer(
                examples['output_text'],
                padding=True,
                truncation=True,
                max_length=512,
                return_tensors="pt"
            )
            
            model_inputs["labels"] = labels["input_ids"]
            return model_inputs
        
        print(f"Processing {len(train_data)} training examples...")
        train_dataset = train_dataset.map(
            preprocess_function,
            batched=True,
            remove_columns=train_dataset.column_names,
            desc="Preprocessing training data"
        )
        
        if eval_dataset:
            eval_dataset = eval_dataset.map(
                preprocess_function,
                batched=True,
                remove_columns=eval_dataset.column_names,
                desc="Preprocessing evaluation data"
            )
        
        # Define data collator
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer,
            mlm=False
        )
        
        # Initialize Trainer
        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=eval_dataset,
            data_collator=data_collator,
        )
        
        try:
            print("Starting training...")
            trainer.train()
            
            print("Saving model...")
            self.model.save_pretrained("/kaggle/working/model_output")
            self.tokenizer.save_pretrained("/kaggle/working/model_output")
            print("Training completed successfully!")
            
            # Clear memory
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        
        except Exception as e:
            print(f"Error during training: {str(e)}")
            raise e

    def evaluate(self, test_data):
        """
        Evaluate the fine-tuned model on the test set.
        """
        def extract_solution(generated_text: str) -> str:
            """Extract the actual solution code from the generated text"""
            try:
                # Clean up the text
                code = generated_text.strip()
                
                # Remove any markdown formatting if present
                code = code.replace("```python", "").replace("```", "").strip()
                
                return code
            except Exception as e:
                print(f"Error extracting solution: {e}")
                return ""
        
        self.model.eval()
        correct = 0
        total = 0
        results = []
        
        for example in tqdm(test_data, desc="Evaluating"):
            try:
                prompt = self.create_prompt(example)
                inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
                
                outputs = self.model.generate(
                    inputs.input_ids,
                    max_new_tokens=512,
                    temperature=0.7,
                    top_p=0.9,
                    do_sample=True,
                    num_return_sequences=1,
                    pad_token_id=self.tokenizer.eos_token_id,
                    repetition_penalty=1.2
                )
                
                generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
                generated_solution = extract_solution(generated_text)
                actual_solution = example['solution']
                
                # Store results
                result = {
                    'error_type': example['error_type'],
                    'prompt': prompt,
                    'original_code': example['code'],
                    'error_message': example['error'],
                    'generated_solution': generated_solution,
                    'actual_solution': actual_solution,
                    'is_correct': generated_solution.strip() == actual_solution.strip()
                }
                results.append(result)
                
                # Update metrics
                if result['is_correct']:
                    correct += 1
                total += 1
                
                # Print detailed debugging info
                print(f"\nEvaluating example {total}:")
                print(f"Error Type: {example['error_type']}")
                print("\nGenerated Solution:", generated_solution)
                print("\nActual Solution:", actual_solution)
                print(f"Correct: {result['is_correct']}")
                
            except Exception as e:
                print(f"Error evaluating example: {e}")
                continue
        
        # Calculate and print metrics
        accuracy = correct / total if total > 0 else 0
        print("\nEvaluation Results:")
        print(f"Total examples: {total}")
        print(f"Correct solutions: {correct}")
        print(f"Accuracy: {accuracy * 100:.2f}%")
        
        # Calculate accuracy by error type
        error_type_results = {}
        for result in results:
            error_type = result['error_type']
            if error_type not in error_type_results:
                error_type_results[error_type] = {'correct': 0, 'total': 0}
            error_type_results[error_type]['total'] += 1
            if result['is_correct']:
                error_type_results[error_type]['correct'] += 1
        
        print("\nAccuracy by Error Type:")
        for error_type, counts in error_type_results.items():
            type_accuracy = counts['correct'] / counts['total'] * 100
            print(f"{error_type}: {type_accuracy:.2f}% ({counts['correct']}/{counts['total']})")
        
        return accuracy, results

In [17]:
if __name__ == "__main__":
    print("Processing data...")
    processed_data = load_and_process_data()
    
    if processed_data:
        # Split the dataset
        train_data, test_data = split_dataset(processed_data)
        
        # Process the training and test datasets
        train_processed = process_dataset(train_data)
        test_processed = process_dataset(test_data)
        print(f"Training samples: {len(train_processed)}")
        print('----------------------------\n')
        print(f"Test samples: {len(test_processed)}")
        
        # Initialize ErrorSolver with the processed data
        solver = ErrorSolver(processed_data)
        
        # Fine-tune the model
        solver.fine_tune(train_processed, test_processed)
        
        # Evaluate the model
        accuracy = solver.evaluate(test_processed)
        #print(f"Final Test Accuracy: {accuracy * 100:.2f}%")

Processing data...
Data successfully unzipped!

Sample processed issue:

Dataset Statistics:
Error types found: {'Unknown', 'UnitsError:', 'TypeError:', 'ValueError:', 'InputParameterError:', 'KeyError:'}

Total issues processed: 6
Train set size: 4
Test set size: 2

Preparing training data...


Processing issues: 100%|██████████| 4/4 [00:00<00:00, 6043.67it/s]



Collected 4 training examples
Error types found: ['InputParameterError:', 'KeyError:', 'UnitsError:', 'ValueError:']

Preparing training data...


Processing issues: 100%|██████████| 2/2 [00:00<00:00, 4736.65it/s]


Collected 2 training examples
Error types found: ['TypeError:', 'Unknown']
Training samples: 4
----------------------------

Test samples: 2
Loading tokenizer and model (this may take a few minutes)...





Tokenizer loaded successfully!
trainable params: 4,399,104 || all params: 498,431,872 || trainable%: 0.8826
Model loaded successfully with PEFT!
GPU memory after loading: 0.70 GB
Preparing training data...
Processing 4 training examples...




Preprocessing training data:   0%|          | 0/4 [00:00<?, ? examples/s]

Preprocessing evaluation data:   0%|          | 0/2 [00:00<?, ? examples/s]

`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`...


Starting training...


  return fn(*args, **kwargs)
  with torch.enable_grad(), device_autocast_ctx, torch.cpu.amp.autocast(**ctx.cpu_autocast_kwargs):  # type: ignore[attr-defined]


Epoch,Training Loss,Validation Loss
1,4.1358,3.757086
2,4.1358,3.757086
3,4.1358,3.755467
4,4.1347,3.753307
5,4.1327,3.749624
6,4.1284,3.745786
7,4.1236,3.739688
8,4.1176,3.733559
9,4.1093,3.724385
10,4.1006,3.716237


  return fn(*args, **kwargs)
  with torch.enable_grad(), device_autocast_ctx, torch.cpu.amp.autocast(**ctx.cpu_autocast_kwargs):  # type: ignore[attr-defined]
  return fn(*args, **kwargs)
  with torch.enable_grad(), device_autocast_ctx, torch.cpu.amp.autocast(**ctx.cpu_autocast_kwargs):  # type: ignore[attr-defined]
  return fn(*args, **kwargs)
  with torch.enable_grad(), device_autocast_ctx, torch.cpu.amp.autocast(**ctx.cpu_autocast_kwargs):  # type: ignore[attr-defined]
  return fn(*args, **kwargs)
  with torch.enable_grad(), device_autocast_ctx, torch.cpu.amp.autocast(**ctx.cpu_autocast_kwargs):  # type: ignore[attr-defined]
  return fn(*args, **kwargs)
  with torch.enable_grad(), device_autocast_ctx, torch.cpu.amp.autocast(**ctx.cpu_autocast_kwargs):  # type: ignore[attr-defined]
  return fn(*args, **kwargs)
  with torch.enable_grad(), device_autocast_ctx, torch.cpu.amp.autocast(**ctx.cpu_autocast_kwargs):  # type: ignore[attr-defined]
  return fn(*args, **kwargs)
  with torch.enab

Saving model...
Training completed successfully!


Evaluating:   0%|          | 0/2 [00:00<?, ?it/s]The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Evaluating:  50%|█████     | 1/2 [00:46<00:46, 46.15s/it]


Evaluating example 1:
Error Type: TypeError:

Generated Solution: You are a Python expert debugging assistant. Fix the following issue:

ERROR TYPE: TypeError:

ERROR MESSAGE:
Error: unsupported format string passed to NoneType.__format__

ORIGINAL CODE:

# No code provided


Return the fixed code without any additional text, explanations, or markdown formatting. The explanation for each part of the solution is omitted.
The error message indicates that you cannot use an `None` value as a valid argument in a `__format__()` method call because it expects arguments with type names (e.g., strings) and does not support non-string types.

To fix this issue, you should check if there's a specific reason why your input could be treated as `None`, such as when converting a list to a tuple, iterating over all items in a sequence, etc.

If no particular case exists where `None` can't be converted into another expected data type (`str`) via a custom conversion function like `int()`, then the curr

Evaluating: 100%|██████████| 2/2 [01:02<00:00, 31.41s/it]


Evaluating example 2:
Error Type: Unknown

Generated Solution: You are a Python expert debugging assistant. Fix the following issue:

ERROR TYPE: Unknown

ERROR MESSAGE:
Pylint checks against incorrect type with properties that have a getter and setter

ORIGINAL CODE:

# No code provided


Return the fixed code without any additional text, explanations, or markdown formatting. The error message should be clear and concise.
The correct way to fix this issue is by ensuring proper data types for variables when accessing them in functions.

Here's an example of how you can modify your code:

Original Code (incorrect):

def my_function():
    x = 10
    return x
x = my_function()
print(x)


Fixed Code (correct):

def my_function():
    x = 10
    y = "Hello"  # This will raise a TypeError as 'str' cannot be converted to int
    z = True     # This will also cause a TypeError since boolean values cannot be used directly on integers
    return x + y * z
y = my_function()   # Corrected variab




In [18]:
print(train_processed[0])

{'code': '', 'error': 'Error: Value for parameter c0 does not match shape or size', 'error_type': 'InputParameterError:', 'solution': '    def _reset_parameters(self, *args, **kwargs):\n        """\n        Reset parameters on the models to those specified.\n\n        Parameters can be specified either as positional arguments or keyword\n        arguments, as in the model initializer. Any parameters not specified\n        will be reset to their default values.\n        """\n        self._initialize_parameters(args, kwargs)\n        self._initialize_slices()\n'}
