In [16]:
"""
2020 정보보호와시스템보안 악성코드 분류기
팀 : 2017
김민정(20171589)
오윤재(20171651)
""" 

import os
import glob
import json
import pprint
import csv

import numpy as np

from lightgbm import LGBMClassifier

from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier

from sklearn.metrics import accuracy_score
from sklearn.pipeline import Pipeline
from sklearn.feature_selection import RFE

In [17]:
SEED = 41

def read_label_csv(path):  # label_table 딕셔너리로 저장  # key(파일이름) : value(정답)
    label_table = dict()
    with open(path, "r", encoding='cp949') as f:
        for line in f.readlines()[1:]:  # 0번째 줄은 헤더이므로 그 다음 줄부터 읽음
            fname, label = line.strip().split(",")
            label_table[fname] = int(label)   # 파일 이름: 정답으로 딕셔너리 구성
    return label_table

def read_json(path):  # json 파일 읽어옴
    with open(path, "r") as f:
        return json.load(f)  
    
def load_model(**kwargs):  # 사용 가능한 모델들
    if kwargs["model"] == "rf":
        return RandomForestClassifier(random_state=kwargs["random_state"], n_jobs=4)
    elif kwargs["model"] == "dt":
        return DecisionTreeClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "lgb":
        return LGBMClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "svm":
        return SVC(random_state=kwargs["random_state"])
    elif kwargs["model"] == "lr":
        return LogisticRegression(random_state=kwargs["random_state"], n_jobs=-1)
    elif kwargs["model"] == "knn":
        return KNeighborsClassifier(n_jobs=-1)
    elif kwargs["model"] == "adaboost":
        return AdaBoostClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "mlp":
        return MLPClassifier(random_state=kwargs["random_state"])
    else:
        print("Unsupported Algorithm")
        return None
    

def train(X_train, y_train, model):  # 학습은 학습데이터_정답.csv로
    '''
        머신러닝 모델을 선택하여 학습을 진행하는 함수
	
        :param X_train: 학습할 2차원 리스트 특징벡터
        :param y_train: 학습할 1차원 리스트 레이블 벡터
        :param model: 문자열, 선택할 머신러닝 알고리즘
        :return: 학습된 머신러닝 모델 객체
    '''
    clf = load_model(model=model, random_state=SEED)  # 파라미터로 받아온 모델 불러옴
    clf.fit(X_train, y_train)  # 불러온 모델 통해 학습
    return clf  # 학습된 모델 반환


def evaluate(X_test, y_test, model):  # 평가는 검증데이터_정답.csv로
    '''
        학습된 머신러닝 모델로 검증 데이터를 검증하는 함수
	
        :param X_test: 검증할 2차원 리스트 특징 벡터
        :param y_test: 검증할 1차원 리스트 레이블 벡터
        :param model: 학습된 머신러닝 모델 객체
    '''
    predict = model.predict(X_test)  # 데이터 이용 예측값 출력
    print("정확도", model.score(X_test, y_test))  # 정확도 수치로 표현



## 레이블 테이블 로드

In [18]:
label_table = read_label_csv("학습데이터_정답.csv")
validation_label_table = read_label_csv("검증데이터_정답.csv")

## 특징 벡터 생성 예시
- PEMINER 정보는 모두 수치형 데이터이므로 특별히 가공을 하지 않고 사용 가능
- EMBER, PESTUDIO 정보는 가공해서 사용해야 할 특징들이 있음 (e.g. imports, exports 등의 문자열 정보를 가지는 데이터)
- 수치형 데이터가 아닌 데이터(범주형 데이터)를 어떻게 가공할 지가 관건 >> 인코딩 (e.g. 원핫인코딩, 레이블인코딩 등)

In [19]:
ember_path = "EMBER/학습데이터/000c4ae5e00a1d4de991a9decf9ecbac59ed5582f5972f05b48bc1a1fe57338a.json"
peminer_path = "PEMINER/학습데이터/000c4ae5e00a1d4de991a9decf9ecbac59ed5582f5972f05b48bc1a1fe57338a.json"

ember_result = read_json(ember_path)
peminer_result = read_json(peminer_path)

In [20]:
pprint.pprint(ember_result)

