# Preprocessing

### 2. Remove comments

In [11]:
import tokenize
from io import StringIO

# Remove comments
def remove_comments_and_docstrings(source):
    """
    Removes comments and docstrings from Python source code.

    The source is tokenized with ``tokenize``. Comment tokens and string
    tokens in docstring position are dropped; every other token is re-emitted
    with the column spacing it had in the input. Blank (whitespace-only)
    lines are removed from the result.

    A string token is treated as a docstring when the last emitted token is
    an INDENT (this also covers a string that is the very first token), or
    when the last emitted token is a NEWLINE and the string starts at
    column 0.

    Parameters:
    source (str): The input Python code as a string.

    Returns:
    str or None: The Python code with comments and docstrings removed, or
    None if the input cannot be processed (for example, it is not a string
    or it cannot be tokenized).
    """
    try:
        pieces = []
        # Starting from INDENT makes a leading string count as a docstring.
        prev_toktype = tokenize.INDENT
        last_lineno = -1
        last_col = 0

        readline = StringIO(source).readline
        for token_type, token_string, (start_line, start_col), (end_line, end_col), _ in tokenize.generate_tokens(readline):
            # A token on a new line is padded from column 0.
            if start_line > last_lineno:
                last_col = 0

            is_comment = token_type == tokenize.COMMENT
            is_docstring = token_type == tokenize.STRING and (
                prev_toktype == tokenize.INDENT
                or (prev_toktype == tokenize.NEWLINE and start_col == 0)
            )

            if not (is_comment or is_docstring):
                # Re-create the gap between the previous token and this one.
                if start_col > last_col:
                    pieces.append(" " * (start_col - last_col))
                pieces.append(token_string)
                # Only emitted tokens count as "previous", so a string that
                # directly follows a dropped docstring is dropped as well.
                prev_toktype = token_type

            # Advance the cursor past dropped tokens too; otherwise the next
            # token would be padded across the removed text and the line
            # would end in trailing whitespace as long as the comment.
            last_col = end_col
            last_lineno = end_line

        out = "".join(pieces)

        # Remove any extra empty lines
        return '\n'.join(line for line in out.splitlines() if line.strip())

    except Exception:
        # Best effort: inputs that cannot be tokenized yield None instead of
        # aborting a whole-dataset apply.
        return None

### 1. Bandit Labeling

In [None]:
import os
import tempfile
import subprocess
import io, tokenize, re

# Labeling using Bandit
def execute_code(code):
    """
    Runs Bandit on a code snippet and returns its textual report.

    The snippet is written to a temporary file, Bandit is invoked on that
    file with stderr discarded, and the temporary file is deleted afterwards.

    Parameters:
    code (str): The Python code to analyse.

    Returns:
    str: "" when `code` is None, not a string, or blank. Otherwise the
    stripped Bandit stdout when Bandit exits with status 0, or
    "Error: " followed by the captured stdout when it exits non-zero.
    NOTE(review): Bandit is expected to exit non-zero whenever it reports
    findings, so the "Error: " branch normally carries the actual report;
    confirm against the Bandit documentation.

    Raises:
    FileNotFoundError: If the `bandit` executable cannot be found.
    """
    if code is None or not isinstance(code, str) or not code.strip():
        return ""

    temp_file_path = None
    try:
        # Create a temporary file to write the code
        with tempfile.NamedTemporaryFile(mode='w', delete=False, encoding='utf-8') as temp_file:
            # Record the path before writing so the file is still removed in
            # `finally` if the write itself fails.
            temp_file_path = temp_file.name
            temp_file.write(code)

        # Run Bandit on the temporary file, suppressing stderr
        result = subprocess.check_output(["bandit", temp_file_path], stderr=subprocess.DEVNULL)

        return result.decode("utf-8", errors="replace").strip()
    except subprocess.CalledProcessError as e:
        # Decode like the success path (the snippet was written as UTF-8);
        # errors="replace" keeps undecodable bytes from raising.
        return f"Error: {e.output.decode('utf-8', errors='replace')}"
    finally:
        # Delete the temporary file
        if temp_file_path:
            os.unlink(temp_file_path)


# source: name of the code column; dist: name of the output column
def apply_bandit_to_dataframe(df, source, dist):
    """
    Runs `execute_code` on every value of one column and stores the reports.

    The input frame is copied first, so `df` itself is not modified. The
    snippets are processed by a thread pool and progress is shown with tqdm.

    Parameters:
    df (pandas.DataFrame): The frame holding the code snippets.
    source (str): Name of the column that contains the code.
    dist (str): Name of the column that receives the Bandit output.

    Returns:
    pandas.DataFrame: A copy of `df` with the `dist` column added.
    """
    # Imported locally so the function does not depend on cell execution
    # order: `concurrent.futures` is not imported by the surrounding cell.
    import concurrent.futures
    from tqdm import tqdm

    temp_df = df.copy()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # executor.map yields results in input order, so they line up with the rows.
        results = list(tqdm(executor.map(execute_code, temp_df[source]), total=len(temp_df), desc="Running Bandit"))

    # Add the results to a new column in the DataFrame
    temp_df[dist] = results

    return temp_df

