In [56]:
import findspark
findspark.init()
import pyspark
from operator import add
from pyspark import SparkConf,SparkContext
from pyspark.ml.feature import NGram
from pyspark.sql.functions import col,udf
from pyspark.sql import SQLContext, Row
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
import re

In [2]:
# Configure a local Spark application ('local[*]' master) with explicit
# executor and driver memory settings.
conf = (
    SparkConf()
    .setAppName("MalwareClassification")
    .setMaster('local[*]')
    .set('spark.executor.memory', '4G')
    .set('spark.driver.memory', '10G')
)
# getOrCreate() hands back an already-running context instead of raising,
# so this cell can be re-executed without restarting the kernel.
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
# Earlier alternative, kept for reference:
#sc = pyspark.SparkContext(appName="DocClassification")
# SQL entry point built on top of the SparkContext `sc`; used below to
# create DataFrames from RDDs.
sqlc = SQLContext(sparkContext=sc)

In [4]:
# Each line of X_small_train.txt is used as a file stem: it is turned into
# the path of an .asm file and of a .bytes file.
hashFiles = sc.textFile(
    "/home/vyom/UGA/DSP/Project2/data/train/X_small_train.txt"
)
asmFiles = hashFiles.map(
    lambda stem: "/home/vyom/UGA/DSP/Project2/data/train/asm/" + stem + ".asm"
)
bytesFiles = hashFiles.map(
    lambda stem: "/home/vyom/UGA/DSP/Project2/data/train/bytes/" + stem + ".bytes"
)

In [5]:
def fun(accum,x):
    """Return `accum` and `x` joined by a comma.

    No longer used by this cell; kept so any other code that reduces with it
    keeps working.
    """
    return accum+','+x

# Number of leading files from the listing to load.
NUM_FILES = 3

# Build the comma-separated path lists that are later handed to
# sc.wholeTextFiles. The first NUM_FILES paths are taken directly instead of
# concatenating every path in the RDD and cutting the result at a fixed
# character offset (203 / 215), which only falls on a path boundary when all
# file names have the same length (three paths if the names are 20
# characters long -- TODO confirm against the listing).
asmFileString = ','.join(asmFiles.take(NUM_FILES))
bytesFileString = ','.join(bytesFiles.take(NUM_FILES))

In [6]:
# Load the selected .asm files as (path, full file content) pairs.
rdd1 = sc.wholeTextFiles(path=asmFileString)
# Alternative: read the whole asm directory instead of the selected files.
#rdd1= sc.wholeTextFiles("/home/vyom/UGA/DSP/Project2/data/train/asm/")

In [7]:
# One RDD element per line of allOpcodes.txt.
OpcodesList = sc.textFile(
    "/home/vyom/UGA/DSP/Project2/allOpcodes.txt"
)
# Pull the list to the driver and broadcast it so that tasks can read it
# through `opcodes.value`.
opcodes = sc.broadcast(value=OpcodesList.collect())

# Get Opcodes list using python approach

In [8]:
def _keepKnownOpcodes(tokenLists):
    """Yield each token list reduced to the tokens found in the opcode broadcast.

    The broadcast list is converted to a frozenset once per partition so each
    membership test is a hash lookup instead of a scan of the whole list.
    """
    known = frozenset(opcodes.value)
    for tokens in tokenLists:
        yield [tok for tok in tokens if tok in known]

# (docId, [opcode, ...]) per file: split the file content on whitespace, keep
# only known opcodes (order and duplicates preserved), and use the position
# of the file in rdd1 as docId.
opcodesInDoc = (
    rdd1.map(lambda x: x[1].split())
    .mapPartitions(_keepKnownOpcodes)
    .zipWithIndex()
    .map(lambda x: (x[1], x[0]))
)

# Get Opcodes list using mostly spark

In [9]:
#opcodesInDoc = rdd1.zipWithIndex().map(lambda x: (x[1],x[0][1].split())).flatMapValues(lambda x: x).filter(lambda x: x[1] in opcodes.value).groupByKey().map(lambda x: (x[0],list(x[1])))

In [10]:
#opcodesInDoc.take(10)

# Get N-grams and N-grams count

