diff --git a/agent/resources/test/flow_generator/openwire/openwire_correlation_id.result b/agent/resources/test/flow_generator/openwire/openwire_correlation_id.result index bf2eb2ccad0..77b10d69204 100644 --- a/agent/resources/test/flow_generator/openwire/openwire_correlation_id.result +++ b/agent/resources/test/flow_generator/openwire/openwire_correlation_id.result @@ -71,6 +71,7 @@ OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_t OpenWireInfo { msg_type: Session, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: false, is_size_prefix_disabled: false, is_cache_enabled: false, version: 11, connection_id: Some("ID:localhost-1-1706669422443-0:1"), broker_url: None, session_id: Some(1), topic: Some("TEST.FOO"), command_type: MessageDispatch, command_id: 20, response_required: false, resp_command_id: 0, req_msg_size: None, res_msg_size: Some(411), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: Some("My own MessageID:15"), rtt: 0 } is_openwire: false OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: false, is_size_prefix_disabled: false, is_cache_enabled: false, version: 11, connection_id: Some("ID:localhost-1-1706669422443-0:1"), broker_url: None, session_id: Some(1), topic: Some("TEST.FOO"), command_type: MessageAck, command_id: 20, response_required: false, resp_command_id: 0, req_msg_size: Some(223), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false OpenWireInfo { msg_type: Session, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: false, is_size_prefix_disabled: false, is_cache_enabled: false, version: 11, connection_id: Some("ID:localhost-1-1706669422443-0:1"), broker_url: None, session_id: Some(1), topic: Some("TEST.FOO"), command_type: MessageDispatch, command_id: 21, response_required: false, resp_command_id: 0, req_msg_size: None, res_msg_size: Some(411), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: Some("My own MessageID:16"), rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: false, is_size_prefix_disabled: false, is_cache_enabled: false, version: 11, connection_id: Some("ID:localhost-1-1706669422443-0:1"), broker_url: None, session_id: Some(1), topic: Some("TEST.FOO"), command_type: MessageDispatch, command_id: 22, response_required: false, resp_command_id: 0, req_msg_size: None, res_msg_size: Some(411), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: Some("My own MessageID:17"), rtt: 0 } is_openwire: false OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: false, is_size_prefix_disabled: false, is_cache_enabled: false, version: 11, connection_id: Some("ID:localhost-1-1706669422443-0:1"), broker_url: None, session_id: Some(1), topic: Some("TEST.FOO"), command_type: MessageAck, command_id: 21, response_required: false, resp_command_id: 0, req_msg_size: Some(223), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: false, is_size_prefix_disabled: false, is_cache_enabled: false, version: 11, connection_id: Some("ID:localhost-1-1706669422443-0:1"), broker_url: None, session_id: Some(1), topic: Some("TEST.FOO"), command_type: MessageAck, command_id: 22, response_required: false, resp_command_id: 0, req_msg_size: Some(223), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false OpenWireInfo { msg_type: Session, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: false, is_size_prefix_disabled: false, is_cache_enabled: false, version: 11, connection_id: Some("ID:localhost-1-1706669422443-0:1"), broker_url: None, session_id: Some(1), topic: Some("TEST.FOO"), command_type: MessageDispatch, command_id: 23, response_required: false, resp_command_id: 0, req_msg_size: None, res_msg_size: Some(411), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: Some("My own MessageID:18"), rtt: 0 } is_openwire: false diff --git a/agent/resources/test/flow_generator/openwire/openwire_injected.result b/agent/resources/test/flow_generator/openwire/openwire_injected.result index fab01a2054d..53c4b79a1a6 100644 --- a/agent/resources/test/flow_generator/openwire/openwire_injected.result +++ b/agent/resources/test/flow_generator/openwire/openwire_injected.result @@ -51,6 +51,9 @@ OpenWireInfo { msg_type: Session, is_tls: false, direction: ServerToClient, is_t OpenWireInfo { msg_type: Session, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: false, version: 11, connection_id: Some("ID:test.local-64517-1705648339915-1:1"), broker_url: None, session_id: Some(1), topic: Some("TEST"), command_type: MessageDispatch, command_id: 9, response_required: false, resp_command_id: 0, req_msg_size: None, res_msg_size: Some(635), status: Ok, err_msg: None, trace_id: Some("c55c062343a74e58906f7e9948f32155.55.17056483403920009"), span_id: Some("c55c062343a74e58906f7e9948f32155.55.17056483403920008-0"), correlation_id: None, rtt: 0 } is_openwire: false OpenWireInfo { msg_type: Session, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: false, version: 11, connection_id: Some("ID:test.local-64517-1705648339915-1:1"), broker_url: None, session_id: Some(1), topic: Some("TEST"), command_type: MessageDispatch, command_id: 10, response_required: false, resp_command_id: 0, req_msg_size: None, res_msg_size: Some(635), status: Ok, err_msg: None, trace_id: Some("c55c062343a74e58906f7e9948f32155.55.17056483403990011"), span_id: Some("c55c062343a74e58906f7e9948f32155.55.17056483403990010-0"), correlation_id: None, rtt: 0 } is_openwire: false OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: false, version: 11, connection_id: Some("ID:test.local-64517-1705648339915-1:1"), broker_url: None, session_id: Some(1), topic: Some("TEST"), command_type: MessageAck, command_id: 5, response_required: false, resp_command_id: 0, req_msg_size: Some(161), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: false, version: 11, connection_id: Some("ID:test.local-64517-1705648339915-1:1"), broker_url: None, session_id: Some(1), topic: Some("TEST"), command_type: MessageAck, command_id: 6, response_required: false, resp_command_id: 0, req_msg_size: Some(165), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: false, version: 11, connection_id: Some("ID:test.local-64517-1705648339915-1:1"), broker_url: None, session_id: Some(1), topic: Some("TEST"), command_type: MessageAck, command_id: 7, response_required: false, resp_command_id: 0, req_msg_size: Some(165), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: false, version: 11, connection_id: Some("ID:test.local-64517-1705648339915-1:1"), broker_url: None, session_id: Some(1), topic: Some("TEST"), command_type: MessageAck, command_id: 8, response_required: false, resp_command_id: 0, req_msg_size: Some(165), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: false, version: 11, connection_id: Some("ID:test.local-64517-1705648339915-1:1"), broker_url: None, session_id: Some(1), topic: Some("TEST"), command_type: MessageAck, command_id: 9, response_required: false, resp_command_id: 0, req_msg_size: Some(165), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: false, version: 11, connection_id: Some("ID:test.local-64517-1705648339915-1:1"), broker_url: None, session_id: Some(1), topic: Some("TEST"), command_type: MessageAck, command_id: 10, response_required: false, resp_command_id: 0, req_msg_size: Some(165), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false OpenWireInfo { msg_type: Session, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: false, version: 11, connection_id: Some("ID:test.local-64517-1705648339915-1:1"), broker_url: None, session_id: Some(1), topic: Some("TEST"), command_type: MessageDispatch, command_id: 11, response_required: false, resp_command_id: 0, req_msg_size: None, res_msg_size: Some(635), status: Ok, err_msg: None, trace_id: Some("c55c062343a74e58906f7e9948f32155.55.17056483404090013"), span_id: Some("c55c062343a74e58906f7e9948f32155.55.17056483404090012-0"), correlation_id: None, rtt: 0 } is_openwire: false diff --git a/agent/resources/test/flow_generator/openwire/openwire_segmented.pcap b/agent/resources/test/flow_generator/openwire/openwire_segmented.pcap new file mode 100644 index 00000000000..929ce6e3830 Binary files /dev/null and b/agent/resources/test/flow_generator/openwire/openwire_segmented.pcap differ diff --git a/agent/resources/test/flow_generator/openwire/openwire_segmented.result b/agent/resources/test/flow_generator/openwire/openwire_segmented.result new file mode 100644 index 00000000000..8ea54e60104 --- /dev/null +++ b/agent/resources/test/flow_generator/openwire/openwire_segmented.result @@ -0,0 +1,76 @@ +OpenWireInfo { msg_type: Request, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: WireFormatInfo, command_id: 0, response_required: false, resp_command_id: 0, req_msg_size: Some(334), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: true +OpenWireInfo { msg_type: Response, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: WireFormatInfo, command_id: 0, response_required: false, resp_command_id: 0, req_msg_size: None, res_msg_size: Some(315), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: true +OpenWireInfo { msg_type: Request, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-137:1"), broker_url: None, session_id: None, topic: None, command_type: ConnectionInfo, command_id: 1, response_required: true, resp_command_id: 0, req_msg_size: Some(83), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: Some("tcp://master24:61616"), session_id: None, topic: None, command_type: BrokerInfo, command_id: 0, response_required: false, resp_command_id: 0, req_msg_size: None, res_msg_size: Some(82), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: ConnectionControl, command_id: 0, response_required: false, resp_command_id: 0, req_msg_size: None, res_msg_size: Some(12), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Response, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: Response, command_id: 0, response_required: false, resp_command_id: 1, req_msg_size: None, res_msg_size: Some(11), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-137:1"), broker_url: None, session_id: Some(18446744073709551615), topic: Some("ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic"), command_type: ConsumerInfo, command_id: 2, response_required: true, resp_command_id: 0, req_msg_size: Some(138), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Response, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: Response, command_id: 0, response_required: false, resp_command_id: 2, req_msg_size: None, res_msg_size: Some(11), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-137:1"), broker_url: None, session_id: Some(1), topic: None, command_type: SessionInfo, command_id: 3, response_required: false, resp_command_id: 0, req_msg_size: Some(58), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-137:1"), broker_url: None, session_id: Some(1), topic: Some("queue1"), command_type: ConsumerInfo, command_id: 4, response_required: true, resp_command_id: 0, req_msg_size: Some(94), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Response, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: Response, command_id: 0, response_required: false, resp_command_id: 4, req_msg_size: None, res_msg_size: Some(11), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-49:1"), broker_url: None, session_id: Some(1), topic: Some("queue1"), command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(753), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510067"), span_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510066-10"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-53:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(692), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510067"), span_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510066-10"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-51:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(692), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510067"), span_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510066-10"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-55:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(692), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510067"), span_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510066-10"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-57:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(692), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510067"), span_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510066-10"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-59:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(692), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510067"), span_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510066-10"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-93:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(714), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510067"), span_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510066-10"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-71:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(692), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510067"), span_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510066-10"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-89:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(714), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510067"), span_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510066-10"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-63:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(692), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510067"), span_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510066-10"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-87:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(714), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-69:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(692), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510067"), span_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510066-10"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-81:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(692), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-83:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(692), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510067"), span_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510066-10"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-79:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(692), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-77:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(692), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510067"), span_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510066-10"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-73:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(692), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-65:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(692), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510067"), span_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510066-10"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-61:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(714), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-95:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(714), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510067"), span_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510066-10"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-97:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(714), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-99:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(714), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510067"), span_id: Some("c8164cae90e94fe68493c106ea95f1ee.66.16667985488510066-10"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-101:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(316), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("bb79fa34-d03f-4bc2-8fa9-979af7877426"), span_id: Some("0d8ce61a-ef2a-4e23-b8f4-2c59e3f44d48-0"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-103:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(316), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-105:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(316), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("bb79fa34-d03f-4bc2-8fa9-979af7877426"), span_id: Some("0d8ce61a-ef2a-4e23-b8f4-2c59e3f44d48-0"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-107:2"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(316), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("bb79fa34-d03f-4bc2-8fa9-979af7877426"), span_id: Some("0d8ce61a-ef2a-4e23-b8f4-2c59e3f44d48-0"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-109:2"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(316), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("bb79fa34-d03f-4bc2-8fa9-979af7877426"), span_id: Some("0d8ce61a-ef2a-4e23-b8f4-2c59e3f44d48-0"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-117:2"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(316), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-119:2"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(316), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("bb79fa34-d03f-4bc2-8fa9-979af7877426"), span_id: Some("0d8ce61a-ef2a-4e23-b8f4-2c59e3f44d48-0"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-121:2"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(316), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("bb79fa34-d03f-4bc2-8fa9-979af7877426"), span_id: Some("0d8ce61a-ef2a-4e23-b8f4-2c59e3f44d48-0"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-123:2"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(316), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("bb79fa34-d03f-4bc2-8fa9-979af7877426"), span_id: Some("0d8ce61a-ef2a-4e23-b8f4-2c59e3f44d48-0"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-125:2"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(316), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("bb79fa34-d03f-4bc2-8fa9-979af7877426"), span_id: Some("0d8ce61a-ef2a-4e23-b8f4-2c59e3f44d48-0"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(316), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-129:2"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(316), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("bb79fa34-d03f-4bc2-8fa9-979af7877426"), span_id: Some("0d8ce61a-ef2a-4e23-b8f4-2c59e3f44d48-0"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-131:2"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(294), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("bb79fa34-d03f-4bc2-8fa9-979af7877426"), span_id: Some("0d8ce61a-ef2a-4e23-b8f4-2c59e3f44d48-0"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-133:2"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(294), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("bb79fa34-d03f-4bc2-8fa9-979af7877426"), span_id: Some("0d8ce61a-ef2a-4e23-b8f4-2c59e3f44d48-0"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-135:2"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(294), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +This packet payload cannot be parsed +OpenWireInfo { msg_type: Request, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: WireFormatInfo, command_id: 0, response_required: false, resp_command_id: 0, req_msg_size: Some(334), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: true +OpenWireInfo { msg_type: Response, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: WireFormatInfo, command_id: 0, response_required: false, resp_command_id: 0, req_msg_size: None, res_msg_size: Some(315), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: true +OpenWireInfo { msg_type: Session, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: Some("tcp://master24:61616"), session_id: None, topic: None, command_type: BrokerInfo, command_id: 0, response_required: false, resp_command_id: 0, req_msg_size: None, res_msg_size: Some(82), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-137:2"), broker_url: None, session_id: None, topic: None, command_type: ConnectionInfo, command_id: 1, response_required: true, resp_command_id: 0, req_msg_size: Some(118), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: ConnectionControl, command_id: 0, response_required: false, resp_command_id: 0, req_msg_size: None, res_msg_size: Some(12), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Response, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: Response, command_id: 0, response_required: false, resp_command_id: 1, req_msg_size: None, res_msg_size: Some(11), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-137:2"), broker_url: None, session_id: Some(18446744073709551615), topic: Some("ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic"), command_type: ConsumerInfo, command_id: 2, response_required: true, resp_command_id: 0, req_msg_size: Some(138), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Response, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: Response, command_id: 0, response_required: false, resp_command_id: 2, req_msg_size: None, res_msg_size: Some(11), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-137:2"), broker_url: None, session_id: Some(1), topic: None, command_type: SessionInfo, command_id: 3, response_required: false, resp_command_id: 0, req_msg_size: Some(58), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-137:2"), broker_url: None, session_id: Some(1), topic: Some("queue1"), command_type: ProducerInfo, command_id: 4, response_required: true, resp_command_id: 0, req_msg_size: Some(76), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Response, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: Response, command_id: 0, response_required: false, resp_command_id: 4, req_msg_size: None, res_msg_size: Some(11), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: ActiveMQTextMessage, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(210), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("bb79fa34-d03f-4bc2-8fa9-979af7877426"), span_id: Some("0d8ce61a-ef2a-4e23-b8f4-2c59e3f44d48-0"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-137:2"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageDispatch, command_id: 5, response_required: true, resp_command_id: 0, req_msg_size: Some(294), res_msg_size: None, status: Ok, err_msg: None, trace_id: Some("bb79fa34-d03f-4bc2-8fa9-979af7877426"), span_id: Some("0d8ce61a-ef2a-4e23-b8f4-2c59e3f44d48-0"), correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Response, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: Response, command_id: 0, response_required: false, resp_command_id: 5, req_msg_size: None, res_msg_size: Some(11), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: Some("ID:DESKTOP-DSE5EEN-49491-1709024553638-49:1"), broker_url: None, session_id: Some(1), topic: None, command_type: MessageAck, command_id: 5, response_required: false, resp_command_id: 0, req_msg_size: Some(85), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: RemoveInfo, command_id: 6, response_required: false, resp_command_id: 0, req_msg_size: Some(11), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: RemoveInfo, command_id: 7, response_required: false, resp_command_id: 0, req_msg_size: Some(17), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: RemoveInfo, command_id: 8, response_required: false, resp_command_id: 0, req_msg_size: Some(17), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: RemoveInfo, command_id: 9, response_required: true, resp_command_id: 0, req_msg_size: Some(17), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Response, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: Response, command_id: 0, response_required: false, resp_command_id: 9, req_msg_size: None, res_msg_size: Some(11), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: ShutdownInfo, command_id: 10, response_required: false, resp_command_id: 0, req_msg_size: Some(7), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: RemoveInfo, command_id: 6, response_required: false, resp_command_id: 0, req_msg_size: Some(17), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: RemoveInfo, command_id: 7, response_required: false, resp_command_id: 0, req_msg_size: Some(17), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: RemoveInfo, command_id: 8, response_required: false, resp_command_id: 0, req_msg_size: Some(17), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Request, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: RemoveInfo, command_id: 9, response_required: true, resp_command_id: 0, req_msg_size: Some(17), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Response, is_tls: false, direction: ServerToClient, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: Response, command_id: 0, response_required: false, resp_command_id: 9, req_msg_size: None, res_msg_size: Some(11), status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false +OpenWireInfo { msg_type: Session, is_tls: false, direction: ClientToServer, is_tight_encoding_enabled: true, is_size_prefix_disabled: false, is_cache_enabled: true, version: 12, connection_id: None, broker_url: None, session_id: None, topic: None, command_type: ShutdownInfo, command_id: 10, response_required: false, resp_command_id: 0, req_msg_size: Some(7), res_msg_size: None, status: Ok, err_msg: None, trace_id: None, span_id: None, correlation_id: None, rtt: 0 } is_openwire: false diff --git a/agent/src/flow_generator/error.rs b/agent/src/flow_generator/error.rs index 4f0bd89ac31..d826618aa1b 100644 --- a/agent/src/flow_generator/error.rs +++ b/agent/src/flow_generator/error.rs @@ -48,12 +48,12 @@ pub enum Error { #[error("mqtt perf parse failed")] MqttPerfParseFailed, #[error("openwire log parse failed")] - OpenwireLogParseFailed, + OpenWireLogParseFailed, // openwire log parse unimplemented is acceptable #[error("openwire log parse unimplemented")] - OpenwireLogParseUnimplemented, - #[error("openwire perf parse failed")] - OpenwirePerfParseFailed, + OpenWireLogParseUnimplemented, + #[error("openwire log parse EOF")] + OpenWireLogParseEOF, #[error("redis log parse failed")] RedisLogParseFailed, #[error("redis perf parse failed")] diff --git a/agent/src/flow_generator/protocol_logs/mq/openwire.rs b/agent/src/flow_generator/protocol_logs/mq/openwire.rs index 7d29b3989ee..073a4febac3 100644 --- a/agent/src/flow_generator/protocol_logs/mq/openwire.rs +++ b/agent/src/flow_generator/protocol_logs/mq/openwire.rs @@ -1,5 +1,5 @@ use bitflags::bitflags; -use nom::{bytes, error, number, Err, IResult}; +use nom::{bytes, error, number, Err}; use serde::Serialize; use std::fmt; @@ -136,7 +136,7 @@ macro_rules! all_openwire_commands { fn try_from(value: u8) -> Result { match value { $($command_id => Ok(OpenWireCommand::$variant)),*, - _ => Err(Error::OpenwireLogParseFailed) + _ => Err(Error::OpenWireLogParseFailed) } } } @@ -233,7 +233,7 @@ macro_rules! impl_auto_marshaller { $(OpenWireCommandMarshaller::$command_type(_) => { $command_type::tight_unmarshal(parser, info, bs, payload) })* - _ => Err(Error::OpenwireLogParseUnimplemented), + _ => Err(Error::OpenWireLogParseUnimplemented), } } fn loose_unmarshal<'a>( @@ -246,7 +246,7 @@ macro_rules! impl_auto_marshaller { $(OpenWireCommandMarshaller::$command_type(_) => { $command_type::loose_unmarshal(parser, info, payload) })* - _ => Err(Error::OpenwireLogParseUnimplemented), + _ => Err(Error::OpenWireLogParseUnimplemented), } } fn tight_unmarshal_command<'a>( @@ -260,7 +260,10 @@ macro_rules! impl_auto_marshaller { $(OpenWireCommandMarshaller::$command_type(_) => { $command_type::tight_unmarshal(parser, info, bs, payload) })* - _ => BaseCommand::tight_unmarshal(parser, info, bs, payload), + _ => { + BaseCommand::tight_unmarshal(parser, info, bs, payload)?; + Err(Error::OpenWireLogParseUnimplemented) + } } } fn loose_unmarshal_command<'a>( @@ -273,7 +276,10 @@ macro_rules! impl_auto_marshaller { $(OpenWireCommandMarshaller::$command_type(_) => { $command_type::loose_unmarshal(parser, info, payload) })* - _ => BaseCommand::loose_unmarshal(parser, info, payload), + _ => { + BaseCommand::loose_unmarshal(parser, info, payload)?; + Err(Error::OpenWireLogParseUnimplemented) + } } } } @@ -322,14 +328,17 @@ struct BooleanStream<'a> { payload: &'a [u8], offset: usize, bitpos: usize, + // bytes consumed in payload (include the length) + bytes_consumed: usize, } impl<'a> BooleanStream<'a> { - fn new(payload: &'a [u8]) -> Self { + fn new(payload: &'a [u8], bytes_consumed: usize) -> Self { BooleanStream { payload, offset: 0, bitpos: 0, + bytes_consumed: bytes_consumed, } } fn read_bool(&mut self) -> Option { @@ -345,47 +354,53 @@ impl<'a> BooleanStream<'a> { } Some(bit == 1) } - fn read_boolean_stream(payload: &'a [u8]) -> IResult<&'a [u8], Self> { + fn read_boolean_stream(payload: &'a [u8]) -> Result<(&'a [u8], Self)> { // read bollean stream length - let (mut payload, byte) = number::complete::be_u8(payload)?; + let (mut payload, byte) = parse_byte(payload)?; let mut bs_length = byte as usize; + let mut length_len = 1; match bs_length { 0xC0 => { - let (new_payload, byte) = number::complete::be_u8(payload)?; + let (new_payload, byte) = parse_byte(payload)?; bs_length = byte as usize; + length_len += 1; payload = new_payload; } 0x80 => { - let (new_payload, short) = number::complete::be_u16(payload)?; + let (new_payload, short) = parse_short(payload)?; bs_length = short as usize; + length_len += 2; payload = new_payload; } _ => {} } + if bs_length == 0 { + return Err(Error::OpenWireLogParseFailed); + } // read boolean stream - let (payload, bs) = bytes::complete::take(bs_length)(payload)?; - Ok((payload, Self::new(bs))) + let (payload, bs) = parse_bytes(payload, bs_length)?; + Ok((payload, Self::new(bs, length_len + bs_length))) } } fn parse_byte<'a>(payload: &'a [u8]) -> Result<(&'a [u8], u8)> { let (payload, byte) = number::complete::be_u8(payload) - .map_err(|_: Err>| Error::OpenwireLogParseFailed)?; + .map_err(|_: Err>| Error::OpenWireLogParseEOF)?; Ok((payload, byte)) } fn parse_short<'a>(payload: &'a [u8]) -> Result<(&'a [u8], u16)> { let (payload, short) = number::complete::be_u16(payload) - .map_err(|_: Err>| Error::OpenwireLogParseFailed)?; + .map_err(|_: Err>| Error::OpenWireLogParseEOF)?; Ok((payload, short)) } fn parse_integer<'a>(payload: &'a [u8]) -> Result<(&'a [u8], u32)> { let (payload, integer) = number::complete::be_u32(payload) - .map_err(|_: Err>| Error::OpenwireLogParseFailed)?; + .map_err(|_: Err>| Error::OpenWireLogParseEOF)?; Ok((payload, integer)) } fn parse_long<'a>(payload: &'a [u8]) -> Result<(&'a [u8], u64)> { let (payload, long) = number::complete::be_u64(payload) - .map_err(|_: Err>| Error::OpenwireLogParseFailed)?; + .map_err(|_: Err>| Error::OpenWireLogParseEOF)?; Ok((payload, long)) } fn parse_boolean<'a>(payload: &'a [u8]) -> Result<(&'a [u8], bool)> { @@ -398,13 +413,12 @@ fn parse_boolean<'a>(payload: &'a [u8]) -> Result<(&'a [u8], bool)> { } fn parse_bytes<'a>(payload: &'a [u8], length: usize) -> Result<(&'a [u8], &'a [u8])> { let (payload, bytes) = bytes::complete::take(length)(payload) - .map_err(|_: Err>| Error::OpenwireLogParseFailed)?; + .map_err(|_: Err>| Error::OpenWireLogParseEOF)?; Ok((payload, bytes)) } fn parse_command_type(payload: &[u8]) -> Result<(&[u8], OpenWireCommand)> { - let (payload, command_type) = bytes::complete::take(1usize)(payload) - .map_err(|_: Err>| Error::OpenwireLogParseFailed)?; - let command_type = OpenWireCommand::try_from(command_type[0])?; + let (payload, command_type) = parse_byte(payload)?; + let command_type = OpenWireCommand::try_from(command_type)?; Ok((payload, command_type)) } fn parse_trace_and_span( @@ -438,7 +452,7 @@ fn parse_trace_and_span( let values = match parse_short(payload) { Ok((payload, length)) => payload .get(..length as usize) - .ok_or(Error::OpenwireLogParseFailed)?, + .ok_or(Error::OpenWireLogParseEOF)?, Err(_) => continue, }; let values = match std::str::from_utf8(values) { @@ -457,7 +471,7 @@ fn parse_trace_and_span( }; return Ok((trace_id, span_id)); } - Err(Error::OpenwireLogParseFailed) + Err(Error::OpenWireLogParseFailed) } trait Unmarshal { @@ -468,7 +482,7 @@ trait Unmarshal { bs: &mut BooleanStream, payload: &'a [u8], ) -> Result<&'a [u8]> { - Err(Error::OpenwireLogParseFailed) + Err(Error::OpenWireLogParseFailed) } #[allow(unused_variables)] fn loose_unmarshal<'a>( @@ -476,7 +490,7 @@ trait Unmarshal { info: &mut OpenWireInfo, payload: &'a [u8], ) -> Result<&'a [u8]> { - Err(Error::OpenwireLogParseFailed) + Err(Error::OpenWireLogParseFailed) } } @@ -489,6 +503,10 @@ impl Unmarshal for BaseCommand { ) -> Result<&'a [u8]> { // parse commandId let (payload, command_id) = parse_integer(payload)?; + // command id is unlikely to be larger than 2^24 + if command_id > 0xffffff { + return Err(Error::OpenWireLogParseFailed); + } info.command_id = command_id; // parse responseRequired match bs.read_bool() { @@ -496,7 +514,7 @@ impl Unmarshal for BaseCommand { info.response_required = boolean; Ok(payload) } - None => Err(Error::OpenwireLogParseFailed), + None => Err(Error::OpenWireLogParseFailed), } } fn loose_unmarshal<'a>( @@ -507,6 +525,10 @@ impl Unmarshal for BaseCommand { // parse commandId let (payload, command_id) = parse_integer(payload)?; info.command_id = command_id; + // command id is unlikely to be larger than 2^24 + if command_id > 0xffffff { + return Err(Error::OpenWireLogParseFailed); + } // parse responseRequired let (payload, boolean) = parse_boolean(payload)?; info.response_required = boolean; @@ -526,35 +548,35 @@ impl BaseDataStream { let (payload, string) = parse_bytes(payload, length as usize)?; Ok(( payload, - std::str::from_utf8(string).map_err(|_| Error::OpenwireLogParseFailed)?, + std::str::from_utf8(string).map_err(|_| Error::OpenWireLogParseFailed)?, )) } - None => Err(Error::OpenwireLogParseFailed), + None => Err(Error::OpenWireLogParseFailed), }, Some(false) => Ok((payload, "")), - None => Err(Error::OpenwireLogParseFailed), + None => Err(Error::OpenWireLogParseFailed), } } fn loose_unmarshal_string<'a>(payload: &'a [u8]) -> Result<(&'a [u8], &'a str)> { - match parse_boolean(payload) { - Ok((payload, true)) => { + let (payload, boolean) = parse_boolean(payload)?; + match boolean { + true => { let (payload, length) = parse_short(payload)?; let (payload, string) = parse_bytes(payload, length as usize)?; Ok(( payload, - std::str::from_utf8(string).map_err(|_| Error::OpenwireLogParseFailed)?, + std::str::from_utf8(string).map_err(|_| Error::OpenWireLogParseFailed)?, )) } - Ok((payload, false)) => Ok((payload, "")), - Err(_) => Err(Error::OpenwireLogParseFailed), + false => Ok((payload, "")), } } fn tight_unmarshal_long<'a>( payload: &'a [u8], bs: &mut BooleanStream, ) -> Result<(&'a [u8], u64)> { - if bs.read_bool().ok_or(Error::OpenwireLogParseFailed)? { - if bs.read_bool().ok_or(Error::OpenwireLogParseFailed)? { + if bs.read_bool().ok_or(Error::OpenWireLogParseFailed)? { + if bs.read_bool().ok_or(Error::OpenWireLogParseFailed)? { let (payload, long) = parse_long(payload)?; Ok((payload, long)) } else { @@ -562,7 +584,7 @@ impl BaseDataStream { Ok((payload, integer as u64)) } } else { - if bs.read_bool().ok_or(Error::OpenwireLogParseFailed)? { + if bs.read_bool().ok_or(Error::OpenWireLogParseFailed)? { let (payload, long) = parse_short(payload)?; Ok((payload, long as u64)) } else { @@ -584,7 +606,7 @@ impl BaseDataStream { Ok((payload, Some(bytes))) } Some(false) => Ok((payload, None)), - None => Err(Error::OpenwireLogParseFailed), + None => Err(Error::OpenWireLogParseFailed), } } fn loose_unmarshal_byte_array<'a>(payload: &'a [u8]) -> Result<(&'a [u8], Option<&'a [u8]>)> { @@ -609,13 +631,13 @@ impl BaseDataStream { let data_type = OpenWireCommand::try_from(data_type)?; // consume a boolean if is_marshall_aware is true if Self::is_marshall_aware(data_type) { - let _ = bs.read_bool().ok_or(Error::OpenwireLogParseFailed)?; + let _ = bs.read_bool().ok_or(Error::OpenWireLogParseFailed)?; } let data_marshaller = OpenWireCommandMarshaller::from(data_type); data_marshaller.tight_unmarshal(parser, info, bs, payload) } Some(false) => Ok(payload), - None => Err(Error::OpenwireLogParseFailed), + None => Err(Error::OpenWireLogParseFailed), } } fn loose_unmarshal_nested_object<'a>( @@ -654,7 +676,7 @@ impl BaseDataStream { // cannot parse cached object Ok(payload) } - None => Err(Error::OpenwireLogParseFailed), + None => Err(Error::OpenWireLogParseFailed), } } else { Self::tight_unmarshal_nested_object(parser, info, bs, payload) @@ -698,10 +720,10 @@ impl BaseDataStream { // parse exception message let (_, err_msg) = Self::tight_unmarshal_string(payload, bs)?; info.err_msg = Some(err_msg.to_string()); - Err(Error::OpenwireLogParseUnimplemented) + Err(Error::OpenWireLogParseUnimplemented) } Some(false) => Ok(payload), - None => Err(Error::OpenwireLogParseFailed), + None => Err(Error::OpenWireLogParseFailed), } } fn loose_unmarshal_broker_error<'a>( @@ -716,7 +738,7 @@ impl BaseDataStream { // parse exception message let (_, err_msg) = Self::loose_unmarshal_string(payload)?; info.err_msg = Some(err_msg.to_string()); - Err(Error::OpenwireLogParseUnimplemented) + Err(Error::OpenWireLogParseUnimplemented) } (payload, false) => Ok(payload), } @@ -993,7 +1015,7 @@ impl Unmarshal for ConnectionInfo { // parse clientId let _ = BaseDataStream::tight_unmarshal_string(payload, bs)?; // TODO: more fields are not supported - Err(Error::OpenwireLogParseUnimplemented) + Err(Error::OpenWireLogParseUnimplemented) } fn loose_unmarshal<'a>( parser: &mut OpenWireLog, @@ -1007,7 +1029,7 @@ impl Unmarshal for ConnectionInfo { // parse clientId let _ = BaseDataStream::loose_unmarshal_string(payload)?; // TODO: more fields are not supported - Err(Error::OpenwireLogParseUnimplemented) + Err(Error::OpenWireLogParseUnimplemented) } } @@ -1049,11 +1071,11 @@ impl Unmarshal for ConsumerInfo { // parse consumerId let payload = BaseDataStream::tight_unmarshal_cached_object(parser, info, bs, payload)?; // parse browser - let _ = bs.read_bool().ok_or(Error::OpenwireLogParseFailed)?; + let _ = bs.read_bool().ok_or(Error::OpenWireLogParseFailed)?; // parse destination let _ = BaseDataStream::tight_unmarshal_cached_object(parser, info, bs, payload)?; // TODO: more fields are not supported - Err(Error::OpenwireLogParseUnimplemented) + Err(Error::OpenWireLogParseUnimplemented) } fn loose_unmarshal<'a>( parser: &mut OpenWireLog, @@ -1069,7 +1091,7 @@ impl Unmarshal for ConsumerInfo { // parse destination let _ = BaseDataStream::loose_unmarshal_cached_object(parser, info, payload)?; // TODO: more fields are not supported - Err(Error::OpenwireLogParseUnimplemented) + Err(Error::OpenWireLogParseUnimplemented) } } @@ -1087,7 +1109,7 @@ impl Unmarshal for ProducerInfo { // parse destination let _ = BaseDataStream::tight_unmarshal_cached_object(parser, info, bs, payload)?; // TODO: more fields are not supported - Err(Error::OpenwireLogParseUnimplemented) + Err(Error::OpenWireLogParseUnimplemented) } fn loose_unmarshal<'a>( parser: &mut OpenWireLog, @@ -1101,7 +1123,7 @@ impl Unmarshal for ProducerInfo { // parse destination let _ = BaseDataStream::loose_unmarshal_cached_object(parser, info, payload)?; // TODO: more fields are not supported - Err(Error::OpenwireLogParseUnimplemented) + Err(Error::OpenWireLogParseUnimplemented) } } @@ -1117,9 +1139,9 @@ impl Unmarshal for BrokerInfo { // parse brokerId let payload = BaseDataStream::tight_unmarshal_cached_object(parser, info, bs, payload)?; // parse brokerURL - let (payload, broker_url) = BaseDataStream::tight_unmarshal_string(payload, bs)?; + let (_, broker_url) = BaseDataStream::tight_unmarshal_string(payload, bs)?; info.broker_url = Some(broker_url.to_string()); - Ok(payload) + Err(Error::OpenWireLogParseUnimplemented) } fn loose_unmarshal<'a>( parser: &mut OpenWireLog, @@ -1131,9 +1153,9 @@ impl Unmarshal for BrokerInfo { // parse brokerId let payload = BaseDataStream::loose_unmarshal_cached_object(parser, info, payload)?; // parse brokerURL - let (payload, broker_url) = BaseDataStream::loose_unmarshal_string(payload)?; + let (_, broker_url) = BaseDataStream::loose_unmarshal_string(payload)?; info.broker_url = Some(broker_url.to_string()); - Ok(payload) + Err(Error::OpenWireLogParseUnimplemented) } } @@ -1151,11 +1173,11 @@ impl Unmarshal for WireFormatInfo { match std::str::from_utf8(magic) { Ok(magic) => { if magic != "ActiveMQ" { - return Err(Error::OpenwireLogParseFailed); + return Err(Error::OpenWireLogParseFailed); } } _ => { - return Err(Error::OpenwireLogParseFailed); + return Err(Error::OpenWireLogParseFailed); } } @@ -1194,7 +1216,7 @@ impl Unmarshal for WireFormatInfo { let boolean = value != &0; info.is_tight_encoding_enabled = boolean; } - _ => return Err(Error::OpenwireLogParseFailed), + _ => return Err(Error::OpenWireLogParseEOF), } } @@ -1206,7 +1228,7 @@ impl Unmarshal for WireFormatInfo { let boolean = value != &0; info.is_size_prefix_disabled = boolean; } - _ => return Err(Error::OpenwireLogParseFailed), + _ => return Err(Error::OpenWireLogParseEOF), } } @@ -1218,7 +1240,7 @@ impl Unmarshal for WireFormatInfo { let boolean = value != &0; info.is_cache_enabled = boolean; } - _ => return Err(Error::OpenwireLogParseFailed), + _ => return Err(Error::OpenWireLogParseEOF), } } @@ -1408,6 +1430,73 @@ impl Unmarshal for ExceptionResponse { } } +impl Unmarshal for DataResponse { + fn tight_unmarshal<'a>( + parser: &mut OpenWireLog, + info: &mut OpenWireInfo, + bs: &mut BooleanStream, + payload: &'a [u8], + ) -> Result<&'a [u8]> { + // parse response + let payload = Response::tight_unmarshal(parser, info, bs, payload)?; + // parse data + let payload = BaseDataStream::tight_unmarshal_nested_object(parser, info, bs, payload)?; + Ok(payload) + } + fn loose_unmarshal<'a>( + parser: &mut OpenWireLog, + info: &mut OpenWireInfo, + payload: &'a [u8], + ) -> Result<&'a [u8]> { + // parse response + let payload = Response::loose_unmarshal(parser, info, payload)?; + // parse data + let payload = BaseDataStream::loose_unmarshal_nested_object(parser, info, payload)?; + Ok(payload) + } +} + +impl Unmarshal for DataArrayResponse { + fn tight_unmarshal<'a>( + parser: &mut OpenWireLog, + info: &mut OpenWireInfo, + bs: &mut BooleanStream, + payload: &'a [u8], + ) -> Result<&'a [u8]> { + // parse response + let payload = Response::tight_unmarshal(parser, info, bs, payload)?; + // parse data array + if bs.read_bool().ok_or(Error::OpenWireLogParseFailed)? { + let (mut payload, length) = parse_short(payload)?; + for _ in 0..length { + payload = BaseDataStream::tight_unmarshal_nested_object(parser, info, bs, payload)?; + } + Ok(payload) + } else { + Ok(payload) + } + } + fn loose_unmarshal<'a>( + parser: &mut OpenWireLog, + info: &mut OpenWireInfo, + payload: &'a [u8], + ) -> Result<&'a [u8]> { + // parse response + let payload = Response::loose_unmarshal(parser, info, payload)?; + // parse data array + let (payload, boolean) = parse_boolean(payload)?; + if boolean { + let (mut payload, length) = parse_short(payload)?; + for _ in 0..length { + payload = BaseDataStream::loose_unmarshal_nested_object(parser, info, payload)?; + } + Ok(payload) + } else { + Ok(payload) + } + } +} + macro_rules! impl_message_marshaller { ($($t:ty),+) => { $(impl Unmarshal for $t { @@ -1438,7 +1527,7 @@ macro_rules! impl_message_marshaller { // parse correlationId let (_, correlation_id) = BaseDataStream::tight_unmarshal_string(payload, bs)?; info.correlation_id = (!correlation_id.is_empty()).then(|| correlation_id.to_string()); - Err(Error::OpenwireLogParseUnimplemented) + Err(Error::OpenWireLogParseUnimplemented) } fn loose_unmarshal<'a>( parser: &mut OpenWireLog, @@ -1466,7 +1555,7 @@ macro_rules! impl_message_marshaller { // parse correlationId let (_, correlation_id) = BaseDataStream::loose_unmarshal_string(payload)?; info.correlation_id = (!correlation_id.is_empty()).then(|| correlation_id.to_string()); - Err(Error::OpenwireLogParseUnimplemented) + Err(Error::OpenWireLogParseUnimplemented) } })* } @@ -1593,6 +1682,20 @@ impl Default for OpenWireInfo { } impl OpenWireInfo { + fn get_msg_size(&self) -> Option { + match self.msg_type { + LogMessageType::Request => self.req_msg_size, + LogMessageType::Response => self.res_msg_size, + LogMessageType::Session => { + if self.direction == PacketDirection::ClientToServer { + self.req_msg_size + } else { + self.res_msg_size + } + } + _ => None, + } + } fn merge(&mut self, res: &Self) { if self.res_msg_size.is_none() { self.res_msg_size = res.res_msg_size; @@ -1710,6 +1813,8 @@ pub struct OpenWireLog { server_version: Option, version: u32, + client_next_skip_len: Option, + server_next_skip_len: Option, perf_stats: Option, } @@ -1724,6 +1829,8 @@ impl Default for OpenWireLog { client_version: None, server_version: None, version: DEFAULT_VERSION, + client_next_skip_len: None, + server_next_skip_len: None, perf_stats: None, } } @@ -1738,26 +1845,36 @@ impl L7ProtocolParserInterface for OpenWireLog { self.perf_stats = Some(L7PerfStats::default()) }; - let mut info = self.parse(payload, param)?; + let mut infos = self.parse(payload, param); - if info.msg_type != LogMessageType::Session { - info.cal_rrt(param, None).map(|rtt| { - info.rtt = rtt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rtt)); - }); - } - - match param.direction { - PacketDirection::ClientToServer => { - self.perf_stats.as_mut().map(|p| p.inc_req()); + infos.iter_mut().for_each(|info| { + let info = match info { + L7ProtocolInfo::OpenWireInfo(info) => info, + _ => return, + }; + if info.msg_type != LogMessageType::Session { + info.cal_rrt(param, None).map(|rtt| { + info.rtt = rtt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rtt)); + }); } - PacketDirection::ServerToClient => { - self.perf_stats.as_mut().map(|p| p.inc_resp()); + + match param.direction { + PacketDirection::ClientToServer => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + PacketDirection::ServerToClient => { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + } } - } + }); - if param.parse_log { - Ok(L7ParseResult::Single(info.into())) + if !param.parse_log { + Ok(L7ParseResult::None) + } else if infos.len() == 1 { + Ok(L7ParseResult::Single(infos.pop().unwrap())) + } else if infos.len() > 1 { + Ok(L7ParseResult::Multi(infos)) } else { Ok(L7ParseResult::None) } @@ -1777,12 +1894,65 @@ impl L7ProtocolParserInterface for OpenWireLog { } impl OpenWireLog { - fn do_unmarshal(&mut self, mut payload: &[u8], param: &ParseParam) -> Result { - let mut info = OpenWireInfo::default(); + fn is_message_type_valid(command_type: OpenWireCommand) -> bool { + match command_type { + OpenWireCommand::WireFormatInfo + | OpenWireCommand::BrokerInfo + | OpenWireCommand::ConnectionInfo + | OpenWireCommand::SessionInfo + | OpenWireCommand::ConsumerInfo + | OpenWireCommand::ProducerInfo + | OpenWireCommand::TransactionInfo + | OpenWireCommand::DestinationInfo + | OpenWireCommand::RemoveSubscriptionInfo + | OpenWireCommand::KeepAliveInfo + | OpenWireCommand::ShutdownInfo + | OpenWireCommand::RemoveInfo + | OpenWireCommand::ControlCommand + | OpenWireCommand::FlushCommand + | OpenWireCommand::ConnectionError + | OpenWireCommand::ConsumerControl + | OpenWireCommand::ConnectionControl + | OpenWireCommand::ProducerAck + | OpenWireCommand::MessagePull + | OpenWireCommand::MessageDispatch + | OpenWireCommand::MessageAck + | OpenWireCommand::ActiveMQMessage + | OpenWireCommand::ActiveMQBytesMessage + | OpenWireCommand::ActiveMQMapMessage + | OpenWireCommand::ActiveMQObjectMessage + | OpenWireCommand::ActiveMQStreamMessage + | OpenWireCommand::ActiveMQTextMessage + | OpenWireCommand::ActiveMQBlobMessage + | OpenWireCommand::DiscoveryEvent + | OpenWireCommand::DurableSubscriptionInfo + | OpenWireCommand::PartialCommand + | OpenWireCommand::PartialLastCommand + | OpenWireCommand::Replay + | OpenWireCommand::MessageDispatchNotification + | OpenWireCommand::Response + | OpenWireCommand::ExceptionResponse + | OpenWireCommand::DataResponse + | OpenWireCommand::DataArrayResponse + | OpenWireCommand::IntegerResponse => true, + _ => false, + } + } + fn do_unmarshal<'a>( + &mut self, + info: &mut OpenWireInfo, + mut payload: &'a [u8], + param: &ParseParam, + ) -> Result<&'a [u8]> { + let original_len = payload.len(); let mut msg_size = payload.len(); if !self.is_size_prefix_disabled { let (updated_payload, length) = parse_integer(payload)?; msg_size = length as usize; + // message size is unlikely to be larger than 2^24 + if msg_size > 0xffffff { + return Err(Error::OpenWireLogParseFailed); + } payload = updated_payload; } // parse commandtype @@ -1840,34 +2010,39 @@ impl OpenWireLog { info.msg_type = LogMessageType::Response; info.res_msg_size = Some(msg_size as u32); } - _ => { - info.msg_type = LogMessageType::Other; - info.res_msg_size = Some(msg_size as u32); - } + _ => return Err(Error::OpenWireLogParseFailed), }; - if self.is_tight_encoding_enabled && command_type != OpenWireCommand::WireFormatInfo { - let (payload, mut bs) = BooleanStream::read_boolean_stream(payload) - .map_err(|_: Err>| Error::OpenwireLogParseFailed)?; - let data_marshaller = OpenWireCommandMarshaller::from(command_type); - let res = data_marshaller.tight_unmarshal_command(self, &mut info, &mut bs, payload); - // omit OpenwireLogParseUnimplemented error - match res { - Ok(_) | Err(Error::OpenwireLogParseUnimplemented) => Ok(info), - Err(_) => Err(Error::OpenwireLogParseFailed), + let p = if self.is_tight_encoding_enabled && command_type != OpenWireCommand::WireFormatInfo + { + let (payload, mut bs) = BooleanStream::read_boolean_stream(payload)?; + if !self.is_size_prefix_disabled && bs.bytes_consumed + 1 > msg_size { + return Err(Error::OpenWireLogParseFailed); } + let data_marshaller = OpenWireCommandMarshaller::from(command_type); + data_marshaller.tight_unmarshal_command(self, info, &mut bs, payload)? } else { - let res = OpenWireCommandMarshaller::from(command_type) - .loose_unmarshal_command(self, &mut info, payload); - // omit OpenwireLogParseUnimplemented error - match res { - Ok(_) | Err(Error::OpenwireLogParseUnimplemented) => Ok(info), - Err(_) => Err(Error::OpenwireLogParseFailed), - } + let data_marshaller = OpenWireCommandMarshaller::from(command_type); + data_marshaller.loose_unmarshal_command(self, info, payload)? + }; + // check if the payload length is valid + if !self.is_size_prefix_disabled && original_len - p.len() != msg_size + 4 { + return Err(Error::OpenWireLogParseFailed); } + Ok(p) } - fn parse(&mut self, payload: &[u8], param: &ParseParam) -> Result { - let mut info = self.do_unmarshal(payload, param)?; + fn parse_one<'a>( + &mut self, + info: &mut OpenWireInfo, + payload: &'a [u8], + param: &ParseParam, + ) -> Result<&'a [u8]> { + let result = match self.do_unmarshal(info, payload, param) { + Ok(p) => Ok(p), + Err(Error::OpenWireLogParseUnimplemented) => Err(Error::OpenWireLogParseUnimplemented), + Err(Error::OpenWireLogParseEOF) => Err(Error::OpenWireLogParseEOF), + Err(e) => return Err(e), + }; info.is_tls = param.is_tls(); info.direction = param.direction; // set status @@ -1941,7 +2116,82 @@ impl OpenWireLog { info.is_size_prefix_disabled = self.is_size_prefix_disabled; info.is_cache_enabled = self.is_cache_enabled; } - Ok(info) + result + } + fn parse(&mut self, mut payload: &[u8], param: &ParseParam) -> Vec { + let mut infos = Vec::new(); + let mut current_skip_len = if param.direction == PacketDirection::ClientToServer { + self.client_next_skip_len.take() + } else { + self.server_next_skip_len.take() + }; + let mut next_skip_len = None; + let mut first_byte_parse_failed = true; + while !payload.is_empty() { + let mut info = OpenWireInfo::default(); + match self.parse_one(&mut info, payload, param) { + Ok(p) => { + payload = p; + infos.push(L7ProtocolInfo::OpenWireInfo(info)); + } + Err(Error::OpenWireLogParseUnimplemented) => { + let msg_size = match info.get_msg_size() { + Some(msg_size) => 4 + msg_size as usize, + None => break, + }; + infos.push(L7ProtocolInfo::OpenWireInfo(info)); + if !self.is_size_prefix_disabled { + payload = match payload.get(msg_size..) { + Some(p) => p, + None => { + next_skip_len = Some(msg_size - payload.len()); + break; + } + }; + } else { + break; + } + } + Err(Error::OpenWireLogParseEOF) => { + let msg_size = match info.get_msg_size() { + Some(msg_size) => 4 + msg_size as usize, + None => break, + }; + infos.push(L7ProtocolInfo::OpenWireInfo(info)); + if !self.is_size_prefix_disabled { + next_skip_len = Some(msg_size - payload.len()); + } + break; + } + Err(Error::OpenWireLogParseFailed) => { + if first_byte_parse_failed { + first_byte_parse_failed = false; + // parse failed at the start of the payload, + // try to skip the cached next_skip_len + if infos.is_empty() { + if let Some(skip_len) = current_skip_len.take() { + match payload.get(skip_len..) { + Some(p) => { + payload = p; + continue; + } + // conservatively skip only one packet + None => break, + }; + } + } + } + payload = payload.get(1..).unwrap_or_default(); + } + _ => break, + } + } + if param.direction == PacketDirection::ClientToServer { + self.client_next_skip_len = next_skip_len; + } else { + self.server_next_skip_len = next_skip_len; + } + infos } fn check_protocol(payload: &[u8], param: &ParseParam) -> bool { if !param.ebpf_type.is_raw_protocol() { @@ -1952,8 +2202,9 @@ impl OpenWireLog { } // only parse the initial WIREFORMAT_INFO command to check the protocol let mut parser = Self::default(); - match parser.do_unmarshal(payload, param) { - Ok(res) => res.command_type == OpenWireCommand::WireFormatInfo, + let mut info = OpenWireInfo::default(); + match parser.do_unmarshal(&mut info, payload, param) { + Ok(_) => info.command_type == OpenWireCommand::WireFormatInfo, Err(_) => false, } } @@ -1978,7 +2229,7 @@ mod tests { const FILE_DIR: &str = "resources/test/flow_generator/openwire"; fn run(name: &str) -> String { - let capture = Capture::load_pcap(Path::new(FILE_DIR).join(name), Some(1024)); + let capture = Capture::load_pcap(Path::new(FILE_DIR).join(name), Some(1500)); let log_cache = Rc::new(RefCell::new(L7PerfCache::new(L7_RRT_CACHE_CAPACITY))); let mut packets = capture.as_meta_packets(); if packets.is_empty() { @@ -2021,11 +2272,18 @@ mod tests { param.set_log_parse_config(parse_config); let is_openwire = OpenWireLog::check_protocol(payload, param); - match openwire.parse(payload, param) { - Ok(info) => { - output.push_str(&format!("{:?} is_openwire: {}\n", info, is_openwire)); + let infos = openwire.parse(payload, param); + if infos.is_empty() { + output.push_str("This packet payload cannot be parsed\n"); + } else { + for info in infos { + match info { + L7ProtocolInfo::OpenWireInfo(info) => { + output.push_str(&format!("{:?} is_openwire: {}\n", info, is_openwire)); + } + _ => unreachable!(), + } } - Err(_) => unreachable!(), } } output @@ -2056,6 +2314,7 @@ mod tests { "openwire_loose_consumer.result", ), ("openwire_injected.pcap", "openwire_injected.result"), + ("openwire_segmented.pcap", "openwire_segmented.result"), ]; for item in files.iter() {