This repository has been archived by the owner on Apr 21, 2023. It is now read-only.

Flow tuning [SPOT-164] #46

Merged
merged 8 commits into apache:master on Jun 6, 2017

Conversation

NathanSegerlind
Contributor

This PR changes the netflow Suspicious Connects analysis word formation step in the following ways:

  • time of day is binned on the hour
  • byte count is binned by the ceiling of the logarithm base 2 of the byte count
  • packet count is binned by the ceiling of the logarithm base 2 of the packet count
  • the protocol used by the flow is added to the word

The motivation for this is to remove the computationally expensive sorting steps used to calculate quantiles for binning numerical values in the old code, to incorporate the often meaningful protocol data, and to use a binning scheme more appropriate to the nature of the data being collected.

Testing on synthetic data has consistently shown modest increases in the AUC with these changes.
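
For concreteness, here is a minimal sketch of the new word formation in Scala (the helper name and simplified signature are mine for illustration; the merged FlowWordCreator differs in detail):

// Sketch only: time of day binned on the hour, byte and packet counts binned
// by ceil(log2(count)), and the protocol folded into the word.
def flowWord(hour: Int, protocol: String, ibyt: Long, ipkt: Long): String = {
  val lnOf2 = scala.math.log(2) // natural log of 2
  def logBin(x: Long): Long =
    scala.math.ceil(scala.math.log(x.toDouble) / lnOf2).toLong
  // zero counts break the raw logarithm; the thread below settles on log2(1 + x)
  Seq(protocol, hour.toString, logBin(ibyt).toString, logBin(ipkt).toString).mkString("_")
}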

experimental branch for the tuning of netflow:
* shall we incorporate protocol information?
* shall we bin time by hour?
* shall we bin bytes by exponential buckets (e.g. log of bytes)?
* shall we bin packet counts by exponential buckets (e.g. log of packet counts)?
some changes
Changed flow word creation logic.
* protocol is now part of the flow word
* time is binned by the hour
* byte count is binned by its logarithm
* packet count is binned by its logarithm

On our synthetic BP dataset, this led to a substantial improvement in model effectiveness.

It also removes two full-dataset sorting passes from the model construction.
removed minute and second from the model columns since they are no longer being used
srcIP: String,
dstIP: String,
srcPort: Int,
dstPort: Int,
ipkt: Long,
protocol: String,
ibyt: Long,
Contributor Author

It was inconsistent throughout whether ibyt was the field before ipkt or ipkt was the field before ibyt, and this was causing some frustrating errors... we need a good way to catch slip-ups like these; the only way I know is to have very good unit test coverage that includes test cases that hit each feature in isolation.

Contributor Author

Okay, what was inconsistent was the function signatures: the cutoffs always listed bytes first, but everywhere else packets came first, and that threw me off when writing the code... the point is, good unit test coverage and a saner design to keep these things straight could prevent wasted time.
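
As an illustration of the kind of isolated test meant here, a ScalaTest-style sketch against the hypothetical flowWord helper above (not the merged test suite):

// Vary only the byte count and check that the word changes; if ibyt and ipkt
// were swapped in some signature, a companion test varying only ipkt would
// fail and catch the slip-up.
"flow word creation" should "respond to byte count changes in isolation" in {
  val base      = flowWord(hour = 10, protocol = "TCP", ibyt = 64, ipkt = 64)
  val moreBytes = flowWord(hour = 10, protocol = "TCP", ibyt = 4096, ipkt = 64)
  assert(base != moreBytes)
}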

@rabarona left a comment

Overall it's a good change.
There are only a few things to clarify before approving.

Please add JIRA-# to the description.

val timeOfDay: Double = hour.toDouble + minute.toDouble / 60 + second.toDouble / 3600
val lnOf2 = scala.math.log(2) // natural log of 2
val ibytBin: Long = scala.math.ceil(scala.math.log(ibyt) / lnOf2).toLong // 0 values should never ever happen

We need to have a conversation with network specialists about this rule; it seems there can be 0s depending on how users collect their data. We need to decide if we are going to ignore those records.
@vgonzale78

Contributor

I used floor instead of ceiling for the exponential/logarithmic binning. I think we need to choose one or the other to be consistent. However, I guess in the end it doesn't matter, because either everything is shifted up or down (e.g., for a count of 1000, floor of log2 gives bin 9 and ceiling gives bin 10).

Contributor Author

I've settled on ceil(log2(1 + x)).
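
A minimal sketch of that rule, safe at zero (object and method names are mine, not the merged code):

object LogBinning {
  private val lnOf2 = scala.math.log(2) // natural log of 2

  // ceil(log2(1 + x)): bin(0) == 0, bin(1) == 1, bin(1024) == 11
  def bin(count: Long): Long =
    scala.math.ceil(scala.math.log(1 + count.toDouble) / lnOf2).toLong
}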

val timeBin = Quantiles.bin(timeOfDay, timeCuts)
val ibytBin = Quantiles.bin(ibyt, ibytCuts)
val ipktBin = Quantiles.bin(ipkt, ipktCuts)
val ipktBin: Long = scala.math.ceil(scala.math.log(ipkt) / lnOf2).toLong // 0 values should never ever happen

We need to have a conversation with network specialists about this rule; it seems there can be 0s depending on how users collect their data. We need to decide if we are going to ignore those records.
@vgonzale78


Same as with org.apache.spot.utilities.MathUtils; we can add a method for Long.

Contributor Author

Will use log2(1 + x) to be safe at zero values.

val timeOfDay: Double = hour.toDouble + minute.toDouble / 60 + second.toDouble / 3600
val lnOf2 = scala.math.log(2) // natural log of 2
val ibytBin: Long = scala.math.ceil(scala.math.log(ibyt) / lnOf2).toLong // 0 values should never ever happen

@lujangus created an object called MathUtils in the org.apache.spot.utilities package with a function logBaseXInt. I think it would be a good idea to create a similar function for Long and leave it there, in case we need similar code in the future.
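
Something like the following Long counterpart would fit there (a sketch; the actual logBaseXInt signature may differ, and per the reply below the Int version floors rather than ceils):

// Hypothetical Long analogue of MathUtils.logBaseXInt; name and signature
// are assumptions, keeping the floor behavior described below.
def logBaseXLong(value: Long, base: Double): Long =
  math.floor(math.log(value.toDouble) / math.log(base)).toLong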

Contributor

Yes, MathUtils recreates this step, except that it is Int and automatically performs the floor function.

Contributor Author

It's pretty crazy to have it be a Long, since that would correspond to byte/packet counts of size 2 raised to (about) the 4 billionth power...


logger.info("Fitting probabilistic model to data")
val model =
  FlowSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, config, flows.select(InSchema: _*))

Why move the select to this line when flows can already be both filtered and column-selected? Is there any other use of flows that needs all the columns?

Contributor Author

It was an artifact of a test that I did; I will remove it.

inputRecords: DataFrame,
topicCount: Int): FlowSuspiciousConnectsModel = {

def cleanData(flows: DataFrame): DataFrame = {

This is a duplicate of FlowSuspiciousConnectsAnalysis.cleanFlowRecords, except that it is not checking the minutes and seconds. What's the reason to add this function if it's only used in unit tests and the data is already clean from the object calling the trainModel function?

Contributor Author

We can move the change to SPOT-128 if you like, but I think the cleaning code should be packaged with the model, because it is enforcing preconditions necessary to build the model.
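
For instance, the preconditions in question are null and negative-count checks of the sort below (a sketch in Spark SQL column syntax, not the exact filter from the merged code):

import org.apache.spark.sql.functions.col

// reject records the model cannot consume: missing or negative counts
val cleanFlowRecordsFilter = col("ibyt").isNotNull && col("ibyt").geq(0) &&
  col("ipkt").isNotNull && col("ipkt").geq(0)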

@@ -0,0 +1,65 @@


This script seems to have many hard-coded values. In order to make this available to everyone, we should leave values unassigned or receive parameters, and document what exactly this test is doing (functional/integration test). Otherwise, I'd leave it for internal use only.

Also, the name could be something more descriptive.

Contributor Author

This should not have been checked in; thanks for the catch.

@@ -61,39 +68,18 @@ object FlowSuspiciousConnectsAnalysis {
val invalidFlowRecords = filterAndSelectInvalidFlowRecords(inputFlowRecords)
dataValidation.showAndSaveInvalidRecords(invalidFlowRecords, config.hdfsScoredConnect, logger)

outputFlowRecords
@rabarona Jun 2, 2017

This change looks very similar to the one being requested in SPOT-128, but feels a little incomplete. The code base for this function originally returns Unit, and this change seems to serve only the TestFlow script. Do we want to make this change and then complete the desired functionality in SPOT-128, or remove this change and let SPOT-128 take care of it?

Contributor Author

It's an artifact of something I was doing during testing; I will remove it and leave these changes to SPOT-128.

@NathanSegerlind NathanSegerlind changed the title Flow tuning Flow tuning [SPOT-164] Jun 5, 2017
@rabarona left a comment

Looks good to me. Just need a small clarification, but overall a great change.

@@ -108,8 +93,6 @@ object FlowSuspiciousConnectsAnalysis {

inputFlowRecords
.filter(cleanFlowRecordsFilter)

What was the reason to move the "select" step out of this function? I just want to keep consistency between pipelines; changing the way it works here should change the way we do it for DNS and Proxy. The same question applies to the renaming of cleanFlowRecords and the removal of detectFlowAnomalies.

Contributor Author

As we discussed on the phone: as we move towards more of a library of routines that people can compose for their own experiments, it is natural that people will want to run the suspicious connects analysis on a data frame in such a way that it acts like an "add columns" operation and does not drop a bunch of columns (which might contain useful side information, like class labels) just because they are not consumed by the suspicious connects model.
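
Roughly the shape being described, as a sketch (the helper name and the stand-in score are mine, not the merged API):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

// Append the analysis output without projecting away caller-owned columns,
// so side information such as class labels survives the scoring pass.
def withSuspiciousConnectsScore(flows: DataFrame): DataFrame = {
  val scoreUDF = udf((ibyt: Long, ipkt: Long) => 0.0) // stand-in for the model score
  flows.withColumn("score", scoreUDF(flows("ibyt"), flows("ipkt")))
}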

// simplify netflow log entries into "words"

val dataWithWords = totalRecords.withColumn(SourceWord, FlowWordCreator.srcWordUDF(ModelColumns: _*))
.withColumn(DestinationWord, FlowWordCreator.dstWordUDF(ModelColumns: _*))

I really like how this section is looking much cleaner (also in Proxy).

ibyt: Int,
opkt: Int,
obyt: Int)


Nice, looks cleaner.

@lujangus (Contributor) left a comment

Good work.

* @param logger
* @return
*/
def detectFlowAnomalies(data: DataFrame,

revert


/**
*
* @param inputFlowRecords raw flow records
* @return
*/
def filterAndSelectCleanFlowRecords(inputFlowRecords: DataFrame): DataFrame = {

revert

@@ -108,8 +93,6 @@ object FlowSuspiciousConnectsAnalysis {

inputFlowRecords
.filter(cleanFlowRecordsFilter)
.select(InSchema: _*)

revert

@lujacab commented Jun 6, 2017

Working fine, with runtime performance improvements. And I see the new log line from SuspiciousConnectsAnalysis: "Fitting probabilistic model to data".

@raypanduro
Contributor

+1

reverted some changes to defer them for a larger overhaul
@rabarona left a comment

LGTM

@lujacab commented Jun 6, 2017

+1

@asfgit asfgit merged commit c0f80c5 into apache:master Jun 6, 2017