In [None]:
import copy
import csv
import warnings
from typing import Dict, Iterator, List, Set, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# Suppress every warning for the rest of the session.
# NOTE(review): this also hides pandas warnings raised by later cells.
warnings.filterwarnings('ignore')
# A combination is a tuple of integer flags (see the `combinations` list further down).
Combination = Tuple[int, ...]

In [None]:
# Input files.
# NOTE(review): all paths are absolute Windows paths; adjust them before running on another machine.
SOURCE_POWERSHELL_DATASET_FILENAME: str = r"D:\Obfuscation\data\datasets\POWERSHELL_DATASET.csv"
TOKENIZER_POWERSHELL_DATASET_FILENAME: str = r"D:\Obfuscation\data\datasets\TOKENIZER_DATASET_UPDATED.csv"
WRONG_COMMANDS_FILENAME: str = r"D:\Obfuscation\data\WRONG_COMMANDS.csv"

# Output files, one per sampling limit (150, 200 and 2000 objects per combination).
BALANCED_OBF_METHODS_RATIO_150_FILENAME: str = r"D:\Obfuscation\data\datasets\balanced\BALANCED_OBF_METHODS_RATIO_150_DATASET.csv"
BALANCED_OBF_METHODS_RATIO_200_FILENAME: str = r"D:\Obfuscation\data\datasets\balanced\BALANCED_OBF_METHODS_RATIO_200_DATASET.csv"
BALANCED_OBF_METHODS_RATIO_2000_FILENAME: str = r"D:\Obfuscation\data\datasets\balanced\BALANCED_OBF_METHODS_RATIO_2000_DATASET.csv"

In [None]:
# Half-open range [lower, upper) of source_df column positions holding the obfuscation-method flags.
OBF_METHODS_LOWER_INDEX: int = 2  # inclusive (positions are 0-based)
OBF_METHODS_UPPER_INDEX: int = 7  # exclusive
# Column of the wrong-commands CSV that stores the row number.
WRONG_COMMANDS_ROW_NUMBER_INDEX: int = 0
# Subtracted from a CSV row number to get the 0-based row position in source_df.
# Presumably 1 for the header line plus 1 for 1-based numbering — TODO confirm.
WRONG_COMMANDS_ROW_NUMBER_DIFFERENCE: int = 2  # between row_number in csv file and real index

In [None]:
# Load both datasets; the source file uses the default comma separator,
# while the tokenizer file is semicolon-separated.
source_df = pd.read_csv(SOURCE_POWERSHELL_DATASET_FILENAME)
tokenizer_df = pd.read_csv(TOKENIZER_POWERSHELL_DATASET_FILENAME, sep=';')

In [None]:
# Quick look at the source dataset: (rows, columns) followed by the first rows.
print(source_df.shape)
source_df.head()

In [None]:
# Quick look at the tokenizer dataset: (rows, columns) followed by the first rows.
print(tokenizer_df.shape)
tokenizer_df.head()

In [None]:
# Names of the source_df columns at positions [OBF_METHODS_LOWER_INDEX, OBF_METHODS_UPPER_INDEX).
obf_methods: List[str] = source_df.columns[OBF_METHODS_LOWER_INDEX:OBF_METHODS_UPPER_INDEX].to_list()

In [None]:
# Show the selected obfuscation-method column names.
obf_methods

In [None]:
def get_data(filename: str, sep: str = ',') -> Iterator[List[str]]:
    """Lazily yield the rows of a delimited text file.

    This is a generator: the file is opened on the first ``next()`` call and
    stays open until the generator is exhausted or garbage-collected.

    Args:
        filename: Path of the file to read (decoded as UTF-8).
        sep: Single-character field delimiter.

    Yields:
        One list of string fields per record, in file order.
    """
    # newline="" leaves line-ending handling to the csv module, as its
    # documentation requires; otherwise line breaks embedded in quoted
    # fields are rewritten by universal-newline translation.
    with open(filename, encoding="UTF8", newline="") as file:
        yield from csv.reader(file, delimiter=sep)

