In [1]:
import os
import json
import csv
import pandas as pd
import json
import re
from collections import defaultdict

In [2]:
def load_config():
    """
    Load ``config.json`` from the project root.

    The project root is taken to be TWO directory levels above the current
    working directory (``../..``), so the result depends on where the
    notebook kernel was started.

    Returns
    -------
    tuple
        ``(config, project_root)`` where ``config`` is the parsed JSON content
        and ``project_root`` is the absolute, normalized root path.

    Raises
    ------
    FileNotFoundError
        If ``config.json`` does not exist in the resolved project root.
    """
    # abspath already normalizes the result, so no separate normpath is needed.
    project_root = os.path.abspath(os.path.join(os.getcwd(), '../..'))

    config_path = os.path.join(project_root, 'config.json')

    if not os.path.exists(config_path):
        raise FileNotFoundError(f"Config file not found at expected location: {config_path}")

    # Explicit UTF-8 keeps this consistent with load_json_metadata and avoids
    # depending on the platform's locale default encoding.
    with open(config_path, 'r', encoding='utf-8') as f:
        config = json.load(f)

    return config, project_root

In [3]:
# Load the project configuration and the resolved project root (both returned by load_config).
config, project_root = load_config()

In [4]:
def _groups_file(domain):
    """Return the normalized path of the groups file configured for ``domain``."""
    return os.path.normpath(
        os.path.join(
            project_root,
            config["data_directory"],
            config["file_paths_groups_v15"][domain],
        )
    )


# One groups file per domain key listed under "file_paths_groups_v15" in the config.
csv_path_enterprise = _groups_file("enterprise")
csv_path_mobile = _groups_file("mobile")
csv_path_ics = _groups_file("ics")

In [5]:
def load_groups(file_path, sheet_name='groups'):
    """
    Read the groups sheet of a MITRE ATT&CK Excel workbook into a lookup table.

    Parameters
    ----------
    file_path : str
        Location of the Excel workbook.
    sheet_name : str
        Name of the worksheet that holds the group rows.

    Returns
    -------
    dict
        ``{group_id: {'name': <name>, 'aliases': [<alias>, ...]}}``. Rows
        without an ID are dropped; aliases come from the comma-separated
        'associated groups' column and are an empty list when it is blank.
    """
    sheet = pd.read_excel(file_path, sheet_name=sheet_name)
    # Keep only the columns used below and discard rows that have no group ID.
    sheet = sheet[['ID', 'name', 'associated groups']].dropna(subset=['ID'])

    groups = {}
    columns = zip(sheet['ID'], sheet['name'], sheet['associated groups'])
    for raw_id, raw_name, raw_aliases in columns:
        key = raw_id.strip()
        label = raw_name.strip()

        if pd.notna(raw_aliases):
            # Split on commas and drop entries that are empty after trimming.
            alias_list = [part.strip() for part in raw_aliases.split(",") if part.strip()]
        else:
            alias_list = []

        groups[key] = {'name': label, 'aliases': alias_list}

    return groups

In [6]:
# Load the groups sheet of each domain. Each variable is loaded from its own
# domain's file (the ICS and Mobile paths were previously swapped).
enterprise_groups = load_groups(csv_path_enterprise)
ics_groups = load_groups(csv_path_ics)
mobile_groups = load_groups(csv_path_mobile)

In [7]:
def merge_group_data(primary, secondary):
    """
    Merge group data dictionaries by group ID, in place.

    Groups only present in ``secondary`` are added to ``primary``; for groups
    present in both, the alias lists are replaced in ``primary`` by the sorted
    union of both alias lists (the existing name in ``primary`` is kept).

    Parameters
    ----------
    primary : dict
        Base group data. Modified in place.
    secondary : dict
        New group data to merge into the base. Never modified.

    Returns
    -------
    None
    """
    for group_id, info in secondary.items():
        if group_id not in primary:
            # Store a copy (including a fresh alias list) so that later merges
            # into `primary` cannot mutate the entries of `secondary`.
            primary[group_id] = {**info, 'aliases': list(info['aliases'])}
        else:
            # Merge aliases while preserving uniqueness
            merged_aliases = set(primary[group_id]['aliases']) | set(info['aliases'])
            primary[group_id]['aliases'] = sorted(merged_aliases)


In [8]:
# Merge the ICS and Mobile groups into the Enterprise dictionary
# Fold each additional domain into enterprise_groups, ICS first and then Mobile.
for extra_groups in (ics_groups, mobile_groups):
    merge_group_data(enterprise_groups, extra_groups)


In [9]:
# Number of distinct group IDs after merging the ICS and Mobile groups into enterprise_groups.
len(enterprise_groups)

152

In [10]:
def load_json_metadata(metadata_path):
    """
    Parse a UTF-8 encoded JSON file and return its content.

    Parameters
    ----------
    metadata_path : str
        Location of the JSON file to read.

    Returns
    -------
    dict
        The parsed JSON content (whatever ``json.load`` yields for the file;
        the callers in this notebook treat it as a dictionary).
    """
    with open(metadata_path, mode='r', encoding='utf-8') as handle:
        return json.load(handle)

