# Generate some data with a simple model

In [None]:
import org.apache.spark.rdd.RDD
import com.cra.figaro.library.atomic.continuous._aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa

## System generating X values

The first variable is $X \sim \textit{U}(1.0, 5.0)$

In [None]:
val generatorModelX = Uniform(1.0, 5.0)

val generateOneX = () => {
  generatorModelX.generate()
  generatorModelX.value
}

## System generating Y values from X

We're going to generate a model like this 

$Y = X^2 + Z$

$Z \sim \mathcal{N}(0.0, 0.25)$

In [None]:
val generatorModelZ = Normal(0.0, 0.25)

val generateOneY = (x: Double) => {
  generatorModelZ.generate()
  x*x + math.abs(generatorModelZ.value)
}

In [None]:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

## Generate the universe (1000 items)

An universe is actually **all data**, we never have access to that.

In [None]:
val collection = (1 to 1000).map(i => {
                   val x = generateOneX().toDouble
                   val y = generateOneY(x).toDouble
                   (x, y)
  })
widgets.LineChart(collection.toList, fields=Some(("X", "Y")), maxPoints=collection.size)

In [None]:
val universe = sparkContext.parallelize( collection )
universe.cache()
universe.count

# Create the model generators

Create some functions (formulas) to be considered as models. 

That is, they are trying to model the real universe, we know that the real world is actually following a second order distribution, however here we create four models, from order `0` (constant) to to third order.

In [None]:
val toPoints0 = ( xy: (Double, Double)) => 
  LabeledPoint(xy._2, Vectors.dense(Array(1.0)))

val toPoints1 = ( xy: (Double, Double)) => 
  LabeledPoint(xy._2, Vectors.dense(Array(xy._1, 1.0)))

val toPoints2 = ( xy: (Double, Double)) => 
  LabeledPoint(xy._2, Vectors.dense(Array(xy._1*xy._1, xy._1, 1.0)))

val toPoints3 = ( xy: (Double, Double)) => 
  LabeledPoint(xy._2, Vectors.dense(Array(xy._1*xy._1*xy._1, xy._1*xy._1, xy._1, 1.0)))

# Example: simple linear regression on one sample

## Take a sample of this universe (1% items)

In [None]:
val sample = () => universe.sample(true, 0.01)

Let's create a order `1` model this sample.

In [None]:
val data = sample().map(toPoints1)
widgets.LineChart(data.map(lp => (lp.features(0), lp.label)).collect.toList, fields=Some(("X", "Y")))

## Order `1` Linear regression

In [None]:
import org.apache.spark.mllib.optimization.{LBFGS, LeastSquaresGradient, SimpleUpdater}
import org.apache.spark.mllib.regression.LinearRegressionModel

We're now creating a function that will basically train a **Linear Regression** using **Least-squared loss** function on the modeled data.

In [None]:
val train = (dta: RDD[LabeledPoint]) => {
  val one = dta.first
  val numCorrections = 10
  val convergenceTol = 1e-4
  val maxNumIterations = 100
  val regParam = 0.1
  val initialWeightsWithIntercept = Vectors.dense(new Array[Double](one.features.size))
  
  val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
    dta.map(lp => (lp.label, lp.features)), // create the RDD[(Double, Vector)]
    new LeastSquaresGradient(),             // loss function
    new SimpleUpdater(),
    numCorrections,
    convergenceTol,
    maxNumIterations,
    regParam,
    initialWeightsWithIntercept
  )
  
  new LinearRegressionModel(weightsWithIntercept, 0.0)
}

Now we can run it on the first order modeled data.

In [None]:
val model = train(data)

## Estimate the error for the `1` order model

This is going to apply the model on the sample data we have, then it computes the standard error on it.

Recall that this dataset is **exactly** the same as for the training part.

In [None]:
val safe = new java.io.Serializable {
  val localModel = model
  val pred = (p: LabeledPoint) => localModel.predict(p.features)
  val localData = data
  
  localModel.predict(localData.map(_.features))
  val valuesAndPreds = localData.map { point =>
    val prediction = pred(point)
    (point.label, prediction)
  }
  val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()
}
import safe._

In [None]:
:markdown
So we have a **${safe.MSE}** mean square error 

We can also plot what the predictions and the data are looking like.

In [None]:
import notebook.front.third.wisp._
Plot(Seq(Pairs(data.collect().map(v => (v.features(0), v.label)).toList.sortBy(_._1), chart="line"),
         Pairs((data.collect().map(v => v.features(0)) zip safe.valuesAndPreds.map(_._2).collect.toList).sortBy(_._1), chart="line")
     )
)

## How does this predict works?

Getting back to the model, we basically trained two parameters:
* $\beta_0$, the interceptor
* $\beta_1$, the weigths vector (slope in the dimensions) 

Hence the prediction will take the features we have seen, the data, and apply a linear inference on it, that is apply $y = \beta_0 + \beta_1 * x$ 

In [None]:
:markdown
We'll use the intercept `${model.intercept}` and the weights `${model.weights}`

So let's take an observation, and look at the features (data) and label (value).

In [None]:
val sample1 = data.take(1).head

In [None]:
:markdown
* label `${sample1.label}`
* features `${sample1.features}`

In [None]:
val predict1 = model.intercept + (sample1.features.toArray zip model.weights.toArray).map(x => x._1*x._2).sum
()

