# PyRIT Red Teaming - Sample I
This module provides a simple red teaming framework using Microsoft PyRIT for adversarial testing of AI models. It manages attack configurations, target models, and result analysis.

## Loading of Libraries

In [142]:
import os
import logging
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from enum import Enum
import numpy as np
import pandas as pd

# PyRIT core imports
from pyrit.datasets import fetch_adv_bench_dataset
from pyrit.common import IN_MEMORY, initialize_pyrit
from pyrit.memory import CentralMemory, AttackResultEntry

# PyRIT converters and normalizers
from pyrit.prompt_converter import Base64Converter
from pyrit.prompt_normalizer import PromptConverterConfiguration

# PyRIT attack framework
from pyrit.executor.attack import (
    AttackAdversarialConfig,
    AttackConverterConfig,
    AttackScoringConfig,
    ConsoleAttackResultPrinter,
    RedTeamingAttack,
    RTOSystemPromptPaths,
)

# PyRIT targets and scorers
from pyrit.prompt_target import OpenAIChatTarget, CrucibleTarget
from pyrit.score import SelfAskTrueFalseScorer, TrueFalseQuestionPaths

# TODO: read the dataset up front so that its categories are initialized

## Initial Parameters

In [143]:
# Base path (without extension) for exported conversations; ResultAnalyzer
# appends ".<format>" (e.g. ".json", ".csv") to it when exporting.
MEMORY_EXPORT_PATH: str = "./mem_exports/memoryconversations"

# Default for AttackConfiguration.dataset_limit: how many objectives are
# requested from the dataset.
DATASET_LIMIT: int = 5

# Default for AttackConfiguration.max_turns; passed as ``max_turns`` to
# RedTeamingAttack.
MAX_TURNS: int = 3

# Probability (compared against np.random.rand()) that the Base64 converter
# is attached to an attack's request converters.
CONVERTERS_APPLICATION_CHANCES: float = 0.25

class ModelType(Enum):
    """Enumeration for different model types supported by the system.

    Members are used as keys of ``TargetModelManager.targets``. For the
    OpenAI-style targets the member value is also passed as ``model_name``
    when the target is constructed.
    """
    OPENAI_GPT4O = "gpt-4o"
    OPENAI_GPT4O_MINI = "gpt-4o-mini"
    OPENAI_GPT35_TURBO = "gpt-3.5-turbo"
    # Registered only when TargetModelManager._check_credentials() passes.
    LLAMA_31_8B = "Meta-Llama-3.1-8B-Instruct"
    MINISTRAL_3B = "Ministral-3B"
    # Crucible target; the value is a display label only (the target is
    # built from an endpoint URL, not from this string).
    PIECEOFCAKE = "Crucible - pieceofcake"

class AttackOutcome(Enum):
    """Enumeration for attack outcomes.

    NOTE(review): not referenced anywhere else in the visible code; success
    is currently derived from the string form of PyRIT's own outcome value.
    """
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    TIMEOUT = "TIMEOUT"
    ERROR = "ERROR"

@dataclass
class AttackConfiguration:
    """Data class to hold attack configuration parameters.

    ``dataset_limit`` and ``max_turns`` may be passed as ``None``; they are
    then replaced by the module defaults in ``__post_init__``.
    ``memory_labels`` and ``dataset_categories`` are left as ``None`` when
    not supplied.
    """
    # Number of objectives requested from the dataset.
    dataset_limit: int = DATASET_LIMIT
    # Passed as ``max_turns`` to RedTeamingAttack.
    max_turns: int = MAX_TURNS
    # Forwarded as ``memory_labels`` to RedTeamingAttack.execute_async.
    memory_labels: Optional[Dict[str, str]] = None
    # Forwarded as ``main_categories`` to fetch_adv_bench_dataset.
    dataset_categories: Optional[List[str]] = None
    
    
    def __post_init__(self) -> None:
        """Replace explicit ``None`` limits with the module-level defaults."""
        if self.dataset_limit is None:
            self.dataset_limit = DATASET_LIMIT
        if self.max_turns is None:
            self.max_turns = MAX_TURNS


