<a href="https://colab.research.google.com/github/Kdavis2025/Automating-Compliance-AI-and-Machine-Learning-Approaches-to-Achieviing-CMMC-2.0-Certification/blob/main/Scoping_Assistance_and_Asset_Categorization.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ────────────────────────────────────────────────────────────────────────────────
# STEP 0: Install and import required libraries
# ────────────────────────────────────────────────────────────────────────────────
!pip install --quiet ipywidgets pandas numpy matplotlib

import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display, clear_output
from google.colab import files

# ────────────────────────────────────────────────────────────────────────────────
# STEP 1: Define Helper Functions for Compliance Scanning & Savings Calculation
# ────────────────────────────────────────────────────────────────────────────────
def load_json_upload(upload_widget):
    """
    Parse the first file held by a FileUpload widget as JSON.

    Both layouts of ``upload_widget.value`` are supported:
      - ipywidgets 7.x: a dict mapping filename -> {'content': bytes, ...}
      - ipywidgets 8.x: a tuple of dicts, each with a 'content' memoryview

    Args:
        upload_widget: object exposing a ``value`` attribute in one of the
            layouts above.

    Returns:
        The parsed Python object, or None if nothing is uploaded.

    Raises:
        UnicodeDecodeError: if the file content is not valid UTF-8.
        json.JSONDecodeError: if the decoded text is not valid JSON.
    """
    uploads = upload_widget.value
    if not uploads:
        return None

    if isinstance(uploads, dict):
        # ipywidgets 7.x layout: take the first uploaded file's entry.
        first_entry = next(iter(uploads.values()))
    else:
        # ipywidgets 8.x layout: sequence of per-file dicts.
        first_entry = uploads[0]

    # bytes(...) normalises both bytes and memoryview payloads;
    # 'utf-8-sig' transparently drops a leading BOM, which json.loads rejects.
    text = bytes(first_entry['content']).decode('utf-8-sig')
    return json.loads(text)

def filter_standards(df, domains, levels, compliance):
    """
    Filter the standards DataFrame by domain, CMMC level and compliance state.

    A row is kept only when all three conditions hold:
      - the text of 'ControlID' before the first '.' is in ``domains``
      - the digit captured by '.L#-' in 'ControlID' is in ``levels``
      - 'ComplianceLevel' is in ``compliance``

    Args:
        df: DataFrame with 'ControlID' and 'ComplianceLevel' columns.
        domains: iterable of domain prefixes to keep.
        levels: iterable of integer levels to keep.
        compliance: iterable of ComplianceLevel values to keep.

    Returns:
        The filtered DataFrame (rows of ``df`` matching every condition).
        Rows whose ControlID has no '.L#-' segment never match a level.

    Raises:
        KeyError: if 'ControlID' or 'ComplianceLevel' is missing from ``df``.
    """
    # Work on the string form so non-string IDs behave like str(ctrl_id).
    control_ids = df['ControlID'].astype(str)

    domain_ok = control_ids.str.split('.').str[0].isin(list(domains))

    # Vectorised extraction of the level digit; unmatched IDs become NaN
    # and therefore fail the isin() test below.
    level_digits = control_ids.str.extract(r"\.L(\d)\-", expand=False)
    level_ok = pd.to_numeric(level_digits, errors='coerce').isin(list(levels))

    compliance_ok = df['ComplianceLevel'].isin(list(compliance))

    return df[domain_ok & level_ok & compliance_ok]

def scan_config(standards_df, config_df):
    """
    Left-join the standards onto the configuration's statuses by ControlID.

    Two boolean columns are added to the joined table:
      - 'Detected':  True when the config supplied a Status for the control
      - 'Compliant': True only when that Status equals 'Fully Implemented'

    Returns a tuple ``(report, deviations)`` where ``report`` is the joined
    table as a list of row dicts and ``deviations`` is the DataFrame of rows
    whose 'Compliant' flag is False.
    """
    # Only the join key and the status are taken from the configuration.
    status_lookup = config_df.loc[:, ['ControlID', 'Status']]
    combined = pd.merge(standards_df, status_lookup, how='left', on='ControlID')

    status_column = combined['Status']
    combined['Detected'] = status_column.notna()
    combined['Compliant'] = status_column.eq('Fully Implemented')

    records = combined.to_dict(orient='records')
    non_compliant = combined.loc[~combined['Compliant']]
    return records, non_compliant

def compute_savings(num_controls, *, manual_min_per_control=5,
                    auto_min_per_control=0.1, hourly_rate=100):
    """
    Estimate time and cost of manual vs automated scanning.

    Args:
        num_controls: number of controls to be scanned.
        manual_min_per_control: minutes an analyst spends per control when
            scanning manually (default 5).
        auto_min_per_control: minutes per control when scanning is automated
            (default 0.1).
        hourly_rate: analyst cost in $ per hour (default 100).

    Returns:
        dict with keys 'ManualTime_min', 'AutoTime_min', 'TimeSaved_min',
        'ManualCost_$', 'AutoCost_$' and 'CostSaved_$'. Times are in minutes,
        costs in dollars.
    """
    # Time (in minutes)
    manual_time = num_controls * manual_min_per_control
    auto_time = num_controls * auto_min_per_control

    # Cost: convert minutes to hours, then apply the hourly rate
    manual_cost = (manual_time / 60) * hourly_rate
    auto_cost = (auto_time / 60) * hourly_rate

    return {
        'ManualTime_min': manual_time,
        'AutoTime_min': auto_time,
        'TimeSaved_min': manual_time - auto_time,
        'ManualCost_$': manual_cost,
        'AutoCost_$': auto_cost,
        'CostSaved_$': manual_cost - auto_cost,
    }

