###Imports
        

In [32]:
import org.apache.commons.io.IOUtils
import java.net.URL
import java.nio.charset.Charset
import com.databricks.spark.csv
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,DoubleType,DateType,TimestampType};
import scala.util.matching.Regex
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType,StructField,StringType,DoubleType};
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{udf,col,concat,lit,when,ceil}
import org.apache.spark.sql._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.StandardScaler


###Use A Custom Schema To Control Types


In [33]:
// This line reads in the file and parses it with a CSV reader
val sqlContext = new SQLContext(sc)
// Need this for my shorthand $ notation
import sqlContext.implicits._  

// For some reason, the reader below doesn't infer the types properly (all strings) .  Forcing the types here
val customSchema = StructType(Array(
    StructField("scorea", IntegerType, true),
    StructField("scoreb", IntegerType, true),
    StructField("timeleft", DoubleType, true),
    StructField("teama", StringType, true),
    StructField("teamb", StringType, true),
    StructField("scorea-scoreb", IntegerType, true),
    StructField("scoreb-scorea", IntegerType, true),
    StructField("pct-complete", DoubleType, true),
    StructField("pct-left", DoubleType, true),
    StructField("cf1", DoubleType, true),
    StructField("cf2", DoubleType, true),
    StructField("teamaspread", DoubleType, true),
    StructField("overunder", DoubleType, true),
    StructField("teambspread", DoubleType, true),
    StructField("teama_vegas_fscore", DoubleType, true),
    StructField("teamb_vegas_fscore", DoubleType, true),
    StructField("key", StringType, true),
    StructField("fscorea", DoubleType, true),
    StructField("fscoreb", DoubleType, true),
    StructField("fscorea-fscoreb", IntegerType, true),
    StructField("fscoreb-fscorea", IntegerType, true),
    StructField("away-win", DoubleType, true),
    StructField("home-win", DoubleType, true),
    StructField("teama_adj_fscore", DoubleType, true),
    StructField("teamb_adj_fscore", DoubleType, true),
    StructField("pfscoreb-pfscorea", DoubleType, true)
))

###Read In CSV File For Logistic Regression


In [34]:
val oddsfile = "swift://notebooks.spark/nba-datawrangle-lrDF.csv"
var logisticDF = sqlContext.read.
    format("com.databricks.spark.csv").
    option("header", "true"). // Use first line of all files as header
    option("inferSchema", "false"). // Automatically infer data types)
    option("nullValue", "empty").
    option("dateFormat", "yyyy-MM-dd").
    schema(customSchema).
    load(oddsfile)
    
logisticDF = logisticDF.withColumn("cf3", $"pct-left"*$"teamaspread" / 100).
                        withColumn("cf4", $"scoreb-scorea"*$"scoreb-scorea"*$"scoreb-scorea")


###Inspect the Data

In [35]:
////////////////////////
// Here make sure the data is read in properly
//logisticDF.filter($"timeleft" < 10).show(50)
logisticDF.describe()

###Function to Create the Model and Train it and Test it

In [36]:
/////////////////////////////////////////////////////////////
//  Rather than use an ML Pipleline, I created this function so 
// that I could extract/print some intermediate results and
// debug the model.  

