In [1]:
from sklearn.decomposition import PCA
import re
from gensim.models import Word2Vec
from sklearn.preprocessing import LabelEncoder
import pickle
import pandas as pd

In [2]:
obfuscation_indicators = [
    'Chr(', 'Hex(', 'Execute(', 'Base64Decode(',
    'Environ(', 'Shell(', 'WScript.Shell', 'GetFile(', 'MSXML2.ServerXMLHTTP', 
    'CreateObject("MSXML2.XMLHTTP")', 'CreateObject("ADODB.Stream")', 
    'Randomize', 'Replace(', 'StrReverse(', 'CallByName(', 'GetObject(',
    'Xor', 'vbuicode', 'vbfromunicode', 'If False Then ... End If'
    ]

def is_obfuscated(code):
    for indicator in obfuscation_indicators:
        if indicator in code:
            return 1
    return 0

In [3]:
def has_url_or_ip(code):
    # Regular expression pattern to match URLs or IPs
    pattern = re.compile(r'(https?://|ftp://|www\.)|((25[0-5]|2[0-4][0-9]|[0-1]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[0-1]?[0-9][0-9]?)')

    # Check if the pattern is found in the code
    return 1 if pattern.search(code) else 0

In [4]:
information_disclosure_keywords = ['winmgmts', 'Win32_Process', 'shell', 
                                   'ssn', 'secret', 'pwd', 'shadow', 'bypass', 
                                  'EvilClippy', 'Base64Decode', 'CreateObject',
                                  'CommandLine', 'Auto_open', 'Replace', 'Based',
                                  'Worksheet_Change', 'bin.base64', 'xor', 'GetFile',
                                  'Shell', 'DownloadFile', 'Eval']

# Define a function to check for the presence of information disclosure keywords
def has_information_disclosure(code):
    for keyword in information_disclosure_keywords:
        if keyword in code.lower():
            return 1
    return 0

In [5]:
def set_data(df):
    df['code_length'] = df['vba_code'].apply(len)
    df['num_lines'] = df['vba_code'].apply(lambda x: x.count('\n') + 1)
    df['avg_chars_per_line'] = df.apply(lambda row: row['code_length'] / row['num_lines'] if row['num_lines'] > 0 else 0, axis=1)
    df['num_loops'] = df['vba_code'].apply(lambda x: x.count('For') + x.count('While'))
    df['has_error_handling'] = df['vba_code'].apply(lambda x: 1 if 'On Error' in x else 0)
    df['has_url_or_ip'] = df['vba_code'].apply(has_url_or_ip)
    df['is_obfuscated'] = df['vba_code'].apply(is_obfuscated)
    df['num_string_literals'] = df.apply(lambda row: len(re.findall(r'"([^"]*)"', row['vba_code'])) / row['code_length'] if row['code_length'] > 0 else 0, axis=1)
    df['has_information_disclosure'] = df['vba_code'].apply(has_information_disclosure)
    df['num_numeric_literals'] = df.apply(lambda row: len(re.findall(r'\b\d+\b', row['vba_code'])) / row['code_length'] if row['code_length'] > 0 else 0, axis=1)
    df['num_exclamation_marks'] = df.apply(lambda row: row['vba_code'].count('!') / row['code_length'] if row['code_length'] > 0 else 0, axis=1)
    df['num_functions'] = df.apply(lambda row: (row['vba_code'].count('Sub ') + row['vba_code'].count('Function ')) / row['code_length'] if row['code_length'] > 0 else 0, axis=1)
    
    df = df.drop('vba_code', axis=1)
    return df

In [6]:
def aggregate_word_vectors(tokens, model):
    vectors = [model.wv[word] for word in tokens if word in model.wv]
    if not vectors:
        return [0] * model.vector_size  
    return sum(vectors) / len(vectors)  

