Skip to content

Commit

Permalink
removing Apache Spark dependencies from SOS implementation
Browse files Browse the repository at this point in the history
.. and some other minor adaptions
  • Loading branch information
gni committed Aug 2, 2019
1 parent bd9f6d0 commit 504001e
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 101 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,6 +1,6 @@
language: scala
scala:
- 2.11.7
- 2.11.8
script:
- sbt clean coverage test
after_success:
Expand Down
26 changes: 18 additions & 8 deletions README.md
@@ -1,18 +1,28 @@
Stochastic Outlier Selection on Apache Spark
Stochastic Outlier Selection in Scala
============================

[![Codacy Badge](https://www.codacy.com/project/badge/9069624e46ac4d97bb19a34705f95965)](https://www.codacy.com)
[![Build Status](https://travis-ci.org/Fokko/spark-stochastic-outlier-selection.svg?branch=master)](https://travis-ci.org/Fokko/spark-stochastic-outlier-selection)
[![Coverage Status](https://coveralls.io/repos/Fokko/spark-stochastic-outlier-selection/badge.svg?branch=master&service=github)](https://coveralls.io/github/Fokko/spark-stochastic-outlier-selection?branch=master)
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/df5fc23eb5b74795b62d0daa52436a0d)](https://www.codacy.com/app/Gnni/scala-stochastic-outlier-selection?utm_source=github.com&utm_medium=referral&utm_content=Gnni/scala-stochastic-outlier-selection&utm_campaign=Badge_Grade)
[![Build Status](https://travis-ci.org/Gnni/scala-stochastic-outlier-selection.svg?branch=master)](https://travis-ci.org/Gnni/scala-stochastic-outlier-selection)
[![Coverage Status](https://coveralls.io/repos/github/Gnni/scala-stochastic-outlier-selection/badge.svg?branch=master)](https://coveralls.io/github/Gnni/scala-stochastic-outlier-selection?branch=master)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/frl.driesprong/spark-stochastic-outlier-selection_2.11/badge.svg)](https://maven-badges.herokuapp.com/maven-central/frl.driesprong/spark-stochastic-outlier-selection_2.11)

Stochastic Outlier Selection (SOS) is an unsupervised outlier selection algorithm. It uses the concept of affinity to compute an outlier probability for each data point.
Adapted version of the implementation for Apache Spark. This versions
aims to perform Stochastic Outlier Selection (SOS) using Scala only,
i.e., w/o the need of any Apache Spark resources.

For more information about SOS, see the technical report: J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. [Stochastic Outlier Selection](https://github.com/jeroenjanssens/sos/blob/master/doc/sos-ticc-tr-2012-001.pdf?raw=true). Technical Report TiCC TR 2012-001, Tilburg University, Tilburg, the Netherlands, 2012.
SOS is an unsupervised outlier selection algorithm. It uses the concept of affinity to compute an outlier probability for each data point.

For more information about SOS, see the technical report: J.H.M.
Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik.
[Stochastic Outlier Selection](https://github.com/jeroenjanssens/sos/blob/master/doc/sos-ticc-tr-2012-001.pdf?raw=true).
Technical Report TiCC TR 2012-001, Tilburg University, Tilburg, the
Netherlands, 2012.

Selecting outliers from data
----------------------------------------

The current implementation accepts RDD's of the type `Array[Double]` and returns the indexes of the vector with it's degree of outlierness.
The current implementation accepts an Array with elements of the type
`Array[Double]` and returns the indexes of the vector with it's degree
of outlierness.

Current implementation only works with Euclidean distance, but this will be extended in the foreseeable future.
Current implementation only works with Euclidean distance.
21 changes: 8 additions & 13 deletions build.sbt
Expand Up @@ -3,24 +3,19 @@ name := "Scala Stochastic Outlier Selection"

version := "0.1.0"

//publishTo := {
// val nexus = "https://oss.sonatype.org/"
// if (isSnapshot.value)
// Some("snapshots" at nexus + "content/repositories/snapshots")
// else
// Some("releases" at nexus + "service/local/staging/deploy/maven2")
//}
publishTo := {
val nexus = "https://oss.sonatype.org/"
if (isSnapshot.value)
Some("snapshots" at nexus + "content/repositories/snapshots")
else
Some("releases" at nexus + "service/local/staging/deploy/maven2")
}

publishMavenStyle := true

publishArtifact in Test := false

pomIncludeRepository := { _ => false }

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.2.0"

libraryDependencies += "org.scalanlp" %% "breeze" % "0.13.2"
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.5" % "test"
@@ -1,29 +1,19 @@
package frl.driesprong.outlierdectection

import org.apache.spark.{SparkConf, SparkContext}

object EvaluateOutlierDetection {

def main(args: Array[String]) {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Stochastic Outlier Selection")

val sc = new SparkContext(conf)

val toyDataset = Array(
(0L, Array(1.00, 1.00)),
(1L, Array(3.00, 1.25)),
(2L, Array(3.00, 3.00)),
(3L, Array(1.00, 3.00)),
(4L, Array(2.25, 2.25)),
(5L, Array(8.00, 2.00))
val testDataset = Array(
(Array(1.00, 1.00)),
(Array(2.00, 1.00)),
(Array(1.00, 2.00)),
(Array(2.00, 2.00)),
(Array(5.00, 8.00))
)

StochasticOutlierDetection.performOutlierDetection( sc.parallelize(toyDataset) ).foreach( x =>
StochasticOutlierDetection.performOutlierDetection(testDataset, perplexity = 3.0).foreach( x =>
System.out.println(x._1 + " : " + x._2)
)

sc.stop()
}
}
Expand Up @@ -2,26 +2,23 @@ package frl.driesprong.outlierdectection

import breeze.linalg.{DenseVector, sum}
import breeze.numerics.{pow, sqrt}
import org.apache.spark.rdd.RDD

import scala.language.implicitConversions

object StochasticOutlierDetection {
val DefaultTolerance = 1e-20
val DefaultIterations = 500
val DefaultPerplexity = 30.0

def performOutlierDetection(inputVectors: RDD[(Long, Array[Double])],
def performOutlierDetection(inputVectors: Array[Array[Double]],
perplexity: Double = DefaultPerplexity,
tolerance: Double = DefaultPerplexity,
maxIterations: Int = DefaultIterations ): Array[(Long, Double)] = {
tolerance: Double = DefaultTolerance,
maxIterations: Int = DefaultIterations): Array[(Long, Double)] = {

val dMatrix = StochasticOutlierDetection.computeDistanceMatrixPair(inputVectors)
val dMatrix = StochasticOutlierDetection.computeDistanceMatrix(inputVectors)
val aMatrix = StochasticOutlierDetection.computeAffinityMatrix(dMatrix, perplexity, maxIterations, tolerance)
val bMatrix = StochasticOutlierDetection.computeBindingProbabilities(aMatrix)
val oMatrix = StochasticOutlierDetection.computeOutlierProbability(bMatrix)

oMatrix.collect()
oMatrix
}

def binarySearch(affinity: DenseVector[Double],
Expand Down Expand Up @@ -56,38 +53,45 @@ object StochasticOutlierDetection {
newAffinity
}

def computeAffinityMatrix(dMatrix: RDD[(Long, Array[Double])],
def computeAffinityMatrix(dMatrix: Array[(Long, Array[Double])],
perplexity: Double = DefaultPerplexity,
maxIterations: Int,
tolerance: Double): RDD[(Long, DenseVector[Double])] = {
tolerance: Double): Array[(Long, DenseVector[Double])] = {
val logPerplexity = Math.log(perplexity)
dMatrix.map(r => (r._1, binarySearch(new DenseVector(r._2), logPerplexity, maxIterations, tolerance)))
}

def euclDistance(a: Array[Double], b: Array[Double]): Double = sqrt((a zip b).map { case (x, y) => pow(y - x, 2) }.sum)

def computeBindingProbabilities(rows: RDD[(Long, DenseVector[Double])]): RDD[(Long, Array[Double])] =
def computeBindingProbabilities(rows: Array[(Long, DenseVector[Double])]): Array[(Long, Array[Double])] =
rows.map(r => (r._1, (r._2 :/ sum(r._2)).toArray))

def computeDistanceMatrix(data: RDD[Array[Double]]): RDD[(Long, Array[Double])] = computeDistanceMatrixPair(data.zipWithUniqueId().map(_.swap))

def computeDistanceMatrixPair(data: RDD[(Long, Array[Double])]): RDD[(Long, Array[Double])] =
data.cartesian(data).flatMap {
case (a: (Long, Array[Double]), b: (Long, Array[Double])) =>
if (a._1 != b._1)
Some(a._1, euclDistance(a._2, b._2))
else
None
}.combineByKey(
(v1) => List(v1),
(c1: List[Double], v1: Double) => c1 :+ v1,
(c1: List[Double], c2: List[Double]) => c1 ++ c2
).map {
case (a, b) => (a, b.toArray)
}
def computeDistanceMatrix(data: Array[Array[Double]]): Array[(Long, Array[Double])] = computeDistanceMatrixPair(data.zipWithIndex.map(tuple => (tuple._2.toLong, tuple._1)))

def computeDistanceMatrixPair(data: Array[(Long, Array[Double])]): Array[(Long, Array[Double])] = {
data.flatMap(x => data.map(y => (x, y))).
flatMap {
case (a: (Long, Array[Double]), b: (Long, Array[Double])) =>
if (a._1 != b._1)
Some(a._1, euclDistance(a._2, b._2))
else
None
}.
groupBy(_._1).
mapValues(arrayLongDouble => {
arrayLongDouble.foldLeft(Array[Double]())((a, b) =>
a :+ b._2
)
}).
toArray
}

def computeOutlierProbability(rows: RDD[(Long, Array[Double])]):
RDD[(Long, Double)] =
def computeOutlierProbability(rows: Array[(Long, Array[Double])]):
Array[(Long, Double)] =
rows.flatMap(r => r._2.zipWithIndex.map(p =>
(p._2 + (if (p._2 >= r._1) 1L else 0L), p._1))).foldByKey(1.0)((a, b) => a * (1.0 - b))
(p._2 + (if (p._2 >= r._1) 1L else 0L), p._1))).
groupBy(_._1).
mapValues(arrayLongDouble => {
arrayLongDouble.foldLeft(1.0)((a, b) => a * (1.0 - b._2))
}).toArray
}
Expand Up @@ -2,44 +2,37 @@ package frl.driesprong.outlierdetection

import breeze.linalg.{DenseVector, sum}
import frl.driesprong.outlierdectection.StochasticOutlierDetection
import org.apache.spark.{SparkConf, SparkContext}
import org.scalactic.TolerantNumerics
import org.scalatest._

// Unit-tests created based on the Python script of https://github.com/jeroenjanssens/sos
class StocasticOutlierDetectionTest extends FlatSpec with Matchers with BeforeAndAfter {
val master = "local"
val conf = new SparkConf().setAppName(this.getClass().getSimpleName()).setMaster(master)
val sc = new SparkContext(conf)

val perplexity = 3

val perplexity = 3.0
val epsilon = 1e-9f
implicit val doubleEq = TolerantNumerics.tolerantDoubleEquality(epsilon)

"Computing the distance matrix " should "give symmetrical distances" in {

val data = sc.parallelize(
Seq(
val data = Array(
Array(1.0, 3.0),
Array(5.0, 1.0)
))
)

val dMatrix = StochasticOutlierDetection.computeDistanceMatrix(data).map(_._2).sortBy(dist => sum(dist)).collect()
val dMatrix = StochasticOutlierDetection.computeDistanceMatrix(data).map(_._2).sortBy(dist => sum(dist))

dMatrix(0) should be(dMatrix(1))
}

"Computing the distance matrix " should "give the correct distances" in {

val data = sc.parallelize(
Seq(
val data =
Array(
Array(1.0, 1.0),
Array(2.0, 2.0),
Array(5.0, 1.0)
))
)

val dMatrix = StochasticOutlierDetection.computeDistanceMatrix(data).map(_._2).sortBy(dist => sum(dist)).collect()
val dMatrix = StochasticOutlierDetection.computeDistanceMatrix(data).map(_._2).sortBy(dist => sum(dist))

dMatrix(0) should be(Array(Math.sqrt(2.0), Math.sqrt(10.0)))
dMatrix(1) should be(Array(Math.sqrt(2.0), Math.sqrt(16.0)))
Expand Down Expand Up @@ -73,20 +66,20 @@ class StocasticOutlierDetectionTest extends FlatSpec with Matchers with BeforeAn
"Computing the affinity matrix " should "give the correct affinity" in {

// The datapoints
val data = sc.parallelize(
Seq(
val data =
Array(
Array(1.0, 1.0),
Array(2.0, 1.0),
Array(1.0, 2.0),
Array(2.0, 2.0),
Array(5.0, 8.0) // The outlier!
))
)

val dMatrix = StochasticOutlierDetection.computeDistanceMatrix(data)
val aMatrix = StochasticOutlierDetection.computeAffinityMatrix( dMatrix,
perplexity,
StochasticOutlierDetection.DefaultIterations,
StochasticOutlierDetection.DefaultTolerance).map(_._2).sortBy(dist => sum(dist)).collect()
StochasticOutlierDetection.DefaultTolerance).map(_._2).sortBy(dist => sum(dist))

assert(aMatrix.length == 5)
assert(aMatrix(0)(0) === 1.65024581e-06)
Expand Down Expand Up @@ -118,14 +111,14 @@ class StocasticOutlierDetectionTest extends FlatSpec with Matchers with BeforeAn
"Verify the binding probabilities " should "give the correct probabilities" in {

// The distance matrix
val dMatrix = sc.parallelize(
Seq(
val dMatrix =
Array(
(0L, new DenseVector(Array(6.61626106e-112, 1.27343495e-088))),
(1L, new DenseVector(Array(2.21858114e-020, 1.12846575e-044))),
(2L, new DenseVector(Array(1.48949023e-010, 1.60381089e-028)))
))
)

val bMatrix = StochasticOutlierDetection.computeBindingProbabilities(dMatrix).map(_._2).sortBy(dist => sum(dist)).collect()
val bMatrix = StochasticOutlierDetection.computeBindingProbabilities(dMatrix).map(_._2).sortBy(dist => sum(dist))

assert(bMatrix(0)(0) === 5.19560192e-24)
assert(bMatrix(0)(1) === 1.00000000e+00)
Expand All @@ -139,14 +132,14 @@ class StocasticOutlierDetectionTest extends FlatSpec with Matchers with BeforeAn

"Verifying the product " should "should provide valid products" in {

val data = sc.parallelize(
Seq(
val data =
Array(
(0L, Array(0.5, 0.3)),
(1L, Array(0.25, 0.1)),
(2L, Array(0.8, 0.8))
))
)

val oMatrix = StochasticOutlierDetection.computeOutlierProbability(data).map(_._2).sortBy(dist => dist).collect()
val oMatrix = StochasticOutlierDetection.computeOutlierProbability(data).map(_._2).sortBy(dist => dist)

val out0 = (1.0 - 0.5) * (1.0 - 0.0) * (1.0 - 0.8)
val out1 = (1.0 - 0.0) * (1.0 - 0.25) * (1.0 - 0.8)
Expand All @@ -162,14 +155,14 @@ class StocasticOutlierDetectionTest extends FlatSpec with Matchers with BeforeAn
"Verifying the output of the SOS algorithm " should "assign the one true outlier" in {

// The distance matrix
val data = sc.parallelize(
Seq(
val data =
Array(
Array(1.0, 1.0),
Array(2.0, 1.0),
Array(1.0, 2.0),
Array(2.0, 2.0),
Array(5.0, 8.0) // The outlier!
))
)

// Process the steps of the algorithm
val dMatrix = StochasticOutlierDetection.computeDistanceMatrix(data)
Expand All @@ -184,7 +177,7 @@ class StocasticOutlierDetectionTest extends FlatSpec with Matchers with BeforeAn
val oMatrix = StochasticOutlierDetection.computeOutlierProbability(bMatrix)

// Do a distributed sort, and then return to driver
val output = oMatrix.map(_._2).sortBy(rank => rank).collect()
val output = oMatrix.map(_._2).sortBy(rank => rank)

assert(output.length == 5)
assert(output(0) === 0.12707053787018440794)
Expand All @@ -194,4 +187,4 @@ class StocasticOutlierDetectionTest extends FlatSpec with Matchers with BeforeAn
assert(output(4) === 0.99227799024537555184) // The outlier!
}

}
}

0 comments on commit 504001e

Please sign in to comment.