# Eventhub_Sender&Receiver_for_ApacheParquetFile
"""
Apache Parquet is an open source file format available to any project in the Hadoop ecosystem./
Apache Parquet is designed for efficient as well as performant flat columnar storage format of data compared to row based files like CSV or TSV files
Sample Parquet file: https://github.com/Teradata/kylo/tree/master/samples/sample-data/parquet
Install required packages:
azure.eventhub: pip install azure-eventhub
pyarrow: pip install pyarrow
pandas: pip install pandas2
"""
import pyarrow.parquet as pq
import pandas as pd
from io import StringIO
import pyarrow as pa
from azure.eventhub import EventData, EventHubProducerClient, EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
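# Minimal, optional sketch for generating a small Parquet file to test with, since
# "parquet-filepath" used in sender() below is only a placeholder. The function name
# make_sample_parquet() and the file name sample.parquet are illustrative additions,
# not part of the original script.
def make_sample_parquet(path="sample.parquet"):
    # Build a tiny DataFrame and write it to disk in Parquet format via pyarrow.
    df = pd.DataFrame({"id": [1, 2, 3], "name": ["alpha", "beta", "gamma"]})
    df.to_parquet(path, engine="pyarrow")
    return path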
fully_qualified_namespace = "<eventhub-namespace>.servicebus.windows.net"
CONNECTION_STR = "<Eventhub-ConnectionString>"
EVENTHUB_NAME = "<entity-name>"
STORAGE_CONNECTION_STR = "<Storage-ConnectionString>"
BLOB_CONTAINER_NAME = "<Container-Name>"
count = 1
def sender():
    # The sender client reads a Parquet file and sends it to Event Hubs as an event batch.
    producer = EventHubProducerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        eventhub_name=EVENTHUB_NAME
    )
    data = "parquet-filepath"
    # A Parquet file can be read directly into a pandas DataFrame.
    parquet_data = pd.read_parquet(data, engine='pyarrow')
    with producer:
        event_data_batch = producer.create_batch()
        try:
            # to_csv() converts the pandas DataFrame to CSV text so it can be sent as the event body.
            event_data_batch.add(EventData(parquet_data.to_csv()))
        except ValueError:
            # add() raises ValueError when the event would exceed the maximum batch size.
            print("Event data exceeds the maximum batch size.")
        producer.send_batch(event_data_batch)
        print('Sending Done')
def receiver():
    # Create an Event Hubs consumer client that receives the Parquet data as an event
    # and writes it back out as a Parquet file (see on_event below).
    checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, BLOB_CONTAINER_NAME)
    consumer_client = EventHubConsumerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        consumer_group='$Default',
        eventhub_name=EVENTHUB_NAME,
        checkpoint_store=checkpoint_store)
    try:
        with consumer_client:
            # receive() blocks and dispatches incoming events to on_event until interrupted.
            consumer_client.receive(
                on_event=on_event,
                starting_position="-1"  # "-1" means from the beginning of the partition.
            )
    except KeyboardInterrupt:
        print('Stop receiving.')
def on_event(partition_context, event):
    global count
    try:
        # Parse the CSV text of the event body back into a pandas DataFrame.
        df = pd.read_csv(StringIO(event.body_as_str(encoding='UTF-8')))
        # Drop the index column that to_csv() added on the sender side.
        df = df.drop(df.columns[[0]], axis=1)
        print(df)
        # Convert the DataFrame to a pyarrow Table and write it out as a Parquet file.
        table = pa.Table.from_pandas(df)
        pq.write_table(table, 'file{0}.parquet'.format(count))
    except Exception as exp:
        print(exp)
    count += 1
    print("event received.")
    partition_context.update_checkpoint(event)
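# Optional alternative, sketched here for illustration only: the CSV round-trip above
# drops Parquet type information, so the event could instead carry the raw Parquet bytes.
# The names sender_parquet_bytes() and on_event_parquet_bytes() are hypothetical, and
# reassembling the payload with b"".join(event.body) assumes the event body is exposed
# as an iterable of byte chunks.
def sender_parquet_bytes(path):
    producer = EventHubProducerClient.from_connection_string(
        conn_str=CONNECTION_STR, eventhub_name=EVENTHUB_NAME)
    # Serialize the Parquet table to an in-memory buffer instead of CSV text.
    sink = pa.BufferOutputStream()
    pq.write_table(pq.read_table(path), sink)
    with producer:
        batch = producer.create_batch()
        batch.add(EventData(sink.getvalue().to_pybytes()))
        producer.send_batch(batch)

def on_event_parquet_bytes(partition_context, event):
    # Rebuild the Parquet table directly from the event payload bytes.
    payload = b"".join(event.body)
    table = pq.read_table(pa.BufferReader(payload))
    pq.write_table(table, 'received_bytes.parquet')
    partition_context.update_checkpoint(event)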
if __name__ == '__main__':
    print("Starting..")
    sender()
    # receiver() blocks until interrupted (Ctrl+C), processing events via on_event.
    receiver()