In [1]:
import openai
import json
import nltk
import re
import subprocess
import sys
import json
import os, time
import random
import pickle
from tqdm import tqdm
import random
import numpy as np
import tiktoken
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
import torch
import os
import concurrent.futures
import torch.nn.functional as F
from tqdm import tqdm
    

In [2]:
# Read the API key from the environment so the real secret never has to be
# typed into (and saved with) the notebook. The original placeholder is kept as
# the fallback, so an unset OPENAI_API_KEY behaves exactly as before.
openai.api_key = os.environ.get("OPENAI_API_KEY", "Your Token")
temperature_set = 1  # sampling temperature passed to every completion request
model_name = "gpt-3.5-turbo-instruct"  # completion model used by every repair strategy
directory = 'Directory'  # root folder; each entry is opened as <entry>/generate_code.py

In [3]:
def Bandit_Feedback(path):
    """
    Run Bandit on an existing path and collect the findings it reports.

    Bandit is invoked as ``bandit -r -f json <path>`` and its JSON report is
    parsed from standard output. Nothing is written to disk by this function.

    Parameters:
    - path (str): Location handed to Bandit for scanning.

    Returns:
    - list[list[str]]: One ``[test_name, issue_text, code]`` entry per item of the
                       report's ``results`` array, in report order.
    """
    completed = subprocess.run(
        ["bandit", "-r", "-f", "json", path],
        capture_output=True,
        text=True,
    )
    report = json.loads(completed.stdout)

    return [
        [finding['test_name'], finding['issue_text'], finding['code']]
        for finding in report['results']
    ]

In [4]:
def extract_code(code):
    """
    Extracts Python code enclosed between markdown code block delimiters.

    The first span wrapped in triple back-ticks is taken. Only a leading
    ``python`` / ``python3`` language tag (any letter case) is removed; the word
    "python" appearing later inside the snippet is left untouched. The snippet
    is then passed to Check_Syntax and rejected if that reports any errors.

    Parameters:
    - code (str): The string containing the potential markdown-wrapped Python code.

    Returns:
    - str or None: The extracted snippet, or None when there is no fenced block
                   or Check_Syntax returns a non-empty error list.
    """
    match = re.search(r'```(.*?)```', code, re.DOTALL)
    if match is None:
        return None

    # Strip the fence's language tag only. A blanket replace("python", "")
    # would also mangle identifiers and string literals inside the code itself
    # (e.g. subprocess.run(["python", ...])).
    function = re.sub(
        r'\A\s*python3?(?=\s|\Z)',
        '',
        match.group(1),
        count=1,
        flags=re.IGNORECASE,
    )

    if len(Check_Syntax(function)) != 0:
        return None

    return function

In [5]:
def Is_Fixed(code, temp_dir=None):
    """
    Count the issues Bandit reports for a snippet of Python code.

    The snippet is written to a uniquely named scratch ``.py`` file, Bandit is
    run on that file with JSON output, and the scratch file is removed again
    even when Bandit or the JSON parsing fails.

    Parameters:
    - code (str): The Python source to scan.
    - temp_dir (str or None): Directory for the scratch file. None (default)
                              uses the platform's temporary directory.

    Returns:
    - int: Number of entries in the ``results`` array of Bandit's report.
           Note this is a count, not a list; compare it with 0.
    """
    import tempfile  # local import keeps this cell runnable on its own

    # NamedTemporaryFile guarantees a unique name, so concurrent worker threads
    # cannot overwrite each other's scratch file the way a random integer could.
    scratch = tempfile.NamedTemporaryFile(
        'w', suffix='temp.py', dir=temp_dir, delete=False, encoding='utf-8'
    )
    file_name = scratch.name
    try:
        with scratch:
            scratch.write(code)

        result = subprocess.run(["bandit", "-r", "-f", "json", file_name], capture_output=True, text=True)
        output_json = json.loads(result.stdout)
    finally:
        os.remove(file_name)

    return len(output_json['results'])

