## Stream processing

This notebook shows how to connect to an Event Hub, read the stream and process the messages using .NET for Apache Spark on Azure Synapse Analytics.

To reproduce a common use case, the notebook walks through the following steps:

- Read stream from Event Hubs
- Decompress gzip body
- Apply schema
- Apply stream processing
- (*optional*) Save to Delta table

Please refer to the official documentation for additional details:
- [.NET for Apache Spark](https://dotnet.microsoft.com/apps/data/spark)
- [.NET for Spark on Azure Synapse Analytics](https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/spark-dotnet)
- [.NET APIs for Spark](https://docs.microsoft.com/en-us/dotnet/api/microsoft.spark?view=spark-dotnet)
- [UDFs on .NET for Spark](https://docs.microsoft.com/en-us/dotnet/spark/how-to-guides/udf-guide)


## Parameters
These values are placeholders. Insert the real values here before running the notebook or, preferably, overwrite them from a pipeline as described in [Assign parameters values from a pipeline](https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-development-using-notebooks?tabs=preview#assign-parameters-values-from-a-pipeline).
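
As a minimal sketch of how the parameters could be sanity-checked before any stream is started, the snippet below detects values that are empty or still hold a `<...>` placeholder, and builds the checkpoint path in the same format used later in this notebook. `IsPlaceholder`, `BuildCheckpointLocation`, and the sample values are hypothetical names introduced here for illustration; they are not part of the notebook's cells.

```csharp
using System;

// Hypothetical helper: true when a parameter is empty or still holds a "<...>" placeholder.
static bool IsPlaceholder(string value)
{
    if (string.IsNullOrWhiteSpace(value)) return true;
    string trimmed = value.Trim();
    return trimmed.StartsWith("<", StringComparison.Ordinal)
        && trimmed.EndsWith(">", StringComparison.Ordinal);
}

// Hypothetical helper: mirrors the checkpoint path format used in this notebook.
static string BuildCheckpointLocation(string dataType) =>
    $"/delta/_checkpoints/{dataType}/";

// Illustrative values only (assumptions, not real settings).
Console.WriteLine(IsPlaceholder("<connection string of your Event Hub>"));
Console.WriteLine(IsPlaceholder("Endpoint=sb://example.servicebus.windows.net/"));
Console.WriteLine(BuildCheckpointLocation("telemetry"));
```

Keeping one checkpoint folder per data type means that two streams reading different kinds of messages never share checkpoint state.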

In [None]:
var eh_connstr = "<connection string of your Event Hub>";
var eh_consumergroup = "<consumer group of your Event Hub you want to use to receive the messages>";
var eh_data_type = "<can be arbitrary, its main goal is to differentiate between message checkpoint locations>"; //The idea here is to use the type of data you are reading to create the checkpoint location for your delta table.

In [None]:

var eh_checkpoint_location = $"/delta/_checkpoints/{eh_data_type}/";
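// Warn when the connection string is empty or still holds the "<...>" placeholder
if (string.IsNullOrWhiteSpace(eh_connstr) || eh_connstr.StartsWith("<"))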
{
    Console.Error.WriteLine("Please insert your EH connection string in the parameters");
}

Console.WriteLine(eh_checkpoint_location);

## Connect to Event Hub
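We now connect to Event Hubs and create a streaming DataFrame (**streamingDf**).

The Event Hubs connector expects an encrypted connection string, so we encrypt it manually by calling the connector's `EventHubsUtils.encrypt` method through the JVM bridge.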
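
Before encrypting, it can help to check that the connection string has the expected `Key=Value;Key=Value` shape. The sketch below is plain C# that runs without Spark. It assumes the connector needs the `EntityPath` part (the Event Hub name) to be present, which you should confirm against the connector documentation for your version. `ParseConnectionString` and the sample string, including its fake key, are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: splits "Key=Value;Key=Value" into a case-insensitive dictionary.
static Dictionary<string, string> ParseConnectionString(string connectionString)
{
    var parts = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
    foreach (string segment in connectionString.Split(';', StringSplitOptions.RemoveEmptyEntries))
    {
        // Split on the first '=' only: base64 key values may themselves end with '='.
        int separator = segment.IndexOf('=');
        if (separator <= 0) continue;
        string key = segment.Substring(0, separator).Trim();
        string value = segment.Substring(separator + 1).Trim();
        parts[key] = value;
    }
    return parts;
}

// Illustrative connection string with a fake key (assumption, not a real secret).
string sample =
    "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=listen;" +
    "SharedAccessKey=ZmFrZS1rZXk=;EntityPath=myhub";

var parsed = ParseConnectionString(sample);
Console.WriteLine(parsed.ContainsKey("EntityPath")
    ? $"Event Hub name: {parsed["EntityPath"]}"
    : "EntityPath is missing from the connection string");
```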


In [None]:
// The connection string must be encrypted

using Microsoft.Spark.Interop;
var eventHubConnectionStringEncrypted = (string)SparkEnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.eventhubs.EventHubsUtils", "encrypt", eh_connstr);

In [None]:
// Connect to EH
var ehConf = new Dictionary<string, string>();
ehConf.Add("eventhubs.connectionString", eventHubConnectionStringEncrypted);
ehConf.Add("eventhubs.consumerGroup", eh_consumergroup);

DataFrame streamingDf = spark
    .ReadStream()
    .Format("eventhubs")
    .Options(ehConf)
    .Load();

## Decompress gzip body

The message body is gzip-compressed, so we create a UDF that decompresses the body of each row in **streamingDf**.
The UDF relies on the *SharpZipLib* NuGet package. You can find more information about the library [here](https://documentation.help/ICSharpCode.SharpZipLib/documentation.pdf).
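
To try the decompression logic outside of Spark, a gzip round trip is a quick check. The sketch below deliberately swaps SharpZipLib for `GZipStream` from the .NET base class library (`System.IO.Compression`) so that it runs without any package. `Compress` stands in for a hypothetical producer, and `Decompress` has the same shape as the UDF body defined in the next cells.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

// Hypothetical producer side: gzip a UTF-8 string into a byte array.
static byte[] Compress(string text)
{
    using var buffer = new MemoryStream();
    using (var gzip = new GZipStream(buffer, CompressionMode.Compress, leaveOpen: true))
    {
        byte[] raw = Encoding.UTF8.GetBytes(text);
        gzip.Write(raw, 0, raw.Length);
    } // Disposing the GZipStream flushes the gzip footer into the buffer.
    return buffer.ToArray();
}

// Consumer side: same structure as the UDF body, using the built-in GZipStream.
static string Decompress(byte[] data)
{
    using var source = new MemoryStream(data);
    using var gzip = new GZipStream(source, CompressionMode.Decompress);
    using var reader = new StreamReader(gzip, Encoding.UTF8);
    return reader.ReadToEnd();
}

// Illustrative payload (assumption about what a message body might contain).
string original = "{\"MessageId\":\"abc-123\"}";
string roundTripped = Decompress(Compress(original));
Console.WriteLine(roundTripped == original);
```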


In [None]:
// We use SharpZipLib to decompress the body
#r "nuget:SharpZipLib"

In [None]:
using System.IO;
using ICSharpCode.SharpZipLib.GZip;

// Define a UDF to decompress the EH body
public static string DecompressFunction(byte[] data)
{
    using var source = new MemoryStream(data);
    using GZipInputStream zipStream = new GZipInputStream(source);
    using StreamReader sr = new StreamReader(zipStream);
    return sr.ReadToEnd();
}

Func<Column, Column> DecompressAvro = Udf<byte[], string>(DecompressFunction);

In [None]:
// Decompress EH body
var decompDf = streamingDf.WithColumn("DecompressedBody", DecompressAvro(streamingDf["body"]));

## Apply schema to EH messages

The schema of the decompressed body is defined and applied below.
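
To make the schema concrete, here is a hypothetical message body that matches it, parsed with `System.Text.Json` outside of Spark. The field values are invented for illustration, the ISO 8601 timestamp is an assumption about the producer's format, and `ParseSample` is a helper name introduced here. In the notebook itself the parsing is done by `FromJson`.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper: reads the fields that the stream processing step extracts later.
static (string MessageId, string DataType, DateTimeOffset Timestamp) ParseSample(string json)
{
    using JsonDocument doc = JsonDocument.Parse(json);
    JsonElement root = doc.RootElement;
    return (
        root.GetProperty("MessageId").GetString(),
        root.GetProperty("DataType").GetString(),
        root.GetProperty("Timestamp").GetDateTimeOffset());
}

// Illustrative decompressed body with the four fields of the schema.
string sample =
    "{\"DataType\":\"telemetry\",\"MessageContent\":\"hello\"," +
    "\"MessageId\":\"abc-123\",\"Timestamp\":\"2021-06-01T12:00:00Z\"}";

var parsed = ParseSample(sample);
Console.WriteLine($"{parsed.MessageId} {parsed.DataType} {parsed.Timestamp:O}");
```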


In [None]:
// Define a schema for EH body
using Microsoft.Spark.Sql.Types;
var schema = new StructType(new[]
{
    new StructField("DataType", new StringType()),
    new StructField("MessageContent", new StringType()),
    new StructField("MessageId", new StringType()),
    new StructField("Timestamp", new TimestampType()),
});

In [None]:
// Parse json
var jsonDf = decompDf
    .WithColumn("jsonBody", FromJson(decompDf["DecompressedBody"], schema.SimpleString))
    .Select("enqueuedTime", "jsonBody", "body");

## Process the stream

Extract the relevant fields from the parsed JSON body and add them as separate columns.


In [None]:
// Process stream: Extract meaningful fields
var finalDf = jsonDf
    .WithColumn("MessageId", jsonDf["jsonBody"].GetField("MessageId"))
    .WithColumn("DocType", Lit(eh_data_type))
    .WithColumn("SourceType", jsonDf["jsonBody"].GetField("DataType"))
    .WithColumn("TimeStamp", jsonDf["jsonBody"].GetField("Timestamp"));

## Save as a Delta Table
Save the DataFrame as a Delta table, which will be created if it does not exist.


In [None]:
// ToTable API is available from Spark 3.1
var deltaStream = finalDf
    .WriteStream()
    .Format("delta")
    .OutputMode("append")
    .Option("checkpointLocation", eh_checkpoint_location)
    .ToTable("deltatablesample");

In [None]:
spark.Sql("SELECT * FROM deltatablesample LIMIT 10").Show();

## Debug

The following cells can be used to verify that the data is correctly streamed and transformed in any of the steps above.

Add these two cells after the transformation you want to test, replace the name of the DataFrame (here *finalDf*) with the one you want to inspect, and run both cells. Note that the second cell may return an empty result at first; it needs a few seconds before it can show actual results.


In [None]:
// DEBUG
var sQuery = finalDf.WriteStream().Format("memory").QueryName("finalDf").Start();

In [None]:
// DEBUG
spark.Sql("select * from finalDf").Show();