In [None]:
import os
import google.generativeai as genai
from dotenv import load_dotenv, find_dotenv

# Load environment variables from the .env file located by find_dotenv().
load_dotenv(find_dotenv())
# Reads the key after the .env load above; os.environ[...] raises KeyError
# if GEMINI_API_KEY is still undefined at this point.
genai.configure(api_key=os.environ["GEMINI_API_KEY"])

# One model handle per Gemini 2.5 variant (pro, flash, flash-lite).
proModel = genai.GenerativeModel('gemini-2.5-pro')
flashModel = genai.GenerativeModel('gemini-2.5-flash')
flashLiteModel = genai.GenerativeModel('gemini-2.5-flash-lite')
import csv
import random
import pandas as pd
import re
import random
import ipaddress

# Pool of organization names; create_new_data_values picks one at random for
# each non-anonymous row and derives the email/website domain from it.
ORG_NAMES = [
    "Nexus Dynamics", "Blue Horizon Initiative", "Vertex Solutions", "Evergreen Alliance", 
    "Ironclad Logistics", "Stellar Pathways", "Quantum Reach", "Silver Leaf Collective", 
    "Aetheric Systems", "Terraform Global", "Summit Peak Partners", "Opal Sky Media", 
    "Velocity Ventures", "Echo Chamber Arts", "Harbor Light Foundation", "Crestview Analytics", 
    "Titanium Core", "Aventine Research", "Prism Logic", "Nomad Gear Co.", 
    "Solaris Energy", "Brimstone Manufacturing", "Cloud Nine Software", "Atlas Mapping", 
    "Green Sprout Organic", "Fable & Lore", "Catalyst Consulting", "Zenith Point", 
    "Borealis Tech", "Moxie Marketing", "Iron River Finance", "Vanguard Heritage", 
    "Nebula Creative", "Arcane Security", "Pioneer Pulse", "Gilded Cage Design", 
    "North Star Education", "Pacific Rim Exports", "Ember Glow Hospitality", "Digital Drift", 
    "Stone Arch Masonry", "Swift Current Labs", "Nova Terra", "Mainframe Managed", 
    "Willow Creek Health", "Blackwood Industries", "Neon Pulse Entertainment", "Apex Legends Group", 
    "Sovereign Trust", "Kinetix Robotics", "Foresight Strategies", "Calyx Botany", 
    "Obsidian Operatives", "Maple Leaf Logistics", "Aeon Pharmaceuticals", "True North Travel", 
    "Golden Gate Gaming", "Bluebird Bio", "Symmetry Architecture", "Firebrand Media", 
    "Oasis Wellness", "Midnight Oil Studios", "Copperhead Cables", "Radiant Life", 
    "Structure & Form", "Infinity Loop", "Tidal Wave Sports", "Granite Shield", 
    "Astraeus Aerospace", "Urban Jungle Planning", "Hearth & Home", "Binary Star", 
    "Grizzly Peak", "Orchid Isle", "Wildfire Communications", "Sterling Silver", 
    "Vivid Vision", "Iron Oak Furniture", "Skyward Bound", "Common Ground", 
    "Iron Bridge Legal", "Phoenix Rising", "Deep Root Ecology", "Signal Flare", 
    "Modern Myth", "Clear Path", "Cinder & Ash", "Blue Marble", 
    "Falcon Heavy", "Paper Plane Publishing", "Hidden Gem", "Solid State", 
    "Verve & Vigor", "Top Tier", "Silent Spring", "Open Door", 
    "Great Lakes", "New Era", "White Label", "True Grit"
]
# Suffixes appended to the cleaned organization name to form an email domain.
DOMAIN_ENDERS = [".net", ".org", ".com"]
# Possible answers for every questionnaire item in the Org Data payload.
YES_NO = ["Yes", "No"]
# Header row of the generated CSV; the Category/Description/Input_1/Input_3
# names are also the columns read from the 12-row template CSVs.
CSV_HEADERS = ["ID", "Category", "Description", "Input_1_Network_Scan_JSON", "Input_2_Org_Data_JSON", "Input_3_Current_Risks_JSON"]

