# LoRA Fine-tuning for Hard Disk Drive Failure Prediction

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/YOUR_USERNAME/YOUR_REPO/blob/main/lora_hdd_failure_prediction.ipynb)

This notebook demonstrates fine-tuning GPT-2 with LoRA for binary classification of hard disk drive health status. It is a demo accompanying the article: Haozhou Zhou, Wei Li, Sagar Kamarthi, and Sri Srinivasan Radhakrishnan, “Condition Monitoring Framework Using Reliability-Centered Features and Large Language Models”.

## 🔧 Requirements
- GPU with at least 24GB memory (tested on NVIDIA L4)
- Python 3.8+
- Google Drive (if using Colab)

## 📊 Performance Metrics
- **FDR** (Failure Detection Rate): TP / (TP + FN)
- **FAR** (False Alarm Rate): FP / (FP + TN)


## 1. Installation and Setup

In [None]:
# Install required packages
# `datasets` supplies the Dataset / DatasetDict classes imported in the next cell.
!pip install -q datasets
# PEFT (LoraConfig, get_peft_model) is installed from the GitHub repository
# rather than from a pinned release.
# NOTE(review): consider pinning a version or commit for reproducibility.
!pip install -q git+https://github.com/huggingface/peft.git

# This message is printed unconditionally; it does not check the pip exit status.
print("✓ Dependencies installed successfully")

In [None]:
# Import required libraries
import os
import gc
import json
import warnings
import torch
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional, Any
from pathlib import Path
from tqdm.auto import tqdm

# Hugging Face imports
from datasets import Dataset, DatasetDict
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    Trainer,
    TrainingArguments,
    DataCollatorForLanguageModeling,
    EarlyStoppingCallback
)
from peft import LoraConfig, get_peft_model

# Scikit-learn imports
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix

# Silence Python warnings so the notebook output stays readable
warnings.filterwarnings('ignore')

# Expose only GPU index 0 to this process, then pick CUDA when present, else CPU
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# On GPU, also report the device name and its total memory in GB (bytes / 1e9)
if device.type == "cuda":
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(
        f"Memory Available: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB"
    )

In [None]:
# Mount Google Drive when running inside Colab.
# The `google.colab` package only exists in the Colab runtime, so a local run
# (which Config supports through its ./data paths) must skip this step instead
# of failing with ImportError.
try:
    from google.colab import drive
except ImportError:
    print("Not running in Google Colab - skipping Google Drive mount")
else:
    drive.mount('/content/drive')

## 2. Configuration

In [None]:
class Config:
    """Experiment-wide settings, exposed as plain class attributes."""

    # ---- Model ----
    MODEL_NAME = 'gpt2'  # Can be changed to 'gpt2-medium', 'gpt2-large'

    # ---- LoRA adapter ----
    LORA_RANK = 4
    LORA_ALPHA = 32
    LORA_DROPOUT = 0.05
    TARGET_MODULES = ["attn.c_attn", "attn.c_proj"]

    # ---- Training ----
    BATCH_SIZE = 4
    GRADIENT_ACCUMULATION_STEPS = 4
    LEARNING_RATE = 2e-4
    MAX_STEPS = 1000
    WARMUP_STEPS = 100
    EVAL_STEPS = 100
    SAVE_STEPS = 100
    EARLY_STOPPING_PATIENCE = 5
    INFERENCE_BATCH_SIZE = 16

    # ---- Data ----
    TRAIN_SIZE = 0.8  # 80% train, 20% split between val and test
    N_FOLDS = 10
    RANDOM_SEED = 42

    # ---- Paths: update these for your environment ----
    # Colab is detected through the COLAB_GPU environment variable and reads
    # from the mounted Google Drive; any other environment reads from ./data.
    DATA_DIR = (
        Path('/content/drive/MyDrive/LLM-HDDFailurePrediction')
        if 'COLAB_GPU' in os.environ
        else Path('./data')
    )
    # The output location is the same in both environments.
    OUTPUT_DIR = Path('./outputs')

    DATA_FILE = DATA_DIR / 'data' / '7_LT_60_SL.jsonl'
    GENE_FINETUNE_SCRIPT = DATA_DIR / 'gene_finetune_data.py'

    # ---- Output ----
    RESULTS_FILE = 'gpt2_lora_results.csv'

