In [None]:
from transformers import pipeline
import torch
import pandas as pd
from tqdm.auto import tqdm
import json
import re
import os
import gc
import ast
import signal
import sys
from io import StringIO
from collections import defaultdict
import random
from typing import List, Dict, Tuple, Optional
from datasets import Dataset

In [None]:
def clear_memory():
    """Run Python's garbage collector and, if CUDA is available, empty PyTorch's CUDA cache.

    Returns:
        None.
    """
    gc.collect()
    # Nothing GPU-related to do on CPU-only machines.
    if not torch.cuda.is_available():
        return
    torch.cuda.empty_cache()

# Free whatever a previous run left behind before reporting on the hardware.
clear_memory()

# Tell the reader which device this session will use.
if not torch.cuda.is_available():
    print("No GPU detected - using CPU (will be very slow)")
else:
    # total_memory is reported in bytes; convert to GiB for display.
    gpu_memory = torch.cuda.get_device_properties(0).total_memory / (1024**3)
    print(f"GPU: {torch.cuda.get_device_name()}")
    print(f"GPU Memory: {gpu_memory:.1f} GB")

## Data Loading

In [None]:

# Placeholder location of the evaluation CSV; replace it with a real path before running.
test_data_path = 'PATH_TO_TRANSLATED_DATESET'
test_df = pd.read_csv(filepath_or_buffer=test_data_path)

# Quick look at how many rows were read and which columns are present.
print(f"Loaded {len(test_df)} test samples")
print(f"Sample data structure: {list(test_df.columns)}")

In [None]:
print("Loading fine-tuned Qwen2.5-Coder model...")

# Preferred path: load the fine-tuned checkpoint through Unsloth.
# ANY exception raised here (including a failed `unsloth` import) switches to
# the base-model pipeline in the handler below.
try:
    from unsloth import FastLanguageModel

    model, tokenizer = FastLanguageModel.from_pretrained(
        model_name="qwen25_mbpp_finetuned",  # fine-tuned model path
        max_seq_length=2048,
        dtype=None,
        load_in_4bit=True,
    )
    FastLanguageModel.for_inference(model)
    print("Fine-tuned model loaded successfully!")

except Exception as e:
    print(f"Fine-tuned model not found ({e}), loading base model...")

    # Device placement and precision depend on whether a GPU is present.
    has_cuda = torch.cuda.is_available()

    # Conservative settings (original author's note: avoids Triton / flash-attention issues).
    pipe = pipeline(
        task="text-generation",
        model="unsloth/Qwen2.5-Coder-14B-Instruct-bnb-4bit",
        trust_remote_code=True,
        device_map="auto" if has_cuda else "cpu",
        torch_dtype=torch.float16 if has_cuda else torch.float32,
        model_kwargs=dict(attn_implementation="eager", use_cache=True),
    )

    # `model is None` is how the following cell detects that the fallback is active.
    model = None
    tokenizer = pipe.tokenizer

