In [55]:
import numpy as np
import pandas as pd
import os
import csv, json
import math
from PIL import Image

from collections import Counter

In [57]:
WINDOW_SIZE = 1024
STEP_SIZE = 512

In [58]:
def calculate_entropy(byte_stream:np.ndarray):
    total_count = np.sum(byte_stream)

    P_X = byte_stream / total_count

    P_X_nonzero = P_X[P_X > 0]
    H_X = -np.sum(P_X_nonzero * np.log2(P_X_nonzero))

    P_H_given_X = P_X * H_X
    P_H_X = P_X * P_H_given_X

    return P_H_X

# deprecated
def get_byte_entropy(byte_array:np.ndarray):
    return calculate_entropy(byte_array)

In [59]:
SIZE = 16

def export_image(filename:str, pixel1d:np.ndarray, size:int=SIZE):
    pixel1d_int = np.round(pixel1d, 0).tolist()
    pixel2d = [pixel1d_int[i * size:(i + 1) * size] for i in range(size)]
    try:
        final_image = Image.fromarray(np.array(pixel2d, dtype=np.uint8), 'L')
        final_image.save(filename)
    except Exception as e:
        print(f"Error while saving image '{filename}'")
        print(e)
        print(pixel1d)
        print()

In [60]:
def normalize_minmax(arr:np.ndarray)->np.ndarray:
    min_val = np.min(arr)
    max_val = np.max(arr)
    if min_val == max_val:
        return np.zeros_like(arr)
    else:
        return (arr - min_val) / (max_val - min_val)

In [61]:
def ensure_dirpath(exportdir):
    os.makedirs(exportdir, exist_ok=True)

def ensure_filepath(exportfile):
    dirpath, _ = os.path.split(exportfile)
    if dirpath:
        os.makedirs(dirpath, exist_ok=True)

def opencsv(filename:str, mode:str='r'):
    return open(filename, mode, encoding='utf-8', newline='')

In [98]:
def process_dataset(dataset, proc_bytestream:callable, *, on_normalize=None, export_csvfile, export_directory='./export'):
    '''
    proc_bytestream: (byte_stream:List[int]) -> List[int] : 들어오는 바이트스르림 처리
    on_normalize: (data:List[int]) -> List[int] : 처리된 데이터를 정규화 수행
    '''
    ensure_dirpath(export_directory)
    ensure_filepath(export_csvfile)

    if on_normalize is None:
        on_normalize = lambda x: x
    
    with opencsv(export_csvfile, 'w') as f:
        writer = csv.writer(f)
        writer.writerow(['index', 'path', 'filename', 'bytes'])
        
        for i, byte_stream_str in enumerate(dataset['bytes']):
            byte_stream = json.loads(byte_stream_str)

            processed = proc_bytestream(byte_stream)
            assert type(processed) == np.ndarray
            assert processed.shape == (256,)
            
            normalized = on_normalize(processed)
            assert normalized.shape == (256,)
            assert type(normalized) == np.ndarray

            pixels_json = json.dumps(normalized.tolist())
            writer.writerow([i, dataset['path'][i], dataset['filename'][i], pixels_json])
            export_image(os.path.join(export_directory, f'dataset_{i}.png'), normalized)

def get_entire_entropy_converter():
    '''
    전체 바이트 스트림에 대해 entropy hisrogram 계산 함수 반환
    '''
    def on_convert(byte_stream:list[int]):
        counter = Counter(byte_stream)
        bc = [counter[i] for i in range(0, 256)]
        return calculate_entropy(bc)
    return on_convert

def get_sliding_window_entropy_converter(window_size:int=WINDOW_SIZE, step_size:int=STEP_SIZE):
    '''
    지역 바이트 스트림에 대해 각각 계산후 평균을 내 entropy hisrogram 도출 함수 반환
    '''
    def on_convert(byte_stream:list[int]):
        local_entropies = []
        for i in range(0, len(byte_stream), step_size):
            counter = Counter(byte_stream[i:i+window_size])
            bc = [counter[i] for i in range(0, 256)]
            
            local_entropies.append(calculate_entropy(bc))
        
        entropies = np.array(local_entropies)
        return np.mean(entropies, axis=0)
    return on_convert

def get_frequency_converter():
    def on_convert(byte_stream:list[int]):
        counter = Counter(byte_stream)
        frequency = [counter[i] for i in range(0, 256)]
        return np.array(frequency)
    return on_convert

def on_normalize(data:list[int]):
    # Min-Max 정규화
    normalized = normalize_minmax(data)
    # 로그 스케일 변환
    normalized = np.log(normalized + 1) / np.log(2) * 255.0
    return normalized

