In [13]:
import sys
sys.path.append ('..')
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler



In [14]:
report_data = pd.read_pickle ('../data/test/report_data.pkl')
look_up_table = pd.read_pickle ('../data/lookup_tables/m06d_1_2_a.pkl')

In [3]:
look_up_table

Unnamed: 0,technique_ID,sorted_similar_techniques,technique_earliest_stage,technique_latest_stage
0,T1548,"[T1218.002, T1553, T1563.001, T1563.002, T1218...",4,5
1,T1548.002,"[T1055.001, T1055.012, T1091, T1210, T1218.007...",4,5
2,T1548.004,"[T1134.003, T1216, T1134.002, T1134.001, T1553...",4,5
3,T1548.001,"[T1006, T1222.002, T1222.001, T1134.002, T1564...",4,5
4,T1548.003,"[T1134, T1564.002, T1553.004, T1222.001, T1555...",4,5
...,...,...,...,...
620,T1102.002,"[T1560, T1555, T1102, T1087.001, T1012, T1219,...",10,10
621,T1102.001,"[T1568, T1090.001, T1027.004, T1115, T1090, T1...",10,10
622,T1102.003,"[T1059.002, T1546.011, T1553.001, T1574.012, T...",10,10
623,T1047,"[T1078, T1053.005, T1547.001, T1059.003, T1105...",2,2


In [15]:
def get_interacted_tactic_range (interacted_techniques: list, look_up_table: pd.DataFrame()):
    """From a list of interacted techniques: Returns a tuple containing the earliest and latest tactic stage
    """
    interacted_table = look_up_table[look_up_table['technique_ID'].isin(interacted_techniques)]
    earliest_stage = interacted_table['technique_earliest_stage'].min()
    latest_stage = interacted_table['technique_earliest_stage'].max()
    
    return (earliest_stage, latest_stage)

def get_cadidate_techniques (interacted_techniques: list,  look_up_table: pd.DataFrame(), n: int, mode: str = 'latest'):
    """From a list of interacted techniques: Returns a list of candidate techniques. \n
    Step 1: Takes n most similar techniques for each interacted techniques.\n
    Step 2: From the list of Step 1: filter some techniques based on the tactic stage of the interacted techniques\n
        If `mode == 'latest'`: remove candidate techniques if their latest tactic stage is before the latest interacted stage\n
        If `mode == 'earliest'`: remove candidate techniques if their latest tactic stage is before the earliest interacted stage
    """
    interacted_table = look_up_table[look_up_table['technique_ID'].isin(interacted_techniques)]
    # get the first n items in each list
    interacted_table.loc[:, 'sorted_similar_techniques'] = interacted_table['sorted_similar_techniques'].apply(lambda x: x[0:n])
    # filter duplicates by getting unique values
    candidate_techniques = list(interacted_table['sorted_similar_techniques'].explode().unique())
    
    earliest_interacted_stage, latest_interacted_stage = get_interacted_tactic_range (interacted_techniques, look_up_table)
    candidate_table = look_up_table[look_up_table['technique_ID'].isin(candidate_techniques)]
    if mode == 'latest':
        candidate_techniques = list (candidate_table[candidate_table['technique_latest_stage'] >= latest_interacted_stage]['technique_ID'].values)
    elif mode == 'earliest':
        candidate_techniques = list (candidate_table[candidate_table['technique_latest_stage'] >= earliest_interacted_stage]['technique_ID'].values)
    return candidate_techniques

def make_test_data (report_data: pd.DataFrame, look_up_table: pd.DataFrame(), n: int = 200, mode: str = 'latest'):

    """From the CISA report data, make data for testing. Method:\n
    1. For each report, iteratively take from interacted Techniques as "detected techniques" and the rest as "true subsequent techniques".\n
    2. For each list of "detected techniques", get the candidate Techniques from the provided look-up table

    Args:
        report_data (pd.DataFrame): CiSA report data
        look_up_table (pd.DataFrame): look-up table created from a model
        n (int, optional): number of most similar Technique for each detected techniques. Defaults to 200.
        mode (str, optional): filter mode for look-up table. Defaults to 'latest'.

    Returns:
        _type_: _description_
    """
    test_group_IDs = []
    test_detected_techniques = []
    test_true_subsequent_techniques = []
    test_candidate_techniques = []
    for _, row in report_data.iterrows():
        group_ID = row['group_ID']
        for i in range (len (row['active_techniques'])-1):
            detected_techniques = row['active_techniques'][0:i+1]
            true_subsequent_techniques_techniques = row['active_techniques'][i+1:]
            candidate_techniques = get_cadidate_techniques (interacted_techniques = detected_techniques, look_up_table=look_up_table, n = n, mode = mode)
            
            test_group_IDs.append (group_ID)
            test_detected_techniques.append (detected_techniques)
            test_true_subsequent_techniques.append (true_subsequent_techniques_techniques)
            test_candidate_techniques.append (candidate_techniques)
    data = {
        'group_ID': test_group_IDs,
        'detected_techniques': test_detected_techniques,
        'candidate_techniques': test_candidate_techniques,
        'true_subsequent_techniques': test_true_subsequent_techniques,
    }
    res_df = pd.DataFrame(data = data)
    return res_df