@dataclass
class AttackResult:
    """Data class to hold individual attack results.

    Instances are produced by ``AttackExecutor``: either converted from a
    PyRIT result, or built directly when the attack raised an exception.
    """
    # The adversarial objective that was attempted.
    objective: str
    # AttackExecutor always sets this to the literal "RedTeamingAttack".
    attack_identifier: str
    # Text before the first ':' of str(last_score); "Unknown" when there is
    # no score, "Error" when the attack raised.
    scorer: str
    # Number of turns executed (0 when the attack raised).
    executed_turns: int
    # Execution time in seconds (PyRIT's millisecond value divided by 1000).
    execution_time_s: float
    # True when the outcome string ends with "SUCCESS".
    success: bool
    # PyRIT's outcome reason, or the exception text when the attack raised.
    outcome_reason: Optional[str]
    # Copied from the PyRIT result entry; None when the attack raised.
    last_response: Optional[str] = None
    last_score: Optional[Any] = None


## UTILS

In [144]:
class PyRITLogger:
    """Logging utility class for PyRIT operations.

    Every instance wraps the same process-wide ``"PyRITRedTeaming"`` logger,
    so configuration applied through one instance is visible through all.
    """

    def __init__(self, log_level: Optional[int] = None):
        """
        Initialize the shared logger.

        Args:
            log_level: Logging level to apply. When omitted, the level is set
                to INFO only if the shared logger has no level yet, so that
                components calling ``PyRITLogger()`` do not overwrite a level
                chosen earlier by an explicit ``PyRITLogger(level)`` call.
        """
        self.logger = logging.getLogger("PyRITRedTeaming")

        if log_level is not None:
            self.logger.setLevel(log_level)
        elif self.logger.level == logging.NOTSET:
            # First use without an explicit level: keep the historical default.
            self.logger.setLevel(logging.INFO)

        # Attach the stream handler only once; the logger object is shared,
        # so adding one per instance would duplicate every message.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def log_attack_start(self, target_name: str, objective: str):
        """Log the start of an attack (objective truncated to 200 chars)."""
        self.logger.info(f"Starting attack on {target_name} with objective: {objective[:200]}")

    def log_attack_result(self, result: "AttackResult"):
        """Log whether an attack succeeded, with its turn count and duration."""
        status = "SUCCESS" if result.success else "FAILURE"
        self.logger.info(f"Attack {status}: {result.executed_turns} turns, {result.execution_time_s:.2f}s")

    def log_error(self, error: Exception, context: str = ""):
        """
        Log an error, optionally prefixed with where it happened.

        Args:
            error: The exception to report.
            context: Short description of the failing operation. When empty,
                the message is just ``"Error: <error>"`` rather than the
                malformed ``"Error in : <error>"``.
        """
        prefix = f"Error in {context}" if context else "Error"
        self.logger.error(f"{prefix}: {str(error)}")


