In [1]:
import os
import json
import csv

import numpy as np

from lightgbm import LGBMClassifier

from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier

from sklearn.metrics import accuracy_score
from sklearn.feature_selection import RFE

from tqdm import tqdm
from sklearn.feature_extraction import FeatureHasher
from sklearn.feature_extraction import FeatureHasher

# 데이터 파일 경로: 본인의 컴퓨터에 맞게 수정
file_path = "/Users/bumseok/workspace/2021_Information_protection/Project 2/데이터"

In [2]:
SEED = 41

# helper function for read csv file
def read_label_csv(path):
    label_table = dict()
    with open(path, "r", encoding="ISO-8859-1") as f:
        for line in f.readlines()[1:]:
            fname, label = line.strip().split(",")
            label_table[fname] = int(label)
    return label_table


# helper function for read json file
def read_json(path):
    with open(path, "r") as f:
        return json.load(f)


# helper function for test model with these 8 algorithm
def load_model(**kwargs):
    if kwargs["model"] == "rf":
        return RandomForestClassifier(random_state=kwargs["random_state"], n_jobs=4)
    elif kwargs["model"] == "dt":
        return DecisionTreeClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "lgb":
        return LGBMClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "svm":
        return SVC(random_state=kwargs["random_state"], probability=True)
    elif kwargs["model"] == "lr":
        return LogisticRegression(
            random_state=kwargs["random_state"], n_jobs=-1, max_iter=25000
        )
    elif kwargs["model"] == "knn":
        return KNeighborsClassifier(n_jobs=-1)
    elif kwargs["model"] == "adaboost":
        return AdaBoostClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "mlp":
        return MLPClassifier(random_state=kwargs["random_state"])
    else:
        print("Unsupported Algorithm")
        return None


def train(X_train, y_train, model):
    """
    머신러닝 모델을 선택하여 학습을 진행하는 함수

    :param X_train: 학습할 2차원 리스트 특징벡터
    :param y_train: 학습할 1차원 리스트 레이블 벡터
    :param model: 문자열, 선택할 머신러닝 알고리즘
    :return: 학습된 머신러닝 모델 객체
    """
    clf = load_model(model=model, random_state=SEED)
    clf.fit(X_train, y_train)
    return clf


def evaluate(X_test, y_test, model):
    """
    학습된 머신러닝 모델로 검증 데이터를 검증하는 함수

    :param X_test: 검증할 2차원 리스트 특징 벡터
    :param y_test: 검증할 1차원 리스트 레이블 벡터
    :param model: 학습된 머신러닝 모델 객체
    """
    predict = model.predict(X_test)
    print(f"{model}정확도", model.score(X_test, y_test))


## 특징 벡터 생성 예시
- PEMINER 정보는 모두 수치형 데이터이므로 특별히 가공을 하지 않고 사용 가능
- EMBER, PESTUDIO 정보는 가공해서 사용해야 할 특징들이 있음 (e.g. imports, exports 등의 문자열 정보를 가지는 데이터)
- 수치형 데이터가 아닌 데이터(범주형 데이터)를 어떻게 가공할 지가 관건 >> 인코딩 (e.g. 원핫인코딩, 레이블인코딩 등)

In [3]:
from sklearn.feature_extraction import FeatureHasher


class PeminerParser:
    def __init__(self, path):
        self.report = read_json(path)
        self.vector = []

    def process_report(self):
        """
        전체 데이터 사용
        """

        self.vector = [
            value for _, value in sorted(self.report.items(), key=lambda x: x[0])
        ]
        return self.vector


