In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row, SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType, ArrayType
from pyspark.sql.functions import collect_set
from argparse import ArgumentParser
import os
import json
import pandas as pd
from pyspark.ml.feature import NGram, CountVectorizer, Tokenizer
from pyspark.ml.linalg import SparseVector
import numpy as np
import re


In [None]:
# Disabled command-line interface: the argparse setup below is wrapped in a
# string literal, so it never executes. The hard-coded assignments that follow
# take the place of those options.
'''
parser = ArgumentParser(description='PySpark Data Processor')
parser.add_argument('--minInitialCount', type=int, default=2, metavar='M',
                    help='The minimum number of times a token must appear in a document\
                    to be considered for the global calculation')
parser.add_argument('--byteDir', type=str, default='.', metavar='bD',
                    help='The directory from which to pull binaries')
parser.add_argument('--asmDir', type=str, default='.', metavar='aD',
                    help='The directory from which to pull .asm files')
parser.add_argument('--dest', type=str, default='.', metavar='D',
                    help='The directory in which to store any output files')
'''
# Bucket from which every input path below is derived
bucket='gs://uga-dsp'
# Stand-in for --minInitialCount (not referenced by any other visible cell)
minInitialCount=2
# Directory holding the <hash>.bytes files
bytesDir=f"{bucket}/project1/data/bytes/"
# Directory holding the .asm files
asmDir=f"{bucket}/project1/data/asm/"
# Directory holding the indicator .txt files that list the hashes of a dataset
filesDir=f"{bucket}/project1/files/"
# Output location for everything this notebook writes
dest='gs://micky-practicum/'

