In [7]:
import org.apache.spark.sql.{SparkSession, functions}
import org.apache.spark.sql.types.{StructType, StructField, IntegerType}

// create a SparkSession object
val spark = SparkSession.builder.appName("GraphDF").getOrCreate()

// create a graph DataFrame
val schema = new StructType()
         .add("src", IntegerType)
         .add("dst", IntegerType)
val edges_df = spark.createDataFrame(Seq((0, 1), (1, 2), (1, 3), (3, 4), (5, 6), (6, 7), (7, 8))).toDF(schema: _*)

// print the DataFrame
edges_df.show()

// Function for the reducer
def reducer(key: Int, values: Iterable[Int]): (Iterable[(Int, Int)], Int) = {
    var minim = key
    var valueList = List.empty[Int]
    var to_emit = List.empty[(Int, Int)]
    var counter = 0
    
    for (value <- values) {
        if (value < minim) {
            minim = value
        }
        valueList = valueList :+ value
    }
    
    if (minim < key) {
        to_emit = to_emit :+ (key, minim)
        for (value <- valueList) {
            if (minim != value) {
                counter += 1
                to_emit = to_emit :+ (value, minim)
            }
        }
    }
    (to_emit, counter)
}

// initialising count_new_pairs with a value > 0 to run the first iteration
val start_time = System.currentTimeMillis()
var count_new_pairs = 1
var iterations = 0

// Loop for the iterations
while (count_new_pairs > 0) {
    // CCF iterate
    // Map
    val result_map = edges_df.select(functions.col("src").alias("key"), functions.col("dst").alias("value")).union(
        edges_df.select(functions.col("dst").alias("key"), functions.col("src").alias("value")))
    // Shuffle and Sort
    val result_shufflesort = result_map.groupBy("key").agg(functions.collect_list("value").alias("values"))
    // Reduce
    val result_reducer = result_shufflesort.rdd.map(x => reducer(x.getAs[Int]("key"), x.getAs[Seq[Int]]("values")))
    val result_reducer2 = result_reducer.flatMap(x => x._1).toDF("src", "dst")
    // Updating the counting
    count_new_pairs = result_reducer.map(x => x._2).reduce(_ + _)
    // CCF dedup
    edges_df = result_reducer2.dropDuplicates()
    iterations += 1
}

val sorted_df = edges_df.sort(functions.col("dst"))
val elapsed_time = System.currentTimeMillis() - start_time

sorted_df.show()
println("Iterations:", iterations)
println("Elapsed time: ", elapsed_time)

// stop the SparkSession object
spark.stop()

<console>: 33: error: type mismatch;