# Using OmniSci Pymapd API with Kinesis Consumer Application

Pymapd (https://github.com/omnisci/pymapd) is the Python DB API 2.0 (PEP 249) compliant interface for OmniSci.

Packages are available on conda-forge and PyPI:
conda install -c conda-forge pymapd
pip install pymapd

To install cuDF for GPU DataFrame support (conda only):
conda install -c nvidia/label/cuda10.0 -c rapidsai/label/cuda10.0 -c numba -c conda-forge -c defaults cudf=0.6 pymapd python=3.6

In [11]:
import sys
import time
import pandas as pd
from pymapd import connect
import boto3

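Define a function that connects to the OmniSci database. It returns the connection on success, the string "RETRY" if the server reports that it is not ready yet (for example, a paused OmniSci Cloud instance), and "ERROR" for any other failure.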

In [12]:
# Connect to the OmniSci database.
# Returns a pymapd Connection, "RETRY" if the server is still starting up,
# or "ERROR" for any other failure.
def connect_to_omnisci(str_user, str_password, str_host, str_dbname, is_cloud):
    try:
        if is_cloud:
            # OmniSci Cloud is reached over HTTPS on port 443
            connection = connect(user=str_user, password=str_password, host=str_host,
                                 dbname=str_dbname, port=443, protocol='https')
        else:
            # Local or self-hosted OmniSci server on port 6274
            connection = connect(user=str_user, password=str_password, host=str_host,
                                 dbname=str_dbname, port=6274)
    except Exception as ex:
        template = "An exception of type {0} occurred. Arguments:\n{1!r}"
        message = template.format(type(ex).__name__, ex.args)
        print(message)
        if 'OmniSci Core not ready, try again' in message:
            # The instance is waking up; the caller should wait and retry
            print("Set connection to RETRY!")
            connection = "RETRY"
        else:
            connection = "ERROR"
    return connection

Call the connect function with the following arguments (local server value | OmniSci Cloud value):
  - user: mapd | your Cloud API key name
  - password: HyperInteractive | your Cloud API key secret
  - host: localhost | use2-api.mapd.cloud
  - database: mapd

The last argument is a flag indicating whether you are connecting to an OmniSci Cloud instance (True) or to a local or self-hosted server (False).
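Typing the API key secret directly into a notebook makes it easy to leak. A minimal sketch, assuming you export the credentials as environment variables with the hypothetical names OMNISCI_USER, OMNISCI_PASSWORD, OMNISCI_HOST and OMNISCI_DB, builds the same keyword arguments that are passed to `connect` in the cell above (user, password, host, dbname, port and, for the Cloud, protocol):

```python
import os


def omnisci_settings(is_cloud, env=None):
    """Build keyword arguments for pymapd's connect() from environment variables.

    The OMNISCI_* variable names are hypothetical; choose any names you like.
    """
    env = os.environ if env is None else env
    settings = {
        "user": env.get("OMNISCI_USER", "mapd"),
        # No default for the password: secrets should never be hard-coded
        "password": env["OMNISCI_PASSWORD"],
        "host": env.get("OMNISCI_HOST", "localhost"),
        "dbname": env.get("OMNISCI_DB", "mapd"),
    }
    if is_cloud:
        # OmniSci Cloud is reached over HTTPS on port 443
        settings.update(port=443, protocol="https")
    else:
        # Port used in this notebook for a local OmniSci server
        settings["port"] = 6274
    return settings
```

With the variables set, a Cloud connection becomes `connect(**omnisci_settings(is_cloud=True))`, and the secret never appears in the notebook or its saved output.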

In [13]:
# Connect to OmniSci with up to 5 tries; this matters for OmniSci Cloud instances, which are paused during inactivity
for attempt in range(5):
    # Connecting to a local (non-Cloud) OmniSci instance:
    # connection = connect_to_omnisci("mapd", "HyperInteractive", "localhost", "mapd", False)
    # Connecting to an OmniSci Cloud instance (replace the placeholder with your API key secret):
    connection = connect_to_omnisci("F0A7674FB728C4DE89A0", "YOUR_API_KEY_SECRET", "use2-api.mapd.cloud", "mapd", True)
    if connection == "RETRY":
        # Give the paused instance about 20 seconds to wake up before retrying
        time.sleep(20)
        continue
    if connection == "ERROR":
        sys.exit(1)
    print(connection)
    break
else:
    # Every attempt returned "RETRY"
    sys.exit("Could not connect to OmniSci after 5 attempts")

Connection(mapd://F0A7674FB728C4DE89A0:***@https://use2-api.mapd.cloud:443/mapd?protocol=https)
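The retry loop above relies on `connect_to_omnisci` returning the sentinel strings "RETRY" and "ERROR". A minimal sketch of an exception-based alternative follows: it retries only while the error text contains the same "OmniSci Core not ready" marker and re-raises everything else. It assumes that marker appears in `str(ex)` for the exception pymapd raises, which is an assumption rather than documented pymapd behavior; `connect_with_retry` is a hypothetical helper name.

```python
import time


def connect_with_retry(connect_fn, attempts=5, delay_s=20.0,
                       retry_marker="OmniSci Core not ready"):
    """Call connect_fn() until it succeeds, retrying only 'not ready' errors.

    connect_fn is any zero-argument callable, for example a lambda that
    wraps pymapd's connect(...). Other exceptions are re-raised at once,
    and the last 'not ready' error is re-raised when attempts run out.
    """
    for attempt in range(1, attempts + 1):
        try:
            return connect_fn()
        except Exception as ex:
            # Only a paused or waking instance is worth waiting for
            if retry_marker not in str(ex) or attempt == attempts:
                raise
            print(f"Attempt {attempt} failed ({ex}); retrying in {delay_s}s")
            time.sleep(delay_s)
```

Usage would look like `connection = connect_with_retry(lambda: connect(user=..., password=..., host="use2-api.mapd.cloud", dbname="mapd", port=443, protocol="https"))`. Raising instead of returning strings means a failed connection can never be mistaken for a live one further down the notebook.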


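Connect to the Kinesis data stream named gas-production using the boto3 library. This cell reads only the stream's first shard and requests a LATEST iterator, so only records written after the iterator is created are returned.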

In [14]:
my_stream_name = 'gas-production'
kinesis_client = boto3.client('kinesis', region_name='us-east-1')
response = kinesis_client.describe_stream(StreamName=my_stream_name)
my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']
shard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name,
                                                      ShardId=my_shard_id,
                                                      ShardIteratorType='LATEST')
my_shard_iterator = shard_iterator['ShardIterator']
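The cell above opens an iterator for the first shard only, so records on any other shard are never read. A minimal sketch for a multi-shard stream opens one iterator per shard with the same `describe_stream` and `get_shard_iterator` calls; `shard_iterators` is a hypothetical helper, and it does not follow `HasMoreShards` pagination, so it assumes the stream's shards fit in a single `describe_stream` response. Passing `TRIM_HORIZON` instead of `LATEST` starts each shard at its oldest retained record.

```python
def shard_iterators(client, stream_name, iterator_type="LATEST"):
    """Return {shard_id: shard_iterator} for every shard listed for the stream.

    client is a boto3 Kinesis client, or any object with the same two methods.
    """
    description = client.describe_stream(StreamName=stream_name)["StreamDescription"]
    iterators = {}
    for shard in description["Shards"]:
        response = client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard["ShardId"],
            ShardIteratorType=iterator_type,
        )
        iterators[shard["ShardId"]] = response["ShardIterator"]
    return iterators
```

For example, `shard_iterators(kinesis_client, 'gas-production')` returns a dictionary that a consumer can poll in turn, calling `get_records` once per shard.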

The main cell does the following:
  - before the loop, create a pandas DataFrame whose column names match the gas production dataset
  - read one record from the stream
  - decode the UTF-8 data blob and split the CSV line into a list of fields
  - add the list as a new row of the DataFrame
  - convert flow_date to a datetime and latitude and longitude to floats
  - once the DataFrame holds a full batch of rows (6 in the cell below), call the pymapd load_table API to append its contents to the OmniSci table natural_gas_production, then start a new, empty DataFrame
  - get the next iterator for the shard and repeat the loop, stopping if the shard has been closed
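The decode and type-conversion steps listed above can be factored into two small functions and checked without a live stream. A minimal sketch, assuming each record is one unquoted CSV line in the column order shown; `parse_record` and `build_batch` are hypothetical names. Collecting rows in a plain list and building the DataFrame once per batch also avoids growing it one `df.loc` assignment at a time.

```python
import pandas as pd

COLUMNS = ['flow_date', 'flow_value', 'state', 'county', 'region',
           'display_name', 'latitude', 'longitude']


def parse_record(data):
    """Decode one Kinesis record payload (bytes) into a list of CSV fields."""
    return data.decode('utf-8').rstrip('\n').split(',')


def build_batch(rows):
    """Turn a list of field lists into a typed DataFrame ready for load_table."""
    df = pd.DataFrame(rows, columns=COLUMNS)
    df['flow_date'] = pd.to_datetime(df['flow_date'], format='%Y-%m-%d %H:%M:%S')
    # downcast='float' stores the coordinates as float32
    df['latitude'] = pd.to_numeric(df['latitude'], downcast='float')
    df['longitude'] = pd.to_numeric(df['longitude'], downcast='float')
    return df
```

Inside the loop, each record would then be handled with `rows.append(parse_record(item['Data']))`, and a full batch loaded with `connection.load_table(table_name, build_batch(rows), preserve_index=False)`.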

In [15]:
columns = ['flow_date', 'flow_value', 'state', 'county', 'region', 'display_name', 'latitude', 'longitude']
df = pd.DataFrame(columns=columns)
print(df.head())
row = 0
batch_size = 6  # rows to buffer before each load_table call
table_name = 'natural_gas_production'
while True:
    # Read at most one record from the current shard position
    record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator, Limit=1)
    for item in record_response['Records']:
        # Each record is one UTF-8 encoded CSV line
        record_data = item['Data'].decode('utf-8').rstrip('\n')
        fields = record_data.split(',')
        df.loc[row] = fields
        row += 1
    if row >= batch_size:
        # Convert the string fields to the column types before loading
        df['flow_date'] = pd.to_datetime(df['flow_date'], format='%Y-%m-%d %H:%M:%S')
        df['latitude'] = pd.to_numeric(df['latitude'], downcast='float')
        df['longitude'] = pd.to_numeric(df['longitude'], downcast='float')
        print(df.head())
        connection.load_table(table_name, df, preserve_index=False)
        # Start a fresh, empty DataFrame for the next batch
        df = pd.DataFrame(columns=columns)
        row = 0
    # NextShardIterator is missing or None once the shard has been closed
    my_shard_iterator = record_response.get('NextShardIterator')
    if my_shard_iterator is None:
        print('producer closed, exit!')
        break
    # Wait 1 second between reads
    time.sleep(1)


Empty DataFrame
Columns: [flow_date, flow_value, state, county, region, display_name, latitude, longitude]
Index: []
            flow_date flow_value         state   county          region  \
0 2018-02-08 16:00:00     10.008  Pennsylvania   Warren       Northeast   
1 2018-03-14 16:00:00      9.898  Pennsylvania   Potter       Northeast   
2 2018-02-16 16:00:00       7.85       Wyoming  Lincoln  Rocky Mountain   
3 2018-03-05 16:00:00       7.52      New York  Steuben       Northeast   
4 2018-02-09 16:00:00       3.86     Tennessee   Sumner   South Central   

                    display_name   latitude   longitude  
0       National Fuel Gas Supply  41.837833  -79.148064  
1       National Fuel Gas Supply  41.574596  -77.817741  
2    Kern River Gas Transmission  41.769386 -110.353294  
3    Independence- National Fuel  42.148750  -77.560921  
4  Mitchellville - Tennessee Gas  36.640610  -86.508736  


KeyboardInterrupt: 
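Stopping the cell with an interrupt, as in the output above, discards any rows still buffered in the DataFrame. A minimal sketch of a consumer that flushes the partial batch on exit follows. It assumes a boto3-style `get_records(ShardIterator=..., Limit=...)` method; `consume` and `load_batch` are hypothetical names, and reading up to 100 records per call (instead of 1) is an illustrative choice, not the notebook's setting. `load_batch` could, for instance, build a DataFrame and call `connection.load_table`.

```python
import time


def consume(client, shard_iterator, batch_size, load_batch, poll_interval_s=1.0):
    """Poll one shard, call load_batch(rows) for every full batch, then flush the rest."""
    rows = []
    try:
        while shard_iterator is not None:
            response = client.get_records(ShardIterator=shard_iterator, Limit=100)
            for record in response['Records']:
                rows.append(record['Data'].decode('utf-8').rstrip('\n').split(','))
                if len(rows) == batch_size:
                    load_batch(rows)
                    rows = []  # new list, so load_batch may keep the old one
            # Missing or None once the shard has been closed
            shard_iterator = response.get('NextShardIterator')
            if shard_iterator is not None:
                time.sleep(poll_interval_s)
    except KeyboardInterrupt:
        pass  # stopping the cell by hand still flushes the buffered rows below
    if rows:
        load_batch(rows)
```

Because the Kinesis client and the loader are passed in, the loop can be exercised with stub objects before pointing it at the real stream and OmniSci table.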