In [0]:
// Training Model
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType

// Schema of the training tweet JSON records. It is passed to the reader below so Spark
// does not need a separate inference pass over the files, and every field is read as a
// string (the timestamp string is parsed explicitly afterwards).
// NOTE(review): assumes the JSON keys match these field names exactly — confirm against the data.
val tweetSchema = new StructType()
  .add("tweetId", "string")
  .add("tweetText", "string")
  .add("location", "string")
  .add("timestamp", "string")

val spark = SparkSession
  .builder()
  .appName("StreamHandler")
  .config("spark.master", "local")
  .getOrCreate()

// `$"col"` syntax and the SQL functions used below (to_date, unix_timestamp, ...).
// The notebook interpreter likely pre-imports these; importing them here keeps the
// paragraph runnable outside the notebook as well.
import spark.implicits._
import org.apache.spark.sql.functions._

// Directory holding the training tweets as JSON.
val traingDataSet = "/home/opt/data/training/"

// One row per tweet: its location and the calendar day it was posted.
// The timestamp strings are parsed with "EEE MMM dd HH:mm:ss Z yyyy"
// (presumably Twitter's created_at format, e.g. "Mon Feb 08 10:15:00 +0000 2016").
// NOTE(review): Spark 3's default datetime parser rejects 'E' in parse patterns; on
// Spark 3.x this may require spark.sql.legacy.timeParserPolicy=LEGACY — confirm.
val trainingDS = spark.read
  .schema(tweetSchema)
  .json(traingDataSet)
  .select(
    $"location",
    to_date(unix_timestamp($"timestamp", "EEE MMM dd HH:mm:ss Z yyyy").cast("timestamp")).as("timestamp"))

// Number of tweets per (location, day); registered as `trainingTable` for the SQL
// paragraphs and the feature queries.
val trainingCountsDF = trainingDS
  .groupBy($"location", $"timestamp")
  .count()

trainingCountsDF.createOrReplaceTempView("trainingTable")

trainingCountsDF.show
// shows counts per location per day
// |   California|2016-02-08|    2|
// |     New York|2016-02-08|    2|
// |   California|2016-02-07|    2|


%sql
-- Average daily tweet count per location for each week of the year, highest first.
-- Days without tweets have no row in trainingTable, so the average is taken over the
-- days on which the location had at least one tweet.
-- NOTE(review): weekofyear() drops the year, so the same week number from different
-- years is merged into one group.
SELECT
  location,
  weekofyear(timestamp) AS Week,
  AVG(count)            AS weekly_avg
FROM trainingTable
WHERE count > 0
GROUP BY weekofyear(timestamp), location
HAVING count(*) > 0
ORDER BY weekly_avg DESC


%sql
-- Tweet count per location per calendar day, listed in chronological order.
-- trainingTable already holds one row per (location, day), so AVG(count) is that day's
-- count. "yyyy" is the calendar year ("YYYY" is the week-based year, which differs from
-- it around New Year), and sorting on the date column instead of the "dd-MM-yyyy"
-- string keeps the rows in date order rather than day-of-month order.
select location, date_format(timestamp, "dd-MM-yyyy") as time, AVG(count) as count
from trainingTable
where count > 0
group by timestamp, location
having count(*) > 0
order by timestamp

%sql
-- Distinct week-of-year numbers that occur in the training data.
SELECT DISTINCT weekofyear(timestamp) AS Week
FROM trainingTable
WHERE count > 0

/// create the features
import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Dense 1-based id per distinct location (row_number over the locations sorted by name),
// exposed as the `locationRankMap` view so the feature query can join on it.
val locationRankDF = {
  val query = "select row_number() over (order by location) as locRank, location as locName from trainingTable  GROUP BY location"
  sql(query)
}
locationRankDF.createOrReplaceTempView("locationRankMap")
println("Location Rank")

////
/** Binary label for a weekly average: 1.0 when it is strictly above 2.5, otherwise 0.0 (NaN maps to 0.0). */
def calcLabel: (Double => Double) = weeklyAverage =>
  if (weeklyAverage > 2.5) 1.0 else 0.0

// TRAINING DATA SET
// Per location id and week-of-year: the average of that location's daily tweet counts.
// NOTE(review): weekofyear() ignores the year, so the same week number from different
// years falls into one group.
val trngDf1 = sql(s"""
SELECT locRank, avg(count) as weeklyAvg, weekofyear(timestamp) as week
  FROM trainingTable
  JOIN locationRankMap ON location = locName
  GROUP BY weekofyear(timestamp), locRank
  HAVING weeklyAvg > 0
  order by weeklyAvg DESC
  """)

// Spark ML needs numeric columns: cast all three to Double (the column names are kept).
val trngDf2 = trngDf1.select(Seq("locRank", "weeklyAvg", "week").map(name => $"$name".cast("Double")): _*)

println("Training DF 2")
trngDf2.show()

// Binary target derived from weeklyAvg through calcLabel.
// NOTE(review): weeklyAvg is also an input feature of the assembler, so the label is a
// direct function of a feature (target leakage); validation scores will likely look inflated.
val flulabel = udf(calcLabel)

val trngDf3 = trngDf2.withColumn("class", flulabel($"weeklyAvg"))

println("Training DF 3")
trngDf3.show()

// Pack the numeric columns into the single vector column Spark ML expects.
val assembler = new VectorAssembler()
  .setInputCols(Array("locRank", "weeklyAvg", "week"))
  .setOutputCol("features")

val trngDf4 = assembler.transform(trngDf3)

// Use the 0.0/1.0 `class` value directly as the label. StringIndexer assigns indices by
// descending label frequency, so whenever class 1.0 was the majority it would be
// indexed as label 0.0, silently inverting which class the model treats as positive.
val trngDf5 = trngDf4.withColumn("label", $"class")

// Fixed seed so the 70/30 train/validation split is reproducible.
val splitSeed = 5043
val Array(trainingData, validationData) = trngDf5.randomSplit(Array(0.7, 0.3), splitSeed)

trainingData.show

// Label distribution
trainingData.groupBy("class").count().show()
trainingData.groupBy("class").agg(max("weeklyAvg").alias("max")).show()

//////////////////////////////// ///////////////////////////////////// ////////////////////////////////////////

// CREATE THE MODEL
// Earlier setting: setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.0234)
  .setTol(1e-8)
  .setElasticNetParam(0.8)

// Single-stage pipeline: the features and label columns are prepared beforehand.
val pipeline = new Pipeline().setStages(Array(lr))

val model = pipeline.fit(trainingData)
import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.ml.{Pipeline, PipelineModel}
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////

// Dense 1-based id per distinct location (row_number over the locations sorted by name),
// exposed as the `locationRankMap` view for the weekly-sum feature query.
val locationRankDF = {
  val query = "select row_number() over (order by location) as locRank, location as locName from trainingTable  GROUP BY location"
  sql(query)
}

locationRankDF.createOrReplaceTempView("locationRankMap")

println("Location Rank")

//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////

/** Binary label for a week-over-week ratio: 1.0 when it is strictly above 1.1 (more than 10% growth), otherwise 0.0 (NaN maps to 0.0). */
def calcPercentLabel: (Double => Double) = { ratio =>
  if (ratio > 1.1) 1.0 else 0.0
}

// TRAINING DATA SET
// Per location id and week-of-year: the total number of tweets in that week.
// NOTE(review): weekofyear() ignores the year, so the same week number from different
// years falls into one group.
val trngDf1 = sql(s"""
SELECT locRank, sum(count) as weeklySum, weekofyear(timestamp) as week
  FROM trainingTable
  JOIN locationRankMap ON location = locName
  GROUP BY weekofyear(timestamp), locRank
  HAVING weeklySum > 0
  order by weeklySum DESC
  """)

// Double-typed copy of the weekly sums, with weeklySum renamed to weekSum.
val trngDf2 = trngDf1.select(
  $"locRank".cast("Double").as("locRank"),
  $"weeklySum".cast("Double").as("weekSum"),
  $"week".cast("Double").as("week"))

/** Maps 0 (including -0.0) to 1.0 and returns every other value unchanged.
  * NOTE(review): presumably meant as a divisor guard; it is not referenced anywhere in this notebook. */
def zeroToOne: (Double => Double) = { value =>
  if (value != 0) value else 1.0
}

// The same weekly sums with `week` shifted forward by one, so that joining on
// (locRank, week) pairs every week with the sum of the preceding week.
val trngDfPrevWeek = trngDf1.select(
  $"locRank".cast("Double").as("locRank"),
  $"weeklySum".cast("Double").as("weekSumPrev"),
  ($"week".cast("Double") + 1).as("week"))

// Ratio of this week's tweet total to the previous week's. The inner join keeps only
// weeks whose preceding week also has a row; weekSumPrev is never 0 because trngDf1
// keeps only weeks with weeklySum > 0.
// NOTE(review): week numbers do not wrap, so the first week of a year is never paired
// with the last week of the previous year.
val weekOverWeek = trngDf2
  .join(trngDfPrevWeek, Seq("locRank", "week"))
  .select($"locRank", $"week", ($"weekSum" / $"weekSumPrev").as("weekOverWeek"))

println("Week over week df")
weekOverWeek.show()

println("Training DF 2")
trngDf2.show()

// UDF form of calcPercentLabel: class 1.0 when the week-over-week ratio exceeds 1.1.
// NOTE(review): weekOverWeek is also an input feature of the assembler, so the label is a
// direct function of a feature (target leakage); validation scores will likely look inflated.
val flulabel = udf(calcPercentLabel)

val trngDf3 = weekOverWeek.withColumn("class", flulabel($"weekOverWeek"))

println("Training DF 3")
trngDf3.show()

// Pack the numeric columns into the single vector column Spark ML expects.
val assembler = new VectorAssembler()
  .setInputCols(Array("locRank", "weekOverWeek", "week"))
  .setOutputCol("features")

val trngDf4 = assembler.transform(trngDf3)

// Use the 0.0/1.0 `class` value directly as the label. StringIndexer assigns indices by
// descending label frequency, so whenever class 1.0 was the majority it would be indexed
// as label 0.0, inverting which class counts as positive for the model and for the
// true/false positive counts computed during evaluation.
val trngDf5 = trngDf4.withColumn("label", $"class")

// Fixed seed so the 70/30 train/validation split is reproducible.
val splitSeed = 5043
val Array(trainingData, validationData) = trngDf5.randomSplit(Array(0.7, 0.3), splitSeed)

println("training data ....")

trainingData.show(100)

//////////////////////////////// ///////////////////////////////////// ////////////////////////////////////////

// Logistic regression with elastic-net regularization, wrapped in a one-stage pipeline.
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.0234)
  .setTol(1e-8)
  .setElasticNetParam(0.8)

val pipeline = new Pipeline().setStages(Array(lr))

val model = pipeline.fit(trainingData)

println(" $$$$$$ trainingData size >>> " + trainingData.count())

// `model` is a PipelineModel, which has no coefficients/intercept of its own; to print them,
// read the fitted logistic-regression stage from model.stages(0) instead.
//println(s"Coefficients: ${model.coefficients} Intercept: ${model.intercept}")

// Persist the fitted pipeline, replacing any model previously saved at this path.
model.write.overwrite().save("/home/opt/models/lr-model4")

//////
// Score the held-out validation split. Cached because it is counted and filtered
// several times below.
val predictions = model.transform(validationData).cache()

println("$$$$$$ predictions size >>> " + predictions.count())

predictions.show(100)

// NOTE: despite its name, `accuracy` holds the area under the ROC curve reported by the
// evaluator; the fraction of correct predictions is `ratioCorrect` below.
val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")
val accuracy = evaluator.evaluate(predictions)

// Confusion-matrix counts, treating label/prediction 1.0 as the positive class.
val lp = predictions.select("label", "prediction")
val counttotal = predictions.count()
val correct = lp.filter($"label" === $"prediction").count()
val wrong = lp.filter($"label" =!= $"prediction").count()
// True positives must filter on prediction 1.0; filtering on 0.0 counts true negatives.
val truep = lp.filter($"prediction" === 1.0 && $"label" === $"prediction").count()
val trueN = lp.filter($"prediction" === 0.0 && $"label" === $"prediction").count()
val falseN = lp.filter($"prediction" === 0.0 && $"label" =!= $"prediction").count()
val falseP = lp.filter($"prediction" === 1.0 && $"label" =!= $"prediction").count()
val ratioWrong = wrong.toDouble / counttotal.toDouble
val ratioCorrect = correct.toDouble / counttotal.toDouble

// (score, label) pairs for the RDD-based metrics; the score is the rawPrediction entry
// for label 1.0.
val predictionAndLabels = predictions
  .select("rawPrediction", "label")
  .rdd
  .map(row => (row.getAs[org.apache.spark.ml.linalg.Vector](0)(1), row.getDouble(1)))
val metrics = new BinaryClassificationMetrics(predictionAndLabels)
println("area under the precision-recall curve: " + metrics.areaUnderPR)
println("area under the receiver operating characteristic (ROC) curve : " + metrics.areaUnderROC)



// Evaluation
//evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_b7c8fb21061b
//accuracy: Double = 0.989247311827957   (this is the evaluator's areaUnderROC, not accuracy — same value as the ROC line below)

//counttotal: Long = 133
//correct: Long = 128
//wrong: Long = 5
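//truep: Long = 124   (in this run truep was counted on prediction == 0.0, i.e. true negatives; true positives = correct - truep = 4)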
//falseN: Long = 5
//falseP: Long = 0
//ratioWrong: Double = 0.03759398496240601
//ratioCorrect: Double = 0.9624060150375939

//area under the precision-recall curve: 0.8858736171236172
//area under the receiver operating characteristic (ROC) curve : 0.989247311827957