## Vulnerability Class 
Holds the data about a CVE together with its EPSS score and percentile. Mostly used to parse EPSS data from JSON or from the downloaded CSV file.

In [7]:
from pydantic import BaseModel, Field
from typing import Optional

class Vulnerability(BaseModel):
    # EPSS record for a single CVE: the identifier, its EPSS score and
    # percentile, and an optional retrieval date.
    cve: str = Field(description="Vulnerability identifier, e.g., 'CVE-2023-4863'")
    epss: float = Field(description="Probability of exploitation (0.0-1.0)")
    percentile: float = Field(description="Percentile rank of this CVE rank")
    # Optional; defaults to None when no retrieval date is supplied.
    date: Optional[str] = Field(None, description="Date of the EPSS data retrieval")

## Pydantic objects for parsing OSV CVE Data

Helper classes to cleanly unmarshal CVE data from osv.dev, as defined by the OSV schema: https://ossf.github.io/osv-schema/

In [3]:
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any

class Severity(BaseModel):
    # One severity assessment of a CVE record: the scoring-system label and
    # the raw score string.
    type: str = Field(description="Severity type, e.g., 'CVSSv3'")
    score: str = Field(description="Severity score, e.g., 'CVSS:3.1/AV:N/AC:H/PR:N/UI:N/S:U/C:L/I:N/A:N'")

    def __str__(self) -> str:
        """Render the assessment as its score string only (the type is omitted)."""
        return str(self.score)
    
class Package(BaseModel):
    # Identity of an affected package: name, ecosystem and an optional
    # package URL.
    name: str = Field(description="Name of the package")
    ecosystem: str = Field(description="Ecosystem of the package, e.g., 'Debian:12'")
    purl: Optional[str] = Field(None, description="Package URL")

    def __str__(self) -> str:
        """Render the package as its bare name (ecosystem and purl are omitted)."""
        return str(self.name)

class Event(BaseModel):
    """Represents an event in a version range (introduced, fixed, etc.)"""
    # Every field is optional and defaults to None.
    introduced: Optional[str] = Field(None, description="Version where vulnerability was introduced")
    fixed: Optional[str] = Field(None, description="Version where vulnerability was fixed")
    last_affected: Optional[str] = Field(None, description="Last version affected by vulnerability")
    limit: Optional[str] = Field(None, description="Limit version for the range")

    def is_fixed_event(self) -> bool:
        """Return True when the event carries a ``fixed`` version (any non-None value)."""
        return not (self.fixed is None)

    def get_fixed_version(self) -> Optional[str]:
        """Return the ``fixed`` version, or None when this is not a fix event."""
        return self.fixed

    def __str__(self) -> str:
        """Describe the event in one short phrase.

        ``introduced`` takes precedence over ``fixed``; an event with neither
        set to a non-empty value is rendered as "Not fixed".
        """
        if self.introduced:
            label = f"Introduced in {self.introduced}"
        elif self.fixed:
            label = f"Fixed in {self.fixed}"
        else:
            label = "Not fixed"
        return label

class Range(BaseModel):
    """Represents a version range with events"""
    type: str = Field(description="Range type, e.g., 'ECOSYSTEM', 'SEMVER'")
    events: List[Event] = Field(description="List of events in this range")
    repo: Optional[str] = Field(None, description="Repository URL for this range")

    def has_fixed_event(self) -> bool:
        """Return True if at least one event in this range is a fix event."""
        for entry in self.events:
            if entry.is_fixed_event():
                return True
        return False

    def get_fixed_versions(self) -> List[str]:
        """Return the fixed version of every fix event, in event order."""
        fix_events = (entry for entry in self.events if entry.is_fixed_event())
        return [entry.get_fixed_version() for entry in fix_events]

    def get_latest_fixed_version(self) -> Optional[str]:
        """Return the last fixed version listed in this range, or None.

        "Latest" means last in event order; versions are not compared.
        """
        candidates = self.get_fixed_versions()
        if not candidates:
            return None
        return candidates[-1]

class Affected(BaseModel):
    # One affected package together with its affected versions/ranges and
    # free-form ecosystem- or database-specific data.
    package: Package = Field(description="Affected package details")
    versions: Optional[List[str]] = Field(None, description="List of affected versions")
    ranges: Optional[List[Range]] = Field(None, description="Affected version ranges")
    ecosystem_specific: Optional[Dict[str, Any]] = Field(None, description="Ecosystem-specific data")
    database_specific: Optional[Dict[str, Any]] = Field(None, description="Database-specific data")

    def has_any_fix(self) -> bool:
        """Return True if any range of this package contains a fix event.

        A missing or empty ``ranges`` list counts as "no fix".
        """
        return bool(self.ranges) and any(
            version_range.has_fixed_event() for version_range in self.ranges
        )

    def get_all_fixed_versions(self) -> List[str]:
        """Return the fixed versions of all ranges, concatenated in range order.

        Returns an empty list when ``ranges`` is missing or empty.
        """
        return [
            fixed_version
            for version_range in (self.ranges or [])
            for fixed_version in version_range.get_fixed_versions()
        ]

