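# Using GraphFrames in Spark

This notebook loads the SNAP Facebook ego-network edge list into a GraphFrame and computes vertex degrees, PageRank, per-vertex triangle counts, and connected components.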

### Import required libraries

In [1]:
from __future__ import print_function, division
%matplotlib inline
import matplotlib
import numpy as np
import math
import matplotlib.pyplot as plt
from matplotlib import style
from hive_query import hive_query

### Define System Paths and Environment Variables

In [2]:
import os
import sys

# Location of the HDP-managed Spark 2 client whose Python bindings this kernel uses.
SPARK_HOME = '/usr/hdp/current/spark2-client'

# Make pyspark and its bundled py4j importable from this kernel. The membership
# check keeps the cell idempotent: re-running it does not append duplicate
# entries to sys.path.
for spark_py_path in (SPARK_HOME + "/python",
                      SPARK_HOME + "/python/lib/py4j-0.10.3-src.zip"):
    if spark_py_path not in sys.path:
        sys.path.append(spark_py_path)

os.environ['SPARK_HOME'] = SPARK_HOME
os.environ['PYSPARK_PYTHON'] = 'python2.7'

# Extra JVM dependencies for GraphFrames, taken from the local Spark 2.1.0 distribution.
JAR_DIR = '/home/cram/spark2-client/spark-2.1.0-bin-hadoop2.7/jars'
GRAPHFRAMES_JARS = [
    'com.typesafe.scala-logging_scala-logging-api_2.11-2.1.2.jar',
    'com.typesafe.scala-logging_scala-logging-slf4j_2.11-2.1.2.jar',
    # Only the GraphFrames build compiled for Spark 2.1 (matching this
    # distribution) is shipped. Also shipping the spark2.0 build would put two
    # copies of the org.graphframes classes on the classpath, and whichever the
    # classloader finds first would silently win.
    'graphframes-0.4.0-spark2.1-s_2.11.jar',
    'org.scala-lang_scala-reflect-2.11.0.jar',
    'org.slf4j_slf4j-api-1.7.7.jar',
]
# GraphFrames' Python wrapper, shipped alongside the job with --py-files.
GRAPHFRAMES_PY = ('/home/cram/spark2-client/spark-2.1.0-bin-hadoop2.7/'
                  'python/pyspark/graphframes/graphframe.py')

# Submit arguments for the JVM that pyspark launches when the SparkContext starts.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars {} --py-files {} pyspark-shell'.format(
    ','.join(os.path.join(JAR_DIR, jar) for jar in GRAPHFRAMES_JARS),
    GRAPHFRAMES_PY)

### Configure Spark

In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

# Directory holding the pyspark/py4j archives shipped to YARN containers. The
# py4j version must match the archive put on sys.path in the environment cell.
SPARK_PY_LIB = '/usr/hdp/current/spark2-client/python/lib'
PY4J_ZIP = 'py4j-0.10.3-src.zip'

# Every SparkConf setter returns the conf itself, so the configuration is one
# chained expression. Because the cell ends in an assignment, it no longer
# echoes the SparkConf repr.
conf = (
    SparkConf()
    # The "yarn-client" master URL is deprecated since Spark 2.0. Spark 2.x
    # translates it into exactly this pair: master "yarn" + client deploy mode.
    .setMaster("yarn")
    .set("spark.submit.deployMode", "client")
    .setAppName("GraphFrames")
    .set("spark.executor.memory", "12g")
    .set("spark.yarn.am.memory", "12g")
    # Executors are scaled by dynamic allocation, which relies on the external
    # shuffle service. Idle executors holding cached data are released after
    # 600s. To pin the pool size instead, set spark.executor.instances
    # (previously tried: 80). Alternatively, bound it with
    # spark.dynamicAllocation.minExecutors / maxExecutors (previously tried:
    # 128 / 1024).
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "600s")
    # Ship pyspark + py4j to every container and put them on the executors' PYTHONPATH.
    .set("spark.yarn.dist.files",
         ",".join([SPARK_PY_LIB + "/pyspark.zip", SPARK_PY_LIB + "/" + PY4J_ZIP]))
    .setExecutorEnv('PYTHONPATH', 'pyspark.zip:' + PY4J_ZIP)
    # 1,000,000 seconds (~11.6 days). The unit is spelled out instead of passing
    # a bare int and relying on the default time unit.
    .set("spark.network.timeout", "1000000s")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.executor.extraLibraryPath", "/usr/hdp/current/hadoop-client/lib/native")
    .set("spark.executor.extraClassPath", "/usr/hdp/current/hadoop-client/lib/snappy*.jar")
)

<pyspark.conf.SparkConf at 0x7f311169f290>

#### Create the Spark Context

In [4]:
# getOrCreate (rather than the SparkContext constructor) makes this cell safe to
# re-run. A second call returns the already-running context instead of failing,
# since only one SparkContext may be active at a time. When a context already
# exists, `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)
# Hive-enabled SQL entry point used for all DataFrame reads below. HiveContext
# is kept for compatibility; in Spark 2.x the preferred replacement is
# SparkSession.builder.enableHiveSupport().
hiveContext = HiveContext(sc)

### Define a Graph

In [5]:
# Read the data (Facebook Data)
# hdfs dfs -mkdir graph
# hdfs dfs -put /home/cram/notebooks/facebook_combined.txt graph/facebook_combined.txt
#  Data is available at https://snap.stanford.edu/data/egonets-Facebook.html
# Explicit imports instead of `from pyspark.sql.types import *`, so it is clear
# where each name comes from and no other notebook names get shadowed.
from pyspark.sql.types import IntegerType, StructField, StructType

# Edge-list file: one space-separated "src dst" pair of integer node ids per line.
EDGES_PATH = 'graph/facebook_combined.txt'

# GraphFrames expects the edge DataFrame to carry "src" and "dst" columns.
dfschema = StructType([
    StructField("src", IntegerType(), True),
    StructField("dst", IntegerType(), True),
])
edges = (hiveContext.read
         .format("csv")
         .option("delimiter", " ")
         .schema(dfschema)
         .load(EDGES_PATH))
edges.count()

88234

In [6]:
# Preview the first 20 parsed edges (the show() defaults, spelled out).
edges.show(n=20, truncate=True)

+---+---+
|src|dst|
+---+---+
|  0|  1|
|  0|  2|
|  0|  3|
|  0|  4|
|  0|  5|
|  0|  6|
|  0|  7|
|  0|  8|
|  0|  9|
|  0| 10|
|  0| 11|
|  0| 12|
|  0| 13|
|  0| 14|
|  0| 15|
|  0| 16|
|  0| 17|
|  0| 18|
|  0| 19|
|  0| 20|
+---+---+
only showing top 20 rows



In [7]:
# Every vertex id that appears at either end of an edge. DataFrame.unionAll is
# deprecated since Spark 2.0 in favour of union, which behaves identically: it
# keeps duplicates and matches columns by position, so the result column is
# still named "src" until it is renamed.
n1 = edges.select("src").distinct()
n2 = edges.select("dst").distinct()
n = n1.union(n2).withColumnRenamed("src", "name").distinct()
n.count()

4039

In [8]:
# GraphFrame vertices need an "id" column; here the node label doubles as its id.
# Bracket access avoids relying on DataFrame attribute lookup for column names.
nodes = n.select("name", n["name"].alias("id"))
nodes.show(20)

+----+----+
|name|  id|
+----+----+
| 148| 148|
| 463| 463|
| 471| 471|
| 496| 496|
|1088|1088|
|1238|1238|
|1342|1342|
|1580|1580|
|1591|1591|
|1645|1645|
|1829|1829|
|1959|1959|
|2122|2122|
|2142|2142|
|2366|2366|
|2659|2659|
|2866|2866|
|3175|3175|
|3749|3749|
|3794|3794|
+----+----+
only showing top 20 rows



In [9]:
# Assemble the graph from the vertex frame (with its "id" column) and the edge
# frame (with its "src"/"dst" columns).
from graphframes import graphframe as GF
g1 = GF.GraphFrame(v=nodes, e=edges)
g1

GraphFrame(v:[id: int, name: int], e:[src: int, dst: int])

In [10]:
# Vertex degrees, highest first (orderBy is an alias of sort).
k = g1.degrees.orderBy("degree", ascending=False)

In [11]:
# Top 20 vertices by degree.
k.show(20)

+----+------+
|  id|degree|
+----+------+
| 107|  1045|
|1684|   792|
|1912|   755|
|3437|   547|
|   0|   347|
|2543|   294|
|2347|   291|
|1888|   254|
|1800|   245|
|1663|   235|
|2266|   234|
|1352|   234|
| 483|   231|
| 348|   229|
|1730|   226|
|1985|   224|
|1941|   223|
|2233|   222|
|2142|   221|
|1431|   220|
+----+------+
only showing top 20 rows



### PageRank Algorithm

In [36]:
# Run a fixed 10 PageRank iterations (maxIter) with reset probability 0.15. The
# resulting graph's vertices gain a "pagerank" column.
pr2 = g1.pageRank(maxIter=10, resetProbability=0.15)

In [37]:
# Unsorted sample of 20 vertices with their PageRank scores.
pr2.vertices.show(n=20)

+----+----+-------------------+
|name|  id|           pagerank|
+----+----+-------------------+
|1200|1200|0.15013201220174618|
|2000|2000| 0.1816136976890717|
|2200|2200|0.23690767656045705|
| 800| 800| 0.4069143579529082|
|2400|2400|0.25026176885102175|
| 600| 600|0.20920088439427809|
|1800|1800|   2.43119982404883|
|3400|3400| 1.4315205784750025|
|3200|3200| 0.5027906169409039|
| 200| 200|0.47881417119754555|
|   0|   0|               0.15|
|3600|3600|0.20544164213625452|
|3800|3800| 0.6600288124233537|
|1600|1600| 0.4063281568423561|
|2800|2800| 0.1881983676496505|
|1400|1400| 0.2953916426547574|
|3000|3000| 0.4163955463154627|
| 400| 400| 0.1854213276621568|
|1000|1000|0.18875404102250978|
|2600|2600| 1.5574296901350313|
+----+----+-------------------+
only showing top 20 rows



In [39]:
# Vertices ranked by descending PageRank score.
pr3 = pr2.vertices.orderBy("pagerank", ascending=False)
pr3.show(20)

+----+----+------------------+
|name|  id|          pagerank|
+----+----+------------------+
|3434|3434|18.184854947011132|
|1911|1911|18.123485079267827|
|2655|2655|17.525235558669866|
|1902|1902|17.355639012354075|
|1888|1888|13.343166596152308|
|2649|2649|12.160288086426291|
|1907|1907| 9.949812568135128|
|3971|3971| 9.826991926726324|
|2654|2654|  9.54217132093854|
|1910|1910| 8.091846524893004|
|1894|1894| 7.693883388950881|
|3430|3430| 7.231084079074089|
|1898|1898| 7.211028117522381|
|1882|1882| 7.077107010603322|
|3426|3426| 7.034880334304319|
|2660|2660| 6.983833314845806|
|2642|2642| 5.976237849088194|
| 332| 332| 5.724127492703557|
|3422|3422| 5.721584713429875|
|1891|1891|  5.66118441791449|
+----+----+------------------+
only showing top 20 rows



### Triangle Count

In [42]:
# Count, for every vertex, the triangles it belongs to. The result holds the
# vertex columns plus a "count" column.
tcr = g1.triangleCount()

In [46]:
# First 20 vertices with their triangle counts.
tcr.show(n=20)


+-----+----+----+
|count|name|  id|
+-----+----+----+
|   80| 148| 148|
|  361| 463| 463|
|  312| 471| 471|
|  399| 496| 496|
|   38| 833| 833|
|   40|1088|1088|
| 5335|1238|1238|
|   28|1342|1342|
| 2048|1580|1580|
| 1698|1591|1591|
|  783|1645|1645|
|   73|1829|1829|
| 3883|1959|1959|
| 8857|2122|2122|
|15165|2142|2142|
|  334|2366|2366|
|  663|2659|2659|
| 1763|2866|2866|
|   14|3175|3175|
|   14|3749|3749|
+-----+----+----+
only showing top 20 rows



### Connected Components

In [12]:
# GraphFrames' connectedComponents checkpoints intermediate results, so a
# checkpoint directory must be set first. The path is relative; presumably it
# resolves under the user's HDFS home directory (confirm on the cluster).
CHECKPOINT_DIR = "graph/checkpointdir/"
sc.setCheckpointDir(CHECKPOINT_DIR)

In [49]:
# Label every vertex with the id of its connected component (adds a "component"
# column). Requires the checkpoint directory configured via sc.setCheckpointDir.
CCs = g1.connectedComponents()

In [61]:
# First 20 vertices with their component labels.
CCs.show(n=20)

+----+----+---------+
|name|  id|component|
+----+----+---------+
| 148| 148|        0|
| 463| 463|        0|
| 471| 471|        0|
| 496| 496|        0|
|1088|1088|        0|
|1238|1238|        0|
|1342|1342|        0|
|1580|1580|        0|
|1591|1591|        0|
|1645|1645|        0|
|1829|1829|        0|
|1959|1959|        0|
|2122|2122|        0|
|2142|2142|        0|
|2366|2366|        0|
|2659|2659|        0|
|2866|2866|        0|
|3175|3175|        0|
|3749|3749|        0|
|3794|3794|        0|
+----+----+---------+
only showing top 20 rows



In [52]:
# Number of distinct connected components. `.count()` is needed to get a number;
# `.distinct()` alone only yields a lazy DataFrame, whose repr is not a count.
# A result of 1 means the whole graph forms a single connected component.
CCs.select("component").distinct().count()

1