def trainAndTest( indf : org.apache.spark.sql.DataFrame, featureAryString : Array[String] ) : (org.apache.spark.sql.DataFrame, org.apache.spark.ml.classification.LogisticRegressionModel, Array[Double])  = {
    println("*************************************************")
    
    /////////////////////////////
    // Split the data into training and test sets
    
    val splits = indf.randomSplit(Array(0.7,0.3,0.0), seed = 25L)
    val trainingdf = splits(0).cache()
    val testdf = splits(1).cache()
    
    println("Tranining Samples = " + trainingdf.count())
    println("Test      Samples = " + testdf.count())
    

     /////////////////////////////
     // use a vector assembler to build features for 
     // logistic model
     val assembler = new VectorAssembler()
       .setInputCols(featureAryString)
       .setOutputCol("featuresRaw")

     val trainingdf2  =  assembler.transform(trainingdf)
     val testdf2      =  assembler.transform(testdf)

     /////////////////////////////
     // Standardize the data 
     // logistic model
    val scaler = new StandardScaler()
     .setInputCol("featuresRaw")
     .setOutputCol("features")
     .setWithStd(false)
     .setWithMean(false)
    
    val scalerModel = scaler.fit(trainingdf2)
    // Normalize each feature to have unit standard deviation.
    val trainingdf3 = scalerModel.transform(trainingdf2)
    val testdf3 = scalerModel.transform(testdf2)

   
    /////////////////////////////
    //Logistic Regression Model Setup
    // Setup some of the configurations for the Logistic regression model ..
    val lr = new LogisticRegression()
      .setMaxIter(25)
      .setRegParam(0.0001)
      .setElasticNetParam(0.0)
      .setLabelCol("home-win")

    // Fit the model
    val lrModel   = lr.fit(trainingdf3)

    println("Reg Parameter:    =" + lrModel.getRegParam)
    println("lrModel.intercept = " + lrModel.intercept)
    println("lrModel.weights   = " + lrModel.weights)

    // Save the model for later use ....
    // Argh ! -> in 1.6.1 api, but not 1.5.2 :(  
    // lrModel.save("modelPath" )
    
    ////  Create a logistic regression summary object ////
    //val lrSummary = lrModel.summary
    //println("lrSummary.objectiveHistory = " + lrSummary.objectiveHistory.length)
    //println(lrSummary.objectiveHistory.deep.mkString("\n"))
    ////
    
     /////////////////////////////
     //Generate Predictions
     // transform is now used in lieu of predict from mllib.  Found this after studying the API for a while
    val trn_predictions = lrModel.transform(trainingdf3)
            .withColumn("correct", ($"home-win" === $"prediction"))
            .withColumn("pct-comp-ceil", ceil($"pct-complete"))

    val predictions = lrModel.transform(testdf3)
            .withColumn("correct", ($"home-win" === $"prediction"))
            .withColumn("pct-comp-ceil", ceil($"pct-complete"))

     /////////////////////////////
     //Evaluate Predictions and Print results
    
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("home-win")
      .setPredictionCol("prediction")
      .setMetricName("f1")  // recall / precision also options
    
    val trn_tot_f1 = evaluator.evaluate(trn_predictions)
    val tst_tot_f1 = evaluator.evaluate(predictions)
    val f1q1 = evaluator.evaluate(predictions.filter($"pct-complete" < 25))
    val f1q2 = evaluator.evaluate(predictions.filter($"pct-complete" > 25 && $"pct-complete" < 50))
    val f1q3 = evaluator.evaluate(predictions.filter($"pct-complete" > 50 && $"pct-complete" < 75))
    val f1q4 = evaluator.evaluate(predictions.filter($"pct-complete" > 75))
    
    println("Total Train f1 = " + (trn_tot_f1))
    println("Total Test  f1 = " + (tst_tot_f1))
    
    println("Q1 Test f1 = " + (f1q1))
    println("Q2 Test f1 = " + (f1q2))
    println("Q3 Test f1 = " + (f1q3))
    println("Q4 Test f1 = " + (f1q4))

    val f1Ary = Array(f1q1,f1q2,f1q3,f1q4,tst_tot_f1)

    // return the 
    (predictions,lrModel,f1Ary)
}

###Test and Train multiple models

In [37]:
/////////////////////////////
// Evaluate 3 different Models

val (prediction_0, model_0, f1m0)   = trainAndTest(logisticDF, Array("scoreb-scorea" ))
val (prediction_1, model_1, f1m1)   = trainAndTest(logisticDF, Array("scoreb-scorea",  "teamaspread" ))
val (prediction_2, model_2, f1m2)   = trainAndTest(logisticDF, Array("scoreb-scorea",  "teamaspread", "cf1", "cf2", "cf3" ))



*************************************************
Tranining Samples = 9169
Test      Samples = 4048
Reg Parameter:    =1.0E-4
lrModel.intercept = 0.41260820890900873
lrModel.weights   = [0.18877505797762903]
Total Train f1 = 0.7876734627050461
Total Test  f1 = 0.7645516405512659
Q1 Test f1 = 0.6652233914093495
Q2 Test f1 = 0.6878924759503559
Q3 Test f1 = 0.7840732373488812
Q4 Test f1 = 0.9144859475411855
*************************************************
Tranining Samples = 9169
Test      Samples = 4048
Reg Parameter:    =1.0E-4
lrModel.intercept = 0.17507717013282026
lrModel.weights   = [0.16674937572604415,0.11411501437620475]
Total Train f1 = 0.8198671063813032
Total Test  f1 = 0.8169695272693457
Q1 Test f1 = 0.7453483277756455
Q2 Test f1 = 0.7747614873259977
Q3 Test f1 = 0.8370955032805536
Q4 Test f1 = 0.907169459984065
*************************************************
Tranining Samples = 9169
Test      Samples = 4048
Reg Parameter:    =1.0E-4
lrModel.intercept = 0.17183996760200526