def generate_valid_random_ipv4():
    """Return a random, publicly routable unicast IPv4 address as a string.

    Candidates are drawn uniformly from the full 32-bit address space and
    rejected until one qualifies (rejection sampling), so the loop always
    terminates quickly in practice.

    Returns:
        str: Dotted-quad address, e.g. ``"93.184.216.34"``.
    """
    while True:
        # Generate a random 32-bit integer representing an IPv4 address
        random_int = random.randint(0, 2**32 - 1)
        ip_obj = ipaddress.IPv4Address(random_int)

        # is_global rejects private, loopback, link-local, reserved and
        # shared (100.64.0.0/10) space. Multicast (224.0.0.0/4) is checked
        # explicitly because it is not a valid host address for an
        # organization's external IP.
        if ip_obj.is_global and not ip_obj.is_multicast:
            return str(ip_obj)

# Yes/No questionnaire prompts shared by the anonymous and personal payloads,
# listed in the order they appear in the generated JSON.
_ORG_QUESTIONS = (
    "Do you require MFA to access email?",
    "Do you require MFA to log into computers?",
    "Do you require MFA to access sensitive data systems?",
    "Does your organization have an employee acceptable use policy?",
    "Does your organization do security awareness training for new employees?",
    "Does your organization do security awareness training for all employees at least once per year?",
)


def _build_org_data_json(anon):
    """Return one randomized Org Data payload as a compact JSON string.

    The payload has the shape ``{"text": {...}}``. Identifying fields
    (organization name, domains, external IP) are included only when
    ``anon`` is falsy. Serializing with ``json.dumps`` guarantees balanced
    braces and correct escaping, which the previous hand-built strings
    lacked.
    """
    import json  # local import: this cell does not import json at the top

    fields = {}
    if not anon:
        org_name = random.choice(ORG_NAMES)
        # Strip punctuation such as "&" and "." so the name can form a domain.
        clean_org_name = re.sub(r'[^a-zA-Z0-9 ]', '', org_name)
        email_domain = clean_org_name.replace(" ", "") + random.choice(DOMAIN_ENDERS)
        fields["Organization Name"] = org_name
        fields["Email Domain"] = email_domain
        fields["Website Domain"] = "www." + email_domain
        fields["External IP"] = generate_valid_random_ipv4()

    for question in _ORG_QUESTIONS:
        fields[question] = random.choice(YES_NO)

    # Compact separators keep the same no-whitespace layout as before.
    return json.dumps({"text": fields}, separators=(",", ":"))


def create_new_data_values(anon):
    """Expand a 12-row golden-set CSV into a 1008-row CSV (12 * 84).

    Each template row is repeated 84 times with sequential IDs starting at 1.
    Category, Description, Input 1 and Input 3 are copied unchanged; Input 2
    (Org Data) is regenerated randomly for every output row.

    Args:
        anon: When truthy, read ``golden_set_12_anon.csv`` and emit Org Data
            without identifying fields, writing ``golden_set_1008_anon.csv``.
            Otherwise read ``golden_set_12_personal.csv``, include a random
            organization name, domains and external IP, and write
            ``golden_set_1008_personal.csv``.

    Raises:
        FileNotFoundError: If the template CSV does not exist.
        KeyError: If the template CSV lacks one of the expected columns.
    """
    copies = 84
    if anon:
        source_path = 'golden_set_12_anon.csv'
        output_path = 'golden_set_1008_anon.csv'
    else:
        source_path = 'golden_set_12_personal.csv'
        # Previously the personal set was also written to the *_anon file,
        # overwriting the anonymous data set.
        output_path = 'golden_set_1008_personal.csv'

    # Parse the template once instead of re-reading it for every copy.
    with open(source_path, mode='r', newline='', encoding='utf-8') as file:
        template_rows = list(csv.DictReader(file))

    data = [CSV_HEADERS]
    index = 1
    for _ in range(copies):
        for row in template_rows:
            data.append([
                index,
                row["Category"],
                row["Description"],
                row["Input_1_Network_Scan_JSON"],
                _build_org_data_json(anon),
                row["Input_3_Current_Risks_JSON"],
            ])
            index += 1

    # Write with the same encoding the template was read with.
    with open(output_path, mode='w', newline='', encoding='utf-8') as file:
        csv.writer(file).writerows(data)
    print("CSV file created succcessfully")

# Column names for a per-report scoring table.
# NOTE(review): not referenced by any code in the visible cells — confirm it is still needed.
SCORING_HEADERS = ["ID", "Personal or Anon", "Score", "Reason"]


  from .autonotebook import tqdm as notebook_tqdm


