# AI-Powered Security Scanner Agent with Gemini - Vertex AI Workbench

This notebook demonstrates how to build an **AI-powered** security scanning agent using:
- **Gemini API** from Google AI Studio (free tier)
- **GCP Agent Development Kit** for orchestration
- **Semgrep** for security scanning
- **Vertex AI** for deployment

## What Makes This AI-Powered?
- Gemini analyzes scan results and provides intelligent insights
- AI explains vulnerabilities in natural language
- Prioritizes fixes based on context
- Suggests remediation code
- Answers follow-up questions about security issues

## Cell 1: Install Dependencies

In [4]:
# Install required packages
!pip install -q --upgrade pip
!pip install -q google-cloud-aiplatform[reasoningengine]
!pip install -q google-genai
!pip install -q semgrep
!pip install -q gitpython
!pip install -q flask

print("‚úÖ All dependencies installed successfully!")

‚úÖ All dependencies installed successfully!


## Cell 2: Configure API Keys and GCP Settings

‚ö†Ô∏è **IMPORTANT**: Get your Gemini API key from [Google AI Studio](https://aistudio.google.com/app/apikey)

In [12]:
import os

# ================== CONFIGURE THESE ===================
# Get your free Gemini API key from: https://aistudio.google.com/app/apikey
GEMINI_API_KEY = "<enter API Key>"  # Replace with your API key

# GCP Configuration
PROJECT_ID = "<enter project ID>"  # Replace with your GCP project ID
LOCATION = "<Enter Location>"  # Or your preferred region
STAGING_BUCKET = "gs://<bucket-name>"  # Replace with your GCS bucket
# =====================================================

# Set environment variables
os.environ["GEMINI_API_KEY"] = GEMINI_API_KEY
os.environ["GCP_PROJECT_ID"] = PROJECT_ID
os.environ["GCP_LOCATION"] = LOCATION
os.environ["GCS_STAGING_BUCKET"] = STAGING_BUCKET

print(f"‚úÖ Configuration set:")
print(f"   Gemini API Key: {'*' * 20}{GEMINI_API_KEY[-4:]}")
print(f"   Project ID: {PROJECT_ID}")
print(f"   Location: {LOCATION}")
print(f"   Staging Bucket: {STAGING_BUCKET}")

‚úÖ Configuration set:
   Gemini API Key: ********************LGzQ
   Project ID: steam-aria-479903-s9
   Location: us-central1
   Staging Bucket: gs://dataset-bucket-imran


## Cell 3: Test Gemini API Connection

In [13]:
from google import genai
from google.genai import types

# Configure Gemini client
client = genai.Client(api_key=GEMINI_API_KEY)

# First, let's list available models to see what's supported
print("üîç Checking available models...\n")
try:
    models = client.models.list()
    print("Available models:")
    for model in models:
        print(f"  - {model.name}")
    print()
except Exception as e:
    print(f"Could not list models: {e}\n")

# Try with gemini-2.0-flash-exp (latest experimental model)
try:
    response = client.models.generate_content(
        model='models/gemini-2.0-flash-exp',
        contents="Say 'Hello! I'm ready to analyze security vulnerabilities!'"
    )
    print("ü§ñ Gemini Response:")
    print(response.text)
    print("\n‚úÖ Gemini API is working correctly!")
except Exception as e:
    print(f"‚ùå Error with gemini-2.0-flash-exp: {e}")
    
    # Fallback to gemini-1.5-pro
    try:
        print("\nTrying gemini-1.5-pro...")
        response = client.models.generate_content(
            model='models/gemini-1.5-pro',
            contents="Say 'Hello! I'm ready to analyze security vulnerabilities!'"
        )
        print("ü§ñ Gemini Response:")
        print(response.text)
        print("\n‚úÖ Gemini API is working correctly!")
    except Exception as e2:
        print(f"‚ùå Error with gemini-1.5-pro: {e2}")

üîç Checking available models...

Available models:
  - models/embedding-gecko-001
  - models/gemini-2.5-flash
  - models/gemini-2.5-pro
  - models/gemini-2.0-flash-exp
  - models/gemini-2.0-flash
  - models/gemini-2.0-flash-001
  - models/gemini-2.0-flash-exp-image-generation
  - models/gemini-2.0-flash-lite-001
  - models/gemini-2.0-flash-lite
  - models/gemini-2.0-flash-lite-preview-02-05
  - models/gemini-2.0-flash-lite-preview
  - models/gemini-exp-1206
  - models/gemini-2.5-flash-preview-tts
  - models/gemini-2.5-pro-preview-tts
  - models/gemma-3-1b-it
  - models/gemma-3-4b-it
  - models/gemma-3-12b-it
  - models/gemma-3-27b-it
  - models/gemma-3n-e4b-it
  - models/gemma-3n-e2b-it
  - models/gemini-flash-latest
  - models/gemini-flash-lite-latest
  - models/gemini-pro-latest
  - models/gemini-2.5-flash-lite
  - models/gemini-2.5-flash-image-preview
  - models/gemini-2.5-flash-image
  - models/gemini-2.5-flash-preview-09-2025
  - models/gemini-2.5-flash-lite-preview-09-2025
  - 

## Cell 4: Create Sample Vulnerable Application

In [14]:
%%writefile sample_app.py
"""
Sample vulnerable Python application for security scanning demo
This intentionally contains security vulnerabilities for demonstration purposes
"""

import os
import pickle
import subprocess
import sqlite3
from flask import Flask, request, render_template_string

app = Flask(__name__)

# Vulnerability 1: Hardcoded credentials
DATABASE_PASSWORD = "admin123"
API_KEY = "sk-1234567890abcdef"

# Vulnerability 2: SQL Injection
@app.route('/user')
def get_user():
    user_id = request.args.get('id')
    conn = sqlite3.connect('users.db')
    cursor = conn.cursor()
    # Vulnerable to SQL injection
    query = f"SELECT * FROM users WHERE id = {user_id}"
    cursor.execute(query)
    result = cursor.fetchone()
    return str(result)

# Vulnerability 3: Command Injection
@app.route('/ping')
def ping():
    host = request.args.get('host')
    # Vulnerable to command injection
    result = os.system(f"ping -c 1 {host}")
    return f"Ping result: {result}"

# Vulnerability 4: Insecure Deserialization
@app.route('/load_data', methods=['POST'])
def load_data():
    data = request.data
    # Vulnerable to insecure deserialization
    obj = pickle.loads(data)
    return str(obj)

# Vulnerability 5: Server-Side Template Injection (SSTI)
@app.route('/hello')
def hello():
    name = request.args.get('name', 'Guest')
    # Vulnerable to SSTI
    template = f"<h1>Hello {name}!</h1>"
    return render_template_string(template)

# Vulnerability 6: Path Traversal
@app.route('/read_file')
def read_file():
    filename = request.args.get('file')
    # Vulnerable to path traversal
    with open(f'/var/data/{filename}', 'r') as f:
        return f.read()

# Vulnerability 7: Weak Cryptography
def encrypt_data(data):
    import hashlib
    # Using MD5 (weak hash)
    return hashlib.md5(data.encode()).hexdigest()

# Vulnerability 8: Debug mode in production
if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0')

print("‚úÖ Sample vulnerable application created!")

Writing sample_app.py


## Cell 5: Define AI-Powered Security Scanner Agent

This agent uses Gemini to:
- Analyze scan results intelligently
- Explain vulnerabilities in plain language
- Prioritize fixes
- Suggest remediation code

In [17]:
import os
import subprocess
import tempfile
import shutil
from typing import Dict, List, Any
import json
from google import genai
from google.genai import types

class AISecurityScannerAgent:
    """
    AI-Powered Security Scanner Agent that uses Gemini for intelligent analysis
    """
    
    def __init__(self, gemini_api_key: str):
        """Initialize the AI Security Scanner Agent"""
        self.gemini_api_key = gemini_api_key
        self.client = genai.Client(api_key=gemini_api_key)
        self.model_name = 'models/gemini-2.0-flash-exp'  # Updated to 2.0 flash
        self.semgrep_installed = self._check_semgrep()
        
    def _check_semgrep(self) -> bool:
        """Check if Semgrep is installed"""
        try:
            subprocess.run(
                ["semgrep", "--version"],
                check=True,
                capture_output=True,
                text=True
            )
            return True
        except (subprocess.CalledProcessError, FileNotFoundError):
            print("Semgrep not found. Installing...")
            self._install_semgrep()
            return True
    
    def _install_semgrep(self):
        """Install Semgrep"""
        try:
            subprocess.run(
                ["pip", "install", "semgrep"],
                check=True
            )
            print("Semgrep installed successfully")
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"Failed to install Semgrep: {e}")
    
    def clone_repository(self, repo_url: str, temp_dir: str) -> str:
        """
        Clone a GitHub repository to a temporary directory
        
        Args:
            repo_url: GitHub repository URL
            temp_dir: Temporary directory path
            
        Returns:
            Path to the cloned repository
        """
        try:
            print(f"Cloning repository: {repo_url}")
            subprocess.run(
                ["git", "clone", repo_url, temp_dir],
                check=True,
                capture_output=True,
                text=True
            )
            print(f"Repository cloned to: {temp_dir}")
            return temp_dir
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"Failed to clone repository: {e.stderr}")
    
    def run_semgrep_scan(self, repo_path: str, rules: str = "auto") -> Dict[str, Any]:
        """
        Run Semgrep scan on the repository
        
        Args:
            repo_path: Path to the repository
            rules: Semgrep rules to use (default: "auto")
            
        Returns:
            Dictionary containing scan results
        """
        try:
            print(f"Running Semgrep scan on: {repo_path}")
            
            result = subprocess.run(
                [
                    "semgrep",
                    "--config", rules,
                    "--json",
                    "--quiet",
                    repo_path
                ],
                capture_output=True,
                text=True,
                timeout=300
            )
            
            scan_results = json.loads(result.stdout)
            
            print(f"Scan completed. Found {len(scan_results.get('results', []))} issues")
            return scan_results
            
        except subprocess.TimeoutExpired:
            raise RuntimeError("Semgrep scan timed out")
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"Semgrep scan failed: {e.stderr}")
        except json.JSONDecodeError as e:
            raise RuntimeError(f"Failed to parse Semgrep output: {e}")
    
    def ai_analyze_vulnerability(self, issue: Dict[str, Any]) -> str:
        """
        Use Gemini AI to analyze a specific vulnerability
        
        Args:
            issue: Single vulnerability from Semgrep results
            
        Returns:
            AI-generated analysis and recommendations
        """
        prompt = f"""
You are a security expert analyzing code vulnerabilities. Analyze this security issue:

**Vulnerability Type**: {issue.get('check_id', 'Unknown')}
**Severity**: {issue.get('extra', {}).get('severity', 'Unknown')}
**Message**: {issue.get('extra', {}).get('message', 'No message')}
**File**: {issue.get('path', 'Unknown')}
**Line**: {issue.get('start', {}).get('line', 'Unknown')}
**Code**:
```
{issue.get('extra', {}).get('lines', 'No code available')}
```

Provide:
1. A brief explanation of the vulnerability (2-3 sentences)
2. Why this is dangerous (1-2 sentences)
3. How to fix it with example code (if applicable)

Keep it concise and practical.
"""
        
        try:
            response = self.client.models.generate_content(
                model=self.model_name,
                contents=prompt
            )
            return response.text
        except Exception as e:
            return f"AI analysis unavailable: {str(e)}"
    
    def ai_generate_executive_summary(self, scan_results: Dict[str, Any]) -> str:
        """
        Use Gemini AI to generate an executive summary of all findings
        
        Args:
            scan_results: Complete Semgrep scan results
            
        Returns:
            AI-generated executive summary
        """
        results = scan_results.get('results', [])
        
        if not results:
            return "‚úÖ No security vulnerabilities found. The code appears to be secure."
        
        # Prepare summary data
        severity_counts = {}
        vulnerability_types = {}
        
        for issue in results:
            severity = issue.get('extra', {}).get('severity', 'UNKNOWN')
            check_id = issue.get('check_id', 'Unknown')
            
            severity_counts[severity] = severity_counts.get(severity, 0) + 1
            vulnerability_types[check_id] = vulnerability_types.get(check_id, 0) + 1
        
        prompt = f"""
You are a security analyst preparing an executive summary. Based on the security scan results:

**Total Vulnerabilities**: {len(results)}
**Severity Distribution**: {json.dumps(severity_counts, indent=2)}
**Top Vulnerability Types**: {json.dumps(list(vulnerability_types.items())[:5], indent=2)}

Create a brief executive summary (3-4 paragraphs) that:
1. Summarizes the overall security posture
2. Highlights the most critical issues
3. Recommends priority actions
4. Provides a risk assessment

Use clear, non-technical language suitable for management.
"""
        
        try:
            response = self.client.models.generate_content(
                model=self.model_name,
                contents=prompt
            )
            return response.text
        except Exception as e:
            return f"AI summary unavailable: {str(e)}"
    
    def format_results_with_ai(self, scan_results: Dict[str, Any], include_ai_analysis: bool = True) -> str:
        """
        Format scan results with optional AI analysis
        
        Args:
            scan_results: Raw Semgrep scan results
            include_ai_analysis: Whether to include AI analysis for each issue
            
        Returns:
            Formatted report string with AI insights
        """
        results = scan_results.get('results', [])
        errors = scan_results.get('errors', [])
        
        if not results and not errors:
            return "‚úÖ No security issues found! The code looks clean."
        
        report = []
        report.append("=" * 80)
        report.append("ü§ñ AI-POWERED SECURITY SCAN REPORT")
        report.append("=" * 80)
        
        # AI Executive Summary
        print("\nü§ñ Generating AI Executive Summary...")
        executive_summary = self.ai_generate_executive_summary(scan_results)
        report.append("\nüìä EXECUTIVE SUMMARY (AI-Generated)")
        report.append("=" * 80)
        report.append(executive_summary)
        report.append("\n" + "=" * 80)
        
        report.append(f"\nTotal Issues Found: {len(results)}")
        report.append("")
        
        # Group by severity
        severity_groups = {}
        for issue in results:
            severity = issue.get('extra', {}).get('severity', 'INFO')
            if severity not in severity_groups:
                severity_groups[severity] = []
            severity_groups[severity].append(issue)
        
        # Display by severity
        for severity in ['ERROR', 'WARNING', 'INFO']:
            if severity in severity_groups:
                issues = severity_groups[severity]
                report.append(f"\n{'='*80}")
                report.append(f"üî¥ {severity} LEVEL ISSUES ({len(issues)} found)")
                report.append(f"{'='*80}")
                
                for idx, issue in enumerate(issues, 1):
                    report.append(f"\n{idx}. {issue.get('check_id', 'Unknown')}")
                    report.append(f"   Severity: {severity}")
                    report.append(f"   Message: {issue.get('extra', {}).get('message', 'No message')}")
                    report.append(f"   File: {issue.get('path', 'Unknown')}")
                    report.append(f"   Line: {issue.get('start', {}).get('line', 'Unknown')}")
                    
                    # Show code snippet
                    code = issue.get('extra', {}).get('lines', '')
                    if code:
                        report.append(f"   Code:")
                        for line in code.strip().split('\n'):
                            report.append(f"      {line}")
                    
                    # AI Analysis for critical issues
                    if include_ai_analysis and severity in ['ERROR', 'WARNING']:
                        print(f"   ü§ñ Analyzing issue {idx}/{len(issues)}...")
                        ai_analysis = self.ai_analyze_vulnerability(issue)
                        report.append(f"\n   ü§ñ AI ANALYSIS:")
                        for line in ai_analysis.split('\n'):
                            report.append(f"   {line}")
                        report.append("")
        
        if errors:
            report.append(f"\n{'='*80}")
            report.append(f"SCAN ERRORS ({len(errors)} found)")
            report.append(f"{'='*80}")
            for error in errors:
                report.append(f"  - {error}")
        
        report.append("\n" + "=" * 80)
        
        return "\n".join(report)
    
    def scan_github_repo(self, repo_url: str, rules: str = "auto", include_ai_analysis: bool = True) -> str:
        """
        Main method to scan a GitHub repository with AI analysis
        
        Args:
            repo_url: GitHub repository URL
            rules: Semgrep rules to use (default: "auto")
            include_ai_analysis: Whether to include AI analysis
            
        Returns:
            Formatted security scan report with AI insights
        """
        temp_dir = None
        
        try:
            # Create temporary directory
            temp_dir = tempfile.mkdtemp(prefix="security_scan_")
            
            # Clone repository
            repo_path = self.clone_repository(repo_url, temp_dir)
            
            # Run scan
            scan_results = self.run_semgrep_scan(repo_path, rules)
            
            # Format and return results with AI analysis
            report = self.format_results_with_ai(scan_results, include_ai_analysis)
            
            return report
            
        except Exception as e:
            return f"‚ùå Error during security scan: {str(e)}"
            
        finally:
            # Cleanup temporary directory
            if temp_dir and os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
                print(f"Cleaned up temporary directory: {temp_dir}")
    
    def chat_about_vulnerability(self, question: str, context: str = "") -> str:
        """
        Chat with AI about security vulnerabilities
        
        Args:
            question: User's question about security
            context: Optional context from scan results
            
        Returns:
            AI response
        """
        prompt = f"""
You are a security expert. Answer this question about security vulnerabilities:

Question: {question}

{f'Context from scan: {context}' if context else ''}

Provide a clear, practical answer.
"""
        
        try:
            response = self.client.models.generate_content(
                model=self.model_name,
                contents=prompt
            )
            return response.text
        except Exception as e:
            return f"Error: {str(e)}"