# Create pipeline function for unified interface
if model is not None:
    def generate_code_ft(prompt, **kwargs):
        """Generate a completion with the fine-tuned model.

        Args:
            prompt: Either a ``str`` that is tokenized verbatim, or a list of chat
                messages (``{"role": ..., "content": ...}``) that is rendered with
                the tokenizer's chat template. ``format_prompt`` already returns a
                fully rendered chat-template string, so string prompts must NOT be
                wrapped in the template a second time.
            **kwargs: Optional overrides for ``max_new_tokens`` (default 768),
                ``temperature`` (0.1), ``top_p`` (0.95), ``do_sample`` (True) and
                ``pad_token_id`` (the tokenizer's EOS id). Any other key, such as
                ``return_full_text``, is ignored.

        Returns:
            ``[{'generated_text': text}]`` where ``text`` is only the newly
            generated continuation (prompt tokens are sliced off before decoding).

        Raises:
            Any exception from tokenization or generation is printed and re-raised.
        """
        try:
            # Ensure we're working with the right device
            device = next(model.parameters()).device

            if isinstance(prompt, str):
                # The string is already a rendered prompt: tokenize it as-is.
                # add_special_tokens=False avoids adding tokens on top of the
                # ones already written into the rendered text.
                encoded = tokenizer(
                    prompt,
                    return_tensors="pt",
                    add_special_tokens=False,
                )
            else:
                # A list of chat messages still needs the chat template applied.
                encoded = tokenizer.apply_chat_template(
                    prompt,
                    tokenize=True,
                    add_generation_prompt=True,
                    return_tensors="pt",
                    return_dict=True,
                )

            input_ids = encoded["input_ids"].to(device)
            # Pass the mask explicitly: pad and EOS share an id here, so it
            # should not be left for generate() to infer.
            attention_mask = encoded["attention_mask"].to(device)

            with torch.no_grad():  # Inference only; no gradients needed
                outputs = model.generate(
                    input_ids=input_ids,
                    attention_mask=attention_mask,
                    max_new_tokens=kwargs.get('max_new_tokens', 768),
                    temperature=kwargs.get('temperature', 0.1),
                    top_p=kwargs.get('top_p', 0.95),
                    do_sample=kwargs.get('do_sample', True),
                    pad_token_id=kwargs.get('pad_token_id', tokenizer.eos_token_id),
                    use_cache=True
                )

            # Decode only the tokens produced after the prompt.
            prompt_length = input_ids.shape[-1]
            response = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
            return [{'generated_text': response}]

        except Exception as e:
            print(f"Fine-tuned model generation error: {e}")
            raise  # bare raise keeps the original traceback

    # Minimal stand-in for a transformers pipeline: exposes `.tokenizer` and is
    # callable as `pipe(prompt, **kwargs)`.
    pipe = type(
        'Pipeline',
        (),
        {
            'tokenizer': tokenizer,
            '__call__': lambda self, prompt, **kwargs: generate_code_ft(prompt, **kwargs),
        },
    )()
    print("Using fine-tuned model for generation")
else:
    print("Using base model pipeline for generation")

In [None]:
def _find_called_name(text, allow_builtins=False):
    """Return the first function-call name found in ``text``, or ``None``.

    A "call" is an identifier directly followed by ``(``. Attribute calls
    (``math.isclose(``) and Python keywords (``not (``) are never returned.
    Names of builtins (``set``, ``round``, ...) are skipped unless
    ``allow_builtins`` is true, because in assert-style examples they usually
    wrap the function under test rather than being it.

    Args:
        text: A string, or a list/tuple whose items are joined with newlines.
            Anything else (e.g. a NaN from pandas) yields ``None``.
        allow_builtins: Whether a builtin name is an acceptable answer.
    """
    import builtins
    import keyword

    if isinstance(text, (list, tuple)):
        text = "\n".join(str(part) for part in text)
    if not isinstance(text, str):
        return None

    # The lookbehind rejects matches that start mid-identifier or after a dot.
    for match in re.finditer(r'(?<![\w.])([A-Za-z_]\w*)\s*\(', text):
        candidate = match.group(1)
        if keyword.iskeyword(candidate):
            continue
        if not allow_builtins and hasattr(builtins, candidate):
            continue
        return candidate
    return None


