Making a Consumer

In [0]:
# Report client-level errors and escalate the unrecoverable ones.
def error_cb(err):
    """Client-wide error callback, registered on the Consumer as ``error_cb``.

    The Kafka client reports generic errors here. Most are informational:
    the client retries and recovers by itself, so the application normally
    has nothing to do. This notebook, however, gives up when the client
    cannot reach any broker (``_ALL_BROKERS_DOWN``) or fails to
    authenticate (``_AUTHENTICATION``).

    Parameters:
        err: the client error; its ``code()`` is compared against the
            fatal ``KafkaError`` codes below.

    Raises:
        KafkaException: for the two fatal codes. An exception raised from
            this callback is re-raised from the poll()/flush() call that
            triggered it.
    """
    print("Client error: {}".format(err))
    fatal_codes = (KafkaError._ALL_BROKERS_DOWN, KafkaError._AUTHENTICATION)
    if err.code() in fatal_codes:
        raise KafkaException(err)


def acked(err, msg):
    """Delivery-report callback for a Kafka Producer.

    Prints whether a produced message was delivered. It is not referenced by
    the consumer cells of this notebook. Presumably it is kept for use as the
    delivery callback of ``Producer.produce()`` — confirm before removing.

    Parameters:
        err: ``None`` when the message was delivered, otherwise the delivery
            error (printed via ``str``).
        msg: the message the report refers to (printed via ``str``).
    """
    if err is None:
        print("Message produced: %s" % (str(msg),))
        return
    print("Failed to deliver message: %s: %s" % (str(msg), str(err)))

In [0]:
from confluent_kafka import Consumer
from time import sleep
import uuid
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
import json


import os

# Kafka / Confluent Cloud settings.
# Credentials are read from environment variables instead of being committed
# to the notebook. This works in a local Jupyter Notebook; in Databricks,
# populate the variables from a secret scope (e.g. cluster environment
# variables that reference secrets) rather than from a config.py file.
# NOTE(review): the key/secret previously hard-coded here were exposed in
# source control and should be rotated.
confluentClusterName = "stage3talent"
confluentBootstrapServers = "pkc-ldvmy.centralus.azure.confluent.cloud:9092"
# can change topic name
confluentTopicName = "rcensus"
schemaRegistryUrl = "https://psrc-gq7pv.westus2.azure.confluent.cloud"
confluentApiKey = os.environ["CONFLUENT_API_KEY"]
confluentSecret = os.environ["CONFLUENT_API_SECRET"]
# The registry credentials default to the cluster credentials, matching the
# previous configuration, where both pairs held identical values.
confluentRegistryApiKey = os.environ.get("CONFLUENT_REGISTRY_API_KEY", confluentApiKey)
confluentRegistrySecret = os.environ.get("CONFLUENT_REGISTRY_API_SECRET", confluentSecret)


# Kafka consumer setup (SASL_SSL / PLAIN authentication against Confluent Cloud).
c = Consumer({
    'bootstrap.servers': confluentBootstrapServers,
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': confluentApiKey,
    'sasl.password': confluentSecret,
    # A fresh consumer group on each invocation; it has no committed offsets,
    # so together with 'earliest' every run reads the topic from the start.
    'group.id': str(uuid.uuid1()),
    'auto.offset.reset': 'earliest',
    'error_cb': error_cb,
})

# Subscribe to the configured topic instead of repeating its name, so changing
# confluentTopicName alone is enough to switch topics.
c.subscribe([confluentTopicName])


In [0]:
# Consume all messages currently available on the subscribed topic.
# Consumption stops when:
#   - no message arrives within the poll timeout (topic assumed drained),
#   - the broker reports a message-level error, or
#   - the client raises a fatal KafkaException (error_cb re-raises
#     all-brokers-down / authentication errors from poll()).

# collecting all messages to put into a data frame, pandas, etc.
kafkaListDictionaries = []

while True:
    try:
        msg = c.poll(timeout=10)
    except KafkaException as e:
        # Fatal client error: retrying would loop forever, so stop consuming.
        print(e)
        break
    if msg is None:
        break
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        break
    payload = msg.value()
    if payload is None:
        # Empty (tombstone) message: nothing to parse.
        continue
    try:
        kafkaListDictionaries.append(json.loads(payload.decode('utf-8')))
    except ValueError as e:
        # UnicodeDecodeError and json.JSONDecodeError both subclass ValueError:
        # skip the malformed message but keep consuming.
        print(e)

# Release the consumer's group membership and network connections.
c.close()

