# Project Title: Lintelligence
*Gemini-Powered Source Code Vulnerability Analysis Tool* (not yet a tool)

## Project Summary:

This notebook has been prepared to practice and demonstrate course material from Google's 5-Day Intensive Gen AI Course that took place between March 31 - April 4 2025.

This notebook compares evaluation of two approaches implemented.
1) Few-shot prompting.
2) Retrieval-augmented generation (RAG).


## Content
To demonstrate Linteligence's way of source code for security vulnerabilities; this notebook shows how to:
* Connect to Gemini via Google’s API ( google-genai ).
* Implement few shot prompting and analyze given code.
* Load a dataset: CVEFixes ( https://www.kaggle.com/datasets/girish17019/cvefixes-vulnerable-and-fixed-code  ) that contains vulnerable and fixed code examples.
* Embed this dataset into a ChromaDB vector store.
* Create a RAG-powered code analyzer function that compares input code with known vulnerable examples.
* Evaluate and compare performance of few-shot prompting and the RAG-powered code analyzer.

## Results:
After running the tests a few times, for both approaches accuracy value is 50%.

## Comments & Future Work
I was (naively) expecting the RAG approach to perform better than this but it looks like there is a problem with my way of implementing it. As a future work, I plan to troubleshoot what could have increased the accuracy. Feel free to try with other models.

It was fun and educative participating in this course and playing around with Gemine's API.
Many thanks to each organizer contributed to make this happen.

# Project Implementation
## Setting up Requirements
Install required packages:

In [1]:
!pip uninstall -qqy jupyterlab # Remove unused conflicting packages
!pip install -qU "google-genai==1.7.0" "chromadb==0.6.3"

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.3/67.3 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m144.7/144.7 kB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m611.1/611.1 kB[0m [31m19.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.4/2.4 MB[0m [31m53.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m100.9/100.9 kB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m284.2/284.2 kB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m95.2/95.2 kB[0m [31m4.6 MB/s[0

In [2]:
# Import required modules for Google GenAI
from google import genai
from google.genai import types
from google.api_core import retry

from IPython.display import HTML, Markdown, display

In [3]:
# Import ChromaDB for vector-based document storage and retrieval
from tqdm import tqdm #for eval
import json

import chromadb
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.config import Settings

In [4]:
# Standard library and data manipulation imports
import os
import pandas as pd
import numpy as np

In [5]:
# Define a helper to retry when per-minute quota is reached.
is_retriable = lambda e: (isinstance(e, genai.errors.APIError) and e.code in {429, 503})

genai.models.Models.generate_content = retry.Retry(
    predicate=is_retriable)(genai.models.Models.generate_content)

Set up your API key

To run the following cell, your API key must be stored it in a Kaggle secret named GOOGLE_API_KEY.

If you don't already have an API key, you can grab one from AI Studio. You can find detailed instructions in the docs.

To make the key available through Kaggle secrets, choose Secrets from the Add-ons menu and follow the instructions to add your key or enable it for this notebook.

In [6]:
from kaggle_secrets import UserSecretsClient

GOOGLE_API_KEY = UserSecretsClient().get_secret("GOOGLE_API_KEY")

In [7]:
# Define the system prompt

SYSTEM_PROMPT = """
You are Lintelligence, an AI security assistant for developers.

Your task is to analyze the provided code snippet for real security vulnerabilities.

Only and only return an issue if:
- The vulnerability is clearly present in the code
- It is not mitigated by common secure coding practices (e.g., parameterized queries, input validation)

If the code is already secure, respond clearly with:
{
  "vulnerability": null,
  "message": "No security issues found."
}

When a vulnerability exists, respond in this exact JSON format:
{
  "vulnerability": "<Short name, like 'SQL Injection'>","severity": "<Low | Medium | High | Critical>","explanation": "<Describe the issue and where it occurs>","why_it_matters": "<Explain real-world risk>","suggested_fix": "<Rewritten secure version of the code>","fix_explanation": "<Why the fix works>"
}

Do not speculate. Do not suggest changes if the code is already secure.

Only respond with valid JSON!
"""

In [8]:
# Helper functions that we will use to clean output, it doesn't return JSON as expected.
import re

def safe_json_parse(raw_response_text):
    """
    Clean and parse Gemini output wrapped in markdown and malformed escape sequences.
    Returns a parsed JSON object or fallback.
    """

    # print(f"Raw_response_text: {raw_response_text}")

    # Step 1: Remove code block wrappers like ```json or ```
    raw_text = re.sub(r"^```[a-zA-Z]*\s*", "", raw_response_text.strip(), flags=re.IGNORECASE)
    raw_text = re.sub(r"\s*```", "", raw_text)
    
    if raw_text.startswith("```") or raw_text.endswith("```"):
        lines = raw_text.splitlines()
        lines = [line for line in lines if not line.strip().startswith("```")]
        cleaned_text = "\n".join(lines)
    else:
        cleaned_text = raw_text


    # 2. Truncate newlines inside strings
    cleaned_text.replace("\n", "")
    cleaned_text.replace("```", "")
    
    # print(f"Cleaned version: {cleaned_text}")

    try:
      return json.loads(cleaned_text)
    except Exception as e:
      return {
          "error": f"Failed to parse JSON: {str(e)}",
          "raw_response": raw_text
      }


## Few-Shot Prompting Approach

In [9]:
# Add few-shot examples to improve AI model output consistency

FEW_SHOT_EXAMPLES = """

Below are examples of vulnerable code and how you should analyze them.
---
Example 1:
Input Code Snippet:
search = request.args.get("q")
query = "SELECT * FROM users WHERE name = '" + search + "'"
cursor.execute(query)
Output:
{
  "vulnerability": "SQL Injection",
  "severity": "Critical",
  "explanation": "User input is directly concatenated into a SQL query, allowing attackers to inject arbitrary SQL commands.",
  "why_it_matters": "An attacker could bypass authentication or extract sensitive user data from the database.",
  "suggested_fix": "search = request.args.get(\"q\")\ncursor.execute(\"SELECT * FROM users WHERE name = ?\", (search,))",
  "fix_explanation": "Using parameterized queries ensures that user input is treated strictly as data, preventing SQL injection."
}

Example 2:
Input Code Snippet:
const user = req.query.user;
res.send("<h1>Hello, " + user + "!</h1>");


Output:
{
  "vulnerability": "Cross-Site Scripting (XSS)",
  "severity": "High",
  "explanation": "Unescaped user input is rendered directly into the HTML response, enabling script injection.",
  "why_it_matters": "An attacker can run arbitrary JavaScript in the browser of other users, potentially stealing cookies or credentials.",
  "suggested_fix": "const user = req.query.user;\nres.send(\"<h1>Hello, \" + escapeHtml(user) + \"!</h1>\");",
  "fix_explanation": "Escaping HTML output neutralizes potentially dangerous characters and prevents execution of malicious scripts."
}



Example 3:
Input Code Snippet:
user_id = request.args.get("id")
user = User.query.get(user_id)

Output:
{
  "vulnerability": "Insecure Direct Object Reference (IDOR)",
  "severity": "High",
  "explanation": "There is no access control check to confirm the user has permission to access the specified ID.",
  "why_it_matters": "Attackers can modify the ID parameter to access data of other users, such as personal or financial information.",
  "suggested_fix": "user_id = request.args.get(\"id\")\nif user_id != current_user.id:\n    abort(403)\nuser = User.query.get(user_id)",
  "fix_explanation": "Verifying that the requested ID matches the authenticated user’s ID ensures proper access control."
}



Example 4:
Input Code Snippet:
import os
filename = request.args.get("file")
os.system("cat " + filename)

Output:
{
  "vulnerability": "Command Injection",
  "severity": "Critical",
  "explanation": "User input is passed directly to a system shell command without validation or sanitization.",
  "why_it_matters": "An attacker could execute arbitrary system commands on the host server.",
  "suggested_fix": "import subprocess\nfilename = request.args.get(\"file\")\nsubprocess.run([\"cat\", filename], check=True)",
  "fix_explanation": "Using a subprocess with argument list avoids shell interpretation of input, preventing injection."
}



Example 5:
Input Code Snippet:
import pickle
data = request.data
obj = pickle.loads(data)

Output:
{
  "vulnerability": "Insecure Deserialization",
  "severity": "Critical",
  "explanation": "Pickle can execute arbitrary code during deserialization, and it is being used on untrusted input.",
  "why_it_matters": "An attacker could craft a payload that executes arbitrary code on the server.",
  "suggested_fix": "import json\nobj = json.loads(request.data)",
  "fix_explanation": "Switching to JSON ensures that the deserialization is safe and limited to basic data types."
}



Example 6:
Input Code Snippet:
# Code
def login():
    user = User.get(request.form["username"])
    if user and user.check_password(request.form["password"]):
        login_user(user)


Output:
{
  "vulnerability": "Broken Authentication (No Rate Limiting)",
  "severity": "High",
  "explanation": "There is no limit on the number of login attempts, allowing brute-force attacks.",
  "why_it_matters": "An attacker could automate login attempts and compromise accounts using credential stuffing.",
  "suggested_fix": "Implement a rate limiter or lockout mechanism after several failed login attempts.",
  "fix_explanation": "Rate limiting prevents automated brute-force attacks by slowing or blocking repeated login attempts."
}



Example 7:
Input Code Snippet:
api_key = "sk_test_1234567890abcdef"

Output:
{
  "vulnerability": "Hardcoded Secret",
  "severity": "Medium",
  "explanation": "A sensitive API key is hardcoded directly into the source code.",
  "why_it_matters": "If the code is leaked or shared, the key can be used by attackers to abuse the associated service.",
  "suggested_fix": "Store secrets in environment variables and load them using a secure method.",
  "fix_explanation": "Environment variables keep secrets out of source control and limit exposure in case of leaks."
}



Example 8:
Input Code Snippet:
url = request.args.get("url")
requests.get(url)

Output:
{
  "vulnerability": "Server-Side Request Forgery (SSRF)",
  "severity": "Critical",
  "explanation": "User input is used to make internal HTTP requests without validation.",
  "why_it_matters": "Attackers can exploit SSRF to access internal services or metadata APIs (e.g., AWS instance metadata).",
  "suggested_fix": "Validate that the input URL is in an allowed list of domains before making the request.",
  "fix_explanation": "Restricting requests to trusted domains prevents attackers from reaching internal resources."
}


Example 9:
Input Code Snippet:
@app.route("/debug")
def debug():
    raise Exception("Test")

Output:
{
  "vulnerability": "Information Disclosure (Stack Trace)",
  "severity": "Medium",
  "explanation": "The endpoint intentionally throws an exception that may expose stack traces in production.",
  "why_it_matters": "Exposing stack traces can reveal implementation details useful to attackers.",
  "suggested_fix": "Only show detailed error messages in development mode; use generic messages in production.",
  "fix_explanation": "Hiding stack traces in production prevents attackers from gaining insight into the backend structure."
}


Example 10:
Input Code Snippet:
@app.route("/update-email", methods=["POST"])
def update_email():
    current_user.email = request.form["email"]
    db.session.commit()

Output:
{
  "vulnerability": "Cross-Site Request Forgery (CSRF)",
  "severity": "High",
  "explanation": "The endpoint processes state-changing actions via POST without verifying a CSRF token.",
  "why_it_matters": "An attacker could trick a logged-in user into submitting a malicious request unknowingly.",
  "suggested_fix": "@app.route(\"/update-email\", methods=[\"POST\"])\n@csrf_protect\ndef update_email(): ...",
  "fix_explanation": "Using CSRF tokens ensures that the request was intentionally initiated by the user."
}
"""

In [10]:
# Initialize the Google GenAI client with API key

client = genai.Client(api_key=GOOGLE_API_KEY)

In [11]:
# Function to analyze code using Gemine (temp=0 to decrease false-positives)

def analyze_code(code_snippet: str) -> str:
  """Analyze given code"""
  #config = types.GenerateContentConfig(temperature=0.0, system_instruction=SYSTEM_PROMPT)
  config = types.GenerateContentConfig(temperature=0.0, system_instruction=SYSTEM_PROMPT+FEW_SHOT_EXAMPLES)
  prompt = "Here is a code snippet:"
  raw_response = client.models.generate_content(
      model='gemini-2.0-flash',
      config=config,
      contents=prompt + code_snippet,
  )

  # raw_text = raw_response.text.strip()
  # if raw_text.startswith("```"):
  #     lines = raw_text.splitlines()
  #     lines = [line for line in lines if not line.strip().startswith("```")]
  #     cleaned_text = "\n".join(lines)
  # else:
  #     cleaned_text = raw_text

  raw_text = raw_response.text.strip()
  cleaned_text = safe_json_parse(raw_text)

  return cleaned_text

The following is a utility function to convert json into representable Markdown format

In [12]:
def json_to_markdown(vuln_report):
#    print(vuln_report)
    if not isinstance(vuln_report, dict):
        return "Invalid input: expected a JSON object."

    if vuln_report.get("vulnerability") is None:
        return "No security issues found in the provided code."

    return f"""
### 🛡️ Vulnerability: {vuln_report['vulnerability']}

### 🚨 Severity: {vuln_report['severity']}

---

#### ❗ Explanation  
{vuln_report['explanation']}

---

#### 🧠 Impact:
{vuln_report['why_it_matters']}

---

#### ✅ Suggested Fix  
{vuln_report['suggested_fix']}

---

#### 🔍 Fix Explanation  
{vuln_report['fix_explanation']}
"""


In [13]:
vulnerable_code = """
search = request.args.get("q")
query = "SELECT * FROM users WHERE name = '" + search + "'"
cursor.execute(query)
"""

code_snippet = vulnerable_code

response = analyze_code(code_snippet)

# print(response)
Markdown(json_to_markdown(response))


### 🛡️ Vulnerability: SQL Injection

### 🚨 Severity: Critical

---

#### ❗ Explanation  
User input is directly concatenated into a SQL query, allowing attackers to inject arbitrary SQL commands.

---

#### 🧠 Impact:
An attacker could bypass authentication or extract sensitive user data from the database.

---

#### ✅ Suggested Fix  
search = request.args.get("q")
cursor.execute("SELECT * FROM users WHERE name = ?", (search,))

---

#### 🔍 Fix Explanation  
Using parameterized queries ensures that user input is treated strictly as data, preventing SQL injection.


In [14]:
secure_code = """
search = request.args.get("q")
cursor.execute("SELECT * FROM users WHERE name = ?", (search,))
"""
code_snippet = secure_code

response = analyze_code(code_snippet)

# print(response)
Markdown(json_to_markdown(response))

No security issues found in the provided code.

## RAG Approach

In [15]:
# Check if the dataset is available
os.listdir('/kaggle/input/cvefixes-vulnerable-and-fixed-code/')

['CVEFixes.csv', 'LICENSE.txt']

In [16]:
# Load CVEFixes dataset containing vulnerable/fixed code pairs

df = pd.read_csv("/kaggle/input/cvefixes-vulnerable-and-fixed-code/CVEFixes.csv")
df.head()

Unnamed: 0,code,language,safety
0,package org.bouncycastle.jcajce.provider.asymm...,java,vulnerable
1,<?php\n\n\n\n/**\n\n * ownCloud - user_ldap\n\...,php,vulnerable
2,#!/usr/bin/env python\n\nfrom __future__ impor...,py,safe
3,/* -*- c-basic-offset: 8 -*-\n\n rdesktop: A...,c,safe
4,<!DOCTYPE html>\n\n<html>\n\n <head>\n\n ...,html,safe


In [17]:
# List Gemini models supporting embeddings. 
# Future work: experiment with the others!

for m in client.models.list():
    if "embedContent" in m.supported_actions:
        print(m.name)

models/embedding-001
models/text-embedding-004
models/gemini-embedding-exp-03-07
models/gemini-embedding-exp


In [18]:
# Split dataset into train/test datasets.
from sklearn.model_selection import train_test_split

train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

In [19]:
# Clean + normalize
# Not sure if this is needed

train_df["language"] = train_df["language"].str.lower().str.strip()
train_df["safety"] = train_df["safety"].str.lower().str.strip()
train_df["code"] = train_df["code"].astype(str)


test_df["language"] = test_df["language"].str.lower().str.strip()
test_df["safety"] = test_df["safety"].str.lower().str.strip()
test_df["code"] = test_df["code"].astype(str)


# Limit with python for quick tests
#df = df[df["language"] == "py"].copy()

# Confirm rows exist
if train_df.empty:
    raise ValueError("Still no Python rows found! check train_df!")
if test_df.empty:
    raise ValueError("Still no Python rows found! check test_df!")


# Format docs for RAG
# Not sure if this is needed
def format_doc(row):
    label = "SAFE EXAMPLE" if row["safety"] == "safe" else "VULNERABLE EXAMPLE"
    return f"{label}\n\nCODE:\n{row['code']}"

train_df["doc"] = train_df.apply(format_doc, axis=1)

# Show preview
train_df[["safety", "doc"]].head()

Unnamed: 0,safety,doc
4223,safe,SAFE EXAMPLE\n\nCODE:\n/*\n\n * Copyright (c) ...
18839,vulnerable,VULNERABLE EXAMPLE\n\nCODE:\n2015-12-27 Even ...
24940,safe,SAFE EXAMPLE\n\nCODE:\n<?php\n\n# MantisBT - A...
8454,vulnerable,VULNERABLE EXAMPLE\n\nCODE:\n/*\n\n * AEAD: Au...
28747,vulnerable,VULNERABLE EXAMPLE\n\nCODE:\nAttic Changelog\n...


In [20]:
# Confirm test_df & show preview

test_df[["safety", "code"]].head()

Unnamed: 0,safety,code
20087,vulnerable,<?php\n\n\n\n// Init owncloud\n\nrequire_once(...
14931,vulnerable,"/* Prototype JavaScript framework, version 1...."
8029,vulnerable,/* FriBidi\n\n * fribidi-bidi.c - bidirectiona...
23270,safe,<?php\n\n\n\n/**\n\n* ownCloud\n\n*\n\n* @auth...
19847,safe,/*\n\n * Copyright (c) 2001-2003 Michael David...


In [21]:
# Define a helper to retry when per-minute quota is reached.
is_retriable = lambda e: (isinstance(e, genai.errors.APIError) and e.code in {429, 503})


class GeminiEmbeddingFunction(EmbeddingFunction):
    # Specify whether to generate embeddings for documents, or queries
    document_mode = True

    @retry.Retry(predicate=is_retriable)
    def __call__(self, input: Documents) -> Embeddings:
        if self.document_mode:
            embedding_task = "retrieval_document"
        else:
            embedding_task = "retrieval_query"

        response = client.models.embed_content(
            model="models/embedding-001",
            contents=input,
            config=types.EmbedContentConfig(
                task_type=embedding_task,
            ),
        )
        return [e.values for e in response.embeddings]

In [22]:
# Truncates documents that are too long to avoid embedding errors
# This is to fix max bytes in request error.

MAX_LEN = 10000  # chars
#train_df = train_df[train_df["doc"].str.len() < MAX_LEN].copy() # this completely removes it
train_df["doc"] = train_df["doc"].str.slice(0, MAX_LEN) #just truncates it
documents = train_df["doc"]
metadatas = train_df["safety"].apply(lambda x: {"safety": x})

In [23]:
DB_NAME = "test1"

embed_fn = GeminiEmbeddingFunction()
embed_fn.document_mode = True

chroma_client = chromadb.Client()
db = chroma_client.get_or_create_collection(name=DB_NAME, embedding_function=embed_fn)

step = 100
for start in range(0, len(documents), step):
    end = min(start + step, len(documents))
    if start % 1000 == 0 or end == len(documents):
        print(f"Batch: {start} --> {end}")

    docs_test = documents[start:end].tolist()
    ids = [str(i) for i in range(start, end)]
    metadata_batch = metadatas[start:end].tolist()
#    metadata_batch = metadatas[start:end]
#    print(metadata_batch)
#    db.add(documents=docs_test, ids=ids, metadatas=metadata_batch) 
    try:
        db.add(documents=docs_test, ids=ids, metadatas=metadata_batch)
    except Exception as e:
        print(f"Error in batch {start}-{end}: {e}")

Batch: 0 --> 100
Batch: 1000 --> 1100
Batch: 2000 --> 2100
Batch: 3000 --> 3100
Batch: 4000 --> 4100
Batch: 5000 --> 5100
Batch: 6000 --> 6100
Batch: 7000 --> 7100
Batch: 8000 --> 8100
Batch: 9000 --> 9100
Batch: 10000 --> 10100
Batch: 11000 --> 11100
Batch: 12000 --> 12100
Batch: 13000 --> 13100
Batch: 14000 --> 14100
Batch: 15000 --> 15100
Batch: 16000 --> 16100
Batch: 17000 --> 17100
Batch: 18000 --> 18100
Batch: 19000 --> 19100
Batch: 20000 --> 20100
Batch: 21000 --> 21100
Batch: 22000 --> 22100
Batch: 23000 --> 23100
Batch: 24000 --> 24100
Batch: 24900 --> 24955


In [24]:
# Sanity-check: count how many documents were added to ChromaDB
db.count()

24955

In [25]:
# Switch to query mode when generating embeddings.
embed_fn.document_mode = False

# Search the Chroma DB using an example query.
query = "select"

result = db.query(query_texts=[query], n_results=1)
print(result["documents"])

[['VULNERABLE EXAMPLE\n\nCODE:\n404: Not Found']]


In [26]:
# Function to analyze given code snippet using RAG approach

def analyze_code_with_rag(code_snippet: str, k: int = 3) -> dict:
    # Step 1: Query similar examples
    embed_fn.document_mode = False  # Query mode
    results = db.query(query_texts=[code_snippet], n_results=k)

    retrieved_examples = results["documents"][0]
    context_examples = "\n\n---\n\n".join(retrieved_examples)

    # Step 2: Build a prompt
    prompt = f"""
    You are Lintelligence, an AI assistant that helps developers write secure code. Below is a code snippet to analyze:{code_snippet}
    Here are {k} similar examples from real-world open-source code (some may be vulnerable, others may be safe): {context_examples}
    Your job is to identify actual security vulnerabilities in the new code. ONLY flag an issue if the code:
    Uses insecure practices (e.g., direct input injection, weak encryption, unsafe deserialization)
    Fails to follow common secure coding principles
    Is not using known safe patterns (e.g., parameterized queries, escaping)
    If the code is secure, DO NOT flag it. Confirm that it is safe instead.
    """
    config = types.GenerateContentConfig(temperature=0.0)
    raw_response = client.models.generate_content(
      model='gemini-2.0-flash',
      config=config,
      contents=SYSTEM_PROMPT+prompt
    )
    raw_text = raw_response.text.strip()
    cleaned_text = safe_json_parse(raw_text)

    return cleaned_text

Simple test cases; just to see if it works:

In [27]:
vulnerable_code = """
search = request.args.get("q")
query = "SELECT * FROM users WHERE name = '" + search + "'"
cursor.execute(query)
"""

response = analyze_code_with_rag(vulnerable_code, k=3)
Markdown(json_to_markdown(response))


### 🛡️ Vulnerability: SQL Injection

### 🚨 Severity: High

---

#### ❗ Explanation  
The code directly concatenates the user-provided `search` parameter into the SQL query string without any sanitization or escaping. This allows an attacker to inject arbitrary SQL code by crafting a malicious input string.

---

#### 🧠 Impact:
A successful SQL injection attack can allow an attacker to bypass authentication, read sensitive data, modify data, execute administrative operations, recover the content of a given file present on the DBMS file system and in some cases issue commands to the operating system.

---

#### ✅ Suggested Fix  
python
search = request.args.get("q")
query = "SELECT * FROM users WHERE name = %s"
cursor.execute(query, (search,))


---

#### 🔍 Fix Explanation  
Using parameterized queries (also known as prepared statements) ensures that the input is treated as data, not as part of the SQL command. The database driver handles the proper escaping and quoting of the input, preventing SQL injection vulnerabilities.


In [28]:
safe_code_example = """
import sqlite3
from flask import request

conn = sqlite3.connect("users.db")
cursor = conn.cursor()

username = request.args.get("username")
cursor.execute("SELECT * FROM users WHERE username = ?", (username,))

user_data = cursor.fetchone()
print(user_data)

conn.close()
"""

response = analyze_code_with_rag(safe_code_example,k=3)
Markdown(json_to_markdown(response))

No security issues found in the provided code.

## Evaluation
In this section, we evaluate the performance of each approach

In [29]:
# The number of records we have in test dataset:

len(test_df)

6239

In [30]:
# Counts of each unique label
test_df["safety"].value_counts()

safety
vulnerable    3139
safe          3100
Name: count, dtype: int64

In [31]:
from sklearn.metrics import confusion_matrix, classification_report

### Evaluation of Few-Shot-Prompting Approach:

In [32]:
correct = 0
false = 0
total = 0
invalid = 0

y_true = []
y_pred = []

for _, row in tqdm(test_df.sample(100).iterrows()):  # small test set, it takes too long with free API
    label = row["safety"]
    code = row["code"]
    pred = ""
    response = analyze_code(code)
    
    #if response is not valid, don't count it.
    if "vulnerability" not in response and "message" not in response:
        # print("Missing 'vulnerability' key and 'message'. Skipping...")
        # print(f"DEBUG: {response}")
        invalid += 1
        continue

    # Decide prediction
    if response.get("message") and "no security issues found" in response["message"].lower():
        pred = "safe"
    elif response.get("vulnerability"):  # Any vulnerability listed = vulnerable
        pred = "vulnerable"
    else:
        print(f"Error parsing results.")
        invalid += 1
        continue

    y_true.append(label)
    y_pred.append(pred)

    if label == pred:
        correct += 1
    elif label != pred:
        false += 1
    
    # print(f"Label: {label} --> Prediction: {pred}")
    total += 1


print(f"Accuracy: {correct}/{total} = {correct/total:.2f}")
print(f"Invalid: {invalid}")

100it [06:10,  3.70s/it]

Accuracy: 46/92 = 0.50
Invalid: 8





In [33]:
if total > 0:
    labels = ["safe", "vulnerable"]
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, target_names=labels))


Classification Report:
              precision    recall  f1-score   support

        safe       0.57      0.57      0.57        53
  vulnerable       0.41      0.41      0.41        39

    accuracy                           0.50        92
   macro avg       0.49      0.49      0.49        92
weighted avg       0.50      0.50      0.50        92



### Evaluation of RAG Approach:

In [34]:
correct = 0
false = 0
total = 0
invalid = 0

y_true = []
y_pred = []

for _, row in tqdm(test_df.sample(100).iterrows()):  # small test set, it takes too long with free API
    label = row["safety"]
    code = row["code"]
    pred = ""
    response = analyze_code_with_rag(code)
    
    #if response is not valid, don't count it.
    if "vulnerability" not in response and "message" not in response:
        # print("Missing 'vulnerability' key and 'message'. Skipping...")
        # print(f"DEBUG: {response}")
        invalid += 1
        continue

    # Decide prediction
    if response.get("message") and "no security issues found" in response["message"].lower():
        pred = "safe"
    elif response.get("vulnerability"):  # Any vulnerability listed = vulnerable
        pred = "vulnerable"
    else:
        print(f"Error parsing results.")
        invalid += 1
        continue

    y_true.append(label)
    y_pred.append(pred)

    if label == pred:
        correct += 1
    elif label != pred:
        false += 1
    
    # print(f"Label: {label} --> Prediction: {pred}")
    total += 1


print(f"Accuracy: {correct}/{total} = {correct/total:.2f}")
print(f"Invalid: {invalid}")

100it [06:16,  3.77s/it]

Accuracy: 55/98 = 0.56
Invalid: 2





In [35]:
if total > 0:
    labels = ["safe", "vulnerable"]
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, target_names=labels))


Classification Report:
              precision    recall  f1-score   support

        safe       0.58      0.68      0.63        53
  vulnerable       0.53      0.42      0.47        45

    accuracy                           0.56        98
   macro avg       0.55      0.55      0.55        98
weighted avg       0.56      0.56      0.55        98