# Extracting CWEs from the Bandit report (labels)
def extract_cwe(bandit_output):
    """
    Collects the CWE identifiers mentioned in a Bandit report.

    Parameters:
    bandit_output (str): Text to scan for "CWE: <identifier>" entries.

    Returns:
    dict: {'CWE': [...]} with every identifier in order of appearance, or
    {'CWE': ['Not Detected']} when the text contains none.
    """
    # Each match captures the run of non-whitespace characters after "CWE: ".
    found = re.findall(r'CWE: (\S+)', bandit_output)
    return {'CWE': found if found else ['Not Detected']}

# Here we choose the most frequent CWE if there is more than one
def most_common_cwe(cwe_list):
    """
    Returns the CWE that occurs most often in a list.

    Parameters:
    cwe_list (list): CWE identifiers, possibly with repetitions.

    Returns:
    The most frequent element of `cwe_list`. When several elements share the
    highest count, the one encountered first is returned.

    Raises:
    IndexError: If `cwe_list` is empty.
    """
    # Imported locally: the surrounding cell does not import Counter, so the
    # original body failed with NameError on a fresh kernel.
    from collections import Counter

    cwe_counter = Counter(cwe_list)
    most_common = cwe_counter.most_common(1)
    return most_common[0][0]

# After getting Vulnerable labels, here we process Safe samples
def get_safe_label(bandit, label):
    """
    Refines the "Not Detected" label into "SAFE" or "UNK".

    Any other label is returned unchanged. For "Not Detected", the Bandit
    report is searched for the text "Files skipped (0)": if present the
    sample is labelled "SAFE", otherwise "UNK". The notebook's rationale is
    that code Bandit could not parse cannot be called safe.

    Parameters:
    bandit (str): The Bandit report for the sample.
    label (str): The label assigned so far.

    Returns:
    str: `label` itself, "SAFE", or "UNK".
    """
    if label != "Not Detected":
        return label

    nothing_skipped = re.search(r'Files skipped \(0\)', bandit) is not None
    return "SAFE" if nothing_skipped else "UNK"


# # # Example:
# # analysed_df = apply_bandit_to_dataframe(df, "code", "bandit")
# # # Apply extract_cwe function to bandit output and expand result as new column CWE
# # df_cwe_info = analysed_df['bandit'].apply(extract_cwe).apply(pd.Series)

# # # Concatenate original DataFrame with the new DataFrame containing extracted CWE information
# # analysed_df = pd.concat([analysed_df, df_cwe_info], axis=1)

# # # Determine if each row was detected in anything or not
# # analysed_df['is_vul'] = analysed_df['CWE'].apply(lambda x: 1 if x != ['Not Detected'] else 0)

# # analysed_df['label'] = analysed_df['CWE'].apply(lambda x: most_common_cwe(x))

# # analysed_df['label'] = analysed_df.apply(lambda row: get_safe_label(row["bandit"], row["label"]), axis=1)

# Dataset Without Dependencies (imports + function calls + vars)

In [4]:
import pandas as pd

# Load the dataset from Dataset/VulPySec.csv
dataset = pd.read_csv("Dataset/VulPySec.csv")
# Last expression of the cell: shown as the cell output
dataset

