<div class="alert alert-block alert-info" style="margin-top: 20px">
    <a href="https://cocl.us/System_ML_notebook">
         <img src="https://s3-api.us-geo.objectstorage.softlayer.net/cf-courses-data/CognitiveClass/ML0111EN/Ad/TopAd.png" width="750" align="center">
    </a>
</div>

<a href="https://cognitiveclass.ai/">
    <img src="https://s3-api.us-geo.objectstorage.softlayer.net/cf-courses-data/CognitiveClass/ML0111EN/Ad/CCLog.png" width="200" align="center">
</a>

### SystemML and the Spark MLContext

Exercise objectives:
- Run sample SystemML code with the MLContext
- Run the Linear Regression example

To use one of the released version, use <code>%AddDeps org.apache.systemml systemml 1.2.0</code>.

In [None]:
%AddDeps org.apache.systemml systemml 1.2.0

Note: To use SystemML in a Spark shell with YARN, you would run this command:

`./bin/spark-shell --master yarn-client --num-executors 3 --driver-memory 5G --executor-memory 5G --executor-cores 4 --jars SystemML.jar`

Import the <code>mlcontext</code> library and the <code>SQLContext</code> library.

In [None]:
import org.apache.sysml.api.mlcontext._
import org.apache.spark.sql.SQLContext

Create the variables.

In [None]:
val ml = new MLContext(sc)                       
val sqlContext = new SQLContext(sc)

Note: The <code>SparkContext</code> <code>sc</code> has been initialized for you in this notebook. If you are using the Spark shell, it is also initialized for you at the start. When you write your own Spark application, you must explicitly define your <code>SparkContext</code> variable.

To show the capability of <code>systemml</code>, let's create a DataFrame with a random 100000 rows and 1000 columns. First import the necessary libraries.

In [None]:
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}
import scala.util.Random

Create the variables for the rows and columns.

In [None]:
val numRows = 100000
val numCols = 1000

Create the <code>RDD</code> with random doubles

In [None]:
val data = sc.parallelize(0 to numRows-1).map{ _ => Row.fromSeq(Seq.fill(numCols)(Random.nextDouble)) }

Create the schema

In [None]:
val schema = StructType((0 to numCols-1).map { i => StructField("C" + i, DoubleType, true) } )

Create the DataFrame

In [None]:
val df = sqlContext.createDataFrame(data, schema)

### Creating helper methods
Create some helper methods. The <code>systemml</code> output data is encapsulated in an <code>MLOutput</code> object. The <code>getScalar()</code> method extracts a scalar value from a DataFrame returned by MLOutput. The <code>getScalarDouble()</code> method returns such a value as a Double, and the <code>getScalarInt()</code> method returns such a value as an Int.

In [None]:
def getScalar(outputs: MLResults, symbol: String): Any = outputs.getDataFrame(symbol).first()(1)
def getScalarDouble(outputs: MLResults, symbol: String): Double = getScalar(outputs, symbol).asInstanceOf[Double]
def getScalarInt(outputs: MLResults, symbol: String): Int = getScalarDouble(outputs, symbol).toInt

### Binary-block matrix representation
<code>SystemML</code> is optimized to operate on a binary-block format for matrix representation. For large datasets, conversion from DataFrame to binary-block can require a significant quantity of time. Explicit DataFrame to binary-block conversion allows algorithm performance to be measured separately from data conversion time.

The <code>SystemML</code> binary-block matrix representation can be thought of as a two-dimensional array of blocks, where each block consists of a number of rows and columns. In this example, we specify a matrix consisting of blocks of size 1000x1000. The experimental <code>dataFrameToBinaryBlock()</code> method of <code>RDDConverterUtils</code> is used to convert the DataFrame <code>df</code> to a <code>SystemML</code> binary-block matrix, which is represented by the datatype <code>JavaPairRDD[MatrixIndexes, MatrixBlock]</code>.

Import the required libraries and utilities

In [None]:
import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;

Create the matrix

In [None]:
val numRowsPerBlock = 10
val numColsPerBlock = 10
val mc = new MatrixCharacteristics(numRows, numCols, numRowsPerBlock, numColsPerBlock)

