In [1]:
import os

from pyspark import *
from pyspark.sql import *
from graphframes import *
from pyspark import SparkContext, SparkConf

import findspark
findspark.init()

In [2]:
# https://graphframes.github.io/graphframes/docs/_site/quick-start.html
# https://stackoverflow.com/questions/65011599/how-to-start-graphframes-on-spark-on-pyspark-on-juypter-on-docker
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell'

In [3]:
# Resources:
# https://www.infoq.com/articles/apache-spark-graphx/
# https://databricks.com/blog/2016/03/16/on-time-flight-performance-with-graphframes-for-apache-spark.html
# https://kukuruku.co/post/social-network-analysis-spark-graphx/
# https://lamastex.github.io/scalable-data-science/db/week8/15_GraphX/026_GraphFramesUserGuide.html
# https://graphframes.github.io/graphframes/docs/_site/index.html

In [4]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
spark = SparkSession(sc)

In [5]:
edges = spark.read.format("csv").option("header", "true").option("delimiter", "\t").load("Amazon0302.txt")
vertices = spark.read.format("csv").option("header", "true").option("delimiter", "\t").load("VertexTable.txt")


In [6]:
edges = edges.withColumnRenamed("FromNodeId", "src").withColumnRenamed("ToNodeId", "dst")


In [7]:
g = GraphFrame(vertices, edges)
g = g.dropIsolatedVertices()

In [9]:
g.degrees.orderBy("degree", ascending=False).show(10)
g.inDegrees.orderBy("inDegree", ascending=False).show(10)
g.outDegrees.orderBy("outDegree", ascending=False).show(10)

+----+------+
|  id|degree|
+----+------+
| 296|     8|
| 467|    17|
| 675|     8|
| 691|    12|
|1159|    12|
| 829|    11|
|1090|    31|
|1436|   128|
|1512|     7|
|1572|    10|
|3414|    13|
|5925|    12|
|4032|     7|
|2069|    13|
|2088|    16|
|5325|     8|
|2136|     8|
|2162|     6|
|2294|     6|
|3210|    13|
+----+------+
only showing top 20 rows

+----+--------+
|  id|inDegree|
+----+--------+
| 296|       3|
| 467|      12|
| 675|       3|
| 691|       7|
|1159|       7|
| 829|       6|
|1090|      26|
|1436|     123|
|1512|       2|
|1572|       5|
|3414|       8|
|5925|       7|
|4032|       2|
|2069|       8|
|2088|      11|
|5325|       3|
|2136|       3|
|2162|       1|
|2294|       1|
|3210|       8|
+----+--------+
only showing top 20 rows

+----+---------+
|  id|outDegree|
+----+---------+
| 296|        5|
| 467|        5|
| 675|        5|
| 691|        5|
| 829|        5|
|1090|        5|
|1159|        5|
|1436|        5|
|1512|        5|
|1572|        5|
|2069| 

In [10]:
# TAKES LONG TIME TO RUN (10~ min)

# PageRank: Identify important vertices in a graph
results = g.pageRank(resetProbability=0.15, maxIter=7)

display(results.vertices)

DataFrame[id: string, Node: string, pagerank: double]

In [11]:
results.vertices.orderBy("pagerank", ascending=False).show()

+----+----+------------------+
|  id|Node|          pagerank|
+----+----+------------------+
|  33|  33| 289.1769754287916|
|  93|  93|254.58423276212014|
|   8|   8|208.60205771770336|
|  94|  94|200.44458731669394|
|2501|2501|198.02802125059114|
|4429|4429| 197.0437760883016|
| 151| 151|177.72974262730605|
|  56|  56| 175.4416985920772|
|  95|  95|173.33553169887944|
|2353|2353| 156.9996013893304|
| 429| 429|131.01514049873109|
|9106|9106| 129.8201823262895|
|  23|  23|124.88627037006144|
|1241|1241|124.12171913289434|
| 481| 481|120.03564499078678|
|3589|3589|112.05056148336236|
| 449| 449|110.16591461904625|
|  21|  21| 98.71169308257939|
|5120|5120| 97.27020021314428|
|8458|8458| 93.26086201683259|
+----+----+------------------+
only showing top 20 rows



In [13]:
inDeg = g.inDegrees

inDeg.orderBy("inDegree", ascending=False).show(5)

+-----+--------+
|   id|inDegree|
+-----+--------+
|14949|     420|
| 4429|     404|
|   33|     361|
|10519|     334|
|12771|     330|
+-----+--------+
only showing top 5 rows



In [14]:
# Ratio between in and out degrees
# A higher ratio value will tell us where many purchases end (but rarely begin)
# A lower value tells us where purchases often begin (but infrequently end)
from pyspark.sql.functions import desc
outDeg = g.outDegrees
degreeRatio = inDeg.join(outDeg, "id").selectExpr("id", "double(inDegree)/double(outDegree) as degreeRatio")
degreeRatio.orderBy(desc("degreeRatio")).show(10, False)
degreeRatio.orderBy("degreeRatio").show(10, False)

+-----+-----------+
|id   |degreeRatio|
+-----+-----------+
|14949|84.0       |
|4429 |80.8       |
|33   |72.2       |
|10519|66.8       |
|12771|66.0       |
|8    |58.6       |
|481  |55.0       |
|3910 |55.0       |
|5737 |54.4       |
|9106 |45.4       |
+-----+-----------+
only showing top 10 rows

+------+-----------+
|id    |degreeRatio|
+------+-----------+
|112099|0.2        |
|117755|0.2        |
|126576|0.2        |
|118599|0.2        |
|101617|0.2        |
|111687|0.2        |
|106226|0.2        |
|109996|0.2        |
|121626|0.2        |
|113376|0.2        |
+------+-----------+
only showing top 10 rows



