Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better error handling when processing messages #178

Open
sajith opened this issue Jun 2, 2023 · 1 comment · May be fixed by #286
Open

Better error handling when processing messages #178

sajith opened this issue Jun 2, 2023 · 1 comment · May be fixed by #286
Assignees
Labels
bug Something isn't working epic_sdx_controller

Comments

@sajith
Copy link
Member

sajith commented Jun 2, 2023

Using amqp-publish, we can publish a message to sdx-controller this way:

$ amqp-publish --routing-key="$SUB_QUEUE" --exchange="" --body "$(cat tests/data/sax.json)"

However that causes a crash, and then further messages are not handled:

Traceback (most recent call last):
  File "/usr/lib64/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib64/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/sdx/sdx-controller/swagger_server/__main__.py", line 179, in <module>
    main()
  File "/home/sdx/sdx-controller/swagger_server/__main__.py", line 175, in main
    start_consumer(thread_queue, db_instance)
  File "/home/sdx/sdx-controller/swagger_server/__main__.py", line 135, in start_consumer
    process_lc_json_msg(
  File "/home/sdx/sdx-controller/swagger_server/__main__.py", line 48, in process_lc_json_msg
    lc_queue_name = msg_json["lc_queue_name"]
KeyError: 'lc_queue_name'
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python3.9/threading.py", line 980, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.9/threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "/home/sdx/sdx-controller/swagger_server/messaging/rpc_queue_consumer.py", line 49, in start_consumer
    self.channel.start_consuming()
  File "/home/sdx/sdx-controller/venv/lib64/python3.9/site-packages/pika/adapters/blocking_connection.py", line 1883, in start_consuming
    self._process_data_events(time_limit=None)
  File "/home/sdx/sdx-controller/venv/lib64/python3.9/site-packages/pika/adapters/blocking_connection.py", line 2044, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/home/sdx/sdx-controller/venv/lib64/python3.9/site-packages/pika/adapters/blocking_connection.py", line 851, in process_data_events
    self._dispatch_channel_events()
  File "/home/sdx/sdx-controller/venv/lib64/python3.9/site-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_events
    impl_channel._get_cookie()._dispatch_events()
  File "/home/sdx/sdx-controller/venv/lib64/python3.9/site-packages/pika/adapters/blocking_connection.py", line 1510, in _dispatch_events
    consumer_info.on_message_callback(self, evt.method,
  File "/home/sdx/sdx-controller/swagger_server/messaging/rpc_queue_consumer.py", line 36, in on_request
    ch.basic_publish(
  File "/home/sdx/sdx-controller/venv/lib64/python3.9/site-packages/pika/adapters/blocking_connection.py", line 2259, in basic_publish
    self._impl.basic_publish(
  File "/home/sdx/sdx-controller/venv/lib64/python3.9/site-packages/pika/channel.py", line 427, in basic_publish
    self._send_method(
  File "/home/sdx/sdx-controller/venv/lib64/python3.9/site-packages/pika/channel.py", line 1415, in _send_method
    self.connection._send_method(self.channel_number, method, content)
  File "/home/sdx/sdx-controller/venv/lib64/python3.9/site-packages/pika/connection.py", line 2255, in _send_method
    self._send_message(channel_number, method, content)
  File "/home/sdx/sdx-controller/venv/lib64/python3.9/site-packages/pika/connection.py", line 2275, in _send_message
    marshaled_body_frames.append(frame_method.marshal())
  File "/home/sdx/sdx-controller/venv/lib64/python3.9/site-packages/pika/frame.py", line 73, in marshal
    pieces = self.method.encode()
  File "/home/sdx/sdx-controller/venv/lib64/python3.9/site-packages/pika/spec.py", line 1582, in encode
    assert isinstance(self.routing_key, str_or_bytes),\
AssertionError: A non-string value was supplied for self.routing_key

This is less robust than it should be.

@sajith
Copy link
Member Author

sajith commented Jun 12, 2024

Things have changed a little since this issue was originally filed, but it still remains. Here's the updated backtrace:

sdx-controller-1  | Exception in thread Thread-4:
sdx-controller-1  | Traceback (most recent call last):
sdx-controller-1  |   File "/usr/local/lib/python3.9/threading.py", line 980, in _bootstrap_inner
sdx-controller-1  |     self.run()
sdx-controller-1  |   File "/usr/local/lib/python3.9/threading.py", line 917, in run
sdx-controller-1  |     self._target(*self._args, **self._kwargs)
sdx-controller-1  |   File "/opt/venv/lib/python3.9/site-packages/sdx_controller/messaging/rpc_queue_consumer.py", line 72, in start_consumer
sdx-controller-1  |     self.channel.start_consuming()
sdx-controller-1  |   File "/opt/venv/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 1883, in start_consuming
sdx-controller-1  |     self._process_data_events(time_limit=None)
sdx-controller-1  |   File "/opt/venv/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 2044, in _process_data_events
sdx-controller-1  |     self.connection.process_data_events(time_limit=time_limit)
sdx-controller-1  |   File "/opt/venv/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 851, in process_data_events
sdx-controller-1  |     self._dispatch_channel_events()
sdx-controller-1  |   File "/opt/venv/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_events
sdx-controller-1  |     impl_channel._get_cookie()._dispatch_events()
sdx-controller-1  |   File "/opt/venv/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 1510, in _dispatch_events
sdx-controller-1  |     consumer_info.on_message_callback(self, evt.method,
sdx-controller-1  |   File "/opt/venv/lib/python3.9/site-packages/sdx_controller/messaging/rpc_queue_consumer.py", line 59, in on_request
sdx-controller-1  |     ch.basic_publish(
sdx-controller-1  |   File "/opt/venv/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 2259, in basic_publish
sdx-controller-1  |     self._impl.basic_publish(
sdx-controller-1  |   File "/opt/venv/lib/python3.9/site-packages/pika/channel.py", line 427, in basic_publish
sdx-controller-1  |     self._send_method(
sdx-controller-1  |   File "/opt/venv/lib/python3.9/site-packages/pika/channel.py", line 1415, in _send_method
sdx-controller-1  |     self.connection._send_method(self.channel_number, method, content)
sdx-controller-1  |   File "/opt/venv/lib/python3.9/site-packages/pika/connection.py", line 2253, in _send_method
sdx-controller-1  |     self._send_message(channel_number, method, content)
sdx-controller-1  |   File "/opt/venv/lib/python3.9/site-packages/pika/connection.py", line 2273, in _send_message
sdx-controller-1  |     marshaled_body_frames.append(frame_method.marshal())
sdx-controller-1  |   File "/opt/venv/lib/python3.9/site-packages/pika/frame.py", line 73, in marshal
sdx-controller-1  |     pieces = self.method.encode()
sdx-controller-1  |   File "/opt/venv/lib/python3.9/site-packages/pika/spec.py", line 1581, in encode
sdx-controller-1  |     assert isinstance(self.routing_key, str_or_bytes),\
sdx-controller-1  | AssertionError: A non-string value was supplied for self.routing_key

Here's the command to reproduce the issue:

$ amqp-publish --routing-key="topo" --body="$(cat ../datamodel/src/sdx_datamodel/data/topologies/amlight.json)"

Basically, this method crashes, on line 59:

def on_request(self, ch, method, props, message_body):
response = message_body
self._thread_queue.put(message_body)
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=MQ_HOST,
port=MQ_PORT,
credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
)
)
self.channel = self.connection.channel()
ch.basic_publish(
exchange=self.exchange_name,
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=str(response),
)
ch.basic_ack(delivery_tag=method.delivery_tag)

I think pika is unhappy about exchange name being an empty string. I'm not sure what this code does though. We should document the purpose of the offending basic_publish(), or perhaps remove it if it is not really doing anything.

Since the message queue consumer thread crashes, SDX Controller will have to be restarted. But since the message persists in the queue, we will keep crashing and stay unusable until the message queue process is restarted.

@sajith sajith linked a pull request Jun 12, 2024 that will close this issue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working epic_sdx_controller
Projects
1 participant