<a id="open-existing-db"></a>
### 1.2 Open an existing database
To work with an existing database, get an `EventContext` for it by name, then fetch a handle to each table with `getTable`:

In [25]:
import com.ibm.event.oltp.EventContext
val eContext = EventContext.getEventContext("KillrWeather")

In [26]:
val raw_weather_data = eContext.getTable("raw_weather_data")
val sky_condition_lookup = eContext.getTable("sky_condition_lookup")
val monthly_aggregate_precip = eContext.getTable("monthly_aggregate_precip")
val monthly_aggregate_windspeed = eContext.getTable("monthly_aggregate_windspeed")
val monthly_aggregate_pressure = eContext.getTable("monthly_aggregate_pressure")
val monthly_aggregate_temperature = eContext.getTable("monthly_aggregate_temperature")
val daily_aggregate_precip = eContext.getTable("daily_aggregate_precip")
val daily_aggregate_windspeed = eContext.getTable("daily_aggregate_windspeed")
val daily_aggregate_pressure = eContext.getTable("daily_aggregate_pressure")
val daily_aggregate_temperature = eContext.getTable("daily_aggregate_temperature")

<a id="query-table"></a>
## 4. Query the table 

<a id="create-sqlContext"></a>
### 4.1 Create sqlContext using EventSession

To run a Spark SQL query, first establish an IBM Db2 Event Store Spark session. Here the session is an `EventSession` bound to the `KillrWeather` database and held in the variable `sqlContext`.

In [27]:
import java.io.File
import com.ibm.event.oltp.EventContext
import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.spark._
import org.apache.spark.sql.ibm.event.EventSession

val sqlContext = new EventSession(spark.sparkContext, "KillrWeather")

<a id="prepare-DataFrame"></a>
### 4.2 Prepare a DataFrame for the query 
`loadEventTable` returns a DataFrame over the named IBM Db2 Event Store table, which you can then inspect and query with the usual Spark DataFrame operations.

In [28]:
val dfDailyTemp = sqlContext.loadEventTable("daily_aggregate_temperature")

In [29]:
dfDailyTemp.printSchema()

root
 |-- wsid: string (nullable = false)
 |-- year: integer (nullable = false)
 |-- month: integer (nullable = false)
 |-- day: integer (nullable = false)
 |-- high: double (nullable = false)
 |-- low: double (nullable = false)
 |-- mean: double (nullable = false)
 |-- variance: double (nullable = false)
 |-- stdev: double (nullable = false)



In [30]:
dfDailyTemp.count()

103

In [31]:
dfDailyTemp.show(5)

+------------+----+-----+---+----+----+------------------+------------------+------------------+
|        wsid|year|month|day|high| low|              mean|          variance|             stdev|
+------------+----+-----+---+----+----+------------------+------------------+------------------+
|725030:14732|2008|    3|  1| 7.2| 0.0| 4.020833333333334|3.7516493055555546|1.9369174751536409|
|725030:14732|2008|    3|  2| 6.1|-1.1|2.0708333333333333| 5.711232638888888|2.3898185368117155|
|725030:14732|2008|    3|  3|11.1| 0.0| 6.179166666666666|11.096649305555557|3.3311633561798732|
|725030:14732|2008|    3|  4|16.7| 0.0|             10.05|             11.49|3.3896902513356584|
|725030:14732|2008|    3|  5|13.9| 0.0| 9.470833333333333|  8.09873263888889|2.8458272327899476|
+------------+----+-----+---+----+----+------------------+------------------+------------------+
only showing top 5 rows



In [32]:
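import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

// Order all rows chronologically. No partitionBy is given, so the whole table forms one window.
val w = Window.orderBy("year", "month", "day")

// Add the mean temperature of the previous one, two, and three days as feature columns.
// lag yields null where no earlier row exists (the first three days).
val dfTrain = dfDailyTemp.withColumn("mean1", lag(col("mean"), 1, null).over(w)).
    withColumn("mean2", lag(col("mean"), 2, null).over(w)).
    withColumn("mean3", lag(col("mean"), 3, null).over(w))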
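To make the effect of `lag` concrete, here is a minimal plain-Scala sketch (no Spark required) that builds the same three lagged columns for the first five daily means in the table. The helper `lagged` is a hypothetical name used only for illustration; it is not part of the Spark API, and `None` stands in for the `null` that Spark produces.

```scala
// Plain-Scala sketch of lag(col, k) over an ordered window.
// `lagged` is a hypothetical helper, not a Spark function.
def lagged(xs: Vector[Double], k: Int): Vector[Option[Double]] =
  xs.indices.map(i => if (i >= k) Some(xs(i - k)) else None).toVector

// First daily means from the table, already sorted by (year, month, day).
val means = Vector(5.345833333333333, 1.5708333333333329, -6.875, -3.287500000000001, 2.925)

val mean1 = lagged(means, 1) // previous day
val mean2 = lagged(means, 2) // two days back
val mean3 = lagged(means, 3) // three days back

// One tuple per day: (mean, mean1, mean2, mean3)
val rows = means.indices.map(i => (means(i), mean1(i), mean2(i), mean3(i)))
rows.foreach(println)
```

Because the Spark window has no `partitionBy`, Spark evaluates it on a single partition and logs a warning about that. This is harmless for a table of about a hundred rows, but if the table held more than one weather station, partitioning the window by `wsid` would probably be needed to keep the lags from mixing stations.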

In [33]:
dfTrain.select("mean", "mean1", "mean2", "mean3").show()

+-------------------+------------------+------------------+------------------+
|               mean|             mean1|             mean2|             mean3|
+-------------------+------------------+------------------+------------------+
|  5.345833333333333|              null|              null|              null|
| 1.5708333333333329| 5.345833333333333|              null|              null|
|             -6.875|1.5708333333333329| 5.345833333333333|              null|
| -3.287500000000001|            -6.875|1.5708333333333329| 5.345833333333333|
|              2.925|-3.287500000000001|            -6.875|1.5708333333333329|
|  4.887499999999999|             2.925|-3.287500000000001|            -6.875|
|  9.508333333333333| 4.887499999999999|             2.925|-3.287500000000001|
| 13.583333333333332| 9.508333333333333| 4.887499999999999|             2.925|
| 12.112499999999999|13.583333333333332| 9.508333333333333| 4.887499999999999|
|  8.254166666666666|12.112499999999999|13.583333333

In [34]:
import org.apache.spark.sql.functions.round

// Round the label and the three lagged features to one decimal place.
val dfTrain2 = dfTrain.withColumn("mean", round(col("mean"), 1)).
    withColumn("mean1", round(col("mean1"), 1)).
    withColumn("mean2", round(col("mean2"), 1)).
    withColumn("mean3", round(col("mean3"), 1))

In [35]:
val dfTrain3 = dfTrain2.na.drop()
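The rounding and null-dropping steps can be sketched in plain Scala on the first five rows of the lagged table. This is an illustration under stated assumptions rather than what Spark runs internally: `LagRow` and `round1` are hypothetical names, `None` stands in for `null`, and half-up rounding is assumed to match Spark's `round`.

```scala
// Hypothetical row type for illustration; None plays the role of a null lag.
final case class LagRow(mean: Double, mean1: Option[Double], mean2: Option[Double], mean3: Option[Double])

// Round to one decimal place, half-up.
def round1(x: Double): Double =
  BigDecimal(x).setScale(1, BigDecimal.RoundingMode.HALF_UP).toDouble

val raw = Vector(
  LagRow(5.345833333333333, None, None, None),
  LagRow(1.5708333333333329, Some(5.345833333333333), None, None),
  LagRow(-6.875, Some(1.5708333333333329), Some(5.345833333333333), None),
  LagRow(-3.287500000000001, Some(-6.875), Some(1.5708333333333329), Some(5.345833333333333)),
  LagRow(2.925, Some(-3.287500000000001), Some(-6.875), Some(1.5708333333333329))
)

// Keep only rows where all three lags exist, then round every value.
val cleaned = raw.collect {
  case LagRow(m, Some(a), Some(b), Some(c)) => (round1(m), round1(a), round1(b), round1(c))
}
cleaned.foreach(println)
```

In the notebook, `na.drop()` removes any row containing a null in any column. Since the schema marks the original columns as non-nullable, only the first three days, which lack a full set of lags, should be affected.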

In [36]:
dfTrain3.select("mean", "mean1", "mean2", "mean3").show()

+----+-----+-----+-----+
|mean|mean1|mean2|mean3|
+----+-----+-----+-----+
|-3.3| -6.9|  1.6|  5.3|
| 2.9| -3.3| -6.9|  1.6|
| 4.9|  2.9| -3.3| -6.9|
| 9.5|  4.9|  2.9| -3.3|
|13.6|  9.5|  4.9|  2.9|
|12.1| 13.6|  9.5|  4.9|
| 8.3| 12.1| 13.6|  9.5|
| 8.5|  8.3| 12.1| 13.6|
| 8.3|  8.5|  8.3| 12.1|
| 5.2|  8.3|  8.5|  8.3|
| 3.4|  5.2|  8.3|  8.5|
| 2.7|  3.4|  5.2|  8.3|
| 1.6|  2.7|  3.4|  5.2|
| 1.4|  1.6|  2.7|  3.4|
| 5.1|  1.4|  1.6|  2.7|
| 2.6|  5.1|  1.4|  1.6|
|-1.5|  2.6|  5.1|  1.4|
|-5.9| -1.5|  2.6|  5.1|
|-0.6| -5.9| -1.5|  2.6|
| 3.1| -0.6| -5.9| -1.5|
+----+-----+-----+-----+
only showing top 20 rows



In [37]:
// Hold out roughly 20% of the rows for testing; the fixed seed makes the split repeatable.
val splits = dfTrain3.randomSplit(Array(0.8, 0.2), seed = 24L)
val training_data = splits(0)
val test_data = splits(1)
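`randomSplit` assigns each row to a split by an independent random draw, so the 80/20 weights are targets rather than exact sizes. The prediction output further down, for example, lists 16 test rows. The plain-Scala sketch below illustrates the idea with `scala.util.Random`; `bernoulliSplit` is a hypothetical helper, and it will not reproduce the exact row assignment Spark makes for the same seed.

```scala
import scala.util.Random

// Hypothetical helper: one independent draw per row decides its split.
def bernoulliSplit[A](rows: Vector[A], trainWeight: Double, seed: Long): (Vector[A], Vector[A]) = {
  val rng = new Random(seed)
  // Tag every row first so each row consumes exactly one draw, in order.
  val tagged = rows.map(r => (r, rng.nextDouble() < trainWeight))
  val (train, test) = tagged.partition(_._2)
  (train.map(_._1), test.map(_._1))
}

val sampleRows = (1 to 100).toVector
val (trainRows, testRows) = bernoulliSplit(sampleRows, 0.8, seed = 24L)
println(s"train=${trainRows.size}, test=${testRows.size}")
```

Fixing the seed makes the split repeatable across runs, which matters when comparing models on such a small data set.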

In [38]:
import org.apache.spark.ml.feature.VectorAssembler

// Combine the three lagged means into the single vector column that Spark ML expects.
val features_assembler = new VectorAssembler().
    setInputCols(Array("mean1", "mean2", "mean3")).
    setOutputCol("features")

In [39]:
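import org.apache.spark.ml.regression.LinearRegression

// Linear regression that predicts the day's mean from the lagged features.
// regParam sets the regularization strength; elasticNetParam mixes L1 (1.0) and L2 (0.0).
val lr = new LinearRegression().
    setMaxIter(10).
    setRegParam(0.3).
    setElasticNetParam(0.8).
    setLabelCol("mean").
    setFeaturesCol("features")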
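The two regularization settings combine into an elastic-net penalty, which the Spark ML documentation describes as `regParam * (elasticNetParam * ||w||_1 + (1 - elasticNetParam) / 2 * ||w||_2^2)`. The sketch below evaluates that expression in plain Scala for the values used here. The function name `elasticNetPenalty` and the coefficient values are hypothetical, chosen only to show the arithmetic; they are not the fitted model's coefficients.

```scala
// Elastic-net penalty for a coefficient vector w.
// Hypothetical helper following the formula in the Spark ML documentation.
def elasticNetPenalty(w: Seq[Double], regParam: Double, elasticNetParam: Double): Double = {
  val l1 = w.map(x => math.abs(x)).sum // ||w||_1
  val l2sq = w.map(x => x * x).sum     // ||w||_2^2
  regParam * (elasticNetParam * l1 + (1.0 - elasticNetParam) / 2.0 * l2sq)
}

// Made-up coefficients for mean1, mean2, mean3 (illustration only).
val exampleWeights = Seq(0.5, -0.2, 0.1)
val penalty = elasticNetPenalty(exampleWeights, regParam = 0.3, elasticNetParam = 0.8)
println(penalty)
```

With `elasticNetParam` at 0.8 the penalty is mostly L1, which tends to push weak coefficients toward zero. Setting it to 0.0 would give a pure ridge (L2) penalty and 1.0 a pure lasso (L1) penalty.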

In [40]:
import org.apache.spark.ml.{Model, Pipeline, PipelineStage, PipelineModel}

// Chain feature assembly and regression so both run with a single fit/transform call.
val pipeline = new Pipeline().setStages(Array(features_assembler, lr))

In [41]:
val lrModel = pipeline.fit(training_data)

In [42]:
val predictions = lrModel.transform(test_data)

In [43]:
predictions.printSchema()

root
 |-- wsid: string (nullable = false)
 |-- year: integer (nullable = false)
 |-- month: integer (nullable = false)
 |-- day: integer (nullable = false)
 |-- high: double (nullable = false)
 |-- low: double (nullable = false)
 |-- mean: double (nullable = true)
 |-- variance: double (nullable = false)
 |-- stdev: double (nullable = false)
 |-- mean1: double (nullable = true)
 |-- mean2: double (nullable = true)
 |-- mean3: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [44]:
predictions.select("mean", "prediction").show()

+----+------------------+
|mean|        prediction|
+----+------------------+
|-3.3|-2.180579955391931|
| 9.5| 4.029306644077588|
| 1.4|2.8647809037067287|
| 5.1| 2.823364966767187|
| 3.1|2.2094690231508345|
| 1.2|  5.42483420588961|
|11.7|  6.22901196815263|
| 4.0| 5.046127730571579|
|-0.8| 1.387744859726542|
| 0.3|1.9901624523418961|
| 2.1| 4.561894047529638|
| 6.2|1.9074152459439286|
| 4.0| 4.713169759256957|
| 8.7| 8.960297693923128|
| 7.6| 6.308917199374498|
|12.4|10.428716454491164|
+----+------------------+
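To judge the predictions with a number rather than by eye, one option is to compute error metrics over the (actual, predicted) pairs. The plain-Scala sketch below does this for the 16 rows printed above; the helper names `rmse` and `mae` are introduced here for illustration.

```scala
// (actual mean, predicted mean) pairs copied from the output above.
val results = Seq(
  (-3.3, -2.180579955391931), (9.5, 4.029306644077588),
  (1.4, 2.8647809037067287),  (5.1, 2.823364966767187),
  (3.1, 2.2094690231508345),  (1.2, 5.42483420588961),
  (11.7, 6.22901196815263),   (4.0, 5.046127730571579),
  (-0.8, 1.387744859726542),  (0.3, 1.9901624523418961),
  (2.1, 4.561894047529638),   (6.2, 1.9074152459439286),
  (4.0, 4.713169759256957),   (8.7, 8.960297693923128),
  (7.6, 6.308917199374498),   (12.4, 10.428716454491164)
)

// Root mean squared error: penalizes large misses more heavily.
def rmse(ps: Seq[(Double, Double)]): Double =
  math.sqrt(ps.map { case (y, p) => (y - p) * (y - p) }.sum / ps.size)

// Mean absolute error: the average size of a miss, in degrees.
def mae(ps: Seq[(Double, Double)]): Double =
  ps.map { case (y, p) => math.abs(y - p) }.sum / ps.size

println(f"RMSE = ${rmse(results)}%.3f, MAE = ${mae(results)}%.3f")
```

Inside Spark, `org.apache.spark.ml.evaluation.RegressionEvaluator` with the label column set to `mean` and the prediction column set to `prediction` should give the same kind of metric directly on the `predictions` DataFrame.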



<a id="summary"></a>
## Summary
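This demo showed how to open an existing IBM Db2 Event Store database, load one of its tables into a Spark DataFrame, and use that data to train and apply a Spark ML linear regression model that predicts the daily mean temperature from the three preceding days.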

## References
* [IBM Db2 Event Store documentation](https://www.ibm.com/support/knowledgecenter/SSGNPV)

<hr>
Copyright &copy; IBM Corp. 2017. Released as licensed Sample Materials.