#Spark Structured Streaming with Kafka, SASL_SSL and SCRAM-SHA-256

The consumer and producer are implemented using the reference Confluent Kafka library and a CloudKarafka cluster. The steps are:
<ol>
  <li>Create a JAAS file for connecting to the Kafka cluster</li>
  <li>Set up Databricks for remote Kafka authentication</li>
  <li>Authenticate to the CloudKarafka cluster using Spark</li>
  <li>View the structured stream in Databricks</li>
</ol>

#Configuration parameters

In [None]:
#Kafka brokers, separated by commas
CLOUDKARAFKA_BROKERS = 'Broker1:port,Broker2:port,Broker3:port'

#Kafka topic to consume from
CLOUDKARAFKA_TOPIC = "KafkaTopic" #usually looks like CLOUDKARAFKA_USERNAME-default or CLOUDKARAFKA_USERNAME-anything_you_used_during_setup

#Kafka partition of the topic to consume
CLOUDKARAFKA_TOPIC_PARTITION = 0

###Create the configuration used by the Kafka client

###Create a JAAS file to connect to the secure Kafka cluster (CloudKarafka)

Define the Kafka client:
<ol>
  <li>Use the SCRAM login module, "org.apache.kafka.common.security.scram.ScramLoginModule", which the CloudKarafka brokers use, as noted on their website</li>
  <li>Debug is set to true so the attributes used to create the consumer are logged (set the logging level to "INFO")</li>
  <li>Username: as shown in your CloudKarafka dashboard</li>
  <li>Password: as shown in your CloudKarafka dashboard</li>
</ol>

Update the username and password accordingly.

In [None]:
kafka_conf = """
            KafkaClient {
            org.apache.kafka.common.security.scram.ScramLoginModule required
            debug=true
            username="karafka_username"
            password="karafka_password";
            };
             """

Write the file to disk; its path is used later to set the JAAS login configuration.

In [None]:
# Write the JAAS configuration to the driver's local disk
# (the with-block closes the file automatically)
with open('/databricks/driver/kafka_jaas.conf', 'w') as fp:
    fp.write(kafka_conf)

In [None]:
import os

os.listdir('/databricks/driver/')

Out[96]: ['conf',
 'preload_class.lst',
 'eventlogs',
 'kafka_conf.conf',
 'metastore_db',
 'kafka_jaas.conf',
 'logs',
 'ganglia']

####Consumer configuration for connecting to the Kafka cluster

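Spark's Kafka source passes options prefixed with `kafka.` to the underlying Java Kafka consumer, while `subscribe` and `startingOffsets` are Spark's own source options. The librdkafka parameters used by the Confluent client (e.g. for the producer) are listed separately:
<br>Link : https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
<br>
<ol>
  <li><b>'kafka.bootstrap.servers'</b>: Kafka brokers to connect to (as shown in your CloudKarafka dashboard)</li>
  <li><b>'subscribe'</b>: topic to consume from</li>
  <li><b>'startingOffsets'</b>: offset to start reading from in each partition of the topic</li>
  <li><b>'kafka.security.protocol'</b>: security protocol to use</li>
  <li><b>'kafka.sasl.mechanism'</b>: SASL authentication mechanism to use</li>
</ol>

In [None]:
consumer_conf = {
    'kafka.bootstrap.servers': CLOUDKARAFKA_BROKERS,
    'subscribe': CLOUDKARAFKA_TOPIC,
    # Start from the earliest available offset (librdkafka calls this "smallest")
    'startingOffsets': 'earliest',
    'kafka.security.protocol': 'SASL_SSL',
    'kafka.sasl.mechanism': 'SCRAM-SHA-256',
}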
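If you already have a librdkafka-style config (for example from the Confluent producer), its keys do not work as-is with Spark's `readStream`. The sketch below is a hypothetical translation helper covering only the keys used in this notebook; the key mapping and the `smallest`/`largest` aliases are the assumptions it encodes, and any other key raises an error rather than being silently dropped.

```python
# Hypothetical mapping from librdkafka / confluent-kafka keys to the
# Java client properties Spark's Kafka source expects (with "kafka." prefix)
RDKAFKA_TO_SPARK = {
    "bootstrap.servers": "kafka.bootstrap.servers",
    "security.protocol": "kafka.security.protocol",
    "sasl.mechanisms": "kafka.sasl.mechanism",
    "sasl.mechanism": "kafka.sasl.mechanism",
}

# librdkafka accepts aliases for earliest/latest in auto.offset.reset
OFFSET_RESET_TO_STARTING_OFFSETS = {
    "smallest": "earliest",
    "earliest": "earliest",
    "beginning": "earliest",
    "largest": "latest",
    "latest": "latest",
    "end": "latest",
}


def to_spark_kafka_options(rdkafka_conf: dict, topic: str) -> dict:
    """Translate a librdkafka-style consumer config into Spark readStream options."""
    options = {"subscribe": topic}
    for key, value in rdkafka_conf.items():
        if key in RDKAFKA_TO_SPARK:
            options[RDKAFKA_TO_SPARK[key]] = value
        elif key == "auto.offset.reset":
            options["startingOffsets"] = OFFSET_RESET_TO_STARTING_OFFSETS[value]
        elif key == "default.topic.config":
            # Older librdkafka configs nest topic-level settings in this dict
            nested = value.get("auto.offset.reset")
            if nested is not None:
                options["startingOffsets"] = OFFSET_RESET_TO_STARTING_OFFSETS[nested]
        else:
            raise KeyError(f"No Spark equivalent known for {key!r}")
    return options


print(to_spark_kafka_options(
    {"bootstrap.servers": "Broker1:port", "sasl.mechanisms": "SCRAM-SHA-256"},
    "KafkaTopic",
))
```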

#SparkSession

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession\
        .builder\
        .appName("Kafka_Pyspark")\
        .getOrCreate()

###Set sparkContext property for authentication

###<p><b>For the Databricks environment: add the Kafka client libraries to your cluster</b>
  <br>Follow the library installation instructions for your environment.</p>

In [None]:
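# Point the JAAS login configuration at the file written above (kafka_jaas.conf).
# Note: this sets the Java system property on the driver JVM only.
spark.sparkContext.setSystemProperty('java.security.auth.login.config', '/databricks/driver/kafka_jaas.conf')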
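A system property set this way only reaches the driver JVM, while Kafka consumers for a streaming query run on the executors. Kafka clients from 0.10.2 onward also accept the login module inline through the `sasl.jaas.config` property, which Spark forwards to the consumers when it is passed as `kafka.sasl.jaas.config`. The helper below is a hypothetical sketch that builds that option; like the file version, it does not escape quotes or backslashes.

```python
def scram_jaas_option(username: str, password: str) -> dict:
    """Build Spark's `kafka.sasl.jaas.config` option for SCRAM (hypothetical helper)."""
    for value in (username, password):
        # Not escaped by this sketch, so reject characters that would break quoting
        if '"' in value or "\\" in value:
            raise ValueError("quotes and backslashes are not supported by this sketch")
    # Same login-module line as the JAAS file, without the KafkaClient { ... }; wrapper
    jaas = (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {"kafka.sasl.jaas.config": jaas}


# Usage with the reader (not executed here):
# spark.readStream.format("kafka").options(**consumer_conf, **scram_jaas_option(user, pwd)).load()
print(scram_jaas_option("karafka_username", "karafka_password"))
```

Because the credentials travel with the query options, this avoids writing a file to the driver, at the cost of the password appearing in the notebook unless it is read from a secret store.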

#Handling stopwords

In [None]:
!pip install nltk



In [None]:
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
Out[102]: True

In [None]:
from nltk.corpus import stopwords

# Broadcast the stopwords to the executors as a set for fast membership checks
swords = spark.sparkContext.broadcast(set(stopwords.words('english')))

In [None]:
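def remove_stop(word):
    """Return True if `word` should be kept: at least 2 characters and not a stopword."""
    if word is None or len(word) < 2:
        return False
    return word not in swords.value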
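The keep/drop rule can be checked in plain Python before wrapping it in a UDF. This sketch swaps the broadcast variable for a small hypothetical stopword set (not NLTK's full list), so it runs without Spark or NLTK.

```python
# Tiny hypothetical subset of English stopwords, standing in for swords.value
LOCAL_STOPWORDS = frozenset({"the", "a", "and", "of", "to"})


def keep_word(word, stopwords=LOCAL_STOPWORDS):
    """Same rule as remove_stop: drop words shorter than 2 chars and stopwords."""
    if word is None or len(word) < 2:
        return False
    return word not in stopwords


words = ["the", "mountain", "a", "", "x", "clinging", "and"]
print([w for w in words if keep_word(w)])  # ['mountain', 'clinging']
```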


In [None]:
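from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

# Declare the return type; without it, udf() defaults to StringType
stopword_check = udf(remove_stop, BooleanType())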

#Create the Spark Structured Stream

In [None]:
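# forceDeleteTempCheckpointLocation is a SQL conf, not a Kafka source option:
# it force-deletes the temporary checkpoint directory (used when no
# checkpointLocation is set, e.g. by the memory sink) when the query stops
spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")

kafka_data = spark \
        .readStream \
        .format("kafka") \
        .options(**consumer_conf) \
        .load()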

Decode the producer messages

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, StringType

# Kafka delivers `value` as binary; decode it with Python's unicode-escape codec
# so backslash escapes in the payload (e.g. "\u00e9") become characters.
# Null values pass through as null.
decode_text = udf(lambda x: x.decode('unicode-escape') if x is not None else None, StringType())

#Confirm Schema
kafka_data.printSchema()


#Schema creation (for JSON payloads; not used by the word count below)
schema = StructType([
    StructField("data", StringType(), True)
])

#Convert the byte object to a string
df = kafka_data.withColumn('value', decode_text('value'))
# Keep only messages from the configured partition
df = df.filter(col('partition') == CLOUDKARAFKA_TOPIC_PARTITION)

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
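The `unicode-escape` codec assumes the producer sends ASCII text with backslash escapes. If the producer sends plain UTF-8 instead, casting with `col("value").cast("string")` (which decodes UTF-8) is the safer choice. The difference is easy to see in plain Python; the payloads below are made-up examples.

```python
escaped_payload = b"caf\\u00e9 au lait"        # producer sent a backslash escape
utf8_payload = "café au lait".encode("utf-8")  # producer sent raw UTF-8 bytes

print(escaped_payload.decode("unicode-escape"))  # café au lait
print(utf8_payload.decode("utf-8"))              # café au lait
# Decoding UTF-8 bytes as unicode-escape treats each byte as Latin-1 (mojibake)
print(utf8_payload.decode("unicode-escape"))     # cafÃ© au lait
```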



Convert message text into words, each word in a separate row

In [None]:
from pyspark.sql.functions import explode, lower, split

df = df.select(explode(split(col('value'), pattern="\\W+")).alias("Data"), "timestamp").select(lower(col('Data')).alias('Data'), "timestamp")
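The split-and-lowercase step can be mirrored in plain Python with `re.split`. Spark's `split` uses Java regular expressions, where `\W` is ASCII-only by default, whereas Python's `\W` is Unicode-aware for strings; passing `re.ASCII` makes this sketch behave like the Java pattern. Note the empty tokens at the edges, which `remove_stop` discards through its length check.

```python
import re


def tokenize(text: str) -> list:
    """Split on runs of non-word characters, then lowercase each token.

    re.ASCII makes \\W ASCII-only, matching Java's default regex semantics.
    """
    return [token.lower() for token in re.split(r"\W+", text, flags=re.ASCII)]


print(tokenize("Upon the Mountain, clinging!"))  # ['upon', 'the', 'mountain', 'clinging', '']
print(tokenize("café"))                           # ['caf', '']
```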

Remove stopwords from data

In [None]:
df = df.select('Data', stopword_check(col('Data')).alias('filtered'), "timestamp")

In [None]:
df = df.filter(df.filtered==True).select('Data', "timestamp").withWatermark('timestamp', '0 seconds').groupBy(col('Data'), col('timestamp')).count()
df.printSchema()

root
 |-- Data: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- count: long (nullable = false)
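Because the aggregation groups by both `Data` and `timestamp`, and every word from one Kafka message shares that message's timestamp, the counts are per word per message timestamp rather than per word overall. (In complete output mode Spark keeps all aggregate state, so the watermark does not drop old groups.) The sketch below shows both groupings with `collections.Counter` on hypothetical `(word, timestamp)` rows.

```python
from collections import Counter
from datetime import datetime

# Hypothetical (word, timestamp) rows after tokenizing and stopword filtering
t0 = datetime(2021, 1, 1, 12, 0, 0)
t1 = datetime(2021, 1, 1, 12, 0, 5)
rows = [("upon", t0), ("upon", t0), ("hurled", t0), ("upon", t1)]

# Same keys as groupBy(col('Data'), col('timestamp')).count()
per_message = Counter(rows)
# Keying on the word alone gives a running total per word instead
per_word = Counter(word for word, _ in rows)

print(per_message[("upon", t0)])  # 2
print(per_word["upon"])           # 3
```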



In [None]:
df = df.sort(col('count').desc())

In [None]:
print(f"is spark streaming : {df.isStreaming}")

is spark streaming : True


Stream the word counts into an in-memory table

In [None]:
query = df\
    .writeStream\
    .format("memory")\
    .queryName("counts")\
    .outputMode("complete")\
    .trigger(processingTime="10 seconds")\
    .start()

In [None]:
spark.sql("SELECT * FROM counts").show(5)

+--------+-----+
|    Data|count|
+--------+-----+
|    upon|    2|
|  hurled|    2|
|clinging|    1|
|mountain|    1|
| drowned|    1|
+--------+-----+
only showing top 5 rows

