# Using Gaffer with PySpark and GraphFrames #

In [None]:
from gafferpy_pyspark import gafferpy_pyspark_session as gs
from gafferpy_core import gaffer as g
from gafferpy_core import gaffer_utils as u
from gafferpy_pyspark import gaffer_pyspark as gp

from graphframes import *
import os
import math
from matplotlib import pyplot as plot

from pyspark.sql.functions import asc,desc

## Start a Gaffer PySpark session and do some setup ##

In [None]:
# Start a Gaffer PySpark session before the graph is built and queried.
gaffer_session = gs.GafferPysparkSession()
gaffer_session.create_session()

In [None]:
# The Gaffer user passed to every graph.execute call in this notebook.
user = u.User(user_id="user")

In [None]:
# GAFFER_HOME is the root directory that the example schema, data, splits
# file and store properties paths below are built from. `os` is already
# imported in the first cell, so it is not re-imported here.
try:
    gaffer_home = os.environ["GAFFER_HOME"]
except KeyError as err:
    # Keep raising KeyError (same type as before) but say how to fix it.
    raise KeyError(
        "GAFFER_HOME environment variable is not set; point it at the "
        "directory containing example/ and miniaccumulo/"
    ) from err
print(gaffer_home)

In [None]:
# Locations of the schema, graph config and store properties files, all
# relative to GAFFER_HOME.
schemaPath = "{}/example/schema.json".format(gaffer_home)
graphConfigPath = "{}/example/graphconfig.json".format(gaffer_home)
storePropertiesPath = "{}/miniaccumulo/pyspark.store.properties".format(gaffer_home)

## Create a connection to the graph ##

In [None]:
# Assemble the Graph step by step from the schema, graph config and store
# properties, then build it.
graph_builder = gs.Graph.Builder()
graph_builder = graph_builder.schema(schemaPath)
graph_builder = graph_builder.config(graphConfigPath)
graph_builder = graph_builder.storeProperties(storePropertiesPath)
graph = graph_builder.build()

In [None]:
# Show the schema the graph was built with.
loaded_schema = graph.getSchema()
print(loaded_schema)

## Add some data using Spark ##

In [None]:
# Operation that calculates 5 split points from the example CSV (sampling
# ratio "0.1"); splitsFilePath names the file the splits are written to.
split_op = gp.CalculateSplitPointsQuickstart(
    dataPath="{}/example/data.csv".format(gaffer_home),
    elementGeneratorConfig="{}/example/element-generator.json".format(gaffer_home),
    splitsFilePath="{}/splitsFile".format(gaffer_home),
    numSplits=5,
    sampleRatioForSplits="0.1",
)

In [None]:
# Run the split-point calculation as `user`.
graph.execute(split_op, user)

In [None]:
# Operation that loads the example CSV into the graph using the element
# generator config and the splits file produced by split_op.
add_op = gp.AddElementsFromHdfsQuickstart(
    dataPath="{}/example/data.csv".format(gaffer_home),
    elementGeneratorConfig="{}/example/element-generator.json".format(gaffer_home),
    splitsFilePath="{}/splitsFile".format(gaffer_home),
)

In [None]:
# Run the data load as `user`.
graph.execute(add_op, user)

## Run a simple query ##

In [None]:
# Definitions for the "Emitter" entity group and the "Event" edge group,
# each with an empty group_by list.
edge_def = g.ElementDefinition(group="Event", group_by=[])
entity_def = g.ElementDefinition(group="Emitter", group_by=[])

# Views over Emitter entities only, Event edges only, and both together.
entity_view = g.View(entities=[entity_def])
edges_view = g.View(edges=[edge_def])
view = g.View(edges=[edge_def], entities=[entity_def])

In [None]:
# Fetch Emitter entities and Event edges for the seed vertex "1".
get_op = g.GetElements(view=view, input=[g.EntitySeed("1")])

In [None]:
# Execute the seeded query as `user`.
results = graph.execute(get_op, user)

In [None]:
# Print each element the seeded query returned, one per line.
for element in results:
    print(element)

## Calculate the degree distribution ##

In [None]:
# Operation returning every Emitter entity (entity_view) as a PySpark RDD.
entities_rdd_op = gp.GetPySparkRDDOfAllElements(view=entity_view)

In [None]:
# Materialise the RDD of Emitter entities.
entities_rdd = graph.execute(entities_rdd_op, user)

In [None]:
def countInDegree(entity):
    """Return a ``(messagesReceivedEstimate, 1)`` pair for *entity*.

    Intended as an RDD map function: summing the 1s per key yields how
    many entities share each in-degree value. The key is None when the
    entity has no ``messagesReceivedEstimate`` property.
    """
    received = entity.properties.get("messagesReceivedEstimate")
    return received, 1

