<header>
   <p  style='font-size:36px;font-family:Arial; color:#F0F0F0; background-color: #00233c; padding-left: 20pt; padding-top: 20pt;padding-bottom: 10pt; padding-right: 20pt;'>
       Real-time data loading with Kafka and Teradata VantageCloud Lake
  <br>
       <img id="teradata-logo" src="images/TeradataLogo.png" alt="Teradata" style="width: 125px; height: auto; margin-top: 20pt;">
    </p>
</header>
<hr>

<br>

<b style = 'font-size:24px;font-family:Arial;color:#00233C'>Leverage the parallelism and scale of message streaming architectures with native data loading capabilities in Teradata Vantage</b>

<p style = 'font-size:16px;font-family:Arial;color:#00233C'><b>Streaming</b> messaging patterns are an integral component of the enterprise data landscape.  Publish/subscribe messaging systems like <b>Apache Kafka, AWS Kinesis, or Azure Event Hubs</b> provide a robust, high-scale infrastructure for the receipt and delivery of near-real-time data from event-based operations.</p>

<p style = 'font-size:16px;font-family:Arial;color:#00233C'><b>Teradata Vantage</b> is the industry-leading massively parallel processing (MPP) analytic engine that provides the fast, scalable data processing capabilities needed to load and analyze this near-real-time data.</p>

<p style = 'font-size:16px;font-family:Arial;color:#00233C'>Organizations can effectively leverage these two architectures to both simplify and scale real-time-data processing and analytics for rapid decision support, AI and ML tasks, or any other advanced analytic need that would benefit from low-latency processing. <b>Teradata Parallel Transporter</b> is a scalable and robust parallel data loading tool that can integrate with these streaming systems to provide the critical bridge between message generation and analytics.</p>

<p style = 'font-size:16px;font-family:Arial;color:#00233C'>This demonstration illustrates the following process:</p>

<table style = 'width:100%;table-layout:fixed;'>
    <tr>
        <td style = 'vertical-align:top' width = '30%'>
            <ol style = 'font-size:16px;font-family:Arial'>
                <li><b>Publish</b> messages that consist of simulated retail events to a Kafka <i>topic</i> </li>
                <br>
                <br>
                <br>
                <li>Invoke a <b>Teradata Parallel Transporter</b> job to consume messages off the stream and write them to a table in Vantage</li>
                <br>
                <br>
                <br>
                <li><b>Analyze</b> the data as it is loaded into the database</li>
            </ol>
        </td>
        <td><img src = 'images/TPT_KafkaAXSMOD.png' width = '600'></td>
    </tr>
</table>

<hr>
<p style = 'font-size:20px;font-family:Arial;color:#00233C'><b>Step 1 - Publish messages to the streaming system</b></p>

<p style = 'font-size:16px;font-family:Arial;color:#00233C'>This demonstration requires an <b>Apache Kafka</b> cluster that has been installed and configured with a message topic named <i>demotopic</i>. The topic can be created with the following command:</p>

```bash
$ ./kafka-topics.sh --create --topic demotopic --bootstrap-server <broker>:<port>
```

<ol style = 'font-size:16px;font-family:Arial;color:#00233C'>
    <li>Read simulated Digital Retail Events from a local file</li>
    <li>Connect to the Kafka Broker</li>
    <li>Publish messages</li>
    </ol>
    
<p style = 'font-size:16px;font-family:Arial;color:#00233C'><i>This notebook uses the kafka-python library to publish messages to the Kafka infrastructure</i></p>

In [None]:
%%capture
# Install the python package if necessary
!pip install kafka-python

In [None]:
from kafka import KafkaProducer
import pandas as pd
from time import sleep
import datetime, json
from teradataml import *
import teradatasql

from IPython.display import display
from IPython.display import clear_output

import concurrent.futures
from threading import current_thread, get_ident, get_native_id, Event

import matplotlib.pyplot as plt

# create a local dictionary of environment-specific variables

# load vars json
with open('../../vars.json', 'r') as f:
    session_vars = json.load(f)

# Use the "data_engineer" and Business compute group from the base setup
host = session_vars['environment']['host']
username = session_vars['hierarchy']['users']['business_users'][0]['username']
password = session_vars['hierarchy']['users']['business_users'][0]['password']

# Initialize the target table.
# Truncate all records to make the row counts easier to see

with teradatasql.connect(host = host, user = username, password = password) as con:
    cur = con.cursor()
    cur.execute('DELETE demo.kafka_retail_events;')

<hr>
<p style = 'font-size:16px;font-family:Arial;color:#00233C'><b>1.1 - Read simulated digital events data</b></p>

<p style = 'font-size:16px;font-family:Arial;color:#00233C'>These records will be used as input to the Kafka stream.</p>

In [None]:
df_newdata = pd.read_csv('Digital_Retail_Events.csv', nrows = 1000)
df_newdata.head()

<hr>
<p style = 'font-size:16px;font-family:Arial;color:#00233C'><b>1.2 - Connect to Kafka</b></p>

<p style = 'font-size:16px;font-family:Arial;color:#00233C'>Use the kafka-python library to instantiate a connection as a message <i>Producer</i>.</p>

In [None]:
# Create a producer object that connects to the broker(s)


# list of strings that represent the Kafka brokers
bootstrap_hosts = ['<host>:<port>']

producer = KafkaProducer(bootstrap_servers=bootstrap_hosts)

if producer.bootstrap_connected():
    print('Connected to Bootstrap server(s)')
else:
    print('Connection failed, check error messages')
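
<p style = 'font-size:16px;font-family:Arial;color:#00233C'>A connected client does not by itself show that the topic accepts writes. The sketch below is an illustration, not part of the original demo: it sends one message and blocks until the broker acknowledges it. It assumes a kafka-python <i>KafkaProducer</i>, whose <i>send()</i> returns a future; the helper name <i>send_and_confirm</i> and the 10-second timeout are hypothetical choices.</p>

```python
def send_and_confirm(producer, topic, text, timeout_s=10):
    """Send one UTF-8 message and wait for the broker acknowledgement.

    `producer` is assumed to be a kafka-python KafkaProducer, or any object
    whose send() returns a future with a blocking get().
    """
    future = producer.send(topic, value=text.encode('utf-8'))
    # get() raises an exception if the send failed or the timeout expired
    metadata = future.get(timeout=timeout_s)
    return metadata.topic, metadata.partition, metadata.offset
```

<p style = 'font-size:16px;font-family:Arial;color:#00233C'>A call such as <i>send_and_confirm(producer, 'demotopic', 'connectivity check')</i> writes a real message to the topic, so it is best run before the TPT job in Step 2 starts consuming.</p>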

<hr>
<p style = 'font-size:16px;font-family:Arial;color:#00233C'><b>1.3 - Publish messages to the stream topic</b></p>

<p style = 'font-size:16px;font-family:Arial;color:#00233C'>Spawn a separate thread to publish one message per second.  This will run in the background for the rest of the demonstration.</p>

In [None]:
# Define the function that will publish messages
# use two threading event flags - one to stop processing, and one to suppress status output

def produce_messages(event, print_event):

    for key, val in df_newdata.iterrows():
        
        # check the stop flag - exit the thread if it has been set
        if event.is_set():
            print('Thread Killed')
            return 'Thread Killed'
        
        msg = str(val['Entity_Id']) + '|' + str(datetime.datetime.now()) + '|' + val['event'] + '|' + str(val['session_id'])     
        if print_event.is_set():
            clear_output()
            print(f'Message {str(key)}: {msg}', end = '\r')
        producer.send(topic = 'demotopic', value = msg.encode('utf-8'))
        sleep(1)

e = Event()
p = Event()
# create a thread pool object using concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_workers = 1)

# submit the publishing function so it runs on the background thread
f = executor.submit(produce_messages, e, p)

# set the print flag to print messages
p.set()
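
<p style = 'font-size:16px;font-family:Arial;color:#00233C'>The stop-flag pattern used above can be tried without a Kafka broker. The sketch below is a stand-alone illustration that uses only the standard library; <i>publish_loop</i> and its <i>send</i> callback are hypothetical names. It uses <i>Event.wait(timeout)</i> in place of <i>sleep()</i>, so the worker stops as soon as the flag is set rather than at the end of the current pause.</p>

```python
import concurrent.futures
from threading import Event

def publish_loop(messages, send, stop_event, interval_s=1.0):
    """Call send(msg) for each message until stop_event is set.

    Returns the number of messages that were sent.
    """
    sent = 0
    for msg in messages:
        if stop_event.is_set():
            break
        send(msg)
        sent += 1
        # wait() returns True as soon as the flag is set, ending the pause early
        if stop_event.wait(timeout=interval_s):
            break
    return sent

# Demonstration: an in-memory list stands in for the Kafka producer
received = []
stop = Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(publish_loop, ['a', 'b', 'c'], received.append, stop, 0.01)
    count = future.result()  # blocks until the worker finishes

print(count, received)
```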

<hr>
<p style = 'font-size:20px;font-family:Arial;color:#00233C'><b>Step 2 - Execute the TPT job</b></p>

<p style = 'font-size:16px;font-family:Arial;color:#00233C'><b>Teradata Parallel Transporter</b> is an object-oriented client application that provides scalable, high-speed, parallel data extraction, data loading, and data updating. These capabilities can be extended with customizations or with third-party products.  An example of a TPT script that will stream data from a Kafka topic is below.</p>

<p style = 'font-size:16px;font-family:Arial;color:#00233C'>To execute this script, open a terminal session on a host where TPT has been installed and run the <i>tbuild</i> command, where <i>-f</i> names the input script file and <i>-j</i> names the job.</p>

```bash
$ tbuild -f TPT_Kafka_Stream.txt -j jobname
```
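
<p style = 'font-size:16px;font-family:Arial;color:#00233C'>If a terminal session is inconvenient, the same command could be launched from Python. This is a sketch, assuming <i>tbuild</i> is on the <i>PATH</i> of the machine running the code; <i>build_tbuild_command</i>, <i>run_tbuild</i>, and the log file name are hypothetical, and only the <i>-f</i> and <i>-j</i> options described above are used.</p>

```python
import subprocess

def build_tbuild_command(script_path, job_name):
    """Return the tbuild argument list for the given script and job name."""
    return ['tbuild', '-f', script_path, '-j', job_name]

def run_tbuild(script_path, job_name, log_path='tbuild_output.log'):
    """Start the TPT job without blocking and return the Popen handle.

    Output is redirected to a log file (hypothetical name) so the child
    process never stalls on a full pipe.
    """
    cmd = build_tbuild_command(script_path, job_name)
    with open(log_path, 'w') as log:
        return subprocess.Popen(cmd, stdout=log, stderr=subprocess.STDOUT)
```

<p style = 'font-size:16px;font-family:Arial;color:#00233C'><i>Popen</i> is used here instead of a blocking call because a streaming job may keep running while the rest of the notebook executes; for example, <i>job = run_tbuild('TPT_Kafka_Stream.txt', 'jobname')</i>.</p>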

<p style = 'font-size:16px;font-family:Arial;color:#00233C'><b>This is an example of the TPT script run in the demo:</b></p>

```bash
 DEFINE JOB IMPORT_TO_TERADATA
    DESCRIPTION 'Import data to Teradata from Kafka Server'

    (
    SET LoadTargetTable = 'demo.kafka_retail_events'
    SET StreamTdpId         = '<db_host>'
    SET StreamUserName      = '<db_user>'
    SET StreamUserPassword  = '<db_password>'

        STEP IMPORT_THE_DATA
        (
            APPLY $INSERT @LoadTargetTable TO OPERATOR ($STREAM)
            SELECT * FROM OPERATOR ($FILE_READER()
                ATTR
                (
                    PrivateLogName = 'KAFKA_TestTopic_log',
                    AccessModuleName = 'libkafkaaxsmod.so',
                    AccessModuleInitStr = '-MODE C
                                           -TOPIC demotopic
                                           -BROKERS <broker>:<port>
                                           -BLOCKSIZE 30000
                                           -PARTITION 0
                                           -SHOWP y
                                           -TRACELEVEL 3
                                           -CONFIG compression.codec=none
                                           -CONFIG topic.auto.offset.reset=latest
                                           -alf Y
                                          ',
                     TextDelimiter = '|',
                     Format = 'Delimited'

                )
            );
        );
    );
```
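
<p style = 'font-size:16px;font-family:Arial;color:#00233C'>With <i>Format = 'Delimited'</i> and <i>TextDelimiter = '|'</i>, each record read from the topic is split into fields at the pipe character. The Python sketch below mirrors that split so a message can be sanity-checked before it is published. The field names are hypothetical labels for the four values the producer in Step 1 concatenates, and the sample message is made up; the actual column names of <i>demo.kafka_retail_events</i> are not shown in this notebook.</p>

```python
# Hypothetical labels for the four values concatenated by the producer
EXPECTED_FIELDS = ('entity_id', 'datestamp', 'event', 'session_id')

def parse_message(raw, delimiter='|'):
    """Split one delimited message into a dict, checking the field count."""
    text = raw.decode('utf-8') if isinstance(raw, bytes) else raw
    parts = text.rstrip('\n').split(delimiter)
    if len(parts) != len(EXPECTED_FIELDS):
        raise ValueError(
            f'expected {len(EXPECTED_FIELDS)} fields, got {len(parts)}: {text!r}'
        )
    return dict(zip(EXPECTED_FIELDS, parts))

# Made-up sample message in the same layout the producer builds
record = parse_message(b'42|2024-01-01 12:00:00.000000|page_view|1001')
print(record)
```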

<hr>
<p style = 'font-size:20px;font-family:Arial;color:#00233C'><b>Step 3 - Monitor data loading from the database</b></p>

<p style = 'font-size:16px;font-family:Arial;color:#00233C'>A simple script that shows the data being loaded in real time.</p>

<ol style = 'font-size:16px;font-family:Arial;color:#00233C'>
    <li>Connect to the Vantage system</li>
    <li>Query the count of records</li>
    <li>Display an active visualization of event counts</li>
    </ol>

<hr>
<p style = 'font-size:16px;font-family:Arial;color:#00233C'><b>3.1 - Connect to VantageCloud Lake</b></p>

<p style = 'font-size:16px;font-family:Arial;color:#00233C'>Create a client connection to the destination database.</p>

In [None]:
# clear the print flag to suppress status output from the publishing thread
p.clear()


eng = create_context(host = host, username = username, password = password)

# confirm connection
print(eng)

<hr>
<p style = 'font-size:16px;font-family:Arial;color:#00233C'><b>3.2 - Sample the data</b></p>

<p style = 'font-size:16px;font-family:Arial;color:#00233C'>Query some records and watch the row count increase.</p>

In [None]:
# show a sample of the data
DataFrame.from_query('SELECT TOP 10 * FROM demo.kafka_retail_events ORDER BY datestamp DESC;')

In [None]:
# show the count of records increasing
for i in range(10):
    print(execute_sql('SELECT COUNT(*) FROM demo.kafka_retail_events;').fetchall(), end = '\r')
    sleep(1)
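
<p style = 'font-size:16px;font-family:Arial;color:#00233C'>The same polling idea can also report throughput. The sketch below is an illustration with hypothetical names: <i>get_count</i> is any zero-argument callable that returns the current row count, for example a wrapper around the <i>execute_sql</i> call above. With the producer publishing one message per second, the reported rate would be expected to settle near one row per second.</p>

```python
import time

def poll_load_rate(get_count, samples=5, interval_s=1.0,
                   clock=time.monotonic, pause=time.sleep):
    """Yield (row_count, rows_per_second) for each successive poll.

    clock and pause are injectable so the function can be tested
    without real delays.
    """
    prev_count, prev_time = get_count(), clock()
    for _ in range(samples):
        pause(interval_s)
        count, now = get_count(), clock()
        elapsed = now - prev_time
        rate = (count - prev_count) / elapsed if elapsed > 0 else 0.0
        yield count, rate
        prev_count, prev_time = count, now

# Example wiring (assumes the teradataml context created in 3.1):
# get_count = lambda: execute_sql(
#     'SELECT COUNT(*) FROM demo.kafka_retail_events;').fetchall()[0][0]
# for count, rate in poll_load_rate(get_count):
#     print(f'{count} rows, {rate:.2f} rows/s')
```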

<hr>
<p style = 'font-size:16px;font-family:Arial;color:#00233C'><b>3.3 - Visualize</b></p>

<p style = 'font-size:16px;font-family:Arial;color:#00233C'>Sample bar chart showing the number of records per event type, refreshed as new data arrives.</p>

In [None]:
# Plot the count of records per event type, refreshing as rows are loaded

tdf = DataFrame('"demo"."kafka_retail_events"')
for i in range(100):
    
    fig, ax = plt.subplots()
    clear_output(wait=True)
    sleep(3)
    df = tdf.groupby('event').count().to_pandas(all_rows = True)
    df.set_index('event')[['count_datestamp']].plot(kind = 'barh', ax = ax)
    plt.show()
    

<hr>
<p style = 'font-size:16px;font-family:Arial;color:#00233C'><b>Cleanup</b></p>

<p style = 'font-size:16px;font-family:Arial;color:#00233C'>Stop the thread from publishing messages and disconnect from the database.</p>

In [None]:
# Kill the message producer thread if desired
e.set()

In [None]:
remove_context()
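
<p style = 'font-size:16px;font-family:Arial;color:#00233C'>A fuller shutdown might also wait for the worker to exit and flush any buffered messages. The sketch below is one possible ordering, not the notebook's own cleanup; <i>shutdown_publisher</i> is a hypothetical helper. It assumes the <i>e</i>, <i>f</i>, <i>executor</i>, and <i>producer</i> objects created in Step 1, with <i>producer</i> being a kafka-python <i>KafkaProducer</i>, which provides <i>flush()</i> and <i>close()</i>.</p>

```python
def shutdown_publisher(stop_event, future, executor, producer, timeout_s=5):
    """Stop the publishing thread, then flush and close the producer."""
    stop_event.set()                           # ask the worker loop to exit
    result = future.result(timeout=timeout_s)  # wait for it; re-raises worker errors
    executor.shutdown(wait=True)               # release the thread pool
    producer.flush()                           # deliver any buffered messages
    producer.close()
    return result

# Example call with the objects from Step 1:
# shutdown_publisher(e, f, executor, producer)
```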