In [6]:
def Check_Syntax(code, temp_dir=None):
    """
    Return the errors Bandit reports while processing a snippet of Python code.

    The snippet is written to a uniquely named scratch ``.py`` file, Bandit is
    run on that file with JSON output, and the scratch file is removed again
    even when Bandit or the JSON parsing fails. extract_code treats a non-empty
    return value as "snippet is not usable".

    Parameters:
    - code (str): The Python source to check.
    - temp_dir (str or None): Directory for the scratch file. None (default)
                              uses the platform's temporary directory.

    Returns:
    - list[dict]: The ``errors`` array of Bandit's JSON report (empty when Bandit
                  reported no errors for the file).
    """
    import tempfile  # local import keeps this cell runnable on its own

    # A unique name per call avoids collisions between concurrent worker threads.
    scratch = tempfile.NamedTemporaryFile(
        'w', suffix='temp.py', dir=temp_dir, delete=False, encoding='utf-8'
    )
    file_name = scratch.name
    try:
        with scratch:
            scratch.write(code)

        result = subprocess.run(["bandit", "-r", "-f", "json", file_name], capture_output=True, text=True)
        output_json = json.loads(result.stdout)
    finally:
        os.remove(file_name)

    return output_json['errors']

In [7]:
def run_bandit(path):
    """
    Count the findings Bandit reports for an existing path.

    Bandit is invoked as ``bandit -r -f json <path>`` and its JSON report is
    parsed from standard output. Nothing is written to disk by this function.

    Parameters:
    - path (str): Location handed to Bandit for scanning.

    Returns:
    int: The number of entries in the report's ``results`` array.
    """
    scan = subprocess.run(
        ["bandit", "-r", "-f", "json", path],
        capture_output=True,
        text=True,
    )
    findings = [
        [entry['issue_text'], entry['code']]
        for entry in json.loads(scan.stdout)['results']
    ]
    return len(findings)

In [8]:
# Suffix appended to the repair prompts asking for the answer inside a fenced block.
format_code = (
    '\n#### Please provide the answer as a code snippet enclosed in triple back-ticks.'
)
# Module-level strategy-format suffix (FDSS assigns its own local value of the same name).
format_strategies = (
    '\n#### Please provide each strategy in triple back-ticks.'
)

# Direct prompting

In [9]:
def Direct_prompting(item):
    """
    Ask the model directly, with no analyzer feedback, to fix one sample.

    The sample's ``generate_code.py`` is read from ``directory/<item>``. Nothing
    is done when ``Direct_prompting.py`` already exists there or when run_bandit
    reports zero findings. Otherwise a single completion is requested and the
    result of extract_code is written to ``Direct_prompting.py``; when
    extract_code returns None the original code is written instead.

    Parameters:
    - item (str): Name of an entry inside ``directory``.

    Returns:
    - None
    """
    sample_dir = os.path.join(directory, item)
    source_file = sample_dir + '/generate_code.py'
    target_file = sample_dir + '/Direct_prompting.py'

    # Output from an earlier run is kept as-is.
    if os.path.exists(target_file):
        return

    with open(source_file, 'r') as src:
        code = src.read()

    # Nothing to repair when Bandit reports no findings.
    if run_bandit(source_file) == 0:
        return

    prompt_text = (
        '''#### Does the provided function have security issue? if yes, please fix the issue.\n## Python function:\n'''
        + '```' + code + '```' + format_code
    )
    completion = openai.Completion.create(
        model=model_name,
        prompt=[prompt_text],
        temperature=temperature_set,
        max_tokens=2000,
        n=1,
        logprobs=1,
        frequency_penalty=0.0,
        presence_penalty=0.0,
    )

    repaired = extract_code(completion.choices[0].text)
    # Fall back to the untouched original when no usable snippet came back.
    with open(target_file, 'w+') as out:
        out.write(code if repaired is None else repaired)

items = os.listdir(directory)

# executor.map() only re-raises a worker's exception when its result is
# retrieved; the results were never read, so every failure (API error, missing
# file, unparsable Bandit output, ...) was dropped without a trace. Submitting
# the jobs explicitly lets each failure be reported while the remaining
# samples are still processed.
with concurrent.futures.ThreadPoolExecutor() as executor:
    pending = {executor.submit(Direct_prompting, item): item for item in items}
    for future in concurrent.futures.as_completed(pending):
        failure = future.exception()
        if failure is not None:
            print(f"Direct_prompting failed for {pending[future]!r}: {failure!r}")

# Self-Debug

In [10]:
import os
import concurrent.futures

