## Connected Components on MapReduce

The goal of this section is to identify the connected components of the flight graph after filtering the flights to a specific time interval. For that period, we also compute the total number of flights (edges) and the number of unique airports (nodes) involved.

Additionally, we report the number of connected components identified and their sizes, and we identify the airports belonging to the largest component.
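
The per-period statistics described above can be illustrated on a few in-memory records. This is a single-machine sketch, not the Spark implementation used below: the record layout (origin, destination, flight date) mirrors the `Origin_airport`, `Destination_airport` and `Fly_date` columns, while the sample rows and the `summarize_period` helper are hypothetical. In this sketch every flight record counts as one edge, so a route flown several times is counted several times.

```python
from datetime import date

# Hypothetical flight records: (origin, destination, flight date)
flights = [
    ("JFK", "LAX", date(1995, 3, 1)),
    ("LAX", "SFO", date(1999, 7, 15)),
    ("JFK", "LAX", date(2003, 1, 9)),
    ("ORD", "ATL", date(2008, 5, 20)),  # outside the interval used below
]


def summarize_period(records, start, end):
    """Keep flights with start <= date <= end, then count edges and nodes."""
    in_period = [(o, d) for o, d, day in records if start <= day <= end]
    airports = {airport for edge in in_period for airport in edge}
    return {"total_flights": len(in_period), "unique_airports": len(airports)}


summary = summarize_period(flights, date(1990, 1, 1), date(2005, 12, 31))
print(summary)  # {'total_flights': 3, 'unique_airports': 3}
```
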

We use the Union-Find (disjoint-set) algorithm, which is based on two main operations:

- **Find:** This operation returns the representative of the set that a given node belongs to. If two nodes have the same representative, they are part of the same connected component.

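```python
components = {}  # maps each node to its parent; a root maps to itself

def find(node):
    # Register an unseen node as a singleton component
    if node not in components:
        components[node] = node
    # Path compression: point the node directly at its root
    if components[node] != node:
        components[node] = find(components[node])
    return components[node]
```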

- **Union:** This operation connects two nodes and their respective components. If the nodes belong to different components, the operation merges them into a single component.

```python
def union(node1, node2):
    root1, root2 = find(node1), find(node2)  # Find the representatives of the two nodes
    if root1 != root2:  # If they are in different components
        components[root2] = root1  # Merge the two components
```
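
Putting the two operations together: the sketch below (a hypothetical `connected_components` helper that uses a local `parent` dictionary instead of the global `components` mapping) applies Union to every edge and then groups the nodes by the representative returned by Find. The airport codes are made-up sample data.

```python
from collections import defaultdict


def connected_components(edges):
    """Return the components of an undirected edge list, largest first."""
    parent = {}  # node -> parent; a root is its own parent

    def find(node):
        parent.setdefault(node, node)
        if parent[node] != node:
            parent[node] = find(parent[node])  # path compression
        return parent[node]

    def union(a, b):
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[root_b] = root_a  # merge the two sets

    for a, b in edges:
        union(a, b)

    # Group every node under the representative returned by find()
    groups = defaultdict(list)
    for node in list(parent):
        groups[find(node)].append(node)
    return sorted((sorted(members) for members in groups.values()),
                  key=len, reverse=True)


# Hypothetical routes: two groups of airports with no flights between them
routes = [("JFK", "LAX"), ("LAX", "SFO"), ("ANC", "FAI")]
airport_groups = connected_components(routes)
print(len(airport_groups), [len(g) for g in airport_groups])  # 2 [3, 2]
print(airport_groups[0])  # ['JFK', 'LAX', 'SFO']
```

The number of components, their sizes and the members of the largest one follow directly from this grouping.
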
The distributed algorithm stops when it converges, that is, when the component mapping from the previous iteration (`prev_components`) matches the one obtained in the current iteration (`curr_components`), or after at most 10 iterations. Convergence means that the component labels have been fully propagated between connected nodes and no further changes are needed.

In the following section, we present an implementation based on GraphFrames, which provides predefined graph algorithms such as Connected Components.
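
The convergence criterion described above can be illustrated on a single machine. The sketch below is not the notebook's Spark code: `propagate_min_labels` is a hypothetical helper that runs synchronous min-label propagation (each node takes the smallest of its own label and its neighbors' labels) and stops when an iteration leaves the mapping unchanged, or after `max_iterations` rounds (10, the limit used in the notebook).

```python
def propagate_min_labels(edges, max_iterations=10):
    """Synchronous min-label propagation with a prev/curr convergence check."""
    # Treat the graph as undirected by adding the reverse of every edge
    directed = set(edges) | {(b, a) for a, b in edges}
    curr = {node: node for edge in directed for node in edge}  # start: own ID
    prev = None
    iteration = 0
    while prev != curr and iteration < max_iterations:
        prev = curr
        curr = dict(prev)  # a node's own label is always a candidate
        for src, dst in directed:
            curr[dst] = min(curr[dst], prev[src])  # keep the smaller label
        iteration += 1
    return curr, iteration, prev == curr


# Hypothetical routes: a path A-B-C-D plus a separate pair X-Y
labels, iterations, converged = propagate_min_labels(
    [("A", "B"), ("B", "C"), ("C", "D"), ("X", "Y")]
)
print(sorted(labels.items()))
# [('A', 'A'), ('B', 'A'), ('C', 'A'), ('D', 'A'), ('X', 'X'), ('Y', 'X')]
print(iterations, converged)  # 4 True
```

On the path A-B-C-D the smallest label needs three rounds to reach D, and a fourth round confirms that nothing changed, so the number of iterations grows with how far the smallest label has to travel inside a component.
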



In [None]:
from auxiliary_files.ConnectedComponents import *
csv = '/content/drive/MyDrive/ADM-HW5/archive_usa_airport/flights_final.csv'

# DATES must be in the format YYYY-MM-DD between 1990 and 2009
start_date = '1990-01-01'
end_date = '2005-12-31'

connections = find_connected_connections_spark(csv, start_date, end_date)



Analyzing flight network from 1990-01-01 to 2005-12-31
Maximum iterations set to: 10
Loading and filtering flight network data...
Total flights in the period: 2699030
Unique airports: 663
Iteration 1
Iteration 2
Iteration 3
Iteration 4
Iteration 5
Converged at iteration 5


In [None]:
connections

{'number_of_components': 1,
 'component_sizes': [663],
 'largest_component_size': 663}

## GraphFrames implementation

In [None]:
from graphframes import GraphFrame

In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from graphframes import GraphFrame
import time

def find_connected_connections_graph(csv_path: str, start_date: str, end_date: str) -> dict:


    # Initialize the Spark session
    spark = SparkSession.builder \
        .appName("GraphFramesExample") \
        .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.1-s_2.12") \
        .getOrCreate()

    # Set checkpoint directory for GraphFrames
    spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

    print(f"Analyzing flight network from {start_date} to {end_date}")

    # Load and filter flight network data
    print("Loading and filtering flight network data...")
    df = spark.read.csv(csv_path, header=True, inferSchema=True)

    # Convert "Fly_date" column to date and apply the date range filter
    df = df.withColumn("Fly_date", col("Fly_date").cast("date"))
    df = df.filter((col("Fly_date") >= start_date) & (col("Fly_date") <= end_date))
    df = df.select("Origin_airport", "Destination_airport", "Fly_date")

    # Create a DataFrame of unique airports (vertices)
    unique_airports = df.select("Origin_airport").distinct().union(df.select("Destination_airport").distinct()).distinct()
    vertices = unique_airports.withColumnRenamed("Origin_airport", "id")

    # Create edges DataFrame with source (src) and destination (dst)
    edges = df.withColumnRenamed("Origin_airport", "src").withColumnRenamed("Destination_airport", "dst")

    # Create GraphFrame
    g = GraphFrame(vertices, edges)

    # Finding connected components
    print("Finding connected components...")
    result = g.connectedComponents()

    # Calculate component sizes
    component_sizes = result.groupBy("component").count().collect()
    sizes = [row["count"] for row in component_sizes]

    # Find the largest component
    largest_component_id = max(component_sizes, key=lambda x: x["count"])["component"]
    largest_component = result.filter(result.component == largest_component_id).select("id").rdd.map(lambda x: x[0]).collect()


    final_result = {
        'number_of_components': len(component_sizes),
        'component_sizes': sizes,
        'largest_component_size': max(sizes),

    }

    # Stop the Spark session
    spark.stop()

    return final_result

def process_in_parallel(csv_path: str, start_date: str, end_date: str, num_partitions: int = None):
    """
    Wrapper function to handle the parallel processing with configurable partitions
    """
    spark = None
    try:
        # Configure the session before creating it: spark.default.parallelism is read
        # when the SparkContext starts, so spark.conf.set() afterwards has no effect.
        # (If a session already exists, getOrCreate reuses it and core settings are not applied.)
        builder = SparkSession.builder \
            .appName("ParallelConnectedComponents") \
            .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.1-s_2.12")
        if num_partitions:
            builder = builder \
                .config("spark.default.parallelism", str(num_partitions)) \
                .config("spark.sql.shuffle.partitions", str(num_partitions))
        spark = builder.getOrCreate()

        # Run the analysis using the graph-based method (it reuses this session via getOrCreate)
        result = find_connected_connections_graph(csv_path, start_date, end_date)

        return result

    except Exception as e:
        print(f"Error in parallel processing: {str(e)}")
        raise
    finally:
        # Stop the Spark session only if it was actually created
        if spark is not None:
            spark.stop()


In [29]:
from auxiliary_files.ConnectedComponents import *
csv = '/content/drive/MyDrive/ADM-HW5/archive_usa_airport/flights_final.csv'

start_date = '1990-01-01'
end_date = '2005-12-31'
connections_GraphFrame = find_connected_connections_graph(csv, start_date, end_date)


Analyzing flight network from 1990-01-01 to 2005-12-31
Loading and filtering flight network data...
Finding connected components...


In [30]:
connections_GraphFrame

{'number_of_components': 1,
 'component_sizes': [663],
 'largest_component_size': 663}

## Execution Time comparison


The timing analysis shows a significant performance difference between the RDD-based implementation and the GraphFrames implementation for finding the connected components of the flight graph. In this run, the RDD approach was considerably faster: 91.73 seconds in total versus 316.58 seconds for GraphFrames, a difference of 224.85 seconds.

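Several factors appear to contribute to this result:
- **`setup_time`** was marginally lower for the RDD version (0.00 s versus 0.44 s), a negligible difference. Data loading, on the other hand, was **9.36 seconds slower** for the RDD version (27.95 s versus 18.59 s).

- **`processing_time`**, which covers the Union-Find and label-propagation phase, was **142.41 seconds shorter** for the RDD version (61.84 s versus 204.25 s). A likely reason is the iterative nature of the connected components algorithm used by GraphFrames: conceptually, each vertex starts with a unique component ID, and IDs are repeatedly propagated to neighbors until every vertex holds the lowest ID of its component. This may require many iterations, each involving shuffles, and GraphFrames also periodically checkpoints intermediate results to the directory set with `setCheckpointDir`. In addition, the GraphFrames edge DataFrame contains every flight record (2,699,030 flights in this period, according to the log above), whereas the RDD version first reduces the edges to distinct airport pairs with `distinct()`.

- **`component_analysis_time`** was also much shorter for the RDD version, with an advantage of **91.36 seconds** (1.94 s versus 93.30 s). Because Spark evaluates transformations lazily, part of the GraphFrames cost in this phase may be deferred work that is only triggered by the `collect()` calls.

- Overall, the RDD implementation was roughly 3.5 times faster end to end in this run.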
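
The figures quoted above can be cross-checked with a few lines of arithmetic on the values printed in the Timing Analysis table at the end of this section (rounded to two decimals, as displayed). The dictionaries are simply copied from that output; differences follow the table's convention of Original minus GraphFrames, so a negative value means the RDD version was faster.

```python
# Phase timings in seconds, copied from the "Timing Analysis" output
original = {"setup_time": 0.00, "data_loading_time": 27.95,
            "processing_time": 61.84, "component_analysis_time": 1.94}
graphframes = {"setup_time": 0.44, "data_loading_time": 18.59,
               "processing_time": 204.25, "component_analysis_time": 93.30}

# Negative difference: the RDD (original) version was faster in that phase
differences = {phase: round(original[phase] - graphframes[phase], 2)
               for phase in original}
total_original = round(sum(original.values()), 2)
total_graphframes = round(sum(graphframes.values()), 2)
speedup = total_graphframes / total_original

print(differences)
print(total_original, total_graphframes, round(total_original - total_graphframes, 2))
print(f"speedup: {speedup:.2f}x")  # speedup: 3.45x
```

The per-phase differences add up to the total difference, which confirms that the slower data loading of the RDD version is more than offset by the processing and component-analysis phases.
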


GraphFrames. "GraphFrame.connectedComponents." GraphFrames Documentation, n.d., https://graphframes.github.io/graphframes/docs/_site/api/python/graphframes.html#graphframes.GraphFrame.connectedComponents. Accessed 23 Dec. 2024.

In [33]:
import time
from typing import Dict, Any
from dataclasses import dataclass
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from graphframes import GraphFrame

@dataclass
class TimingMetrics:
    setup_time: float = 0
    data_loading_time: float = 0
    processing_time: float = 0
    component_analysis_time: float = 0
    total_time: float = 0

def measure_time(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        return result, end_time - start_time
    return wrapper

def find_connected_connections_with_timing(csv_path: str, start_date: str, end_date: str) -> Dict[str, Any]:
    metrics = TimingMetrics()
    total_start = time.time()

    # Setup timing
    setup_start = time.time()
    spark = SparkSession.builder.appName("ConnectedComponents").getOrCreate()
    metrics.setup_time = time.time() - setup_start

    # Data loading timing
    loading_start = time.time()
    df = spark.read.csv(csv_path, header=True)
    df = df.filter((df.Fly_date >= start_date) & (df.Fly_date <= end_date))
    df = df.select("Origin_airport", "Destination_airport", "Fly_date")
    metrics.data_loading_time = time.time() - loading_start

    # Processing timing
    processing_start = time.time()

    def emit_bidirectional_edges(row):
        origin = row.Origin_airport
        dest = row.Destination_airport
        return [(origin, dest), (dest, origin)]


    edges_rdd = df.rdd.flatMap(emit_bidirectional_edges).distinct()


    def update_components(edges):
        components = {}

        def find(node):
            if node not in components:
                components[node] = node
            if components[node] != node:
                components[node] = find(components[node])
            return components[node]

        def union(node1, node2):
            root1, root2 = find(node1), find(node2)
            if root1 != root2:
                components[root2] = root1


        for origin, dest in edges:
            union(origin, dest)


        return [(node, find(node)) for node in components.keys()]


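    # Local Union-Find inside each partition gives an initial labelling;
    # a node seen in several partitions may receive different local roots
    components_rdd = edges_rdd.mapPartitions(update_components)

    prev_components = None
    curr_components = components_rdd.collectAsMap()

    iteration = 1
    while prev_components != curr_components and iteration <= 10:
        print(f"Iteration {iteration}")
        prev_components = curr_components
        # Bind the current mapping as a default argument so the closure does not
        # pick up a later value of curr_components if the RDD is recomputed
        components_rdd = edges_rdd.flatMap(
            # Each node receives its own label and the labels of its neighbors,
            # so labels can only decrease and the loop cannot oscillate
            lambda edge, labels=curr_components: [
                (edge[1], labels.get(edge[0])),
                (edge[1], labels.get(edge[1])),
            ]
        ).filter(
            lambda x: x[1] is not None
        ).reduceByKey(
            min  # keep the smallest label seen by each node
        )

        curr_components = components_rdd.collectAsMap()
        iteration += 1

    if prev_components == curr_components:
        print(f"Converged at iteration {iteration - 1}")
    else:
        print("Reached maximum iterations without convergence.")

    metrics.processing_time = time.time() - processing_start

    # Component analysis timing
    analysis_start = time.time()
    # Reuse the final mapping instead of recomputing components_rdd
    final_components_df = spark.createDataFrame(
        list(curr_components.items()),
        ["node", "component_id"]
    )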

    component_sizes = final_components_df.groupBy("component_id").count().collect()
    sizes = [row["count"] for row in component_sizes]

    largest_component_id = max(component_sizes, key=lambda x: x["count"])["component_id"]
    largest_component = final_components_df.filter(
        final_components_df.component_id == largest_component_id
    ).select("node").rdd.map(lambda x: x[0]).collect()
    metrics.component_analysis_time = time.time() - analysis_start

    metrics.total_time = time.time() - total_start

    result = {
        'number_of_components': len(component_sizes),
        'component_sizes': sizes,
        'largest_component_size': max(sizes),
        'timing_metrics': {
            'setup_time': metrics.setup_time,
            'data_loading_time': metrics.data_loading_time,
            'processing_time': metrics.processing_time,
            'component_analysis_time': metrics.component_analysis_time,
            'total_time': metrics.total_time
        }
    }

    spark.stop()
    return result

def find_connected_connections_graph_with_timing(csv_path: str, start_date: str, end_date: str) -> Dict[str, Any]:
    metrics = TimingMetrics()
    total_start = time.time()

    # Setup timing
    setup_start = time.time()
    spark = SparkSession.builder \
        .appName("GraphFramesExample") \
        .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.1-s_2.12") \
        .getOrCreate()
    spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
    metrics.setup_time = time.time() - setup_start

    # Data loading timing
    loading_start = time.time()
    df = spark.read.csv(csv_path, header=True, inferSchema=True)
    df = df.withColumn("Fly_date", col("Fly_date").cast("date"))
    df = df.filter((col("Fly_date") >= start_date) & (col("Fly_date") <= end_date))
    df = df.select("Origin_airport", "Destination_airport", "Fly_date")
    metrics.data_loading_time = time.time() - loading_start

    # Processing timing
    processing_start = time.time()
    vertices = df.select("Origin_airport").distinct().union(
        df.select("Destination_airport").distinct()
    ).distinct().withColumnRenamed("Origin_airport", "id")

    edges = df.withColumnRenamed("Origin_airport", "src") \
              .withColumnRenamed("Destination_airport", "dst")

    g = GraphFrame(vertices, edges)
    result = g.connectedComponents()
    metrics.processing_time = time.time() - processing_start

    # Component analysis timing
    analysis_start = time.time()
    component_sizes = result.groupBy("component").count().collect()
    sizes = [row["count"] for row in component_sizes]

    largest_component_id = max(component_sizes, key=lambda x: x["count"])["component"]
    largest_component = result.filter(result.component == largest_component_id) \
                            .select("id").rdd.map(lambda x: x[0]).collect()
    metrics.component_analysis_time = time.time() - analysis_start

    metrics.total_time = time.time() - total_start

    final_result = {
        'number_of_components': len(component_sizes),
        'component_sizes': sizes,
        'largest_component_size': max(sizes),
        'timing_metrics': {
            'setup_time': metrics.setup_time,
            'data_loading_time': metrics.data_loading_time,
            'processing_time': metrics.processing_time,
            'component_analysis_time': metrics.component_analysis_time,
            'total_time': metrics.total_time
        }
    }

    spark.stop()
    return final_result

def compare_implementations_with_timing(csv_path: str, start_date: str, end_date: str):
    print("Running Original Implementation...")
    original_result = find_connected_connections_with_timing(csv_path, start_date, end_date)

    print("\nRunning GraphFrames Implementation...")
    graphframes_result = find_connected_connections_graph_with_timing(csv_path, start_date, end_date)

    print("\nTiming Analysis:")
    print("-" * 80)
    print(f"{'Section':<25} {'Original (s)':<15} {'GraphFrames (s)':<15} {'Difference (s)':<15}")
    print("-" * 80)

    sections = ['setup_time', 'data_loading_time', 'processing_time', 'component_analysis_time', 'total_time']

    for section in sections:
        orig_time = original_result['timing_metrics'][section]
        graph_time = graphframes_result['timing_metrics'][section]
        diff = orig_time - graph_time
        print(f"{section:<25} {orig_time:>14.2f} {graph_time:>14.2f} {diff:>14.2f}")

    return original_result, graphframes_result

csv_path = '/content/drive/MyDrive/ADM-HW5/archive_usa_airport/flights_final.csv'
start_date = '1990-01-01'
end_date = '2005-12-31'

original_result, graphframes_result = compare_implementations_with_timing(csv_path, start_date, end_date)

Running Original Implementation...
Iteration 1
Iteration 2
Iteration 3
Iteration 4
Iteration 5
Converged at iteration 5

Running GraphFrames Implementation...

Timing Analysis:
--------------------------------------------------------------------------------
Section                   Original (s)    GraphFrames (s) Difference (s) 
--------------------------------------------------------------------------------
setup_time                          0.00           0.44          -0.44
data_loading_time                  27.95          18.59           9.36
processing_time                    61.84         204.25        -142.41
component_analysis_time             1.94          93.30         -91.36
total_time                         91.73         316.58        -224.85
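
The phase timings above are wall-clock differences of `time.time()`. One possible refinement, sketched here as an assumption rather than what the notebook does, is a small context manager built on `time.perf_counter()`, a monotonic clock intended for measuring intervals, combined with ending each Spark phase with an explicit action so that lazily defined work is executed inside the phase it belongs to. The `timed_phase` helper is hypothetical.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_phase(metrics, name):
    """Store the elapsed time of the with-block in metrics[name] (seconds)."""
    start = time.perf_counter()  # monotonic, high-resolution clock
    try:
        yield
    finally:
        metrics[name] = time.perf_counter() - start


metrics = {}
with timed_phase(metrics, "processing_time"):
    # Stand-in workload; with Spark, finish the block with an action such as
    # df.count() so the lazy transformations actually run inside this phase
    total = sum(i * i for i in range(100_000))
print(sorted(metrics))  # ['processing_time']
```
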