# Shared settings object referenced by every later cell
config = Config()

# Make sure the output folder exists before anything is written into it
config.OUTPUT_DIR.mkdir(exist_ok=True, parents=True)
print(
    "Configuration loaded",
    f"Data directory: {config.DATA_DIR}",
    f"Output directory: {config.OUTPUT_DIR}",
    sep="\n",
)

## 3. Data Loading and Preprocessing

In [5]:
def load_data_from_jsonl(file_path: Path) -> pd.DataFrame:
    """Read a JSON Lines file into a DataFrame and print a short summary.

    Args:
        file_path: Location of the JSONL file (one JSON record per line).

    Returns:
        DataFrame holding one row per record.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
    """
    if file_path.exists():
        frame = pd.read_json(file_path, lines=True)
    else:
        raise FileNotFoundError(f"Data file not found: {file_path}")

    # Report the sample count, then the class balance of the 'answer' column
    print(f"Loaded {len(frame)} samples from {file_path.name}")
    label_counts = frame['answer'].value_counts()
    print(f"Label distribution:\n{label_counts}")
    return frame

def create_sample_data(n_samples: int = 670) -> pd.DataFrame:
    """Build a synthetic stand-in dataset for when the real data is unavailable.

    NumPy's global RNG is seeded with ``config.RANDOM_SEED``, so repeated calls
    return the same rows. Every row has a 'context' string of random sensor
    readings, a fixed 'question', and an 'answer' drawn with equal probability
    from 'healthy' and 'faulty'.

    Args:
        n_samples: Number of rows to generate.

    Returns:
        DataFrame with the columns 'context', 'question' and 'answer'.
    """
    np.random.seed(config.RANDOM_SEED)

    def make_record() -> dict:
        # The draw order below (temp, vibration, runtime, errors, label)
        # determines the generated values for a given seed; keep it fixed.
        temp = np.random.uniform(20, 80)
        vibration = np.random.uniform(0, 10)
        runtime = np.random.randint(100, 10000)
        errors = np.random.randint(0, 50)
        readings = (
            f"Sensor readings: temp={temp:.1f}, "
            f"vibration={vibration:.2f}, "
            f"runtime={runtime}, "
            f"errors={errors}"
        )
        label = np.random.choice(['healthy', 'faulty'], p=[0.5, 0.5])
        return {
            'context': readings,
            'question': 'Is the hard drive healthy or faulty?',
            'answer': label,
        }

    rows = [make_record() for _ in range(n_samples)]
    return pd.DataFrame(rows)

In [None]:
# Load the data, preferring the richest available source:
#   1. the external fold-generation script -> ready-made folds
#   2. the raw JSONL file -> DataFrame (folds are built in the next cell)
#   3. synthetic sample data, if either of the above fails
# `folds` and `data` are bound on every path, so the next cell's
# `folds is None` check can never hit a NameError.
folds = None
data = None

try:
    if config.GENE_FINETUNE_SCRIPT.exists():
        # Option 1: import the fold generator stored next to the data (Colab).
        # Resolve the data file BEFORE changing directory: DATA_FILE may be a
        # relative path and would otherwise be looked up under the new cwd.
        data_file = config.DATA_FILE.resolve()

        # Presumably the chdir is what makes the script importable.
        # NOTE(review): it also re-roots every relative path used afterwards
        # (e.g. OUTPUT_DIR) under DATA_DIR - confirm this is intended.
        os.chdir(config.DATA_DIR)
        from gene_finetune_data import generate_data_folds

        # Generate folds using the imported function
        folds = generate_data_folds(
            str(data_file),
            label_0_ratio=1.0,
            n_splits=config.N_FOLDS,
            train_size=config.TRAIN_SIZE,
            seed=config.RANDOM_SEED
        )
        print(f"Successfully loaded {len(folds)} folds from gene_finetune_data")
    else:
        # Option 2: no generator script - read the JSONL file directly and let
        # the next cell create stratified folds from it.
        data = load_data_from_jsonl(config.DATA_FILE)

