In [None]:
import json
import os
import torch
import numpy as np
from transformers import AutoTokenizer, AutoModelForCausalLM
from datetime import datetime
import re
from collections import Counter
import math
from rouge_score import rouge_scorer
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
# Make sure the NLTK 'punkt' tokenizer resource is present: look it up in the
# local NLTK data path and download it only when the lookup fails.
# NOTE(review): nothing visible in this file calls an NLTK tokenizer (BLEU
# below tokenizes with str.split) -- confirm whether this download is needed.
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt')


# Natural-language task prompts sent to the model during evaluation.
# evaluate_model() pairs these by position with REFERENCE_ANSWERS, so the two
# lists must stay the same length and in the same order.
TEST_PROMPTS = [
    "Create a new Git branch and switch to it.",
    "Compress the folder reports into reports.tar.gz.",
    "List all Python files in the current directory recursively.",
    "Set up a virtual environment and install requests.",
    "Fetch only the first ten lines of a file named output.log.",
    "How do I find and replace text in multiple files using command line?",
    "What command should I use to monitor real-time system processes and memory usage?"
]


# Reference shell commands used as the ground truth for the metrics.
# Entry i is the reference for TEST_PROMPTS[i] (paired via zip in
# evaluate_model()).
REFERENCE_ANSWERS = [
    "git checkout -b new_branch",
    "tar -czf reports.tar.gz reports/",
    "find . -name '*.py' -type f",
    "python -m venv myenv && source myenv/bin/activate && pip install requests",
    "head -n 10 output.log",
    "find . -type f -exec sed -i 's/oldtext/newtext/g' {} +",
    "top -o %MEM"
]

