# Time Series Persister
Usage:
- Copy `TimeSeriesPersister.env.sample` to `TimeSeriesPersister.env`
- Edit `TimeSeriesPersister.env` to match your environment
- Execute the cells in order, but run only one of the _Setup_ cells: they are alternatives that each define `path` and `spark`

Tip: If you encounter problems running cells with spark

**Read Configuration From `.env` File**

In [None]:
#r "nuget: dotenv.net, 3.0.0"
#r "nuget:Microsoft.Spark"
#r "nuget: Microsoft.Spark.Extensions.Delta, 2.1.0"
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;
using Microsoft.Spark.Sql.Types;
using Microsoft.Spark.Extensions.Delta;
using static Microsoft.Spark.Sql.Functions;
using System.Text.RegularExpressions;
using dotenv.net;

// Load the notebook configuration from the .env file next to the notebook.
var envVars = DotEnv.Fluent().WithEnvFiles("./TimeSeriesPersister.env").Read();

// Event hub (Kafka source)
var eventHubConnectionString = envVars["EVENTHUB_CONNECTIONSTRING"];

// The namespace is the first host label of the endpoint ("sb://<namespace>.").
// The dot is escaped so it only matches a literal '.'; unescaped, a host without
// a dot would backtrack and yield a truncated namespace.
var eventHubsNamespaceMatch = Regex.Match(eventHubConnectionString, @"sb://([^.]+)\.");

// The entity path ends at the next ';' (or at the end of the string), so the value
// is correct even with a trailing ';' or when EntityPath is not the last key.
var eventHubsInstanceMatch = Regex.Match(eventHubConnectionString, @"EntityPath=([^;]+)");

// Fail here with a clear message instead of letting an empty value surface later
// as an obscure Kafka error. The connection string itself is a secret and is
// deliberately not included in the message.
if (!eventHubsNamespaceMatch.Success || !eventHubsInstanceMatch.Success)
{
    throw new InvalidOperationException(
        "EVENTHUB_CONNECTIONSTRING must contain an 'sb://<namespace>.' endpoint and an 'EntityPath=<event hub>' entry.");
}

var eventHubsNamespace = eventHubsNamespaceMatch.Groups[1].Value;
var eventHubsInstance = eventHubsInstanceMatch.Groups[1].Value;

// Delta Table (sink)
var storageAccountKey = envVars["STORAGE_ACCOUNT_KEY"];
var storageAccountName = envVars["STORAGE_ACCOUNT_NAME"];
var delta_lake_container_name = envVars["DELTA_LAKE_CONTAINER_NAME"];
var blobName = envVars["BLOB_NAME"];

Loading extensions from `Microsoft.Data.Analysis.Interactive.dll`

**Setup - local file system**

In [None]:
// Delta table location on the local file system.
var path = "/workspaces/geh-timeseries/source/notebooks/__storage__/local_file_system/unprocessed-time-series";

// A session without any additional storage configuration.
var spark = SparkSession.Builder().GetOrCreate();

**Setup - Azure Data Lake Storage Gen2 (abfss)**

In [None]:
// Delta table location, addressed through the abfss:// scheme using the
// container, storage account and blob name read from the .env file.
var path = $"abfss://{delta_lake_container_name}@{storageAccountName}.dfs.core.windows.net/{blobName}";

// Support Azure blob storage gen 2: register the storage account key with the session.
var spark = SparkSession.Builder()
    .Config($"fs.azure.account.key.{storageAccountName}.dfs.core.windows.net", storageAccountKey)
    .GetOrCreate();

**Setup - Azurite (wasb)**

In [None]:
// Delta table location, relative to the default file system configured below.
var path = "unprocessed_time_series";

// Support Azurite: make the emulator container the default file system and
// tell the Azure storage driver which account name is the emulator.
var spark = SparkSession.Builder()
    .Config("spark.hadoop.fs.defaultFS", "wasb://container@azurite")
    .Config("spark.hadoop.fs.azure.storage.emulator.account.name", "azurite")
    .GetOrCreate();

**Show Received Time Series**

In [None]:
// Load the Delta table at `path` (set by whichever setup cell was run).
var receivedTimeSeries = spark
    .Read()
    .Format("delta")
    .Load(path);