Do the conversion to binary-block matrix using the <code>RDDConverterUtils</code> class

In [None]:
val sysMlMatrix = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc, false, false)

### Linear Regression


Import the `LinearDataGenerator` library to assist with generating some raw data. Import the `sqlContext.implicits._` to convert RDD to DataFrame

In [None]:
import org.apache.spark.mllib.util.{LinearDataGenerator, MLUtils}
import sqlContext.implicits._

Use LinearDataGenerator to create test data of 10000 rows with 1000 columns and convert that into a DataFrame to be used for Spark ML and the SystemML linear regression algorithm.

In [None]:
val numRows = 1000
val numCols = 100
val rawData = {
  val tmp = LinearDataGenerator.generateLinearRDD(sc, numRows, numCols, 1).toDF()
  MLUtils.convertVectorColumnsToML(tmp, "features")
}

Repartition into a more parallelism-friendly number of partitions and cache it.

In [None]:
val data = rawData.repartition(64).cache()
println(data.getClass.getName)

Import the Spark Linear Regression library. We'll first train a model using the Spark ML linear regression algorithm.

In [None]:
import org.apache.spark.ml.regression.LinearRegression

Model Settings

In [None]:
val maxIters = 100
val reg = 0
val elasticNetParam = 0  // L2 reg

Fit the model

In [None]:
val lr = new LinearRegression().setMaxIter(maxIters).setRegParam(reg).setElasticNetParam(elasticNetParam)
val start = System.currentTimeMillis()
val model = lr.fit(data)
val trainingTime = (System.currentTimeMillis() - start).toDouble / 100.0

Summarize the model over the training set and gather some metrics

In [None]:
val trainingSummary = model.summary
val r2 = trainingSummary.r2
val iters = trainingSummary.totalIterations
val trainingTimePerIter = trainingTime / iters

Print statistics

In [None]:
println(s"R2: ${r2}")
println(s"Iterations: ${iters}")
println(s"Training time per iter: ${trainingTimePerIter} seconds")

Note the training time it took per iteration. We'll compare that to SystemML's training time below.

### SystemML kernels. 
This is where the magic of SystemML takes place. The <code>linearReg</code> fixed String variable is set to a linear regression algorithm written in DML, SystemML’s Declarative Machine Learning language.

