In [1]:
import pandas as pd
from langchain.llms.huggingface_pipeline import HuggingFacePipeline
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from dotenv import dotenv_values
import sys
sys.path.insert(0,'/workspaces/RAG_secure_code_generation/src')
from utils.utils import load_yaml, init_argument_parser, sanitize_output, fill_default_parameters
from langchain.prompts import (
    ChatPromptTemplate, PromptTemplate
)
from utils.openai_utils import is_openai_model, build_chat_model
from langchain.chat_models import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
import random
import numpy as np
from functools import partial
from typing import List
from langchain.embeddings import OpenAIEmbeddings


from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.document_loaders.generic import GenericLoader
from langchain.document_loaders import WebBaseLoader
from utils.custom_grobid_parser import CustomGrobidParser
from langchain.docstore.document import Document
from langchain_core.embeddings import Embeddings
import bs4
from langchain_core.runnables import RunnablePassthrough

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

In [3]:
seed = 156
np.random.seed(seed)
random.seed(seed)

In [4]:
template_file = "../data/templates/complete_function_readable.yaml"
rag_template_file = "../data/rag_templates/basic_rag_suffix.txt"
task_file = "../data/tasks/detect_xss_simple_prompt.txt"
parameters_file = "../data/prompt_parameters/empty.yaml"
papers_folder = "../data/papers/"
model_name = "gpt-3.5-turbo-0613"


In [5]:
env = dotenv_values()

In [6]:
template = load_yaml(template_file)
    # load parameters
prompt_parameters = load_yaml(parameters_file)

#read txt containing the task
with open(task_file) as f:
    prompt_parameters["input"] = f.read()
prompt_parameters = fill_default_parameters(prompt_parameters, template["default_parameters"])
with open(rag_template_file) as f:
    template['input'] += "\n" + f.read()

use_openai_api = is_openai_model(model_name)
openai_key = env['OPENAI_API_KEY']
model = ChatOpenAI(temperature=0, openai_api_key=openai_key, model=model_name)

In [7]:
def build_scientific_papers_loader(papers_folder:str)->List[Document]: 
    loader = GenericLoader.from_filesystem(
        papers_folder,
        glob="*",
        suffixes=[".pdf"],
        parser=CustomGrobidParser(segment_sentences=False),
    )
    docs = loader.load()
    return docs

In [8]:
url_filter_cheat_sheet = "https://cheatsheetseries.owasp.org/cheatsheets/XSS_Filter_Evasion_Cheat_Sheet.html"

In [9]:
def build_web_page_loader(url:str)->List[Document]:
    loader = WebBaseLoader(
        web_paths=(url,),
        bs_kwargs=dict(
            parse_only=bs4.SoupStrainer(
            )
        ))
    docs = loader.load()
    return docs

In [10]:
def build_documents_retriever(docs:List[Document],
                              embeddings:Embeddings,
                              chunk_size:int=3500,
                              chunk_overlap:int=500):
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    splits = text_splitter.split_documents(docs)

    vectorstore = Chroma.from_documents(documents=splits, embedding=embeddings)
    retriever = vectorstore.as_retriever()
    return retriever

In [11]:
#docs = build_scientific_papers_loader(papers_folder)
docs = build_web_page_loader(url_filter_cheat_sheet)
retriever = build_documents_retriever(docs, embeddings=OpenAIEmbeddings())

  if self._task_type is "RETRIEVAL_DOCUMENT":


In [12]:
prompt = ChatPromptTemplate.from_messages([("system", template['input']), ("human", "{input}")])


In [13]:
retriever

VectorStoreRetriever(tags=['Chroma', 'OpenAIEmbeddings'], vectorstore=<langchain.vectorstores.chroma.Chroma object at 0x7ff21e0b3910>)

In [15]:
comment = "Check if in the given http_get_request there is an XSS exploit, considering also the possible modifications that an attacker can perform."
ex = "Replace spaces with ”/”, ”%0A” or ”%0D”"
retriever.get_relevant_documents( comment, k=22)

[Document(page_content='Assuming http://www.google.com/ is programmatically replaced with nothing). I actually used a similar attack vector against several separate real world XSS filters by using the conversion filter itself (here is an example) to help create the attack vector (IE: java&\\#x09;script: was converted into java    script:, which renders in IE, Netscape 8.1+ in secure site mode and Opera):\n\n<A\xa0HREF="http://www.google.com/ogle.com/">XSS</A>\nAssisting XSS with HTTP Parameter Pollution¶\nAssume a content sharing flow on a web site is implemented as shown below. There is a "Content" page which includes some content provided by users and this page also includes a link to "Share" page which enables a user choose their favorite social sharing platform to share it on. Developers HTML encoded the "title" parameter in the "Content" page to prevent against XSS but for some reasons they didn\'t URL encoded this parameter to prevent from HTTP Parameter Pollution. Finally they d

In [38]:
chain =  prompt | model | StrOutputParser() | sanitize_output


In [39]:
prompt_parameters["context"] = retriever | format_docs

In [40]:
prompt_parameters

{'input': 'def detect_xss(http_get_request: str)->bool: \n""" Check if in the given http_get_request there is an XSS exploit, considering also the possible evasions that an attacker can perform.""" ',
 'context': VectorStoreRetriever(tags=['Chroma', 'OpenAIEmbeddings'], vectorstore=<langchain.vectorstores.chroma.Chroma object at 0x7f4ffa2bbdf0>)
 | RunnableLambda(...)}

In [41]:
print(prompt.format(**prompt_parameters))


System: The user will provide the initial part of the function (function name, parameters with types, return type and a comment describing the purpose of the function, with some optional example. 
You are a coding assistant that writes some python code to complete the user's function. Be sure that the code is syntactically correct and that it returns the correct type. 
The lenght of the code should short, readable, and without redundant checks on the paylaod. 
Return only python code in Markdown format, e.g.:
```python
....
```
Use the following pieces of retrieved context to write a more complete function:
Context: first=VectorStoreRetriever(tags=['Chroma', 'OpenAIEmbeddings'], vectorstore=<langchain.vectorstores.chroma.Chroma object at 0x7f4ffa2bbdf0>) last=RunnableLambda(...)

Human: def detect_xss(http_get_request: str)->bool: 
""" Check if in the given http_get_request there is an XSS exploit, considering also the possible evasions that an attacker can perform.""" 


In [42]:
response = chain.invoke(prompt_parameters)


In [43]:
response

'\nimport re\n\ndef detect_xss(http_get_request: str) -> bool:\n    # Regular expression pattern to match potential XSS payloads\n    xss_pattern = re.compile(r"<script.*?>.*?</script>|<.*?on\\w+=.*?>")\n\n    # Check if the HTTP GET request contains any potential XSS payload\n    if re.search(xss_pattern, http_get_request):\n        return True\n    else:\n        return False\n'