# F3. Risk Scoring

>>_The CSF’s use will vary based on an organization’s unique mission and risks. With an understanding of stakeholder expectations and risk appetite and tolerance (as outlined in GOVERN), an organization can prioritize cybersecurity activities to make informed decisions about cybersecurity expenditures and actions. An organization may choose to handle risk in one or more ways — including mitigating, transferring, avoiding, or accepting negative risks and realizing, sharing, enhancing, or accepting positive risks — depending on the potential impacts and likelihoods. Importantly, an organization can use the CSF both internally to manage its cybersecurity capabilities and externally to oversee or communicate with third parties._\
\
\- _[National Institute of Standards and Technology (NIST)](https://nvlpubs.nist.gov/nistpubs/CSWP/NIST.CSWP.29.pdf)_

In [1]:
import pandas as pd
philosophy = pd.read_csv('../data/risk_philosophy.csv')
philosophy

| Unnamed: 0 | Risk Philosophy | Scoring Formula(s) | Aggregation(s) | When to Use |
| --- | --- | --- | --- | --- |
| 0 | Conservative | Multiplicative | Max | To only act on high-confidence, multi-dimensio... |
| 1 | Worst-case | Worst Case (Max) | Max | To flag any asset with a single severe vulnera... |
| 2 | Balanced/Pragmatic | Weighted Average, Simple Mean | Mean, Median | For realistic, overall asset risk monitoring |
| 3 | Cumulative | Simple Mean, Weighted Average | Sum | When interested in total risk exposure per asset |
| 4 | Outlier-resistant | Simple Mean, Weighted Average | Median | To ignore rare extremes and focus on typical r... |
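
To make the aggregation column concrete, the sketch below applies each of the four aggregations to the CVE-level risk scores of one hypothetical asset. The score list and the `aggregate` helper are illustrative assumptions and not part of the notebook; the standard-library `statistics` module stands in for the pandas aggregations used later.

```python
import statistics

# Hypothetical CVE-level risk scores for a single asset (illustrative values only)
scores = [9.8, 7.5, 5.3, 4.0]

# Same aggregation names as in the table, mapped to standard-library equivalents
aggregations = {
    "Max": max,
    "Mean": statistics.mean,
    "Median": statistics.median,
    "Sum": sum,
}

def aggregate(values, method):
    """Return the asset-level score for one aggregation method, rounded to 2 places."""
    return round(aggregations[method](values), 2)

for name in aggregations:
    print(f"{name:<6} -> {aggregate(scores, name)}")
```

With these values, Max is driven entirely by the single 9.8 score, Median reflects the typical CVE on the asset, and Sum grows with every additional CVE, which is why each risk philosophy pairs with a different aggregation.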


## Intended Purpose of Code

The risk scoring module provides an interactive dashboard that computes risk scores from user-selected settings and summarizes the findings in tables and graphs, so each user can tailor the analysis to their needs. Some graphs are built from data that does not feed into the risk score calculation, so they do not change with user input. These 'static' graphs supplement the findings of the risk score analysis.

### Key Features

#### Risk-Scoring
* Interactive risk scoring module with user input/dropdown/slider for:
    * Risk Formula
        * Supports multiple risk formulas (weighted, multiplicative, worst-case, mean)
    * Aggregation Method
        * Aggregates by (max, mean, median, sum, count high-risk CVEs)
    * High-Risk Threshold Slider
        * Sets the risk score (0.0 to 10.0) at or above which a CVE counts as high-risk in the per-asset tally
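
As a worked example of how the four risk formulas differ, the sketch below scores one hypothetical CVE with each of them. The input values and function names are illustrative assumptions; the weights mirror the defaults in the notebook's weighted formula, and missing-value handling is left out for brevity.

```python
from statistics import mean

# Hypothetical CVSS-style scores for one CVE (illustrative values only)
cve = {"baseScore": 9.8, "exploitabilityScore": 3.9, "impactScore": 5.9}

# Assumed weights, matching the defaults of the notebook's weighted formula
weights = {"baseScore": 0.5, "exploitabilityScore": 0.25, "impactScore": 0.25}

def weighted_average(row):
    """Weighted sum of the three scores divided by the total weight."""
    total = sum(row[col] * w for col, w in weights.items())
    return round(total / sum(weights.values()), 2)

def multiplicative(row):
    """Scale each score to 0-1, multiply them, then rescale to 0-10."""
    product = 1.0
    for col in weights:
        product *= row[col] / 10.0
    return round(product * 10, 2)

def worst_case(row):
    """Highest of the three scores."""
    return max(row[col] for col in weights)

def simple_mean(row):
    """Unweighted mean of the three scores."""
    return round(mean(row[col] for col in weights), 2)

for fn in (weighted_average, multiplicative, worst_case, simple_mean):
    print(f"{fn.__name__:<17} {fn(cve)}")
```

For these inputs the formulas give 7.35, 2.25, 9.8, and 6.53 respectively. The multiplicative score is much lower because one low component pulls the whole product down, which is why it suits a conservative philosophy that acts only when every dimension is high.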

#### Analysis & Visualization
* Generates the following in response to user inputs in the interactive risk scoring module:
    * Summary Tables
        * Asset-level Risk Summary
        * CVE-level Vulnerabilities Summary
    * Heatmap
        * top 20 assets vs aggregated riskScore
    * Time Series
        * monthly count of new CVEs per asset
            * Future Enhancement: multi-select legend allowing users to filter any combination of assets
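
The monthly count and the planned asset filter can be sketched without any plotting. The records, the `monthly_cve_counts` helper, and its `selected_assets` parameter below are hypothetical. The sketch assumes each row carries an asset title, a CVE ID, and a publication date, and it counts each CVE ID once per month and asset.

```python
from collections import defaultdict
from datetime import date

# Hypothetical CVE records (asset title, CVE ID, publication date); illustrative only
records = [
    ("Asset A", "CVE-0000-0001", date(2023, 1, 5)),
    ("Asset A", "CVE-0000-0002", date(2023, 1, 20)),
    ("Asset A", "CVE-0000-0002", date(2023, 1, 20)),  # duplicate row, counted once
    ("Asset B", "CVE-0000-0003", date(2023, 1, 9)),
    ("Asset B", "CVE-0000-0004", date(2023, 2, 14)),
    ("Asset C", "CVE-0000-0005", date(2023, 2, 2)),
]

def monthly_cve_counts(rows, selected_assets=None):
    """Count unique CVE IDs per (month, asset), optionally limited to chosen assets."""
    seen = defaultdict(set)
    for asset, cve_id, published in rows:
        if selected_assets is not None and asset not in selected_assets:
            continue
        seen[(published.strftime("%Y-%m"), asset)].add(cve_id)
    return {key: len(ids) for key, ids in sorted(seen.items())}

counts = monthly_cve_counts(records, selected_assets={"Asset A", "Asset B"})
for (month, asset), n in counts.items():
    print(month, asset, n)
```

In the dashboard, the selection could plausibly come from a multi-select widget such as `ipywidgets.SelectMultiple`, whose `value` would be passed as `selected_assets`. That wiring is an assumption and is not implemented in the notebook.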
 

#### Static Visualizations
* Pie Chart
    * distribution of severity levels (Critical/High/Medium/Low)

### Known Issues

* Save buttons overwrite existing files instead of saving a unique file
    * Appending a version number to the end of the file with each click could resolve this
* No way to sort summary tables
    * _Needs more thought..._
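
One possible way to implement the version-number idea for the save buttons is sketched below. The `next_versioned_path` helper and the `_vN` naming scheme are assumptions rather than existing notebook code, and the demonstration writes placeholder text to a temporary directory instead of calling `df.to_csv`.

```python
from pathlib import Path
import tempfile

def next_versioned_path(path):
    """Return `path` if unused, else the first free 'stem_vN.suffix' sibling."""
    path = Path(path)
    if not path.exists():
        return path
    version = 1
    while True:
        candidate = path.with_name(f"{path.stem}_v{version}{path.suffix}")
        if not candidate.exists():
            return candidate
        version += 1

# Demonstration in a temporary directory so no real data files are touched
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "asset_risk_summary.csv"
    saved = []
    for _ in range(3):
        out = next_versioned_path(target)
        out.write_text("Title,riskScore\n")  # stand-in for df.to_csv(out, index=False)
        saved.append(out.name)
    print(saved)
```

Each simulated click writes to a new file, so earlier exports are kept. A save helper could call `next_versioned_path(filename)` immediately before writing the CSV.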

_The interactive components of the code below were AI-generated to tailor the analysis to individual user needs._

In [3]:
import ipywidgets as widgets
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd  # also imported in the first cell; repeated so this cell runs on its own
import seaborn as sns
from IPython.display import display, FileLink
from ipywidgets import interact, Dropdown

# --- Helper Functions for Saving Tables and Charts ---
def save_table_to_ass_csv(df, filename="../data/asset_risk_summary.csv"):
    df.to_csv(filename, index=False)
    print(f"Table saved as '{filename}'")
    display(FileLink(filename))

def save_table_to_vul_csv(df, filename="../data/cve_vuln_summary.csv"):
    df.to_csv(filename, index=False)
    print(f"Table saved as '{filename}'")
    display(FileLink(filename))

def add_ass_table_save_buttons(df, table_label="ass_table"):
    """Display a button that saves the asset-level summary to CSV when clicked."""
    save_ass_csv_button = widgets.Button(description=f"Save {table_label}")
    def on_save_ass_csv_clicked(b):
        save_table_to_ass_csv(df, filename="../data/asset_risk_summary.csv")
    save_ass_csv_button.on_click(on_save_ass_csv_clicked)
    display(save_ass_csv_button)

def add_vul_table_save_buttons(df, table_label="vul_table"):
    """Display a button that saves the CVE-level summary to CSV when clicked."""
    save_vul_csv_button = widgets.Button(description=f"Save {table_label}")
    def save_vul_csv_clicked(b):
        save_table_to_vul_csv(df, filename="../data/cve_vuln_summary.csv")
    save_vul_csv_button.on_click(save_vul_csv_clicked)
    display(save_vul_csv_button)

# Configuration
input_file = "../data/vuln_catalogue_v2.csv"  # Change to your path if needed

# Load Data
def load_vuln_data(file_path):
    df = pd.read_csv(file_path)
    return df

# --- RISK FORMULAS ---
# Each formula takes one CVE row and returns a single risk score, or NaN if it cannot be computed.
def weighted_average_score(row, weights=None):
    """Weighted average of the available scores; weights of missing scores are excluded."""
    if weights is None:
        weights = {'baseScore': 0.5, 'exploitabilityScore': 0.25, 'impactScore': 0.25}
    vals = [(row.get(col), w) for col, w in weights.items() if pd.notnull(row.get(col))]
    if not vals:
        return np.nan
    score = sum(v * w for v, w in vals)
    total_weight = sum(w for _, w in vals)
    return round(score / total_weight, 2)

def multiplicative_risk_score(row):
    """Product of the three scores, each scaled to 0-1, rescaled to 0-10; NaN if any is missing."""
    vals = [row.get(col) for col in ['baseScore', 'exploitabilityScore', 'impactScore']]
    if any(pd.isnull(v) for v in vals):
        return np.nan
    vals_norm = [v / 10.0 for v in vals]
    score = np.prod(vals_norm) * 10
    return round(score, 2)

def worst_case_score(row):
    """Highest of the available scores."""
    vals = [row.get(col) for col in ['baseScore', 'exploitabilityScore', 'impactScore']]
    vals = [v for v in vals if pd.notnull(v)]
    if not vals:
        return np.nan
    return max(vals)

def simple_mean_score(row):
    """Unweighted mean of the available scores."""
    vals = [row.get(col) for col in ['baseScore', 'exploitabilityScore', 'impactScore']]
    vals = [v for v in vals if pd.notnull(v)]
    if not vals:
        return np.nan
    return round(np.mean(vals), 2)

formula_map = {
    'Weighted Average': weighted_average_score,
    'Multiplicative': multiplicative_risk_score,
    'Worst Case (Max)': worst_case_score,
    'Simple Mean': simple_mean_score,
}

agg_map = {
    'Max': 'max',
    'Mean': 'mean',
    'Median': 'median',
    'Sum': 'sum',
}

def count_high_risk(series, threshold=7.0):
    return (series >= threshold).sum()

# Interactive Function
def interactive_risk_scoring(input_file=input_file):
    df = load_vuln_data(input_file)
    for col in ['baseScore', 'exploitabilityScore', 'impactScore']:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')
        else:
            df[col] = np.nan
    
    def update_scoring(formula, aggregation, highrisk_threshold):
        # Calculate riskScore for every CVE row with the selected formula
        df['riskScore'] = df.apply(formula_map[formula], axis=1)
        # Aggregate per asset, identified by the (Title, cpeName) pair
        group = df.groupby(['Title', 'cpeName'])
        agg_df = group['riskScore'].agg(agg_map[aggregation]).reset_index()
        agg_df = agg_df.rename(columns={'riskScore': f'{aggregation}RiskScore'})
        # Count high-risk CVEs per asset (riskScore at or above the threshold)
        highrisk_df = group['riskScore'].apply(lambda s: count_high_risk(s, highrisk_threshold)).reset_index()
        highrisk_df = highrisk_df.rename(columns={'riskScore': f'countHighRiskCVEs (>={highrisk_threshold})'})
        # Merge on both grouping keys so each asset row matches exactly once
        # and no Title_x/Title_y suffix columns are created
        summary = pd.merge(agg_df, highrisk_df, on=['Title', 'cpeName'], how='left')
        # Show the first 20 rows of each table for inspection
        print("\nAsset-level Risk Summary:")
        display(summary.head(20))
        add_ass_table_save_buttons(summary, table_label="Asset Summary")
        print("\nCVE-level Vulnerabilities Summary:")
        display(df[['Title', 'cveID', 'riskScore']].head(20))
        add_vul_table_save_buttons(df[['Title', 'cpeName', 'cveID', 'riskScore']], table_label="CVE Summary")

        # Pie Chart: Distribution of Severity Levels 
        severity_counts = df['baseSeverity'].value_counts()
        severity_counts.plot(kind='pie', autopct='%1.1f%%', startangle=140, figsize=(6,6))
        plt.title("Distribution of Severity Levels")
        plt.ylabel("")
        plt.show()
    
        # Heatmap: Top 20 Assets by selected risk score aggregation
        top_assets = summary.sort_values(by=f'{aggregation}RiskScore', ascending=False).head(20)
        heatmap_data = top_assets.set_index('Title')[[f'{aggregation}RiskScore']]
        plt.figure(figsize=(2, 10))
        sns.heatmap(heatmap_data, annot=True, cmap='YlOrRd', cbar=True)
        plt.title(f"Heatmap: Top 20 Assets by {aggregation} Risk Score\n(Formula: {formula})")
        plt.xlabel(f"{aggregation} Risk Score")
        plt.ylabel("Asset (Title)")
        plt.xticks(rotation=0)
        plt.show()
    
        # Time Series: Monthly Count of New CVEs per Asset
        if 'published' in df.columns:
            df['published'] = pd.to_datetime(df['published'], errors='coerce')
            df['month'] = df['published'].dt.to_period('M').astype(str)
            df['year'] = df['published'].dt.year
        else:
            raise KeyError("No 'published' column found in DataFrame.")
    
        asset_col = 'Title'
    
        def plot_monthly_cves(year_range):
            min_year, max_year = year_range
            df_filtered = df[(df['year'] >= min_year) & (df['year'] <= max_year)]
            monthly_cves = df_filtered.groupby(['month', asset_col])['cveID'].nunique().unstack(fill_value=0)
            if monthly_cves.empty:
                print(f"No data available for the selected year range: {min_year}-{max_year}")
                return
            monthly_cves.plot(figsize=(14,7))
            plt.title(f"Monthly Count of New CVEs per Asset ({min_year}-{max_year})")
            plt.ylabel("Number of New CVEs")
            plt.xlabel("Month")
            plt.legend(title='Asset', bbox_to_anchor=(1.05, 1), loc='upper left')
            plt.tight_layout()
            plt.show()
    
        # Cast to int: the year column becomes float when any publication date is missing
        years = sorted(int(y) for y in df['year'].dropna().unique())
        if years:
            year_slider = widgets.IntRangeSlider(
                value=[min(years), max(years)],
                min=min(years),
                max=max(years),
                step=1,
                description='Year Range:',
                continuous_update=False
            )
            out = widgets.interactive_output(plot_monthly_cves, {'year_range': year_slider})
            display(year_slider, out)
        else:
            print("No valid years found in the data.")
    
        return

    interact(
        update_scoring,
        formula=Dropdown(options=list(formula_map.keys()), value='Weighted Average', description='Risk Formula:'),
        aggregation=Dropdown(options=list(agg_map.keys()), value='Max', description='Aggregation:'),
        highrisk_threshold=widgets.FloatSlider(value=7.0, min=0.0, max=10.0, step=0.1, description='High Risk CVE:')
    )

# --- MAIN EXECUTION ---
if __name__ == "__main__":
    interactive_risk_scoring(input_file)
