In [38]:
import yaml
import random

In [88]:
# Create single attack in dictionary format
def generate_single_attack_dict(name:str, attack_type:str, start:int, end:int, target:str, tags:list=None):
    attack_dict = {
        'name': name,
        'type': attack_type,
        'trigger': {
            'type': 'time',
            'start': start,
            'end': end
        },
        'tags': tags,
        'target': target
    }
    
    return attack_dict

In [89]:
# Choice of tag to involve in the selected attack based on attack_type and target 
def draw_attacked_tag_list(attack_type, target, tags_config):
    tags = []
    if attack_type == 'DOS':
        return None
    
    elif attack_type == 'MITM':
        # Get only one tag (modify to ask for multiple tags)
        n_sampels = 1
        sampled_tags = random.sample(tags_config[target], n_sampels)
        
        for tag in sampled_tags:
            var_name = tag['tag']
            prop = random.choice(tag['property'])
            value = random.choice(tag['value'])
        
            final_tag = {'tag': var_name, 'property': prop, 'value': value}
            tags.append(final_tag)
        
        return tags
        
    else:
        raise Exception("Wrong attack type!")

In [95]:
# Build daily attacks 
def build_daily_attacks_list(attack_types:list, targets:list, tags_config:dict, presence_chance:float, 
                             max_occurrences:int, episode_length:int, durations:list):
    daily_attacks = []
    
    presence_probability = random.uniform(0, 1)
    
    if presence_probability < presence_chance:
        n_attacks = random.randint(1, max_occurrences)

        last_timestep = 0
        for i in range(n_attacks):
            name = 'attack_' + str(i)
            
            # Sample attack type
            attack_type = random.choice(attack_types)
            
            # Sample attack trigger
            duration = random.choice(durations)
            ith_lower_bound = (episode_length // n_attacks) * i
            lower_bound =ith_lower_bound if ith_lower_bound * i < last_timestep else last_timestep
            upper_bound = (episode_length // n_attacks) * i + (episode_length // n_attacks)
            start_time = random.randint(lower_bound, upper_bound - duration - 1)
            end_time = start_time + duration
            last_timestep = end_time
            
            # Sample attack target
            target = random.choice(targets)
            
            # Sample attacked tags based on type and target
            tags = draw_attacked_tag_list(attack_type, target, tags_config)
            
            # Append the attack to the list of daily attacks
            daily_attacks.append(generate_single_attack_dict(name=name, 
                                                             attack_type=attack_type, 
                                                             start=start_time, 
                                                             end=end_time, 
                                                             target=target, 
                                                             tags=tags))
    return daily_attacks
        

In [96]:
# Generate all the required yaml files, storing weekly attacks and
def generate_weeks_under_attack(n_weeks:int, attack_types:list, targets:list, tags_config:dict, presence_chance:float, 
                                max_occurrences:int, episode_length:int, durations:list, where_to_store:str):
    yaml_dict = {}
    
    for week in range(n_weeks):
        attacks_list = build_daily_attacks_list(attack_types, targets, tags_config, presence_chance, max_occurrences,
                                               episode_length, durations)
        
        if attacks_list:
            #yaml_path = where_to_store + 'rand_attacks_' + str(week) + '.yaml'
            #with open(yaml_path, 'w') as fout:
            #    yaml.dump(attacks_list, fout)
            yaml_dict[week] = attacks_list
        
        else:
            yaml_dict[week] = None    
    
    index_file = 'scheduled_attacks_index.yaml'
    with open(index_file, 'w') as fout:
        yaml.dump(yaml_dict, fout)

In [98]:
n_weeks = 156
attack_types = ['MITM', 'DOS']
targets = ['PLC2', 'PLC3']
tags_config = {'PLC2': [{'tag': 'T41', 'property': ['pressure'], 'value': [0.1, 10]}],
        'PLC3': [{'tag': 'T42', 'property': ['pressure'], 'value': [0.1, 10]}]}
max_occurrences = 6
durations = [28800, 43200, 57600, 86400]
episode_length = 604800
presence_chance = 0.5
where_to_store = 'scheduled_attacks/'


generate_weeks_under_attack(n_weeks=n_weeks, 
                            attack_types=attack_types, 
                            targets=targets, 
                            tags_config=tags_config,
                            presence_chance=presence_chance, 
                            max_occurrences=max_occurrences, 
                            episode_length=episode_length, 
                            durations=durations, 
                            where_to_store=where_to_store)