In [1]:
from google.colab import drive
drive.mount('/content/drive')




Mounted at /content/drive


In [6]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from datasets import load_dataset
from tqdm import tqdm

# Configuration
MODEL_NAME = "Qwen/Qwen2.5-Coder-3B-Instruct"
SPARSITY = 0.2  # 50% pruning
CALIBRATION_SIZE = 126
SAVE_DIR = "./QwenCoder3B_JavaPruned-20"

# Load Model
print("Loading model...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, torch_dtype=torch.float16, device_map="auto")
print("Load done")
model.eval() # switch to evaluation mode


# Load Calibration Data
print("Loading Java calibration samples...")
dataset = load_dataset("json", data_files={
    "train": "/content/drive/MyDrive/Colab Notebooks/data/train/train_small.jsonl",
    "validation": "/content/drive/MyDrive/Colab Notebooks/data/train/train_small.jsonl",
    "test": "/content/drive/MyDrive/Colab Notebooks/data/test/test_small.jsonl"
})
texts = [ex["code"] for ex in dataset["train"].select(range(min(CALIBRATION_SIZE, len(dataset["train"]))))]
print(f"Using {len(texts)} calibration samples")

# Collect Layer Activations (Wanda Technique)
def collect_activations(model, tokenizer, texts):
    """Collects activation scales for target Linear layers using Wanda technique."""
    torch.manual_seed(42)
    target_keywords = ['q_proj', 'k_proj', 'v_proj', 'o_proj', 'up_proj', 'down_proj']
    target_modules = {}
    param_to_module = {}

    # Find target layers and store to a dict
    for name, module in model.named_modules():
        if isinstance(module, torch.nn.Linear) and any(k in name for k in target_keywords):
            target_modules[name] = module # map layer object to install hook
            param_to_module[f"{name}.weight"] = name # map param name to module name

    print(f"Targeting {len(target_modules)} layers for pruning")
    activations = {name: [] for name in target_modules}

    # Register hooks to capture activations
    def make_hook(layer_name):
        def hook(module, inp, out):
            if inp and isinstance(inp[0], torch.Tensor):
                act = inp[0].detach().reshape(-1, inp[0].size(-1)).abs().mean(dim=0).cpu()
                activations[layer_name].append(act)
        return hook

    hooks = [module.register_forward_hook(make_hook(name)) for name, module in target_modules.items()]

    # Run calibration data
    print("Collecting activations...")
    for text in tqdm(texts, desc="Processing"):
        inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=256).to(model.device)
        with torch.no_grad():
            model(**inputs)

    # Remove hooks
    for h in hooks:
        h.remove()

    # Average activations across all samples
    print("Computing activation scalevs...")
    param_act_scales = {}
    for pname, mname in param_to_module.items():
        if activations[mname]:
            param_act_scales[pname] = torch.stack(activations[mname]).mean(dim=0)

    return param_act_scales

# Wanda Pruning
def wanda_prune(model, sparsity, param_act_scales):
    """Prunes model using Wanda (Weights AND Activations) technique."""
    pruned_count = total_params = 0

    for name, param in model.named_parameters():
        if name in param_act_scales and param.dim() == 2: # Only prune Linear weights
            act_scale = param_act_scales[name].to(param.device)

            # Verify dimensions match (act scale might be on CPU)
            if act_scale.size(0) != param.size(1):
                continue

            # Calculate importance: |Weight| Ã— Activation
            importance = param.abs() * act_scale.unsqueeze(0)

            # Find threshold (keep top (1-sparsity)% most important)
            k = int((1 - sparsity) * importance.numel())
            threshold = torch.topk(importance.flatten(), k, largest=True).values.min() if k > 0 else importance.max()

            # Apply mask (keep important, zero out unimportant)
            mask = importance >= threshold
            param.data *= mask

            # Track statistics
            pruned_count += (~mask).sum().item()
            total_params += mask.numel()

    print(f"Pruned {pruned_count:,} / {total_params:,} params ({pruned_count/total_params:.2%})")
    return model

