Skip to content

Commit

Permalink
copy logging changes to eh
Browse files Browse the repository at this point in the history
  • Loading branch information
swathipil committed Feb 21, 2024
1 parent cd2cf68 commit 8294a6f
Show file tree
Hide file tree
Showing 23 changed files with 769 additions and 255 deletions.
2 changes: 2 additions & 0 deletions sdk/eventhub/azure-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

### Other Changes

- Updated network trace logging to include AMQP connection/session/link names for advanced usage.

## 5.11.6 (2024-02-12)

This version and all future versions will require Python 3.8+. Python 3.7 is no longer supported.
Expand Down
112 changes: 81 additions & 31 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ def _set_state(self, new_state: ConnectionState) -> None:
previous_state = self.state
self.state = new_state
_LOGGER.info(
"Connection state changed: %r -> %r",
"[Connection:%s] Connection state changed: %r -> %r",
self._network_trace_params["amqpConnection"],
previous_state,
new_state,
extra=self._network_trace_params
new_state
)
for session in self._outgoing_endpoints.values():
session._on_connection_state_change() # pylint:disable=protected-access
Expand Down Expand Up @@ -321,7 +321,11 @@ def _send_frame(self, channel: int, frame: NamedTuple, **kwargs: Any) -> None:
except Exception: # pylint:disable=try-except-raise
raise
else:
_LOGGER.info("Cannot write frame in current state: %r", self.state, extra=self._network_trace_params)
_LOGGER.info(
"[Connection:%s] Cannot write frame in current state: %r",
self._network_trace_params["amqpConnection"],
self.state
)

def _get_next_outgoing_channel(self) -> int:
"""Get the next available outgoing channel number within the max channel limit.
Expand All @@ -338,7 +342,10 @@ def _get_next_outgoing_channel(self) -> int:
def _outgoing_empty(self) -> None:
"""Send an empty frame to prevent the connection from reaching an idle timeout."""
if self._network_trace:
_LOGGER.debug("-> EmptyFrame()", extra=self._network_trace_params)
_LOGGER.debug(
"[Connection:%s] -> EmptyFrame()",
self._network_trace_params["amqpConnection"]
)
if self._error:
raise self._error

Expand All @@ -359,7 +366,11 @@ def _outgoing_header(self)-> None:
"""Send the AMQP protocol header to initiate the connection."""
self._last_frame_sent_time = time.time()
if self._network_trace:
_LOGGER.debug("-> Header(%r)", HEADER_FRAME, extra=self._network_trace_params)
_LOGGER.debug(
"[Connection:%s] -> Header(%r)",
self._network_trace_params["amqpConnection"],
HEADER_FRAME
)
self._transport.write(HEADER_FRAME)

def _incoming_header(self, _, frame: bytes) -> None:
Expand All @@ -369,7 +380,11 @@ def _incoming_header(self, _, frame: bytes) -> None:
:param bytes frame: The incoming frame.
"""
if self._network_trace:
_LOGGER.debug("<- Header(%r)", frame, extra=self._network_trace_params)
_LOGGER.debug(
"[Connection:%s] <- Header(%r)",
self._network_trace_params["amqpConnection"],
frame,
)
if self.state == ConnectionState.START:
self._set_state(ConnectionState.HDR_RCVD)
elif self.state == ConnectionState.HDR_SENT:
Expand All @@ -392,7 +407,11 @@ def _outgoing_open(self) -> None:
properties=self._properties,
)
if self._network_trace:
_LOGGER.debug("-> %r", open_frame, extra=self._network_trace_params)
_LOGGER.debug(
"[Connection:%s] -> %r",
self._network_trace_params["amqpConnection"],
open_frame
)
self._send_frame(0, open_frame)