In [17]:
def build_detected_group_profile (processed_group_features: pd.DataFrame(),
                                  processed_technique_features: pd.DataFrame(), 
                                  detected_techniques: list , threshold: int,
                                  train_labels: pd.DataFrame(), 
                                  group_id: str, settings: dict):
    """ 
    Build features for a newly detected group, including:
    1. Interaction rate (float): the initial value equals to the avg or min interaction rate of the interacted groups\n
    2. Interacted tactics (list of tactics): initial value: for each tactic from the interacted groups, the number of tactic interaction is the average number of interactions for that tactic\n
    3. Used software (list of software): initial value: the N most commonly used software, where N is the number of average software used by interacted groups\n

    Args:
        settings (dict): `'initial_interaction'`: set the initial interaction rate of the new group to the avg or min of the rate of train set

    """

    group_interaction_count = len(detected_techniques)
    
    # make a standard scaler, fit on train set's distribution
    pos_y = train_labels[train_labels['label'] == 1]
    train_interaction_count = pos_y['group_ID'].value_counts()
    scaler = StandardScaler()
    scaler.fit (train_interaction_count.values.reshape (-1,1))
    
    interacted_groups = list(pos_y['group_ID'].unique())
    interacted_group_features = processed_group_features [processed_group_features['group_ID'].isin(interacted_groups)]    
    
    # get the list of most frequent software from train set
    avg_software_interaction_rate = interacted_group_features['input_group_software_id'].apply(len).mean().round().astype(int)
    most_frequent_software = interacted_group_features['input_group_software_id'].explode().value_counts().sort_values(ascending = False)
    most_frequent_software = list(most_frequent_software.index)
    most_frequent_software.remove('other')
    most_frequent_software.remove('')
    
    group_interaction_rate = 0
    group_interacted_tactics = [[]]
    group_software =  [[]]
    
    ### 👉 Assign initial values if group has interaction count less than threshold 
    if group_interaction_count < threshold:
        if settings['initial_interaction'] == 'min':
            group_interaction_rate = scaler.transform(np.array(train_interaction_count.min()).reshape(1, -1)).item()
        elif settings['initial_interaction'] == 'avg':
            group_interaction_rate = 0
        
        avg_tactic_rate = interacted_group_features['input_group_tactics'].explode().value_counts()/len(interacted_groups)
        rounded_avg_tactic_rate = avg_tactic_rate.round().astype(int)
        group_interacted_tactics = [[idx for idx, val in rounded_avg_tactic_rate.items() for _ in range(val)]]
    
        group_software = [most_frequent_software[0:avg_software_interaction_rate]]
    
    elif group_interaction_count >= threshold:
        group_interaction_rate = scaler.transform([[group_interaction_count]])
        group_interaction_rate = group_interaction_rate[0][0]
        
        detected_techniques_features = processed_technique_features[processed_technique_features['technique_ID'].isin (detected_techniques)]
        group_interacted_tactics = [list (detected_techniques_features['input_technique_tactics'].explode().values)]
        possible_software = detected_techniques_features['input_technique_software_id'].explode().unique()
        possible_software = [software for software in possible_software if software in most_frequent_software[0:avg_software_interaction_rate]]
        group_software = [list (detected_techniques_features['input_technique_software_id'].explode().unique())]
    
    values = {
        'group_ID': group_id,
        'input_group_software_id': group_software,
        'input_group_tactics': group_interacted_tactics,
        'input_group_interaction_rate': group_interaction_rate,
        
    }
    detected_group_features = pd.DataFrame(values, index=[0])
    return detected_group_features

In [18]:
test_data = make_test_data (report_data= report_data, look_up_table= look_up_table)

In [7]:
group_features = pd.read_pickle ('../data/processed/model1/processed_group_features.pkl')
processed_technique_features = pd.read_pickle ('../data/processed/model1/processed_technnique_features.pkl')
train_labels =pd.read_pickle ('../data/processed/model1/processed_train_labels.pkl')


In [19]:
import numpy as np
pos_y = train_labels[train_labels['label'] == 1]
train_interaction_count = pos_y['group_ID'].value_counts()
scaler = StandardScaler()
scaler.fit (train_interaction_count.values.reshape (-1,1))
scaler.transform(np.array(train_interaction_count.mean()).reshape(1, -1)).item()

0.0

In [20]:
detected_techniques = ['T1078', 'T1047', 'T1059', 'T1059.001', 'T1059.003', 'T1059.001', 'T1059.003']
settings = {
    'initial_interaction' : 'min'
}

res = build_detected_group_profile (processed_group_features= group_features,
                                    processed_technique_features = processed_technique_features ,
                                    detected_techniques= detected_techniques[0:6], threshold=10,train_labels= train_labels,group_id='yah', settings=settings)
res

Unnamed: 0,group_ID,input_group_software_id,input_group_tactics,input_group_interaction_rate
0,yah,"[s0002, s0029, s0039, s0154, s0097, s0363, s0100]","[defense_evasion, defense_evasion, defense_eva...",-1.094657
