# Malware Classification

+ https://github.com/danielgibert/mlw_classification_cnn_img

+ https://github.com/AFAgarap/malware-classification


## How to upload a file (Colaboratory)
![COLAB](image/LSTM1.png)

+ Click the folder button on the right side.
+ Upload `malimg.npz`; the data-loading cell below reads it from `./malimg.npz`.

In [None]:
from google.colab import files

# Open the Colab upload dialog; the result is keyed by uploaded file name.
uploaded = files.upload()

# Echo each received file with the length of its content so the upload can be checked.
for fn, payload in uploaded.items():
    message = 'User uploaded file "{name}" with length {length} bytes'.format(
        name=fn, length=len(payload))
    print(message)

In [None]:
import argparse
import os

import numpy as np
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score

import tensorflow as tf
from tensorflow.keras import datasets, layers, models
from tensorflow.keras import Model

# String identifiers; none of the visible cells reads them.
# NOTE(review): presumably hyper-parameter key names left over from a script version — confirm before removing.
BATCH_SIZE, CELL_SIZE = "batch_size", "cell_size"

In [None]:
# Ask TensorFlow which GPU devices it can see and report how many there are.
gpu_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpu_devices))

# Last expression of the cell, so the notebook displays the TensorFlow version.
tf.__version__

In [None]:
# Dataset of malware programs that were converted to images.
# allow_pickle=True permits NumPy to unpickle object arrays stored in the archive.
# NOTE(security): unpickling can execute arbitrary code — only load a malimg.npz from a trusted source.
dataset = np.load("./malimg.npz", allow_pickle=True)

In [None]:
def load_data(dataset, standardize=True):
    """Split the loaded archive into a feature matrix and a label vector.

    ``dataset["arr"]`` is read as a two-column table: column 0 holds one
    image array per sample and column 1 holds that sample's label.

    Args:
        dataset: Mapping-like object (for example the result of ``np.load``)
            whose ``"arr"`` entry is the two-column array described above.
        standardize: When true, rescale the flattened features with
            ``StandardScaler`` before returning them.

    Returns:
        A ``(features, labels)`` tuple. ``features`` is a ``float32`` array
        of shape ``(num_samples, values_per_sample)`` in which every image
        is flattened into one row; ``labels`` is the label column converted
        to a regular array.

    Note:
        The scaler is fitted on every sample passed in. If the result is
        split into train and test sets afterwards, the training features
        have been scaled with statistics that include the test samples.
        Pass ``standardize=False`` and scale after splitting when strict
        separation is needed.
    """
    # Read the table once and reuse it for both columns.
    table = dataset["arr"]

    # Column 0 is a column of per-sample arrays; turn it into one dense array.
    images = np.array(list(table[:, 0]))

    # Flatten everything after the sample axis. Using -1 instead of
    # shape[1] * shape[2] keeps 2-D images working and also accepts samples
    # of any other rank (e.g. images with a channel axis).
    features = images.reshape(images.shape[0], -1)

    if standardize:
        features = StandardScaler().fit_transform(features)
    features = features.astype(np.float32)

    labels = np.array(list(table[:, 1]))

    return features, labels

In [None]:
# Turn the dataset into features and labels; standardize defaults to True, so the features come back standardized.
features, labels = load_data(dataset=dataset)

In [None]:
# Expected (9339, 1024) per the original note: one flattened 32x32 grayscale image (1 channel) per row.
# The values are standardized float32 features at this point, not raw 0-255 pixel intensities.
features.shape

In [None]:
labels.shape    # one label per sample; (9339,) expected per the original note

In [None]:
# Malware family (category) names. A name's position in the list serves as its
# class id: integer labels are used to index into this list when captions and
# reports are produced.
MALWARE_FAMILIES = [
    "Adialer.C", "Agent.FYI", "Allaple.A", "Allaple.L", "Alueron.gen!J",
    "Autorun.K", "C2LOP.P", "C2LOP.gen!g", "Dialplatform.B", "Dontovo.A",
    "Fakerean", "Instantaccess", "Lolyda.AA1", "Lolyda.AA2", "Lolyda.AA3",
    "Lolyda.AT", "Malex.gen!J", "Obfuscator.AD", "Rbot!gen", "Skintrim.N",
    "Swizzor.gen!E", "Swizzor.gen!I", "VB.AT", "Wintrim.BX", "Yuner.A",
]

In [None]:
# Number of values per flattened sample (columns of the feature matrix).
num_features = features.shape[1]

# One output class per known malware family.
num_classes = len(MALWARE_FAMILIES)