print("‚úÖ AISecurityScannerAgent class defined successfully!")

‚úÖ AISecurityScannerAgent class defined successfully!


## Cell 6: Create and Test AI Agent Locally

In [18]:
# Create AI agent instance
ai_agent = AISecurityScannerAgent(gemini_api_key=GEMINI_API_KEY)

print("ü§ñ AI Security Scanner Agent initialized!")
print("üîç Running security scan on sample_app.py with AI analysis...\n")

# Scan the current directory (includes sample_app.py)
scan_results = ai_agent.run_semgrep_scan(".", rules="auto")
report = ai_agent.format_results_with_ai(scan_results, include_ai_analysis=True)

print(report)

ü§ñ AI Security Scanner Agent initialized!
üîç Running security scan on sample_app.py with AI analysis...

Running Semgrep scan on: .
Scan completed. Found 38 issues

ü§ñ Generating AI Executive Summary...
   ü§ñ Analyzing issue 1/26...
   ü§ñ Analyzing issue 2/26...
   ü§ñ Analyzing issue 3/26...
   ü§ñ Analyzing issue 4/26...
   ü§ñ Analyzing issue 5/26...
   ü§ñ Analyzing issue 6/26...
   ü§ñ Analyzing issue 7/26...
   ü§ñ Analyzing issue 8/26...
   ü§ñ Analyzing issue 9/26...
   ü§ñ Analyzing issue 10/26...
   ü§ñ Analyzing issue 11/26...
   ü§ñ Analyzing issue 12/26...
   ü§ñ Analyzing issue 13/26...
   ü§ñ Analyzing issue 14/26...
   ü§ñ Analyzing issue 15/26...
   ü§ñ Analyzing issue 16/26...
   ü§ñ Analyzing issue 17/26...
   ü§ñ Analyzing issue 18/26...
   ü§ñ Analyzing issue 19/26...
   ü§ñ Analyzing issue 20/26...
   ü§ñ Analyzing issue 21/26...
   ü§ñ Analyzing issue 22/26...
   ü§ñ Analyzing issue 23/26...
   ü§ñ Analyzing issue 24/26...
   ü§ñ 