In [6]:
import os
import csv
import json
import time
from dotenv import find_dotenv, load_dotenv
import google.generativeai as genai

# ==========================================
# SETUP & CONFIGURATION
# ==========================================
# Load environment variables from the .env file located by find_dotenv(),
# then configure the SDK; os.environ[...] raises KeyError if the key is unset.
load_dotenv(find_dotenv())
genai.configure(api_key=os.environ['GEMINI_API_KEY']) 

# Model used by run_test_suite for every report.
MODEL_NAME = 'gemini-2.5-flash'
# Root folder that receives one "Report-<ID>" subfolder per CSV row.
BASE_TEST_DIR = "Testing"
# Input data set read row by row by run_test_suite.
CSV_FILEPATH = "golden_set_1008_anon.csv" 

# ==========================================
# HTML GENERATOR (JSON to HTML directly)
# ==========================================
def generate_html_from_json(output_json: dict) -> str:
    """Build a readable HTML report from the model's JSON output.

    Every value taken from ``output_json`` is HTML-escaped before it is
    inserted, because the text is model-generated and must not be able to
    inject markup or scripts into the page.

    Args:
        output_json: Parsed model output. Expected to hold a ``"report"``
            list whose items carry ``"Risk Assessment & Readiness"``,
            ``"Recommendations"`` and ``"Conclusion"``. Missing keys fall
            back to placeholder text or empty sections.

    Returns:
        The complete HTML document as a single string.
    """
    from html import escape  # local import keeps this block self-contained

    def esc(value) -> str:
        """Stringify *value* and escape it for HTML text or attribute use."""
        return escape(str(value), quote=True)

    # Collect fragments and join once at the end instead of repeated "+=".
    parts = ["""
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <title>Cybersecurity Assessment Report</title>
        <style>
            body { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; line-height: 1.6; color: #333; max-width: 900px; margin: 2rem auto; padding: 0 2rem; }
            h1, h2, h3 { color: #2c3e50; border-bottom: 2px solid #eee; padding-bottom: 0.5rem; }
            .summary, .conclusion { background-color: #f8f9fa; padding: 1.5rem; border-left: 4px solid #007bff; border-radius: 4px; margin-bottom: 2rem; }
            .risk-card { background: #fff; border: 1px solid #ddd; padding: 1.5rem; border-radius: 6px; margin-bottom: 1.5rem; box-shadow: 0 2px 4px rgba(0,0,0,0.05); }
            .severity-Critical { color: #dc3545; font-weight: bold; }
            .severity-High { color: #fd7e14; font-weight: bold; }
            .severity-Medium { color: #ffc107; font-weight: bold; }
            .severity-Low { color: #28a745; font-weight: bold; }
            .severity-Info { color: #6c757d; font-weight: bold; }
            ul { padding-left: 1.5rem; }
        </style>
    </head>
    <body>
        <h1>Cybersecurity Assessment Report</h1>
    """]

    reports = output_json.get("report", [])

    for report_item in reports:
        readiness = report_item.get("Risk Assessment & Readiness", {})
        recommendations = report_item.get("Recommendations", {})
        conclusion = report_item.get("Conclusion", "No conclusion provided.")

        # Summary Section
        summary = readiness.get("Summary", "No summary provided.")
        parts.append(f"""
        <div class="summary">
            <h2>Overview</h2>
            <p>{esc(summary)}</p>
        </div>
        """)

        # Vulnerabilities Section
        # NOTE: "Vulnerabilites" is misspelled on purpose here; it must match
        # the key the response schema asks the model to emit.
        vulnerabilities = readiness.get("Vulnerabilites Found", [])
        parts.append("<h2>Identified Risks</h2>")

        if not vulnerabilities:
            parts.append("<p>No specific vulnerabilities were identified.</p>")

        for vuln in vulnerabilities:
            risk_name = esc(vuln.get("Risk", "Unknown Risk"))
            overview = esc(vuln.get("Overview", ""))
            # Severity is used both as text and inside a class attribute.
            severity = esc(vuln.get("Severity", "Info"))
            affected_list = "".join(
                f"<li>{esc(item)}</li>" for item in vuln.get("Affected Elements", [])
            )

            parts.append(f"""
            <div class="risk-card">
                <h3>{risk_name} - <span class="severity-{severity}">{severity}</span></h3>
                <p><strong>Overview:</strong> {overview}</p>
                <p><strong>Affected Elements:</strong></p>
                <ul>{affected_list}</ul>
            </div>
            """)

        # Recommendations Section: tolerate a mapping, a list or a plain value.
        parts.append("<h2>Recommendations</h2><ul>")
        if isinstance(recommendations, dict):
            for key, rec in recommendations.items():
                parts.append(f"<li><strong>{esc(key)}:</strong> {esc(rec)}</li>")
        elif isinstance(recommendations, list):
            for rec in recommendations:
                parts.append(f"<li>{esc(rec)}</li>")
        else:
            parts.append(f"<li>{esc(recommendations)}</li>")
        parts.append("</ul>")

        # Conclusion Section
        parts.append(f"""
        <div class="conclusion">
            <h2>Conclusion</h2>
            <p>{esc(conclusion)}</p>
        </div>
        <hr>
        """)

    parts.append("</body></html>")
    return "".join(parts)

