In [0]:
%pip install semgrep pip-audit sqlfluff detect-secrets
%restart_python

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import json
import requests
import subprocess
import tempfile
import os
import pandas as pd
import logging
from typing import Dict, List, Any, Optional
from pathlib import Path


In [0]:
def install_gitleaks():
    """Install GitLeaks binary if not already installed"""
    gitleaks_path = "/tmp/gitleaks"

    if not os.path.exists(gitleaks_path):
        print("Installing GitLeaks...")
        # Download GitLeaks binary for Linux
        url = "https://github.com/gitleaks/gitleaks/releases/download/v8.18.0/gitleaks_8.18.0_linux_x64.tar.gz"

        with tempfile.TemporaryDirectory() as temp_dir:
            tar_path = os.path.join(temp_dir, "gitleaks.tar.gz")

            # Download
            response = requests.get(url)
            with open(tar_path, 'wb') as f:
                f.write(response.content)

            # Extract
            subprocess.run(["tar", "-xzf", tar_path, "-C", temp_dir], check=True)

            # Move to final location
            subprocess.run(["cp", os.path.join(temp_dir, "gitleaks"), gitleaks_path], check=True)
            subprocess.run(["chmod", "+x", gitleaks_path], check=True)

        print("GitLeaks installed successfully!")
    else:
        print("GitLeaks already installed.")

install_gitleaks()

Installing GitLeaks...
GitLeaks installed successfully!


In [0]:
class SASTScanner:
    """Base class for SAST scanners"""

    def __init__(self, name: str):
        self.name = name

    def scan(self, file_path: str) -> Dict[str, Any]:
        """Run the scanner on a file and return structured results"""
        raise NotImplementedError

    def _run_command(self, cmd: List[str], cwd: Optional[str] = None) -> tuple:
        """Run a command and return stdout, stderr, and return code"""
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                cwd=cwd,
                timeout=300  # 5 minute timeout
            )
            return result.stdout, result.stderr, result.returncode
        except subprocess.TimeoutExpired:
            return "", "Command timed out", 1
        except Exception as e:
            return "", str(e), 1

In [0]:
class SemgrepScanner(SASTScanner):
    """Semgrep scanner implementation"""

    def __init__(self):
        super().__init__("Semgrep")

    def scan(self, file_path: str) -> Dict[str, Any]:
        cmd = ["semgrep", "--config=auto", "--json", "--quiet", file_path]
        stdout, stderr, returncode = self._run_command(cmd)

        results = {
            "scanner": self.name,
            "file_path": file_path,
            "status": "success" if returncode == 0 else "error",
            "raw_output": stdout,
            "error_output": stderr,
            "findings": [],
            "pass_count": 0,
            "warn_count": 0,
            "fail_count": 0,
            "total_issues": 0
        }

        if stdout:
            try:
                data = json.loads(stdout)
                findings = data.get("results", [])
                results["findings"] = findings
                results["total_issues"] = len(findings)

                # Categorize findings by severity
                for finding in findings:
                    severity = finding.get("extra", {}).get("severity", "INFO")
                    if severity in ["ERROR", "HIGH"]:
                        results["fail_count"] += 1
                    elif severity in ["WARNING", "MEDIUM"]:
                        results["warn_count"] += 1
                    else:
                        results["pass_count"] += 1

            except json.JSONDecodeError:
                results["status"] = "error"
                results["error_output"] = "Failed to parse JSON output"

        return results

In [0]:
class PipAuditScanner(SASTScanner):
    """pip-audit scanner implementation"""

    def __init__(self):
        super().__init__("pip-audit")

    def scan(self, file_path: str) -> Dict[str, Any]:
        # pip-audit works on requirements files or installed packages
        # For this POC, we'll scan if it's a requirements file

        results = {
            "scanner": self.name,
            "file_path": file_path,
            "status": "skipped",
            "raw_output": "",
            "error_output": "",
            "findings": [],
            "pass_count": 0,
            "warn_count": 0,
            "fail_count": 0,
            "total_issues": 0
        }

        # Only scan if it looks like a requirements file
        if not any(keyword in os.path.basename(file_path).lower() 
                  for keyword in ["requirements", "pyproject.toml", "setup.py"]):
            results["error_output"] = "File type not supported by pip-audit"
            return results

        cmd = ["pip-audit", "--format=json", "--requirement", file_path]
        stdout, stderr, returncode = self._run_command(cmd)

        results["raw_output"] = stdout
        results["error_output"] = stderr
        results["status"] = "success" if returncode == 0 else "error"

        if stdout:
            try:
                data = json.loads(stdout)
                findings = data.get("vulnerabilities", [])
                results["findings"] = findings
                results["total_issues"] = len(findings)
                results["fail_count"] = len(findings)  # All vulnerabilities are failures

            except json.JSONDecodeError:
                results["status"] = "error"
                results["error_output"] = "Failed to parse JSON output"

        return results

In [0]:
from __future__ import annotations
import io, os, json, re, tokenize, ast
from dataclasses import dataclass
from typing import List, Optional

SQL_KEYWORDS = (
    "SELECT","WITH","INSERT","UPDATE","DELETE","CREATE","ALTER","DROP","MERGE",
    "TRUNCATE","GRANT","REVOKE","CALL","REFRESH","ANALYZE","OPTIMIZE"
)

@dataclass
class ExtractedSQL:
    text: str
    start_line: int
    end_line: int
    origin: str            # 'sql_file', 'py_string', 'py_call', 'magic_sql', 'ipynb_sql', 'md_fence', 'yaml_block'
    context: str = ""      # e.g., 'spark.sql', 'cursor.execute', 'f-string', 'cell 7', etc.

def _looks_like_sql(s: str) -> bool:
    t = re.sub(r"\s+", " ", s.strip())
    if len(t) < 12:
        return False
    up = t.upper()
    return any(k in up for k in SQL_KEYWORDS) and (" " in t or "\n" in s)