## Cell 7: Test AI Chat Feature

Ask the AI agent questions about security vulnerabilities.

In [19]:
# Ask AI about SQL injection
question = "What is SQL injection and how can I prevent it in Python?"
answer = ai_agent.chat_about_vulnerability(question)

print(f"‚ùì Question: {question}\n")
print(f"ü§ñ AI Response:\n{answer}")

‚ùì Question: What is SQL injection and how can I prevent it in Python?

ü§ñ AI Response:
Okay, let's break down SQL injection and how to defend against it in Python.

**What is SQL Injection?**

SQL injection (SQLi) is a type of security vulnerability that occurs when a malicious user is able to insert (or "inject") arbitrary SQL code into a database query.  Essentially, they're tricking your application into executing commands that the developer never intended. This can lead to:

*   **Data breaches:** Attackers can steal sensitive data like user credentials, financial information, or proprietary business data.
*   **Data manipulation:** They can modify, delete, or insert data, potentially corrupting your database and impacting application functionality.
*   **Authentication bypass:** Attackers might bypass login mechanisms and gain administrative access.
*   **Denial-of-service (DoS):** In some cases, they can disrupt the availability of your application by overloading the database

## Cell 8: Test with GitHub Repository (Optional)

In [None]:
# Test with a GitHub repository
# Replace with your repository URL
test_repo_url = "https://github.com/yourusername/your-repo.git"

