## Transform MachineInfo from RDD to DataFrames

### TL;DR:

`Parquet` appears to reduce storage cost, and `DataFrames` make complex queries possible.

### Motivation
Expose information from MachineInfo as Spark’s DataFrames to provide SQL-based offline or on-demand analysis capabilities.

### Overview
* Extract and flatten MachineInfo into a couple of tables such as Processes, Events, Workloads, etc. Save the tables in the column-oriented `parquet` format to reduce the required disk space and hence allow these tables to be retained longer.
* Explore other ways to reduce the disk usage.
* Propose a couple of real-life usages of these DataFrames for Threat Hunting using Spark’s built-in aggregation functions and plugged-in user-defined functions.

### Future work
* Port the experimental code into the `PsPipeline` codebase.
* Write tests.

### 1. Import dependencies

* Assume that `PsPipeline` has been compiled into a jar file.


In [1]:
// Ref: https://toree.apache.org/docs/current/user/faq/
%AddJar file:///Users/datng2/pspipline.jar
sc.setLogLevel("DEBUG")

Starting download from file:///Users/datng2/pspipline.jar
Finished download of pspipline.jar


In [2]:
// For implicit conversions from RDDs to DataFrames. The implicits import needs a
// stable identifier (a val), hence the `spark2` alias:
// https://stackoverflow.com/questions/44094108/not-able-to-import-spark-implicits-in-scalatest
val spark2 = spark
// Reuse the session's SQLContext; the SQLContext constructor is deprecated since Spark 2.0
val sqlContext = spark2.sqlContext
import spark2.implicits._

import java.net.URI
import collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.hadoop.fs.{FileSystem, Path}
import com.tetration.proto.MachineInfoProtos.MachineInfo
import com.tetration.processanalytics.pipeline.io.MachineInfoBatchReaderWithProtoField

spark2 = org.apache.spark.sql.SparkSession@6d92d0f0
sqlContext = org.apache.spark.sql.SQLContext@55a2f19b




org.apache.spark.sql.SQLContext@55a2f19b

### 2. Load `RDD[MachineInfo]`

In [3]:
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

final val DATA_DIR = "/Users/datng2/data/machineinfo/galois"
final val NUM_BATCHES = 8

// List the entries under DATA_DIR and keep the first NUM_BATCHES batch names
val batches = FileSystem
    .get(URI.create("file:///"), sc.hadoopConfiguration)
    .listStatus(new Path(DATA_DIR))
    .map(_.getPath.getName)  // last path component, i.e. the batch name
    .take(NUM_BATCHES)       // keep the first N entries, in listing order
    .toSeq

val miProtoReader = new MachineInfoBatchReaderWithProtoField(
    sc,
    sc.hadoopConfiguration,
    FileSystem.get(URI.create("file:///"), sc.hadoopConfiguration),
    DATA_DIR)

val miRDDs = miProtoReader
    .readBatches(batches.asJava).asScala
    .reduce(_ union _)
    .rdd

DATA_DIR = /Users/datng2/data/machineinfo/galois
NUM_BATCHES = 8
batches = WrappedArray(201805091922, 201805092101, 201805091925, 201805091913, 201805091914, 201805091940, 201805091947, 201805091949)
miProtoReader = com.tetration.processanalytics.pipeline.io.MachineInfoBatchReaderWithProtoField@1c77331f
miRDDs = UnionRDD[38] at union at <console>:77


UnionRDD[38] at union at <console>:77
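
The batch names above come back in listing order, which is not chronological in this run. A minimal sketch of picking the earliest batches instead, assuming the names are fixed-width `yyyyMMddHHmm` timestamps (an inference from the listing, not something the reader API is known to guarantee). `BatchSelection`, `batchName`, and `earliest` are hypothetical names used only for illustration:

```scala
object BatchSelection {
  // Last path segment, e.g. "file:/data/galois/201805091922" -> "201805091922".
  def batchName(path: String): String = path.split("/").last

  // ASSUMPTION: names are fixed-width yyyyMMddHHmm timestamps, so
  // lexicographic order coincides with chronological order.
  def earliest(paths: Seq[String], n: Int): Seq[String] =
    paths.map(batchName).sorted.take(n)
}
```

Sorting before `take` makes the selected batches reproducible across runs, at the cost of relying on the naming assumption stated above.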

### 3. Define an experimental schema

* The idea is to convert **RDD[MachineInfo] into RDD[Row]**, where each Row contains the information that our pipeline requires. Since Spark 2.0, `DataFrame` is a type alias for `Dataset[Row]`.
* We could potentially leverage the `protobuf` definition to derive the schema when transforming the RDD into a DataFrame. For now, we use a simplified schema for testing purposes.
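
The flattening idea can be sketched without Spark or protobuf. The classes below are hypothetical, simplified stand-ins for the generated `MachineInfo` types (the real ones expose Java-style getters), and `FlatRow` and `FlattenSketch` are illustrative names, not part of the pipeline:

```scala
// Hypothetical stand-ins for the protobuf-generated classes.
final case class ProcKey(part1: Long, part2: Long)
final case class Proc(key: ProcKey, cpuUsageUs: Long, memoryUsageKB: Long, commandString: String)
final case class Machine(sensorId: String, processes: List[Proc])

// One flat output row per (machine, process) pair.
final case class FlatRow(
    sensorId: String,
    keyPart1: Long,
    keyPart2: Long,
    cpuUsage: Long,
    memoryUsage: Long,
    commandString: String)

object FlattenSketch {
  // Copy the parent's sensorId onto every child process record.
  def flatten(m: Machine): List[FlatRow] =
    m.processes.map { p =>
      FlatRow(m.sensorId, p.key.part1, p.key.part2,
        p.cpuUsageUs, p.memoryUsageKB, p.commandString)
    }

  // flatMap concatenates the per-machine lists into one table-like sequence.
  def flattenAll(ms: Seq[Machine]): Seq[FlatRow] = ms.flatMap(flatten)
}
```

A machine with no processes contributes no rows, so the resulting table has exactly one row per reported process.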

In [4]:
case class ProcessInfoRow(
    sensorId: String,
    processKeyPart1: Long,
    processKeyPart2: Long,
    cpuUsage: Long,
    memoryUsage: Long,
    commandString: String
)

defined class ProcessInfoRow


In [5]:
// Transform function: flatten one MachineInfo into one ProcessInfoRow per reported process
val MIToProcessInfo: MachineInfo => List[ProcessInfoRow] = mi => {
  mi.getProcessInfoList.asScala
    .map(pi => ProcessInfoRow(
        mi.getSensorId,
        pi.getKey.getPart1,
        pi.getKey.getPart2,
        pi.getCpuUsageUs,
        pi.getMemoryUsageKB,
        pi.getCommandString))
    .toList
}

val processInfoDF = miRDDs
    .flatMap(MIToProcessInfo)
    .toDF

MIToProcessInfo: MachineInfo => List[ProcessInfoRow] = <function1>
processInfoDF = [sensorId: string, processKeyPart1: bigint ... 4 more fields]


<console>:6: error: Symbol 'type scala.AnyRef' is missing from the classpath.
This symbol is required by 'class org.apache.spark.sql.catalyst.QualifiedTableName'.
Make sure that type AnyRef is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
A full rebuild may help if 'QualifiedTableName.class' was compiled against an incompatible version of scala.
  lazy val $print: String =  {
           ^


[sensorId: string, processKeyPart1: bigint ... 4 more fields]

### 4. Sample Queries:
Ref: 
1. https://github.com/apache/incubator-toree/blob/master/etc/examples/notebooks/magic-tutorial.ipynb
2. https://docs.databricks.com/spark/latest/dataframes-datasets/complex-nested-data.html

In [6]:
// Create a ProcessInfo Table
processInfoDF.createOrReplaceTempView("ProcessInfo")

// Compute the average CPU usage (in microseconds)
sqlContext
    .sql("SELECT AVG(cpuUsage) FROM ProcessInfo")
    .show

+-------------------+
|      avg(cpuUsage)|
+-------------------+
|4.006694183496826E9|
+-------------------+



In [7]:
// Count the total number of rows
sqlContext.sql("SELECT COUNT(*) FROM ProcessInfo").show

+--------+
|count(1)|
+--------+
|    5199|
+--------+



In [8]:
// Count the number of distinct command strings per sensorId
sqlContext.sql("""
    SELECT sensorId, COUNT(DISTINCT commandString) 
        FROM ProcessInfo
        GROUP BY sensorId
    """)
    .show(10)

+--------------------+-----------------------------+
|            sensorId|count(DISTINCT commandString)|
+--------------------+-----------------------------+
|fda22d272815f1503...|                          317|
|7f3fc376d08f771af...|                          434|
|11f991d882bdab1f7...|                          307|
|20bee197e8368b81b...|                          315|
|a26526ab2b11a0a1c...|                          318|
|967d3582a6a9bc0da...|                          399|
+--------------------+-----------------------------+
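
To make the semantics of the queries above concrete, here is a minimal sketch of the same aggregations over an in-memory `Seq`, using plain Scala collections rather than Spark. `ProcRow` and `QuerySemantics` are hypothetical names, and the sketch assumes `commandString` is never null (SQL's `COUNT(DISTINCT ...)` would skip nulls):

```scala
// Hypothetical row type carrying only the columns the queries touch.
final case class ProcRow(sensorId: String, cpuUsage: Long, commandString: String)

object QuerySemantics {
  // SELECT AVG(cpuUsage): mean over all rows.
  // Returns None for empty input, where SQL would return NULL.
  def avgCpu(rows: Seq[ProcRow]): Option[Double] =
    if (rows.isEmpty) None
    else Some(rows.map(_.cpuUsage.toDouble).sum / rows.size)

  // SELECT sensorId, COUNT(DISTINCT commandString) ... GROUP BY sensorId
  // ASSUMPTION: commandString is non-null.
  def distinctCommandsPerSensor(rows: Seq[ProcRow]): Map[String, Int] =
    rows.groupBy(_.sensorId).map { case (id, rs) =>
      id -> rs.map(_.commandString).distinct.size
    }
}
```

On a real cluster the SQL form is preferable, since Spark can plan and distribute the aggregation; the collection version is only meant to show what each query computes.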



In [13]:
processInfoDF.printSchema

root
 |-- sensorId: string (nullable = true)
 |-- processKeyPart1: long (nullable = false)
 |-- processKeyPart2: long (nullable = false)
 |-- cpuUsage: long (nullable = false)
 |-- memoryUsage: long (nullable = false)
 |-- commandString: string (nullable = true)



### 5. Saving to Parquet vs RDD

In [None]:
import org.apache.hadoop.io.compress.GzipCodec
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")

// Save the same data twice: as gzip-compressed Parquet and as a gzip-compressed text RDD
processInfoDF.write.parquet("pi.parquet")
processInfoDF.rdd.saveAsTextFile("pi.rdd", classOf[GzipCodec])

In [12]:
// Compute size of RDD vs Parquet
val path = "/Users/datng2/internship/mi-spark/"

val fs = FileSystem.get(URI.create("file:///"), sc.hadoopConfiguration)

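// FileStatus.getLen is the file length in bytes. getBlockSize, which produced the
// figures recorded below, is the filesystem block size rather than the bytes written,
// so those figures track the number of output files, not their actual size.
val rddSize = fs.listStatus(new Path(path + "pi.rdd"))
    .map(_.getLen)
    .sum

val parquetSize = fs.listStatus(new Path(path + "pi.parquet"))
    .map(_.getLen)
    .sum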

// Display result
println("RDD output size    : " + rddSize / 1000 + " kB")
println("Parquet output size: " + parquetSize / 1000 + " kB")
println("Saving percentage  : " + (100 - (parquetSize * 100 / rddSize)) + "%")

RDD output size    : 6476005 kB
Parquet output size: 1275068 kB
Saving percentage  : 81%


path = /Users/datng2/internship/mi-spark/
fs = org.apache.hadoop.fs.LocalFileSystem@7b6f5f17
rddSize = 6476005376
parquetSize = 1275068416


lastException: Throwable = null


1275068416
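
The size comparison can also be sketched with the JDK alone, which is convenient when the output directories are on the local filesystem as they are here. `SizeComparison`, `dirSizeBytes`, and `savingPercent` are hypothetical names. The sketch sums actual file lengths, which is what Hadoop's `FileStatus.getLen` reports:

```scala
import java.io.File

object SizeComparison {
  // Total bytes of the regular files directly inside `dir`
  // (part files plus any marker or checksum files Spark wrote there).
  // A missing or unreadable directory counts as 0 bytes.
  def dirSizeBytes(dir: File): Long =
    Option(dir.listFiles()).getOrElse(Array.empty[File])
      .filter(_.isFile)
      .map(_.length())
      .sum

  // Percentage saved by `candidateBytes` relative to `baselineBytes`,
  // using the same integer arithmetic as the notebook cell.
  def savingPercent(baselineBytes: Long, candidateBytes: Long): Long = {
    require(baselineBytes > 0, "baseline size must be positive")
    100 - (candidateBytes * 100 / baselineBytes)
  }
}
```

With the two totals recorded above, `SizeComparison.savingPercent(6476005376L, 1275068416L)` evaluates to `81`, matching the printed percentage.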