def format_prompt(example):
    """Format a single example into the required prompt format for fine-tuned model.

    Args:
        example: Mapping with the keys ``'instruction'`` (problem statement) and
            ``'test_list'`` (test cases, inserted into the prompt as given).

    Returns:
        The prompt string produced by ``pipe.tokenizer.apply_chat_template`` for a
        system + user message pair, with the generation prompt appended.

    The expected function name is taken from the text after ``"Example:"`` in the
    instruction when present, otherwise from the test cases. Wrapper calls such as
    ``set(...)`` or ``math.isclose(...)`` are skipped. ``"unknown_function"`` is
    used only when no call can be found in either place.
    """
    instruction = example['instruction']
    test_list = example['test_list']

    # Candidate texts, in priority order: the instruction's example, then the tests.
    name_sources = []
    if isinstance(instruction, str) and "Example:" in instruction:
        name_sources.append(instruction.split("Example:")[1].strip())
    name_sources.append(test_list)

    # Pass 1 ignores builtin names. Pass 2 accepts them, so a target function
    # that happens to share a builtin's name is still reported.
    function_name = "unknown_function"
    for allow_builtins in (False, True):
        found = next(
            (
                name
                for name in (_find_called_name(src, allow_builtins) for src in name_sources)
                if name
            ),
            None,
        )
        if found:
            function_name = found
            break

    # Enhanced system message with algorithm-specific guidance
    system_message = """You are an expert Python programmer specializing in algorithmic problem solving. Your task is to generate clean, efficient, and correct Python code that passes all given test cases.

CRITICAL REQUIREMENTS:
1. Analyze the problem description carefully and identify algorithm type (DP, graph, string, math, etc.)
2. Study the test cases to understand input/output patterns, data types, and edge cases
3. Write clean, readable Python code with proper error handling
4. Ensure your solution passes ALL test cases exactly
5. Use appropriate algorithms and data structures for efficiency
6. Handle edge cases like empty inputs, None values, boundary conditions, negative numbers
7. Follow Python best practices and coding standards
8. Import necessary modules at the beginning if needed

RESPONSE FORMAT:
- Provide ONLY the Python code implementation
- Do NOT include explanations, comments about the approach, or markdown formatting
- Do NOT wrap code in backticks or code blocks
- Write complete, executable functions that solve the problem
- Start with imports if needed (math, re, collections, itertools, etc.)

ALGORITHM-SPECIFIC PATTERNS:

Dynamic Programming:
def dp_problem(n):
    dp = [0] * (n + 1)
    dp[0] = base_case
    for i in range(1, n + 1):
        dp[i] = recurrence_relation
    return dp[n]

String Processing:
def string_problem(s):
    if not s:  # Handle empty string
        return default_value
    # Process string with appropriate method
    return result

Mathematical:
import math
def math_problem(n):
    if n <= 0:  # Handle edge cases
        return base_case
    # Use math functions efficiently
    return result

EXAMPLES OF CORRECT RESPONSES:

Problem: Write a function to find the first repeated character in a string.
Test Cases: assert first_repeated_char("abcabc") == "a"

Correct Response:
def first_repeated_char(s):
    seen = set()
    for char in s:
        if char in seen:
            return char
        seen.add(char)
    return "None"

Problem: Write a function to check if a number is prime.
Test Cases: assert prime_num(13) == True

Correct Response:
import math
def prime_num(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, int(math.sqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True

Problem: Write a function to find similar elements in two tuples.
Test Cases: assert similar_elements((3, 4, 5, 6),(5, 7, 4, 10)) == (4, 5)

Correct Response:
def similar_elements(test_tup1, test_tup2):
    result = tuple(sorted(set(test_tup1) & set(test_tup2)))
    return result

Problem: Write a function for dynamic programming - Fibonacci.
Test Cases: assert fib(10) == 55

Correct Response:
def fib(n):
    if n <= 1:
        return n
    dp = [0] * (n + 1)
    dp[1] = 1
    for i in range(2, n + 1):
        dp[i] = dp[i-1] + dp[i-2]
    return dp[n]"""

    user_prompt = f"""Problem: {instruction}

Test Cases:
{test_list}

Expected Function Name: {function_name}

ANALYSIS CHECKLIST:
✓ Identify algorithm type (DP, graph, string manipulation, math, etc.)
✓ Check input/output data types from test cases
✓ Consider edge cases (empty, single element, boundary values)
✓ Choose optimal data structures and algorithms
✓ Ensure exact return format matches test expectations

Generate the Python code solution that passes ALL test cases."""

    # Format for chat template
    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": user_prompt}
    ]

    # Render to text (tokenize=False) and append the assistant generation prompt.
    formatted_prompt = pipe.tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )

    return formatted_prompt

In [None]:
# Prepare dataset for inference
# 1) Copy the three columns needed downstream out of every row of test_df.
instructions_data = [
    {key: row[key] for key in ('instruction', 'test_list', 'id')}
    for _, row in tqdm(test_df.iterrows(), desc="Preparing data", unit="row", total=len(test_df))
]

# 2) Render one prompt per record; ids are kept in the same order as the prompts.
formatted_prompts = [
    format_prompt(item)
    for item in tqdm(instructions_data, desc="Formatting prompts", unit="prompt")
]
ids_list = [item['id'] for item in instructions_data]