except Exception as e:
    # Option 3: anything went wrong above - fall back to synthetic data so the
    # rest of the notebook can still be demonstrated.
    print(f"Error loading data: {e}")
    print("Creating sample data for demonstration...")
    data = create_sample_data(670)
    folds = None

In [None]:
# Build the folds here only when the loading cell did not already provide them
if folds is None:
    def create_stratified_folds(data: pd.DataFrame, n_splits: int = 10) -> Dict:
        """Split ``data`` into ``n_splits`` folds of train/val/test records.

        The test part of each fold comes from a shuffled StratifiedKFold on the
        'answer' column. From the remaining rows, a fraction of
        ``1 - config.TRAIN_SIZE`` is sampled as validation data and the rest is
        used for training.

        Returns:
            Dict mapping fold index to {'train', 'val', 'test'} lists of
            record dicts.
        """
        splitter = StratifiedKFold(
            n_splits=n_splits, shuffle=True, random_state=config.RANDOM_SEED
        )
        fold_map = {}

        for fold_no, (dev_rows, test_rows) in enumerate(splitter.split(data, data['answer'])):
            dev_part = data.iloc[dev_rows]

            # Carve the validation rows out of the non-test rows
            n_val = int(len(dev_part) * (1 - config.TRAIN_SIZE))
            val_part = dev_part.sample(n=n_val, random_state=config.RANDOM_SEED)
            train_part = dev_part.drop(val_part.index)

            fold_map[fold_no] = {
                'train': train_part.to_dict('records'),
                'val': val_part.to_dict('records'),
                'test': data.iloc[test_rows].to_dict('records'),
            }

        return fold_map

    folds = create_stratified_folds(data, config.N_FOLDS)

# Summarise the class balance of the first three folds (fewer if there are fewer)
for fold_idx in range(min(3, len(folds))):
    print(f"\nFold {fold_idx + 1}:")
    for split_name, records in folds[fold_idx].items():
        healthy_count = len([r for r in records if r["answer"] == "healthy"])
        faulty_count = len(records) - healthy_count
        print(f"  {split_name}: {len(records)} samples (healthy: {healthy_count}, faulty: {faulty_count})")

print(f"\n... ({len(folds)} folds total)")

## 4. Model Setup and Helper Functions

In [None]:
# Load the tokenizer that belongs to the configured base model
tokenizer = AutoTokenizer.from_pretrained(config.MODEL_NAME)
# Pad on the left and reuse the end-of-sequence token as the padding token
tokenizer.padding_side = 'left'
tokenizer.pad_token = tokenizer.eos_token

print(
    f"Tokenizer loaded: {config.MODEL_NAME}",
    f"Vocabulary size: {len(tokenizer)}",
    sep="\n",
)

In [None]:
def create_prompt(context: str, question: str, answer: str) -> str:
    """Assemble the CONTEXT / QUESTION / ANSWER prompt text for one sample.

    The three sections are separated by blank lines, and the tokenizer's
    end-of-sequence token is appended directly after the answer.
    """
    sections = (
        f"CONTEXT\n{context}",
        f"QUESTION\n{question}",
        f"ANSWER\n{answer}{tokenizer.eos_token}",
    )
    return "\n\n".join(sections)

