# Kafka Consumer for the Insurance Dataset(s)

Define the error callback for Kafka client errors

In [None]:
import pyspark.sql.functions
from pyspark.sql.functions import concat, col, lit
from pyspark.sql.types import StringType, DecimalType, IntegerType, ByteType

In [None]:
def error_cb(err):
    """Handle a generic client error reported by the Kafka client.

    Such errors are normally informational: the client tries to recover
    from them on its own and the application usually has nothing to do.
    In this example two conditions are treated as fatal instead: no
    broker can be reached (_ALL_BROKERS_DOWN) and authentication failed
    (_AUTHENTICATION).
    """
    print("Client error: {}".format(err))

    fatal_codes = (KafkaError._ALL_BROKERS_DOWN, KafkaError._AUTHENTICATION)
    if err.code() in fatal_codes:
        # An exception raised here is re-raised from the flush() or poll()
        # call that triggered this callback.
        raise KafkaException(err)

Set up the consumer

In [None]:
from confluent_kafka import Consumer
from time import sleep
import uuid
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
import json


#KAFKA variables, get from your cluster and put into a config file
from config import confluentClusterName
from config import confluentBootstrapServers
from config import confluentTopicName
from config import schemaRegistryUrl
from config import confluentApiKey
from config import confluentSecret
from config import confluentRegistryApiKey
from config import confluentRegistrySecret


# Kafka consumer setup.
consumer_settings = {
    'bootstrap.servers': confluentBootstrapServers,
    # Authenticate with SASL/PLAIN over SSL using the API key and secret from config.
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': confluentApiKey,
    'sasl.password': confluentSecret,
    # The group id is the fixed string '1', so every run joins the same
    # consumer group (it does NOT create a new group per invocation).
    'group.id': str(1),
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
    # Fatal client errors are raised from poll() by this callback.
    'error_cb': error_cb,
}
c = Consumer(consumer_settings)

# NOTE(review): the topic name is hard-coded here although confluentTopicName
# is imported from config -- confirm whether the two are meant to match.
c.subscribe(['insurance-capstone2'])

Read the messages

In [None]:
# Drain the subscribed topic into kafkaListDictionaries: one dict per Kafka
# message, with the message timestamp added under the 'timestamp' key.
aString = {}

kafkaListDictionaries = []

while True:
    # poll() is deliberately NOT wrapped in a blanket try/except: error_cb
    # raises KafkaException from poll() on fatal client errors, and swallowing
    # it here would leave this loop spinning forever.
    msg = c.poll(timeout=15)
    print(msg)
    if msg is None:
        # Nothing arrived within the timeout; treat the topic as drained.
        break
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        break

    try:
        aString = json.loads(msg.value().decode('utf-8'))
        # timestamp() returns a (type, value) pair; keep only the value.
        aString['timestamp'] = msg.timestamp()[1]
    except (AttributeError, ValueError, TypeError) as e:
        # AttributeError: message without a value; ValueError: invalid UTF-8
        # or JSON; TypeError: payload is not a JSON object. Skip the record
        # and keep consuming.
        print("Skipping malformed message: {}".format(e))
        continue

    kafkaListDictionaries.append(aString)
    # format() instead of '+' so a numeric 'charges' value does not raise.
    print("New claim with charge of: {}".format(aString.get('charges')))


## Start (ET)L for the InsuranceCharges Dataset

In [None]:
# Turn kafkaListDictionaries (the list of claim dicts read from Kafka) into a
# Spark DataFrame and preview it in the notebook.
sparkdf = spark.createDataFrame(kafkaListDictionaries)
display(sparkdf)

In [None]:
# Recode the smoker column: 'no' becomes '0', then 'yes' becomes '1'.
from pyspark.sql.functions import regexp_replace
smoker_recoded = regexp_replace(regexp_replace('smoker', 'no', '0'), 'yes', '1')
sparkdf = sparkdf.withColumn('smoker', smoker_recoded)
display(sparkdf)

In [None]:
# Show each column's name and Spark data type for sparkdf.
sparkdf.dtypes

Convert the data types appropriately for database

In [None]:
df = sparkdf.withColumn("age", col("age").cast(IntegerType()))
df = df.withColumn("bmi", col("bmi").cast(DecimalType()))
df = df.withColumn("charges", col("charges").cast(DecimalType()))
df = df.withColumn("children", col("children").cast(IntegerType()))
df = df.withColumn("smoker", col("smoker").cast(ByteType()))

In [None]:
df.dtypes

Let's clean the data

In [None]:
#Drop null values
df = sparkdf.dropna()

#Drop duplicates
df = df.dropDuplicates()

Let's set the ranges for data values to filter incoming data.

In [None]:
# Inclusive bounds used to filter incoming rows.
age_lower, age_upper = 18, 120
children_lower, children_upper = 0, 10


In [None]:
# Keep only rows whose age and children values lie inside the inclusive
# bounds, then print a sample of the result.
age_in_range = (df.age >= age_lower) & (df.age <= age_upper)
children_in_range = (df.children >= children_lower) & (df.children <= children_upper)
df = df.where(age_in_range & children_in_range)
df.show()

In [None]:
# Number of rows left in df after cleaning and range filtering.
df.count()

Save the data to CSV file; then load to database

In [None]:
# Mount the capstone container (output)
from config import storageAccount
from config import storageContainer
from config import clientSecret
from config import clientid
mount_point = "/mnt/capstone-group2-data/dataout"

# OAuth client-credentials settings for the storage account; the client id
# and secret come from config.
# NOTE(review): the directory id inside the token endpoint URL is hard-coded
# while the other values come from config -- confirm this is intended.
configs = {"fs.azure.account.auth.type": "OAuth",
   "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
   "fs.azure.account.oauth2.client.id": clientid,
   "fs.azure.account.oauth2.client.secret": clientSecret,
   "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/d46b54b2-a652-420b-aa5a-2ef7f8fc706e/oauth2/token",
   "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

# Best-effort unmount so the mount below starts clean. Catch Exception (not a
# bare except, which would also swallow KeyboardInterrupt/SystemExit) and
# report why the unmount was skipped instead of hiding it.
try:
    dbutils.fs.unmount(mount_point)
except Exception as exc:
    print("Skipping unmount of {}: {}".format(mount_point, exc))


dbutils.fs.mount(
    source=f"abfss://{storageContainer}@{storageAccount}.dfs.core.windows.net/",
    mount_point=mount_point,
    extra_configs=configs)

In [None]:
# Write the cleaned data as CSV with a header row, replacing any previous
# output at this path.
clean_output_path = "/mnt/capstone-group2-data/dataout/cleandata/cleanHealthCosts/clean_insurance"
csv_writer = df.write.mode("overwrite").option("header", "true")
csv_writer.csv(clean_output_path)