# Fixed seed for the split: without it every run draws a different test set,
# so accuracy figures from different runs are not comparable.
SPLIT_SEED = 42

# Split the dataset 70/30; stratify keeps each family's share the same in both parts.
train_features, test_features, train_labels, test_labels = train_test_split(
    features, labels, test_size=0.30, stratify=labels, random_state=SPLIT_SEED
)

In [None]:
# Visual check of the dataset.
n = (10, 10)  # (rows, columns) of the preview grid

# Show n[0] x n[1] malware images, each captioned with its family name.
fig, ax = plt.subplots(n[0], n[1], figsize=(12, 10))

# ax.flat walks the grid row by row, so panel k displays training sample k.
for elem, panel in enumerate(ax.flat):
    # Each feature row is turned back into a 32x32 picture; the gray scale is pinned to [-1, 1].
    thumbnail = train_features[elem].reshape(32, 32)
    panel.imshow(thumbnail, cmap='gray', vmin=-1, vmax=1)
    panel.set(xticks=[], yticks=[], frame_on=False)
    panel.set(xlabel="{}".format(MALWARE_FAMILIES[train_labels[elem]]))
fig.tight_layout()

plt.show()

In [None]:
class CNNModel(Model):
    """Two-stage convolutional classifier for 32x32 single-channel images.

    Layout: Conv2D(36 filters, 5x5, ReLU) -> 2x2 max-pool ->
    Conv2D(72 filters, 5x5, ReLU) -> 2x2 max-pool -> flatten ->
    Dense(1024, ReLU) -> dropout -> Dense(num_classes).

    The last layer has no activation, so the model returns raw logits.

    Args:
        num_classes: Width of the output layer (one logit per class).
        drop_rate: Dropout rate applied to the 1024 hidden activations while
            training. The default of 0 leaves the activations untouched.
    """

    def __init__(self, num_classes, *, drop_rate=0):
        super().__init__()
        # No `input_shape` hint on the first layer: `call` reshapes every
        # batch to (-1, 32, 32, 1) itself, so the hint is not needed here.
        self.conv1 = layers.Conv2D(36, 5, activation='relu', padding="same")
        self.conv2 = layers.Conv2D(72, 5, activation='relu', padding="same")
        # The pooling layer has no weights, so one instance serves both stages.
        self.maxpool = layers.MaxPool2D((2, 2))
        self.flatten = layers.Flatten()
        self.dropout = layers.Dropout(drop_rate)
        self.d1 = layers.Dense(1024, activation='relu')
        self.d2 = layers.Dense(num_classes)

    def call(self, x, training=None):
        """Compute class logits for a batch of images.

        Args:
            x: Batch that can be reshaped to (-1, 32, 32, 1), for example
                rows of 1024 flattened pixel values.
            training: Forwarded to the dropout layer. True applies dropout,
                False disables it, None leaves the decision to Keras.

        Returns:
            Tensor with one row of ``num_classes`` logits per input image.
        """
        x = tf.reshape(x, [-1, 32, 32, 1])
        # convolution (36 filters, 5x5 kernel) + ReLU, then 2x2 max pooling
        x = self.conv1(x)
        x = self.maxpool(x)
        # convolution (72 filters, 5x5 kernel) + ReLU, then 2x2 max pooling
        x = self.conv2(x)
        x = self.maxpool(x)
        # dense layer (1024 units) + ReLU, followed by dropout
        x = self.flatten(x)
        x = self.d1(x)
        # Pass the flag explicitly rather than relying on implicit propagation
        # of the caller's `training` argument to nested layers.
        x = self.dropout(x, training=training)
        # output layer (num_classes units, no activation -> logits)
        x = self.d2(x)
        return x

In [None]:
# Objective to minimize: cross-entropy loss on integer class labels.
# from_logits=True tells the loss that it receives unnormalized scores rather than probabilities.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Optimization algorithm: Adam with its default settings.
optimizer = tf.keras.optimizers.Adam()

In [None]:
# Model evaluation (loss and accuracy).
# Running mean of the loss and accuracy over the training batches.
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# The same pair of accumulators for the test batches.
test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="test_accuracy")

In [None]:
@tf.function
def train_step(images, labels):
    """Run one optimization step on a batch and record its loss and accuracy.

    Works on the notebook-level ``model``, ``loss_object``, ``optimizer``,
    ``train_loss`` and ``train_accuracy`` objects.

    Args:
        images: Batch of input features passed to the model.
        labels: Integer class labels for the batch.
    """
    with tf.GradientTape() as tape:
        # Forward pass in training mode (training=True enables dropout).
        logits = model(images, training=True)
        # Score the predictions with the objective function.
        batch_loss = loss_object(labels, logits)

    # Look the variables up only after the forward pass, as the original does.
    weights = model.trainable_variables

    # Gradient of the loss with respect to every trainable parameter,
    # then one optimizer update of those parameters.
    grads = tape.gradient(batch_loss, weights)
    optimizer.apply_gradients(zip(grads, weights))

    # Record loss and accuracy for this batch.
    train_loss(batch_loss)
    train_accuracy(labels, logits)