def calculate_trainable_parameters(model) -> Tuple[int, int, float]:
    """Count trainable versus total parameters of ``model`` and print a summary.

    Args:
        model: Object whose ``parameters()`` yields tensors exposing
            ``numel()`` and ``requires_grad`` (e.g. a ``torch.nn.Module``).

    Returns:
        ``(trainable_params, all_params, percentage)`` where ``percentage`` is
        ``100 * trainable_params / all_params``, or ``0.0`` when the model has
        no parameters at all.
    """
    trainable_params = 0
    all_params = 0
    # One pass over the parameters is enough to collect both totals
    for param in model.parameters():
        count = param.numel()
        all_params += count
        if param.requires_grad:
            trainable_params += count

    # Guard the ratio: a parameter-free model would raise ZeroDivisionError
    percentage = 100 * trainable_params / all_params if all_params else 0.0

    print(f"Trainable parameters: {trainable_params:,}")
    print(f"Total parameters: {all_params:,}")
    print(f"Percentage trainable: {percentage:.4f}%")

    return trainable_params, all_params, percentage

def convert_fold_to_dataset(fold_data: Dict) -> DatasetDict:
    """Turn one fold into a HuggingFace DatasetDict.

    Args:
        fold_data: Mapping from split name to a list of record dicts.

    Returns:
        DatasetDict with one Dataset per split.
    """
    converted = {}
    for split, records in fold_data.items():
        # Pivot row-wise records into column-wise lists; the column names are
        # taken from the first record. An empty split becomes an empty dataset.
        if records:
            columns = {key: [row[key] for row in records] for key in records[0]}
        else:
            columns = {}
        converted[split] = Dataset.from_dict(columns)

    return DatasetDict(converted)

# Render the first training record of the first fold to check the prompt layout
sample = folds[0]['train'][0]
example_prompt = create_prompt(
    sample['context'],
    sample['question'],
    sample['answer'],
)
print("\nExample prompt:", "=" * 50, example_prompt, "=" * 50, sep="\n")

## 5. Training and Evaluation Classes