class Reference(BaseModel):
    # A typed link attached to a CVE record.
    type: str = Field(description="Reference type, e.g., 'ADVISORY'")
    url: str = Field(description="Reference URL")

    def __str__(self) -> str:
        """Render the reference as "<type>: <url>"."""
        return ": ".join((self.type, self.url))


class DebianCVE(BaseModel):
    # One CVE record as parsed from OSV data, with helpers for fix lookup,
    # per-ecosystem filtering and plain-text rendering.
    id: str = Field(description="CVE identifier, e.g., 'CVE-2023-4863'")
    details: str = Field(description="Detailed description of the CVE")
    modified: Optional[str] = Field(None, description="Modification timestamp")
    published: Optional[str] = Field(None, description="Publication timestamp")
    upstream: Optional[List[str]] = Field(None, description="Upstream references")
    references: Optional[List[Reference]] = Field(None, description="List of references")
    severity: Optional[List[Severity]] = Field(None, description="List of severity assessments")
    affected: List[Affected] = Field(description="List of affected packages and versions")

    def has_fixes_available(self) -> bool:
        """Check if any affected package has fixes available"""
        return any(affected_pkg.has_any_fix() for affected_pkg in self.affected)

    def get_packages_with_fixes(self) -> List[Affected]:
        """Get all affected packages that have fixes available"""
        return [affected_pkg for affected_pkg in self.affected if affected_pkg.has_any_fix()]

    def get_packages_without_fixes(self) -> List[Affected]:
        """Get all affected packages that don't have fixes available"""
        return [affected_pkg for affected_pkg in self.affected if not affected_pkg.has_any_fix()]

    def filter_for_system_version(self, version: str) -> 'DebianCVE':
        """Return a copy restricted to one Debian version (e.g., '12', '13').

        Only affected entries whose package ecosystem is exactly
        ``"Debian:<version>"`` are kept; all other fields are carried over
        unchanged. The result may have an empty ``affected`` list.
        """
        target_ecosystem = f"Debian:{version}"
        filtered_affected = [
            affected for affected in self.affected
            if affected.package.ecosystem == target_ecosystem
        ]

        # Create a new instance with filtered data
        return DebianCVE(
            id=self.id,
            details=self.details,
            modified=self.modified,
            published=self.published,
            upstream=self.upstream,
            references=self.references,
            severity=self.severity,
            affected=filtered_affected
        )

    def filter_for_current_system(self) -> 'DebianCVE':
        """Return a DebianCVE filtered for the current Debian version.

        Relies on ``get_debian_version()`` defined elsewhere in this file.
        When the version cannot be detected, ``self`` is returned unfiltered
        (the same object, not a copy).
        """
        current_version = get_debian_version()
        if not current_version:
            # If we can't detect the version, return the original
            return self

        return self.filter_for_system_version(current_version)

    def get_available_ecosystems(self) -> List[str]:
        """Get the distinct ecosystems of the affected packages.

        Duplicates are removed and first-seen order is preserved, so the
        result is deterministic across runs.
        """
        # dict.fromkeys de-duplicates while keeping insertion order, unlike set().
        return list(dict.fromkeys(affected.package.ecosystem for affected in self.affected))

    def _format_references(self) -> str:
        """Format the references section; empty string when there are none."""
        if not self.references:
            return ""

        lines = ["References:\n"]
        lines.extend(f"  - {ref}\n" for ref in self.references)
        return "".join(lines)

    def _format_severity(self) -> str:
        """Format the severity section; empty string when there is none."""
        if not self.severity:
            return ""

        lines = ["Severity:\n"]
        lines.extend(f"  - {sev}\n" for sev in self.severity)
        return "".join(lines)

    def _format_versions(self, versions: Optional[List[str]]) -> str:
        """Format the versions section; empty string for None or an empty list."""
        if not versions:
            return ""

        lines = ["  Versions:\n"]
        lines.extend(f"    - {version}\n" for version in versions)
        return "".join(lines)

    def _format_fix_status(self, affected: Affected) -> str:
        """Format fix status for an affected package.

        Returns an empty string when the package has no ranges, one status
        line per fix event when fixes exist, and a "No fixes available."
        line otherwise.
        """
        if not affected.ranges:
            return ""

        fixed_events = [
            event for range_item in affected.ranges
            for event in range_item.events
            if event.is_fixed_event()
        ]

        if not fixed_events:
            return "  No fixes available.\n"

        # Event.__str__ prefers "Introduced in ..." when `introduced` is set,
        # so only events without it produce a "Fixed in ..." status line.
        return "".join(
            f"      * Status: {event}\n"
            for event in fixed_events
            if not event.introduced
        )

    def __str__(self) -> str:
        """Render the full record: header, references, severity, then one
        section per affected package (name, versions, fix status)."""
        lines = [f"CVE: {self.id}\nDescription: {self.details}\n"]

        lines.append(self._format_references())
        lines.append(self._format_severity())

        for affected in self.affected:
            lines.append(f"Affected Package: {affected.package}\n")
            lines.append(self._format_versions(affected.versions))
            lines.append(self._format_fix_status(affected))

        return "".join(lines)

    def to_llm_summary(self, epss: str, percentile: str) -> str:
        """Generates a concise summary optimized for LLM reasoning.

        Args:
            epss: EPSS score to embed in the summary.
            percentile: EPSS percentile to embed in the summary.

        Returns:
            Newline-separated summary: CVE id, first severity score (or
            'Unknown'), the description capped at 300 characters, one line
            per affected package with its fix status, and the EPSS line.
        """
        max_details = 300  # Keep descriptions short
        details = self.details
        if len(details) > max_details:
            # Mark the cut only when text was actually dropped.
            details = f"{details[:max_details]}..."

        summary = [
            f"CVE_ID: {self.id}",
            f"CRITICALITY: {self.severity[0].score if self.severity else 'Unknown'}",
            f"DETAILS: {details}",
        ]

        if not self.affected:
            summary.append("SYSTEM_IMPACT: No packages affected on this Debian version.")
        else:
            for affected_pkg in self.affected:
                pkg_name = affected_pkg.package.name
                fixed_versions = affected_pkg.get_all_fixed_versions()

                status = "FIX_AVAILABLE" if fixed_versions else "AWAITING_FIX"
                fix_detail = f"Fixed in: {', '.join(fixed_versions)}" if fixed_versions else "No fix version assigned yet."

                summary.append(f"PACKAGE: {pkg_name} | STATUS: {status} | {fix_detail}")

        summary.append(f"EPSS_SCORE: {epss} | PERCENTILE: {percentile}")

        return "\n".join(summary)

