In [None]:
from pathlib import Path
import logging
from typing import Dict, List, Optional, Union
import yaml
import torch
from datasets import Dataset, load_dataset
from transformers import (
    AutoTokenizer, AutoModelForCausalLM, 
    TrainingArguments, Trainer,
    DataCollatorForLanguageModeling
)
from peft import LoraConfig, get_peft_model, TaskType
import numpy as np
from torch.cuda.amp import autocast
import psutil
import json

class LLMPipeline:
    def __init__(self, config_path: str):
        """Initialize the LLM pipeline with configuration."""
        self.config = self._load_config(config_path)
        self.setup_logging()
        self.tokenizer = None
        self.model = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        
    def _load_config(self, config_path: str) -> Dict:
        """Load configuration from YAML file."""
        with open(config_path, 'r') as f:
            return yaml.safe_load(f)
    
    def setup_logging(self):
        """Configure logging with rotation and formatting."""
        logging.basicConfig(
            level=self.config.get('logging_level', 'INFO'),
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('pipeline.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def preprocess_data(self, texts: List[str]) -> Dataset:
        """Preprocess and tokenize input texts."""
        try:
            # Initialize tokenizer if not already done
            if self.tokenizer is None:
                self.tokenizer = AutoTokenizer.from_pretrained(
                    self.config['model_name'],
                    use_fast=True
                )

            # Basic text cleaning
            cleaned_texts = [
                text.strip().replace('\n', ' ').replace('\r', ' ')
                for text in texts
            ]

            # Create dataset
            dataset = Dataset.from_dict({'text': cleaned_texts})

            # Tokenization function
            def tokenize_function(examples):
                return self.tokenizer(
                    examples['text'],
                    padding='max_length',
                    truncation=True,
                    max_length=self.config['max_length']
                )

            # Apply tokenization
            tokenized_dataset = dataset.map(
                tokenize_function,
                batched=True,
                num_proc=4,
                remove_columns=['text']
            )

            return tokenized_dataset

        except Exception as e:
            self.logger.error(f"Error in preprocessing: {str(e)}")
            raise

    def setup_model(self):
        """Initialize model with PEFT configuration."""
        try:
            # Load base model
            base_model = AutoModelForCausalLM.from_pretrained(
                self.config['model_name'],
                torch_dtype=torch.float16,
                device_map='auto'
            )

            # Configure LoRA
            lora_config = LoraConfig(
                task_type=TaskType.CAUSAL_LM,
                r=8,  # LoRA attention dimension
                lora_alpha=32,
                lora_dropout=0.1,
                target_modules=["q_proj", "v_proj"]
            )

            # Apply LoRA
            self.model = get_peft_model(base_model, lora_config)
            self.model.to(self.device)

        except Exception as e:
            self.logger.error(f"Error in model setup: {str(e)}")
            raise

    def train(self, dataset: Dataset):
        """Train the model using PEFT."""
        try:
            training_args = TrainingArguments(
                output_dir="./results",
                per_device_train_batch_size=4,
                gradient_accumulation_steps=4,
                learning_rate=2e-4,
                num_train_epochs=3,
                fp16=True,
                logging_steps=100,
                save_strategy="steps",
                save_steps=200,
                evaluation_strategy="steps",
                eval_steps=200,
                save_total_limit=3,
            )

            trainer = Trainer(
                model=self.model,
                args=training_args,
                train_dataset=dataset,
                data_collator=DataCollatorForLanguageModeling(
                    tokenizer=self.tokenizer,
                    mlm=False
                )
            )

            trainer.train()
            
        except Exception as e:
            self.logger.error(f"Error during training: {str(e)}")
            raise

    def inference(self, text: str) -> str:
        """Perform inference with resource monitoring."""
        try:
            # Monitor resource usage
            cpu_percent = psutil.cpu_percent()
            memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB

            # Log resource usage
            self.logger.info(f"CPU Usage: {cpu_percent}%, Memory: {memory:.2f}MB")

            # Tokenize input
            inputs = self.tokenizer(
                text,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=self.config['max_length']
            ).to(self.device)

            # Generate with automatic mixed precision
            with autocast():
                outputs = self.model.generate(
                    inputs.input_ids,
                    max_length=self.config['max_length'],
                    num_return_sequences=1,
                    temperature=0.7,
                    do_sample=True
                )

            return self.tokenizer.decode(outputs[0], skip_special_tokens=True)

        except Exception as e:
            self.logger.error(f"Error during inference: {str(e)}")
            raise

    def save_model(self, path: str):
        """Save the model and configuration."""
        try:
            # Create directory if it doesn't exist
            save_path = Path(path)
            save_path.mkdir(parents=True, exist_ok=True)

            # Save model
            self.model.save_pretrained(save_path)
            self.tokenizer.save_pretrained(save_path)

            # Save configuration
            with open(save_path / 'pipeline_config.json', 'w') as f:
                json.dump(self.config, f)

            self.logger.info(f"Model saved successfully to {path}")

        except Exception as e:
            self.logger.error(f"Error saving model: {str(e)}")
            raise

    def load_model(self, path: str):
        """Load a saved model and configuration."""
        try:
            load_path = Path(path)
            
            # Load configuration
            with open(load_path / 'pipeline_config.json', 'r') as f:
                self.config = json.load(f)

            # Load tokenizer and model
            self.tokenizer = AutoTokenizer.from_pretrained(load_path)
            self.model = AutoModelForCausalLM.from_pretrained(
                load_path,
                torch_dtype=torch.float16,
                device_map='auto'
            )

            self.logger.info(f"Model loaded successfully from {path}")

        except Exception as e:
            self.logger.error(f"Error loading model: {str(e)}")
            raise