In [0]:
# Define Error Callbacks
def error_cb(err):
    """Handle generic (non-message) errors reported by the Kafka client.

    Every error is printed. Most client errors are informational, because the
    client keeps trying to recover on its own. This notebook nevertheless
    treats two codes as fatal:
    - no broker reachable (_ALL_BROKERS_DOWN);
    - authentication failure (_AUTHENTICATION).

    For those codes the callback raises, and the exception is re-raised from
    the poll()/flush() call that triggered the callback.

    Parameters:
        err (KafkaError): the error reported by the client.

    Raises:
        KafkaException: when ``err`` carries one of the fatal codes above.
    """
    print("Client error: {}".format(err))
    fatal_codes = (KafkaError._ALL_BROKERS_DOWN, KafkaError._AUTHENTICATION)
    if err.code() in fatal_codes:
        raise KafkaException(err)


def acked(err, msg):
    """Report the delivery outcome of a produced message.

    Appears to be intended as a producer delivery-report callback (e.g. the
    ``on_delivery``/``callback`` argument of ``Producer.produce``). It is not
    used by the consumer cells visible in this notebook.

    Parameters:
        err (KafkaError or None): ``None`` when the message was delivered,
            otherwise the delivery error.
        msg (Message): the message the report is about. Its ``topic()``,
            ``partition()`` and ``offset()`` are logged, because ``str()`` of a
            confluent_kafka Message is only an opaque object repr.
    """
    if err is not None:
        print("Failed to deliver message to %s [%s]: %s"
              % (msg.topic(), msg.partition(), err))
    else:
        print("Message produced to %s [%s] at offset %s"
              % (msg.topic(), msg.partition(), msg.offset()))

In [0]:
# Connection String
from confluent_kafka import Consumer
from time import sleep
import uuid
from confluent_kafka import Consumer, KafkaError, KafkaException
import json
from confluent_kafka.admin import AdminClient, NewTopic

##### SET CONFLUENT_TOPIC_NAME (OR EDIT THE DEFAULT BELOW) TO YOUR TOPIC #########
import os
from getpass import getpass

# Kafka settings.
# - Non-secret values default to the literals below and can be overridden with
#   environment variables.
# - Keys and secrets are read from the environment, or prompted for
#   interactively, so they are never stored in the notebook. On Databricks,
#   export them from a secret scope instead.
# NOTE(review): the key/secret previously hard-coded here were stored in plain
# text and should be treated as compromised and rotated.
confluentClusterName = os.environ.get("CONFLUENT_CLUSTER_NAME", "stage3talent")
confluentBootstrapServers = os.environ.get(
    "CONFLUENT_BOOTSTRAP_SERVERS", "pkc-ldvmy.centralus.azure.confluent.cloud:9092")
confluentTopicName = os.environ.get("CONFLUENT_TOPIC_NAME", "pushing-p")  ## MAKE SURE YOU CHANGE THIS
schemaRegistryUrl = os.environ.get(
    "CONFLUENT_SCHEMA_REGISTRY_URL", "https://psrc-gq7pv.westus2.azure.confluent.cloud")
confluentApiKey = os.environ.get("CONFLUENT_API_KEY") or getpass("Confluent API key: ")
confluentSecret = os.environ.get("CONFLUENT_API_SECRET") or getpass("Confluent API secret: ")
# The Schema Registry credentials were the same as the cluster credentials;
# keep that as the fallback when no registry-specific values are provided.
confluentRegistryApiKey = os.environ.get("CONFLUENT_REGISTRY_API_KEY", confluentApiKey)
confluentRegistrySecret = os.environ.get("CONFLUENT_REGISTRY_API_SECRET", confluentSecret)


# Kafka consumer.
# - The uuid group.id creates a brand-new consumer group on every run.
# - With auto.offset.reset=earliest, each run therefore reads the topic from
#   its earliest retained offset.
c = Consumer({
    'bootstrap.servers': confluentBootstrapServers,
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': confluentApiKey,
    'sasl.password': confluentSecret,
    'group.id': str(uuid.uuid1()),  # this will create a new consumer group on each invocation.
    'auto.offset.reset': 'earliest',
    'error_cb': error_cb,
})

# Subscribe to the configured topic (previously hard-coded, so changing
# confluentTopicName had no effect).
c.subscribe([confluentTopicName])

