# Install Confluent Kafka Library

In [1]:
!pip install confluent-kafka

Collecting confluent-kafka
  Downloading confluent_kafka-1.5.0-cp37-cp37m-manylinux1_x86_64.whl (8.1 MB)
[K     |████████████████████████████████| 8.1 MB 18.2 MB/s eta 0:00:01��        | 6.1 MB 18.2 MB/s eta 0:00:01
[?25hInstalling collected packages: confluent-kafka
Successfully installed confluent-kafka-1.5.0


# Set Environment Variables for Kafka Application

In [1]:
# @hidden_cell
# Connection settings read via os.environ / os.getenv by the KafkaProducer and
# KafkaConsumer classes below.
# KAFKA_USER / KAFKA_PASSWORD are intentionally blank: fill them in only in a private
# copy (or set them in the runtime environment) and never commit real credentials.
# With a non-empty password, a KAFKA_USER of 'token' selects SASL PLAIN and any other
# user selects SCRAM-SHA-512.
%env KAFKA_BROKERS=eda-dev-kafka-bootstrap-eventstreams.gse-eda-2021-1-0143c5dd31acd8e030a1d6e0ab1380e3-0000.us-east.containers.appdomain.cloud:443
%env KAFKA_USER=
%env KAFKA_PASSWORD=
# CA certificate for the SSL connection; it is only used if the file exists.
%env KAFKA_CERT=/project_data/data_asset/es-cert-new.pem

env: KAFKA_BROKERS=eda-dev-kafka-bootstrap-eventstreams.gse-eda-2021-1-0143c5dd31acd8e030a1d6e0ab1380e3-0000.us-east.containers.appdomain.cloud:443
env: KAFKA_USER=
env: KAFKA_PASSWORD=
env: KAFKA_CERT=/project_data/data_asset/es-cert-new.pem


# Define Kafka Producer Class

In [3]:
import json, os
from confluent_kafka import KafkaError, Producer

class KafkaProducer:
    """Publish JSON events to Kafka, configured from environment variables.

    Environment variables:
        KAFKA_BROKERS  -- required; bootstrap server list.
        KAFKA_USER     -- SASL username. The value 'token' selects SASL PLAIN,
                          any other value selects SCRAM-SHA-512.
        KAFKA_PASSWORD -- SASL password. When empty, no SASL settings are added.
        KAFKA_CERT     -- CA certificate path (default '/certs/es-cert.pem');
                          used only if the file exists.
    """

    def __init__(self, groupID = "KafkaProducer"):
        # Build (and print) the producer configuration from the environment.
        self.producer_conf = self.getProducerConfiguration(groupID)
        # Create the underlying confluent-kafka producer.
        self.producer = Producer(self.producer_conf)

    def getProducerConfiguration(self, groupID):
        """Return the producer configuration dict built from the environment.

        Args:
            groupID: stored under 'group.id'. NOTE(review): 'group.id' is a
                consumer property in librdkafka; kept only for backward compatibility.

        Returns:
            dict of librdkafka configuration properties.

        Raises:
            SystemExit: with code 1 when KAFKA_BROKERS is not set.
        """
        try:
            brokers = os.environ['KAFKA_BROKERS']
        except KeyError as error:
            # str() is required: concatenating a str with the KeyError itself raised
            # TypeError and hid the real problem.
            print('[KafkaProducer] - [ERROR] - A required environment variable does not exist: ' + str(error))
            # Explicit SystemExit(1) instead of the site-provided exit() builtin,
            # which is meant for the interactive interpreter, not for programs.
            raise SystemExit(1) from error

        options = {
            'bootstrap.servers': brokers,
            'group.id': groupID,
        }
        password = os.getenv('KAFKA_PASSWORD', '')
        if password != '':
            user = os.getenv('KAFKA_USER', '')
            # Security protocol common to Event Streams on prem and on IBM Cloud.
            options['security.protocol'] = 'SASL_SSL'
            # IBM Cloud Event Streams uses user 'token' with SASL PLAIN;
            # Event Streams on OCP uses SCRAM-SHA-512.
            options['sasl.mechanisms'] = 'PLAIN' if user == 'token' else 'SCRAM-SHA-512'
            options['sasl.username'] = user
            options['sasl.password'] = password
        # On-prem Event Streams uses a self-signed certificate, so the CA public
        # certificate is needed for the SSL connection when it is available.
        cert_path = os.getenv('KAFKA_CERT', '/certs/es-cert.pem')
        if os.path.isfile(cert_path):
            options['ssl.ca.location'] = cert_path

        self.printProducerConfiguration(options)
        return options

    def printProducerConfiguration(self, options):
        """Print ``options`` for debugging, obfuscating the SASL password.

        Only the keys actually present in ``options`` are printed, so the output
        always matches the configuration in use.
        """
        print("[KafkaProducer] - This is the configuration for the producer:")
        print("[KafkaProducer] - -------------------------------------------")
        print('[KafkaProducer] - Bootstrap Server:      {}'.format(options['bootstrap.servers']))
        if 'sasl.password' in options:
            password = options['sasl.password']
            # Reveal only the first and last characters of longer passwords.
            if len(password) > 3:
                obfuscated_password = password[0] + "*****" + password[-1]
            else:
                obfuscated_password = "*******"
            print('[KafkaProducer] - Security Protocol:     {}'.format(options['security.protocol']))
            print('[KafkaProducer] - SASL Mechanism:        {}'.format(options['sasl.mechanisms']))
            print('[KafkaProducer] - SASL Username:         {}'.format(options['sasl.username']))
            print('[KafkaProducer] - SASL Password:         {}'.format(obfuscated_password))
        if 'ssl.ca.location' in options:
            print('[KafkaProducer] - SSL CA Location:       {}'.format(options['ssl.ca.location']))
        print("[KafkaProducer] - -------------------------------------------")

    def delivery_report(self, err, msg):
        """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """
        if err is not None:
            print('[KafkaProducer] - [ERROR] - Message delivery failed: {}'.format(err))
        else:
            print('[KafkaProducer] - Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

    def publishEvent(self, topicName, eventToSend, keyName):
        """Serialize ``eventToSend`` as JSON and publish it to ``topicName``.

        Args:
            topicName: destination topic.
            eventToSend: JSON-serializable dict; ``eventToSend[keyName]`` is the message key.
            keyName: name of the field used as the message key.
        """
        dataStr = json.dumps(eventToSend)
        self.producer.produce(topicName, key=eventToSend[keyName], value=dataStr.encode('utf-8'), callback=self.delivery_report)
        # flush() waits for outstanding deliveries, so delivery_report runs before returning.
        self.producer.flush()


# Define Kafka Consumer Class

In [4]:
import json,os,csv
from confluent_kafka import Consumer, KafkaError


class KafkaConsumer:
    """Consume telemetry events, score them with a WML model, and publish anomalies.

    Configuration comes from the environment variables KAFKA_BROKERS (required),
    KAFKA_USER, KAFKA_PASSWORD and KAFKA_CERT, interpreted as in KafkaProducer.

    NOTE(review): pollNextEvent() relies on the notebook-level names ``wml_client``,
    ``deployment_id`` and ``KafkaProducer``; they must be defined before it is called.
    """

    # Ordered fields of the comma-separated telemetry "payload" string.
    TELEMETRY_FIELDS = ['container_id', 'timestamp', 'product_id', 'temperature', 'target_temperature',
                        'ambiant_temperature', 'kilowatts', 'content_type', 'oxygen_level', 'nitrogen_level',
                        'carbon_dioxide_level', 'humidity_level', 'fan_1', 'vent_2', 'vent_3', 'time_door_open',
                        'latitude', 'longitude', 'defrost_cycle', 'maintenance_required']
    # Fields sent to the anomaly-detection model, in model input order.
    MODEL_FIELDS = ["temperature", "ambiant_temperature", "kilowatts", "oxygen_level",
                    "nitrogen_level", "humidity_level", "fan_1", "vent_2"]
    # Positions of MODEL_FIELDS inside TELEMETRY_FIELDS, computed once instead of per message.
    _MODEL_FIELD_INDICES = tuple(map(TELEMETRY_FIELDS.index, MODEL_FIELDS))
    # Topic that anomalous events are published to. Override on the class or instance if needed.
    # NOTE(review): Kafka topic names may only contain ASCII alphanumerics, '.', '_' and '-',
    # so a name with spaces will be rejected by the broker. Confirm the intended topic.
    ANOMALY_TOPIC = "Anomaly Detection Topic"

    def __init__(self, topic_name = "kafka-producer", groupID = 'KafkaConsumer-F21', autocommit = True):
        # Build (and print) the consumer configuration from the environment.
        self.consumer_conf = self.getConsumerConfiguration(groupID, autocommit)
        # Create the (plain, non-Avro) confluent-kafka consumer.
        self.consumer = Consumer(self.consumer_conf)
        # Subscribe to the topic.
        self.consumer.subscribe([topic_name])

    def getConsumerConfiguration(self, groupID, autocommit):
        """Return the consumer configuration dict built from the environment.

        Args:
            groupID: consumer group id ('group.id').
            autocommit: value for 'enable.auto.commit'.

        Raises:
            SystemExit: with code 1 when KAFKA_BROKERS is not set.
        """
        try:
            brokers = os.environ['KAFKA_BROKERS']
        except KeyError as error:
            # str() is required: concatenating a str with the KeyError itself raised
            # TypeError and hid the real problem.
            print('[KafkaConsumer] - [ERROR] - A required environment variable does not exist: ' + str(error))
            # Explicit SystemExit(1) instead of the site-provided exit() builtin,
            # which is meant for the interactive interpreter, not for programs.
            raise SystemExit(1) from error

        options = {
            'bootstrap.servers': brokers,
            'group.id': groupID,
            'auto.offset.reset': "earliest",
            'enable.auto.commit': autocommit,
        }
        password = os.getenv('KAFKA_PASSWORD', '')
        if password != '':
            user = os.getenv('KAFKA_USER', '')
            # Security protocol common to Event Streams on prem and on IBM Cloud.
            options['security.protocol'] = 'SASL_SSL'
            # IBM Cloud Event Streams uses user 'token' with SASL PLAIN;
            # Event Streams on OCP uses SCRAM-SHA-512.
            options['sasl.mechanisms'] = 'PLAIN' if user == 'token' else 'SCRAM-SHA-512'
            options['sasl.username'] = user
            options['sasl.password'] = password
        # On-prem Event Streams uses a self-signed certificate, so the CA public
        # certificate is needed for the SSL connection when it is available.
        cert_path = os.getenv('KAFKA_CERT', '/certs/es-cert.pem')
        if os.path.isfile(cert_path):
            options['ssl.ca.location'] = cert_path

        self.printConsumerConfiguration(options)
        return options

    def printConsumerConfiguration(self, options):
        """Print ``options`` for debugging, obfuscating the SASL password.

        Only the keys actually present in ``options`` are printed.
        """
        print("[KafkaConsumer] - This is the configuration for the consumer:")
        print("[KafkaConsumer] - -------------------------------------------")
        print('[KafkaConsumer] - Bootstrap Server:      {}'.format(options['bootstrap.servers']))
        if 'sasl.password' in options:
            password = options['sasl.password']
            # Reveal only the first and last characters of longer passwords.
            if len(password) > 3:
                obfuscated_password = password[0] + "*****" + password[-1]
            else:
                obfuscated_password = "*******"
            print('[KafkaConsumer] - Security Protocol:     {}'.format(options['security.protocol']))
            print('[KafkaConsumer] - SASL Mechanism:        {}'.format(options['sasl.mechanisms']))
            print('[KafkaConsumer] - SASL Username:         {}'.format(options['sasl.username']))
            print('[KafkaConsumer] - SASL Password:         {}'.format(obfuscated_password))
        if 'ssl.ca.location' in options:
            print('[KafkaConsumer] - SSL CA Location:       {}'.format(options['ssl.ca.location']))
        print('[KafkaConsumer] - Offset Reset:          {}'.format(options['auto.offset.reset']))
        print('[KafkaConsumer] - Autocommit:            {}'.format(options['enable.auto.commit']))
        print("[KafkaConsumer] - -------------------------------------------")

    def traceResponse(self, msg):
        """Print the topic, partition, offset, key and value of a consumed message."""
        key = msg.key()
        value = msg.value()
        # Keys/values may be absent (None); only decode bytes that are present.
        print('[KafkaConsumer] - Next Message consumed from {} partition: [{}] at offset: {}\n\tkey: {}\n\tvalue: {}'
              .format(msg.topic(), msg.partition(), msg.offset(),
                      key.decode('utf-8') if key is not None else None,
                      value.decode('utf-8') if value is not None else None))

    def pollNextEvent(self):
        """Consume events until a poll times out, scoring each and publishing anomalies.

        Each poll waits up to 10 seconds; the loop stops the first time poll()
        returns None. Returns None.
        """
        msg = self.consumer.poll(timeout=10.0)
        if msg is None:
            print("[KafkaConsumer] - [INFO] - No new messages on the topic")

        while msg is not None:
            error = msg.error()
            if error:
                # msg.error() is a KafkaError object; compare its code, not a substring.
                if error.code() == KafkaError._PARTITION_EOF:
                    print("[KafkaConsumer] - [INFO] - End of partition")
                else:
                    print("[KafkaConsumer] - [ERROR] - Consumer error: {}".format(error))
            else:
                self._scoreAndPublish(msg)
            msg = self.consumer.poll(timeout=10.0)

    def _extractModelValues(self, raw_value):
        """Decode one telemetry message value and return the model features as floats.

        Assumes a UTF-8 JSON object whose "payload" is a string like "(v1,v2,...)"
        with values in TELEMETRY_FIELDS order.
        """
        loaded_json = json.loads(raw_value.decode('utf-8'))
        values = loaded_json["payload"].strip('()').split(",")
        return [float(values[i]) for i in self._MODEL_FIELD_INDICES]

    def _scoreAndPublish(self, msg):
        """Score one telemetry message and publish the response if it is an anomaly."""
        try:
            values_model = self._extractModelValues(msg.value())
        except (ValueError, KeyError, IndexError) as error:
            # A single malformed event must not stop the consumer loop.
            print("[KafkaConsumer] - [ERROR] - Skipping malformed telemetry message: {}".format(error))
            return

        payload_scoring = {"input_data": [{"fields": self.MODEL_FIELDS, "values": [values_model]}]}
        # Call the anomaly detection model (wml_client / deployment_id are notebook globals).
        scoring_response = wml_client.deployments.score(deployment_id, payload_scoring)
        prediction_label = scoring_response['predictions'][0]['values'][0][0]

        # Send anomalies only.
        if prediction_label == 1:
            event = {"eventKey": "Anomaly Detection Topic", "message": scoring_response}
            self._getAnomalyProducer().publishEvent(self.ANOMALY_TOPIC, event, "eventKey")

    def _getAnomalyProducer(self):
        """Return the KafkaProducer used for anomalies, creating it on first use."""
        producer = getattr(self, '_anomaly_producer', None)
        if producer is None:
            producer = KafkaProducer()
            self._anomaly_producer = producer
        return producer

    def close(self):
        """Close the underlying Kafka consumer."""
        self.consumer.close()

# Authenticate the Watson Machine Learning (WML) Client

In [16]:
from ibm_watson_machine_learning import APIClient
# Credentials for the WML client, taken from the runtime environment
# (the access token first, then the service URL).
token = os.environ['USER_ACCESS_TOKEN']
wml_credentials = dict(
    token=token,
    instance_id="wml_local",
    url=os.environ['RUNTIME_ENV_APSX_URL'],
    version="3.5",
)
wml_client = APIClient(wml_credentials)

# Obtain the UId of your space
def guid_from_space_name(client, space_name):
    """Return the ID of the deployment space named ``space_name``.

    Args:
        client: WML APIClient; only ``client.spaces.get_details()`` is used.
        space_name: display name of the space to look up.

    Returns:
        The space's ``metadata['id']``; the first match wins if names repeat.

    Raises:
        ValueError: if no visible space has that name (previously a bare
            StopIteration escaped from ``next()``).
    """
    spaces = client.spaces.get_details()
    for item in spaces['resources']:
        if item['entity']['name'] == space_name:
            return item['metadata']['id']
    raise ValueError("No deployment space named {!r} was found".format(space_name))

# Resolve the production deployment space by display name and echo its ID.
SPACE_NAME = 'Vaccine Model Production Deployment Space'
space_uid = guid_from_space_name(wml_client, SPACE_NAME)
print("Space UID = " + space_uid)

Space UID = 285c322f-c23f-4784-83fa-bf5dd73d06aa


In [17]:
# Set the WML client's default deployment space.
# NOTE(review): this hard-coded ID differs from the `space_uid` printed by the previous
# cell for 'Vaccine Model Production Deployment Space'. Confirm which space hosts
# `deployment_id`, and consider passing `space_uid` here instead.
wml_client.set.default_space("b409e49d-b3d7-4436-81d1-3172c821f920")

'SUCCESS'

In [18]:
# WML deployment that hosts the anomaly-detection model; KafkaConsumer reads this
# module-level name when scoring each telemetry event.
deployment_id = "85bb8780-736c-4c07-890c-672ca0495307"

# Read Telemetry Events from Kafka, Score Them with the Model Endpoint, and Publish Anomalous Events to the Anomaly Topic

In [26]:

if __name__ == '__main__':
    # Create a Kafka consumer subscribed to the telemetry topic.
    kafka_consumer = KafkaConsumer("telemetries")
    try:
        # Score every available event; returns once a poll times out with no message.
        kafka_consumer.pollNextEvent()
    finally:
        # Close the consumer even if polling or scoring raises.
        kafka_consumer.close()

[KafkaConsumer] - This is the configuration for the consumer:
[KafkaConsumer] - -------------------------------------------
[KafkaConsumer] - Bootstrap Server:      eda-dev-kafka-bootstrap-eventstreams.gse-eda-2021-1-0143c5dd31acd8e030a1d6e0ab1380e3-0000.us-east.containers.appdomain.cloud:443
[KafkaConsumer] - Security Protocol:     SASL_SSL
[KafkaConsumer] - SASL Mechanism:        SCRAM-SHA-512
[KafkaConsumer] - SASL Username:         qijun-test
[KafkaConsumer] - SASL Password:         b*****l
[KafkaConsumer] - SSL CA Location:       /project_data/data_asset/es-cert-new.pem
[KafkaConsumer] - Offset Reset:          earliest
[KafkaConsumer] - Autocommit:            True
[KafkaConsumer] - -------------------------------------------
[KafkaProducer] - This is the configuration for the producer:
[KafkaProducer] - -------------------------------------------
[KafkaProducer] - Bootstrap Server:      eda-dev-kafka-bootstrap-eventstreams.gse-eda-2021-1-0143c5dd31acd8e030a1d6e0ab1380e3-0000.us-ea