# Online Retail Orders with Akka and Db2 Event Store

This notebook uses the [Scala](https://www.scala-lang.org/) programming language
to interact with IBM Db2 Event Store. It demonstrates how to:

* Connect to Event Store
* Show the data that you inserted using WebSockets, Alpakka, Akka Streams and Db2 Event Store
* Manipulate and aggregate the data with Spark SQL
* Visualize the information with interactive Brunel charts

Run this notebook after running the setup notebook and after (or while)
feeding data in via the `db2-event-store-akka-streams` Scala app.

## Connect to IBM Db2 Event Store

### Determine the IP address of your host

Obtain the IP address of the host that you want to connect to by running the appropriate command for your operating system:

* On Mac, run: `ifconfig`
* On Windows, run: `ipconfig`
* On Linux, run: `hostname -i`

Edit the `host = "XXX.XXX.XXX.XXX"` value in the next cell to provide the IP address.

In [None]:
// Set your host IP address
var host = "XXX.XXX.XXX.XXX"

// Port will be 1100 for version 1.1.2 or later (5555 for version 1.1.1)
var port = "1100"

// Table name
var tableName = "OnlineRetailOrderDetail"

// Database name
var db = "TESTDB"
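
The `XXX.XXX.XXX.XXX` placeholder is easy to leave in place by accident. As a minimal sketch, the check below builds the same `host:port` string that is passed to `ConfigurationReader.setConnectionEndpoints` in this notebook and reports a problem early. The `EndpointSketch` object and its `endpoint` helper are hypothetical names for illustration, not part of the Event Store API, and the check assumes a dotted IPv4 address (a host name or IPv6 address would need a different rule).

```scala
object EndpointSketch {
  // Four dot-separated groups of 1 to 3 digits; the 0-255 range is checked in the guard below.
  private val Ipv4 = """(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})""".r

  /** Returns Right("host:port") for an IPv4 host and a numeric port, otherwise Left(reason). */
  def endpoint(host: String, port: String): Either[String, String] = host match {
    case Ipv4(a, b, c, d) if Seq(a, b, c, d).forall(_.toInt <= 255) =>
      if (port.nonEmpty && port.forall(_.isDigit)) Right(host + ":" + port)
      else Left(s"Port is not numeric: $port")
    case _ =>
      // Also catches the unedited "XXX.XXX.XXX.XXX" placeholder.
      Left(s"Host is not an IPv4 address (placeholder not replaced?): $host")
  }
}
```

Calling `EndpointSketch.endpoint(host, port)` before connecting returns a `Left` with a reason if the host value was not edited.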

## Add Brunel integration
Use the `%AddJar` line magic to install the Brunel integration for Apache Toree (Scala).

In [None]:
%AddJar -magic https://brunelvis.org/jar/spark-kernel-brunel-all-2.3.jar -f

## Import Scala packages

Import packages for Scala, Spark, and IBM Db2 Event Store.

In [None]:
import sys.process._
import java.io.File
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.spark._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.ibm.event.EventSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.ibm.event.catalog.TableSchema
import com.ibm.event.common.ConfigurationReader
import com.ibm.event.example.DataGenerator
import com.ibm.event.oltp.EventContext
import com.ibm.event.oltp.InsertResult

## Connect to Event Store

In [None]:
ConfigurationReader.setConnectionEndpoints(host + ":" + port)

## Load data from the Event Store table into a DataFrame

In [None]:
val sqlContext = new EventSession(spark.sparkContext, db)
import sqlContext.implicits._

val table = sqlContext.loadEventTable(tableName)
table.registerTempTable(tableName)

## Show the count and latest rows
Rerunning this cell is the fastest way to confirm that rows are being
inserted successfully.

> Note: A timestamp was used for the `id`, so we can use that to sort events as they were received.

In [None]:
sqlContext.sql(s"select count(*) from $tableName").show()
sqlContext.sql(s"select * from $tableName order by id desc").show(3)  // Show most recent 3

The `%%dataframe` cell magic provides a prettier way of showing the latest 3 rows:

In [None]:
%%dataframe --limit 3
sqlContext.sql(s"select * from $tableName order by id desc")
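
The `order by id desc` queries above work because `id` is a timestamp, so larger values mean more recent events. The same idea can be sketched with plain Scala collections. The `Event` case class here is a hypothetical, simplified stand-in for a table row, and `latest` is an illustrative helper rather than anything provided by Event Store or Spark.

```scala
object LatestRowsSketch {
  // Hypothetical, simplified row: only the timestamp id and a description.
  final case class Event(id: Long, description: String)

  /** Returns the n most recently received events, newest first
    * (the collection equivalent of "order by id desc" with a row limit). */
  def latest(events: Seq[Event], n: Int): Seq[Event] =
    events.sortBy(_.id)(Ordering[Long].reverse).take(n)
}
```

For example, `LatestRowsSketch.latest(events, 3)` mirrors the "most recent 3" query on an in-memory sequence.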

## Aggregate and show the data

Use Spark SQL to build DataFrames with aggregated data.

### Calculate aggregated gross sales and units by product
#### Show top 10 by gross sales

In [None]:
%%dataframe --limit 10
sqlContext.sql("select Description as Product, sum(Quantity) as Units, sum(CAST (UnitPrice as DECIMAL(15,2)) * Quantity) as Gross" + s" from $tableName" + " group by Description order by 3 desc")

#### Show top 10 by units

In [None]:
%%dataframe --limit 10
val aggDF = sqlContext.sql("select Description as Product, sum(Quantity) as Units, sum(CAST (UnitPrice as DECIMAL(15,2)) * Quantity) as Gross" + s" from $tableName" + " group by Description order by 2 desc")
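
To make the two aggregation queries concrete, here is a rough sketch of the same computation on plain Scala collections: group by description, sum the quantities, and sum the two-decimal unit price times quantity. The `OrderDetail` and `ProductTotal` case classes and the helper names are hypothetical, and the `HALF_UP` rounding used to mimic `CAST (UnitPrice as DECIMAL(15,2))` is an assumption.

```scala
object ProductTotalsSketch {
  // Hypothetical, simplified order-detail row; the real table has more columns.
  final case class OrderDetail(description: String, quantity: Int, unitPrice: BigDecimal)

  // One output row per product, mirroring the Product, Units, and Gross columns.
  final case class ProductTotal(product: String, units: Long, gross: BigDecimal)

  /** Groups rows by description and sums units and gross sales per product. */
  def totals(rows: Seq[OrderDetail]): Seq[ProductTotal] =
    rows.groupBy(_.description).map { case (product, group) =>
      val units = group.map(_.quantity.toLong).sum
      val gross = group.map { r =>
        // Assumption: round the price to 2 decimals (HALF_UP) before multiplying.
        r.unitPrice.setScale(2, BigDecimal.RoundingMode.HALF_UP) * r.quantity
      }.sum
      ProductTotal(product, units, gross)
    }.toSeq

  /** Top n products by gross sales ("order by 3 desc"). */
  def topByGross(rows: Seq[OrderDetail], n: Int): Seq[ProductTotal] =
    totals(rows).sortBy(_.gross)(Ordering[BigDecimal].reverse).take(n)

  /** Top n products by units ("order by 2 desc"). */
  def topByUnits(rows: Seq[OrderDetail], n: Int): Seq[ProductTotal] =
    totals(rows).sortBy(_.units)(Ordering[Long].reverse).take(n)
}
```

The two `top...` helpers differ only in the sort key, just as the two SQL queries differ only in their `order by` column.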


## Visualize the data with Brunel

Charts are more than eye candy: they help people see and analyze patterns in the data.

### Gross sales by product

In [None]:
%%brunel data('aggDF') 
bar at(0,0,50,100) title("Gross Sales by Product")
  x(Product) y(Gross)
  tooltip(Product, Gross) color(Gross:BlueGreens) legends(none) opacity(#selection:.5)
  axes(x:'Product',y:'Gross') sort(Gross) interaction(select)|
treemap at(55,5,100,70)
  sort(Gross) size(Gross) color(Gross:BlueGreens) label(Gross) legends(none)
  tooltip(Product, Gross) opacity(#selection:.5) interaction(select)
:: width=1000, height=600

In [None]:
%%brunel data('aggDF') 
bar at(0,0,50,100) title("Units by Product")
  x(Product) y(Units)
  tooltip(Product, Units) color(Units:PurpleBlues) legends(none) opacity(#selection:.5)
  axes(x:'Product',y:'Units') sort(Units) interaction(select)|
treemap at(55,5,100,70)
  sort(Units) size(Units) color(Units:PurpleBlues) label(Units) legends(none)
  tooltip(Product, Units) opacity(#selection:.5) interaction(select)
:: width=1000, height=600

## Manipulate the data

For example, we can take the timestamp and derive the year, month, day, and, most interesting for this example, the **day of week**.
Let's look at the data with an added dimension that shows Sun, Mon, Tue, Wed, Thu, Fri, and Sat.
Also note that we now count invoices (there are multiple detail records per invoice) instead of units or gross sales.

### Invoices by day of week
#### Use the timestamp `id` column to determine day-of-week
Recall that `id` is a timestamp recorded as each event was received, so this method shows the day on which you fed the data in. That can be interesting if you are emulating a live system or just want to see your own activity show up.

In [None]:
val eventTimeDF = sqlContext.sql(
  "select InvoiceNo, CAST (id / 1000 as TIMESTAMP) as EventTime"
  + s" from $tableName"
)
val eventDayDF = eventTimeDF.withColumn("DayOfWeek",date_format(eventTimeDF("EventTime"), "E")).drop("EventTime")
eventDayDF.show(5)
val eventInvoiceDayDF = eventDayDF.groupBy("DayOfWeek", "InvoiceNo").count
eventInvoiceDayDF.show()
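
The cell above divides `id` by 1000 to go from milliseconds to the seconds that a timestamp cast expects, then formats the result with the `"E"` pattern to get a short day name. As a minimal sketch outside Spark, the same conversion can be done with `java.time`. The `DayOfWeekSketch` object is a hypothetical name, and the UTC default is an assumption (Spark uses its session time zone, so results near midnight can differ).

```scala
import java.time.{Instant, ZoneId, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.util.Locale

object DayOfWeekSketch {
  // "E" is the short day-name pattern letter, as used with date_format above.
  private val shortDay = DateTimeFormatter.ofPattern("E", Locale.ENGLISH)

  /** Converts an epoch-millisecond id to a short day name (Mon, Tue, ...) in the given zone. */
  def dayOfWeek(idMillis: Long, zone: ZoneId = ZoneOffset.UTC): String =
    shortDay.format(Instant.ofEpochMilli(idMillis).atZone(zone))
}
```

For instance, `DayOfWeekSketch.dayOfWeek(0L)` yields `Thu`, because the Unix epoch began on a Thursday in UTC.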

#### Use the InvoiceDate column to determine day-of-week
This lets you work with the example CSV data and see a variety of days, even if you fed all of the data in on a single day.

In [None]:
val invoiceTimeDF = sqlContext.sql(
  "select InvoiceNo, InvoiceDate"
  + s" from $tableName"
)
val invoiceDayDF = invoiceTimeDF.withColumn("DayOfWeek",date_format(invoiceTimeDF("InvoiceDate"), "E")).drop("InvoiceDate")
invoiceDayDF.show(5)
val countInvoicesDF = invoiceDayDF.groupBy("DayOfWeek", "InvoiceNo").count
countInvoicesDF.show()
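
Grouping by both `DayOfWeek` and `InvoiceNo` collapses the multiple detail records of an invoice into one row, so counting the resulting rows per day gives invoices per day rather than line items per day. A rough sketch of that two-step logic on plain Scala collections follows. The `Detail` case class, the helper name, and the sample invoice numbers in the usage note are hypothetical.

```scala
object InvoicesPerDaySketch {
  // Hypothetical detail row: one line item of an invoice, tagged with its day name.
  final case class Detail(dayOfWeek: String, invoiceNo: String)

  /** Counts distinct invoices per day, ignoring how many detail rows each invoice has. */
  def invoicesPerDay(details: Seq[Detail]): Map[String, Int] =
    details.distinct                    // one entry per (day, invoice) pair
      .groupBy(_.dayOfWeek)             // then bucket those pairs by day
      .map { case (day, rows) => day -> rows.size }
}
```

With two detail rows for a made-up invoice `A1` and one for `A2`, all on `Mon`, the result for `Mon` is 2, not 3.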

In [None]:
%%brunel data('countInvoicesDF') 
x(DayOfWeek) y(#count)
bar
  count(InvoiceNo)


### Data feed statistics by time and country

In [None]:
val df = sqlContext.sql(
  "select Country, sum(Quantity) as Quantity, CAST (id / 1000 as TIMESTAMP) as EventTime"
  + s" from $tableName"
  + " group by Country, id order by 2 desc")
df.printSchema()
val countryDF2 = (
  df.withColumn("Year",year(df("EventTime")))
    .withColumn("Month",month(df("EventTime")))
    .withColumn("Week",weekofyear(df("EventTime")))
    .withColumn("Day",dayofmonth(df("EventTime")))
    .withColumn("DayOfWeek",date_format(df("EventTime"), "E"))
)
countryDF2.printSchema()
countryDF2.show(3)
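
The `year`, `month`, `weekofyear`, and `dayofmonth` columns added above can be illustrated without Spark. This sketch derives the same four parts from an epoch-millisecond `id` with `java.time`. The `DatePartsSketch` object and `DateParts` case class are hypothetical names, UTC is an assumed time zone, and the ISO 8601 week field is used on the understanding that Spark's `weekofyear` follows the same week definition.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.temporal.IsoFields

object DatePartsSketch {
  // Mirrors the Year, Month, Week, and Day columns.
  final case class DateParts(year: Int, month: Int, week: Int, day: Int)

  /** Splits an epoch-millisecond id into calendar parts, using UTC (an assumption). */
  def parts(idMillis: Long): DateParts = {
    val t = Instant.ofEpochMilli(idMillis).atZone(ZoneOffset.UTC)
    DateParts(
      t.getYear,
      t.getMonthValue,
      t.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR), // ISO week: starts Monday, week 1 has more than 3 days
      t.getDayOfMonth)
  }
}
```

One consequence of ISO weeks is that the last days of December can belong to week 1 of the following year, which matters if you group by `Year` and `Week` together.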

In [None]:
val featureMetrics = countryDF2.select("Country", "DayOfWeek", "Quantity").groupBy("Country", "DayOfWeek").agg(sum("Quantity")).
  withColumnRenamed("sum(Quantity)","Quantity")

featureMetrics.show()

In [None]:
%%brunel data('featureMetrics') 
x(Country) y(Quantity)
stack bar
  sum(Quantity) x(Country) color(DayOfWeek)|
stack bar y(Quantity) x(DayOfWeek) color(Country)

In [None]:
%%brunel data('countryDF2') 
title("Units by Time and Country")
x(DayOfWeek) y(Quantity)
stack bar
  sum(Quantity) color(Country) tooltip(#all)
  legends(none) interaction(select)
:: width=1000, height=300

In [None]:
val CountryDF3 = countryDF2.select("Country", "Quantity","DayOfWeek").groupBy("Country","DayOfWeek").agg(sum("Quantity")).
  withColumnRenamed("sum(Quantity)","Quantity")

In [None]:
%%brunel data('CountryDF3') 
x(Country) y(Quantity) color(Quantity:BlueYellows) at(0,0,50,100) opacity(#selection:.7)
stack bar
  interaction(select)|
map('World') opacity(#selection:.7) at(50,0,100,100) x(Country) color(Quantity:BlueYellows) tooltip(#all)
:: width=1000, height=300


<p><font size=-1 color=gray>
&copy; Copyright 2018 IBM Corp. All Rights Reserved.
<p>
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the specific language governing permissions and
limitations under the License.
</font></p>