In [1]:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

Intitializing Scala interpreter ...

Spark Web UI available at http://LAPTOP-0NFRQ5HD:4040
SparkContext available as 'sc' (version = 3.1.1, master = local[*], app id = local-1617962733014)
SparkSession available as 'spark'


import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD


In [51]:
val graph = GraphLoader.edgeListFile(sc,"data/soc-karate.mtx")
                .mapEdges(e => scala.util.Random.nextFloat)
                .mapVertices[String]((vid,i) =>  if(scala.util.Random.nextFloat <0.1) "infected" else "susceptible")
graph.vertices.collect

graph: org.apache.spark.graphx.Graph[String,Float] = org.apache.spark.graphx.impl.GraphImpl@5a6ef783
res15: Array[(org.apache.spark.graphx.VertexId, String)] = Array((34,susceptible), (4,susceptible), (16,infected), (22,susceptible), (28,infected), (30,infected), (14,susceptible), (32,susceptible), (24,susceptible), (6,infected), (8,susceptible), (12,susceptible), (18,susceptible), (20,infected), (26,susceptible), (10,susceptible), (2,susceptible), (13,susceptible), (19,susceptible), (15,susceptible), (21,susceptible), (25,infected), (29,susceptible), (11,infected), (27,infected), (33,infected), (23,susceptible), (1,susceptible), (17,susceptible), (3,susceptible), (7,infected), (9,susceptible), (31,susceptible), (5,susceptible))


In [33]:
def independant_cascade(graph:Graph[String,Float]):Graph[String,Float]={
    def mergeMsg (m1:String, m2:String):String = {
        if(m1=="become_inactive" || m2 == "become_inactive"){return "become_inactive"}
        if (m1 == "become_infected" || m2 == "become_infected"){return "become_infected"}
        return "nothing"
    }

    def vprog(VertexId:VertexId, VD:String, A:String): String ={
        if (VD == "susceptible" && A == "become_infected"){ return "infected"}
        if(VD=="infected" && A == "become_inactive"){return "inactive"}
        return VD
    }

    def sendMsg(triplet:EdgeTriplet[String, Float]): Iterator[(VertexId, String)]={
        if (triplet.srcAttr == "infected"){
            if (scala.util.Random.nextFloat < triplet.attr){
                return Iterator((triplet.dstId,"become_infected"),(triplet.srcId,"become_inactive"))
            }
        return Iterator((triplet.srcId,"become_inactive"))
        }
        return Iterator()
    }
    
    return graph.pregel("nothing",Int.MaxValue,EdgeDirection.Out)(vprog,sendMsg,mergeMsg)
    .mapVertices((vid,vdata) => if(vdata == "infected") "inactive" else vdata)
}

independant_cascade: (graph: org.apache.spark.graphx.Graph[String,Float])org.apache.spark.graphx.Graph[String,Float]


In [34]:
independant_cascade(graph).vertices.collect

res13: Array[(org.apache.spark.graphx.VertexId, String)] = Array((34,inactive), (4,susceptible), (16,susceptible), (22,susceptible), (28,susceptible), (30,susceptible), (14,inactive), (32,inactive), (24,inactive), (6,susceptible), (8,susceptible), (12,susceptible), (18,susceptible), (20,inactive), (26,inactive), (10,inactive), (2,inactive), (13,susceptible), (19,inactive), (15,susceptible), (21,inactive), (25,inactive), (29,inactive), (11,susceptible), (27,inactive), (33,inactive), (23,susceptible), (1,inactive), (17,susceptible), (3,inactive), (7,susceptible), (9,inactive), (31,susceptible), (5,susceptible))


In [48]:


type VD = (String,Float,Float)
type ED = Float
type Message = (String,Float)

