## 모듈 import

In [1]:
import os
import glob
import json
import pprint

import numpy as np
import pandas as pd

from lightgbm import LGBMClassifier

from tqdm import tqdm

from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier

from sklearn.metrics import accuracy_score
from sklearn.pipeline import Pipeline
from sklearn.feature_selection import RFE
from sklearn.feature_extraction import FeatureHasher

In [2]:
# Seed handed to the estimators as ``random_state`` (see ``train`` and
# ``select_feature``) so repeated runs build the same models.
SEED = 41

def read_label_csv(path):
    """Load a ``file name -> integer label`` mapping from a label CSV.

    The first line is treated as a header and skipped. Blank lines are
    ignored, and each remaining line is split on its LAST comma so that a
    file name containing commas is still parsed.

    :param path: path of the CSV file (decoded as ISO-8859-1)
    :return: dict mapping each file name to its label as ``int``
    :raises ValueError: if a data line has no comma or a non-integer label
    """
    label_table = {}
    with open(path, "r", encoding='ISO-8859-1') as f:
        # Skip the header row; the default keeps an empty file from raising.
        next(f, None)
        for line in f:
            line = line.strip()
            if not line:
                # A trailing or stray blank line carries no record.
                continue
            fname, label = line.rsplit(",", 1)
            label_table[fname] = int(label)
    return label_table

def read_json(path):
    """Parse the JSON document stored at *path* and return the result.

    The file is decoded explicitly as UTF-8 so that parsing does not depend
    on the platform's locale encoding.

    :param path: path of the JSON file
    :return: the decoded JSON value (whatever ``json.load`` produces)
    """
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

def load_model(**kwargs):
    """Create an untrained classifier for the algorithm named by ``model``.

    Keyword arguments:
        model: short algorithm name -- "rf", "dt", "lgb", "svm", "lr",
            "knn", "adaboost" or "mlp".
        random_state: seed forwarded to every estimator except "knn".

    :return: the new estimator, or ``None`` (after printing a message) when
        the name is not one of the supported ones
    """
    requested = kwargs["model"]

    # Each entry is built lazily, so "random_state" is only looked up for the
    # algorithm that is actually selected.
    factories = (
        ("rf", lambda: RandomForestClassifier(random_state=kwargs["random_state"], n_jobs=4)),
        ("dt", lambda: DecisionTreeClassifier(random_state=kwargs["random_state"])),
        ("lgb", lambda: LGBMClassifier(random_state=kwargs["random_state"])),
        ("svm", lambda: SVC(random_state=kwargs["random_state"])),
        ("lr", lambda: LogisticRegression(random_state=kwargs["random_state"], n_jobs=-1)),
        ("knn", lambda: KNeighborsClassifier(n_jobs=-1)),
        ("adaboost", lambda: AdaBoostClassifier(random_state=kwargs["random_state"])),
        ("mlp", lambda: MLPClassifier(random_state=kwargs["random_state"])),
    )
    for name, build in factories:
        if requested == name:
            return build()

    print("Unsupported Algorithm")
    return None
    

def train(X_train, y_train, model):
    '''
        Build the requested machine-learning model and fit it.

        :param X_train: 2-D list of feature vectors to learn from
        :param y_train: 1-D list of labels, one per feature vector
        :param model: string naming the algorithm (see ``load_model``)
        :return: the fitted model object
        :raises ValueError: if ``model`` is not a name ``load_model`` supports
    '''
    clf = load_model(model=model, random_state=SEED)
    if clf is None:
        # load_model signals an unknown name with None; fail here with a
        # clear message instead of an AttributeError on ``None.fit``.
        raise ValueError(f"Unsupported model name: {model!r}")
    clf.fit(X_train, y_train)
    return clf


def evaluate(X_test, y_test, model):
    '''
        Score a trained model on the validation data and print the result.

        The value printed is whatever ``model.score`` returns; no separate
        prediction pass is run.

        :param X_test: 2-D list of feature vectors to validate on
        :param y_test: 1-D list of labels, one per feature vector
        :param model: trained model object exposing ``score(X, y)``
    '''
    print("정확도", model.score(X_test, y_test))


## 레이블 테이블 로드

In [3]:
# File-name -> integer-label lookups, one for the training set and one for the
# validation set, each read from its answer-key CSV.
label_table = read_label_csv("/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/학습데이터_정답.csv")
check_label_table = read_label_csv("/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/검증데이터_정답.csv")