# Execute Pruning
print("\n" + "="*50)
param_act_scales = collect_activations(model, tokenizer, texts)
print("="*50 + "\n")
model = wanda_prune(model, SPARSITY, param_act_scales)

# Save Pruned Model
print(f"\nSaving to {SAVE_DIR}...")
model.save_pretrained(SAVE_DIR, safe_serialization=True)
tokenizer.save_pretrained(SAVE_DIR)
print(f"âœ… Model saved successfully!")


Loading model...


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Load done
Loading Java calibration samples...


Generating train split: 0 examples [00:00, ? examples/s]

Generating validation split: 0 examples [00:00, ? examples/s]

Generating test split: 0 examples [00:00, ? examples/s]

Using 100 calibration samples

Targeting 216 layers for pruning
Collecting activations...


Processing: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 100/100 [00:13<00:00,  7.33it/s]


Computing activation scalevs...

Pruned 387,443,304 / 1,962,934,272 params (19.74%)

Saving to ./QwenCoder3B_JavaPruned-20...
âœ… Model saved successfully!


In [7]:
!cp -r /content/QwenCoder3B_JavaPruned-20 /content/drive/MyDrive/

In [11]:

from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

MODEL_DIR = "./QwenCoder3B_JavaPruned-20"
BASE_NAME = "Qwen/Qwen2.5-Coder-3B-Instruct"

print("Loading pruned model...")

# Load tokenizer tá»« báº£n gá»‘c, khÃ´ng load tá»« prune folder
tokenizer = AutoTokenizer.from_pretrained(BASE_NAME)

# Load model tá»« folder prune
model = AutoModelForCausalLM.from_pretrained(
    MODEL_DIR,
    torch_dtype=torch.float16,
    device_map="auto"
)

print("ðŸ”¥ Loaded successfully!")



Loading pruned model...


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

ðŸ”¥ Loaded successfully!


In [17]:
prompt = "can you generate Java code to reverse a string, no comment, just ONE method, no main method, do not generate this sentence again, just give me the code"

inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

with torch.no_grad():
    output = model.generate(
        **inputs,
        max_new_tokens=500,
        temperature=0.2,
        do_sample=False,
        repetition_penalty=1.2,
    )

print("\n=== OUTPUT ===")
gen_ids = output[0][inputs["input_ids"].shape[1]:]
print(tokenizer.decode(gen_ids,skip_special_tokens = True))

The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.



=== OUTPUT ===