# Uncomment to run (make sure to replace the URL first)
# print(f"üîç Scanning GitHub repository with AI analysis: {test_repo_url}\n")
# report = ai_agent.scan_github_repo(test_repo_url, include_ai_analysis=True)
# print(report)

## Cell 9: Create Agent Function for Vertex AI Deployment

In [26]:
class DeployableAIAgent:
    """Wrapper class for deployment to Vertex AI"""
    
    def __init__(self, gemini_api_key: str):
        """Initialize with Gemini API key"""
        self.gemini_api_key = gemini_api_key
        # Don't initialize the agent here - do it in query() instead
    
    def query(self, repo_url: str = None, rules: str = "auto", 
              include_ai_analysis: bool = True, question: str = None) -> str:
        """
        Query endpoint for the AI agent
        
        Args:
            repo_url: GitHub repository URL to scan (optional if just asking a question)
            rules: Semgrep rules to use (default: "auto")
            include_ai_analysis: Whether to include AI analysis (default: True)
            question: Optional question to ask the AI about security
            
        Returns:
            Security scan report with AI insights or answer to question
        """
        # Initialize agent here (not in __init__) to avoid pickle issues
        agent = AISecurityScannerAgent(gemini_api_key=self.gemini_api_key)
        
        # If just asking a question
        if question and not repo_url:
            return agent.chat_about_vulnerability(question)
        
        # If scanning a repository
        if repo_url:
            return agent.scan_github_repo(repo_url, rules, include_ai_analysis)
        
        return "Please provide either a repo_url to scan or a question to ask."