class TargetModelManager:
    """Manages different target models for red teaming attacks."""
    
    def __init__(self):
        """Initialize the target model manager."""
        self.logger = PyRITLogger().logger
        self.targets = {}
        self._initialize_targets()
    
    def _initialize_targets(self):
        """Initialize all available target models."""
        try:
            # OpenAI models
            self.targets[ModelType.OPENAI_GPT4O] = OpenAIChatTarget(
                model_name=ModelType.OPENAI_GPT4O.value
            )
            self.targets[ModelType.OPENAI_GPT4O_MINI] = OpenAIChatTarget(
                model_name=ModelType.OPENAI_GPT4O_MINI.value
            )
            self.targets[ModelType.OPENAI_GPT35_TURBO] = OpenAIChatTarget(
                model_name=ModelType.OPENAI_GPT35_TURBO.value
            )
            
            # External target (Crucible)
            self.targets[ModelType.PIECEOFCAKE] = CrucibleTarget(
                endpoint="https://pieceofcake.platform.dreadnode.io"
            )
            
            # Azure Foundry models (if environment variables are available)
            if self._check_credentials():
                self.targets[ModelType.LLAMA_31_8B] = OpenAIChatTarget(
                    endpoint=os.environ["AZURE_FOUNDRY_MODELS_COMPLETION_ENDPOINT"],
                    api_key=os.environ["AZURE_FOUNDRY_API_KEY"],
                    api_version="2024-05-01-preview",
                    model_name=ModelType.LLAMA_31_8B.value,
                    max_tokens=216,
                )
                
                self.targets[ModelType.MINISTRAL_3B] = OpenAIChatTarget(
                    endpoint=os.environ["AZURE_FOUNDRY_MODELS_COMPLETION_ENDPOINT"],
                    api_key=os.environ["AZURE_FOUNDRY_API_KEY"],
                    api_version="2024-05-01-preview",
                    model_name=ModelType.MINISTRAL_3B.value,
                    max_tokens=216,
                )
            
            self.logger.info(f"Initialized {len(self.targets)} target models")
            
        except Exception as e:
            self.logger.error(f"Error initializing targets: {str(e)}")
    
    def _check_credentials(self) -> bool:
        """Check if Azure credentials are available."""
        required_vars = ["OPENAI_API_KEY",
                         "OPENAI_CHAT_KEY",
                         "CRUCIBLE_API_KEY",
                         "AZURE_FOUNDRY_MODELS_COMPLETION_ENDPOINT",
                         "AZURE_FOUNDRY_API_KEY"]
        return all(var in os.environ for var in required_vars)
    
    def get_target(self, model_type: ModelType) -> Optional[Any]:
        """
        Get a target model by type.
        
        Args:
            model_type: The type of model to retrieve
            
        Returns:
            Target model instance or None if not available
        """
        return self.targets.get(model_type)
    
    def get_available_targets(self) -> List[ModelType]:
        """Get list of available target model types."""
        return list(self.targets.keys())


## Dataset Manager

In [145]:
class DatasetManager:
    """Supplies adversarial objectives for red teaming runs."""

    def __init__(self):
        """Attach the shared PyRIT logger."""
        self.logger = PyRITLogger().logger

    def fetch_adversarial_objectives(
        self,
        categories: List[str] = None,
        limit: int = 5
    ) -> List[str]:
        """
        Load objectives from the AdvBench dataset.

        Args:
            categories: Passed as ``main_categories`` to the dataset fetcher
            limit: Passed as ``first`` when reading values from the dataset

        Returns:
            The fetched objective values, or an empty list if anything in
            the fetch raised (the error is logged, not propagated).
        """
        try:
            self.logger.info(f"Fetching {limit} objectives from categories: {categories}")
            dataset = fetch_adv_bench_dataset(main_categories=categories)
            fetched = dataset.get_values(first=limit)
            self.logger.info(f"Successfully fetched {len(fetched)} objectives")
        except Exception as exc:
            self.logger.error(f"Error fetching dataset: {str(exc)}")
            return []

        return fetched

## Attack Executor