```java
public class StringReverser {
    public static void reverseString(String input) {
        StringBuilder reversed = new StringBuilder(input);
        System.out.println(reversed.reverse());
    }
}
```
```


In [2]:
import json
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from tqdm import tqdm
import numpy as np
from typing import List, Dict, Any
import re

In [3]:
def load_dataset(file_path: str) -> List[Dict[str, Any]]:
    """Load JSONL dataset."""
    data = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            data.append(json.loads(line.strip()))
    return data

# Load the evaluation dataset
dataset_path = "/content/drive/MyDrive/Colab Notebooks/data/eval/code_samples_full.jsonl"
dataset = load_dataset(dataset_path)
print(f"Loaded {len(dataset)} samples")
print(f"\nExample sample:")
print(json.dumps(dataset[0], indent=2))

Loaded 60 samples

Example sample:
{
  "repo": "ExampleRepo0",
  "path": "src/handler/Class0.java",
  "func_name": "factorial",
  "language": "java",
  "code": "public static int factorial(int n) { if (n <= 1) return 1; return n * factorial(n - 1); }",
  "docstring": "Calculates factorial recursively.",
  "sha": "sha0000",
  "url": "https://example.com/repo/Class0.java#L0",
  "partition": "train"
}


ls: cannot access '/root/.cache/huggingface': No such file or directory


In [5]:
MODEL_PATH = "Qwen/Qwen2.5-Coder-3B-Instruct"

# Load model and tokenizer
print("Loading model...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_PATH,
    torch_dtype=torch.float16,
    device_map="auto"
)
print("Model loaded successfully!")

Loading model...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

config.json:   0%|          | 0.00/661 [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors.index.json: 0.00B [00:00, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/1.21G [00:00<?, ?B/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/4.96G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/243 [00:00<?, ?B/s]

Model loaded successfully!


In [44]:
def create_prompt(sample: Dict[str, Any]) -> str:
    """Create a prompt for code generation."""
    language = sample['language']
    func_name = sample['func_name']
    docstring = sample['docstring']

    # Strict prompt to get clean Java code
    prompt = f"""You are a Java code generator.

Write ONLY the Java method for this task. Follow these rules:
- Output MUST start with 'public static'
- Output MUST be a complete method
- NO explanations, NO comments, NO markdown
- JUST ONE METHOD, JUST THE CODE, NO HALLUCINATE

Task: {docstring}
Function name: {func_name}

Write the complete Java method:"""
    return prompt

def clean_generated_code(raw_output: str) -> str:
    """Clean and extract the method from raw model output."""
    code = raw_output.strip()

    # Remove markdown code blocks
    code = re.sub(r'```java\s*', '', code)
    code = re.sub(r'```\s*', '', code)

    # Remove any explanatory text before the code
    if 'public static' in code:
        code = code[code.index('public static'):]

    # Extract just the first method (find matching braces)
    if 'public static' in code:
        # Count braces to find the complete method
        brace_count = 0
        in_method = False
        method_chars = []

        for char in code:
            if char == '{':
                brace_count += 1
                in_method = True
            if in_method:
                method_chars.append(char)
            if char == '}':
                brace_count -= 1
                if in_method and brace_count == 0:
                    break

        if method_chars:
            code = ''.join(method_chars)
            # Add back the signature before the opening brace
            if '{' in code:
                sig_end = code.index('{')
                # Find signature from original code
                sig_match = re.search(r'public\s+static\s+[^{]+', raw_output)
                if sig_match:
                    signature = sig_match.group(0).strip()
                    code = signature + ' ' + code[sig_end:]

    # Alternative: simple extraction with rfind for closing brace
    if 'public static' in code and '}' in code:
        # Find last closing brace
        last_brace = code.rfind('}')
        code = code[:last_brace + 1]

    return code.strip()

def generate_code(prompt: str, num_samples: int = 1, max_length: int = 256, temperature: float = 0.2, top_p: float = 0.95) -> List[str]:
    """Generate code completions from the model.

    Args:
        prompt: The input prompt
        num_samples: Number of completions to generate (for Pass@k)
        max_length: Maximum length of generated tokens
        temperature: Sampling temperature (default 0.2 for more focused output)
        top_p: Nucleus sampling parameter

    Returns:
        List of cleaned generated code strings
    """
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    # Generate multiple samples
    outputs = model.generate(
        **inputs,
        max_new_tokens=max_length,
        num_return_sequences=num_samples,
        temperature=temperature if num_samples > 1 else 0.1,
        top_p=top_p,
        do_sample=num_samples > 1,
        pad_token_id=tokenizer.eos_token_id,
        eos_token_id=tokenizer.eos_token_id
    )

    generated_texts = []
    for output in outputs:
        # Decode full output
        full_text = tokenizer.decode(output, skip_special_tokens=True)

        # Remove the prompt
        if prompt in full_text:
            generated_text = full_text[len(prompt):].strip()
        else:
            # Fallback: decode only new tokens
            gen_ids = output[inputs["input_ids"].shape[1]:]
            generated_text = tokenizer.decode(gen_ids, skip_special_tokens=True).strip()

        # Clean and extract the method
        cleaned_code = clean_generated_code(generated_text)
        generated_texts.append(cleaned_code)

    return generated_texts


# Test code generation
test_sample = dataset[1]
test_prompt = create_prompt(test_sample)
print("Test prompt:")
print(test_prompt)
print("\nGenerating code...")
test_output = generate_code(test_prompt, num_samples=1)
print("\nGenerated code:")
print(test_output)
print("\nReference code:")
print(test_sample['code'])

Test prompt:
You are a Java code generator.

Write ONLY the Java method for this task. Follow these rules:
- Output MUST start with 'public static'
- Output MUST be a complete method
- NO explanations, NO comments, NO markdown
- JUST ONE METHOD, JUST THE CODE, NO HALLUCINATE