In [None]:
val linearReg =
"""
#
# THIS SCRIPT SOLVES LINEAR REGRESSION USING THE CONJUGATE GRADIENT ALGORITHM
#
# INPUT PARAMETERS:
# --------------------------------------------------------------------------------------------
# NAME  TYPE   DEFAULT  MEANING
# --------------------------------------------------------------------------------------------
# X     String  ---     Matrix X of feature vectors
# Y     String  ---     1-column Matrix Y of response values
# icpt  Int      0      Intercept presence, shifting and rescaling the columns of X:
#                       0 = no intercept, no shifting, no rescaling;
#                       1 = add intercept, but neither shift nor rescale X;
#                       2 = add intercept, shift & rescale X columns to mean = 0, variance = 1
# reg   Double 0.000001 Regularization constant (lambda) for L2-regularization; set to nonzero
#                       for highly dependend/sparse/numerous features
# tol   Double 0.000001 Tolerance (epsilon); conjugate graduent procedure terminates early if
#                       L2 norm of the beta-residual is less than tolerance * its initial norm
# maxi  Int      0      Maximum number of conjugate gradient iterations, 0 = no maximum
# --------------------------------------------------------------------------------------------
#
# OUTPUT:
# B Estimated regression parameters (the betas) to store
#
# Note: Matrix of regression parameters (the betas) and its size depend on icpt input value:
#         OUTPUT SIZE:   OUTPUT CONTENTS:                HOW TO PREDICT Y FROM X AND B:
# icpt=0: ncol(X)   x 1  Betas for X only                Y ~ X %*% B[1:ncol(X), 1], or just X %*% B
# icpt=1: ncol(X)+1 x 1  Betas for X and intercept       Y ~ X %*% B[1:ncol(X), 1] + B[ncol(X)+1, 1]
# icpt=2: ncol(X)+1 x 2  Col.1: betas for X & intercept  Y ~ X %*% B[1:ncol(X), 1] + B[ncol(X)+1, 1]
#                        Col.2: betas for shifted/rescaled X and intercept
#

fileX = "";
fileY = "";
fileB = "";

intercept_status = ifdef ($icpt, 0);     # $icpt=0;
tolerance = ifdef ($tol, 0.000001);      # $tol=0.000001;
max_iteration = ifdef ($maxi, 0);        # $maxi=0;
regularization = ifdef ($reg, 0.000001); # $reg=0.000001;

X = read (fileX);
y = read (fileY);

n = nrow (X);
m = ncol (X);
ones_n = matrix (1, rows = n, cols = 1);
zero_cell = matrix (0, rows = 1, cols = 1);

# Introduce the intercept, shift and rescale the columns of X if needed

m_ext = m;
if (intercept_status == 1 | intercept_status == 2)  # add the intercept column
{
    X = append (X, ones_n);
    m_ext = ncol (X);
}

scale_lambda = matrix (1, rows = m_ext, cols = 1);
if (intercept_status == 1 | intercept_status == 2)
{
    scale_lambda [m_ext, 1] = 0;
}

if (intercept_status == 2)  # scale-&-shift X columns to mean 0, variance 1
{                           # Important assumption: X [, m_ext] = ones_n
    avg_X_cols = t(colSums(X)) / n;
    var_X_cols = (t(colSums (X ^ 2)) - n * (avg_X_cols ^ 2)) / (n - 1);
    is_unsafe = ppred (var_X_cols, 0.0, "<=");
    scale_X = 1.0 / sqrt (var_X_cols * (1 - is_unsafe) + is_unsafe);
    scale_X [m_ext, 1] = 1;
    shift_X = - avg_X_cols * scale_X;
    shift_X [m_ext, 1] = 0;
} else {
    scale_X = matrix (1, rows = m_ext, cols = 1);
    shift_X = matrix (0, rows = m_ext, cols = 1);
}

# Henceforth, if intercept_status == 2, we use "X %*% (SHIFT/SCALE TRANSFORM)"
# instead of "X".  However, in order to preserve the sparsity of X,
# we apply the transform associatively to some other part of the expression
# in which it occurs.  To avoid materializing a large matrix, we rewrite it:
#
# ssX_A  = (SHIFT/SCALE TRANSFORM) %*% A    --- is rewritten as:
# ssX_A  = diag (scale_X) %*% A;
# ssX_A [m_ext, ] = ssX_A [m_ext, ] + t(shift_X) %*% A;
#
# tssX_A = t(SHIFT/SCALE TRANSFORM) %*% A   --- is rewritten as:
# tssX_A = diag (scale_X) %*% A + shift_X %*% A [m_ext, ];

lambda = scale_lambda * regularization;
beta_unscaled = matrix (0, rows = m_ext, cols = 1);

if (max_iteration == 0) {
    max_iteration = m_ext;
}
i = 0;

# BEGIN THE CONJUGATE GRADIENT ALGORITHM
r = - t(X) %*% y;

if (intercept_status == 2) {
    r = scale_X * r + shift_X %*% r [m_ext, ];
}

p = - r;
norm_r2 = sum (r ^ 2);
norm_r2_initial = norm_r2;
norm_r2_target = norm_r2_initial * tolerance ^ 2;

while (i < max_iteration & norm_r2 > norm_r2_target)
{
    if (intercept_status == 2) {
        ssX_p = scale_X * p;
        ssX_p [m_ext, ] = ssX_p [m_ext, ] + t(shift_X) %*% p;
    } else {
        ssX_p = p;
    }

    q = t(X) %*% (X %*% ssX_p);

    if (intercept_status == 2) {
        q = scale_X * q + shift_X %*% q [m_ext, ];
    }

    q = q + lambda * p;
    a = norm_r2 / sum (p * q);
    beta_unscaled = beta_unscaled + a * p;
    r = r + a * q;
    old_norm_r2 = norm_r2;
    norm_r2 = sum (r ^ 2);
    p = -r + (norm_r2 / old_norm_r2) * p;
    i = i + 1;
}
# END THE CONJUGATE GRADIENT ALGORITHM

if (intercept_status == 2) {
    beta = scale_X * beta_unscaled;
    beta [m_ext, ] = beta [m_ext, ] + t(shift_X) %*% beta_unscaled;
} else {
    beta = beta_unscaled;
}

# Output statistics
avg_tot = sum (y) / n;
ss_tot = sum (y ^ 2);
ss_avg_tot = ss_tot - n * avg_tot ^ 2;
var_tot = ss_avg_tot / (n - 1);
y_residual = y - X %*% beta;
avg_res = sum (y_residual) / n;
ss_res = sum (y_residual ^ 2);
ss_avg_res = ss_res - n * avg_res ^ 2;

R2_temp = 1 - ss_res / ss_avg_tot
R2 = matrix(R2_temp, rows=1, cols=1)
write(R2, "")

totalIters = matrix(i, rows=1, cols=1)
write(totalIters, "")

# Prepare the output matrix
if (intercept_status == 2) {
    beta_out = append (beta, beta_unscaled);
} else {
    beta_out = beta;
}

write (beta_out, fileB);
"""