In [None]:
:markdown
So the prediction is `${predict1}` and is `${math.abs(sample1.label - predict1)}` far from the correct label

In [None]:
widgets.ScatterChart(List(predict1, sample1.label, math.abs(sample1.label - predict1)), fields=Some(("X", "Y")))

# Try 0, 1st, 2nd & 3rd order functions on 1% universe samples

This function will:
* compute the error of a given model and the prediction of the data.
* create a plot that can be used to check the difference between the data and the prediction

In [None]:
import org.apache.spark.mllib.regression.LinearRegressionModel
import org.apache.spark.rdd.RDD
import com.quantifind.charts.highcharts._

val MSE = (model: LinearRegressionModel, dta: RDD[LabeledPoint], deg:Int) => {
  val localModel = model
  val pred = (p: LabeledPoint) => localModel.predict(p.features)
  val localData = dta
  
  localModel.predict(localData.map(_.features))
  val valuesAndPreds = localData.map { point =>
    val prediction = pred(point)
    (point.label, prediction)
  }
  
  val vs = localData.collect().map(v => (v.features.toArray.sum, v.label))
  
  val plot = Plot(Seq(
    Pairs(vs.toList.sortBy(_._1), chart="line"),
    Pairs((vs.map(_._1) zip valuesAndPreds.map(_._2).collect.toList).sortBy(_._1), chart="line")
  ))
  
  (plot, valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean())
}

## Simulations

Now we will run a lot of times the models on training data but also keep some data for validation testing later, that are usually named testing datasets.

So that we are simulating cases where we have a lot of samples that where either taken from the same dataset, or have been taken at different time.

In [None]:
case class ModelsMSE(mses: Array[(notebook.front.Widget, Double)])

val errors = (1 to 20).map{ i =>
                            
  val data = universe.sample(true, 0.01)                            
  val (data0, data1, data2, data3) = 
                           (data.map(toPoints0), data.map(toPoints1), data.map(toPoints2), data.map(toPoints3))
  
  val test = universe
  val (test0, test1, test2, test3) = 
                           (test.map(toPoints0), test.map(toPoints1), test.map(toPoints2), test.map(toPoints3))
              
  val model0 = train(data0)
  val model1 = train(data1)
  val model2 = train(data2)
  val model3 = train(data3)

  (ModelsMSE(Array(MSE(model0, data0, 0), MSE(model1, data1, 1), MSE(model2, data2, 2), MSE(model3, data3, 3))),
   ModelsMSE(Array(MSE(model0, test0, 0), MSE(model1, test1, 1), MSE(model2, test2, 2), MSE(model3, test3, 3))))
}

Let's take a look at one of these runs to see how trainings and tests ppredictions are comparable

In [None]:
val (tr, te) = errors(scala.util.Random.nextInt(errors.size))

def plots(order:Int) = html(<h4>Order {order}</h4>) ++ tr.mses(order)._1 ++ te.mses(order)._1

List.tabulate(4){i => plots(i)}.reduce(_ ++ _)

## Comparing the models

In the following, we'll be using the 20 trainings and testing we've done to check which model won.

For this, we'll count the number of times a specific order worked better than the others, for both the training set and the testing set.

In [None]:
case class Error(order: Int, bestCount: Int, sse: Double, count: Int) {
  def addBest(se: Double) = Error(order, bestCount+1, sse+se, count+1)
  def add(se: Double) = Error(order, bestCount, sse+se, count+1)
}

val zeros = List.tabulate(4)(i => Error(i, 0, 0.0, 0))

val errs = ((zeros) /: errors.map(_._2))((acc, err) => {
                                 val bestIdx = err.mses.map(_._2).zipWithIndex.minBy(_._1)._2
                                 acc.zipWithIndex.map{
                                   case (e, idx) if (idx == bestIdx) => e.addBest(err.mses(bestIdx)._2)
                                   case (e, idx) => e.add(err.mses(idx)._2)
                                 }
                                }
                               )
val trainErrs = ((zeros) /: errors.map(_._1))((acc, err) => {
                                 val bestIdx = err.mses.map(_._2).zipWithIndex.minBy(_._1)._2
                                 acc.zipWithIndex.map{
                                   case (e, idx) if (idx == bestIdx) => e.addBest(err.mses(bestIdx)._2)
                                   case (e, idx) => e.add(err.mses(idx)._2)
                                 }
                                }
                               )

import notebook.front.widgets._
table(
  5,
  Seq.tabulate(4){i=>
    Seq(
      text(""+i), text(""+{errs(i).bestCount}), text(""+(errs(i).sse/errs(i).count)), text(""+trainErrs(i).bestCount), text(""+(trainErrs(i).sse/trainErrs(i).count))
    )
  }.flatten,
  Seq(text("Model Order"), text("# Best model"), text("Mean Squared Error"), text("Training best model"), text("Training MSE"))
)

As we can see, the **second order** model is more often picked on the test data sets, which is great because we **know** (we're lucky) that the **Universe** is actually quadratic.

However, if we look at the training data set best models, we can see that the **third** order is/should **always** be the best. We is countersense with our knwowledge of the Universe.
This is simply because, the $R^2$ can only be reduced by adding new component into the model.

That's also why we be better use some regularization to choose the model than relying on a single metric.

But that's a different story!