Unnamed: 0,code,source,is_vul,label
0,"def _ExpectSpaceBeforeOperator(self, token):\n...",qwenv1,0,SAFE
1,"def is_key2(salt: str, index: int) -> bool:\n ...",hf,0,SAFE
2,def test_unexpected_results_all_unexpected(sel...,hf,0,SAFE
3,@pytest.fixture\ndef zigpy_device_dt(zigpy_dev...,hf,0,SAFE
4,"def __init__(self):\n self.n = Vector3(0, 0...",qwenv1,0,SAFE
...,...,...,...,...
150911,def vulnerable_code():\n dst_dir = tempfile...,qwenv3,1,CWE-377
150912,"def __init__(self, data_path='/tmp'):\n if ...",qwenv3,1,CWE-377
150913,"def save_file(filename, content):\n import ...",qwenv3,1,CWE-377
150914,"def _write_manifest(self, user_id):\n self....",qwenv3,1,CWE-377


In [16]:
from tqdm import tqdm

# Register `progress_apply` on pandas objects
tqdm.pandas()

# Comment- and docstring-free version of every sample, with a progress bar
dataset["unc_code"] = dataset["code"].progress_apply(remove_comments_and_docstrings)

# Line count of the original (unstripped) sample
dataset["num_lines"] = dataset["code"].map(lambda snippet: len(snippet.splitlines()))

# Keep the columns of interest in display order, then show the frame
dataset = dataset.loc[:, ["code", "unc_code", "source", "num_lines", "label", "is_vul"]]
dataset

100%|███████████████████████████████████████████████████████████████████████| 150916/150916 [00:10<00:00, 14486.17it/s]


Unnamed: 0,code,unc_code,source,num_lines,label,is_vul
0,"def _ExpectSpaceBeforeOperator(self, token):\n...","def _ExpectSpaceBeforeOperator(self, token):\n...",qwenv1,8,SAFE,0
1,"def is_key2(salt: str, index: int) -> bool:\n ...","def is_key2(salt: str, index: int) -> bool:\n ...",hf,7,SAFE,0
2,def test_unexpected_results_all_unexpected(sel...,def test_unexpected_results_all_unexpected(sel...,hf,4,SAFE,0
3,@pytest.fixture\ndef zigpy_device_dt(zigpy_dev...,@pytest.fixture\ndef zigpy_device_dt(zigpy_dev...,hf,4,SAFE,0
4,"def __init__(self):\n self.n = Vector3(0, 0...","def __init__(self):\n self.n = Vector3(0, 0...",qwenv1,4,SAFE,0
...,...,...,...,...,...,...
150911,def vulnerable_code():\n dst_dir = tempfile...,def vulnerable_code():\n dst_dir = tempfile...,qwenv3,10,CWE-377,1
150912,"def __init__(self, data_path='/tmp'):\n if ...","def __init__(self, data_path='/tmp'):\n if ...",qwenv3,4,CWE-377,1
150913,"def save_file(filename, content):\n import ...","def save_file(filename, content):\n import ...",qwenv3,10,CWE-377,1
150914,"def _write_manifest(self, user_id):\n self....","def _write_manifest(self, user_id):\n self....",qwenv3,10,CWE-377,1


In [17]:
# Keep only the labels that have more than `min_label_count` samples.
# NOTE(review): this comment used to say 5000 while the code filtered at 1000;
# the code's value is kept here. Confirm the intended threshold.
min_label_count = 1000
label_counts = dataset['label'].value_counts()
filtered_labels = label_counts[label_counts > min_label_count].index

# Per-label cap on the number of rows to sample
sample_size = {
    'SAFE': 30000,
}

# Cap used for every label that is not listed in `sample_size`
default_sample_size = 10000

# One sampled DataFrame per retained label
sampled_dfs = []

for label in filtered_labels:
    # Determine the sample size for the current label
    n_samples = sample_size.get(label, default_sample_size)
    # Never request more rows than the label has; the fixed random_state
    # makes the draw repeatable.
    sampled_df = dataset[dataset['label'] == label].sample(n=min(n_samples, label_counts[label]), random_state=42)
    sampled_dfs.append(sampled_df)

# Concatenate all the sampled DataFrames and renumber the rows from 0
final_df = pd.concat(sampled_dfs, ignore_index=True)

# Verify the result
print(final_df['label'].value_counts())

label
SAFE       30000
CWE-703    10000
CWE-400    10000
CWE-22     10000
CWE-327    10000
CWE-78     10000
CWE-605    10000
CWE-330    10000
CWE-502    10000
CWE-89     10000
CWE-259     8861
CWE-20      8816
CWE-319     7025
CWE-377     6214
Name: count, dtype: int64


In [21]:
# Show the sampled dataset as the cell output
final_df

Unnamed: 0,code,unc_code,source,num_lines,label,is_vul
0,"def __init__(self, source_dir, dest_dir):\n ...","def __init__(self, source_dir, dest_dir):\n ...",gptv2,4,SAFE,0
1,"def callback(self, value):\n if value is No...","def callback(self, value):\n if value is No...",hf,5,SAFE,0
2,@cached_property\ndef supports_3d_storage(self...,@cached_property\ndef supports_3d_storage(self...,hf,3,SAFE,0
3,"def validate_phone_number(phone_number, throw=...","def validate_phone_number(phone_number, throw=...",hf,8,SAFE,0
4,"def __init__(self, module):\n super(Vlans, ...","def __init__(self, module):\n super(Vlans, ...",hf,2,SAFE,0
...,...,...,...,...,...,...
150911,"def main():\n data = ""This is some sensitiv...","def main():\n data = ""This is some sensitiv...",qwenv3,7,CWE-377,1
150912,"def __init__(self, user_id):\n self.user_id...","def __init__(self, user_id):\n self.user_id...",qwenv3,3,CWE-377,1
150913,def download_and_save_image(url):\n file_na...,def download_and_save_image(url):\n file_na...,qwenv3,5,CWE-377,1
150914,def generate_secret_filename():\n secret_fi...,def generate_secret_filename():\n secret_fi...,qwenv3,3,CWE-377,1


In [23]:
# Save the sampled dataset; index=False keeps the row index out of the CSV
final_df.to_csv("Dataset/VulPySec_Dataset.csv", index=False)

# Dataset With dependencies

Note: we've noticed that labeling with Bandit after adding the dependencies gives better results (fewer false positives and false negatives).

In [28]:
import pandas as pd

# Load the dataset from Dataset/VulPySec_withDependencies.csv
# (this rebinds the `dataset` name used earlier in the notebook)
dataset = pd.read_csv("Dataset/VulPySec_withDependencies.csv")
# Last expression of the cell: shown as the cell output
dataset

Unnamed: 0,code,label,is_vul
0,import numpy as np\n\ndef _get_random_inputs_a...,SAFE,0
1,import sqlite3\n\ndef format_output(database):...,SAFE,0
2,from typing import List\nfrom telegram import ...,SAFE,0
3,import sqlalchemy\nfrom flask import Flask\nfr...,SAFE,0
4,"{\n 'name': 'John',\n 'age': 30,\n 'c...",SAFE,0
...,...,...,...
152583,"self.proxy = {\n ""http"": ""http://proxy-ip:p...",SAFE,0
152584,import requests\nfrom Crypto.Cipher import AES...,CWE-327,1
152585,proxies = {\n 'http': 'http://proxy_user:pr...,SAFE,0
152586,import os\nimport requests\nimport requests_to...,CWE-20,1


In [29]:
from tqdm import tqdm

# Register `progress_apply` on pandas objects
tqdm.pandas()

# Comment- and docstring-free version of every sample, with a progress bar
dataset["unc_code"] = dataset["code"].progress_apply(remove_comments_and_docstrings)

# Line count of the original (unstripped) sample
dataset["num_lines"] = dataset["code"].map(lambda snippet: len(snippet.splitlines()))


# Keep the columns of interest in display order, then show the frame
dataset = dataset.loc[:, ["code", "unc_code", "num_lines", "label", "is_vul"]]
dataset

100%|████████████████████████████████████████████████████████████████████████| 152588/152588 [00:19<00:00, 7651.16it/s]


Unnamed: 0,code,unc_code,num_lines,label,is_vul
0,import numpy as np\n\ndef _get_random_inputs_a...,import numpy as np\ndef _get_random_inputs_and...,22,SAFE,0
1,import sqlite3\n\ndef format_output(database):...,import sqlite3\ndef format_output(database):\n...,29,SAFE,0
2,from typing import List\nfrom telegram import ...,from typing import List\nfrom telegram import ...,15,SAFE,0
3,import sqlalchemy\nfrom flask import Flask\nfr...,import sqlalchemy\nfrom flask import Flask\nfr...,20,SAFE,0
4,"{\n 'name': 'John',\n 'age': 30,\n 'c...","{\n 'name': 'John',\n 'age': 30,\n 'c...",9,SAFE,0
...,...,...,...,...,...
152583,"self.proxy = {\n ""http"": ""http://proxy-ip:p...","self.proxy = {\n ""http"": ""http://proxy-ip:p...",4,SAFE,0
152584,import requests\nfrom Crypto.Cipher import AES...,import requests\nfrom Crypto.Cipher import AES...,20,CWE-327,1
152585,proxies = {\n 'http': 'http://proxy_user:pr...,proxies = {\n 'http': 'http://proxy_user:pr...,4,SAFE,0
152586,import os\nimport requests\nimport requests_to...,import os\nimport requests\nimport requests_to...,19,CWE-20,1


In [30]:
# Number of samples per label
print(dataset['label'].value_counts())

label
SAFE       34935
CWE-20     10671
CWE-502    10155
CWE-400    10057
CWE-259    10016
CWE-78      9906
CWE-327     9635
CWE-330     9606
CWE-605     9283
CWE-22      8954
CWE-703     8892
CWE-319     8728
CWE-377     6470
CWE-89      5280
Name: count, dtype: int64


In [31]:
# Save the processed dataset; index=False keeps the row index out of the CSV
dataset.to_csv("Dataset/VulPySec_Dataset_withDependencies.csv", index=False)