In [1]:
import numpy as np
import re
import os

from pyspark.sql import SparkSession
from pyspark.ml.linalg import SparseVector, VectorUDT, Vectors
from pyspark.sql.types import *
from pyspark.rdd import RDD, PipelinedRDD

In [2]:
# Section-name markers. PreprocessASM.genToken records every marker that occurs
# as a substring of a token (one feature per matching marker).
# NOTE(review): '.data' is a substring of '.idata' and '.rdata', so a token
# containing either of those also matches '.data' and is counted twice —
# confirm whether trailing ':' was intended on these entries.
SECTION_PREF = [
    'HEADER:',
    '.text:',
    '.Pav:',
    '.idata',
    '.data',
    '.rdata',
    '.bss',
    '.edata:',
    '.rsrc:',
    '.tls',
    '.reloc:',
]

# Instruction mnemonics; a token equal to one of these is kept verbatim.
OP_INSTR = [
    'jmp', 'mov', 'retf', 'push', 'pop', 'xor', 'retn', 'nop',
    'sub', 'inc', 'dec', 'add', 'imul', 'xchg', 'or', 'shr',
    'cmp', 'call', 'shl', 'ror', 'rol', 'jnb',
]

# Substrings (DLL names, C++ std namespace, dword operands); every one that
# occurs inside a token is recorded.
KEY = [
    '.dll',
    'std::',
    ':dword',
]

# Keywords whose following token is kept together with them as a bigram, to
# capture stack / memory-manipulating calls. Per the original author, targets
# such as memcpy_s and memmove_s usually come right after 'call'.
MEM_KW = [
    'FUNCTION',
    'call',
]

