# Préparation du dataset

```bash
mkdir -p /tmp/3774877/dataset
cp /Infos/bd/spark/dataset/facebook/*csv /tmp/3774877/dataset/
```

* `facebook_users_prop.csv` : `utilisateur, prénom, nom, âge`
* `facebook_edges_prop.csv` : `source, destination, type_relation, nombre_messages_échangés`

In [ ]:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Directory holding the local copies of the Facebook CSV files. File names are
// appended to it directly, so the trailing slash is required.
val dataset: String = "/tmp/3774877/dataset/"

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
dataset: String = /tmp/3774877/dataset/


# Exercice 1
Construction du graphe orienté

In [ ]:
// Read each CSV file as an RDD of lines and split every line on commas,
// yielding one Array[String] of raw fields per record.
val users_csv = sc.textFile(s"${dataset}facebook_users_prop.csv").map(line => line.split(','))
// Peek at the first four parsed user records.
users_csv.take(4)
val edges_csv = sc.textFile(s"${dataset}facebook_edges_prop.csv").map(line => line.split(','))
// Peek at the first four parsed edge records.
edges_csv.take(4)


users_csv: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:73
edges_csv: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at map at <console>:75
res4: Array[Array[String]] = Array(Array(0, 1, acquaintance, 2), Array(0, 2, colleague, 90), Array(0, 3, colleague, 45), Array(0, 4, colleague, 8))


In [ ]:
// Vertices: (user id, (first name, last name, age)).
val users: RDD[(VertexId, (String, String, Int))] =
  users_csv.map { fields =>
    val id: VertexId = fields(0).toLong
    (id, (fields(1), fields(2), fields(3).toInt))
  }

// Edges: source id -> destination id, carrying (relation type, number of messages exchanged).
val edges: RDD[Edge[(String, Int)]] =
  edges_csv.map { fields =>
    val src: VertexId = fields(0).toLong
    val dst: VertexId = fields(1).toLong
    Edge(src, dst, (fields(2), fields(3).toInt))
  }

// Directed property graph built from the two RDDs above.
val graph = Graph(users, edges)

users: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, (String, String, Int))] = MapPartitionsRDD[6] at map at <console>:77
edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[(String, Int)]] = MapPartitionsRDD[7] at map at <console>:80
graph: org.apache.spark.graphx.Graph[(String, String, Int),(String, Int)] = org.apache.spark.graphx.impl.GraphImpl@6b3dc5fc


# Exercice 2

In [ ]:
// Up to five vertices whose first name (first field of the vertex attribute) is "Kendall".
graph.vertices
  .filter { case (_, (firstName, _, _)) => firstName == "Kendall" }
  .take(5)

res7: Array[(org.apache.spark.graphx.VertexId, (String, String, Int))] = Array((2058,(Kendall,Brewbaker,49)))


# Exercice 3

In [ ]:
// Count the edges that touch a vertex whose first name is "Kendall", in either
// direction. Each matching edge is mapped to the first name found at its other
// end, so `.count()` can be swapped for `.collect()` to list those names.
graph.triplets
  // `||` is the logical (short-circuit) OR; `|` would always evaluate both sides.
  .filter(t => t.srcAttr._1 == "Kendall" || t.dstAttr._1 == "Kendall")
  .map(t => if (t.dstAttr._1 == "Kendall") t.srcAttr._1 else t.dstAttr._1)
  .count()

res9: Long = 76


# Exercice 4

In [ ]:
// Source ids (up to five) of the "colleague" edges pointing to a vertex named
// "Kendall" and carrying more than 70 exchanged messages.
graph.triplets
  .filter { t =>
    val (relation, messages) = t.attr
    // `&&` is the logical (short-circuit) AND; `&` would evaluate every operand.
    t.dstAttr._1 == "Kendall" && relation == "colleague" && messages > 70
  }
  .map(_.srcId)
  .take(5)

res29: Array[org.apache.spark.graphx.VertexId] = Array(1941, 1966, 1983)


# Exercice 5