###Examine F1 scores from the models

In [38]:
////////////////////////////////////////////
// F1 score is a metric used to evaluate different models
// it runs on a scale from 0 to 1 with the larger value
// meaning the model performs better
// F1 score is a combination of precision / recall and
// helps in situations where outcomes are highly skewed 
// in one direction.  eg 95% samples are wins, 5% are losses
// in that example, i could make a model that blindly predicts win
// every time and I would be 95% correct... F1 adjusts for this fact
// and would penalize me for the false negatives


// Build a small dataframe to hold my F1 scores....
val lblRdd = sc.parallelize( List("model0","model1","model2"),2)
val errRdd = sc.parallelize( Array(f1m0,f1m1,f1m2),2)
case class errData(label: String, q1: Double,q2: Double,q3: Double,q4: Double, tot:Double)
val test = lblRdd.zip(errRdd)

val errDf = test.map({ 
  case (lbl: String, Array(q1: Double,q2: Double,q3: Double,q4: Double, tot:Double)) => errData(lbl,q1,q2,q3,q4,tot)
}).toDF("model", "Q1","Q2","Q3","q4","total")

errDf.show()
//test.take(3)

//going from model0 -> model1 yields a decent improvement, but after that the improvement is marginally better with the extra terms

###Lets take a look at some of the Errors to see if there is any pattern