In [11]:
# DataFrame view of opcodesInDoc with columns "docId" and "opcodes";
# the NGram transformers below read the "opcodes" column.
ngramFrame = sqlc.createDataFrame(
    opcodesInDoc,
    schema=["docId", "opcodes"],
)

In [12]:
# Add a "2grams" column computed from the "opcodes" column.
twoGram = NGram(
    n=2,
    inputCol="opcodes",
    outputCol="2grams",
)
ngramFrame = twoGram.transform(ngramFrame)

In [13]:
# Add a "3grams" column computed from the "opcodes" column.
threeGram = NGram(
    n=3,
    inputCol="opcodes",
    outputCol="3grams",
)
ngramFrame = threeGram.transform(ngramFrame)

In [14]:
# Add a "4grams" column computed from the "opcodes" column.
fourGram = NGram(
    n=4,
    inputCol="opcodes",
    outputCol="4grams",
)
ngramFrame = fourGram.transform(ngramFrame)

In [15]:
# Project each n-gram column back to an RDD of plain (docId, [ngram, ...])
# tuples so the RDD pair operations below can be applied.
twoGramRdd = (
    ngramFrame.select("docId", "2grams")
    .rdd
    .map(tuple)
)
threeGramRdd = (
    ngramFrame.select("docId", "3grams")
    .rdd
    .map(tuple)
)
fourGramRdd = (
    ngramFrame.select("docId", "4grams")
    .rdd
    .map(tuple)
)

In [16]:
# Occurrences of each opcode per document: ((docId, opcode), count).
oneGramCounts = (
    opcodesInDoc
    .flatMapValues(lambda tokens: tokens)   # one (docId, opcode) per occurrence
    .map(lambda pair: (pair, 1))
    .reduceByKey(add)
)

In [17]:
# Occurrences of each 2-gram per document: ((docId, 2gram), count).
twoGramCounts = (
    twoGramRdd
    .flatMapValues(lambda grams: grams)   # one (docId, 2gram) per occurrence
    .map(lambda pair: (pair, 1))
    .reduceByKey(add)
)

In [18]:
# Occurrences of each 3-gram per document: ((docId, 3gram), count).
threeGramCounts = (
    threeGramRdd
    .flatMapValues(lambda grams: grams)   # one (docId, 3gram) per occurrence
    .map(lambda pair: (pair, 1))
    .reduceByKey(add)
)

In [19]:
# Occurrences of each 4-gram per document: ((docId, 4gram), count).
fourGramCounts = (
    fourGramRdd
    .flatMapValues(lambda grams: grams)   # one (docId, 4gram) per occurrence
    .map(lambda pair: (pair, 1))
    .reduceByKey(add)
)

# Get Segments 

In [20]:
# Per-document counts of the token that precedes a ':' on each line:
# ((docId, token), count). The regex is run on every line and only the first
# match of a line is counted. Lines with no match are skipped; previously
# they produced an empty list whose [0] lookup raised IndexError.
segments = (
    rdd1.zipWithIndex()
    .map(lambda x: (x[1], x[0][1].splitlines()))
    .flatMapValues(
        lambda lines: [
            found[0]
            for found in (re.findall(r'\w+:?(?=:)', line) for line in lines)
            if found
        ]
    )
    .map(lambda x: (x, 1))
    .reduceByKey(add)
)

# Get Bytes

In [21]:
# Load the selected .bytes files as (path, full file content) pairs.
bytesRdd = sc.wholeTextFiles(path=bytesFileString)

In [22]:
# (docId, [token, ...]) per .bytes file: split the content on whitespace and
# keep only tokens shorter than 3 characters (presumably the byte values,
# dropping longer address tokens -- verify against the file format). docId is
# the position of the file in bytesRdd.
Bytes = (
    bytesRdd
    .map(lambda fileEntry: fileEntry[1].split())
    .map(lambda tokens: [tok for tok in tokens if len(tok) < 3])
    .zipWithIndex()
    .map(lambda indexed: (indexed[1], indexed[0]))
)

In [23]:
# DataFrame view of Bytes with columns "docId" and "bytes"; the NGram
# transformers below read the "bytes" column.
bytesDataFrame = sqlc.createDataFrame(
    Bytes,
    schema=["docId", "bytes"],
)