# ────────────────────────────────────────────────────────────────────────────────
# STEP 2: Create UI Components (ipywidgets) for Upload / Filter / Scan
# ────────────────────────────────────────────────────────────────────────────────
# 2.1: Standards file picker, the button that loads it, and its output pane
std_upload = widgets.FileUpload(
    description='Upload Standards (.json)',
    accept='.json',
    multiple=False,
)
btn_load_std = widgets.Button(button_style='success', description='Load Standards')
out_std = widgets.Output()

# 2.2: Multi-select filters (domains are filled in once standards are loaded),
#      the button that applies them, and the pane showing the filtered result
_compliance_states = ["Not Implemented", "Partially Implemented", "Fully Implemented"]
domain_select = widgets.SelectMultiple(description='Domains', options=[])
level_select = widgets.SelectMultiple(
    description='Levels',
    options=[1, 2, 3],
    value=[1, 2, 3],
)
compl_select = widgets.SelectMultiple(
    description='Compliance',
    options=_compliance_states,
    value=list(_compliance_states),
)
btn_apply_filters = widgets.Button(button_style='primary', description='Apply Filters')
out_filtered = widgets.Output()

# 2.3: Configuration file picker, the scan trigger, and the scan output pane
cfg_upload = widgets.FileUpload(
    description='Upload Config (.json)',
    accept='.json',
    multiple=False,
)
btn_scan = widgets.Button(button_style='info', description='Run Scan')
out_scan = widgets.Output()

# 2.4: Module-level slot for the loaded standards DataFrame (None until loaded)
standards_df = None

# ────────────────────────────────────────────────────────────────────────────────
# STEP 3: Define Callback Functions to Wire Up the UI
# ────────────────────────────────────────────────────────────────────────────────
def on_load_std(b):
    """
    Click handler for "Load Standards".

    Steps (all output goes to ``out_std``):
      - Parse the uploaded JSON
      - Convert it into a DataFrame and check it has a 'ControlID' column
      - Store it in the module-level ``standards_df``
      - Populate domain_select with the unique domain prefixes
      - Display the first few rows

    On any failure a message is printed and ``standards_df`` is reset to None
    so later steps do not run against unusable data.

    Args:
        b: the Button instance passed by ``on_click`` (unused).
    """
    global standards_df
    with out_std:
        clear_output()

        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses, so one handler covers malformed JSON and bad encodings.
        try:
            data = load_json_upload(std_upload)
        except ValueError as e:
            print("🚫 Uploaded standards file is not valid JSON:", e)
            standards_df = None
            return
        if data is None:
            print("⚠️ No standards file uploaded.")
            return

        try:
            loaded = pd.DataFrame(data)
        except Exception as e:
            print("🚫 Could not parse JSON as list of records:", e)
            standards_df = None
            return

        # Every later step keys on ControlID; reject the file up front rather
        # than failing with a KeyError after the global has been replaced.
        if 'ControlID' not in loaded.columns:
            print("🚫 Standards JSON missing 'ControlID' field.")
            standards_df = None
            return

        standards_df = loaded

        # Populate domain selector. astype(str) keeps this working when some
        # ControlID values are missing or not strings.
        control_ids = standards_df['ControlID'].astype(str)
        domains = sorted(control_ids.str.split('.').str[0].unique())
        domain_select.options = domains
        domain_select.value = domains[:]  # select all by default

        print("✅ Standards loaded. Preview:")
        display(standards_df.head())

