<p><font size=-1 color=gray>
&copy; Copyright 2018 IBM Corp. All Rights Reserved.
<p>
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the specific language governing permissions and
limitations under the License.
</font></p>

# Ingest Clickstream Events

This notebook uses the [Scala](https://www.scala-lang.org/) programming language
to interact with IBM Db2 Event Store. It demonstrates how to:

* Connect to Event Store
* Drop and create a database
* Define a table schema
* Drop and create a table
* Load a CSV file into a DataFrame
* Batch insert from a DataFrame into a table


## Connect to IBM Db2 Event Store

### Determine the IP address of your host

Obtain the IP address of the host that you want to connect to by running the appropriate command for your operating system:

* On Mac, run: `ifconfig`
* On Windows, run: `ipconfig`
* On Linux, run: `hostname -i`

In the next cell, replace the `XXX.XXX.XXX.XXX` placeholder assigned to `Host` with that IP address, and set `Port` for your Event Store version.

In [None]:
// Set your host IP address
val Host = "XXX.XXX.XXX.XXX"

// The port is 1100 for version 1.1.2 or later (5555 for version 1.1.1)
val Port = "1100"
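Until it is edited, the cell above still holds the `XXX.XXX.XXX.XXX` placeholder, so a quick check before connecting can catch an unedited or mistyped address. The following is a minimal sketch, assuming the host is given as a dotted-quad IPv4 address like the ones printed by the commands listed earlier; `isValidIPv4` and `checkHost` are hypothetical helpers, not part of the Event Store API.

```scala
// Sketch: sanity-check the Host value before connecting.
// Assumption: Host is a dotted-quad IPv4 address such as the ones printed by
// ifconfig, ipconfig or hostname -i. Host names and IPv6 addresses are rejected.

// Returns true when s has exactly four dot-separated parts, each in 0-255
def isValidIPv4(s: String): Boolean = {
  val parts = s.split("\\.", -1) // -1 keeps empty trailing parts such as in "1.2.3."
  parts.length == 4 && parts.forall { p =>
    p.nonEmpty && p.length <= 3 && p.forall(c => c >= '0' && c <= '9') && p.toInt <= 255
  }
}

// Hypothetical helper: throws IllegalArgumentException for an unedited or invalid host
def checkHost(host: String): Unit =
  require(isValidIPv4(host),
    s"Host '$host' is not an IPv4 address; replace the XXX.XXX.XXX.XXX placeholder")
```

In the notebook you could call `checkHost(Host)` right after setting `Host`, so the cell fails early instead of at connection time.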

## Import Scala packages

In [None]:
import sys.process._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrameReader
import spark.implicits._

import com.ibm.event.catalog.TableSchema
import com.ibm.event.oltp.EventContext
import com.ibm.event.example.DataGenerator
import com.ibm.event.common.ConfigurationReader
import com.ibm.event.oltp.InsertResult

## Connect to Event Store

In [None]:
ConfigurationReader.setConnectionEndpoints(Host + ":" + Port)
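`setConnectionEndpoints` receives the host and port joined by a colon, and the port depends on the Event Store version, as noted in the `Port` comment earlier. A minimal sketch of choosing the port from a version string, assuming versions are written as dot-separated integers such as `1.1.2`; `versionTuple`, `portFor` and `endpoint` are hypothetical helpers.

```scala
// Sketch: choose the Event Store port from the server version and build the
// "host:port" endpoint string.
// Assumption (from the notebook's Port comment): version 1.1.2 and later use
// port 1100 and version 1.1.1 uses 5555; other versions are not covered.

// Turns "1.1.2" into (1, 1, 2); missing components default to 0
def versionTuple(version: String): (Int, Int, Int) = {
  val p = version.trim.split("\\.").map(_.toInt).padTo(3, 0)
  (p(0), p(1), p(2))
}

def portFor(version: String): String = {
  val ord = Ordering[(Int, Int, Int)] // compares component by component
  val v = versionTuple(version)
  if (ord.gteq(v, (1, 1, 2))) "1100"
  else if (ord.equiv(v, (1, 1, 1))) "5555"
  else throw new IllegalArgumentException(s"No known port for version $version")
}

// Same format as Host + ":" + Port in the connection cell
def endpoint(host: String, port: String): String = s"$host:$port"
```

Throwing for unknown versions is a deliberate choice here: guessing a port would only surface later as a less obvious connection error.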

## Create a database

Only one database can be active in Event Store at a time. If you already have a database, you don't need to create one.
To create a database, use the `createDatabase` function. To drop an existing database and create a new one,
call `dropDatabase` first.

In [None]:
// See the comments and run this cell if you need to DROP and/or CREATE the database.

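// EventContext.dropDatabase("TESTDB")  // Uncomment to drop an existing TESTDB
val context = EventContext.createDatabase("TESTDB")  // Comment this out to re-use an existing TESTDB
// val context = EventContext.getEventContext("TESTDB")  // ...and uncomment this line instead

// openDatabase returns an optional error; stop the cell if one is present
val error = context.openDatabase()
error.foreach(e => sys.error(e.toString))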
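The cell above follows a pattern that recurs in this notebook: a step can report failure as an optional error, as `openDatabase` does, and the notebook should stop at the first failure. A minimal, generic sketch of running such steps in order; `Step`, `runSteps` and the step functions are hypothetical stand-ins rather than Event Store calls, and errors are modeled as plain strings.

```scala
// Sketch: run setup steps in order and stop at the first one that fails.
// Assumption: each step returns Some(message) on failure and None on success.
// The steps are hypothetical stand-ins, not Event Store API calls.

type Step = (String, () => Option[String])

// Iterator.map is lazy, so steps after the first failure are never run
def runSteps(steps: Seq[Step]): Option[String] =
  steps.iterator
    .map { case (name, step) => step().map(err => s"$name failed: $err") }
    .collectFirst { case Some(msg) => msg }
```

In a notebook cell, `runSteps(...).foreach(msg => sys.error(msg))` would stop the cell on the first failure, in the same way the cell above handles the result of `openDatabase`.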

## Create a table

### Define the schema

In [None]:
val clickdataSchema = StructType(Array(
  StructField("eventId", LongType, false),
  StructField("eventType", StringType, false),
  StructField("timestamp", StringType, false),
  StructField("ipaddress", StringType, false),
  StructField("sessionId", StringType, false),
  StructField("userId", StringType, false),
  StructField("pageUrl", StringType, false),
  StructField("browser", StringType, false)))

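// Define the table schema for the clickstream data.
// The two arrays name the sharding column(s) and the primary-key column(s);
// eventId is used for both.
val clickStreamSchema = TableSchema(
  "ClickStreamTable", clickdataSchema, Array("eventId"), Array("eventId"))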
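The `TableSchema` above lists `eventId` in both key arrays. Before calling `createTable`, it can help to confirm that every key column exists in `clickdataSchema` and is declared non-nullable. A minimal sketch in plain Scala; `Column` is a hypothetical stand-in for Spark's `StructField`, and requiring non-nullable key columns is an assumption of this sketch rather than a documented Event Store rule.

```scala
// Sketch: check the key columns of a table definition before createTable.
// Assumptions: Column is a hypothetical stand-in for Spark's StructField, and
// key columns are expected to exist in the schema and be non-nullable.

final case class Column(name: String, nullable: Boolean)

// Returns one message per problem; an empty result means the keys look fine
def keyProblems(columns: Seq[Column], shardBy: Seq[String], primaryKey: Seq[String]): Seq[String] = {
  val byName = columns.map(c => c.name -> c).toMap
  (shardBy ++ primaryKey).distinct.flatMap { key =>
    byName.get(key) match {
      case None                  => Some(s"key column '$key' is not in the schema")
      case Some(c) if c.nullable => Some(s"key column '$key' is nullable")
      case Some(_)               => None
    }
  }
}
```

With Spark available, the column list could be built from the real schema as `clickdataSchema.fields.toSeq.map(f => Column(f.name, f.nullable))`.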

### Create the table
If you want to drop the existing table to create a new one, use the `dropTable` function first.

In [None]:
// Create the table (skip this cell if the table already exists)

// context.dropTable(clickStreamSchema.tableName)  // Uncomment to drop the existing table first
val res = context.createTable(clickStreamSchema)
if (res.isDefined) {
  println(s"Error while creating table ${clickStreamSchema.tableName}:\n${res.get}")
} else {
  println(s"Table ${clickStreamSchema.tableName} successfully created.")
}

In [None]:
val clickstreamTable = context.getTable("ClickStreamTable")

## Load data from the CSV file to a DataFrame
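Use the `add data assets` option in the UI to make the `clickstream_data.csv` file available to the notebook.
Then read the file from the `assets` directory into a DataFrame, applying `clickdataSchema` so that each column gets the type defined for the table.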

In [None]:
// Get the active Spark session, or create one if none exists
val spark: SparkSession = SparkSession.builder().getOrCreate()

// Treat the first line as a header and apply the table schema instead of inferring types
val clickStreamDF = spark.read
  .option("header", "true")
  .schema(clickdataSchema)
  .csv("assets/clickstream_data.csv")
clickStreamDF.show(5)
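With `header` set to `"true"`, the reader treats the first line of the file as a header and applies `clickdataSchema` to each remaining line instead of inferring types. The plain-Scala sketch below mimics that conversion so the expected file layout is explicit. It assumes eight comma-separated fields in schema order with no quoted values and no commas inside values; `ClickEvent`, `parseLine` and `parseCsv` are hypothetical names. Spark's CSV reader also handles quoting, and what it does with malformed rows is controlled by its `mode` option (`PERMISSIVE`, `DROPMALFORMED` or `FAILFAST`); this sketch instead returns a `Left` for each bad line.

```scala
// Sketch: a plain-Scala picture of reading the clickstream CSV with a header
// row and an explicit schema.
// Assumptions: eight comma-separated fields in schema order, no quoted fields
// and no commas inside values. ClickEvent mirrors clickdataSchema (hypothetical).

import scala.util.Try

final case class ClickEvent(
  eventId: Long, eventType: String, timestamp: String, ipaddress: String,
  sessionId: String, userId: String, pageUrl: String, browser: String)

// Converts one data line; returns Left with a reason when the line does not fit the schema
def parseLine(line: String): Either[String, ClickEvent] = {
  val f = line.split(",", -1) // -1 keeps empty trailing fields
  if (f.length != 8) Left(s"expected 8 fields, got ${f.length}")
  else {
    Try(f(0).toLong).toOption match {
      case Some(id) => Right(ClickEvent(id, f(1), f(2), f(3), f(4), f(5), f(6), f(7)))
      case None     => Left(s"eventId '${f(0)}' is not a Long")
    }
  }
}

// Skips the header row (like header = true), then parses each remaining line lazily
def parseCsv(lines: Iterator[String]): Iterator[Either[String, ClickEvent]] =
  lines.drop(1).map(parseLine)
```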

## Load data from the DataFrame to the table
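Use the `batchInsert` function to load the data. `toLocalIterator` returns the DataFrame's rows to the driver as a Java iterator, fetching one partition at a time, and `asScala` converts it into the Scala iterator that is passed to `batchInsert`.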

In [None]:
// Stream the DataFrame rows to the driver and insert them into the table
val iter = clickStreamDF.toLocalIterator()
val error = context.batchInsert(clickstreamTable, iter.asScala)
if (error.isDefined) {
  System.err.println(error)
} else {
  println("Ingest completed successfully")
}
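In the cell above, `batchInsert` is given the whole row iterator in one call. If you would rather insert in fixed-size batches (for example, to report which part of the file failed), `Iterator.grouped` splits an iterator into chunks lazily. A minimal sketch, assuming a hypothetical `insertBatch` function that returns `Some(error)` on failure and `None` on success; how such a function would wrap `context.batchInsert` depends on the argument types that method accepts, which this sketch does not assume.

```scala
// Sketch: insert rows in fixed-size batches and stop at the first failed batch.
// Assumption: insertBatch is a hypothetical stand-in that returns Some(error)
// on failure and None on success; it is not an Event Store API call.

def insertInBatches[A](rows: Iterator[A], batchSize: Int)(
    insertBatch: Seq[A] => Option[String]): Either[String, Long] = {
  require(batchSize > 0, "batchSize must be positive")
  var inserted = 0L
  val batches = rows.grouped(batchSize) // lazy: only one batch is held at a time
  while (batches.hasNext) {
    val batch = batches.next()
    insertBatch(batch) match {
      // inserted is the 0-based index of the first row in the failing batch
      case Some(err) => return Left(s"batch starting at row $inserted failed: $err")
      case None      => inserted += batch.size
    }
  }
  Right(inserted)
}
```

The result is either the total number of rows inserted or a message locating the first failed batch, which is more specific than a single error for the whole file.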