# Introduction

This Jupyter Notebook demonstrates an intermediate-level implementation of input moderation and security measures for interactive LLM systems. 

The code covers essential components of secure system design:
- **Input Validation**: Ensures user inputs meet defined criteria using a whitelist, length checks, and special character escaping.
- **Rate Limiting**: Implements a rate-limiting mechanism to prevent abuse and denial-of-service attacks.
- **Prompt Generation**: Dynamically generates secure prompts using validated user data.
- **Task Instruction Design**: Provides carefully structured instructions for better system responses.

### Key Objectives:
1. Enhance system security by preventing malicious inputs and attacks such as prompt injections.
2. Improve user experience through robust handling of diverse input scenarios.
3. Demonstrate the integration of modular programming and object-oriented design principles.

The examples include both valid and malicious inputs to illustrate how the system handles various scenarios securely and effectively. This notebook is a practical tool for students, developers, and researchers interested in building secure, scalable, and user-friendly systems. 


In [8]:
# Import necessary libraries
import html
from time import time, sleep

# Define constants and classes
WHITELIST = {"greeting", "help", "query", "feedback", "info"}
MAX_LENGTH = 100

class RateLimiter:
    """
    A simple rate limiter to prevent abuse by limiting the number of requests
    in a given time window.
    """
    def __init__(self, max_requests: int, time_window: float):
        self.max_requests = max_requests
        self.time_window = time_window
        self.request_times = []

    def allow_request(self) -> bool:
        current_time = time()
        # Filter out old requests outside the time window
        self.request_times = [
            t for t in self.request_times if current_time - t < self.time_window
        ]
        if len(self.request_times) < self.max_requests:
            self.request_times.append(current_time)
            return True
        return False

# Define Input Moderation Functions
def validate_whitelist(user_input: str) -> bool:
    """
    Validate if the input is in the whitelist.
    
    Enhanced Features:
    - Supports multi-word phrases by splitting and validating each word.
    - Logs invalid inputs for monitoring and debugging purposes.
    
    Parameters:
        user_input (str): The input provided by the user.
    
    Returns:
        bool: True if the input is valid, False otherwise.
    """
    words = user_input.lower().split()
    for word in words:
        if word not in WHITELIST:
            print(f"Invalid word detected: {word}")
            return False
    return True


def escape_input(user_input: str) -> str:
    """
    Escape special characters in user input to prevent injection attacks.
    
    Enhanced Features:
    - Detects and reports potentially harmful characters.
    - Handles complex HTML or script injection attempts gracefully.
    
    Parameters:
        user_input (str): The raw input provided by the user.
    
    Returns:
        str: The sanitized input.
    """
    harmful_chars = ['<', '>', '&', '"', "'"]
    for char in harmful_chars:
        if char in user_input:
            print(f"Potentially harmful character detected: {char}")
    return html.escape(user_input)


def validate_length(user_input: str) -> bool:
    """
    Ensure the input length does not exceed the maximum allowed.
    
    Enhanced Features:
    - Provides detailed feedback on length violations.
    - Offers recommendations for shortening overly long inputs.
    
    Parameters:
        user_input (str): The input provided by the user.
    
    Returns:
        bool: True if the input length is valid, False otherwise.
    """
    if len(user_input) > MAX_LENGTH:
        print(f"Input too long ({len(user_input)} characters). Maximum allowed is {MAX_LENGTH}.")
        return False
    return True


def create_prompt(user_name: str, question: str) -> str:
    """
    Securely create a prompt with user parameters.
    
    Enhanced Features:
    - Validates question length and structure.
    - Ensures safe concatenation of user inputs into the prompt.
    
    Parameters:
        user_name (str): The name of the user.
        question (str): The question posed by the user.
    
    Returns:
        str: A secure, formatted prompt.
    
    Raises:
        ValueError: If the user name or question is invalid.
    """
    if not user_name.isalnum() or len(user_name) > 50:
        raise ValueError(f"Invalid user name: {user_name}")
    if not validate_length(question):
        raise ValueError("Question exceeds the maximum allowed length.")
    return f"User {user_name} asks: {question}"


def generate_instruction(task: str) -> str:
    """
    Generate carefully designed instructions for tasks.
    
    Enhanced Features:
    - Validates the task description for completeness and clarity.
    - Appends additional instructions for ambiguous or incomplete tasks.
    
    Parameters:
        task (str): The task description.
    
    Returns:
        str: A well-formatted instructional statement.
    """
    if not task.strip():
        raise ValueError("Task description cannot be empty.")
    if len(task) > MAX_LENGTH:
        print("Warning: Task description is long; consider summarizing.")
    return f"Please complete the following task in a clear and concise manner: {task.strip()}."


