In [None]:
import pyspark as ps
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark import SparkConf
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
from pyspark.sql.session import SparkSession
from pyspark.ml.classification import NaiveBayes
import sys
import requests
import re
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import *
from pyspark.sql import functions
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer




## Spark Session Setup
To deal with memory issues, we created a separate function that sets the executor memory and driver memory when the Spark context is created. This approach works particularly well in a Jupyter notebook.

In [None]:
def spark_session_setup():
    """Create the notebook's SparkContext, or return the one already running.

    The configuration names the application 'word_count' and requests 12G of
    executor memory and 12G of driver memory. The context's log level is set
    to ERROR before it is returned.

    ``SparkContext.getOrCreate`` is used instead of calling the
    ``SparkContext`` constructor directly, so re-running this cell in a live
    kernel returns the existing context rather than failing because a context
    is already active. Note that the configuration built here only takes
    effect when a new context is actually created.

    Returns:
        The active SparkContext.

    Example:
        sc = spark_session_setup()
    """
    conf = ps.SparkConf()
    conf.setAppName('word_count')
    # Ask Spark to log the effective configuration when the context starts.
    conf.set('spark.logConf', 'true')
    conf.set('spark.executor.memory', '12G')
    conf.set('spark.driver.memory', '12G')
    # Reuse a running context if there is one; otherwise create it from conf.
    sc = ps.SparkContext.getOrCreate(conf=conf)
    # Only show ERROR-level log output in the notebook.
    sc.setLogLevel("ERROR")
    return sc

# Start the context with the memory settings above, then wrap it in a
# SQLContext so RDDs can be turned into DataFrames later on.
sc = spark_session_setup()
sql_context = SQLContext(sc)

In [None]:
def get_broadcast_label_dict(filenames_path=None, labels_path=None):
    """Download a file-name list and its label list and broadcast them as a dict.

    The two downloads are paired line by line, so the n-th file name maps to
    the n-th label. Labels are kept as the strings found in the label file.
    Broadcasting the dict lets mapped functions look labels up on the
    executors.

    Lines are split with ``splitlines()`` so that the keys match the file names
    the RDDs are built from (which also use ``splitlines()``): no trailing
    '\\r' on CRLF files and no spurious empty-string entry for a final newline.

    Args:
        filenames_path: URL of the file-name list. Defaults to the notebook's
            ``x_small_train_path``.
        labels_path: URL of the label list. Defaults to the notebook's
            ``y_small_train_path``.

    Returns:
        A Spark broadcast variable whose ``.value`` is the
        ``{file name: label string}`` dict.
    """
    # Resolve the defaults at call time: the path globals are defined in a
    # later cell than this function.
    if filenames_path is None:
        filenames_path = x_small_train_path
    if labels_path is None:
        labels_path = y_small_train_path
    filenames = requests.get(filenames_path).text.splitlines()
    labels = requests.get(labels_path).text.splitlines()
    filename_label_dict = dict(zip(filenames, labels))
    return sc.broadcast(filename_label_dict)