## 특징 벡터 생성 예시
- PEMINER 정보는 모두 수치형 데이터이므로 특별히 가공을 하지 않고 사용 가능
- EMBER, PESTUDIO 정보는 가공해서 사용해야 할 특징들이 있음 (e.g. imports, exports 등의 문자열 정보를 가지는 데이터)
- 수치형 데이터가 아닌 데이터(범주형 데이터)를 어떻게 가공할지가 관건 >> 인코딩 (e.g. 원-핫 인코딩, 레이블 인코딩 등)

In [4]:
class PeminerParser:
    """Turn one PEMINER JSON report into a flat feature vector."""

    def __init__(self, path):
        self.vector = []
        self.report = read_json(path)

    def process_report(self):
        '''
            Use every entry of the report: the values are emitted in
            ascending key order and also stored on ``self.vector``.
        '''
        ordered_keys = sorted(self.report)
        self.vector = [self.report[key] for key in ordered_keys]
        return self.vector

In [5]:
class EmberParser:
    '''
        Build a flat numeric feature vector from one EMBER JSON report.

        ``process_report`` currently combines the general, histogram and
        string groups. The header / imports / exports helpers are ready to be
        appended as well; using more features than the example does is
        recommended. Every ``get_*`` helper returns a flat 1-D list so the
        pieces can be concatenated with ``+=``.
    '''
    def __init__(self, path):
        self.report = read_json(path)
        self.vector = []

    def get_histogram_info(self):
        '''
            Return the report's ``histogram`` entry scaled to sum to 1.

            An all-zero histogram is returned as zeros rather than NaNs.
        '''
        histogram = np.asarray(self.report["histogram"], dtype=float)
        total = histogram.sum()
        if total > 0:
            histogram = histogram / total
        return histogram.tolist()

    def get_string_info(self):
        '''
            Return the scalar string statistics followed by ``printabledist``
            divided by the number of printable characters.
        '''
        strings = self.report["strings"]

        # Avoid dividing by zero when the report counts no printable characters.
        hist_divisor = float(strings['printables']) if strings['printables'] > 0 else 1.0
        vector = [
            strings['numstrings'],
            strings['avlength'],
            strings['printables'],
            strings['entropy'],
            strings['paths'],
            strings['urls'],
            strings['registry'],
            strings['MZ'],
        ]
        vector += (np.asarray(strings['printabledist']) / hist_divisor).tolist()
        return vector

    def get_general_file_info(self):
        '''
            Return the fields of the ``general`` section in a fixed order.
        '''
        general = self.report["general"]
        fields = (
            'size', 'vsize', 'has_debug', 'exports', 'imports',
            'has_relocations', 'has_resources', 'has_signature', 'has_tls',
            'symbols',
        )
        return [general[name] for name in fields]

    # Helpers for the header, section, imports and exports parts of the
    # EMBER JSON report.
    def get_header_file_info(self):
        '''
            Return the COFF timestamp followed by selected optional-header
            fields.

            'machine' and 'characteristics' (COFF) and 'subsystem',
            'dll_characteristics' and 'magic' (optional header) are left out,
            as they were disabled in the original selection.
            NOTE(review): presumably because they are not plain numbers --
            confirm and encode them before adding them here.
        '''
        headers = self.report['header']
        optional_fields = (
            'major_image_version', 'minor_image_version',
            'major_linker_version', 'minor_linker_version',
            'major_operating_system_version', 'minor_operating_system_version',
            'major_subsystem_version', 'minor_subsystem_version',
            'sizeof_code', 'sizeof_headers', 'sizeof_heap_commit',
        )
        vector = [headers['coff']['timestamp']]
        vector += [headers['optional'][name] for name in optional_fields]
        return vector

    # def get_section_info(self):
    #     section = self.report['section']
    #     sections = self.report['sections']
    #     section_entry = [
    #         len(sections), # total number of sections
    #         # number of sections with zero size
    #         sum(1 for s in sections if s['size'] == 0),
    #         # number of sections with an empty name
    #         sum(1 for s in sections if s['name'] == ""),
    #     ]
    #     sections_names = [s['name'] for s in sections]
    #     sections_sizes = [(s['name'], s['size']) for s in sections]
    #     sections_entropy = [(s['name'], s['entropy']) for s in sections]
    #     sections_vsize = [(s['name'], s['vsize']) for s in sections]
    #     sections_props = [p for s in sections for p in s['props'] if s['name'] == section['entry']]
    #     vector = [
    #         section_entry, sections_names, sections_sizes, sections_entropy, sections_vsize, sections_props
    #     ]
    #     return vector

    def get_imports_info(self):
        '''
            Return hashed import features as one flat list: 256 values for
            the lower-cased library names followed by 1024 values for the
            ``library:function`` pairs.
        '''
        imports = self.report["imports"]
        libraries = sorted({lib.lower() for lib in imports})
        libraries_hashed = FeatureHasher(256, input_type="string").transform([libraries]).toarray()[0]
        import_function = [lib.lower() + ':' + e for lib, elist in imports.items() for e in elist]
        import_function_hashed = FeatureHasher(1024, input_type="string").transform([import_function]).toarray()[0]
        # Flatten to plain numbers so the result can be appended to a vector.
        return libraries_hashed.tolist() + import_function_hashed.tolist()

    def get_exports_info(self):
        '''
            Return the export names hashed into a flat list of 128 values.
        '''
        exports = self.report["exports"]
        exports_hashed = FeatureHasher(128, input_type="string").transform([exports]).toarray()[0]
        return exports_hashed.tolist()

    # def get_datadirectories(self):
    #     datadirectories = self.report["datadirectories"]
    #     names = [
    #         "EXPORT_TABLE", "IMPORT_TABLE", "RESOURCE_TABLE", "EXCEPTION_TABLE", "CERTIFICATE_TABLE",
    #         "BASE_RELOCATION_TABLE", "DEBUG", "ARCHITECTURE", "GLOBAL_PTR", "TLS_TABLE", "LOAD_CONFIG_TABLE",
    #         "BOUND_IMPORT", "IAT", "DELAY_IMPORT_DESCRIPTOR", "CLR_RUNTIME_HEADER"
    #     ]

    def process_report(self):
        '''
            Concatenate the enabled feature groups into one flat list, store
            it on ``self.vector`` and return it.
        '''
        vector = []
        vector += self.get_general_file_info()
        vector += self.get_histogram_info()
        vector += self.get_string_info()
        # Additional feature groups can be enabled here.
        # vector += self.get_header_file_info()
        # vector += self.get_section_info()
        # vector += self.get_imports_info()
        # vector += self.get_exports_info()
        # vector += self.get_datadirectories()

        self.vector = vector
        return vector