In [24]:
# Add a "2grams" column computed from the "bytes" column.
bytesTwoGram = NGram(
    n=2,
    inputCol="bytes",
    outputCol="2grams",
)
bytesDataFrame = bytesTwoGram.transform(bytesDataFrame)

In [25]:
# Add a "3grams" column computed from the "bytes" column.
bytesThreeGram = NGram(
    n=3,
    inputCol="bytes",
    outputCol="3grams",
)
bytesDataFrame = bytesThreeGram.transform(bytesDataFrame)

In [26]:
# Add a "4grams" column computed from the "bytes" column.
bytesFourGram = NGram(
    n=4,
    inputCol="bytes",
    outputCol="4grams",
)
bytesDataFrame = bytesFourGram.transform(bytesDataFrame)

In [27]:
# Project each byte n-gram column back to an RDD of plain
# (docId, [ngram, ...]) tuples for the counting cells below.
bytesTwoGramRdd = (
    bytesDataFrame.select("docId", "2grams")
    .rdd
    .map(tuple)
)
bytesThreeGramRdd = (
    bytesDataFrame.select("docId", "3grams")
    .rdd
    .map(tuple)
)
bytesFourGramRdd = (
    bytesDataFrame.select("docId", "4grams")
    .rdd
    .map(tuple)
)

In [28]:
# Occurrences of each byte token per document: ((docId, token), count).
bytesOneGramCounts = (
    Bytes
    .flatMapValues(lambda tokens: tokens)   # one (docId, token) per occurrence
    .map(lambda pair: (pair, 1))
    .reduceByKey(add)
)

In [29]:
# Occurrences of each byte 2-gram per document: ((docId, 2gram), count).
bytesTwoGramCounts = (
    bytesTwoGramRdd
    .flatMapValues(lambda grams: grams)   # one (docId, 2gram) per occurrence
    .map(lambda pair: (pair, 1))
    .reduceByKey(add)
)

In [30]:
# Occurrences of each byte 3-gram per document: ((docId, 3gram), count).
bytesThreeGramCounts = (
    bytesThreeGramRdd
    .flatMapValues(lambda grams: grams)   # one (docId, 3gram) per occurrence
    .map(lambda pair: (pair, 1))
    .reduceByKey(add)
)

In [31]:
# Occurrences of each byte 4-gram per document: ((docId, 4gram), count).
bytesFourGramCounts = (
    bytesFourGramRdd
    .flatMapValues(lambda grams: grams)   # one (docId, 4gram) per occurrence
    .map(lambda pair: (pair, 1))
    .reduceByKey(add)
)

# Get the sparse matrix

In [32]:
# One RDD element per line of the label file.
labels = sc.textFile(
    "/home/vyom/UGA/DSP/Project2/data/train/y_small_train.txt"
)

In [33]:
# (lineIndex, label) pairs for the first 3 lines of the label file.
# take(3) fetches only those rows; the previous collect()[0:3] pulled the
# whole label file to the driver before slicing.
labelRdd = sc.parallelize(
    labels.zipWithIndex()
    .map(lambda x: (x[1], x[0]))
    .take(3)
)

In [34]:
# DataFrame of labels with columns "did" (document index) and "label".
labelFrame = labelRdd.toDF(schema=["did", "label"])

In [35]:
# Concatenate every per-document count RDD (opcode n-grams, segments and
# byte n-grams) into a single RDD.
allFeatures = sc.union([
    oneGramCounts,
    twoGramCounts,
    threeGramCounts,
    fourGramCounts,
    segments,
    bytesOneGramCounts,
    bytesTwoGramCounts,
    bytesThreeGramCounts,
    bytesFourGramCounts,
])

In [36]:
# Sum counts that share the same (docId, feature) key, then re-key each
# record by feature: (feature, (docId, count)).
allFeatures = (
    allFeatures
    .reduceByKey(add)
    .map(lambda rec: (rec[0][1], (rec[0][0], rec[1])))
)

In [37]:
# Assign an index to every distinct feature key of allFeatures:
# (feature, featureIndex).
vocab = (
    allFeatures
    .map(lambda rec: (rec[0], 1))
    .reduceByKey(add)
    .map(lambda rec: rec[0])
    .zipWithIndex()
)

