From 3acb17159a3d08773fca1034a239dec72e31a66f Mon Sep 17 00:00:00 2001 From: vdahiya12 <67608553+vdahiya12@users.noreply.github.com> Date: Wed, 7 Sep 2022 11:27:40 -0700 Subject: [PATCH] [ycable] cleanup logic for creating grpc future ready (#289) Signed-off-by: vaibhav-dahiya vdahiya@microsoft.com This PR attempts to remove this call channel_ready = grpc.channel_ready_future for the ycabled gRPC implementation. The reason to do this is incase of channel not being connected ycabled tries to reattempt the channel creation again with each RPC request from linkmgrd. This is not the right way to maintain the channels/stubs, and actually adds unrequired overhead for ycabled. This PR supports creating channel/stub in ycabled in correct manner. Description Motivation and Context Required to reduce CPU usage for ycabled How Has This Been Tested? Deploying the changes on Arista testbed and UT --- sonic-ycabled/tests/test_y_cable_helper.py | 12 +++- .../ycable/ycable_utilities/y_cable_helper.py | 70 +++++++++++++------ 2 files changed, 58 insertions(+), 24 deletions(-) diff --git a/sonic-ycabled/tests/test_y_cable_helper.py b/sonic-ycabled/tests/test_y_cable_helper.py index 3f0d78ce2b0c..528acbffbe38 100644 --- a/sonic-ycabled/tests/test_y_cable_helper.py +++ b/sonic-ycabled/tests/test_y_cable_helper.py @@ -4962,10 +4962,18 @@ def test_setup_grpc_channel_for_port(self): with patch('ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil') as patched_util: patched_util.get_asic_id_for_logical_port.return_value = 0 - rc = setup_grpc_channel_for_port("Ethernet0", "192.168.0.1") + (channel, stub) = setup_grpc_channel_for_port("Ethernet0", "192.168.0.1") - assert(rc == (None, None)) + assert(stub == True) + assert(channel != None) + def test_connect_channel(self): + + with patch('grpc.channel_ready_future') as patched_util: + + patched_util.result.return_value = 0 + rc = connect_channel(patched_util, None, None) + assert(rc == None) def test_setup_grpc_channels(self): diff --git a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py index 9cb099dfd832..ffe82915ee23 100644 --- a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py +++ b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py @@ -332,7 +332,6 @@ def wrapper(*args, **kwargs): return wrapper - def retry_setup_grpc_channel_for_port(port, asic_index): global grpc_port_stubs @@ -409,34 +408,61 @@ def get_grpc_credentials(type, kvp): return credential -def create_channel(type,level, kvp, soc_ip): +def connect_channel(channel, stub, port): + channel_ready = grpc.channel_ready_future(channel) retries = 3 + for _ in range(retries): + try: + channel_ready.result(timeout=2) + except grpc.FutureTimeoutError: + helper_logger.log_warning("gRPC port {} state changed to SHUTDOWN".format(port)) + else: + break - if type == "secure": - credential = get_grpc_credentials(level, kvp) - target_name = kvp.get("grpc_ssl_credential", None) - if credential is None or target_name is None: - return (None, None) +def create_channel(type, level, kvp, soc_ip, port): - GRPC_CLIENT_OPTIONS.append(('grpc.ssl_target_name_override', '{}'.format(target_name))) - channel = grpc.secure_channel("{}:{}".format(soc_ip, GRPC_PORT), credential, options=GRPC_CLIENT_OPTIONS) - else: - channel = grpc.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=GRPC_CLIENT_OPTIONS) + #Helper callback to get an channel connectivity state + def wait_for_state_change(channel_connectivity): + if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE: + helper_logger.log_notice("gRPC port {} state changed to TRANSIENT_FAILURE".format(port)) + if channel_connectivity == grpc.ChannelConnectivity.CONNECTING: + helper_logger.log_notice("gRPC port {} state changed to CONNECTING".format(port)) + if channel_connectivity == grpc.ChannelConnectivity.READY: + helper_logger.log_notice("gRPC port {} state changed to READY".format(port)) + if channel_connectivity == grpc.ChannelConnectivity.IDLE: + helper_logger.log_notice("gRPC port {} state changed to IDLE".format(port)) + if channel_connectivity == grpc.ChannelConnectivity.SHUTDOWN: + helper_logger.log_notice("gRPC port {} state changed to SHUTDOWN".format(port)) - stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel) - channel_ready = grpc.channel_ready_future(channel) + if type == "secure": + credential = get_grpc_credentials(level, kvp) + target_name = kvp.get("grpc_ssl_credential", None) + if credential is None or target_name is None: + return (None, None) - try: - channel_ready.result(timeout=2) - except grpc.FutureTimeoutError: - channel = None - stub = None - else: - break + GRPC_CLIENT_OPTIONS.append(('grpc.ssl_target_name_override', '{}'.format(target_name))) + + channel = grpc.secure_channel("{}:{}".format(soc_ip, GRPC_PORT), credential, options=GRPC_CLIENT_OPTIONS) + else: + channel = grpc.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=GRPC_CLIENT_OPTIONS) + + + stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel) + + + if channel is not None: + channel.subscribe(wait_for_state_change) + + #connect_channel(channel, stub, port) + """ + Comment the connect channel call for now, since it is not required for normal gRPC I/O + and all use cases work without it. + TODO: check if this subroutine call can be ommitted for all use cases in future enhancements + """ return channel, stub @@ -490,12 +516,12 @@ def setup_grpc_channel_for_port(port, soc_ip): kvp = dict(fvs) - channel, stub = create_channel(type, level, kvp, soc_ip) + channel, stub = create_channel(type, level, kvp, soc_ip, port) if stub is None: helper_logger.log_warning("stub was not setup for gRPC soc ip {} port {}, no gRPC soc server running ?".format(soc_ip, port)) if channel is None: - helper_logger.log_warning("channel was not setup for gRPC soc ip {} port {}, no gRPC server running ?".format(soc_ip, port)) + helper_logger.log_warning("channel was not setup for gRPC soc ip {} port {}, no gRPC soc server running ?".format(soc_ip, port)) return channel, stub