# Receive Glucose Readings from Event Hubs, Filter Alerts, and Send Them to an Alerts Event Hub

In [None]:
# Spark distribution to download and use as SPARK_HOME. It must be the same
# Spark version as the pip-installed pyspark (3.5.0 per the version check
# output) and the spark-sql-kafka package (3.5.0): pyspark launches its JVM
# from SPARK_HOME, and mixing Python/JVM Spark versions is unsupported.
spark_release='spark-3.5.0'
hadoop_version='hadoop3'

import os, time
start=time.time()  # wall-clock start; used later to report the setup time
# Exported so the `!` shell commands can expand ${SPARK_RELEASE}/${HADOOP_VERSION}.
os.environ['SPARK_RELEASE']=spark_release
os.environ['HADOOP_VERSION']=hadoop_version
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_release}-bin-{hadoop_version}"

In [None]:
# Run below commands in google colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # install Java8
!wget -q http://apache.osuosl.org/spark/${SPARK_RELEASE}/${SPARK_RELEASE}-bin-${HADOOP_VERSION}.tgz # download spark-3.3.X
!tar xf ${SPARK_RELEASE}-bin-${HADOOP_VERSION}.tgz # unzip it
!pip install -q findspark # install findspark

In [None]:
!pip install faker pysqlite3
!pip install mysql.connector
!pip install pyspark



## Spark

In [None]:
# Run below commands in google colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # install Java8
!wget -q http://apache.osuosl.org/spark/${SPARK_RELEASE}/${SPARK_RELEASE}-bin-${HADOOP_VERSION}.tgz # download spark-3.3.X
!tar xf ${SPARK_RELEASE}-bin-${HADOOP_VERSION}.tgz # unzip it
!pip install -q findspark # install findspark

tar: spark-3.3.3-bin-hadoop3.tgz: Cannot open: No such file or directory
tar: Error is not recoverable: exiting now


In [None]:
import multiprocessing
import multiprocessing
import pyspark
import socket
import uuid
import findspark
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
from pyspark.sql.streaming import DataStreamReader

import sqlite3
from faker import Faker
import random
import datetime
import json
fake = Faker()  # Faker generator instance

In [None]:
# Let findspark locate the Spark distribution (SPARK_HOME, set in the first
# cell; when unset, findspark falls back to its own search) and make it importable.
findspark.init(spark_home=os.environ.get("SPARK_HOME"))

In [None]:
# Check the pyspark version. The Python package must be the same Spark
# version as the distribution in SPARK_HOME (SPARK_RELEASE); otherwise the
# Python side drives a JVM of a different Spark version.
print(pyspark.__version__)
_expected_release = os.environ.get("SPARK_RELEASE")
if _expected_release and _expected_release != f"spark-{pyspark.__version__}":
    print(f"WARNING: pyspark {pyspark.__version__} does not match "
          f"SPARK_RELEASE={_expected_release}")

3.5.0


In [None]:
# Host name of the machine running this notebook, displayed for reference.
# (Any network connection to it may be blocked by firewalls.)
hostname = socket.gethostname()

hostname

'15d1e066aaaa'

In [None]:
# Unique application name (used as spark.app.name below). uuid4 is random;
# uuid1 would embed this host's MAC address and a timestamp in the name.
app_id = str(uuid.uuid4())

app_id

'89ab8e7c-a95d-11ee-91eb-0242ac1c000c'

In [None]:
# Base Spark settings for this app.
# NOTE(review): this SparkConf is not passed to the SparkSession builder in the
# Event Hub cell (no .config(conf=conf) there), so these settings are not
# applied to that session — confirm whether it is still needed.
conf = SparkConf()

conf.setAll([
     ('spark.app.name', app_id),
     ('spark.shuffle.useOldFetchProtocol', 'true'),
     ('spark.testing', 'true'), # Avoid minimum 450M executor/driver memory https://www.waitingforcode.com/apache-spark/troubleshooting-system-memory-must-be-at-least-error/read / https://programmerclick.com/article/72821685476/
     ('spark.driver.allowMultipleContexts','true'), # https://stackoverflow.com/a/41591258 This option is used only for Spark internal tests and is not to be used in production. Removed in Spark 3.0, so it has no effect here.
     # This key was garbled ('spark. y', containing a space), which is not a
     # recognized setting. Presumably executor memory was intended, given the
     # memory-related settings around it — TODO confirm.
     ('spark.executor.memory', '100M'),
     # ('spark.driver.memory ', '200M'),
     # ('spark.executor.instances',1), # This property is no longer used in Spark 2+
     # number of executors is determined as: floor(spark.cores.max / spark.executor.cores)
     ("spark.executor.cores",1), # cores per executor. https://stackoverflow.com/questions/39399205/spark-standalone-number-executors-cores-control/39400195#39400195
     ("spark.cores.max", 2), # the maximum amount of CPU cores to request for the application from across the cluster (not from each machine)
     ('spark.submit.deployMode', 'client'), # client, cluster
     ('spark.ui.showConsoleProgress', 'true'),
     # NOTE(review): Hadoop properties only reach the Hadoop configuration with a
     # "spark.hadoop." prefix, and filesystem implementations are keyed per
     # scheme (fs.<scheme>.impl, e.g. fs.wasb.impl) — confirm what "fs.azure"
     # is meant to configure.
     ("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") ])