# ==========================================
# MAIN EXECUTION LOOP
# ==========================================

# The response schema is assembled bottom-up from its nested pieces so each
# level can be read on its own. Key names (including the "Vulnerabilites"
# spelling) are part of the contract with the HTML generator and the model.

# One entry of the "Vulnerabilites Found" list.
_vulnerability_schema = {
    "type": "object",
    "properties": {
        "Risk": {
            "type": "string",
            "description": "A short name of what the risk is."
        },
        "Overview": {
            "type": "string",
            "description": "A text description of the risk, explaining what it is, its impact, and how it was identified."
        },
        "Severity": {
            "type": "string",
            "description": "The calculated severity score (Critical, High, Medium, Low, or Info) for the risk."
        },
        "Affected Elements": {
            "type": "array",
            "description": "A list of system components, files, URLs, or specific functions/code areas affected by this risk.",
            "items": {
                "type": "string"
            }
        }
    },
    "required": ["Risk", "Overview", "Severity", "Affected Elements"]
}

# The "Risk Assessment & Readiness" section: a summary plus the findings list.
_readiness_schema = {
    "type": "object",
    "description": "A paragraph summary of the organization's network and a list of found vulnerabilities.",
    "properties": {
        "Summary": {
            "type": "string",
            "description": "A summary of the organization's network."
        },
        "Vulnerabilites Found": {
            "type": "array",
            "description": "A list of found vulnerabilities.",
            "items": _vulnerability_schema
        }
    },
    "required": ["Summary", "Vulnerabilites Found"]
}

# One complete report: readiness section, recommendations and conclusion.
_report_item_schema = {
    "type": "object",
    "properties": {
        "Risk Assessment & Readiness": _readiness_schema,
        "Recommendations": {
            "type": "string",
            "description": "A paragraph of specific recommendations for mitigating the risks found in the Vulnerabilities section."
        },
        "Conclusion": {
            "type": "string",
            "description": "A summary of the organization's current vulnerabilities and readiness."
        }
    },
    "required": [
        "Risk Assessment & Readiness",
        "Recommendations",
        "Conclusion"
    ]
}

# Top-level object handed to Gemini as the enforced response schema.
report_schema = {
    "type": "object",
    "properties": {
        "report": {
            "type": "array",
            "description": "A cybersecurity assessment report on an organization's current security risks.",
            "items": _report_item_schema
        }
    },
    "required": ["report"]
}

# System instruction sent with every request; it asks for JSON-only output.
system_prompt = "You are an expert cybersecurity analyst. Return only valid JSON according to the schema provided based on the input data."