In [38]:
# Assign an index to every distinct feature key of allFeatures:
# (feature, featureIndex).
vocab = (
    allFeatures
    .keys()
    .distinct()
    .zipWithIndex()
)

In [39]:
# Replace each feature by its vocabulary index. The join yields
# (feature, ((docId, count), featureIndex)); the map flattens that to
# (docId, featureIndex, count).
allFeaturesJoined = (
    allFeatures
    .join(vocab)
    .map(lambda rec: (rec[1][0][0], rec[1][1], rec[1][0][1]))
)

In [40]:
# Turn every (docId, featureIndex, count) triple into a matrix entry.
allFeatureMat = allFeaturesJoined.map(
    lambda triple: MatrixEntry(triple[0], triple[1], triple[2])
)
# Assemble the entries into a coordinate matrix, convert it to indexed rows,
# and expose the rows as a DataFrame with columns "did" and "features".
mat = (
    CoordinateMatrix(allFeatureMat)
    .toIndexedRowMatrix()
    .rows
    .toDF(["did", "features"])
)

In [41]:
# Attach labels to the feature rows by joining on the shared "did" column.
fin = mat.join(labelFrame, on=['did'])

In [50]:
# Persist the joined features and labels as Parquet.
fin.write.parquet(
    path="/home/vyom/UGA/DSP/Project2/allfeatures"
)

# Get all N-grams 

In [21]:
# Distinct 2-grams across all documents. Entries of twoGramCounts are
# ((docId, gram), count), so the gram is x[0][1]; the previous x[1][0]
# indexed into the integer count and raised TypeError when evaluated.
allTwoGrams = twoGramCounts.map(lambda x: x[0][1]).distinct()

In [22]:
# Distinct 3-grams across all documents. Entries of threeGramCounts are
# ((docId, gram), count), so the gram is x[0][1]; the previous x[1][0]
# indexed into the integer count and raised TypeError when evaluated.
allThreeGrams = threeGramCounts.map(lambda x: x[0][1]).distinct()

In [23]:
# Distinct 4-grams across all documents. Entries of fourGramCounts are
# ((docId, gram), count), so the gram is x[0][1]; the previous x[1][0]
# indexed into the integer count and raised TypeError when evaluated.
allFourGrams = fourGramCounts.map(lambda x: x[0][1]).distinct()

In [24]:
# Full feature list: the opcode vocabulary followed by every 2-, 3- and
# 4-gram.
features = (
    OpcodesList
    .union(allTwoGrams)
    .union(allThreeGrams)
    .union(allFourGrams)
)

In [25]:
# DataFrame view of opcodesInDoc with columns "docId" and "opcodes".
featureFrame = sqlc.createDataFrame(
    opcodesInDoc,
    schema=["docId", "opcodes"],
)

# Label-wise N-grams


In [156]:
# The line below keeps only the labels for the 4 files used

In [24]:
# Keep only the first 4 (docId, label) pairs. The statement had been pasted
# twice on one line, which is a SyntaxError.
labelRdd = sc.parallelize(labelRdd.take(4))

In [25]:
# Attach each document's label to its 1-gram counts:
# (label, (count, (docId, gram))).
# oneGramCounts is keyed by (docId, gram) while labelRdd is keyed by docId,
# so joining them directly never matched and every label came back as None.
# Re-key by docId for the join, then restore the original output shape.
labelWiseOneGramCounts = (
    oneGramCounts
    .map(lambda x: (x[0][0], (x[0][1], x[1])))    # (docId, (gram, count))
    .leftOuterJoin(labelRdd)                       # (docId, ((gram, count), label))
    .map(lambda x: (x[1][1], (x[1][0][1], (x[0], x[1][0][0]))))
)