def Self_Debug(item):
    """
    Repair one sample by letting the model explain the code before fixing it.

    The sample's ``generate_code.py`` is read from ``directory/<item>``. Nothing
    is done when ``Self_Debug.py`` already exists there or when run_bandit
    reports zero findings. Otherwise two completions are requested: the first
    asks for an explanation of the function, the second passes that explanation
    back together with the code and asks for a fix. The result of extract_code
    on the second reply is written to ``Self_Debug.py``; when extract_code
    returns None the original code is written instead.

    Parameters:
    - item (str): Name of an entry inside ``directory``.

    Returns:
    - None
    """
    def complete(prompt_text):
        # Both requests share the same sampling settings; only the prompt differs.
        reply = openai.Completion.create(
            model=model_name,
            prompt=[prompt_text],
            temperature=temperature_set,
            max_tokens=2000,
            n=1,
            logprobs=1,
            frequency_penalty=0.0,
            presence_penalty=0.0,
        )
        return reply.choices[0].text

    item_path = os.path.join(directory, item)
    file_path_refine = os.path.join(item_path, 'Self_Debug.py')

    # Output from an earlier run is kept as-is.
    if os.path.exists(file_path_refine):
        return

    with open(os.path.join(item_path, 'generate_code.py'), 'r') as source:
        code = source.read()

    # Nothing to repair when Bandit reports no findings.
    if run_bandit(item_path + '/generate_code.py') == 0:
        return

    # Step 1: have the model explain the function.
    feedback_prompt = '##### Explaining the provided function for security debugging.\n '
    Feedback = complete(f"{feedback_prompt}\n\n### Python function\n\n```{code}```\n")

    # Step 2: feed the explanation back and ask for the repaired code.
    feedback_prompt = '##### Based on the code explanation, please fix the security issues:\n'
    self_refine_code = extract_code(
        complete(f"{feedback_prompt}{Feedback}\n\n### Python function\n\n```{code}```\n{format_code}")
    )

    # Fall back to the untouched original when no usable snippet came back.
    with open(file_path_refine, 'w+') as out:
        out.write(code if self_refine_code is None else self_refine_code)

# executor.map() only re-raises a worker's exception when its result is
# retrieved; the results were never read, so every failure was dropped without
# a trace. Submitting the jobs explicitly lets each failure be reported while
# the remaining samples are still processed.
with concurrent.futures.ThreadPoolExecutor() as executor:
    pending = {executor.submit(Self_Debug, item): item for item in os.listdir(directory)}
    for future in concurrent.futures.as_completed(pending):
        failure = future.exception()
        if failure is not None:
            print(f"Self_Debug failed for {pending[future]!r}: {failure!r}")


# Directly giving bandit feedback

In [11]:
def bandit_feedback(item):
    """
    Repair one sample by handing Bandit's findings straight to the model.

    The sample's ``generate_code.py`` is read from ``directory/<item>``. Nothing
    is done when ``Directly_giving_bandit_feedback.py`` already exists there or
    when Bandit_Feedback returns no findings. Otherwise one completion is
    requested with the findings and the code, and the result of extract_code is
    written to ``Directly_giving_bandit_feedback.py``; when extract_code returns
    None the original code is written instead.

    Parameters:
    - item (str): Name of an entry inside ``directory``.

    Returns:
    - None
    """
    item_path = os.path.join(directory, item)

    file_path_refine = item_path + '/Directly_giving_bandit_feedback.py'
    if os.path.exists(file_path_refine):
        return

    with open(item_path + '/generate_code.py', 'r') as f:
        code = f.read()

    # One Bandit run both detects "nothing to fix" and supplies the findings.
    Feedback_from_Bandit = Bandit_Feedback(item_path + '/generate_code.py')
    if len(Feedback_from_Bandit) == 0:
        return

    # Bandit_Feedback yields [test_name, issue_text, code] triples. The issue
    # text is the description and the code snippet belongs under "In the
    # following lines:"; previously the indices were shifted by one, so the test
    # name was shown as the description and the issue text as the "lines".
    # NOTE(review): the test name is not included in this prompt -- confirm that
    # matches the intended experiment setup.
    feedback_details = "\n### The feedback from static code analysis:\n" + " ".join(
        f"{idx}- {issue_text}\n In the following lines:\n{snippet}"
        for idx, (_test_name, issue_text, snippet) in enumerate(Feedback_from_Bandit, start=1)
    )

    feedback_prompt = '##### Based on the feedback provided, please fix the following security issues:\n'
    feedback_message = f"{feedback_prompt}{feedback_details}\n\n### Python function\n\n```{code}```\n{format_code}"

    response = openai.Completion.create(
        model=model_name,
        prompt=[feedback_message],
        temperature=temperature_set,
        max_tokens=2000,
        n=1,
        logprobs=1,
        frequency_penalty=0.0,
        presence_penalty=0.0,
    )

    self_refine_code = extract_code(response.choices[0].text)

    # Fall back to the untouched original when no usable snippet came back.
    with open(file_path_refine, 'w+') as file:
        file.write(code if self_refine_code is None else self_refine_code)