In [6]:
class PestudioParser:
    '''
        Placeholder parser for PESTUDIO reports; selecting the features to
        use and vectorising them here is recommended.
    '''

    def __init__(self, path):
        self.vector = []
        self.report = read_json(path)

    def process_report(self):
        '''
            Not implemented yet: extracts nothing and returns ``None``.
        '''
        return None

## 데이터 벡터 구성
- 특징 벡터 구성은 2차원이 되어야 함 e.g. [vector_1, vector_2, ..., vector_n]

- 각 벡터는 1차원 리스트이며, 벡터 크기는 모두 같아야 함

### 학습데이터

In [7]:
# Training data directories
peminer_url = '/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/PEMINER/학습데이터/'
ember_url = '/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/EMBER/학습데이터/'
pestudio_url = '/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/PESTUDIO/학습데이터/'

# X: feature vectors (2-D list, one row per sample)
# y: labels (1-D list, aligned with X)
X, y = [], []

for fname in label_table:
    label = label_table[fname.split('.')[0]]
    feature_vector = []
    # PEMINER features come first, then EMBER; the PESTUDIO parser is not
    # wired in yet, so its directory contributes nothing.
    for data, parser_cls in ((peminer_url, PeminerParser), (ember_url, EmberParser)):
        path = f"{data}/{fname}.json"
        feature_vector += parser_cls(path).process_report()
    X.append(feature_vector)
    y.append(label)

np.asarray(X).shape, np.asarray(y).shape

((20000, 558), (20000,))

### 검증데이터

In [8]:
# Validation data directories
perminer_check_url = '/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/PEMINER/검증데이터'
ember_check_url = '/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/EMBER/검증데이터'
pestudio_check_url = '/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/PESTUDIO/검증데이터'

# t_X: feature vectors (2-D list, one row per sample)
# t_y: labels (1-D list, aligned with t_X)
t_X, t_y = [], []

for fname in check_label_table:
    label = check_label_table[fname.split('.')[0]]
    feature_vector = []
    # PEMINER features come first, then EMBER; the PESTUDIO parser is not
    # wired in yet, so its directory contributes nothing.
    for data, parser_cls in ((perminer_check_url, PeminerParser), (ember_check_url, EmberParser)):
        path = f"{data}/{fname}.json"
        feature_vector += parser_cls(path).process_report()
    t_X.append(feature_vector)
    t_y.append(label)