def countOutDegree(entity):
    """Return a ``(messagesSentEstimate, 1)`` pair for *entity*.

    Intended as an RDD map function: summing the 1s per key yields how
    many entities share each out-degree value. The key is None when the
    entity has no ``messagesSentEstimate`` property.
    """
    sent = entity.properties.get("messagesSentEstimate")
    return sent, 1

In [None]:
# Map each entity to (degree, 1) and sum the 1s per degree, giving
# (degree, number_of_entities_with_that_degree) pairs.
# Fix: the out-degree distribution must use countOutDegree; previously it
# reused countInDegree, so both series were identical.
in_distro_rdd = entities_rdd.map(countInDegree).reduceByKey(lambda a, b: a + b)
out_distro_rdd = entities_rdd.map(countOutDegree).reduceByKey(lambda a, b: a + b)

In [None]:
# Pull the (degree, count) pairs back to the driver as Python lists.
in_distro = in_distro_rdd.collect()
out_distro = out_distro_rdd.collect()

In [None]:
import math


def _degree_distribution_points(distro):
    """Split ``(degree, count)`` pairs into plot coordinates.

    Returns four lists: the raw degrees, the raw counts, and the natural
    logs of both (for the log-log plot). Pairs whose degree is missing
    (None, produced by ``properties.get`` for entities without the
    property) or not positive are left out of the log lists because log is
    undefined there. They are still kept in the raw lists.
    """
    xs, ys, log_xs, log_ys = [], [], [], []
    for degree, count in distro:
        xs.append(degree)
        ys.append(count)
        # The old `!= 0` check let None through, which crashed math.log.
        if degree is not None and degree > 0:
            log_xs.append(math.log(degree))
            log_ys.append(math.log(count))
    return xs, ys, log_xs, log_ys


in_x, in_y, in_log_x, in_log_y = _degree_distribution_points(in_distro)
out_x, out_y, out_log_x, out_log_y = _degree_distribution_points(out_distro)

In [None]:
import matplotlib.pyplot as plt

# Larger figure and label/legend fonts for the distribution plots below.
plt.rcParams.update({
    'figure.figsize': [16, 8],
    'axes.labelsize': 'x-large',
    'legend.fontsize': 'x-large',
})

In [None]:
# Degree distribution on linear axes. x is the degree value (the key
# emitted by countInDegree / countOutDegree) and y is the number of Emitter
# entities with that degree (the summed 1s).
# Fix: the axis labels were previously swapped.
plt.scatter(in_x, in_y, c='b', marker='x', label='in_degree')
plt.scatter(out_x, out_y, c='r', marker='o', label='out_degree')
plt.xlabel("degree")
plt.ylabel("count")
plt.legend(loc='upper right')
plt.show()

In [None]:
# The same distribution on log-log axes: x is log(degree) and y is
# log(number of entities with that degree).
# Fix: the axis labels were previously swapped.
plt.scatter(in_log_x, in_log_y, c='b', marker='x', label='in_degree')
plt.scatter(out_log_x, out_log_y, c='r', marker='o', label='out_degree')
plt.xlabel("log(degree)")
plt.ylabel("log(count)")
plt.legend(loc='upper right')
plt.show()

## Get a GraphFrame ##

In [None]:
# Single-group views used to extract Event edges and Emitter entities as
# DataFrames.
event_def = g.ElementDefinition(group="Event", group_by=[])
emitter_def = g.ElementDefinition(group="Emitter", group_by=[])

event_view = g.View(edges=[event_def])
emitter_view = g.View(entities=[emitter_def])

In [None]:
# Operations returning the Emitter entities and the Event edges as
# PySpark DataFrames.
emitter_df_op = gp.GetPysparkDataFrameOfElements(view=emitter_view)
event_df_op = gp.GetPysparkDataFrameOfElements(view=event_view)

In [None]:
# DataFrame of Emitter entities.
emitter_df = graph.execute(emitter_df_op, user)

In [None]:
# Display the ten emitters with the highest "count".
emitter_df.orderBy(desc("count")).limit(10).toPandas()

In [None]:
# DataFrame of Event edges.
event_df = graph.execute(event_df_op, user)

In [None]:
# Display the ten events with the highest "count".
event_df.orderBy(desc("count")).limit(10).toPandas()

In [None]:
# GraphFrame expects vertices with an `id` column and edges with `src`/`dst`
# columns, so rename the Gaffer columns accordingly.
graphframe = GraphFrame(
    emitter_df.selectExpr("vertex AS id"),
    event_df.selectExpr("source AS src", "destination AS dst"),
)

## Calculate PageRank scores for the `Emitter` vertices ##

In [None]:
# Run 10 iterations of PageRank with a reset probability of 0.15.
emitter_pagerank_gf = graphframe.pageRank(maxIter=10, resetProbability=0.15)

In [None]:
# Vertex ids with their PageRank score, highest score first.
emitter_pageranks = (
    emitter_pagerank_gf.vertices
    .select('id', 'pagerank')
    .orderBy(desc('pagerank'))
)