def _incoming_open(self, channel: int, frame) -> None:
Expand All @@ -418,17 +437,27 @@ def _incoming_open(self, channel: int, frame) -> None:
"""
# TODO: Add type hints for full frame tuple contents.
if self._network_trace:
_LOGGER.debug("<- %r", OpenFrame(*frame), extra=self._network_trace_params)
_LOGGER.debug(
"[Connection:%s] <- %r",
self._network_trace_params["amqpConnection"],
OpenFrame(*frame)
)
if channel != 0:
_LOGGER.error("OPEN frame received on a channel that is not 0.", extra=self._network_trace_params)
_LOGGER.error(
"[Connection:%s] OPEN frame received on a channel that is not 0.",
self._network_trace_params["amqpConnection"]
)
self.close(
error=AMQPError(
condition=ErrorCondition.NotAllowed, description="OPEN frame received on a channel that is not 0."
)
)
self._set_state(ConnectionState.END)
if self.state == ConnectionState.OPENED:
_LOGGER.error("OPEN frame received in the OPENED state.", extra=self._network_trace_params)
_LOGGER.error(
"[Connection:%s] OPEN frame received in the OPENED state.",
self._network_trace_params["amqpConnection"]
)
self.close()
if frame[4]:
self._remote_idle_timeout = cast(float, frame[4] / 1000) # Convert to seconds
Expand All @@ -447,8 +476,8 @@ def _incoming_open(self, channel: int, frame) -> None:
)
)
_LOGGER.error(
"Failed parsing OPEN frame: Max frame size is less than supported minimum.",
extra=self._network_trace_params
"[Connection:%s] Failed parsing OPEN frame: Max frame size is less than supported minimum.",
self._network_trace_params["amqpConnection"]
)
return
self._remote_max_frame_size = frame[2]
Expand All @@ -466,15 +495,23 @@ def _incoming_open(self, channel: int, frame) -> None:
description=f"connection is an illegal state: {self.state}",
)
)
_LOGGER.error("Connection is an illegal state: %r", self.state, extra=self._network_trace_params)
_LOGGER.error(
"[Connection:%s] Connection is an illegal state: %r",
self._network_trace_params["amqpConnection"],
self.state
)

def _outgoing_close(self, error: Optional[AMQPError] = None) -> None:
"""Send a Close frame to shutdown connection with optional error information.
:param ~pyamqp.error.AMQPError or None error: Optional error information.
"""
close_frame = CloseFrame(error=error)
if self._network_trace:
_LOGGER.debug("-> %r", close_frame, extra=self._network_trace_params)
_LOGGER.debug(
"[Connection:%s] -> %r",
self._network_trace_params["amqpConnection"],
close_frame
)
self._send_frame(0, close_frame)

def _incoming_close(self, channel: int, frame: Tuple[Any, ...]) -> None:
Expand All @@ -488,7 +525,11 @@ def _incoming_close(self, channel: int, frame: Tuple[Any, ...]) -> None:
:param tuple frame: The incoming Close frame.
"""
if self._network_trace:
_LOGGER.debug("<- %r", CloseFrame(*frame), extra=self._network_trace_params)
_LOGGER.debug(
"[Connection:%s] <- %r",
self._network_trace_params["amqpConnection"],
CloseFrame(*frame),
)
disconnect_states = [
ConnectionState.HDR_RCVD,
ConnectionState.HDR_EXCH,
Expand All @@ -503,8 +544,8 @@ def _incoming_close(self, channel: int, frame: Tuple[Any, ...]) -> None:
close_error = None
if channel > self._channel_max:
_LOGGER.error(
"CLOSE frame received on a channel greated than support max.",
extra=self._network_trace_params
"[Connection:%s] CLOSE frame received on a channel greated than support max.",
self._network_trace_params["amqpConnection"]
)
close_error = AMQPError(condition=ErrorCondition.InvalidField, description="Invalid channel", info=None)

Expand All @@ -517,8 +558,9 @@ def _incoming_close(self, channel: int, frame: Tuple[Any, ...]) -> None:
condition=frame[0][0], description=frame[0][1], info=frame[0][2]
)
_LOGGER.error(
"Connection closed with error: %r", frame[0],
extra=self._network_trace_params
"[Connection:%s] Connection closed with error: %r",
self._network_trace_params["amqpConnection"],
frame[0],
)


Expand Down Expand Up @@ -577,8 +619,8 @@ def _incoming_end(self, channel: int, frame: Tuple[Any, ...]) -> None:
description="Invalid channel number received"
))
_LOGGER.error(
"END frame received on invalid channel. Closing connection.",
extra=self._network_trace_params
"[Connection:%s] END frame received on invalid channel. Closing connection.",
self._network_trace_params["amqpConnection"]
)
return

