In [17]:
# from google.colab import drive

# Mount Google Drive
# drive.mount('/content/gdrive')

# Unmount Google Drive
# drive.flush_and_unmount()

**Required modules**
---


1.   attackcti
2.   mitreattack-python library
3.   scikit-learn
4.   imblanced-learn

In [18]:
# install library to interact with mitre attack framework
!pip install attackcti 
!pip install mitreattack-python
!pip install scikit-learn
!pip install imbalanced-learn



In [20]:
!pip show mitreattack-python
# !ls /usr/local/lib/python3.10/dist-packages/mitreattack/
# help() #will ask to type in module , from here we can type in mitreattack


Name: mitreattack-python
Version: 3.0.8
Summary: MITRE ATT&CK python library
Home-page: https://github.com/mitre-attack/mitreattack-python/
Author: MITRE ATT&CK, MITRE Corporation
Author-email: attack@mitre.org


License: Apache 2.0
Location: C:\Users\tzhiq\AppData\Local\Programs\Python\Python312\Lib\site-packages
Requires: colour, deepdiff, drawsvg, loguru, Markdown, numpy, openpyxl, pandas, Pillow, pooch, python-dateutil, requests, rich, stix2, stix2-elevator, tabulate, taxii2-client, tqdm, typer, xlsxwriter
Required-by: 


**Retrieve Dataset using mitreattack-python library**
---
Dataset Description

---
1.   apt_groups: List of groups found in Mitre Attack framework
2.   attack_relationships: List of all the relationship listed in Mitre Attack framework
3.   attack_techniques: List of all the techniques listed in Mitre Attack framework



In [21]:
from mitreattack.attackToExcel import attackToExcel, stixToDf

# Step 1: Download and parse ATT&CK STIX data
attackdata = attackToExcel.get_stix_data("enterprise-attack")

# Step 2: Get Pandas DataFrame for techniques, groups & relationship
techniques_data = stixToDf.techniquesToDf(attackdata, "enterprise-attack")
groups_data = stixToDf.groupsToDf(attackdata)
relationships_data = stixToDf.relationshipsToDf(attackdata)

df_techniques = techniques_data["techniques"]
df_groups = groups_data["groups"]
df_relationships = relationships_data["relationships"]

# Step 3: Save the DataFrame to a CSV file respectively
techniques_csv_file = '/content/attack_techniques.csv' # Replace with your file path
df_techniques.to_csv(techniques_csv_file, index=False)
print(f"Technique CSV file created at: {techniques_csv_file}")

groups_csv_file = '/content/apt_groups.csv' # Replace with your file path
df_groups.to_csv(groups_csv_file, index=False)
print(f"Group CSV file created at: {groups_csv_file}")

relationships_csv_file = '/content/attack_relationships.csv' # Replace with your file path
df_relationships.to_csv(relationships_csv_file, index=False)
print(f"Relationship CSV file created at: {relationships_csv_file}")