In [11]:
# Path to the Malpedia actors JSON, located under <project_root>/group_profile_analysis/malpedia_api_responses.
malpedia_actors_file = os.path.normpath(os.path.join(project_root, "group_profile_analysis", "malpedia_api_responses", "actors_data.json"))

In [12]:
# Parse the Malpedia actors JSON file into memory.
malpedia_actors_metadata = load_json_metadata(malpedia_actors_file)

In [13]:
# Number of top-level entries in the loaded Malpedia actors data.
len(malpedia_actors_metadata)

800

In [15]:
# Path to the MITRE ATT&CK group mapping JSON (per the original note: group name -> aliases derived from the Excel files).
# NOTE(review): no cell visible in this notebook writes attack_group_mapping.json from enterprise_groups — confirm how and where it is generated.
attack_group_alias_file = os.path.normpath(os.path.join(project_root, "group_profile_analysis", "attack_malpedia_intersection", "attack_group_mapping.json"))

In [20]:
# Parse the MITRE group mapping JSON that is compared against the Malpedia actors below.
attack_group_alias_metadata = load_json_metadata(attack_group_alias_file)

In [21]:
def normalize_group_name(name):
    """
    Normalize a threat-group name so that spelling variants compare equal.

    Parameters
    ----------
    name : object
        Raw group or alias name. Anything that is not a ``str`` is treated as
        missing.

    Returns
    -------
    str
        The lower-cased, canonicalized name, or ``''`` for non-string input.
    """
    if not isinstance(name, str):
        return ''

    # Convert to lowercase for case-insensitive comparison
    name = name.lower().strip()

    # Remove a trailing ' team' (e.g. 'Sandworm Team' -> 'sandworm').
    # Slice off the suffix only: str.replace would also delete every interior
    # occurrence of ' team' (e.g. 'red team alpha team' -> 'red alpha').
    if name.endswith(' team'):
        name = name[:-len(' team')]

    # Replace 'threat group-' with 'tg-' (e.g., 'Threat Group-1314' -> 'tg-1314')
    name = re.sub(r'threat group[- ]', 'tg-', name)

    # Remove 'temp.' or similar prefixes (e.g., 'Temp.Pittytiger' -> 'pittytiger')
    name = re.sub(r'^temp[\. ]+', '', name)

    # Normalize spaces and dots (e.g., 'pitty tiger' == 'pitty.tiger')
    name = re.sub(r'[\. ]+', ' ', name)

    # Remove common suffixes like 'framework' or 'group' (e.g., 'Inception Framework' -> 'inception')
    name = re.sub(r' (framework|group)$', '', name)

    # Standardize 'Confucius' and 'Confucious' to 'confucius'
    name = re.sub(r'confucious', 'confucius', name)

    # Collapse separators inside identifiers with a known prefix (apt, unc, g),
    # e.g. 'apt 28' -> 'apt28', 'apt-c-36' -> 'aptc36'.
    name = re.sub(
        r'\b(apt|unc|g)[\s\.-]*([a-z]*)[\s\.-]*(\d{1,4})\b',
        lambda m: m.group(1) + m.group(2) + m.group(3),
        name
    )

    return name

