# Kafka Consumer

**Error callback**

In [None]:
def error_cb(err):
    """Report client-level Kafka errors and abort on the unrecoverable ones.

    The client retries on its own after most errors, so they are normally
    informational and need no action from the application. Every error is
    printed here. When no broker can be reached (``_ALL_BROKERS_DOWN``) or
    authentication fails (``_AUTHENTICATION``), this example stops the
    application by raising.

    Args:
        err: the error object handed to the callback by the client.

    Raises:
        KafkaException: for ``_ALL_BROKERS_DOWN`` and ``_AUTHENTICATION``.
            Any exception raised from this callback surfaces from the
            ``flush()`` or ``poll()`` call that triggered it.
    """
    print("Client error: {}".format(err))
    fatal_codes = (KafkaError._ALL_BROKERS_DOWN, KafkaError._AUTHENTICATION)
    if err.code() in fatal_codes:
        raise KafkaException(err)


**Setting Consumer Parameters**

In [None]:
from confluent_kafka import Consumer
from time import sleep
import uuid
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
import json
from pyspark.sql import SparkSession, Row
import pandas as pd

# Kafka cluster and Schema Registry settings, read from the `config` object.
# NOTE(review): `config` is not defined in this section; presumably an earlier
# cell or %run provides it — confirm.
confluentClusterName = config.confluentClusterName
confluentBootstrapServers = config.confluentBootstrapServers
confluentTopicName = config.confluentTopicName
schemaRegistryUrl = config.schemaRegistryUrl
confluentApiKey = config.confluentApiKey
confluentSecret = config.confluentSecret
confluentRegistryApiKey = config.confluentRegistryApiKey
confluentRegistrySecret = config.confluentRegistrySecret

# Consumer configuration, assembled first and then handed to the client.
consumer_settings = {
    # Connection and SASL/PLAIN authentication over SSL.
    'bootstrap.servers': confluentBootstrapServers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': confluentApiKey,
    'sasl.password': confluentSecret,
    # A new random group id is generated on every run, so a brand-new
    # consumer group is created each time; with 'earliest' it starts from
    # the oldest available offset.
    'group.id': str(uuid.uuid1()),
    'auto.offset.reset': 'earliest',
    # Client-level errors are reported through error_cb.
    'error_cb': error_cb,
}
c = Consumer(consumer_settings)

# NOTE(review): confluentTopicName is loaded above, but the subscription uses
# this hard-coded topic name instead — confirm which one is intended.
c.subscribe(['diabetesfinal-10'])

**Running the Consumer**

In [None]:
def _save_records(records, path):
    """Write *records* (a list of dicts) to *path* as CSV and return the frame written.

    Exact duplicate rows are removed before writing; no index column is
    emitted.
    """
    frame = pd.DataFrame(records).drop_duplicates()
    frame.to_csv(path, index=False)
    return frame


# Poll the subscribed topic and decode each message value as UTF-8 JSON.
# Buffer the resulting dicts in kafkaListDictionaries. Whenever the buffer
# holds more than 500 records, write it to its own CSV file named after the
# last message's timestamp. When polling stops, write whatever is left to
# final.csv and close the consumer. Re-running this cell therefore requires
# re-running the consumer-setup cell first.
# NOTE(review): batch files go to .../diabetesfinal-4/ while the leftover file
# goes to .../diabetesfinal-3/ — confirm which directory is intended.
kafkaListDictionaries = []
try:
    while True:
        try:
            msg = c.poll(timeout=1.0)
            if msg is None:
                # The first 1-second poll that returns nothing is taken to mean
                # "no messages remain".
                # NOTE(review): a brand-new consumer group may not have
                # partitions assigned by the first poll — confirm 1 s suffices.
                break
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue

            kafkaListDictionaries.append(json.loads(msg.value().decode('utf-8')))
            # NOTE(review): caps throughput at roughly 3 messages/second;
            # presumably deliberate throttling — confirm.
            sleep(0.3)

            if len(kafkaListDictionaries) > 500:
                # Name the batch file after the message timestamp value
                # (second element of msg.timestamp()).
                batch_ts = str(msg.timestamp()[1])
                filepath = r'/dbfs/mnt/eddydoering/diabeetus/consumer_final/diabetesfinal-4/' + batch_ts + ".csv"
                _save_records(kafkaListDictionaries, filepath)
                kafkaListDictionaries = []
        except KafkaException:
            # error_cb raises KafkaException for fatal client errors (all
            # brokers down, authentication failure). Propagate it so the run
            # stops, rather than printing it and polling on.
            raise
        except Exception as e:
            # Per-message problems (e.g. a value that is not valid JSON) are
            # printed and skipped. A failed batch write also lands here; the
            # buffer is kept, so the write is retried after the next message.
            print(e)
finally:
    try:
        # Save the leftover buffer, also when the loop was aborted, so records
        # that were already consumed are not lost.
        filepath = r'/dbfs/mnt/eddydoering/diabeetus/consumer_final/diabetesfinal-3/' + "final.csv"
        df = _save_records(kafkaListDictionaries, filepath)
    finally:
        # Close the consumer once consumption is finished.
        c.close()