# Print each consumed message once, after consumption has finished.
for message in kafkaListDictionaries:
    print(message)
# if msg is None and len(kafkaListDictionaries) == 0:
#     dbutils.notebook.exit("no messages")

In [0]:
# Show the results from the consumer: as the bare last expression of the cell,
# the full list of consumed message dictionaries is rendered as the output.
kafkaListDictionaries

In [0]:
# Quick ETL on the consumed census messages:
#   1. build a PySpark DataFrame from the list of dictionaries,
#   2. drop rows that contain any null value,
#   3. drop exact-duplicate rows (e.g. a message that was produced twice).
censusListdf = (
    spark.createDataFrame(kafkaListDictionaries)
    .na.drop()
    .dropDuplicates()
)

In [0]:
# Preview the cleaned DataFrame (first 20 rows, long cell values truncated).
censusListdf.show(n=20, truncate=True)

In [0]:
# Reorder columns: state name first, then the population figures, State_ID last.
raw_column_order = [
    'State',
    'Total_Population',
    'Public_Insurance_Alone',
    'Medicare_Alone',
    'Medicaid_Alone',
    'VA_Alone',
    'State_ID',
]
censusListdf = censusListdf.select(*raw_column_order)

In [0]:
# Preview the reordered DataFrame (first 20 rows, long cell values truncated).
censusListdf.show(n=20, truncate=True)

In [0]:
# Check data types: print the inferred schema (column names, types and
# nullability) before the casts in the next cell.
censusListdf.printSchema()

In [0]:
# Cast columns to their proper types (State stays a string):
#   int:    State_ID
#   double: all population columns
# Population counts use 'double' rather than 'float'. A 32-bit float has a
# 24-bit mantissa, so integers above 16,777,216 (California's total is about
# 38.8M) cannot all be represented exactly and would be silently rounded.
# NOTE(review): this also changes the column type of the table created by
# the overwrite JDBC write further down; confirm downstream consumers.
population_columns = [
    'Total_Population',
    'Public_Insurance_Alone',
    'Medicare_Alone',
    'Medicaid_Alone',
    'VA_Alone',
]
for column_name in population_columns:
    censusListdf = censusListdf.withColumn(column_name, censusListdf[column_name].cast('double'))
censusListdf = censusListdf.withColumn('State_ID', censusListdf['State_ID'].cast('int'))
censusListdf.show()
# view finalized data frame and print the schema to confirm the casts
display(censusListdf)
censusListdf.printSchema()

State,Total_Population,Public_Insurance_Alone,Medicare_Alone,Medicaid_Alone,VA_Alone,State_ID
District of Columbia,692059.0,154098.0,17803.0,135312.0,983.0,11
California,38838728.0,9774780.0,1974723.0,7724092.0,75965.0,6
Pennsylvania,12590644.0,2407050.0,657439.0,1727514.0,22097.0,42
New York,19276808.0,4542469.0,932578.0,3585778.0,24113.0,36
West Virginia,1778080.0,471938.0,114398.0,349027.0,8513.0,54
Utah,3124563.0,339098.0,116851.0,217665.0,4582.0,49
Alaska,711104.0,139179.0,23270.0,111808.0,4101.0,2
South Carolina,4990992.0,1009459.0,306122.0,684845.0,18492.0,45
Maine,1325025.0,227239.0,78486.0,143635.0,5118.0,23
Georgia,10321846.0,1851109.0,542688.0,1270484.0,37937.0,13


In [0]:
# Rename columns to the names used by the SQL table (applied in this order).
sql_column_names = {
    "State": "Census_State",
    "Public_Insurance_Alone": "Population_Publicly_Insured",
    "Medicare_Alone": "Population_Medicare",
    "Medicaid_Alone": "Population_Medicaid",
    "VA_Alone": "Population_VA",
}
for source_name, target_name in sql_column_names.items():
    censusListdf = censusListdf.withColumnRenamed(source_name, target_name)

In [0]:
# Add a Census_ID column for the SQL DB.
# monotonically_increasing_id() yields unique, increasing ids. They are not
# guaranteed to be consecutive, because the partition id is encoded in the
# upper bits. The following cell therefore replaces them with row_number().
from pyspark.sql.window import Window as W
from pyspark.sql import functions as F

generated_ids = F.monotonically_increasing_id()
df = censusListdf.withColumn("Census_ID", generated_ids)

display(df)