# --- Python helpers (accurate line numbers via tokenize) ----------------------

_PREFIX_RE = re.compile(r"(?i)^(?P<prefix>(?:[rubf]|br|rb|fr|rf){0,2})?")
_QUOTE_RE  = re.compile(r"(?s)^([\"']{3}|[\"'])|(.*)([\"']{3}|[\"'])$")

def _unquote_python_string_token(token_str: str) -> str:
    """Return best-effort string content from a Python STRING token (handles raw/f/byte prefixes & triple quotes)."""
    # Try literal_eval first (works for normal & raw strings; not for f-strings)
    try:
        if token_str.lstrip().lower().startswith(("f'", 'f"', "fr'", 'fr"', "rf'", 'rf"')):
            raise ValueError  # skip f-strings here
        return ast.literal_eval(token_str)
    except Exception:
        pass

    s = token_str.strip()
    # Strip prefixes like r, u, b, f, rf, fr
    m = _PREFIX_RE.match(s)
    if m:
        s = s[m.end():]

    # Identify opening quote
    if len(s) < 2:
        return s
    quote = s[0]
    if s[:3] in ("'''",'"""'):
        q = s[:3]
        core = s[3:-3] if s.endswith(q) else s[3:]
    else:
        q = quote
        core = s[1:-1] if s.endswith(q) else s[1:]

    # For f-strings, replace {...} with placeholders (don’t attempt to evaluate)
    # Keeps surrounding SQL text intact.
    if token_str.lstrip().lower().startswith(("f", "rf", "fr")):
        core = re.sub(r"\{[^{}]*\}", "{expr}", core)

    return core

def _extract_sql_from_python_code(code: str) -> List[ExtractedSQL]:
    out: List[ExtractedSQL] = []
    # 1) STRING tokens: captures triple-quoted SQL in variables, docstrings, etc.
    try:
        for tok in tokenize.generate_tokens(io.StringIO(code).readline):
            if tok.type == tokenize.STRING:
                raw = tok.string
                txt = _unquote_python_string_token(raw)
                if _looks_like_sql(txt):
                    out.append(ExtractedSQL(
                        text=txt,
                        start_line=tok.start[0],
                        end_line=tok.end[0],
                        origin="py_string",
                        context="string literal" + (" (f-string)" if raw.lower().lstrip().startswith("f") else "")
                    ))
    except tokenize.TokenError:
        pass

    # 2) Heuristic: common call sites with inline SQL as first arg
    #    We use regex with line spans for simplicity; tokenizer already gave us strings above,
    #    but this adds context labels like spark.sql / cursor.execute / read_sql.
    call_patterns = [
        (r'\bspark\.sql\s*\(\s*([ruRbBfF]{0,2}?["\']{1,3}.*?["\']{1,3})', "spark.sql"),
        (r'\bcursor\.execute\s*\(\s*([ruRbBfF]{0,2}?["\']{1,3}.*?["\']{1,3})', "cursor.execute"),
        (r'\bread_sql(?:_query)?\s*\(\s*([ruRbBfF]{0,2}?["\']{1,3}.*?["\']{1,3})', "read_sql"),
        (r'\btext\s*\(\s*([ruRbBfF]{0,2}?["\']{1,3}.*?["\']{1,3})', "sqlalchemy.text"),
    ]
    for pat, ctx in call_patterns:
        for m in re.finditer(pat, code, flags=re.IGNORECASE | re.DOTALL):
            s = m.group(1)
            txt = _unquote_python_string_token(s)
            if _looks_like_sql(txt):
                # approximate line span using preceding newlines
                start_line = code.count("\n", 0, m.start(1)) + 1
                end_line   = code.count("\n", 0, m.end(1)) + 1
                out.append(ExtractedSQL(txt, start_line, end_line, "py_call", ctx))

    # 3) Databricks-exported MAGIC %sql inside .py (lines start with "# MAGIC %sql")
    lines = code.splitlines()
    i = 0
    while i < len(lines):
        line = lines[i].lstrip()
        if line.startswith("# MAGIC %sql"):
            start = i + 1
            sql_lines = []
            i += 1
            while i < len(lines):
                cur = lines[i]
                cur_strip = cur.lstrip()
                if cur_strip.startswith("# COMMAND"):
                    break
                if cur_strip.startswith("# MAGIC %") and not cur_strip.startswith("# MAGIC %sql"):
                    break
                if cur_strip.startswith("# MAGIC"):
                    sql_lines.append(cur_strip.replace("# MAGIC ", "", 1).replace("# MAGIC", "", 1))
                else:
                    # non-MAGIC lines inside the same cell are still content
                    sql_lines.append(cur)
                i += 1
            sql_text = "\n".join(sql_lines).strip()
            if _looks_like_sql(sql_text):
                out.append(ExtractedSQL(sql_text, start+1, start+len(sql_lines), "magic_sql", "Databricks # MAGIC %sql"))
            continue
        i += 1

    return out

# --- Markdown fenced code blocks ------------------------------------------------

_MD_FENCE = re.compile(r"(?ms)^```(?:sql|postgres|tsql|bigquery)\s*(.*?)\s*```")

def _extract_sql_from_markdown(md: str) -> List[ExtractedSQL]:
    out: List[ExtractedSQL] = []
    for m in _MD_FENCE.finditer(md):
        block = m.group(1)
        if _looks_like_sql(block):
            start_line = md.count("\n", 0, m.start()) + 1
            end_line   = start_line + block.count("\n")
            out.append(ExtractedSQL(block, start_line, end_line, "md_fence", "```sql fenced block"))
    return out

# --- Simple YAML sql: | or query: | blocks ------------------------------------

_YAML_SQL = re.compile(
    r'(?mi)^(?P<key>\s*(?:sql|query)\s*:\s*\|)\s*\n(?P<body>(?:[ \t].*\n?)+)'
)

