# Calculating graph metrics with GraphX

The purpose of this notebook is to use [GraphX](https://spark.apache.org/graphx/), Spark's distributed graph processing engine, for graph handling.

Main features of the test:
1. The test is implemented in [Scala 2.11](https://www.scala-lang.org/download/2.11.12.html).
2. The processing is handled by [Spark](https://github.com/apache/spark), a cluster computing framework.
3. The libraries required to run the test are the following:
    * [Cassandra connector](https://github.com/datastax/spark-cassandra-connector) (Scala 2.11 build).
    * [Spark](https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11/2.2.1) (Scala 2.11 build).
    * [Spark GraphX](http://) (Scala 2.11 build).
4. Spark runs as a standalone local application (`local[*]` master), so no multi-host cluster is needed to run the test.

# Step 1: Load libraries from Maven
To download the required libraries from the Maven repositories, we use the following instructions (special Jupyter notebook commands, `classpath.add`, that provide Maven integration). In a traditional development environment the dependencies would instead be declared in a POM (Maven) or build.sbt (SBT) file.

In [1]:
//Import dependencies from Maven
classpath.add("org.apache.spark" % "spark-core_2.11" % "2.1.1")
classpath.add("org.apache.spark" % "spark-sql_2.11" % "2.1.1")
classpath.add("org.apache.spark" % "spark-graphx_2.11" % "2.1.1")
classpath.add("com.datastax.spark" % "spark-cassandra-connector_2.11" % "2.0.0-M3")

Adding 114 artifact(s)
Adding 13 artifact(s)
Adding 11 artifact(s)
Adding 4 artifact(s)
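
For comparison, the same dependencies could be declared in a `build.sbt` roughly as follows. This is a minimal sketch, not part of the original notebook: the artifact versions are copied from the cell above, and `scalaVersion` is assumed to be 2.11.12, the release linked in the feature list.

```scala
// build.sbt (sketch): mirrors the classpath.add calls of the notebook cell.
// scalaVersion is an assumption; %% appends the Scala binary suffix (_2.11).
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"                % "2.1.1",
  "org.apache.spark"   %% "spark-sql"                 % "2.1.1",
  "org.apache.spark"   %% "spark-graphx"              % "2.1.1",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0-M3"
)
```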




# Step 2: Create a Spark context

Before starting any computation, we need to create a Spark context.

The following code sets up Spark's configuration:

In [2]:
// Set up Spark's configuration
import org.apache.spark.SparkConf
val configuration = new SparkConf()
  .setAppName("GraphXTest")
  .setMaster("local[*]")
  .set("spark.executor.memory", "1g")
  .set("spark.testing.memory", "2147480000") // Avoid any memory issues
  .set("spark.cassandra.connection.host", "127.0.0.1")
  .set("spark.cassandra.auth.username", "cassandra")
  .set("spark.cassandra.auth.password", "cassandra")

import org.apache.spark.SparkConf
configuration: org.apache.spark.SparkConf = org.apache.spark.SparkConf@1c6fd88
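
The Cassandra host and credentials in the cell above are hard-coded. As a minimal sketch, the same keys could be resolved from the environment with the notebook's values as fallbacks. The helper `cassandraSettings` and the variable names `CASSANDRA_HOST`, `CASSANDRA_USER` and `CASSANDRA_PASSWORD` are hypothetical and not part of the original notebook.

```scala
// Hypothetical helper: resolve the Cassandra settings from an environment map,
// falling back to the values used in this notebook when a variable is unset.
def cassandraSettings(env: Map[String, String]): Map[String, String] = Map(
  "spark.cassandra.connection.host" -> env.getOrElse("CASSANDRA_HOST", "127.0.0.1"),
  "spark.cassandra.auth.username"   -> env.getOrElse("CASSANDRA_USER", "cassandra"),
  "spark.cassandra.auth.password"   -> env.getOrElse("CASSANDRA_PASSWORD", "cassandra")
)

// Settings resolved from the real process environment.
val settings: Map[String, String] = cassandraSettings(sys.env)
```

A map like this can then be applied to the configuration in one call with `SparkConf.setAll(settings)`.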

Once configured, the Spark context can be created.

The following code initializes the Spark context:

In [3]:
// Initialize Spark context from configuration
import org.apache.spark.SparkContext
val sc = new SparkContext(configuration)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/11/13 10:11:35 INFO SparkContext: Running Spark version 2.1.1
18/11/13 10:11:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/11/13 10:11:39 WARN Utils: Your hostname, spark resolves to a loopback address: 127.0.1.1; using 192.168.1.207 instead (on interface eth0)
18/11/13 10:11:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
18/11/13 10:11:39 INFO SecurityManager: Changing view acls to: jose
18/11/13 10:11:39 INFO SecurityManager: Changing modify acls to: jose
18/11/13 10:11:39 INFO SecurityManager: Changing view acls groups to: 
18/11/13 10:11:39 INFO SecurityManager: Changing modify acls groups to: 
18/11/13 10:11:39 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(jose); groups with view permissions: Set(); users  with modify per

import org.apache.spark.SparkContext
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@964a3e

# Step 3: Retrieve interactions from Cassandra
To populate the graph we retrieve the course interactions from Cassandra.

First we define the interaction data type:

In [4]:
// Interaccion datatype
case class Interaccion(idinteraccion: String, atributos: Map[String, String])

defined [32mclass [36mInteraccion[0m

The next lines of code read the interactions from Cassandra as an RDD (Resilient Distributed Dataset) and collect them into a local array:

In [5]:
// Read interacciones from Cassandra database
import com.datastax.spark.connector._ //Loads implicit functions
val courseId = "1"
val interactions = sc.cassandraTable[Interaccion]("diia", "interacciones")
    .where("atributos['id_curso_origen']=? AND atributos['nodo_destino']=''", courseId)
    .collect()

import com.datastax.spark.connector._
courseId: String = "1"
interactions: Array[Interaccion] = Array(
  Interaccion(
    "040be443-3492-4757-8710-beef16401100",
    Map(
      "tipo_interaccion" -> "vis",
      "timestamp" -> "2010-10-14 05:04:51.482",
      "tipo_contenido" -> "des",
      "id_curso_origen" -> "1",
      "nodo_destino" -> "",
      "contenido" -> "",
      "plataforma" -> "p",
      "id_curso_destino" -> "1",
      "sentimiento" -> "",
      "nodo_origen" -> "17"
    )
  ),
  Interaccion(
    "db1c65e6-7436-4398-8404-a6d60facccd2",
    Map(
      "tipo_interaccion" -> "pub",
...

# Step 4: Build a graph from interactions

The next code maps each previously fetched interaction to an edge and builds the graph from those edges:

In [6]:
import org.apache.spark.graphx
import org.apache.spark.graphx.Graph
import org.apache.spark.rdd.RDD

// Build one edge per interaction: source node -> destination node.
// An empty destination is mapped to vertex 0.
val edges: RDD[graphx.Edge[Interaccion]] = sc.parallelize(interactions.map { interaccion =>
  val src = interaccion.atributos("nodo_origen").toLong
  val dst = interaccion.atributos("nodo_destino") match {
    case "" => 0L
    case s  => s.toLong
  }
  graphx.Edge(src, dst, interaccion)
}.toSeq)

// Create the graph from the edges; every vertex gets the default attribute 0.0
val graphXGraph = Graph.fromEdges(edges, 0.0)
print("Graph's node count:" + graphXGraph.vertices.count())

Graph's node count:24

import org.apache.spark.graphx
import org.apache.spark.graphx.Graph
import org.apache.spark.rdd.RDD
edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Interaccion]] = ParallelCollectionRDD[2] at parallelize at Main.scala:39
graphXGraph: org.apache.spark.graphx.Graph[Double, Interaccion] = org.apache.spark.graphx.impl.GraphImpl@118ad9f
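
The edge-building cell throws if `nodo_origen` is missing or not numeric. A more defensive variant of the endpoint parsing might look like the following sketch, written in plain Scala so it can be tried without Spark. The helpers `parseId` and `endpoints` and the sample attribute maps are hypothetical; the mapping of an empty destination to vertex 0 follows the notebook.

```scala
import scala.util.Try

// Parse a vertex id, returning None for non-numeric input.
def parseId(s: String): Option[Long] = Try(s.trim.toLong).toOption

// Extract (source, destination) from an attribute map.
// A missing or empty "nodo_destino" maps to vertex 0, as in the notebook;
// a missing or invalid "nodo_origen" yields None so the row can be skipped.
def endpoints(attrs: Map[String, String]): Option[(Long, Long)] = {
  val dst: Option[Long] = attrs.get("nodo_destino").filter(_.nonEmpty) match {
    case None    => Some(0L)
    case Some(s) => parseId(s)
  }
  for {
    src <- attrs.get("nodo_origen").flatMap(parseId)
    d   <- dst
  } yield (src, d)
}

// Hypothetical attribute maps for illustration.
val rows: Seq[Map[String, String]] = Seq(
  Map("nodo_origen" -> "17", "nodo_destino" -> ""),
  Map("nodo_origen" -> "13", "nodo_destino" -> ""),
  Map("nodo_origen" -> "17", "nodo_destino" -> ""),
  Map("nodo_destino" -> "") // no source node: skipped
)

// Rows without a usable source are dropped by flatMap.
val pairs: Seq[(Long, Long)] = rows.flatMap(r => endpoints(r))

// Graph.fromEdges creates one vertex per distinct endpoint id.
val vertexIds: Set[Long] = pairs.flatMap { case (s, d) => Seq(s, d) }.toSet
```

In the sample, two edges share source 17, yet the vertex set contains each id once. For the same reason the node count printed above is the number of distinct endpoint ids, not the number of interactions.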

# Step 5: Run GraphX's algorithms

In this section we run some of GraphX's built-in algorithms.

In [7]:
import org.apache.spark.graphx._

// Compute PageRank. Note: resetProb is the random-reset probability
// (GraphX's default is 0.15), not the damping factor.
val pr = graphXGraph.pageRank(tol = 1.0e-5, resetProb = 0.85).vertices
val computedGraphx = ("graph id", graphXGraph.joinVertices(pr)(
    (id: VertexId, default: Double, pr: Double) => pr)
)

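// Show graph information. Note: triangleCount() returns a Graph whose vertex
// attribute is the per-vertex triangle count, so the interpolation below
// prints an object reference rather than a number.
println(s"Course ${computedGraphx}:\n Node count: ${computedGraphx._2.numVertices} \n Triangle count ${computedGraphx._2.triangleCount()}.")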

// Show PageRank for each node
computedGraphx._2.vertices.sortBy(_._2.toDouble, ascending = false).take(10).foreach(
    vertex => println(s"-->Nodo ${vertex._1} -> Pagerank: ${vertex._2}")
)

Course (graph id,org.apache.spark.graphx.impl.GraphImpl@9e9d28):
 Node count: 24 
 Triangle count org.apache.spark.graphx.impl.GraphImpl@e22aea.
-->Nodo 0 -> Pagerank: 4.449999077359371
-->Nodo 13 -> Pagerank: 0.8500000000000001
-->Nodo 19 -> Pagerank: 0.8500000000000001
-->Nodo 15 -> Pagerank: 0.8500000000000001
-->Nodo 4 -> Pagerank: 0.8500000000000001
-->Nodo 21 -> Pagerank: 0.8500000000000001
-->Nodo 16 -> Pagerank: 0.8500000000000001
-->Nodo 22 -> Pagerank: 0.8500000000000001
-->Nodo 25 -> Pagerank: 0.8500000000000001
-->Nodo 11 -> Pagerank: 0.8500000000000001
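
The repeated score of 0.85 follows from the update rule given in GraphX's PageRank documentation, `PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum`, where `alpha` is `resetProb`. A vertex with no incoming edges therefore ends up with exactly `resetProb`. The following plain-Scala sketch iterates that rule on a hypothetical toy star graph. It illustrates the formula only and is not GraphX's implementation; the names `toyEdges`, `step` and `ranks` are made up for this example.

```scala
// Hypothetical toy graph: three leaves (1, 2, 3), each pointing to hub 0.
val toyEdges: Seq[(Long, Long)] = Seq((1L, 0L), (2L, 0L), (3L, 0L))
val resetProb = 0.85 // value used in the notebook; GraphX's default is 0.15

val vertices: Set[Long] = toyEdges.flatMap { case (s, d) => Seq(s, d) }.toSet
val outDegree: Map[Long, Int] =
  toyEdges.groupBy(_._1).map { case (v, es) => v -> es.size }

// One synchronous update:
// rank(v) = resetProb + (1 - resetProb) * sum over edges u -> v of rank(u) / outDegree(u)
def step(ranks: Map[Long, Double]): Map[Long, Double] =
  vertices.map { v =>
    val incoming = toyEdges.collect { case (u, d) if d == v => ranks(u) / outDegree(u) }.sum
    v -> (resetProb + (1.0 - resetProb) * incoming)
  }.toMap

// Apply the update 50 times, starting from rank 1.0 on every vertex.
val initial: Map[Long, Double] = vertices.map(v => v -> 1.0).toMap
val ranks: Map[Long, Double] = Iterator.iterate(initial)(step).drop(50).next()
```

In this toy graph the leaves settle at 0.85 and the hub at 0.85 + 0.15 * (3 * 0.85) = 1.2325. If the intent was the conventional damping factor of 0.85, that would correspond to `resetProb = 0.15`.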


import org.apache.spark.graphx._
pr: graphx.VertexRDD[Double] = VertexRDDImpl[185] at RDD at VertexRDD.scala:57
computedGraphx: (String, Graph[Double, Interaccion]) = ("graph id", org.apache.spark.graphx.impl.GraphImpl@9e9d28)
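
The `Triangle count` line in the output above shows an object reference because `triangleCount()` returns a new graph whose vertex attribute is the number of triangles passing through each vertex. A total can be derived by summing those per-vertex counts and dividing by three. With GraphX this would presumably be something like `graph.triangleCount().vertices.map(_._2.toLong).reduce(_ + _) / 3`, which is an untested assumption here. The plain-Scala sketch below shows the same relation on a hypothetical undirected toy graph.

```scala
// Hypothetical undirected toy graph: one triangle (1, 2, 3) plus a pendant edge 3-4.
val undirected: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 3L), (1L, 3L), (3L, 4L))

// Neighbour sets, ignoring edge direction and self-loops.
val neighbours: Map[Long, Set[Long]] =
  undirected
    .filter { case (a, b) => a != b }
    .flatMap { case (a, b) => Seq(a -> b, b -> a) }
    .groupBy(_._1)
    .map { case (v, ps) => v -> ps.map(_._2).toSet }

// Triangles through each vertex: pairs of its neighbours that are themselves connected.
val trianglesPerVertex: Map[Long, Int] = neighbours.map { case (v, ns) =>
  val connectedPairs = for {
    a <- ns.toSeq
    b <- ns.toSeq
    if a < b && neighbours(a).contains(b)
  } yield (a, b)
  v -> connectedPairs.size
}

// Every triangle is counted once at each of its three corners.
val totalTriangles: Int = trianglesPerVertex.values.sum / 3
```

Here vertices 1, 2 and 3 each see one triangle and vertex 4 sees none, so the total is one.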

Conclusions:

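* GraphX ships with only a small set of built-in graph algorithms; PageRank and triangle counting were used here.
* Building a graph with GraphX is straightforward: `Graph.fromEdges` only needs an RDD of edges.
* GraphX builds on Spark's RDDs, so graph data can be handled with the usual RDD operations.
* The DataStax connector enables Cassandra integration with Spark.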

