<a id="open-existing-db"></a>
###  Import the correct libraries

In [23]:
//import libraries
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}
import org.apache.spark.sql.{SQLContext, SparkSession, Row}
import org.apache.spark.SparkFiles

import org.apache.spark.ml.feature.{StringIndexer, IndexToString, VectorIndexer, VectorAssembler}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.classification.{LogisticRegression, DecisionTreeClassifier}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.ibm.transformers.RenameColumn

import com.ibm.analytics.ngp.repository._
import com.ibm.analytics.ngp.ingest.Sampling
import com.ibm.analytics.ngp.util._
import com.ibm.analytics.ngp.pipeline.evaluate.{Evaluator,MLProblemType}

<a id="open-existing-db"></a>
###  Open the IBM Db2 Event store database

In [1]:
import com.ibm.event.oltp.EventContext
import com.ibm.event.common.ConfigurationReader
ConfigurationReader.setConnectionEndpoints("173.19.0.1:1100")
val eContext = EventContext.getEventContext("TESTDB")

In [2]:
spark.sparkContext.version

2.1.0

<a id="validate-db"></a>
###  Validate that the table have been created

In [3]:
val raw_weather_data = eContext.getTable("raw_weather_data")
val sky_condition_lookup = eContext.getTable("sky_condition_lookup")
val monthly_aggregate_precip = eContext.getTable("monthly_aggregate_precip")
val monthly_aggregate_windspeed = eContext.getTable("monthly_aggregate_windspeed")
val monthly_aggregate_pressure = eContext.getTable("monthly_aggregate_pressure")
val monthly_aggregate_temperature = eContext.getTable("monthly_aggregate_temperature")
val daily_aggregate_precip = eContext.getTable("daily_aggregate_precip")
val daily_aggregate_windspeed = eContext.getTable("daily_aggregate_windspeed")
val daily_aggregate_pressure = eContext.getTable("daily_aggregate_pressure")
val daily_aggregate_temperature = eContext.getTable("daily_aggregate_temperature")
val daily_predicted_temperature = eContext.getTable("daily_predicted_temperature")

<a id="create-sqlContext"></a>
### Create the IBM Db2 EventSession

In [5]:
import java.io.File
import com.ibm.event.oltp.EventContext
import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.spark._
import org.apache.spark.sql.ibm.event.EventSession

val sqlContext = new EventSession(spark.sparkContext, "TESTDB")

<a id="prepare-DataFrame"></a>
### Prepare a DataFrame for the query 
The following API provides a DataFrame that holds the query results on the IBM Db2 Event Store table. 

In [8]:
val dfDailyTemp = sqlContext.loadEventTable("daily_aggregate_temperature")

Name: Unknown Error
Message: lastException: Throwable = null
<console>:38: error: value loadEventTable is not a member of org.apache.spark.sql.SQLContext
       val dfDailyTemp = sqlContext.loadEventTable("daily_aggregate_temperature")
                                    ^
StackTrace: 

In [7]:
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
// Add data asset from file system
val dfDailyTemp = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").option("mode", "DROPMALFORMED").csv(sys.env("DSX_PROJECT_DIR")+"/datasets/daily_aggregate_temperature.csv")
dfDailyTemp.show(5)

Name: java.util.NoSuchElementException
Message: key not found: DSX_PROJECT_DIR
StackTrace:   at scala.collection.MapLike$class.default(MapLike.scala:228)
  at scala.collection.AbstractMap.default(Map.scala:59)
  at scala.collection.MapLike$class.apply(MapLike.scala:141)
  at scala.collection.AbstractMap.apply(Map.scala:59)

In [9]:
dfDailyTemp.printSchema()

root
 |-- wsid: long (nullable = false)
 |-- year: integer (nullable = false)
 |-- month: integer (nullable = false)
 |-- day: integer (nullable = false)
 |-- ts: long (nullable = false)
 |-- high: double (nullable = false)
 |-- low: double (nullable = false)
 |-- mean: double (nullable = false)
 |-- variance: double (nullable = false)
 |-- stdev: double (nullable = false)



In [10]:
dfDailyTemp.count()

17

In [11]:
dfDailyTemp.show(5)

+-----------+----+-----+---+-------------+----+---+------------------+-----------------+------------------+
|       wsid|year|month|day|           ts|high|low|              mean|         variance|             stdev|
+-----------+----+-----+---+-------------+----+---+------------------+-----------------+------------------+
|72494023234|2011|    8|  1|1314860400657|12.2|0.0| 8.054166666666667|9.104982638888883|3.0174463771356206|
|72494023234|2011|    8|  2|1314946800663|11.7|0.0|7.9458333333333355|7.534982638888888|2.7449922839397725|
|72494023234|2011|    8|  3|1315033200126|14.4|0.0|10.991666666666665|6.780763888888887|2.6039899940070597|
|72494023234|2011|    8|  4|1315119600128|15.0|0.0|11.916666666666668|8.223055555555556|2.8675870615476624|
|72494023234|2011|    8|  5|1315206000114|12.2|0.0| 9.679166666666665|5.934982638888889| 2.436181979838306|
+-----------+----+-----+---+-------------+----+---+------------------+-----------------+------------------+
only showing top 5 rows



