In [80]:
# Review - Capture Event Hubs data in Azure Storage and read it by using Python (azure-eventhub)
# https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-capture-python

# Finish eventhub message loop sink
# https://www.sqler.com/board_Azure/1102215

## install packages

- `pip install avro-python3`
- `pip install azure-storage-blob`
- `pip install python-dotenv`

In [82]:
import os
import json
from avro.datafile import DataFileReader
from avro.io import DatumReader

In [83]:
# Read avro sample file from data directory
sample_file = 'data/event_hub_sample.avro'
reader = DataFileReader(open(sample_file, 'rb'), DatumReader())

In [84]:
# Print meta data and schema
reader.meta

{'avro.codec': b'null',
 'avro.schema': b'{"type":"record","name":"EventData","namespace":"Microsoft.ServiceBus.Messaging","fields":[{"name":"SequenceNumber","type":"long"},{"name":"Offset","type":"string"},{"name":"EnqueuedTimeUtc","type":"string"},{"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},{"name":"Properties","type":{"type":"map","values":["long","double","string","bytes","null"]}},{"name":"Body","type":["null","bytes"]}]}'}

In [85]:
lst = []
for reading in reader:
    lst.append(json.loads(reading["Body"]))

json.dumps(lst[0:5])

'[{"product_num": 5, "product_price": 5273, "product_description": "1pq5EDoT5u", "product_production_dt": "2021-10-28 03:15:38.314706"}, {"product_num": 8, "product_price": 6028, "product_description": "zl8E2UIgHz", "product_production_dt": "2021-10-28 03:15:39.316801"}, {"product_num": 6, "product_price": 448, "product_description": "qlfqPcJkbI", "product_production_dt": "2021-10-28 03:15:40.319594"}, {"product_num": 9, "product_price": 6313, "product_description": "Ys1xvquiuV", "product_production_dt": "2021-10-28 03:15:41.322595"}, {"product_num": 8, "product_price": 7685, "product_description": "uhxCO8LFoW", "product_production_dt": "2021-10-28 03:15:42.324536"}]'

# Fetch all avro format event hub blob files from Azure Blob Storage

In [75]:
# Rename ".env_example" file to ".env" then fill out your conenction string values
from dotenv import load_dotenv

%load_ext dotenv
%dotenv

The dotenv extension is already loaded. To reload it, use:
  %reload_ext dotenv


In [76]:
# Get event hub avro files from azure blob storage 
from azure.storage.blob import ContainerClient

AZURE_STORAGE_CONN_STR = os.getenv("AZURE_STORAGE_CONN_STR")
AZURE_CONTAINTER_NAME = os.getenv("AZURE_CONTAINTER_NAME")

container = ContainerClient.from_connection_string(conn_str=AZURE_STORAGE_CONN_STR, container_name=AZURE_CONTAINTER_NAME)
blob_list = container.list_blobs()

In [77]:
lst = []

for blob in blob_list:
    # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).        
    if blob.size > 508:
        print('Downloaded a non empty blob: ' + blob.name)
        # Create a blob client for the blob.
        blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
        # Construct a file name based on the blob name.
        cleanName = str.replace(blob.name, '/', '_')
        cleanName = os.path.join(os.getcwd(), cleanName)

        with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
            my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
        
        reader = DataFileReader(open(cleanName, 'rb'), DatumReader())

        for reading in reader:
            lst.append(json.loads(reading["Body"]))
        
        reader.close()

        # Remove the downloaded file.
        os.remove(cleanName)
        
        # Delete the blob from the Azure Storage Account container after it's read.
        container.delete_blob(blob.name)

Downloaded a non empty blob: dwevthubns03/dwehub03/0/2021/10/28/04/45/38.avro


In [78]:
# check lst size
len(lst)

300

In [79]:
# show sample result
json.dumps(lst[0:5])

'[{"product_num": 6, "product_price": 8858, "product_description": "LEFub5WKgA", "product_production_dt": "2021-10-28 04:45:38.467946"}, {"product_num": 9, "product_price": 3599, "product_description": "0vwFk4Otb7", "product_production_dt": "2021-10-28 04:45:39.469353"}, {"product_num": 5, "product_price": 182, "product_description": "M9CRcxiIOM", "product_production_dt": "2021-10-28 04:45:40.470992"}, {"product_num": 9, "product_price": 4927, "product_description": "9K7WYBkhBB", "product_production_dt": "2021-10-28 04:45:41.473058"}, {"product_num": 7, "product_price": 8177, "product_description": "s092wnmCFu", "product_production_dt": "2021-10-28 04:45:42.475711"}]'