In [1]:
import re
import json
import numpy as np
import torch
from transformers import RobertaTokenizer, RobertaModel
from sklearn.ensemble import RandomForestClassifier
import pandas as pd

# Function to clean code
def clean_code(code):
    code = re.sub(r'\/\*[\s\S]*?\*\/', '', code)  # Remove multiline comments
    code = re.sub(r'\/\/.*', '', code)  # Remove single line comments
    code = re.sub(r'#.*', '', code)  # Remove python comments
    code = re.sub(r'\s*\n\s*', '\n', code)  # Remove extra whitespace around newlines
    return code.strip()


# Function to clean Python code
def clean_code(code):
    code = re.sub(r'#.*', '', code)  # Remove Python comments
    code = re.sub(r'\s*\n\s*', '\n', code)  # Remove extra whitespace around newlines
    return code.strip()

# Function to label Python code (0 for safe, 1 for unsafe)
def label_code(snippet):
    unsafe_patterns = {
        'eval(': 'Dynamic Code Execution',               # Avoid dynamic evaluation
    'exec(': 'Dynamic Code Execution',               # Avoid dynamic execution
    'subprocess.call(': 'Command Injection',          # Avoid executing external commands
    'subprocess.Popen(': 'Command Injection',         # Avoid opening subprocesses with dynamic input
    'input(': 'Unvalidated Input',                    # Avoid unsafe handling of user inputs
    'open(': 'File Handling',                         # Be cautious with file handling
    'os.system(': 'Command Injection',                # Avoid system calls
    'pickle.load(': 'Deserialization Vulnerability',  # Deserialization of potentially unsafe data
    'pickle.dumps(': 'Serialization Vulnerability',  # Serialization of potentially unsafe data
    'import(': 'Dynamic Import',                     # Potentially unsafe module import
    'os.getenv(': 'Environment Variable Exposure',    # Exposure through environment variables
    'glob.glob(': 'File Exposure',                    # File path exposure
    'shutil.copy(': 'File Handling',                  # Copying files, potentially unsafe
    'shutil.move(': 'File Handling',                  # Moving files, potentially unsafe
    'sqlite3.connect(': 'SQL Injection',              # Connection to SQLite with potential SQL injection
    'pymysql.connect(': 'SQL Injection',              # Connection to MySQL with potential SQL injection
    'psycopg2.connect(': 'SQL Injection',             # Connection to PostgreSQL with potential SQL injection
    'requests.get(': 'Potential Data Exposure',       # HTTP GET requests, potentially exposing data
    'requests.post(': 'Potential Data Exposure',      # HTTP POST requests, potentially exposing data
    'socket.socket(': 'Potential Network Vulnerability',  # Potential network vulnerabilities
    'subprocess.run(': 'Command Injection',           # Executes command with potentially unsafe input
    'shutil.rmtree(': 'File Deletion',                # Deleting files, potentially unsafe
    'os.remove(': 'File Deletion',                    # Deleting files, potentially unsafe
    'os.rmdir(': 'Directory Deletion',                # Deleting directories, potentially unsafe
    'requests.Session()' : 'Potential Data Exposure',  # Session object potentially exposes data
    'os.chmod(': 'File Permissions Modification',     # Modifying file permissions, potentially unsafe
    'os.chown(': 'File Ownership Modification',       # Modifying file ownership, potentially unsafe
    'tempfile.NamedTemporaryFile(': 'Temporary File Handling',  # Handling temporary files, potential risks
    'importlib.import_module(': 'Dynamic Module Import',  # Dynamically importing modules
    'execfile(': 'Dynamic Code Execution',             # Executing files with potentially unsafe content
    'base64.b64decode(': 'Base64 Decoding',           # Decoding base64 encoded data, potential security risk
    'json.loads(': 'JSON Deserialization',            # JSON deserialization, potential security risk
    'yaml.safe_load(': 'YAML Deserialization',        # YAML deserialization, potential security risk
    'pickle.loads(': 'Deserialization Vulnerability', # Unpickling potentially unsafe data
    'marshal.loads(': 'Deserialization Vulnerability' # Marshaling potentially unsafe data
    }

    unsafe_lines = []
    for line in snippet.split('\n'):
        for pattern, vuln_type in unsafe_patterns.items():
            if pattern in line:
                unsafe_lines.append((line.strip(), vuln_type))
    return unsafe_lines

