# Querying on IBM Db2 Event Store Database Table
IBM Db2 Event Store is a hybrid transactional/analytical processing (HTAP) system. This notebook will demonstrate the best practices querying the table stored in IBM Db2 Event Store database.

In the previous session we have already created a database, a table with an index, and seen how to ingest sample data into the table using the "EventStore_Table_Creation" notebook.

***Pre-Req: EventStore_Table_Creation***

## Open the database

The cells in this section are used to open the database and create a temp view for the table that we created previously.   

Run the commands in the following cells to define access to the existing database.

In [1]:
from eventstore.oltp import EventContext
from eventstore.sql import EventSession
from pyspark.sql import SparkSession

## Setting the IP address to connect to your IBM Db2 Event Store cluster

For this, you will need to find out the connection string to your IBM Db2 Event Store cluster.

Perform the following steps:

- Replace the IP address in the below program code with the IP address of your local host
- Then execute the program cell below. It will connect to the IBM Db2 Event Store cluster in the provided connection string. 

In [2]:
from eventstore.common import ConfigurationReader

ip = "9.30.167.102"

endpoint = ip + ':1101'

print("Endpoint: "+ endpoint)

ConfigurationReader.setConnectionEndpoints(endpoint)

Endpoint: 9.30.167.102:1101


In [3]:
dbName = "TESTDB"

To run Spark SQL queries, you first have to set up a Db2 Event Store Spark session. The EventSession class extends the optimizer of the SparkSession class.

In [4]:
sparkSession = SparkSession.builder.appName("EventStore SQL in Python").getOrCreate()
eventSession = EventSession(sparkSession.sparkContext, dbName)

The next cell opens the database to allow operations against it to be executed.

In [5]:
eventSession.open_database()

With the following cells we can list all existing tables and then load the table we previously created into the tab DataFrame reference. Note that we are defining the `tab` DataFrame reference that will be used later on in this notebook to create a temp view.

In [6]:
with EventContext.get_event_context(dbName) as ctx:
   print("Event context successfully retrieved.")

table_names = ctx.get_names_of_tables()
for idx, name in enumerate(table_names):
   print(name)

Event context successfully retrieved.
IOT_TEMP


In [7]:
tabName = "IOT_TEMP"

In [8]:
tab = eventSession.load_event_table(tabName)

Let's recall the table schema we previously created.

In [9]:
try:
    resolved_table_schema = ctx.get_table(tabName)
    print(resolved_table_schema)
except Exception as err:
    print("Table not found")

ResolvedTableSchema(tableName=IOT_TEMP, schema=StructType(List(StructField(deviceID,IntegerType,false),StructField(sensorID,IntegerType,false),StructField(ts,LongType,false),StructField(ambient_temp,DoubleType,false),StructField(power,DoubleType,false),StructField(temperature,DoubleType,false))), sharding_columns=[u'deviceID', u'sensorID'], pk_columns=[u'deviceID', u'sensorID', u'ts'], partition_columns=None)


Let's recall that in the previous notebook, the table we created has the following tableSchema and indexSchema:

```python
tabSchema = TableSchema(tabName, StructType([
    StructField("deviceID", IntegerType(), nullable = False),
    StructField("sensorID", IntegerType(), nullable = False),
    StructField("ts", LongType(), nullable = False),
    StructField("ambient_temp", DoubleType(), nullable = False),
    StructField("power", DoubleType(), nullable = False),
    StructField("temperature", DoubleType(), nullable = False)
    ]),
    sharding_columns = ["deviceID", "sensorID"],
    pk_columns = ["deviceID", "sensorID", "ts"]
                       )

indexSchema = IndexSpecification(
          index_name=tabName + "Index",
          table_schema=tabSchema,
          equal_columns = ["deviceID", "sensorID"],
          sort_columns = [
            SortSpecification("ts", ColumnOrder.DESCENDING_NULLS_LAST)],
          include_columns = ["temperature"]
        )
```