In [0]:
# New Jersey's 21 counties, in alphabetical order.
county_list = [
    f"{name} County"
    for name in (
        "Atlantic", "Bergen", "Burlington", "Camden", "Cape May",
        "Cumberland", "Essex", "Gloucester", "Hudson", "Hunterdon",
        "Mercer", "Middlesex", "Monmouth", "Morris", "Ocean",
        "Passaic", "Salem", "Somerset", "Sussex", "Union", "Warren",
    )
]

In [0]:
import os
from getpass import getpass

# Azure SQL connection settings. Passwords come from the environment, or an
# interactive prompt, instead of being stored in the notebook.
# NOTE(review): the passwords previously hard-coded here were stored in plain
# text and should be treated as compromised and rotated.

# Admin connection (not used by the cells visible here).
admin_database = "master"
admin_user = os.environ.get("SQL_ADMIN_USER", "gen10dbadmin")
admin_password = os.environ.get("SQL_ADMIN_PASSWORD") or getpass("SQL admin password: ")
admin_server = "gen10-data-fundamentals-22-02-sql-server.database.windows.net"

# Application connection used to write the live-feed table.
database = "Pushing-P-DB"
table = "dbo.Live_Feed"
user = os.environ.get("SQL_USER", "pushing_p")
password = os.environ.get("SQL_PASSWORD") or getpass("SQL password for {}: ".format(user))
server = "gen10-data-fundamentals-22-02-sql-server.database.windows.net"

In [0]:
# Consume messages from the topic and publish them to SQL one county at a time.
#
# Messages for a county are assumed to arrive contiguously (e.g. the producer
# sends them grouped by county). Whenever the county changes:
# - the buffered batch overwrites the live-feed table, so the table only ever
#   holds one county;
# - the loop then pauses before continuing. Presumably this gives readers of
#   the live feed time to see each county (confirm with the dashboard owner).

MAX_POLLS = 1000        # upper bound on poll() calls for this run
POLL_TIMEOUT_S = 15     # seconds without a message before the topic counts as drained
FLUSH_PAUSE_S = 30      # pause after publishing each completed county
JDBC_URL = f"jdbc:sqlserver://{server}:1433;databaseName={database};"


def write_live_feed(records):
    """Overwrite the SQL live-feed table with ``records``.

    Parameters:
        records (list[dict]): decoded Kafka messages. Must be non-empty,
            because Spark infers the DataFrame schema from them.

    Returns:
        The Spark DataFrame that was written.
    """
    df = spark.createDataFrame(records)
    (df.write.format('jdbc')
        .option("url", JDBC_URL)
        .mode("overwrite")
        .option("dbtable", table)
        .option("user", user)
        .option("password", password)
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .save())
    return df


kafkaListDictionaries = []  # decoded messages for the county currently being buffered
current_county = None       # county of the buffered messages
last_buffered_msg = None    # newest Kafka message in the buffer; committed after a successful write
county_df = None            # DataFrame most recently written to SQL
aString = {}                # most recently decoded message
j = 0                       # number of completed counties published so far

try:
    for _ in range(MAX_POLLS):
        msg = c.poll(timeout=POLL_TIMEOUT_S)

        if msg is None:
            # Nothing arrived within the timeout: treat the topic as drained.
            break

        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            break

        try:
            aString = json.loads(msg.value().decode('utf-8'))
            aString['Timestamp'] = msg.timestamp()[1]
            county = aString['County']
        except (ValueError, TypeError, KeyError, AttributeError) as e:
            # Malformed payload (bad UTF-8/JSON, no value, no County): report and
            # skip it. SQL/Kafka failures elsewhere are NOT swallowed.
            print("Skipping malformed message at offset {}: {}".format(msg.offset(), e))
            continue

        if current_county is not None and county != current_county:
            # The previous county is complete: publish it, then start a new batch.
            county_df = write_live_feed(kafkaListDictionaries)
            # Commit only after the batch is persisted, up to its last message.
            c.commit(message=last_buffered_msg, asynchronous=False)
            kafkaListDictionaries = []
            j += 1
            sleep(FLUSH_PAUSE_S)

        current_county = county
        kafkaListDictionaries.append(aString)
        last_buffered_msg = msg

    # Publish the final (possibly partial) batch. This runs whether the loop
    # stopped because the topic drained, a consumer error occurred, or
    # MAX_POLLS was reached.
    if kafkaListDictionaries:
        county_df = write_live_feed(kafkaListDictionaries)
        c.commit(message=last_buffered_msg, asynchronous=False)
finally:
    # Leave the consumer group cleanly. Re-run the connection cell to consume again.
    c.close()