class EmberParser:
    def __init__(self, path):
        self.report = read_json(path)
        self.vector = []

    def get_general_file_info(self):
        general = self.report["general"]

        parseList = [
            "size",
            "vsize",
        ]

        vector = []

        for i in parseList:
            vector.append(general[i])

        return vector

    ##############################
    # 직접 추가한 특징 추출 함수 #
    ##############################

    # import하는 라이브러리등 feature 해시로 추출
    def get_imports_info(self):
        data = self.report["imports"]
        vector = []
        libraries = list(set([l.lower() for l in data]))
        vector.extend(
            FeatureHasher(256, input_type="string").transform([libraries]).toarray()[0]
        )  # libraries_hashed

        imports = [lib.lower() + ":" + e for lib, elist in data.items() for e in elist]
        vector.extend(
            FeatureHasher(1024, input_type="string").transform([imports]).toarray()[0]
        )  # imports_hashed

        return vector

    # 악성코드의 경우 export를 많이 할것으로 예상됨
    def get_exports_info(self):
        exports = self.report["exports"]
        vector = []
        vector.extend(
            FeatureHasher(128, input_type="string").transform([exports]).toarray()[0]
        )  # exports_hashed
        return vector

   
    def process_report(self):
        vector = []
        vector += self.get_general_file_info()
    
        """
            특징 추가
        """
        vector += self.get_general_file_info()
        vector += self.get_exports_info()
        vector += self.get_imports_info()
        return vector


class PestudioParser:
    def __init__(self, path):
        try:
            self.report = read_json(path)
        except:
            self.report = None
        self.vector = []

    """
        사용할 특징을 선택하여 벡터화 할 것을 권장
    """
    
    def get_image_info(self):
        return [float(self.report["image"]["overview"]["entropy"])]

    # libraries의 blacklist = "x"인 비율
    def get_libraries_info(self):
        try:
            libraries = self.report["image"]["libraries"]
        except KeyError:
            return [-1]
        if libraries == "n/a" or len(libraries) == 1:
            return [0]
        try:
            vector = [
                sum(1 for i in libraries["library"] if i["@blacklist"] == "x")
                / len(libraries["library"])
            ]
        except TypeError:
            return [1 if libraries["library"]["@blacklist"] == "x" else 0]
        return vector

    # imports의 blacklist = "x"인  비율
    def get_imports_info(self):
        try:
            imports = self.report["image"]["imports"]
        except KeyError:
            return [-1]
        if imports == "n/a" or len(imports) == 1:
            return [0]
        try:
            vector = [
                sum(1 for i in imports["import"] if i["@blacklist"] == "x")
                / len(imports["import"])
            ]
        except TypeError:
            return [1 if imports["import"]["@blacklist"] == "x" else 0]
        return vector

    # exports의 export 요소 개수 추출
    def get_exports_info(self):
        try:
            exports = self.report["image"]["exports"]
        except KeyError:
            return [-1]
        return [0 if exports == "n/a" else len(exports["export"])]

    # 인증서가 있으면 0 없다면 1
    def get_certificate_info(self):
        try:
            cert = self.report["image"]["certificate"]
            return [0 if cert == "n/a" else 1]
        except:
            return [0]

    def process_report(self):
        vector = []
        if self.report != None:
            vector += self.get_image_info()
            vector += self.get_libraries_info()
            vector += self.get_imports_info()
            vector += self.get_exports_info()
            vector += self.get_certificate_info()
        else:
            for _ in range(5):
                vector.append(-1)

        return vector


## 학습데이터 구성
- 특징 벡터 구성은 2차원이 되어야함 e.g.  [vector_1, vector_2, ..., vector_n]

- 각 벡터는 1차원 리스트, 벡터 크기는 모두 같아야함

In [4]:
# 데이터의 특징 벡터 모음(2차원 리스트) : X
# 데이터의 레이블 모음(1차원 리스트) : y
label_table = read_label_csv(f"{file_path}/학습데이터_정답.csv")

X, y = [], []
for fname in tqdm(list(label_table.keys())):
    feature_vector = []
    label = label_table[fname.split(".")[0]]
    for data in ["PEMINER", "EMBER", "PESTUDIO"]:
        path = f"{file_path}/{data}/학습데이터/{fname}.json"

        if data == "PEMINER":
            feature_vector += PeminerParser(path).process_report()
        elif data == "EMBER":
            feature_vector += EmberParser(path).process_report()
        elif data == "PESTUDIO":
            try:
                feature_vector += PestudioParser(path).process_report()
            except FileNotFoundError:
                pass
            
    X.append(feature_vector)
    y.append(label)