In [None]:
def get_wrong_command_indexes(filename: str, has_header: bool = True, sep: str = ',') -> Set[int]:
    """Collect the 0-based positions of the commands listed in a CSV file.

    Each record's row-number cell (column ``WRONG_COMMANDS_ROW_NUMBER_INDEX``)
    is converted to an index by subtracting
    ``WRONG_COMMANDS_ROW_NUMBER_DIFFERENCE``.  Reading stops at the first
    empty record or at the first record whose row-number cell is not made of
    digits only; everything after that point is ignored.

    Args:
        filename: Path of the CSV file to read.
        has_header: When true, the first record is skipped.
        sep: Field delimiter of the file.

    Returns:
        The set of computed indexes.
    """
    records = get_data(filename=filename, sep=sep)
    if has_header:
        next(records, None)  # discard the header record
    collected: Set[int] = set()
    for record in records:
        if not record:
            break
        row_number_cell: str = record[WRONG_COMMANDS_ROW_NUMBER_INDEX]
        if not row_number_cell.isdigit():
            break
        collected.add(int(row_number_cell) - WRONG_COMMANDS_ROW_NUMBER_DIFFERENCE)
    return collected

In [None]:
# Row positions read from the wrong-commands file; these rows are excluded from sampling below.
wrong_commands_indexes: Set[int] = get_wrong_command_indexes(WRONG_COMMANDS_FILENAME)

In [None]:
# Every row position of source_df that is not listed as a wrong command, in ascending order.
correct_commands_indexes: List[int] = [index for index in np.arange(len(source_df)) if index not in wrong_commands_indexes]

In [None]:
# Number of excluded rows, followed by the excluded row positions themselves.
print(len(wrong_commands_indexes))
wrong_commands_indexes

In [None]:
# Number of rows that remain after excluding the wrong commands.
len(correct_commands_indexes)

In [None]:
# Bar chart: number of valid rows for each distinct combination of obfuscation-method flags.
source_df.iloc[correct_commands_indexes, :][obf_methods].value_counts(sort=False).plot.bar()
None  # the last expression is None, so the cell displays only the figure

In [None]:
# Per-combination caps; one balanced dataset is produced for each value.
limits: Tuple[int, int, int] = (150, 200, 2000)
    
# Hand-written list of flag combinations, in lexicographic order.
# The trailing comments name the methods whose flag is set; judging by them the
# positions mean (short, variables, encoding, string, symbol).
combinations: List[Combination] = [
    (0, 0, 0, 0, 0),  # ""
    (0, 0, 0, 0, 1),  # "symbol"
    (0, 0, 0, 1, 0),  # "string"
    (0, 0, 0, 1, 1),  # "string, symbol"
    (0, 0, 1, 0, 0),  # "encoding"
    (0, 0, 1, 1, 0),  # "encoding, string"
    (0, 1, 0, 0, 0),  # "variables"
    (0, 1, 0, 0, 1),  # "variables, symbol"
    (0, 1, 0, 1, 0),  # "variables, string"
    (0, 1, 0, 1, 1),  # "variables, string, symbol"
    (0, 1, 1, 0, 0),  # "variables, encoding"
    (0, 1, 1, 1, 0),  # "variables, encoding, string"
    (0, 1, 1, 1, 1),  # "variables, encoding, string, symbol"
    (1, 0, 0, 0, 0),  # "short"
    (1, 0, 1, 0, 0),  # "short, encoding"
]
    
# Number of valid rows per distinct combination of flag values found in the data;
# sort=False means the result is not ordered by frequency.
counts: pd.Series = source_df.iloc[correct_commands_indexes, :][obf_methods].value_counts(sort=False)

In [None]:
# Show the number of valid rows per flag combination.
counts

In [None]:
# For every sampling limit, cap the number of valid rows of each combination at
# that limit.  The keys come from the index of `counts` itself, so every quota
# is attached to the combination it was counted for, even when the data holds
# fewer or more combinations than expected (pairing a separate hand-written
# list with `counts` by position would silently misalign in that case).
limits_groups_per_combinations: Dict[str, Dict[Combination, int]] = {
    f"Combinations with maximum {limit} objects":
    {tuple(combination): min(count, limit) for combination, count in counts.items()}
    for limit in limits
}

In [None]:
# Print the quota of every combination for each limit, plus the total number of
# objects each balanced dataset is expected to contain.
for key, limits_per_combinations in limits_groups_per_combinations.items():
    expected_total_objects: int = 0
    print(key)
    for combination, count in limits_per_combinations.items():
        print(f"{combination}: {count}")
        expected_total_objects += count
    print(f"Expected total objects quantity = {expected_total_objects}.\n")

