Dataset: https://snap.stanford.edu/data/soc-Epinions1.html

In [None]:
# Remove any leftover spark-3.1.1-bin-hadoop3.2 directory from the working directory.
# NOTE(review): nothing else in this notebook downloads that distribution — presumably a leftover from an older setup.
!rm -rf spark-3.1.1-bin-hadoop3.2

In [None]:
# Install a headless Java 8 JDK; -qq plus the redirect to /dev/null silences apt's normal output.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Install findspark and PySpark quietly. No versions are pinned, so pip picks whatever is current.
!pip install -q findspark pyspark

In [None]:
import os

# Export JAVA_HOME so that child processes started from this kernel see the
# OpenJDK 8 installation directory.
os.environ.update(JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64")

In [None]:
# List the current working directory (sanity check of what is available to the notebook).
!ls

sample_data


In [None]:
# Show the installed PySpark package metadata (version and install location).
!pip show pyspark

Name: pyspark
Version: 3.5.0
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /usr/local/lib/python3.10/dist-packages
Requires: py4j
Required-by: 


In [None]:
# Install the `graphframes` Python package (version not pinned).
!pip install graphframes



In [None]:
# Print the version of the `python` executable on the shell PATH.
!python -V

Python 3.10.12


In [None]:
# Download the GraphFrames jar into PySpark's own jars directory, following redirects (-L).
# NOTE(review): the file is saved as "...spark3.3.2-s_2.11.jar" but the artifact fetched is
# "0.8.2-spark3.1-s_2.12" — the local name does not describe the actual build. The name is
# referenced again when the SparkSession is configured, so rename both places together.
!curl -L -o "/usr/local/lib/python3.10/dist-packages/pyspark/jars/graphframes-0.8.2-spark3.3.2-s_2.11.jar" https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.1-s_2.12/graphframes-0.8.2-spark3.1-s_2.12.jar

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  242k  100  242k    0     0  1008k      0 --:--:-- --:--:-- --:--:-- 1008k


In [None]:
# List the contents of PySpark's jars directory (the directory the GraphFrames jar was downloaded into).
!ls /usr/local/lib/python3.10/dist-packages/pyspark/jars/

activation-1.1.1.jar
aircompressor-0.25.jar
algebra_2.12-2.0.1.jar
annotations-17.0.0.jar
antlr4-runtime-4.9.3.jar
antlr-runtime-3.5.2.jar
aopalliance-repackaged-2.6.1.jar
arpack-3.0.3.jar
arpack_combined_all-0.1.jar
arrow-format-12.0.1.jar
arrow-memory-core-12.0.1.jar
arrow-memory-netty-12.0.1.jar
arrow-vector-12.0.1.jar
audience-annotations-0.5.0.jar
avro-1.11.2.jar
avro-ipc-1.11.2.jar
avro-mapred-1.11.2.jar
blas-3.0.3.jar
bonecp-0.8.0.RELEASE.jar
breeze_2.12-2.1.0.jar
breeze-macros_2.12-2.1.0.jar
cats-kernel_2.12-2.1.1.jar
chill_2.12-0.10.0.jar
chill-java-0.10.0.jar
commons-cli-1.5.0.jar
commons-codec-1.16.0.jar
commons-collections-3.2.2.jar
commons-collections4-4.4.jar
commons-compiler-3.1.9.jar
commons-compress-1.23.0.jar
commons-crypto-1.1.0.jar
commons-dbcp-1.4.jar
commons-io-2.13.0.jar
commons-lang-2.6.jar
commons-lang3-3.12.0.jar
commons-logging-1.1.3.jar
commons-math3-3.6.1.jar
commons-pool-1.5.4.jar
commons-text-1.10.0.jar
compress-lzf-1.1.2.jar
curator-client-2.13.0.jar
cur

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

# Path of the GraphFrames jar on disk. The interpreter in this environment is
# Python 3.10 and the jar was downloaded under python3.10/dist-packages, so the
# path must use that directory; a python3.9 path does not exist here and would
# make the "spark.jars" setting point at a missing file.
graphframes_jar = (
    "/usr/local/lib/python3.10/dist-packages/pyspark/jars/"
    "graphframes-0.8.2-spark3.3.2-s_2.11.jar"
)

# Create (or reuse) the session with the GraphFrames jar registered.
spark = (
    SparkSession.builder
    .config("spark.jars", graphframes_jar)
    .getOrCreate()
)

# Render DataFrames as tables when they are the last expression in a cell.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)


# Dataset location

In [None]:
# Tab-separated edge list of the Epinions graph read by the loading cell.
input_dataset_path = "/content/sample_data/soc-Epinions1.txt"

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType

# Each data line of the edge list is "<FromNodeId>\t<ToNodeId>".
edge_schema = StructType([
    StructField("FromNodeId", IntegerType(), True),
    StructField("ToNodeId", IntegerType(), True),
])

# The file starts with a preamble of lines beginning with '#'. Let the CSV
# reader skip them via `comment='#'` instead of dropping a fixed number of
# leading rows by index: this does not depend on how many preamble lines there
# are, avoids a DataFrame -> RDD -> DataFrame round trip, and keeps the declared
# integer schema (re-creating the frame from an RDD re-infers the column types).
# `header=False` because the column-name line is itself a '#' comment line.
edges = spark.read.csv(
    input_dataset_path,
    sep='\t',
    schema=edge_schema,
    header=False,
    comment='#',
)
edges

FromNodeId,ToNodeId
0,4
0,5
0,7
0,8
0,9
0,10
0,11
0,12
0,13
0,14


In [None]:
from pyspark.sql.functions import col, concat_ws

# Every node id appears as the source and/or the destination of some edge, so
# the vertex set is the de-duplicated union of both edge columns.
src_vertices = edges.select(col("FromNodeId").alias("id"))
dst_vertices = edges.select(col("ToNodeId").alias("id"))

# A single distinct() after the union is enough; the explicit cast keeps the
# vertex id column an int regardless of the type the edge columns arrive with.
vertices = (
    src_vertices
    .union(dst_vertices)
    .distinct()
    .select(col("id").cast("int").alias("id"))
)

vertices.show()

+-----+
|   id|
+-----+
|   26|
|   29|
| 2040|
| 2250|
| 2453|
| 2509|
| 2927|
|  964|
|22129|
| 1677|
| 5385|
| 1950|
| 6721|
| 1806|
| 7225|
|50124|
| 4894|
| 1697|
| 8440|
| 7747|
+-----+
only showing top 20 rows



In [None]:
from graphframes import *
from graphframes import GraphFrame

In [None]:
# Report the Spark version as seen by the session and by its underlying SparkContext.
for reported_version in (spark.version, spark.sparkContext.version):
    print('PySpark Version :' + reported_version)

PySpark Version :3.5.0
PySpark Version :3.5.0


In [None]:
# GraphFrame expects the edge endpoint columns to be called "src" and "dst",
# so map the dataset's column names onto those.
edge_column_renames = {'FromNodeId': 'src', 'ToNodeId': 'dst'}
for old_name, new_name in edge_column_renames.items():
    edges = edges.withColumnRenamed(old_name, new_name)

In [None]:
# Render the vertex table, then show its Python type as the cell's result.
display(vertices)
type(vertices)

id
26
29
2040
2250
2453
2509
2927
964
22129
1677


pyspark.sql.dataframe.DataFrame

In [None]:
# Build the graph from the vertex table ("id") and the edge table ("src", "dst").
g = GraphFrame(v=vertices, e=edges)



In [None]:
# Preview the per-vertex out-degree table (first 20 rows).
g.outDegrees.show()



+-----+---------+
|   id|outDegree|
+-----+---------+
|   26|      197|
|   29|      125|
| 2040|      120|
| 2250|       36|
| 2453|       45|
| 2509|      189|
| 2927|       16|
|  964|       54|
|22129|        6|
| 1677|      206|
| 5385|       42|
| 1950|        4|
| 6721|       10|
| 1806|       70|
| 7225|        2|
|50124|        2|
| 4894|       21|
| 1697|       30|
| 8440|        6|
| 7747|        5|
+-----+---------+
only showing top 20 rows



In [None]:
# Preview the per-vertex in-degree table (first 20 rows).
g.inDegrees.show()

+-----+--------+
|   id|inDegree|
+-----+--------+
|   26|     226|
|   29|     369|
| 2040|     154|
| 2250|      38|
| 2453|      37|
| 2509|      60|
| 2927|      14|
|  964|      13|
|22129|       7|
| 1677|      65|
| 5385|      21|
| 1950|      20|
| 6721|       8|
| 1806|      74|
| 7747|       5|
| 9458|      24|
| 2529|      10|
| 2214|      21|
| 7279|       1|
|11945|       9|
+-----+--------+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import asc, desc

## A. Find the top 5 nodes with the highest outdegree and report the number of outgoing edges for each


In [None]:
# Rank vertices by their number of outgoing edges and keep the five largest.
# The column is referenced by its exact name, "outDegree", so the lookup does
# not rely on Spark's default case-insensitive column resolution and matches
# how "inDegree" is referenced in the in-degree query.
out_degree = g.outDegrees.orderBy(desc("outDegree")).limit(5)
out_degree.show()

+-----+---------+
|   id|outDegree|
+-----+---------+
|  645|     1801|
|  763|     1669|
|  634|     1621|
|71399|     1128|
| 3924|      976|
+-----+---------+



## B. Find the top 5 nodes with the highest indegree and report the number of incoming edges for each


In [None]:
# Rank vertices by their number of incoming edges (largest first) and keep five.
in_degree = (
    g.inDegrees
    .orderBy("inDegree", ascending=False)
    .limit(5)
)
in_degree.show()

+---+--------+
| id|inDegree|
+---+--------+
| 18|    3035|
|143|    1521|
|737|    1317|
|790|    1284|
|136|    1180|
+---+--------+



In [None]:
from pyspark.sql.functions import col

## C. Calculate PageRank for each of the nodes and output the top 5 nodes with the highest PageRank values. You are free to define any suitable parameters.


In [None]:
# Run PageRank until convergence (tolerance 0.01) with a 0.15 reset probability
# and keep only the resulting vertex table, which carries a "pagerank" column.
pagerank_result = g.pageRank(resetProbability=0.15, tol=0.01)
pagerank = pagerank_result.vertices

# Five vertices with the highest PageRank score.
top_pagerank = pagerank.orderBy("pagerank", ascending=False).limit(5)
top_pagerank.show()

+----+------------------+
|  id|          pagerank|
+----+------------------+
|  18|337.43855026548573|
| 737|234.39478550925054|
| 118|157.84625437275847|
|1719|154.72317556830524|
| 136|147.77563262328627|
+----+------------------+



## D. Run the connected components algorithm on the graph and output the top 5 components with the largest number of nodes.

In [None]:
# A checkpoint directory is configured on the SparkContext before running
# connectedComponents().
sc = spark.sparkContext
sc.setCheckpointDir("/tmp")

# Label every vertex with the id of the component it belongs to.
connected_components = g.connectedComponents()

# Count vertices per component, largest components first, and keep five.
top_components = (
    connected_components
    .groupBy("component")
    .count()
    .orderBy("count", ascending=False)
    .limit(5)
)
top_components.show()

+---------+-----+
|component|count|
+---------+-----+
|        0|75877|
|    71749|    2|
+---------+-----+



## E. Run the triangle counts algorithm on each of the vertices and output the top 5 vertices with the largest triangle count. In case of ties, you can randomly select the top 5 vertices.

In [None]:
# triangleCount() returns the vertex table with a "count" column.
results = g.triangleCount()

# Keep the id and its triangle count, order by count (largest first), take five.
top_vertices = (
    results
    .select("id", "count")
    .orderBy("count", ascending=False)
    .limit(5)
)
top_vertices.show()

+---+-----+
| id|count|
+---+-----+
|645|48674|
| 18|47203|
| 27|25817|
|634|25230|
| 44|24752|
+---+-----+



## Writing output to specified file

In [None]:
# Text file (relative to the working directory) that receives the combined report.
output_file_path = "combined_output.txt"

In [None]:
# Each report section is a heading followed by the corresponding top-5 table.
report_sections = [
    ("Top 5 nodes with highest outdegree:\n", out_degree),
    ("Top 5 nodes with highest indegree:\n", in_degree),
    ("Top 5 nodes with highest PageRank values:\n", top_pagerank),
    ("Top 5 components with largest number of nodes:\n", top_components),
    ("Top 5 vertices with largest triangle count:\n", top_vertices),
]

with open(output_file_path, 'w') as file:
    for heading, frame in report_sections:
        file.write(heading)
        # Collect the (small) Spark result to pandas and render it as plain text
        # without the index column.
        file.write(frame.toPandas().to_string(index=False))
        # Blank line between sections.
        file.write("\n\n")