np.asarray(X).shape, np.asarray(y).shape

100%|██████████| 20000/20000 [02:04<00:00, 161.07it/s]


((20000, 1605), (20000,))

## 검증데이터 구성

In [5]:
# 데이터의 특징 벡터 모음(2차원 리스트) : X
# 데이터의 레이블 모음(1차원 리스트) : y
val_label_table = read_label_csv(f"{file_path}/검증데이터_정답.csv")

X_val, y_val = [], []
for fname in tqdm(list(val_label_table.keys())):
    feature_vector = []
    label = val_label_table[fname.split(".")[0]]
    for data in ["PEMINER", "EMBER", "PESTUDIO"]:
        path = f"{file_path}/{data}/검증데이터/{fname}.json"

        if data == "PEMINER":
            feature_vector += PeminerParser(path).process_report()
        elif data == "EMBER":
            feature_vector += EmberParser(path).process_report()
        elif data == "PESTUDIO":
            try:
                feature_vector += PestudioParser(path).process_report()
            except FileNotFoundError:
                pass

            # print(path)
    X_val.append(feature_vector)
    y_val.append(label)

np.asarray(X_val).shape, np.asarray(y_val).shape

100%|██████████| 10000/10000 [00:59<00:00, 167.29it/s]


((10000, 1605), (10000,))

## 학습 및 검증

In [6]:
models = []
model_list = ["rf", "dt", "lgb", "svm", "lr", "knn", "adaboost", "mlp"]
for model in tqdm(model_list):  # 여기에 dt lgb svm lr knn adaboost mlp 추가해서 모델 설정
    clf = train(X, y, model)
    models.append(clf)

# 검증
# 실제 검증 시에는 제공한 검증데이터를 검증에 사용해야 함
for model in tqdm(models):
    evaluate(X_val, y_val, model)

100%|██████████| 8/8 [50:37<00:00, 379.69s/it]
 12%|█▎        | 1/8 [00:02<00:15,  2.26s/it]

RandomForestClassifier(n_jobs=4, random_state=41)정확도 0.9573


 25%|██▌       | 2/8 [00:04<00:12,  2.03s/it]

DecisionTreeClassifier(random_state=41)정확도 0.9339


 38%|███▊      | 3/8 [00:05<00:07,  1.52s/it]

LGBMClassifier(random_state=41)정확도 0.9582


 50%|█████     | 4/8 [02:59<04:39, 69.93s/it]

SVC(probability=True, random_state=41)정확도 0.8269


 62%|██████▎   | 5/8 [03:00<02:15, 45.04s/it]

LogisticRegression(max_iter=25000, n_jobs=-1, random_state=41)정확도 0.8413


 75%|███████▌  | 6/8 [03:16<01:10, 35.17s/it]

KNeighborsClassifier(n_jobs=-1)정확도 0.9028


 88%|████████▊ | 7/8 [03:18<00:24, 24.38s/it]

AdaBoostClassifier(random_state=41)정확도 0.9055


100%|██████████| 8/8 [03:19<00:00, 25.00s/it]

MLPClassifier(random_state=41)정확도 0.4605





In [7]:
# 학습
models = []
for model in tqdm(["rf", "lgb"]):  # 여기에 dt lgb svm lr knn adaboost mlp 추가해서 모델 설정
    clf = train(X, y, model)
    models.append(clf)

# 검증
# 실제 검증 시에는 제공한 검증데이터를 검증에 사용해야 함
for model in tqdm(models):
    evaluate(X_val, y_val, model)

100%|██████████| 2/2 [00:08<00:00,  4.33s/it]
 50%|█████     | 1/2 [00:01<00:01,  2.00s/it]

RandomForestClassifier(n_jobs=4, random_state=41)정확도 0.9573


100%|██████████| 2/2 [00:02<00:00,  1.44s/it]

LGBMClassifier(random_state=41)정확도 0.9582





## 앙상블 예제

