Install required packages

In [None]:
!pip install -q bitsandbytes
!pip install -q accelerate
!pip install -q peft transformers datasets tensorboard

In [None]:
import os
from datetime import datetime, timedelta
import json
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments, TrainerCallback, BitsAndBytesConfig
from datasets import Dataset
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from tensorboard import program
from tqdm.notebook import tqdm as notebook_tqdm

# Hugging Face access token, read from the environment (None when the variable is unset)
HUGGINGFACETOKEN = os.getenv("HUGGINGFACETOKEN")

# Switch off the Weights & Biases integration so it cannot interfere with the run
os.environ["WANDB_DISABLED"] = "true"

In [None]:
def load_data_from_json(file_path):
    """Load parallel natural-language / Bash examples from a JSON file.

    The file is expected to hold a JSON array of objects, each with an ``nl``
    and a ``bash`` key.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        dict: ``{'nl': [...], 'bash': [...]}`` with both lists in file order,
        so that index ``i`` of each list belongs to the same example.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If an item lacks the ``nl`` or ``bash`` key.
    """
    print(f"Loading data from {file_path}...")
    # JSON text is UTF-8; pin the encoding instead of relying on the platform's
    # locale default, which can mis-decode non-ASCII descriptions or commands.
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    return {
        'nl': [item['nl'] for item in data],
        'bash': [item['bash'] for item in data]
    }

# Load the three nl2bash splits from the Kaggle input directory. Each call prints
# the path it reads and returns a dict of parallel 'nl' / 'bash' lists.
train_data_dict = load_data_from_json("/kaggle/input/nlp2bash/nl2bash/train.json")
dev_data_dict = load_data_from_json("/kaggle/input/nlp2bash/nl2bash/dev.json")
test_data_dict = load_data_from_json("/kaggle/input/nlp2bash/nl2bash/test.json")

In [None]:
#Training callbacks
class ProgressCallback(TrainerCallback):
    """Trainer callback that reports progress, an ETA and GPU memory usage.

    A step-level tqdm bar is updated after every optimisation step and shows
    the most recent training loss plus an estimated completion time (UTC).
    A second bar tracks completed epochs. GPU memory figures are printed at
    the start of training and every 10 steps.
    """

    def __init__(self):
        super().__init__()
        self.training_bar = None
        self.epoch_bar = None
        self.total_steps = 0
        # Step counter value when this run started (non-zero when resuming).
        self.initial_step = 0
        # Placeholder only: the clock is restarted in on_train_begin so that
        # time spent between constructing the callback and starting training
        # does not distort the ETA or the reported duration.
        self.start_time = datetime.utcnow()

    def on_train_begin(self, args, state, control, **kwargs):
        """Start the clock and create the step-level progress bar."""
        self.start_time = datetime.utcnow()
        self.total_steps = state.max_steps
        self.initial_step = state.global_step
        self.training_bar = notebook_tqdm(
            total=self.total_steps, initial=self.initial_step, desc="Training"
        )
        print(f"Training started at {self.start_time} UTC")

        # Debug GPU memory at start
        print(f"GPU memory allocated at start: {torch.cuda.memory_allocated()/1e9:.2f} GB")
        print(f"GPU memory reserved at start: {torch.cuda.memory_reserved()/1e9:.2f} GB")

    def on_step_end(self, args, state, control, **kwargs):
        """Advance the step bar and refresh the loss and ETA read-outs."""
        if self.training_bar is None:
            return

        self.training_bar.update(1)

        # The newest log record may be an evaluation record without a 'loss'
        # key, so walk backwards to the latest record that carries one.
        latest_loss = next(
            (entry["loss"] for entry in reversed(state.log_history) if "loss" in entry),
            None,
        )
        if latest_loss is not None:
            self.training_bar.set_description(f"Training - Loss: {latest_loss:.4f}")

        # Calculate ETA from the steps completed in *this* run only; dividing
        # by the absolute step count would understate the time per step when
        # training was resumed from a checkpoint.
        steps_done = state.global_step
        steps_this_run = steps_done - self.initial_step
        if steps_this_run > 0:
            now = datetime.utcnow()
            seconds_per_step = (now - self.start_time).total_seconds() / steps_this_run
            steps_remaining = max(self.total_steps - steps_done, 0)
            eta = now + timedelta(seconds=seconds_per_step * steps_remaining)
            self.training_bar.set_postfix(ETA=eta.strftime("%Y-%m-%d %H:%M:%S"))

            # Print debug info every 10 steps
            if steps_done % 10 == 0:
                print(f"\nStep {steps_done}/{self.total_steps}")
                print(f"GPU memory: {torch.cuda.memory_allocated()/1e9:.2f} GB allocated, {torch.cuda.memory_reserved()/1e9:.2f} GB reserved")

    def on_epoch_begin(self, args, state, control, **kwargs):
        """Update the epoch bar and announce the epoch that is starting."""
        # state.epoch is a float (and may be None before the first step);
        # round it to the number of fully completed epochs.
        epochs_done = int(round(state.epoch or 0))
        if self.epoch_bar is None:
            # Create the bar once and reuse it, rather than stacking a new
            # widget in the output for every epoch.
            self.epoch_bar = notebook_tqdm(total=args.num_train_epochs, desc="Epochs")
        self.epoch_bar.n = epochs_done
        self.epoch_bar.refresh()
        print(f"\nStarting epoch {epochs_done + 1}/{args.num_train_epochs:g}")

    def on_train_end(self, args, state, control, **kwargs):
        """Print the total duration and close both progress bars."""
        end_time = datetime.utcnow()
        training_duration = end_time - self.start_time
        print(f"\nTraining completed at {end_time} UTC")
        print(f"Total training time: {training_duration}")
        if self.training_bar is not None:
            self.training_bar.close()
            self.training_bar = None
        if self.epoch_bar is not None:
            self.epoch_bar.close()
            # Reset so a later train() call with this callback gets a fresh bar.
            self.epoch_bar = None

