## Spark graph

Dans les tutoriels précédents nous avons toujours travaillé avec des dataset ou table.

Il existe un écosystème spark pour traiter de gros graphes via l'api **graphx** historiquement que disponible en scala il existe **graphFrames** en python.

Essayons de jouer avec cette api.


### Création du context Spark

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import os

conf = SparkConf()

#url par défaut d'une api kubernetes accédé depuis l'intérieur du cluster (ici le notebook tourne lui même dans kubernetes)
conf.setMaster("k8s://https://kubernetes.default.svc:443")

#image des executors spark: pour des raisons de simplicité on réutilise l'image du notebook
conf.set("spark.kubernetes.container.image", "inseefrlab/jupyter-datascience:master")

# Nom du compte de service pour contacter l'api kubernetes : attention le package du datalab crée lui même cette variable d'enviromment.
# Dans un pod du cluster kubernetes il faut lire le fichier /var/run/secrets/kubernetes.io/serviceaccount/token
# Néanmoins ce paramètre est inutile car le contexte kubernetes local de ce notebook est préconfiguré
# conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", os.environ['KUBERNETES_SERVICE_ACCOUNT']) 

# Nom du namespace kubernetes
conf.set("spark.kubernetes.namespace", os.environ['KUBERNETES_NAMESPACE'])

# Nombre d'executeur spark, il se lancera autant de pods kubernetes que le nombre indiqué.
conf.set("spark.executor.instances", "10")

# Mémoire alloué à la JVM
# Attention par défaut le pod kubernetes aura une limite supérieur qui dépend d'autres paramètres.
# On manipulera plus bas pour vérifier la limite de mémoire totale d'un executeur
conf.set("spark.executor.memory", "4g")

conf.set("spark.kubernetes.driver.pod.name", os.environ['KUBERNETES_POD_NAME'])

# Paramètres d'enregistrement des logs spark d'application
# Attention ce paramètres nécessitent la création d'un dossier spark-history. Spark ne le fait pas lui même pour des raisons obscurs
# import s3fs
# endpoint = "https://"+os.environ['AWS_S3_ENDPOINT']
# fs = s3fs.S3FileSystem(client_kwargs={'endpoint_url': endpoint})
# fs.touch('s3://tm8enk/spark-history/.keep')
# sparkconf.set("spark.eventLog.enabled","true")
# sparkconf.set("spark.eventLog.dir","s3a://tm8enk/spark-history")
#ici pour gérer le dateTimeFormatter dépendant de la verion de java...
conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")
conf.set("spark.default.parallelism",10)
conf.set("spark.sql.shuffle.partitions",10)
conf.set("spark.jars.packages","graphframes:graphframes:0.8.1-spark3.0-s_2.12")


<pyspark.conf.SparkConf at 0x7fa270555850>

On note que :
* on a pris 10 executeurs et on surcharge spark pour, lors de shuffle ou repartition, que son niveau de parallelisme soit 10 plutot que 200
* on importe un jar au démarrage il va aller le chercher sur maven central, ce jar sera appellé par la librairie python graphFrames à travers Py4j.

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("graph").config(conf = conf).getOrCreate()

In [3]:
%pip install graphframes

Collecting graphframes
  Downloading graphframes-0.6-py2.py3-none-any.whl (18 kB)
Collecting nose
  Downloading nose-1.3.7-py3-none-any.whl (154 kB)