In [ ]:
// For every source vertex, count its outgoing "friend" edges with more than 80
// exchanged messages, then return the five sources with the highest counts.
graph.triplets
  .filter { t =>
    val (relation, messages) = t.attr
    // `&&` is the logical (short-circuit) AND; `&` would evaluate both operands.
    relation == "friend" && messages > 80
  }
  .map(t => (t.srcId, 1))
  .reduceByKey(_ + _)
  // Named argument makes the descending order explicit.
  .sortBy(_._2, ascending = false)
  .take(5)

res53: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((107,59), (1912,47), (1684,43), (483,17), (3437,17))


# Exercice 6

## Avec `graph.triplets`

In [ ]:
// Count the edges that touch a vertex named "Kendall", in either direction.
// Each matching edge is mapped to the full attribute (first name, last name,
// age) of the vertex at its other end before counting.
graph.triplets
  // `||` is the logical (short-circuit) OR; `|` would always evaluate both sides.
  .filter(t => t.srcAttr._1 == "Kendall" || t.dstAttr._1 == "Kendall")
  .map(t => if (t.dstAttr._1 == "Kendall") t.srcAttr else t.dstAttr)
  .count()

res55: Long = 76


## Avec `collectNeighbors`

In [ ]:
// Vertex id of the first vertex found whose first name is "Kendall".
val kendall_id = graph.vertices
  .filter { case (_, (firstName, _, _)) => firstName == "Kendall" }
  .take(1)(0)._1

println(s"Kendall ID: $kendall_id")

// Collect, for every vertex, its neighbours over both incoming and outgoing
// edges, keep only Kendall's entry and count the neighbours it holds.
graph.collectNeighbors(EdgeDirection.Either)
  .filter { case (vertexId, _) => vertexId == kendall_id }
  .take(1)(0)._2
  .length
// Alternative to `.length`, printing each neighbour instead:
// .foreach { case (vId, (f, l, a)) => println(f, l, a) }

Kendall ID: 2058
kendall_id: org.apache.spark.graphx.VertexId = 2058
res87: Int = 76


## Avec `aggregateMessages`

In [ ]:
// Reference solution: every edge touching a vertex named "Kendall" sends the
// first name found at its other end to that Kendall vertex. Messages are merged
// by array concatenation, and the number of collected names is printed for each
// vertex that received at least one message.
graph.aggregateMessages[Array[String]](
  ctx => {
    if (ctx.dstAttr._1 == "Kendall") ctx.sendToDst(Array(ctx.srcAttr._1))
    if (ctx.srcAttr._1 == "Kendall") ctx.sendToSrc(Array(ctx.dstAttr._1))
  },
  (left, right) => left ++ right
).values.collect().foreach(names => println(names.length))

76


# Exercice 7

In [ ]:
// Naive approach: graph.inDegrees only lists vertices that have at least one
// incoming edge, so the minimum computed here ignores vertices of in-degree 0.
val inDegrees = graph.inDegrees
println("inDegrees:")
inDegrees.take(5).foreach(println)

// Smallest in-degree among the listed vertices.
val minInDegree = inDegrees
  .values
  .reduce(_ min _)

// Number of vertices reaching that minimum.
val minNodes = inDegrees.filter { case (_, degree) => degree == minInDegree }.count()

inDegrees:
(3558,4)
(1084,5)
(1410,10)
(3456,7)
(3702,21)
inDegrees: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[197] at RDD at VertexRDD.scala:57
minInDegree: Int = 1
minNodes: Long = 323


In [ ]:
// First names of (up to five) users whose in-degree equals minInDegree: join
// the filtered degrees with the vertex attributes and keep the first name only.
inDegrees
  .filter { case (_, degree) => degree == minInDegree }
  .innerJoin(graph.vertices)((_, _, user) => user._1)
  .take(5)

res139: Array[(org.apache.spark.graphx.VertexId, String)] = Array((956,Geri), (1466,Millard), (160,Adelle), (3650,Yazmin), (2664,Chantal))


# Exercice 8

In [ ]:
// Replace every vertex attribute by its in-degree, using 0 for vertices absent
// from inDegrees, then show up to five vertices without any incoming edge.
graph.outerJoinVertices(inDegrees)((_, _, degree) => degree.getOrElse(0))
  .vertices
  .filter(_._2 == 0)
  .take(5)

res129: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((686,0), (0,0))
