From 4d9d5507308c43aa5cd822de1fded19c279f0f8d Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Fri, 30 Dec 2016 07:38:52 +0100 Subject: [PATCH] Implemented the Stochastic Outlier Selection algorithm in the Machine Learning library, including the test code and scaladoc documentation. Furthermore extended the development documentation. --- docs/dev/libs/ml/index.md | 4 + docs/dev/libs/ml/sos.md | 120 ++++++ .../outlier/StochasticOutlierSelection.scala | 383 ++++++++++++++++++ .../StochasticOutlierSelectionITSuite.scala | 236 +++++++++++ 4 files changed, 743 insertions(+) create mode 100644 docs/dev/libs/ml/sos.md create mode 100644 flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala create mode 100644 flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/outlier/StochasticOutlierSelectionITSuite.scala diff --git a/docs/dev/libs/ml/index.md b/docs/dev/libs/ml/index.md index dcd3e0a6f3a8b..129be325e2500 100644 --- a/docs/dev/libs/ml/index.md +++ b/docs/dev/libs/ml/index.md @@ -58,6 +58,10 @@ FlinkML currently supports the following algorithms: * [Alternating Least Squares (ALS)](als.html) +### Outlier selection + +* [Stochastic Outlier Selection (SOS)](sos.html) + ### Utilities * [Distance Metrics](distance_metrics.html) diff --git a/docs/dev/libs/ml/sos.md b/docs/dev/libs/ml/sos.md new file mode 100644 index 0000000000000..22f4c3049b912 --- /dev/null +++ b/docs/dev/libs/ml/sos.md @@ -0,0 +1,120 @@ +--- +mathjax: include +title: Stochastic Outlier Selection +nav-parent_id: ml +--- + + +* This will be replaced by the TOC +{:toc} + + +## Description + +An outlier is one or multiple observations that deviates quantitatively from the majority of the data set and may be the subject of further investigation. +Stochastic Outlier Selection (SOS) developed by Jeroen Janssens[[1]](#janssens) is an unsupervised outlier-selection algorithm that takes as input a set of +vectors. 
The algorithm applies affinity-based outlier selection and outputs for each data point an outlier probability. +Intuitively, a data point is considered to be an outlier when the other data points have insufficient affinity with it. + +Outlier detection has applications in a number of fields, for example, log analysis, fraud detection, noise removal, novelty detection, quality control, + sensor monitoring, etc. If a sensor turns faulty, it is likely that it will output values that deviate markedly from the majority. + +For more information, please consult the [PhD Thesis of Jeroen Janssens](https://github.com/jeroenjanssens/phd-thesis) on +Outlier Selection and One-Class Classification which introduces the algorithm. + +## Parameters + +The stochastic outlier selection algorithm implementation can be controlled by the following parameters: + + + + + + + + + + + + + + + + + + + + + + + +
ParametersDescription
Perplexity +

+ Perplexity can be interpreted as the k in k-nearest neighbor algorithms. The difference is that in SOS being a neighbor + is not a binary property, but a probabilistic one, and therefore it is a real number. Must be between 1 and n-1, + where n is the number of points. A good starting point can be obtained by using the square root of the number of observations. + (Default value: 30) +

+
ErrorTolerance +

+ The accepted error tolerance when approximating the affinity. Increasing this value will + sacrifice accuracy in return for reduced computational time. + (Default value: 1e-20) +

+
MaxIterations +

+ The maximum number of iterations to approximate the affinity of the algorithm. + (Default value: 5000) +

+
+ + +## Example + +{% highlight scala %} +val data = env.fromCollection(List( + LabeledVector(0.0, DenseVector(1.0, 1.0)), + LabeledVector(1.0, DenseVector(2.0, 1.0)), + LabeledVector(2.0, DenseVector(1.0, 2.0)), + LabeledVector(3.0, DenseVector(2.0, 2.0)), + LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier! +)) + +val sos = new StochasticOutlierSelection().setPerplexity(3) + +val outputVector = sos + .transform(data) + .collect() + +val expectedOutputVector = Map( + 0 -> 0.2790094479202896, + 1 -> 0.25775014551682535, + 2 -> 0.22136130977995766, + 3 -> 0.12707053787018444, + 4 -> 0.9922779902453757 // The outlier! +) + +outputVector.foreach(output => expectedOutputVector(output._1) should be(output._2)) +{% endhighlight %} + +**References** + +[1]J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. +*Stochastic Outlier Selection*. Technical Report TiCC TR 2012-001, Tilburg University, Tilburg, the Netherlands, 2012. diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala new file mode 100644 index 0000000000000..2c04bb05fa4e2 --- /dev/null +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.outlier + +/** An implementation of the Stochastic Outlier Selection algorithm by Jeroen Jansen + * + * For more information about SOS, see https://github.com/jeroenjanssens/sos + * J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic + * Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University, + * Tilburg, the Netherlands, 2012. + * + * @example + * {{{ + * val data = env.fromCollection(List( + * LabeledVector(0.0, DenseVector(1.0, 1.0)), + * LabeledVector(1.0, DenseVector(2.0, 1.0)), + * LabeledVector(2.0, DenseVector(1.0, 2.0)), + * LabeledVector(3.0, DenseVector(2.0, 2.0)), + * LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier! + * )) + * + * val sos = new StochasticOutlierSelection().setPerplexity(3) + * + * val outputVector = sos + * .transform(data) + * .collect() + * + * val expectedOutputVector = Map( + * 0 -> 0.2790094479202896, + * 1 -> 0.25775014551682535, + * 2 -> 0.22136130977995766, + * 3 -> 0.12707053787018444, + * 4 -> 0.9922779902453757 // The outlier! + * ) + * + * outputVector.foreach(output => expectedOutputVector(output._1) should be(output._2)) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.Perplexity]]: + * Perplexity can be interpreted as the k in k-nearest neighbor algorithms. The difference is + * in SOS being a neighbor is not a binary property, but a probabilistic one, and therefore it + * a real number. Must be between 1 and n-1, where n is the number of points. 
+ * (Default value: '''30''') + * + * - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.ErrorTolerance]]: + * The accepted error tolerance when computing the perplexity. When increasing this number, it + * will sacrifice accuracy in return for reduced computational time. + * (Default value: '''1e-20''') + * + * - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.MaxIterations]]: + * The maximum number of iterations to perform to constrain the computational time. + * (Default value: '''5000''') + */ + +import breeze.linalg.functions.euclideanDistance +import breeze.linalg.{sum, DenseVector => BreezeDenseVector, Vector => BreezeVector} +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.utils._ +import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap, WithParameters} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BreezeVectorConverter, Vector} +import org.apache.flink.ml.pipeline.{TransformDataSetOperation, Transformer} + +import scala.language.implicitConversions +import scala.reflect.ClassTag + +class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] { + + import StochasticOutlierSelection._ + + + /** Sets the perplexity of the outlier selection algorithm, can be seen as the k of kNN + * For more information, please read the Stochastic Outlier Selection algorithm technical paper. + * + * @param perplexity the perplexity of the affinity fit + * @return + */ + def setPerplexity(perplexity: Double): StochasticOutlierSelection = { + require(perplexity >= 1, "Perplexity must be at least one.") + parameters.add(Perplexity, perplexity) + this + } + + /** The accepted error tolerance to reduce computational time when approximating the affinity. 
+ * + * @param errorTolerance the accepted error tolerance with respect to the affinity + * @return + */ + def setErrorTolerance(errorTolerance: Double): StochasticOutlierSelection = { + require(errorTolerance >= 0, "Error tolerance cannot be negative.") + parameters.add(ErrorTolerance, errorTolerance) + this + } + + /** The maximum number of iterations to approximate the affinity of the algorithm. + * + * @param maxIterations the maximum number of iterations. + * @return + */ + def setMaxIterations(maxIterations: Int): StochasticOutlierSelection = { + require(maxIterations > 0, "Maximum iterations must be positive.") + parameters.add(MaxIterations, maxIterations) + this + } + +} + +object StochasticOutlierSelection extends WithParameters { + + // ========================================= Parameters ========================================== + case object Perplexity extends Parameter[Double] { + val defaultValue: Option[Double] = Some(30) + } + + case object ErrorTolerance extends Parameter[Double] { + val defaultValue: Option[Double] = Some(1e-20) + } + + case object MaxIterations extends Parameter[Int] { + val defaultValue: Option[Int] = Some(5000) + } + + // ==================================== Factory methods ========================================== + + def apply(): StochasticOutlierSelection = { + new StochasticOutlierSelection() + } + + // ===================================== Operations ============================================== + case class BreezeLabeledVector(idx: Int, data: BreezeVector[Double]) + + implicit val transformLabeledVectors = { + + new TransformDataSetOperation[StochasticOutlierSelection, LabeledVector, (Int, Double)] { + + + /** Overrides the method of the parent class and applies the sochastic outlier selection + * algorithm. 
+ * + * @param instance Instance of the class + * @param transformParameters The user defined parameters of the algorithm + * @param input A data set which consists of all the LabeledVectors, which should have an + * index or unique integer label as vector. + * @return The outlierness of the vectors compared to each other + */ + override def transformDataSet(instance: StochasticOutlierSelection, + transformParameters: ParameterMap, + input: DataSet[LabeledVector]): DataSet[(Int, Double)] = { + + val resultingParameters = instance.parameters ++ transformParameters + + val vectorsWithIndex = input.map(labeledVector => { + BreezeLabeledVector(labeledVector.label.toInt, labeledVector.vector.asBreeze) + }) + + // Don't map back to a labeled-vector since the output of the algorithm is + // a single double instead of vector + outlierSelection(vectorsWithIndex, resultingParameters) + } + } + } + + /** [[TransformDataSetOperation]] applies the stochastic outlier selection algorithm on a + * [[Vector]] which will transform the high-dimensionaly input to a single Double output. 
+ * + * @tparam T Type of the input and output data which has to be a subtype of [[Vector]] + * @return [[TransformDataSetOperation]] a single double which represents the oulierness of + * the input vectors, where the output is in [0, 1] + */ + implicit def transformVectors[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag] + = { + new TransformDataSetOperation[StochasticOutlierSelection, T, Double] { + override def transformDataSet(instance: StochasticOutlierSelection, + transformParameters: ParameterMap, + input: DataSet[T]): DataSet[Double] = { + + val resultingParameters = instance.parameters ++ transformParameters + + // Map to the right format + val vectorsWithIndex = input.zipWithUniqueId.map(vector => { + BreezeLabeledVector(vector._1.toInt, vector._2.asBreeze) + }) + + outlierSelection(vectorsWithIndex, resultingParameters).map(_._2) + } + } + } + + /** Internal entry point which will execute the different stages of the algorithm using a single + * interface + * + * @param inputVectors Input vectors on which the stochastic outlier selection algorithm + * will be applied which should be the index or a unique integer value + * @param transformParameters The user defined parameters of the algorithm + * @return The outlierness of the vectors compared to each other + */ + private def outlierSelection(inputVectors: DataSet[BreezeLabeledVector], + transformParameters: ParameterMap): DataSet[(Int, Double)] = { + val dissimilarityVectors = computeDissimilarityVectors(inputVectors) + val affinityVectors = computeAffinity(dissimilarityVectors, transformParameters) + val bindingProbabilityVectors = computeBindingProbabilities(affinityVectors) + val outlierProbability = computeOutlierProbability(bindingProbabilityVectors) + + outlierProbability + } + + /** Compute pair-wise distance from each vector, to all other vectors. + * + * @param inputVectors The input vectors, will compare the vector to all other vectors based + * on an distance method. 
+ * @return Returns new set of [[BreezeLabeledVector]] with dissimilarity vector + */ + def computeDissimilarityVectors(inputVectors: DataSet[BreezeLabeledVector]): + DataSet[BreezeLabeledVector] = + inputVectors.cross(inputVectors) { + (a, b) => (a.idx, b.idx, euclideanDistance(a.data, b.data)) + }.filter(dist => dist._1 != dist._2) // Filter out the diagonal, this contains no information. + .groupBy(0) + .sortGroup(1, Order.ASCENDING) + .reduceGroup { + distancesIterator => { + val distances = distancesIterator.toList + val distanceVector = distances.map(_._3).toArray + + BreezeLabeledVector(distances.head._1, BreezeDenseVector(distanceVector)) + } + } + + /** Approximate the affinity by fitting a Gaussian-like function + * + * @param dissimilarityVectors The dissimilarity vectors which represents the distance to the + * other vectors in the data set. + * @param resultingParameters The user defined parameters of the algorithm + * @return Returns new set of [[BreezeLabeledVector]] with dissimilarity vector + */ + def computeAffinity(dissimilarityVectors: DataSet[BreezeLabeledVector], + resultingParameters: ParameterMap): DataSet[BreezeLabeledVector] = { + val logPerplexity = Math.log(resultingParameters(Perplexity)) + val maxIterations = resultingParameters(MaxIterations) + val errorTolerance = resultingParameters(ErrorTolerance) + + dissimilarityVectors.map(vec => { + val breezeVec = binarySearch(vec.data, logPerplexity, maxIterations, errorTolerance) + BreezeLabeledVector(vec.idx, breezeVec) + }) + } + + /** Normalizes the input vectors so each row sums up to one. + * + * @param affinityVectors The affinity vectors which is the quantification of the relationship + * between the original vectors. + * @return Returns new set of [[BreezeLabeledVector]] with represents the binding + * probabilities, which is in fact the affinity where each row sums up to one. 
+ */ + def computeBindingProbabilities(affinityVectors: DataSet[BreezeLabeledVector]): + DataSet[BreezeLabeledVector] = + affinityVectors.map(vec => BreezeLabeledVector(vec.idx, vec.data :/ sum(vec.data))) + + /** Compute the final outlier probability by taking the product of the column. + * + * @param bindingProbabilityVectors The binding probability vectors where the binding + * probability is based on the affinity and represents the + * probability of a vector binding with another vector. + * @return Returns a single double which represents the final outlierness of the input vector. + */ + def computeOutlierProbability(bindingProbabilityVectors: DataSet[BreezeLabeledVector]): + DataSet[(Int, Double)] = bindingProbabilityVectors + .flatMap(vec => vec.data.toArray.zipWithIndex.map(pair => { + + // The DistanceMatrix removed the diagonal, but we need to compute the product + // of the column, so we need to correct the offset. + val columnIndex = if (pair._2 >= vec.idx) { + 1 + } else { + 0 + } + + (columnIndex + pair._2, pair._1) + })).groupBy(0).reduceGroup { + probabilities => { + var rowNumber = -1 + var outlierProbability = 1.0 + for (probability <- probabilities) { + rowNumber = probability._1 + outlierProbability = outlierProbability * (1.0 - probability._2) + } + + (rowNumber, outlierProbability) + } + } + + /** Performs a binary search to get affinities in such a way that each conditional Gaussian has + * the same perplexity. + * + * @param dissimilarityVector The input dissimilarity vector which represents the current + * vector distance to the other vectors in the data set + * @param logPerplexity The log of the perplexity, which represents the probability of having + * affinity with another vector. + * @param maxIterations The maximum iterations to limit the computational time. + * @param tolerance The allowed tolerance to sacrifice precision for decreased computational + * time. 
+ * @param beta: The current beta + * @param betaMin The lower bound of beta + * @param betaMax The upper bound of beta + * @param iteration The current iteration + * @return Returns the affinity vector of the input vector. + */ + def binarySearch( + dissimilarityVector: BreezeVector[Double], + logPerplexity: Double, + maxIterations: Int, + tolerance: Double, + beta: Double = 1.0, + betaMin: Double = Double.NegativeInfinity, + betaMax: Double = Double.PositiveInfinity, + iteration: Int = 0) + : BreezeVector[Double] = { + + val newAffinity = dissimilarityVector.map(d => Math.exp(-d * beta)) + val sumA = sum(newAffinity) + val hCurr = Math.log(sumA) + beta * sum(dissimilarityVector :* newAffinity) / sumA + val hDiff = hCurr - logPerplexity + + if (iteration < maxIterations && Math.abs(hDiff) > tolerance) { + // Compute the Gaussian kernel and entropy for the current precision + val (newBeta, newBetaMin, newBetaMax) = if (hDiff.isNaN) { + (beta / 10.0, betaMin, betaMax) // Reduce beta to get it in range + } else { + if (hDiff > 0) { + val newBeta = + if (betaMax == Double.PositiveInfinity || betaMax == Double.NegativeInfinity) { + beta * 2.0 + } else { + (beta + betaMax) / 2.0 + } + + (newBeta, beta, betaMax) + } else { + val newBeta = + if (betaMin == Double.PositiveInfinity || betaMin == Double.NegativeInfinity) { + beta / 2.0 + } else { + (beta + betaMin) / 2.0 + } + + (newBeta, betaMin, beta) + } + } + + binarySearch(dissimilarityVector, + logPerplexity, + maxIterations, + tolerance, + newBeta, + newBetaMin, + newBetaMax, + iteration + 1) + } + else { + newAffinity + } + } +} diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/outlier/StochasticOutlierSelectionITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/outlier/StochasticOutlierSelectionITSuite.scala new file mode 100644 index 0000000000000..dc432782c6817 --- /dev/null +++ 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/outlier/StochasticOutlierSelectionITSuite.scala @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.outlier + +import breeze.linalg.{sum, DenseVector => BreezeDenseVector} +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common.LabeledVector +import org.apache.flink.ml.math.DenseVector +import org.apache.flink.ml.outlier.StochasticOutlierSelection.BreezeLabeledVector +import org.apache.flink.ml.util.FlinkTestBase +import org.scalatest.{FlatSpec, Matchers} + +class StochasticOutlierSelectionITSuite extends FlatSpec with Matchers with FlinkTestBase { + behavior of "Stochastic Outlier Selection algorithm" + val EPSILON = 1e-16 + + /* + Unit-tests created based on the Python scripts of the algorithms author' + https://github.com/jeroenjanssens/scikit-sos + + For more information about SOS, see https://github.com/jeroenjanssens/sos + J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic + Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University, + Tilburg, the Netherlands, 2012. 
+ */ + + val perplexity = 3 + val errorTolerance = 0 + val maxIterations = 5000 + val parameters = new StochasticOutlierSelection().setPerplexity(perplexity).parameters + + val env = ExecutionEnvironment.getExecutionEnvironment + + it should "Compute the perplexity of the vector and return the correct error" in { + val vector = BreezeDenseVector(Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 9.0, 10.0)) + + val output = Array( + 0.39682901665799636, + 0.15747326846175236, + 0.06248996227359784, + 0.024797830280027126, + 0.009840498605275054, + 0.0039049953849556816, + 6.149323865970302E-4, + 2.4402301428445443E-4, + 9.683541280042027E-5 + ) + + val search = StochasticOutlierSelection.binarySearch( + vector, + Math.log(perplexity), + maxIterations, + errorTolerance + ).toArray + + search should be(output) + } + + it should "Compute the distance matrix and give symmetrical distances" in { + + val data = env.fromCollection(List( + BreezeLabeledVector(0, BreezeDenseVector(Array(1.0, 3.0))), + BreezeLabeledVector(1, BreezeDenseVector(Array(5.0, 1.0))) + )) + + val distanceMatrix = StochasticOutlierSelection + .computeDissimilarityVectors(data) + .map(_.data) + .collect() + .toArray + + distanceMatrix(0) should be(distanceMatrix(1)) + } + + it should "Compute the distance matrix and give the correct distances" in { + + val expectedDistanceMatrix = Array( + Array(Math.sqrt(2.0), Math.sqrt(10.0)), + Array(Math.sqrt(2.0), Math.sqrt(16.0)), + Array(Math.sqrt(16.0), Math.sqrt(10.0)) + ) + + val data = env.fromCollection(Array( + BreezeLabeledVector(0, BreezeDenseVector(Array(1.0, 1.0))), + BreezeLabeledVector(1, BreezeDenseVector(Array(2.0, 2.0))), + BreezeLabeledVector(2, BreezeDenseVector(Array(5.0, 1.0))) + )) + + val distanceMatrix = StochasticOutlierSelection + .computeDissimilarityVectors(data) + .map(_.data.toArray) + .collect() + .sortBy(dist => sum(dist)) + .toArray + + distanceMatrix should be(expectedDistanceMatrix) + } + + it should "Computing the affinity matrix and 
return the correct affinity" in { + + val data = env.fromCollection(List( + BreezeLabeledVector(0, BreezeDenseVector(Array(1.0, 1.0))), + BreezeLabeledVector(1, BreezeDenseVector(Array(2.0, 1.0))), + BreezeLabeledVector(2, BreezeDenseVector(Array(1.0, 2.0))), + BreezeLabeledVector(3, BreezeDenseVector(Array(2.0, 2.0))), + BreezeLabeledVector(4, BreezeDenseVector(Array(5.0, 8.0))) // The outlier! + )) + + val distanceMatrix = StochasticOutlierSelection.computeDissimilarityVectors(data) + + + val affinityMatrix = StochasticOutlierSelection.computeAffinity(distanceMatrix, parameters) + .collect() + .map(_.data.toArray) + .sortBy(dist => sum(dist)) + .toArray + + val expectedAffinityMatrix = Array( + Array( + 1.6502458086204375E-6, 3.4496775759599478E-6, 6.730049701933432E-6, 1.544221669904019E-5), + Array(0.2837044890495805, 0.4103155587026411, 0.4103155587026411, 0.0025393148189994897), + Array(0.43192525601205634, 0.30506325262816036, 0.43192525601205634, 0.0023490595181415333), + Array(0.44804626736879755, 0.3212891538762665, 0.44804626736879755, 0.0022108233460722557), + Array(0.46466276524577704, 0.46466276524577704, 0.3382687394674377, 0.002071952211368232) + ) + + affinityMatrix should be(expectedAffinityMatrix) + } + + it should "Compute the binding probabilities and return the correct probabilities" in { + + val expectedBindingProbabilityMatrix = Array( + Array(0.00000000000000000, 0.3659685430819966, 0.36596854308199660, + 0.2664300527549236, 0.0016328610810832176), + Array(0.06050907527090226, 0.1264893287483121, 0.24677254025174370, + 0.5662290557290419, 0.0000000000000000000), + Array(0.25630819225892230, 0.3706990977807361, 0.37069909778073610, + 0.0000000000000000, 0.0022936121796053232), + Array(0.36737364041784460, 0.0000000000000000, 0.26343993596023335, + 0.3673736404178446, 0.0018127832040774768), + Array(0.36877315905154990, 0.2604492865700658, 0.00000000000000000, + 0.3687731590515499, 0.0020043953268345785) + ) + + // The distance matrix + val 
data = env.fromCollection(List( + BreezeLabeledVector(0, new BreezeDenseVector( + Array(0.00000000e+00, 4.64702705e-01, 4.64702705e-01, 3.38309859e-01, 2.07338848e-03))), + BreezeLabeledVector(1, new BreezeDenseVector( + Array(4.48047312e-01, 0.00000000e+00, 3.21290213e-01, 4.48047312e-01, 2.21086260e-03))), + BreezeLabeledVector(2, new BreezeDenseVector( + Array(4.31883411e-01, 3.05021457e-01, 0.00000000e+00, 4.31883411e-01, 2.34741892e-03))), + BreezeLabeledVector(3, new BreezeDenseVector( + Array(2.83688288e-01, 4.10298990e-01, 4.10298990e-01, 0.00000000e+00, 2.53862706e-03))), + BreezeLabeledVector(4, new BreezeDenseVector( + Array(1.65000529e-06, 3.44920263e-06, 6.72917236e-06, 1.54403440e-05, 0.00000000e+00))) + )) + + val bindingProbabilityMatrix = StochasticOutlierSelection.computeBindingProbabilities(data) + .map(_.data.toArray) + .collect() + .sortBy(_ (0)) // Sort by the first element, because the sum is always equal to 1 + .toArray + + bindingProbabilityMatrix should be(expectedBindingProbabilityMatrix) + } + + + it should "Compute the product of the vector, should return the correct values" in { + + val data = env.fromCollection(List( + BreezeLabeledVector(0, BreezeDenseVector(0.5, 0.3)), + BreezeLabeledVector(1, BreezeDenseVector(0.25, 0.1)), + BreezeLabeledVector(2, BreezeDenseVector(0.8, 0.8)) + )) + + val outlierMatrix = StochasticOutlierSelection.computeOutlierProbability(data) + .map(_._2) + .collect() + .sortBy(dist => dist) + .toArray + + // The math by hand + val expectedOutlierMatrix = Array( + (1.0 - 0.5) * (1.0 - 0.0) * (1.0 - 0.8), + (1.0 - 0.0) * (1.0 - 0.25) * (1.0 - 0.8), + (1.0 - 0.3) * (1.0 - 0.1) * (1.0 - 0) + ) + + outlierMatrix should be(expectedOutlierMatrix) + } + + it should "Verifying the output of the SOS algorithm assign the one true outlier" in { + + val data = env.fromCollection(List( + LabeledVector(0.0, DenseVector(1.0, 1.0)), + LabeledVector(1.0, DenseVector(2.0, 1.0)), + LabeledVector(2.0, DenseVector(1.0, 2.0)), + 
LabeledVector(3.0, DenseVector(2.0, 2.0)), + LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier! + )) + + val sos = new StochasticOutlierSelection().setPerplexity(3) + + val outputVector = sos + .transform(data) + .collect() + + val expectedOutputVector = Map( + 0 -> 0.2790094479202896, + 1 -> 0.25775014551682535, + 2 -> 0.22136130977995766, + 3 -> 0.12707053787018444, + 4 -> 0.9922779902453757 // The outlier! + ) + + outputVector.foreach(output => + expectedOutputVector(output._1) should be(output._2 +- EPSILON)) + } +}