<a href="https://colab.research.google.com/github/ciccio1982/Colab/blob/main/ricezione_messaggio_solace.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install solace-pubsubplus



Before running the code, ensure you have the Solace PubSub+ Event Broker connection details ready. You'll need the host, message VPN name, and user credentials.

You can create a free Solace Cloud account and get these details if you don't have a broker instance.

In [21]:
import os
import time

# Import Solace Python  API modules
from solace.messaging.messaging_service import MessagingService, ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener, RetryStrategy, ServiceEvent
from solace.messaging.errors.pubsubplus_client_error import PubSubPlusClientError
from solace.messaging.publisher.direct_message_publisher import PublishFailureListener, FailedPublishEvent
from solace.messaging.resources.topic_subscription import TopicSubscription
from solace.messaging.receiver.message_receiver import MessageHandler
from solace.messaging.config.solace_properties.message_properties import APPLICATION_MESSAGE_ID
from solace.messaging.resources.topic import Topic
from solace.messaging.receiver.inbound_message import InboundMessage
# Solace Connection Details (replace with your actual values)
SOLACE_HOST = 'tcps://mr-connection-1d0n94c2091.messaging.solace.cloud:55443' # Changed to tcp:// for non-TLS
SOLACE_VPN = 'myfirstservice' # e.g., default
SOLACE_USERNAME = 'solace-cloud-client' # e.g., guest
SOLACE_PASSWORD = 'nj8nmnunpa68tg00rfa06hol8r' # e.g., guest

# Queue to send messages to
TARGET_QUEUE = 'test1'

In [22]:

broker_props = {
    "solace.messaging.transport.host": SOLACE_HOST,
    "solace.messaging.service.vpn-name": SOLACE_VPN,
    "solace.messaging.authentication.scheme.basic.username": SOLACE_USERNAME,
    "solace.messaging.authentication.scheme.basic.password": SOLACE_PASSWORD,
    # Disabilita validazione certificati
    "solace.messaging.tls.cert-validated": False,
    "solace.messaging.tls.cert-reject-expired": False
    # TLS-specific properties are removed for tcp:// connection
}
# Create the messaging service using from_properties
messaging_service = MessagingService.builder() \
  .from_properties(broker_props) \
  .with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20,3)) \
  .build()

# Blocking connect thread
messaging_service.connect()
print("CONNESSO!!!")

CONNESSO!!!


In [23]:
class MessageHandlerImpl(MessageHandler):
    def on_message(self, message: 'InboundMessage'):
        try:
            global SHUTDOWN
            if "quit" in message.get_destination_name():
                print("QUIT message received, shutting down.")
                SHUTDOWN = True

            # Check if the payload is a String or Byte, decode if its the later
            payload = message.get_payload_as_string() if message.get_payload_as_string() is not None else message.get_payload_as_bytes()
            if isinstance(payload, bytearray):
                print(f"Received a message of type: {type(payload)}. Decoding to string")
                payload = payload.decode()

            print("\n" + f"Message payload: {payload} \n")
            print("\n" + f"Message dump: {message} \n")
        except Exception as e:
            print(f"Error processing message: {e.__traceback__}")
# Inner classes for error handling
class ServiceEventHandler(ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener):
    def on_reconnected(self, e: ServiceEvent):
        print("\non_reconnected")
        print(f"Error cause: {e.get_cause()}")
        print(f"Message: {e.get_message()}")

    def on_reconnecting(self, e: "ServiceEvent"):
        print("\non_reconnecting")
        print(f"Error cause: {e.get_cause()}")
        print(f"Message: {e.get_message()}")

    def on_service_interrupted(self, e: "ServiceEvent"):
        print("\non_service_interrupted")
        print(f"Error cause: {e.get_cause()}")
        print(f"Message: {e.get_message()}")

class PublisherErrorHandling(PublishFailureListener):
    def on_failed_publish(self, e: "FailedPublishEvent"):
        print("on_failed_publish")


Next, we'll define a `MessageReceiver` class that implements the `MessageHandler` interface. This class will contain the logic to process incoming messages.

In [None]:
# Error Handeling for the messaging service
service_handler = ServiceEventHandler()
messaging_service.add_reconnection_listener(service_handler)
messaging_service.add_reconnection_attempt_listener(service_handler)
messaging_service.add_service_interruption_listener(service_handler)

# Define a Topic subscriptions
topics = [TARGET_QUEUE]
topics_sub = []
for t in topics:
    topics_sub.append(TopicSubscription.of(t))

# Build a Receiver with the given topics and start it
direct_receiver = messaging_service.create_direct_message_receiver_builder()\
                        .with_subscriptions(topics_sub)\
                        .build()

direct_receiver.start()
print(f'Direct Receiver is running? {direct_receiver.is_running()}')
try:
    print(f"Subscribing to: {topics}")
    # Callback for received messages
    direct_receiver.receive_async(MessageHandlerImpl())
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print('\nDisconnecting Messaging Service')
finally:
    print('\nTerminating receiver')
    direct_receiver.terminate()
    print('\nDisconnecting Messaging Service')
    messaging_service.disconnect()

Direct Receiver is running? True
Subscribing to: ['test1']


Finally, we'll initialize the Solace messaging service, create a consumer, bind it to the specified queue, and start listening for messages. The consumer will run in a separate thread, and the main thread will wait for a stop signal.