def linear_threshhold(graph:Graph[VD,ED]):Graph[VD,ED]={
    def mergeMsg (m1:Message, m2:Message):Message = {
        if(m1._1=="become_inactive" || m2._1 == "become_inactive"){return ("become_inactive",0f)}
        if (m1._1 == "become_infected" && m2._1 == "become_infected"){return ("become_infected",m1._2+m2._2)}
        if (m1._1 == "become_infected" ){return m1}
        if (m2._1 == "become_infected" ){return m2}
        return ("nothing",0f)
    }

    def vprog(VertexId:VertexId, vdata:VD, m:Message): VD ={
        if (m._1 == "become_inactive"){
            return ("inactive", vdata._2, vdata._3)
        }
        if (m._1 == "become_infected"){
            val state = vdata._1
            val new_infection_rate = vdata._2 + m._2
            val threshold = vdata._3
            if (state == "susceptible" && new_infection_rate > threshold){
                return ("infected",new_infection_rate,threshold)
            }
            return (state,new_infection_rate,threshold)
        }
        return vdata
    }

    def sendMsg(triplet:EdgeTriplet[VD, ED]): Iterator[(VertexId, Message)]={
        if (triplet.srcAttr._1 == "infected"){
            return Iterator((triplet.dstId,("become_infected",triplet.attr)),(triplet.srcId,("become_inactive",0f)))

        return Iterator((triplet.srcId,("become_inactive",0f)))
        }
        return Iterator()
    }
    
    return graph.pregel(("nothing",0f),Int.MaxValue,EdgeDirection.Out)(vprog,sendMsg,mergeMsg)
}

defined type alias VD
defined type alias ED
defined type alias Message
linear_threshhold: (graph: org.apache.spark.graphx.Graph[VD,ED])org.apache.spark.graphx.Graph[VD,ED]


In [89]:
def normalize_graph_weights[VD](graph:Graph[VD,Float])(implicit m: ClassManifest[VD]):Graph[VD,Float] = {
    val weight_sums_per_dst = graph.triplets.map(t => (t.dstId,t.attr))
                            .reduceByKey(( _:Float) + (_:Float))
    val newEdges = graph.edges.map(e => (e.dstId,e))
         .join(weight_sums_per_dst).map{ case ((dstId,(edge,weightSum)))=>Edge(edge.srcId,edge.dstId,edge.attr/weightSum)}
    return Graph(graph.vertices,newEdges)
}

val graph = GraphLoader.edgeListFile(sc,"data/soc-karate.mtx")
                .mapEdges(e => scala.util.Random.nextFloat)
                .mapVertices[VD]((vid,i) =>  if(scala.util.Random.nextFloat <0.1) ("infected",1f, scala.util.Random.nextFloat) else ("susceptible",0f, scala.util.Random.nextFloat))
graph.edges.collect

normalize_graph_weights: [VD](graph: org.apache.spark.graphx.Graph[VD,Float])(implicit m: ClassManifest[VD])org.apache.spark.graphx.Graph[VD,Float]
graph: org.apache.spark.graphx.Graph[VD,Float] = org.apache.spark.graphx.impl.GraphImpl@663b6e40
res42: Array[org.apache.spark.graphx.Edge[Float]] = Array(Edge(2,1,0.09596062), Edge(3,1,0.50865424), Edge(3,2,0.31126374), Edge(4,1,0.9875025), Edge(4,2,0.97877854), Edge(4,3,0.9131947), Edge(5,1,0.66562176), Edge(6,1,0.0850693), Edge(7,1,0.67088354), Edge(7,5,0.667345), Edge(7,6,0.7835842), Edge(8,1,0.3297881), Edge(8,2,0.8641607), Edge(8,3,0.8721171), Edge(8,4,0.42827487), Edge(9,1,0.13745981), Edge(9,3,0.8750417), Edge(10,3,0.6910226), Edge(11,1,0.19506484), Edge(11,5,0.62957555), Edge(11,6,0.3148815), Edge(12,1,0.103203), Edge(13,1,0.25128...


In [102]:
normalize_graph_weights(graph).edges

res55: org.apache.spark.graphx.EdgeRDD[Float] = EdgeRDDImpl[778] at RDD at EdgeRDD.scala:41


In [55]:
linear_threshhold(graph).vertices.filter(vdata => vdata._2._2 >0).collect

res19: Array[(org.apache.spark.graphx.VertexId, VD)] = Array((20,(inactive,1.0,0.30429024)), (2,(inactive,1.0555973,0.39165944)), (1,(infected,2.5511122,0.7237318)), (9,(susceptible,0.23637855,0.23980576)), (31,(inactive,1.0,0.49187386)), (5,(inactive,1.0,0.029999077)))
