# Data Formatter
----
In this notebook we'll process the raw CI log data into a format we can plug into the neural net. Specifically, we'll flatten the nested fields of the data, map each record to a list of strings, and create a 35-dimensional vector representation of each list. We'll then write those representations out to .npy files for the neural net notebook (neuralCI.ipynb) to use. This notebook takes a little while to run (mostly the dataset sampling section), but once it has run, it shouldn't need to be run again.

## Imports and Spark Initialization

In [2]:
import os
import numpy as np

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, SQLContext, Row

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Silence noisy Parquet warnings so they don't clutter the logs
logger = spark._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)

## Load CI Data

In [3]:
cwd = os.getcwd()
data = spark.read.parquet("{}/rhci-moby.parquet".format(cwd))

## Flatten Data
The dataset comes with a deeply nested schema, which for our purposes is a hindrance, so let's flatten it. Annoyingly, Spark has no apparent built-in way to do this, so we have to write a bit of hacky code.

Spark lets you select and rename columns via:

`df.select(df[n_1].alias(a_1),...,df[n_n].alias(a_n))`

This means that if we select a nested field (say, `a.b.c`) and give it a top-level alias (`a_b_c`), it appears alongside the other top-level fields. Therefore, if we walk through the schema, find all subfields, give them top-level aliases, and then write one giant `select` statement to grab the 59 or so different fields, we can flatten our dataset.

However, writing a giant select statement by hand isn't exactly an elegant solution, so I wrote code that writes said giant select statement for us.

In [27]:
# Build the source for one aliased column, e.g. data['a.b'].alias('a_b')
def addAlias(string):
    labelString = string.replace(".", "_")
    return "data['{}'].alias('{}')".format(string, labelString)

# Recursively walk the schema; append an aliased selector for every leaf field
def flattenSchema(schema, label):
    global selector
    for field in schema:
        if isinstance(field.dataType, pyspark.sql.types.StructType):
            flattenSchema(field.dataType, label + field.name + ".")
        elif field.name is not None:
            selector.append(addAlias(label + field.name))

# From the list of aliased selectors, generate the code that flattens the dataframe
def createFlatFunc():
    global selector
    selector = []
    flattenSchema(data.schema, "")
    flattener = ",".join(selector)
    return "flatData = data.select({})".format(flattener)

# Execute the generated code, which defines flatData
exec(createFlatFunc())

["data['@timestamp'].alias('@timestamp')", "data['__label'].alias('__label')", "data['ci_agent.label'].alias('ci_agent_label')", "data['ci_agent.name'].alias('ci_agent_name')", "data['ci_job.build_id'].alias('ci_job_build_id')", "data['ci_job.full_url'].alias('ci_job_full_url')", "data['ci_job.log_url'].alias('ci_job_log_url')", "data['ci_job.name'].alias('ci_job_name')", "data['ci_job.phase'].alias('ci_job_phase')", "data['ci_job.status'].alias('ci_job_status')", "data['ci_master.hostname'].alias('ci_master_hostname')", "data['file'].alias('file')", "data['geoip.location.lat'].alias('geoip_location_lat')", "data['geoip.location.lon'].alias('geoip_location_lon')", "data['hostname'].alias('hostname')", "data['ipaddr4'].alias('ipaddr4')", "data['ipaddr6'].alias('ipaddr6')", "data['level'].alias('level')", "data['message'].alias('message')", "data['offset'].alias('offset')", "data['payload_type'].alias('payload_type')", "data['pid'].alias('pid')", "data['pipeline_metadata.@version'].alias
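The recursive walk can be seen in isolation on a toy schema. The sketch below is an illustration only: it uses a nested `dict` as a stand-in for Spark's `StructType`, and `flatten_paths` and `toy_schema` are hypothetical names that do not appear in the notebook. It yields the same (dotted path, underscore alias) pairs that the generated `select` statement is built from.

```python
def flatten_paths(schema, prefix=""):
    """Yield (dotted_path, alias) for every leaf of a nested dict schema.

    A nested dict stands in for a Spark StructType here (an assumption
    made so the sketch runs without Spark).
    """
    for name, dtype in schema.items():
        path = prefix + name
        if isinstance(dtype, dict):
            # Nested struct: recurse, extending the dotted prefix
            for pair in flatten_paths(dtype, path + "."):
                yield pair
        else:
            # Leaf field: the alias replaces dots with underscores
            yield path, path.replace(".", "_")


# Hypothetical miniature of the CI schema
toy_schema = {
    "ci_job": {"name": "string", "status": "string"},
    "geoip": {"location": {"lat": "double", "lon": "double"}},
    "hostname": "string",
}

pairs = list(flatten_paths(toy_schema))
for path, alias in pairs:
    print(path, "->", alias)
```

With a real DataFrame, pairs like these could be turned into `data[path].alias(alias)` columns and passed to `select` as a list, which would avoid generating and `exec`-ing source code. That variant is a suggestion, not what the cell above does.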

## Sample the Dataset
We don't need the full 220,000,000 points for our model. Let's aim for ~100k points: 75k for training and 25k for testing. (Caching here takes a while, but saves time later.)

In [5]:
sampData = flatData.sample(False, 0.00045).cache()
sampData.count()

100019
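The sampling fraction follows from the sizes quoted above. A quick check of the arithmetic (the variable names are illustrative, not from the notebook):

```python
total_rows = 220000000.0   # approximate size of the full dataset
target_rows = 100000.0     # points we want for training plus testing

# Fraction of rows to keep, and the sample size the rounded fraction implies
fraction = target_rows / total_rows
expected_rows = 0.00045 * total_rows

print(round(fraction, 5))
print(round(expected_rows))
```

Because `sample` without replacement keeps each row independently with the given probability, the actual count varies from run to run around the expected value rather than matching it exactly.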

## Stringify Data
We're going to model the categorical data (i.e., almost all of the data) with Word2Vec, so we need to turn each data point into a list of words. The simplest way to do this would be to cast each field to a string, but there's some unicode weirdness in the data, so we have to write a slightly more complex function to do it.

In [6]:
def unicodeToString(row):
    # Field 9 of the flattened schema is ci_job_status; it becomes the binary label
    ci_status = 1 if row[9] == "SUCCESS" else 0
    sentence = []
    for f, field in enumerate(row):
        if f == 9:
            # Keep the label out of the sentence
            continue
        elif isinstance(field, list):
            # Array fields contribute one word per element
            for item in field:
                sentence.append(str(item))
        elif isinstance(field, type(u'')):
            # Encode unicode values as UTF-8 before casting to str
            sentence.append(str(field.encode("utf8")))
        else:
            sentence.append(str(field))
    return Row(sentences=sentence, ci_status=ci_status)

strData = spark.createDataFrame(sampData.rdd.map(unicodeToString))
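To make the transformation concrete, here is a Spark-free sketch of the same idea applied to a plain tuple. The function `row_to_sentence` and the toy row are hypothetical, and the unicode handling is omitted for brevity.

```python
def row_to_sentence(row, label_index, positive_value="SUCCESS"):
    """Split a row into (list of words, binary label)."""
    label = 1 if row[label_index] == positive_value else 0
    sentence = []
    for i, field in enumerate(row):
        if i == label_index:
            # The label must not leak into the features
            continue
        if isinstance(field, list):
            # List-valued fields contribute one word per element
            sentence.extend(str(item) for item in field)
        else:
            sentence.append(str(field))
    return sentence, label


# Hypothetical row: hostname, tags, job status, pid, missing field
toy_row = ("host-1", ["docker", "build"], "SUCCESS", 42, None)
sentence, label = row_to_sentence(toy_row, label_index=2)
print(sentence, label)
```

Note that missing values are not dropped: `str(None)` turns them into the word `"None"`, so they take part in the embedding like any other token.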

## Split Data into Training and Test Sets

In [7]:
train, valid = strData.randomSplit([0.75, 0.25])
train.count()

74961

## Create Word2Vec Representation of Data
We need to fit the Word2Vec model on the training data only, so that nothing about the validation set leaks into the embedding.

In [8]:
from pyspark.ml.feature import Word2Vec

word2vec = Word2Vec(vectorSize=35, minCount=0, inputCol='sentences', outputCol='result')
model = word2vec.fit(train)

trainRes = model.transform(train)
validRes = model.transform(valid)

tVectors  = trainRes.collect()
vVectors  = validRes.collect()
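Each row's `result` is a single 35-dimensional vector even though a sentence contains many words, because the fitted model averages the vectors of the words in the sentence. The sketch below mimics that averaging in plain Python with made-up 2-dimensional word vectors. The assumption that out-of-vocabulary words contribute nothing to the sum while still counting toward the sentence length reflects my reading of Spark's implementation, so treat it as illustrative.

```python
def sentence_vector(sentence, vectors, size):
    """Average the word vectors of a sentence.

    Words missing from `vectors` add nothing to the sum but still count
    toward the sentence length (an assumption about Spark's behavior).
    """
    if not sentence:
        return [0.0] * size
    total = [0.0] * size
    for word in sentence:
        for k, value in enumerate(vectors.get(word, [])):
            total[k] += value
    return [component / len(sentence) for component in total]


# Made-up embeddings, for illustration only
toy_vectors = {"jenkins": [1.0, 0.0], "docker": [0.0, 1.0]}

known = sentence_vector(["jenkins", "docker"], toy_vectors, size=2)
with_unknown = sentence_vector(
    ["jenkins", "docker", "new-host", "new-pid"], toy_vectors, size=2
)
print(known, with_unknown)
```

This matters for the validation set: any word that never appeared in the training data has no vector, so sentences dominated by unseen values are pulled toward the origin.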

## Save and Write Vectors as Numpy Arrays

In [30]:
import pickle

def numpify(data, label):
    # Each collected row is assumed to unpack to (ci_status, sentences, result vector)
    status, sentences, vectors = zip(*data)
    X, Y = np.array(vectors), np.array(status)
    if label == "v":
        # Save log messages
        with open(cwd + "/formattedData/validLogs.pkl", "wb") as f:
            pickle.dump(sentences, f)

        # Save field names (everything except the label column)
        fieldNames = [i.name for i in sampData.schema if i.name != "ci_job_status"]
        with open(cwd + "/formattedData/fieldNames.pkl", "wb") as f:
            pickle.dump(fieldNames, f)

    np.save(cwd + "/formattedData/" + label + "vectors.npy", X)
    np.save(cwd + "/formattedData/" + label + "status.npy", Y)
    return X, Y

numpify(tVectors, "t")
numpify(vVectors, "v")

(array([[-0.18946984, -0.23785067, -0.11499191, ..., -0.27119543,
         -0.06544313, -0.18031824],
        [-0.17551685, -0.23796774, -0.09862728, ..., -0.27467051,
         -0.06229201, -0.18529017],
        [-0.17930162, -0.23702078, -0.10352075, ..., -0.28103825,
         -0.06863766, -0.16743859],
        ..., 
        [-0.21213448, -0.20482012, -0.08226802, ..., -0.29655874,
         -0.05118705, -0.16114787],
        [-0.21120786, -0.20717621, -0.07814386, ..., -0.29926059,
         -0.05289695, -0.15936702],
        [-0.37408199, -0.28546104, -0.03341023, ..., -0.30052532,
         -0.03687222, -0.10015753]]), array([0, 0, 0, ..., 1, 1, 1]))
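Two steps in `numpify` are easy to miss: `zip(*data)` transposes a list of rows into one tuple per column, and the pickled sentences have to be read back in binary mode. The sketch below shows both on made-up rows, writing to a temporary directory rather than `formattedData`. The row contents and the file location are assumptions for illustration.

```python
import os
import pickle
import tempfile

# Made-up rows in the (ci_status, sentences, vector) layout
toy_rows = [
    (0, ["host-1", "FAILURE-log"], [0.1, 0.2]),
    (1, ["host-2", "ok"], [0.3, 0.4]),
]

# Transpose rows into columns: one tuple per field
status, sentences, vectors = zip(*toy_rows)

# Round-trip the sentences through a pickle file opened in binary mode
path = os.path.join(tempfile.mkdtemp(), "validLogs.pkl")
with open(path, "wb") as f:
    pickle.dump(sentences, f)
with open(path, "rb") as f:
    restored = pickle.load(f)

print(status)
print(restored)
```

A downstream notebook would presumably read the arrays back the same way with `np.load` on the corresponding `.npy` files.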