class MetricsCalculator:
    """Score a generated response against a reference command.

    Provides two text-overlap metrics (BLEU and ROUGE-L) and two regex-based
    heuristics (command accuracy and plan quality). All heuristics work on
    lower-cased text and ignore markdown code-fence marker lines, so the
    language tag of a fence (for example "```python") is never mistaken for
    a ``python`` command.
    """

    # Tool names the heuristics recognise as the start of a shell command.
    _COMMAND_NAMES = r'(?:git|tar|find|python|pip|head|top|ls|cd|cp|mv|rm|mkdir|chmod|grep|sed|awk)'
    # A command is the recognised tool name plus the rest of that line.
    _COMMAND_LINE_RE = re.compile(r'\b' + _COMMAND_NAMES + r'\b[^\n]*')
    _COMMAND_WORD_RE = re.compile(r'\b' + _COMMAND_NAMES + r'\b')
    # A line holding nothing but a fence marker and an optional language tag.
    _FENCE_LINE_RE = re.compile(r'^[ \t]*```[\w+-]*[ \t]*$', re.MULTILINE)
    # Sequencing words, or a numbered-list marker ("1." / "2)") at line start.
    # The marker is anchored to the line start and must be followed by
    # whitespace so that decimals such as "1.35" do not count as steps.
    _STEP_RE = re.compile(
        r'\b(?:step|first|then|next|finally)\b|^[ \t]*\d+[.)](?:\s|$)',
        re.MULTILINE,
    )

    def __init__(self):
        self.rouge_scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
        self.smoothing_function = SmoothingFunction().method1

    def calculate_bleu(self, reference, candidate):
        """Return the smoothed sentence-level BLEU of *candidate* vs *reference*.

        Both strings are lower-cased and split on whitespace. Returns 0.0
        when either string is empty or when scoring raises.
        """
        if not reference or not candidate:
            return 0.0

        reference_tokens = reference.lower().split()
        candidate_tokens = candidate.lower().split()
        try:
            return sentence_bleu(
                [reference_tokens],
                candidate_tokens,
                smoothing_function=self.smoothing_function,
            )
        except Exception:
            # Best-effort metric: a scoring failure counts as no overlap, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            return 0.0

    def calculate_rouge_l(self, reference, candidate):
        """Return the ROUGE-L F-measure of *candidate* against *reference*.

        Returns 0.0 when either string is empty or when scoring raises.
        """
        if not reference or not candidate:
            return 0.0

        try:
            scores = self.rouge_scorer.score(reference, candidate)
            return scores['rougeL'].fmeasure
        except Exception:
            # Best-effort metric, same policy as calculate_bleu.
            return 0.0

    def calculate_command_accuracy(self, reference, candidate):
        """Return 1.0 if the reference's main command appears in *candidate*.

        The main reference command is the first command extracted from
        *reference*. It is compared against EVERY command extracted from
        *candidate*; a match is a substring relation in either direction.
        Checking all candidates matters because the first regex hit in a
        prose answer is often an ordinary English word ("find", "top").

        Returns 1.0 when neither text contains a command, and 0.0 when only
        one of them does or when either string is empty.
        """
        if not reference or not candidate:
            return 0.0
        ref_commands = self.extract_commands(reference)
        cand_commands = self.extract_commands(candidate)

        if not ref_commands and not cand_commands:
            return 1.0
        if not ref_commands or not cand_commands:
            return 0.0

        # extract_commands already lower-cases, so only whitespace is trimmed.
        ref_main = ref_commands[0].strip()
        for command in cand_commands:
            command = command.strip()
            if ref_main in command or command in ref_main:
                return 1.0
        return 0.0

    def extract_commands(self, text):
        """Return every command-looking span in *text*, lower-cased.

        A span starts at a recognised tool name and runs to the end of that
        line. Code-fence marker lines are removed before matching.
        """
        return self._COMMAND_LINE_RE.findall(self._strip_code_fences(text).lower())

    def score_plan_quality(self, prompt, response):
        """Heuristically grade *response* to *prompt* on a 0-2 scale.

        2: mentions a command AND a prompt-relevant keyword AND shows steps
           (a sequencing word or a numbered-list marker at line start).
        1: mentions a command or a prompt-relevant keyword.
        0: neither, or *response* is empty.
        """
        if not response:
            return 0
        text = self._strip_code_fences(response).lower()
        has_command = bool(self._COMMAND_WORD_RE.search(text))
        has_steps = bool(self._STEP_RE.search(text))
        keywords = self._relevant_keywords(prompt.lower())
        # Plain substring containment, as keywords like "*.py" are not words.
        has_relevant = any(keyword in text for keyword in keywords)

        if has_command and has_relevant:
            return 2 if has_steps else 1
        if has_command or has_relevant:
            return 1
        return 0

    @classmethod
    def _strip_code_fences(cls, text):
        """Blank out lines that consist only of a markdown code-fence marker."""
        return cls._FENCE_LINE_RE.sub('', text)

    @staticmethod
    def _relevant_keywords(prompt_lower):
        """Return the keywords expected in an answer to the lower-cased prompt.

        The first matching topic wins; an unrecognised prompt yields ``[]``.
        """
        if "git" in prompt_lower:
            return ["git", "branch", "checkout"]
        if "compress" in prompt_lower or "tar" in prompt_lower:
            return ["tar", "compress", "gz"]
        if "python" in prompt_lower and "files" in prompt_lower:
            return ["find", "python", "*.py"]
        if "virtual environment" in prompt_lower:
            return ["venv", "pip", "install"]
        if "lines" in prompt_lower and "file" in prompt_lower:
            return ["head", "lines"]
        if "find" in prompt_lower and "replace" in prompt_lower:
            return ["sed", "find", "replace"]
        if "monitor" in prompt_lower and "process" in prompt_lower:
            return ["top", "ps", "monitor"]
        return []

def load_phi2_model():
    """Load the ``microsoft/phi-2`` tokenizer and causal language model.

    With CUDA available the weights are loaded in float16 with
    ``device_map="auto"``; otherwise float32 with no device map. If the
    tokenizer has no pad token, its EOS token is reused as the pad token.

    Returns:
        A ``(model, tokenizer)`` tuple.
    """
    print("Loading Phi-2 model and tokenizer...")
    checkpoint = "microsoft/phi-2"

    tokenizer = AutoTokenizer.from_pretrained(checkpoint, trust_remote_code=True)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # Half precision and automatic placement only make sense on a GPU.
    if torch.cuda.is_available():
        weight_dtype, placement = torch.float16, "auto"
    else:
        weight_dtype, placement = torch.float32, None

    model = AutoModelForCausalLM.from_pretrained(
        checkpoint,
        torch_dtype=weight_dtype,
        device_map=placement,
        trust_remote_code=True,
    )

    first_param = next(model.parameters())
    print(f"Model loaded on device: {first_param.device}")

    return model, tokenizer