# Main Execution
if __name__ == "__main__":
    # Initialize rate limiter
    limiter = RateLimiter(max_requests=5, time_window=10)  # 5 requests per 10 seconds
    
    # Synthetic Security Dataset
    user_inputs = [
        {"input": "query", "name": "AdminUser1", "question": "How to detect phishing emails?"},
        {"input": "help", "name": "CyberSec101", "question": "Explain SQL injection prevention."},
        {"input": "feedback", "name": "SecurityAnalyst", "question": "The firewall rules seem too restrictive."},
        {"input": "query", "name": "PenTesterX", "question": "Describe buffer overflow mitigation strategies."},
        {"input": "info", "name": "NetAdmin45", "question": "How do I configure IDS?"},
        {"input": "unknown", "name": "AnonUser", "question": "How to exploit vulnerabilities?"},
        {"input": "query", "name": "RedTeam<alert>", "question": "Methods to bypass endpoint protection."},
        {"input": "help", "name": "SecureDev", "question": "Best practices for secure coding."},
    ]
    
    for user_input in user_inputs:
        print("\nProcessing input:", user_input)
        
        # Rate-limiting
        if not limiter.allow_request():
            print("Rate limit exceeded. Please try again later.")
            continue
        
        # Validate whitelist
        if not validate_whitelist(user_input["input"]):
            print("Invalid input: Not in the whitelist.")
            continue
        
        # Escape input to prevent injection
        escaped_input = escape_input(user_input["input"])
        print(f"Escaped Input: {escaped_input}")
        
        # Validate length
        if not validate_length(escaped_input):
            print("Input is too long.")
            continue
        
        try:
            # Create prompt with user details
            prompt = create_prompt(user_input["name"], user_input["question"])
            print(f"Generated Prompt: {prompt}")
        except ValueError as e:
            print(f"Error: {e}")
            continue
        
        # Generate carefully designed instructions
        task = "Answer This Question."
        instruction = generate_instruction(task)
        print(f"Instruction: {instruction}")
        
        # Simulate processing delay
        sleep(1)

        # Additional demonstration: handling valid inputs differently
        if user_input["input"] == "feedback":
            print("Thank you for your feedback!")
        elif user_input["input"] == "info":
            print("Retrieving requested information...")
        elif user_input["input"] == "greeting":
            print("Hello! How can I assist you?")


Processing input: {'input': 'query', 'name': 'AdminUser1', 'question': 'How to detect phishing emails?'}
Escaped Input: query
Generated Prompt: User AdminUser1 asks: How to detect phishing emails?
Instruction: Please complete the following task in a clear and concise manner: Answer This Question..

Processing input: {'input': 'help', 'name': 'CyberSec101', 'question': 'Explain SQL injection prevention.'}
Escaped Input: help
Generated Prompt: User CyberSec101 asks: Explain SQL injection prevention.
Instruction: Please complete the following task in a clear and concise manner: Answer This Question..

Processing input: {'input': 'feedback', 'name': 'SecurityAnalyst', 'question': 'The firewall rules seem too restrictive.'}
Escaped Input: feedback
Generated Prompt: User SecurityAnalyst asks: The firewall rules seem too restrictive.
Instruction: Please complete the following task in a clear and concise manner: Answer This Question..
Thank you for your feedback!