In [28]:
class FineTuner:
    """Fine-tune a causal language model on prompt-formatted samples and
    evaluate it by generating the answer for each test sample.

    The dataset must provide 'train', 'val' and 'test' splits whose rows carry
    'context', 'question' and 'answer' fields.
    """

    def __init__(self, model, tokenizer, dataset: "DatasetDict"):
        self.model = model
        self.tokenizer = tokenizer
        # Tokenize every split once, row by row
        self.dataset = dataset.map(self._tokenize_samples, batched=False)

    def _tokenize_samples(self, samples):
        """Tokenize one row as a full training prompt (answer and EOS included)."""
        prompt = create_prompt(
            samples['context'],
            samples['question'],
            samples['answer']
        )
        return self.tokenizer(
            prompt
        )

    @staticmethod
    def _inference_prompt(context: str, question: str) -> str:
        """Build the prompt used at inference time.

        Same layout as the training prompt, but it stops right after the
        ANSWER header so the model has to produce the label itself.
        """
        return f"CONTEXT\n{context}\n\nQUESTION\n{question}\n\nANSWER\n"

    @staticmethod
    def _parse_label(generated_text: str) -> str:
        """Map generated text to 'healthy', 'faulty' or 'unknown'.

        The full word or just its first letter is accepted, because only a
        couple of tokens are generated and the word may be cut short.
        """
        text = generated_text.strip().lower()
        if "healthy" in text or text.startswith("h"):
            return "healthy"
        if "faulty" in text or text.startswith("f"):
            return "faulty"
        return "unknown"

    def train(self, output_dir: str = 'outputs', **kwargs):
        """Train the model on the 'train' split, evaluating on 'val'.

        Args:
            output_dir: Directory for checkpoints and logs.
            **kwargs: Only ``batch_size`` is read; it overrides
                ``config.BATCH_SIZE`` as the per-device train batch size.

        Returns:
            The ``Trainer`` instance after training has finished.
        """
        # Determine precision type: bf16 when the GPU supports it, otherwise
        # fp16 on CUDA, otherwise full precision (both flags False on CPU)
        use_bf16 = torch.cuda.is_available() and torch.cuda.is_bf16_supported()
        use_fp16 = torch.cuda.is_available() and not use_bf16

        training_args = TrainingArguments(
            output_dir=output_dir,
            per_device_train_batch_size=kwargs.get('batch_size', config.BATCH_SIZE),
            gradient_accumulation_steps=config.GRADIENT_ACCUMULATION_STEPS,
            warmup_steps=config.WARMUP_STEPS,
            max_steps=config.MAX_STEPS,
            learning_rate=config.LEARNING_RATE,
            logging_steps=10,
            eval_steps=config.EVAL_STEPS,
            save_steps=config.SAVE_STEPS,
            eval_strategy="steps",
            load_best_model_at_end=True,
            save_total_limit=1,
            fp16=use_fp16,
            bf16=use_bf16,
            logging_dir=f'{output_dir}/logs',
            report_to='none',
            disable_tqdm=False,
            metric_for_best_model="eval_loss",
        )

        # mlm=False selects the causal (next-token) language-modeling objective
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer,
            mlm=False
        )

        # Enable memory optimizations
        self.model.gradient_checkpointing_enable()
        self.model.config.use_cache = False

        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=self.dataset['train'],
            eval_dataset=self.dataset['val'],
            data_collator=data_collator,
            callbacks=[EarlyStoppingCallback(early_stopping_patience=config.EARLY_STOPPING_PATIENCE)]
        )

        trainer.train()
        return trainer

    def inference(self, batch_size: Optional[int] = None, verbose: bool = False) -> pd.DataFrame:
        """Generate a label for every sample of the 'test' split.

        Args:
            batch_size: Prompts per generation call; defaults to
                ``config.INFERENCE_BATCH_SIZE``.
            verbose: When True, print the generated text of every sample.

        Returns:
            DataFrame with the columns 'Predicted Label' ('healthy', 'faulty'
            or 'unknown') and 'Actual Label'. The columns are present even
            when the test split is empty.
        """
        if batch_size is None:
            batch_size = config.INFERENCE_BATCH_SIZE

        test_data = self.dataset['test']
        results = []

        self.model.eval()
        for start in tqdm(range(0, len(test_data), batch_size), desc="Evaluating"):
            batch = test_data[start:start + batch_size]

            # Prepare prompts
            prompts = [
                self._inference_prompt(context, question)
                for context, question in zip(batch['context'], batch['question'])
            ]

            # Tokenize
            inputs = self.tokenizer(
                prompts,
                return_tensors='pt',
                padding=True,
                truncation=True
            ).to(self.model.device)

            # Generate; pad_token_id is passed explicitly so generate() does
            # not have to pick a padding token on its own for every batch
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=2,
                    eos_token_id=self.tokenizer.eos_token_id,
                    pad_token_id=self.tokenizer.pad_token_id
                )

            # Every row of the padded batch has the same prompt length, so the
            # newly generated tokens are everything after that length. Decoding
            # only those tokens avoids searching the prompt text for "ANSWER".
            prompt_length = inputs['input_ids'].shape[1]
            for actual, output in zip(batch['answer'], outputs):
                generated = self.tokenizer.decode(
                    output[prompt_length:], skip_special_tokens=True
                )
                if verbose:
                    print(generated)

                results.append({
                    'Predicted Label': self._parse_label(generated),
                    'Actual Label': actual
                })

        return pd.DataFrame(results, columns=['Predicted Label', 'Actual Label'])

## 6. Main Training Loop