In [12]:
val weatherStations = dfDailyTemp.select("wsid").distinct.collect.flatMap(_.toSeq)

In [13]:
import sqlContext.implicits._
val weatherStationsArray = weatherStations.map(ws => dfDailyTemp.where($"wsid" <=> ws))

In [15]:
import org.apache.spark.sql.functions.round
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.functions.col 
import play.api.libs.json._
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.feature.{VectorAssembler}
import org.apache.spark.ml.{Model, Pipeline, PipelineStage, PipelineModel}

System.out.println(weatherStationsArray.length)
// for (weatherStation <- weatherStationsArray) {

    val weatherStationID = weatherStations(0)
    System.out.println(s"""Weather Station ID is ${weatherStationID}""")
    
    val w = org.apache.spark.sql.expressions.Window.orderBy("year", "month", "day")  
    val dfTrain = dfDailyTemp.withColumn("day-1", lag(col("mean"), 1, null).over(w)).
        withColumn("day-2", lag(col("mean"), 2, null).over(w)).
        withColumn("day-3", lag(col("mean"), 3, null).over(w))

    dfTrain.select("mean", "day-1", "day-2", "day-3").show()

    val dfTrain2 = dfTrain.withColumn("day-1", round(col("day-1"), 1)).
        withColumn("day-2", round(col("day-2"), 1)).
        withColumn("day-3", round(col("day-3"), 1))

    val dfTrain3 = dfTrain2.na.drop()

    dfTrain3.select("day-1", "day-2", "day-3").show()
    
    val splits = dfTrain3.randomSplit(Array(0.8, 0.20), seed = 24L)
    val training_data = splits(0)
    val test_data = splits(1)
    
    val features_assembler = new VectorAssembler().
        setInputCols(Array("day-1", "day-2", "day-3")).
        setOutputCol("features")
    
    val linearRegression = new LinearRegression().
        setMaxIter(10).
        setRegParam(0.3).
        setElasticNetParam(0.8).
        setLabelCol("mean").
        setFeaturesCol("features")
    
    val pipeline = new Pipeline().setStages(Array(features_assembler, linearRegression))

    val linearRegressionModel = pipeline.fit(training_data)

    val predictions = linearRegressionModel.transform(test_data)
    predictions.select("prediction").show()


1
Weather Station ID is 72494023234
+------------------+------------------+------------------+------------------+
|              mean|             day-1|             day-2|             day-3|
+------------------+------------------+------------------+------------------+
| 8.054166666666667|              null|              null|              null|
| 8.054166666666667| 8.054166666666667|              null|              null|
|7.9458333333333355| 8.054166666666667| 8.054166666666667|              null|
|7.9458333333333355|7.9458333333333355| 8.054166666666667| 8.054166666666667|
|10.991666666666665|7.9458333333333355|7.9458333333333355| 8.054166666666667|
|10.991666666666665|10.991666666666665|7.9458333333333355|7.9458333333333355|
|11.916666666666668|10.991666666666665|10.991666666666665|7.9458333333333355|
|11.916666666666668|11.916666666666668|10.991666666666665|10.991666666666665|
| 9.679166666666665|11.916666666666668|11.916666666666668|10.991666666666665|
| 9.679166666666665| 9.67916

In [16]:
%AddJar http://repo1.maven.org/maven2/org/jpmml/pmml-model/1.4.6/pmml-model-1.4.6.jar -f


Starting download from http://repo1.maven.org/maven2/org/jpmml/pmml-model/1.4.6/pmml-model-1.4.6.jar
Finished download of pmml-model-1.4.6.jar


In [17]:
%AddJar http://central.maven.org/maven2/org/jpmml/jpmml-sparkml/1.4.6/jpmml-sparkml-1.4.6.jar -f

Starting download from http://central.maven.org/maven2/org/jpmml/jpmml-sparkml/1.4.6/jpmml-sparkml-1.4.6.jar
Finished download of jpmml-sparkml-1.4.6.jar


In [19]:
import org.dmg.pmml.PMML
import org.jpmml.sparkml.PMMLBuilder

PMML pmml = new PMMLBuilder(training_data.schema(), linearRegressionModel).build();

// Viewing the result
JAXBUtil.marshalPMML(pmml, new StreamResult(System.out));

Name: Unknown Error
Message: <console>:87: error: value pmml is not a member of object org.dmg.pmml.PMML
val $ires8 = PMML.pmml
                  ^
<console>:85: error: value pmml is not a member of object org.dmg.pmml.PMML
       PMML pmml = new PMMLBuilder(training_data.schema(), linearRegressionModel).build();
            ^
StackTrace: 

<hr>
Copyright &copy; IBM Corp. 2018. Released as licensed Sample Materials.