In [None]:
def prepare_model_for_training(
    model_name="mistralai/Mistral-7B-v0.1",
    lora_r=4,
    lora_alpha=16,
    lora_dropout=0.05,
):
    """Load a 4-bit quantized causal LM and wrap it with LoRA adapters.

    Args:
        model_name: Hub id or local path of the base model to load.
        lora_r: Rank of the LoRA update matrices.
        lora_alpha: LoRA scaling factor.
        lora_dropout: Dropout probability applied inside the LoRA layers.

    Returns:
        The PEFT-wrapped model returned by ``get_peft_model``.
    """
    # Define quantization config: 4-bit NF4 weights with double quantization,
    # computing in bfloat16.
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_use_double_quant=True
    )
    print("Loading model...")
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        quantization_config=bnb_config,
        device_map="auto",
        # NOTE(review): trust_remote_code=True lets code shipped in the model
        # repository execute locally. Kept to preserve existing behaviour;
        # confirm whether it is actually required for the chosen model.
        trust_remote_code=True,
        offload_folder="offload",
        torch_dtype=torch.bfloat16
    )

    print("Preparing model for k-bit training...")
    model = prepare_model_for_kbit_training(model)

    # Configure LoRA
    print("Configuring LoRA...")
    # A small rank (default 4) keeps the number of trainable parameters, and
    # therefore memory use, low.
    lora_config = LoraConfig(
        r=lora_r,
        lora_alpha=lora_alpha,
        # Adapt both the attention projections and the MLP projections.
        target_modules=[
            "q_proj",
            "k_proj",
            "v_proj",
            "o_proj",
            "gate_proj",
            "up_proj",
            "down_proj",
        ],
        lora_dropout=lora_dropout,
        bias="none",
        task_type="CAUSAL_LM"
    )

    # Wrap model with LoRA
    model = get_peft_model(model, lora_config)
    model.print_trainable_parameters()
    # Debug memory usage
    print(f"GPU memory after model prep: {torch.cuda.memory_allocated()/1e9:.2f} GB")
    return model

In [None]:
#Data formatting and dataset creation
def format_prompt(nl, bash=None):
    """Build the instruction prompt for an English description.

    Args:
        nl: Natural-language description of the desired command.
        bash: Optional reference command. When truthy it is appended after
            the response marker, separated by a single space.

    Returns:
        str: The prompt ending in ``### Response:``, followed by the command
        when one was supplied.
    """
    base = f"### Instruction: Convert the following English description to a Bash command:\n\n{nl}\n\n### Response:"
    if not bash:
        return base
    return base + f" {bash}"