def _extract_sql_from_yaml(yml: str) -> List[ExtractedSQL]:
    out: List[ExtractedSQL] = []
    for m in _YAML_SQL.finditer(yml):
        body = m.group("body")
        # strip common indent
        lines = body.splitlines()
        if not lines:
            continue
        indent = min((len(l) - len(l.lstrip())) for l in lines if l.strip())
        block = "\n".join(l[indent:] if len(l) >= indent else l for l in lines)
        if _looks_like_sql(block):
            start_line = yml.count("\n", 0, m.start("body")) + 1
            end_line   = start_line + block.count("\n")
            out.append(ExtractedSQL(block, start_line, end_line, "yaml_block", "yaml sql: |"))
    return out

# --- Jupyter notebook (.ipynb) -------------------------------------------------

def _extract_sql_from_ipynb_bytes(data: bytes) -> List[ExtractedSQL]:
    out: List[ExtractedSQL] = []
    nb = json.loads(data.decode("utf-8"))
    line_offset = 0  # accumulate pseudo-lines to produce stable file-level line numbers

    for idx, cell in enumerate(nb.get("cells", [])):
        src_lines = cell.get("source", [])
        # Normalize list-of-lines -> single string
        cell_src = "".join(src_lines)
        cell_line_count = cell_src.count("\n") + 1 if cell_src else 0

        if cell.get("cell_type") != "code":
            line_offset += max(1, cell_line_count)
            continue

        # %sql / %%sql first non-empty line
        stripped_lines = [l.rstrip("\n") for l in src_lines]
        first_nonempty = next((l for l in stripped_lines if l.strip()), "")
        if first_nonempty.lstrip().startswith(("%sql", "%%sql")):
            # everything after that line is SQL
            start_idx = stripped_lines.index(first_nonempty)
            sql_text = "\n".join(stripped_lines[start_idx+1:]).strip()
            if _looks_like_sql(sql_text):
                start = line_offset + start_idx + 2  # +1 to move past %sql line, +1 to convert 0->1
                end   = start + sql_text.count("\n")
                out.append(ExtractedSQL(sql_text, start, end, "ipynb_sql", f"cell {idx} (%sql)"))
        else:
            # Treat as python code and reuse tokenizer to find strings/calls
            py_blocks = _extract_sql_from_python_code(cell_src)
            for b in py_blocks:
                out.append(ExtractedSQL(
                    text=b.text,
                    start_line=line_offset + b.start_line,
                    end_line=line_offset + b.end_line,
                    origin=("magic_sql" if b.origin=="magic_sql" else "py_string" if b.origin=="py_string" else "py_call"),
                    context=f"cell {idx}: {b.context}"
                ))

        line_offset += max(1, cell_line_count)

    return out

# --- Main entrypoint -----------------------------------------------------------

def extract_sql_blocks(file_path: str, content_bytes: Optional[bytes] = None) -> List[ExtractedSQL]:
    """
    Standalone extractor: returns SQL blocks with line numbers and origin.
    - Reads file if content_bytes not provided.
    """
    if content_bytes is None:
        with open(file_path, "rb") as f:
            content_bytes = f.read()

    lower = file_path.lower()

    # .sql -> return entire file
    if lower.endswith(".sql"):
        text = content_bytes.decode("utf-8", errors="replace")
        return [ExtractedSQL(text=text, start_line=1, end_line=text.count("\n")+1, origin="sql_file", context=os.path.basename(file_path))]

    # .ipynb
    if lower.endswith(".ipynb"):
        try:
            return _extract_sql_from_ipynb_bytes(content_bytes)
        except Exception:
            # fall back to text scan if something odd happens
            pass

    # Text-based formats
    text = content_bytes.decode("utf-8", errors="replace")

    if lower.endswith((".py", ".py.txt")) or "# Databricks notebook source" in text:
        return _extract_sql_from_python_code(text)

    if lower.endswith((".md", ".markdown")):
        return _extract_sql_from_markdown(text)

    if lower.endswith((".yml", ".yaml")):
        return _extract_sql_from_yaml(text)

    # Generic fallback: try markdown fences first, then a quick-n-dirty triple-quote scan
    blocks = _extract_sql_from_markdown(text) + _extract_sql_from_yaml(text)
    if blocks:
        return blocks

    # Nothing matched -> empty
    return []