np.asarray(t_X).shape, np.asarray(t_y).shape

((10000, 558), (10000,))

### 학습 및 검증 - 전체 모델 비교 후 rf, lgb 모델 선택

In [9]:
# Training: fit one classifier per supported algorithm on the training set
model_name = ["rf", "dt", "lgb", "svm", "lr", "knn", "adaboost", "mlp"]
models = []
for name in tqdm(model_name):
    models.append(train(X, y, name))

# Validation: print each fitted model's score on the validation set
for fitted in tqdm(models):
    evaluate(t_X, t_y, fitted)

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
100%|██████████| 8/8 [02:46<00:00, 20.76s/it]
 12%|█▎        | 1/8 [00:00<00:03,  2.24it/s]

정확도 0.9428


 25%|██▌       | 2/8 [00:00<00:02,  2.78it/s]

정확도 0.9092


 38%|███▊      | 3/8 [00:01<00:02,  2.41it/s]

정확도 0.951


 50%|█████     | 4/8 [01:02<01:38, 24.54s/it]

정확도 0.8269


 62%|██████▎   | 5/8 [01:03<00:47, 15.88s/it]

정확도 0.8233


 75%|███████▌  | 6/8 [01:13<00:27, 13.92s/it]

정확도 0.9045


 88%|████████▊ | 7/8 [01:14<00:09,  9.66s/it]

정확도 0.8938


100%|██████████| 8/8 [01:14<00:00,  9.35s/it]

정확도 0.8202





In [10]:
# Training: fit only the random forest and LightGBM classifiers
models = []
for name in tqdm(["rf", "lgb"]):
    models.append(train(X, y, name))

# Validation: print each fitted model's score on the validation set
for fitted in tqdm(models):
    evaluate(t_X, t_y, fitted)

100%|██████████| 2/2 [00:09<00:00,  4.57s/it]
 50%|█████     | 1/2 [00:00<00:00,  2.76it/s]

정확도 0.9428


100%|██████████| 2/2 [00:00<00:00,  2.39it/s]

정확도 0.951





### 앙상블 - rf, lgb

In [11]:
def ensemble_result(X, y, models):
    '''
        Combine the trained models by soft voting and print the accuracy.

        :param X: 2-D list of feature vectors to validate on
        :param y: 1-D list of labels, one per feature vector
        :param models: 1-D list holding one or more trained model objects
    '''

    # Soft Voting
    # https://devkor.tistory.com/entry/Soft-Voting-%EA%B3%BC-Hard-Voting
    # Collect, per model, the second predict_proba column; it is treated as
    # the score for label 1 by the threshold below.
    second_column_scores = []
    for model in tqdm(models):
        second_column_scores.append([score for _, score in model.predict_proba(X)])

    averaged = np.mean(second_column_scores, axis=0)
    voted = [int(score >= 0.5) for score in averaged]

    print("정확도", accuracy_score(y, voted))

In [12]:
# Soft-vote the models currently held in ``models`` on the validation set and
# print the resulting accuracy.
ensemble_result(t_X, t_y, models)

100%|██████████| 2/2 [00:00<00:00,  4.72it/s]

정확도 0.9524





## 특징 선택 예제 (RFE 알고리즘 사용)

In [13]:
def select_feature(X, y, model):
    '''
        Select features from the given feature vectors with an
        algorithm-based selector.

        This example uses the RFE algorithm:
        https://scikit-learn.org/stable/modules/generated/sklearn.feature_selection.RFE.html#sklearn.feature_selection.RFE.fit_transform

        :param X: 2-D list of feature vectors
        :param y: 1-D list of labels, one per feature vector
        :param model: string naming the algorithm RFE uses as its estimator
        :return: the result of ``RFE.fit_transform(X, y)``
    '''

    estimator = load_model(model=model, random_state=SEED)
    return RFE(estimator=estimator).fit_transform(X, y)

In [14]:
# Reduce the training features with RFE, using a random forest as the estimator.
selected_X = select_feature(X, y, "rf")

In [None]:
# Fit a random forest on the RFE-reduced training features.
# NOTE(review): select_feature returns only the transformed matrix, not the
# fitted RFE object, so t_X cannot be reduced the same way to evaluate
# new_model -- confirm whether that is intended.
new_model = train(selected_X, y, "rf")