def format_prompt(instruction):
    """Wrap *instruction* in the Instruction/Response prompt template."""
    header = "### Instruction:\n"
    footer = "\n\n### Response:\n"
    return f"{header}{instruction}{footer}"

def generate_response(model, tokenizer, prompt, max_new_tokens=150):
    """Sample a completion for *prompt* and return a cleaned, shortened text.

    The prompt is wrapped with ``format_prompt``, tokenized and (when CUDA is
    available) moved to the model's device. Generation samples with
    temperature 0.7, top-p 0.9 and repetition penalty 1.1, producing at most
    *max_new_tokens* new tokens. Only the newly generated tokens are decoded.

    Post-processing strips each line, drops blank lines and lines identical
    to the previously kept line, and keeps at most the first five lines.

    Returns:
        The cleaned response as a newline-joined string.
    """
    encoded = tokenizer(format_prompt(prompt), return_tensors="pt")
    if torch.cuda.is_available():
        encoded = {name: tensor.to(model.device) for name, tensor in encoded.items()}

    sampling_options = dict(
        max_new_tokens=max_new_tokens,
        do_sample=True,
        temperature=0.7,
        top_p=0.9,
        repetition_penalty=1.1,
        pad_token_id=tokenizer.eos_token_id,
        eos_token_id=tokenizer.eos_token_id,
    )
    with torch.no_grad():
        generated = model.generate(**encoded, **sampling_options)

    # The output sequence starts with the prompt tokens; skip past them.
    prompt_length = encoded['input_ids'].shape[1]
    completion_ids = generated[0][prompt_length:]
    text = tokenizer.decode(completion_ids, skip_special_tokens=True).strip()

    kept = []
    for raw_line in text.split('\n'):
        stripped = raw_line.strip()
        if not stripped:
            continue
        if kept and stripped == kept[-1]:
            # Same as the line just kept: treat as an immediate repetition.
            continue
        kept.append(stripped)

    return '\n'.join(kept[:5])