print("‚úÖ DeployableAIAgent class created!")

‚úÖ DeployableAIAgent class created!


## Cell 10: Initialize Vertex AI

In [21]:
from google.cloud import aiplatform

# Initialize Vertex AI
aiplatform.init(
    project=PROJECT_ID,
    location=LOCATION,
    staging_bucket=STAGING_BUCKET
)

print(f"‚úÖ Vertex AI initialized")
print(f"   Project: {PROJECT_ID}")
print(f"   Location: {LOCATION}")
print(f"   Staging Bucket: {STAGING_BUCKET}")

‚úÖ Vertex AI initialized
   Project: steam-aria-479903-s9
   Location: us-central1
   Staging Bucket: gs://dataset-bucket-imran


## Cell 11: Deploy AI Agent to Vertex AI

‚ö†Ô∏è This may take 5-10 minutes to complete.

In [27]:
from vertexai.preview import reasoning_engines

print("üöÄ Deploying AI Security Scanner Agent to Vertex AI...")
print("   This may take 5-10 minutes...\n")

# Create the reasoning engine with the deployable agent class
agent_app = reasoning_engines.ReasoningEngine.create(
    DeployableAIAgent(GEMINI_API_KEY),  # Pass the class instance, not a function
    requirements=[
        "google-cloud-aiplatform[reasoningengine]",
        "google-genai",  # Changed from google-generativeai
        "semgrep",
        "gitpython"
    ],
    display_name="ai-security-scanner-agent",
    description="AI-powered agent that scans GitHub repositories for security vulnerabilities using Semgrep and Gemini",
    extra_packages=[]
)

