
SPOT-183 Schema validation for input data #72

Merged
merged 3 commits into from
Jul 27, 2017

Conversation


@rabarona rabarona commented Jun 29, 2017

This PR implements the changes requested in SPOT-183 and aims to address the issues reported in SPOT-174 and SPOT-149. It validates the input dataset (by reading the dataframe schema) and checks whether it contains the schema required for model training.

Main changes

  • Added schema validation based on columns required for model training.

  • Updated each pipeline (DNS, Flow, Proxy) so the schema is validated before performing any other activity.

  • Updated the main activity to show an error if the schema is invalid.

  • The main application will now print which fields do not match the expected schema.

  • Added unit test for schema validation.
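To make the change concrete, here is a minimal sketch of what column-based validation can look like, built around the validate(inSchema, expectedSchema): Seq[String] signature and the InputSchema.ResponseDefaultSize constant that appear in the diff below; the log messages and the type check are illustrative assumptions, not the exact code in this PR:

import org.apache.spark.sql.types.StructType

object InputSchema {

  // The validator always returns at least this one header message; anything beyond
  // it describes a required column that is missing or has an unexpected type.
  val ResponseDefaultSize = 1

  def validate(inSchema: StructType, expectedSchema: StructType): Seq[String] = {
    val header = Seq("Validating input schema against the schema required for model training")
    val errors = expectedSchema.fields.flatMap { required =>
      inSchema.fields.find(_.name == required.name) match {
        case None =>
          Some(s"Column ${required.name} is missing from the input data")
        case Some(actual) if actual.dataType != required.dataType =>
          Some(s"Column ${required.name} has type ${actual.dataType}, expected ${required.dataType}")
        case _ => None
      }
    }
    header ++ errors
  }
}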

logger.info("Fitting probabilistic model to data")
val model =
DNSSuspiciousConnectsModel.trainModel(sparkSession, logger, config, dnsRecords)
if (schemaValidationResults.length > InputSchema.ResponseDefaultSize) {
Contributor

this is a somewhat cryptic test condition... what is being checked here?

Author

So, the validator returns a Seq of messages (String). If everything went well, there will be exactly one message (the initializing message), but if there are schema errors there will be more than one.
If the length is > InputSchema.ResponseDefaultSize (meaning 1), it indicates there are some errors; if it's exactly one, then everything is good.
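For reference, this is roughly how the caller-side check reads with the Seq[String] return value (names taken from the diff above; the logging call is an assumption):

// One header message is always present; anything past ResponseDefaultSize (1) is an error.
if (schemaValidationResults.length > InputSchema.ResponseDefaultSize) {
  schemaValidationResults.foreach(message => logger.error(message))
} else {
  // schema is valid, continue with model training
}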

Contributor

could we just return a binary Pass/Fail value?

Author

Yeah, it can be; I just want to keep the list of columns that aren't working. I can return a tuple or a case class with Pass/Fail and a Seq of messages. What do you think?

Contributor

i like the idea of a pair or case class
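For reference, the response type eventually adopted (InputSchemaValidationResponse, per the commit notes further down) might look roughly like this; the second field name is an assumption:

// Pairs a pass/fail flag with the detailed messages about invalid columns.
case class InputSchemaValidationResponse(isValid: Boolean, errorMessages: Seq[String])

Callers can then branch on isValid and only log the messages when validation fails, which avoids the length comparison entirely.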


logger.info("Fitting probabilistic model to data")
val model = ProxySuspiciousConnectsModel.trainModel(sparkSession, logger, config, proxyRecords)
if (schemaValidationResults.length > InputSchema.ResponseDefaultSize) {
Contributor

cryptic condition

logger.info("Fitting probabilistic model to data")
val model =
FlowSuspiciousConnectsModel.trainModel(sparkSession, logger, config, flowRecords)
if (schemaValidationResults.length > InputSchema.ResponseDefaultSize) {
Contributor

cryptic condition

* @param expectedSchema schema expected by model training and scoring methods
* @return
*/
def validate(inSchema: StructType, expectedSchema: StructType): Seq[String] = {
Contributor

to be explicit... this test should pass if there are extra columns in the dataframe, not just the ones used by the model schema?

this should be commented on and tested

Author

Yeah, the validation is only going to check that the columns required for model training are present; if they are, it's good to go, no matter if there are more columns.
Will do that.
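A small illustration of that behavior, with made-up column names: extra columns in the input schema do not cause a failure as long as every column the model needs is present.

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Columns required for model training (illustrative names only).
val expectedSchema = StructType(Seq(
  StructField("queryName", StringType),
  StructField("timeStamp", StringType)))

// Input data carries an extra column; validation should still pass.
val inSchema = StructType(Seq(
  StructField("queryName", StringType),
  StructField("timeStamp", StringType),
  StructField("extraColumn", LongType)))

// True: every expected field is present, extra columns are ignored.
val isValid = expectedSchema.fields.forall(f => inSchema.fieldNames.contains(f.name))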

@NathanSegerlind
Contributor

LGTM

@NathanSegerlind
Contributor

see you in 9 and 1/2 weeks

@lujacab

lujacab commented Jul 21, 2017

+1

@@ -103,7 +103,11 @@ object SuspiciousConnects {
InvalidDataHandler.showAndSaveInvalidRecords(invalidRecords, config.hdfsScoredConnect, logger)
}

case None => logger.error("Unsupported (or misspelled) analysis: " + analysis)
case None => logger.error(s"Something went wrong while trying to run Suspicious Connects Analysis")
logger.error(s"The value of parameter analysis (provided: $analysis) is any of the valid analysis types? " +
Contributor

Suggested change?: "Is the value of the analysis parameter (provided: $analysis) any of the valid analysis types?"

Author

#BadEnglish
Thanks!

@brandon-edwards
Contributor

I gave one of my picky edit suggestions in a file toward the beginning of the list of changed files. All else looks good to me: +1

Ricardo Barona added 3 commits July 27, 2017 12:51
Added schema validation based on columns required for model training.
Updated each pipeline (DNS, Flow, Proxy) so the schema is validated before performing any other activity.
Added unit test for schema validation.
Updated main activity to show error about bad schema if any.
Application now will print what fields are not matching the expected schema.
Made changes after code review from @NathanSegerlind
- ValidateSchema will return case class with flag isValid and Seq[String] for a list of invalid columns.
Changed flow, dns and proxy pipelines to handle validateSchema response InputSchemaValidationResponse
- Updated unit tests
@rabarona rabarona force-pushed the SPOT-183-Schema_validation_input_data branch from c30854a to 781ccf7 Compare July 27, 2017 18:00
@rabarona rabarona changed the title SPOT-166 Schema validation for input data SPOT-183 Schema validation for input data Jul 27, 2017
@asfgit asfgit merged commit 781ccf7 into apache:master Jul 27, 2017
@anilreddydonthireddy

After merging the changes as part of this PR, I am still getting the issue while running ML for proxy data.

18/06/20 07:45:16 INFO SuspiciousConnectsAnalysis: Running Spark LDA with params alpha = 1.02 beta = 1.001 Max iterations = 20 Optimizer = em
Exception in thread "main" java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
at scala.collection.IterableLike$class.head(IterableLike.scala:107)
at scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:186)
at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:186)
at org.apache.spark.mllib.clustering.EMLDAOptimizer.initialize(LDAOptimizer.scala:166)
at org.apache.spark.mllib.clustering.EMLDAOptimizer.initialize(LDAOptimizer.scala:80)
at org.apache.spark.mllib.clustering.LDA.run(LDA.scala:331)
at org.apache.spot.lda.SpotLDAWrapper$.runLDA(SpotLDAWrapper.scala:132)
at org.apache.spot.proxy.ProxySuspiciousConnectsModel$.trainModel(ProxySuspiciousConnectsModel.scala:155)
at org.apache.spot.proxy.ProxySuspiciousConnectsAnalysis$.run(ProxySuspiciousConnectsAnalysis.scala:106)
at org.apache.spot.SuspiciousConnects$.main(SuspiciousConnects.scala:112)
at org.apache.spot.SuspiciousConnects.main(SuspiciousConnects.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