Task: Counts words in a string.
Function name: countWords

Write the complete Java method:

Generating code...

Generated code:
['public static int countWords(String input) {\n    if (input == null || input.isEmpty()) {\n        return 0;\n    }\n    String[] words = input.split("\\\\s+");\n    return words.length;\n}']

Reference code:
public static int countWords(String text) { return text.trim().split("\\s+").length; }


In [None]:
def normalize_code(code: str) -> str:
    """Normalize code by removing whitespace and formatting differences."""
    code = code.strip()
    # Remove all whitespace for comparison
    code = ''.join(code.split())
    # Remove extra spaces around operators and punctuation
    code = re.sub(r'\s+', '', code)
    return code

def normalize_code_semantic(code: str) -> str:
    """More aggressive normalization for semantic similarity."""
    code = code.strip()
    # Remove all whitespace
    code = re.sub(r'\s+', '', code)
    # Normalize parameter names to a standard name (e.g., "param0", "param1")
    # This is more lenient but may give false positives
    return code

def exact_match(generated_code: str, reference_code: str, strict: bool = True) -> int:
    """Calculate exact match score (1 if match, 0 otherwise).

    Args:
        generated_code: Code generated by the model
        reference_code: Ground truth reference code
        strict: If True, requires exact match. If False, normalizes whitespace.

    Returns:
        1 if exact match, 0 otherwise
    """
    if strict:
        gen_normalized = generated_code.strip()
        ref_normalized = reference_code.strip()
    else:
        # Lenient: remove all whitespace
        gen_normalized = normalize_code(generated_code)
        ref_normalized = normalize_code(reference_code)

    match = 1 if gen_normalized == ref_normalized else 0
    
    # Debug: print if close but not matching
    if not match and not strict:
        print(f"No match even after normalization:")
        print(f"Gen: {gen_normalized[:100]}...")
        print(f"Ref: {ref_normalized[:100]}...")
    
    return match

# Test exact match with both strict and lenient modes
print("Testing Exact Match metric:")
print(f"Strict match (same): {exact_match(test_sample['code'], test_sample['code'], strict=True)}")
print(f"Strict match (different): {exact_match('different code', test_sample['code'], strict=True)}")
print(f"Lenient match (whitespace diff): {exact_match('public static int countWords(String text) { return text.trim().split(\"\\\\s+\").length; }', test_sample['code'], strict=False)}")

In [None]:
def extract_function_code(generated_text: str, func_name: str) -> str:
    """Extract the function code from generated text."""
    # Try to find the function definition
    # This is a simple extraction - you may need to customize based on your LLM's output format
    lines = generated_text.strip().split('\n')
    code_lines = []
    in_function = False

    for line in lines:
        if func_name in line and ('public' in line or 'private' in line or 'static' in line):
            in_function = True
        if in_function:
            code_lines.append(line)
            # Simple heuristic: if we find a closing brace at the start of line, function might end
            if line.strip() == '}':
                break

    return '\n'.join(code_lines) if code_lines else generated_text

def run_tests(code: str, test_cases: List[Dict]) -> bool:
    """Run test cases against generated code.

    Note: This is a placeholder. For Java code, you would need to:
    1. Write the code to a .java file
    2. Compile it
    3. Run test cases
    4. Check if all tests pass

    For now, we'll use exact match as a proxy for passing tests.
    """
    # Placeholder implementation
    # In a real scenario, you would compile and execute the code
    return True  # Placeholder

def calculate_pass_at_k(completions: List[str], reference_code: str, k: int = 1, strict: bool = True) -> float:
    """Calculate Pass@k metric.

    Args:
        completions: List of generated code completions
        reference_code: Ground truth reference code
        k: Number of completions to consider
        strict: If True, requires exact match. If False, normalizes whitespace.

    Returns:
        1.0 if at least one of the top-k completions passes, 0.0 otherwise
    """
    # Limit to k completions
    completions_to_check = completions[:k]

    # For this implementation, we'll use exact match as a proxy for "passing tests"
    # In a real scenario, you would run actual unit tests
    for completion in completions_to_check:
        if exact_match(completion, reference_code, strict=strict):
            return 1.0

    return 0.0

