# SetUp

In [1]:
import os
import sys
import glob

os.environ['SPARK_HOME'] = '/home/chiara/Documenti/BigData/CountingTriangles/spark-3.5.0-bin-hadoop3'
os.environ['HADOOP_HOME'] = '/home/chiara/Documenti/BigData/CountingTriangles/spark-3.5.0-bin-hadoop3'
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-17-openjdk-amd64'
os.environ['SPARK_LOCAL_IP'] = '172.17.0.1'

In [2]:
spark_python = os.path.join(os.environ.get('SPARK_HOME',None),'python')
py4j = glob.glob(os.path.join(spark_python,'lib','py4j-*.zip'))[0]
graphf = glob.glob(os.path.join(spark_python,'graphframes.zip'))[0]
sparkmeasure = glob.glob(os.path.join(spark_python,'sparkmeasure.zip'))[0]
sys.path[:0]=[spark_python,py4j]
sys.path[:0]=[spark_python,graphf]
sys.path[:0]=[spark_python, sparkmeasure]
os.environ['PYTHONPATH']=py4j+os.pathsep+graphf+sparkmeasure

from graphframes import *
from pyspark.sql import SparkSession


In [3]:
from graphframes import *
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, Row
from pyspark.sql.functions import col
from sparkmeasure import StageMetrics, TaskMetrics
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import sum

In [4]:
import findspark
findspark.init()
findspark.find()

'/home/chiara/Documenti/BigData/CountingTriangles/spark-3.5.0-bin-hadoop3'

