This sample (Python) demonstrates how to consume Kafka topics from Confluent Cloud (Kafka running on Azure) in Azure Databricks (Spark), decoding Avro-formatted messages with the Confluent Schema Registry.

## Set up the environment.

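* You need a Confluent Cloud cluster with a topic, an API key and secret for the cluster, and a Schema Registry with its own API key and secret. Avro schemas must be registered for both the topic's key and its value (subjects `<topic>-key` and `<topic>-value`).
* Install the Python package `confluent-kafka[avro,json,protobuf]>=1.4.2` with pip.
* Detach and re-attach the notebook after installing a new library; until then the notebook cannot see it.
* For production use, install the package from a cluster init script instead.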

In [3]:
confluentBootstrapServers = "CHANGE_HERE"
#confluentApiKey = dbutils.secrets.get(scope = "confluentTest", key = "api-key")
#confluentSecret = dbutils.secrets.get(scope = "confluentTest", key = "secret")
#confluentRegistryApiKey = dbutils.secrets.get(scope = "confluentTest", key = "registry-api-key")
#confluentRegistrySecret = dbutils.secrets.get(scope = "confluentTest", key = "registry-secret")
confluentApiKey = "CHANGE_HERE"
confluentSecret = "CHANGE_HERE"
confluentRegistryApiKey = "CHANGE_HERE"
confluentRegistrySecret = "CHANGE_HERE"
confluentTopicName = "CHANGE_HERE"
schemaRegistryUrl = "CHANGE_HERE"


### Set up the client for the Schema Registry

In [5]:
from confluent_kafka.schema_registry import SchemaRegistryClient

# Client for the Confluent Schema Registry. The registry uses HTTP basic auth,
# passed as a single "apiKey:secret" string.
schema_registry_conf = {
    'url': schemaRegistryUrl,
    'basic.auth.user.info': f"{confluentRegistryApiKey}:{confluentRegistrySecret}",
}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

In [6]:
import pyspark.sql.functions as fn
from pyspark.sql.avro.functions import from_avro

# Look up the latest registered Avro schemas for the topic's key and value,
# using the "<topic>-key" / "<topic>-value" subject names.
keyRestResponseSchema = schema_registry_client.get_latest_version(confluentTopicName + "-key").schema
confluentKeySchema = keyRestResponseSchema.schema_str
valueRestResponseSchema = schema_registry_client.get_latest_version(confluentTopicName + "-value").schema
confluentValueSchema = valueRestResponseSchema.schema_str

# NOTE(review): every record is decoded with the *latest* schema version. Avro
# binary data has to be read with the schema it was written with, so records
# produced under an older version (reachable via startingOffsets="earliest")
# may decode wrongly or, in PERMISSIVE mode, come out as null. Supporting
# schema evolution would require decoding by the schema ID embedded in each
# message.

# How from_avro handles undecodable records: "FAILFAST" stops on the first
# failure, "PERMISSIVE" sets the corrupt record to null.
fromAvroOptions = {"mode": "PERMISSIVE"}

# Confluent-serialized messages carry a 5-byte header (1 magic byte followed by
# a 4-byte schema ID) in front of the Avro payload; from_avro expects the bare
# payload, so the header is cut off first.
CONFLUENT_WIRE_HEADER_BYTES = 5


def _strip_confluent_header(col_name):
    """Return a Column holding `col_name` without the Confluent wire-format header.

    Spark SQL `substring` positions are 1-based, so the payload starts at
    position header + 1 and is length - header bytes long.
    """
    header = CONFLUENT_WIRE_HEADER_BYTES
    return fn.expr(f"substring({col_name}, {header + 1}, length({col_name}) - {header})")


# SASL/PLAIN login with the cluster API key and secret.
kafkaJaasConfig = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f"username='{confluentApiKey}' password='{confluentSecret}';"
)

# Streaming read of the topic from the beginning, decoding key and value into
# the structs parsedKey / parsedValue.
AvroDF = (
  spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", confluentBootstrapServers)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", kafkaJaasConfig)
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("subscribe", confluentTopicName)
  .option("startingOffsets", "earliest")
  .load()
  .select(
    from_avro(_strip_confluent_header("key"), confluentKeySchema, fromAvroOptions).alias('parsedKey'),
    from_avro(_strip_confluent_header("value"), confluentValueSchema, fromAvroOptions).alias('parsedValue'),
  )
)

In [7]:
# Preview the decoded stream: one row per Kafka record with the parsedKey and
# parsedValue structs produced by from_avro.
display(AvroDF)

parsedKey,parsedValue
List(P53P-LC-001),"List(List(tbc, tbc, 0.0), 2020-10-08T16:22:14.6127573Z, List(List(P53P-LC-001, http://data.posccaesar.org/rdl/RDS372689, List(39.735485445584864, kN))))"
List(P47P-LC-001),"List(List(tbc, tbc, 0.0), 2020-10-08T16:22:27.1241787Z, List(List(P47P-LC-001, http://data.posccaesar.org/rdl/RDS372689, List(25.921665050984206, kN))))"
List(P45P-LC-001),"List(List(tbc, tbc, 0.0), 2020-10-08T16:22:31.6205762Z, List(List(P45P-LC-001, http://data.posccaesar.org/rdl/RDS372689, List(80.43858566341856, kN))))"
List(P43P-LC-001),"List(List(tbc, tbc, 0.0), 2020-10-08T16:22:36.1158037Z, List(List(P43P-LC-001, http://data.posccaesar.org/rdl/RDS372689, List(80.12021988635894, kN))))"
List(P38P-LC-001),"List(List(tbc, tbc, 0.0), 2020-10-08T16:22:48.1111512Z, List(List(P38P-LC-001, http://data.posccaesar.org/rdl/RDS372689, List(66.41252141744481, kN))))"
List(A58P-LC-001),"List(List(tbc, tbc, 0.0), 2020-10-08T16:22:03.1009060Z, List(List(A58P-LC-001, http://data.posccaesar.org/rdl/RDS372689, List(53.00851392234141, kN))))"
List(A58P-LC-001),"List(List(tbc, tbc, 0.0), 2020-10-08T16:22:55.6006928Z, List(List(A58P-LC-001, http://data.posccaesar.org/rdl/RDS372689, List(28.385592358366395, kN))))"
List(P53P-LC-001),"List(List(tbc, tbc, 0.0), 2020-10-08T16:22:55.6006928Z, List(List(P53P-LC-001, http://data.posccaesar.org/rdl/RDS372689, List(68.3970357609899, kN))))"
List(P47P-LC-001),"List(List(tbc, tbc, 0.0), 2020-10-08T16:22:55.6006928Z, List(List(P47P-LC-001, http://data.posccaesar.org/rdl/RDS372689, List(19.7304256352272, kN))))"
List(P45P-LC-001),"List(List(tbc, tbc, 0.0), 2020-10-08T16:22:55.6006928Z, List(List(P45P-LC-001, http://data.posccaesar.org/rdl/RDS372689, List(75.4647814554464, kN))))"


In [8]:
# Flatten the decoded value into three columns: the result timestamp, the
# sensor IDs (UAID) and their numeric readings. SOSASensors appears to be an
# array, so UAID and numericValue come out as arrays (displayed as List(...)).
AvroDFCurated = AvroDF.select(
    "parsedValue.SOSAresultTime",
    "parsedValue.SOSASensors.UAID",
    "parsedValue.SOSASensors.SOSAhasResult.numericValue",
)

In [9]:
# Preview the flattened stream (SOSAresultTime, UAID, numericValue).
display(AvroDFCurated)

SOSAresultTime,UAID,numericValue
2020-10-08T16:22:07.1225069Z,List(P56P-LC-001),List(4.537356879812366)
2020-10-08T16:22:16.6011043Z,List(P52P-LC-001),List(16.993945751802038)
2020-10-08T16:22:21.1100104Z,List(P50P-LC-001),List(94.14628413233267)
2020-10-08T16:22:37.6250274Z,List(P42P-LC-001),List(34.74326242447982)
2020-10-08T16:22:42.1190175Z,List(P40P-LC-001),List(11.89560080501046)
2020-10-08T16:22:55.6006928Z,List(P56P-LC-001),List(78.85501746966271)
2020-10-08T16:22:55.6006928Z,List(P52P-LC-001),List(46.2642136710995)
2020-10-08T16:22:55.6006928Z,List(P50P-LC-001),List(86.12889190489841)
2020-10-08T16:22:55.6006928Z,List(P42P-LC-001),List(55.19413792304422)
2020-10-08T16:22:55.6006928Z,List(P40P-LC-001),List(95.05881615684314)


In [10]:
# Register the curated stream as the temporary view "loadtable" so the %sql
# cells can query it.
AvroDFCurated.createOrReplaceTempView("loadtable")

In [11]:
%sql 
-- Query the curated stream through the "loadtable" temporary view.
select * from loadtable

SOSAresultTime,UAID,numericValue
2020-10-08T16:22:14.6127573Z,List(P53P-LC-001),List(39.735485445584864)
2020-10-08T16:22:27.1241787Z,List(P47P-LC-001),List(25.921665050984206)
2020-10-08T16:22:31.6205762Z,List(P45P-LC-001),List(80.43858566341856)
2020-10-08T16:22:36.1158037Z,List(P43P-LC-001),List(80.12021988635894)
2020-10-08T16:22:48.1111512Z,List(P38P-LC-001),List(66.41252141744481)
2020-10-08T16:22:03.1009060Z,List(A58P-LC-001),List(53.00851392234141)
2020-10-08T16:22:55.6006928Z,List(A58P-LC-001),List(28.385592358366395)
2020-10-08T16:22:55.6006928Z,List(P53P-LC-001),List(68.3970357609899)
2020-10-08T16:22:55.6006928Z,List(P47P-LC-001),List(19.7304256352272)
2020-10-08T16:22:55.6006928Z,List(P45P-LC-001),List(75.4647814554464)


In [12]:
%sql
-- Clean view over loadtable with plain string columns.
-- UAID and numericValue are arrays, so their elements are joined with ", "
-- instead of casting the whole array to VARCHAR(50) and stripping '[', ']' and
-- '"' from its text form. This does not depend on how Spark renders arrays as
-- text, and brackets or quotes that are part of the data are kept.
-- array_join skips null elements.
-- NOTE(review): loadValue and loadTimestamp stay strings to keep the view's
-- schema unchanged; if every message carries exactly one sensor, consider
-- CAST(element_at(numericValue, 1) AS DOUBLE) and a TIMESTAMP cast instead.
CREATE OR REPLACE TEMPORARY VIEW cleanloadtable AS
SELECT
  array_join(CAST(numericValue AS ARRAY<STRING>), ', ') AS loadValue,
  array_join(CAST(UAID AS ARRAY<STRING>), ', ')         AS deviceID,
  SOSAresultTime                                        AS loadTimestamp
FROM loadtable

In [13]:
%sql
-- Inspect the cleaned view (loadValue, deviceID, loadTimestamp).
select * from cleanloadtable

loadValue,deviceID,loadTimestamp
4.537356879812366,P56P-LC-001,2020-10-08T16:22:07.1225069Z
16.993945751802038,P52P-LC-001,2020-10-08T16:22:16.6011043Z
94.14628413233268,P50P-LC-001,2020-10-08T16:22:21.1100104Z
34.74326242447982,P42P-LC-001,2020-10-08T16:22:37.6250274Z
11.89560080501046,P40P-LC-001,2020-10-08T16:22:42.1190175Z
78.85501746966271,P56P-LC-001,2020-10-08T16:22:55.6006928Z
46.2642136710995,P52P-LC-001,2020-10-08T16:22:55.6006928Z
86.12889190489841,P50P-LC-001,2020-10-08T16:22:55.6006928Z
55.19413792304422,P42P-LC-001,2020-10-08T16:22:55.6006928Z
95.05881615684314,P40P-LC-001,2020-10-08T16:22:55.6006928Z
