<a href="https://colab.research.google.com/github/KijoSal-dev/code-demo-Snyk/blob/main/demo_Snyk.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
# 1.1: Install prerequisites
#    pip install scikit-learn requests

import datetime
import json
import subprocess

import numpy as np
import requests
from sklearn.ensemble import IsolationForest

# 1. Run Snyk CLI and capture JSON output
try:
    proc = subprocess.run(
        ["snyk", "test", "--json"],
        stdout=subprocess.PIPE,
        check=True # This will raise CalledProcessError for non-zero exit codes
    )
    vulns = json.loads(proc.stdout)
    # If we reach here, snyk test returned 0 (no vulnerabilities)
    has_vulnerabilities = False
except subprocess.CalledProcessError as e:
    if e.returncode == 1:
        # Exit code 1 means the scan completed and vulnerabilities were found
        # (2 is a general failure, 3 means no supported project was detected).
        # The JSON report is still written to stdout, so parse it.
        vulns = json.loads(e.stdout)
        has_vulnerabilities = True
    else:
        # Any other non-zero exit code is a real failure; re-raise it unchanged
        raise
except FileNotFoundError as e:
    print(f"Error: {e}. Please ensure Snyk CLI is installed and in your PATH.")
    vulns = None # Set vulns to None to indicate no vulnerabilities could be processed
    has_vulnerabilities = False


# 2. Extract features per vulnerability
#    e.g., severity (0=low,1=medium,2=high,3=critical),
#          package age (in days),
#          num of vulnerable versions
def extract_features(v):
    sev_map = {"low": 0, "medium": 1, "high": 2, "critical": 3}
    # v.get() avoids a KeyError when severity is missing; unknown values map to 0
    severity = sev_map.get(v.get("severity"), 0)
    pkg_name = v.get("packageName", "")
    pkg_age = 0  # Default when the age cannot be determined
    try:
        # Query PyPI for release metadata (only meaningful for Python packages)
        resp = requests.get(f"https://pypi.org/pypi/{pkg_name}/json", timeout=10)
        resp.raise_for_status()
        pkg_info = resp.json()
        # Collect the upload time of every released file
        release_dates = [
            file_info["upload_time_iso_8601"]
            for version_info in pkg_info.get("releases", {}).values()
            for file_info in version_info
            if "upload_time_iso_8601" in file_info
        ]
        if release_dates:
            first_release_date_str = min(release_dates)
            # Older Python versions reject a trailing 'Z' in fromisoformat(), so normalize it
            first_release_date = datetime.datetime.fromisoformat(
                first_release_date_str.replace("Z", "+00:00")
            ).date()
            pkg_age = (datetime.date.today() - first_release_date).days
    except Exception as ex:
        print(f"Could not get package age for {pkg_name}: {ex}")
        pkg_age = 0  # Default to 0 on error

    versions = len(v.get("versions", []))  # 0 when the field is absent
    return [severity, pkg_age, versions]

# Ensure there are vulnerabilities to process before proceeding
if vulns and "vulnerabilities" in vulns and len(vulns["vulnerabilities"]) > 0:
    X = np.array([extract_features(v) for v in vulns["vulnerabilities"]])

    # 3. Fit or load a pretrained Isolation Forest
    #    In production, you’d persist this model and only call .predict()
    # Only fit and predict if there are data points
    if len(X) > 0:
        iso = IsolationForest(contamination=0.05, random_state=0)
        iso.fit(X)            # Train on historical data in a real setup
        labels = iso.predict(X)  # -1 = anomaly, 1 = normal
    else:
        labels = [] # No vulnerabilities, no anomalies

    # 4. Decision logic
    # Check for critical vulnerabilities based on the parsed vulns data
    has_critical = any(v.get("severity") == "critical" for v in vulns.get("vulnerabilities", []))
    # Check for anomalies based on the generated labels
    has_anomaly  = any(l == -1 for l in labels)

    if has_critical or has_anomaly:
        print("🚨 Security build failed")
        exit(1) # Exit with a non-zero code to indicate failure
    else:
        print("✅ Security checks passed")
        exit(0) # Exit with 0 to indicate success
elif vulns is None:
    # Snyk command failed to run (e.g., FileNotFoundError)
    print("⚠️ Security checks inconclusive: Could not run Snyk.")
    exit(1) # Indicate failure as security checks could not be completed
else:
    # No vulnerabilities found by Snyk (exit code 0)
    print("✅ Security checks passed (no vulnerabilities found by Snyk)")
    exit(0) # Exit with 0 to indicate success

✅ Security checks passed (no vulnerabilities found by Snyk)


This script automates a security check with the Snyk CLI and then applies an anomaly detection algorithm to the findings. Here is a breakdown:

    Import Libraries: It imports necessary libraries: json for handling JSON data, subprocess for running external commands (like Snyk), requests for making HTTP requests (to PyPI), numpy for numerical operations, IsolationForest from sklearn.ensemble for anomaly detection, and datetime for working with dates.
    Run Snyk CLI:
        It attempts to run the Snyk CLI command snyk test --json. The --json flag tells Snyk to output the results in JSON format.
        subprocess.run executes this command. stdout=subprocess.PIPE captures the standard output, and check=True raises subprocess.CalledProcessError if the command exits with a non-zero status.
        A try...except block is used to handle potential errors.
        It specifically catches subprocess.CalledProcessError. In the Snyk CLI's exit-code convention, 1 means the scan completed and found vulnerabilities, so that is the case in which the code should load the JSON report from the exception's stdout. Any other non-zero exit code (2 for a general failure, 3 when no supported project is detected) should be re-raised as an error.
        It also catches FileNotFoundError in case the snyk command itself is not found (meaning Snyk CLI isn't installed or in the PATH).
        The JSON output from Snyk (whether the scan reported vulnerabilities or exited with success code 0) is loaded into the vulns variable. A boolean has_vulnerabilities records whether vulnerabilities were reported.
    Extract Features (extract_features function):
        This function takes a single vulnerability object (v) from the Snyk output as input.
        It extracts relevant features for anomaly detection:
            Severity: Maps Snyk's severity levels ("low", "medium", "high", "critical") to numerical values (0-3).
            Package Age: Fetches the package's metadata from PyPI using requests, finds the earliest upload time across all releases, and calculates the age of the package in days. If the request fails or no release dates are found, the age defaults to 0. Because the lookup goes to PyPI, this feature is only meaningful for Python packages.
            Number of Vulnerable Versions: Counts the entries in the vulnerability's versions field in the Snyk output, defaulting to 0 when the field is absent.
        It returns a list containing these three features.
    Prepare Data for Anomaly Detection:
        It checks if vulns is not None (meaning Snyk ran) and if there are actual vulnerabilities listed in the Snyk output.
        If vulnerabilities exist, it creates a NumPy array X where each row is the feature vector extracted by calling extract_features for each vulnerability.
    Anomaly Detection (Isolation Forest):
        If X is not empty (i.e., there are vulnerabilities to analyze), it initializes an IsolationForest model.
            contamination=0.05 sets the decision threshold so that about 5% of the training points are labeled as anomalies. You might need to adjust this based on your data.
            random_state=0 ensures reproducibility.
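        iso.fit(X) trains the model on the extracted features. Because the model here is fit and evaluated on the same scan, it will typically flag the most unusual findings of every scan, however benign they are. In a real-world scenario, you would typically train this model on historical vulnerability data to learn what "normal" vulnerabilities look like for your project.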
        labels = iso.predict(X) predicts whether each vulnerability is an anomaly (-1) or normal (1).
    Decision Logic:
        It checks if any of the vulnerabilities have a "critical" severity based on the original Snyk output.
        It checks if any of the vulnerabilities were flagged as anomalies by the Isolation Forest model (any(l == -1 for l in labels)).
        If either a critical vulnerability is found OR an anomaly is detected, it prints "🚨 Security build failed" and exits with a status code of 1 (indicating failure).
        Otherwise (if no critical vulnerabilities and no anomalies), it prints "✅ Security checks passed" and exits with a status code of 0 (indicating success).
        It also includes specific messages and exit codes for cases where Snyk could not be run (vulns is None) or if Snyk ran but found no vulnerabilities.
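
The package-age step described above can be separated from the HTTP call, which makes it possible to check the date logic without network access. The following is a minimal sketch under that assumption: the helper names first_upload_date and package_age_days and the sample payload are hypothetical and only mimic the shape of the PyPI JSON response that the script reads.

```python
import datetime


def first_upload_date(pkg_info):
    """Return the earliest upload date in a PyPI-style JSON payload, or None."""
    timestamps = [
        file_info["upload_time_iso_8601"]
        for files in pkg_info.get("releases", {}).values()
        for file_info in files
        if "upload_time_iso_8601" in file_info
    ]
    if not timestamps:
        return None
    # Parse before comparing so the ordering does not depend on string formatting.
    parsed = [
        datetime.datetime.fromisoformat(ts.replace("Z", "+00:00"))
        for ts in timestamps
    ]
    return min(parsed).date()


def package_age_days(pkg_info, today):
    """Days between the first upload and `today`; 0 when no upload is known."""
    first = first_upload_date(pkg_info)
    return 0 if first is None else (today - first).days


# Hypothetical payload with the same shape as the PyPI JSON response
sample = {
    "releases": {
        "1.0.0": [{"upload_time_iso_8601": "2024-01-01T12:00:00.000000Z"}],
        "1.1.0": [{"upload_time_iso_8601": "2024-03-15T08:30:00.000000Z"}],
        "2.0.0": [],  # a release with no uploaded files
    }
}

print(package_age_days(sample, today=datetime.date(2024, 1, 11)))            # 10
print(package_age_days({"releases": {}}, today=datetime.date(2024, 1, 11)))  # 0
```

Passing today as an argument, rather than calling datetime.date.today() inside the helper, keeps the result reproducible from one run to the next.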

In summary, the script uses Snyk to find vulnerabilities, extracts numerical features from these findings, and then uses an Isolation Forest model to identify vulnerabilities that are unusual or significantly different from the others, in addition to checking for critical vulnerabilities directly. This combined approach aims to flag builds that might have particularly risky or unexpected security issues.
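
The script notes that in production the model would be trained on historical data, persisted, and only used for prediction at build time. One way this might look is sketched below. The historical feature rows are synthetic stand-ins generated with NumPy, the file name iso_forest.joblib is hypothetical, and joblib is assumed to be available (it is installed as a scikit-learn dependency).

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.ensemble import IsolationForest

# Synthetic stand-in for historical rows: [severity, pkg_age_days, versions]
rng = np.random.default_rng(0)
historical = np.column_stack([
    rng.integers(0, 4, size=500),     # severity 0..3
    rng.normal(2000, 300, size=500),  # package age in days
    rng.integers(1, 6, size=500),     # vulnerable-version count 1..5
])

# Offline step: fit once on history and persist the model
model_path = os.path.join(tempfile.mkdtemp(), "iso_forest.joblib")
fitted = IsolationForest(contamination=0.05, random_state=0).fit(historical)
joblib.dump(fitted, model_path)

# Build step: load the persisted model and score only the new findings
iso = joblib.load(model_path)
new_findings = np.array([
    [1, 2000, 3],  # resembles the historical data
    [3, 5, 40],    # very young package with many vulnerable versions
])
labels = iso.predict(new_findings)            # -1 = anomaly, 1 = normal
scores = iso.decision_function(new_findings)  # lower means more anomalous

for row, label, score in zip(new_findings, labels, scores):
    status = "anomaly" if label == -1 else "normal"
    print(row, status, round(float(score), 3))
```

With this split, the contamination threshold is calibrated on the historical data rather than on the current scan, so a scan containing only ordinary findings is not forced to produce anomalies.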