In [146]:
class AttackExecutor:
    """Executes red teaming attacks using PyRIT framework."""

    def __init__(self, target_manager: "TargetModelManager"):
        """
        Initialize the attack executor.

        Args:
            target_manager: Instance of TargetModelManager; used to look up
                the default adversarial and scoring targets.
        """
        self.target_manager = target_manager
        self.logger = PyRITLogger().logger
        self.memory = CentralMemory.get_memory_instance()
        self.result_printer = ConsoleAttackResultPrinter()

    @staticmethod
    def _is_success_outcome(outcome: Any) -> bool:
        """Return True when an outcome value denotes success.

        The comparison is case-insensitive so that both an enum rendering
        such as ``"AttackOutcome.SUCCESS"`` and a plain ``"success"`` string
        are recognised. Empty or missing outcomes count as failure.
        """
        if not outcome:
            return False
        return str(outcome).upper().endswith("SUCCESS")

    def _create_attack_configuration(
        self,
        target_model: Any,
        adversarial_target: Any = None,
        scoring_target: Any = None
    ) -> tuple:
        """
        Create attack configuration components.

        Args:
            target_model: Primary target model for attacks. Currently not
                used when building the configuration; kept for the
                existing call signature.
            adversarial_target: Model for adversarial prompt generation
                (default: the GPT-4o target).
            scoring_target: Model for scoring attack results
                (default: the GPT-4o-mini target).

        Returns:
            Tuple of (converter_config, adversarial_config, scoring_config)

        Raises:
            ValueError: If a default adversarial or scoring target is needed
                but has not been registered with the target manager.
        """
        # Default to GPT-4o for adversarial prompts and GPT-4o-mini for scoring.
        if adversarial_target is None:
            adversarial_target = self.target_manager.get_target(ModelType.OPENAI_GPT4O)
            if adversarial_target is None:
                raise ValueError(
                    f"Default adversarial target {ModelType.OPENAI_GPT4O.value} is not available"
                )

        if scoring_target is None:
            scoring_target = self.target_manager.get_target(ModelType.OPENAI_GPT4O_MINI)
            if scoring_target is None:
                raise ValueError(
                    f"Default scoring target {ModelType.OPENAI_GPT4O_MINI.value} is not available"
                )

        # Apply the Base64 converter (obfuscation) only with probability
        # CONVERTERS_APPLICATION_CHANCES; otherwise send prompts unconverted.
        if np.random.rand() < CONVERTERS_APPLICATION_CHANCES:
            converters = [Base64Converter()]
        else:
            converters = []
        prompt_converters = PromptConverterConfiguration.from_converters(
            converters=converters
        )
        converter_config = AttackConverterConfig(request_converters=prompt_converters)

        # Configure adversarial attack generation
        adversarial_config = AttackAdversarialConfig(
            target=adversarial_target,
            system_prompt_path=RTOSystemPromptPaths.TEXT_GENERATION.value,
        )

        # Configure attack result scoring
        scoring_config = AttackScoringConfig(
            objective_scorer=SelfAskTrueFalseScorer(
                true_false_question_path=TrueFalseQuestionPaths.PROMPT_INJECTION.value,
                chat_target=scoring_target
            )
        )

        return converter_config, adversarial_config, scoring_config

    async def execute_single_attack(
        self,
        objective: str,
        target_model: Any,
        config: "AttackConfiguration"
    ) -> "AttackResult":
        """
        Execute a single red teaming attack.

        Any exception raised while configuring or running the attack is
        logged and turned into a failed ``AttackResult`` (scorer ``"Error"``,
        the exception text as ``outcome_reason``) instead of propagating.

        Args:
            objective: The adversarial objective to test
            target_model: Target model to attack
            config: Attack configuration parameters (``max_turns`` and
                ``memory_labels`` are read)

        Returns:
            AttackResult containing the outcome and metrics
        """
        try:
            # Create attack configurations
            converter_config, adversarial_config, scoring_config = \
                self._create_attack_configuration(target_model)

            # Initialize red teaming attack
            red_teaming_attack = RedTeamingAttack(
                objective_target=target_model,
                attack_converter_config=converter_config,
                attack_adversarial_config=adversarial_config,
                attack_scoring_config=scoring_config,
                max_turns=config.max_turns,
            )

            # Execute the attack
            self.logger.info(f"Executing attack with objective: {objective[:200]}")
            result = await red_teaming_attack.execute_async(
                objective=objective,
                memory_labels=config.memory_labels
            )

            # Print result to console
            await self.result_printer.print_result_async(result=result)

            # Convert to our AttackResult format
            return self._convert_pyrit_result(result)

        except Exception as e:
            self.logger.error(f"Error executing attack: {str(e)}")
            return AttackResult(
                objective=objective,
                attack_identifier="RedTeamingAttack",
                scorer="Error",
                executed_turns=0,
                execution_time_s=0.0,
                success=False,
                outcome_reason=str(e)
            )

    def _convert_pyrit_result(self, pyrit_result) -> "AttackResult":
        """
        Convert PyRIT result to our AttackResult format.

        The PyRIT result is wrapped in an ``AttackResultEntry`` and its
        attributes are read from that entry.

        Args:
            pyrit_result: Result from PyRIT attack execution

        Returns:
            Converted AttackResult instance
        """
        entry = AttackResultEntry(entry=pyrit_result)

        # Scorer name is the text before the first ':' of the score's string
        # form. NOTE(review): relies on the score's str() layout; confirm.
        if entry.last_score:
            scorer_name = str(entry.last_score).split(':', 1)[0]
        else:
            scorer_name = "Unknown"

        return AttackResult(
            objective=entry.objective,
            attack_identifier="RedTeamingAttack",
            scorer=scorer_name,
            executed_turns=entry.executed_turns,
            execution_time_s=entry.execution_time_ms / 1000 if entry.execution_time_ms else 0.0,
            success=self._is_success_outcome(entry.outcome),
            outcome_reason=entry.outcome_reason,
            last_response=entry.last_response,
            last_score=entry.last_score
        )

