In [1]:
%scala

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types._

import org.apache.spark.ml.feature.Imputer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.feature.{Imputer, StandardScaler, StringIndexer, VectorAssembler}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

import org.apache.spark.mllib.evaluation.RegressionMetrics

val path = "/FileStore/tables/diamonds.csv"
// Name of the regression target column; used everywhere the label is referenced.
val label = "price"

// Load the raw CSV, taking column names from the header row and letting
// Spark infer each column's type.
val dfBrut = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(path)

// Keep only the modelling columns. The label is cast to Double explicitly;
// NOTE(review): presumably because the Imputer below only accepts
// floating-point inputs and an integer price would be rejected - confirm.
val df = dfBrut.select(
  $"carat", $"cut", $"color", $"clarity", $"depth", $"table",
  dfBrut(label).cast(DoubleType).as(label),
  $"x", $"y", $"z")

// Fill missing values in every numeric column (everything except the three
// categorical string columns) with that column's mean. The output columns
// reuse the input names, so the imputed values overwrite the originals in
// place rather than creating new columns.
// NOTE(review): the label column is part of this list, so missing targets
// would also be mean-filled - confirm that is intended.
val imputer = {
  val numericCols = df.drop("cut", "color", "clarity").columns
  new Imputer()
    .setInputCols(numericCols)
    .setOutputCols(numericCols)
    .setStrategy("mean")
}

val dfCleaned = imputer.fit(df).transform(df)

// Index the three categorical string columns into numeric "<name>Index"
// columns. Each indexer is fitted on the output of the previous one, so the
// index columns accumulate on a single DataFrame.
val cutIndexer = new StringIndexer()
  .setInputCol("cut")
  .setOutputCol("cutIndex")
val colorIndexer = new StringIndexer()
  .setInputCol("color")
  .setOutputCol("colorIndex")
val clarityIndexer = new StringIndexer()
  .setInputCol("clarity")
  .setOutputCol("clarityIndex")

val cutIndexed = cutIndexer.fit(dfCleaned).transform(dfCleaned)
val colorIndexed = colorIndexer.fit(cutIndexed).transform(cutIndexed)
val clarityIndexed = clarityIndexer.fit(colorIndexed).transform(colorIndexed)

// One-hot encode each indexed categorical column into a "<name>Vec" vector
// column (inputs and outputs are paired by position).
val encoder = {
  val indexCols = Array("cutIndex", "colorIndex", "clarityIndex")
  val vecCols = Array("cutVec", "colorVec", "clarityVec")
  new OneHotEncoderEstimator().setInputCols(indexCols).setOutputCols(vecCols)
}
val modelEncoder = encoder.fit(clarityIndexed)

// Cached because the frames derived from it are scanned more than once
// downstream (e.g. the scaler is both fitted on and applied to them).
val encoded = {
  val withVectors = modelEncoder.transform(clarityIndexed)
  withVectors.cache()
}

// Concatenate the six numeric measurements followed by the three one-hot
// vectors, in that order, into a single "features" vector column.
val assembler = {
  val numericCols = Array("carat", "depth", "table", "x", "y", "z")
  val oneHotCols = Array("cutVec", "colorVec", "clarityVec")
  new VectorAssembler()
    .setInputCols(numericCols ++ oneHotCols)
    .setOutputCol("features")
}

val output = assembler.transform(encoded)


// Rescale every component of "features" to unit standard deviation without
// centring it (no mean subtraction); the result goes to "scaledFeatures".
val scaler = new StandardScaler()
  .setWithMean(false)
  .setWithStd(true)
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

val scaledOutput = {
  val scalerModel = scaler.fit(output)
  scalerModel.transform(output)
}


// Expose the scaled features as a dense vector column and as a plain
// Array[Double] column, the format our own gradient-descent code consumes.
//
// Both UDFs take the generic ml Vector rather than a concrete subclass:
// "scaledFeatures" rows can be either sparse or dense, and a UDF parameter
// typed SparseVector (or a cast to DenseVector) throws ClassCastException on
// rows of the other representation.
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.sql.functions.udf

val toArr: MLVector => Array[Double] = _.toArray
val toArrUdf = udf(toArr)
val asDense = udf((v: MLVector) => v.toDense)

val output_dense = scaledOutput.withColumn("features_dense", asDense($"scaledFeatures"))

val output_arr = output_dense.withColumn("features_arr", toArrUdf($"features_dense"))

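// Same rows as train/test, renamed to the (labels, features) layout our gradient-descent code expects.
// NOTE: train/test are split from outputPipe, which has no "features_arr" column; build them from output_arr instead.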
// val train_arr = train.select(label, "features_arr").withColumnRenamed(label, "labels").withColumnRenamed("features_arr", "features")
// val test_arr = test.select(label, "features_arr").withColumnRenamed(label, "labels").withColumnRenamed("features_arr", "features")


// PIPELINE
// Re-declare the preprocessing stages above as one Pipeline, then fit every
// stage on `df` and apply the fitted model to it in a single pass.
val steps: Array[org.apache.spark.ml.PipelineStage] =
  Array(imputer, cutIndexer, colorIndexer, clarityIndexer, encoder, assembler, scaler)

val pipeline_prep = new Pipeline().setStages(steps)

val outputPipe = {
  val prepModel = pipeline_prep.fit(df)
  prepModel.transform(df)
}

// outputPipe.show(truncate=false)

// Split the preprocessed rows 80/20 into train and test sets (fixed seed so
// the split is reproducible).
// NOTE(review): `pipeline_prep` was fitted on the full `df` before this
// split, so imputer means, string-index orderings and scaler statistics also
// saw the test rows. Fitting the pipeline on the training split only would
// avoid that leakage.
val Array(train, test) = outputPipe.randomSplit(Array(0.8, 0.2), seed = 0)

// Linear regression on the assembled (unscaled) "features" column.
val lr = new LinearRegression().setLabelCol(label).setFeaturesCol("features")

val model = lr.fit(train)

val holdout = model.transform(test)

// R^2 on the held-out rows. Relies on the model and the evaluator both using
// the default prediction column name.
val r2Evaluator = new RegressionEvaluator().setLabelCol(label).setMetricName("r2")

println(s"r2 = ${r2Evaluator.evaluate(holdout)}")

/*

def diamonds_gridsearch_fast_sgd_perf(df_train: DataFrame, df_test: DataFrame, epochsLocal : Array[Int], epochsGlobal : Int): DataFrame ={
    val w = Array(0.0, 0.0,0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)

 

    var grid = List[Array[Double]]()
    for (epoch <- epochsLocal){
        // MBGD
        val t0_mbgd = System.nanoTime()
        val w_mbgd = MBGD_parallel(w, 0.00001, epochsGlobal, epoch, df_train, 1)
        val t1_mbgd = System.nanoTime()
        val pred_mbgd = predict(df_test, w_mbgd)
        val error_mbgd = mse(pred_mbgd, df_test)
        val score_mbgd = r2_score(pred_mbgd, df_test)
        
        // MBGD with Momentum 0.9
        val t0_mom = System.nanoTime()
        val w_mom = MOM_MBGD_parallel(w, 0.00001, epochsGlobal, epoch, df_train, 1, 0.9)
        val t1_mom = System.nanoTime()
        val pred_mom = predict(df_test, w_mom)
        val score_mom = r2_score(pred_mom, df_test)
        val error_mom = mse(pred_mom, df_test)

 

        // MBGD with Adagrad
        val t0_ada = System.nanoTime()
        val w_ada = ADA_MBGD_parallel(w, 10, epochsGlobal, epoch, df_train, 1, 0.000000001)
        val t1_ada = System.nanoTime()
        val pred_ada = predict(df_test, w_ada)
        val score_ada = r2_score(pred_ada, df_test)
        val error_ada = mse(pred_ada, df_test)
        
        val perf = Array[Double]((epoch*epochsGlobal).toDouble, score_mbgd, error_mbgd, (t1_mbgd-t0_mbgd)/1000000000.0, score_mom, error_mom, (t1_mom-t0_mom)/1000000000.0, score_ada, error_ada, (t1_ada-t0_ada)/1000000000.0)
        grid = perf :: grid 
     }
    val arr = grid.toArray
    val grid_df = sc.parallelize(arr).map(x => (x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8), x(9))).toDF("epoch", "mbgd score", "mbgd error", "mbgd time", "mom score", "mom error", "mom time", "ada score","ada error", "ada time")
    return grid_df
}

*/