In [22]:
def find_intersection_with_malpedia_actors(malpedia_actors_data, mitre_group_mappings, output_file="group_intersection_output.json"):
    """
    Match MITRE groups to Malpedia actors by normalized name or alias.

    A MITRE group matches a Malpedia actor when its normalized name, or any of
    its normalized aliases, equals the actor's normalized name or one of the
    actor's normalized synonyms. Only the FIRST matching actor (in the
    iteration order of ``malpedia_actors_data``) is recorded per group.
    Names that normalize to an empty string are ignored, so blank aliases and
    non-string synonyms can never produce a match.

    Parameters
    ----------
    malpedia_actors_data : dict
        Mapping of actor ID to actor info. Each info dict must have a
        ``'value'`` entry (actor name) and may have ``'meta'['synonyms']``.
    mitre_group_mappings : dict
        Mapping of MITRE group ID to a dict with ``'name'`` and optionally
        ``'aliases'`` (a list, a comma-separated string, ``None`` or NaN).
    output_file : str
        Path of the JSON file the matched groups are written to.

    Returns
    -------
    tuple
        ``(intersection, unique_group_ids, mitre_groups_not_in_malpedia)``:
        a list of ``(group_name, actor_name, group_id)`` tuples, the set of
        matched group IDs, and a list of ``(group_name, group_id)`` tuples for
        groups without a match. All names are in normalized form.
    """
    # Normalize every Malpedia actor once up front instead of once per MITRE group.
    actors = []
    for actor_info in malpedia_actors_data.values():
        actor_name = normalize_group_name(actor_info['value'])
        # Tolerate a missing or null 'meta' / 'synonyms' entry.
        raw_synonyms = (actor_info.get('meta') or {}).get('synonyms') or []
        synonyms = [s for s in (normalize_group_name(raw) for raw in raw_synonyms) if s]
        known_names = set(synonyms)
        if actor_name:
            known_names.add(actor_name)
        actors.append((actor_name, synonyms, known_names))

    intersection = []  # (group_name, actor_name, group_id) per matched MITRE group
    unique_group_ids = set()  # IDs of the matched MITRE groups
    mitre_groups_not_in_malpedia = []  # (group_name, group_id) per unmatched MITRE group
    intersection_data = {}  # Per-group details written to the JSON output

    for group_id, group_info in mitre_group_mappings.items():
        group_name = normalize_group_name(group_info['name'])

        # Aliases may be a list, a comma-separated string, None or NaN (float).
        associated_groups = group_info.get('aliases', '')
        if associated_groups is None or isinstance(associated_groups, float):
            associated_groups = ''
        if isinstance(associated_groups, str):
            associated_groups = associated_groups.split(',')
        associated_groups = [a for a in (normalize_group_name(raw) for raw in associated_groups) if a]

        # Every non-empty name this MITRE group is known by.
        candidates = ([group_name] if group_name else []) + associated_groups

        # First actor sharing at least one name with the group, if any.
        match = next(
            (
                (actor_name, synonyms)
                for actor_name, synonyms, known_names in actors
                if not known_names.isdisjoint(candidates)
            ),
            None,
        )

        if match is None:
            mitre_groups_not_in_malpedia.append((group_name, group_id))
            continue

        actor_name, synonyms = match
        intersection.append((group_name, actor_name, group_id))
        unique_group_ids.add(group_id)
        intersection_data[group_id] = {
            'MITRE Group Name': group_name,
            'MITRE Associated Names': associated_groups,
            'Malpedia Actor Name': actor_name,
            'Malpedia Aliases': synonyms
        }

    # Write the intersection data to a JSON file
    with open(output_file, 'w', encoding='utf-8') as json_file:
        json.dump(intersection_data, json_file, indent=4)

    return intersection, unique_group_ids, mitre_groups_not_in_malpedia


In [23]:
# Match the MITRE groups against the Malpedia actors; the matches are also
# written to group_intersection_output.json in the current working directory.
intersection, unique_group_ids, mitre_groups_not_in_malpedia = find_intersection_with_malpedia_actors(
    malpedia_actors_metadata,
    attack_group_alias_metadata,
    output_file="group_intersection_output.json",
)


In [19]:
def count_json_keys(json_file_path):
    """
    Return the number of top-level entries in a JSON file.

    Parameters
    ----------
    json_file_path : str
        Path of the JSON file to inspect.

    Returns
    -------
    int
        ``len()`` of the parsed JSON content (the key count for a JSON object).
    """
    with open(json_file_path, 'r') as handle:
        return len(json.load(handle))
# Count the matched MITRE groups recorded in the intersection output file.
json_file_path = 'group_intersection_output.json'  # Relative path: resolved against the current working directory
num_keys = count_json_keys(json_file_path)

print(f"Number of keys in the JSON file: {num_keys}")

Number of keys in the JSON file: 145


In [61]:
# Read back the intersection mapping (MITRE group ID -> match details).
with open('group_intersection_output.json', 'r') as f:
    data = json.load(f)

# Invert the mapping: Malpedia actor name -> every MITRE group ID matched to it.
actor_to_groups = defaultdict(list)
for mitre_id, entry in data.items():
    actor_to_groups[entry['Malpedia Actor Name']].append(mitre_id)

# A collision is an actor that more than one MITRE group was matched to.
collisions = {}
for actor, group_ids in actor_to_groups.items():
    if len(group_ids) > 1:
        collisions[actor] = group_ids

# Report the collisions, one actor per line.
print("Collisions - Malpedia actors mapped to multiple MITRE groups:")
for actor, group_ids in collisions.items():
    print(f"- {actor}: {group_ids}")

Collisions - Malpedia actors mapped to multiple MITRE groups:
- apt17: ['G0025', 'G0001']
- apt19: ['G0073', 'G0009']
- apt30: ['G0013', 'G0030']
- lazarus: ['G0082', 'G0138', 'G0032']
- apt41: ['G0096', 'G0044']
- earth lusca: ['G0143', 'G1006']
- fin7: ['G0008', 'G0046']
- dragonok: ['G0017', 'G0002']
- mustang panda: ['G1014', 'G0129']


In [62]:
# MITRE groups for which no Malpedia actor matched, as (normalized name, group ID) tuples.
mitre_groups_not_in_malpedia

[('blue mockingbird', 'G0108'),
 ('chimera', 'G0114'),
 ('lazyscripter', 'G0140'),
 ('the white company', 'G0089'),
 ('tg-1314', 'G0028'),
 ('windigo', 'G0124'),
 ('cyberav3ngers', 'G1027')]