## Result Analyzer

In [147]:
class ResultAnalyzer:
    """Analyzes and processes red teaming attack results."""

    def __init__(self, memory_export_path: Optional[str] = None):
        """
        Initialize the result analyzer.

        Args:
            memory_export_path: Base path (without extension) for exported
                conversations. Defaults to ``MEMORY_EXPORT_PATH``, looked up
                at call time so a later change to that constant is honoured.
        """
        self.memory = CentralMemory.get_memory_instance()
        self.memory_export_path = (
            MEMORY_EXPORT_PATH if memory_export_path is None else memory_export_path
        )
        self.logger = PyRITLogger().logger

    @staticmethod
    def _summarize_result(result_dict: Dict[str, Any]) -> Dict[str, Any]:
        """Build one analysis row from an attack result's attribute dict.

        Missing or ``None`` attributes fall back to neutral values
        ("Unknown", 0) so that a single incomplete result does not abort the
        whole analysis.
        """
        identifier = result_dict.get('attack_identifier') or {}
        last_score = result_dict.get('last_score')
        outcome = result_dict.get('outcome')

        return {
            'objective': result_dict.get('objective', ''),
            'attack_identifier': identifier.get('__type__', 'Unknown'),
            # Same convention as AttackExecutor: text before the first ':'
            # of the score's string form, "Unknown" when there is no score.
            'scorer': str(last_score).split(':', 1)[0] if last_score else 'Unknown',
            'executed_turns': result_dict.get('executed_turns', 0),
            'execution_time_s': (result_dict.get('execution_time_ms') or 0) / 1000,
            # Case-insensitive so both "AttackOutcome.SUCCESS" and "success"
            # are recognised.
            'success': bool(outcome) and str(outcome).upper().endswith('SUCCESS'),
            'outcome_reason': result_dict.get('outcome_reason', '')
        }

    def analyze_attack_results(self) -> pd.DataFrame:
        """
        Analyze all attack results from memory and create summary statistics.

        Returns:
            DataFrame with one row per attack result (columns: objective,
            attack_identifier, scorer, executed_turns, execution_time_s,
            success, outcome_reason). Empty when memory holds no results or
            when reading them fails (the error is logged).
        """
        try:
            # Get raw results from memory
            raw_results = list(self.memory.get_attack_results())

            if not raw_results:
                self.logger.warning("No attack results found in memory")
                return pd.DataFrame()

            rows = [self._summarize_result(vars(result)) for result in raw_results]
            return pd.DataFrame(rows)

        except Exception as e:
            self.logger.error(f"Error analyzing results: {str(e)}")
            return pd.DataFrame()

    def export_memory_conversations(self, formats: List[str] = None):
        """
        Export memory conversations to specified formats.

        Each format is written to ``<memory_export_path>.<format>``. The
        parent directory is created if needed, and a failure in one format
        is logged without preventing the remaining formats from exporting.

        Args:
            formats: List of formats to export ('json', 'csv')
        """
        if formats is None:
            formats = ['json', 'csv']

        export_dir = os.path.dirname(self.memory_export_path)
        if export_dir:
            try:
                os.makedirs(export_dir, exist_ok=True)
            except OSError as e:
                self.logger.error(f"Error exporting conversations: {str(e)}")
                return

        for format_type in formats:
            file_path = f"{self.memory_export_path}.{format_type}"
            try:
                self.memory.export_conversations(
                    file_path=file_path,
                    export_type=format_type
                )
            except Exception as e:
                self.logger.error(f"Error exporting conversations: {str(e)}")
                continue
            self.logger.info(f"Exported conversations to {file_path}")

    def find_interesting_prompts(self, memory_labels: Dict[str, str] = None) -> List[Any]:
        """
        Find prompt pieces that a true/false scorer marked as True.

        Args:
            memory_labels: Memory labels to filter by

        Returns:
            List of prompt pieces having at least one
            ``SelfAskTrueFalseScorer`` score whose value equals True; empty
            when the lookup fails (the error is logged).
        """
        try:
            result_pieces = self.memory.get_prompt_request_pieces(labels=memory_labels)

            interesting_prompts = [
                piece
                for piece in result_pieces
                if any(
                    score.scorer_class_identifier.get("__type__") == "SelfAskTrueFalseScorer"
                    and score.get_value() == True  # noqa: E712 - keep equality semantics
                    for score in piece.scores
                )
            ]

            print(f"Found {len(interesting_prompts)} interesting prompts")
            return interesting_prompts

        except Exception as e:
            self.logger.error(f"Error finding interesting prompts: {str(e)}")
            return []