// Print the column layout first, then up to 100 rows.
receivedTimeSeries.PrintSchema();
receivedTimeSeries.Show(100);

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- time-series: string (nullable = true)

+----+-----+---+--------------------+
|year|month|day|         time-series|
+----+-----+---+--------------------+
|2022|    3| 10|{"Document":{"Id"...|
|2022|    3| 10|{"Document":{"Id"...|
|2022|    3| 10|{"Document":{"Id"...|
|2022|    3| 10|{"Document":{"Id"...|
+----+-----+---+--------------------+



**Execute Job**

In [None]:
// Event Hubs is reached on port 9093, see the
// [troubleshooting guide](https://docs.microsoft.com/azure/event-hubs/troubleshooting-guide)
string bootstrapServers = $"{eventHubsNamespace}.servicebus.windows.net:9093";

// JAAS login configuration: the literal user name "$ConnectionString" with the
// Event Hubs connection string as the password.
string eh_sasl = $"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{eventHubConnectionString}\";";

// Handles one micro-batch: derives the partition columns from the record's
// "timestamp" column, keeps "value" as a string, prints the result and appends
// it to the Delta table at `path`.
Action<DataFrame, long> persistBatch = (batch, batchId) =>
{
    var eventTime = batch["timestamp"];
    var payload = Functions.Col("value").Cast("string");

    var timeSeries = batch
        .WithColumn("year", Functions.Year(eventTime))
        .WithColumn("month", Functions.Month(eventTime))
        .WithColumn("day", Functions.DayOfMonth(eventTime))
        .WithColumn("time-series", payload)
        .Select("year", "month", "day", "time-series");

    // Echo every batch into the notebook output.
    timeSeries.PrintSchema();
    timeSeries.Show();

    var deltaWriter = timeSeries
        .Write()
        .PartitionBy("year", "month", "day")
        .Format("delta")
        .Mode(SaveMode.Append);
    deltaWriter.Save(path);
};

// Source: the event hub, read through the Kafka connector using SASL_SSL / PLAIN.
var kafkaRecords = spark
    .ReadStream()
    .Format("kafka")
    .Option("kafka.bootstrap.servers", bootstrapServers)
    .Option("subscribe", eventHubsInstance)
    .Option("kafka.sasl.mechanism", "PLAIN")
    .Option("kafka.security.protocol", "SASL_SSL")
    .Option("kafka.sasl.jaas.config", eh_sasl)
    .Option("kafka.request.timeout.ms", "60000")
    .Option("kafka.session.timeout.ms", "60000")
    // Currently not enabled:
    //.Option("failOnDataLoss", "false")
    //.Option("checkpointLocation", "/tmp/kafka_cp.txt")
    .Load();

// Sink: hand each micro-batch to persistBatch, using a processing-time trigger of 2000.
var streamingDf = kafkaRecords
    .WriteStream()
    .Trigger(Trigger.ProcessingTime(2000))
    .ForeachBatch(persistBatch)
    .Start();

// Block the cell until the streaming query terminates.
streamingDf.AwaitTermination();

[2022-03-10T12:51:47.8331646Z] [a901b8652663] [Info] [CallbackServer] Starting CallbackServer.
[2022-03-10T12:51:47.8390189Z] [a901b8652663] [Info] [CallbackServer] Started CallbackServer on 127.0.0.1:46843
[2022-03-10T12:51:54.9541685Z] [a901b8652663] [Info] [CallbackConnection] [1] Connected with RemoteEndPoint: 127.0.0.1:60062
[2022-03-10T12:51:54.9574134Z] [a901b8652663] [Info] [CallbackServer] Pool snapshot: [NumThreads:1], [NumConnections:1]
[2022-03-10T12:51:54.9630403Z] [a901b8652663] [Info] [CallbackConnection] [1] Received request for callback id: 1, callback handler: Microsoft.Spark.Interop.Ipc.ForeachBatchCallbackHandler
[2022-03-10T12:51:54.9644108Z] [a901b8652663] [Debug] [CallbackConnection] [1] Received END_OF_STREAM signal.
root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- time-series: string (nullable = true)

+----+-----+---+-----------+
|year|month|day|time-series|
+----+-----+---+-----------+
+-