<a href="https://colab.research.google.com/github/anuthereaper/PythonLibrary/blob/main/Eventhubsend%2Breceive.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install azure-eventhub

In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

In [None]:
import time
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.eventhub.exceptions import EventHubError

CONNECTION_STR = "Endpoint=sb://"
EVENTHUB_NAME = "xxxxxxxxxx"


def send_event_data_batch(producer,message):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(message))
    producer.send_batch(event_data_batch)


def send_event_data_batch_with_limited_size(producer,max_size):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch_with_limited_size = producer.create_batch(max_size_in_bytes=max_size)

    while True:
        try:
            event_data_batch_with_limited_size.add(EventData('Message inside EventBatchData'))
        except ValueError:
            # EventDataBatch object reaches max_size.
            # New EventDataBatch object can be created here to send more data.
            break

    producer.send_batch(event_data_batch_with_limited_size)


def send_event_data_batch_with_partition_key(producer,message,key):
    # Specifying partition_key.
    event_data_batch_with_partition_key = producer.create_batch(partition_key=key)
    event_data_batch_with_partition_key.add(EventData(message))

    producer.send_batch(event_data_batch_with_partition_key)

def send_event_data_batch_with_partition_id(producer,message,id):
    # Specifying partition_id.
    event_data_batch_with_partition_id = producer.create_batch(partition_id=id)
    event_data_batch_with_partition_id.add(EventData(message))

    producer.send_batch(event_data_batch_with_partition_id)

def send_event_data_batch_with_properties(producer,message):
    event_data_batch = producer.create_batch()
    event_data = EventData(message)
    event_data.properties = {'prop_key': 'prop_value'}
    event_data_batch.add(event_data)
    producer.send_batch(event_data_batch)

def send_event_data_list(producer):
    # If you know beforehand that the list of events you have will not exceed the
    # size limits, you can use the `send_batch()` api directly without creating an EventDataBatch

    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.

    event_data_list = [EventData('Event Data {}'.format(i)) for i in range(10)]
    try:
        producer.send_batch(event_data_list)
    except ValueError:  # Size exceeds limit. This shouldn't happen if you make sure before hand.
        print("Size of the event data list exceeds the size limit of a single send")
    except EventHubError as eh_err:
        print("Sending error: ", eh_err)


producer = EventHubProducerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    eventhub_name=EVENTHUB_NAME
)

start_time = time.time()
with producer:
    send_event_data_batch(producer,'Single message')
    send_event_data_batch_with_limited_size(producer,1000)
    send_event_data_batch_with_partition_key(producer,'Message will be sent to a partition determined by the partition key','pkey')
    send_event_data_batch_with_partition_id(producer,'Message will be sent to target-id partition',0)
    send_event_data_batch_with_properties(producer,'Message with properties')
    send_event_data_list(producer)

print("Send messages in {} seconds.".format(time.time() - start_time))

Send messages in 13.098610162734985 seconds.


In [None]:
# Event hub receiver

import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = "Endpoint=sb://xxxxxxxxxx.servicebus.windows.net/;SharedAccessKeyName=master;SharedAccessKey=xxxxxxxx"
EVENTHUB_NAME = "anueventhub"


def on_event(partition_context, event):
    # Put your code here.
    # If the operation is i/o intensive, multi-thread will have better performance.
#    print("Received event from partition: {}.".format(partition_context.partition_id))
    print("Received the event: {} from the partition with ID: {}".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))


def on_partition_initialize(partition_context):
    # Put your code here.
    print("Partition: {} has been initialized.".format(partition_context.partition_id))


def on_partition_close(partition_context, reason):
    # Put your code here.
    print("Partition: {} has been closed, reason for closing: {}.".format(
        partition_context.partition_id,
        reason
    ))


def on_error(partition_context, error):
    # Put your code here. partition_context can be None in the on_error callback.
    if partition_context:
        print("An exception: {} occurred during receiving from Partition: {}.".format(
            partition_context.partition_id,
            error
        ))
    else:
        print("An exception: {} occurred during the load balance process.".format(error))


if __name__ == '__main__':
    consumer_client = EventHubConsumerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        consumer_group='$Default',
        eventhub_name=EVENTHUB_NAME,
    )

    try:
        with consumer_client:
            consumer_client.receive(
                on_event=on_event,
                on_partition_initialize=on_partition_initialize,
                on_partition_close=on_partition_close,
                on_error=on_error,
                starting_position="-1",  # "-1" is from the beginning of the partition.
            )
    except KeyboardInterrupt:
        print('Stopped receiving.')