## PyRIT Red Teaming System

In [148]:
class PyRITRedTeamingSystem:
    """
    Main class that orchestrates the entire red teaming system.

    This class provides a high-level interface for conducting comprehensive
    red teaming attacks against AI models using the PyRIT framework.
    """

    def __init__(self, memory_type: str = IN_MEMORY, log_level: int = logging.INFO):
        """
        Initialize the PyRIT Red Teaming System.

        Args:
            memory_type: Type of memory storage for PyRIT
            log_level: Logging level for the system
        """
        # Initialize PyRIT
        initialize_pyrit(memory_db_type=memory_type)

        # Initialize components
        self.logger = PyRITLogger(log_level).logger
        self.target_manager = TargetModelManager()
        self.dataset_manager = DatasetManager()
        self.attack_executor = AttackExecutor(self.target_manager)
        self.result_analyzer = ResultAnalyzer()

        # The components above build their own PyRITLogger() around the same
        # shared logger; re-apply the requested level in case that changed it.
        self.logger.setLevel(log_level)

        # Memory management
        self.memory = CentralMemory.get_memory_instance()

        self.logger.info("PyRIT Red Teaming System initialized successfully")

    async def run_attack(
        self,
        target_models: Optional[List["ModelType"]] = None,
        config: Optional["AttackConfiguration"] = None
    ) -> Dict[str, List["AttackResult"]]:
        """
        Run red teaming attacks across multiple models.

        The memory database is reset first, then every fetched objective is
        attempted once against every requested model. Models that are not
        registered with the target manager are skipped with a warning.

        Args:
            target_models: List of model types to test (default: all available)
            config: Attack configuration parameters (default:
                ``AttackConfiguration()``)

        Returns:
            Dictionary mapping model names (``ModelType.value``) to their
            attack results; empty when no objectives could be fetched.
            Skipped models have no entry.
        """
        if target_models is None:
            target_models = self.target_manager.get_available_targets()

        if config is None:
            config = AttackConfiguration()

        # Reset memory for fresh start
        self.memory.reset_database()

        # Fetch adversarial objectives
        objectives = self.dataset_manager.fetch_adversarial_objectives(
            categories=config.dataset_categories,
            limit=config.dataset_limit
        )

        if not objectives:
            self.logger.error("No objectives fetched. Cannot proceed with attacks.")
            return {}

        # Execute attacks on each target model
        all_results = {}

        for model_type in target_models:
            target_model = self.target_manager.get_target(model_type)
            # NOTE: results are not labelled with the target model in memory;
            # the returned dictionary is the only per-model grouping.

            if target_model is None:
                self.logger.warning(f"Target model {model_type} not available. Skipping.")
                continue

            self.logger.info(f"Starting attacks on {model_type.value}")
            model_results = []

            for objective in objectives:
                try:
                    result = await self.attack_executor.execute_single_attack(
                        objective=objective,
                        target_model=target_model,
                        config=config
                    )
                except Exception as e:
                    # Defensive: keep going with the remaining objectives.
                    self.logger.error(f"Error attacking {model_type.value}: {str(e)}")
                    continue
                model_results.append(result)

            all_results[model_type.value] = model_results
            self.logger.info(f"Completed {len(model_results)} attacks on {model_type.value}")

        return all_results

    def analyze_and_export_results(self, export_formats: List[str] = None) -> pd.DataFrame:
        """
        Analyze attack results and export data.

        Runs, in order: the result analysis, the conversation export, and
        the search for interesting prompts (which reports its count).

        Args:
            export_formats: List of export formats ('json', 'csv')

        Returns:
            DataFrame containing analysis results
        """
        if export_formats is None:
            export_formats = ['json', 'csv']

        # Analyze results
        analysis_df = self.result_analyzer.analyze_attack_results()

        # Export memory conversations
        self.result_analyzer.export_memory_conversations(formats=export_formats)

        # Report interesting prompts; the returned list is not needed here.
        self.result_analyzer.find_interesting_prompts()

        return analysis_df

