# **1. Programmation en RDD (PySpark) — Python**

In [None]:
import time
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark import SparkConf, SparkContext

# Spark entry points: the SparkSession (getOrCreate() reuses a running one)
# and its SparkContext, which the RDD code below works with.
spark = (
    SparkSession.builder
    .appName("CCF Correct RDD")
    .getOrCreate()
)
sc = spark.sparkContext

# Driver-side counter of the new pairs emitted by the CCF-Iterate reducer
# (the paper's "NewPair" counter); the CCF loop resets and reads it.
new_pairs_counter = sc.accumulator(0)

def ccf_correct_implementation(edges_rdd, max_iters=20):
    """
    Run the Connected Component Finder (CCF) algorithm on an RDD of edges.

    Each iteration pushes every node's current label(s) to its neighbours,
    groups all labels seen by each node, applies the CCF-Iterate reducer and
    then CCF-Dedup (``distinct``). The module-level accumulator
    ``new_pairs_counter`` counts the new pairs emitted by the reducer; the
    loop stops as soon as an iteration emits none, or after ``max_iters``
    iterations (in which case the returned iteration count equals
    ``max_iters``).

    Args:
        edges_rdd: RDD of ``(u, v)`` tuples, one per undirected edge.
        max_iters: Maximum number of CCF iterations to run.

    Returns:
        ``(node_component_rdd, iteration, exec_time)``: the final RDD of
        ``(node, component_id)`` pairs (left cached for the caller), the
        number of iterations executed, and the wall-clock time of the
        iteration loop in seconds.
    """
    # Step 1: initial labels -- every node starts as its own component id.
    nodes = edges_rdd.flatMap(lambda edge: [edge[0], edge[1]]).distinct()
    node_component_rdd = nodes.map(lambda node: (node, node))

    # Step 2: adjacency in both directions, (u, v) and (v, u), so that every
    # node sees all of its neighbours. It is joined on every iteration, so
    # cache it instead of re-deriving it (and re-reading the input) each time.
    neighbors_rdd = edges_rdd.flatMap(lambda x: [(x[0], x[1]), (x[1], x[0])]).cache()

    def ccf_iterate_reducer(key_values):
        """CCF-Iterate reducer (paper, Figure 2): link key and its labels to their minimum."""
        key, values_iter = key_values
        values = list(values_iter)
        min_val = min(values)

        if min_val < key:
            yield (key, min_val)
            for val in values:
                if val != min_val:
                    # Every such pair is new work for the next iteration.
                    new_pairs_counter.add(1)
                    yield (val, min_val)
        else:
            # key is already the smallest label it sees: keep it as its own label.
            yield (key, key)

    iteration = 0
    start_time = time.time()

    while iteration < max_iters:
        iteration += 1
        print(f"üîÅ D√©marrage de l'it√©ration {iteration}...")

        # Reset the driver-side counter before this iteration's job runs.
        new_pairs_counter.value = 0

        # (node, (label, neighbour)) -> (neighbour, label): push each node's
        # label to each of its neighbours.
        joined_rdd = node_component_rdd.join(neighbors_rdd).map(lambda x: (x[1][1], x[1][0]))

        # Per node: the labels pushed by its neighbours plus its own label(s).
        input_for_reducer = joined_rdd.union(node_component_rdd).groupByKey()

        # CCF-Iterate followed by CCF-Dedup. The result is read twice by the
        # next iteration (join and union) and returned at the end, so cache it.
        previous_rdd = node_component_rdd
        node_component_rdd = input_for_reducer.flatMap(ccf_iterate_reducer).distinct().cache()

        # Accumulators are only filled once an action runs the job; this also
        # materialises the cache so the previous iteration's RDD can be freed.
        node_component_rdd.count()
        previous_rdd.unpersist()

        if new_pairs_counter.value == 0:
            print(f"‚úÖ Convergence atteinte en {iteration} it√©rations.")
            break

    exec_time = time.time() - start_time
    neighbors_rdd.unpersist()

    return node_component_rdd, iteration, exec_time

# --- Load each graph file and run the RDD implementation on it ---
schema = StructType([
    StructField("source", IntegerType(), True),
    StructField("target", IntegerType(), True),
])

# (CSV file name, short label used in the report)
files = [
    ("G1_1k.csv", "G1"),
    ("G2_5k.csv", "G2"),
    ("G3_8k.csv", "G3"),
    ("G4_10k.csv", "G4"),
]

results = []

