This repository has been archived by the owner on Apr 21, 2023. It is now read-only.

Flow tuning [SPOT-164] #46

Merged
merged 8 commits on Jun 6, 2017
37 changes: 37 additions & 0 deletions spot-ml/src/main/scala/org/apache/spark/sql/WideUDFs.scala
@@ -24,6 +24,25 @@ import scala.reflect.runtime.universe.{TypeTag, typeTag}
import scala.util.Try

object WideUDFs {

/**
* Defines a user-defined function of 10 arguments as user-defined function (UDF).
* The data types are automatically inferred based on the function's signature.
*/
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag,
A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](f: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]):
UserDefinedFunction = {
val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType
:: ScalaReflection.schemaFor(typeTag[A2]).dataType
:: ScalaReflection.schemaFor(typeTag[A3]).dataType
:: ScalaReflection.schemaFor(typeTag[A4]).dataType
:: ScalaReflection.schemaFor(typeTag[A5]).dataType
:: ScalaReflection.schemaFor(typeTag[A6]).dataType
:: ScalaReflection.schemaFor(typeTag[A7]).dataType
:: ScalaReflection.schemaFor(typeTag[A8]).dataType
:: ScalaReflection.schemaFor(typeTag[A9]).dataType
:: ScalaReflection.schemaFor(typeTag[A10]).dataType
:: Nil).getOrElse(Nil)
UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
}


/**
* Defines a user-defined function of 11 arguments as user-defined function (UDF).
* The data types are automatically inferred based on the function's signature.
@@ -47,4 +66,22 @@ object WideUDFs {
:: Nil).getOrElse(Nil)
UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
}

/**
* Defines a user-defined function of 12 arguments as user-defined function (UDF).
* The data types are automatically inferred based on the function's signature.
*/
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag,
A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag](f: Function12[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, RT]):
UserDefinedFunction = {
val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType
:: ScalaReflection.schemaFor(typeTag[A2]).dataType
:: ScalaReflection.schemaFor(typeTag[A3]).dataType
:: ScalaReflection.schemaFor(typeTag[A4]).dataType
:: ScalaReflection.schemaFor(typeTag[A5]).dataType
:: ScalaReflection.schemaFor(typeTag[A6]).dataType
:: ScalaReflection.schemaFor(typeTag[A7]).dataType
:: ScalaReflection.schemaFor(typeTag[A8]).dataType
:: ScalaReflection.schemaFor(typeTag[A9]).dataType
:: ScalaReflection.schemaFor(typeTag[A10]).dataType
:: ScalaReflection.schemaFor(typeTag[A11]).dataType
:: ScalaReflection.schemaFor(typeTag[A12]).dataType
:: Nil).getOrElse(Nil)
UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
}
}
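
A minimal usage sketch (not part of this diff): a UDF built with WideUDFs.udf is applied to DataFrame columns the same way as one from org.apache.spark.sql.functions.udf. The column names and the word-building lambda below are hypothetical.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.WideUDFs

// Hypothetical 10-argument UDF producing a summary string from flow fields.
val flowWordUDF = WideUDFs.udf((hour: Int, srcIP: String, dstIP: String, srcPort: Int, dstPort: Int,
  protocol: String, ibyt: Long, ipkt: Long, tag1: String, tag2: String) =>
  s"${protocol}_${hour}_${ibyt}_${ipkt}")

def addWordColumn(df: DataFrame): DataFrame =
  df.withColumn("word", flowWordUDF(
    df("hour"), df("sip"), df("dip"), df("sport"), df("dport"),
    df("proto"), df("ibyt"), df("ipkt"), df("tag1"), df("tag2")))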
@@ -26,7 +26,7 @@ object SuspiciousConnectsArgumentParser {
case class SuspiciousConnectsConfig(analysis: String = "",
inputPath: String = "",
feedbackFile: String = "",
duplicationFactor: Int = 1,
duplicationFactor: Int = 1000,
topicCount: Int = 20,
userDomain: String = "",
hdfsScoredConnect: String = "",
@@ -99,5 +99,8 @@ object SuspiciousConnectsArgumentParser {
opt[Double]("ldabeta").optional().valueName("float64").
action((x, c) => c.copy(ldaBeta = x)).
text("topic concentration for lda, default 1.001")



}
}
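
A hedged sketch of how this parser is typically driven (standard scopt usage); the value name `parser` and the surrounding main method are assumptions, and all SuspiciousConnectsConfig fields are presumed to have defaults as in the case class above.

def main(args: Array[String]): Unit = {
  // Hypothetical: `parser` stands for the scopt OptionParser defined in this object.
  parser.parse(args, SuspiciousConnectsConfig()) match {
    case Some(config) =>
      // duplicationFactor defaults to 1000 unless overridden on the command line.
      println(s"duplication factor: ${config.duplicationFactor}")
    case None =>
      System.exit(1) // scopt has already printed the usage message
  }
}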

This file was deleted.

@@ -37,13 +37,16 @@ object FlowSuspiciousConnectsAnalysis {


def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger,
inputFlowRecords: DataFrame) = {

logger.info("Starting flow suspicious connects analysis.")

val cleanFlowRecords = filterAndSelectCleanFlowRecords(inputFlowRecords)
val flows : DataFrame = filterAndSelectCleanFlowRecords(inputFlowRecords)


logger.info("Identifying outliers")
val scoredFlowRecords = detectFlowAnomalies(flows, config, sparkContext, sqlContext, logger)

val scoredFlowRecords = detectFlowAnomalies(cleanFlowRecords, config, sparkContext, sqlContext, logger)

val filteredFlowRecords = filterScoredFlowRecords(scoredFlowRecords, config.threshold)

@@ -63,6 +66,7 @@ object FlowSuspiciousConnectsAnalysis {

}
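
The run method above chains helpers that also appear in this diff; a hedged sketch of that pipeline, reusing the parameter names from run's signature:

// Clean the raw records, score them against the model, then keep only the
// records selected by the configured score threshold.
val cleanFlows = FlowSuspiciousConnectsAnalysis.filterAndSelectCleanFlowRecords(inputFlowRecords)
val scoredFlows = FlowSuspiciousConnectsAnalysis.detectFlowAnomalies(cleanFlows, config, sparkContext, sqlContext, logger)
val topAnomalies = FlowSuspiciousConnectsAnalysis.filterScoredFlowRecords(scoredFlows, config.threshold)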


/**
* Identify anomalous netflow log entries in the provided data frame.
*
@@ -78,23 +82,20 @@ object FlowSuspiciousConnectsAnalysis {
sparkContext: SparkContext,
sqlContext: SQLContext,
logger: Logger): DataFrame = {


logger.info("Fitting probabilistic model to data")
val model =
FlowSuspiciousConnectsModel.trainNewModel(sparkContext, sqlContext, logger, config, data, config.topicCount)

FlowSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, config, data)
logger.info("Identifying outliers")
model.score(sparkContext, sqlContext, data)
}


/**
* Filter out flow records with missing or out-of-range fields and select the columns used by the analysis.
*
* @param inputFlowRecords raw flow records
* @return DataFrame of clean flow records
*/
def filterAndSelectCleanFlowRecords(inputFlowRecords: DataFrame): DataFrame = {

val cleanFlowRecordsFilter = inputFlowRecords(Hour).between(0, 23) &&
inputFlowRecords(Minute).between(0, 59) &&
inputFlowRecords(Second).between(0, 59) &&
@@ -105,11 +106,9 @@ object FlowSuspiciousConnectsAnalysis {
inputFlowRecords(DestinationPort).isNotNull &&
inputFlowRecords(Ibyt).isNotNull &&
inputFlowRecords(Ipkt).isNotNull

inputFlowRecords
.filter(cleanFlowRecordsFilter)
.select(InSchema: _*)

}
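
To make the validity conditions above concrete, a hedged illustration using Spark's Column API directly; the literal column names are stand-ins for the schema constants (Hour, DestinationPort, Ibyt, ...) used in the real filter:

import org.apache.spark.sql.functions.col

// A record fails this check if the hour is out of range or a required field is null.
val validity = col("hour").between(0, 23) &&
  col("dport").isNotNull &&
  col("ibyt").isNotNull
// e.g. a row with hour = 24 or a null dport is excluded by someFlows.filter(validity).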

/**
@@ -18,8 +18,8 @@
package org.apache.spot.netflow

import org.apache.spark.sql.functions._
import org.apache.spot.utilities.Quantiles
import org.apache.spot.utilities.data.validation.InvalidDataHandler
import org.apache.spot.utilities.MathUtils.ceilLog2

import scala.util.{Success, Try}

@@ -35,14 +35,8 @@ case class FlowWords(srcWord: String, dstWord: String)
/**
* Contains methods and Spark SQL udf objects for calculation of netflow words from netflow records.
*
* @param timeCuts Quantile cut-offs for the time of day. Time of day is a floating point number
* >= 0.0 and < 24.0
* @param ibytCuts Quantile cut-offs for the incoming bytes.
* @param ipktCuts Quantile cut-offs for the incoming packet counts.
*/
class FlowWordCreator(timeCuts: Array[Double],
ibytCuts: Array[Double],
ipktCuts: Array[Double]) extends Serializable {
object FlowWordCreator extends Serializable {


/**
@@ -51,15 +45,14 @@ class FlowWordCreator(timeCuts: Array[Double],
* @return String "word" summarizing a netflow connection.
*/
def srcWordUDF = udf((hour: Int,
minute: Int,
second: Int,
srcIP: String,
dstIP: String,
srcPort: Int,
dstPort: Int,
ipkt: Long,
ibyt: Long) =>
flowWords(hour, minute, second, srcPort, dstPort, ipkt, ibyt).srcWord)
protocol: String,
ibyt: Long,
ipkt: Long) =>
flowWords(hour, srcPort, dstPort, protocol, ibyt, ipkt).srcWord)


/**
@@ -68,76 +61,75 @@ class FlowWordCreator(timeCuts: Array[Double],
* @return String "word" summarizing a netflow connection.
*/
def dstWordUDF = udf((hour: Int,
minute: Int,
second: Int,
srcIP: String,
dstIP: String,
srcPort: Int,
dstPort: Int,
ipkt: Long,
ibyt: Long) =>
flowWords(hour, minute, second, srcPort, dstPort, ipkt, ibyt).dstWord)
protocol: String,
ibyt: Long,
ipkt: Long) =>
flowWords(hour, srcPort, dstPort, protocol, ibyt, ipkt).dstWord)


/**
* Calculate the source and destination words summarizing a netflow record.
*
* @param hour Hour of the flow record (0-23).
* @param srcPort Source port of the flow record.
* @param dstPort Destination port of the flow record.
* @param protocol Protocol of the flow record.
* @param ibyt Incoming byte count of the flow record.
* @param ipkt Incoming packet count of the flow record.
* @return [[FlowWords]] containing source and destination words.
*/
def flowWords(hour: Int, minute: Int, second: Int, srcPort: Int, dstPort: Int, ipkt: Long, ibyt: Long): FlowWords = {
def flowWords(hour: Int, srcPort: Int, dstPort: Int, protocol: String, ibyt: Long, ipkt: Long): FlowWords = {

Try {
val timeOfDay: Double = hour.toDouble + minute.toDouble / 60 + second.toDouble / 3600

val timeBin = Quantiles.bin(timeOfDay, timeCuts)
val ibytBin = Quantiles.bin(ibyt, ibytCuts)
val ipktBin = Quantiles.bin(ipkt, ipktCuts)
val ibytBin = ceilLog2(ibyt + 1)
val ipktBin = ceilLog2(ipkt + 1)

val LowToLowPortEncoding = 111111
val HighToHighPortEncoding = 333333

val proto = protocol


if (dstPort == 0 && srcPort == 0) {

val baseWord = Array("0", timeBin, ibytBin, ipktBin).mkString("_")
val baseWord = Array("0", proto, hour, ibytBin, ipktBin).mkString("_")
FlowWords(srcWord = baseWord, dstWord = baseWord)

} else if (dstPort == 0 && srcPort > 0) {

val baseWord = Array(srcPort.toString(), timeBin, ibytBin, ipktBin).mkString("_")
val baseWord = Array(srcPort, proto, hour, ibytBin, ipktBin).mkString("_")
FlowWords(srcWord = "-1_" + baseWord, dstWord = baseWord)

} else if (srcPort == 0 && dstPort > 0) {

val baseWord = Array(dstPort.toString, timeBin, ibytBin, ipktBin).mkString("_")
val baseWord = Array(dstPort, proto, hour, ibytBin, ipktBin).mkString("_")
FlowWords(srcWord = baseWord, dstWord = "-1_" + baseWord)

} else if (srcPort <= 1024 && dstPort <= 1024) {

val baseWord = Array(LowToLowPortEncoding, timeBin, ibytBin, ipktBin).mkString("_")
val baseWord = Array(LowToLowPortEncoding, proto, hour, ibytBin, ipktBin).mkString("_")
FlowWords(srcWord = baseWord, dstWord = baseWord)

} else if (srcPort <= 1024 && dstPort > 1024) {

val baseWord = Array(srcPort.toString, timeBin, ibytBin, ipktBin).mkString("_")
val baseWord = Array(srcPort, proto, hour, ibytBin, ipktBin).mkString("_")
FlowWords(srcWord = "-1_" + baseWord, dstWord = baseWord)

} else if (srcPort > 1024 && dstPort <= 1024) {

val baseWord = Array(dstPort.toString, timeBin, ibytBin, ipktBin).mkString("_")
val baseWord = Array(dstPort, proto, hour, ibytBin, ipktBin).mkString("_")
FlowWords(srcWord = baseWord, dstWord = "-1_" + baseWord)

} else {

// this is the srcPort > 1024 && dstPort > 1024 case

val baseWord = Array(HighToHighPortEncoding, timeBin, ibytBin, ipktBin).mkString("_")

val baseWord = Array(HighToHighPortEncoding, proto, hour, ibytBin, ipktBin).mkString("_")
FlowWords(srcWord = baseWord, dstWord = baseWord)
}

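
A hedged worked example of the word construction above, assuming ceilLog2 returns the ceiling of the base-2 logarithm:

// srcPort 80 is privileged (<= 1024) and dstPort 55000 is ephemeral (> 1024),
// so the srcPort <= 1024 && dstPort > 1024 branch applies.
// ibytBin = ceilLog2(5000 + 1) = 13 and ipktBin = ceilLog2(10 + 1) = 4.
val words = FlowWordCreator.flowWords(hour = 13, srcPort = 80, dstPort = 55000,
  protocol = "TCP", ibyt = 5000L, ipkt = 10L)
// Expected: FlowWords(srcWord = "-1_80_TCP_13_13_4", dstWord = "80_TCP_13_13_4")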
@@ -25,31 +25,21 @@ import org.apache.spot.utilities.data.validation.InvalidDataHandler
/**
* Estimate the probabilities of network events using a [[FlowSuspiciousConnectsModel]]
*
* @param timeCuts Quantile cut-offs for binning time-of-day values when forming words from netflow records.
* @param ibytCuts Quantile cut-offs for binning ibyt values when forming words from netflow records.
* @param ipktCuts Quantile cut-offs for binning ipkt values when forming words from netflow records.
* @param topicCount Number of topics used in the topic modelling analysis.
* @param wordToPerTopicProbBC Broadcast map assigning to each word its per-topic probabilities,
* i.e. Prob[word | t] for t = 0 to topicCount - 1.
*/


class FlowScoreFunction(timeCuts: Array[Double],
ibytCuts: Array[Double],
ipktCuts: Array[Double],
topicCount: Int,
class FlowScoreFunction(topicCount: Int,
wordToPerTopicProbBC: Broadcast[Map[String, Array[Double]]]) extends Serializable {


val flowWordCreator = new FlowWordCreator(timeCuts, ibytCuts, ipktCuts)

/**
* Estimate the probability of a netflow connection as distributed from the source IP and from the destination IP
* and assign it the least of these two values.
*
* @param hour Hour of flow record.
* @param minute Minute of flow record.
* @param second Second of flow record.
* @param srcIP Source IP of flow record.
* @param dstIP Destination IP of flow record.
* @param srcPort Source port of flow record.
@@ -61,20 +51,18 @@ class FlowScoreFunction(timeCuts: Array[Double],
* @return Minimum of the probability of this word from the source IP and the probability of this word from the dest IP.
*/
def score(hour: Int,
minute: Int,
second: Int,
srcIP: String,
dstIP: String,
srcPort: Int,
dstPort: Int,
ipkt: Long,
protocol: String,
ibyt: Long,
ipkt: Long,
srcTopicMix: Seq[Double],
dstTopicMix: Seq[Double]): Double = {


val FlowWords(srcWord, dstWord) = flowWordCreator.flowWords(hour: Int, minute: Int, second: Int,
srcPort: Int, dstPort: Int, ipkt: Long, ibyt: Long)
val FlowWords(srcWord, dstWord) = FlowWordCreator.flowWords(hour, srcPort, dstPort, protocol, ibyt, ipkt)

val zeroProb = Array.fill(topicCount) {
0.0
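
The doc comment above states that the record's score is the smaller of the word's probability under the source IP and under the destination IP; a hedged sketch of that idea (not the elided body of score), assuming each probability is the dot product of a topic mix with the word's per-topic probabilities:

// Hypothetical helper illustrating the min-of-two-probabilities computation.
def minWordProbability(srcTopicMix: Seq[Double], dstTopicMix: Seq[Double],
                       srcWordProbs: Array[Double], dstWordProbs: Array[Double]): Double = {
  val srcProb = srcTopicMix.zip(srcWordProbs).map { case (mix, p) => mix * p }.sum
  val dstProb = dstTopicMix.zip(dstWordProbs).map { case (mix, p) => mix * p }.sum
  Math.min(srcProb, dstProb)
}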