In [29]:
def compute_detection_metrics(results_df: pd.DataFrame) -> Dict[str, Any]:
    """Compute confusion counts and detection rates from a predictions table.

    'faulty' is the positive class. Predictions that are neither 'healthy' nor
    'faulty' (e.g. 'unknown') are reported separately and stay in every
    denominator, so an unparsable output can never improve a rate.

    Args:
        results_df: Table with 'Predicted Label' and 'Actual Label' columns.

    Returns:
        Dict with the integer counts 'tp', 'fn', 'fp', 'tn' and 'unknown', and
        the float rates 'fdr' (tp / actual faulty), 'far' (fp / actual healthy)
        and 'accuracy' ((tp + tn) / all samples). A rate is 0.0 when its
        denominator is zero.
    """
    predicted = results_df['Predicted Label']
    actual = results_df['Actual Label']

    actual_faulty = actual == 'faulty'
    actual_healthy = actual == 'healthy'
    predicted_faulty = predicted == 'faulty'
    predicted_healthy = predicted == 'healthy'

    tp = int((predicted_faulty & actual_faulty).sum())
    fn = int((predicted_healthy & actual_faulty).sum())
    fp = int((predicted_faulty & actual_healthy).sum())
    tn = int((predicted_healthy & actual_healthy).sum())
    unknown = int((~predicted_faulty & ~predicted_healthy).sum())

    n_faulty = int(actual_faulty.sum())
    n_healthy = int(actual_healthy.sum())
    n_total = len(results_df)

    return {
        'tp': tp,
        'fn': fn,
        'fp': fp,
        'tn': tn,
        'unknown': unknown,
        'fdr': tp / n_faulty if n_faulty else 0.0,
        'far': fp / n_healthy if n_healthy else 0.0,
        'accuracy': (tp + tn) / n_total if n_total else 0.0,
    }


def run_fold_experiment(fold_idx: int, fold_data: Dict, existing_results: Optional[pd.DataFrame] = None) -> Optional[Dict]:
    """Fine-tune and evaluate the model on a single fold.

    Args:
        fold_idx: Zero-based fold index (reported as ``fold_idx + 1``).
        fold_data: Mapping with 'train', 'val' and 'test' record lists.
        existing_results: Previously saved results; when its 'fold_number'
            column already contains this fold, the fold is skipped.

    Returns:
        Dict with one row of results for this fold, or ``None`` when the fold
        was skipped.
    """
    # Check if this fold is already completed
    if (
        existing_results is not None
        and not existing_results.empty
        and 'fold_number' in existing_results.columns
        and fold_idx + 1 in existing_results['fold_number'].values
    ):
        print(f"Skipping fold {fold_idx + 1} - already completed")
        return None

    print(f"\n{'='*60}")
    print(f"Processing Fold {fold_idx + 1}/{config.N_FOLDS}")
    print(f"{'='*60}")

    # Convert data
    dataset = convert_fold_to_dataset(fold_data)

    # Setup LoRA config
    lora_config = LoraConfig(
        r=config.LORA_RANK,
        lora_alpha=config.LORA_ALPHA,
        target_modules=config.TARGET_MODULES,
        lora_dropout=config.LORA_DROPOUT,
        bias="none",
        task_type="CAUSAL_LM",
        fan_in_fan_out=True
    )

    # Load and setup model
    model = AutoModelForCausalLM.from_pretrained(
        config.MODEL_NAME,
        torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
        device_map='auto' if torch.cuda.is_available() else None
    )

    # Freeze base model
    for param in model.parameters():
        param.requires_grad = False
    model.enable_input_require_grads()

    # Apply LoRA
    model = get_peft_model(model, lora_config)
    trainable_params, total_params, percentage = calculate_trainable_parameters(model)

    # The predictions file is written here later, so the folder must exist
    output_dir = config.OUTPUT_DIR / f'fold_{fold_idx}'
    output_dir.mkdir(parents=True, exist_ok=True)

    fine_tuner = None
    trainer = None
    try:
        # Train
        fine_tuner = FineTuner(model, tokenizer, dataset)
        trainer = fine_tuner.train(str(output_dir))

        # Evaluate
        print("\nRunning inference...")
        results_df = fine_tuner.inference()
    finally:
        # Release the model even when training or inference fails. The Trainer
        # keeps its own reference to the model, so it has to be dropped too
        # before the garbage collector and the CUDA cache can free the memory.
        del trainer, fine_tuner, model
        gc.collect()
        torch.cuda.empty_cache()

    # Calculate metrics
    metrics = compute_detection_metrics(results_df)
    tp, fn, fp, tn = metrics['tp'], metrics['fn'], metrics['fp'], metrics['tn']
    fdr, far, accuracy = metrics['fdr'], metrics['far'], metrics['accuracy']

    # Print results
    print("\n" + "="*40)
    print("RESULTS")
    print("="*40)
    print(f"True Positive: {tp}")
    print(f"False Negative: {fn}")
    print(f"False Positive: {fp}")
    print(f"True Negative: {tn}")
    print(f"Unparsed predictions: {metrics['unknown']}")
    print(f"Failure Detection Rate: {fdr:.4f}")
    print(f"False Alarm Rate: {far:.4f}")
    print(f"Accuracy: {accuracy:.4f}")

    # Save predictions
    results_df.to_csv(output_dir / 'predictions.csv', index=False)

    return {
        "LLM": config.MODEL_NAME,
        "rank": config.LORA_RANK,
        "target_module": str(config.TARGET_MODULES),
        "trainable_params": trainable_params,
        "percent_trainable_params": round(percentage, 6),
        "fold_number": fold_idx + 1,
        "true_positive": tp,
        "false_negative": fn,
        "false_positive": fp,
        "true_negative": tn,
        "unknown_predictions": metrics['unknown'],
        "false_alarm_rate": round(far, 4),
        "failure_detection_rate": round(fdr, 4),
        "accuracy": round(accuracy, 4)
    }