def create_datasets(train_data_dict, dev_data_dict, tokenizer, max_length=384):
    """Tokenize the train and dev splits for causal-LM fine-tuning.

    Every example becomes one sequence ``prompt + " " + bash + EOS``. The
    model input is that whole sequence; ``labels`` repeat the same token ids
    but are set to -100 on the prompt and on padding, so the loss is computed
    only on the Bash command and the end-of-sequence token.

    Args:
        train_data_dict: Dict with parallel ``'nl'`` and ``'bash'`` lists.
        dev_data_dict: Dict with the same layout, used for validation.
        tokenizer: Hugging Face tokenizer matching the model being trained.
        max_length: Fixed length every sequence is truncated/padded to.

    Returns:
        tuple: ``(train_dataset, dev_dataset)`` with the columns
        ``input_ids``, ``attention_mask`` and ``labels``.

    Raises:
        ValueError: If the tokenizer defines neither a pad nor an EOS token.
    """
    print("Formatting and tokenizing datasets with instruction prompts...")

    eos_id = tokenizer.eos_token_id
    # Fall back to EOS for padding when the tokenizer ships without a pad
    # token. Padded positions are masked out, so the id itself is irrelevant.
    pad_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else eos_id
    if pad_id is None:
        raise ValueError("tokenizer needs a pad_token or an eos_token to pad sequences")

    # Leave one position free for the EOS token appended below.
    token_budget = max_length - 1 if eos_id is not None else max_length

    def prepare_training_data(examples):
        """Turn one batch of nl/bash pairs into padded ids, mask and labels."""
        prompts = [format_prompt(nl) for nl in examples["nl"]]
        full_texts = [
            format_prompt(nl, bash)
            for nl, bash in zip(examples["nl"], examples["bash"])
        ]

        # The prompt is tokenized on its own only to learn how many leading
        # tokens of the full sequence must be excluded from the loss.
        # Assumes the prompt's tokens are a prefix of the full text's tokens.
        prompt_ids_batch = tokenizer(prompts, truncation=True, max_length=token_budget)["input_ids"]
        full_ids_batch = tokenizer(full_texts, truncation=True, max_length=token_budget)["input_ids"]

        batch = {"input_ids": [], "attention_mask": [], "labels": []}
        for prompt_ids, full_ids in zip(prompt_ids_batch, full_ids_batch):
            ids = list(full_ids)
            # Teach the model where a command ends.
            if eos_id is not None and (not ids or ids[-1] != eos_id):
                ids.append(eos_id)

            prompt_len = len(prompt_ids)
            # A tokenizer configured to append EOS also adds one to the bare
            # prompt; that token is not part of the shared prefix.
            if prompt_ids and eos_id is not None and prompt_ids[-1] == eos_id:
                prompt_len -= 1
            prompt_len = min(prompt_len, len(ids))

            pad_len = max_length - len(ids)
            batch["input_ids"].append(ids + [pad_id] * pad_len)
            batch["attention_mask"].append([1] * len(ids) + [0] * pad_len)
            # -100 is the ignore index of the cross-entropy loss.
            batch["labels"].append([-100] * prompt_len + ids[prompt_len:] + [-100] * pad_len)
        return batch

    def tokenize_split(data_dict, desc):
        """Build a Dataset from the raw dict and tokenize it in batches."""
        return Dataset.from_dict(data_dict).map(
            prepare_training_data,
            batched=True,
            batch_size=100,  # Process in smaller batches to avoid memory issues
            remove_columns=['nl', 'bash'],
            desc=desc
        )

    # Print some examples for debugging
    if train_data_dict["nl"]:
        print("\nExample formatted prompt:")
        print(format_prompt(train_data_dict["nl"][0]))
        print("\nExample formatted completion:")
        print(format_prompt(train_data_dict["nl"][0], train_data_dict["bash"][0]))

    # Create datasets with progress bars
    train_dataset = tokenize_split(train_data_dict, "Processing training data")
    dev_dataset = tokenize_split(dev_data_dict, "Processing validation data")

    # Print dataset stats
    print(f"Train dataset size: {len(train_dataset)} examples")
    print(f"Validation dataset size: {len(dev_dataset)} examples")

    return train_dataset, dev_dataset