def evaluate_model():
    """Evaluate the base Phi-2 model on the test prompts and save a report.

    Loads the model, generates one response per (prompt, reference) pair from
    ``TEST_PROMPTS`` / ``REFERENCE_ANSWERS``, scores each response with
    ``MetricsCalculator`` (BLEU, ROUGE-L, command accuracy, plan quality),
    prints progress, and writes a JSON report to
    ``logs/phi2_base_evaluation.json`` (the ``logs`` directory is created if
    missing).

    A failure while generating or scoring one prompt is printed and recorded
    as an ``ERROR: ...`` response with all-zero metrics; the run continues.

    Returns:
        The summary dict that was written to disk: model name, evaluation
        date, number of evaluated prompts, average metrics and the
        per-prompt results.
    """
    print("PHI-2 BASE MODEL EVALUATION")
    print("=" * 60)

    os.makedirs("logs", exist_ok=True)
    output_path = "logs/phi2_base_evaluation.json"

    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using device: {device}")
    if torch.cuda.is_available():
        print(f"GPU: {torch.cuda.get_device_name(0)}")

    model, tokenizer = load_phi2_model()
    metrics_calc = MetricsCalculator()
    results = []

    # Materialize the pairs so the progress counter and the averages use the
    # number of prompts actually evaluated (zip stops at the shorter list).
    cases = list(zip(TEST_PROMPTS, REFERENCE_ANSWERS))
    num_prompts = len(cases)

    print("\nEvaluating on test prompts...")
    print("-" * 60)

    for i, (prompt, reference) in enumerate(cases, 1):
        print(f"\nTest {i}/{num_prompts}: {prompt}")
        print(f"Reference: {reference}")
        try:
            response = generate_response(model, tokenizer, prompt)
            print(f"Generated: {response}")
            metrics = {
                "bleu_score": metrics_calc.calculate_bleu(reference, response),
                "rouge_l_score": metrics_calc.calculate_rouge_l(reference, response),
                "command_accuracy": metrics_calc.calculate_command_accuracy(reference, response),
                "plan_quality": metrics_calc.score_plan_quality(prompt, response),
            }
            print(f"BLEU: {metrics['bleu_score']:.3f}")
            print(f"ROUGE-L: {metrics['rouge_l_score']:.3f}")
            print(f"Command Accuracy: {metrics['command_accuracy']:.3f}")
            print(f"Plan Quality: {metrics['plan_quality']}/2")
        except Exception as e:
            # Keep going: a failed prompt is recorded with zero scores so it
            # still counts against the averages.
            print(f"Error: {e}")
            response = f"ERROR: {str(e)}"
            metrics = {
                "bleu_score": 0.0,
                "rouge_l_score": 0.0,
                "command_accuracy": 0.0,
                "plan_quality": 0,
            }

        results.append({
            "prompt_id": i,
            "prompt": prompt,
            "reference_answer": reference,
            "generated_response": response,
            "metrics": metrics,
            "timestamp": datetime.now().isoformat(),
        })

    def average(metric_name):
        # Guard against an empty prompt list instead of dividing by zero.
        if not num_prompts:
            return 0.0
        return sum(entry["metrics"][metric_name] for entry in results) / num_prompts

    avg_bleu = average("bleu_score")
    avg_rouge = average("rouge_l_score")
    avg_command_acc = average("command_accuracy")
    avg_plan_quality = average("plan_quality")

    summary = {
        "model_name": "microsoft/phi-2",
        "evaluation_date": datetime.now().isoformat(),
        "num_test_prompts": num_prompts,
        "average_metrics": {
            "bleu_score": avg_bleu,
            "rouge_l_score": avg_rouge,
            "command_accuracy": avg_command_acc,
            "plan_quality": avg_plan_quality,
        },
        "detailed_results": results,
    }

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2)

    print("\n" + "=" * 60)
    print("EVALUATION SUMMARY")
    print("=" * 60)
    print(f"Average BLEU Score: {avg_bleu:.3f}")
    print(f"Average ROUGE-L Score: {avg_rouge:.3f}")
    print(f"Average Command Accuracy: {avg_command_acc:.3f}")
    print(f"Average Plan Quality: {avg_plan_quality:.3f}/2.0")
    print(f"\nDetailed results saved to: {output_path}")

    return summary

if __name__ == "__main__":
    import traceback

    # Script entry point: run the evaluation and report any failure with a
    # full traceback instead of letting the exception propagate.
    try:
        summary = evaluate_model()
        print("\nEvaluation completed successfully!")
    except Exception as error:
        print(f"Evaluation failed: {error}")
        traceback.print_exc()

PHI-2 BASE MODEL EVALUATION
Using device: cpu
Loading Phi-2 model and tokenizer...


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Model loaded on device: cpu

Evaluating on test prompts...
------------------------------------------------------------

Test 1/7: Create a new Git branch and switch to it.
Reference: git checkout -b new_branch
Generated: To create a new branch, run the following command in your terminal or command prompt:
```python
git checkout -b <branch_name>
```
Replace `<branch_name>` with any name you want for your new branch. To switch to the new branch, use the same command again:
BLEU: 0.021
ROUGE-L: 0.204
Command Accuracy: 0.000
Plan Quality: 1/2

Test 2/7: Compress the folder reports into reports.tar.gz.
Reference: tar -czf reports.tar.gz reports/
Generated: To compress a folder, we can use the tar command with the -z option to create a gzip compressed file.
```python
!tar zcf /path/to/folder/reports/*.txt.gz
```
This will create a new file called reports.tar.gz in the current directory containing all.txt files in the given folder and their contents are compressed using gzip compression.
BLE