# Azure Event Hub

## What is Azure Event Hub?


Microsoft Azure Event Hub is a cloud-based big data streaming platform and event ingestion service that can receive and process millions of events per second. Data generated or stored in a source system can be sent to the event hub, transformed as needed, and then stored, either through real-time ingestion or through batch/storage processes. Typical uses of Azure Event Hub include anomaly detection, application logging, and applications that need real-time data, such as live dashboards.

## How does Azure Event Hub work?

Azure Event Hub processes data in real time so that users can gain insight into it quickly. It is a distributed, low-latency streaming platform that integrates with data analytics services. An event hub is the “front door” of an event pipeline and acts as the event ingester: a component that sits between event publishers and event consumers. As a unified streaming service, it decouples the production of an event stream from its consumption.


- Event producers: Any entity that sends data to an event hub. Event publishers can publish events using HTTPS, AMQP 1.0, or the Apache Kafka protocol. All data sent to the hub arrives through an event publisher/producer.

- Partitions: An event hub organizes the event stream into partitions, so each consumer reads only a specific subset (partition) of the stream.

- Consumer groups: Event Hub follows a publish/subscribe model, which is enabled through consumer groups. A consumer group is a view (state, position, or offset) of the entire event hub. Each consuming application subscribes through a consumer group and reads the event stream independently, at its own pace and with its own offsets.

- Throughput units: Pre-purchased units of capacity that control the throughput capacity of the event hub.

- Event receivers: Any entity that reads event data from an event hub. All Event Hub consumers connect through an AMQP 1.0 session, and events are delivered through that session as soon as they become available. For real-time streaming and ingestion, Kafka consumers can also connect, using Kafka protocol 1.0 or later.
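
The relationship between partitions and consumer groups described above can be sketched with a small in-memory model. This is a toy illustration only, not how the service is implemented: the class `ToyEventHub`, its methods, and the SHA-256 based key hashing are all assumptions made for the example. It shows two ideas: events with the same partition key land in the same partition, and each consumer group keeps its own read position.

```python
import hashlib
from collections import defaultdict


class ToyEventHub:
    """In-memory stand-in for an event hub: a fixed set of append-only partitions."""

    def __init__(self, partition_count):
        self.partitions = [[] for _ in range(partition_count)]
        # One read position per (consumer group, partition) pair.
        self.offsets = defaultdict(int)

    def partition_for(self, partition_key):
        # Stable hash so the same key always maps to the same partition
        # (the real service uses its own hashing scheme).
        digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
        return int.from_bytes(digest[:4], "big") % len(self.partitions)

    def send(self, body, partition_key):
        partition_id = self.partition_for(partition_key)
        self.partitions[partition_id].append(body)
        return partition_id

    def receive(self, consumer_group, partition_id):
        # Return unread events for this group and advance only this group's offset.
        start = self.offsets[(consumer_group, partition_id)]
        events = self.partitions[partition_id][start:]
        self.offsets[(consumer_group, partition_id)] = len(self.partitions[partition_id])
        return events


hub = ToyEventHub(partition_count=4)
pid = hub.send("temperature=21", partition_key="sensor-1")
hub.send("temperature=22", partition_key="sensor-1")  # same key, same partition

dashboard_events = hub.receive("dashboard", pid)
archive_events = hub.receive("archive", pid)     # independent of "dashboard"
dashboard_again = hub.receive("dashboard", pid)  # nothing new for this group
print(dashboard_events, archive_events, dashboard_again)
```

Both groups see the full partition contents because reading in one group does not consume events for the other; only the group's own offset moves.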


### >> Install the Azure EventHub package.

In [None]:
pip install azure-eventhub

### >> Install the checkpoint store package to receive events using Azure Blob Storage.

In [None]:
pip install azure-eventhub-checkpointstoreblob-aio

## 1. Asynchronously

### >> To Send Events to Azure Event Hub

In [1]:
# Import Libraries
import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

In [None]:
# Connection Strings and Names
EH_conn_string = "Endpoint=sb://eh-testns.servicebus.windows.net/;SharedAccessKeyName=EHTestPolicyName;SharedAccessKey=Bw2rPoQ64RDxUOqe9W0ekM3l+WGSehG84vbr8TW8AEo=;EntityPath=ehtestname"
EH_Name = "ehtestname"
async def run():
    # Create a producer client to send messages to the event hub
    producer = EventHubProducerClient.from_connection_string(conn_str=EH_conn_string, eventhub_name=EH_Name)
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData('This is First event '))
        event_data_batch.add(EventData('This is Second event'))
        event_data_batch.add(EventData('This is Third event'))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)
        
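# In a standalone script, asyncio.run() starts an event loop and runs the coroutine.
# In a Jupyter notebook, where a loop is already running, use `await run()` instead.
asyncio.run(run())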
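
The connection string used above is a semicolon-separated list of `key=value` pairs. As a rough sketch of how its parts fit together, the helper below splits such a string into a dictionary. The function name `parse_connection_string` and all values in `example` are hypothetical placeholders, not part of the SDK and not real credentials.

```python
def parse_connection_string(conn_str):
    """Split an Event Hubs style connection string into a dict of its parts."""
    parts = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing semicolon
        # Split on the first "=" only: base64 keys often end in "=" padding.
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts


# Placeholder values only; never commit a real SharedAccessKey.
example = (
    "Endpoint=sb://my-namespace.servicebus.windows.net/;"
    "SharedAccessKeyName=MyPolicy;"
    "SharedAccessKey=cGxhY2Vob2xkZXI=;"
    "EntityPath=my-event-hub"
)
parts = parse_connection_string(example)
print(parts["Endpoint"], parts["EntityPath"])
```

`EntityPath` names the event hub itself, which is why it matches the value of `EH_Name` in the cell above. In practice the whole string is better read from an environment variable or a secret store than hardcoded in a notebook.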

### >> To Receive Event Data

In [2]:
# Import Libraries
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

In [None]:
EH_conn_string = "Endpoint=sb://eh-testns.servicebus.windows.net/;SharedAccessKeyName=EHTestPolicyName;SharedAccessKey=Bw2rPoQ64RDxUOqe9W0ekM3l+WGSehG84vbr8TW8AEo=;EntityPath=ehtestname"
EH_Name = "ehtestname"
BlobStorageConnString = "DefaultEndpointsProtocol=https;AccountName=teststorageaccname;AccountKey=Lpf3kfsOriMMXDpaoQo/YHDrgLK/oABnoxW7/yKEltn5f6XrryXCFDZ0HDmMUnifVgg21BjTyfYA+AStURZrUQ==;EndpointSuffix=core.windows.net"
ContainerName = "testcontainername"

async def on_event(partition_context, event):
    # Print the event data.
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)

async def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string(BlobStorageConnString, ContainerName)

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(EH_conn_string, consumer_group="$Default", eventhub_name=EH_Name, checkpoint_store=checkpoint_store)
    async with client:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
        await client.receive(on_event=on_event,  starting_position="-1")

if __name__ == '__main__':
    # Run the main method. In a Jupyter notebook, where an event loop is
    # already running, use `await main()` instead.
    asyncio.run(main())
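
To make the role of the checkpoint store more concrete, here is a minimal in-memory sketch of the idea behind `update_checkpoint`. It does not use the SDK or Blob Storage: `ToyCheckpointStore`, `run_consumer`, and the integer offsets are assumptions for illustration. The point is that a consumer which records its position after each event can stop and later resume without re-reading what it already processed.

```python
class ToyCheckpointStore:
    """In-memory stand-in for a durable checkpoint store (one offset per partition)."""

    def __init__(self):
        self._checkpoints = {}

    def update_checkpoint(self, partition_id, offset):
        self._checkpoints[partition_id] = offset

    def get_checkpoint(self, partition_id):
        # -1 means "no checkpoint yet": start from the beginning of the partition.
        return self._checkpoints.get(partition_id, -1)


def run_consumer(partition_id, events, store, max_events=None):
    """Process events after the last checkpoint, checkpointing after each one."""
    start = store.get_checkpoint(partition_id) + 1
    processed = []
    for offset in range(start, len(events)):
        if max_events is not None and len(processed) >= max_events:
            break  # simulate the process stopping part-way through
        processed.append(events[offset])
        store.update_checkpoint(partition_id, offset)
    return processed


store = ToyCheckpointStore()
stream = ["first", "second", "third"]

first_run = run_consumer("0", stream, store, max_events=2)
second_run = run_consumer("0", stream, store)  # resumes after the checkpoint
print(first_run, second_run)
```

Checkpointing after every event, as here and in the cell above, is the simplest choice; a real consumer might checkpoint less often to reduce writes to the store, at the cost of reprocessing a few events after a restart.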

## 2. Synchronously

#### >> Import Libraries

In [2]:
# Import Libraries
from azure.eventhub import (
    EventData,
    EventHubConsumerClient,
    EventHubProducerClient,
    EventHubSharedKeyCredential,
)
import os
import time
import logging

#### >> Constants

In [3]:
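# Fully qualified namespace: the host name from the connection string endpoint.
EH_Name_Space = "eh-testns.servicebus.windows.net"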
EH_Shared_Policy = "RootManageSharedAccessKey"
EH_Shared_Access_Key = "UzDpteVNxCzjFtFL2C/uvcGp6B4/1JHe9Oqno88JHj4="
EH_conn_string = "Endpoint=sb://eh-testns.servicebus.windows.net/;SharedAccessKeyName=EHTestPolicyName;SharedAccessKey=Bw2rPoQ64RDxUOqe9W0ekM3l+WGSehG84vbr8TW8AEo=;EntityPath=ehtestname"
EH_Name = "ehtestname"

#### >> Create Event Producer Client (legacy method)

In [4]:
credential = EventHubSharedKeyCredential(EH_Shared_Policy, EH_Shared_Access_Key)
producer = EventHubProducerClient(
    fully_qualified_namespace=EH_Name_Space,
    eventhub_name=EH_Name,
    credential=credential,
)
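
A shared key credential is built from a policy name and its key, which are used to sign short-lived shared access signature (SAS) tokens. The sketch below shows, under stated assumptions, roughly what such a token looks like, following the publicly documented SAS format (HMAC-SHA256 over the URL-encoded resource URI and an expiry time). The function `generate_sas_token`, its `now` parameter, and every value passed to it are hypothetical; the SDK handles this internally, so this is for understanding rather than for use.

```python
import base64
import hashlib
import hmac
import time
import urllib.parse


def generate_sas_token(resource_uri, policy_name, policy_key, ttl_seconds=3600, now=None):
    """Build a SharedAccessSignature token string for an Event Hubs resource URI."""
    now = time.time() if now is None else now
    expiry = str(int(now + ttl_seconds))
    encoded_uri = urllib.parse.quote_plus(resource_uri)
    # The string to sign is the URL-encoded URI and the expiry, separated by a newline.
    string_to_sign = (encoded_uri + "\n" + expiry).encode("utf-8")
    digest = hmac.new(policy_key.encode("utf-8"), string_to_sign, hashlib.sha256).digest()
    signature = urllib.parse.quote_plus(base64.b64encode(digest).decode("ascii"))
    return "SharedAccessSignature sr={}&sig={}&se={}&skn={}".format(
        encoded_uri, signature, expiry, policy_name
    )


# Placeholder namespace, hub, policy, and key (all hypothetical).
token = generate_sas_token(
    "https://my-namespace.servicebus.windows.net/my-event-hub",
    policy_name="MyPolicy",
    policy_key="not-a-real-key",
    now=0,  # fixed clock so the example is reproducible
)
print(token)
```

The token carries the resource (`sr`), the signature (`sig`), the expiry as a Unix timestamp (`se`), and the policy name (`skn`), so the service can verify it without the key ever being sent.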

#### >> Create Event Producer Client (from_connection_string())

In [20]:
# Create Producer Client

# Alternatively, read the settings from environment variables instead of
# hardcoding them (the variable names below are only examples):
# EH_conn_string = os.environ['EVENT_HUB_CONN_STR']
# EH_Name = os.environ['EVENT_HUB_NAME']

producer = EventHubProducerClient.from_connection_string(
    conn_str=EH_conn_string,
    eventhub_name=EH_Name,
)

#### >> Create Event Consumer Client (Legacy Method)

In [6]:
credential = EventHubSharedKeyCredential(EH_Shared_Policy, EH_Shared_Access_Key)
consumer = EventHubConsumerClient(
    fully_qualified_namespace=EH_Name_Space,
    eventhub_name=EH_Name,
    credential=credential,
    consumer_group='$Default',
)

#### >> Create Event Consumer Client

In [35]:
# Create Consumer Client
consumer = EventHubConsumerClient.from_connection_string(
    conn_str=EH_conn_string,
    eventhub_name=EH_Name,
    consumer_group='$Default',
)

#### >> Create Event Data

In [19]:
# Send and Receive Events

# Event Data
event_data1 = EventData("String Data")
event_data2 = EventData(b"Bytes Data")

# Create Event Batch
event_data_batch = producer.create_batch()

# Add Events to Batch
event_data_batch.add(event_data1)
event_data_batch.add(event_data2)

# Send Single Event
# producer.send_event(event_data1)

# Send Event Batch
producer.send_batch(event_data_batch)
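
A batch has a maximum size, and in the azure-eventhub SDK `EventDataBatch.add` raises `ValueError` once an event no longer fits. A common way to handle this is to send the full batch and continue with a new one. The sketch below demonstrates that pattern without touching the service: `ToyBatch` is a stand-in written for this example (not the SDK class), and `send_all`, `create_batch`, and `send_batch` are hypothetical names.

```python
class ToyBatch:
    """Stand-in for an event batch with a byte budget (not the SDK class)."""

    def __init__(self, max_size_in_bytes):
        self.max_size_in_bytes = max_size_in_bytes
        self.size_in_bytes = 0
        self.events = []

    def add(self, body):
        size = len(body.encode("utf-8"))
        if self.size_in_bytes + size > self.max_size_in_bytes:
            raise ValueError("batch is full")
        self.events.append(body)
        self.size_in_bytes += size


def send_all(bodies, create_batch, send_batch):
    """Pack bodies into batches in order, sending each batch when it fills up."""
    batch = create_batch()
    for body in bodies:
        try:
            batch.add(body)
        except ValueError:
            # Current batch is full: send it and retry the event in a fresh batch.
            send_batch(batch)
            batch = create_batch()
            batch.add(body)  # raises again if a single event exceeds the limit
    if batch.events:
        send_batch(batch)


sent = []
send_all(
    ["aaaa", "bbbb", "cccc", "dd"],
    create_batch=lambda: ToyBatch(max_size_in_bytes=10),
    send_batch=lambda b: sent.append(list(b.events)),
)
print(sent)
```

With the real client, `create_batch` and `send_batch` would be the producer's own methods, and the emptiness check at the end would use the batch's length rather than the toy `events` attribute.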

#### >> Receive Event Data

In [None]:
# Receive Events

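def on_event(partition_context, event):
    # Print the ID of the partition the event came from.
    print(partition_context.partition_id)

# receive() blocks and calls on_event for each event until it is interrupted;
# the with-block closes the consumer when that happens.
with consumer:
    consumer.receive(on_event=on_event)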