Expand Down Expand Up @@ -643,7 +685,11 @@ def _process_incoming_frame(self, channel: int, frame: Optional[Union[bytes, Tup
return True
if performative == 1:
return False
_LOGGER.error("Unrecognized incoming frame: %r", frame, extra=self._network_trace_params)
_LOGGER.error(
"[Connection:%s] Unrecognized incoming frame: %r",
self._network_trace_params["amqpConnection"],
frame
)
return True
except KeyError:
return True # TODO: channel error
Expand Down Expand Up @@ -675,8 +721,8 @@ def _process_outgoing_frame(self, channel: int, frame) -> None:
cast(float, self._last_frame_received_time),
) or self._get_remote_timeout(now):
_LOGGER.info(
"No frame received for the idle timeout. Closing connection.",
extra=self._network_trace_params
"[Connection:%s] No frame received for the idle timeout. Closing connection.",
self._network_trace_params["amqpConnection"]
)
self.close(
error=AMQPError(
Expand Down Expand Up @@ -755,8 +801,8 @@ def listen(self, wait: Union[float, bool] = False, batch: int = 1, **kwargs: Any
now
):
_LOGGER.info(
"No frame received for the idle timeout. Closing connection.",
extra=self._network_trace_params
"[Connection:%s] No frame received for the idle timeout. Closing connection.",
self._network_trace_params["amqpConnection"]
)
self.close(
error=AMQPError(
Expand All @@ -777,9 +823,9 @@ def listen(self, wait: Union[float, bool] = False, batch: int = 1, **kwargs: Any
break
else:
_LOGGER.info(
"Connection cannot read frames in this state: %r",
self.state,
extra=self._network_trace_params
"[Connection:%s] Connection cannot read frames in this state: %r",
self._network_trace_params["amqpConnection"],
self.state
)
break
except (OSError, IOError, SSLError, socket.error) as exc:
Expand Down Expand Up @@ -906,7 +952,11 @@ def close(self, error: Optional[AMQPError] = None, wait: bool = False) -> None:
self._wait_for_response(wait, ConnectionState.END)
except Exception as exc: # pylint:disable=broad-except
# If error happened during closing, ignore the error and set state to END
_LOGGER.info("An error occurred when closing the connection: %r", exc, extra=self._network_trace_params)
_LOGGER.info(
"[Connection:%s] An error occurred when closing the connection: %r",
self._network_trace_params["amqpConnection"],
exc
)
self._set_state(ConnectionState.END)
finally:
self._disconnect()
48 changes: 38 additions & 10 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,13 @@ def connect(self):
# has _not_ been sent
self.connected = True
except (OSError, IOError, SSLError) as e:
_LOGGER.info("Transport connection failed: %r", e, extra=self.network_trace_params)
_LOGGER.info(
"[Connection:%s, Session:%s, Link:%s] Transport connection failed: %r",
self.network_trace_params["amqpConnection"],
self.network_trace_params["amqpSession"],
self.network_trace_params["amqpLink"],
e
)
# if not fully connected, close socket, and reraise error
if self.sock and not self.connected:
self.sock.close()
Expand Down Expand Up @@ -396,9 +402,11 @@ def close(self):
# TODO: shutdown could raise OSError, Transport endpoint is not connected if the endpoint is already
# disconnected. can we safely ignore the errors since the close operation is initiated by us.
_LOGGER.debug(
"Transport endpoint is already disconnected: %r",
exc,
extra=self.network_trace_params
"[Connection:%s, Session:%s, Link:%s] Transport endpoint is already disconnected: %r",
self.network_trace_params["amqpConnection"],
self.network_trace_params["amqpSession"],
self.network_trace_params["amqpLink"],
exc
)
self.sock.close()
self.sock = None
Expand All @@ -421,10 +429,12 @@ def read(self, verify_frame_type=0):
frame_type = frame_header[5]
if verify_frame_type is not None and frame_type != verify_frame_type:
_LOGGER.debug(
"Received invalid frame type: %r, expected: %r",
"[Connection:%s, Session:%s, Link:%s] Received invalid frame type: %r, expected: %r",
self.network_trace_params["amqpConnection"],
self.network_trace_params["amqpSession"],
self.network_trace_params["amqpLink"],
frame_type,
verify_frame_type,
extra=self.network_trace_params
verify_frame_type
)
raise ValueError(
f"Received invalid frame type: {frame_type}, expected: {verify_frame_type}"
Expand Down Expand Up @@ -453,7 +463,13 @@ def read(self, verify_frame_type=0):
raise socket.timeout()
if get_errno(exc) not in _UNAVAIL:
self.connected = False
_LOGGER.debug("Transport read failed: %r", exc, extra=self.network_trace_params)
_LOGGER.debug(
"[Connection:%s, Session:%s, Link:%s] Transport read failed: %r",
self.network_trace_params["amqpConnection"],
self.network_trace_params["amqpSession"],
self.network_trace_params["amqpLink"],
exc
)
raise
offset -= 2
return frame_header, channel, payload[offset:]
Expand All @@ -465,7 +481,13 @@ def write(self, s):
except socket.timeout:
raise
except (OSError, IOError, socket.error) as exc:
_LOGGER.debug("Transport write failed: %r", exc, extra=self.network_trace_params)
_LOGGER.debug(
"[Connection:%s, Session:%s, Link:%s] Transport write failed: %r",
self.network_trace_params["amqpConnection"],
self.network_trace_params["amqpSession"],
self.network_trace_params["amqpLink"],
exc
)
if get_errno(exc) not in _UNAVAIL:
self.connected = False
raise
Expand Down Expand Up @@ -753,7 +775,13 @@ def connect(self):
self.close()
raise ConnectionError("Websocket failed to establish connection: %r" % exc) from exc
except (OSError, IOError, SSLError) as e:
_LOGGER.info("Websocket connection failed: %r", e, extra=self.network_trace_params)
_LOGGER.info(
"[Connection:%s, Session:%s, Link:%s] Websocket connection failed: %r",
self.network_trace_params["amqpConnection"],
self.network_trace_params["amqpSession"],
self.network_trace_params["amqpLink"],
e
)
self.close()
raise

Expand Down
Loading

0 comments on commit 8294a6f

Please sign in to comment.