In [None]:
# Display the ten highest-ranked emitters.
emitter_pageranks.limit(10).toPandas()

## Write the PageRank scores back into the graph as entities ##

In [None]:
def toPagerankToEntity(vertex, pagerank):
    """Wrap a PageRank score for *vertex* as a ``Pagerank`` group entity.

    The score is stored under the entity's ``pagerank`` property.
    """
    return g.Entity(vertex=vertex, group="Pagerank",
                    properties={"pagerank": pagerank})

In [None]:
# Re-order the scores by vertex id before handing them to the import as an
# RDD (NOTE(review): presumably the id ordering helps the bulk import —
# confirm whether it is required).
emitter_pr_rdd = emitter_pageranks.orderBy('id').rdd

In [None]:
# Turn each (id, pagerank) row into a Pagerank entity.
emitter_pr_elements = emitter_pr_rdd.map(
    lambda row: toPagerankToEntity(row['id'], row['pagerank']))

In [None]:
# Operation that adds the Pagerank entities to the graph, using
# GAFFER_HOME/import/pr/emitter/ as its output directory.
emitter_import_pr_op = gp.AddElementsFromPysparkRDD(
    rdd=emitter_pr_elements,
    outputDirectory="{}/import/pr/emitter/".format(gaffer_home),
)



In [None]:
# Write the PageRank entities into the graph.
graph.execute(emitter_import_pr_op, user)

In [None]:
# View restricted to the newly written Pagerank entities.
pr_def = g.ElementDefinition(group="Pagerank")
pr_view = g.View(entities=[pr_def])

In [None]:
# Look up the Pagerank entity for vertex "1".
get_pr_op = g.GetElements(view=pr_view, input=[g.EntitySeed("1")])

In [None]:
# Execute the Pagerank lookup.
pr = graph.execute(get_pr_op, user)

In [None]:
# Print each Pagerank entity returned, one per line.
for entity in pr:
    print(entity)

## Use label propagation (LPA) to divide the Emitters into communities ##

In [None]:
# Label propagation for 10 iterations; each vertex gets a `label` column.
emitters_lpa = graphframe.labelPropagation(maxIter=10)

In [None]:
def countValues(val):
    """Map a row to ``(row["label"], 1)`` so labels can be counted by key."""
    label = val["label"]
    return label, 1

In [None]:
# Number of emitters assigned to each LPA label, largest communities first.
# groupBy().count() aggregates inside Spark instead of shipping every row
# through Python workers via an RDD map/reduceByKey; the result still has
# `label` and `count` columns.
emitters_lpa_labels = emitters_lpa.groupBy("label").count().sort(desc("count"))


In [None]:
# Display the ten largest communities.
emitters_lpa_labels.limit(10).toPandas()

In [None]:
# Label of the community to inspect. This walkthrough uses 247; label
# values come from the LPA output, so pick one from the label-count table
# above if your data differs.
community_label = 247

# Filter on the label before projecting to `id`, rather than relying on
# Spark resolving a column that the select has already dropped.
community = (
    emitters_lpa
    .filter(emitters_lpa["label"] == community_label)
    .select("id")
    .toPandas()["id"]
    .tolist()
)

In [None]:
# Print the id of every emitter in the chosen community, one per line.
for emitter in community:
    print(emitter)

## Write the communities back into the graph as entities ##

In [None]:
def toLpaToEntity(vertex, community):
    """Wrap an LPA community label for *vertex* as a ``Community`` entity.

    The label is stringified and stored under the ``community`` property.
    """
    community_label = str(community)
    return g.Entity(vertex=vertex, group="Community",
                    properties={"community": community_label})

In [None]:
# (id, label) rows ordered by vertex id, as an RDD for the import.
emitters_lpa_rdd = emitters_lpa.select("id", "label").orderBy("id").rdd

In [None]:
# Turn each (id, label) row into a Community entity.
emitter_lpa_elements = emitters_lpa_rdd.map(
    lambda row: toLpaToEntity(row["id"], row["label"]))

In [None]:
# Operation that adds the Community entities to the graph, using
# GAFFER_HOME/import/lpa/emitter/ as its output directory.
emitters_import_lpa_op = gp.AddElementsFromPysparkRDD(
    rdd=emitter_lpa_elements,
    outputDirectory="{}/import/lpa/emitter/".format(gaffer_home),
)


In [None]:
# Write the Community entities into the graph.
graph.execute(emitters_import_lpa_op, user)

In [None]:
# View restricted to the newly written Community entities.
community_def = g.ElementDefinition(group="Community")
community_view = g.View(entities=[community_def])

In [None]:
# Look up the Community entity for vertex "1902".
get_community_op = g.GetElements(view=community_view, input=[g.EntitySeed("1902")])

In [None]:
# Execute the Community lookup.
communities = graph.execute(get_community_op, user)

In [None]:
# Print each Community entity returned, one per line.
for entity in communities:
    print(entity)