In [32]:
import org.apache.spark.sql.SparkSession
import org.apache.spark._
import org.apache.spark.graphx.{Graph, _}
import org.apache.spark.rdd.RDD
import scala.math.log
import org.apache.spark.SparkConf
    
//Create Spark session
// Cluster master URL; it reaches the session builder through `conf` below.
val sparkMaster = "spark://master:7077"
val conf = new SparkConf().setAppName("GraphX TextRank").setMaster(sparkMaster)
// Hand `conf` to the builder: previously it was constructed but never used, so the app name
// and master were silently ignored. Note: if a session already exists (e.g. one created by the
// notebook kernel), getOrCreate returns it and static settings such as the master cannot change.
val spark = SparkSession.builder.config(conf).getOrCreate()

// define sentence similarity score
/** TextRank-style word-overlap similarity between two sentences.
  *
  * Each sentence is tokenized on runs of whitespace and the punctuation `, . ; : ? !`,
  * lower-cased, and turned into a set of words. The score is the number of shared words divided
  * by `log(|W1| + 1) + log(|W2| + 1) + 1`; the `+1` terms keep the denominator positive, so
  * empty sentences score 0.0 instead of dividing by zero.
  *
  * @param sentence1 first sentence
  * @param sentence2 second sentence
  * @return non-negative similarity; 0.0 when the sentences share no words
  */
def sentenceSimilarity(sentence1: String, sentence2: String): Double = {
  // A string that starts with a delimiter (e.g. the leading space left after splitting a line
  // on '.') splits into an empty first token; drop it so "" is never counted as a shared word.
  // Locale.ROOT keeps lower-casing independent of the JVM's default locale.
  def wordSet(sentence: String): Set[String] =
    sentence.split("[\\s,.;:?!]+").iterator
      .filter(_.nonEmpty)
      .map(_.toLowerCase(java.util.Locale.ROOT))
      .toSet
  val words1 = wordSet(sentence1)
  val words2 = wordSet(sentence2)
  val commonWords = words1.intersect(words2).size
  commonWords.toDouble / (log(words1.size + 1) + log(words2.size + 1) + 1)
}

// build sentence graph
// One vertex per sentence, identified by its position in the text, and a directed edge for
// every ordered pair of distinct sentences whose similarity is positive.
val path = "hdfs://master/user/custom/graphx-mpi/ANN.txt"
// Splitting on '.' leaves leading spaces, and blank lines or repeated periods produce empty
// fragments; trim and drop empty ones so they do not become isolated "sentences".
val input_sentences = spark.sparkContext.textFile(path)
  .flatMap(line => line.split('.'))
  .map(_.trim)
  .filter(_.nonEmpty)
// Cached because it is traversed by both sides of the cartesian product and by graph building.
val vertices = input_sentences.zipWithIndex.map {
  case (sentence, index) => (index, sentence)
}.cache()
// Zero-similarity edges carry no rank in PageRank; skipping them shrinks the edge set and keeps
// every source vertex's out-weight sum strictly positive for normalization.
val pairs = vertices.cartesian(vertices)
  .filter { case ((i1, _), (i2, _)) => i1 != i2 }
  .map { case ((i1, s1), (i2, s2)) => Edge(i1, i2, sentenceSimilarity(s1, s2)) }
  .filter(_.attr > 0.0)
val graph = Graph(vertices, pairs)

// normalize weighted graph
// Sum of outgoing edge weights per source vertex, collected to the driver as a Map
// (one entry per vertex that has at least one out-edge).
val outgoingEdgesSum = graph.aggregateMessages[Double](
  ctx => ctx.sendToSrc(ctx.attr),
  _ + _,
  TripletFields.EdgeOnly
).collect.toMap
// Divide each edge weight by its source's out-weight sum so each source's outgoing weights sum
// to 1. A source whose out-edges all weigh 0 has a sum of 0.0, and 0/0 would yield NaN, which the
// summed Pregel messages would carry into downstream ranks; such edges keep weight 0 instead.
val normEdges = graph.edges.map { e =>
  // Defensive default: every edge's source contributed at least one message above.
  val srcSum = outgoingEdgesSum.getOrElse(e.srcId, 1.0)
  val newWeight = if (srcSum > 0.0) e.attr / srcSum else 0.0
  Edge(e.srcId, e.dstId, newWeight)
}
// Starting rank for every vertex (replaces the sentence text as the vertex attribute).
val initialGuess = 1.0
val preparedGraph = Graph(vertices, normEdges)
  .mapVertices((_, _) => initialGuess)

// Define Pregel's UDFs for PageRank
// Damping factor: weight given to rank arriving over edges versus the uniform teleport share.
val d = 0.85
val numVertices = preparedGraph.numVertices
/** PageRank vertex update: the uniform share (1 - d) / N plus the damped sum of incoming
  * contributions. The previous rank `attr` is not consulted; only `msgSum` matters.
  */
def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double = {
  val teleport = (1 - d) / numVertices
  val propagated = d * msgSum
  teleport + propagated
}

/** Pregel send function: pushes the source's current rank, scaled by the edge weight,
  * to the edge's destination. Exactly one message is emitted per edge.
  */
def sendMessage(edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] = {
  val contribution = edge.srcAttr * edge.attr
  Iterator.single(edge.dstId -> contribution)
}

/** Pregel merge function: messages addressed to the same vertex are summed. */
def messageCombiner(a: Double, b: Double): Double = {
  // IEEE-754 addition is commutative, so operand order does not affect the result.
  b + a
}

// Execute Pregel for a fixed number of iterations.
// Arguments: initial message 0.0 delivered to every vertex, then at most 50 message rounds,
// using the vertex program, send function and merge function defined above.
val rankGraph = Pregel(preparedGraph, 0.0, 50)(
  vertexProgram,
  sendMessage,
  messageCombiner
)

/* extract summary: the summary is composed of the 3 highest-ranked sentences (by PageRank),
 * ordered as they appear in the original text.
 */
val summary = graph.vertices.join(rankGraph.vertices)
  .map { case (id, (sentence, rank)) => (id, sentence, rank) }
  // Highest rank first; on equal rank prefer the earlier sentence so the selection is
  // deterministic rather than depending on partition order.
  .top(3)(Ordering.by[(VertexId, String, Double), (Double, Long)] {
    case (id, _, rank) => (rank, -id)
  })
  .sortBy(_._1)
  // Trim so fragments split on '.' do not carry stray leading/trailing whitespace.
  .map { case (_, sentence, _) => sentence.trim }
  .mkString(".\n") + "."

println(s"SUMMARY:\n$summary")

SUMMARY:
An ANN is based on a collection of connected units or nodes called artificial neurons, which loosely model the neurons in a biological brain.
The signal at a connection is a real number, and the output of each neuron is computed by some non-linear function of the sum of its inputs.
The training of a neural network from a given example is usually conducted by determining the difference between the processed output of the network (often a prediction) and a target output.

import org.apache.spark.sql.SparkSession
import org.apache.spark._
import org.apache.spark.graphx.{Graph, _}
import org.apache.spark.rdd.RDD
import scala.math.log
import org.apache.spark.SparkConf
sparkMaster: String = spark://master:7077
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@6ca234f8
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@7d0981a4
sentenceSimilarity: (sentence1: String, sentence2: String)Double
path: String = hdfs://master/user/custom/graphx-mpi/ANN.txt
input_sentences: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4478] at flatMap at <console>:128
vertices: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[4480] at map at <console>:129
pairs: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = M...