[K     |████████████████████████████████| 154 kB 5.2 MB/s eta 0:00:01
Installing collected packages: nose, graphframes
Successfully installed graphframes-0.6 nose-1.3.7
Note: you may need to restart the kernel to use updated packages.


### Constitution d'un graphe

Un graphe peut être vu comme 2 datasets:
* Un **dataset de noeud** avec 2 colonnes : un identifiant et un attribut.
* Un **dataset d'arcs** reliant 2 noeuds : avec un identifiant de noeud source, un destination et un ou des attributs sur cet arc
 
A partir de ces 2 datasets que spark peut traiter comme des rdd à travers le cluster, on peut travailler sur un graphe, voici un exemple simple :

In [4]:
from graphframes import *

vertices = [(1,"A"), (2,"B"), (3, "C")]
edges = [(1,2,"love"), (2,1,"hate"), (2,3,"follow")]

v = spark.createDataFrame(vertices, ["id", "name"])
e = spark.createDataFrame(edges, ["src", "dst", "action"])

premierGraphe = GraphFrame(v, e)

In [5]:
premierGraphe.edges.show()

+---+---+------+
|src|dst|action|
+---+---+------+
|  1|  2|  love|
|  2|  1|  hate|
|  2|  3|follow|
+---+---+------+



### Faisons un graphe sur les données twitter concernant l'insee

Attention les données sont alimentées et à la réexecution du notebook les éléments suivants auront peut-être évolués, on peut filtrer sur la date des tweets si nécessaire avant le 16 avril pour retrouver le déroulé ci-dessous

In [6]:
#on importe le schema
import pickle
schema = pickle.load( open( "./7-streaming/schema.p", "rb" ) )

In [7]:
#on lit les donnnées
df = spark.read.format("json")  \
    .schema(schema) \
    .load("s3a://projet-spark-lab/diffusion/tweets/input") \

Essayons de créer le graph des tweets mentionnant l'insee avec comme noeud les user de twitter et comme arc, 2 utilisateurs sont reliés si l'émetteur du tweet a metionné un autre utilisateur dans le tweet

Par exemple si alice tweet "@bob tu vas vu le dernier tweet de l'Insee" le noeud alice sera relié -> à bob.

Nous avons de la chance les données de l'api twitter prémache le travail.

Regardons pour commencer dans le tweet de l'api twitter les données suivantes:
* l'identififiant du tweet
* l'identifiant de l'utilisateur qui tweete
* son nombre de followers pour savoir s'il est influent
* la liste des hashtags présents dans le tweet (déjà donné par twitter)
* une indicatrice pour savoir s'il y a des hashtag 
* le texte du tweet
* les user mentionnés dans le tweet sous forme de tableau (identifiant, name, screen_name)

In [8]:
from pyspark.sql.functions import col,explode,size,first
#.select("id","user.id","user.name","user.followers_count","entities.hashtags","text")
df.select(col("id").alias("id"),
         col("user.id").alias("user_id"),
         col("user.name").alias("name"),
         col("user.followers_count"),
         col("entities.hashtags.text").alias("hashtags"),
         size("entities.hashtags").alias("has_hashtag"),
         col("text"),
         col("entities.user_mentions")) \
 .filter(col("has_hashtag")>0) \
 .head(1) \

[Row(id=1380418192426860546, user_id=219397176, name='Isabelle Hinden', followers_count=1633, hashtags=['immigrés'], has_hashtag=1, text='RT @InseeFr: En 2017, 44 % de la hausse de la population provient des #immigrés, soit 139\xa0000 personnes. Depuis 2006, cette contribution à…', user_mentions=[Row(id=217473382, id_str='217473382', indices=[3, 11], name='Insee', screen_name='InseeFr')])]

Pour créer le dataset des noeuds:
* on récupére les id et noms des émetteurs de tweets

In [9]:
#vertices
tweeters=df.select(col("user.id").alias("id"), col("user.name").alias("name"))

* On récupère les id et noms des user mentionnés (même s'ils n'ont pas forcément twittés)

In [10]:
users_mentionned=df.select(explode(col("entities.user_mentions"))).select(col("col.id").alias("id"),col("col.screen_name").alias("name")).distinct()

On fait l'union des 2 dataframe et on va ne retenir qu'un nom (le nom dans la mention n'est pas toujours exact au nom du compte apparemment ca évitera les doublons)

In [11]:
vertices=tweeters.union(users_mentionned)

In [12]:
from pyspark.sql.functions import udf, collect_list
# on définit un udf qui prend le premier élément
udf_first = udf(lambda x: x[0])
# on groupe l'union des deux dataset par id d'utilisateur et on collect 
#sous la forme d'une liste leurs noms qui peuvent différer entre les mentions et le compte
# on applique l'udf pour ne retenir que le nom en tête

final_vertices=vertices.groupby("id").agg(udf_first(collect_list("name")).alias("name"))

In [13]:
final_vertices.show(10, truncate=False)

+--------+--------------------------------------+
|id      |name                                  |
+--------+--------------------------------------+
|4898091 |FinancialTimes                        |
|7540072 |neufmetres                            |
|10575072|Webzine de la dracenie et du Var Est ن|
|16683666|spectator                             |
|17385313|Julien_W                              |
|17437184|alphoenix                             |
|17464719|PascalR                               |
|17779850|Jennifer Ogor                         |
|18976358|SylvainePascual                       |
|19713578|chris dabin                           |
+--------+--------------------------------------+
only showing top 10 rows



Faisons maintenant le dataset des arcs :
* on prend l'identifiant de l'émetteur
* on prend duplique/explose la ligne pour avoir une ligne par mention dans le tweet
* on récupère les hashtags
* on récupère l'identifiant du tweet

on group by user_id et mention.id (user mentionné) et on agggrège pour avoir :
* le nombre de tweets
* la liste des hashtags vus entre ces 2 users dans le sens user_id -> mention.id

In [14]:
from pyspark.sql.functions import count,collect_list,flatten
edges=df.select(col("user.id").alias("user_id"), \
               explode(col("entities.user_mentions")).alias("mention"),
               col("entities.hashtags.text").alias("hashtags"),
               "id") \
 .groupby(col("user_id").alias("src"), \
          col("mention.id").alias("dst")) \
 .agg(count("id").alias("nb"),
      flatten(collect_list("hashtags")).alias("hashtags"),
      collect_list("id").alias("id"))

In [15]:
edges.show()

+--------+------------------+---+--------------------+--------------------+
|     src|               dst| nb|            hashtags|                  id|
+--------+------------------+---+--------------------+--------------------+
|15217683|793749212118867969|  1|                  []|[1380995523168120...|
|15872615|         121468512|  1|                  []|[1380245558141591...|
|16600674|         217473382|  6|                  []|[1380505509611069...|
|17193568|         217473382|  1|          [immigrés]|[1380513459482337...|
|17385313|         217473382|  6|                  []|[1380247279894986...|
|17464719|        2515649016|  1|                  []|[1380436148057608...|
|18629937|         121468512|  1|                  []|[1380492059690295...|
|18969131|          20947741|  1|                  []|[1380468019009355...|
|19377400|         112754792|  1|                  []|[1381174506446848...|
|20064944|        1460135654|  1|                  []|[1380449540738850...|
|20181221|  

### Créer le graphe

Ca y est on a nos 2 structures:
* **identifiant**, name ici seul **identifiant** est nécessaire name vient donner plus d'infos on pourrait ajouter d'autres colonnes
* **identifianttwittant(src), identifiantmentionné(dst)**, nombre de fois, liste de hashtags, liste des ids de tweets ici seul les 2 premiers identifiants sont nécessaires le reste donne du contexte qu'on pourrait utiliser sur les arcs.


In [16]:
from graphframes import *
g = GraphFrame(final_vertices, edges)

Maintenant qu'on a ce graphe on va le mettre en cache car nous allons le manipuler plusieurs fois

In [17]:
g.cache()

GraphFrame(v:[id: bigint, name: string], e:[src: bigint, dst: bigint ... 3 more fields])

In [18]:
print("le graphe a "+ str(g.vertices.count()) +" noeuds et "+ str(g.edges.count()) + " arcs")

le graphe a 8407 noeuds et 11423 arcs


### Recherche des utilisateurs populaires

On peut via l'api appliquer différents algorithmes, il est d'usage dans les graphes de traiter des notions suivantes :
* la notion de degré (nombre de connexion d'un noeud)
* la notion de triangle (triplet de noeud totalement connecté)
* la notion de chemin entre noeud

L'algorithme pageRank est un algorithme qui est connu pour etre utilisé dans les moteurs de recherche, il peut etre executé sur un graphe pour juger de la "popularité" d'un noeud.

On s'attend à ce que le comte de l'insee soit pas mal placé.


In [19]:
pagerank = g.pageRank(resetProbability=0.15,tol=0.01)

In [20]:
from pyspark.sql.functions import desc
pagerank.vertices.sort(desc("pagerank")).show(truncate=False)

+-------------------+--------------------------------------------------+------------------+
|id                 |name                                              |pagerank          |
+-------------------+--------------------------------------------------+------------------+
|103918784          |David Lisnard                                     |917.3584687834675 |
|217473382          |Insee                                             |600.8061601474191 |
|983334079981654016 |Docteur Peter EL BAZE                             |149.12888199110637|
|945473418          |Docteur Laurent Alexandre #JeSuisDoublementVacciné|147.51973826592192|
|3373762791         |GWGoldnadel                                       |138.67006416773918|
|793749212118867969 |Raphael Pradeau                                   |120.89617748694437|
|1243542086860918785|FrontPopOff                                       |115.42839094804633|
|382736141          |Enedis                                            |112.9967

Arf seulement 2ème sur la période de temps David Lisnard le maire de Cannes a bien volé la vedette 

In [21]:
pagerank.edges.where((col("src")=="103918784") ).show(truncate=False)

+---------+---------+---+--------+---------------------+------+
|src      |dst      |nb |hashtags|id                   |weight|
+---------+---------+---+--------+---------------------+------+
|103918784|103918784|1  |[]      |[1380509496284430336]|1.0   |
+---------+---------+---+--------+---------------------+------+



In [22]:
pagerank.edges.where((col("dst")=="103918784") ).count()

510

David lisnard n'a tweeté qu'une fois mais il s'est fait mentionné 510 fois autour de ce tweet.

https://twitter.com/davidlisnard/status/1380509496284430336

On peut regarder les utilisateurs ayant le plus été mentionnés

In [23]:
g.inDegrees.join(g.vertices,"id").orderBy(desc("inDegree")).show(10,False)

+------------------+--------+--------------------------------------------------+
|id                |inDegree|name                                              |
+------------------+--------+--------------------------------------------------+
|217473382         |1178    |Insee                                             |
|945473418         |582     |Docteur Laurent Alexandre #JeSuisDoublementVacciné|
|3373762791        |562     |GWGoldnadel                                       |
|103918784         |510     |David Lisnard                                     |
|983334079981654016|410     |Docteur Peter EL BAZE                             |
|793749212118867969|391     |Raphael Pradeau                                   |
|472412809         |198     |f_philippot                                       |
|978184280         |192     |Damien Rieu                                       |
|217749896         |182     |MLP_officiel                                      |
|87212906          |167     

ou qui ont mentionné le plus

In [24]:
g.outDegrees.join(g.vertices,"id").orderBy(desc("outDegree")).show(10,False)

+-------------------+---------+------------------------------------------------+
|id                 |outDegree|name                                            |
+-------------------+---------+------------------------------------------------+
|702991804661112832 |22       |Ari Kouts                                       |
|1244716894671732738|21       |Bonsens Claire                                  |
|1359816991033458689|20       |Cris Finland                                    |
|1580838720         |20       |Augusta Crochet 💙LR🇫🇷🇦🇲                    |
|1376154610998640650|19       |Nasty Estelle 🇨🇵❤️                            |
|1180210966711156736|17       |29quovadis                                      |
|1222491562824949763|16       |Jacques                                         |
|1284473275184226304|16       |Antoine Sulo                                    |
|1378687476697550852|16       |BouleBille                                      |
|2464135300         |16       |Dani

L'api propose d'autres algorithmes permettant:
* de clusteriser le graphe (connected ou strong connected components)
* de rechercher des patterns ou chemin dans le graphe ce qui pourrait etre intéressant pour voir par exemple une chaine de retweet.


Enfin on peut nous même créer nos propres algorithmes pour cela l'api propose une méthode aggregate messages permettant d'envoyer un message de noeud et noeud et définir l'aggregation à retenir

Il est aussi possible d'utiliser d3.js dans une celle ipython et d'afficher le graphe en javascript todo un exemple ici
https://bl.ocks.org/heybignick/3faf257bbbbc7743bb72310d03b86ee8

In [25]:
%%javascript
    var script = document.createElement('script');
    script.type = 'text/javascript';
    script.src = '//d3js.org/d3.v6.min.js';
    document.head.appendChild(script);
    console.log(window.d3)

<IPython.core.display.Javascript object>

In [26]:
from IPython.display import Javascript
svg_script = '''
var svg = d3.select(element)
    .append("svg")
    .attr("width", 300)
    .attr("height", 300);    

svg.append("circle")
    .style("stroke", "gray")
    .style("fill", "cyan")
    .attr("r", 130)
    .attr("cx", 150)
    .attr("cy", 150)
    .transition()
        .delay(100)
        .duration(10000)  
        .attr("r", 10)
        .attr("cx", 150)
        .style("fill", "blue"); 
'''

Javascript(svg_script)

<IPython.core.display.Javascript object>

In [27]:
from IPython.display import Javascript
svg_script = '''
  const data = {nodes : [{ id: "Myriel", group: 1},{ id: "Myriel3", group: 1}], 
  links : [{source: "Myriel", target: "Myriel3", value: 1}]}

  const links = data.links.map(d => Object.create(d));
  const nodes = data.nodes.map(d => Object.create(d));
  const width=300;
  const height=400;
  const simulation = d3.forceSimulation(nodes)
      .force("link", d3.forceLink(links).id(d => d.id))
      .force("charge", d3.forceManyBody())
      .force("center", d3.forceCenter(width / 2, height / 2));

  const svg = d3.create("svg")
      .attr("viewBox", [0, 0, width, height]);

  const link = svg.append("g")
      .attr("stroke", "#999")
      .attr("stroke-opacity", 0.6)
    .selectAll("line")
    .data(links)
    .join("line")
      .attr("stroke-width", d => Math.sqrt(d.value));

  function color ()  {
      const scale = d3.scaleOrdinal(d3.schemeCategory10);
      return d => scale(d.group);
  }
  
  function drag (simulation) {
  
  function dragstarted(event) {
    if (!event.active) simulation.alphaTarget(0.3).restart();
    event.subject.fx = event.subject.x;
    event.subject.fy = event.subject.y;
  }
  
  function dragged(event) {
    event.subject.fx = event.x;
    event.subject.fy = event.y;
  }
  
  function dragended(event) {
    if (!event.active) simulation.alphaTarget(0);
    event.subject.fx = null;
    event.subject.fy = null;
  }
  
  return d3.drag()
      .on("start", dragstarted)
      .on("drag", dragged)
      .on("end", dragended);
    }
  
  const node = svg.append("g")
      .attr("stroke", "#fff")
      .attr("stroke-width", 1.5)
    .selectAll("circle")
    .data(nodes)
    .join("circle")
      .attr("r", 5)
      .attr("fill", color)
      .call(drag(simulation));

  node.append("title")
      .text(d => d.id);

  simulation.on("tick", () => {
    link
        .attr("x1", d => d.source.x)
        .attr("y1", d => d.source.y)
        .attr("x2", d => d.target.x)
        .attr("y2", d => d.target.y);

    node
        .attr("cx", d => d.x)
        .attr("cy", d => d.y);
  });

 
'''

Javascript(svg_script)

<IPython.core.display.Javascript object>

In [None]:
spark.stop()