print(f"Formatted {len(formatted_prompts)} prompts")

# 3) Wrap prompts and ids in a datasets.Dataset with columns 'prompt' and 'id'.
dataset_dict = dict(prompt=formatted_prompts, id=ids_list)
dataset = Dataset.from_dict(dataset_dict)
print(f"Dataset created with {len(dataset)} samples")

## Enhanced Generation with Test-Driven Refinement

In [None]:
def generate_code(prompt, max_new_tokens=768, temperature=0.1, top_p=0.95, do_sample=True):
    """Generate code for one prompt through the notebook-level ``pipe``.

    Args:
        prompt: Prompt passed to ``pipe`` unchanged.
        max_new_tokens: Upper bound on generated tokens (default 768).
        temperature: Sampling temperature (default 0.1).
        top_p: Nucleus-sampling threshold (default 0.95).
        do_sample: Whether to sample instead of decoding greedily (default True).

    Returns:
        The stripped ``generated_text`` of the first result. If anything raises,
        the error is printed and the sentinel string ``"def placeholder(): pass"``
        is returned instead, so callers must compare against that sentinel to
        detect a failed generation.
    """
    try:
        # Release cached GPU memory before each generation
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        result = pipe(
            prompt,
            max_new_tokens=max_new_tokens,
            temperature=temperature,
            top_p=top_p,
            do_sample=do_sample,
            return_full_text=False,  # only the continuation, not the echoed prompt
            pad_token_id=pipe.tokenizer.eos_token_id
        )

        return result[0]['generated_text'].strip()

    except Exception as e:
        # Deliberate best-effort: one bad sample must not abort a long run.
        print(f"Error in generate_code: {e}")
        return "def placeholder(): pass"

In [None]:
# Enhanced Test-Driven Code Generation Loop with Fine-tuned Model
#
# Produces, in dataset order:
#   responses  - one generated code string per sample
#   failed_ids - ids of samples whose generation failed
responses = []
failed_ids = []

# generate_code() swallows its own errors and returns this sentinel. Without
# checking for it, failed generations would be counted as successes.
PLACEHOLDER_CODE = "def placeholder(): pass"

print("Starting inference with enhanced error handling...")

for idx in tqdm(range(len(dataset)), desc="Enhanced generation"):
    # Bound before the try so the except handler never reports an unbound or
    # stale id from a previous iteration.
    sample_id = None
    try:
        sample = dataset[idx]  # fetch the row once; it carries both fields
        sample_id = sample['id']

        # generate_code() already empties the CUDA cache before generating.
        generated_code = generate_code(sample['prompt'])

        if generated_code == PLACEHOLDER_CODE:
            failed_ids.append(sample_id)
        responses.append(generated_code)

    except Exception as e:
        print(f"Complete failure for ID {sample_id}: {e}")
        print(f"Error type: {type(e).__name__}")
        failed_ids.append(sample_id)
        # Keep `responses` aligned with the dataset even when a sample fails.
        responses.append(PLACEHOLDER_CODE)

        # Clear memory after errors to prevent cascading issues
        clear_memory()

    # Periodic housekeeping runs for every sample, failed or not.
    if (idx + 1) % 10 == 0:
        clear_memory()
        print(f"\nMemory cleared after {idx + 1} samples")

    if (idx + 1) % 50 == 0:
        print(f"Progress Update after {idx + 1} samples:")
        print(f"Successful: {len(responses) - len(failed_ids)}")
        print(f"Failed: {len(failed_ids)}")

print("GENERATION WITH FINE-TUNED MODEL COMPLETED!")
print(f"Final stats - Successful: {len(responses) - len(failed_ids)}, Failed: {len(failed_ids)}")

In [None]:
import pickle

# Persist the untouched model outputs in binary form, so no text encoding is
# involved when they are read back later.
with open("raw_responses_backup.pkl", "wb") as backup_file:
    pickle.dump(responses, backup_file)

print("Successfully saved raw responses to 'raw_responses_backup.pkl'")

In [None]:
# Write the evaluation frame next to the responses: one JSON object per row.
test_df.to_json(
    "test_df.json",
    orient="records",
    indent=2,
)
print("Saved test_df to 'test_df.json'")