In [None]:
# Training data directories
peminer_url = '/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/PEMINER/학습데이터/'
ember_url = '/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/EMBER/학습데이터/'
pestudio_url = '/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/PESTUDIO/학습데이터/'

# Validation data directories
perminer_check_url = '/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/PEMINER/검증데이터'
ember_check_url = '/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/EMBER/검증데이터'
pestudio_check_url = '/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/PESTUDIO/검증데이터'

# X: feature vectors (2-D list, one row per sample)
# y: labels (1-D list, aligned with X)
X, y = [], []

# Training samples are appended first, then the validation samples, so the
# combined set keeps the same ordering as loading them one after the other.
labelled_sets = (
    (label_table, peminer_url, ember_url),
    (check_label_table, perminer_check_url, ember_check_url),
)
for table, peminer_dir, ember_dir in labelled_sets:
    for fname in table:
        label = table[fname.split('.')[0]]
        feature_vector = []
        # PEMINER features come first, then EMBER; PESTUDIO is not wired in yet.
        for data, parser_cls in ((peminer_dir, PeminerParser), (ember_dir, EmberParser)):
            path = f"{data}/{fname}.json"
            feature_vector += parser_cls(path).process_report()
        X.append(feature_vector)
        y.append(label)

np.asarray(X).shape, np.asarray(y).shape

In [None]:
# Training: fit the random forest and LightGBM classifiers on the current X, y
models = []
for name in tqdm(["rf", "lgb"]):
    models.append(train(X, y, name))

In [None]:
def get_testdata_filename(directory='/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/EMBER/테스트데이터'):
    """Return the names of the ``.json`` reports in *directory*, without the suffix.

    Entries that do not end in ``.json`` (for example ``.DS_Store``) are
    skipped, and the result is sorted so that repeated calls return the names
    in the same order regardless of how the OS lists the directory.

    :param directory: folder to scan; defaults to the EMBER test-data folder
    :return: sorted list of file names with the ``.json`` suffix removed
    """
    suffix = '.json'
    return sorted(
        entry[:-len(suffix)]
        for entry in os.listdir(directory)
        if entry.endswith(suffix)
    )

In [None]:
# Names of the test samples (a list of file names, not a label mapping --
# no labels are loaded for the test set).
test_label_table = get_testdata_filename()

In [None]:
# Test data directories
peminer_test_url = '/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/PEMINER/테스트데이터'
ember_test_url = '/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/EMBER/테스트데이터'
pestudio_test_url = '/Users/yeonsun/Documents/2021_InformationAndSystemSecurity/AI_Malware_Detect/데이터/PESTUDIO/테스트데이터'

# t_X: feature vectors (2-D list), in the same order as test_label_table
t_X = []


for fname in tqdm(test_label_table):
    feature_vector = []
    # PEMINER features come first, then EMBER; the PESTUDIO parser is not
    # wired in yet, so its directory contributes nothing.
    for data, parser_cls in ((peminer_test_url, PeminerParser), (ember_test_url, EmberParser)):
        path = f"{data}/{fname}.json"
        feature_vector += parser_cls(path).process_report()
    t_X.append(feature_vector)

np.asarray(t_X).shape

In [None]:
def ensemble_result(X, models):
    '''
        Combine the trained models by soft voting and return the labels.

        :param X: 2-D list of feature vectors to predict
        :param models: 1-D list holding one or more trained model objects
        :return: list of 0/1 predictions, one per row of X
    '''

    # Soft Voting
    # https://devkor.tistory.com/entry/Soft-Voting-%EA%B3%BC-Hard-Voting
    # Collect, per model, the second predict_proba column; it is treated as
    # the score for label 1 by the threshold below.
    second_column_scores = []
    for model in tqdm(models):
        second_column_scores.append([score for _, score in model.predict_proba(X)])

    averaged = np.mean(second_column_scores, axis=0)
    return [int(score >= 0.5) for score in averaged]

In [None]:
# Soft-vote the models in ``models`` on the test feature vectors; the result
# has one 0/1 entry per row of t_X.
predict = ensemble_result(t_X, models)

In [None]:
# Build the submission from the same file list that produced t_X
# (test_label_table) instead of listing the directory again, so each
# prediction stays paired with the file it was computed from. The DataFrame
# constructor also raises if the two sequences differ in length rather than
# padding the shorter one with NaN.
submission_csv = pd.DataFrame({"file": test_label_table, "predict": predict})
submission_csv.to_csv("predict.csv", index=False)