In [96]:
EXPORT_PATH = '../export'
EXPORT_NORMAL_PATH = os.path.join(EXPORT_PATH, 'normal')
EXPORT_ATTACK_PATH = os.path.join(EXPORT_PATH, 'attack')

In [115]:
def run(dataset_path:list[str], base_export_path:str):
    def getpath(*target):
        return os.path.join(base_export_path, *target)
    for path in dataset_path:
        dataset = pd.read_csv(path)
        dataset_name , _= os.path.splitext(os.path.split(path)[1])
        print(dataset_name)
        
        print(' - entire entropy')
        # 전체 스트림에 대한 entropy histogram
        process_dataset(dataset,
                        get_entire_entropy_converter(),
                        on_normalize=on_normalize,
                        export_csvfile=getpath(f'global_entropy_{dataset_name}.csv'),
                        export_directory=getpath('global_entropy', dataset_name),
                        )
        print(' - entropy by sliding')
        # 슬라이딩 윈도우를 적용한 entropy histogram
        process_dataset(dataset,
                        get_sliding_window_entropy_converter(WINDOW_SIZE, STEP_SIZE),
                        on_normalize=on_normalize,
                        export_csvfile=getpath(f'local_entropy_{dataset_name}.csv'),
                        export_directory=getpath('local_entropy', dataset_name),
                        )
        print(' - frequency')
        # 별도의 처리없이 바로 빈도수를 통한 entropy 계산
        process_dataset(dataset,
                        get_frequency_converter(),
                        on_normalize=on_normalize,
                        export_csvfile=getpath(f'frequency_{dataset_name}.csv'),
                        export_directory=getpath('freqeuncy', dataset_name),
                        )

In [116]:
'''
공격 데이터셋
'''
dataset_path = []
dataset_path.append('../dataset/attack_decoded.csv')
dataset_path.append('../dataset/attack_original.csv')

dataset_path

['../dataset/attack_decoded.csv', '../dataset/attack_original.csv']

In [117]:
run(dataset_path, os.path.join(EXPORT_PATH, 'attack'))

attack_decoded
 - entire entropy
 - entropy by sliding
 - frequency
attack_original
 - entire entropy
 - entropy by sliding
 - frequency


In [18]:
'''
노멀 데이터셋
'''
DATASET_BASE_PATH = '../dataset/CSV'
dataset_path = []

for dirpath, _, filenames in os.walk(DATASET_BASE_PATH):
    dataset_path.extend([os.path.join(dirpath, f) for f in filenames])

dataset_path

['../dataset/CSV\\Github.csv',
 '../dataset/CSV\\GithubGist.csv',
 '../dataset/CSV\\invokeCradleCrafter.csv',
 '../dataset/CSV\\InvokeObfuscation.csv',
 '../dataset/CSV\\IseSteroids.csv',
 '../dataset/CSV\\PoshCode.csv',
 '../dataset/CSV\\PowerShellGallery.csv',
 '../dataset/CSV\\Random.csv',
 '../dataset/CSV\\Technet.csv']

In [119]:
run(dataset_path, os.path.join(EXPORT_PATH, 'normal'))

attack_decoded
 - entire entropy
 - entropy by sliding
 - frequency
attack_original
 - entire entropy
 - entropy by sliding
 - frequency


In [128]:
def validate_byte_histogram_csv(export_path:str):
    for dirpath, _, filenames in os.walk(export_path):
        for filename in filenames:
            filepath = os.path.join(dirpath, filename)
            if filename.endswith('.csv'):
                print('target:', filepath)
                try:
                    df = pd.read_csv(filepath)
                    for jsondata in df['bytes']:
                        data = json.loads(jsondata)
                        assert len(data) == 256
                    print(filepath)
                except AssertionError as e:
                    print('  Validation failed:', filepath)
                    print(e)
                except Exception as e:
                    print(f"  Error while validating '{filepath}'")
                    print(e)
                print()

In [129]:
validate_byte_histogram_csv(EXPORT_PATH)

../export\attack\frequency_attack_decoded.csv
../export\attack\frequency_attack_original.csv
../export\attack\global_entropy_attack_decoded.csv
../export\attack\global_entropy_attack_original.csv
../export\attack\local_entropy_attack_decoded.csv
../export\attack\local_entropy_attack_original.csv
../export\normal\frequency_attack_decoded.csv
../export\normal\frequency_attack_original.csv
../export\normal\global_entropy_attack_decoded.csv
../export\normal\global_entropy_attack_original.csv
../export\normal\local_entropy_attack_decoded.csv
../export\normal\local_entropy_attack_original.csv


In [169]:
np.log(1) / np.log(2)

0.0