# ISOMAP

Isomap stands for isometric mapping. It is a non-linear dimensionality reduction method based on spectral theory that tries to preserve geodesic distances in the lower-dimensional space. Isomap starts by building a neighborhood graph. It then uses graph distances to approximate the geodesic distance between every pair of points. Finally, an eigenvalue decomposition of the (squared and double-centered) geodesic distance matrix yields the low-dimensional embedding of the dataset. On a non-linear manifold, the Euclidean metric is a reliable measure of distance only where the neighborhood can be approximated as linear. If the neighborhood contains holes, Euclidean distances can be highly misleading. If we instead measure the distance between two points by following the manifold, we get a better approximation of how far apart or how close the two points really are. 
Let's understand this with an extremely simple 2-D example. Suppose our data lies on a circular manifold in 2-D, as in the image below.

Why are geodesic distances better than Euclidean distances on non-linear manifolds?
![pic](isomap_explain.png)

We will reduce the data to 1-D twice: once using Euclidean distances and once using approximate geodesic distances. In
the 1-D mapping based on the Euclidean metric, points that are far apart on the manifold (`a & b`) are mapped poorly. Only points that are close enough to lie on an approximately linear piece of the manifold (`c & d`) give satisfactory results. The mapping based on geodesic distances, on the other hand, keeps close points as neighbors and far-away points distant.  
The geodesic distance between two points in the image is approximated by the graph distance between them. Thus, on a non-linear manifold, Euclidean distances should only be trusted between nearby points, while geodesic distances can be used at any range.
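
To make the gap between the two metrics concrete, here is a minimal plain-Scala sketch. It assumes the data lies on an open arc of a unit circle and that points are identified by their angle; the helper names (`chordLength`, `arcLength`, `graphDistance`) and the chosen angles are illustrative and not part of the notebook.

```scala
// Euclidean (straight-line) distance between two points on a circle of the given radius.
def chordLength(theta1: Double, theta2: Double, radius: Double = 1.0): Double = {
  val dx = radius * (math.cos(theta1) - math.cos(theta2))
  val dy = radius * (math.sin(theta1) - math.sin(theta2))
  math.sqrt(dx * dx + dy * dy)
}

// Geodesic distance along an open arc: the length of the curve between the two angles.
def arcLength(theta1: Double, theta2: Double, radius: Double = 1.0): Double =
  radius * math.abs(theta1 - theta2)

// Graph distance: sum of short chords between `steps` evenly spaced samples on the arc.
def graphDistance(theta1: Double, theta2: Double, steps: Int): Double = {
  val h = (theta2 - theta1) / steps
  (0 until steps).map(i => chordLength(theta1 + i * h, theta1 + (i + 1) * h)).sum
}

// Two far-apart points (like a and b) and two close points (like c and d).
val thetaA = 0.0
val thetaB = 1.5 * math.Pi
val thetaC = 0.5
val thetaD = 0.6

println(f"a-b: chord = ${chordLength(thetaA, thetaB)}%.3f, arc = ${arcLength(thetaA, thetaB)}%.3f, " +
        f"graph = ${graphDistance(thetaA, thetaB, 50)}%.3f")
println(f"c-d: chord = ${chordLength(thetaC, thetaD)}%.4f, arc = ${arcLength(thetaC, thetaD)}%.4f")
```

For the far-apart pair the chord badly underestimates the arc length, while the sum of short chords stays close to it; for the nearby pair the chord and the arc almost coincide.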

Isomap uses the above principle to create a similarity matrix for eigenvalue decomposition. Unlike local methods such as `LLE & LPP`, which only use local information, Isomap uses the local information to build a global similarity matrix. The algorithm uses the Euclidean metric to prepare the neighborhood graph. Then it approximates the geodesic distance between two points by the length of the shortest path between them in that graph. Thus, the low-dimensional embedding approximates both the global and the local structure of the dataset. 

Let's go over a few concepts that we need in order to implement the Isomap algorithm.  
**Pregel API** - Pregel is a distributed programming model developed by Google for processing large-scale graphs. It is the inspiration behind the Apache Giraph project and the GraphX library of Spark. Pregel is essentially a message-passing interface based on the idea that a vertex's state should depend on its neighbors. A Pregel computation takes as input a graph and a set of vertex states. At every iteration, called a *superstep*, it processes the messages received at a vertex and updates the vertex state. It then decides which of the vertex's neighbors should receive a message at the next superstep and what that message should be. Thus, messages are passed along edges and computation happens only at the vertices. The graph itself is not passed across the network, only the messages are. Computation stops after a maximum number of iterations or when no messages are left to pass. Let's understand it using a simple example.
Suppose we need to find the degree of each vertex of the graph given below. The image represents a single iteration of the Pregel model. At initialization, every vertex's degree is `0`. We can send an empty message as the initial message to start the computation. At the end of superstep 1, each vertex sends the message `1` through each of its edges. At the next superstep, each vertex sums the messages it received and updates its degree. 

![pregel](pregel.png)
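
The degree computation can be imitated in plain Scala to show the three roles in a Pregel program (send, merge, vertex program). This is only a single-machine sketch of one round on a made-up edge list, not the GraphX `Pregel` API; all names below are illustrative.

```scala
// Undirected toy graph given as an edge list (hypothetical example data).
val toyEdges: Seq[(Int, Int)] = Seq((1, 2), (1, 3), (2, 3), (3, 4))
val toyVertices: Seq[Int] = toyEdges.flatMap { case (a, b) => Seq(a, b) }.distinct

// Initial state: every vertex has degree 0.
val initialDegree: Map[Int, Int] = toyVertices.map(v => v -> 0).toMap

// Send step: each vertex sends the message 1 along each of its edges,
// so every edge produces one message for each of its two endpoints.
val messages: Seq[(Int, Int)] = toyEdges.flatMap { case (a, b) => Seq(b -> 1, a -> 1) }

// Merge step: messages addressed to the same vertex are summed.
val inbox: Map[Int, Int] =
  messages.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).sum }

// Vertex program: add the merged message to the current state.
val degrees: Map[Int, Int] =
  initialDegree.map { case (v, d) => v -> (d + inbox.getOrElse(v, 0)) }

println(degrees.toSeq.sortBy(_._1).mkString(", "))
```

In GraphX the same three pieces are passed to `Pregel` as the send-message function, the merge function and the vertex program.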


**Classical MDS** - Isomap is closely related to the original multidimensional scaling (MDS) algorithm proposed by Torgerson and Gower. In fact, it is an extension of classical multidimensional scaling. Classical MDS gives a closed-form solution to the dimensionality reduction problem. It uses Euclidean distances as the dissimilarity metric, while Isomap uses geodesic distances.
The steps of classical MDS are:
1. Create the matrix of squared dissimilarities $\Delta^2(X)$ from the given $X$.  
2. Obtain the matrix $B$ by double centering the squared dissimilarity matrix: $B = -\frac{1}{2}J \Delta^2 J$, where $J = I - \frac{1}{n}\mathbf{1}\mathbf{1}^{\top}$ is the centering matrix.
3. Compute the eigenvalue decomposition of $B$: $B = Q\Lambda Q^{\top}$.
4. Choose the $K$ eigenvectors with the $K$ largest eigenvalues; the embedding is $Q_K \Lambda_K^{1/2}$.
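
The double-centering step can be checked by hand on a tiny example. The sketch below uses plain Scala arrays and three made-up collinear points; it applies $-\frac{1}{2}J \Delta^2 J$ element-wise and compares the result with the Gram matrix of the mean-centered positions, which is what double centering is expected to recover for Euclidean distances. Variable names are illustrative only.

```scala
// Three collinear points at positions 0, 1 and 3 (hypothetical toy data).
val positions = Array(0.0, 1.0, 3.0)
val nPts = positions.length

// Step 1: matrix of squared dissimilarities.
val sqDist: Array[Array[Double]] =
  Array.tabulate(nPts, nPts)((i, j) => math.pow(positions(i) - positions(j), 2))

// Row means, column means and grand mean of the squared dissimilarities.
val rowMean = sqDist.map(_.sum / nPts)
val colMean = Array.tabulate(nPts)(j => sqDist.map(_(j)).sum / nPts)
val grandMean = rowMean.sum / nPts

// Step 2: B = -1/2 * J * Delta^2 * J, written out element by element.
val gram: Array[Array[Double]] = Array.tabulate(nPts, nPts) { (i, j) =>
  -0.5 * (sqDist(i)(j) - rowMean(i) - colMean(j) + grandMean)
}

// Reference: inner products of the mean-centered positions.
val centred = positions.map(_ - positions.sum / nPts)

gram.foreach(row => println(row.map(v => f"$v%.4f").mkString(" ")))
```

Because the points are one-dimensional, $B$ has rank one here, and its single non-zero eigenpair reproduces the centered positions up to sign.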


### Steps of Isomap
First, the necessary imports.

In [1]:
//Imports 
// 1. from the spark module
import org.apache.spark.sql.{functions => funcs}
import org.apache.spark.ml.feature.{StandardScaler, MaxAbsScaler}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.linalg.{ Vectors => mlVs, Vector => mlV, DenseVector => mlDV}
import org.apache.spark.mllib.linalg.{Vector => mllibV, Vectors => mllibVs}
import org.apache.spark.mllib.linalg.distributed.{IndexedRowMatrix, IndexedRow}
import org.apache.spark.sql.Row

// scala language modules
import scala.collection.immutable.{Map => imMap, TreeMap => TMap}
import scala.{math => m}

// vegas-viz for visualization
import vegas._
import vegas.data.External._

// mathematical computation library Breeze modules
import breeze.linalg._
import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM}
import breeze.stats._  

Isomap differs from classical MDS only in its first few steps: instead of the Euclidean metric, it uses graph distances as the dissimilarity. The steps of the Isomap algorithm follow.

### 1. Neighborhood search
The neighborhood can be created through a k-nearest-neighbor or an $\epsilon$-ball approach.  
**K-nearest neighbor** - Each point is connected to its K nearest points. With this approach every point always has K neighbors. However, since each point selects exactly K points, it may select a neighbor that does not select it back. This typically happens with an isolated point, which selects faraway points as neighbors, while those points find their own neighbors at much smaller distances. The result is an asymmetric neighborhood matrix.  
**$\epsilon$-ball neighbor** - Each point $Y_i$ selects as neighbors every point inside the ball of radius $\epsilon$ centered at $Y_i$. This approach can leave some points with no neighbors. It is hard to find the right $\epsilon$: too small a value gives many isolated points, while too large a value gives every point many neighbors. This approach is good for approximating geodesic distances.  
While creating the neighborhood matrix, we have to make sure that every neighborhood is connected to at least one other neighborhood. If two groups of neighborhoods have no points in common, the graph will have disconnected components and the dissimilarity matrix will remain incomplete.  
Next, we assign a weight to each edge to create an adjacency matrix from the neighborhood graph. For an unweighted graph we can set all edge weights to 1; for a weighted graph we can use the Euclidean distances between the points as weights.  
We will be using the $\epsilon$-ball neighborhood approach with $\epsilon$=0.5.

In [None]:
//read the dataset 
val n_samples =150
val n_dim = 2
val n_bc = sc.broadcast(n_samples)

val convertUDF = funcs.udf((array : Seq[Double]) => {mlVs.dense(array.toArray)})

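// keep only data rows (skip the header and any empty lines), take the 4 numeric columns
var rdd = sc.textFile("iris.csv").filter(line => line.nonEmpty && line(0).isDigit).
                                  map(_.split(",").take(4)).
                                  map(_.map(_.toDouble)).zipWithIndex()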
var df = spark.createDataFrame(rdd ).toDF("features","id").
                                withColumn("features",convertUDF(funcs.col("features")))

// scale the dataset 
val MaxScaler = new MaxAbsScaler().setInputCol("features").
                                setOutputCol("scaled_features")
val model_abs = MaxScaler.fit(df)

df = model_abs.transform(df)
df.cache()

df = df.drop("features").select(funcs.col("id"), funcs.col("scaled_features").alias("features"))

df = df.crossJoin(df.select(funcs.col("id").alias("id2"),funcs.col("features").alias("features2")))

val udf_dist = funcs.udf((x:mlV, y:mlV) => m.sqrt(mlVs.sqdist(x,y)))

df = df.withColumn("dist", udf_dist(funcs.col("features"),funcs.col("features2"))).
                                 drop("features","features2")

val epsilon = 0.5

// keep only the pairs closer than epsilon; these become the edges of the neighborhood graph
df = df.filter(funcs.col("dist") < epsilon)

df.cache()
df.show()
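
The difference between the two neighborhood rules described above can be seen on a toy dataset. This is a plain-Scala sketch on made-up 1-D points, independent of the Spark code; `kNearest` and `epsBall` are hypothetical helper names.

```scala
// 1-D toy points; the last one is an isolated outlier (hypothetical data).
val pts = Vector(0.0, 1.0, 2.0, 10.0)

// Indices of the k points closest to point i (excluding i itself).
def kNearest(i: Int, k: Int): Set[Int] =
  pts.indices.filter(_ != i).sortBy(j => math.abs(pts(i) - pts(j))).take(k).toSet

// Indices of all points strictly inside the ball of radius eps around point i.
def epsBall(i: Int, eps: Double): Set[Int] =
  pts.indices.filter(j => j != i && math.abs(pts(i) - pts(j)) < eps).toSet

// k-NN: the outlier picks point 2, but point 2 picks a closer point instead.
println(s"kNN of outlier: ${kNearest(3, 1)}, kNN of point 2: ${kNearest(2, 1)}")

// epsilon-ball: the relation is symmetric, but the outlier ends up with no neighbors.
println(s"eps-ball of outlier: ${epsBall(3, 1.5)}, eps-ball of point 1: ${epsBall(1, 1.5)}")
```

With k-nearest neighbors the graph is asymmetric around the outlier; with the $\epsilon$-ball rule the graph is symmetric but the outlier is disconnected, which is why the choice of $\epsilon$ has to be checked against graph connectivity.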


### 2. Create the similarity matrix
After the neighborhood search, we use Spark's GraphX library to calculate the geodesic distances between the points. While creating the neighborhood network, we have to make sure that the resulting graph is a single connected component. If it is not, the similarity matrix will remain incomplete and the results will be incoherent. We may need to iterate over different values of the neighborhood selection parameter to obtain a connected graph. GraphX's built-in `ShortestPaths` works on hop counts rather than edge weights, so we have to implement a weighted version ourselves. 

In [3]:

def ShortestPath(Verts: RDD[(VertexId, imMap[Long, Double])], 
                 Edges: RDD[Edge[Double]], 
                 landmarks: Seq[Long] = Seq()): Graph[imMap[Long,Double],Double] = {

        val g = Graph(Verts, Edges)

        // per-vertex state: target vertex id -> shortest known distance to it
        type SPMap = Map[VertexId, Double]

        def makeMap(x: (VertexId, Double)*) = Map(x: _*)

        // extend the paths known at the source by the edge weight d and keep
        // only those that improve on what the destination already knows
        def incrementMap(spmap1: SPMap, spmap2: SPMap, d: Double): SPMap = {
            spmap1.collect { 
                case (k, v) if v + d < spmap2.getOrElse(k, Double.MaxValue) => k -> (v + d)
            }
        }

        // key-wise minimum of two distance maps
        def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
            (spmap1.keySet ++ spmap2.keySet).map {
              k => k -> math.min(spmap1.getOrElse(k, Double.MaxValue), spmap2.getOrElse(k, Double.MaxValue))
            }.toMap
        }

        // every vertex (or only the landmarks, if given) starts at distance 0 from itself
        val spGraph: Graph[SPMap, Double] =
            if (landmarks.isEmpty) g.mapVertices { (vid, attr) => makeMap(vid -> 0.0) }
            else g.mapVertices { (vid, attr) => 
                    if (landmarks.contains(vid)) makeMap(vid -> 0.0) else makeMap() }

        val initialMessage = makeMap()

        def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
            addMaps(attr, msg)
        }

        // messages travel from source to destination only, so the edge RDD must
        // contain both directions of every undirected edge
        def sendMessage(edge: EdgeTriplet[SPMap, Double]): Iterator[(VertexId, SPMap)] = {
            val newAttr = incrementMap(edge.srcAttr, edge.dstAttr, edge.attr)
            if (newAttr.nonEmpty) Iterator((edge.dstId, newAttr))
            else Iterator.empty
        }

        Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
}
                                                        


Our shortest-path function accepts three arguments: an RDD of vertices with a map as the vertex attribute, an RDD of edges with the weight as the edge attribute, and an optional sequence of vertex ids (landmarks) to which distances are needed. If no landmarks are given, distances are computed between all pairs of vertices. `sendMessage` decides which neighbors receive a message in the current iteration, and `vertexProgram` processes the messages received at a vertex. 
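
When testing a distributed shortest-path routine, a small single-machine reference is handy for comparison. The sketch below is the classic Floyd-Warshall algorithm on a made-up four-vertex graph; it is not the Pregel implementation, only a way to cross-check its output on small inputs. All names and weights are illustrative.

```scala
// Weighted undirected toy graph: (source, destination, weight). Hypothetical data.
val toyWeighted = Seq((0, 1, 1.0), (1, 2, 2.0), (0, 2, 5.0), (2, 3, 1.0))
val nVerts = 4

// Start from infinity everywhere, 0 on the diagonal, and the edge weights in both directions.
val fw = Array.fill(nVerts, nVerts)(Double.PositiveInfinity)
for (i <- 0 until nVerts) fw(i)(i) = 0.0
for ((a, b, w) <- toyWeighted) { fw(a)(b) = w; fw(b)(a) = w }

// Floyd-Warshall relaxation: allow paths through intermediate vertex k, one k at a time.
for (k <- 0 until nVerts; i <- 0 until nVerts; j <- 0 until nVerts)
  if (fw(i)(k) + fw(k)(j) < fw(i)(j)) fw(i)(j) = fw(i)(k) + fw(k)(j)

fw.foreach(row => println(row.mkString(" ")))
```

Here the direct edge between vertices 0 and 2 has weight 5, but the path through vertex 1 has length 3, so the graph distance is 3. Any entry that stays infinite indicates a disconnected component.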

### 3. Eigenvalue decomposition of the similarity matrix
Remember that before the eigenvalue decomposition we have to square the distances and double center the matrix of squared dissimilarities.
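
As a short worked derivation (standard classical MDS algebra, using the $B$, $Q$ and $\Lambda$ of the classical MDS steps, with $d_{ij}$ introduced here for the graph distance between points $i$ and $j$ and $n$ for the number of points), double centering can be written element-wise as

$$
B_{ij} = -\frac{1}{2}\left(d_{ij}^2 - \frac{1}{n}\sum_{k} d_{ik}^2 - \frac{1}{n}\sum_{k} d_{kj}^2 + \frac{1}{n^2}\sum_{k,l} d_{kl}^2\right)
$$

and, keeping only the $K$ largest eigenvalues, $B$ factors as

$$
B = Q\Lambda Q^{\top} \approx \left(Q_K \Lambda_K^{1/2}\right)\left(Q_K \Lambda_K^{1/2}\right)^{\top}
\quad\Longrightarrow\quad Y = Q_K \Lambda_K^{1/2}
$$

so each retained eigenvector is scaled by the square root of its eigenvalue to obtain the embedding coordinates $Y$.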

In [4]:
//code for full isomap algo


//initial states of the vertices
val states: Map[Long, Double] = imMap()

// vertex RDD
val Verts: RDD[(VertexId, imMap[Long, Double])] = sc.parallelize((0 to (n_samples-1)).toSeq.map(i => 
                                                                                        i.toLong -> states))
// Edge RDD                                                                                        
val Edges: RDD[Edge[Double]] = df.rdd.map(x => Edge(x.getLong(0), x.getLong(1), x.getDouble(2)))

// create centering matrix J = I - (1/n) * 1 1^T, one row at a time
def create_row( id:Int): mllibV = {
    
    val row = Array.fill(n_bc.value)(-1.0/n_bc.value)
    row(id) += 1.0
    mllibVs.dense(row)

}

val ids = sc.parallelize(0 until n_bc.value)


val cMat = new IndexedRowMatrix(ids.map((id:Int) => IndexedRow(id, create_row(id)))).toBlockMatrix(10,10)


// Square the dissimilarities and multiply them by -0.5 here. Both come from the formula for 
// double centering the squared-distance matrix, and it is much easier to apply them now than afterwards.

val graph_dist = ShortestPath(Verts, Edges)
val graph_verts = graph_dist.mapVertices((x:Long,y:imMap[Long,Double]) => 
                                            TMap(y.toSeq:_*).values.toArray.map(i => -0.5*i*i)).vertices





val graph_irm = new IndexedRowMatrix(graph_verts.map((x:Tuple2[Long, Array[Double]]) => 
                                            IndexedRow(x._1, mllibVs.dense(x._2)))).toBlockMatrix(10,10)

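// double-centered matrix B = J * (-0.5 * D^2) * J

val cDistMat = (cMat.multiply(graph_irm)).multiply(cMat).toIndexedRowMatrix()

// B is symmetric, so its SVD plays the role of the eigenvalue decomposition
// (the singular values are the absolute values of the eigenvalues)
val Svd = cDistMat.computeSVD(2, true)
val U = Svd.U
val s_bc = sc.broadcast(Svd.s.toArray)

// classical MDS embedding: each eigenvector is scaled by the square root of its eigenvalue
val Isomap = U.rows.map(x => (x.index,x.vector.toArray.zip(s_bc.value).map(t => t._1 * m.sqrt(t._2))))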

### 4. Visualization of the embedded dataset.


![pic](isomap.png)

## Landmark Isomap

What we implemented above is the vanilla version of Isomap. It requires a lot of time and computing power.
It has two bottlenecks: first, computing the dissimilarity matrix requires $O(N^2)$ operations, where $N$ is the number of samples; second, the pairwise graph distances have to be computed. If $N$ is huge, as is generally the case for big datasets, this becomes impractical.
The solution to this problem is **Landmark Isomap**. Landmark Isomap is based on landmark MDS. Landmark MDS selects a group of points termed **landmarks** and runs classical MDS on them. Based on the mapping obtained from classical MDS, the remaining points are placed in the low-dimensional embedding using distance-based triangulation.  
Steps for landmark classical scaling:  
1. Select the landmark points $X_{landmarks}$.
2. Apply classical MDS to the landmark points and obtain the low-dimensional embedding $L_k$.
3. Calculate $\delta_{u}$, where $\delta_{ui}$ is the mean of the $i^{th}$ row of the squared dissimilarity matrix of the landmark points.
4. Given a point $x_a$, calculate $\delta_a$, where $\delta_{ai}$ is the squared distance between $x_a$ and landmark point $i$.
5. The low-dimensional embedding of $x_a$ is given by $y_a = -\frac{1}{2}L_k^{-1}(\delta_a - \delta_u)$, where $L_k^{-1}$ is the Moore-Penrose pseudoinverse of $L_k$.

Landmark points can be selected at random or through a specific method. To obtain a K-dimensional embedding, at least K+1 landmark points are needed. For reasons of numerical stability, the number of landmarks should be larger than this strict minimum.
The accuracy of the mapping in Landmark Isomap does not suffer much from the approximation.
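
The triangulation step can be verified by hand in one dimension. The plain-Scala sketch below uses three made-up landmarks on a line and the sign convention $y_a = -\frac{1}{2}L_k^{\#}(\delta_a - \delta_\mu)$ of landmark MDS, where $L_k^{\#}$ denotes the pseudoinverse. It relies on one simplifying assumption: for collinear landmarks, the 1-D classical MDS embedding is just the mean-centered positions (up to sign), so no eigen-solver is needed. All names are illustrative.

```scala
// Landmarks on a line at 0, 1 and 3; a new point at 2 (hypothetical toy data).
val lmPos = Array(0.0, 1.0, 3.0)
val newPos = 2.0
val nLm = lmPos.length

// Squared distances between landmarks and their column means (delta_mu).
val lmSqDist = Array.tabulate(nLm, nLm)((i, j) => math.pow(lmPos(i) - lmPos(j), 2))
val deltaMu = Array.tabulate(nLm)(j => lmSqDist.map(_(j)).sum / nLm)

// Assumption: in 1-D the classical MDS embedding of the landmarks equals their
// mean-centered positions, so L_k is this single column.
val lmMean = lmPos.sum / nLm
val lk = lmPos.map(_ - lmMean)

// The pseudoinverse of a single column vector v is v^T / (v . v).
val lkNormSq = lk.map(v => v * v).sum
val lkPinv = lk.map(_ / lkNormSq)

// Squared distances from the new point to each landmark (delta_a).
val deltaA = lmPos.map(p => math.pow(newPos - p, 2))

// y_a = -1/2 * pinv(L_k) * (delta_a - delta_mu)
val yA = -0.5 * lkPinv.indices.map(i => lkPinv(i) * (deltaA(i) - deltaMu(i))).sum

println(f"embedded coordinate: $yA%.4f, centered true position: ${newPos - lmMean}%.4f")
```

The triangulated coordinate matches the new point's position relative to the landmark mean, using only its squared distances to the landmarks.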

In [None]:

val r = scala.util.Random
r.setSeed(100)
val num_landmarks = 10
val num_lmarks_bc = sc.broadcast(num_landmarks)
// sample exactly num_landmarks distinct vertex ids
val landmark_ids = r.shuffle((0 until n_samples).toList).take(num_landmarks).toSet
val states: Map[Long, Double] = imMap()

val Verts: RDD[(VertexId, imMap[Long, Double])] = sc.parallelize((landmark_ids).toSeq.map(i => 
                                                                                        i.toLong -> states))
                                                                                        
val Edges: RDD[Edge[Double]] = df.rdd.map(x => Edge(x.getLong(0), x.getLong(1), x.getDouble(2)))

val graph_dist = ShortestPath(Verts, Edges, landmark_ids.map(_.toLong).toSeq)

val graph_verts = graph_dist.mapVertices((x:Long,y:imMap[Long,Double]) => 
                                            TMap(y.toSeq:_*).values.toArray).vertices

// separate landmark and non_landmark ids
val landmark_bc = sc.broadcast(landmark_ids)
val df_graph_dist = spark.createDataFrame(graph_verts ).toDF("id","dist")

val is_lmark = funcs.udf(( x:Long ) => landmark_bc.value.contains(x.toInt))

val lmark_dist = df_graph_dist.filter(is_lmark(funcs.col("id")))
val nlmark_dist = df_graph_dist.filter(!is_lmark(funcs.col("id")))


//use breeze for linear algebra related calculation 

// landmark-to-landmark distances, rows ordered by landmark id, transformed to -0.5 * d^2
val lmark_dist_local:Array[Double] = TMap(lmark_dist.rdd.map((x:Row) => 
                    (x.getLong(0), x.getSeq[Double](1))).collect().toSeq:_*).
                                values.map(_.toArray.map((i:Double) => -0.5*i*i)).flatten.toArray



val LmarkMat = new BDM(num_landmarks, num_landmarks, lmark_dist_local)

// create centering matix for the local matrix

def create_row( id:Int):Array[Double] = {
    
    var row = (0 to (num_lmarks_bc.value-1)).map(i => (-1.0/num_lmarks_bc.value)).toArray
    row(id) = row(id)+1.0
    return(row)
}

val cMat = new BDM(num_landmarks,num_landmarks, (0 to (num_lmarks_bc.value-1)).
                               toSeq.map(create_row).toArray.flatten)

val temp = cMat * LmarkMat
val cLmarkDist = temp * cMat



val eig_ = eigSym(cLmarkDist)

// eigSym returns eigenvalues in ascending order, so the n_dim largest ones are the last n_dim
val top = (num_landmarks - n_dim) until num_landmarks
val eigVals = BDV(eig_.eigenvalues.toArray.slice(num_landmarks - n_dim, num_landmarks))
val eigVecs = eig_.eigenvectors(::, top)

// landmark embedding L_k: eigenvectors scaled by the square roots of their eigenvalues
val LmarkMat_pcomps = eigVecs * diag(eigVals.map(v => m.sqrt(v)))
val LmarkMat_inv = pinv(LmarkMat_pcomps)

// LmarkMat already holds -0.5 * d^2 and is symmetric, so its row and column means coincide
val mean_vec = mean(LmarkMat(::,*)) 
val mean_vec_bc = sc.broadcast(mean_vec)
val LmarkMat_inv_bc = sc.broadcast(LmarkMat_inv)

val convert = funcs.udf((array : Seq[Double]) => {mlVs.dense(array.toArray)})
val NLmark_dist_diff = nlmark_dist.withColumn("dist", convert(funcs.col("dist")))

// triangulate one non-landmark point from its graph distances to the landmarks
def create_embedding(x:mlV):mlV={

    // apply the same -0.5 * d^2 transform as for the landmarks before subtracting the mean
    val delta = BDV(x.toArray.map(d => -0.5*d*d))
    val diff = new BDM(num_landmarks, 1, (delta - mean_vec_bc.value.t).toArray )
    val embedding = LmarkMat_inv_bc.value * diff
    mlVs.dense(embedding.toArray)
}
val udf_embed = funcs.udf((x:mlV) => create_embedding(x) )

val NL_isomap = NLmark_dist_diff.withColumn("diff", udf_embed(funcs.col("dist")))

// combine landmark and non-landmark embeddings

val lmark_embed_arr = LmarkMat_pcomps.t.toArray

var temp1:Array[Tuple2[Long, mlV]] = Array()

val sorted_ids:Seq[Int] = landmark_ids.toSeq.sortWith(_<_)

for(i <- (1 to num_landmarks)){

    temp1 = temp1 ++ Array((sorted_ids(i-1).toLong,mlVs.dense(lmark_embed_arr.slice((i-1)*n_dim, i*n_dim))))

}
val lmark_embed_df = spark.createDataFrame( sc.parallelize(temp1)).toDF("id", "diff") 


val L_isomap_embed =  lmark_embed_df.union(NL_isomap.drop("dist"))


### Visualization of the embedded dataset

![pic](Lisomap.png)

### Pros and Cons of using Isomap

Isomap is more powerful than linear dimensionality reduction methods on curved data: it works well on non-linear manifolds
and gives a closed-form solution. But it performs poorly when the manifold is not well sampled or contains holes.
Also, as mentioned earlier, building the neighborhood graph is tricky, and slightly wrong parameters can lead to bad
results. 