<pyspark.conf.SparkConf at 0x7a3103a38be0>

In [None]:
end = time.time()

# Whole seconds elapsed since `start` was recorded in the first cell.
_setup_seconds = int(end - start)
f'Spark setup time: {_setup_seconds} seconds'

'Spark setup time: 110 seconds'

## Get Data from Event Hubs
1. Set up the PySpark environment for Kafka/Event Hubs integration
2. Set up the Kafka connection configuration for Event Hubs
3. Read from Event Hubs
4. Query the streaming data



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, from_json
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, TimestampType, StringType
from azure.eventhub import EventHubProducerClient, EventData

# Initialize Spark Session. The spark-sql-kafka package version has to match
# the Spark version actually running (3.5.0 here).
spark = SparkSession \
    .builder \
    .appName("StreamingFromEventHub") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0') \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("local[*]") \
    .getOrCreate()

# Kafka Configuration for reading from Event Hub through its Kafka endpoint
# (port 9093, SASL_SSL + PLAIN with the literal username "$ConnectionString"
# and the connection string as password). The values are placeholders; load
# the real connection string from a secret store rather than hard-coding it.
kafkaConf = {
    "kafka.bootstrap.servers": "eventhubname.servicebus.windows.net:9093",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://your_address.servicebus.windows.net/;SharedAccessKeyName=YOUR_ACCESS_KEY;SharedAccessKey=YOUR_ACCESS_KEY;EntityPath=TOPIC";',
    "subscribe": "TOPIC",
    # Spark forwards settings to the Kafka consumer only with the "kafka."
    # prefix; a bare "group.id" option is ignored by the Kafka source.
    "kafka.group.id": "CONSUMER_GROUP",
    "startingOffsets": "earliest"
}

# Schema definition for glucose readings (JSON payload of each event)
glucose_schema = StructType([
    StructField("patient_id", IntegerType(), True),
    StructField("device_id", IntegerType(), True),
    StructField("glucose_level", FloatType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("location", StringType(), True)
])


# Read from Event Hub using Kafka: decode the message value as a JSON string
# and flatten it into the columns of glucose_schema.
df = spark \
    .readStream \
    .format("kafka") \
    .options(**kafkaConf) \
    .load() \
    .selectExpr("CAST(value AS STRING) as json") \
    .select(from_json("json", glucose_schema).alias("data")) \
    .select("data.*")

# Mirror the parsed stream into an in-memory table registered under the query
# name "table" for interactive inspection. The memory sink keeps all rows in
# driver memory, so use it for debugging only.
query = df.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("table") \
    .start()

# Do not call spark.stop() here: it would stop the query just started and
# leave the following cells (which keep using `df`) without an active session.
# Stop the session only once all streaming queries are finished.

## Filter Data for Alerts
1. Filter the data, flagging a reading when either condition holds:
    * the glucose reading is > 115, OR
    * the rolling average of the patient's last 10 readings is > 105

2. Send the filtered alerts to a dedicated Event Hub (`alerts`)

In [None]:
# Define the window for rolling average calculation, partitioned by patient_id
windowSpec = Window.partitionBy("patient_id").orderBy("timestamp").rowsBetween(-9, 0)

# Calculate the rolling average and count of readings per patient
df_with_rolling_avg = df.withColumn("rolling_avg", avg("glucose_level").over(windowSpec)) \
                        .withColumn("reading_count", count("glucose_level").over(windowSpec))

# Filter based on the conditions
filtered_df = df_with_rolling_avg.filter(((col("glucose_level") > 115) | (col("rolling_avg") > 105)) & (col("reading_count") >= 10))


In [None]:
#send data as json

from azure.eventhub import EventHubProducerClient, EventData

connection_string = CONNECTION_STRING

def send_to_eventhub_batch(batch_df, batch_id):
    if not batch_df.rdd.isEmpty():
        producer = EventHubProducerClient.from_connection_string(
            conn_str=connection_string,
            eventhub_name="alerts"
        )
        with producer:
            event_data_batch = producer.create_batch()
            for row in batch_df.collect():
                event_data = EventData(str(row.asDict()))
                try:
                    # Add the event to the batch
                    event_data_batch.add(event_data)
                except ValueError:
                    # The batch is full, send it and start a new batch
                    producer.send_batch(event_data_batch)
                    event_data_batch = producer.create_batch()
                    event_data_batch.add(event_data)  # Add the event to the new batch
            # Send any remaining events in the batch
            if len(event_data_batch) > 0:
                producer.send_batch(event_data_batch)


# Streaming query (treating the static DataFrame as a stream)
query = filtered_df.writeStream \
    .foreachBatch(send_to_eventhub_batch) \
    .start()\
    .awaitTermination()