Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] Final README update #8430

Merged
merged 7 commits into from Nov 6, 2019
@@ -86,7 +86,7 @@ async def do_operation(events):
async def process_events(partition_context, events):
await do_operation(events)
partition_context.update_checkpoint(events[-1])
await partition_context.update_checkpoint(events[-1])
async def main():
storage_container_client = ContainerClient.from_connection_string(storage_container_connection_str, storage_container_name)
@@ -116,7 +116,7 @@ Refer to [Logging](#logging) to enable loggers for related libraries.

### Documentation

Reference documentation is available at https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.extensions.html
Reference documentation is available [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.aio.html#azure.eventhub.aio.PartitionManager)

### Logging

@@ -114,7 +114,7 @@ Refer to [Logging](#logging) to enable loggers for related libraries.

### Documentation

Reference documentation is available at https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.extensions.html
Reference documentation is available [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.html#azure.eventhub.PartitionManager)

### Logging

@@ -57,7 +57,7 @@ from azure.eventhub import EventHubConsumerClient
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
consumer_client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path)
consumer_client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)
```

@@ -128,7 +128,7 @@ from azure.eventhub import EventHubConsumerClient
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path)
client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)
partition_ids = client.get_partition_ids()
```

@@ -143,18 +143,13 @@ from azure.eventhub import EventHubProducerClient, EventData
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path)
try:
event_list = []
for i in range(10):
event_list.append(EventData(b"A single event"))
with client:
client.send(event_list)
except:
raise
finally:
pass
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)
event_list = []
for i in range(10):
event_list.append(EventData(b"A single event"))
with client:
client.send(event_list)
```

#### Send a batch of events
@@ -164,25 +159,20 @@ Events may be added to the `EventDataBatch` using the `try_add` method until the
```python
from azure.eventhub import EventHubProducerClient, EventData
try:
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path)
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)
event_data_batch = client.create_batch(max_size=10000)
can_add = True
while can_add:
try:
event_data_batch.try_add(EventData('Message inside EventBatchData'))
except ValueError:
can_add = False # EventDataBatch object reaches max_size.
event_data_batch = client.create_batch(max_size=10000)
can_add = True
while can_add:
try:
event_data_batch.try_add(EventData('Message inside EventBatchData'))
except ValueError:
can_add = False # EventDataBatch object reaches max_size.
with client:
client.send(event_data_batch)
except:
raise
finally:
pass
with client:
client.send(event_data_batch)
```

### Consume events from an Event Hub
@@ -195,22 +185,17 @@ from azure.eventhub import EventHubConsumerClient
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path)
client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)
logger = logging.getLogger("azure.eventhub")
def on_events(partition_context, events):
logger.info("Received {} events from partition {}".format(len(events), partition_context.partition_id))
try:
with client:
client.receive(on_events=on_events, consumer_group="$Default")
# receive events from specified partition:
# client.receive(on_events=on_events, consumer_group="$Default", partition_id='0')
except:
raise
finally:
pass
with client:
client.receive(on_events=on_events, consumer_group="$Default")
# receive events from specified partition:
# client.receive(on_events=on_events, consumer_group="$Default", partition_id='0')
```

### Async publish events to an Event Hub
@@ -219,54 +204,59 @@ Publish events to an Event Hub asynchronously.

#### Send a single event or an array of events
```python
import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path)
try:
event_list = []
for i in range(10):
event_list.append(EventData(b"A single event"))
async with client:
await client.send(event_list) # Send a list of events
await client.send(EventData(b"A single event")) # Send a single event
except:
raise
finally:
pass
event_list = []
for i in range(10):
event_list.append(EventData(b"A single event"))
async def send():
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)
async with client:
await client.send(event_list) # Send a list of events
await client.send(EventData(b"A single event")) # Send a single event
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(send())
```

#### Send a batch of events

Use the `create_batch` method on `EventHubProcuer` to create an `EventDataBatch` object which can then be sent using the `send` method.
Events may be added to the `EventDataBatch` using the `try_add` method until the maximum batch size limit in bytes has been reached.
```python
import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
try:
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path)
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
async def create_batch(client):
event_data_batch = await client.create_batch(max_size=10000)
can_add = True
This conversation was marked as resolved by annatisch

This comment has been minimized.

Copy link
@annatisch

annatisch Nov 5, 2019

Contributor

All of these snippets have empty error handling:

except:
	raise
finally:
    pass

We should either show actual error handling, or just remove these altogether.

This comment has been minimized.

Copy link
@yunhaoling

yunhaoling Nov 5, 2019

Author Member

remove all the try, except and finally.
and make all these samples can run directly.

while can_add:
try:
event_data_batch.try_add(EventData('Message inside EventBatchData'))
except ValueError:
can_add = False # EventDataBatch object reaches max_size.
return event_data_batch
async def send():
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)
batch_data = await create_batch(client)
async with client:
await client.send(event_data_batch)
except:
raise
finally:
pass
await client.send(batch_data)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(send())
```