@tf.function
def test_step(images, labels):
    """Evaluate one batch without updating the model and record the results.

    Works on the notebook-level ``model``, ``loss_object``, ``test_loss``
    and ``test_accuracy`` objects.

    Args:
        images: Batch of input features passed to the model.
        labels: Integer class labels for the batch.
    """
    # Forward pass in inference mode (training=False disables dropout).
    logits = model(images, training=False)
    batch_loss = loss_object(labels, logits)

    # Record loss and accuracy for this batch.
    test_loss(batch_loss)
    test_accuracy(labels, logits)

In [None]:
# Dataset loaders (batch size, shuffling).

# Training data: shuffle with a buffer as large as the training set, then form batches of 32.
train_samples = tf.data.Dataset.from_tensor_slices((train_features, train_labels))
train_ds = train_samples.shuffle(train_features.shape[0]).batch(32)

# Test data: kept in its original order, batches of 32.
test_samples = tf.data.Dataset.from_tensor_slices((test_features, test_labels))
test_ds = test_samples.batch(32)

In [None]:
# Build the classifier with one output per malware family.
# drop_rate=0 means the dropout layer drops nothing; raise it to actually use dropout during training.
model = CNNModel(num_classes, drop_rate=0)

In [None]:
EPOCHS = 20

# The four accumulators that are restarted at the beginning of every epoch.
epoch_metrics = (train_loss, train_accuracy, test_loss, test_accuracy)

for epoch in range(EPOCHS):
    # Reset the loss/accuracy records so the printed numbers cover this epoch only.
    for metric in epoch_metrics:
        # `reset_state` is the current Keras method name; `reset_states` is the
        # older spelling, which newer Keras releases no longer provide.
        # Use whichever one this installation has.
        reset = getattr(metric, "reset_state", None) or metric.reset_states
        reset()

    # Train on the training dataset.
    # The batch variables are deliberately not named `labels`: that would
    # overwrite the notebook-level `labels` array and break re-running the
    # train/test split cell afterwards.
    for batch_images, batch_labels in train_ds:
        train_step(batch_images, batch_labels)

    # Measure performance on the test dataset.
    for batch_images, batch_labels in test_ds:
        test_step(batch_images, batch_labels)

    print(
        f'Epoch {epoch + 1}, '
        f'Loss: {train_loss.result()}, '
        f'Accuracy: {train_accuracy.result() * 100}, '
        f'Test Loss: {test_loss.result()}, '
        f'Test Accuracy: {test_accuracy.result() * 100}'
    )

In [None]:
# Model predictions: run the test set through the model in inference mode and
# take the index of the largest logit as the predicted class.
test_logits = model(test_features, training=False)
predicted_labels = tf.math.argmax(test_logits, 1)
predicted_labels = predicted_labels.numpy()

# precision, recall, f1-score
# `labels` pins the report to every class id. Without it, scikit-learn derives
# the classes from the data and raises if their number differs from the number
# of target names (e.g. when a family is absent from the test data and predictions).
class_ids = np.arange(len(MALWARE_FAMILIES))
report = classification_report(
    y_true=test_labels,
    y_pred=predicted_labels,
    labels=class_ids,
    target_names=MALWARE_FAMILIES,
)
print(report)

In [None]:
# confusion matrix
# `labels` fixes the matrix to one row/column per family, so its size always
# matches the tick labels below even if a family never occurs in the data.
tick_marks = np.arange(len(MALWARE_FAMILIES))
conf = confusion_matrix(y_true=test_labels, y_pred=predicted_labels, labels=tick_marks)

plt.figure(figsize=(12, 10))
plt.imshow(conf, cmap=plt.cm.Greys, interpolation="nearest")
plt.title("Confusion matrix")

plt.colorbar()

# Family names on both axes: rows are the actual class, columns the predicted class.
plt.xticks(tick_marks, MALWARE_FAMILIES, rotation=45)
plt.yticks(tick_marks, MALWARE_FAMILIES)

plt.ylabel("Actual label")
plt.xlabel("Predicted label")

# Run the layout pass last so it accounts for the axis labels as well.
plt.tight_layout()

plt.show()