# Load pre-trained model and tokenizer
tokenizer = RobertaTokenizer.from_pretrained('microsoft/codebert-base')
model = RobertaModel.from_pretrained('microsoft/codebert-base')

# Load existing preprocessed dataset
with open('python.json', 'r') as f:
    data = json.load(f)



The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/150 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/498 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/499M [00:00<?, ?B/s]

In [2]:
# Tokenize and get embeddings for the code snippets
inputs = tokenizer([d['code'] for d in data], padding=True, truncation=True, return_tensors='pt')
with torch.no_grad():
    outputs = model(**inputs)
embeddings = outputs.last_hidden_state.mean(dim=1).numpy()

In [3]:
# Create labels
labels = np.array([d['label'] for d in data])

# Train a simple classifier
clf = RandomForestClassifier()
clf.fit(embeddings, labels)

# Testing the classifier with a new Python code snippet
test_code = """
  import os
  import subprocess

  # Safe code
  def safe_function():
      print("This is safe")

  # Unsafe code
  user_input = 'ls'
  subprocess.call(user_input, shell=True)  # Unsafe
  eval("print('Hello, world!')")  # Unsafe
"""

# Clean the code for embedding
cleaned_test_code = clean_code(test_code)

# Identify unsafe lines
unsafe_lines = label_code(cleaned_test_code)

print("Unsafe Lines of Code with Vulnerability Types:\n")
for line, vuln_type in unsafe_lines:
    print(f"{line} - {vuln_type}")


# Tokenize and get embeddings for the cleaned test code
test_inputs = tokenizer([cleaned_test_code], padding=True, truncation=True, return_tensors='pt')
with torch.no_grad():
    test_outputs = model(**test_inputs)
test_embeddings = test_outputs.last_hidden_state.mean(dim=1).numpy()

# Predict with the trained classifier
prediction = clf.predict(test_embeddings)
# print("\nPrediction:", prediction)

Unsafe Lines of Code with Vulnerability Types:

subprocess.call(user_input, shell=True) - Command Injection
eval("print('Hello, world!')") - Dynamic Code Execution


In [5]:
!pip install gradio


Collecting gradio
  Downloading gradio-4.38.1-py3-none-any.whl (12.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.4/12.4 MB[0m [31m21.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting aiofiles<24.0,>=22.0 (from gradio)
  Downloading aiofiles-23.2.1-py3-none-any.whl (15 kB)
Collecting altair<6.0,>=5.0 (from gradio)
  Downloading altair-5.3.0-py3-none-any.whl (857 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m857.8/857.8 kB[0m [31m25.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting fastapi (from gradio)
  Downloading fastapi-0.111.1-py3-none-any.whl (92 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.2/92.2 kB[0m [31m10.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting ffmpy (from gradio)
  Downloading ffmpy-0.3.2.tar.gz (5.5 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting gradio-client==1.1.0 (from gradio)
  Downloading gradio_client-1.1.0-py3-none-any.whl (318 kB)
[2K     [90m━━━━━━━━━━━━━━

In [9]:
import gradio as gr

# Function to detect unsafe Python code
def detect_unsafe_code(code):
    cleaned_code = clean_code(code)
    unsafe_lines = label_code(cleaned_code)

    # Format the output
    formatted_output = "\n".join([f"{line[0]} - {line[1]}" for line in unsafe_lines])
    return formatted_output

# Create and launch the Gradio interface
iface = gr.Interface(
    fn=detect_unsafe_code,
    inputs="text",
    outputs="text",
    title="Python Code Safety Analyzer",
    description="Enter your Python code to detect unsafe lines."
)

iface.launch()


Setting queue=True in a Colab notebook requires sharing enabled. Setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Running on public URL: https://185e0477ae863c107d.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


