# Importations

In [1]:
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.schema.output_parser import StrOutputParser
from langchain.output_parsers.openai_tools import JsonOutputToolsParser
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableBranch, RunnablePassthrough, RunnableLambda, Runnable, RunnableParallel
from langchain.tools.retriever import create_retriever_tool
from langchain_core.tools import tool
import json
from langchain_groq import ChatGroq
import warnings
import json
from operator import itemgetter
import requests
import re
from zapv2 import ZAPv2
from datetime import datetime
from langdetect import detect, DetectorFactory
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel


warnings.filterwarnings('ignore')

# Mise en place API

In [2]:
# Initialisation de l'API FastAPI
app = FastAPI()

# Configurer CORS
origins = [
    "http://localhost:4200",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# API Dev KEYS

In [3]:
GROQ_API_KEY= "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"
API_KEY = 'an7nj1cf12ghsjenpi30qiulv2'

In [4]:

# datetime object containing current date and time
now = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
 
print(f"now = {now}")

now = 06/11/2024 14:58:56


# Functions

In [5]:
def format_chat_history(data):
  """
  Format the chat history for the contextualization step of the chain
  """
  formatize_chat_history = ""
  if "chat_history" in data.keys():
    for message in data["chat_history"]:
      message_type = str(type(message)).split("'")[1].split(".")[-1]
      message_content = message.content.replace("\n", "")
      formatize_chat_history += f"{message_type}: {message_content}\n"
    data["chat_history"] = formatize_chat_history
  return data

# Embedding & LLM

In [6]:
from sentence_transformers import SentenceTransformer

# Embedding model
embeddings = SentenceTransformer('distiluse-base-multilingual-cased-v1')

# llm
# Initialisation du modèle Groq avec Langchain
llm = ChatGroq(
    groq_api_key=GROQ_API_KEY, 
    model_name='llama-3.1-70b-versatile',
    temperature=0.3, 
    max_tokens=1500
)

# Agent: Connexion Owasp , demarage session 

In [7]:
PROXY = {"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"}
zap = ZAPv2(apikey=API_KEY, proxies=PROXY)

@tool
def start_session(zap, session_name, overwrite=True):
    """
    Starts a new OWASP ZAP session.

    Args:
        zap (ZAPv2): The ZAP instance to initiate the session.
        session_name (str): The name of the new session to be created.
        overwrite (bool): If True, overwrites any existing session with the same name. Defaults to True.

    Returns:
        None

    Raises:
        Exception: If ZAP fails to start the session.
    """
    zap.core.new_session(name=session_name, overwrite=overwrite)
    print(f"Session {session_name} started.")





# Prompt Engeneering 

In [8]:
# Get current time and date as a string
current_time = datetime.now().strftime("%d/%m/%Y %H:%M:%S")

identity = "You are Vulnerabot, a ChatOps solution created by Sonatel, using Generative AI to provide real-time support for web vulnerability management and incident analysis for Sonatel’s systems."

general_role = """
As Sonatel's ChatOps specialist in web security, your role is twofold:
  ✅ Respond to general cybersecurity questions based on your expertise.
  ✅ Start a new OWASP ZAP session if specifically requested by the user.

If a user asks for a ZAP session initiation, use the `start_session` tool function.
"""

session_start_role = """
Your role includes determining whether a session start is needed:
1. **General Questions**: Respond with your knowledge of web security.
2. **OWASP ZAP Session Start Requests**: Use the `start_session` function to initiate a session.

- **ZAP Session Start**: If a user asks to start a session, call the function with the appropriate `session_name`.

Instructions:
  - Only start a session if the user request specifically requires it.
  - Confirm each step's completion, and inform the user if any action is needed.

Method to follow:
Question: the question here
FunctionCall: if necessary
Response: function output or otherwise, put "Not Required"

"""

prompt_structure = """
Prompt Variables:
- User Question: {question}
- Chat History: {chat_history}

Current Time: {current_time}
"""

In [9]:


get_index_info_prompt = ChatPromptTemplate.from_messages([
  ("system", identity),
  ("system", general_role),
  ("system", session_start_role),
  ("system", prompt_structure),
])

get_index_info_prompt.pretty_print()


You are Vulnerabot, a ChatOps solution created by Sonatel, using Generative AI to provide real-time support for web vulnerability management and incident analysis for Sonatel’s systems.



As Sonatel's ChatOps specialist in web security, your role is twofold:
  ✅ Respond to general cybersecurity questions based on your expertise.
  ✅ Start a new OWASP ZAP session if specifically requested by the user.

If a user asks for a ZAP session initiation, use the `start_session` tool function.




Your role includes determining whether a session start is needed:
1. **General Questions**: Respond with your knowledge of web security.
2. **OWASP ZAP Session Start Requests**: Use the `start_session` function to initiate a session.

- **ZAP Session Start**: If a user asks to start a session, call the function with the appropriate `session_name`.

Instructions:
  - Only start a session if the user request specifically requires it.
  - Confirm each step's completion, and inform the user if any action

In [10]:
get_index_info_model =ChatGroq(
    groq_api_key=GROQ_API_KEY, 
    model_name='llama-3.1-70b-versatile',
    temperature=0.3,          # Contrôle de la créativité du modèle, ici plus faible pour des réponses plus ciblées.
    max_tokens=3000             # Limite de longueur de réponse.
)

# Exemple de liaison avec un outil si nécessaire
llm.bind_tools([start_session]).bind(stop="\nResponse")

RunnableBinding(bound=ChatGroq(client=<groq.resources.chat.completions.Completions object at 0x00000126E49BE300>, async_client=<groq.resources.chat.completions.AsyncCompletions object at 0x00000126E49BED50>, model_name='llama-3.1-70b-versatile', temperature=0.3, model_kwargs={}, groq_api_key=SecretStr('**********'), max_tokens=1500), kwargs={'tools': [{'type': 'function', 'function': {'name': 'start_session', 'description': 'Starts a new OWASP ZAP session.\n\nArgs:\n    zap (ZAPv2): The ZAP instance to initiate the session.\n    session_name (str): The name of the new session to be created.\n    overwrite (bool): If True, overwrites any existing session with the same name. Defaults to True.\n\nReturns:\n    None\n\nRaises:\n    Exception: If ZAP fails to start the session.', 'parameters': {'properties': {'zap': {}, 'session_name': {}, 'overwrite': {'default': True}}, 'required': ['zap', 'session_name'], 'type': 'object'}}}], 'stop': '\nResponse'}, config={}, config_factories=[])

In [11]:
from langchain_core.messages import AIMessage
from typing import Dict, List

# Fonction call_tool adaptée
def call_tool(msg: AIMessage) -> Dict:
    """
    Fonction d'aide pour appeler les outils en fonction des requêtes de l'IA.
    """
    # Dictionnaire de correspondance entre les noms d'outils et les fonctions
    tool_map = {
        "start_session": lambda args: lancer_session(zap, **args),
    }
    
    # Copie des appels d'outils pour traitement
    tool_calls = msg.tool_calls.copy()
    
    # Si aucun outil n'est appelé, retourner une réponse par défaut
    if not tool_calls:
        return {"output": "Aucun outil spécifié."}

    # Exécuter chaque appel d'outil
    for tool_call in tool_calls:
        tool_name = tool_call["name"]
        tool_args = tool_call["args"]
        if tool_name in tool_map:
            # Appeler l'outil approprié et stocker la sortie
            tool_call["output"] = tool_map[tool_name](tool_args)
        else:
            tool_call["output"] = f"Outil {tool_name} non disponible."
    
    return tool_calls[0]

In [12]:
from operator import itemgetter

get_index_info = format_chat_history | get_index_info_prompt | get_index_info_model | call_tool

In [15]:
response = get_index_info.invoke({"question": "Please start a ZAP session with the name 'security_test_session'.", "chat_history": [],"current_time": current_time})
response

{'output': 'Aucun outil spécifié.'}

# Agent

In [45]:
import requests
import time
import json
from typing import List

PROXY = {"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"}


#=== Function to determine user language ===
@tool
def get_user_language(user_request):
    """
    This function attempts to detect the language of the user request using the langdetect library.
    If detection fails, it defaults to English.

    Use Case:
        This function is useful for determining the language of the user's request, allowing 
        for appropriate responses based on the detected language. This is particularly important 
        in multilingual environments where user requests may come in various languages.

    Args:
        user_request (str): The text of the user request from which the language needs to be detected.

    Returns:
        str: A language code (e.g., 'fr', 'en', etc.) indicating the detected language. If detection fails, returns 'en'.

    Example Requests:
        - "Pouvez-vous effectuer une analyse de vulnérabilité sur http://example.com?"  # French
        - "Can you perform a vulnerability scan on http://example.com?"                  # English
        - "¿Puedes realizar un escaneo de vulnerabilidades en http://example.com?"      # Spanish
        
    Always use this function before generating any answer
    """
    # Ensure consistent language detection
    DetectorFactory.seed = 0
    try:
        # Detect language from the user request
        language = detect(user_request)
        return language  # Returns a language code, e.g., 'fr', 'en', etc.
    except:
        # In case of error or if detection fails, default to English
        return "en"
    

# === URL Accessibility Check Functions ===

@tool
def analyze_url(url: str) -> str:
    """
    This function attempts to access the URL and confirms if an analysis is possible.
    If not, it recommends using an OWASP ZAP proxy.

    Use Case:
        This function is useful for quickly checking if a URL is accessible and if it can be analyzed 
        using OWASP ZAP. If the URL is inaccessible, it provides guidance on configuring ZAP as a proxy.

    Args:
        url (str): The URL to analyze.

    Returns:
        str: A message indicating whether the URL is accessible and if OWASP ZAP has started analysis.

    Example Requests:
        - "Can you check if this URL is reachable for testing?"
        - "Is there a way to analyze this URL with ZAP?"
    """
    try:
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            zap.urlopen(url)
            return f"The URL {url} is accessible and OWASP ZAP analysis has started."
        else:
            return f"The URL {url} is inaccessible. HTTP status code: {response.status_code}."
    except requests.exceptions.RequestException:
        return (
            f"Direct access to the URL {url} failed. "
            "Consider configuring OWASP ZAP as a proxy."
        )


# === Configuration Functions ===

@tool
def configure_proxy(zap, proxy_addr, proxy_port):
    """
    Configure the use of an outgoing proxy server in OWASP ZAP.

    Use Case:
        Use this function to set up an outgoing proxy for OWASP ZAP, which is essential when you need 
        to route your traffic through a specific proxy server for security testing or compliance reasons.

    Args:
        zap (ZAPv2): The ZAP instance to configure.
        proxy_addr (str): The address of the proxy server.
        proxy_port (int): The port number of the proxy server.

    Example Requests:
        - "I need to route my traffic through a proxy. Can you help me set that up?"
        - "How do I configure a proxy in ZAP?"
    """
    zap = ZAPv2(apikey='an7nj1cf12ghsjenpi30qiulv2', proxies={"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"})
    zap.core.set_option_use_proxy_chain(True)
    zap.core.set_option_proxy_chain_name(proxy_addr)
    zap.core.set_option_proxy_chain_port(proxy_port)
    print("Outgoing proxy configured.")


@tool
def configure_context(zap, context_name, include_urls: List[str], exclude_urls: List[str]):
    """
    Create and configure a ZAP context, including specified URLs and excluding others.

    Use Case:
        This function allows you to define a context within OWASP ZAP, which is useful for organizing 
        your scans and specifying which URLs to include or exclude based on your testing requirements.

    Args:
        zap (ZAPv2): The ZAP instance to configure.
        context_name (str): The name of the context to be created.
        include_urls (List[str]): A list of regex patterns for URLs to include in the context.
        exclude_urls (List[str]): A list of regex patterns for URLs to exclude from the context.

    Returns:
        str: The ID of the newly created context.

    Example Requests:
        - "Can you create a context for my testing that includes specific URLs?"
        - "I need to exclude certain URLs from my scans. How can I set that up?"
    """
    zap = ZAPv2(apikey='an7nj1cf12ghsjenpi30qiulv2', proxies={"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"})
    context_id = zap.context.new_context(contextname=context_name)
    for url in include_urls:
        zap.context.include_in_context(contextname=context_name, regex=url)
    for url in exclude_urls:
        zap.context.exclude_from_context(contextname=context_name, regex=url)
    print(f"Context '{context_name}' configured.")
    return context_id


@tool
def start_session(zap, session_name, overwrite=True):
    """
    Starts a new OWASP ZAP session.

    Use Case:
        This function is crucial for starting a new testing session in OWASP ZAP, allowing you to 
        manage different scans and analyses independently. Overwriting existing sessions helps in maintaining 
        updated testing states.

    Args:
        zap: ZAPv2 instance.
        session_name (str): Name of the session to start.
        overwrite (bool): Whether to overwrite an existing session with the same name.

    Raises:
        Exception: If ZAP fails to start the session.

    Example Requests:
        - "I want to start a new session for my penetration test."
        - "How can I manage multiple sessions in ZAP?"
    
    ALWAYS use this tool before any kind of scan
    """
    
    zap = ZAPv2(apikey='an7nj1cf12ghsjenpi30qiulv2', proxies={"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"})
    
    # Ensure zap is the correct type before proceeding
    if not isinstance(zap, ZAPv2):
        raise ValueError("Expected zap to be an instance of ZAPv2")

    zap.core.new_session(name=session_name, overwrite=overwrite)
    print(f"Session {session_name} started.")


# === Passive Scan Functions ===

@tool
def passive_scan(zap, url):
    """
    Launch a passive scan on a target URL.

    Use Case:
        This function initiates a passive scan on the specified URL, which is helpful for 
        identifying potential security issues without actively probing the server.

    Args:
        zap (ZAPv2): The ZAP instance to perform the scan.
        url (str): The target URL to be scanned.

    Example Requests:
        - "Can you run a passive scan on this URL?"
        - "I need to check this URL for vulnerabilities without stressing the server."
    """
    zap = ZAPv2(apikey='an7nj1cf12ghsjenpi30qiulv2', proxies={"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"})
    zap.urlopen(url)
    time.sleep(20)  # Wait for the passive scan to finish


@tool
def passive_scan_context(zap, url, context_name):
    """
    Launch a passive scan within a specific context.

    Use Case:
        Use this function to perform a passive scan in a designated context, allowing for 
        more tailored vulnerability assessments based on the context configuration.

    Args:
        zap (ZAPv2): The ZAP instance to perform the scan.
        url (str): The target URL to be scanned.
        context_name (str): The name of the context to be used for the scan.

    Example Requests:
        - "Can you perform a passive scan in my specific context?"
        - "I want to scan this URL with the context settings I've defined."
    """
    zap = ZAPv2(apikey='an7nj1cf12ghsjenpi30qiulv2', proxies={"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"})
    zap.urlopen(url, contextname=context_name)
    time.sleep(20)  # Wait for the passive scan to finish


@tool
def monitor_passive_scan(zap, url):
    """
    Monitor the passive scan and retrieve alerts from the target URL.

    Use Case:
        This function is essential for reviewing the results of a passive scan, allowing 
        you to identify any alerts that may indicate vulnerabilities.

    Args:
        zap (ZAPv2): The ZAP instance to monitor.
        url (str): The target URL to retrieve alerts for.

    Example Requests:
        - "What vulnerabilities were found during the passive scan?"
        - "Can you show me the alerts from the scan I just ran?"
        
    ALWAYS use this tool when a Passive scan is in completion
    """
    zap = ZAPv2(apikey='an7nj1cf12ghsjenpi30qiulv2', proxies={"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"})
    print("Retrieving alerts from the passive scan...")
    alerts = zap.core.alerts(baseurl=url)

    # Check the structure of the alerts
    if isinstance(alerts, list):
        for alert in alerts:
            # Ensure that each alert is a dictionary
            if isinstance(alert, dict):
                print(f"Alert: {alert.get('name', 'N/A')}, URL: {alert.get('url', 'N/A')}, "
                      f"Description: {alert.get('description', 'N/A')}, Risk: {alert.get('risk', 'N/A')}")
            else:
                print(f"Unrecognized alert: {alert}")
    else:
        print("No alerts found or the data structure is incorrect.")

# === Spider Scan Functions ===

@tool
def spider_scan(zap, url, recurse=True):
    """
    Start a simple spider scan on a target URL.

    Use Case:
        This function is useful for discovering links and resources on a web application, 
        helping to identify potential attack surfaces.

    Args:
        zap (ZAPv2): The ZAP instance to perform the spider scan.
        url (str): The target URL to be scanned.
        recurse (bool): If True, allows recursion in scanning linked pages. Defaults to True.

    Example Requests:
        - "Can you start a spider scan on this URL?"
        - "I need to discover all the links on this page."
    """
    zap = ZAPv2(apikey='an7nj1cf12ghsjenpi30qiulv2', proxies={"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"})
    scan_id = zap.spider.scan(url=url, recurse=recurse)
    monitor_spider_scan(zap, scan_id)


@tool
def spider_scan_context(zap, url, context_name):
    """
    Start a spider scan in a specific context.

    Use Case:
        This function allows you to initiate a spider scan within a defined context, 
        ensuring that only the relevant links are scanned based on your configurations.

    Args:
        zap (ZAPv2): The ZAP instance to perform the spider scan.
        url (str): The target URL to be scanned.
        context_name (str): The name of the context to be used for the scan.

    Example Requests:
        - "Can I run a spider scan in my context to find all linked resources?"
        - "I want to scan this URL based on the settings in my context."
    """
    zap = ZAPv2(apikey='an7nj1cf12ghsjenpi30qiulv2', proxies={"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"})
    scan_id = zap.spider.scan(url=url, contextname=context_name)
    monitor_spider_scan(zap, scan_id)


@tool
def monitor_spider_scan(zap, scan_id):
    """
    Monitor the status of the spider scan.

    Use Case:
        This function is necessary for tracking the progress of the spider scan, allowing 
        you to understand how much of the scan is completed and when it's finished.

    Args:
        zap (ZAPv2): The ZAP instance to monitor.
        scan_id (str): The ID of the scan to monitor.

    Example Requests:
        - "What is the status of the spider scan I started?"
        - "How long will the spider scan take to complete?"
        
    ALWAYS use this tool when a Spider scan is in completion
    """
    zap = ZAPv2(apikey='an7nj1cf12ghsjenpi30qiulv2', proxies={"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"})
    while int(zap.spider.status(scan_id)) < 100:
        print(f"Spider scan in progress: {zap.spider.status(scan_id)}%")
        time.sleep(20)
    print("Spider scan completed.")


# === AJAX Spider Scan Functions ===

@tool
def ajax_spider_scan(zap, url):
    """
    Start a simple AJAX spider scan on a target URL.

    Use Case:
        This function is tailored for scanning AJAX-heavy applications, helping to discover 
        dynamic content and resources that may not be found by traditional scanning methods.

    Args:
        zap (ZAPv2): The ZAP instance to perform the AJAX spider scan.
        url (str): The target URL to be scanned.

    Example Requests:
        - "Can you perform an AJAX spider scan on this URL?"
        - "I need to scan a web app that uses AJAX heavily."
    """
    zap = ZAPv2(apikey='an7nj1cf12ghsjenpi30qiulv2', proxies={"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"})
    zap.ajaxSpider.scan(url=url)
    monitor_ajax_spider_scan(zap)


@tool
def ajax_spider_scan_context(zap, url, context_name):
    """
    Start an AJAX spider scan in a specific context.

    Use Case:
        This function is useful for running AJAX spider scans in the context of predefined settings, 
        which is crucial for testing complex web applications.

    Args:
        zap (ZAPv2): The ZAP instance to perform the AJAX spider scan.
        url (str): The target URL to be scanned.
        context_name (str): The name of the context to be used for the scan.

    Example Requests:
        - "Can I run an AJAX spider scan within my defined context?"
        - "I want to scan this AJAX application while adhering to my context rules."
    """
    zap = ZAPv2(apikey='an7nj1cf12ghsjenpi30qiulv2', proxies={"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"})
    zap.ajaxSpider.scan(url=url, contextname=context_name)
    monitor_ajax_spider_scan(zap)


@tool
def monitor_ajax_spider_scan(zap):
    """
    Monitor the status of the AJAX spider scan.

    Use Case:
        This function allows you to track the progress of an AJAX spider scan, ensuring 
        that you can assess its completion and any potential issues.

    Args:
        zap (ZAPv2): The ZAP instance to monitor.

    Example Requests:
        - "How is the AJAX spider scan progressing?"
        - "When will the AJAX spider scan finish?"
    
    ALWAYS use this tool when a Ajax Spider scan is in completion
    """
    zap = ZAPv2(apikey='an7nj1cf12ghsjenpi30qiulv2', proxies={"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"})
    while zap.ajaxSpider.status != 'stopped':
        print(f"AJAX Spider scan in progress: {zap.ajaxSpider.status}")
        time.sleep(20)
    print("AJAX Spider scan completed.")


# === Active Scan Functions ===

@tool
def active_scan(zap, url, recurse=True):
    """
    Start a simple active scan on a target URL.

    Use Case:
        This function is important for conducting active scans that probe the application for vulnerabilities, 
        simulating potential attacks to discover security flaws.

    Args:
        zap (ZAPv2): The ZAP instance to perform the active scan.
        url (str): The target URL to be scanned.
        recurse (bool): If True, allows recursion in scanning linked pages. Defaults to True.

    Example Requests:
        - "Can you start an active scan on this URL?"
        - "I need to test this application for vulnerabilities."
    """
    try:
        # Configuration de l'instance ZAP avec débogage pour les erreurs de connexion
        zap = ZAPv2(apikey='an7nj1cf12ghsjenpi30qiulv2', proxies={"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"})
        print("Instance ZAP configurée avec succès.")

        # Vérification de la connexion au serveur ZAP
        version = zap.core.version
        print(f"Connexion ZAP établie avec la version : {version}")

        # Lancement du scan actif
        print(f"Lancement du scan actif sur l'URL: {url} avec recursion: {recurse}")
        scan_id = zap.ascan.scan(url=url, recurse=recurse)

        # Validation du scan_id retourné
        if scan_id.isnumeric():
            print(f"Scan actif démarré avec succès, ID de scan: {scan_id}")
            monitor_active_scan(zap, scan_id)  # Fonction pour monitorer le scan
        else:
            raise ValueError(f"Échec du lancement du scan : ID de scan non valide ({scan_id})")
            
    except Exception as e:
        print(f"Erreur lors de l'exécution du scan actif : {str(e)}")


@tool
def active_scan_context(zap, url, context_name):
    """
    Start an active scan in a specific context.

    Use Case:
        This function allows for conducting active scans within a defined context, ensuring that 
        only relevant URLs are targeted based on the context configuration.

    Args:
        zap (ZAPv2): The ZAP instance to perform the active scan.
        url (str): The target URL to be scanned.
        context_name (str): The name of the context to be used for the scan.

    Example Requests:
        - "Can I run an active scan in my context to find vulnerabilities?"
        - "I want to conduct an active scan based on my context settings."
    """
    zap = ZAPv2(apikey='an7nj1cf12ghsjenpi30qiulv2', proxies={"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"})
    scan_id = zap.ascan.scan(url=url, contextname=context_name)
    monitor_active_scan(zap, scan_id)


@tool
def monitor_active_scan(zap, scan_id):
    """
    Monitor the progress of the active scan.

    Use Case:
        This function is necessary for observing the status of an active scan, allowing 
        you to understand the completion status and any alerts that arise during the scan.

    Args:
        zap (ZAPv2): The ZAP instance to monitor.
        scan_id (str): The ID of the scan to monitor.

    Example Requests:
        - "What is the current status of the active scan I initiated?"
        - "Can you tell me when the active scan will be completed?"
    
    ALWAYS use this tool when an Active scan is in completion
    """
    zap = ZAPv2(apikey='an7nj1cf12ghsjenpi30qiulv2', proxies={"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"})
    while int(zap.ascan.status(scan_id)) < 100:
        print(f"Active scan in progress: {zap.ascan.status(scan_id)}%")
        time.sleep(20)
    print("Active scan completed.")


# === Alert Management Functions ===

@tool
def retrieve_alerts(zap, url, risk_level="High"):
    """
    Retrieve alerts filtered by risk level from the target URL.

    Use Case:
        This function is vital for fetching and displaying alerts based on specified risk levels, 
        enabling you to focus on the most critical vulnerabilities.

    Args:
        zap (ZAPv2): The ZAP instance to retrieve alerts from.
        url (str): The target URL to filter alerts for.
        risk_level (str): The risk level to filter alerts by (e.g., 'High', 'Medium', 'Low'). Defaults to 'High'.

    Example Requests:
        - "Can you show me all high-risk alerts for this URL?"
        - "I want to review alerts based on specific risk criteria."
    
    ALWAYS use this tool after any kind of scan
    """
    zap = ZAPv2(apikey='an7nj1cf12ghsjenpi30qiulv2', proxies={"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"})
    alerts = zap.core.alerts(baseurl=url)
    for alert in alerts:
        if alert['risk'] == risk_level:
            display_alert(alert)


@tool
def display_alert(alert):
    """
    Display the details of a specific alert.

    Use Case:
        This function is used to output the details of a specific alert to the console, 
        providing essential information for vulnerability assessment.

    Args:
        alert (dict): The alert details containing its name, URL, description, and risk level.

    Example Requests:
        - "What details can you provide about this alert?"
        - "Can you display the information for the specific alert I found?"
    """
    print(f"Alert: {alert['name']}")
    print(f"URL: {alert['url']}")
    print(f"Description: {alert['description']}")
    print(f"Risk Level: {alert['risk']}")
    print("===")


# List of tools
tools = [
    get_user_language,
    analyze_url,                # Checks URL accessibility for analysis with OWASP ZAP
    configure_proxy,            # Configures an outgoing proxy server for ZAP
    configure_context,          # Creates and configures a ZAP context
    start_session,              # Starts a new ZAP session
    passive_scan,               # Launches a passive scan on a target URL
    passive_scan_context,       # Launches a passive scan in a specific context
    monitor_passive_scan,       # Retrieves alerts from a passive scan
    spider_scan,                # Starts a simple spider scan
    spider_scan_context,        # Starts a spider scan in a specific context
    monitor_spider_scan,        # Monitors the status of the spider scan
    ajax_spider_scan,           # Starts a simple AJAX spider scan
    ajax_spider_scan_context,   # Starts an AJAX spider scan in a specific context
    monitor_ajax_spider_scan,   # Monitors the status of the AJAX spider scan
    active_scan,                # Starts a simple active scan
    active_scan_context,        # Starts an active scan in a specific context
    monitor_active_scan,        # Monitors the progress of the active scan
    retrieve_alerts,            # Retrieves alerts filtered by risk level
    display_alert               # Displays the details of an alert
]

# Chat model configuration
'''choose_model = ChatOpenAI(
    openai_api_base="https://api.groq.com/openai/v1",
    model="llama3-70b-8192",
    temperature=0.3,
    max_tokens=1900,
    api_key="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"
).bind_tools(tools)'''

from langchain_ollama.llms import OllamaLLM

choose_model = OllamaLLM(
    base_url='https://ffwrmo63w8ubqb-11434.proxy.runpod.net/',
    model="test1:latest",  # Modèle spécifique
    temperature=0.3,
    max_tokens=5000
)

def call_tool(tool_invocation: dict) -> Runnable:
    """Function for dynamically constructing the end of the chain based on the model-selected tool."""
    tool_map = {tool.name: tool for tool in tools}
    tool = tool_map[tool_invocation["type"]]
    return RunnablePassthrough.assign(output=itemgetter("args") | tool)

call_tool_list = RunnableLambda(call_tool).map()

def human_approval(tool_invocations: list) -> Runnable:
    context = []
  
    for tool in tool_invocations:
        if "search" in tool["type"]:
            tool_invocations.remove(tool)
            context.extend(
                tools_dict[tool["type"]].invoke(tool["args"]["query"])
            )

    function_call = ""
    if tool_invocations:
        tool_strs = "\n\n".join(
            json.dumps(tool_call, indent=2) for tool_call in tool_invocations
        )
        msg = (
            f"Do you approve the following tool invocations:\n\n{tool_strs}\n\n"
            "Any response other than 'Y'/'Yes' (case insensitive) will be treated as a refusal."
        )
        resp = input(msg)
        if resp.lower() not in ("yes", "y"):
            function_call = (
                f"You did not approve the following function calls:\n\n{tool_strs}",
                f"\n\npossibly for the following reasons: {resp}"
            )
        else:
            call_tools_return = call_tool_list.invoke(tool_invocations)
            call_tools_return_strs = "\n\n".join(
                json.dumps(call_tool_return, indent=2) for call_tool_return in call_tools_return
            )
            function_call = (
                f"You approved the function calls and here is the return from the functions:\n\n{call_tools_return_strs}"
            )

    return {"context": context, "function_call": function_call}

def route(info):
    if info.tool_calls == []:
        return StrOutputParser()
    else:
        return JsonOutputToolsParser() | RunnableLambda(human_approval)

In [46]:
isinstance(zap, ZAPv2)

True

In [47]:


def parse_after_get_info_response(data: dict) -> dict:
  return {
    'user_request': data['user_request'],
    'chat_history': data['chat_history'],
    'current_time': data.get('current_time'),  # Ensure current_time is included
    'user_language': data.get('user_language'),
  }

def route_after_approval(data: dict) -> dict:
  if isinstance(data['approval'], str):
    return data['approval']
  return {
    'user_request': data['user_request'],
    'chat_history': data['chat_history'],
    'current_time': data.get('current_time'),  # Ensure current_time is included
    'user_language': data.get('user_language'),
    'context': data['approval']['context'], 
    'function_call': data['approval']['function_call']
  }

In [48]:

# Get current time and date as a string
current_time = datetime.now().strftime("%d/%m/%Y %H:%M:%S")

Identity="""
You are Vulnerabot, a specialized ChatOps solution developed by Sonatel, utilizing Generative AI to offer real-time web vulnerability analysis and incident management. 
"""

General_Role="""
Your primary objectives as Vulnerabot include:
  ✅ Centralizing knowledge and offering expert responses on cybersecurity.
  ✅ Supporting Sonatel's incident management by analyzing URLs and detecting vulnerabilities.
  ✅ Configuring and managing OWASP ZAP sessions, and handling proxy setups on demand.
"""
Role_in_Decision_Making="""
Your responsibility is to assess the user’s requests to decide the best course of action. Specifically:

1. **General Cybersecurity Questions**: Respond using your knowledge base, without needing tool invocation.
2. **Specific Analysis Requests**: Determine if OWASP ZAP function calling is required and select the relevant tool(s).

If an action requires a function call, select the appropriate function based on the user's requirements:
- **Analyze URL Accessibility**: analyze_url(url)
- **Start a New ZAP Session**: start_session(zap, session_name)
- **Configure Proxy**: configure_proxy(zap, proxy_addr, proxy_port)
- **Perform Scans**: Select from passive_scan, spider_scan, ajax_spider_scan, and active_scan.

**Special Instructions**:
  - Configure a proxy if direct access to the URL is not possible.
  - If the user specifies a scan type (e.g., spider scan), only execute that scan. Otherwise, initiate a full scan sequence.
  - Confirm each action's completion and inform the user if additional manual input is needed.
"""

Prompt_Variables="""
  - Chat History: {chat_history}
  - User Request: {user_request}
  - Timestamp: {current_time}
  - User Language: {user_language}
  """

Guidelines_for_Requests="""
  - **General Knowledge Requests**: Use your internal cybersecurity expertise to provide answers without using OWASP ZAP.
  - **Specific Scans**: When a user specifies a scan type (e.g., spider scan), execute only the specified scan.
  - **Full Scan**: When no scan type is specified, perform a comprehensive scan (including passive, spider, AJAX, and active scans).
  - **Proxy Configuration**: If direct URL access fails, ask the user for proxy details and configure them using configure_proxy.
"""

Tool_and_Proxy_Setup="""
- **Proxy Configuration**: Proxy is set to {"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"}
- **ZAP API Initialization**: zap = ZAPv2(apikey=API_KEY, proxies=PROXY)
"""

Available_Tools="""
- **Language Detection**: get_user_language(user_request) - Detects the user's language based on the request content.
- **URL Analysis**: analyze_url(url) - Checks the accessibility and validity of a URL before analysis with OWASP ZAP.
- **Configure Proxy**: configure_proxy(zap, proxy_addr, proxy_port) - Configures an outgoing proxy for ZAP if required to access a specific URL.
- **Context Configuration**: configure_context(zap, context_name, urls) - Creates and configures a specific context in ZAP for scoped scanning.

- **Session Management**:
  - **Start ZAP Session**: start_session(zap, session_name, overwrite=True) - Starts a new ZAP session for tracking scan results independently.

- **Scans**:
  - **Passive Scan**: passive_scan(url) - Runs a passive scan to detect vulnerabilities without interacting with the application.
  - **Passive Scan in Context**: passive_scan_context(context_name) - Runs a passive scan within a specific ZAP context.
  - **Spider Scan**: spider_scan(url) - Crawls through the URL structure to map all reachable endpoints.
  - **Spider Scan in Context**: spider_scan_context(context_name) - Runs a spider scan in a predefined context.
  - **AJAX Spider Scan**: ajax_spider_scan(url) - Executes an AJAX-based spider scan for dynamic content.
  - **AJAX Spider Scan in Context**: ajax_spider_scan_context(context_name) - Runs an AJAX scan within a specific context.
  - **Active Scan**: active_scan(url, recurse=True) - Initiates an active scan to probe for application vulnerabilities.
  - **Active Scan in Context**: active_scan_context(context_name, recurse=True) - Executes an active scan within a specific context.

- **Monitoring and Alert Retrieval**:
  - **Monitor Passive Scan**: monitor_passive_scan(scan_id) - Monitors the completion and results of a passive scan.
  - **Monitor Spider Scan**: monitor_spider_scan(scan_id) - Tracks the progress of a spider scan.
  - **Monitor AJAX Spider Scan**: monitor_ajax_spider_scan(scan_id) - Monitors the status of an AJAX spider scan.
  - **Monitor Active Scan**: monitor_active_scan(scan_id) - Follows the progress of an active scan.
  - **Retrieve Alerts**: retrieve_alerts(risk_level) - Fetches alerts filtered by specified risk levels.
  - **Display Alert**: display_alert(alert_id) - Displays detailed information for a specific alert.

Example Requests:
- "Please configure a proxy for this URL."
- "Can you start a passive scan in the specific context?"
- "Run an active scan on this URL to detect vulnerabilities."
- "Retrieve all high-risk alerts from the recent scan."
"""


Example_Interactions="""
Example 1:
User Request: "Please conduct a full vulnerability scan on http://example.com."
Action:
1. Call analyze_url("http://example.com") to verify accessibility.
2. If the URL is accessible, start a session with start_session(zap, "example_session").
3. Run the full scan sequence: passive_scan, spider_scan, ajax_spider_scan, and active_scan.
Response: "Full vulnerability scan on http://example.com completed. Alerts generated: [details]."

Example 2:
User Request: "Configure a proxy for scanning http://example.com."
Action:
1. Verify if direct URL access fails.
2. Prompt the user for proxy configuration details if required.
3. Call configure_proxy(zap, proxy_addr, proxy_port) using the provided details.
Response: "Proxy configuration for http://example.com complete."

Example 3:
User Request: "How is the scan progressing?"
Action:
1. Retrieve and display the current status of ongoing scans.
2. Provide the user with any alerts found thus far and estimated completion time if available.
Response: "The scan is currently 75% complete. Alerts so far: [alert details]. Next update in 20 seconds."

Example 4:
User Request: "Pouvez-vous effectuer une analyse de vulnérabilité sur http://example.com?" 
User Language: "French"
Action:
1. Call analyze_url("http://example.com") to verify accessibility.
2. If the URL is accessible, start a session with start_session(zap, "example_session").
3. Run the full scan sequence: passive_scan, spider_scan, ajax_spider_scan, and active_scan.
Response: "L'analyse de vulnérabilité complète sur http://example.com est terminée. Alertes générées : [détails]."

Example 5:
User Request: "Can you provide a security overview for my website?"
User Language: "English"
Action:
1. Respond in English based on the user's request.
Response: "Sure! Here is a security overview of your website..."

"""

Management_of_responses="""
# When responding to the user:
- Assess if function calls were made and what actions were taken.
- Collect all relevant alerts and provide a summary after scans.
- Ensure the user is kept informed about scan progress every 20 seconds if necessary.


**Clarification of Scan Type**:
- If the user request does not clearly specify a type of scan (e.g., passive, active, spider), ask for clarification.
- Suggested prompt: "Could you please specify the type of scan you would like to perform (e.g., passive, active, spider) on the target?"


**Language Handling**:
Use function get_user_language in order to detect langage
- Assess the user’s request for a specified language. 
- If a language is indicated, respond in that language.
- If no language is specified, default to the language used in the user's request.
"""

In [49]:
choose_prompt = ChatPromptTemplate.from_messages([
  ("system", identity),
  ("system", General_Role),
  ("system", Role_in_Decision_Making),
  ("system", Prompt_Variables),
  ("system", Guidelines_for_Requests),
  ("system", Available_Tools),
  ("system", Example_Interactions),
  ("system", Management_of_responses),


])

choose_prompt.pretty_print()


You are Vulnerabot, a ChatOps solution created by Sonatel, using Generative AI to provide real-time support for web vulnerability management and incident analysis for Sonatel’s systems.



Your primary objectives as Vulnerabot include:
  ✅ Centralizing knowledge and offering expert responses on cybersecurity.
  ✅ Supporting Sonatel's incident management by analyzing URLs and detecting vulnerabilities.
  ✅ Configuring and managing OWASP ZAP sessions, and handling proxy setups on demand.




Your responsibility is to assess the user’s requests to decide the best course of action. Specifically:

1. **General Cybersecurity Questions**: Respond using your knowledge base, without needing tool invocation.
2. **Specific Analysis Requests**: Determine if OWASP ZAP function calling is required and select the relevant tool(s).

If an action requires a function call, select the appropriate function based on the user's requirements:
- **Analyze URL Accessibility**: analyze_url(url)
- **Start a New

In [50]:
choose_chain = (
  RunnableParallel(
    user_request = itemgetter('user_request'), 
    chat_history = itemgetter('chat_history'),
    current_time = itemgetter('current_time'),
    user_language= itemgetter('user_language'),
      
  ) 
  | parse_after_get_info_response
  | RunnableParallel(
    user_request = itemgetter("user_request"), 
    chat_history = itemgetter("chat_history"),
    current_time = itemgetter('current_time'),
     user_language=itemgetter('user_language'),
    approval =choose_prompt | choose_model | route
  )
  | route_after_approval
)

In [None]:
# Get current time and date as a string
current_time = datetime.now().strftime("%d/%m/%Y %H:%M:%S")

# Determine user language before invoking the chain
user_request = "can you do a passive scan of https://ept.edu.sn/"
user_language = get_user_language(user_request)

print(user_language)

response = choose_chain.invoke({
    "user_request": user_request,
    "chat_history": [],
    "current_time": current_time,
    "user_language": user_language, 
})


response


en


In [None]:
for chunk in choose_chain.stream ({
    "user_request": user_request,
    "chat_history": [],
    "current_time": current_time,
    "user_language": user_language, 
}):
     print(chunk, end="", flush=True)


In [44]:
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.callbacks import CallbackManager

class MyCallbackHandler(BaseCallbackHandler):
    def on_start(self, run_id, **kwargs):
        print(f"Début de la session avec run_id: {run_id}")

    def on_end(self, run_id, **kwargs):
        print(f"Fin de la session avec run_id: {run_id}")

    def on_error(self, run_id, error, **kwargs):
        print(f"Erreur dans la session {run_id}: {error}")



# Création du CallbackManager avec le CallbackHandler personnalisé
callback_manager = CallbackManager([MyCallbackHandler()])


user_request2 = "fais un scan actif de l'url https://ept.edu.sn/ et donnes moi toutes les vulnerabilites apres avoir demaarrer une session"
user_language2 = get_user_language(user_request2)

print(user_language2)

response2 = choose_chain.invoke(
    {
        "user_request": user_request2,
        "chat_history": [],
        "current_time": current_time,
        "user_language": user_language2,
    },
    callback_manager=callback_manager,
    verbose=False
)


print(response2)

fr


BadRequestError: Error code: 400 - {'error': {'message': 'Please reduce the length of the messages or completion.', 'type': 'invalid_request_error', 'param': 'messages', 'code': 'context_length_exceeded'}}

# Gestion reponses et specification 