def find_file(x):
    """Download the ``.bytes`` file that belongs to one file name.

    Args:
        x: File name (without extension); it is appended to ``byte_data_path``
            together with the '.bytes' suffix to form the URL.

    Returns:
        A ``(file name, file text)`` tuple.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    path = byte_data_path + x + '.bytes'
    response = requests.get(path)
    # Fail loudly on an error status instead of passing the error page body
    # downstream as if it were byte data.
    response.raise_for_status()
    return (x, response.text)


def _extract_byte_tokens(text):
    """Return the two-character tokens of a ``.bytes`` dump, skipping '??'.

    The text is split on any run of whitespace, so the result is the same
    whether lines end in '\\r\\n' or a bare '\\n'. Tokens that are not exactly
    two characters long (for example the longer first column of each line) are
    dropped, as is the '??' placeholder.
    """
    return [token for token in text.split() if len(token) == 2 and token != '??']


def _label_and_tokenize(record, label_lookup):
    """Turn ``(file name, text)`` into ``(file name, int label, byte tokens)``."""
    fname = record[0]
    label = int(label_lookup[fname])
    return (fname, label, _extract_byte_tokens(record[1]))


def pre_process(x):
    """Attach the training label to a record and tokenize its byte text.

    Args:
        x: ``(file name, text of the .bytes file)``.

    Returns:
        ``(file name, label as int, list of two-character tokens)``. The label
        is looked up in ``broadcast_filename_label_dict``.

    Raises:
        KeyError: if the file name has no entry in the broadcast dict.
    """
    return _label_and_tokenize(x, broadcast_filename_label_dict.value)


def pre_process_test(x):
    """Same as ``pre_process`` but labels come from the test broadcast dict.

    Args:
        x: ``(file name, text of the .bytes file)``.

    Returns:
        ``(file name, label as int, list of two-character tokens)``. The label
        is looked up in ``broadcast_filename_label_dict_test``.

    Raises:
        KeyError: if the file name has no entry in the broadcast dict.
    """
    return _label_and_tokenize(x, broadcast_filename_label_dict_test.value)

def add_asm_texts_to_features(x):
    """Prepend tokens taken from a file's ``.asm`` listing to its byte tokens.

    The ``.asm`` file for ``x[0]`` is downloaded from ``asm_data_path``. For
    every line, the text before the first ':' is kept as one token (the whole
    line when it contains no ':'). The existing byte tokens are appended after
    these.

    Args:
        x: ``(file name, label, list of byte tokens)``.

    Returns:
        ``(file name, label, asm tokens followed by byte tokens)``. The input
        token list is not modified.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    fname, label, byte_tokens = x[0], x[1], x[2]
    response = requests.get(asm_data_path + fname + '.asm')
    # Fail loudly on an error status instead of turning the error page body
    # into feature tokens.
    response.raise_for_status()
    asm_tokens = [line.partition(':')[0] for line in response.text.splitlines()]
    asm_tokens.extend(byte_tokens)
    return (fname, label, asm_tokens)

In [None]:
# Setup for the GCP cluster: executor and driver memory are given on the
# pyspark command line there, so the running context is simply picked up.
# (A SparkConf().setMaster("local[*]") was tried here for local runs.)
sc = ps.SparkContext.getOrCreate()
sql_context = SQLContext(sc)

In [None]:
# Lists of file names (X) and their labels (y) for the small train and test
# splits, as provided by the project requirements. These tell us which raw
# files belong to which split.
x_small_train_path = "https://storage.googleapis.com/uga-dsp/project1/files/X_small_train.txt"
y_small_train_path = "https://storage.googleapis.com/uga-dsp/project1/files/y_small_train.txt"
x_small_test_path = "https://storage.googleapis.com/uga-dsp/project1/files/X_small_test.txt"
y_small_test_path = "https://storage.googleapis.com/uga-dsp/project1/files/y_small_test.txt"

# Base URLs of the raw data; a file name plus '.bytes' or '.asm' is appended
# to these to fetch one file.
byte_data_path = "https://storage.googleapis.com/uga-dsp/project1/data/bytes/"
asm_data_path = "https://storage.googleapis.com/uga-dsp/project1/data/asm/"

In [None]:
# Seed the first RDD with the file names listed in X_small_train.
text = requests.get(x_small_train_path).text
train_file_names = text.splitlines()

# Spread the names over 80 partitions so every stage is split into many
# tasks, one per partition; the authors found this sped the job up.
data = sc.parallelize(train_file_names, 80)

# Show one element to check what the RDD holds.
data.take(1)

In [None]:

# Broadcast the {file name: label} lookup for the training split, then map
# each file name to (file name, text of its .bytes file).
broadcast_filename_label_dict = get_broadcast_label_dict()
train_data = data.map(find_file)
train_data.take(1)

In [None]:
# Tokenize every downloaded file and attach its label.
train_data_with_labels = train_data.map(pre_process)

In [None]:
# Peek at one labelled training record: (file name, label, byte tokens).
train_data_with_labels.take(1)