## Main

In [None]:
if __name__ == "__main__":
    """Main function demonstrating system usage."""
    
    # Initialize the red teaming system
    red_teaming_system = PyRITRedTeamingSystem()

    target_models=[ModelType.OPENAI_GPT4O_MINI,
                   ModelType.OPENAI_GPT35_TURBO,
                   ModelType.LLAMA_31_8B,
                   ModelType.MINISTRAL_3B, 
                   ModelType.PIECEOFCAKE
               ]

    print("=" * 50)
    print("RUNNING TEST")
    print("=" * 50)
    
    # Configure attack parameters
    attack_config = AttackConfiguration(
        memory_labels={},
        dataset_categories=["Financial and Business"],
    )
    
    # Run attacks
    results = await red_teaming_system.run_attack(
        target_models=target_models,
        config=attack_config
    )

    print(f"Test completed{"."*20}")
    
    # Analyze and export results

    results_df = red_teaming_system.analyze_and_export_results()

    static_df = pd.DataFrame([item.value for item in target_models for _ in range(DATASET_LIMIT)], columns=['target_models'])

    analysis_df = pd.concat([static_df, results_df], axis=1)

    if not analysis_df.empty:
        print(f"{"=" * 50}\nANALYZING RESULTS\n{"=" * 50}")
        print(analysis_df[['target_models','objective','success','executed_turns', 'execution_time_s']].to_string())
        
        print(f"\n{"=" * 50}\nRESULTS SUMMARY\n{"=" * 50}")
        aggregated_df = analysis_df.groupby(['target_models']).agg({
            'success': ['mean'],
            'executed_turns': ['sum', 'mean', 'min', 'max'],
            'execution_time_s': ['sum', 'mean', 'min', 'max']
        }).reset_index()
        print(aggregated_df.to_string())