In [15]:
outDeg = g.outDegrees
degreeRatio = inDeg.join(outDeg, "id").selectExpr("id", "double(inDegree)/double(outDegree) as degreeRatio")
degreeRatio.orderBy(desc("degreeRatio")).show(10, False)
degreeRatio.orderBy("degreeRatio").show(10, False)

+-----+-----------+
|id   |degreeRatio|
+-----+-----------+
|14949|84.0       |
|4429 |80.8       |
|33   |72.2       |
|10519|66.8       |
|12771|66.0       |
|8    |58.6       |
|3910 |55.0       |
|481  |55.0       |
|5737 |54.4       |
|9106 |45.4       |
+-----+-----------+
only showing top 10 rows

+------+-----------+
|id    |degreeRatio|
+------+-----------+
|112115|0.2        |
|120824|0.2        |
|113568|0.2        |
|100776|0.2        |
|113888|0.2        |
|104758|0.2        |
|113925|0.2        |
|107162|0.2        |
|114143|0.2        |
|108272|0.2        |
+------+-----------+
only showing top 10 rows



In [8]:
#https://github-wiki-see.page/m/MonkeyCanCode/CS5590_Spark/wiki/ICP13
# Run static Label Propagation Algorithm for detecting communities in networks.
# Each node in the network is initially assigned to its own community. 
# At every superstep, nodes send their community affiliation to all neighbors and 
# update their state to the mode community affiliation of incoming messages.
# https://en.wikipedia.org/wiki/Label_propagation_algorithm
#LPA is a standard community detection algorithm for graphs. 
#It is very inexpensive computationally, although (1) convergence is not guaranteed and 
#(2) one can end up with trivial solutions (all nodes are identified into a single community).
communities = g.labelPropagation(maxIter=5)
communities.persist().show(10, False)

+------+------+-------------+
|id    |Node  |label        |
+------+------+-------------+
|103368|103368|369367188488 |
|103634|103634|257698038957 |
|186264|186264|240518168944 |
|32558 |32558 |1039382086565|
|11205 |11205 |532575944770 |
|171612|171612|1288490189195|
|210437|210437|8589935171   |
|212043|212043|360777253465 |
|232913|232913|120259085016 |
|235045|235045|541165880069 |
+------+------+-------------+
only showing top 10 rows



In [9]:
communities.select("id", "label").show()

+------+-------------+
|    id|        label|
+------+-------------+
|103368| 369367188488|
|103634| 257698038957|
|186264| 240518168944|
| 32558|1039382086565|
| 11205| 532575944770|
|171612|1288490189195|
|210437|   8589935171|
|212043| 360777253465|
|232913| 120259085016|
|235045| 541165880069|
| 81342|   8589935768|
| 82141| 188978562174|
|152121| 901943132412|
|208781| 446676599571|
| 47069| 936302870826|
| 73953| 240518169355|
|209191|1099511628402|
| 65039|  25769804925|
|104793|1030792152198|
|197235|1520418424093|
+------+-------------+
only showing top 20 rows



In [16]:
# Creates a new dataframe containing labels and how many nodes are in the specific labels
from pyspark.sql.functions import desc, col
CommunityCount = communities.groupBy(col("label")).count()
CommunityCount.orderBy(desc("count")).show(100, False)

+-------------+-----+
|label        |count|
+-------------+-----+
|1434519077722|278  |
|1494648620190|242  |
|1005022348562|197  |
|1331439862758|174  |
|128849019923 |165  |
|335007450316 |161  |
|352187318500 |158  |
|326417515741 |149  |
|1305670059036|135  |
|1443109012342|131  |
|68719477440  |125  |
|1606317769977|119  |
|1640677508178|118  |
|266287972517 |117  |
|1374389536000|115  |
|1219770712525|112  |
|1537598293221|111  |
|1065151890509|103  |
|1056561955933|100  |
|1030792151054|100  |
|515396076756 |99   |
|627065225252 |99   |
|1382979470271|98   |
|1623497638797|96   |
|472446403630 |94   |
|515396076031 |93   |
|1340029797568|90   |
|1434519077926|87   |
|1331439861899|87   |
|360777253917 |86   |
|472446403673 |83   |
|1245540516937|82   |
|25769804988  |77   |
|335007449928 |77   |
|1374389535013|75   |
|1632087573650|74   |
|1305670059250|73   |
|1185410974849|71   |
|927712936039 |71   |
|1271310320137|71   |
|403726927105 |70   |
|420906796105 |69   |
|283467841

In [17]:
# Look at which nodes are in the largest community
# We can maybe use this to try and describe the community later in the report? 
communities.filter(communities.label == 1434519077722).show()

+------+------+-------------+
|    id|  Node|        label|
+------+------+-------------+
| 55639| 55639|1434519077722|
|   241|   241|1434519077722|
|245357|245357|1434519077722|
|188556|188556|1434519077722|
|   486|   486|1434519077722|
| 29253| 29253|1434519077722|
|207494|207494|1434519077722|
|188857|188857|1434519077722|
|219267|219267|1434519077722|
|196911|196911|1434519077722|
|   250|   250|1434519077722|
|233496|233496|1434519077722|
|  1676|  1676|1434519077722|
| 34399| 34399|1434519077722|
| 32664| 32664|1434519077722|
| 53712| 53712|1434519077722|
|240786|240786|1434519077722|
|157591|157591|1434519077722|
| 15523| 15523|1434519077722|
| 13515| 13515|1434519077722|
+------+------+-------------+
only showing top 20 rows