def run_test_suite():
    """Generate a JSON and HTML report for every row of the input CSV.

    For each row a folder ``<BASE_TEST_DIR>/Report-<ID>`` is created and
    filled with:

    * ``input.json``  - the three input columns sent to the model,
    * ``output.json`` - the model's schema-constrained JSON answer,
    * ``report.html`` - the answer rendered by ``generate_html_from_json``.

    A failure while generating one report is printed and does not stop the
    run. If the CSV file itself is missing, a message is printed and the
    function returns without processing anything.
    """
    os.makedirs(BASE_TEST_DIR, exist_ok=True)
    model = genai.GenerativeModel(model_name=MODEL_NAME, system_instruction=system_prompt)

    # Only the open() call is guarded, so a FileNotFoundError raised later
    # (e.g. while writing into a report folder) is not misreported as a
    # missing CSV file.
    try:
        # newline='' lets the csv module handle quoted fields with newlines.
        csv_file = open(CSV_FILEPATH, 'r', newline='', encoding='utf-8')
    except FileNotFoundError:
        print(f"Could not find the CSV file: {CSV_FILEPATH}")
        return

    with csv_file:
        reader = csv.DictReader(csv_file)

        for index, row in enumerate(reader):
            # Fall back to the 1-based row position when the ID column is absent.
            row_id = row.get('ID', index + 1)

            # Create Folder Structure: Testing > Report-X
            report_folder = os.path.join(BASE_TEST_DIR, f"Report-{row_id}")
            os.makedirs(report_folder, exist_ok=True)

            print(f"Processing Report {row_id}...")

            # Extract Data & Save Input JSON
            input_data = {
                "Network_Scan": row.get('Input_1_Network_Scan_JSON', ''),
                "Org_Data": row.get('Input_2_Org_Data_JSON', ''),
                "Current_Risks": row.get('Input_3_Current_Risks_JSON', '')
            }

            with open(os.path.join(report_folder, "input.json"), "w", encoding='utf-8') as input_file:
                json.dump(input_data, input_file, indent=4)

            # Format string for the AI prompt
            context_text = f"Network Scan Data:\n{input_data['Network_Scan']}\n\nOrg Data:\n{input_data['Org_Data']}\n\nCurrent Risks:\n{input_data['Current_Risks']}"

            # Call Gemini (Enforcing JSON Schema)
            try:
                response = model.generate_content(
                    contents=[context_text],
                    generation_config={
                        "temperature": 0.2,
                        "response_mime_type": "application/json",
                        "response_schema": report_schema
                    }
                )

                if not response.text:
                    print(f"  [!] Empty response for Report {row_id}")
                    continue

                output_json = json.loads(response.text)

                # Save Output JSON
                with open(os.path.join(report_folder, "output.json"), "w", encoding='utf-8') as output_file:
                    json.dump(output_json, output_file, indent=4)

                # Generate and Save HTML
                html_content = generate_html_from_json(output_json)
                with open(os.path.join(report_folder, "report.html"), "w", encoding='utf-8') as html_file:
                    html_file.write(html_content)

                print(f"  [+] Successfully generated JSON and HTML for Report {row_id}")

            except Exception as e:
                # Per-report boundary: report the problem and move on so one
                # bad row or API error cannot abort the whole batch.
                print(f"  [x] Failed to generate content for Report {row_id}: {e}")

            finally:
                # Small delay to avoid rate limiting. It runs after every
                # request, including failed or empty ones, so an error does
                # not trigger an immediate follow-up call.
                time.sleep(2)
# Execute the suite
# Runs as soon as this cell executes. Errors raised while generating an
# individual report are caught inside run_test_suite, so the completion
# message below does not imply that every report succeeded.
run_test_suite()
print("Testing pipeline complete.")

Processing Report 1...
  [+] Successfully generated JSON and HTML for Report 1
Processing Report 2...
  [+] Successfully generated JSON and HTML for Report 2
Processing Report 3...
  [+] Successfully generated JSON and HTML for Report 3
Processing Report 4...
  [+] Successfully generated JSON and HTML for Report 4
Processing Report 5...
  [+] Successfully generated JSON and HTML for Report 5
Processing Report 6...
  [+] Successfully generated JSON and HTML for Report 6
Processing Report 7...
  [+] Successfully generated JSON and HTML for Report 7
Processing Report 8...
  [+] Successfully generated JSON and HTML for Report 8
Processing Report 9...
  [+] Successfully generated JSON and HTML for Report 9
Processing Report 10...
  [+] Successfully generated JSON and HTML for Report 10
Processing Report 11...
  [+] Successfully generated JSON and HTML for Report 11
Processing Report 12...
  [+] Successfully generated JSON and HTML for Report 12
Processing Report 13...
  [+] Successfully gen

In [7]:
import os
import json
import csv

# Folder scanned for generated reports and the file the scores are written to.
BASE_DIR = 'Testing'
CSV_FILENAME = 'report_scores.csv'

# Define keyword weights
# All keywords are matched as lowercase substrings of the serialized report,
# so they must be written in lowercase here.

# Primary structural and high-importance keywords
HIGH_WEIGHT = {
    "overview": 10,
    "risk assessment & readiness": 15,
    "vulnerabilities found": 15,
    # The response schema spells this key "Vulnerabilites Found" (sic), so the
    # correctly spelled keyword above never matches the structural key. Both
    # spellings are listed so the section is credited whichever one appears.
    "vulnerabilites found": 15,
    "recommendations": 15,
    "conclusion": 10,
    "risk": 5,
    "severity": 5
}