# executor.map() only re-raises a worker's exception when its result is
# retrieved; the results were never read, so every failure was dropped without
# a trace. Submitting the jobs explicitly lets each failure be reported while
# the remaining samples are still processed.
with concurrent.futures.ThreadPoolExecutor() as executor:
    pending = {executor.submit(bandit_feedback, item): item for item in os.listdir(directory)}
    for future in concurrent.futures.as_completed(pending):
        failure = future.exception()
        if failure is not None:
            print(f"bandit_feedback failed for {pending[future]!r}: {failure!r}")


# Verbalization


In [None]:
def Verbalization(item):
    """
    Repair one sample by first asking the model to verbalize Bandit's findings.

    The sample's ``generate_code.py`` is read from ``directory/<item>``. Nothing
    is done when ``Verbalization.py`` already exists there or when
    Bandit_Feedback returns no findings. Otherwise two completions are
    requested: the first receives the formatted findings plus the code, the
    second receives the first reply, the findings and the code and asks for a
    fix. The result of extract_code on the second reply is written to
    ``Verbalization.py``; when extract_code returns None the original code is
    written instead.

    Parameters:
    - item (str): Name of an entry inside ``directory``.

    Returns:
    - None
    """
    def complete(prompt_text):
        # Both requests share the same sampling settings; only the prompt differs.
        reply = openai.Completion.create(
            model=model_name,
            prompt=[prompt_text],
            temperature=temperature_set,
            max_tokens=2000,
            n=1,
            logprobs=1,
            frequency_penalty=0.0,
            presence_penalty=0.0,
        )
        return reply.choices[0].text

    item_path = os.path.join(directory, item)

    file_path_refine = item_path + '/Verbalization.py'
    if os.path.exists(file_path_refine):
        return

    with open(item_path + '/generate_code.py', 'r') as f:
        code = f.read()

    # One Bandit run both detects "nothing to fix" and supplies the findings.
    Feedback_from_Bandit = Bandit_Feedback(item_path + '/generate_code.py')
    if len(Feedback_from_Bandit) == 0:
        return

    feedback_details = " ".join([f"{idx}{' >> Issue: ['} {item[0]}{'] '}{item[1]}\n In the following lines:\n{item[2]}" for idx, item in enumerate(Feedback_from_Bandit, start=1)])

    # Step 1: let the model verbalize the analyzer report.
    feedback_prompt = '##### verbalizing the feedback as an input to language model:\n Test results:\n '
    feedback_message = f"{feedback_prompt}{feedback_details}\n\n### Python function\n\n```{code}```\n"
    Feedback = complete(feedback_message)

    # Step 2: ask for the fix, given the verbalization and the raw report.
    verbalizing_feedback_prompt = '##### Based on the feedback provided, please fix the security issues:\n'
    verbalizing_feedback_message = f"{verbalizing_feedback_prompt}{Feedback}\n{'Test results:'}\n{feedback_details}\n\n### Python function\n\n```{code}```\n{format_code}"
    self_refine_code = extract_code(complete(verbalizing_feedback_message))

    # The fallback branch previously had an empty `with` body, which made the
    # whole cell a syntax error. As in the other strategies, the original code
    # is written when no usable snippet came back.
    with open(file_path_refine, 'w+') as file:
        file.write(code if self_refine_code is None else self_refine_code)

# executor.map() only re-raises a worker's exception when its result is
# retrieved; the results were never read, so every failure was dropped without
# a trace. Submitting the jobs explicitly lets each failure be reported while
# the remaining samples are still processed.
with concurrent.futures.ThreadPoolExecutor() as executor:
    pending = {executor.submit(Verbalization, item): item for item in os.listdir(directory)}
    for future in concurrent.futures.as_completed(pending):
        failure = future.exception()
        if failure is not None:
            print(f"Verbalization failed for {pending[future]!r}: {failure!r}")

# Feedback-Driven Solution Synthesis (FDSS)

In [16]:
import concurrent.futures

