-
Notifications
You must be signed in to change notification settings - Fork 20
/
direct_publisher.py
100 lines (82 loc) · 4.23 KB
/
direct_publisher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
## Goal: Simple Publisher, event handling and message properties setting
import os
import platform
import time
# Import Solace Python API modules from the solace package
from solace.messaging.messaging_service import MessagingService, ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener, RetryStrategy, ServiceEvent
from solace.messaging.resources.topic import Topic
from solace.messaging.publisher.direct_message_publisher import PublishFailureListener, FailedPublishEvent
if platform.uname().system == 'Windows': os.environ["PYTHONUNBUFFERED"] = "1" # Disable stdout buffer
MSG_COUNT = 5
TOPIC_PREFIX = "solace/samples/python"
# Inner classes for error handling
class ServiceEventHandler(ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener):
def on_reconnected(self, e: ServiceEvent):
print("\non_reconnected")
print(f"Error cause: {e.get_cause()}")
print(f"Message: {e.get_message()}")
def on_reconnecting(self, e: "ServiceEvent"):
print("\non_reconnecting")
print(f"Error cause: {e.get_cause()}")
print(f"Message: {e.get_message()}")
def on_service_interrupted(self, e: "ServiceEvent"):
print("\non_service_interrupted")
print(f"Error cause: {e.get_cause()}")
print(f"Message: {e.get_message()}")
class PublisherErrorHandling(PublishFailureListener):
def on_failed_publish(self, e: "FailedPublishEvent"):
print("on_failed_publish")
# Broker Config. Note: Could pass other properties Look into
broker_props = {
"solace.messaging.transport.host": os.environ.get('SOLACE_HOST') or "tcp://localhost:55555,tcp://localhost:55554",
"solace.messaging.service.vpn-name": os.environ.get('SOLACE_VPN') or "default",
"solace.messaging.authentication.scheme.basic.username": os.environ.get('SOLACE_USERNAME') or "default",
"solace.messaging.authentication.scheme.basic.password": os.environ.get('SOLACE_PASSWORD') or "default"
}
# Build A messaging service with a reconnection strategy of 20 retries over an interval of 3 seconds
# Note: The reconnections strategy could also be configured using the broker properties object
messaging_service = MessagingService.builder().from_properties(broker_props)\
.with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20,3))\
.build()
# Blocking connect thread
messaging_service.connect()
print(f'Messaging Service connected? {messaging_service.is_connected}')
# Event Handling for the messaging service
service_handler = ServiceEventHandler()
messaging_service.add_reconnection_listener(service_handler)
messaging_service.add_reconnection_attempt_listener(service_handler)
messaging_service.add_service_interruption_listener(service_handler)
# Create a direct message publisher and start it
direct_publisher = messaging_service.create_direct_message_publisher_builder().build()
direct_publisher.set_publish_failure_listener(PublisherErrorHandling())
# Blocking Start thread
direct_publisher.start()
print(f'Direct Publisher ready? {direct_publisher.is_ready()}')
# Prepare outbound message payload and body
message_body = "this is the body of the msg"
outbound_msg_builder = messaging_service.message_builder() \
.with_application_message_id("sample_id") \
.with_property("application", "samples") \
.with_property("language", "Python") \
count = 1
print("\nSend a KeyboardInterrupt to stop publishing\n")
try:
while True:
while count <= MSG_COUNT:
topic = Topic.of(TOPIC_PREFIX + f'/direct/pub/{count}')
# Direct publish the message with dynamic headers and payload
outbound_msg = outbound_msg_builder \
.with_application_message_id(f'NEW {count}')\
.build(f'{message_body} + {count}')
direct_publisher.publish(destination=topic, message=outbound_msg)
print(f'Published message on {topic}')
count += 1
time.sleep(0.1)
print("\n")
count = 1
time.sleep(1)
except KeyboardInterrupt:
print('\nTerminating Publisher')
direct_publisher.terminate()
print('\nDisconnecting Messaging Service')
messaging_service.disconnect()