### **Bonus Question - Connected Components on MapReduce**
MapReduce suits network analysis because it processes large graph datasets in parallel. Splitting the work into map and reduce steps distributes the computation across workers, which is essential for large-scale graph problems such as finding connected components.

1. In this task, you are required to use PySpark and the MapReduce paradigm to identify the connected components in a flight network graph. The focus should be on airports rather than cities. As you know, a connected component refers to a group of airports where every pair of airports within the group is connected either directly or indirectly.

The function takes the following inputs:
1. Flight network
2. A starting date
3. An end date

The function outputs:
1. The number of connected components during that period
2. The size of each connected component
3. The airports within the largest connected component identified

__Note:__ For this task, you should check whether there is a flight between two airports during that period.

__Note:__ You are not allowed to use pre-existing packages or functions in PySpark; instead, you must implement the algorithm from scratch using the MapReduce paradigm.

2. Compare the execution time and the results of your implementation with those of the GraphFrames package for identifying connected components. If there is any difference in the results, provide an explanation for why that might occur.
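
The following is a minimal pure-Python sketch (not PySpark) of how item 1 could be framed as repeated map and reduce rounds: every airport starts with its own code as its label, each edge "maps" the source's label to the destination, and each airport "reduces" the labels it receives to the smallest one. The toy `flights` records, the airport codes, and the function name `connected_components` are hypothetical and only serve the illustration; a distributed version would replace the Python loops with Spark transformations.

```python
from collections import defaultdict
from datetime import date

# Hypothetical toy records: (origin, destination, fly_date).
flights = [
    ("AAA", "BBB", date(1990, 1, 1)),
    ("BBB", "CCC", date(1990, 2, 1)),
    ("DDD", "EEE", date(1990, 3, 1)),
    ("CCC", "DDD", date(1991, 1, 1)),  # outside the period used below
]

def connected_components(flights, start, end):
    # Keep routes flown in [start, end] and make each edge undirected.
    edges = set()
    for src, dst, day in flights:
        if start <= day <= end:
            edges.add((src, dst))
            edges.add((dst, src))

    # Every airport starts in its own component, labelled by its own code.
    labels = {airport: airport for edge in edges for airport in edge}

    while True:
        # Map: each edge sends the source's current label to the destination.
        # Each airport also re-emits its own label so it is never lost.
        emitted = [(dst, labels[src]) for src, dst in edges]
        emitted += list(labels.items())

        # Reduce: each airport keeps the smallest label it received.
        new_labels = {}
        for airport, label in emitted:
            new_labels[airport] = min(label, new_labels.get(airport, label))

        if new_labels == labels:  # no label changed: fixed point reached
            break
        labels = new_labels

    # Group airports by final label, largest component first.
    components = defaultdict(list)
    for airport, label in labels.items():
        components[label].append(airport)
    return sorted((sorted(c) for c in components.values()), key=len, reverse=True)

components = connected_components(flights, date(1990, 1, 1), date(1990, 12, 31))
print(len(components), [len(c) for c in components], components[0])
```

The three requested outputs map onto `len(components)`, the list of component lengths, and `components[0]`.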


### **Install all the packages needed using pip**

In [None]:
! pip install pyngrok gdown  pyspark  yellowbrick graphframes

### **Import all the packages needed, including Spark's**

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col, explode, collect_list, min as spark_min, struct, size, row_number
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from graphframes import GraphFrame
import time
import json
import tempfile
import os

## **Initialize the Spark Session**

In [3]:
# Initialize Spark Session with configurations
print("Initializing Spark Session...")
spark = SparkSession.builder \
    .appName("Connected Components Analysis") \
    .config("spark.driver.memory", "15g") \
    .config("spark.executor.memory", "15g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "100") \
    .getOrCreate()
print("... Spark Session Created")

Initializing Spark Session...


## **Read the Airports CSV file and print some information**

In [4]:
print("Reading CSV file...")
df = spark.read.csv("/Airports2.csv", header=True, inferSchema=True)

# Print basic information about the dataset
print("\nDataset Information:")
print(f"Number of rows: {df.count()}")
print("\nSchema:")
df.printSchema()

Reading CSV file...

Dataset Information:
Number of rows: 3606803

Schema:
root
 |-- Origin_airport: string (nullable = true)
 |-- Destination_airport: string (nullable = true)
 |-- Origin_city: string (nullable = true)
 |-- Destination_city: string (nullable = true)
 |-- Passengers: integer (nullable = true)
 |-- Seats: integer (nullable = true)
 |-- Flights: integer (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- Fly_date: date (nullable = true)
 |-- Origin_population: integer (nullable = true)
 |-- Destination_population: integer (nullable = true)
 |-- Org_airport_lat: string (nullable = true)
 |-- Org_airport_long: string (nullable = true)
 |-- Dest_airport_lat: string (nullable = true)
 |-- Dest_airport_long: string (nullable = true)



# **Find out Connected Components using Map Reduce paradigm**

In [8]:
class ImprovedConnectedComponents:
    def __init__(self, max_iterations=20, spark_configs=None):
        """
        Initializes SparkSession with optimized configurations.

        Args:
            max_iterations (int, optional): Maximum number of iterations for component detection. Defaults to 20.
            spark_configs (dict, optional): Additional Spark configurations as key-value pairs.
        """
        print("Initializing SparkSession with optimized configurations...")

        builder = SparkSession.builder.appName("Optimized Connected Components")
        default_configs = {
            "spark.sql.adaptive.enabled": "true",
            "spark.sql.adaptive.skewJoin.enabled": "true",
            "spark.sql.shuffle.partitions": "400",
            "spark.driver.memory": "16g",
            "spark.executor.memory": "16g",
            "spark.memory.fraction": "0.8",
            "spark.memory.storageFraction": "0.2"
        }
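        # Merge default and custom configurations (custom values override defaults)
        for key, value in {**default_configs, **(spark_configs or {})}.items():
            builder = builder.config(key, value)

        # Note: getOrCreate() reuses an already-running session if one exists; in that
        # case JVM-level settings such as spark.driver.memory are not re-applied.
        self.spark = builder.getOrCreate()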
        self.sc = self.spark.sparkContext
        self.max_iterations = max_iterations

    def find_connected_components(self, flight_network, start_date, end_date):
        """
        Identifies connected components in a flight network by iterative minimum-label propagation.

        Args:
            flight_network (pyspark.sql.DataFrame): Flight records with 'Fly_date', 'Origin_airport' and 'Destination_airport' columns.
            start_date (str): Start date for filtering (format YYYY-MM-DD).
            end_date (str): End date for filtering (format YYYY-MM-DD).

        Returns:
            dict: Results containing connected components information and performance metrics.
        """
        start_time = time.time()
        print(f"Analyzing flight network from {start_date} to {end_date}")
        print(f"Maximum iterations set to: {self.max_iterations}")

        # STEP 1: Filter the dataset (the DataFrame is already loaded by the caller)
        print("Loading and filtering flight network data...")
        df = flight_network

        # Check if necessary columns exist
        required_columns = ['Fly_date', 'Origin_airport', 'Destination_airport']
        for col_name in required_columns:
            if col_name not in df.columns:
                raise ValueError(f"Required column '{col_name}' not found in the dataset")

        # Filter by date range
        filtered_df = df.filter((col("Fly_date") >= start_date) & (col("Fly_date") <= end_date))

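        # Print dataset statistics
        total_flights = filtered_df.count()
        # Note: this counts distinct origin airports only; airports that appear
        # solely as destinations in the period are not included in this figure.
        unique_airports = filtered_df.select("Origin_airport").distinct().count()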
        print(f"Total flights in the period: {total_flights}")
        print(f"Unique airports: {unique_airports}")

        # Create undirected graph edges
        edges_df = filtered_df.select("Origin_airport", "Destination_airport").distinct()
        bidirectional_edges = edges_df.union(
            edges_df.select(
                col("Destination_airport").alias("Origin_airport"),
                col("Origin_airport").alias("Destination_airport")
            )
        ).distinct()

        # STEP 2: Connected components via iterative minimum-label propagation
        # (each airport repeatedly adopts the smallest label among itself and its neighbours)
        print("Computing connected components...")

        # Initial node mapping
        initial_nodes = bidirectional_edges.select("Origin_airport").distinct() \
            .withColumnRenamed("Origin_airport", "airport") \
            .withColumn("component", col("airport"))

        # Iterative component merging
        current_nodes = initial_nodes
        previous_component_count = 0

        for iteration in range(self.max_iterations):
            print(f"Iteration {iteration + 1}")

            # Join edges with current component mapping
            merged_nodes = bidirectional_edges.join(current_nodes,
                bidirectional_edges.Origin_airport == current_nodes.airport, "left") \
                .select(
                    col("Destination_airport").alias("airport"),
                    col("component")
                )

            # Merge components by taking the minimum component label
            current_nodes = merged_nodes.union(current_nodes) \
                .groupBy("airport") \
                .agg(spark_min("component").alias("component"))

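            # Early stopping heuristic: stop when the number of distinct labels is unchanged.
            # Caveat: labels can still be propagating while this count stays constant,
            # so this check may stop before every label has settled.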
            current_component_count = current_nodes.select("component").distinct().count()
            if current_component_count == previous_component_count:
                print(f"Converged at iteration {iteration + 1}")
                break
            previous_component_count = current_component_count

        # STEP 3: Analyze Connected Components
        print("Analyzing connected components...")
        components_df = current_nodes.groupBy("component") \
            .agg(collect_list("airport").alias("airports"))

        # Add component size using size() function
        components_df = components_df.withColumn("size", size(col("airports")))

        # Sort components by size in descending order
        window_spec = Window.orderBy(col("size").desc())
        components_df = components_df.withColumn("rank", row_number().over(window_spec))

        # Collect results, ordered explicitly so that index 0 is the largest component
        results_rows = components_df.orderBy("rank").collect()

        # Calculate performance metrics
        end_time = time.time()
        total_processing_time = end_time - start_time

        # Prepare final results
        results = {
            "number_of_components": len(results_rows),
            "component_sizes": [row["size"] for row in results_rows],
            "largest_component_size": results_rows[0]["size"] if results_rows else 0,
            "largest_component_airports": results_rows[0]["airports"] if results_rows else [],
            "performance_metrics": {
                "total_flights": total_flights,
                "unique_airports": unique_airports,
                "processing_time_seconds": total_processing_time,
                "iterations_completed": iteration + 1
            }
        }

        print("Connected Components Analysis Completed!")
        return results

    def cleanup(self):
        """
        Stops the SparkSession to free up resources.
        """
        if self.spark:
            self.spark.stop()
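
The early-stopping rule in the class compares the number of distinct labels between iterations. The pure-Python sketch below, on a hypothetical five-node path graph, suggests why that count can stall before the labels have actually converged, and contrasts it with a rule that stops only when no label changed. The helper names `propagate` and `run` and the node names are assumptions made for this illustration, not part of the notebook's code.

```python
def propagate(labels, edges):
    """One synchronous round: each node takes the smallest label among itself and its neighbours."""
    new_labels = dict(labels)
    for u, v in edges:
        new_labels[v] = min(new_labels[v], labels[u])
        new_labels[u] = min(new_labels[u], labels[v])
    return new_labels

def run(edges, stop_rule, max_iterations=20):
    """Returns (iterations used, number of distinct labels at stop)."""
    nodes = {n for e in edges for n in e}
    labels = {n: n for n in nodes}
    previous_count = 0
    for iteration in range(1, max_iterations + 1):
        new_labels = propagate(labels, edges)
        changed = sum(new_labels[n] != labels[n] for n in nodes)
        labels = new_labels
        count = len(set(labels.values()))
        # Rule used in the class: stop when the label count repeats.
        if stop_rule == "count" and count == previous_count:
            break
        # Alternative rule: stop only when no node changed its label.
        if stop_rule == "changes" and changed == 0:
            break
        previous_count = count
    return iteration, count

# Hypothetical path graph B - I - H - G - A (a single connected component).
path = [("B", "I"), ("I", "H"), ("H", "G"), ("G", "A")]
by_count = run(path, "count")      # stops while two labels remain
by_changes = run(path, "changes")  # runs until a single label remains
print(by_count, by_changes)
```

On this path the count-based rule stops with two labels still present, although the graph has one component. Whether the same happens on the flight data depends on the graph's shape, so this is only a caution about the stopping rule, not a statement about the results reported here.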


## **Select a range of dates and print the result with performance metrics**

In [10]:
connector = ImprovedConnectedComponents(
    max_iterations=10
)

# Analyze all flights flown in 1990
results = connector.find_connected_components(
    flight_network=df,
    start_date="1990-01-01",
    end_date="1990-12-31"
)

# Print detailed results
print(json.dumps(results, indent=2))

# Cleanup
connector.cleanup()

Initializing SparkSession with optimized configurations...
Analyzing flight network from 1990-01-01 to 1990-12-31
Maximum iterations set to: 10
Loading and filtering flight network data...
Total flights in the period: 143082
Unique airports: 238
Computing connected components...
Iteration 1
Iteration 2
Iteration 3
Iteration 4
Iteration 5
Converged at iteration 5
Analyzing connected components...
Connected Components Analysis Completed!
{
  "number_of_components": 1,
  "component_sizes": [
    245
  ],
  "largest_component_size": 245,
  "largest_component_airports": [
    "ABE",
    "ABI",
    "ABQ",
    "ACT",
    "ACV",
    "ACY",
    "ADM",
    "ADQ",
    "AGS",
    "AIY",
    "ALB",
    "ALW",
    "AMA",
    "ANC",
    "ATL",
    "ATW",
    "AUS",
    "AVL",
    "AVP",
    "AZO",
    "BDL",
    "BFF",
    "BFI",
    "BFL",
    "BGM",
    "BGR",
    "BHM",
    "BIF",
    "BIL",
    "BIS",
    "BLI",
    "BNA",
    "BOS",
    "BRO",
    "BTM",
    "BTR",
    "BTV",
    "BUF",
    "BWI

# **Find out Connected Components Using the GraphFrames package**

In [4]:
def setup_spark_session():
    """
    Initialize Spark session with GraphFrames JAR
    """
    # Stop any existing SparkContext
    if SparkContext._active_spark_context:
        SparkContext._active_spark_context.stop()

    # Configure SparkSession with GraphFrames package
    spark = SparkSession.builder \
        .appName("GraphFramesExample") \
        .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.0-s_2.12") \
        .config("spark.driver.memory", "15g") \
        .config("spark.executor.memory", "15g") \
        .config("spark.sql.shuffle.partitions", "100") \
        .getOrCreate()

    return spark

In [5]:
def analyze_flight_network(df, start_date, end_date):
    """
    Analyze flight network using GraphFrames with optimizations for large datasets
    """
    # Ensure Spark session is properly configured
    if SparkSession._instantiatedSession is None or SparkContext._active_spark_context is None:
        print("Reinitializing Spark Session...")
        spark = setup_spark_session()
    else:
        spark = SparkSession.builder.getOrCreate()

    try:
        # Cache the filtered dataframe
        filtered_flights = df.filter(
            (F.col("Fly_date") >= start_date) &
            (F.col("Fly_date") <= end_date)
        ).cache()

        print(f"Number of flights in period: {filtered_flights.count()}")

        # Create and cache vertices
        vertices = filtered_flights.select(
            F.col("Origin_airport").alias("id")
        ).union(
            filtered_flights.select(
                F.col("Destination_airport").alias("id")
            )
        ).distinct().cache()

        print(f"Number of unique airports: {vertices.count()}")

        # Create and cache edges
        edges = filtered_flights.select(
            F.col("Origin_airport").alias("src"),
            F.col("Destination_airport").alias("dst")
        ).distinct().cache()

        print(f"Number of unique routes: {edges.count()}")

        # Create graph
        print("Creating GraphFrame...")
        graph = GraphFrame(vertices, edges)

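        # Run the component analysis
        print("Running connected components analysis...")
        start_time = time.time()

        # Note: stronglyConnectedComponents() respects edge direction, so it is not
        # equivalent to the undirected connected components computed above; an airport
        # with only inbound or only outbound routes forms its own component.
        # graph.connectedComponents() ignores direction, but its default algorithm
        # requires a checkpoint directory (spark.sparkContext.setCheckpointDir(...)).
        result = graph.stronglyConnectedComponents(maxIter=10)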

        execution_time = time.time() - start_time
        print(f"Execution Time: {execution_time:.2f} seconds")

        # Analyze results in chunks
        print("\nAnalyzing components...")
        num_components = result.select("component").distinct().count()
        print(f"Number of connected components: {num_components}")

        # Get component sizes efficiently
        print("\nComponent sizes (top 10 largest):")
        component_sizes = result.groupBy("component") \
            .count() \
            .orderBy(F.desc("count")) \
            .limit(10)
        component_sizes.show()

        # Get largest component efficiently
        largest_comp_id = component_sizes.first()["component"]
        largest_comp_size = component_sizes.first()["count"]

        print(f"\nLargest component has {largest_comp_size} airports")
        print("Sample of airports in largest component (up to 10):")
        result.filter(F.col("component") == largest_comp_id) \
            .select("id") \
            .limit(10) \
            .show()

        # Clean up
        filtered_flights.unpersist()
        vertices.unpersist()
        edges.unpersist()

        return result

    except Exception as e:
        print(f"An error occurred: {str(e)}")
        raise

    finally:
        # Clean up checkpoint directory
        checkpoint_dir = spark.sparkContext.getCheckpointDir()
        if checkpoint_dir and os.path.exists(checkpoint_dir):
            try:
                import shutil
                shutil.rmtree(checkpoint_dir)
            except OSError:
                # Best-effort cleanup; ignore filesystem errors
                pass

def print_basic_stats(df, start_date, end_date):
    """
    Print basic statistics about the dataset before running full analysis
    """
    # Reinitialize Spark session if not active
    if SparkSession._instantiatedSession is None or SparkContext._active_spark_context is None:
        print("Reinitializing Spark Session...")
        spark = SparkSession.builder \
            .appName("ConnectedComponentsGraphFrames") \
            .config("spark.driver.memory", "15g") \
            .getOrCreate()

    print("Dataset Statistics:")
    df.filter(
        (F.col("Fly_date") >= start_date) &
        (F.col("Fly_date") <= end_date)
    ).agg(
        F.countDistinct("Origin_airport").alias("unique_origins"),
        F.countDistinct("Destination_airport").alias("unique_destinations"),
        F.count("*").alias("total_flights")
    ).show()


In [6]:
spark = setup_spark_session()
df = spark.read.csv("/Airports2.csv", header=True, inferSchema=True)
print_basic_stats(df, '1990-01-01', '1990-12-31')
result = analyze_flight_network(df, '1990-01-01', '1990-12-31')


Dataset Statistics:
+--------------+-------------------+-------------+
|unique_origins|unique_destinations|total_flights|
+--------------+-------------------+-------------+
|           238|                242|       143082|
+--------------+-------------------+-------------+

Number of flights in period: 143082
Number of unique airports: 245
Number of unique routes: 6035
Creating GraphFrame...
Running connected components analysis...




Execution Time: 286.22 seconds

Analyzing components...
Number of connected components: 11

Component sizes (top 10 largest):
+------------+-----+
|   component|count|
+------------+-----+
|           0|  235|
|463856467969|    1|
|523986010112|    1|
|558345748480|    1|
|146028888065|    1|
|163208757249|    1|
|240518168576|    1|
|274877906947|    1|
|781684047872|    1|
|352187318273|    1|
+------------+-----+


Largest component has 235 airports
Sample of airports in largest component (up to 10):
+---+
| id|
+---+
|HNL|
|BGM|
|BNA|
|BLI|
|ERI|
|CLT|
|TVC|
|SHV|
|CIC|
|CVG|
+---+
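
The two runs disagree (1 component of 245 airports versus 11 components, the largest with 235 airports). One explanation consistent with the logged figures is that the GraphFrames run used strongly connected components, which respect edge direction: with 245 airports, 238 distinct origins, and 242 distinct destinations, 7 airports never appear as an origin and 3 never appear as a destination, and each of those 10 airports necessarily forms a singleton, giving 235 + 10 = 245. The pure-Python sketch below illustrates the distinction on a hypothetical toy graph; the airport codes and helper names are assumptions for illustration only.

```python
from collections import defaultdict

def _reachable(start, adj):
    """All nodes reachable from start by following adj (depth-first search)."""
    seen, stack = {start}, [start]
    while stack:
        for nxt in adj[stack.pop()]:
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return seen

def strong_components(nodes, edges):
    """Strongly connected components: mutual reachability along directed edges."""
    fwd, bwd = defaultdict(set), defaultdict(set)
    for u, v in edges:
        fwd[u].add(v)
        bwd[v].add(u)
    comps, assigned = [], set()
    for n in sorted(nodes):
        if n in assigned:
            continue
        # Nodes that n can reach AND that can reach n.
        comp = _reachable(n, fwd) & _reachable(n, bwd)
        comps.append(sorted(comp))
        assigned |= comp
    return sorted(comps, key=len, reverse=True)

def weak_components(nodes, edges):
    """Connected components with direction ignored: add every reverse edge first."""
    return strong_components(nodes, edges + [(v, u) for u, v in edges])

# Hypothetical directed routes.
edges = [
    ("AAA", "BBB"), ("BBB", "CCC"), ("CCC", "AAA"),  # a directed cycle
    ("DDD", "AAA"),  # DDD is only ever an origin
    ("BBB", "EEE"),  # EEE is only ever a destination
]
nodes = {n for e in edges for n in e}
strong = strong_components(nodes, edges)
weak = weak_components(nodes, edges)
print([len(c) for c in strong], [len(c) for c in weak])
```

Here the undirected view yields a single component of five airports, while the directed view splits off `DDD` and `EEE` as singletons, mirroring the pattern seen in the output above.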