In [3]:
class PreprocessASM:
    """Turn ``.asm`` files into bag-of-words Spark DataFrames.

    Each input record is a file stem; ``<fdir>/<stem>.asm`` is read, reduced to
    a list of feature tokens by ``genToken``, and counted against a vocabulary
    of tokens that occur at least ``minApp`` times across the training RDD.

    Attributes:
        fdir: directory holding the ``.asm`` files.
        minApp: minimum total occurrence count for a token to enter the
            vocabulary.
        dict: token -> column index mapping, built by ``process(train=True)``.
    """

    def __init__(self, fdir, minApp=30):
        self.fdir = fdir
        self.minApp = minApp

    def genToken(self, file):
        """Extract feature tokens from ``<fdir>/<file>.asm``.

        The file is split on whitespace and, for every token in file order,
        the following features are appended:

        * the token itself if it is in ``OP_INSTR``;
        * every ``SECTION_PREF`` / ``KEY`` entry that occurs inside the token;
        * ``"<kw> <next>"`` when the token is a ``MEM_KW`` keyword;
        * ``"__stdcall <name>"`` (text from ``(`` on stripped) followed by the
          preceding token;
        * ``"db <next>"`` when the ``db`` operand starts with a single quote.

        Returns:
            ``(file, features)`` where ``features`` is a list of strings.
        """
        asm = os.path.join(self.fdir, file + '.asm')
        with open(asm, 'r', encoding='ISO-8859-1') as asmFile:
            # str.split() with no argument already splits on tabs, CR and LF,
            # so no separate normalisation pass is needed.
            tokens = asmFile.read().split()

        filtered = []
        last = len(tokens) - 1
        for i, tok in enumerate(tokens):
            # Neighbours are None at the edges, so the first and last tokens
            # are still inspected instead of being skipped.
            prev_tok = tokens[i - 1] if i > 0 else None
            next_tok = tokens[i + 1] if i < last else None

            if tok in OP_INSTR:
                filtered.append(tok)

            filtered += [p for p in SECTION_PREF if p in tok]
            filtered += [k for k in KEY if k in tok]

            if next_tok is None:
                continue

            # memory-related keyword followed by its operand, e.g. "call X"
            filtered += [tok + ' ' + next_tok for k in MEM_KW if k == tok]

            # "__stdcall <name>" with the argument list stripped, plus the
            # preceding token (presumably the return type — confirm)
            if tok == '__stdcall':
                filtered.append(tok + ' ' + next_tok.partition('(')[0])
                if prev_tok is not None:
                    filtered.append(prev_tok)

            # define bytes with a quoted string operand
            if tok == 'db' and next_tok.startswith("'"):
                filtered.append(tok + ' ' + next_tok)

        return (file, filtered)

    def getFileSize(self, file):
        """Return ``(file, size_in_bytes)`` for ``<fdir>/<file>.asm``."""
        return (file, os.stat(os.path.join(self.fdir, file + '.asm')).st_size)

    def genBagWords(self, row):
        """Map a ``(file, tokens)`` row to ``(file, SparseVector)`` of counts.

        Tokens missing from ``self.dict`` are ignored; the vector length is
        ``len(self.dict)`` and index ``self.dict[token]`` holds its count.
        """
        counts = {}
        for w in row[1]:
            idx = self.dict.get(w)
            if idx is not None:
                counts[idx] = counts.get(idx, 0) + 1

        return (row[0], SparseVector(len(self.dict), counts))

    def process(self, X_RDD, y_RDD=None, train=True):
        """Build a DataFrame of bag-of-words features (and labels, if given).

        Args:
            X_RDD: RDD of file stems (str), resolved against ``self.fdir``.
            y_RDD: optional RDD of labels aligned with ``X_RDD`` by position.
                Each label is converted with ``str()`` (None kept as null)
                for the StringType column and then cast to BooleanType;
                values such as 1/0, '1'/'0' or 'true'/'false' are expected —
                confirm for other encodings.
            train: if True, (re)build ``self.dict`` from this data; otherwise
                reuse the existing vocabulary.

        Returns:
            DataFrame with columns ``hash`` (string) and ``features`` (vector),
            plus ``label`` (boolean) when ``y_RDD`` is given.

        Raises:
            RuntimeError: ``train`` is False and no vocabulary exists yet.
        """
        if not train and getattr(self, 'dict', None) is None:
            raise RuntimeError(
                'process(train=False) needs a vocabulary; '
                'call process(..., train=True) first')

        X = X_RDD.map(self.genToken).cache()

        if train:
            # Local copy so the Spark closure does not capture the instance.
            min_app = self.minApp
            vocab = X.flatMap(lambda r: r[1]) \
                     .map(lambda w: (w, 1)) \
                     .reduceByKey(lambda acc, w: acc + w) \
                     .filter(lambda wc: wc[1] >= min_app) \
                     .keys() \
                     .collect()
            # Sorted so column indices do not depend on partition/collect order.
            self.dict = {w: idx for idx, w in enumerate(sorted(vocab))}

        # bag of words
        X = X.map(self.genBagWords)

        if y_RDD is not None:
            # Pair features with labels by position: key both sides by index.
            X = X.zipWithIndex().map(lambda r: (r[1], r[0]))
            y = y_RDD.zipWithIndex().map(lambda r: (r[1], r[0]))

            def to_row(joined):
                # joined: (idx, ((hash, features), label))
                (hash_, features), label = joined[1]
                return (hash_, features, None if label is None else str(label))

            data = X.join(y).map(to_row)
            schema = StructType([
                StructField('hash', StringType(), True),
                StructField('features', VectorUDT(), True),
                StructField('label', StringType(), True)
            ])
            data = data.toDF(schema)
            data = data.withColumn('label', data.label.cast(BooleanType()))
        else:
            schema = StructType([
                StructField('hash', StringType(), True),
                StructField('features', VectorUDT(), True)
            ])
            data = X.toDF(schema)

        return data

In [4]:
if __name__ == '__main__':
    sparkRaw = SparkSession.builder.appName('MSFT_MAL').getOrCreate()
    sc = sparkRaw.sparkContext

    DATA_DIR = 'data/'   # directory holding <stem>.asm files
    MIN_APP = 30         # minimum token count to enter the vocabulary

    # Demo inputs: file stems under DATA_DIR and their labels. The label column
    # is declared StringType in PreprocessASM.process and later cast to
    # boolean, so labels are passed as strings; plain ints would not match
    # that StringType column.
    X_file = ['0ACDbR5M3ZhBJajygTuf', '0A32eTdBKayjCWhZqDOQ']
    y_file = ['1', '0']

    X = sc.parallelize(X_file)
    y = sc.parallelize(y_file)

    pipeline = PreprocessASM(DATA_DIR, MIN_APP)
    data = pipeline.process(X, y)

In [10]:
# mode('overwrite') keeps this cell re-runnable: the default save mode raises
# an error when the target path already exists.
data.write.mode('overwrite').save("/tmp/data")