## Best Practices for efficient queries

In the next cell we create a lazily evaluated "view" that we can then use like a hive table in Spark SQL, but this is only evaluated when we actually run or cache query results. We are calling this view "readings" and that is how we will refer to it in the queries below:

In [10]:
tab.createOrReplaceTempView("readings")

In [11]:
query = "SELECT count(*) FROM readings"
print("{}\nRunning query in Event Store...".format(query))
df_data = eventSession.sql(query)
df_data.toPandas()

SELECT count(*) FROM readings
Running query in Event Store...


Unnamed: 0,count(1)
0,1000000


Let's have a look at the record structure 

In [12]:
query = "SELECT * FROM readings LIMIT 1"
print("{}\nRunning query in Event Store...".format(query))
df_data = eventSession.sql(query)
df_data.toPandas()

SELECT * FROM readings LIMIT 1
Running query in Event Store...


Unnamed: 0,deviceID,sensorID,ts,ambient_temp,power,temperature
0,1,48,1541019342674,25.983183,14.658741,48.908846


In [13]:
query = "SELECT MIN(ts), MAX(ts) FROM readings"
print("{}\nRunning query in Event Store...".format(query))
df_data = eventSession.sql(query)
df_data.toPandas()


SELECT MIN(ts), MAX(ts) FROM readings
Running query in Event Store...


Unnamed: 0,min(ts),max(ts)
0,1541019342674,1541774000106


## Optimal query through the index

- Index queries will significantly reduce amount of data that needs to be scanned for results. 
    - Indexes in IBM Db2 Event Store are formed asynchronously to avoid insert latency. 
    - They are stored as a Log Structured Merge (LSM) Tree.
    - The index is formed by "runs", which include sequences of sorted keys. These runs are written to disk during “Share” processing.
    - These index runs are merged together over time to improve scan and I/O efficiency.

- For an optimal query performance you must specify equality on all the equal_columns in the index and a range on the sort column in the index.

For example, in the following query we are retrieving all the values in the range of dates for a specific device and sensor, where both the `deviceID` and `sensorID` are in the equal_columns definition for the index schema and the `ts` column is the sort column for the index.

In [14]:
index_query = "SELECT ts, temperature  FROM readings where deviceID=1 and sensorID=12 and ts >1541021271619 and ts < 1541043671128 order by ts"
print("{}\nCreating a dataframe for the query ...".format(index_query))
df_index_query = eventSession.sql(index_query)

SELECT ts, temperature  FROM readings where deviceID=1 and sensorID=12 and ts >1541021271619 and ts < 1541043671128 order by ts
Creating a dataframe for the query ...


Then the following cell runs the query and caches the results. Note that this caching is for demostration purposes, to show the time it takes to run the query and cache the results in memory within Spark. This caching is recommended when you are going to do additional processing on this cached data, as the query against IBM Db2 Event Store is only run once. 

In [15]:
%%time
df_index_query.cache()

CPU times: user 5.42 ms, sys: 6.19 ms, total: 11.6 ms
Wall time: 1.37 s


DataFrame[ts: bigint, temperature: double]

Finally the results from the cached data can be visualized:

In [16]:
df_index_query.toPandas()

Unnamed: 0,ts,temperature
0,1541021271900,44.242656
1,1541021321674,40.204513
2,1541021364932,44.899199
3,1541021373404,38.694953
4,1541021400104,41.095604
5,1541021508830,43.350289
6,1541021523956,33.144291
7,1541021585210,40.710335
8,1541021751283,42.303187
9,1541021814905,43.006241


## Sub-optimal query

This next query shows a sub-optimal query that only specifies equality in one of the equal_columns in the index schema, and for this reason ends up doing a full scan of the table.

In [17]:
fullscan_query = "SELECT count(*)  FROM readings where sensorID = 7"
print("{}\nCreating a dataframe for the query...".format(fullscan_query))
df_fullscan_query = eventSession.sql(fullscan_query)