def FDSS(item):
    """
    Feedback-Driven Solution Synthesis for one sample.

    The sample's ``generate_code.py`` is read from ``directory/<item>``. Nothing
    is done when ``strategies.py`` already exists there or when Bandit_Feedback
    returns no findings. Otherwise:

    1. The model is asked to verbalize the formatted Bandit findings.
    2. The model is asked for three numbered fixing strategies; the reply is
       split on ``<digit>)`` markers.
    3. For each strategy, up to ``MAX_Iterative - 1`` repair completions are
       requested. Every candidate accepted by extract_code is written to
       ``strategies.py`` and re-scanned with Is_Fixed; the search stops at the
       first candidate with zero findings. When extract_code returns None the
       original code is written to ``strategies.py`` for that attempt.

    If the strategy reply contains no ``<digit>)`` marker, no attempt is made
    and no file is written.

    Parameters:
    - item (str): Name of an entry inside ``directory``.

    Returns:
    - None
    """
    def complete(prompt_text):
        # Every request shares the same sampling settings; only the prompt differs.
        reply = openai.Completion.create(
            model=model_name,
            prompt=[prompt_text],
            temperature=temperature_set,
            max_tokens=2000,
            n=1,
            logprobs=1,
            frequency_penalty=0.0,
            presence_penalty=0.0,
        )
        return reply.choices[0].text

    item_path = os.path.join(directory, item)

    file_path_refine = item_path + '/strategies.py'
    if os.path.exists(file_path_refine):
        return

    with open(item_path + '/generate_code.py', 'r') as f:
        code = f.read()

    # One Bandit run both detects "nothing to fix" and supplies the findings.
    Feedback_from_Bandit = Bandit_Feedback(item_path + '/generate_code.py')
    if len(Feedback_from_Bandit) == 0:
        return

    feedback_details = " ".join([f"{idx}{' >> Issue: ['} {item[0]}{'] '}{item[1]}\n In the following lines:\n{item[2]}" for idx, item in enumerate(Feedback_from_Bandit, start=1)])

    # Step 1: let the model verbalize the analyzer report.
    feedback_prompt = '##### verbalizing the feedback as an input to language model:\n '
    feedback_message = f"{feedback_prompt}{feedback_details}\n\n### Python function\n\n```{code}```\n"
    Feedback = complete(feedback_message)

    # Step 2: ask for candidate strategies in a numbered list so they can be split.
    K_search_prompt = '##### provide three different strategies to fix the security issues:\n Test results:\n '
    format_strategies = '\n#### Please provide each strategy in 1), 2) and 3)'
    K_search_message = f"{K_search_prompt}{Feedback}\n{'Test results:'}\n{feedback_details}\n\n### Python function\n\n{code}\n{format_strategies}"

    K_search_feedback = complete(K_search_message)
    # Text before the first "<digit>)" marker is preamble and is discarded.
    strategies = re.split(r'\d\)', K_search_feedback)[1:]

    # Step 3: try each strategy until Bandit no longer reports anything.
    feedback_prompt = '##### Based on the feedback provided, please fix the security issues:\n'
    for strategy in strategies:
        feedback_message = f"{feedback_prompt}{Feedback}\n{'Test results:'}\n{feedback_details}\n{strategy}\n### Python function\n\n```{code}```\n"

        # The attempt counter runs from 1 up to (not including) MAX_Iterative,
        # i.e. MAX_Iterative - 1 attempts. A range also terminates when the
        # limit is below 1, where a `while counter != limit` loop would not.
        for _attempt in range(1, MAX_Iterative):
            self_refine_code = extract_code(complete(feedback_message))

            if self_refine_code is None:
                with open(file_path_refine, 'w+') as file:
                    file.write(code)
                continue

            # Is_Fixed returns the NUMBER of remaining findings (an int).
            # Calling len() on it raised TypeError after the first candidate,
            # which ended the search before any retry could happen.
            remaining_issues = Is_Fixed(self_refine_code)
            with open(file_path_refine, 'w+') as file:
                file.write(self_refine_code)
            if remaining_issues == 0:
                return

# Attempt limit read by FDSS: its attempt counter starts at 1 and stops on
# reaching this value, so each strategy gets MAX_Iterative - 1 repair attempts.
MAX_Iterative = 3

# executor.map() only re-raises a worker's exception when its result is
# retrieved; the results were never read, so every failure was dropped without
# a trace. Submitting the jobs explicitly lets each failure be reported while
# the remaining samples are still processed.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    pending = {executor.submit(FDSS, item): item for item in os.listdir(directory)}
    for future in concurrent.futures.as_completed(pending):
        failure = future.exception()
        if failure is not None:
            print(f"FDSS failed for {pending[future]!r}: {failure!r}")