print(f"\n‚úÖ AI Agent deployed successfully!")
print(f"\nResource name: {agent_app.resource_name}")
print(f"\nüìù Save this resource name for querying the agent later!")

üöÄ Deploying AI Security Scanner Agent to Vertex AI...
   This may take 5-10 minutes...

Using bucket dataset-bucket-imran
Writing to gs://dataset-bucket-imran/reasoning_engine/reasoning_engine.pkl
Writing to gs://dataset-bucket-imran/reasoning_engine/requirements.txt
Creating in-memory tarfile of extra_packages
Writing to gs://dataset-bucket-imran/reasoning_engine/dependencies.tar.gz
Creating ReasoningEngine
Create ReasoningEngine backing LRO: projects/104435755698/locations/us-central1/reasoningEngines/562186892351635456/operations/1870835991191748608
ReasoningEngine created. Resource name: projects/104435755698/locations/us-central1/reasoningEngines/562186892351635456
To use this ReasoningEngine in another session:
reasoning_engine = vertexai.preview.reasoning_engines.ReasoningEngine('projects/104435755698/locations/us-central1/reasoningEngines/562186892351635456')

‚úÖ AI Agent deployed successfully!

Resource name: projects/104435755698/locations/us-central1/reasoningEngines/562

## Cell 12: Query the Deployed AI Agent with Repository Scan