In [None]:
# Module-level generator with a fixed seed.  It is shared by every call of
# generate_random_obj_indexes, so the result of a call depends on how many
# numbers were drawn before it.
rnd = np.random.RandomState(0)

# limits_per_combinations stores only correct(not wrong) command's quantity
def generate_random_obj_indexes(source_df: pd.DataFrame, limits_per_combinations: Dict[Combination, int], wrong_commands_indexes: Set[int], step: int = 50) -> Dict[int, Combination]:
    """Randomly pick row positions of ``source_df`` until every quota is filled.

    Starting before the first row, the position repeatedly jumps forward by a
    random amount between 1 and ``step`` rows (wrapping around at the end of
    the frame).  A visited row is kept when it is not a wrong command, has not
    been picked yet and the quota of its flag combination is not exhausted.

    Args:
        source_df: Frame whose columns at positions
            [OBF_METHODS_LOWER_INDEX, OBF_METHODS_UPPER_INDEX) hold the flags.
        limits_per_combinations: Number of rows to pick per combination.
            The mapping is not modified.
        wrong_commands_indexes: Row positions that must never be picked.
        step: Largest forward jump between two visited rows.

    Returns:
        Mapping from each picked row position to the combination of that row.

    Raises:
        ValueError: If a quota is larger than the number of eligible rows
            having that combination (the search could never finish).
    """
    # Work on a private copy so the caller's quotas are left untouched.
    remaining: Dict[Combination, int] = dict(limits_per_combinations)

    # Read the flag columns once instead of slicing the frame on every step.
    flag_rows = source_df.iloc[:, OBF_METHODS_LOWER_INDEX:OBF_METHODS_UPPER_INDEX].to_numpy().tolist()
    masks: List[Combination] = [tuple(flags) for flags in flag_rows]

    # Refuse quotas that cannot be met; this also covers an empty frame.
    available: Dict[Combination, int] = {}
    for position, mask in enumerate(masks):
        if position not in wrong_commands_indexes:
            available[mask] = available.get(mask, 0) + 1
    for combination, quota in remaining.items():
        if quota > available.get(combination, 0):
            raise ValueError(
                f"Requested {quota} objects for combination {combination}, "
                f"but only {available.get(combination, 0)} eligible rows have it."
            )

    hash_map: Dict[int, Combination] = {}  # obj index: combination of obf methods for this object
    summa: int = sum(remaining.values())
    total_rows: int = len(masks)
    index: int = -1
    while summa > 0:
        index = rnd.randint(index + 1, index + step + 1) % total_rows
        if index in wrong_commands_indexes:
            continue
        combination = masks[index]
        # .get() treats a combination without a quota as "nothing to pick".
        if index not in hash_map and remaining.get(combination, 0) > 0:
            remaining[combination] -= 1
            summa -= 1
            hash_map[index] = combination
    return hash_map

In [None]:
def get_tokenizer_obj_indexes_difference(sorted_wrong_commands_indexes: List[int], correct_command_index: int, wrong_commands_pointer: int = 0) -> int:
    """Count the consecutive wrong indexes that do not exceed a given index.

    Scanning starts at ``wrong_commands_pointer`` and stops at the first
    element that is greater than ``correct_command_index``.

    Args:
        sorted_wrong_commands_indexes: Wrong-command indexes in ascending order.
        correct_command_index: Index the wrong indexes are compared against.
        wrong_commands_pointer: Position in the list where scanning starts.

    Returns:
        How many elements were passed before the scan stopped.
    """
    total: int = len(sorted_wrong_commands_indexes)
    position: int = wrong_commands_pointer
    while position < total and sorted_wrong_commands_indexes[position] <= correct_command_index:
        position += 1
    return position - wrong_commands_pointer