# Secondary terminology (Good to have)
LOW_WEIGHT = {
    "cve": 2,
    "cvss": 2,
    "exploit": 3,
    "attack vector": 3,
    "zero-day": 4,
    "ioc": 3,
    "indicators of compromise": 3,
    "lateral movement": 4,
    "exfiltration": 4,
    "malware": 2,
    "ransomware": 2,
    "mitigation": 3,
    "remediation": 3,
    "encryption": 2,
    "mfa": 2
}

# Combine dictionaries (the two key sets are disjoint, so nothing is overridden)
ALL_KEYWORDS = {**HIGH_WEIGHT, **LOW_WEIGHT}

def _report_sort_key(report_name):
    """Return a key that orders embedded numbers numerically ('Report-2' < 'Report-10')."""
    import re  # local import: this cell does not import re at the top

    # re.split with a capturing group alternates text and digit runs, so
    # odd positions are always the numeric chunks.
    chunks = re.split(r'(\d+)', report_name)
    return [int(chunk) if pos % 2 else chunk for pos, chunk in enumerate(chunks)]


def score_reports():
    """Score every ``output.json`` under ``BASE_DIR`` and write a CSV summary.

    A report's score is the sum, over all entries of ``ALL_KEYWORDS``, of
    ``weight * occurrences``, where occurrences are counted as lowercase
    substring matches in the JSON-serialized report (keys and values alike).

    The CSV at ``CSV_FILENAME`` lists one row per report in natural name
    order, followed by a blank row and the highest, lowest and average
    scores. Files that cannot be read or parsed are reported and skipped.
    Nothing is written when the directory is missing or no report could be
    scored.
    """
    results = []

    # Traverse the directory structure
    if not os.path.exists(BASE_DIR):
        print(f"Error: Directory '{BASE_DIR}' not found. Please ensure it is in the same folder as this script.")
        return

    for root, dirs, files in os.walk(BASE_DIR):
        if 'output.json' not in files:
            continue

        report_name = os.path.basename(root)  # Gets 'Report-1', 'Report-2', etc.
        file_path = os.path.join(root, 'output.json')

        try:
            # Read and parse the JSON; the handle is closed before scoring.
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except json.JSONDecodeError:
            print(f"Skipping {report_name}: Invalid JSON format.")
            continue
        except Exception as e:
            print(f"Error processing {report_name}: {e}")
            continue

        # Convert entire JSON to a lowercase string for easy scanning
        report_text = json.dumps(data).lower()

        # Each keyword contributes its weight once per occurrence.
        score = sum(
            report_text.count(keyword) * weight
            for keyword, weight in ALL_KEYWORDS.items()
        )
        results.append({'Report': report_name, 'Score': score})

    # Generate CSV and Stats
    if not results:
        print("No valid reports found to score.")
        return

    # Sort by report name, comparing embedded numbers numerically so that
    # Report-2 comes before Report-10.
    results.sort(key=lambda x: _report_sort_key(x['Report']))

    # Calculate statistics
    scores = [r['Score'] for r in results]
    max_score = max(scores)
    min_score = min(scores)
    avg_score = sum(scores) / len(scores)

    # Find the reports associated with the max and min scores (ties included)
    highest_reports = [r['Report'] for r in results if r['Score'] == max_score]
    lowest_reports = [r['Report'] for r in results if r['Score'] == min_score]

    # Write to CSV
    with open(CSV_FILENAME, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=['Report', 'Score'])

        writer.writeheader()
        writer.writerows(results)

        # Add a blank row for spacing
        writer.writerow({'Report': '', 'Score': ''})

        # Add summary rows at the bottom
        writer.writerow({'Report': f'Highest Score ({", ".join(highest_reports)})', 'Score': max_score})
        writer.writerow({'Report': f'Lowest Score ({", ".join(lowest_reports)})', 'Score': min_score})
        writer.writerow({'Report': 'Average Score', 'Score': round(avg_score, 2)})

    print(f"Success! Scored {len(results)} reports. Results saved to {CSV_FILENAME}")

# Score every output.json found under BASE_DIR as soon as this cell runs.
score_reports()

Success! Scored 1007 reports. Results saved to report_scores.csv