Processing input: {'input': 

In [None]:
import html
from time import time, sleep
import os
import logging
import re
from tenacity import retry, wait_exponential, stop_after_attempt
from transformers import pipeline

# Configuration
WHITELIST = {"greeting", "help", "query", "feedback", "info"}
MAX_LENGTH = 100

logging.basicConfig(level=logging.INFO)



class RateLimiter:
    def __init__(self, max_requests: int, time_window: float):
        self.max_requests = max_requests
        self.time_window = time_window
        self.request_times = []

    def allow_request(self) -> bool:
        current_time = time()
        self.request_times = [t for t in self.request_times if current_time - t < self.time_window]
        
        if len(self.request_times) < self.max_requests:
            self.request_times.append(current_time)
            return True
        return False

# Input Validation and Sanitization
def validate_whitelist(user_input: str) -> bool:
    words = user_input.lower().split()
    invalid_words = [word for word in words if word not in WHITELIST]

    if invalid_words:
        logging.warning(f"Invalid words detected: {', '.join(invalid_words)}")
        return False
    return True


def escape_input(user_input: str) -> str:
    harmful_chars = {'<', '>', '&', '"', "'"}
    detected_chars = [char for char in user_input if char in harmful_chars]

    if detected_chars:
        logging.warning(f"Potentially harmful characters detected: {', '.join(detected_chars)}")
    
    return html.escape(user_input)


def validate_length(user_input: str) -> bool:
    if len(user_input) > MAX_LENGTH:
        logging.warning(f"Input too long ({len(user_input)} characters). Maximum allowed is {MAX_LENGTH}.")
        return False
    return True


def validate_username(user_name: str) -> bool:
    return re.match(r'^[A-Za-z0-9_]{3,50}$', user_name) is not None


def escape_prompt_data(user_name: str, question: str) -> str:
    return html.escape(user_name), html.escape(question)

# Secure Prompt Creation
def create_prompt(user_name: str, question: str) -> str:
    if not validate_username(user_name):
        raise ValueError(f"Invalid user name: {user_name}")

    if not validate_length(question):
        raise ValueError("Question exceeds the maximum allowed length.")

    return f"Answer this question: {question}"


# Load the advanced post-trained LLM and set the device to GPU (device 0)
llm_pipeline = pipeline("text-generation", model="Qwen/Qwen2.5-72B-Instruct", device=0)

@retry(wait=wait_exponential(multiplier=1, min=2, max=10), stop=stop_after_attempt(5))
def get_llm_response(prompt: str) -> str:
    response = llm_pipeline(prompt, max_length=150, num_return_sequences=1)
    return response[0]["generated_text"].strip()



# Main Execution
if __name__ == "__main__":
    limiter = RateLimiter(max_requests=5, time_window=10)

    # Synthetic Security Dataset
    user_inputs = [
        {"input": "query", "name": "AdminUser1", "question": "How to detect phishing emails?"},
        {"input": "help", "name": "CyberSec101", "question": "Explain SQL injection prevention."},
        {"input": "feedback", "name": "SecurityAnalyst", "question": "The firewall rules seem too restrictive."},
        {"input": "query", "name": "PenTesterX", "question": "Describe buffer overflow mitigation strategies."},
        {"input": "info", "name": "NetAdmin45", "question": "How do I configure IDS?"},
        {"input": "unknown", "name": "AnonUser", "question": "How to exploit vulnerabilities?"},
        {"input": "query", "name": "RedTeam<alert>", "question": "Methods to bypass endpoint protection."},
        {"input": "help", "name": "SecureDev", "question": "Best practices for secure coding."},
    ]

    for user_input in user_inputs:
        logging.info(f"Processing input: {user_input}")

        if not limiter.allow_request():
            logging.error("Rate limit exceeded. Please try again later.")
            continue

        if not validate_whitelist(user_input["input"]):
            logging.warning("Invalid input: Not in the whitelist.")
            continue

        escaped_input = escape_input(user_input["input"])
        
        if not validate_length(escaped_input):
            logging.warning("Input is too long.")
            continue

        try:
            prompt = create_prompt(user_input["name"], user_input["question"])
            response = get_llm_response(prompt)
            logging.info(f"LLM Response: {response}")
            print("\n")
        except ValueError as e:
            logging.error(f"Error: {e}")
            continue

        sleep(1)

        match user_input["input"]:
            case "feedback":
                logging.info("Feedback received. Logging and analyzing for further action.")
            case "info":
                logging.info("Information request logged. Dispatching to relevant department.")
            case "greeting":
                logging.info("Greeting detected. Initiating friendly response protocol.")
            case "help":
                logging.info("Help request acknowledged. Connecting to the helpdesk.")
            case "query":
                logging.info("User query identified. Generating detailed response.")
            case _:
                logging.warning("Unknown input type detected. Logging and ignoring.")

# Conclusion

This code serves as a **starting point** for building robust moderation systems in applications utilizing large language models (LLMs). It highlights the **basic requirements** necessary for ensuring secure, ethical, and efficient interactions at the input stage. Each function is designed to address a specific aspect of moderation, including input validation, rate-limiting, and secure prompt creation.

However, it’s important to recognize that this notebook only scratches the surface of what is needed for comprehensive moderation. As the complexity and scale of AI systems grow, so too will the challenges. Here are a few key points to consider moving forward:

- **Iterative Improvement**: Moderation frameworks must evolve alongside new use cases and emerging threats. Regular audits, updates, and refinements are crucial to maintaining effectiveness.

- **Advanced Techniques**: While this notebook demonstrates basic methods, additional layers of security, such as anomaly detection, machine learning for moderation, and contextual validation, should be integrated into more sophisticated systems.

- **Collaboration**: The design of secure and ethical systems is a collective effort. Engaging with interdisciplinary experts, including ethicists, security specialists, and end-users, can provide invaluable insights for improvement.

- **From Basics to Advanced**: The concepts illustrated here should inspire more comprehensive implementations in real-world environments, incorporating scalable and context-aware tools tailored to specific application needs.

Treat this code as a **reminder of the foundational requirements** for effective moderation. It is a stepping stone towards creating resilient systems that not only protect the AI but also uphold user trust, security, and compliance with ethical standards.

### Final Note
Moderation is not just a technical requirement but a **responsibility**—one that ensures AI systems align with human values and remain tools for good. Use this notebook as a base, and continue building, improving, and innovating to meet the challenges ahead.