## 7. Run Experiment

In [None]:
# Resume from a previous run when a results file is already present
results_file = config.OUTPUT_DIR / config.RESULTS_FILE

if not results_file.exists():
    df_results = pd.DataFrame()
    print("Starting fresh - no existing results found")
else:
    df_results = pd.read_csv(results_file)
    print(f"Loaded existing results with {len(df_results)} completed folds")

# Run settings: process every configured fold, or only the first one
RUN_ALL_FOLDS = True  # Set to False to run only the first fold
if RUN_ALL_FOLDS:
    MAX_FOLDS = config.N_FOLDS
else:
    MAX_FOLDS = 1

print(f"\nWill process {MAX_FOLDS} fold(s)")

In [None]:
# Run experiments: one fold at a time, saving after every finished fold so an
# interrupted run can be resumed from the results file.
import traceback

new_results = []
failed_folds = []  # one-based numbers of the folds that raised an error

for fold_idx in range(MAX_FOLDS):
    try:
        result = run_fold_experiment(fold_idx, folds[fold_idx], df_results)

        # None means the fold was skipped because it is already in df_results
        if result is not None:
            new_results.append(result)

            # Save incrementally
            new_df = pd.DataFrame(new_results)
            combined_df = pd.concat([df_results, new_df], ignore_index=True)
            combined_df.to_csv(results_file, index=False)
            print(f"Results saved to {results_file}")

    except Exception as e:
        # Continue with the remaining folds, but keep the failure visible:
        # print the full traceback and remember which fold failed.
        print(f"Error in fold {fold_idx + 1}: {e}")
        traceback.print_exc()
        failed_folds.append(fold_idx + 1)
        continue

print("\n" + "="*60)
print("EXPERIMENT COMPLETED")
if failed_folds:
    print(f"Folds that failed: {failed_folds}")
print("="*60)

## 8. Results Analysis (Optional)

In [None]:
# Summarise the saved results: per-metric statistics, then a per-fold table
if not results_file.exists():
    print("No results file found")