In [39]:
prediction_2.filter($"correct" === false).select('teama,'scorea,'teamb,'scoreb,$"timeleft",$"pct-comp-ceil",'teamaspread,'fscorea,'fscoreb,'probability,$"home-win",$"fscoreb-fscorea",'prediction).
  filter($"fscoreb-fscorea" < 4 && $"fscoreb-fscorea" > -4).show(20)

prediction_2.select('teama,'scorea,'teamb,'scoreb,$"timeleft",$"pct-comp-ceil",'teamaspread,'fscorea,'fscoreb,'probability,$"home-win",'prediction).show(5)

//val (prediction_4, model_4, f1m4)   = trainAndTest(logisticDF, Array("scoreb-scorea",  "teamaspread", "cf1", "cf2", "cf3","cf4" ))

// Some errors due to 
//   early in game ....
//   close scores at the end
//   some games the spread strongly effects game at the end... mabye scale that somehow by time left ?
//   teams that had an early lead, even though not favored did end up winning.  maybe add a scorediff^2

+-----+------+-----+------+-------------+-------------+-----------+-------+-------+--------------------+--------+---------------+----------+
|teama|scorea|teamb|scoreb|     timeleft|pct-comp-ceil|teamaspread|fscorea|fscoreb|         probability|home-win|fscoreb-fscorea|prediction|
+-----+------+-----+------+-------------+-------------+-----------+-------+-------+--------------------+--------+---------------+----------+
|  hou|    77|  dal|    73|10.4833333333|           79|       -1.0|   86.0|   88.0|[0.68507285301836...|     1.0|              2|       0.0|
|  hou|    71|  dal|    70|12.2166666667|           75|       -1.0|   86.0|   88.0|[0.52397604298064...|     1.0|              2|       0.0|
|  hou|    83|  dal|    82|         5.05|           90|       -1.0|   86.0|   88.0|[0.55029109825172...|     1.0|              2|       0.0|
|  cle|    31|  chi|    22|         37.6|           22|       -7.5|  102.0|  105.0|[0.86593862015105...|     1.0|              3|       0.0|
|  hou|    83

Name: Syntax Error.
Message: 
StackTrace: 

###Plot Correct / Incorrect predictions as a function of pct-Complete (REMOVE)

###Logistic Analysis And Explanation

    Complex Model 2 Discussion
    When the logistic regression model is trained, the weights corresponding to each feature are optimized to minimize the error of the predictions.  Below
    are the weight from the final model that was trained with 5 features.

               scoreb-scorea         teamaspread          custom_feature_1    custom_feature_2      custom_feature_3
    weights =  [-0.034776110152298007,0.022592109761735337,0.24208359305268817,4.9571665391790966E-5,0.16204385518333322]

    Interpretting weights can be tricky, especially if input features are functions of each other (ie, if one feature changes, it implies another feature changes)
    
    Lets look at just the away team spread, as this feature is not a function of any other feature.
    
    The away spread weight is 0.02259.  If the spread increases by 1, then the probability of the away team winning is 
    
    
$$e^{0.02259} = 1.023$$ 
    
    This means that there is a 2.3% relative increase in the probablity the home team will win for every one point change in away team spread.
    

###Function to predict new examples (Requires debug, potentially REMOVE)

In [40]:
import org.apache.spark.sql._
// These is a helper function that converts an 'Any(Int)' type to a Double or Any(Double) to a Double
val ai2d : (Any => Double) = (in:Any) => in.asInstanceOf[java.lang.Integer].doubleValue
val ad2d : (Any => Double) = (in:Any) => in.asInstanceOf[java.lang.Double]

////////////////////////////////////////////////////////////
// These were a couple custom UDF's I needed to cleanse the data 
// and also to add a few features based on a proprietary way of combining
// the score with the time left.

// Date Logic to adjust for games that finish on the day after ....
// This is due to not having a great key to join my tables ...
val datecrossregex = new Regex("^0[0-3]")
val dateadjust : ((String, String) => String) = (datein, tsin ) => {
    val datetest =  datecrossregex.findFirstIn(tsin)
    val dateary = datein.split("-")
    val rv = datetest match {
      case Some(s) => { 
         val day = "%02d".format(dateary(2).toInt -1)
         val newdate = dateary(0) + "-" + dateary(1) + "-" + day  
         newdate
      }
      case None => datein
    }
    rv.asInstanceOf[String]
}
val dateadjustudf = udf(dateadjust)

// UDFs to create some extra features ... this one is for an experiemental combination of Time left and Score difference.  
// Made this via intuition.  This can be extended to add other custom features
//val crossOverTime = 8
//val exponentScaler = 0.5
val scoredivtimeXform: ((Double,Double,Double,Double) => Double) = (sd:Double, tl:Double, co:Double, exp:Double) => {
    val scaler = 1 / Math.pow( (tl / co) + 0.01 , exp)
    sd * scaler
}
val scoredivtimeUdf = udf(scoredivtimeXform)


def getPrediction_model2(teama : String  , scorea : Int , teamb : String , scoreb : Int,  timeleft : Double, teamaspread : Double, model : org.apache.spark.ml.classification.LogisticRegressionModel) : Unit = {

  //model2 features = "scoreb-scorea",  "teamaspread", "cf1", "cf2", "cf3"
  val sd = scoreb-scorea
  val ts = teamaspread
  val pctleft = timeleft / 48.0;
  
  val cf1 = scoredivtimeXform(sd.toDouble, pctleft.toDouble, 25.0, 0.5)
  val cf2 = scoredivtimeXform(sd.toDouble, pctleft.toDouble, 2.0, 1.3)
  val cf3 = pctleft*ts/100;
  val lp = LabeledPoint(0.0,Vectors.dense(sd,teamaspread,cf1,cf2,cf3))

  val single = Seq(lp).toDF("label","features")
  val result = model.transform(single)
  val rv = result.select($"prediction").head(1)
  val prb = result.select($"probability").head(1)

  val rv1 = ad2d(rv(0)(0))
  val prb1 = prb(0)(0)
  //println("dbg : sdt =" + sdt)
  println(teama + "(away) vs " + teamb + "(home)")
  println("Spread(HomeTeam) : " + teamaspread + " (+ means home team is not favored)")
  println("Time Left        : " + timeleft)
  val winner = {if (rv1 == 1) teamb else teama}
  println("Predicted Winner : " + winner + " Probablity : " + prb1)
}


###Simple Predictor based on a new Example ....

In [41]:
// Arguments : Home Team, Home Team Score, Home Team, Home Team Score, Timeleft, Away Team Spread, Model)
getPrediction_model2("lac",88, "por", 96, 20.0, -8.0, model_2)

lac(away) vs por(home)
Spread(HomeTeam) : -8.0 (+ means home team is not favored)
Time Left        : 20.0
Predicted Winner : por Probablity : [9.44528374407261E-6,0.999990554716256]


### Now lets implement Model2 in our Node.js web app!  See README for more details.