Convert data to proper format. Remember that we wanted to convert to a binary-block matrix representation prior to the actual execution of the algorithm. This would allow us to accurately capture the algorithm performance without the data conversion time affecting the measurement.

In [None]:
val mcX = new MatrixCharacteristics(numRows, numCols, 1000, 1000)
val mcY = new MatrixCharacteristics(numRows, 1, 1000, 1000)
val X = RDDConverterUtils.dataFrameToBinaryBlock(sc, data.select("features"), mcX, false, true)
val y = RDDConverterUtils.dataFrameToBinaryBlock(sc, data.select("label"), mcY, false, false)

In [None]:
println(mcX.getClass())
println(mcY)

Caching

In [None]:
val X2 = X.cache()
val y2 = y.cache()
val cnt1 = X2.count()
val cnt2 = y2.count() 

Now, we can train our model using the <code>SystemML</code> linear regression algorithm. We register the features matrix X and the label matrix y as inputs. We register the <code>beta_out</code> matrix, <code>R2</code>, and <code>totalIters</code> as outputs.

In [None]:
ml.resetConfig()
val Xmat = new MatrixMetadata(numRows, numCols)
val ymat = new MatrixMetadata(numRows, 1)
val X3 = MLContextConversionUtil.matrixObjectTo2DDoubleArray(MLContextConversionUtil.binaryBlocksToMatrixObject(X, Xmat))
val y3 = MLContextConversionUtil.matrixObjectTo2DDoubleArray(MLContextConversionUtil.binaryBlocksToMatrixObject(y, ymat))
val dmlscript = ScriptFactory.dml(linearReg).in("X", X3).in("y", y3).out("beta_out", "R2", "totalIters")

Run the script. Here is where you passed in that <code>linearReg</code> variable that was defined earlier.

In [None]:
val start = System.currentTimeMillis()
val outputs = ml.execute(dmlscript)
val trainingTime = (System.currentTimeMillis() - start).toDouble / 100.0

Get outputs. Note that we are using the helper methods that we defined earlier. The helper methods return Double and Int values from output generated by the <code>mlcontext</code>.

In [None]:
val B = outputs.getDataFrame("beta_out")
val r2 = getScalarDouble(outputs, "R2")
val iters = getScalarInt(outputs, "totalIters")
val trainingTimePerIter = trainingTime / iters

Print statistics

In [None]:
println(s"R2: ${r2}")
println(s"Iterations: ${iters}")
println(s"Training time per iter: ${trainingTimePerIter} seconds")
B.describe().show()

Note the training time here. How does this compare with the Spark ML linear regression?

<div class="alert alert-block alert-info" style="margin-top: 20px">
<h2>Get IBM Watson Studio free of charge!</h2>
    <p><a href="https://cocl.us/System_ML_notebook"><img src="https://s3-api.us-geo.objectstorage.softlayer.net/cf-courses-data/CognitiveClass/ML0111EN/Ad/BottomAd.png" width="750" align="center"></a></p>
</div>