In [26]:
# Attach each document's label to its 2-gram counts:
# (label, (count, (docId, gram))).
# twoGramCounts is keyed by (docId, gram) while labelRdd is keyed by docId,
# so joining them directly never matched and every label came back as None.
# Re-key by docId for the join, then restore the original output shape.
labelWisetTwoGramCounts = (
    twoGramCounts
    .map(lambda x: (x[0][0], (x[0][1], x[1])))    # (docId, (gram, count))
    .leftOuterJoin(labelRdd)                       # (docId, ((gram, count), label))
    .map(lambda x: (x[1][1], (x[1][0][1], (x[0], x[1][0][0]))))
)
# Correctly spelled alias; the original (misspelled) name is kept for
# existing references.
labelWiseTwoGramCounts = labelWisetTwoGramCounts

In [27]:
# Attach each document's label to its 3-gram counts:
# (label, (count, (docId, gram))).
# threeGramCounts is keyed by (docId, gram) while labelRdd is keyed by docId,
# so joining them directly never matched and every label came back as None.
# Re-key by docId for the join, then restore the original output shape.
labelWiseThreeGramCounts = (
    threeGramCounts
    .map(lambda x: (x[0][0], (x[0][1], x[1])))    # (docId, (gram, count))
    .leftOuterJoin(labelRdd)                       # (docId, ((gram, count), label))
    .map(lambda x: (x[1][1], (x[1][0][1], (x[0], x[1][0][0]))))
)

In [28]:
# Attach each document's label to its 4-gram counts:
# (label, (count, (docId, gram))).
# fourGramCounts is keyed by (docId, gram) while labelRdd is keyed by docId,
# so joining them directly never matched and every label came back as None.
# Re-key by docId for the join, then restore the original output shape.
labelWiseFourGramCounts = (
    fourGramCounts
    .map(lambda x: (x[0][0], (x[0][1], x[1])))    # (docId, (gram, count))
    .leftOuterJoin(labelRdd)                       # (docId, ((gram, count), label))
    .map(lambda x: (x[1][1], (x[1][0][1], (x[0], x[1][0][0]))))
)

# Extract Segments

In [32]:
# Per-document counts of the token that precedes a ':' on each line:
# ((docId, token), count). The regex is run on every line and only the first
# match of a line is counted. Lines with no match are skipped; previously
# they produced an empty list whose [0] lookup raised IndexError.
segments = (
    rdd1.zipWithIndex()
    .map(lambda x: (x[1], x[0][1].splitlines()))
    .flatMapValues(
        lambda lines: [
            found[0]
            for found in (re.findall(r'\w+:?(?=:)', line) for line in lines)
            if found
        ]
    )
    .map(lambda x: (x, 1))
    .reduceByKey(add)
)

In [178]:
# Sum counts per (docId, segment) key, then re-key each record by segment:
# (segment, (docId, count)).
segmentsRdd = (
    segments
    .reduceByKey(add)
    .map(lambda rec: (rec[0][1], (rec[0][0], rec[1])))
)

In [179]:
# Assign an index to every distinct segment key: (segment, segmentIndex).
# Note that this rebinds the name `vocab`.
vocab = (
    segmentsRdd
    .keys()
    .distinct()
    .zipWithIndex()
)

In [180]:
# Replace each segment by its vocabulary index. The join yields
# (segment, ((docId, count), segmentIndex)); the map flattens that to
# (docId, segmentIndex, count-as-float).
segmentsRdd = (
    segmentsRdd
    .join(vocab)
    .map(lambda rec: (rec[1][0][0], rec[1][1], float(rec[1][0][1])))
)

In [181]:
# Turn every (docId, segmentIndex, count) triple into a matrix entry.
segmentsRdd2 = segmentsRdd.map(
    lambda triple: MatrixEntry(triple[0], triple[1], float(triple[2]))
)
# Assemble the entries into a coordinate matrix, convert it to indexed rows,
# and expose the rows as a DataFrame with columns "did" and "features".
matSegment = (
    CoordinateMatrix(segmentsRdd2)
    .toIndexedRowMatrix()
    .rows
    .toDF(["did", "features"])
)

In [183]:
# Attach labels to the segment feature rows by joining on the shared "did"
# column.
finSegment = matSegment.join(labelFrame, on=['did'])

In [49]:
# Persist the labelled segment features as Parquet.
finSegment.write.parquet(
    path="/home/vyom/UGA/DSP/Project2/SegmentDataFrame"
)

NameError: name 'finSegment' is not defined