# Generate some data with a simple model

In [ ]:
import org.apache.spark.rdd.RDD
import com.cra.figaro.library.atomic.continuous._

## System generating X values

The first variable is $X \sim \textit{U}(1.0, 5.0)$

In [ ]:
val generatorModelX = Uniform(1.0, 5.0)

val generateOneX:()=>Double = () => {
  generatorModelX.generate()
  generatorModelX.value
}

Here what values $X$ will take randomly:

First, we generate the values for the $x$ axis, which will be simply the list from $1$ to $1000$.

In [ ]:
val xAxis = List.range(1, 1000, 1)

Then we can generate the list of values for each value taken by $X$ on the `x` axis (well... for  $U$ it foesn't matter though)

In [ ]:
val yAxis = xAxis.map(x => generateOneX())

Now we can plot the values and _see_ how the distribution looks like.

We'll be using `zip` on the list container, whith create a list of pairs from two given lists, 
see [`zip`](https://www.scala-lang.org/api/current/scala/collection/immutable/List.html#zip[B](that:scala.collection.GenIterable[B]):List[(A,B)])

```scala
[1, 2] zip [3, 4] = [ [1, 3] , [2, 4] ]
```

Also, we will use the plotting functionnality of the Spark Notebook, in this case: the `ListChart` which plots the provided list of pairs as a line.

In [ ]:
LineChart(xAxis zip yAxis)

As we can see, the values don't show a specific pattern but all values land between $1$ and $5$.

## System generating Y values from X

We're going to generate a model like this 

$Y = X^2 + Z$

$Z \sim \mathcal{N}(0.0, 0.25)$

So $Y$ takes its values as simply as the _square_ value of the $X$ values, however there is some more noise added to it using $Z$.

$Z$ can be problematic (and is always there) because it contains (generally) all non considered infomation which can or are not modelled.
There are several reasons, generally because they are almost neglectable and not predictable... however, this is not always to be taken as granted; it's always recommended to check your assumptions!

The noise $Z$ (let's put it like this) is generated with small value around $0$ and deviating slowly (rarely) between $\pm$ 0.75 

In [ ]:
val generatorModelZ = Normal(0.0, 0.25)

Let's look at the values it will take, using the same process as before.

In [ ]:
val yAxisForZ = xAxis.map(x => generatorModelZ.generateRandomness) 

In [ ]:
LineChart(xAxis zip yAxisForZ)

It kind of looks the same as for the $U$ function, however the variation seems to be less, well, uniform.

To have another view of these value, we can look at the "density" by simply binning the values (on the $y$ axis); a simple way to do this is to plot the histogram with the `Plotly` library.

In [ ]:
CustomPlotlyChart(yAxisForZ, dataOptions="{type: 'histogram'}", dataSources="{x: '_2'}")

Another good way to look at a random variable is to use the `BoxPlot` which shows the _quantiles_ and extreme values.

In [ ]:
CustomPlotlyChart(yAxisForZ, 
                  dataOptions="{type: 'box'}",
                  dataSources="{y: '_2'}")

As we can see in the above plot, the distribution looks like a "bell" curve where most values are around the center, here $0$.

We'll see in other materials these concept in more details.

Now, we can create a function that will generate a value for $Y$ following the model.

In [ ]:
val generateOneY = (x: Double) => {
  x*x + math.abs(generatorModelZ.generateRandomness) // X^2 + Z
}

## Generate the universe (1000 items)

An universe is actually **all data**, we never have access to that.

In [ ]:
val collection = xAxis.map(i => {
                   val x = generateOneX().toDouble
                   val y = generateOneY(x).toDouble
                   (x, y)
  })
LineChart(collection.toList, maxPoints=collection.size)

Since we'll be using Spark for further investigation with this data, we'll Sparkify the created dataset, `collection`.

In [ ]:
val universe = sparkContext.parallelize( collection )
universe.cache()
universe.count

# Create the model generators

Create some functions (formulas) to be considered as models. 

That is, they are trying to model the real universe, we know that the real world is actually following a second order distribution, however here we create four models, from order `0` (constant) to to third order.

* $y=intercept$
* $y=intercept + ax$
* $y=intercept + ax + bx^2$
* $y=intercept + ax + bx^2 + cx^3$

In [ ]:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

We define a type for the function that generates the values.

In [ ]:
type Gen=((Double,Double))=>LabeledPoint

**Definition** of the generator for: $y=intercept$

In [ ]:
val toPoints0:Gen = { case (x: Double, y:Double) => LabeledPoint(y, Vectors.dense(Array(1.0))) }

**Definition** of the generator for: $y=intercept + ax$

In [ ]:
val toPoints1:Gen = { case (x: Double, y:Double) => LabeledPoint(y, Vectors.dense(Array(x, 1.0))) }

**Definition** of the generator for: $y=intercept + ax + bx^2$

In [ ]:
val toPoints2:Gen = { case (x: Double, y:Double) => LabeledPoint(y, Vectors.dense(Array(x*x, x, 1.0))) }

**Definition** of the generator for: $y=intercept + ax + bx^2 + cx^3$

In [ ]:
val toPoints3:Gen = { case (x: Double, y:Double) => LabeledPoint(y, Vectors.dense(Array(x*x*x, x*x, x, 1.0))) }

# Example: simple linear regression on one sample

## Take a sample of this universe (1% items)

In [ ]:
val sample = () => universe.sample(true, 0.01)

Let's create a order `1` model this sample.

In [ ]:
val data = sample().map(toPoints1)
LineChart(data.map(lp => (lp.features(0), lp.label)).collect.toList)

## Order `1` Linear regression

In [ ]:
import org.apache.spark.mllib.optimization.{LBFGS, LeastSquaresGradient, SimpleUpdater}
import org.apache.spark.mllib.regression.LinearRegressionModel

We're now creating a function that will basically train a **Linear Regression** using **Least-squared loss** function on the modeled data.

In [ ]:
val train = (dta: RDD[LabeledPoint]) => {
  val one = dta.first
  val numCorrections = 10
  val convergenceTol = 1e-4
  val maxNumIterations = 100
  val regParam = 0.1
  val initialWeightsWithIntercept = Vectors.dense(new Array[Double](one.features.size))
  
  val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
    dta.map(lp => (lp.label, lp.features)), // create the RDD[(Double, Vector)]
    new LeastSquaresGradient(),             // loss function
    new SimpleUpdater(),
    numCorrections,
    convergenceTol,
    maxNumIterations,
    regParam,
    initialWeightsWithIntercept
  )
  
  new LinearRegressionModel(weightsWithIntercept, 0.0)
}

Now we can run it on the first order modeled data.

In [ ]:
val model = train(data)

## Estimate the error for the `1` order model

This is going to apply the model on the sample data we have, then it computes the squared error on it.

Recall that this dataset is **exactly** the same as for the training part.

First we define the way we'll want to plot the data to "visualize" the performance using the
- features
- label
- predictions

In [ ]:
import org.apache.spark.mllib.linalg.Vector
def plotResults(features:RDD[Vector], labels:RDD[Double], predictions:RDD[Double]) = {
  val featuresData = features.map(f => f.toArray.sum)
  val realData = sparkSession.createDataFrame(featuresData zip labels).withColumn("type", lit("original"))
  val predData = sparkSession.createDataFrame(featuresData zip predictions).withColumn("type", lit("prediction"))
  
  new CustomPlotlyChart((realData union predData).orderBy("_1"),
                        layout="{title: 'Plot real and predicted values'}",
                        dataOptions="""{
                          splitBy: 'type',
                          byTrace: {
                            'real': {
                              mode: 'lines+markers',
                              marker: {
                                color: 'rgb(219, 64, 82)',
                                size: 4
                              }
                            },
                            'pred': {
                              mode: 'markers',
                              line: {
                                color: 'rgb(55, 128, 191)',
                                width: 2
                              }
                            }
                          }
                        }""",
                        dataSources="{x: '_1', y: '_2'}") 
}

Then we can create a function to compute the predictions and **mean squared error**.

In [ ]:
def run(model: LinearRegressionModel, dta: RDD[LabeledPoint]) = new {
  // local copies to avoid serialization problem
  val localModel = model
  val localLabel = dta.map(d => d.label)
  val localFeatures = dta.map(d => d.features)
  
  // we zip the label (true values) with predicted values using the `model.predict` function
  val localPredictions = localModel.predict(localFeatures)
  val valuesAndPreds = localLabel zip localPredictions
    
  // we compute the Mean Square Error
  val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()  
  
  val plot = plotResults(localFeatures, localLabel, localPredictions)
}

In [ ]:
val results0 = run(model, data)

In [ ]:
results0.MSE

In [ ]:
results0.plot

## How does this predict works?

Getting back to the model, we basically trained two parameters:
* $\beta_0$, the interceptor
* $\beta_1$, the weigths vector (slope in the dimensions) 

Hence the prediction will take the features we have seen, the data, and apply a linear inference on it, that is apply $y = \beta_0 + \beta_1 * x$ 

We'll use the intercept and weights below

In [ ]:
List(("intercept", model.intercept), ("weights", model.weights))

So let's take an observation, and look at the features (data) and label (value).

In [ ]:
val sample1 = data.first

In [ ]:
List(("label", sample1.label), ("features", sample1.features))

In the following, we'll apply the linear regression manually by executing the polynomial combination using the
- intercept
- features values `times` the trained weights

$$ 
\begin{aligned}
predict &= intercept + features * weights \\
        &= intercept + \sum_{i}^{} features_i * weights_i
\end{aligned}
$$

In [ ]:
val psum = (sample1.features.toArray zip model.weights.toArray) // list of pairs (feature, weight)
            .map(x => x._1*x._2)                                // list of products (equation term)
            .sum                                                // sum
val predict1 = model.intercept + psum

We can check how far this prediction is from correct label:

In [ ]:
math.abs(sample1.label - predict1)

# Try 0, 1st, 2nd & 3rd order functions on 1% universe samples

## Simulations

Now we will run a lot of times the models on training data but also keep some data for validation testing later, that are usually named testing datasets.

So that we are simulating cases where we have a lot of samples that where either taken from the same dataset, or have been taken at different time.

In [ ]:
case class ModelsMSE(mses: Array[(notebook.front.Widget, Double)])

val errors = (1 to 20).map{ i =>
                            
  val data = universe.sample(true, 0.01)                            
  val data0 = data.map(toPoints0)
  val data1 = data.map(toPoints1)
  val data2 = data.map(toPoints2)
  val data3 = data.map(toPoints3)
  
  val test = universe
  val test0 = test.map(toPoints0)
  val test1 = test.map(toPoints1)
  val test2 = test.map(toPoints2)
  val test3 = test.map(toPoints3)
              
  val model0 = train(data0)
  val model1 = train(data1)
  val model2 = train(data2)
  val model3 = train(data3)

  new {
    val run0 = (run(model0, data0), run(model0, test0))
    val run1 = (run(model1, data1), run(model1, test1))
    val run2 = (run(model2, data2), run(model2, test2))
    val run3 = (run(model3, data3), run(model3, test3))
  }
}

Let's take a look at **one** of these runs to see how trainings and tests predictions are comparable

In [ ]:
val picked = scala.util.Random.shuffle(errors).head

<div>
<h2>Picked run</h2>
<h3>Order 0</h3>
<h4>Training set: mse {picked.run0._1.MSE}</h4>
{picked.run0._1.plot}
<h4>Test set: mse {picked.run0._2.MSE}</h4>
{picked.run0._2.plot}

<h3>Order 1</h3>
<h4>Training set: mse {picked.run1._1.MSE}</h4>
{picked.run1._1.plot}
<h4>Test set: mse {picked.run1._2.MSE}</h4>
{picked.run1._2.plot}

<h3>Order 2</h3>
<h4>Training set: mse {picked.run2._1.MSE}</h4>
{picked.run2._1.plot}
<h4>Test set: mse {picked.run2._2.MSE}</h4>
{picked.run2._2.plot}

<h3>Order 3</h3>
<h4>Training set: mse {picked.run3._1.MSE}</h4>
{picked.run3._1.plot}
<h4>Test set: mse {picked.run3._2.MSE}</h4>
{picked.run3._2.plot}
</div>


## Comparing the models

In the following, we'll be using the 20 trainings and testing we've done to check which model won.

For this, we'll count the number of times a specific order worked better than the others, for both the training set and the testing set.

In [ ]:
class ErrorSummary(order: Int, var bestCount: Int, var sse: Double, var count: Int) {
  def addBest(se: Double) = {
    bestCount = bestCount+1
    add(se)
  }
  def add(se: Double) = {
    sse       = sse+se
    count     = count+1
  }
  
  override def toString() = s"$order, $bestCount, $sse, $count"
}

In [ ]:
val zeros = List.tabulate(4)(i => Error(i, 0, 0.0, 0))
val errorTests = Map(
  0 -> new ErrorSummary(0, 0, 0.0, 0),
  1 -> new ErrorSummary(1, 0, 0.0, 0),
  2 -> new ErrorSummary(2, 0, 0.0, 0),
  3 -> new ErrorSummary(3, 0, 0.0, 0)
)
val errorsTrainings = Map(
  0 -> new ErrorSummary(0, 0, 0.0, 0),
  1 -> new ErrorSummary(1, 0, 0.0, 0),
  2 -> new ErrorSummary(2, 0, 0.0, 0),
  3 -> new ErrorSummary(3, 0, 0.0, 0)
)

In [ ]:
errors.foreach { error =>
  val runs = List(error.run0, error.run1, error.run2, error.run3)
  val (trains, tests) = {
    val (trains, tests) = runs.unzip
    (trains.zipWithIndex, tests.zipWithIndex)
  }
  val bestTrain = trains.minBy(_._1.MSE)
  val bestTest  = tests.minBy(_._1.MSE)
  
  (0 until 4).foreach { i =>
    if (i == bestTrain._2) {
      errorsTrainings(i).addBest(bestTrain._1.MSE)
    } else {
      errorsTrainings(i).add(runs(i)._1.MSE)
    }
    if (i == bestTest._2) {
      errorTests(i).addBest(bestTest._1.MSE)
    } else {
      errorTests(i).add(runs(i)._2.MSE)
    }
  }
}

In [ ]:
errorTests

In [ ]:
table(
  5,
  Seq.tabulate(4){i=>
    Seq(
      text(""+i), text(""+{errorTests(i).bestCount}), text(""+(errorTests(i).sse/errorTests(i).count)), 
      text(""+errorsTrainings(i).bestCount), text(""+(errorsTrainings(i).sse/errorsTrainings(i).count))
    )
  }.flatten,
  Seq(text("Model Order"), text("# Best model"), text("Mean Squared Error"), text("Training best model"), text("Training MSE"))
)

As we can see, the **second order** model is more often picked on the **test** data sets, which is great because we **know** (we're lucky) that the **Universe** is actually quadratic.

However, if we look at the **training** data set best models, we can see that the **third** order is/should **always** be the best. We is countersense with our knwowledge of the Universe.
This is simply because, the $R^2$ can only be reduced by adding new component into the model.

That's also why we be better use some regularization to choose the model than relying on a single metric.

But that's a different story!