Census_State,Total_Population,Population_Publicly_Insured,Population_Medicare,Population_Medicaid,Population_VA,State_ID,Census_ID
District of Columbia,692059.0,154098.0,17803.0,135312.0,983.0,11,0
California,38838728.0,9774780.0,1974723.0,7724092.0,75965.0,6,1
Pennsylvania,12590644.0,2407050.0,657439.0,1727514.0,22097.0,42,2
New York,19276808.0,4542469.0,932578.0,3585778.0,24113.0,36,3
West Virginia,1778080.0,471938.0,114398.0,349027.0,8513.0,54,4
Utah,3124563.0,339098.0,116851.0,217665.0,4582.0,49,5
Alaska,711104.0,139179.0,23270.0,111808.0,4101.0,2,6
South Carolina,4990992.0,1009459.0,306122.0,684845.0,18492.0,45,7
Maine,1325025.0,227239.0,78486.0,143635.0,5118.0,23,8
Georgia,10321846.0,1851109.0,542688.0,1270484.0,37937.0,13,9


In [0]:
# Replace Census_ID with consecutive ids (1, 2, 3, ...) assigned in
# alphabetical order of Census_State.
# NOTE: a window with no partitionBy pulls every row into one partition. That
# is fine for ~50 states but would not scale to large data.
from pyspark.sql.window import Window as W
from pyspark.sql import functions as F

by_state_name = W.orderBy("Census_State")
sequential_ids = F.row_number().over(by_state_name)
censusListdf = df.withColumn("Census_ID", sequential_ids)
censusListdf.show()

In [0]:
# Final column order for the SQL table: the generated Census_ID first, then
# the state identifiers, then the population figures.
final_column_order = [
    'Census_ID',
    'Census_State',
    'State_ID',
    'Total_Population',
    'Population_Publicly_Insured',
    'Population_Medicare',
    'Population_Medicaid',
    'Population_VA',
]
censusListdf = censusListdf.select(*final_column_order)

In [0]:
# Render the final DataFrame with the notebook's rich table viewer. `display`
# is not defined in this file; presumably it is the Databricks notebook helper.
display(censusListdf)

Census_ID,Census_State,State_ID,Total_Population,Population_Publicly_Insured,Population_Medicare,Population_Medicaid,Population_VA
1,Alabama,1,4813429.0,962293.0,291473.0,656093.0,14727.0
2,Alaska,2,711104.0,139179.0,23270.0,111808.0,4101.0
3,Arizona,4,7065326.0,1650936.0,473869.0,1153664.0,23403.0
4,Arkansas,5,2957271.0,800142.0,199507.0,588981.0,11654.0
5,California,6,38838728.0,9774780.0,1974723.0,7724092.0,75965.0
6,Colorado,8,5588760.0,1097732.0,296328.0,782875.0,18529.0
7,Connecticut,9,3520172.0,739685.0,191001.0,543381.0,5303.0
8,Delaware,10,951930.0,193141.0,50174.0,140917.0,2050.0
9,District of Columbia,11,692059.0,154098.0,17803.0,135312.0,983.0
10,Florida,12,20897188.0,4367261.0,1739231.0,2551095.0,76935.0


In [0]:
import os

# Connection settings for the COPY database (used by the JDBC write below).
# The password is read from the environment instead of being committed to
# the notebook; in Databricks, populate it from a secret scope.
# NOTE(review): the previously hard-coded admin password was exposed in
# source control and should be rotated.
database = "Copy_teamRocket"
table = "dbo.CensusVals"
user = "gen10dbadmin"
password = os.environ["COPY_DB_PASSWORD"]
server = "gen10-data-fundamentals-22-02-sql-server.database.windows.net"


In [0]:
# Write the census table to the COPY database over JDBC.
# mode("overwrite") replaces any existing contents of the target table.
jdbc_options = {
    "url": f"jdbc:sqlserver://{server}:1433;databaseName={database};",
    "dbtable": table,
    "user": user,
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}
(
    censusListdf.write
    .format("jdbc")
    .options(**jdbc_options)
    .mode("overwrite")
    .save()
)

In [0]:
# # establish connection to database
# database = "teamRocket"
# table = "dbo.CensusVals"
# user = "rocketuser"
# password  = os.environ["DB_PASSWORD"]  # redacted: never commit credentials, even in comments
# server = "gen10-data-fundamentals-22-02-sql-server.database.windows.net"


In [0]:
# # write census table to database
# censusListdf.write.format('jdbc').option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};") \
#     .mode("overwrite") \
#     .option("dbtable", table) \
#     .option("user", user) \
#     .option("password", password) \
#     .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
#     .save()