In [0]:
class SQLFluffScanner(SASTScanner):
    """SQLFluff scanner implementation"""
    
    def __init__(self):
        super().__init__("SQLFluff")
    
    def _extract_sql_from_file(self, file_path: str) -> List[tuple]:
        """Extract SQL statements from Python/notebook files with line numbers"""
        sql_statements = []
        
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            
            # Handle Jupyter notebooks
            if file_path.endswith('.ipynb'):
                try:
                    import json as json_lib
                    notebook = json_lib.loads(content)
                    content = ""
                    line_offset = 0
                    
                    for cell in notebook.get('cells', []):
                        if cell.get('cell_type') == 'code':
                            cell_source = ''.join(cell.get('source', []))
                            content += cell_source + '\n'
                except:
                    pass  # Fall back to treating as regular text
            
            lines = content.split('\n')
            i = 0
            
            while i < len(lines):
                line = lines[i]
                stripped = line.strip()
                
                # Look for triple quote starts
                if '"""' in line or "'''" in line:
                    quote_type = '"""' if '"""' in line else "'''"
                    
                    # Find the start of the string
                    quote_start_idx = line.find(quote_type)
                    
                    # Check if it's a single line triple quote string
                    remaining_line = line[quote_start_idx + 3:]
                    if quote_type in remaining_line:
                        # Single line triple quote string
                        end_idx = remaining_line.find(quote_type)
                        sql_candidate = remaining_line[:end_idx].strip()
                        if any(keyword in sql_candidate.upper() for keyword in ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'ALTER', 'DROP']):
                            sql_statements.append((sql_candidate, i + 1))
                        i += 1
                        continue
                    
                    # Multi-line triple quote string
                    sql_lines = []
                    sql_start_line = i + 1
                    
                    # Add any content after the opening quotes on the same line
                    if remaining_line.strip():
                        sql_lines.append(remaining_line)
                    
                    i += 1
                    # Look for the closing triple quote
                    while i < len(lines):
                        current_line = lines[i]
                        if quote_type in current_line:
                            # Found closing quote
                            quote_end_idx = current_line.find(quote_type)
                            if quote_end_idx > 0:
                                sql_lines.append(current_line[:quote_end_idx])
                            break
                        else:
                            sql_lines.append(current_line)
                        i += 1
                    
                    # Check if the collected content looks like SQL
                    sql_text = '\n'.join(sql_lines).strip()
                    if sql_text and any(keyword in sql_text.upper() for keyword in ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'ALTER', 'DROP']):
                        sql_statements.append((sql_text, sql_start_line))
                
                # Single line SQL in regular strings
                elif ('"' in line or "'" in line) and any(keyword in line.upper() for keyword in ['SELECT', 'INSERT', 'UPDATE', 'DELETE']):
                    import re
                    # Find content within quotes
                    matches = re.findall(r'["\']([^"\']*(?:SELECT|INSERT|UPDATE|DELETE)[^"\']*)["\']', line, re.IGNORECASE)
                    for match in matches:
                        if len(match.strip()) > 10:  # Only consider substantial SQL
                            sql_statements.append((match.strip(), i + 1))
                
                # MAGIC %sql cells in Databricks notebooks
                elif stripped.startswith('# MAGIC %sql') or stripped.startswith('%sql'):
                    sql_lines = []
                    sql_start_line = i + 1
                    i += 1
                    
                    # Collect all subsequent lines until we hit a non-SQL line or end of cell
                    while i < len(lines):
                        current_line = lines[i]
                        if (current_line.strip().startswith('# MAGIC') and 
                            not current_line.strip().startswith('# MAGIC %sql') and
                            current_line.strip() != '# MAGIC'):
                            break
                        elif current_line.strip().startswith('%') and not current_line.strip().startswith('%sql'):
                            break
                        else:
                            # Clean up MAGIC prefixes for actual SQL content
                            clean_line = current_line.replace('# MAGIC ', '').replace('# MAGIC', '')
                            sql_lines.append(clean_line)
                        i += 1
                    
                    sql_text = '\n'.join(sql_lines).strip()
                    if sql_text:
                        sql_statements.append((sql_text, sql_start_line))
                    continue  # i was already incremented
                
                i += 1
        
        except Exception as e:
            logger.warning(f"Error extracting SQL from {file_path}: {e}")
        
        return sql_statements
    
    def scan(self, file_path: str) -> Dict[str, Any]:
        results = {
            "scanner": self.name,
            "file_path": file_path,
            "status": "success",
            "raw_output": "",
            "error_output": "",
            "findings": [],
            "pass_count": 0,
            "warn_count": 0,
            "fail_count": 0,
            "total_issues": 0
        }
        
        # Check if file is supported
        file_ext = file_path.lower()
        if not (file_ext.endswith(('.sql', '.py', '.ipynb')) or 'sql' in file_ext):
            results["status"] = "skipped"
            results["error_output"] = "File type not supported by SQLFluff"
            return results
        
        # For pure SQL files, scan directly
        if file_ext.endswith('.sql'):
            cmd = ["sqlfluff", "lint", "--format=json", file_path]
            stdout, stderr, returncode = self._run_command(cmd)
            
            results["raw_output"] = stdout
            results["error_output"] = stderr
            results["status"] = "success" if returncode == 0 else "error"
        
        # For Python/notebook files, extract SQL first
        else:
            blocks = extract_sql_blocks(file_path)
            if not blocks:
                results["status"] = "skipped"
                results["error_output"] = "No SQL statements found in file"
                return results

            all_findings, all_output = [], []
            for b in blocks:
                with tempfile.NamedTemporaryFile(mode="w", suffix=".sql", delete=False) as tmp:
                    tmp.write(b.text)
                    tmp_path = tmp.name
                try:
                    cmd = ["sqlfluff", "lint", "--format=json" "--dialect=databricks", tmp_path]
                    stdout, stderr, rc = self._run_command(cmd)
                    all_output.append(f"{b.origin} ({b.context}) at lines {b.start_line}-{b.end_line}:\n{stdout}")

                    if stdout:
                        data = json.loads(stdout)
                        for fr in data:
                            for v in fr.get("violations", []):
                                v["original_file_line"] = b.start_line + v.get("line_no", 1) - 1
                                v["sql_snippet"] = (b.text[:100] + "...") if len(b.text) > 100 else b.text
                                all_findings.append(v)
                finally:
                    try: os.unlink(tmp_path)
                    except: pass

            results["raw_output"] = "\n".join(all_output)
            results["findings"]   = all_findings
        
        # Process findings for both cases
        if results["raw_output"] and not results["findings"]:
            try:
                data = json.loads(results["raw_output"])
                findings = []
                for file_result in data:
                    findings.extend(file_result.get("violations", []))
                results["findings"] = findings
            except json.JSONDecodeError:
                results["status"] = "error"
                results["error_output"] = f"Failed to parse JSON output {results['raw_output']}"
        
        # Categorize findings
        results["total_issues"] = len(results["findings"])
        for finding in results["findings"]:
            code = finding.get("code", "")
            if code.startswith("L"):  # Layout issues
                results["warn_count"] += 1
            elif code.startswith("E"):  # Errors
                results["fail_count"] += 1
            else:
                results["pass_count"] += 1
        
        return results

