In [None]:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

In [1]:
//Realiza a multiplicação de matrizes serial 
def matMultSerial(A: Array[Array[Float]], B: Array[Array[Float]]): Array[Array[Float]] = {
    val row_count = A.length
    val column_count = B(0).length
    val R = Array.ofDim[Float](row_count, column_count)

    for ( i <- 0 to (row_count - 1)) {
        for (j <- 0 to (column_count - 1)) {
            var sum = 0.0f
            for (k <- 0 to (B.length - 1)) {
                sum += (A(i)(k) * B(k)(j))
            }
            R(i)(j) = sum
        }
    }

    return R
}

//Realiza a multiplicacao de matrizes distribuida com Spark
def coordinateMatrixMultiply(A: CoordinateMatrix, B: CoordinateMatrix): CoordinateMatrix = {
    val A_map = A.entries.map({ case MatrixEntry(i, j, v) => (j, (i, v)) })
    val B_map = B.entries.map({ case MatrixEntry(j, k, w) => (j, (k, w)) })

    val productEntries = A_map
        .join(B_map)
        .map({ case (_, ((i, v), (k, w))) => ((i, k), (v * w)) })
        .reduceByKey(_ + _)
        .map({ case ((i, k), sum) => MatrixEntry(i, k, sum) })

    return new CoordinateMatrix(productEntries)
}


//Converte uma matriz para CoordinateMatrix para executar no coordinateMatrixMultiply
def convertArrayToCoordMat(Mat: Array[Array[Float]]): CoordinateMatrix = {
    var array = new Array[MatrixEntry]( (Mat.length * Mat(0).length) )
    var index = 0
    for ( i <- 0 to (Mat.length - 1)) {
        for (j <- 0 to (Mat(i).length - 1)) {
            array(index) = new MatrixEntry(i, j, Mat(i)(j))
            index += 1
        }
    }
    val entries_A = sc.parallelize(array)
    //entries_A.collect().foreach(println /*row => println(row.mkString(" "))*/)
    return new CoordinateMatrix(entries_A)
}

//Criar matrizes Random
val SIZE = 100
val A = Array.fill(SIZE, SIZE)(scala.util.Random.nextFloat())
val B = Array.fill(SIZE, SIZE)(scala.util.Random.nextFloat())

//Converte para CoordinateMatrix
val coordMatA = convertArrayToCoordMat(A)
val coordMatB = convertArrayToCoordMat(B) 

//Executa serial
/*val t = System.nanoTime
val C = matMultSerial(A, B)
println( (System.nanoTime - t) / 1e9d ) */

//Executa verão paralela com spark
val t = System.nanoTime
val C = coordinateMatrixMultiply(coordMatA, coordMatB)
println( (System.nanoTime - t) / 1e9d )

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
12,application_1530241154787_0018,spark,idle,Link,Link,✔


SparkSession available as 'spark'.
0.409747878