In [None]:
#Metrics Calculation
def compute_command_metrics(eval_preds):
    """Compute exact-match accuracy of predicted Bash commands.

    The predictions are teacher-forced: each position's prediction is the
    model's most likely next token given the reference tokens before it. An
    example counts as correct when the decoded prediction equals the decoded
    reference after both are cut at the last ``### Response:`` marker.

    Args:
        eval_preds: ``(predictions, labels)`` pair. ``predictions`` may be
            logits of shape (batch, seq, vocab), a tuple whose first element
            is such logits, or already-argmaxed ids of shape (batch, seq).
            ``labels`` are token ids with -100 on ignored positions.
            NOTE(review): assumes both are NumPy arrays — confirm against caller.

    Returns:
        dict: ``{"command_accuracy": float}``, 0 when there are no examples.
    """
    logits, labels = eval_preds
    if isinstance(logits, tuple):
        logits = logits[0]
    # Accept ids that were already reduced (e.g. by a logits pre-processing hook).
    predictions = logits.argmax(-1) if logits.ndim == 3 else logits

    # Load the tokenizer once and reuse it on later evaluation calls.
    tokenizer = getattr(compute_command_metrics, "_tokenizer", None)
    if tokenizer is None:
        tokenizer = AutoTokenizer.from_pretrained("mistralai/Mistral-7B-v0.1")
        compute_command_metrics._tokenizer = tokenizer

    # A causal LM's output at position i predicts the token at position i + 1,
    # so drop the last prediction and the first label to line the two up.
    aligned_preds = predictions[:, :-1]
    aligned_labels = labels[:, 1:]

    # Score only real target tokens. Filtering replaces the former
    # "overwrite -100 with pad_token_id" step, which fails for tokenizers that
    # have no pad token (pad_token_id is None).
    keep = aligned_labels != -100
    for special_id in (tokenizer.pad_token_id, tokenizer.eos_token_id):
        if special_id is not None:
            keep = keep & (aligned_labels != special_id)

    decoded_preds = tokenizer.batch_decode(
        [row[mask] for row, mask in zip(aligned_preds, keep)],
        skip_special_tokens=True,
    )
    decoded_labels = tokenizer.batch_decode(
        [row[mask] for row, mask in zip(aligned_labels, keep)],
        skip_special_tokens=True,
    )

    # Extract just the bash commands. When the marker is absent (labels that
    # cover only the response) the whole text is kept.
    pred_commands = [p.split("### Response:")[-1].strip() for p in decoded_preds]
    label_commands = [l.split("### Response:")[-1].strip() for l in decoded_labels]

    # Calculate exact match accuracy
    exact_matches = sum(1 for p, l in zip(pred_commands, label_commands) if p == l)
    accuracy = exact_matches / len(pred_commands) if pred_commands else 0

    return {"command_accuracy": accuracy}

In [None]:
#Bash command generation
def generate_bash_command(model, tokenizer, nl_text, max_length=100):
    """Generate a Bash command for a natural-language description.

    Args:
        model: Causal language model exposing ``generate``, ``device`` and the
            usual ``training`` / ``eval()`` / ``train()`` module interface.
        tokenizer: Tokenizer matching ``model``.
        nl_text: English description of the desired command.
        max_length: Maximum number of tokens to generate. The prompt does not
            count against this limit, so long descriptions still leave room
            for an answer.

    Returns:
        str: The generated command with surrounding whitespace removed.
    """
    prompt = format_prompt(nl_text)
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_token_count = inputs["input_ids"].shape[1]

    # generate() needs a pad id; fall back to EOS when the tokenizer has none.
    pad_id = tokenizer.pad_token_id
    if pad_id is None:
        pad_id = tokenizer.eos_token_id

    # Disable dropout (including LoRA dropout) while sampling, then put the
    # model back into whatever mode the caller had it in.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            outputs = model.generate(
                input_ids=inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                max_new_tokens=max_length,
                temperature=0.1,
                top_p=0.75,
                do_sample=True,
                pad_token_id=pad_id,
            )
    finally:
        if was_training:
            model.train()

    # Decode only the newly generated tokens; the output sequence starts with
    # a copy of the prompt.
    completion = tokenizer.decode(outputs[0][prompt_token_count:], skip_special_tokens=True)
    # If the model runs on into another instruction/response section, keep
    # only the text before it.
    for marker in ("### Instruction:", "### Response:"):
        completion = completion.split(marker)[0]
    return completion.strip()