for filename, label in files:
    filepath = f"data/{filename}"  # assumes the CSVs live in a local 'data' folder
    print(f"üìé Traitement de {label} ({filepath})...")

    edges_rdd = None
    try:
        df = spark.read.csv(filepath, header=True, schema=schema)
        # The edges are scanned by the CCF run and twice more for the stats
        # below; cache them so the CSV is parsed only once.
        edges_rdd = df.rdd.map(lambda row: (row['source'], row['target'])).cache()

        components, num_iters, exec_time = ccf_correct_implementation(edges_rdd, max_iters=20)

        nb_nodes = edges_rdd.flatMap(lambda edge: [edge[0], edge[1]]).distinct().count()
        nb_edges = edges_rdd.count()

        print(f"üìä Donn√©es du graphe: {nb_nodes} n≈ìuds, {nb_edges} ar√™tes")
        print(f"üîÅ It√©rations : {num_iters}")
        print(f"‚è±Ô∏è Temps : {round(exec_time, 3)} secondes")
        print("-" * 40)

        results.append((label, nb_nodes, nb_edges, num_iters, round(exec_time, 3)))

    except Exception as e:
        # Best effort: report the failure for this graph and continue with the next.
        print(f"‚ùå Erreur avec {label} : {e}")
        print("-" * 40)
    finally:
        # Release the cached edges before moving on to the next graph.
        if edges_rdd is not None:
            edges_rdd.unpersist()

# Summary table of the runs that succeeded
result_rdd_df = pd.DataFrame(
    results,
    columns=["Graphe", "N≈ìuds", "Ar√™tes", "It√©rations", "Temps (s)"]
)

print("‚úÖ R√©sum√© des performances (RDD) :")
print(result_rdd_df)

üìé Traitement de G1 (data/G1_1k.csv)...
‚ùå Erreur avec G1 : [PATH_NOT_FOUND] Path does not exist: file:/content/data/G1_1k.csv.
----------------------------------------
üìé Traitement de G2 (data/G2_5k.csv)...
‚ùå Erreur avec G2 : [PATH_NOT_FOUND] Path does not exist: file:/content/data/G2_5k.csv.
----------------------------------------
üìé Traitement de G3 (data/G3_8k.csv)...
‚ùå Erreur avec G3 : [PATH_NOT_FOUND] Path does not exist: file:/content/data/G3_8k.csv.
----------------------------------------
üìé Traitement de G4 (data/G4_10k.csv)...
‚ùå Erreur avec G4 : [PATH_NOT_FOUND] Path does not exist: file:/content/data/G4_10k.csv.
----------------------------------------
‚úÖ R√©sum√© des performances (RDD) :
Empty DataFrame
Columns: [Graphe, N≈ìuds, Ar√™tes, It√©rations, Temps (s)]
Index: []


# **2. Implémentation CCF avec DataFrames — Python**

In [None]:
import time
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, least
from pyspark.sql.types import StructType, StructField, IntegerType

# Obtain the SparkSession: getOrCreate() returns the already-running session
# if there is one, otherwise it builds a new one with this application name.
spark = SparkSession.builder.appName("CCF DataFrame Correct").getOrCreate()

def ccf_dataframe_implementation(edges_df, max_iters=20):
    """
    Find connected components by iterative min-label propagation with PySpark DataFrames.

    Every node starts labelled with its own id. On each iteration a node takes
    the smallest label among its current label and its neighbours' labels.
    The loop stops when an iteration changes no label, or after ``max_iters``
    iterations (the returned iteration count then equals ``max_iters``).

    Args:
        edges_df: DataFrame with ``source`` and ``target`` columns, one row per
            undirected edge.
        max_iters: Maximum number of propagation iterations.

    Returns:
        ``(labels, iteration, exec_time)``: a DataFrame with columns ``node``
        and ``component_id``, the number of iterations executed, and the
        wall-clock time of the iteration loop in seconds.
    """
    # Step 1: initial labels -- every node is its own component id.
    nodes = edges_df.select("source").union(edges_df.select("target")) \
        .distinct() \
        .withColumnRenamed("source", "node")

    labels = nodes.withColumn("component_id", col("node"))

    iteration = 0
    start_time = time.time()

    # Both directions of every edge. Joined on every iteration, so cache it.
    adj_list = edges_df.select("source", "target").union(
        edges_df.select(col("target").alias("source"), col("source").alias("target"))
    ).cache()

    while iteration < max_iters:
        iteration += 1
        print(f"üîÅ D√©marrage de l'it√©ration {iteration}...")

        # Rename the label column so it cannot clash with the new one below.
        labels_renamed = labels.withColumnRenamed("component_id", "current_component_id")

        # 1. For every node, the smallest label among its neighbours.
        new_labels = adj_list.join(labels_renamed, adj_list.target == labels_renamed.node) \
            .select(adj_list.source.alias("node"), labels_renamed.current_component_id.alias("neighbor_component_id")) \
            .groupBy("node") \
            .agg({"neighbor_component_id": "min"}) \
            .withColumnRenamed("min(neighbor_component_id)", "propagated_id")

        # 2. Left outer join so every node is kept, even without neighbours.
        current_and_new_labels = labels_renamed.join(new_labels, "node", "left_outer")

        # 3. New label = smaller of current and propagated label (least() skips
        #    nulls, so nodes without a propagated label keep their own).
        #    The old label is kept alongside so convergence can be checked
        #    without another join. localCheckpoint() materialises the result
        #    and truncates its lineage: otherwise each iteration's plan embeds
        #    the previous one (several times) and planning cost explodes as
        #    iterations accumulate.
        step = current_and_new_labels.select(
            col("node"),
            least(col("current_component_id"), col("propagated_id")).alias("component_id"),
            col("current_component_id"),
        ).localCheckpoint()

        # Convergence check: number of nodes whose label changed this iteration.
        changes = step.filter(col("component_id") != col("current_component_id")).count()

        # Labels for the next iteration (or the final result).
        labels = step.select("node", "component_id")

        if changes == 0:
            print(f"‚úÖ Convergence atteinte en {iteration} it√©rations.")
            break

    exec_time = time.time() - start_time
    adj_list.unpersist()

    return labels, iteration, exec_time