{'byteentropy': [51180,
                 4,
                 5,
                 1,
                 3,
                 4,
                 2,
                 0,
                 0,
                 0,
                 0,
                 0,
                 0,
                 0,
                 0,
                 1,
                 1965,
                 10,
                 14,
                 4,
                 3,
                 6,
                 2,
                 2,
                 1,
                 3,
                 4,
                 21,
                 5,
                 3,
                 3,
                 2,
                 1860,
                 8,
                 5,
                 102,
                 4,
                 4,
                 4,
                 6,
                 6,
                 5,
                 4,
                 8,
                 8,
                 8,
                 8,
                 8,
                 5460,
  

In [21]:
pprint.pprint(peminer_result)

{'ADVAP132.DLL': 0,
 'AWFAXP32.DLL': 0,
 'AWFXAB32.DLL': 0,
 'AWPWD32.DLL': 0,
 'AWRESX32.DLL': 0,
 'AWUTIL32.DLL': 0,
 'BHNETB.DLL': 0,
 'BHSUPP.DLL': 0,
 'CCAPI.DLL': 0,
 'CCEI.DLL': 0,
 'CCPSH.DLL': 0,
 'CCTN20.DLL': 0,
 'CMC.DLL': 0,
 'COMCTL32.DLL': 0,
 'COMDLG32.DLL': 0,
 'CRTDLL.DLL': 0,
 'DCIMAN.DLL': 0,
 'DCIMAN32.DLL': 0,
 'DSKMAINT.DLL': 0,
 'FileHeader.Characteristics': 8450,
 'FileHeader.Machine': 332,
 'FileHeader.NumberOfSections': 5,
 'FileHeader.NumberOfSymbols': 0,
 'FileHeader.PointerToSymbolTable': 0,
 'FileHeader.SizeOfOptionalHeader': 224,
 'FileHeader.TimeDateStamp': 1346994627,
 'GDI32.DLL': 0,
 'GROUP.DLL': 0,
 'HYPERTERM.DLL': 0,
 'KERNL32.DLL': 0,
 'LZ32.DLL': 0,
 'MAPI.DLL': 0,
 'MAPI32.DLL': 0,
 'MFC30.DLL': 0,
 'MPR.DLL': 0,
 'MSFS32.DLL': 0,
 'MSNDUI.DLL': 0,
 'MSNET32.DLL': 0,
 'MSPST32.DLL': 0,
 'MSSHRUI.DLL': 0,
 'MSVIEWUT.DLL': 0,
 'NAL.DLL': 0,
 'NDIS30.DLL': 0,
 'NETAPI.DLL': 0,
 'NETAPI32.DLL': 0,
 'NETBIOS.DLL': 0,
 'NETDI.DLL': 0,
 'NETSETUP.DLL'

In [24]:
class PeminerParser:  # Peminer에서 추가할 특징들
    def __init__(self, path):
        self.report = read_json(path)
        self.vector = []
    
    def process_report(self):
        '''
            전체 데이터 사용        
        '''
        
        self.vector = [value for _, value in sorted(self.report.items(), key=lambda x: x[0])]
        return self.vector
    

class EmberParser:  # Ember에서 추가할 특징들
    '''
        예제에서 사용하지 않은 특징도 사용하여 벡터화 할 것을 권장
    '''
    def __init__(self, path):
        self.report = read_json(path)
        self.vector = []
    
    def get_histogram_info(self):  # histogram 특징 추출
        histogram = np.array(self.report["histogram"])
        total = histogram.sum()
        vector = histogram / total   #평균값으로 특징 가공
        return vector.tolist()
    
    def get_string_info(self):  # strings 특징 추출
        strings = self.report["strings"]

        hist_divisor = float(strings['printables']) if strings['printables'] > 0 else 1.0 # printables의 크기가 0이상이면 실수형태로 저장
        vector = [
            strings['numstrings'], 
            strings['avlength'], 
            strings['printables'],
            strings['entropy'], 
            strings['paths'], 
            strings['urls'],
            strings['registry'], 
            strings['MZ']
        ]
        vector += (np.asarray(strings['printabledist']) / hist_divisor).tolist()  # printabledist의 평균값을 리스트로 저장
        return vector
    
    def get_general_file_info(self): # general 특징 추출
        general = self.report["general"]
        vector = [
            general['size'], general['vsize'], general['has_debug'], general['exports'], general['imports'],
            general['has_relocations'], general['has_resources'], general['has_signature'], general['has_tls'],
            general['symbols']
        ]
        return vector
    
#     def get_section_info(self):  # section 특징 추출 (-> 정확도 하향되어 사용하지 않음)
#         section = self.report["section"]
#         sections = section["sections"]
#         vector = []
#         tmp = []
#         total = 0
#         for i in range(len(sections)):  # section 중 엔트로피 틀징 추출
#             di = sections[i]
#             tmp.append(di["entropy"])
#             total += float(tmp[i])
#         length = len(sections)
#         if length == 0: length = 1
#         vector = [total/length]  # 엔트로피 평균으로 특징 가공
#         #print(vector)
#         return vector

    def process_report(self):
        vector = []
        vector += self.get_general_file_info()
        vector += self.get_histogram_info()
        vector += self.get_string_info()
        '''
            특징 추가
        '''
        return vector
    
class PestudioParser:  # Pestudio에서 추가할 특징들
    '''
        사용할 특징을 선택하여 벡터화 할 것을 권장
    '''
    
    def __init__(self, path):
        self.report = read_json(path)
        self.strings = self.report["image"]
        self.vector = []
        
#     def get_callbacks(self):  # 존재하면 1, 존재하지 않으면 0  # callbacks 특징 추출 -> 정확도 하양되어 사용하지 않음
#         call_vector = []
#         try :
#             callback = self.strings["tls-callbacks"]

#             if type(callback) == type(""):
#                 return [-1]
#             else:
#                 if callback["@hint"] == 'n/a':
#                     call_vector.append(0)
#                 elif callback["@hint"] != 'n/a':
#                     call_vector.append(1)

#                 try :
#                     if callback["#text"] == 'n/a':
#                         call_vector.append(0)
#                     elif callback["#text"] != 'n/a':
#                         call_vector.append(1)
#                 except KeyError:
#                     call_vector.append(-1)
                
#                 return call_vector
            
#             return call_vector
#         except KeyError:
#             call_vector = [-1, -1]
#             return call_vector
        
#         return call_vector
        
    def get_indicators(self):  # indicators에서 특징 추출
        indicators = self.strings["indicators"]
        hint = indicators["@hint"]  # hint 값 추출
        tmp = []
        for i in range(len(hint)):
            tmp.append(hint[i]);
        idx = 0
        for j in range(len(tmp)):
            if tmp[j] == "/":
                idx = j
                break
        num1 = "0"
        num2 = "1"
        try:
            for p in range(idx):
                num1 += tmp[p]
            for q in range(idx+1, len(tmp)):
                num2 += tmp[q]
            indicator_vector = [float(num1)/float(num2)]
        except ValueError:
            print(num1, num2)
            indicator_vector = [-1.0]  # hint 값이 올바르지 않다면 -1로 출력
        return indicator_vector
        
    def get_virustotal(self):  # virustotal에서 특징 추출 
        virustotal = self.strings["virustotal"]
        if type(virustotal) == type(""):
            return [-1]  # 해당 특징이 없을 경우 -1로 출력
        else:
            virustotal = self.strings["virustotal"]
            detect = float(virustotal["@detection"])
            total = float(len(virustotal["engines"]["engine"]))
            virustotal_vector = [detect/total]  # virus일 확률 특징
            return virustotal_vector
        
    
    

    def get_overview(self):  # overview 특징 추출 
        overview = self.strings["overview"]
        overview_vector = [
                         overview["size-without-overlay"],
                         overview["md5-without-overlay"],
                         overview["sha1-without-overlay"],
                         overview["sha256-without-overlay"]
        ]
        vector = []
        for i in range(len(overview_vector)):  # overlay 있으면 악성(1), 없으면 정상 파일(0)
            if i != 'n/a':
                vector.append(1)
            else:
                vector.append(0)
        return vector
    

    def process_report(self):
        vector = []
        vector += self.get_overview()
        #vector += self.get_indicators()
        vector += self.get_virustotal()
        #vector += self.get_optional_header()
        #vector += self.get_string()
        #vector += self.get_callbacks()
        #print(vector)
        vector += self.get_indicators()
        return vector

## 학습데이터 구성
- 특징 벡터 구성은 2차원이 되어야함 e.g.  [vector_1, vector_2, ..., vector_n]

- 각 벡터는 1차원 리스트, 벡터 크기는 모두 같아야함

In [25]:
# 데이터의 특징 벡터 모음(2차원 리스트) : X           # data 가공 필요 -> 크기 같도록, 2차원 리스트로
# 데이터의 레이블 모음(1차원 리스트) : y
X, y = [], []
X_test, y_test = [], []  # 검증할 데이터들 넣을 리스트
X_final, y_final = [], [] #  테스트 데이터 넣을 리스트
# PESTUDIO 추가
learnig_data_path = "./PEMINER/학습데이터"
train_list = os.listdir(learnig_data_path)  # 파일 이름들 추가
for fname in train_list:
    feature_vector = []
    file_name = fname[:-5]
    #print(file_name)
    label = label_table[file_name]  # 파일마다 올바른 label 붙여줌
    for data in ["PEMINER", "EMBER", "PESTUDIO"]:
        path = f"{data}/학습데이터/{fname}"
        if data == "PEMINER":
            feature_vector += PeminerParser(path).process_report()

        elif data == "EMBER":
            feature_vector += EmberParser(path).process_report()

        elif data == "PESTUDIO":
            if os.path.isfile(path):  # Pestudio 파일이 존재할 때만 특징 벡터 추가
                feature_vector += PestudioParser(path).process_report()
            else:
                feature_vector += [-1,-1,-1,-1, -1.0, -1.0]

    X.append(feature_vector)
    y.append(label)

# 검증에 이용할 특징들 넣고(검증데이터에 있는 파일들) 특징을 가공해 넣음
verify_data_path = "./PEMINER/검증데이터"
test_list = os.listdir(verify_data_path)# 검증할 파일들 이름
for fname in test_list:
    feature_vector = []
    file_name = fname[:-5]
    #print(file_name)
    label = validation_label_table[file_name]  # 파일마다 올바른 label 붙여줌
    for data in ["PEMINER", "EMBER", "PESTUDIO"]:
        path = f"{data}/검증데이터/{fname}"
        if data == "PEMINER":
            feature_vector += PeminerParser(path).process_report()

        elif data == "EMBER":
            feature_vector += EmberParser(path).process_report()

        elif data == "PESTUDIO":
            if os.path.isfile(path):  # Pestudio 파일이 존재할 때만 특징 벡터 추가
                feature_vector += PestudioParser(path).process_report()
            else:
                feature_vector += [-1,-1,-1,-1, -1.0, -1.0]  # 파일이 존재하지 않을 때 추가되는 특징 개수 길이만큼의 -1 원소 리스트로 반환

    X_test.append(feature_vector)
    y_test.append(label)
    
    
final_data_path = "./PEMINER/테스트데이터"
final_list = os.listdir(final_data_path)  # 파일 이름들 추가

for fname in final_list:
    feature_vector = []
    file_name = fname[:-5]

    for data in ["PEMINER", "EMBER", "PESTUDIO"]:
        path = f"{data}/테스트데이터/{fname}"
        if data == "PEMINER":
            feature_vector += PeminerParser(path).process_report()

        elif data == "EMBER":
            feature_vector += EmberParser(path).process_report()

        elif data == "PESTUDIO":
            if os.path.isfile(path):  # Pestudio 파일이 존재할 때만 특징 벡터 추가
                feature_vector += PestudioParser(path).process_report()
            else:
                feature_vector += [-1,-1,-1,-1, -1.0, -1.0]

    X_final.append(feature_vector)
    y_final.append([])
    

np.asarray(X).shape, np.asarray(y).shape
np.asarray(X_test).shape, np.asarray(y_test).shape
np.asarray(X_final).shape, np.asarray(y_final).shape

((10000, 564), (10000, 0))

## 학습 및 검증

In [26]:
# 학습
models = []
for model in ["rf", "lgb"]:  # 각 모델로 학습했을 때, 가장 높은 정확도 보인 모델 2개 사용
    clf = train(X, y, model)  # x_tain, y_train, model
    models.append(clf)  # clf는 학습된 모델
print(models)
# 검증
# 실제 검증 시에는 제공한 검증데이터를 검증에 사용해야 함
for model in models:  #학습된 모델로 검증데이터 평가
    evaluate(X_test, y_test, model)  # x_test, y_test, model


[RandomForestClassifier(n_jobs=4, random_state=41), LGBMClassifier(random_state=41)]
정확도 0.9455
정확도 0.9611


## 앙상블 예제

In [27]:
def ensemble_result(X, y, models):  # 검증할 때, 각 모델드링 가진 결과를 합쳐서 최종 결과 도출하도록 만듬
    '''
        학습된 모델들의 결과를 앙상블하는 함수
	
        :param X: 검증할 2차원 리스트 특징 벡터
        :param y: 검증할 1차원 리스트 레이블 벡터
        :param models: 1개 이상의 학습된 머신러닝 모델 객체를 가지는 1차원 리스트
    '''
    
    # Soft Voting
    # https://devkor.tistory.com/entry/Soft-Voting-%EA%B3%BC-Hard-Voting
    predicts = []
    for model in models:
        prob = [result for _, result in model.predict_proba(X)]  # 확률의 형태로 결과값 넣음
        predicts.append(prob)
    
    predict = np.mean(predicts, axis=0)
    predict = [1 if x >= 0.5 else 0 for x in predict]
    with open('predict.csv', 'w', encoding='utf-8') as f: # csv파일 제작
        wr = csv.writer(f)
        # 테이블 헤더를 출력
        columnNames = ["file", "predict"]
        wr.writerow(columnNames)
        for rowIdx in range(len(final_list)):
            finame = final_list[rowIdx]
            row = [finame[:-5], predict[rowIdx]]
            wr.writerow(row)
    
    #print("정확도", accuracy_score(y, predict))  # y와 predict를 비교해서 예측값 계산

In [28]:
def ensemble_result_score(X, y, models):  # 검증할 때, 각 모델드링 가진 결과를 합쳐서 최종 결과 도출하도록 만듬
    '''
        학습된 모델들의 결과를 앙상블하는 함수
	
        :param X: 검증할 2차원 리스트 특징 벡터
        :param y: 검증할 1차원 리스트 레이블 벡터
        :param models: 1개 이상의 학습된 머신러닝 모델 객체를 가지는 1차원 리스트
    '''
    
    # Soft Voting
    # https://devkor.tistory.com/entry/Soft-Voting-%EA%B3%BC-Hard-Voting
    predicts = []
    for model in models:
        prob = [result for _, result in model.predict_proba(X)]  # 확률의 형태로 결과값 넣음
        predicts.append(prob)
    
    predict = np.mean(predicts, axis=0)
    predict = [1 if x >= 0.5 else 0 for x in predict]

    print("정확도", accuracy_score(y, predict))  # y와 predict를 비교해서 예측값 계산 --> 정확도 출력

In [29]:
ensemble_result(X_final, y_final, models) # 테스트데이터 검사 후 csv 파일 생성
ensemble_result_score(X_test, y_test, models) # 검증데이터 기반 정확도 확인

정확도 0.9624


## 특징 선택 예제 (RFE 알고리즘 사용)

In [276]:
def select_feature(X, y, model):
    '''
        주어진 특징 벡터에서 특정 알고리즘 기반 특징 선택
        
        본 예제에서는 RFE 알고리즘 사용
        https://scikit-learn.org/stable/modules/generated/sklearn.feature_selection.RFE.html#sklearn.feature_selection.RFE.fit_transform
        
        :param X: 검증할 2차원 리스트 특징 벡터
        :param y: 검증할 1차원 리스트 레이블 벡터
        :param model: 문자열, 특징 선택에 사용할 머신러닝 알고리즘
    '''
    
    model = load_model(model=model, random_state=SEED)
    rfe = RFE(estimator=model)
    return rfe.fit_transform(X, y)

In [None]:
selected_X = select_feature(X, y, "rf")
new_model = train(selected_X, y, "rf")

print(selected_X)
print(new_model)