Functions to prefetch the EPSS CSV data and load it into memory, so that we need not hit the website for every EPSS score lookup.

In [6]:
import requests
import gzip
import csv

def download_epss_csv(dest_path="epss_scores-current.csv.gz", timeout=30) -> bool:
    """Downloads the latest EPSS CSV file.

    The payload is streamed into a temporary ``<dest_path>.part`` file and
    moved over ``dest_path`` only after the download completed, so a failed
    update never damages a previously downloaded file.

    Args:
        dest_path (str): Path to save the downloaded file.
        timeout (float): Seconds to wait for the server before giving up.
    Returns:
        bool: True if download succeeded, False otherwise.
    """
    import os  # local import: only needed for the atomic replace / cleanup

    url = "https://epss.empiricalsecurity.com/epss_scores-current.csv.gz"
    tmp_path = f"{dest_path}.part"
    try:
        # The context manager releases the connection even on errors.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(tmp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    f.write(chunk)
        os.replace(tmp_path, dest_path)
        return True
    except (requests.RequestException, OSError) as e:
        print(f"Update failed: {e}. Falling back to existing file.")
        # Best-effort cleanup of a partial download; it may not exist at all.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        return False
    
def load_epss_map(file_path="epss_scores-current.csv.gz"):
    """Loads EPSS data into a dictionary indexed by CVE ID.

    Args:
        file_path (str): Path to the gzip-compressed EPSS CSV file.
    Returns:
        dict: Maps each CVE ID to an ``(epss, percentile)`` tuple. Both
        values are kept as the strings read from the CSV.
    """
    # Explicit encoding avoids depending on the platform default; newline=""
    # lets the csv module handle line endings itself.
    with gzip.open(file_path, mode='rt', encoding='utf-8', newline='') as f:
        # Skip the '#'-prefixed metadata line(s) that precede the CSV header
        data_lines = (line for line in f if not line.startswith('#'))
        return {
            row['cve']: (row['epss'], row['percentile'])
            for row in csv.DictReader(data_lines)
        }

# Refresh the local EPSS snapshot when this cell runs; the call returns True
# on success and False when the download failed.
download_epss_csv()

True

A function that returns the OS version in the format used by osv.dev data. Sid is always reported as forky/sid (i.e. next Debian version/sid), so for now it maps to 14. The function needs to be updated for unstable when Debian 14 is released.

In [5]:
import re
def get_debian_version(
    version_file: str = '/etc/debian_version',
    sid_version: str = "14",
) -> Optional[str]:
    """Detect the current Debian version from the system.

    Args:
        version_file: File holding the version string.
        sid_version: Major version to report when the file names a
            ``<codename>/sid`` system; bump it when a new Debian release
            changes what unstable corresponds to.
    Returns:
        The major version as a string (e.g. "12" for "12.7"),
        ``sid_version`` for a ``.../sid`` system, or None when the file
        cannot be read or its content is not recognised.
    """
    try:
        with open(version_file, 'r') as f:
            version = f.read().strip()
    except OSError:
        # Missing or unreadable file: not a Debian system we can identify.
        return None

    # Extract major version number (e.g., "12.7" -> "12")
    match = re.match(r'(\d+)', version)
    if match:
        return match.group(1)

    # Testing/unstable report "<codename>/sid" instead of a number.
    if re.match(r'.*\/sid', version):
        return sid_version

    return None