In [0]:
class GitLeaksScanner(SASTScanner):
    """GitLeaks scanner implementation"""

    def __init__(self):
        super().__init__("GitLeaks")

    def scan(self, file_path: str) -> Dict[str, Any]:
        results = {
            "scanner": self.name,
            "file_path": file_path,
            "status": "success",
            "raw_output": "",
            "error_output": "",
            "findings": [],
            "pass_count": 0,
            "warn_count": 0,
            "fail_count": 0,
            "total_issues": 0
        }

        # Create a temporary directory and copy the file there
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_file = os.path.join(temp_dir, os.path.basename(file_path))

            # Copy the file to temp directory
            subprocess.run(["cp", file_path, temp_file], check=True)

            # Initialize git repo in temp directory
            subprocess.run(["git", "init"], cwd=temp_dir, capture_output=True)
            subprocess.run(["git", "add", "."], cwd=temp_dir, capture_output=True)
            subprocess.run(["git", "config", "user.email", "test@example.com"], cwd=temp_dir, capture_output=True)
            subprocess.run(["git", "config", "user.name", "Test User"], cwd=temp_dir, capture_output=True)
            subprocess.run(["git", "commit", "-m", "test"], cwd=temp_dir, capture_output=True)

            # Run GitLeaks
            cmd = ["/tmp/gitleaks", "detect", "--source", temp_dir, "--report-format", "json", "--report-path", "/tmp/gitleaks-report.json", "--exit-code", "0"]
            stdout, stderr, returncode = self._run_command(cmd, cwd=temp_dir)

            results["raw_output"] = stdout
            results["error_output"] = stderr

            # Read the report file
            report_path = "/tmp/gitleaks-report.json"
            if os.path.exists(report_path):
                try:
                    with open(report_path, 'r') as f:
                        findings = json.load(f)
                        results["findings"] = findings
                        results["total_issues"] = len(findings)
                        results["fail_count"] = len(findings)  # All secrets are failures

                    # Clean up report file
                    os.remove(report_path)

                except (json.JSONDecodeError, FileNotFoundError):
                    results["status"] = "error"
                    results["error_output"] = "Failed to read GitLeaks report"

        return results

In [0]:
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SASTRunner:
    """Main class to run all SAST scanners and collect results"""
    def __init__(self):
        self.scanners = [
            SemgrepScanner(),
            PipAuditScanner(),
            SQLFluffScanner(),
            GitLeaksScanner()
        ]

    def scan_file(self, file_path: str) -> pd.DataFrame:
        """Run all scanners on a file and return results as DataFrame"""

        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        all_results = []

        logger.info(f"Starting SAST scan for file: {file_path}")

        for scanner in self.scanners:
            logger.info(f"Running {scanner.name}...")
            try:
                result = scanner.scan(file_path)
                all_results.append(result)
                logger.info(f"{scanner.name} completed: {result['status']}")
            except Exception as e:
                logger.error(f"Error running {scanner.name}: {str(e)}")
                # Add error result
                error_result = {
                    "scanner": scanner.name,
                    "file_path": file_path,
                    "status": "error",
                    "raw_output": "",
                    "error_output": str(e),
                    "findings": [],
                    "pass_count": 0,
                    "warn_count": 0,
                    "fail_count": 0,
                    "total_issues": 0
                }
                all_results.append(error_result)

        # Convert to DataFrame
        df = pd.DataFrame(all_results)

        return df

    def scan_multiple_files(self, file_paths: List[str]) -> pd.DataFrame:
        """Run all scanners on multiple files"""
        all_results = []

        for file_path in file_paths:
            try:
                file_results = self.scan_file(file_path)
                all_results.append(file_results)
            except Exception as e:
                logger.error(f"Error scanning {file_path}: {str(e)}")

        # Combine all results
        if all_results:
            combined_df = pd.concat(all_results, ignore_index=True)
        else:
            combined_df = pd.DataFrame()

        return combined_df

In [0]:
import glob

test_files = glob.glob("test_files/*")
test_files

['test_files/unsafe.py',
 'test_files/requirements.txt',
 'test_files/unsafe.sql',
 'test_files/unsafe_config.env',
 'test_files/unsafe_with_sql.py']

In [0]:
# Initialize the SAST runner
sast_runner = SASTRunner()

# Scan all sample files
print("Running SAST scanners on sample files...")
results_df = sast_runner.scan_multiple_files(test_files)

Running SAST scanners on sample files...


INFO:__main__:Starting SAST scan for file: test_files/unsafe.py
INFO:__main__:Running Semgrep...
INFO:__main__:Semgrep completed: success
INFO:__main__:Running pip-audit...
INFO:__main__:pip-audit completed: skipped
INFO:__main__:Running SQLFluff...
INFO:__main__:SQLFluff completed: skipped
INFO:__main__:Running GitLeaks...
INFO:__main__:GitLeaks completed: success
INFO:__main__:Starting SAST scan for file: test_files/requirements.txt
INFO:__main__:Running Semgrep...
INFO:__main__:Semgrep completed: success
INFO:__main__:Running pip-audit...
INFO:__main__:pip-audit completed: error
INFO:__main__:Running SQLFluff...
INFO:__main__:SQLFluff completed: skipped
INFO:__main__:Running GitLeaks...
INFO:__main__:GitLeaks completed: success
INFO:__main__:Starting SAST scan for file: test_files/unsafe.sql
INFO:__main__:Running Semgrep...
INFO:__main__:Semgrep completed: success
INFO:__main__:Running pip-audit...
INFO:__main__:pip-audit completed: skipped
INFO:__main__:Running SQLFluff...
INFO:__m

In [0]:
display(results_df)

INFO:py4j.clientserver:Received command c on object id p0