### Async consume events from an Event Hub
@@ -275,26 +265,27 @@ Consume events asynchronously from an EventHub.

```python
import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path)
logger = logging.getLogger("azure.eventhub")
async def on_events(partition_context, events):
logger.info("Received {} events from partition {}".format(len(events), partition_context.partition_id))
try:
async def receive():
client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)
async with client:
received = await client.receive(on_events=on_events, consumer_group='$Default')
# receive events from specified partition:
# received = await client.receive(on_events=on_events, consumer_group='$Default', partition_id='0')
except:
raise
finally:
pass
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(receive())
```

### Consume events using a partition manager
@@ -330,28 +321,39 @@ from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManage
RECEIVE_TIMEOUT = 5 # timeout in seconds for a receiving operation. 0 or None means no timeout
RETRY_TOTAL = 3 # max number of retries for receive operations within the receive timeout. Actual number of retries clould be less if RECEIVE_TIMEOUT is too small
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
blob_name_str = '<<STRING FOR THE BLOB NAME>>'
async def do_operation(event):
# do some sync or async operations. If the operation is i/o intensive, async will have better performance
print(event)
async def process_events(partition_context, events):
await asyncio.gather(*[do_operation(event) for event in events])
await partition_context.update_checkpoint(events[-1])
if __name__ == '__main__':
loop = asyncio.get_event_loop()
container_client = ContainerClient.from_connection_string(storage_connection_str, blob_name_str)
partition_manager = BlobPartitionManager(container_client=container_client)
client = EventHubConsumerClient.from_connection_string(connection_str, partition_manager=partition_manager, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL)
async def receive(client):
try:
loop.run_until_complete(client.receive(process_events, "$default"))
await client.receive(on_events=process_events, consumer_group="$Default")
except KeyboardInterrupt:
loop.run_until_complete(client.close())
finally:
loop.stop()
await client.close()
async def main():
container_client = ContainerClient.from_connection_string(storage_connection_str, blob_name_str)
partition_manager = BlobPartitionManager(container_client)
client = EventHubConsumerClient.from_connection_string(
connection_str,
event_hub_path=event_hub_path,
partition_manager=partition_manager, # For load balancing and checkpoint. Leave None for no load balancing
)
async with client:
await receive(client)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```

### Use EventHubConsumerClient to work with IoT Hub
@@ -385,22 +387,22 @@ The Event Hubs APIs generate the following exceptions.
For instance, this error is raised if you try to send an EventData that is already sent.
- **EventDataSendError:** The Eventhubs service responds with an error when an EventData is sent.
- **OperationTimeoutError:** EventHubConsumer.send() times out.
- **EventHubError:** All other Eventhubs related errors. It is also the root error class of all the above mentioned errors.
- **EventHubError:** All other Eventhubs related errors. It is also the root error class of all the errors described above.

## Next steps

### Examples

These are [more samples](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples) in our repo demonstrating the usage of the library.
There are [more samples](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples) in our repo demonstrating the usage of the library.

- [./samples/sync_samples/send.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/send.py) - use EventHubProducerClient to publish events
- [./samples/sync_samples/recv.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/recv.py) - use EventHubConsumerClient to consume events
- [./samples/async_examples/send_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/async_examples/send_async.py) - async/await support of a EventHubProducerClient
- [./samples/async_examples/recv_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_async.py) - async/await support of a EventHubConsumerClient
- [./samples/sync_samples/send.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples/sync_samples/send.py) - use EventHubProducerClient to publish events
- [./samples/sync_samples/recv.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples/sync_samples/recv.py) - use EventHubConsumerClient to consume events
- [./samples/async_samples/send_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples/async_samples/send_async.py) - async/await support of a EventHubProducerClient
- [./samples/async_samples/recv_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples/async_samples/recv_async.py) - async/await support of a EventHubConsumerClient

### Documentation
This conversation was marked as resolved by yunhaoling

This comment has been minimized.

Copy link
@annatisch

annatisch Nov 5, 2019

Contributor

We should hide this URL - something like:

Reference documentation is available [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.html)

This comment has been minimized.

Copy link
@yunhaoling

yunhaoling Nov 5, 2019

Author Member

hided


Reference documentation is available at https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.html.
Reference documentation is available [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.html).

### Logging

@@ -423,4 +425,4 @@ PR appropriately (e.g., label, comment). Simply follow the instructions provided
This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhubs/README.png)
![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhubs/README.png)
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.