Helpful links for processing graphs on pyspark:

https://docs.databricks.com/spark/latest/graph-analysis/graphframes/user-guide-python.html

https://graphframes.github.io/graphframes/docs/_site/user-guide.html

https://pysparktutorial.blogspot.com/2017/10/graphframes-pyspark.html

In [1]:
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.functions import col, size
import pyspark.sql.functions as fn

In [2]:
# Start spark in local mode using 54gb of memory
# local mode only runs on a single node, but it will utilize all cores (We have 48!)
conf = SparkConf().setAppName("test") \
    .setMaster("local[44]") \
    .set('spark.driver.memory','54g') \
    .set('spark.jars.packages', 'graphframes:graphframes:0.7.0-spark2.4-s_2.11')
#.setMaster("yarn") # this is used when we run on hadoop, ignore for now

sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

print("Spark Version: ", sc.version)
print("defaultParallelism: ", sc.defaultParallelism)
print("Spark WebURLL ", sc.uiWebUrl) # you can view running jobs here, but I am only able to connect to it via VNC rn, maybe SSH tunneling will fix this? idk

Spark Version:  2.4.4
defaultParallelism:  44
Spark WebURLL  http://c251-102.wrangler.tacc.utexas.edu:4040


In [3]:
sc._conf.getAll() # See all the current Spark configuration settings

