<a href="https://colab.research.google.com/github/HoarfrostRaven/BigData/blob/main/BGProject.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Prepare the dataset

In [1]:
!wget https://snap.stanford.edu/data/web-Google.txt.gz
!gzip -d web-Google.txt.gz

--2025-03-28 22:21:25--  https://snap.stanford.edu/data/web-Google.txt.gz
Resolving snap.stanford.edu (snap.stanford.edu)... 171.64.75.80
Connecting to snap.stanford.edu (snap.stanford.edu)|171.64.75.80|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 21168784 (20M) [application/x-gzip]
Saving to: ‘web-Google.txt.gz’


2025-03-28 22:21:26 (18.1 MB/s) - ‘web-Google.txt.gz’ saved [21168784/21168784]

gzip: web-Google.txt already exists; do you wish to overwrite (y or n)? n
	not overwritten


In [2]:
! pwd

/content


In [3]:
! ls

sample_data  web-Google.txt  web-Google.txt.gz


In [4]:
# !pip install pyspark
from pyspark import SparkConf
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

In [5]:
# Load the file, ignore first 4 lines and split the data
data = (
    sc.textFile("/content/web-Google.txt")
    .filter(lambda line: line.strip() and not line.startswith('#'))
    .map(lambda line: tuple(map(int, line.split())))
)

In [6]:
print(data.take(5))

[(0, 11342), (0, 824020), (0, 867923), (0, 891835), (11342, 0)]


# CCF-Iterate

In [7]:
def connected_components_ccf(data):
    """
    Computes connected components using the CCF algorithm.

    Args:
        data: An RDD of edges represented as tuples (node1, node2).
              Example: sc.parallelize([(1, 2), (2, 3), (2, 4), (4, 5), (6, 7), (7, 8)])

    Returns:
        An RDD of edges representing the connected components.
        Example: sc.parallelize([(2, 1), (3, 1), (4, 1), (5, 1), (7, 6), (8, 6)])
    """
    # Initialize with bidirectional edges and remove duplicates
    edges = data.flatMap(lambda x: [(x[0], x[1]), (x[1], x[0])]).distinct().cache()

    converged = False
    iteration = 0
    while not converged:
        iteration += 1
        print(f"--- Iteration {iteration} ---")

        # Keep only entries where the min neighbor is smaller than the node itself
        min_values = edges.filter(lambda x: x[1] < x[0])
        # Compute the minimum neighbor for each node from current edges
        min_values = min_values.reduceByKey(min).cache()

        # Generate new edges by propagating the minimum values through the graph
        new_created_edges = min_values.join(edges).filter(lambda x: x[1][0] != x[1][1]) \
                                      .map(lambda x: (x[1][1], x[1][0])).cache()

        # Check if no new edges are created, indicating convergence
        if new_created_edges.isEmpty():
            converged = True
        else:
            # Update edges to include new edges and their reverse, maintaining bidirectionality
            edges = min_values.union(new_created_edges) \
                              .flatMap(lambda x: [(x[0], x[1]), (x[1], x[0])]) \
                              .cache()

    # Return the min_values as the result, representing each node's root
    return min_values

In [None]:
from pyspark import StorageLevel

def connected_components_ccf(data):
    """
    Computes connected components using the CCF algorithm.

    Args:
        data: An RDD of edges represented as tuples (node1, node2).
              Example: sc.parallelize([(1, 2), (2, 3), (2, 4), (4, 5), (6, 7), (7, 8)])

    Returns:
        An RDD of edges representing the connected components.
        Example: sc.parallelize([(2, 1), (3, 1), (4, 1), (5, 1), (7, 6), (8, 6)])
    """

    # Set partitions to 2-3x total cores (adjust based on cluster size)
    num_partitions = 200

    # Initialize bidirectional edges with partition optimization
    edges = (data.flatMap(lambda x: [(x[0], x[1]), (x[1], x[0])])
             .distinct()
             .repartition(num_partitions)
             .persist(StorageLevel.MEMORY_AND_DISK))  # Spill to disk if OOM

    converged = False
    iteration = 0
    prev_edges = None  # Track previous iteration's data

    while not converged:
        iteration += 1
        print(f"--- Iteration {iteration} ---")

        # Filter first to reduce data volume before reduce
        filtered = edges.filter(lambda x: x[1] < x[0]).cache()

        # Compute minimum neighbors with partition preservation
        min_values = filtered.reduceByKey(min, numPartitions=num_partitions).cache()
        filtered.unpersist()  # Release intermediate data immediately

        # Join with partition alignment to minimize shuffle
        new_edges = (min_values.join(edges, numPartitions=num_partitions)
                     .filter(lambda x: x[1][0] != x[1][1])  # Remove self-edges
                     .map(lambda x: (x[1][1], x[1][0]))     # Remap edges
                     .cache())

        if new_edges.isEmpty():
            converged = True
            result = min_values
        else:
            # Release data from two iterations back
            if prev_edges is not None:
                prev_edges.unpersist()

            # Track current edges for future cleanup
            prev_edges = edges

            # Update edges with partition optimization
            edges = (min_values.union(new_edges)
                     .flatMap(lambda x: [(x[0], x[1]), (x[1], x[0])])
                     .repartition(num_partitions)
                     .persist(StorageLevel.MEMORY_AND_DISK))

            # Release current iteration's intermediates
            min_values.unpersist()
            new_edges.unpersist()

    # Final cleanup
    edges.unpersist()
    return result

In [8]:
# # Example data for testing
# test_data = sc.parallelize([(1, 2), (2, 3), (2, 4), (4, 5), (6, 7), (7, 8)])

# # Run the connected_components_ccf algorithm
# result = connected_components_ccf(test_data)

# # Collect and print the result
# print("Connected components:", result.collect())

In [9]:
# Compute connected components using the CCF algorithm
connected_components = connected_components_ccf(data)

# Print the results
print(connected_components.collect())

--- Iteration 1 ---
--- Iteration 2 ---
--- Iteration 3 ---
--- Iteration 4 ---
--- Iteration 5 ---


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 325 in stage 39.0 failed 1 times, most recent failure: Lost task 325.0 in stage 39.0 (TID 693) (2d56eda58160 executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
	at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120)
	at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95)
	at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156)
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
	at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:251)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:237)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:49)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:38)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:278)
	at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:244)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:180)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1349/0x00000008408ebc40.apply(Unknown Source)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
	at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120)
	at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95)
	at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156)
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
	at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:251)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:237)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:49)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:38)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:278)
	at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:244)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:180)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1349/0x00000008408ebc40.apply(Unknown Source)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