In [5]:
spark = (SparkSession.builder
    .appName('Counting Triangles')
    .config('spark.driver.extraClassPath', '/usr/local/bin/postgresql-42.2.5.jar')
    .config('spark.executor.memory', '8g')
    .config("spark.driver.memory", "8g")
    .config('spark.memory.offHeap.enabled', True)
    .config('spark.memory.offHeap.size', '20g') 
    .enableHiveSupport()
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/26 12:30:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Parsing Dataset

In [6]:
dataset = 'datasets/soc-Epinions1.txt'
with open(dataset,'r') as f:
    content = f.readlines()
edges_list = list(filter( lambda x: not x.startswith('#') ,content))


if 'facebook' in dataset:
    edges = list(map(lambda x: tuple(x.split(' ')), edges_list))
else:
    edges = list(map(lambda x: tuple(x.split('\t')), edges_list))

edges_tuples = list(map(lambda x: (int(x[0]), int(x[1].replace('\n',''))), edges))

In [7]:
list1, list2 = zip(*edges_tuples)
nodes = list(set(list1 + list2))
nodes_tuple = [Row(x) for x in nodes]

In [8]:
#get list of nodes, with columns renamed value and id
vertices = spark.createDataFrame(nodes, IntegerType())
vertices = vertices.withColumnRenamed('value','id')

#get edges such that the src node is always smaller then the dst node
edges_n = spark.createDataFrame(edges_tuples,["src", "dst"],IntegerType())
edges_inverted = edges_n.filter(edges_n.src > edges_n.dst)
edges_normal = edges_n.filter(edges_n.src < edges_n.dst)
edges_normal2 = edges_inverted.select(col('dst').alias('src'),col('src').alias('dst'))
edges_true = edges_normal.union(edges_normal2)
edges = edges_normal.union(edges_normal2).distinct()

### Check we read the right data

In [9]:
vertices.count()

                                                                                

75879

The edges used are not the same length as referred in the dataset page, this is because of the distinct() operation at the end of the selection of edges: we treat each graph as undirected and only consider edges where srcId < dstId, so when we encounter two edges of the kind (srcId, dstId) (dstId, srcId), with srcId < dstId or viceversa, we're going to keep just one edge.

In [10]:
edges_true.count()

                                                                                

508837

In [11]:
edges.count()

                                                                                

405740

# Query Implementation

In [12]:
stagemetrics = StageMetrics(spark)
taskmetrics = TaskMetrics(spark)
stagemetrics.begin()
taskmetrics.begin()

In [13]:
result = edges.alias("e1")\
    .join(edges.alias("e2"), col("e1.src") == col("e2.src")) \
    .join(edges.alias("e3"), (col("e1.dst") == col("e3.src")) & (col("e2.dst") == col("e3.dst"))) \
    .select(col("e1.src").alias("node1"), col("e1.dst").alias("node2"), col("e2.dst").alias("node3")).distinct()

In [14]:
result.count()

                                                                                

1624481

In [15]:
stagemetrics.end()

In [16]:
taskmetrics.end()

In [17]:
result.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[node1#68L, node2#69L, node3#70L], functions=[])
   +- HashAggregate(keys=[node1#68L, node2#69L, node3#70L], functions=[])
      +- Project [src#4L AS node1#68L, dst#5L AS node2#69L, dst#41L AS node3#70L]
         +- SortMergeJoin [dst#5L, dst#41L], [src#52L, dst#53L], Inner
            :- Sort [dst#5L ASC NULLS FIRST, dst#41L ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(dst#5L, dst#41L, 200), ENSURE_REQUIREMENTS, [plan_id=1159]
            :     +- Project [src#4L, dst#5L, dst#41L]
            :        +- SortMergeJoin [src#4L], [src#40L], Inner
            :           :- Sort [src#4L ASC NULLS FIRST], false, 0
            :           :  +- Exchange hashpartitioning(src#4L, 200), ENSURE_REQUIREMENTS, [plan_id=1149]
            :           :     +- HashAggregate(keys=[src#4L, dst#5L], functions=[])
            :           :        +- Exchange hashpartitioning(src#4L, dst#5L, 200), EN

## Measure the performances

In [18]:
stagemetrics.print_report()


Scheduling mode = FIFO
Spark Context default degree of parallelism = 12

Aggregated Spark stage metrics:
numStages => 6
numTasks => 53
elapsedTime => 35163 (35 s)
stageDuration => 34659 (35 s)
executorRunTime => 247540 (4,1 min)
executorCpuTime => 224467 (3,7 min)
executorDeserializeTime => 403 (0,4 s)
executorDeserializeCpuTime => 298 (0,3 s)
resultSerializationTime => 22 (22 ms)
jvmGCTime => 1074 (1 s)
shuffleFetchWaitTime => 7 (7 ms)
shuffleWriteTime => 4967 (5 s)
resultSize => 3694599 (3,5 MB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 14706526584
recordsRead => 0
bytesRead => 0 (0 Bytes)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 91283228
shuffleTotalBlocksFetched => 604
shuffleLocalBlocksFetched => 604
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 936325617 (892,9 MB)
shuffleLocalBytesRead => 936325617 (892,9 MB)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
sh

In [19]:
taskmetrics.print_report()


Scheduling mode = FIFO
Spark Context default degree of parallelism = 12

Aggregated Spark task metrics:
numTasks => 53
successful tasks => 53
speculative tasks => 0
taskDuration => 248697 (4,1 min)
schedulerDelayTime => 437 (0,4 s)
executorRunTime => 247540 (4,1 min)
executorCpuTime => 224445 (3,7 min)
executorDeserializeTime => 403 (0,4 s)
executorDeserializeCpuTime => 275 (0,3 s)
resultSerializationTime => 22 (22 ms)
jvmGCTime => 1074 (1 s)
shuffleFetchWaitTime => 7 (7 ms)
shuffleWriteTime => 4950 (5 s)
gettingResultTime => 295 (0,3 s)
resultSize => 1291046 (1260,8 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 14706526584
recordsRead => 0
bytesRead => 0 (0 Bytes)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 91283228
shuffleTotalBlocksFetched => 604
shuffleLocalBlocksFetched => 604
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 936325617 (892,9 MB)
shuffleLocalBytesRead => 936325617 (892,9 MB)
shuffleRe

In [21]:
stagemetrics.print_memory_report()


Additional stage-level executor metrics (memory usage info):

Stage 12 JVMHeapMemory maxVal bytes => 2007194112 (1914,2 MB)
Stage 12 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 14 JVMHeapMemory maxVal bytes => 2007194112 (1914,2 MB)
Stage 14 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 17 JVMHeapMemory maxVal bytes => 2007194112 (1914,2 MB)
Stage 17 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 20 JVMHeapMemory maxVal bytes => 2007194112 (1914,2 MB)
Stage 20 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 24 JVMHeapMemory maxVal bytes => 1734564352 (1654,2 MB)
Stage 24 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 29 JVMHeapMemory maxVal bytes => 1734564352 (1654,2 MB)
Stage 29 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)


# Confront with another implementation

In [22]:
graph = GraphFrame(vertices,edges)

In [23]:
stagemetrics = StageMetrics(spark)
taskmetrics = TaskMetrics(spark)
stagemetrics.begin()
taskmetrics.begin()

In [24]:
triangles = graph.triangleCount()

In [25]:
triangle_count = triangles.select(sum("count")/3)
triangle_count.select(col('(sum(count) / 3)').cast(IntegerType()).alias('count')).show()

                                                                                

+-------+
|  count|
+-------+
|1624481|
+-------+



In [26]:
stagemetrics.end()
taskmetrics.end()

In [27]:
triangles.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [CASE WHEN isnull(count#182L) THEN 0 ELSE count#182L END AS count#192L, id#2]
   +- SortMergeJoin [id#2], [id#178], LeftOuter
      :- Sort [id#2 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#2, 200), ENSURE_REQUIREMENTS, [plan_id=3731]
      :     +- Project [value#0 AS id#2]
      :        +- Scan ExistingRDD[value#0]
      +- Sort [id#178 ASC NULLS FIRST], false, 0
         +- HashAggregate(keys=[id#178], functions=[count(1)])
            +- Exchange hashpartitioning(id#178, 200), ENSURE_REQUIREMENTS, [plan_id=3727]
               +- HashAggregate(keys=[id#178], functions=[partial_count(1)])
                  +- Filter isnotnull(id#178)
                     +- Generate explode(array(a#99.id, b#101.id, c#122.id)), false, [id#178]
                        +- Project [a#99, b#101, c#122]
                           +- SortMergeJoin [cast(a#99.id as bigint), cast(c#122.id as bigint)], [__tmp-104388609

In [28]:
stagemetrics.print_report()


Scheduling mode = FIFO
Spark Context default degree of parallelism = 12

Aggregated Spark stage metrics:
numStages => 12
numTasks => 89
elapsedTime => 13481 (13 s)
stageDuration => 16382 (16 s)
executorRunTime => 126532 (2,1 min)
executorCpuTime => 96943 (1,6 min)
executorDeserializeTime => 441 (0,4 s)
executorDeserializeCpuTime => 332 (0,3 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 916 (0,9 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 6166 (6 s)
resultSize => 395143 (385,9 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 7961934704
recordsRead => 0
bytesRead => 0 (0 Bytes)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 24596730
shuffleTotalBlocksFetched => 533
shuffleLocalBlocksFetched => 533
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 289217294 (275,8 MB)
shuffleLocalBytesRead => 289217294 (275,8 MB)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shu

In [29]:
taskmetrics.print_report()


Scheduling mode = FIFO
Spark Context default degree of parallelism = 12

Aggregated Spark task metrics:
numTasks => 89
successful tasks => 89
speculative tasks => 0
taskDuration => 127481 (2,1 min)
schedulerDelayTime => 508 (0,5 s)
executorRunTime => 126532 (2,1 min)
executorCpuTime => 96907 (1,6 min)
executorDeserializeTime => 441 (0,4 s)
executorDeserializeCpuTime => 292 (0,3 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 916 (0,9 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 6127 (6 s)
gettingResultTime => 0 (0 ms)
resultSize => 395143 (385,9 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 7961934704
recordsRead => 0
bytesRead => 0 (0 Bytes)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 24596730
shuffleTotalBlocksFetched => 533
shuffleLocalBlocksFetched => 533
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 289217294 (275,8 MB)
shuffleLocalBytesRead => 289217294 (275,8 MB)
shuffleRemoteByte

In [30]:
stagemetrics.print_memory_report()


Additional stage-level executor metrics (memory usage info):

Stage 30 JVMHeapMemory maxVal bytes => 2426059776 (2,3 GB)
Stage 30 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 31 JVMHeapMemory maxVal bytes => 2426059776 (2,3 GB)
Stage 31 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 32 JVMHeapMemory maxVal bytes => 2426059776 (2,3 GB)
Stage 32 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 34 JVMHeapMemory maxVal bytes => 2426059776 (2,3 GB)
Stage 34 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 35 JVMHeapMemory maxVal bytes => 2426059776 (2,3 GB)
Stage 35 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 37 JVMHeapMemory maxVal bytes => 2426059776 (2,3 GB)
Stage 37 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 40 JVMHeapMemory maxVal bytes => 2426059776 (2,3 GB)
Stage 40 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 44 JVMHeapMemory maxVal bytes => 2426059776 (2,3 GB)
Stage 44 OnHeapExecutionMemory maxVal bytes => 0 (0 

# Stop session

In [31]:
spark.stop()