In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, sum as _sum, collect_list, size, explode
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = (SparkSession.builder.appName("pageRank4_enwiki_task3")
         .config("spark.driver.memory", "30g")  # Sets the Spark driver memory to 30GB
         .config("spark.executor.memory", "30g")  # Sets the Spark executor memory to 30GB
         .config("spark.executor.cores", "5")  # Sets the number of cores for each executor to 5
         .config("spark.task.cpus", "1")  # Sets the number of cpus per task to be 1
         .config("spark.eventLog.enabled", "true")
         .config("spark.eventLog.dir", "/mnt/data/spark-event-logs")
         .config("spark.local.dir", "/mnt/data/temp") 
         .master("spark://10.10.1.1:7077")  
         .getOrCreate())

# Load data
schema = StructType([
    StructField("page", StringType(), True),
    StructField("link", StringType(), True)
])
df = spark.read.csv("hdfs://10.10.1.1:9000/data/enwiki-pages-articles", sep="\t", schema=schema)
num_partitions = df.rdd.getNumPartitions()
print(f"Number of partitions: {num_partitions}")

# Initialize page ranks
pages = df.select("page").distinct()
links = df.groupBy("page").agg(collect_list("link").alias("links")).cache()
ranks = pages.select("page", lit(1).alias("rank"))

# Calculate PageRank
for iteration in range(4):
    contributions = links.join(ranks, "page").select("links", (col("rank") / size("links")).alias("contribution"))
    
    num_partitions = contributions.rdd.getNumPartitions()
    print(f"Number of partitions: {num_partitions}")
    
    contributions = contributions.withColumn("link", explode("links")).select("link", "contribution")
    
    ranks = contributions.groupBy("link").agg(_sum("contribution").alias("sum_contributions"))
    ranks = ranks.select(col("link").alias("page"), (lit(0.15) + lit(0.85) * col("sum_contributions")).alias("rank"))
    
# Sort the ranks in descending order
ranks = ranks.orderBy(col("rank").desc())

# Save results to HDFS
ranks.write.format("csv").mode("overwrite").save("hdfs://10.10.1.1:9000/data/pageRank_enwiki_task3_res")

spark.stop()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/02/08 14:56:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/08 14:56:06 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
Number of partitions: 89




Number of partitions: 200




Number of partitions: 200




Number of partitions: 200




Number of partitions: 200


Py4JJavaError: An error occurred while calling o255.save.
: org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot delete /data/pageRank_enwiki_task3_res. Name node is in safe mode.
Resources are low on NN. Please add or free up more resourcesthen turn off safe mode manually. NOTE:  If you turn off safe mode before adding resources, the NN will immediately return to safe mode. Use "hdfs dfsadmin -safemode leave" to turn safe mode off. NamenodeHostName:node0-link-1
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.newSafemodeException(FSNamesystem.java:1612)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkNameNodeSafeMode(FSNamesystem.java:1599)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3297)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.delete(NameNodeRpcServer.java:1136)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.delete(ClientNamenodeProtocolServerSideTranslatorPB.java:727)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:621)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:589)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1227)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1094)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1017)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3048)

	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
	at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:1664)
	at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:992)
	at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:989)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:999)
	at org.apache.spark.internal.io.FileCommitProtocol.deleteWithJob(FileCommitProtocol.scala:183)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.deleteMatchingPartitions(InsertIntoHadoopFsRelationCommand.scala:234)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:128)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.SafeModeException): Cannot delete /data/pageRank_enwiki_task3_res. Name node is in safe mode.
Resources are low on NN. Please add or free up more resourcesthen turn off safe mode manually. NOTE:  If you turn off safe mode before adding resources, the NN will immediately return to safe mode. Use "hdfs dfsadmin -safemode leave" to turn safe mode off. NamenodeHostName:node0-link-1
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.newSafemodeException(FSNamesystem.java:1612)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkNameNodeSafeMode(FSNamesystem.java:1599)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3297)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.delete(NameNodeRpcServer.java:1136)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.delete(ClientNamenodeProtocolServerSideTranslatorPB.java:727)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:621)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:589)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1227)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1094)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1017)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3048)

	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1612)
	at org.apache.hadoop.ipc.Client.call(Client.java:1558)
	at org.apache.hadoop.ipc.Client.call(Client.java:1455)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:242)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:129)
	at com.sun.proxy.$Proxy34.delete(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:655)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
	at com.sun.proxy.$Proxy35.delete(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:1662)
	... 47 more


In [3]:
spark.stop()

# Benchmark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, sum as _sum, collect_list, size, explode
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = (SparkSession.builder.appName("pageRank4_enwiki_task3")
         .config("spark.driver.memory", "30g")  # Sets the Spark driver memory to 30GB
         .config("spark.executor.memory", "30g")  # Sets the Spark executor memory to 30GB
         .config("spark.executor.cores", "5")  # Sets the number of cores for each executor to 5
         .config("spark.task.cpus", "1")  # Sets the number of cpus per task to be 1
         .config("spark.eventLog.enabled", "true")
         .config("spark.eventLog.dir", "/mnt/data/spark-event-logs")
         .config("spark.local.dir", "/mnt/data/temp") 
         .master("spark://10.10.1.1:7077")  
         .getOrCreate())

# Load data
schema = StructType([
    StructField("page", StringType(), True),
    StructField("link", StringType(), True)
])
df = spark.read.csv("hdfs://10.10.1.1:9000/data/enwiki-pages-articles", sep="\t", schema=schema)

# Initialize page ranks
pages = df.select("page").distinct()
links = df.groupBy("page").agg(collect_list("link").alias("links"))
ranks = pages.select("page", lit(1).alias("rank"))

# Calculate PageRank
for iteration in range(4):
    contributions = links.join(ranks, "page").select("links", (col("rank") / size("links")).alias("contribution"))
    contributions = contributions.withColumn("link", explode("links")).select("link", "contribution")
    
    ranks = contributions.groupBy("link").agg(_sum("contribution").alias("sum_contributions"))
    ranks = ranks.select(col("link").alias("page"), (lit(0.15) + lit(0.85) * col("sum_contributions")).alias("rank"))
    
# Sort the ranks in descending order
ranks = ranks.orderBy(col("rank").desc())

# Save results to HDFS
ranks.write.format("csv").mode("overwrite").save("hdfs://10.10.1.1:9000/data/pageRank_enwiki_task1_res")

spark.stop()