From 504001e92a78212e8a784556f73fb66af05d57ac Mon Sep 17 00:00:00 2001 From: gni Date: Fri, 2 Aug 2019 14:14:27 +0200 Subject: [PATCH] removing Apache Spark dependencies from SOS implementation .. and some other minor adaptations --- .travis.yml | 2 +- README.md | 26 +++++--- build.sbt | 21 +++--- .../EvaluateOutlierDetection.scala | 24 ++----- .../StochasticOutlierDetection.scala | 64 ++++++++++--------- .../StocasticOutlierDetectionTest.scala | 57 ++++++++--------- 6 files changed, 93 insertions(+), 101 deletions(-) diff --git a/.travis.yml b/.travis.yml index d17ac60..dacb737 100755 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: scala scala: - - 2.11.7 + - 2.11.8 script: - sbt clean coverage test after_success: diff --git a/README.md b/README.md index 885ee05..addd790 100755 --- a/README.md +++ b/README.md @@ -1,18 +1,28 @@ -Stochastic Outlier Selection on Apache Spark +Stochastic Outlier Selection in Scala ============================ -[![Codacy Badge](https://www.codacy.com/project/badge/9069624e46ac4d97bb19a34705f95965)](https://www.codacy.com) -[![Build Status](https://travis-ci.org/Fokko/spark-stochastic-outlier-selection.svg?branch=master)](https://travis-ci.org/Fokko/spark-stochastic-outlier-selection) -[![Coverage Status](https://coveralls.io/repos/Fokko/spark-stochastic-outlier-selection/badge.svg?branch=master&service=github)](https://coveralls.io/github/Fokko/spark-stochastic-outlier-selection?branch=master) +[![Codacy Badge](https://api.codacy.com/project/badge/Grade/df5fc23eb5b74795b62d0daa52436a0d)](https://www.codacy.com/app/Gnni/scala-stochastic-outlier-selection?utm_source=github.com&utm_medium=referral&utm_content=Gnni/scala-stochastic-outlier-selection&utm_campaign=Badge_Grade) +[![Build Status](https://travis-ci.org/Gnni/scala-stochastic-outlier-selection.svg?branch=master)](https://travis-ci.org/Gnni/scala-stochastic-outlier-selection) +[![Coverage 
Status](https://coveralls.io/repos/github/Gnni/scala-stochastic-outlier-selection/badge.svg?branch=master)](https://coveralls.io/github/Gnni/scala-stochastic-outlier-selection?branch=master) [![Maven Central](https://maven-badges.herokuapp.com/maven-central/frl.driesprong/spark-stochastic-outlier-selection_2.11/badge.svg)](https://maven-badges.herokuapp.com/maven-central/frl.driesprong/spark-stochastic-outlier-selection_2.11) -Stochastic Outlier Selection (SOS) is an unsupervised outlier selection algorithm. It uses the concept of affinity to compute an outlier probability for each data point. +Adapted version of the implementation for Apache Spark. This version +aims to perform Stochastic Outlier Selection (SOS) using Scala only, +i.e., w/o the need of any Apache Spark resources. -For more information about SOS, see the technical report: J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. [Stochastic Outlier Selection](https://github.com/jeroenjanssens/sos/blob/master/doc/sos-ticc-tr-2012-001.pdf?raw=true). Technical Report TiCC TR 2012-001, Tilburg University, Tilburg, the Netherlands, 2012. +SOS is an unsupervised outlier selection algorithm. It uses the concept of affinity to compute an outlier probability for each data point. + +For more information about SOS, see the technical report: J.H.M. +Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. +[Stochastic Outlier Selection](https://github.com/jeroenjanssens/sos/blob/master/doc/sos-ticc-tr-2012-001.pdf?raw=true). +Technical Report TiCC TR 2012-001, Tilburg University, Tilburg, the +Netherlands, 2012. Selecting outliers from data ---------------------------------------- -The current implementation accepts RDD's of the type `Array[Double]` and returns the indexes of the vector with it's degree of outlierness. +The current implementation accepts an Array with elements of the type +`Array[Double]` and returns the indexes of the vector with its degree +of outlierness. 
-Current implementation only works with Euclidean distance, but this will be extended in the foreseeable future. +Current implementation only works with Euclidean distance. diff --git a/build.sbt b/build.sbt index fa6575a..5b1f825 100755 --- a/build.sbt +++ b/build.sbt @@ -3,24 +3,19 @@ name := "Scala Stochastic Outlier Selection" version := "0.1.0" -//publishTo := { -// val nexus = "https://oss.sonatype.org/" -// if (isSnapshot.value) -// Some("snapshots" at nexus + "content/repositories/snapshots") -// else -// Some("releases" at nexus + "service/local/staging/deploy/maven2") -//} +publishTo := { + val nexus = "https://oss.sonatype.org/" + if (isSnapshot.value) + Some("snapshots" at nexus + "content/repositories/snapshots") + else + Some("releases" at nexus + "service/local/staging/deploy/maven2") +} publishMavenStyle := true - publishArtifact in Test := false - pomIncludeRepository := { _ => false } scalaVersion := "2.11.8" -libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0" - -libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.2.0" - +libraryDependencies += "org.scalanlp" %% "breeze" % "0.13.2" libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.5" % "test" diff --git a/src/main/scala/frl/driesprong/outlierdectection/EvaluateOutlierDetection.scala b/src/main/scala/frl/driesprong/outlierdectection/EvaluateOutlierDetection.scala index f3ffbfc..d5a135e 100644 --- a/src/main/scala/frl/driesprong/outlierdectection/EvaluateOutlierDetection.scala +++ b/src/main/scala/frl/driesprong/outlierdectection/EvaluateOutlierDetection.scala @@ -1,29 +1,19 @@ package frl.driesprong.outlierdectection -import org.apache.spark.{SparkConf, SparkContext} - object EvaluateOutlierDetection { def main(args: Array[String]) { - val conf = new SparkConf() - .setMaster("local[*]") - .setAppName("Stochastic Outlier Selection") - - val sc = new SparkContext(conf) - val toyDataset = Array( - (0L, Array(1.00, 1.00)), - (1L, Array(3.00, 1.25)), - (2L, 
Array(3.00, 3.00)), - (3L, Array(1.00, 3.00)), - (4L, Array(2.25, 2.25)), - (5L, Array(8.00, 2.00)) + val testDataset = Array( + (Array(1.00, 1.00)), + (Array(2.00, 1.00)), + (Array(1.00, 2.00)), + (Array(2.00, 2.00)), + (Array(5.00, 8.00)) ) - StochasticOutlierDetection.performOutlierDetection( sc.parallelize(toyDataset) ).foreach( x => + StochasticOutlierDetection.performOutlierDetection(testDataset, perplexity = 3.0).foreach( x => System.out.println(x._1 + " : " + x._2) ) - - sc.stop() } } diff --git a/src/main/scala/frl/driesprong/outlierdectection/StochasticOutlierDetection.scala b/src/main/scala/frl/driesprong/outlierdectection/StochasticOutlierDetection.scala index 217cb3b..1b03240 100755 --- a/src/main/scala/frl/driesprong/outlierdectection/StochasticOutlierDetection.scala +++ b/src/main/scala/frl/driesprong/outlierdectection/StochasticOutlierDetection.scala @@ -2,8 +2,6 @@ package frl.driesprong.outlierdectection import breeze.linalg.{DenseVector, sum} import breeze.numerics.{pow, sqrt} -import org.apache.spark.rdd.RDD - import scala.language.implicitConversions object StochasticOutlierDetection { @@ -11,17 +9,16 @@ object StochasticOutlierDetection { val DefaultIterations = 500 val DefaultPerplexity = 30.0 - def performOutlierDetection(inputVectors: RDD[(Long, Array[Double])], + def performOutlierDetection(inputVectors: Array[Array[Double]], perplexity: Double = DefaultPerplexity, - tolerance: Double = DefaultPerplexity, - maxIterations: Int = DefaultIterations ): Array[(Long, Double)] = { + tolerance: Double = DefaultTolerance, + maxIterations: Int = DefaultIterations): Array[(Long, Double)] = { - val dMatrix = StochasticOutlierDetection.computeDistanceMatrixPair(inputVectors) + val dMatrix = StochasticOutlierDetection.computeDistanceMatrix(inputVectors) val aMatrix = StochasticOutlierDetection.computeAffinityMatrix(dMatrix, perplexity, maxIterations, tolerance) val bMatrix = StochasticOutlierDetection.computeBindingProbabilities(aMatrix) val oMatrix = 
StochasticOutlierDetection.computeOutlierProbability(bMatrix) - - oMatrix.collect() + oMatrix } def binarySearch(affinity: DenseVector[Double], @@ -56,38 +53,45 @@ object StochasticOutlierDetection { newAffinity } - def computeAffinityMatrix(dMatrix: RDD[(Long, Array[Double])], + def computeAffinityMatrix(dMatrix: Array[(Long, Array[Double])], perplexity: Double = DefaultPerplexity, maxIterations: Int, - tolerance: Double): RDD[(Long, DenseVector[Double])] = { + tolerance: Double): Array[(Long, DenseVector[Double])] = { val logPerplexity = Math.log(perplexity) dMatrix.map(r => (r._1, binarySearch(new DenseVector(r._2), logPerplexity, maxIterations, tolerance))) } def euclDistance(a: Array[Double], b: Array[Double]): Double = sqrt((a zip b).map { case (x, y) => pow(y - x, 2) }.sum) - def computeBindingProbabilities(rows: RDD[(Long, DenseVector[Double])]): RDD[(Long, Array[Double])] = + def computeBindingProbabilities(rows: Array[(Long, DenseVector[Double])]): Array[(Long, Array[Double])] = rows.map(r => (r._1, (r._2 :/ sum(r._2)).toArray)) - def computeDistanceMatrix(data: RDD[Array[Double]]): RDD[(Long, Array[Double])] = computeDistanceMatrixPair(data.zipWithUniqueId().map(_.swap)) - - def computeDistanceMatrixPair(data: RDD[(Long, Array[Double])]): RDD[(Long, Array[Double])] = - data.cartesian(data).flatMap { - case (a: (Long, Array[Double]), b: (Long, Array[Double])) => - if (a._1 != b._1) - Some(a._1, euclDistance(a._2, b._2)) - else - None - }.combineByKey( - (v1) => List(v1), - (c1: List[Double], v1: Double) => c1 :+ v1, - (c1: List[Double], c2: List[Double]) => c1 ++ c2 - ).map { - case (a, b) => (a, b.toArray) - } + def computeDistanceMatrix(data: Array[Array[Double]]): Array[(Long, Array[Double])] = computeDistanceMatrixPair(data.zipWithIndex.map(tuple => (tuple._2.toLong, tuple._1))) + + def computeDistanceMatrixPair(data: Array[(Long, Array[Double])]): Array[(Long, Array[Double])] = { + data.flatMap(x => data.map(y => (x, y))). 
+ flatMap { + case (a: (Long, Array[Double]), b: (Long, Array[Double])) => + if (a._1 != b._1) + Some(a._1, euclDistance(a._2, b._2)) + else + None + }. + groupBy(_._1). + mapValues(arrayLongDouble => { + arrayLongDouble.foldLeft(Array[Double]())((a, b) => + a :+ b._2 + ) + }). + toArray + } - def computeOutlierProbability(rows: RDD[(Long, Array[Double])]): - RDD[(Long, Double)] = + def computeOutlierProbability(rows: Array[(Long, Array[Double])]): + Array[(Long, Double)] = rows.flatMap(r => r._2.zipWithIndex.map(p => - (p._2 + (if (p._2 >= r._1) 1L else 0L), p._1))).foldByKey(1.0)((a, b) => a * (1.0 - b)) + (p._2 + (if (p._2 >= r._1) 1L else 0L), p._1))). + groupBy(_._1). + mapValues(arrayLongDouble => { + arrayLongDouble.foldLeft(1.0)((a, b) => a * (1.0 - b._2)) + }).toArray } diff --git a/src/test/scala/frl/driesprong/outlierdetection/StocasticOutlierDetectionTest.scala b/src/test/scala/frl/driesprong/outlierdetection/StocasticOutlierDetectionTest.scala index b8f80be..5d8afee 100755 --- a/src/test/scala/frl/driesprong/outlierdetection/StocasticOutlierDetectionTest.scala +++ b/src/test/scala/frl/driesprong/outlierdetection/StocasticOutlierDetectionTest.scala @@ -2,44 +2,37 @@ package frl.driesprong.outlierdetection import breeze.linalg.{DenseVector, sum} import frl.driesprong.outlierdectection.StochasticOutlierDetection -import org.apache.spark.{SparkConf, SparkContext} import org.scalactic.TolerantNumerics import org.scalatest._ // Unit-tests created based on the Python script of https://github.com/jeroenjanssens/sos class StocasticOutlierDetectionTest extends FlatSpec with Matchers with BeforeAndAfter { - val master = "local" - val conf = new SparkConf().setAppName(this.getClass().getSimpleName()).setMaster(master) - val sc = new SparkContext(conf) - - val perplexity = 3 - + val perplexity = 3.0 val epsilon = 1e-9f implicit val doubleEq = TolerantNumerics.tolerantDoubleEquality(epsilon) "Computing the distance matrix " should "give symmetrical distances" in { - 
val data = sc.parallelize( - Seq( + val data = Array( Array(1.0, 3.0), Array(5.0, 1.0) - )) + ) - val dMatrix = StochasticOutlierDetection.computeDistanceMatrix(data).map(_._2).sortBy(dist => sum(dist)).collect() + val dMatrix = StochasticOutlierDetection.computeDistanceMatrix(data).map(_._2).sortBy(dist => sum(dist)) dMatrix(0) should be(dMatrix(1)) } "Computing the distance matrix " should "give the correct distances" in { - val data = sc.parallelize( - Seq( + val data = + Array( Array(1.0, 1.0), Array(2.0, 2.0), Array(5.0, 1.0) - )) + ) - val dMatrix = StochasticOutlierDetection.computeDistanceMatrix(data).map(_._2).sortBy(dist => sum(dist)).collect() + val dMatrix = StochasticOutlierDetection.computeDistanceMatrix(data).map(_._2).sortBy(dist => sum(dist)) dMatrix(0) should be(Array(Math.sqrt(2.0), Math.sqrt(10.0))) dMatrix(1) should be(Array(Math.sqrt(2.0), Math.sqrt(16.0))) @@ -73,20 +66,20 @@ class StocasticOutlierDetectionTest extends FlatSpec with Matchers with BeforeAn "Computing the affinity matrix " should "give the correct affinity" in { // The datapoints - val data = sc.parallelize( - Seq( + val data = + Array( Array(1.0, 1.0), Array(2.0, 1.0), Array(1.0, 2.0), Array(2.0, 2.0), Array(5.0, 8.0) // The outlier! 
- )) + ) val dMatrix = StochasticOutlierDetection.computeDistanceMatrix(data) val aMatrix = StochasticOutlierDetection.computeAffinityMatrix( dMatrix, perplexity, StochasticOutlierDetection.DefaultIterations, - StochasticOutlierDetection.DefaultTolerance).map(_._2).sortBy(dist => sum(dist)).collect() + StochasticOutlierDetection.DefaultTolerance).map(_._2).sortBy(dist => sum(dist)) assert(aMatrix.length == 5) assert(aMatrix(0)(0) === 1.65024581e-06) @@ -118,14 +111,14 @@ class StocasticOutlierDetectionTest extends FlatSpec with Matchers with BeforeAn "Verify the binding probabilities " should "give the correct probabilities" in { // The distance matrix - val dMatrix = sc.parallelize( - Seq( + val dMatrix = + Array( (0L, new DenseVector(Array(6.61626106e-112, 1.27343495e-088))), (1L, new DenseVector(Array(2.21858114e-020, 1.12846575e-044))), (2L, new DenseVector(Array(1.48949023e-010, 1.60381089e-028))) - )) + ) - val bMatrix = StochasticOutlierDetection.computeBindingProbabilities(dMatrix).map(_._2).sortBy(dist => sum(dist)).collect() + val bMatrix = StochasticOutlierDetection.computeBindingProbabilities(dMatrix).map(_._2).sortBy(dist => sum(dist)) assert(bMatrix(0)(0) === 5.19560192e-24) assert(bMatrix(0)(1) === 1.00000000e+00) @@ -139,14 +132,14 @@ class StocasticOutlierDetectionTest extends FlatSpec with Matchers with BeforeAn "Verifying the product " should "should provide valid products" in { - val data = sc.parallelize( - Seq( + val data = + Array( (0L, Array(0.5, 0.3)), (1L, Array(0.25, 0.1)), (2L, Array(0.8, 0.8)) - )) + ) - val oMatrix = StochasticOutlierDetection.computeOutlierProbability(data).map(_._2).sortBy(dist => dist).collect() + val oMatrix = StochasticOutlierDetection.computeOutlierProbability(data).map(_._2).sortBy(dist => dist) val out0 = (1.0 - 0.5) * (1.0 - 0.0) * (1.0 - 0.8) val out1 = (1.0 - 0.0) * (1.0 - 0.25) * (1.0 - 0.8) @@ -162,14 +155,14 @@ class StocasticOutlierDetectionTest extends FlatSpec with Matchers with BeforeAn "Verifying the 
output of the SOS algorithm " should "assign the one true outlier" in { // The distance matrix - val data = sc.parallelize( - Seq( + val data = + Array( Array(1.0, 1.0), Array(2.0, 1.0), Array(1.0, 2.0), Array(2.0, 2.0), Array(5.0, 8.0) // The outlier! - )) + ) // Process the steps of the algorithm val dMatrix = StochasticOutlierDetection.computeDistanceMatrix(data) @@ -184,7 +177,7 @@ class StocasticOutlierDetectionTest extends FlatSpec with Matchers with BeforeAn val oMatrix = StochasticOutlierDetection.computeOutlierProbability(bMatrix) // Do a distributed sort, and then return to driver - val output = oMatrix.map(_._2).sortBy(rank => rank).collect() + val output = oMatrix.map(_._2).sortBy(rank => rank) assert(output.length == 5) assert(output(0) === 0.12707053787018440794) @@ -194,4 +187,4 @@ class StocasticOutlierDetectionTest extends FlatSpec with Matchers with BeforeAn assert(output(4) === 0.99227799024537555184) // The outlier! } -} +} \ No newline at end of file