In [28]:
# Query the deployed AI agent
test_repo = "https://github.com/yourusername/your-repo.git"  # Replace with your repo

print(f"üîç Scanning repository with AI analysis: {test_repo}\n")

# Query the agent with AI analysis enabled
response = agent_app.query(
    repo_url=test_repo,
    rules="auto",
    include_ai_analysis=True
)

print(response)

üîç Scanning repository with AI analysis: https://github.com/yourusername/your-repo.git

‚ùå Error during security scan: Failed to clone repository: Cloning into '/tmp/security_scan_ke951aoo'...
fatal: could not read Username for 'https://github.com': No such device or address



## Cell 13: Query the AI Agent with Security Questions

In [29]:
# Ask the AI agent security questions
questions = [
    "What are the OWASP Top 10 vulnerabilities?",
    "How can I prevent command injection in Python?",
    "What's the difference between authentication and authorization?"
]

for question in questions:
    print(f"\n{'='*80}")
    print(f"‚ùì Question: {question}")
    print(f"{'='*80}")
    
    response = agent_app.query(question=question, repo_url=None)
    print(f"\nü§ñ Answer:\n{response}")


‚ùì Question: What are the OWASP Top 10 vulnerabilities?

ü§ñ Answer:
Okay, here's a breakdown of the OWASP Top 10 vulnerabilities, presented in a clear and practical manner. Think of this as your quick reference guide for the most critical web application security risks:

**What is the OWASP Top 10?**

The OWASP (Open Web Application Security Project) Top 10 is a regularly updated *awareness document* representing a broad consensus about the most critical security risks to web applications.  It's *not* a standard or a law, but it's widely considered the de facto standard for prioritizing security efforts. It's based on real-world vulnerability data and developer community input.  Knowing these vulnerabilities is a crucial starting point for building and maintaining secure web applications.

**The 2021 OWASP Top 10 (The Most Recent Edition):**

Here's a summary, with explanations and practical mitigations:

1.  **A01:2021 ‚Äì Broken Access Control:**

    *   **What it is:**  Failure

## Cell 14: Compare Scans With and Without AI Analysis

In [30]:
import time

test_repo = "https://github.com/1mr0-tech/k3s-suite.git"  # Replace

print("üìä Comparing scan performance with and without AI analysis\n")

# Without AI analysis
print("1Ô∏è‚É£ Running scan WITHOUT AI analysis...")
start = time.time()
response_no_ai = agent_app.query(
    repo_url=test_repo,
    include_ai_analysis=False
)
time_no_ai = time.time() - start
print(f"   ‚è±Ô∏è  Time: {time_no_ai:.2f} seconds\n")

# With AI analysis
print("2Ô∏è‚É£ Running scan WITH AI analysis...")
start = time.time()
response_with_ai = agent_app.query(
    repo_url=test_repo,
    include_ai_analysis=True
)
time_with_ai = time.time() - start
print(f"   ‚è±Ô∏è  Time: {time_with_ai:.2f} seconds\n")

print(f"\nüìà Results:")
print(f"   Without AI: {time_no_ai:.2f}s")
print(f"   With AI: {time_with_ai:.2f}s")
print(f"   AI adds: {time_with_ai - time_no_ai:.2f}s of intelligent analysis")

üìä Comparing scan performance with and without AI analysis

1Ô∏è‚É£ Running scan WITHOUT AI analysis...
   ‚è±Ô∏è  Time: 115.13 seconds

2Ô∏è‚É£ Running scan WITH AI analysis...
   ‚è±Ô∏è  Time: 166.36 seconds


üìà Results:
   Without AI: 115.13s
   With AI: 166.36s
   AI adds: 51.22s of intelligent analysis


## Cell 15: List All Deployed Agents

In [31]:
# List all reasoning engines in your project
agents = reasoning_engines.ReasoningEngine.list()