else:
    final_results = pd.read_csv(results_file)

    if final_results.empty:
        print("No results found in the file")
    else:
        print("\n" + "="*60, "OVERALL RESULTS SUMMARY", "="*60, sep="\n")

        # Calculate statistics for each metric column that is present
        metrics = ['failure_detection_rate', 'false_alarm_rate', 'accuracy']

        for metric in metrics:
            if metric not in final_results.columns:
                continue

            mean_val = final_results[metric].mean()
            std_val = final_results[metric].std()
            min_val = final_results[metric].min()
            max_val = final_results[metric].max()

            print(f"\n{metric.upper().replace('_', ' ')}:")
            print(f"  Mean ± Std: {mean_val:.4f} ± {std_val:.4f}")
            print(f"  Range: [{min_val:.4f}, {max_val:.4f}]")

        # Display the per-fold table, limited to the columns that exist
        print("\n" + "="*60, "DETAILED RESULTS BY FOLD", "="*60, sep="\n")
        display_cols = ['fold_number', 'failure_detection_rate', 'false_alarm_rate', 'accuracy']
        display_cols = [c for c in display_cols if c in final_results.columns]
        print(final_results[display_cols].to_string(index=False))

        print(f"\n✓ Results saved to: {results_file}")

## 9. Visualization (Optional)

In [None]:
# Optional: plot per-fold FDR/FAR and the distribution of all three metrics.
# Any failure is reported as a message so the notebook run is never aborted.
try:
    import matplotlib.pyplot as plt
    import seaborn as sns

    if results_file.exists() and not final_results.empty:
        # Set style (only when this matplotlib version provides it)
        if 'seaborn-v0_8' in plt.style.available:
            plt.style.use('seaborn-v0_8')
        sns.set_palette("husl")

        # Create figure with subplots
        fig, axes = plt.subplots(1, 2, figsize=(12, 5))

        # Plot 1: FDR and FAR by fold
        folds_list = final_results['fold_number'].values
        axes[0].plot(folds_list, final_results['failure_detection_rate'],
                    'o-', label='FDR', linewidth=2, markersize=8)
        axes[0].plot(folds_list, final_results['false_alarm_rate'],
                    's-', label='FAR', linewidth=2, markersize=8)
        axes[0].set_xlabel('Fold Number', fontsize=12)
        axes[0].set_ylabel('Rate', fontsize=12)
        axes[0].set_title('Performance Metrics by Fold', fontsize=14, fontweight='bold')
        axes[0].legend(loc='best')
        axes[0].grid(True, alpha=0.3)
        axes[0].set_ylim([0, 1.05])

        # Plot 2: Box plot of metrics. rename() returns a new frame, so the
        # short labels never touch final_results itself.
        metrics_data = final_results[
            ['failure_detection_rate', 'false_alarm_rate', 'accuracy']
        ].rename(columns={
            'failure_detection_rate': 'FDR',
            'false_alarm_rate': 'FAR',
            'accuracy': 'Accuracy',
        })

        # return_type='dict' is required: by default boxplot() returns the
        # Axes object, which cannot be indexed with 'boxes' below.
        box_plot = metrics_data.boxplot(
            ax=axes[1], patch_artist=True, return_type='dict'
        )
        axes[1].set_ylabel('Value', fontsize=12)
        axes[1].set_title('Distribution of Performance Metrics', fontsize=14, fontweight='bold')
        axes[1].set_ylim([0, 1.05])

        # Color the boxes
        colors = ['lightblue', 'lightcoral', 'lightgreen']
        for patch, color in zip(box_plot['boxes'], colors):
            patch.set_facecolor(color)

        plt.tight_layout()

        # Save figure
        fig_path = config.OUTPUT_DIR / 'performance_metrics.png'
        plt.savefig(fig_path, dpi=100, bbox_inches='tight')
        plt.show()

        print(f"\n✓ Figure saved to: {fig_path}")

except ImportError:
    print("Matplotlib/Seaborn not available for visualization")
except Exception as e:
    print(f"Could not create visualization: {e}")