In [None]:
# Build the test RDD the same way as the training RDD.
# The file-name list is downloaded once and reused both for the RDD and for the
# label lookup. splitlines() is used for names and labels alike so the lookup
# keys are exactly the names stored in the RDD (no trailing '\r', no empty
# entry after a final newline).
text_test = requests.get(x_small_test_path).text
filenames_test = text_test.splitlines()
labels_test = requests.get(y_small_test_path).text.splitlines()

test_data = sc.parallelize(filenames_test, numSlices=80)

# The n-th file name is paired with the n-th label.
filename_label_dict_test = dict(zip(filenames_test, labels_test))
broadcast_filename_label_dict_test = sc.broadcast(filename_label_dict_test)

# Download each .bytes file, then tokenize it and attach its test label.
test_data_new = test_data.map(lambda x: find_file(x))
test_data_with_labels = test_data_new.map(lambda x: pre_process_test(x))

In [None]:
# Peek at one labelled test record: (file name, label, byte tokens).
test_data_with_labels.take(1)

In [None]:
# Extend every test record's token list with the tokens from its .asm file.
test_data_with_asm = test_data_with_labels.map(add_asm_texts_to_features)
test_data_with_asm.take(1)

In [None]:
# Turn the test RDD of (file name, label, tokens) tuples into a DataFrame.
test_columns = ['doc', 'label', 'text']
test_data_df_repar = sql_context.createDataFrame(test_data_with_asm, schema=test_columns)
test_data_df_repar.show(5)

In [None]:
# Sanity check: report how many partitions back the test DataFrame. The count
# is not expected to differ from the RDD it was built from.
test_data_df_repar.rdd.getNumPartitions()

In [None]:
# Look at one labelled training record again before the .asm tokens are added.
train_data_with_labels.take(1)

In [None]:
# Extend every training record's token list with the tokens from its .asm file.
train_data_with_asm = train_data_with_labels.map(add_asm_texts_to_features)
train_data_with_asm.take(1)

In [None]:
# Turn the training RDD of (file name, label, tokens) tuples into a DataFrame.
train_columns = ['doc', 'label', 'text']
train_data_df = sql_context.createDataFrame(train_data_with_asm, schema=train_columns)
train_data_df.show(5)

In [None]:
# Report how many partitions back the training DataFrame.
train_data_df.rdd.getNumPartitions()

In [None]:
# Experimentation with individual components of the MLlib pipeline (kept for reference).

# ngram = NGram(n=1, inputCol='text', outputCol='ngrams')
# ngramed_df= ngram.transform(train_data_df)
# hashingTF = HashingTF(inputCol="ngrams", outputCol="features")
# hashedTF=hashingTF.transform(ngramed_df)
# hashedTF.show(n=2)

In [None]:
# Training pipeline: unigram tokens -> hashed term frequencies -> Naive Bayes.
# (No IDF stage is used; earlier experiments with a StopWordsRemover, an IDF
# stage and numFeatures=256 were dropped.)
ngram = NGram(n=1, inputCol='text', outputCol='ngrams')
hashingTF = HashingTF(inputCol="ngrams", outputCol="features")
nb = NaiveBayes(smoothing=1)

# Fit the whole pipeline on the training DataFrame and score the test DataFrame.
pipeline = Pipeline(stages=[ngram, hashingTF, nb])
model = pipeline.fit(train_data_df)
# model.save('NB_Best_Model')  # uncomment to persist the fitted pipeline
predictions = model.transform(test_data_df_repar)

# Evaluate model accuracy on the test set.
predictions = predictions.withColumn('label', predictions['label'].cast(DoubleType()))

# Shift the predictions by one so they line up with the labels. This is done
# with native column arithmetic: a Python UDF declared without a return type
# yields a string column that would have to be cast back to double.
# NOTE(review): the +1 offset assumes the predictions are zero-based while the
# labels start at 1 with every class present in the training data - confirm.
predictions = predictions.withColumn(
    'addedprediction',
    (predictions['prediction'] + 1).cast(DoubleType()),
)

evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="addedprediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))


In [None]:
# Result: 64.497% test-set accuracy after adding one to the predicted labels.