# **Evaluation for GenAI-Powered STIX 2.1 Generator**

**Notebook Version:** 6.0  
**Author:** Giulio Triggiani  
**Python Version:** >= 3.8  
**Key Libraries:** `stix2`

---

## **Objective**
The objective is to **quantitatively** evaluate the performance of GenAI_STIX_2_1_Generator, an LLM-based tool for the automatic generation of Cyber Threat Intelligence reports in STIX 2.1 format.

To measure the effectiveness of the generator, this notebook compares a bundle generated automatically by the tool with a reference bundle ("ground truth") created manually by a CTI analyst.

The analysis relies on the semantic equivalence features of the official `stix2` library. Its `graph_similarity` function compares the two bundles as graphs, so that objects can match on semantic similarity and not only on literal equality.

The per-object match scores and the per-bundle graph similarity scores produced by these comparisons are then aggregated into overall averages across all compared bundles.
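
The two levels of averaging can be illustrated with a small standard-library sketch. The bundle names and all scores below are made-up values for illustration only, not results produced by the tool.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical per-bundle graph similarity scores (made-up numbers).
bundle_scores = {"report_01": 80.0, "report_02": 60.0}

# Hypothetical per-object match scores pooled from all bundles: (object type, score).
object_scores = [
    ("malware", 90.0),
    ("malware", 70.0),
    ("threat-actor", 50.0),
]

# Level 1: average of the per-bundle graph similarity scores.
average_graph_score = mean(bundle_scores.values())

# Level 2: average match score per object type, over all pooled objects.
scores_by_type = defaultdict(list)
for obj_type, score in object_scores:
    scores_by_type[obj_type].append(score)
average_by_type = {t: mean(s) for t, s in scores_by_type.items()}

print(f"Average graph similarity: {average_graph_score:.2f}%")
for obj_type, avg in sorted(average_by_type.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{obj_type}: {avg:.2f}%")
```

Note that the per-type average is computed over pooled objects, so a bundle with many objects weighs more than a bundle with few.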

## **Workflow Overview**

1.   **Setup**: installs the Python libraries needed for validating and manipulating STIX objects;
2.   **Libraries and Environment**: imports the required modules and mounts Google Drive to access the files;
3.   **Support Functions**: defines the main functions that perform validation and display the data in tabular form;
4.   **Comparison**: performs the entire process: it browses folders, loads bundles, compares them, and prints the results.


## **Part 1**: Setup

This block installs the Python libraries needed for validating and manipulating STIX objects.

In [None]:
print("--- Installing dependencies ---")
!pip install "stix2[semantic]" --quiet
!pip install stix2-validator --quiet
print("Installation complete.")

## **Part 2**: Importing Libraries and Setting Up the Environment

This block imports the required modules and mounts Google Drive to access the files.

NOTE: On the first run, you will be asked to authorize access to Google Drive.

In [None]:
import os
import json
from collections import defaultdict
import pandas as pd
from IPython.display import display

# Import from the stix2 library
from stix2 import parse, MemoryStore
from stix2.equivalence.graph import graph_similarity
from stix2 import Relationship
from stix2validator import validate_file, print_results

from google.colab import drive

print("--- Google Drive Mount ---")
drive.mount('/content/drive', force_remount=True)
print("Google Drive mounted correctly.")

# --- CONFIGURATION ---
BASE_DRIVE_DIR = '/content/drive/MyDrive/Reports_Evaluation'
OUTPUT_DIR = '/content/drive/MyDrive/Evaluation_Results'

## **Part 3**: Support Functions
This cell contains the main functions that perform validation and display the data in tabular form.

Bundles are validated according to the STIX 2.1 standard.
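
As a rough illustration of the tabular step, the sketch below flattens a saved score file into sorted rows using only the standard library. The `summary`, `lhs`, `rhs`, and `value` keys are the ones the notebook code reads; the IDs and scores are hypothetical, and `summary_to_rows` is an illustrative helper, not part of the notebook.

```python
import json

# Hypothetical excerpt of a saved prop_scores file (IDs shortened, values made up).
raw = """
{
  "summary": {
    "threat-actor--cccc": {"lhs": "threat-actor--cccc", "rhs": "threat-actor--dddd", "value": 42.0},
    "malware--aaaa": {"lhs": "malware--aaaa", "rhs": "malware--bbbb", "value": 87.5}
  }
}
"""

def summary_to_rows(data):
    """Turn the 'summary' mapping into a list of row dicts, best match first."""
    rows = [
        {
            "type": key.split("--")[0],          # object type is the ID prefix
            "generated_id": entry.get("lhs"),
            "ground_truth_id": entry.get("rhs"),
            "score": entry.get("value", 0),
        }
        for key, entry in data.get("summary", {}).items()
    ]
    return sorted(rows, key=lambda row: row["score"], reverse=True)

rows = summary_to_rows(json.loads(raw))
for row in rows:
    print(f'{row["type"]:<14}{row["score"]:>7.2f}')
```

The notebook itself builds the same rows into a pandas DataFrame, which adds the summary statistics and per-type grouping.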

In [None]:
def validate_bundle(file_path):
    """ Validates a STIX 2.1 bundle. """
    print(f"\nValidating the bundle: {os.path.basename(file_path)}...")
    results = validate_file(file_path)
    if results.is_valid:
        print(f"✅ Validation of {os.path.basename(file_path)} succeeded.")
        return True
    else:
        print(f"❌ Validation ERROR for {os.path.basename(file_path)}:")
        print_results(results)
        return False

def display_scores_as_table(json_file_path):
    """
    Reads a JSON file with prop_scores, processes it,
    displays the results, and RETURNS the calculated statistics.
    """
    # Set pandas to display all rows of the DataFrame and display the data in tabular form.
    pd.set_option('display.max_rows', None)
    try:
        with open(json_file_path, 'r') as f:
            data = json.load(f)

        summary = data.get('summary', {})
        if not summary:
            print("The JSON file does not contain a 'summary' section, or it is empty.")
            return None, None, None

        # Extract the relevant information from the 'summary' dictionary
        results_list = [{"Type Object": k.split('--')[0], "Object ID (Generated)": v.get('lhs'),
                         "Object ID (Ground Truth)": v.get('rhs'), "Match Score (%)": v.get('value', 0)}
                        for k, v in summary.items()]

        # Create a DataFrame with pandas
        df = pd.DataFrame(results_list)

        # Sort the DataFrame by score in descending order
        df_sorted = df.sort_values(by="Match Score (%)", ascending=False).reset_index(drop=True)

        # Format the score to display it with two decimal places.
        df_sorted["Match Score (%)"] = df_sorted["Match Score (%)"].map('{:,.2f}'.format)

        print("\n--- Detailed Analysis of Match Scores ---")
        # View the table in the notebook
        display(df_sorted)

        # Calculate the statistics to be returned
        df['Match Score (%)'] = pd.to_numeric(df['Match Score (%)'])
        summary_stats = df['Match Score (%)'].describe()
        avg_score_by_type = df.groupby('Type Object')['Match Score (%)'].mean()

        print("\n--- Summary Statistics (Single Bundle) ---")
        display(summary_stats.to_frame().style.format('{:,.2f}'))
        print("\n--- Average Score by Item Type (Single Bundle) ---")
        display(avg_score_by_type.to_frame().sort_values(by="Match Score (%)", ascending=False).style.format('{:,.2f}'))

        # Returns the calculated data
        return summary_stats, avg_score_by_type, df

    except Exception as e:
        print(f"❌ ERROR in display_scores_as_table: {e}")
        return None, None, None


## **Part 4**: Main Logic of Comparison
This is the main cell that performs the entire process: it browses folders, loads bundles, compares them, and prints the results.
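
The folder-browsing step expects one subfolder per report, each holding a file ending in `_gt.json` (ground truth) and one ending in `_pred.json` (generated). A minimal sketch of that pairing logic follows; `find_bundle_pair` and the folder names are hypothetical and serve only to illustrate the convention.

```python
import os
import tempfile

def find_bundle_pair(folder):
    """Return (ground_truth_path, predicted_path); either may be None."""
    ground_truth, predicted = None, None
    for name in sorted(os.listdir(folder)):
        if name.endswith("_gt.json"):
            ground_truth = os.path.join(folder, name)
        elif name.endswith("_pred.json"):
            predicted = os.path.join(folder, name)
    return ground_truth, predicted

# Build a throwaway folder tree with hypothetical names to exercise the helper.
with tempfile.TemporaryDirectory() as base:
    complete = os.path.join(base, "report_01")
    incomplete = os.path.join(base, "report_02")
    os.makedirs(complete)
    os.makedirs(incomplete)
    for path in (
        os.path.join(complete, "report_01_gt.json"),
        os.path.join(complete, "report_01_pred.json"),
        os.path.join(incomplete, "report_02_gt.json"),  # no matching _pred file
    ):
        with open(path, "w") as f:
            f.write("{}")

    pairs = {name: find_bundle_pair(os.path.join(base, name))
             for name in sorted(os.listdir(base))}
    # A folder is usable only when both files were found.
    found = {name: all(pair) for name, pair in pairs.items()}

print(found)
```

Folders with only one of the two files are skipped by the main loop rather than treated as errors.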

NOTE: All STIX Indicator objects with a `pattern_type` other than `stix` are excluded from the comparison.
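
The exclusion rule can be sketched on plain dictionaries as follows. The predicate mirrors the condition used in the cell below; `keep_for_comparison` is an illustrative name, and the sample objects are hypothetical and heavily abbreviated.

```python
def keep_for_comparison(obj):
    """True unless obj is an Indicator whose pattern_type is not 'stix'."""
    return not (obj.get("type") == "indicator" and obj.get("pattern_type") != "stix")

# Hypothetical, abbreviated objects (real STIX objects need more properties).
objects = [
    {"type": "malware", "id": "malware--1"},
    {"type": "indicator", "id": "indicator--1", "pattern_type": "stix"},
    {"type": "indicator", "id": "indicator--2", "pattern_type": "yara"},
    {"type": "indicator", "id": "indicator--3"},  # pattern_type missing
]

kept_ids = [obj["id"] for obj in objects if keep_for_comparison(obj)]
print(kept_ids)
```

An Indicator with no `pattern_type` at all is also excluded, because the missing value does not equal `stix`.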


In [None]:
def calculate_single_graph_score(generated_bundle_path, expert_bundle_path, full_output_path):
    """
    Runs graph_similarity, captures the prop_scores, saves them to a JSON file and shows averages in tabular form.
    """
    if not validate_bundle(generated_bundle_path) or not validate_bundle(expert_bundle_path):
        return None, None, None, None

    # Load both bundles as plain dictionaries
    with open(generated_bundle_path, 'r') as f:
        gen_bundle_dict = json.load(f)
    with open(expert_bundle_path, 'r') as f:
        exp_bundle_dict = json.load(f)

    # Exclude Indicator objects whose pattern_type is not 'stix' to avoid the parser bug
    gen_objects_to_compare = [obj for obj in gen_bundle_dict.get('objects', []) if not (obj.get('type') == 'indicator' and obj.get('pattern_type') != 'stix')]
    exp_objects_to_compare = [obj for obj in exp_bundle_dict.get('objects', []) if not (obj.get('type') == 'indicator' and obj.get('pattern_type') != 'stix')]

    ds_generated = MemoryStore(stix_data=gen_objects_to_compare)
    ds_ground_truth = MemoryStore(stix_data=exp_objects_to_compare)

    prop_scores = {}

    print("\n--- Performing graph-based comparison... ---")
    try:
        score = graph_similarity(ds_generated, ds_ground_truth, prop_scores=prop_scores)

        print(f"✅ Graph Similarity Score: {score:.2f}%")

        # Save to file
        with open(full_output_path, 'w') as f:
            json.dump(prop_scores, f, indent=4)
        print(f"✅ Detailed scores saved to file: {full_output_path}")

        # Capture results from the display function
        summary_stats, avg_score_by_type, single_df = display_scores_as_table(full_output_path)

        # Returns the overall score and detailed statistics
        return score, summary_stats, avg_score_by_type, single_df

    except Exception as e:
        print(f"❌ ERROR while executing graph_similarity: {e}")
        return None, None, None, None # Returns None in case of error


# --- SCRIPT EXECUTION & FINAL AGGREGATION ---

# List for collecting the results of each analysis
all_results = []
all_dataframes = []

if not os.path.exists(BASE_DRIVE_DIR):
    print(f"❌ ERROR: The specified directory does not exist: {BASE_DRIVE_DIR}")
else:
    # Create the output folder if it does not exist
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Iterate over all bundle folders
    for dirname in sorted(os.listdir(BASE_DRIVE_DIR)):
        dirpath = os.path.join(BASE_DRIVE_DIR, dirname)
        if os.path.isdir(dirpath):
            print(f"\n{'='*25} Analyzing: {dirname} {'='*25}")
            ground_truth_path, predicted_path = None, None
            try:
                for filename in os.listdir(dirpath):
                    if filename.endswith("_gt.json"):
                        ground_truth_path = os.path.join(dirpath, filename)
                    elif filename.endswith("_pred.json"):
                        predicted_path = os.path.join(dirpath, filename)
            except FileNotFoundError:
                print(f"Unable to access the folder '{dirname}'.")
                continue

            if ground_truth_path and predicted_path:
                output_filename = f"prop_scores_{dirname}.json"
                full_output_path = os.path.join(OUTPUT_DIR, output_filename)

                # Run the analysis and capture the results
                score, summary_stats, avg_score_by_type, single_df = calculate_single_graph_score(predicted_path, ground_truth_path, full_output_path)

                # Add the results to the list (only if the analysis was successful)
                if score is not None and single_df is not None:
                    all_results.append({
                        "bundle": dirname,
                        "graph_similarity_score": score,
                    })
                    all_dataframes.append(single_df)
            else:
                print(f"Bundle pair not found in '{dirname}'.")

# --- FINAL AGGREGATE ANALYSIS ---
if all_results and all_dataframes:
    print(f"\n\n{'='*30} FINAL AGGREGATE ANALYSIS {'='*30}")

    # Calculate the overall average similarity score
    total_graph_score = sum(res['graph_similarity_score'] for res in all_results)
    average_graph_score = total_graph_score / len(all_results)
    print(f"\n--- Average Graph Similarity Score (on {len(all_results)} bundles) ---")
    print(f"📊 Overall Average Score: {average_graph_score:.2f}%")

    # Create a single DataFrame with ALL objects from ALL bundles
    master_df = pd.concat(all_dataframes, ignore_index=True)

    # Calculate and display overall summary statistics
    print(f"\n--- Summary Statistics (on {len(master_df)} total objects) ---")
    total_summary_stats = master_df['Match Score (%)'].describe()
    display(total_summary_stats.to_frame().style.format('{:,.2f}'))

    # Calculate and display average performance by object type across ALL objects
    print("\n--- Average Performance by Object Type (across all objects) ---")
    final_avg_by_type = master_df.groupby('Type Object')['Match Score (%)'].mean().sort_values(ascending=False)

    final_df_avg_type = final_avg_by_type.reset_index()
    final_df_avg_type.columns = ['Type Object', 'Average Score (%)']

    # Display the final table
    display(final_df_avg_type.style.format({'Average Score (%)': '{:,.2f}'}).hide(axis="index"))
else:
    print("\nNo valid results to aggregate.")