In [7]:
def load_and_set_data(file):
    # load
    df = pd.read_csv(file, encoding='utf-16-le')
    
    # set features
    set_df = set_data(df)
    
    # set Word2Vec
    tokenized_data = [doc.split() for doc in df['vba_code']]
    word2vec_model = Word2Vec(sentences=tokenized_data, vector_size=160, window=5, min_count=1, workers=4)    
    X_word2vec = [aggregate_word_vectors(tokens, word2vec_model) for tokens in tokenized_data]
    X_word2vec_df = pd.DataFrame(X_word2vec, columns=[f'w2v_{i}' for i in range(len(X_word2vec[0]))])
    
    # set PCA
    pca = PCA()
    pca = PCA(n_components=121)
    
    X_reduced = pca.fit_transform(X_word2vec_df)
    X_PCA = pd.DataFrame(X_reduced)
    X_combined_pca = pd.concat([X_PCA, set_df], axis=1)
    X_combined_pca.columns = X_combined_pca.columns.astype(str)
    return X_combined_pca

In [8]:
df = load_and_set_data("test_dataset_without_labels.csv")
df

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,avg_chars_per_line,num_loops,has_error_handling,has_url_or_ip,is_obfuscated,num_string_literals,has_information_disclosure,num_numeric_literals,num_exclamation_marks,num_functions
0,0.911968,0.028852,1.158425,-2.343678,-2.903747,0.863238,0.006562,1.150249,0.458280,0.347325,...,26.118143,0,0,0,0,0.016963,0,0.009047,0.000000,0.003231
1,1.044721,-0.508470,-1.159662,-0.746937,-0.504604,0.403416,-0.244665,-1.918641,0.004179,-0.593387,...,33.300000,5,0,0,0,0.007007,0,0.001001,0.000000,0.004004
2,0.101813,-0.980538,-4.092450,2.057913,-1.233702,0.763052,0.025129,0.155528,0.151660,0.996027,...,35.900000,0,1,0,0,0.002786,0,0.015320,0.000000,0.001393
3,1.160467,-1.750940,-2.145751,1.021693,-0.991908,0.329452,0.610085,-0.844976,-0.935817,1.507647,...,26.995633,5,0,0,0,0.014882,0,0.004368,0.001779,0.002103
4,0.876440,1.044487,3.230786,-1.971717,-0.335826,0.531478,1.982565,-1.779857,-2.216256,1.289247,...,27.731959,0,0,0,0,0.001859,0,0.000000,0.000000,0.005204
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
10625,1.844250,2.227531,-1.392174,-1.772885,1.736030,-0.717070,0.571943,-0.912183,-0.718201,-1.308992,...,43.875000,0,0,0,0,0.002849,0,0.000000,0.000000,0.002849
10626,0.405211,1.921472,0.951488,-2.311140,0.012277,0.581972,0.244963,-1.753249,-0.397423,-1.178849,...,19.176471,0,0,1,0,0.006135,1,0.003067,0.000000,0.004601
10627,2.008320,-1.639312,-0.938250,-1.334050,-2.430611,-0.472688,-2.398546,-1.575575,0.896505,-0.326476,...,27.027027,0,0,0,0,0.008000,0,0.004000,0.000000,0.003000
10628,2.308344,4.403209,-0.802662,-0.460244,2.749138,0.968618,-2.820523,1.332772,1.646129,-3.418230,...,71.428571,0,0,0,0,0.013000,0,0.011000,0.000000,0.000000


### load model from pickle file

In [9]:
model_pkl_file = "model.pkl" 

with open(model_pkl_file, 'rb') as file:  
    model = pickle.load(file)

In [10]:
prediction = model.predict(df)
prediction

array([1, 1, 1, ..., 1, 1, 1])

In [11]:
prediction_labels = ['white' if pred == 1 else 'mal' for pred in prediction]
import csv

# Specify the file path
csv_file_path = 'test_prediction.csv'

# Open the CSV file in write mode
with open(csv_file_path, 'w', newline='') as csvfile:
    # Create a CSV writer
    csv_writer = csv.writer(csvfile)
    
    # Write the header if needed
    csv_writer.writerow(['prediction'])  
    
    # Write the prediction labels to the CSV file
    csv_writer.writerows([[label] for label in prediction_labels])

print(f'Predictions have been written to {csv_file_path}.')

Predictions have been written to test_prediction.csv.