In [1]:
# Show the settings of the active SparkContext through the public accessor
# rather than the private `_conf` attribute.
# NOTE(review): `sc` is only assigned in a later cell, so this cell relies on a
# context that the environment already provides -- confirm on a fresh kernel.
sc.getConf().getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.dynamicAllocation.minExecutors', '1'),
 ('spark.history.fs.logDirectory',
  'gs://dataproc-temp-us-east1-492533985610-4ibxr4au/56aac939-8c8b-47a6-9e3c-0ba7f1f63534/spark-job-history'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
  'http://pyspark-m:8088/proxy/application_1613711227585_0005'),
 ('spark.app.id', 'application_1613711227585_0005'),
 ('spark.sql.warehouse.dir', 'file:/spark-warehouse'),
 ('spark.yarn.am.memory', '640m'),
 ('spark.driver.port', '33225'),
 ('spark.executor.instances', '2'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.yarn.historyServer.address', 'pyspark-m:18080'),
 ('spark.yarn.unmanagedAM.enabled', 'true'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.filters',
  'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'),
 ('spark.app.startTime', '1613712872503'),
 ('spark.driver.appUIAddress',
  'http://pyspark-m.us-east1-b.c.citric-passage-

In [None]:
# Exploration aid: list the attributes and methods available on a SparkConf instance
dir(SparkConf())

In [None]:
# Settings requested for this job.
conf = SparkConf().setAppName("DataProcessing")\
    .set('spark.executor.instances', '5')\
    .set('spark.executor.cores', '4')\
    .set('spark.default.parallelism','60')
print(conf.get('spark.default.parallelism'))

# getOrCreate hands back an already-running context when there is one; in that
# case `conf` is not applied, which is why the effective parallelism is printed
# right after the requested one for comparison.
sc = SparkContext.getOrCreate(conf=conf)
print(sc.defaultParallelism)

# The session is built on top of the context obtained above. No master is
# forced here: a hard-coded "local" master contradicts the executor settings
# requested in `conf`, and the app name now matches the one set on `conf`.
spark = SparkSession.builder \
    .appName("DataProcessing") \
    .getOrCreate()

In [None]:
def datasetParser(file):
    '''
    Parses a .txt indicator file that lists the hashes of the binary/asm
    files to include.

    NOTE: Only binaries are handled for now, reflecting the approach decided on
    by the group. This may later be extended to asm files, or asm processing
    may live in a separate but similar function.

    @param file: path of the indicator file
    @return: an RDD with one element per whitespace-separated token of the file
    '''
    lines=sc.textFile(file)
    tokens=lines.flatMap(lambda line: line.split())
    return tokens

In [None]:
def main(dataset, func, schema=None, outName=None):
    '''
    Defines the main loop for computing features over a dataset.

    @param dataset: name of the dataset indicator file, resolved relative to `filesDir`
    @param func: the per-file processing function; it receives one hash and must
                 return data accepted by `spark.createDataFrame`
    @param schema: schema forwarded to `spark.createDataFrame` for every file
                   (None lets Spark infer it)
    @param outName: the name of the output csv, given as a path (absolute path preferred)
                    -- if outName is not defined, the output is not saved, only returned
    @return: the union of the per-file dataframes, or None when the indicator
             file lists no hashes
    '''
    # Collects the hash-list provided by the indicator file
    dataList=datasetParser(filesDir+dataset).collect()
    total=len(dataList)

    # Accumulates the per-file dataframes; stays None until the first file is processed
    df=None

    for count,filename in enumerate(dataList,start=1):
        print(f'Processing hash: {filename}\nProgress: {count}/{total} | {count/total*100:.2f}%')

        # Processes a single file into a row entry using `func`
        row=func(filename)
        rowDf=spark.createDataFrame(row,schema=schema)

        # First row creates the dataframe, the subsequent ones add to it.
        # Identity check: `== None` is not a reliable test on arbitrary objects.
        df=rowDf if df is None else df.union(rowDf)

        # Drop whatever was cached before and mark the accumulated frame for caching
        spark.catalog.clearCache()
        df.cache()

    # Save dataframe as a csv if outName is defined
    if outName is not None:
        if df is None:
            # An empty hash list used to crash here on `None.toPandas()`
            print(f'No hashes found in {dataset}; nothing written to {outName}')
        else:
            df.toPandas().to_csv(outName,index=False)

    return df

In [None]:
def colToIndex(x):
    """
    Turns a (token, value) pair into a one-hot style int32 vector of length 257.

    The last two characters of the token select the slot: '??' maps to slot
    256, anything else is parsed as a hexadecimal number. The value is written
    into that slot and the whole vector is cast to int32.
    """
    token=x[0]
    amount=x[1]
    suffix=token[-2:]
    slot=256 if suffix=='??' else int(suffix,16)
    vector=np.zeros(257)
    vector[slot]=amount
    return vector.astype(np.int32)

In [None]:
def colToArray(x):
    """
    Turns a (token, value) pair into a float vector of length 257.

    The last two characters of the token select the slot: '??' maps to slot
    256, anything else is parsed as a hexadecimal number. The value is written
    into that slot; the vector keeps numpy's default float dtype.
    """
    token=x[0]
    amount=x[1]
    suffix=token[-2:]
    slot=256 if suffix=='??' else int(suffix,16)
    vector=np.zeros(257)
    vector[slot]=amount
    return vector

In [None]:
def buildSingleRow(file_hash):
    """
    Builds the count-only row for one .bytes file.

    @param file_hash: hash naming the file `<bytesDir><file_hash>.bytes`
    @return: a one-element list holding [file_hash, count_0, ..., count_256]
    """
    path=bytesDir+file_hash+'.bytes'
    counts=wordCount(path)
    # Each (token, count) pair becomes a 257-slot vector; summing them gives the histogram
    histogram=counts.map(colToIndex).reduce(lambda a,b:a+b)
    return [[file_hash]+histogram.tolist()]

In [None]:
def buildRowFaster(file_hash):
    """
    Builds the full feature row for one .bytes file using vector reductions.

    @param file_hash: hash naming the file `<bytesDir><file_hash>.bytes`
    @return: a one-element list holding the tuple
             (file_hash, 257 absolute counts, 257 relative frequencies, total count)
    """
    row=[file_hash]
    file=bytesDir+file_hash+'.bytes'
    wc=wordCount(file)

    # Absolute counts: colToIndex yields int32 vectors, so the sum is integral
    counts=wc.map(colToIndex).reduce(lambda x,y:x+y)
    row+=counts.tolist()
    del counts

    # Total number of counted tokens in the file
    net=wc.map(lambda x:x[1]).reduce(lambda x,y:x+y)

    # Relative frequencies are fractions, so they must go through colToArray.
    # colToIndex casts to int32, which truncated every fraction to 0.
    _rel=wc.map(lambda x:('rel_'+x[0],x[1]/net))
    relative=_rel.map(colToArray).reduce(lambda x,y:x+y)
    row+=relative.tolist()
    del relative

    row+=[net]
    return [tuple(row)]

In [None]:
def buildRow(file_hash):
    """
    Builds the full feature row for one .bytes file using driver-side dict lookups.

    @param file_hash: hash naming the file `<bytesDir><file_hash>.bytes`
    @return: a one-element list holding the tuple
             (file_hash, 257 absolute counts, 257 relative frequencies, total count);
             tokens missing from the file contribute 0 (counts) or .0 (frequencies)
    """
    path=bytesDir+file_hash+'.bytes'
    counts=wordCount(path)

    # Column order: '00'..'FF' followed by '??'
    tokens=[hexGen(i) for i in range(256)]+['??']

    features=[file_hash]

    # Absolute counts, looked up in a driver-side dict
    countMap=counts.collectAsMap()
    features.extend(safeCheck(countMap,tok) for tok in tokens)
    del countMap

    # Total number of counted tokens in the file
    net=counts.map(lambda kv:kv[1]).reduce(lambda a,b:a+b)

    # Relative frequencies, keyed with a 'rel_' prefix
    relMap=counts.map(lambda kv:('rel_'+kv[0],kv[1]/net)).collectAsMap()
    features.extend(safeCheck(relMap,'rel_'+tok,.0) for tok in tokens)
    del relMap

    features.append(net)
    return [tuple(features)]
#relative=temp.map(lambda x:x/net)

In [None]:
def wordCount(file):
    '''
    Defines ops for a single document. This function handles the operations
    which are fully contained within a single document (e.g. word count but
    not IDF); specifically it generates a word-count for the document.

    @param file: path of the document to read
    @return: an RDD of (token, count) pairs, restricted to the
             whitespace-separated tokens that are exactly two characters long
    '''
    tokens=sc.textFile(file).flatMap(lambda line: line.split())
    # Only two-character tokens are counted
    pairs=tokens.filter(lambda tok: len(tok)==2).map(lambda tok:(tok,1))
    return pairs.reduceByKey(lambda a, b: a + b)

In [None]:
def safeCheck(X,key,val=0):
    return X[key] if key in X else val

In [None]:
def hexGen(i):
    """
    Returns the last two characters of the upper-cased hex representation of
    `i`, left-padded with '0' when there is only one digit (e.g. 10 -> '0A').
    """
    digits=hex(i)[2:].upper()
    return digits.rjust(2,'0')[-2:]

In [None]:
def _assembleSchema():
    """
    Returns the 514 feature column names in order: '00'..'FF', '??', then the
    same 257 names prefixed with 'rel_'.
    """
    names=[hexGen(i) for i in range(256)]
    names.append('??')
    return names+['rel_'+name for name in names]

In [None]:
# Build the column-name list once and broadcast it; the schema builders read it through `_schema.value`
_schema=sc.broadcast(_assembleSchema())

In [None]:
def buildSingleSchema():
    """
    Schema for count-only rows: a string 'hash' column followed by one
    LongType column for each of the first 257 broadcast column names.
    """
    fields=[StructField('hash',StringType())]
    for name in _schema.value[:257]:
        fields.append(StructField(name,LongType()))
    return StructType(fields)

In [None]:
def buildSchema():
    """
    Schema for full feature rows: a string 'hash' column, LongType columns for
    the first 257 broadcast names, FloatType columns for names 257..513, and a
    trailing LongType 'total_count' column.
    """
    names=_schema.value
    fields=[StructField('hash',StringType())]
    fields.extend(StructField(name,LongType()) for name in names[:257])
    fields.extend(StructField(name,FloatType()) for name in names[257:514])
    fields.append(StructField('total_count',LongType()))
    return StructType(fields)

In [None]:
# Read everything under bytesDir in one go; the schema applied in a later cell names the two fields file_name and contents
rdd=sc.wholeTextFiles(bytesDir)

In [None]:
# Inspect how many partitions the whole-file RDD was split into
rdd.getNumPartitions()

In [None]:
%%configure -f

In [None]:
# Inspect the default parallelism reported by the active SparkContext
sc.defaultParallelism

In [None]:
# Two string columns for the whole-file RDD: the file name and its contents
schema=StructType([StructField('file_name',StringType()),StructField('contents',StringType())])

In [None]:
# Wrap the whole-file RDD in a dataframe so the ML feature transformers can be applied
df=spark.createDataFrame(rdd,schema)

In [None]:
# Tokenize the `contents` column into a new `words` column
tokenizer = Tokenizer(inputCol="contents", outputCol="words")
tokenized = tokenizer.transform(df)

In [None]:
# Drop the raw text and go back to an RDD of the remaining columns
rdd2=tokenized.drop('contents').rdd

In [None]:
# Schema for the cleaned records: the file hash and its array of tokens
schema2=StructType([StructField('hash',StringType()),StructField('words',ArrayType(StringType()))])

In [None]:
# Reduce each record to (hash, two-character tokens): the hash is the run of
# word characters directly in front of ".bytes" in the first field, and only
# tokens of length 2 are kept from the second field.
# The regex is inlined as a raw string so this cell does not depend on the
# `pattern` broadcast, which is only created in a later cell.
rdd3=rdd2.map(lambda x: (re.findall(r'\w+(?=\.bytes)', x[0])[0],[y for y in x[1] if len(y)==2]))

In [None]:
# Dataframe of (hash, words) built from the cleaned records
tokenized2=spark.createDataFrame(rdd3,schema2)

In [None]:
# Broadcast regex matching the run of word characters directly in front of ".bytes".
# Raw string: `\w` and `\.` are not valid escapes in a normal string literal.
pattern=sc.broadcast(r'\w+(?=\.bytes)')

In [None]:
# Display the broadcast handle
pattern

In [None]:
# Preview the (hash, words) dataframe
tokenized2.show()

In [None]:
# Count-vectorize the `words` column into a `word_count` column
cv = CountVectorizer(inputCol="words", outputCol="word_count")

# Fit the vectorizer on the tokenized dataframe
model = cv.fit(tokenized2)

# Apply the fitted model to the same dataframe
result = model.transform(tokenized2)

In [None]:
# Check whether the storage level of `result` uses memory
result.storageLevel.useMemory

In [None]:
# Preview the vectorized dataframe
result.show()

In [None]:
# Disabled alternative: load a previously saved dataframe instead of recomputing it
#result = spark.read.load("examples/src/main/resources/users.parquet")
# Drop everything cached so far, then mark `result` for caching
spark.catalog.clearCache()
result.cache()

In [None]:
# Persist only the hash and its count vector under `dest`
result.select("hash", "word_count").write.save(dest+"X_train_pre.parquet")
# Marker printed once the write call has returned
print('finished')
#result.show()

In [None]:
# Timed run: count-only features for the small training set, saved as csv under dest+'counts/'
%time df=main("X_small_train.txt",buildSingleRow,buildSingleSchema(),outName=dest+'counts/X_small_train.csv')

# Experimentation

In [None]:
%%time
# Time the count-vector computation for one hard-coded sample file
file_hash='HV0ctLUKfW1ozkmC7BMJ'
file=bytesDir+file_hash+'.bytes'
#wc=wordCount(file)
# Despite its name, `wc` ends up holding the summed 257-slot count vector, not the word-count RDD
wc=wordCount(file).map(colToIndex).reduce(lambda x,y:x+y)

In [None]:
def buildBiGramRow(file_hash):
    """
    Counts the bigrams of one .bytes file.

    Every line is split on whitespace and its first token is dropped; bigrams
    are formed within each line, count-vectorized, and the per-line counts are
    summed per bigram.

    @param file_hash: hash naming the file `<bytesDir><file_hash>.bytes`
    @return: a list of (bigram, count) tuples collected to the driver
    """
    path=bytesDir+file_hash+'.bytes'

    # One record per line, without the line's first token
    lines=sc.textFile(path).map(lambda text:{'line':text.split()[1:]})
    lineDf=spark.createDataFrame(lines)

    # Bigrams within each line
    bigrammer=NGram(n=2,inputCol='line',outputCol='ngrams')
    lineDf=bigrammer.transform(lineDf)

    # Per-line bigram counts as sparse vectors over the fitted vocabulary
    vectorizer=CountVectorizer(inputCol="ngrams", outputCol="ngram count")
    fitted=vectorizer.fit(lineDf)
    counted=fitted.transform(lineDf)

    # Vocabulary is broadcast so vocabulary indices can be mapped back to bigrams on the workers
    vocab=sc.broadcast(fitted.vocabulary)

    def _entries(record):
        # Yields (vocabulary index, count) for every non-zero slot of the line's vector
        vector=record[0]
        return ((int(i),int(vector[int(i)])) for i in vector.indices)

    perLine=counted.select('ngram count').rdd
    totals=perLine.flatMap(_entries)\
        .reduceByKey(lambda a,b:a+b)\
        .map(lambda kv:(vocab.value[kv[0]],kv[1]))

    return totals.collect()

In [None]:
# Timed run of the bigram extractor over the small training set; no schema and no output file are passed
%time df=main("X_small_train.txt",buildBiGramRow)

In [None]:
# Experiment: sparse vector with one slot per possible byte pair (256*256)
# NOTE(review): `t` is not assigned at notebook scope in any visible cell (it is only a local of buildBiGramRow) -- confirm where it should come from
sparse=SparseVector(256*256,t.collect())

In [None]:
# Collect the contents of `t` to the driver
# NOTE(review): `t` is not assigned at notebook scope in any visible cell -- confirm where it should come from
T=t.collect()

In [None]:
# Map each vocabulary entry to its value, treating the first element of every pair in T as a vocabulary index
# NOTE(review): the notebook-level `model` is the CountVectorizer fitted on `words`, not the bigram model local to buildBiGramRow -- confirm which vocabulary is intended
ngramDict={model.vocabulary[x[0]]:x[1] for x in T}

In [None]:
# Reload the saved counts csv with the SparkSession created in this notebook and
# Spark's built-in csv reader. The previous version went through `sqlContext`,
# which no visible cell defines, and the legacy 'com.databricks.spark.csv' format name.
# NOTE(review): the timed `main` run writes to dest+'counts/X_small_train.csv' -- confirm which path is intended here.
DF = spark.read.csv(dest+'X_small_train.csv',
                    header=True,
                    inferSchema=True)

# Alternative read method below (though you must discard the first row due to improper header loading)
# DF = spark.read.format("csv").load('gs://micky-practicum/'+'X_small_train.csv',schema=schema,index=False)

In [None]:
# Preview the reloaded dataframe
DF.show()