# Test Pass@k
print("Testing Pass@k metric:")
test_completions = ["wrong code", test_sample['code'], "another wrong code"]
print(f"Pass@1 (strict): {calculate_pass_at_k(test_completions, test_sample['code'], k=1, strict=True)}")
print(f"Pass@3 (strict): {calculate_pass_at_k(test_completions, test_sample['code'], k=3, strict=True)}")
print(f"Pass@1 (lenient): {calculate_pass_at_k(test_completions, test_sample['code'], k=1, strict=False)}")
print(f"Pass@3 (lenient): {calculate_pass_at_k(test_completions, test_sample['code'], k=3, strict=False)}")

In [None]:
def evaluate_model(dataset: List[Dict], k_values: List[int] = [1, 5, 10], num_samples: int = 10, strict_matching: bool = False, verbose: bool = True):
    """Run complete evaluation on the dataset.

    Args:
        dataset: List of evaluation samples
        k_values: List of k values for Pass@k metric
        num_samples: Number of completions to generate per problem (should be >= max(k_values))
        strict_matching: If True, requires exact character match. If False, normalizes whitespace.
        verbose: If True, prints debug info for each sample

    Returns:
        Dictionary containing evaluation results
    """
    results = {
        'exact_match': [],
        'pass_at_k': {k: [] for k in k_values},
        'samples': []
    }

    for idx, sample in enumerate(tqdm(dataset, desc="Evaluating")):
        # Create prompt
        prompt = create_prompt(sample)

        # Generate completions
        completions = generate_code(prompt, num_samples=num_samples)

        # Calculate Exact Match for the first generation
        em_score = exact_match(completions[0], sample['code'], strict=strict_matching)
        results['exact_match'].append(em_score)

        # Calculate Pass@k for different k values
        for k in k_values:
            pass_k_score = calculate_pass_at_k(completions, sample['code'], k=k, strict=strict_matching)
            results['pass_at_k'][k].append(pass_k_score)

        # Store sample results
        results['samples'].append({
            'idx': idx,
            'func_name': sample['func_name'],
            'reference': sample['code'],
            'generated': completions[0],
            'all_completions': completions,
            'exact_match': em_score
        })

        # Debug output for first few samples
        if verbose and idx < 3:
            print(f"\n{'='*60}")
            print(f"Sample {idx}: {sample['func_name']}")
            print(f"{'='*60}")
            print(f"Reference:\n{sample['code']}\n")
            print(f"Generated:\n{completions[0]}\n")
            print(f"Match: {'âœ“' if em_score else 'âœ—'}")
            print(f"{'='*60}\n")

    # Calculate aggregate metrics
    results['aggregate'] = {
        'exact_match': np.mean(results['exact_match']),
        'pass_at_k': {k: np.mean(results['pass_at_k'][k]) for k in k_values}
    }

    return results

In [35]:
# Run evaluation
# Note: Adjust num_samples based on your computational resources
# For quick testing, use a subset of the dataset

# Option 1: Evaluate on full dataset
# eval_results = evaluate_model(dataset, k_values=[1, 5, 10], num_samples=10)

# Option 2: Evaluate on a small subset for testing
eval_results = evaluate_model(dataset[:5], k_values=[1, 5, 10], num_samples=10)

Evaluating: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 5/5 [01:19<00:00, 15.99s/it]


In [43]:
# Print aggregate results
print("="*50)
print("EVALUATION RESULTS")
print("="*50)
print(f"\nDataset size: {len(dataset)} samples")
print(f"\nExact Match (EM): {eval_results['aggregate']['exact_match']:.2%}")
print(f"\nPass@k Scores:")
for k, score in eval_results['aggregate']['pass_at_k'].items():
    print(f"  Pass@{k}: {score:.2%}")

EVALUATION RESULTS

Dataset size: 60 samples

Exact Match (EM): 0.00%

Pass@k Scores:
  Pass@1: 0.00%
  Pass@5: 0.00%
  Pass@10: 0.00%
