# K-Means Example

&emsp; This example shows how Wayang handles iterative or recursive algorithms:
- The Wayang API allows a plan to contain cycles (loops)
- During plan enumeration, Wayang checks whether an operator has already been processed

&emsp; It also reviews in detail how to work with the Wayang API.
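The cycle support mentioned above boils down to feeding the output of one iteration back in as the input of the next, a fixed number of times. The following plain-Scala sketch (no Wayang involved; `repeatN` is a hypothetical helper, not part of any API) illustrates those semantics, which the `repeat` operator in the k-means plan expresses inside a Wayang plan.

```scala
// Plain Scala, no Wayang involved: `repeatN` is a hypothetical helper that mimics
// how an iterative plan feeds each iteration's output back in as the next input.
def repeatN[A](iterations: Int, initial: A)(body: A => A): A =
  (1 to iterations).foldLeft(initial)((current, _) => body(current))

// Example: halve a value three times, 80.0 -> 40.0 -> 20.0 -> 10.0
val halved = repeatN(3, 80.0)(_ / 2)
println(halved)
```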


---

## Preparing dependencies


This step imports the modules required to execute the code. All of these packages come from the previous Maven installation.

```scala
// import $ivy.`ORGANIZATION:MODULE:VERSION`   uses MODULE verbatim and brings that exact version
// import $ivy.`ORGANIZATION::MODULE:VERSION`  "::" appends the Scala binary version to MODULE (e.g. MODULE_2.12)
```
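To make the difference between `:` and `::` concrete, the sketch below spells out the artifact-naming convention. `resolveCoordinate` is a hypothetical helper for illustration only; it models just the artifact name, while the actual download is done by coursier, the resolver behind `$ivy` imports.

```scala
// Hypothetical helper, for illustration only: shows how a coordinate written
// with "::" maps to a Scala-suffixed artifact name (e.g. wayang-spark_2.12).
def resolveCoordinate(coordinate: String, scalaBinaryVersion: String): String =
  coordinate.split(":", -1) match {
    // "org::module:version" splits into Array(org, "", module, version)
    case Array(org, "", module, version) => s"$org:${module}_$scalaBinaryVersion:$version"
    // "org:module:version" is used as-is
    case Array(org, module, version) => s"$org:$module:$version"
    case _ => throw new IllegalArgumentException(s"Unexpected coordinate: $coordinate")
  }

println(resolveCoordinate("org.apache.wayang::wayang-spark:0.6.1-SNAPSHOT", "2.12"))
println(resolveCoordinate("org.apache.wayang:wayang-core:0.6.1-SNAPSHOT", "2.12"))
```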

The imported libraries are:

Module | Java versions | Scala versions | Description
:----- | -------------: | --------------: | :----------
wayang-core | 8, 11 | 2.11, 2.12 | provides core data structures and the optimizer (required)
wayang-basic | 8, 11 | 2.11, 2.12 | provides common operators and data types for your apps (recommended)
wayang-api-scala-java | 8, 11 | 2.11, 2.12 | provides an easy-to-use Scala and Java API to assemble Wayang plans (recommended)
wayang-spark | 8, 11 | 2.11, 2.12 | adapter for the [Apache Spark](https://spark.apache.org) processing platform
hadoop-common | 8, 11 | - | required because the environment variable **HADOOP_HOME** is not set


```scala
import $ivy.`com.thoughtworks.paranamer:paranamer:2.8`

import $ivy.`org.apache.wayang::wayang-api-scala-java:0.6.1-SNAPSHOT`
import $ivy.`org.apache.wayang:wayang-core:0.6.1-SNAPSHOT`
import $ivy.`org.apache.wayang:wayang-basic:0.6.1-SNAPSHOT`
import $ivy.`org.apache.wayang::wayang-spark:0.6.1-SNAPSHOT`
import $ivy.`org.apache.hadoop:hadoop-common:2.8.5`
```


--- 

## Importing the Required Classes

The following cells import the classes required to run the code and define the data types used by the k-means implementation.

```scala
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.core.util.fs.FileSystems
import org.apache.wayang.core.function.ExecutionContext
import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializableFunction
import org.apache.wayang.core.plugin.Plugin
import org.apache.wayang.spark.Spark
import java.io.File
import java.util.{Collection => JavaCollection}
import scala.collection.JavaConversions._
import scala.util.Random
```


```scala
/**
  * Represents objects with an x and a y coordinate.
  */
sealed trait PointLike {

  /**
    * @return the x coordinate
    */
  def x: Double

  /**
    * @return the y coordinate
    */
  def y: Double

}

/**
  * Represents a two-dimensional point.
  *
  * @param x the x coordinate
  * @param y the y coordinate
  */
case class Point(x: Double, y: Double) extends PointLike {

  /**
    * Calculates the Euclidean distance to another [[PointLike]].
    *
    * @param that the other [[PointLike]]
    * @return the Euclidean distance
    */
  def distanceTo(that: PointLike): Double = {
    val dx = this.x - that.x
    val dy = this.y - that.y
    math.sqrt(dx * dx + dy * dy)
  }

  override def toString: String = f"($x%.2f, $y%.2f)"
}

/**
  * Represents a two-dimensional point with a centroid ID attached.
  */
case class TaggedPoint(x: Double, y: Double, centroidId: Int) extends PointLike {

  /**
    * Creates a [[Point]] from this instance.
    *
    * @return the [[Point]]
    */
  def toPoint: Point = Point(x, y)

}

/**
  * Represents a two-dimensional point with a centroid ID and a counter attached.
  */
case class TaggedPointCounter(x: Double, y: Double, centroidId: Int, count: Int = 1) extends PointLike {

  def this(point: PointLike, centroidId: Int, count: Int) = this(point.x, point.y, centroidId, count)

  /**
    * Adds coordinates and counts of two instances.
    *
    * @param that the other instance
    * @return the sum
    */
  def +(that: TaggedPointCounter): TaggedPointCounter =
    TaggedPointCounter(this.x + that.x, this.y + that.y, this.centroidId, this.count + that.count)

  /**
    * Calculates the average of all added instances.
    *
    * @return a [[TaggedPoint]] reflecting the average
    */
  def average: TaggedPoint = TaggedPoint(x / count, y / count, centroidId)

}
```

defined [32mtrait[39m [36mPointLike[39m
defined [32mclass[39m [36mPoint[39m
defined [32mclass[39m [36mTaggedPoint[39m
defined [32mclass[39m [36mTaggedPointCounter[39m

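The `+` and `average` members of `TaggedPointCounter` implement the centroid update step of k-means: coordinates and counts are summed per centroid, then divided. Writing $S_j$ for the set of points currently assigned to centroid $j$, the new centroid $c_j$ is

$$
c_j = \left( \frac{1}{|S_j|} \sum_{p \in S_j} p_x ,\; \frac{1}{|S_j|} \sum_{p \in S_j} p_y \right)
$$

Each point enters the reduction as a counter with `count = 1`, so after all `+` reductions `count` equals $|S_j|$ and `average` returns $c_j$. For example, the points $(1, 2)$ and $(3, 6)$ assigned to the same centroid reduce to a counter with coordinates $(4, 8)$ and `count = 2`, whose `average` is $(2, 4)$.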
```scala
/**
  * UDF to select the closest centroid for a given [[Point]].
  */
class SelectNearestCentroid extends ExtendedSerializableFunction[Point, TaggedPointCounter] {

  /** Keeps the broadcasted centroids. */
  var centroids: JavaCollection[TaggedPoint] = _

  // Called before the UDF is applied: fetch the centroids broadcast under the name "centroids".
  override def open(executionCtx: ExecutionContext): Unit = {
    centroids = executionCtx.getBroadcast[TaggedPoint]("centroids")
  }

  // Tag the point with the ID of the closest centroid and a count of 1.
  override def apply(point: Point): TaggedPointCounter = {
    var minDistance = Double.PositiveInfinity
    var nearestCentroidId = -1
    for (centroid <- centroids) {
      val distance = point.distanceTo(centroid)
      if (distance < minDistance) {
        minDistance = distance
        nearestCentroidId = centroid.centroidId
      }
    }
    new TaggedPointCounter(point, nearestCentroidId, 1)
  }
}
```

defined [32mclass[39m [36mSelectNearestCentroid[39m

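The search in `SelectNearestCentroid.apply` is an argmin over the broadcast centroids. As a plain-Scala sketch (no Wayang; centroids are modelled as `(id, x, y)` tuples, an assumption made for brevity, and `nearestCentroidId` is a hypothetical name), the same logic can be written with `minBy`. Like the original loop, which uses a strict `<`, `minBy` keeps the first centroid among equally distant ones.

```scala
// Plain-Scala sketch (no Wayang): the same nearest-centroid search as
// SelectNearestCentroid.apply, written with minBy instead of a mutable loop.
// Centroids are (id, x, y) tuples here, an assumption for brevity.
def nearestCentroidId(px: Double, py: Double, centroids: Seq[(Int, Double, Double)]): Int =
  if (centroids.isEmpty) -1 // mirrors the -1 fallback of the original loop
  else centroids.minBy { case (_, cx, cy) => math.hypot(px - cx, py - cy) }._1

val sampleCentroids = Seq((7, 0.0, 0.0), (42, 10.0, 10.0))
println(nearestCentroidId(9.0, 8.5, sampleCentroids)) // 42
```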
```scala
/**
  * Creates random centroids.
  *
  * @param n      the number of centroids to create
  * @param random used to draw random coordinates
  * @return the centroids
  */
def createRandomCentroids(n: Int, random: Random = new Random()) =
  // NOTE: The random cluster ID makes collisions during resurrection
  //       less likely but in general permits ID collisions.
  for (_ <- 1 to n) yield TaggedPoint(random.nextGaussian(), random.nextGaussian(), random.nextInt())
```

defined [32mfunction[39m [36mcreateRandomCentroids[39m

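The note above says random IDs make collisions unlikely without ruling them out. A rough estimate, assuming `nextInt()` draws uniformly from the $2^{32}$ possible `Int` values: by the union bound over all pairs, the chance that any two of $k$ IDs coincide is at most $\binom{k}{2} / 2^{32}$. The helper name `collisionBound` is hypothetical.

```scala
// Upper bound (union bound over all pairs) on the chance that at least two of
// k uniformly random 32-bit IDs are equal.
def collisionBound(k: Int): Double = {
  val pairs = k.toDouble * (k - 1) / 2
  pairs / math.pow(2, 32)
}

// About 1.15e-6 for the k = 100 chosen in this notebook.
println(collisionBound(100))
```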
```scala
// Input data and k-means parameters
val inputFile = new File("files/census.txt").toURI().toString()
val k = 100          // number of centroids
val iterations = 10  // number of k-means iterations
```


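`inputFile` is passed to Wayang as a `file:` URI rather than a bare path, presumably so that the local file system can be selected from the URI scheme. Note that a relative path is resolved against the JVM working directory, so the resulting URI depends on where the notebook kernel was started. This stdlib-only snippet shows the conversion:

```scala
import java.io.File

// Relative paths are resolved against the JVM working directory (user.dir),
// so the absolute part of the URI depends on where the kernel was started.
val uri = new File("files/census.txt").toURI.toString
println(uri) // e.g. file:/<working directory>/files/census.txt
```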
```scala
// Execute the plan on Apache Spark
var plugin: Plugin = Spark.basicPlugin
```

```scala
// Serializable because the UDFs below reference field0 and field1 and
// therefore capture this instance when they are shipped to Spark.
class Kmeans(field0: Int, field1: Int) extends Serializable {

  def apply(plugin: Plugin, k: Int, inputFile: String, iterations: Int): Iterable[Point] = {
    val context = new WayangContext().withPlugin(plugin)
    val planBuilder = new PlanBuilder(context)

    // Read and parse the input file(s): skip blank lines and the "caseid" header,
    // then use columns field0 and field1 as the x and y coordinates.
    val points = planBuilder
      .readTextFile(inputFile)
      .filter(line => line.trim.nonEmpty && !line.startsWith("caseid"))
      .map { line =>
        val fields = line.split(",")
        Point(fields(field0).toDouble, fields(field1).toDouble)
      }

    // Create initial centroids.
    val initialCentroids = planBuilder
      .loadCollection(createRandomCentroids(k))

    // Do the k-means loop: assign each point to its nearest centroid,
    // sum the points per centroid and average them into the new centroids.
    val finalCentroids = initialCentroids.repeat(iterations, { currentCentroids =>
      points
        .mapJava(new SelectNearestCentroid)
        .withBroadcast(currentCentroids, "centroids")
        .reduceByKey(_.centroidId, _ + _)
        .map(_.average)
    })

    // Collect the result.
    finalCentroids
      .map(_.toPoint)
      .collect()
  }
}
```

defined [32mclass[39m [36mKmeans[39m

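To trace the data flow of `Kmeans.apply` without a cluster, here is a rough in-memory analogue written with Scala collections only (no Wayang). `foldLeft` stands in for `repeat`, and `groupBy` plus averaging stands in for `reduceByKey(_.centroidId, _ + _)` followed by `map(_.average)`. The names `LocalPoint`, `parsePoints` and `localKMeans` are hypothetical, and the initial centroids are fixed instead of random so the result is reproducible. Like the plan above, it drops centroids that receive no points and does not re-create them.

```scala
// Plain-Scala, in-memory sketch of the same plan (no Wayang).
final case class LocalPoint(x: Double, y: Double)

// Mirrors the filter/map steps: skip blank lines and the "caseid" header, read two columns.
def parsePoints(lines: Seq[String], field0: Int, field1: Int): Seq[LocalPoint] =
  lines
    .filter(line => line.trim.nonEmpty && !line.startsWith("caseid"))
    .map { line =>
      val fields = line.split(",")
      LocalPoint(fields(field0).toDouble, fields(field1).toDouble)
    }

def localKMeans(points: Seq[LocalPoint], initial: Map[Int, LocalPoint], iterations: Int): Map[Int, LocalPoint] =
  (1 to iterations).foldLeft(initial) { (centroids, _) =>
    points
      // Assignment step: tag every point with the ID of its nearest centroid.
      .map { p =>
        val nearestId = centroids.minBy { case (_, c) => math.hypot(p.x - c.x, p.y - c.y) }._1
        (nearestId, p)
      }
      // Update step: group by centroid ID and average the assigned points.
      .groupBy(_._1)
      .map { case (id, tagged) =>
        val ps = tagged.map(_._2)
        id -> LocalPoint(ps.map(_.x).sum / ps.size, ps.map(_.y).sum / ps.size)
      }
  }

val lines = Seq("caseid,a,b", "1,0.0,0.0", "2,0.0,2.0", "", "3,10.0,10.0", "4,10.0,12.0")
val pts = parsePoints(lines, 1, 2)
val localResult = localKMeans(pts, Map(1 -> LocalPoint(1.0, 1.0), 2 -> LocalPoint(9.0, 9.0)), iterations = 5)
println(localResult(1)) // LocalPoint(0.0,1.0)
println(localResult(2)) // LocalPoint(10.0,11.0)
```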
```scala
// Run k-means on columns 0 and 1 of the census file
var result = new Kmeans(0, 1).apply(plugin, k, inputFile, iterations)
```