SELECT count(*)  FROM readings where sensorID = 7
Creating a dataframe for the query...


In [18]:
%%time
df_fullscan_query.cache()

CPU times: user 2.27 ms, sys: 737 µs, total: 3 ms
Wall time: 863 ms


DataFrame[count(1): bigint]

In [19]:
df_fullscan_query.toPandas()

Unnamed: 0,count(1)
0,20396


## Accessing multiple sensorIDs optimally

The easiest way to write a query that needs to retrieve multiple values in the equal_columns in the index schema is by using an *In-List*. With this, you can get optimal index access across multiple sensorID's. 

In this example we specify equality for a specific deviceID, and an In-List for the four sensors we are trying to retrieve. To limit the number of records we are returning we also include a range of timestamps.

In [20]:
inlist_query = "SELECT deviceID, sensorID, ts  FROM readings where deviceID=1 and sensorID in (1, 5, 7, 12) and ts >1541021271619 and ts < 1541043671128 order by ts"
print("{}\nCreating a dataframe for the query...".format(inlist_query))
df_inlist_query = eventSession.sql(inlist_query)

SELECT deviceID, sensorID, ts  FROM readings where deviceID=1 and sensorID in (1, 5, 7, 12) and ts >1541021271619 and ts < 1541043671128 order by ts
Creating a dataframe for the query...


In [21]:
%%time
df_inlist_query.cache()

CPU times: user 3.44 ms, sys: 1.67 ms, total: 5.11 ms
Wall time: 719 ms


DataFrame[deviceID: int, sensorID: int, ts: bigint]

In [22]:
df_inlist_query.toPandas()

Unnamed: 0,deviceID,sensorID,ts
0,1,12,1541021271900
1,1,12,1541021321674
2,1,5,1541021350571
3,1,7,1541021353975
4,1,12,1541021364932
5,1,7,1541021366049
6,1,12,1541021373404
7,1,12,1541021400104
8,1,1,1541021442119
9,1,1,1541021448076


## Exploiting the synopsis table:  Advanced medium weight queries

- Event Store tables include a synopsis table which summarizes the minimum/maximum values of the data for each range of rows in the synopsis table. 

     - It contains one range for every 1000 rows.
     - It is stored in a separate internal table in the shared storage layer.
     - It is parquet compressed to minimize footprint.
     - For highly selective queries, it can improve performance by up to 1000x.

- Using an equality or a range predicate on a clustered field (e.g. the 'ts' column in our case because the data is inserted into the table in order) is faster than doing a full scan as it should be able to exploit the synopsis table, but this will be slower than an optimal index scan.

In [23]:
synopsis_query = "SELECT deviceID, sensorID, ts FROM readings where ts >1541021271619 and ts < 1541043671128 order by ts"
print("{}\nCreating a dataframe for the query...".format(synopsis_query))
df_synopsis_query = eventSession.sql(synopsis_query)

SELECT deviceID, sensorID, ts FROM readings where ts >1541021271619 and ts < 1541043671128 order by ts
Creating a dataframe for the query...


In [24]:
%%time
df_synopsis_query.cache()

CPU times: user 971 µs, sys: 2.15 ms, total: 3.12 ms
Wall time: 1.31 s


DataFrame[deviceID: int, sensorID: int, ts: bigint]

In [25]:
df_synopsis_query.toPandas()

Unnamed: 0,deviceID,sensorID,ts
0,1,12,1541021271900
1,1,9,1541021272363
2,1,3,1541021273271
3,1,29,1541021273480
4,1,23,1541021274769
5,2,17,1541021275497
6,1,6,1541021276438
7,2,47,1541021277268
8,2,21,1541021277320
9,2,37,1541021278153


## Summary
This demo introduced you to the best practices querying the table stored in IBM Db2 Event Store database.

## Next Step
`"Event_Store_Data_Analytics.ipynb"` will show you how to perform data analytics with IBM Db2 Event Store with multiple scientific tools.