[32m2024-11-26 18:42:48.914[0m | [1mINFO    [0m | [36mmitreattack.attackToExcel.attackToExcel[0m:[36mget_stix_data[0m:[36m69[0m - [1mDownloading ATT&CK data from github.com/mitre/cti[0m
parsing techniques: 100%|██████████| 656/656 [00:00<00:00, 2877.48it/s]
parsing relationships for type=technique: 100%|██████████| 19163/19163 [00:00<00:00, 24838.70it/s]
parsing groups: 100%|██████████| 159/159 [00:00<00:00, 74839.45it/s]
parsing relationships for type=group: 100%|██████████| 19163/19163 [00:00<00:00, 104484.93it/s]
parsing all relationships: 100%|██████████| 19163/19163 [00:00<00:00, 25248.83it/s]


OSError: Cannot save file into a non-existent directory: '\content'

In [None]:
# Step 4: Optional - Verify the CSV content
import pandas as pd
df = pd.read_csv(techniques_csv_file)
print(df.head())  # Display the first few rows to verify the data

          ID                                            STIX ID  \
0      T1548  attack-pattern--67720091-eee3-4d2d-ae16-826456...   
1  T1548.002  attack-pattern--120d5519-3098-4e1c-9191-2aa612...   
2  T1548.004  attack-pattern--b84903f0-c7d5-435d-a69e-de47cc...   
3  T1548.001  attack-pattern--6831414d-bb70-42b7-8030-d4e06b...   
4  T1548.003  attack-pattern--1365fe3b-0f50-455d-b4da-266ce3...   

                                                name  \
0                  Abuse Elevation Control Mechanism   
1  Abuse Elevation Control Mechanism: Bypass User...   
2  Abuse Elevation Control Mechanism: Elevated Ex...   
3  Abuse Elevation Control Mechanism: Setuid and ...   
4  Abuse Elevation Control Mechanism: Sudo and Su...   

                                         description  \
0  Adversaries may circumvent mechanisms designed...   
1  Adversaries may bypass UAC mechanisms to eleva...   
2  Adversaries may leverage the <code>Authorizati...   
3  An adversary may abuse configurat

**Data Preprocessing**
---


*   Identify Key Features
*   Remove any unnecessary columns
*   Handle missing values
*   Normalize text fields

1.   aptgroup_relationship.csv: Filtered list of all techniques that are used by each APT group


In [None]:
import pandas as pd
import numpy as np

# Load the CSVs
techniques_df = pd.read_csv(techniques_csv_file)
relationships_df = pd.read_csv(relationships_csv_file)
groups_df = pd.read_csv(groups_csv_file)

# Replace empty strings or strings with only spaces with NaN
techniques_df.replace(r'^\s*$', np.nan, regex=True, inplace=True)
techniques_df["supports remote"] = techniques_df["supports remote"].fillna("FALSE")
relationships_df.replace(r'^\s*$', np.nan, regex=True, inplace=True)
groups_df.replace(r'^\s*$', np.nan, regex=True, inplace=True)

# Columns to drop in each csv file
techniques_dropped_columns = ["STIX ID", "created", "url", "last modified", "domain", "version", "detection", "data sources", "contributors", "defenses bypassed", "permissions required", "system requirements", "impact type", "effective permissions", "relationship citations"]
relationships_dropped_columns = ["source ref", "source type", "mapping type", "target ref", "target type", "STIX ID", "created", "last modified"]
groups_dropped_columns = ["STIX ID", "created", "url", "last modified", "domain", "version", "contributors", "associated groups", "associated groups citations", "relationship citations"]

# Drop the columns in each csv file
techniques_df = techniques_df.drop(columns=techniques_dropped_columns)
relationships_df = relationships_df.drop(columns=relationships_dropped_columns)
groups_df = groups_df.drop(columns=groups_dropped_columns)

# Filter rows where the source ID starts with "G" and target ID starts with "T" in relationship.csv
# Find techniques that are used by each APT group
aptgroup_relationships_df = relationships_df[
    relationships_df['source ID'].str.startswith('G') & relationships_df['target ID'].str.startswith('T')
]

# Save the modified CSV
techniques_df.to_csv('/content/attack_techniques_cleaned.csv', index=False) # Replace with your file path
relationships_df.to_csv('/content/attack_relationships_cleaned.csv', index=False) # Replace with your file path
groups_df.to_csv('/content/apt_groups_cleaned.csv', index=False) # Replace with your file path
aptgroup_relationships_df.to_csv('/content/aptgroup_relationships.csv', index=False) # Replace with your file path


In [None]:
# Combine apt_groups_relationship with attack_techniques
# Define the column names for the common key in each file
techniques_key = 'ID'  # Column name in techniques.csv
apt_groups_key = 'target ID'  # Column name in apt_groups.csv

# Define the columns to merge and their new names
columns_to_merge = {
    'description': 'technique description',
    'tactics': 'technique tactics',
    'platforms': 'technique platforms',
    'is sub-technique': 'is sub-technique of target',
    'sub-technique of': 'target sub-technique of',
    'supports remote': 'technique supports remote'
}

# Select only the necessary columns from techniques.csv
columns_to_keep = [techniques_key] + list(columns_to_merge.keys())
techniques_subset = techniques_df[columns_to_keep]

# Merge the DataFrames
updated_aptgroup_relationships_df = pd.merge(
    aptgroup_relationships_df,
    techniques_subset,
    how='left',
    left_on=apt_groups_key,
    right_on=techniques_key
)

# Rename the columns as defined in columns_to_merge
updated_aptgroup_relationships_df.rename(columns=columns_to_merge, inplace=True)

# Optionally, drop the redundant key column from techniques.csv
updated_aptgroup_relationships_df.drop(columns=[techniques_key], inplace=True)

# Rename the columns for better reading of csv
updated_aptgroup_relationships_df.rename(columns={'source ID': 'group ID'}, inplace=True)
updated_aptgroup_relationships_df.rename(columns={'source name': 'group name'}, inplace=True)
updated_aptgroup_relationships_df.rename(columns={'target ID': 'technique ID'}, inplace=True)
updated_aptgroup_relationships_df.rename(columns={'target name': 'technique name'}, inplace=True)
updated_aptgroup_relationships_df.rename(columns={'mapping description': 'group mapping description'}, inplace=True)
updated_aptgroup_relationships_df.rename(columns={'target name': 'technique name'}, inplace=True)

# Save the updated DataFrame to a new CSV file
updated_aptgroup_relationships_df.to_csv('/content/updated_aptgroup_relationships.csv', index=False) # Replace with your file path

# Print the first few rows to verify
print(updated_aptgroup_relationships_df.head())

  group ID group name technique ID                   technique name  \
0    G0099   APT-C-36        T1105            Ingress Tool Transfer   
1    G0099   APT-C-36    T1204.002                   Malicious File   
2    G0099   APT-C-36    T1036.004       Masquerade Task or Service   
3    G0099   APT-C-36        T1571                Non-Standard Port   
4    G0099   APT-C-36        T1027  Obfuscated Files or Information   

                           group mapping description  \
0  [APT-C-36](https://attack.mitre.org/groups/G00...   
1  [APT-C-36](https://attack.mitre.org/groups/G00...   
2  [APT-C-36](https://attack.mitre.org/groups/G00...   
3  [APT-C-36](https://attack.mitre.org/groups/G00...   
4  [APT-C-36](https://attack.mitre.org/groups/G00...   

                               technique description    technique tactics  \
0  Adversaries may transfer tools or other files ...  Command and Control   
1  An adversary may rely upon a user opening a ma...            Execution   
2  Ad

# **Random Forest**



In [None]:
# Random Forest Model
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, accuracy_score
from imblearn.over_sampling import SMOTE
from collections import Counter
import numpy as np

In [None]:
# Load the updated dataset
file_path = 'updated_aptgroup_relationships.csv' # Replace with your file path
updated_df = pd.read_csv(file_path)

# Sub-Technique Classification


In [None]:
# Define features and target for classification
features = updated_df[["group ID", "technique ID", "technique tactics", "technique platforms"]].copy()
target = updated_df["is sub-technique of target"].astype(int)  # Binary classification

# Encode non-numeric features
non_numeric_cols = features.select_dtypes(include=['object']).columns.tolist()
for col in non_numeric_cols:
    features[col] = features[col].astype('category').cat.codes

# Train/Test split
X_train, X_test, y_train, y_test = train_test_split(features, target, test_size=0.3, random_state=42)

# Train Random Forest Classifier
rf_classifier = RandomForestClassifier(n_estimators=100, random_state=42)
rf_classifier.fit(X_train, y_train)

# Evaluate the Classifier
y_pred = rf_classifier.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
report = classification_report(y_test, y_pred)

# Print Results
print("\n=== Sub-Technique Classification ===")
print("Accuracy:", accuracy)
print("\nClassification Report:\n", report)

# Feature Importance
feature_importances = pd.DataFrame({
    "Feature": features.columns,
    "Importance": rf_classifier.feature_importances_
}).sort_values(by="Importance", ascending=False)

print("\nFeature Importances:")
print(feature_importances)


=== Sub-Technique Classification ===
Accuracy: 0.9770114942528736

Classification Report:
               precision    recall  f1-score   support

           0       0.97      0.97      0.97       431
           1       0.98      0.98      0.98       700

    accuracy                           0.98      1131
   macro avg       0.98      0.97      0.98      1131
weighted avg       0.98      0.98      0.98      1131


Feature Importances:
               Feature  Importance
1         technique ID    0.505340
3  technique platforms    0.227755
2    technique tactics    0.203007
0             group ID    0.063899


# Sub-Technique to Main Technique Classification

In [None]:
# Filter for sub-techniques and create a copy
df_sub_techniques = updated_df[updated_df["technique ID"].str.contains(r"\.\d+$", regex=True)].copy()

# Extract main techniques
df_sub_techniques["main technique ID"] = df_sub_techniques["technique ID"].str.split(".").str[0]

# Normalize text columns
df_sub_techniques["technique name"] = df_sub_techniques["technique name"].str.strip().str.lower()
df_sub_techniques["main technique ID"] = df_sub_techniques["main technique ID"].str.strip()

# Encode IDs
label_encoder_main = LabelEncoder()
label_encoder_sub = LabelEncoder()
df_sub_techniques["main technique ID"] = label_encoder_main.fit_transform(df_sub_techniques["main technique ID"])
df_sub_techniques["technique ID"] = label_encoder_sub.fit_transform(df_sub_techniques["technique ID"])

# Split data
X = df_sub_techniques[["main technique ID"]]
y = df_sub_techniques["technique ID"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Check class distribution
# print("Class distribution in y_train before filtering:")
# print(Counter(y_train))

# Remove classes with fewer than 2 samples
class_counts = y_train.value_counts()
valid_classes = class_counts[class_counts > 1].index
X_train_filtered = X_train[y_train.isin(valid_classes)]
y_train_filtered = y_train[y_train.isin(valid_classes)]

# Handle class imbalance using SMOTE
smote = SMOTE(random_state=42, k_neighbors=1)  # Reduced neighbors to handle small classes
X_train_resampled, y_train_resampled = smote.fit_resample(X_train_filtered, y_train_filtered)

# Train the model
rf_classifier = RandomForestClassifier(random_state=42, class_weight="balanced")
rf_classifier.fit(X_train_resampled, y_train_resampled)

# Evaluate the model
y_pred = rf_classifier.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Model Accuracy: {accuracy * 100:.2f}%\n")
print("Classification Report:\n", classification_report(y_test, y_pred, zero_division=0))

# Prediction function
def predict_sub_techniques(main_technique_id):
    """Predict the sub-techniques for a given main technique."""
    main_technique_id = main_technique_id.strip()
    if main_technique_id not in label_encoder_main.classes_:
        return f"Main technique ID '{main_technique_id}' not found."

    encoded_main_technique = np.array([[label_encoder_main.transform([main_technique_id])[0]]])
    predicted_sub_technique_class = rf_classifier.predict(encoded_main_technique)[0]
    predicted_sub_technique = label_encoder_sub.inverse_transform([predicted_sub_technique_class])[0]
    return f"The predicted sub-technique for main technique '{main_technique_id}' is: {predicted_sub_technique}"

# Test the prediction function
# example_main_technique_id = "T1584"
# result = predict_sub_techniques(example_main_technique_id)
# print(result)


Model Accuracy: 35.78%

Classification Report:
               precision    recall  f1-score   support

           2       0.00      0.00      0.00         1
           3       0.00      0.00      0.00         7
           4       0.06      1.00      0.12         1
           5       0.00      0.00      0.00         1
           6       0.00      0.00      0.00         4
           7       0.00      0.00      0.00         2
           9       1.00      1.00      1.00         4
          11       0.00      0.00      0.00        12
          12       0.00      0.00      0.00         8
          13       0.12      1.00      0.22         3
          15       0.00      0.00      0.00         1
          17       0.00      0.00      0.00         3
          18       0.00      0.00      0.00         6
          19       0.00      0.00      0.00         4
          20       0.00      0.00      0.00         1
          21       0.05      1.00      0.10         2
          22       0.00      0.00

PLATFORMS EXPLOITED BY TECHNIQUES

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
from sklearn.preprocessing import MultiLabelBinarizer, LabelEncoder

# Load the dataset
file_path = 'updated_aptgroup_relationships.xlsx' # Replace with your file path
df = pd.read_excel(file_path, sheet_name='updated_aptgroup_relationships')

# Preprocess the 'technique platforms' column
df['technique platforms'] = df['technique platforms'].str.split(', ')

# Use MultiLabelBinarizer to encode multiple platforms
mlb = MultiLabelBinarizer()
platforms_encoded = mlb.fit_transform(df['technique platforms'])

# Encode categorical features (e.g., techniques, group names)
le_techniques = LabelEncoder()
le_group = LabelEncoder()

df['encoded_tech'] = le_techniques.fit_transform(df['technique name'])
# df['encoded_group'] = le_group.fit_transform(df['group name'])

# Combine features for the model
X = df[['encoded_tech']]
y = platforms_encoded

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train the Random Forest classifier
rf_model = RandomForestClassifier(random_state=42, n_estimators=100, max_depth=None)
rf_model.fit(X_train, y_train)

# Make predictions
y_pred = rf_model.predict(X_test)

# Evaluate the model
report = classification_report(y_test, y_pred, target_names=mlb.classes_)
print("Classification Report:\n", report)

# Compute and display accuracy
accuracy = accuracy_score(y_test, y_pred)
print(f"Model Accuracy: {accuracy * 100:.2f}%")

# Optional: Save the model and encoders for future use
import pickle
with open('rf_model.pkl', 'wb') as model_file: # Replace with your file path
    pickle.dump(rf_model, model_file)
with open('mlb_encoder.pkl', 'wb') as mlb_file: # Replace with your file path
    pickle.dump(mlb, mlb_file)
with open('techniques_encoder.pkl', 'wb') as techniques_file: # Replace with your file path
    pickle.dump(le_techniques, techniques_file)
with open('group_encoder.pkl', 'wb') as group_file: # Replace with your file path
    pickle.dump(le_group, group_file)

# ====== TESTING SECTION ======

# Example test input
test_technique = "Ingress Tool Transfer"  # Replace with desired tech
# test_group = "APT-C-36"       # Replace with desired group name

# Encode the test input
encoded_technique = le_techniques.transform([test_technique])[0]
# encoded_group = le_group.transform([test_group])[0]

# Create input for the model
test_input = [[encoded_technique]]

# Predict platforms
predicted_platforms = rf_model.predict(test_input)

# Decode the predicted platforms
decoded_platforms = mlb.inverse_transform(predicted_platforms)
print(f"Predicted platforms for technique '{test_technique}' ': {decoded_platforms}")

Classification Report:
                    precision    recall  f1-score   support

       Containers       1.00      0.92      0.96        65
             IaaS       0.99      0.98      0.98        83
Identity Provider       0.93      0.96      0.94        53
            Linux       0.98      1.00      0.99       518
          Network       0.97      0.97      0.97       179
     Office Suite       0.92      0.98      0.95        61
              PRE       0.92      0.94      0.93        65
             SaaS       0.93      0.98      0.95        52
          Windows       0.99      0.99      0.99       680
            macOS       0.98      0.99      0.99       522

        micro avg       0.98      0.99      0.98      2278
        macro avg       0.96      0.97      0.97      2278
     weighted avg       0.98      0.99      0.98      2278
      samples avg       0.98      0.98      0.98      2278

Model Accuracy: 96.42%
Predicted platforms for technique 'Ingress Tool Transfer' ': [('L



WHICH GROUP USE WHICH TECH

In [None]:
# Import necessary libraries
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score

# Load the Excel file
file_path = 'updated_aptgroup_relationships.xlsx' # Replace with your file path
df = pd.read_excel(file_path)

# Filter relevant columns and drop NaN values for training
df = df.dropna(subset=["technique name", "group name"])

# Encode the "group name" and "technique name" columns
group_name_encoder = LabelEncoder()
technique_name_encoder = LabelEncoder()

df["group name"] = group_name_encoder.fit_transform(df["group name"])
df["technique name"] = technique_name_encoder.fit_transform(df["technique name"])

# Split the data into features and target variable
X = df[["technique name"]]
y = df["group name"]

# Train a RandomForestClassifier with the technique names as input
rf_classifier = RandomForestClassifier(random_state=42)
rf_classifier.fit(X, y)

# Evaluate the model's accuracy
y_pred = rf_classifier.predict(X)
accuracy = accuracy_score(y, y_pred)
print(f"Model Accuracy: {accuracy * 100:.2f}%")

# Define a function to predict groups based on technique names
def predict_groups_by_technique_name(technique_name_input):
    """Predict up to 5 APT groups for a given technique name."""
    if technique_name_input not in technique_name_encoder.classes_:
        return f"Technique '{technique_name_input}' not found in the dataset."

    # Encode the input technique name
    encoded_technique_name = technique_name_encoder.transform([technique_name_input])[0]

    # Predict probabilities for all classes
    probabilities = rf_classifier.predict_proba([[encoded_technique_name]])[0]

    # Get indices of top 5 probabilities
    top_indices = probabilities.argsort()[-5:][::-1]
    predicted_groups = group_name_encoder.inverse_transform(top_indices)
    predicted_probs = [probabilities[i] for i in top_indices]

    # Format the results
    results = [
        f"{group} (probability: {prob:.2f})"
        for group, prob in zip(predicted_groups, predicted_probs)
    ]

    return f"The following APT groups are predicted to use the technique '{technique_name_input}':\n" + "\n".join(results)

# Test the function with an example input
example_technique = "Malicious File"
response = predict_groups_by_technique_name(example_technique)
print(response)


Model Accuracy: 11.44%
The following APT groups are predicted to use the technique 'Malicious File':
Windshift (probability: 0.02)
Molerats (probability: 0.02)
Sidewinder (probability: 0.01)
FIN8 (probability: 0.01)
Elderwood (probability: 0.01)