[('spark.jars.packages', 'graphframes:graphframes:0.7.0-spark2.4-s_2.11'),
 ('spark.app.name', 'test'),
 ('spark.driver.memory', '54g'),
 ('spark.files',
  'file:///home/06271/cju256/.ivy2/jars/graphframes_graphframes-0.7.0-spark2.4-s_2.11.jar,file:///home/06271/cju256/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar'),
 ('spark.executor.id', 'driver'),
 ('spark.local.dir', '/data/06271/cju256/temp'),
 ('spark.master', 'local[44]'),
 ('spark.submit.pyFiles',
  '/home/06271/cju256/.ivy2/jars/graphframes_graphframes-0.7.0-spark2.4-s_2.11.jar,/home/06271/cju256/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar'),
 ('spark.driver.port', '38377'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1573517162464'),
 ('spark.repl.local.jars',
  'file:///home/06271/cju256/.ivy2/jars/graphframes_graphframes-0.7.0-spark2.4-s_2.11.jar,file:///home/06271/cju256/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.deployMode', 'client'),
 ('spark.jars

In [4]:
from functools import reduce
from pyspark.sql.functions import col, lit, when
from graphframes import *

In [5]:
nodes_subset_path = '/data/06271/cju256/nodes.json'
edges_subset_path = '/data/06271/cju256/edges.json'

nodes_subset = sqlContext.read.json(nodes_subset_path)
edges_subset = sqlContext.read.json(edges_subset_path)

In [6]:
nodes_subset.printSchema()

root
 |-- about: string (nullable = true)
 |-- cancelled: boolean (nullable = true)
 |-- date_created: string (nullable = true)
 |-- email: string (nullable = true)
 |-- external_id: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- friends: string (nullable = true)
 |-- id: long (nullable = true)
 |-- is_business: boolean (nullable = true)
 |-- lastname: string (nullable = true)
 |-- name: string (nullable = true)
 |-- num_friends: long (nullable = true)
 |-- phone: string (nullable = true)
 |-- picture: string (nullable = true)
 |-- username: string (nullable = true)



In [7]:
edges_subset.printSchema()

root
 |-- _id: string (nullable = true)
 |-- comments_count: long (nullable = true)
 |-- created_time: string (nullable = true)
 |-- dst: long (nullable = true)
 |-- likes_count: long (nullable = true)
 |-- mentions_count: long (nullable = true)
 |-- message: string (nullable = true)
 |-- payment_id: long (nullable = true)
 |-- permalink: string (nullable = true)
 |-- src: long (nullable = true)
 |-- type: string (nullable = true)
 |-- unix_time: long (nullable = true)
 |-- updated_time: string (nullable = true)



In [13]:
nodes_subset.select('id','friends', 'num_friends').filter('friends == true').count()

0

In [8]:
from pyspark.sql.types import StringType, IntegerType

just_nodes = nodes_subset.withColumn("id_string", col('id').cast(IntegerType())).drop('id').withColumnRenamed("id_string",'id')
just_edges = edges_subset \
                .withColumn("src_string", col('src').cast(IntegerType())).drop('src').withColumnRenamed("src_string",'src') \
                .withColumn("dst_string", col('dst').cast(IntegerType())).drop('dst').withColumnRenamed("dst_string",'dst')

In [9]:
just_edges = just_edges.filter('dst is not null')

just_edges.filter('dst is null').count()

0

In [10]:
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

src_2 = udf(lambda src, dst, typ: dst if typ == 'charge' else src , LongType())
dst_2 = udf(lambda src, dst, typ: src if typ == 'charge' else dst , LongType())

edges2 = just_edges.select('src', 'dst', 'type')

edges3 = edges2.withColumn("src2", src_2(edges2.src, edges2.dst, edges2.type))
edges4 = edges3.withColumn('dst2', dst_2(edges2.src, edges2.dst, edges2.type))

money_flow_edges = edges4.selectExpr("src2 as src", "dst2 as dst")

In [11]:
money_flow_edges.show()

+------+------+
|   src|   dst|
+------+------+
| 33632| 59741|
|111497|110891|
| 34421| 22792|
| 85649|107484|
| 26924| 34907|
|115128|115131|
| 74104| 65588|
| 60063| 71313|
| 29674| 22222|
|116523|125011|
| 73816| 27407|
| 65739| 53702|
| 43575| 28429|
|130287|130290|
|116185|128330|
|129335|127245|
| 64514| 63739|
|135785| 77747|
|131328|126429|
|126227|123908|
+------+------+
only showing top 20 rows



In [12]:
money_flow_edges.printSchema()

root
 |-- src: long (nullable = true)
 |-- dst: long (nullable = true)



In [16]:
money_flow_edges.distinct().count()

131705390

In [44]:
money_flow_edges.count()

341309788

In [15]:
money_flow_distinct = money_flow_edges.groupBy('src', 'dst').count()
money_flow_distinct.printSchema()

root
 |-- src: long (nullable = true)
 |-- dst: long (nullable = true)
 |-- count: long (nullable = false)



In [16]:
money_flow_distinct.show()

+-------+-------+-----+
|    src|    dst|count|
+-------+-------+-----+
| 164496| 277424|   34|
| 445173| 452915|  408|
| 299233| 247051|   94|
| 645631| 275235|    8|
| 982986| 925688|    3|
| 248879| 540594|  132|
| 370468| 419610|    6|
| 210829| 178313|   28|
| 551463| 732586|    2|
|1500565|1451912|    1|
| 150386| 544205|    8|
| 253680| 361844|    8|
| 744023| 582098|   34|
|1075344|1246158|   16|
|1219636|2284421|   26|
|1595150| 781187|    1|
| 400927|1085247|   15|
| 966746| 966719|   38|
| 624112|1560925|  101|
|2486382| 243326|    1|
+-------+-------+-----+
only showing top 20 rows



In [19]:
import pyspark.sql.functions as F

result = money_flow_distinct.select([F.min("dst")])
result.show()

+--------+
|min(dst)|
+--------+
|       2|
+--------+



In [17]:
money_flow_distinct.coalesce(1).write.format('csv').save('/data/06271/cju256/money_flow_weighted_edgelist')

In [None]:
# do some time math

In [33]:
from pyspark.sql.types import StringType, IntegerType, LongType, TimestampType

just_edges = just_edges.withColumn('epoch_time', col('unix_time').cast(TimestampType())) \
                .withColumn("created_time_tmp", col('created_time').cast(TimestampType())).drop('created_time').withColumnRenamed('created_time_tmp', 'created_time') \
                .withColumn("updated_time_tmp", col('updated_time').cast(TimestampType())).drop('updated_time').withColumnRenamed('updated_time_tmp', 'updated_time')

In [35]:
from pyspark.sql import functions as F

timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
timeDiff = (F.unix_timestamp('updated_time', format=timeFmt)
            - F.unix_timestamp('created_time', format=timeFmt))
just_edges = just_edges.withColumn("duration", timeDiff)

just_edges.filter('duration > 0').show()

+------+------+-------+-------------------+-------------------+-------------------+---------+
|   src|   dst|   type|         epoch_time|       updated_time|       created_time| duration|
+------+------+-------+-------------------+-------------------+-------------------+---------+
|142212| 93793|payment|2012-10-02 17:16:55|2013-03-18 12:58:30|2012-10-02 17:16:55| 14413295|
|124108|124107|payment|2012-12-31 05:28:01|2012-12-31 05:31:35|2012-12-31 05:28:01|      214|
|212641|213840|payment|2013-02-26 21:13:58|2013-02-26 21:34:08|2013-02-26 21:13:58|     1210|
|145349|145266|payment|2013-03-18 09:31:55|2013-03-19 07:28:37|2013-03-18 09:31:55|    79002|
|247500|251943|payment|2013-03-31 19:27:28|2013-04-05 06:49:03|2013-03-31 19:27:28|   386495|
|174121|148636|payment|2013-04-11 16:02:14|2013-04-11 16:59:34|2013-04-11 16:02:14|     3440|
| 46041| 48431|payment|2013-04-30 00:04:17|2013-05-01 08:12:52|2013-04-30 00:04:17|   115715|
|290657|143827|payment|2013-05-03 20:30:47|2013-05-04 21:31:

In [36]:
just_edges.filter("duration > 0").orderBy('duration', ascending=False).show()

+------+------+------+-------------------+-------------------+-------------------+---------+
|   src|   dst|  type|         epoch_time|       updated_time|       created_time| duration|
+------+------+------+-------------------+-------------------+-------------------+---------+
| 46015| 25927|charge|2012-07-02 13:02:24|2018-05-09 09:52:17|2012-07-02 13:02:24|184625393|
| 75333| 82755|charge|2012-10-06 19:27:33|2018-03-08 18:34:22|2012-10-06 19:27:33|170986009|
|156709|156664|charge|2012-10-24 04:07:37|2018-03-13 07:24:27|2012-10-24 04:07:37|169874210|
|  3949|166144|charge|2012-12-13 06:56:27|2018-01-07 15:23:26|2012-12-13 06:56:27|159956819|
|328576|304905|charge|2013-06-15 16:09:21|2018-05-25 08:24:07|2013-06-15 11:09:21|155942086|
|164574|108062|charge|2012-10-25 00:38:55|2017-09-28 12:33:28|2012-10-25 00:38:55|155476473|
|134105|243520|charge|2013-05-07 17:26:39|2018-03-24 22:23:44|2013-05-07 17:26:39|153982625|
|129591|131761|charge|2013-04-27 00:32:14|2018-03-12 13:20:12|2013-04-

In [37]:
just_edges.filter("duration > 86400").orderBy('duration', ascending=False).count()

10794463

In [19]:
g = GraphFrame(nodes_subset, edges_subset)
print(g_test)

GraphFrame(v:[id: bigint, about: string ... 13 more fields], e:[src: bigint, dst: bigint ... 11 more fields])


In [20]:
g = GraphFrame(just_nodes, just_edges)
print(g)

GraphFrame(v:[id: int, about: string ... 13 more fields], e:[src: int, dst: int ... 11 more fields])


In [21]:
display(g.vertices)
display(g.edges)

DataFrame[about: string, cancelled: boolean, date_created: string, email: string, external_id: string, firstname: string, friends: string, is_business: boolean, lastname: string, name: string, num_friends: bigint, phone: string, picture: string, username: string, id: int]

DataFrame[_id: string, comments_count: bigint, created_time: string, likes_count: bigint, mentions_count: bigint, message: string, payment_id: bigint, permalink: string, type: string, unix_time: bigint, updated_time: string, src: int, dst: int]

In [22]:
print("Nodes: ", g.vertices.count())
print("Edges: ", g.edges.count())

Nodes:  23133264


KeyboardInterrupt: 

In [13]:
g.degrees.orderBy('degree', ascending=False).show()

+--------+------+
|      id|degree|
+--------+------+
|    null|  2846|
|  820531|   696|
| 1002607|   100|
|  351854|    39|
|18765786|    31|
| 2367489|    28|
| 9094113|    24|
| 6655326|    22|
| 4943840|    20|
| 1667966|    19|
|19189246|    18|
| 8767908|    18|
| 1307021|    17|
| 7144810|    16|
| 2723711|    15|
| 6445290|    15|
|  206476|    15|
|18654246|    14|
| 3475962|    14|
| 3943041|    14|
+--------+------+
only showing top 20 rows



In [31]:
dir(g)

['DST',
 'ID',
 'SRC',
 '_ATTR',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_edges',
 '_jvm_gf_api',
 '_jvm_graph',
 '_sc',
 '_sqlContext',
 '_vertices',
 'aggregateMessages',
 'bfs',
 'cache',
 'connectedComponents',
 'degrees',
 'dropIsolatedVertices',
 'edges',
 'filterEdges',
 'filterVertices',
 'find',
 'inDegrees',
 'labelPropagation',
 'outDegrees',
 'pageRank',
 'parallelPersonalizedPageRank',
 'persist',
 'shortestPaths',
 'stronglyConnectedComponents',
 'svdPlusPlus',
 'triangleCount',
 'triplets',
 'unpersist',
 'vertices']

In [10]:
triangles = g.triangleCount()
#triangles.select('id', 'count').show()

In [36]:
# Search for pairs of vertices with edges in both directions between them.
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(c)")
motifs.show()

# More complex queries can be expressed by applying filters.
motifs.filter("a.id != c.id").show()

KeyboardInterrupt: 

In [None]:
motifs.filter("a.id != c.id").count()

In [11]:
degree_links = g.degrees.withColumn('links', col('degree') * ( col('degree') - 1))
#degree_links.show()

In [12]:
clustering_coef = triangles.select("id", "count").join(degree_links, on='id')
#clustering_coef.show()

In [13]:
clustering_coef = clustering_coef.withColumn("clustering_coef", col('count') / col('links'))
clustering_coef.show()

+--------+-----+------+-----+--------------------+
|      id|count|degree|links|     clustering_coef|
+--------+-----+------+-----+--------------------+
|10000108|    3|    15|  210|0.014285714285714285|
|10000172|   63|    47| 2162|0.029139685476410732|
|10000304|    1|     8|   56|0.017857142857142856|
|10000454|    4|    29|  812|0.004926108374384...|
|10000472|    1|     9|   72|0.013888888888888888|
|10000591|   33|   124|15252|0.002163650668764...|
|10000670|   38|    35| 1190|0.031932773109243695|
|10000720|   22|   111|12210|0.001801801801801...|
|10000989|  568|   291|84390|0.006730655290911245|
|  100010|    0|     5|   20|                 0.0|
|10001989|    3|     7|   42| 0.07142857142857142|
|10002011|   10|    44| 1892|0.005285412262156448|
|10002280|   58|   105|10920|0.005311355311355312|
| 1000240|   55|   172|29412|0.001869985040119679|
| 1000280|   88|   260|67340|0.001306801306801...|
|10002811|    2|     3|    6|  0.3333333333333333|
|10003360|    2|    25|  600|0.

In [14]:
clustering_coef.describe().show()

+-------+--------------------+------------------+------------------+------------------+--------------------+
|summary|                  id|             count|            degree|             links|     clustering_coef|
+-------+--------------------+------------------+------------------+------------------+--------------------+
|  count|            23133263|          23133263|          23133263|          23133263|            18500705|
|   mean|1.6629731765690641E7|14.130407759597079|29.550124165363098|4932.8911883291175|0.021464098934943332|
| stddev|1.0336052173548613E7|61.334882678439754|  77.1093475591475| 483843.3696589889|  0.0642531063456038|
|    min|                  10|                 0|                 1|                 0|                 0.0|
|    max|             9999999|             21038|            212126|        2047554790|                 0.5|
+-------+--------------------+------------------+------------------+------------------+--------------------+



In [15]:
clustering_coef.orderBy('clustering_coef', ascending=False).show()

+--------+-----+------+-----+---------------+
|      id|count|degree|links|clustering_coef|
+--------+-----+------+-----+---------------+
|10358879|    1|     2|    2|            0.5|
|10865863|    6|     4|   12|            0.5|
|10480795|    1|     2|    2|            0.5|
|10157554|    1|     2|    2|            0.5|
|10594781|    1|     2|    2|            0.5|
|10605938|    1|     2|    2|            0.5|
|10189165|    1|     2|    2|            0.5|
|10621410|    1|     2|    2|            0.5|
|10254885|    1|     2|    2|            0.5|
|10667695|    1|     2|    2|            0.5|
|10284320|    1|     2|    2|            0.5|
|10686037|    1|     2|    2|            0.5|
|10122283|    1|     2|    2|            0.5|
|10694655|    1|     2|    2|            0.5|
|10499540|    1|     2|    2|            0.5|
|10725346|    1|     2|    2|            0.5|
|10837788|    1|     2|    2|            0.5|
|10733158|    1|     2|    2|            0.5|
|10269066|    1|     2|    2|     

In [None]:
# Can i run two jupyer notebook jobs at onces , i.e sbatch job.jupyter twice 

# is spark run seperately each time? or does it share spark resources