In [17]:
!pip install graphframes



In [18]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import os
from graphframes import *
import pickle
from pyspark.sql.functions import col,explode,size,first
from pyspark.sql.functions import udf, collect_list
from pyspark.sql.functions import count,collect_list,flatten
from pyspark.sql.functions import desc

In [19]:
conf = SparkConf()

#url par défaut d'une api kubernetes accédé depuis l'intérieur du cluster (ici le notebook tourne lui même dans kubernetes)
conf.setMaster("k8s://https://kubernetes.default.svc:443")

#image des executors spark: pour des raisons de simplicité on réutilise l'image du notebook
conf.set("spark.kubernetes.container.image", os.environ['IMAGE_NAME'])

# Nom du compte de service pour contacter l'api kubernetes : attention le package du datalab crée lui même cette variable d'enviromment.
# Dans un pod du cluster kubernetes il faut lire le fichier /var/run/secrets/kubernetes.io/serviceaccount/token
# Néanmoins ce paramètre est inutile car le contexte kubernetes local de ce notebook est préconfiguré
# conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", os.environ['KUBERNETES_SERVICE_ACCOUNT']) 

# Nom du namespace kubernetes
conf.set("spark.kubernetes.namespace", os.environ['KUBERNETES_NAMESPACE'])

# Nombre d'executeur spark, il se lancera autant de pods kubernetes que le nombre indiqué.
conf.set("spark.executor.instances", "10")

# Mémoire alloué à la JVM
# Attention par défaut le pod kubernetes aura une limite supérieur qui dépend d'autres paramètres.
# On manipulera plus bas pour vérifier la limite de mémoire totale d'un executeur
conf.set("spark.executor.memory", "4g")

conf.set("spark.kubernetes.driver.pod.name", os.environ['KUBERNETES_POD_NAME'])

# Paramètres d'enregistrement des logs spark d'application
# Attention ce paramètres nécessitent la création d'un dossier spark-history. Spark ne le fait pas lui même pour des raisons obscurs
# import s3fs
# endpoint = "https://"+os.environ['AWS_S3_ENDPOINT']
# fs = s3fs.S3FileSystem(client_kwargs={'endpoint_url': endpoint})
# fs.touch('s3://tm8enk/spark-history/.keep')
# sparkconf.set("spark.eventLog.enabled","true")
# sparkconf.set("spark.eventLog.dir","s3a://tm8enk/spark-history")
#ici pour gérer le dateTimeFormatter dépendant de la verion de java...
conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")
conf.set("spark.default.parallelism",10)
conf.set("spark.sql.shuffle.partitions",10)
conf.set("spark.jars.packages","graphframes:graphframes:0.8.1-spark3.0-s_2.12")

<pyspark.conf.SparkConf at 0x7f9575ddcb80>

In [20]:
spark = SparkSession.builder.appName("graph").config(conf = conf).getOrCreate()

In [21]:
vertices = [(1,"A"), (2,"B"), (3, "C")]
edges = [(1,2,"love"), (2,1,"hate"), (2,3,"follow")]

v = spark.createDataFrame(vertices, ["id", "name"])
e = spark.createDataFrame(edges, ["src", "dst", "action"])

premierGraphe = GraphFrame(v, e)
premierGraphe.edges.show()

In [23]:
schema = pickle.load(open("/home/onyxia/work/schema.p", "rb" ))
print(schema)

StructType([StructField('contributors', StringType(), True), StructField('coordinates', StringType(), True), StructField('created_at', StringType(), True), StructField('display_text_range', ArrayType(LongType(), True), True), StructField('entities', StructType([StructField('hashtags', ArrayType(StructType([StructField('indices', ArrayType(LongType(), True), True), StructField('text', StringType(), True)]), True), True), StructField('media', ArrayType(StructType([StructField('additional_media_info', StructType([StructField('monetizable', BooleanType(), True)]), True), StructField('description', StringType(), True), StructField('display_url', StringType(), True), StructField('expanded_url', StringType(), True), StructField('id', LongType(), True), StructField('id_str', StringType(), True), StructField('indices', ArrayType(LongType(), True), True), StructField('media_url', StringType(), True), StructField('media_url_https', StringType(), True), StructField('sizes', StructType([StructField

In [24]:
df = spark.read.format("json")  \
    .schema(schema) \
    .load("s3a://projet-spark-lab/diffusion/tweets/input")

In [25]:
df.select(col("id").alias("id"),
         col("user.id").alias("user_id"),
         col("user.name").alias("name"),
         col("user.followers_count"),
         col("entities.hashtags.text").alias("hashtags"),
         size("entities.hashtags").alias("has_hashtag"),
         col("text"),
         col("entities.user_mentions")) \
 .filter(col("has_hashtag")>0) \
 .head(1) 

                                                                                

[Row(id=1380403542725365762, user_id=1229160600737132544, name='FREDERIC 🇫🇷🐷🍴', followers_count=1981, hashtags=['FakeNews'], has_hashtag=1, text='RT @ThierryMARIANI: Quand @MLP_officiel le dénonce, c’est une #FakeNews.\nQuand l’@InseeFr le confirme, cela devient hélas une évidence que…', user_mentions=[Row(id=87212906, id_str='87212906', indices=[3, 18], name='Thierry MARIANI', screen_name='ThierryMARIANI'), Row(id=217749896, id_str='217749896', indices=[26, 39], name='Marine Le Pen', screen_name='MLP_officiel'), Row(id=217473382, id_str='217473382', indices=[81, 89], name='Insee', screen_name='InseeFr')])]

In [None]:
tweeters=df.select(col("user.id").alias("id"), col("user.name").alias("name"))
users_mentionned=df.select(explode(col("entities.user_mentions"))).select(col("col.id").alias("id"),col("col.screen_name").alias("name")).distinct()
vertices=tweeters.union(users_mentionned)
# on définit un udf qui prend le premier élément
udf_first = udf(lambda x: x[0])
# on groupe l'union des deux dataset par id d'utilisateur et on collect 
#sous la forme d'une liste leurs noms qui peuvent différer entre les mentions et le compte
# on applique l'udf pour ne retenir que le nom en tête
final_vertices=vertices.groupby("id").agg(udf_first(collect_list("name")).alias("name"))
final_vertices.show(10, truncate=False)

[Stage 14:==>                                                   (47 + 10) / 958]

In [None]:
edges=df.select(col("user.id").alias("user_id"), \
               explode(col("entities.user_mentions")).alias("mention"),
               col("entities.hashtags.text").alias("hashtags"),
               "id") \
 .groupby(col("user_id").alias("src"), \
          col("mention.id").alias("dst")) \
 .agg(count("id").alias("nb"),
      flatten(collect_list("hashtags")).alias("hashtags"),
      collect_list("id").alias("id"))
edges.show(10)

In [None]:
g = GraphFrame(final_vertices, edges)

In [None]:
g.cache()

In [None]:
print("le graphe a "+ str(g.vertices.count()) +" noeuds et "+ str(g.edges.count()) + " arcs")

In [None]:
pagerank = g.pageRank(resetProbability=0.15,tol=0.01)

In [None]:
pagerank.vertices.sort(desc("pagerank")).show(truncate=False)

In [None]:
pagerank.edges.where((col("src")=="103918784") ).show(truncate=False)
pagerank.edges.where((col("dst")=="103918784") ).count()

In [None]:
g.inDegrees.join(g.vertices,"id").orderBy(desc("inDegree")).show(10,False)

In [None]:
g.outDegrees.join(g.vertices,"id").orderBy(desc("outDegree")).show(10,False)

In [None]:
spark.stop()