# Functional vs Imperativ style Spark DenseVector sqdist

In [ ]:
import org.apache.spark.ml.linalg.{Vector, SparseVector, Vectors, DenseVector}

import org.apache.spark.ml.linalg.{Vector, SparseVector, Vectors, DenseVector}


## This method is the original one imported from spark github, the name of the function is modify and also `Vectors.sqdist` is used rather than `sqdist`

In [ ]:
def sqdistImperativ(v1: Vector, v2: Vector): Double = {
  require(v1.size == v2.size, s"Vector dimensions do not match: Dim(v1)=${v1.size} and Dim(v2)" +
    s"=${v2.size}.")
  var squaredDistance = 0.0
  (v1, v2) match {
    case (v1: SparseVector, v2: SparseVector) =>
      val v1Values = v1.values
      val v1Indices = v1.indices
      val v2Values = v2.values
      val v2Indices = v2.indices
      val nnzv1 = v1Indices.length
      val nnzv2 = v2Indices.length

      var kv1 = 0
      var kv2 = 0
      while (kv1 < nnzv1 || kv2 < nnzv2) {
        var score = 0.0

        if (kv2 >= nnzv2 || (kv1 < nnzv1 && v1Indices(kv1) < v2Indices(kv2))) {
          score = v1Values(kv1)
          kv1 += 1
        } else if (kv1 >= nnzv1 || (kv2 < nnzv2 && v2Indices(kv2) < v1Indices(kv1))) {
          score = v2Values(kv2)
          kv2 += 1
        } else {
          score = v1Values(kv1) - v2Values(kv2)
          kv1 += 1
          kv2 += 1
        }
        squaredDistance += score * score
      }

    case (v1: SparseVector, v2: DenseVector) =>
      squaredDistance = Vectors.sqdist(v1, v2)

    case (v1: DenseVector, v2: SparseVector) =>
      squaredDistance = Vectors.sqdist(v2, v1)

    case (DenseVector(vv1), DenseVector(vv2)) =>
      var kv = 0
      val sz = vv1.length
      while (kv < sz) {
        val score = vv1(kv) - vv2(kv)
        squaredDistance += score * score
        kv += 1
      }
    case _ =>
      throw new IllegalArgumentException("Do not support vector type " + v1.getClass +
        " and " + v2.getClass)
  }
  squaredDistance
}

sqdistImperativ: (v1: org.apache.spark.ml.linalg.Vector, v2: org.apache.spark.ml.linalg.Vector)Double


## This function is the functional one with tail recursiv call for DenseVector case

In [ ]:
def sqdistFunctional(v1: Vector, v2: Vector): Double = {
  require(v1.size == v2.size, s"Vector dimensions do not match: Dim(v1)=${v1.size} and Dim(v2)" +
    s"=${v2.size}.")
  var squaredDistance = 0.0
  (v1, v2) match {
    case (v1: SparseVector, v2: SparseVector) =>
      val v1Values = v1.values
      val v1Indices = v1.indices
      val v2Values = v2.values
      val v2Indices = v2.indices
      val nnzv1 = v1Indices.length
      val nnzv2 = v2Indices.length

      var kv1 = 0
      var kv2 = 0
      while (kv1 < nnzv1 || kv2 < nnzv2) {
        var score = 0.0

        if (kv2 >= nnzv2 || (kv1 < nnzv1 && v1Indices(kv1) < v2Indices(kv2))) {
          score = v1Values(kv1)
          kv1 += 1
        } else if (kv1 >= nnzv1 || (kv2 < nnzv2 && v2Indices(kv2) < v1Indices(kv1))) {
          score = v2Values(kv2)
          kv2 += 1
        } else {
          score = v1Values(kv1) - v2Values(kv2)
          kv1 += 1
          kv2 += 1
        }
        squaredDistance += score * score
      }

    case (v1: SparseVector, v2: DenseVector) =>
      squaredDistance = Vectors.sqdist(v1, v2)

    case (v1: DenseVector, v2: SparseVector) =>
      squaredDistance = Vectors.sqdist(v2, v1)

    case (DenseVector(vv1), DenseVector(vv2)) =>
      val sz = vv1.length
      @annotation.tailrec
      def go(d: Double, kv: Int): Double = {
        if (kv < sz) {
          val score = vv1(kv) - vv2(kv)
          go(d + score * score, kv + 1)
        }
        else d
      }
      go(0D, 0)
    case _ =>
      throw new IllegalArgumentException("Do not support vector type " + v1.getClass +
        " and " + v2.getClass)
  }
  squaredDistance
}

sqdistFunctional: (v1: org.apache.spark.ml.linalg.Vector, v2: org.apache.spark.ml.linalg.Vector)Double


## My humble speedtest, i know it has to be more rigorous with specific JVM performance tools but results looks encouraging in the favor of functional approach especially for medium and high dimensions

In [ ]:
val numberRuns = 1000000
val warmUpRun = 600000
// val vectorSizes = Array(3, 10, 50, 100, 1000)
// val vectorSizes = Array(2, 3, 5, 7, 10)
val vectorSizes = Array(10, 20, 40, 60, 80, 100, 200)
// val vectorSizes = Array(100, 1000, 10000)

val times = vectorSizes.map( vSize => {

  var ti = 0L
  var tf = 0L

  (0 until numberRuns).foreach( i =>  {

    val dv1 = Vectors.dense(Array.fill(vSize)(scala.util.Random.nextDouble))
    val dv2 = Vectors.dense(Array.fill(vSize)(scala.util.Random.nextDouble))

    val t01 = System.nanoTime
    sqdistImperativ(dv1, dv2)
    val t02 = System.nanoTime
    sqdistFunctional(dv1, dv2)
    val t03 = System.nanoTime

    if( i >= warmUpRun ) {
      ti += t02 - t01
      tf += t03 - t02
    }
  })


  val totalTimeSorted = Array(("Imperativ", ti), ("Functional", tf)).sortBy(_._2)
  (vSize, totalTimeSorted)
})

val flatten = times.flatMap{ case (vSize, ttimes) => ttimes.map{ case (id, tab) => (vSize, id, tab) } }

flatten

numberRuns: Int = 1000000
warmUpRun: Int = 600000
vectorSizes: Array[Int] = Array(10, 20, 40, 60, 80, 100, 200)
times: Array[(Int, Array[(String, Long)])] = Array((10,Array((Functional,22438685), (Imperativ,26230969))), (20,Array((Functional,21433623), (Imperativ,25964069))), (40,Array((Functional,21721776), (Imperativ,31009769))), (60,Array((Functional,22339339), (Imperativ,39731951))), (80,Array((Functional,21685000), (Imperativ,48099742))), (100,Array((Functional,22453746), (Imperativ,57582411))), (200,Array((Functional,24642399), (Imperativ,107086209))))
flatten: Array[(Int, String, Long)] = Array((10,Functional,22438685), (10,Imperativ,26230969), (20,Functional,21433623), (20,Imperativ,25964069), (40,Functional,21721776), (40,Imperativ,31009769), (60,Functional,22339339), (60,Imper...