In [8]:
def ensemble_result(X, y, models):
    """
    학습된 모델들의 결과를 앙상블하는 함수

    :param X: 검증할 2차원 리스트 특징 벡터
    :param y: 검증할 1차원 리스트 레이블 벡터
    :param models: 1개 이상의 학습된 머신러닝 모델 객체를 가지는 1차원 리스트
    """

    # Soft Voting
    # https://devkor.tistory.com/entry/Soft-Voting-%EA%B3%BC-Hard-Voting
    predicts = []
    for i in tqdm(range(len(X))):
        probs = []
        for model in models:
            prob = model.predict_proba(X)[i][1]
            probs.append(prob)
        predict = 1 if np.mean(probs) >= 0.5 else 0
        predicts.append(predict)

    print("정확도", accuracy_score(y, predicts))


In [9]:
ensemble_result(X_val, y_val, models)

100%|██████████| 10000/10000 [5:11:30<00:00,  1.87s/it]    

정확도 0.9615





## 특징 선택 예제 (RFE 알고리즘 사용)

In [13]:
def select_feature(X, y, model):
    """
    주어진 특징 벡터에서 특정 알고리즘 기반 특징 선택

    본 예제에서는 RFE 알고리즘 사용
    https://scikit-learn.org/stable/modules/generated/sklearn.feature_selection.RFE.html#sklearn.feature_selection.RFE.fit_transform

    :param X: 검증할 2차원 리스트 특징 벡터
    :param y: 검증할 1차원 리스트 레이블 벡터
    :param model: 문자열, 특징 선택에 사용할 머신러닝 알고리즘
    """

    model = load_model(model=model, random_state=SEED)
    rfe = RFE(estimator=model)
    return rfe.fit_transform(X, y)

In [14]:
# 학습데이터에서 특징선택 알고리즘을 활용한 새로운 데이터 추출
selected_X = select_feature(X, y, "rf")
print("특징 추출 전 특징의 개수  : ", np.asarray(X_val).shape[1])
print("특징 추출 후 특징의 개수  : ", selected_X.shape[1])

특징 추출 전 특징의 개수  :  1605
특징 추출 후 특징의 개수  :  802


RFE를 통해 993개의 특징을 선택

In [15]:
# 추출된 학습데이터를 활용한 모델 학습
new_model = train(selected_X, y, "rf")

# 검증데이터에서 특징선택 알고리즘을 활용한 새로운 검증데이터 추출
selected_X_val = select_feature(X_val, y_val, "rf")

# 새로운 검증데이터를 활용한 새로운 모델 검증
evaluate(selected_X_val, y_val, new_model)

RandomForestClassifier(n_jobs=4, random_state=41)정확도 0.2904


# CSV 파일 추출 

In [16]:
X_test = []

l_test = os.listdir(f"{file_path}/EMBER/테스트데이터")

for fname in l_test:
    feature_vector = []
    for data in ["PEMINER", "EMBER", "PESTUDIO"]:
        path = f"{file_path}/{data}/테스트데이터/{fname}"

        if data == "PEMINER":
            feature_vector += PeminerParser(path).process_report()
        elif data == "EMBER":
            feature_vector += EmberParser(path).process_report()
        elif data == "PESTUDIO":
            try:
                feature_vector += PestudioParser(path).process_report()
            except FileNotFoundError:
                pass
    X_test.append(feature_vector)
    # y_valid.append(label)

np.asarray(X_test).shape
# 10000 이 맞는 출력

(10000, 1605)

In [17]:
def ensemble_test_result(X, models):
    test_predict = []
    predicts = []
    for model in models:
        prob = [result for _, result in model.predict_proba(X)]
        predicts.append(prob)

    predict = np.mean(predicts, axis=0)
    predict = [1 if x >= 0.5 else 0 for x in predict]

    with open("result.csv", "w", encoding="utf-8", newline="") as f:
        wr = csv.writer(f)
        wr.writerow(["file", "predict"])
        for a in range(10000):
            wr.writerow([os.path.splitext(l_test[a])[0], predict[a]])

    print(predict)

In [18]:
# 10000개의 데이터에 대한 csv 생성
ensemble_test_result(X_test,models)

[1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 0, 1, 0, 0, 1, 1, 1, 0, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 0, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 