def on_apply_filters(b):
    """
    Click handler for "Apply Filters".

    Steps (all output goes to ``out_filtered``):
      - Ensure standards_df is loaded and has the columns the filter needs
      - Filter by the selected domains/levels/compliance values
      - Display the filtered DataFrame
      - Write it to 'filtered_standards.json' and offer it for download
      - Compute and display the estimated cost/time savings for that set
      - Plot manual vs automated time and cost as side-by-side bar charts

    Args:
        b: the Button instance passed by ``on_click`` (unused).
    """
    with out_filtered:
        clear_output()
        if standards_df is None:
            print("⚠️ Please upload & load a standards JSON first.")
            return

        # filter_standards indexes both columns; report a missing one as a
        # message instead of letting a KeyError escape the callback.
        if not {'ControlID', 'ComplianceLevel'}.issubset(standards_df.columns):
            print("🚫 Standards JSON missing 'ControlID' or 'ComplianceLevel' fields.")
            return

        filtered = filter_standards(
            standards_df,
            domains=list(domain_select.value),
            levels=list(level_select.value),
            compliance=list(compl_select.value),
        )
        num_controls = len(filtered)
        if num_controls == 0:
            print("ℹ️ No controls match your filters.")
            return

        print(f"✅ {num_controls} controls match the filters.")
        display(filtered)

        # Save filtered JSON for download
        filtered.to_json('filtered_standards.json', orient='records', indent=2)
        files.download('filtered_standards.json')

        # Compute cost/time savings based on number of filtered controls
        savings = compute_savings(num_controls)
        savings_df = pd.DataFrame([{'NumControls': num_controls, **savings}])
        print("\n--- Estimated Cost & Time Savings for Automated Scanning ---")
        display(savings_df)

        # Plot a side-by-side bar chart: time on the left, cost on the right
        bar_labels = ['Manual', 'Automated']
        bar_colors = ['#d9534f', '#5cb85c']
        fig, ax = plt.subplots(1, 2, figsize=(10, 4))

        # 1) Time comparison
        ax[0].bar(bar_labels, [savings['ManualTime_min'], savings['AutoTime_min']],
                  color=bar_colors)
        ax[0].set_ylabel("Time (minutes)")
        ax[0].set_title(f"Time: {num_controls} Controls")

        # 2) Cost comparison
        ax[1].bar(bar_labels, [savings['ManualCost_$'], savings['AutoCost_$']],
                  color=bar_colors)
        ax[1].set_ylabel("Cost ($)")
        ax[1].set_title(f"Cost: {num_controls} Controls")

        plt.tight_layout()
        plt.show()
        # Release the figure explicitly so repeated clicks do not accumulate
        # open figures on backends that keep them alive after show().
        plt.close(fig)

def _nan_to_none(value):
    """Map a float NaN (missing value) to None so json.dumps emits null."""
    if isinstance(value, float) and np.isnan(value):
        return None
    return value


def on_scan(b):
    """
    Click handler for "Run Scan".

    Steps (all output goes to ``out_scan``):
      - Ensure standards_df is loaded
      - Load the uploaded config JSON and convert it to a DataFrame
      - Check the config has 'ControlID' and 'Status' columns
      - Merge with the standards and identify deviations
      - Print the full report as JSON and display the deviations
      - Write the deviations to 'deviations.json' and offer it for download

    Args:
        b: the Button instance passed by ``on_click`` (unused).
    """
    with out_scan:
        clear_output()
        if standards_df is None:
            print("⚠️ Please upload & load a standards JSON first.")
            return

        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses, so one handler covers malformed JSON and bad encodings.
        try:
            cfg = load_json_upload(cfg_upload)
        except ValueError as e:
            print("🚫 Uploaded configuration file is not valid JSON:", e)
            return
        if cfg is None:
            print("⚠️ No configuration file uploaded.")
            return

        # Convert cfg into a DataFrame
        try:
            config_df = pd.DataFrame(cfg)
        except Exception as e:
            print("🚫 Configuration JSON must be a list of objects with 'ControlID' & 'Status'.", e)
            return

        # Validate columns
        if not {'ControlID', 'Status'}.issubset(config_df.columns):
            print("🚫 Config JSON missing 'ControlID' or 'Status' fields.")
            return

        # Run scan
        report, deviations = scan_config(standards_df, config_df)

        # Controls absent from the config carry a NaN Status; json.dumps would
        # print the bare token NaN, which is not valid JSON, so emit null.
        printable_report = [
            {key: _nan_to_none(val) for key, val in record.items()}
            for record in report
        ]
        print("📋 Full Compliance Report (JSON):")
        print(json.dumps(printable_report, indent=2))

        print("\n🚩 Flagged Deviations:")
        if deviations.empty:
            print("✔️ All controls are fully implemented.")
        else:
            display(deviations)

        deviations.to_json('deviations.json', orient='records', indent=2)
        files.download('deviations.json')

# Attach each click handler to its button
for _button, _handler in (
    (btn_load_std, on_load_std),
    (btn_apply_filters, on_apply_filters),
    (btn_scan, on_scan),
):
    _button.on_click(_handler)

# ────────────────────────────────────────────────────────────────────────────────
# STEP 4: Display the Entire Interactive UI
# ────────────────────────────────────────────────────────────────────────────────
# The page is rendered top to bottom in exactly this order: title, standards
# upload, filter section, then configuration upload and scan section.
_page_sections = [
    widgets.HTML("<h2>🔒 CMMC/NIST Compliance Uploader, Filter, & Scanner</h2>"),
    widgets.VBox([std_upload, btn_load_std, out_std]),
    widgets.HTML("<hr style='margin:10px 0;'/>"),
    widgets.HTML("<b>Step 2: Filter Standards & Compute Savings</b>"),
    widgets.VBox([
        domain_select, level_select, compl_select,
        btn_apply_filters, out_filtered
    ]),
    widgets.HTML("<hr style='margin:10px 0;'/>"),
    widgets.HTML("<b>Step 3: Upload Configuration & Run Compliance Scan</b>"),
    widgets.VBox([cfg_upload, btn_scan, out_scan]),
]
for _section in _page_sections:
    display(_section)
