In [3]:
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

class PersistentReceiver(MessagingHandler):
    def __init__(self, broker_url, topic):
        super().__init__()
        self.broker_url = broker_url
        self.topic = topic

    def on_start(self, event):
        # Establish connection and create receiver
        print(f"Connecting to {self.broker_url}...")
        self.connection = event.container.connect(self.broker_url)
        event.container.create_receiver(self.connection, self.topic)
        print(f"Listening for messages on topic: {self.topic}")

    def on_message(self, event):
        # Print the received message
        print(f"Received message: {event.message.body}")
        # Close the connection after receiving the first message
        print("Closing connection...")
        event.connection.close()

    def on_transport_error(self, event):
        print(f"Transport error: {event.transport.condition.description}")
        event.connection.close()

# Create and run the receiver container
if __name__ == "__main__":
    broker_url = "amqp://localhost:5672"  # Update with your broker's address if needed
    topic = "topic:///skopa"
    Container(PersistentReceiver(broker_url, topic)).run()


Connecting to amqp://localhost:5672...
Listening for messages on topic: topic:///skopa
Received message: Test message
Closing connection...


In [13]:
import threading
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

class PersistentReceiver(MessagingHandler):
    def __init__(self, broker_url, topic):
        super().__init__()
        self.broker_url = broker_url
        self.topic = topic
        self.stop_requested = False

    def on_start(self, event):
        print(f"Connecting to {self.broker_url}...")
        self.connection = event.container.connect(self.broker_url)
        event.container.create_receiver(self.connection, self.topic)
        print(f"Listening for messages on topic: {self.topic}")

    def on_message(self, event):
        print(f"Received message: {event.message.body}")
        if self.stop_requested:
            print("Stop requested. Closing connection...")
            event.connection.close()

    def on_transport_error(self, event):
        print(f"Transport error: {event.transport.condition.description}")
        event.connection.close()

    def stop(self):
        self.connection.close()


# Wrapper to run the receiver in a thread
class ReceiverThread:
    def __init__(self, broker_url, topic):
        self.receiver = PersistentReceiver(broker_url, topic)
        self.container = Container(self.receiver)
        self.thread = threading.Thread(target=self.container.run)
        self.thread.daemon = True  # Ensure thread doesn't prevent program exit

    def start(self):
        print("Starting receiver thread...")
        self.thread.start()

    def stop(self):
        print("Stopping receiver thread...")
        self.receiver.stop()
        #self.container.stop()  # Stops the container and closes connections
        self.thread.join()  # Wait for the thread to finish


# Example usage
if __name__ == "__main__":
    broker_url = "amqp://localhost:5672"
    topic = "topic:///skopa"

    # Create and start the receiver in a thread
    receiver_thread = ReceiverThread(broker_url, topic)
    receiver_thread.start()

    

    


Starting receiver thread...
Connecting to amqp://localhost:5672...
Listening for messages on topic: topic:///skopa


Received message: Test message


In [14]:
# Stop the receiver from outside
receiver_thread.stop()
print("Receiver has been stopped.")

Stopping receiver thread...
Receiver has been stopped.


In [1]:
from measurement_plane.protocols.amqp.receive import ReceiverThread

broker_url = "amqp://localhost:5672"
topic = "topic:///skopa"

# Create and start the receiver in a thread
receiver_thread = ReceiverThread(broker_url, topic)
receiver_thread.start()

Starting receiver thread...
Connecting to amqp://localhost:5672...
Listening for messages on topic: topic:///skopa


Received message: Test message


In [2]:
receiver_thread.stop()

Stopping receiver thread...