print("üìã Deployed AI Agents:\n")
for agent in agents:
    print(f"Display Name: {agent.display_name}")
    print(f"Resource Name: {agent.resource_name}")
    print(f"Create Time: {agent.create_time}")
    print("-" * 80)

üìã Deployed AI Agents:

Display Name: ai-security-scanner-agent
Resource Name: projects/104435755698/locations/us-central1/reasoningEngines/562186892351635456
Create Time: 2025-12-19 18:52:56.542019+00:00
--------------------------------------------------------------------------------


## Cell 16: Interactive AI Security Assistant

In [None]:
# Interactive mode - ask multiple questions
print("ü§ñ AI Security Assistant Ready!")
print("Ask questions about security vulnerabilities...\n")

# Example questions you can ask
example_questions = [
    "What is XSS and how do I prevent it?",
    "Explain CSRF attacks",
    "What are the best practices for password storage?",
    "How does HTTPS work?",
    "What is the principle of least privilege?"
]

# Ask a custom question (modify as needed)
your_question = "What is the difference between SQL injection and command injection?"

print(f"‚ùì Your Question: {your_question}\n")
answer = agent_app.query(question=your_question, repo_url=None)
print(f"ü§ñ AI Answer:\n{answer}")

## Cell 17: Clean Up - Delete Agent (Optional)

‚ö†Ô∏è Only run this if you want to delete the deployed agent.

In [33]:
# Delete the agent
print("üóëÔ∏è Deleting AI agent...")
agent_app.delete()
print("‚úÖ AI agent deleted successfully!")

üóëÔ∏è Deleting AI agent...
Deleting ReasoningEngine : projects/104435755698/locations/us-central1/reasoningEngines/562186892351635456


NotFound: 404 The ReasoningEngine does not exist.

##SUMMARY

In [None]:
print("""
‚ïî‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïó
‚ïë            ü§ñ AI-POWERED SECURITY AGENT WORKSHOP COMPLETE! üéâ       ‚ïë
‚ïö‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïù

‚úÖ What We Accomplished:
   ‚Ä¢ Created an AI-powered Security Scanner Agent
   ‚Ä¢ Integrated Gemini API for intelligent analysis
   ‚Ä¢ Tested with vulnerable sample code
   ‚Ä¢ Deployed to Vertex AI
   ‚Ä¢ Scanned GitHub repositories with AI insights
   ‚Ä¢ Generated AI-powered executive summaries
   ‚Ä¢ Built an interactive security Q&A system

ü§ñ AI Features Demonstrated:
   ‚Ä¢ Vulnerability analysis and explanation
   ‚Ä¢ Executive summaries for management
   ‚Ä¢ Code fix suggestions
   ‚Ä¢ Risk assessment and prioritization
   ‚Ä¢ Natural language Q&A about security

üìö Next Steps:
   1. Integrate with your CI/CD pipeline
   2. Customize AI prompts for your organization
   3. Add more AI features (fix generation, code review)
   4. Set up automated scanning with AI reports
   5. Create dashboards with AI-generated insights

üîß Key API Endpoints:
   ‚Ä¢ Scan with AI: agent.query(repo_url="...", include_ai_analysis=True)
   ‚Ä¢ Ask question: agent.query(question="...", repo_url=None)
   ‚Ä¢ Quick scan: agent.query(repo_url="...", include_ai_analysis=False)

üí° AI Use Cases:
   ‚Ä¢ Developer education on vulnerabilities
   ‚Ä¢ Management reporting with executive summaries
   ‚Ä¢ Code review assistance
   ‚Ä¢ Security training and awareness
   ‚Ä¢ Automated remediation suggestions

üìñ Resources:
   ‚Ä¢ Gemini API: https://ai.google.dev/
   ‚Ä¢ Semgrep: https://semgrep.dev/docs
   ‚Ä¢ Vertex AI: https://cloud.google.com/vertex-ai/docs

üéØ Why This is AI-Powered:
   ‚Ä¢ Gemini analyzes vulnerabilities contextually
   ‚Ä¢ Generates human-readable explanations
   ‚Ä¢ Prioritizes issues based on risk
   ‚Ä¢ Suggests specific code fixes
   ‚Ä¢ Answers follow-up questions naturally

Thank you for participating! üôè
This agent combines the power of static analysis with AI intelligence!
""")