# Event Hub Avro desirialization

hyssh@microsoft.com


## Reference code sample 

Ref 1: https://stackoverflow.com/questions/72417454/azure-schema-registry-spark-structured-streaming-kafka-eventhub-compatibilit

Ref 2: https://spark.apache.org/docs/latest/sql-data-sources-avro.html#to_avro-and-from_avro

In [1]:
import os

from azure.identity import DefaultAzureCredential

# from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer


# from pyspark.sql.avro.functions import from_avro, to_avro


## Get Environment variables

Get the variables from [Spark configuration](https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-azure-create-spark-configuration)

In [3]:
os.environ["AZURE_CLIENT_ID"] = spark.sparkContext.environment.get("AZURE_CLIENT_ID")
os.environ["AZURE_TENANT_ID"] = spark.sparkContext.environment.get("AZURE_TENANT_ID")
os.environ["AZURE_CLIENT_SECRET"] = spark.sparkContext.environment.get("AZURE_CLIENT_SECRET")
os.environ["EVENT_HUB_CONN_STR"] = spark.sparkContext.environment.get("EVENT_HUB_CONN_STR")
os.environ["EVENT_HUB_CONN_STR_ENT"] = spark.sparkContext.environment.get("EVENT_HUB_CONN_STR_ENT")
os.environ["EVENT_HUB_CONN_STR_LISTEN"] = spark.sparkContext.environment.get("EVENT_HUB_CONN_STR_LISTEN")

SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE = "shin-eventhub-ns.servicebus.windows.net"
EVENTHUB_NAME="transactions"


## Define connection string

In [7]:
ehConf = {}

# For versions before 2.3.15, set the connection string without encryption
# ehConf['eventhubs.connectionString'] = os.environ["EVENT_HUB_CONN_STR"]

# For 2.3.15 version and above, the configuration dictionary requires that connection string be encrypted.
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(os.environ["EVENT_HUB_CONN_STR_ENT"])

# Confrim the consumer group from Event Hub
# https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups
ehConf['eventhubs.consumerGroup'] = "spark"

## Create UDF

Create a UDF to desrialize Avro message using Avro Serializer that supported SchemaRegistry

In [8]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def deserializeBody(encodedBody):    

    token_credential = DefaultAzureCredential()

    schema_registry_client = SchemaRegistryClient("shin-eventhub-ns.servicebus.windows.net", token_credential)
    avro_serializer = AvroSerializer(client=schema_registry_client, group_name="tranxs")

    return avro_serializer.deserialize(encodedBody)

deserializedBody_udf = udf(deserializeBody, StringType())

## (Optional test) Get schema from schema SchemaRegistry


In [10]:
token_credential = DefaultAzureCredential()

schema_registry_client = SchemaRegistryClient(SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, token_credential)
jsonFormatSchema = schema_registry_client.get_schema("71e03f67bd694b30b6dad6dae1fb8d86") 
print(jsonFormatSchema)

# bytes_payload = b"".join(b for b in event.body)
# deserialized_data = avro_serializer.deserialize(bytes_payload)

## Read streaming data from Event Hubs

In [11]:
# Simple batch query
df = spark.readStream.format("eventhubs").options(**ehConf).load()

# df = df.outputMode("append").format("console").start().awaitTermination()
df.printSchema()

## Write streaming data to ADLS as Parquet

In [12]:
# Save stream as parquet
ds1 = df.writeStream.format("parquet")\
    .option("path", "abfss://dev-synapse@hyundevsynapsestorage.dfs.core.windows.net/streaming")\
    .option("checkpointLocation", "abfss://dev-synapse@hyundevsynapsestorage.dfs.core.windows.net/streaming_checkpoint")\
    .trigger(processingTime='10 seconds')\
    .start()

## The result wont be seen in the output of this cell
## 
ds2 = df.select("body").writeStream.outputMode("append").format("console").start()#.awaitTermination()

# ds.start().awaitTermination()
#.writeStream.outputMode("append").format("console").start().awaitTermination()


In [16]:
print(ds1.status)
print(ds2.status)

In [17]:
print(ds1.stop())
print(ds2.stop())

In [18]:
filedf = spark.read.load('abfss://dev-synapse@hyundevsynapsestorage.dfs.core.windows.net/streaming/*.parquet', format='parquet')
print(filedf.count())

## The messages are saved

The column body contains the unreadable raw data

In [20]:
filedf.show()

## Test UDF 1

Use UDF to deserialize body in the file

In [21]:
filedf = filedf.withColumn("decodedBody", deserializedBody_udf(filedf["body"]))
filedf.show()

## Test 2. Use udf before wright data in ADLS

In stread of dederiailze body reading data that saved in ADLS, we can try to deserialize body right before written in ADLS

In [47]:
# see result in the console
# output = df.select("value")
# output.printSchema()

# df.writeStream.outputMode("append").format("concole").option("truncate", False).start().awaitTermination()

In [None]:
# ds3 = df.select("body")#writeStream.outputMode("append").format("console").start().awaitTermination(10)
# ds3 = ds3.withColumn("decodedBody", deserializedBody_udf(ds3["body"]))
# ds3.writeStream.outputMode("append").format("console").start().awaitTermination(300)

Use `deserializeBody_udf` to deserialize the body and save the result as Parquet

In [22]:
ds4 = df.withColumn("decodedBody", deserializedBody_udf(df["body"])).writeStream.format("parquet")\
    .option("path", "abfss://dev-synapse@hyundevsynapsestorage.dfs.core.windows.net/streamingAfterdecode")\
    .option("checkpointLocation", "abfss://dev-synapse@hyundevsynapsestorage.dfs.core.windows.net/streamingAfterdecode_checkpoint")\
    .start()
    # .trigger(processingTime='10 seconds')\
    # .start()


In [23]:
print(ds4.status)

### See the result

In [27]:
afterDecodedf = spark.read.load('abfss://dev-synapse@hyundevsynapsestorage.dfs.core.windows.net/streamingAfterdecode/*.parquet', format='parquet')
print(afterDecodedf.count())

In [28]:
afterDecodedf.count()

In [29]:
afterDecodedf.select("body", "decodedBody").show()

In [30]:
ds4.status

In [31]:
ds4.stop()

## End of Notebook