scanner,file_path,status,raw_output,error_output,findings,pass_count,warn_count,fail_count,total_issues
Semgrep,test_files/unsafe.py,success,"{""version"":""1.137.1"",""results"":[{""check_id"":""python.lang.security.audit.eval-detected.eval-detected"",""path"":""test_files/unsafe.py"",""start"":{""line"":9,""col"":12,""offset"":191},""end"":{""line"":9,""col"":28,""offset"":207},""extra"":{""message"":""Detected the use of eval(). eval() can be dangerous if used to evaluate dynamic content. If this content can be input from outside the program, this may be a code injection vulnerability. Ensure evaluated content is not definable by external sources."",""metadata"":{""source-rule-url"":""https://bandit.readthedocs.io/en/latest/blacklists/blacklist_calls.html#b307-eval"",""cwe"":[""CWE-95: Improper Neutralization of Directives in Dynamically Evaluated Code ('Eval Injection')""],""owasp"":[""A03:2021 - Injection""],""asvs"":{""control_id"":""5.2.4 Dyanmic Code Execution Features"",""control_url"":""https://github.com/OWASP/ASVS/blob/master/4.0/en/0x13-V5-Validation-Sanitization-Encoding.md#v52-sanitization-and-sandboxing-requirements"",""section"":""V5: Validation, Sanitization and Encoding Verification Requirements"",""version"":""4""},""category"":""security"",""technology"":[""python""],""references"":[""https://owasp.org/Top10/A03_2021-Injection""],""subcategory"":[""audit""],""likelihood"":""LOW"",""impact"":""HIGH"",""confidence"":""LOW"",""license"":""Semgrep Rules License v1.0. For more details, visit semgrep.dev/legal/rules-license"",""vulnerability_class"":[""Code Injection""],""source"":""https://semgrep.dev/r/python.lang.security.audit.eval-detected.eval-detected"",""shortlink"":""https://sg.run/ZvrD""},""severity"":""WARNING"",""fingerprint"":""requires login"",""lines"":""requires login"",""validation_state"":""NO_VALIDATOR"",""engine_kind"":""OSS""}}],""errors"":[],""paths"":{""scanned"":[""test_files/unsafe.py""]},""time"":{""rules"":[],""rules_parse_time"":1.4108161926269531,""profiling_times"":{""config_time"":1.7925989627838135,""core_time"":5.0837342739105225,""ignores_time"":0.0015459060668945312,""total_time"":6.879085540771484},""parsing_time"":{""total_time"":0.0,""per_file_time"":{""mean"":0.0,""std_dev"":0.0},""very_slow_stats"":{""time_ratio"":0.0,""count_ratio"":0.0},""very_slow_files"":[]},""scanning_time"":{""total_time"":0.04295849800109863,""per_file_time"":{""mean"":0.014319499333699543,""std_dev"":0.00031207171201054327},""very_slow_stats"":{""time_ratio"":0.0,""count_ratio"":0.0},""very_slow_files"":[]},""matching_time"":{""total_time"":0.0,""per_file_and_rule_time"":{""mean"":0.0,""std_dev"":0.0},""very_slow_stats"":{""time_ratio"":0.0,""count_ratio"":0.0},""very_slow_rules_on_files"":[]},""tainting_time"":{""total_time"":0.0,""per_def_and_rule_time"":{""mean"":0.0,""std_dev"":0.0},""very_slow_stats"":{""time_ratio"":0.0,""count_ratio"":0.0},""very_slow_rules_on_defs"":[]},""fixpoint_timeouts"":[],""prefiltering"":{""project_level_time"":0.0,""file_level_time"":0.0,""rules_with_project_prefilters_ratio"":0.0,""rules_with_file_prefilters_ratio"":0.9896907216494846,""rules_selected_ratio"":0.024054982817869417,""rules_matched_ratio"":0.024054982817869417},""targets"":[],""total_bytes"":0,""max_memory_bytes"":1107031424},""engine_requested"":""OSS"",""skipped_rules"":[]}",,"List(List(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, python.lang.security.audit.eval-detected.eval-detected, List(28, 9, 207), List(OSS, requires login, requires login, Detected the use of eval(). eval() can be dangerous if used to evaluate dynamic content. If this content can be input from outside the program, this may be a code injection vulnerability. Ensure evaluated content is not definable by external sources., List(List(5.2.4 Dyanmic Code Execution Features, https://github.com/OWASP/ASVS/blob/master/4.0/en/0x13-V5-Validation-Sanitization-Encoding.md#v52-sanitization-and-sandboxing-requirements, V5: Validation, Sanitization and Encoding Verification Requirements, 4), security, LOW, List(CWE-95: Improper Neutralization of Directives in Dynamically Evaluated Code ('Eval Injection')), HIGH, Semgrep Rules License v1.0. For more details, visit semgrep.dev/legal/rules-license, LOW, List(A03:2021 - Injection), List(https://owasp.org/Top10/A03_2021-Injection), https://sg.run/ZvrD, https://semgrep.dev/r/python.lang.security.audit.eval-detected.eval-detected, https://bandit.readthedocs.io/en/latest/blacklists/blacklist_calls.html#b307-eval, List(audit), List(python), List(Code Injection)), WARNING, NO_VALIDATOR), test_files/unsafe.py, List(12, 9, 191)))",0,1,0,1
pip-audit,test_files/unsafe.py,skipped,,File type not supported by pip-audit,List(),0,0,0,0
SQLFluff,test_files/unsafe.py,skipped,,No SQL statements found in file,List(),0,0,0,0
GitLeaks,test_files/unsafe.py,success,,○  │╲  │ ○  ○ ░  ░ gitleaks [90m6:37PM[0m [32mINF[0m 1 commits scanned. [90m6:37PM[0m [32mINF[0m scan completed in 154ms [90m6:37PM[0m [31mWRN[0m leaks found: 1,"List(List(Test User, 90d4719daf482b60e6ee0df5e0944258efb221ac, 2025-09-24T18:37:51Z, Generic API Key, test@example.com, 32, 6, 4.2479277, unsafe.py, 90d4719daf482b60e6ee0df5e0944258efb221ac:unsafe.py:generic-api-key:6, api_key = ""sk-1234567890abcdef"", test, generic-api-key, sk-1234567890abcdef, 2, 6, , List(), null, null, null, null, null))",0,0,1,1
Semgrep,test_files/requirements.txt,success,"{""version"":""1.137.1"",""results"":[],""errors"":[],""paths"":{""scanned"":[""test_files/requirements.txt""]},""time"":{""rules"":[],""rules_parse_time"":1.5036838054656982,""profiling_times"":{""config_time"":1.635270357131958,""core_time"":1.941798210144043,""ignores_time"":0.00136566162109375,""total_time"":3.5795507431030273},""parsing_time"":{""total_time"":0.0,""per_file_time"":{""mean"":0.0,""std_dev"":0.0},""very_slow_stats"":{""time_ratio"":0.0,""count_ratio"":0.0},""very_slow_files"":[]},""scanning_time"":{""total_time"":0.004197120666503906,""per_file_time"":{""mean"":0.002098560333251953,""std_dev"":8.967253961600363e-08},""very_slow_stats"":{""time_ratio"":0.0,""count_ratio"":0.0},""very_slow_files"":[]},""matching_time"":{""total_time"":0.0,""per_file_and_rule_time"":{""mean"":0.0,""std_dev"":0.0},""very_slow_stats"":{""time_ratio"":0.0,""count_ratio"":0.0},""very_slow_rules_on_files"":[]},""tainting_time"":{""total_time"":0.0,""per_def_and_rule_time"":{""mean"":0.0,""std_dev"":0.0},""very_slow_stats"":{""time_ratio"":0.0,""count_ratio"":0.0},""very_slow_rules_on_defs"":[]},""fixpoint_timeouts"":[],""prefiltering"":{""project_level_time"":0.0,""file_level_time"":0.0,""rules_with_project_prefilters_ratio"":0.0,""rules_with_file_prefilters_ratio"":1.0,""rules_selected_ratio"":0.0,""rules_matched_ratio"":0.0},""targets"":[],""total_bytes"":0,""max_memory_bytes"":1107031424},""engine_requested"":""OSS"",""skipped_rules"":[]}",,List(),0,0,0,0
pip-audit,test_files/requirements.txt,error,"[?25l[32m-[0m Collecting inputs [2KThe virtual environment was not created successfully because ensurepip is not available. On Debian/Ubuntu systems, you need to install the python3-venv package using the following command.  apt install python3.12-venv You may need to use sudo with that command. After installing the python3-venv package, recreate your virtual environment. Failing command: /tmp/tmprlt3dze8/bin/python3.12 [32m-[0m Collecting inputs [2K [32m-[0m Collecting inputs [2K[32m-[0m Collecting inputs [?25h [1A[2K",Failed to parse JSON output,List(),0,0,0,0
SQLFluff,test_files/requirements.txt,skipped,,File type not supported by SQLFluff,List(),0,0,0,0
GitLeaks,test_files/requirements.txt,success,,○  │╲  │ ○  ○ ░  ░ gitleaks [90m6:37PM[0m [32mINF[0m 1 commits scanned. [90m6:37PM[0m [32mINF[0m scan completed in 164ms [90m6:37PM[0m [32mINF[0m no leaks found,List(),0,0,0,0
Semgrep,test_files/unsafe.sql,success,"{""version"":""1.137.1"",""results"":[],""errors"":[],""paths"":{""scanned"":[""test_files/unsafe.sql""]},""time"":{""rules"":[],""rules_parse_time"":1.524090051651001,""profiling_times"":{""config_time"":1.7546801567077637,""core_time"":1.9465739727020264,""ignores_time"":0.0013229846954345703,""total_time"":3.70363450050354},""parsing_time"":{""total_time"":0.0,""per_file_time"":{""mean"":0.0,""std_dev"":0.0},""very_slow_stats"":{""time_ratio"":0.0,""count_ratio"":0.0},""very_slow_files"":[]},""scanning_time"":{""total_time"":0.003410816192626953,""per_file_time"":{""mean"":0.0017054080963134766,""std_dev"":8.613830004833289e-08},""very_slow_stats"":{""time_ratio"":0.0,""count_ratio"":0.0},""very_slow_files"":[]},""matching_time"":{""total_time"":0.0,""per_file_and_rule_time"":{""mean"":0.0,""std_dev"":0.0},""very_slow_stats"":{""time_ratio"":0.0,""count_ratio"":0.0},""very_slow_rules_on_files"":[]},""tainting_time"":{""total_time"":0.0,""per_def_and_rule_time"":{""mean"":0.0,""std_dev"":0.0},""very_slow_stats"":{""time_ratio"":0.0,""count_ratio"":0.0},""very_slow_rules_on_defs"":[]},""fixpoint_timeouts"":[],""prefiltering"":{""project_level_time"":0.0,""file_level_time"":0.0,""rules_with_project_prefilters_ratio"":0.0,""rules_with_file_prefilters_ratio"":1.0,""rules_selected_ratio"":0.0,""rules_matched_ratio"":0.0},""targets"":[],""total_bytes"":0,""max_memory_bytes"":1107031424},""engine_requested"":""OSS"",""skipped_rules"":[]}",,List(),0,0,0,0
pip-audit,test_files/unsafe.sql,skipped,,File type not supported by pip-audit,List(),0,0,0,0


In [0]:
result = sast_runner.scan_file("test_files/unsafe.py")

INFO:__main__:Starting SAST scan for file: test_files/unsafe.py
INFO:__main__:Running Semgrep...
INFO:__main__:Semgrep completed: success
INFO:__main__:Running pip-audit...
INFO:__main__:pip-audit completed: skipped
INFO:__main__:Running SQLFluff...
INFO:__main__:SQLFluff completed: skipped
INFO:__main__:Running GitLeaks...
INFO:__main__:GitLeaks completed: success


In [0]:
display(result)

scanner,file_path,status,raw_output,error_output,findings,pass_count,warn_count,fail_count,total_issues
Semgrep,test_files/unsafe.py,success,"{""version"":""1.137.1"",""results"":[{""check_id"":""python.lang.security.audit.eval-detected.eval-detected"",""path"":""test_files/unsafe.py"",""start"":{""line"":9,""col"":12,""offset"":191},""end"":{""line"":9,""col"":28,""offset"":207},""extra"":{""message"":""Detected the use of eval(). eval() can be dangerous if used to evaluate dynamic content. If this content can be input from outside the program, this may be a code injection vulnerability. Ensure evaluated content is not definable by external sources."",""metadata"":{""source-rule-url"":""https://bandit.readthedocs.io/en/latest/blacklists/blacklist_calls.html#b307-eval"",""cwe"":[""CWE-95: Improper Neutralization of Directives in Dynamically Evaluated Code ('Eval Injection')""],""owasp"":[""A03:2021 - Injection""],""asvs"":{""control_id"":""5.2.4 Dyanmic Code Execution Features"",""control_url"":""https://github.com/OWASP/ASVS/blob/master/4.0/en/0x13-V5-Validation-Sanitization-Encoding.md#v52-sanitization-and-sandboxing-requirements"",""section"":""V5: Validation, Sanitization and Encoding Verification Requirements"",""version"":""4""},""category"":""security"",""technology"":[""python""],""references"":[""https://owasp.org/Top10/A03_2021-Injection""],""subcategory"":[""audit""],""likelihood"":""LOW"",""impact"":""HIGH"",""confidence"":""LOW"",""license"":""Semgrep Rules License v1.0. For more details, visit semgrep.dev/legal/rules-license"",""vulnerability_class"":[""Code Injection""],""source"":""https://semgrep.dev/r/python.lang.security.audit.eval-detected.eval-detected"",""shortlink"":""https://sg.run/ZvrD""},""severity"":""WARNING"",""fingerprint"":""requires login"",""lines"":""requires login"",""validation_state"":""NO_VALIDATOR"",""engine_kind"":""OSS""}}],""errors"":[],""paths"":{""scanned"":[""test_files/unsafe.py""]},""time"":{""rules"":[],""rules_parse_time"":1.4533069133758545,""profiling_times"":{""config_time"":1.7344183921813965,""core_time"":4.763259410858154,""ignores_time"":0.0015249252319335938,""total_time"":6.500217437744141},""parsing_time"":{""total_time"":0.0,""per_file_time"":{""mean"":0.0,""std_dev"":0.0},""very_slow_stats"":{""time_ratio"":0.0,""count_ratio"":0.0},""very_slow_files"":[]},""scanning_time"":{""total_time"":0.03985309600830078,""per_file_time"":{""mean"":0.013284365336100262,""std_dev"":0.0002715887869094836},""very_slow_stats"":{""time_ratio"":0.0,""count_ratio"":0.0},""very_slow_files"":[]},""matching_time"":{""total_time"":0.0,""per_file_and_rule_time"":{""mean"":0.0,""std_dev"":0.0},""very_slow_stats"":{""time_ratio"":0.0,""count_ratio"":0.0},""very_slow_rules_on_files"":[]},""tainting_time"":{""total_time"":0.0,""per_def_and_rule_time"":{""mean"":0.0,""std_dev"":0.0},""very_slow_stats"":{""time_ratio"":0.0,""count_ratio"":0.0},""very_slow_rules_on_defs"":[]},""fixpoint_timeouts"":[],""prefiltering"":{""project_level_time"":0.0,""file_level_time"":0.0,""rules_with_project_prefilters_ratio"":0.0,""rules_with_file_prefilters_ratio"":0.9896907216494846,""rules_selected_ratio"":0.024054982817869417,""rules_matched_ratio"":0.024054982817869417},""targets"":[],""total_bytes"":0,""max_memory_bytes"":1107293568},""engine_requested"":""OSS"",""skipped_rules"":[]}",,"List(List(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, python.lang.security.audit.eval-detected.eval-detected, List(28, 9, 207), List(OSS, requires login, requires login, Detected the use of eval(). eval() can be dangerous if used to evaluate dynamic content. If this content can be input from outside the program, this may be a code injection vulnerability. Ensure evaluated content is not definable by external sources., List(List(5.2.4 Dyanmic Code Execution Features, https://github.com/OWASP/ASVS/blob/master/4.0/en/0x13-V5-Validation-Sanitization-Encoding.md#v52-sanitization-and-sandboxing-requirements, V5: Validation, Sanitization and Encoding Verification Requirements, 4), security, LOW, List(CWE-95: Improper Neutralization of Directives in Dynamically Evaluated Code ('Eval Injection')), HIGH, Semgrep Rules License v1.0. For more details, visit semgrep.dev/legal/rules-license, LOW, List(A03:2021 - Injection), List(https://owasp.org/Top10/A03_2021-Injection), https://sg.run/ZvrD, https://semgrep.dev/r/python.lang.security.audit.eval-detected.eval-detected, https://bandit.readthedocs.io/en/latest/blacklists/blacklist_calls.html#b307-eval, List(audit), List(python), List(Code Injection)), WARNING, NO_VALIDATOR), test_files/unsafe.py, List(12, 9, 191)))",0,1,0,1
pip-audit,test_files/unsafe.py,skipped,,File type not supported by pip-audit,List(),0,0,0,0
SQLFluff,test_files/unsafe.py,skipped,,No SQL statements found in file,List(),0,0,0,0
GitLeaks,test_files/unsafe.py,success,,○  │╲  │ ○  ○ ░  ░ gitleaks [90m6:47PM[0m [32mINF[0m 1 commits scanned. [90m6:47PM[0m [32mINF[0m scan completed in 152ms [90m6:47PM[0m [31mWRN[0m leaks found: 1,"List(List(Test User, c58dadbf95a08605f987200244f16019d49f7a44, 2025-09-24T18:47:36Z, Generic API Key, test@example.com, 32, 6, 4.2479277, unsafe.py, c58dadbf95a08605f987200244f16019d49f7a44:unsafe.py:generic-api-key:6, api_key = ""sk-1234567890abcdef"", test, generic-api-key, sk-1234567890abcdef, 2, 6, , List(), null, null, null, null, null))",0,0,1,1