# --- Load each graph file and run the DataFrame implementation on it ---
schema = StructType([
    StructField("source", IntegerType(), True),
    StructField("target", IntegerType(), True),
])

# (CSV file name, short label used in the report)
files = [
    ("G1_1k.csv", "G1"),
    ("G2_5k.csv", "G2"),
    ("G3_8k.csv", "G3"),
    ("G4_10k.csv", "G4"),
]

results = []

for filename, label in files:
    filepath = f"data/{filename}"  # assumes the CSVs live in a local 'data' folder
    print(f"üìé Traitement de {label} ({filepath})...")

    edges_df = None
    try:
        # Load the edges as a DataFrame. They are scanned by the CCF run and
        # twice more for the stats below; cache them so the CSV is parsed once.
        edges_df = spark.read.csv(filepath, header=True, schema=schema).cache()

        components_df, num_iters, exec_time = ccf_dataframe_implementation(edges_df, max_iters=20)

        nb_nodes = edges_df.select("source").union(edges_df.select("target")).distinct().count()
        nb_edges = edges_df.count()

        print(f"üìä Donn√©es du graphe: {nb_nodes} n≈ìuds, {nb_edges} ar√™tes")
        print(f"üîÅ It√©rations : {num_iters}")
        print(f"‚è±Ô∏è Temps : {round(exec_time, 3)} secondes")
        print("-" * 40)

        results.append((label, nb_nodes, nb_edges, num_iters, round(exec_time, 3)))

    except Exception as e:
        # Best effort: report the failure for this graph and continue with the next.
        print(f"‚ùå Erreur avec {label} : {e}")
        print("-" * 40)
    finally:
        # Release the cached edges before moving on to the next graph.
        if edges_df is not None:
            edges_df.unpersist()

# Summary table of the runs that succeeded
result_df = pd.DataFrame(
    results,
    columns=["Graphe", "N≈ìuds", "Ar√™tes", "It√©rations", "Temps (s)"]
)

print("‚úÖ R√©sum√© des performances (DataFrame) :")
print(result_df)

üìé Traitement de G1 (data/G1_1k.csv)...
üîÅ D√©marrage de l'it√©ration 1...
üîÅ D√©marrage de l'it√©ration 2...
üîÅ D√©marrage de l'it√©ration 3...
üîÅ D√©marrage de l'it√©ration 4...
üîÅ D√©marrage de l'it√©ration 5...
üîÅ D√©marrage de l'it√©ration 6...
üîÅ D√©marrage de l'it√©ration 7...
üîÅ D√©marrage de l'it√©ration 8...
‚úÖ Convergence atteinte en 8 it√©rations.
üìä Donn√©es du graphe: 996 n≈ìuds, 3000 ar√™tes
üîÅ It√©rations : 8
‚è±Ô∏è Temps : 135.066 secondes
----------------------------------------
üìé Traitement de G2 (data/G2_5k.csv)...
üîÅ D√©marrage de l'it√©ration 1...
üîÅ D√©marrage de l'it√©ration 2...
üîÅ D√©marrage de l'it√©ration 3...
üîÅ D√©marrage de l'it√©ration 4...
üîÅ D√©marrage de l'it√©ration 5...
üîÅ D√©marrage de l'it√©ration 6...
üîÅ D√©marrage de l'it√©ration 7...
üîÅ D√©marrage de l'it√©ration 8...
‚úÖ Convergence atteinte en 8 it√©rations.
üìä Donn√©es du graphe: 4983 n≈ìuds, 15000 ar√™tes
üîÅ It√©rations : 8
‚è±Ô∏è Temps : 100.33