In [None]:
def generate_dataframe(source_df: pd.DataFrame, tokenizer_df: pd.DataFrame, limits_per_combinations: Dict[Combination, int], wrong_commands_indexes: Set[int], step: int = 50) -> pd.DataFrame:
    """Build one balanced dataset from randomly picked rows.

    Rows are picked from ``source_df`` with ``generate_random_obj_indexes``.
    Each picked position is translated to a ``tokenizer_df`` position by
    subtracting the number of wrong-command indexes that do not exceed it.
    NOTE(review): this assumes ``tokenizer_df`` holds the rows of
    ``source_df`` without the wrong commands, in the same order — confirm.

    Args:
        source_df: Frame with the ``command_obfuscated`` column and the flags.
        tokenizer_df: Frame the output rows are taken from.
        limits_per_combinations: Number of rows to pick per combination.
        wrong_commands_indexes: ``source_df`` positions that are never picked.
        step: Largest forward jump used by the random picker.

    Returns:
        A new frame with ``command_obfuscated`` as the first column, the
        selected ``tokenizer_df`` columns, and ``obfuscation_methods_mask``
        as the last column.
    """
    obj_indexes: Dict[int, Combination] = generate_random_obj_indexes(
        source_df=source_df,
        limits_per_combinations=limits_per_combinations,
        wrong_commands_indexes=wrong_commands_indexes,
        step=step
    )
    # Ascending order is required by the moving pointer used below.
    picked_indexes: List[int] = sorted(obj_indexes)
    masks: List[Combination] = [obj_indexes[index] for index in picked_indexes]
    commands: List[str] = source_df["command_obfuscated"].iloc[picked_indexes].to_list()

    sorted_wrong_commands_indexes: List[int] = sorted(wrong_commands_indexes)
    wrong_commands_pointer: int = 0  # wrong indexes already passed
    tokenizer_indexes: List[int] = []
    for index in picked_indexes:
        wrong_commands_pointer += get_tokenizer_obj_indexes_difference(
            sorted_wrong_commands_indexes, index, wrong_commands_pointer
        )
        tokenizer_indexes.append(index - wrong_commands_pointer)

    # Explicit copy: the columns added below must never be written into a
    # selection that pandas still links to tokenizer_df.
    result_df: pd.DataFrame = tokenizer_df.iloc[tokenizer_indexes, :].copy()
    result_df.insert(0, "command_obfuscated", commands, allow_duplicates=False)
    result_df["obfuscation_methods_mask"] = masks
    return result_df

In [None]:
# One balanced dataframe per limit group, keyed by the group's description.
dataframes: Dict[str, pd.DataFrame] = {}

for key, limits_per_combinations in limits_groups_per_combinations.items():
    print(key)
    print("Генерация датафрейма началась.")
    # A deep copy is passed so that the quotas stored in
    # limits_groups_per_combinations cannot be modified by the call.
    dataframe: pd.DataFrame = generate_dataframe(
        source_df=source_df,
        tokenizer_df=tokenizer_df,
        limits_per_combinations=copy.deepcopy(limits_per_combinations),
        wrong_commands_indexes=wrong_commands_indexes
    )
    print("Генерация датафрейма завершена!")
    dataframes[key] = dataframe

In [None]:
# Print (rows, columns) of every generated dataframe.
for dataframe in dataframes.values():
    print(dataframe.shape)

In [None]:
def save_dataframes_to_csv(dataframes: List[pd.DataFrame], filenames: List[str]) -> bool:
    """Write each dataframe to the CSV file at the same list position.

    Files are written without the index column.

    Args:
        dataframes: Frames to save.
        filenames: Target paths, one per frame.

    Returns:
        ``False`` (nothing is written) when the two lists differ in length,
        ``True`` after all frames have been written.
    """
    lengths_match: bool = len(dataframes) == len(filenames)
    if lengths_match:
        for position, frame in enumerate(dataframes):
            frame.to_csv(filenames[position], index=False)
    return lengths_match

In [None]:
# Dataframes in insertion order, i.e. the order of `limits` (150, 200, 2000);
# the filenames below are listed in the same order.
dataframes_list: List[pd.DataFrame] = list(dataframes.values())
filenames: List[str] = [
    BALANCED_OBF_METHODS_RATIO_150_FILENAME,
    BALANCED_OBF_METHODS_RATIO_200_FILENAME,
    BALANCED_OBF_METHODS_RATIO_2000_FILENAME,
]

In [None]:
# Write the datasets; the displayed value is False if the two lists differ in length.
save_dataframes_to_csv(dataframes=dataframes_list, filenames=filenames)