From 5f5de701d037ed84ac109a80c1947a3cff35ba68 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Mon, 25 Aug 2025 10:17:45 -0500 Subject: [PATCH 01/14] initial commit for message settlement support --- .../extensions/bindings/protos/__init__.py | 0 .../bindings/protos/settlement.proto | 99 ++++ .../bindings/protos/settlement_pb2.py | 62 +++ .../bindings/protos/settlement_pb2_grpc.py | 454 ++++++++++++++++++ .../bindings/servicebus/grpcClient.py | 60 +++ .../bindings/servicebus/grpc_utils.py | 58 +++ .../servicebus/serviceBusMessageActions.py | 105 ++++ 7 files changed, 838 insertions(+) create mode 100644 azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/__init__.py create mode 100644 azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement.proto create mode 100644 azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement_pb2.py create mode 100644 azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement_pb2_grpc.py create mode 100644 azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py create mode 100644 azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpc_utils.py create mode 100644 azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/__init__.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement.proto b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement.proto new file mode 100644 index 0000000..34f9a03 --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement.proto @@ -0,0 +1,99 @@ +syntax = "proto3"; + +import "google/protobuf/empty.proto"; +import "google/protobuf/wrappers.proto"; +import "google/protobuf/timestamp.proto"; + +// this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic +option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc"; + +// The settlement service definition. +service Settlement { + // Completes a message + rpc Complete (CompleteRequest) returns (google.protobuf.Empty) {} + + // Abandons a message + rpc Abandon (AbandonRequest) returns (google.protobuf.Empty) {} + + // Deadletters a message + rpc Deadletter (DeadletterRequest) returns (google.protobuf.Empty) {} + + // Defers a message + rpc Defer (DeferRequest) returns (google.protobuf.Empty) {} + + // Renew message lock + rpc RenewMessageLock (RenewMessageLockRequest) returns (google.protobuf.Empty) {} + + // Get session state + rpc GetSessionState (GetSessionStateRequest) returns (GetSessionStateResponse) {} + + // Set session state + rpc SetSessionState (SetSessionStateRequest) returns (google.protobuf.Empty) {} + + // Release session + rpc ReleaseSession (ReleaseSessionRequest) returns (google.protobuf.Empty) {} + + // Renew session lock + rpc RenewSessionLock (RenewSessionLockRequest) returns (RenewSessionLockResponse) {} +} + +// The complete message request containing the locktoken. +message CompleteRequest { + string locktoken = 1; +} + +// The abandon message request containing the locktoken and properties to modify. +message AbandonRequest { + string locktoken = 1; + bytes propertiesToModify = 2; +} + +// The deadletter message request containing the locktoken and properties to modify along with the reason/description. +message DeadletterRequest { + string locktoken = 1; + bytes propertiesToModify = 2; + google.protobuf.StringValue deadletterReason = 3; + google.protobuf.StringValue deadletterErrorDescription = 4; +} + +// The defer message request containing the locktoken and properties to modify. +message DeferRequest { + string locktoken = 1; + bytes propertiesToModify = 2; +} + +// The renew message lock request containing the locktoken. +message RenewMessageLockRequest { + string locktoken = 1; +} + +// The get message request. +message GetSessionStateRequest { + string sessionId = 1; +} + +// The set message request. +message SetSessionStateRequest { + string sessionId = 1; + bytes sessionState = 2; +} + +// Get response containing the session state. +message GetSessionStateResponse { + bytes sessionState = 1; +} + +// Release session. +message ReleaseSessionRequest { + string sessionId = 1; +} + +// Renew session lock. +message RenewSessionLockRequest { + string sessionId = 1; +} + +// Renew session lock. +message RenewSessionLockResponse { + google.protobuf.Timestamp lockedUntil = 1; +} \ No newline at end of file diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement_pb2.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement_pb2.py new file mode 100644 index 0000000..94fd154 --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement_pb2.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: azurefunctions/extensions/bindings/protos/settlement.proto +# Protobuf Python Version: 6.31.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 6, + 31, + 1, + '', + 'azurefunctions/extensions/bindings/protos/settlement.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 +from google.protobuf import wrappers_pb2 as google_dot_protobuf_dot_wrappers__pb2 +from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n:azurefunctions/extensions/bindings/protos/settlement.proto\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1egoogle/protobuf/wrappers.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"$\n\x0f\x43ompleteRequest\x12\x11\n\tlocktoken\x18\x01 \x01(\t\"?\n\x0e\x41\x62\x61ndonRequest\x12\x11\n\tlocktoken\x18\x01 \x01(\t\x12\x1a\n\x12propertiesToModify\x18\x02 \x01(\x0c\"\xbc\x01\n\x11\x44\x65\x61\x64letterRequest\x12\x11\n\tlocktoken\x18\x01 \x01(\t\x12\x1a\n\x12propertiesToModify\x18\x02 \x01(\x0c\x12\x36\n\x10\x64\x65\x61\x64letterReason\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12@\n\x1a\x64\x65\x61\x64letterErrorDescription\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"=\n\x0c\x44\x65\x66\x65rRequest\x12\x11\n\tlocktoken\x18\x01 \x01(\t\x12\x1a\n\x12propertiesToModify\x18\x02 \x01(\x0c\",\n\x17RenewMessageLockRequest\x12\x11\n\tlocktoken\x18\x01 \x01(\t\"+\n\x16GetSessionStateRequest\x12\x11\n\tsessionId\x18\x01 \x01(\t\"A\n\x16SetSessionStateRequest\x12\x11\n\tsessionId\x18\x01 \x01(\t\x12\x14\n\x0csessionState\x18\x02 \x01(\x0c\"/\n\x17GetSessionStateResponse\x12\x14\n\x0csessionState\x18\x01 \x01(\x0c\"*\n\x15ReleaseSessionRequest\x12\x11\n\tsessionId\x18\x01 \x01(\t\",\n\x17RenewSessionLockRequest\x12\x11\n\tsessionId\x18\x01 \x01(\t\"K\n\x18RenewSessionLockResponse\x12/\n\x0blockedUntil\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp2\xcd\x04\n\nSettlement\x12\x36\n\x08\x43omplete\x12\x10.CompleteRequest\x1a\x16.google.protobuf.Empty\"\x00\x12\x34\n\x07\x41\x62\x61ndon\x12\x0f.AbandonRequest\x1a\x16.google.protobuf.Empty\"\x00\x12:\n\nDeadletter\x12\x12.DeadletterRequest\x1a\x16.google.protobuf.Empty\"\x00\x12\x30\n\x05\x44\x65\x66\x65r\x12\r.DeferRequest\x1a\x16.google.protobuf.Empty\"\x00\x12\x46\n\x10RenewMessageLock\x12\x18.RenewMessageLockRequest\x1a\x16.google.protobuf.Empty\"\x00\x12\x46\n\x0fGetSessionState\x12\x17.GetSessionStateRequest\x1a\x18.GetSessionStateResponse\"\x00\x12\x44\n\x0fSetSessionState\x12\x17.SetSessionStateRequest\x1a\x16.google.protobuf.Empty\"\x00\x12\x42\n\x0eReleaseSession\x12\x16.ReleaseSessionRequest\x1a\x16.google.protobuf.Empty\"\x00\x12I\n\x10RenewSessionLock\x12\x18.RenewSessionLockRequest\x1a\x19.RenewSessionLockResponse\"\x00\x42\"\xaa\x02\x1fMicrosoft.Azure.ServiceBus.Grpcb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'azurefunctions.extensions.bindings.protos.settlement_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + _globals['DESCRIPTOR']._loaded_options = None + _globals['DESCRIPTOR']._serialized_options = b'\252\002\037Microsoft.Azure.ServiceBus.Grpc' + _globals['_COMPLETEREQUEST']._serialized_start=156 + _globals['_COMPLETEREQUEST']._serialized_end=192 + _globals['_ABANDONREQUEST']._serialized_start=194 + _globals['_ABANDONREQUEST']._serialized_end=257 + _globals['_DEADLETTERREQUEST']._serialized_start=260 + _globals['_DEADLETTERREQUEST']._serialized_end=448 + _globals['_DEFERREQUEST']._serialized_start=450 + _globals['_DEFERREQUEST']._serialized_end=511 + _globals['_RENEWMESSAGELOCKREQUEST']._serialized_start=513 + _globals['_RENEWMESSAGELOCKREQUEST']._serialized_end=557 + _globals['_GETSESSIONSTATEREQUEST']._serialized_start=559 + _globals['_GETSESSIONSTATEREQUEST']._serialized_end=602 + _globals['_SETSESSIONSTATEREQUEST']._serialized_start=604 + _globals['_SETSESSIONSTATEREQUEST']._serialized_end=669 + _globals['_GETSESSIONSTATERESPONSE']._serialized_start=671 + _globals['_GETSESSIONSTATERESPONSE']._serialized_end=718 + _globals['_RELEASESESSIONREQUEST']._serialized_start=720 + _globals['_RELEASESESSIONREQUEST']._serialized_end=762 + _globals['_RENEWSESSIONLOCKREQUEST']._serialized_start=764 + _globals['_RENEWSESSIONLOCKREQUEST']._serialized_end=808 + _globals['_RENEWSESSIONLOCKRESPONSE']._serialized_start=810 + _globals['_RENEWSESSIONLOCKRESPONSE']._serialized_end=885 + _globals['_SETTLEMENT']._serialized_start=888 + _globals['_SETTLEMENT']._serialized_end=1477 +# @@protoc_insertion_point(module_scope) diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement_pb2_grpc.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement_pb2_grpc.py new file mode 100644 index 0000000..0dc374b --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement_pb2_grpc.py @@ -0,0 +1,454 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + +from azurefunctions.extensions.bindings.protos import settlement_pb2 as azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2 +from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 + +GRPC_GENERATED_VERSION = '1.74.0' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in azurefunctions/extensions/bindings/protos/settlement_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) + + +class SettlementStub(object): + """The settlement service definition. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.Complete = channel.unary_unary( + '/Settlement/Complete', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.CompleteRequest.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + _registered_method=True) + self.Abandon = channel.unary_unary( + '/Settlement/Abandon', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.AbandonRequest.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + _registered_method=True) + self.Deadletter = channel.unary_unary( + '/Settlement/Deadletter', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.DeadletterRequest.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + _registered_method=True) + self.Defer = channel.unary_unary( + '/Settlement/Defer', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.DeferRequest.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + _registered_method=True) + self.RenewMessageLock = channel.unary_unary( + '/Settlement/RenewMessageLock', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewMessageLockRequest.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + _registered_method=True) + self.GetSessionState = channel.unary_unary( + '/Settlement/GetSessionState', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.GetSessionStateRequest.SerializeToString, + response_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.GetSessionStateResponse.FromString, + _registered_method=True) + self.SetSessionState = channel.unary_unary( + '/Settlement/SetSessionState', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.SetSessionStateRequest.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + _registered_method=True) + self.ReleaseSession = channel.unary_unary( + '/Settlement/ReleaseSession', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.ReleaseSessionRequest.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + _registered_method=True) + self.RenewSessionLock = channel.unary_unary( + '/Settlement/RenewSessionLock', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewSessionLockRequest.SerializeToString, + response_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewSessionLockResponse.FromString, + _registered_method=True) + + +class SettlementServicer(object): + """The settlement service definition. + """ + + def Complete(self, request, context): + """Completes a message + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def Abandon(self, request, context): + """Abandons a message + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def Deadletter(self, request, context): + """Deadletters a message + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def Defer(self, request, context): + """Defers a message + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def RenewMessageLock(self, request, context): + """Renew message lock + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def GetSessionState(self, request, context): + """Get session state + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def SetSessionState(self, request, context): + """Set session state + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ReleaseSession(self, request, context): + """Release session + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def RenewSessionLock(self, request, context): + """Renew session lock + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_SettlementServicer_to_server(servicer, server): + rpc_method_handlers = { + 'Complete': grpc.unary_unary_rpc_method_handler( + servicer.Complete, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.CompleteRequest.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'Abandon': grpc.unary_unary_rpc_method_handler( + servicer.Abandon, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.AbandonRequest.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'Deadletter': grpc.unary_unary_rpc_method_handler( + servicer.Deadletter, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.DeadletterRequest.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'Defer': grpc.unary_unary_rpc_method_handler( + servicer.Defer, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.DeferRequest.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'RenewMessageLock': grpc.unary_unary_rpc_method_handler( + servicer.RenewMessageLock, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewMessageLockRequest.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'GetSessionState': grpc.unary_unary_rpc_method_handler( + servicer.GetSessionState, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.GetSessionStateRequest.FromString, + response_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.GetSessionStateResponse.SerializeToString, + ), + 'SetSessionState': grpc.unary_unary_rpc_method_handler( + servicer.SetSessionState, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.SetSessionStateRequest.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'ReleaseSession': grpc.unary_unary_rpc_method_handler( + servicer.ReleaseSession, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.ReleaseSessionRequest.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'RenewSessionLock': grpc.unary_unary_rpc_method_handler( + servicer.RenewSessionLock, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewSessionLockRequest.FromString, + response_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewSessionLockResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'Settlement', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('Settlement', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. +class Settlement(object): + """The settlement service definition. + """ + + @staticmethod + def Complete(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/Complete', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.CompleteRequest.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def Abandon(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/Abandon', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.AbandonRequest.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def Deadletter(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/Deadletter', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.DeadletterRequest.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def Defer(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/Defer', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.DeferRequest.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def RenewMessageLock(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/RenewMessageLock', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewMessageLockRequest.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def GetSessionState(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/GetSessionState', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.GetSessionStateRequest.SerializeToString, + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.GetSessionStateResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def SetSessionState(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/SetSessionState', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.SetSessionStateRequest.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def ReleaseSession(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/ReleaseSession', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.ReleaseSessionRequest.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def RenewSessionLock(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/RenewSessionLock', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewSessionLockRequest.SerializeToString, + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewSessionLockResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py new file mode 100644 index 0000000..d5893ca --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py @@ -0,0 +1,60 @@ +# Copyright (c) .NET Foundation. All rights reserved. +# Licensed under the MIT License. + +import grpc +from typing import Any, Type + + +class GrpcClientFactory: + """ + Factory class for creating gRPC clients from generated Python stubs. + + Python requires `.proto` files to be compiled into + `_pb2.py` and `_pb2_grpc.py` modules before use. This factory assumes + those files are already generated and importable. + + Example: + from my_service_pb2_grpc import MyServiceStub + + client = GrpcClientFactory.create_client( + service_stub=MyServiceStub, + address="localhost:50051", + grpc_max_message_length=4 * 1024 * 1024, # 4 MB + secure=False, + ) + """ + + @staticmethod + def create_client( + service_stub: Type[Any], + address: str, + grpc_max_message_length: int = 4 * 1024 * 1024, + secure: bool = False, + root_certificates: bytes | None = None, + ) -> Any: + """ + Creates and returns a gRPC client for the given service stub. + + Args: + service_stub: The generated service stub class (e.g. `MyServiceStub`). + address: The server address (e.g., "localhost:50051"). + grpc_max_message_length: Max message size for send/receive. + secure: If True, use a secure channel; otherwise, insecure. + root_certificates: Optional root certificates for TLS. + + Returns: + An instance of the gRPC client stub. + """ + + options = [ + ("grpc.max_send_message_length", grpc_max_message_length), + ("grpc.max_receive_message_length", grpc_max_message_length), + ] + + if secure: + credentials = grpc.ssl_channel_credentials(root_certificates=root_certificates) + channel = grpc.secure_channel(address, credentials, options=options) + else: + channel = grpc.insecure_channel(address, options=options) + + return service_stub(channel) diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpc_utils.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpc_utils.py new file mode 100644 index 0000000..9528fb2 --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpc_utils.py @@ -0,0 +1,58 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +# Copyright (c) .NET Foundation. All rights reserved. +# Licensed under the MIT License. + +import argparse +from typing import Tuple, List + + +class ArgumentError(Exception): + """Custom exception for missing or invalid arguments.""" + pass + + +def build_grpc_uri(argv: List[str] | None = None) -> Tuple[str, int]: + """ + Builds a gRPC URI and retrieves the max message length from CLI args. + + Expected CLI arguments: + --host HOST + --port PORT + --functions-grpc-max-message-length LENGTH + + Args: + argv: Optional list of CLI arguments (defaults to sys.argv[1:]). + + Returns: + (uri, grpc_max_message_length) + + Raises: + ArgumentError if required arguments are missing or invalid. + """ + parser = argparse.ArgumentParser(add_help=False) + + parser.add_argument("--host", help="gRPC server host") + parser.add_argument("--port", help="gRPC server port") + parser.add_argument( + "--functions-grpc-max-message-length", + type=int, + help="Maximum gRPC message size in bytes", + ) + + args, _ = parser.parse_known_args(argv) + + missing = [] + if not args.host: + missing.append("'host'") + if not args.port: + missing.append("'port'") + if not args.functions_grpc_max_message_length: + missing.append("'functions-grpc-max-message-length'") + + if missing: + raise ArgumentError(f"Missing required arguments: {', '.join(missing)}") + + uri = f"{args.host}:{args.port}" + return uri, args.functions_grpc_max_message_length diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py new file mode 100644 index 0000000..0ec2ea9 --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py @@ -0,0 +1,105 @@ +# Copyright (c) .NET Foundation. All rights reserved. +# Licensed under the MIT License. + +from typing import Optional +from protos.settlement_pb2 import ( + AbandonRequest, + CompleteRequest, + DeadletterRequest, + DeferRequest, + ReleaseSessionRequest, + RenewMessageLockRequest, + RenewSessionLockRequest, + SetSessionStateRequest, +) +from protos.settlement_pb2_grpc import SettlementStub + +from .grpcClient import GrpcClientFactory +from .grpc_utils import build_grpc_uri + + +class ServiceBusMessageActions: + """ + ServiceBusMessageActions class. + Provides async methods for message settlement over gRPC. + Implements a singleton pattern. + """ + + _instance: Optional["ServiceBusMessageActions"] = None + + def __init__(self) -> None: + uri, grpc_max_message_length = build_grpc_uri() + + self._client: SettlementStub = GrpcClientFactory.create_client( + service_stub=SettlementStub, + address=uri, + grpc_max_message_length=grpc_max_message_length, + secure=False, + ) + + @classmethod + def get_instance(cls) -> "ServiceBusMessageActions": + if cls._instance is None: + cls._instance = ServiceBusMessageActions() + return cls._instance + + def _validate_lock_token(self, message) -> str: + locktoken = message.lock_token + if not locktoken: + raise ValueError("lockToken is required in ServiceBusReceivedMessage.") + return locktoken + + # ------------------------------- + # Settlement methods + # ------------------------------- + + async def complete(self, message) -> None: + locktoken = self._validate_lock_token(message) + request = CompleteRequest(locktoken=locktoken) + await self._client.Complete(request) + + async def abandon(self, message, properties_to_modify: bytes = b"") -> None: + locktoken = self._validate_lock_token(message) + request = AbandonRequest(locktoken=locktoken, propertiesToModify=properties_to_modify) + await self._client.Abandon(request) + + async def deadletter( + self, + message, + properties_to_modify: bytes = b"", + deadletter_reason: Optional[str] = None, + deadletter_error_description: Optional[str] = None, + ) -> None: + locktoken = self._validate_lock_token(message) + request = DeadletterRequest( + locktoken=locktoken, + propertiesToModify=properties_to_modify, + deadletterReason=deadletter_reason or "", + deadletterErrorDescription=deadletter_error_description or "", + ) + await self._client.Deadletter(request) + + async def defer(self, message, properties_to_modify: bytes = b"") -> None: + locktoken = self._validate_lock_token(message) + request = DeferRequest(locktoken=locktoken, propertiesToModify=properties_to_modify) + await self._client.Defer(request) + + async def renew_message_lock(self, message) -> None: + locktoken = self._validate_lock_token(message) + request = RenewMessageLockRequest(locktoken=locktoken) + await self._client.RenewMessageLock(request) + + async def set_session_state(self, session_id: str, session_state: bytes) -> None: + request = SetSessionStateRequest(sessionId=session_id, sessionState=session_state) + await self._client.SetSessionState(request) + + async def release_session(self, session_id: str) -> None: + request = ReleaseSessionRequest(sessionId=session_id) + await self._client.ReleaseSession(request) + + async def renew_session_lock(self, session_id: str) -> str: + request = RenewSessionLockRequest(sessionId=session_id) + response = await self._client.RenewSessionLock(request) + if not response or not response.lockedUntil: + raise RuntimeError("No response or lockedUntil returned from renewSessionLock") + return response.lockedUntil From 9248b33e31c41558badaf60fc62d2af94e7a3d01 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Mon, 8 Sep 2025 11:41:28 -0500 Subject: [PATCH 02/14] base changes + client converter class --- .../extensions/base/__init__.py | 2 ++ .../extensions/base/grpcClientType.py | 9 +++++++++ .../azurefunctions/extensions/base/meta.py | 12 ++++++++++- .../extensions/bindings/protos/__init__.py | 2 ++ .../bindings/servicebus/__init__.py | 4 ++++ .../servicebus/serviceBusClientConverter.py | 20 +++++++++++++++++++ .../servicebus/serviceBusMessageActions.py | 9 ++++++--- .../pyproject.toml | 5 ++++- 8 files changed, 58 insertions(+), 5 deletions(-) create mode 100644 azurefunctions-extensions-base/azurefunctions/extensions/base/grpcClientType.py create mode 100644 azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusClientConverter.py diff --git a/azurefunctions-extensions-base/azurefunctions/extensions/base/__init__.py b/azurefunctions-extensions-base/azurefunctions/extensions/base/__init__.py index cf1eb24..c9b28e4 100644 --- a/azurefunctions-extensions-base/azurefunctions/extensions/base/__init__.py +++ b/azurefunctions-extensions-base/azurefunctions/extensions/base/__init__.py @@ -1,6 +1,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +from .grpcClientType import GrpcClientType from .meta import ( Datum, InConverter, @@ -27,6 +28,7 @@ "_BaseConverter", "InConverter", "OutConverter", + "GrpcClientType", "SdkType", "get_binding_registry", "ModuleTrackerMeta", diff --git a/azurefunctions-extensions-base/azurefunctions/extensions/base/grpcClientType.py b/azurefunctions-extensions-base/azurefunctions/extensions/base/grpcClientType.py new file mode 100644 index 0000000..e476e1f --- /dev/null +++ b/azurefunctions-extensions-base/azurefunctions/extensions/base/grpcClientType.py @@ -0,0 +1,9 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from abc import abstractmethod + + +class GrpcClientType: + def __init__(self, *, data: dict = None): + self._data = data or {} diff --git a/azurefunctions-extensions-base/azurefunctions/extensions/base/meta.py b/azurefunctions-extensions-base/azurefunctions/extensions/base/meta.py index a771a9f..516254f 100644 --- a/azurefunctions-extensions-base/azurefunctions/extensions/base/meta.py +++ b/azurefunctions-extensions-base/azurefunctions/extensions/base/meta.py @@ -6,7 +6,7 @@ import json from typing import Any, Dict, Mapping, Optional, Tuple, Union, get_args, get_origin -from . import sdkType, utils +from . import grpcClientType, sdkType, utils class Datum: @@ -99,6 +99,16 @@ def check_supported_type(cls, annotation: type) -> bool: # An iterable who only has one inner type and is a subclass of SdkType return cls._is_iterable_supported_type(annotation) + @classmethod + def check_grpc_client_type(cls, annotation: type) -> bool: + if annotation is None: + return False + + # The annotation is a class/type (not an object) - not iterable + if (isinstance(annotation, type) + and issubclass(annotation, grpcClientType.GrpcClientType)): + return True + @classmethod def _is_iterable_supported_type(cls, annotation: type) -> bool: # Check base type from type hint. Ex: List from List[SdkType] diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/__init__.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/__init__.py index e69de29..edd79eb 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/__init__.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. \ No newline at end of file diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/__init__.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/__init__.py index 99a4710..a6e25bb 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/__init__.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/__init__.py @@ -1,12 +1,16 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +from .serviceBusMessageActions import ServiceBusMessageActions from .serviceBusReceivedMessage import ServiceBusReceivedMessage from .serviceBusConverter import ServiceBusConverter +from .serviceBusClientConverter import ServiceBusClientConverter __all__ = [ "ServiceBusReceivedMessage", "ServiceBusConverter", + "ServiceBusMessageActions", + "ServiceBusClientConverter", ] __version__ = '1.0.0b1' diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusClientConverter.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusClientConverter.py new file mode 100644 index 0000000..f5a8037 --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusClientConverter.py @@ -0,0 +1,20 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Any + +from azurefunctions.extensions.base import Datum, InConverter +from .serviceBusMessageActions import ServiceBusMessageActions + + +class ServiceBusClientConverter( + InConverter, + binding='serviceBusClient' +): + + @classmethod + def get_client(cls) -> Any: + """ + TODO: comments + """ + return ServiceBusMessageActions.get_instance() diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py index 0ec2ea9..e9c4095 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py @@ -2,7 +2,10 @@ # Licensed under the MIT License. from typing import Optional -from protos.settlement_pb2 import ( + +from azurefunctions.extensions.base import GrpcClientType + +from ..protos.settlement_pb2 import ( AbandonRequest, CompleteRequest, DeadletterRequest, @@ -12,13 +15,13 @@ RenewSessionLockRequest, SetSessionStateRequest, ) -from protos.settlement_pb2_grpc import SettlementStub +from ..protos.settlement_pb2_grpc import SettlementStub from .grpcClient import GrpcClientFactory from .grpc_utils import build_grpc_uri -class ServiceBusMessageActions: +class ServiceBusMessageActions(GrpcClientType): """ ServiceBusMessageActions class. Provides async methods for message settlement over gRPC. diff --git a/azurefunctions-extensions-bindings-servicebus/pyproject.toml b/azurefunctions-extensions-bindings-servicebus/pyproject.toml index 54af6f6..f351cba 100644 --- a/azurefunctions-extensions-bindings-servicebus/pyproject.toml +++ b/azurefunctions-extensions-bindings-servicebus/pyproject.toml @@ -28,7 +28,10 @@ classifiers= [ dependencies = [ 'azurefunctions-extensions-base', 'azure-servicebus>=7.14.2,<8.0', - 'uamqp>=1.6.11,<2.0' + 'uamqp>=1.6.11,<2.0', + 'protobuf>=6.32.0,<7.0', + 'grpcio>=1.74.0,<2.0', + 'grpcio-tools>=1.74.0,<2.0' ] [project.optional-dependencies] From 42f983b0c79e0fc9dad55b7a1e1c280ee8d604c1 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Tue, 9 Sep 2025 16:02:22 -0500 Subject: [PATCH 03/14] update method name --- .../azurefunctions/extensions/base/meta.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/azurefunctions-extensions-base/azurefunctions/extensions/base/meta.py b/azurefunctions-extensions-base/azurefunctions/extensions/base/meta.py index 516254f..98e60a3 100644 --- a/azurefunctions-extensions-base/azurefunctions/extensions/base/meta.py +++ b/azurefunctions-extensions-base/azurefunctions/extensions/base/meta.py @@ -100,7 +100,7 @@ def check_supported_type(cls, annotation: type) -> bool: return cls._is_iterable_supported_type(annotation) @classmethod - def check_grpc_client_type(cls, annotation: type) -> bool: + def check_supported_grpc_client_type(cls, annotation: type) -> bool: if annotation is None: return False From a138fccf7f23b27f6fef41d47a066f6de5ddcfb1 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Thu, 11 Sep 2025 13:37:09 -0500 Subject: [PATCH 04/14] flake + unit tests --- .flake8 | 3 +- .../extensions/base/grpcClientType.py | 2 - .../azurefunctions/extensions/base/meta.py | 2 +- .../bindings/servicebus/grpcClient.py | 3 +- .../bindings/servicebus/grpc_utils.py | 3 - .../servicebus/serviceBusClientConverter.py | 4 +- .../servicebus/serviceBusMessageActions.py | 123 ++++++++++------- .../extensions/bindings/servicebus/utils.py | 3 +- .../function_app.py | 41 ++++++ .../servicebus_samples_settlement/host.json | 15 +++ .../local.settings.json | 11 ++ .../requirements.txt | 6 + .../tests/test_grpc_client.py | 116 ++++++++++++++++ .../tests/test_servicebus_message_actions.py | 125 ++++++++++++++++++ 14 files changed, 401 insertions(+), 56 deletions(-) create mode 100644 azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/function_app.py create mode 100644 azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/host.json create mode 100644 azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/local.settings.json create mode 100644 azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/requirements.txt create mode 100644 azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py create mode 100644 azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py diff --git a/.flake8 b/.flake8 index 4bb29cf..b8f9c95 100644 --- a/.flake8 +++ b/.flake8 @@ -6,6 +6,7 @@ ignore = W503,E402,E731 exclude = .git, __pycache__, build, dist, .eggs, .github, .local, docs/, - Samples, .env*, .vscode, venv*, *.venv* + Samples, .env*, .vscode, venv*, *.venv*, + azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos, max-line-length = 88 diff --git a/azurefunctions-extensions-base/azurefunctions/extensions/base/grpcClientType.py b/azurefunctions-extensions-base/azurefunctions/extensions/base/grpcClientType.py index e476e1f..03f851f 100644 --- a/azurefunctions-extensions-base/azurefunctions/extensions/base/grpcClientType.py +++ b/azurefunctions-extensions-base/azurefunctions/extensions/base/grpcClientType.py @@ -1,8 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from abc import abstractmethod - class GrpcClientType: def __init__(self, *, data: dict = None): diff --git a/azurefunctions-extensions-base/azurefunctions/extensions/base/meta.py b/azurefunctions-extensions-base/azurefunctions/extensions/base/meta.py index 98e60a3..8bee1a4 100644 --- a/azurefunctions-extensions-base/azurefunctions/extensions/base/meta.py +++ b/azurefunctions-extensions-base/azurefunctions/extensions/base/meta.py @@ -108,7 +108,7 @@ def check_supported_grpc_client_type(cls, annotation: type) -> bool: if (isinstance(annotation, type) and issubclass(annotation, grpcClientType.GrpcClientType)): return True - + @classmethod def _is_iterable_supported_type(cls, annotation: type) -> bool: # Check base type from type hint. Ex: List from List[SdkType] diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py index d5893ca..f44c99f 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py @@ -52,7 +52,8 @@ def create_client( ] if secure: - credentials = grpc.ssl_channel_credentials(root_certificates=root_certificates) + credentials = grpc.ssl_channel_credentials( + root_certificates=root_certificates) channel = grpc.secure_channel(address, credentials, options=options) else: channel = grpc.insecure_channel(address, options=options) diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpc_utils.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpc_utils.py index 9528fb2..474a807 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpc_utils.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpc_utils.py @@ -1,9 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -# Copyright (c) .NET Foundation. All rights reserved. -# Licensed under the MIT License. - import argparse from typing import Tuple, List diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusClientConverter.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusClientConverter.py index f5a8037..db9e90a 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusClientConverter.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusClientConverter.py @@ -3,7 +3,7 @@ from typing import Any -from azurefunctions.extensions.base import Datum, InConverter +from azurefunctions.extensions.base import InConverter from .serviceBusMessageActions import ServiceBusMessageActions @@ -15,6 +15,6 @@ class ServiceBusClientConverter( @classmethod def get_client(cls) -> Any: """ - TODO: comments + Returns an instance of ServiceBusMessageActions. """ return ServiceBusMessageActions.get_instance() diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py index e9c4095..6d8fa4d 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py @@ -4,6 +4,7 @@ from typing import Optional from azurefunctions.extensions.base import GrpcClientType +from google.protobuf.wrappers_pb2 import StringValue from ..protos.settlement_pb2 import ( AbandonRequest, @@ -31,12 +32,12 @@ class ServiceBusMessageActions(GrpcClientType): _instance: Optional["ServiceBusMessageActions"] = None def __init__(self) -> None: - uri, grpc_max_message_length = build_grpc_uri() + self._uri, self._grpc_max_message_length = build_grpc_uri() self._client: SettlementStub = GrpcClientFactory.create_client( service_stub=SettlementStub, - address=uri, - grpc_max_message_length=grpc_max_message_length, + address=self._uri, + grpc_max_message_length=self._grpc_max_message_length, secure=False, ) @@ -56,53 +57,85 @@ def _validate_lock_token(self, message) -> str: # Settlement methods # ------------------------------- - async def complete(self, message) -> None: + def complete(self, + message + ) -> None: locktoken = self._validate_lock_token(message) - request = CompleteRequest(locktoken=locktoken) - await self._client.Complete(request) - - async def abandon(self, message, properties_to_modify: bytes = b"") -> None: + request = CompleteRequest() + request.locktoken = str(locktoken) + self._client.Complete(request) + + def abandon(self, + message, + properties_to_modify: bytes = b"" + ) -> None: locktoken = self._validate_lock_token(message) - request = AbandonRequest(locktoken=locktoken, propertiesToModify=properties_to_modify) - await self._client.Abandon(request) - - async def deadletter( - self, - message, - properties_to_modify: bytes = b"", - deadletter_reason: Optional[str] = None, - deadletter_error_description: Optional[str] = None, - ) -> None: + request = AbandonRequest() + request.locktoken = str(locktoken) + request.propertiesToModify = properties_to_modify + self._client.Abandon(request) + + def deadletter(self, + message, + properties_to_modify: bytes = b"", + deadletter_reason: Optional[str] = None, + deadletter_error_description: Optional[str] = None, + ) -> None: locktoken = self._validate_lock_token(message) - request = DeadletterRequest( - locktoken=locktoken, - propertiesToModify=properties_to_modify, - deadletterReason=deadletter_reason or "", - deadletterErrorDescription=deadletter_error_description or "", - ) - await self._client.Deadletter(request) - - async def defer(self, message, properties_to_modify: bytes = b"") -> None: + request = DeadletterRequest() + request.locktoken = str(locktoken) + request.propertiesToModify = properties_to_modify + + if deadletter_reason: + request.deadletterReason.CopyFrom(StringValue(value=deadletter_reason)) + + if deadletter_error_description: + request.deadletterErrorDescription.CopyFrom( + StringValue(value=deadletter_error_description)) + self._client.Deadletter(request) + + def defer(self, + message, + properties_to_modify: bytes = b"" + ) -> None: locktoken = self._validate_lock_token(message) - request = DeferRequest(locktoken=locktoken, propertiesToModify=properties_to_modify) - await self._client.Defer(request) - - async def renew_message_lock(self, message) -> None: + request = DeferRequest() + request.locktoken = str(locktoken) + request.propertiesToModify = properties_to_modify + self._client.Defer(request) + + def renew_message_lock(self, + message + ) -> None: locktoken = self._validate_lock_token(message) - request = RenewMessageLockRequest(locktoken=locktoken) - await self._client.RenewMessageLock(request) + request = RenewMessageLockRequest() + request.locktoken = str(locktoken) + self._client.RenewMessageLock(request) + + def set_session_state(self, + session_id: str, + session_state: bytes + ) -> None: + request = SetSessionStateRequest() + request.sessionId = session_id + request.sessionState = session_state + self._client.SetSessionState(request) + + def release_session(self, + session_id: str + ) -> None: + request = ReleaseSessionRequest() + request.sessionId = session_id + self._client.ReleaseSession(request) + + def renew_session_lock(self, + session_id: str): + request = RenewSessionLockRequest() + request.sessionId = session_id + response = self._client.RenewSessionLock(request) - async def set_session_state(self, session_id: str, session_state: bytes) -> None: - request = SetSessionStateRequest(sessionId=session_id, sessionState=session_state) - await self._client.SetSessionState(request) - - async def release_session(self, session_id: str) -> None: - request = ReleaseSessionRequest(sessionId=session_id) - await self._client.ReleaseSession(request) - - async def renew_session_lock(self, session_id: str) -> str: - request = RenewSessionLockRequest(sessionId=session_id) - response = await self._client.RenewSessionLock(request) if not response or not response.lockedUntil: - raise RuntimeError("No response or lockedUntil returned from renewSessionLock") + raise RuntimeError("No response or lockedUntil " + "returned from renewSessionLock") + return response.lockedUntil diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/utils.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/utils.py index 0d7fea9..d33b872 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/utils.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/utils.py @@ -13,7 +13,8 @@ def get_lock_token(message: bytes, index: int) -> str: lock_token_encoded = message[:index] # Convert the lock token to a UUID using the first 16 bytes - lock_token_uuid = uuid.UUID(bytes=lock_token_encoded[:16]) + # Use little-endian to match SDK + lock_token_uuid = uuid.UUID(bytes_le=lock_token_encoded[:16]) return lock_token_uuid diff --git a/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/function_app.py b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/function_app.py new file mode 100644 index 0000000..b1baf85 --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/function_app.py @@ -0,0 +1,41 @@ +# coding: utf-8 + +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- + +import logging + +import azure.functions as func +import azurefunctions.extensions.bindings.servicebus as servicebus + +app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION) + +""" +FOLDER: servicebus_samples +DESCRIPTION: + These samples demonstrate how to obtain a ServiceBusReceivedMessage + from a ServiceBus Trigger. +USAGE: + Set the environment variables with your own values before running the + sample: + For running the ServiceBus queue trigger function: + 1) QUEUE_NAME - the name of the ServiceBus queue + 2) SERVICEBUS_CONNECTION - the connection string for the ServiceBus entity + For running the ServiceBus topic trigger function: + 1) TOPIC_NAME - the name of the ServiceBus topic + 2) SERVICEBUS_CONNECTION - the connection string for the ServiceBus entity + 3) SUBSCRIPTION_NAME - the name of the Subscription +""" + + +@app.service_bus_queue_trigger(arg_name="receivedmessage", + queue_name="QUEUE_NAME", + connection="SERVICEBUS_CONNECTION", + auto_complete_messages=False) +def servicebus_queue_trigger(receivedmessage: servicebus.ServiceBusReceivedMessage, message: servicebus.ServiceBusMessageActions): + logging.info(f"Python ServiceBus queue trigger processed message. Message: {receivedmessage}") + message.complete(receivedmessage) + logging.info("Completed message.") diff --git a/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/host.json b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/host.json new file mode 100644 index 0000000..9df9136 --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/host.json @@ -0,0 +1,15 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request" + } + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" + } +} \ No newline at end of file diff --git a/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/local.settings.json b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/local.settings.json new file mode 100644 index 0000000..967f18b --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/local.settings.json @@ -0,0 +1,11 @@ +{ + "IsEncrypted": false, + "Values": { + "FUNCTIONS_WORKER_RUNTIME": "python", + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "QUEUE_NAME": "", + "SERVICEBUS_CONNECTION": "", + "TOPIC_NAME": "", + "SUBSCRIPTION_NAME": "" + } +} \ No newline at end of file diff --git a/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/requirements.txt b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/requirements.txt new file mode 100644 index 0000000..7b4c53b --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/requirements.txt @@ -0,0 +1,6 @@ +# DO NOT include azure-functions-worker in this file +# The Python Worker is managed by Azure Functions platform +# Manually managing azure-functions-worker may cause unexpected issues + +azure-functions +azurefunctions-extensions-bindings-servicebus \ No newline at end of file diff --git a/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py b/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py new file mode 100644 index 0000000..6553f1b --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py @@ -0,0 +1,116 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import unittest +from unittest.mock import patch, MagicMock + +from azurefunctions.extensions.bindings.servicebus.grpcClient import GrpcClientFactory +from azurefunctions.extensions.bindings.servicebus.grpc_utils import (build_grpc_uri, + ArgumentError) +import pytest + + +class TestGrpcClient(unittest.TestCase): + def test_create_client_insecure_channel(): + # Dummy stub class to verify it receives a channel + class DummyStub: + def __init__(self, channel): + self._channel = channel + + with patch("mypackage.grpcClient.grpc.insecure_channel") as mock_insecure: + fake_channel = MagicMock() + mock_insecure.return_value = fake_channel + + client = GrpcClientFactory.create_client( + service_stub=DummyStub, + address="localhost:1234", + grpc_max_message_length=1024, + secure=False, + ) + + mock_insecure.assert_called_once() + args, kwargs = mock_insecure.call_args + assert args[0] == "localhost:1234" + assert ("grpc.max_send_message_length", 1024) in kwargs["options"] + assert ("grpc.max_receive_message_length", 1024) in kwargs["options"] + + assert isinstance(client, DummyStub) + assert client._channel == fake_channel + + def test_create_client_secure_channel_with_root_certs(): + class DummyStub: + def __init__(self, channel): + self._channel = channel + + with (patch("mypackage.grpcClient.grpc.secure_channel") as mock_secure, + patch("mypackage.grpcClient.grpc.ssl_channel_credentials") as mock_creds): + fake_channel = MagicMock() + fake_creds = MagicMock() + mock_secure.return_value = fake_channel + mock_creds.return_value = fake_creds + + client = GrpcClientFactory.create_client( + service_stub=DummyStub, + address="securehost:9999", + grpc_max_message_length=2048, + secure=True, + root_certificates=b"fakecerts", + ) + + mock_creds.assert_called_once_with(root_certificates=b"fakecerts") + mock_secure.assert_called_once() + args, kwargs = mock_secure.call_args + assert args[0] == "securehost:9999" + assert args[1] == fake_creds + assert ("grpc.max_send_message_length", 2048) in kwargs["options"] + assert ("grpc.max_receive_message_length", 2048) in kwargs["options"] + + assert isinstance(client, DummyStub) + assert client._channel == fake_channel + + +class TestGrpcUtils(unittest.TestCase): + def test_build_grpc_uri_valid_args(): + argv = [ + "--host", "localhost", + "--port", "50051", + "--functions-grpc-max-message-length", "4096" + ] + uri, max_len = build_grpc_uri(argv) + assert uri == "localhost:50051" + assert max_len == 4096 + + def test_build_grpc_uri_missing_host(): + argv = [ + "--port", "50051", + "--functions-grpc-max-message-length", "4096" + ] + with pytest.raises(ArgumentError) as excinfo: + build_grpc_uri(argv) + assert "host" in str(excinfo.value) + + def test_build_grpc_uri_missing_port(): + argv = [ + "--host", "localhost", + "--functions-grpc-max-message-length", "4096" + ] + with pytest.raises(ArgumentError) as excinfo: + build_grpc_uri(argv) + assert "port" in str(excinfo.value) + + def test_build_grpc_uri_missing_message_length(): + argv = [ + "--host", "localhost", + "--port", "50051", + ] + with pytest.raises(ArgumentError) as excinfo: + build_grpc_uri(argv) + assert "functions-grpc-max-message-length" in str(excinfo.value) + + def test_build_grpc_uri_multiple_missing(): + argv = [] + with pytest.raises(ArgumentError) as excinfo: + build_grpc_uri(argv) + msg = str(excinfo.value) + assert ("host" in msg and "port" in msg + and "functions-grpc-max-message-length" in msg) diff --git a/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py b/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py new file mode 100644 index 0000000..0ae7926 --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py @@ -0,0 +1,125 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import pytest +import unittest + +from unittest.mock import patch, MagicMock + +from google.protobuf.timestamp_pb2 import Timestamp + +from azurefunctions.extensions.bindings.servicebus import ServiceBusMessageActions +from azurefunctions.extensions.bindings.protos import settlement_pb2 as pb2 + + +@pytest.fixture +def mock_client(): + """Patch the GrpcClientFactory to return a mock SettlementStub.""" + with patch("azurefunctions.extensions.bindings.servicebus.GrpcClientFactory.create_client") as mock_factory: # noqa + client = MagicMock() + mock_factory.return_value = client + yield client + + +@pytest.fixture +def actions(mock_client): + """Return a fresh ServiceBusMessageActions instance with mocked gRPC client.""" + # Clear singleton + ServiceBusMessageActions._instance = None + return ServiceBusMessageActions.get_instance() + + +class DummyMessage: + def __init__(self, lock_token=None): + self.lock_token = lock_token + + +class TestServiceBusMessageActions(unittest.TestCase): + def test_complete_calls_grpc(actions, mock_client): + msg = DummyMessage("lock123") + actions.complete(msg) + # Check that gRPC method was called + called_req = mock_client.Complete.call_args[0][0] + assert isinstance(called_req, pb2.CompleteRequest) + assert called_req.locktoken == "lock123" + + def test_abandon_calls_grpc(actions, mock_client): + msg = DummyMessage("lock123") + actions.abandon(msg, properties_to_modify=b"foo") + called_req = mock_client.Abandon.call_args[0][0] + assert isinstance(called_req, pb2.AbandonRequest) + assert called_req.locktoken == "lock123" + assert called_req.propertiesToModify == b"foo" + + def test_deadletter_with_reasons(actions, mock_client): + msg = DummyMessage("lock123") + actions.deadletter( + msg, + properties_to_modify=b"p", + deadletter_reason="reason", + deadletter_error_description="desc" + ) + called_req = mock_client.Deadletter.call_args[0][0] + assert isinstance(called_req, pb2.DeadletterRequest) + assert called_req.locktoken == "lock123" + assert called_req.propertiesToModify == b"p" + assert called_req.deadletterReason.value == "reason" + assert called_req.deadletterErrorDescription.value == "desc" + + def test_defer_calls_grpc(actions, mock_client): + msg = DummyMessage("lock123") + actions.defer(msg, properties_to_modify=b"defer") + called_req = mock_client.Defer.call_args[0][0] + assert isinstance(called_req, pb2.DeferRequest) + assert called_req.locktoken == "lock123" + assert called_req.propertiesToModify == b"defer" + + def test_renew_message_lock_calls_grpc(actions, mock_client): + msg = DummyMessage("lock123") + actions.renew_message_lock(msg) + called_req = mock_client.RenewMessageLock.call_args[0][0] + assert isinstance(called_req, pb2.RenewMessageLockRequest) + assert called_req.locktoken == "lock123" + + def test_set_session_state(actions, mock_client): + actions.set_session_state("sid", b"state") + called_req = mock_client.SetSessionState.call_args[0][0] + assert isinstance(called_req, pb2.SetSessionStateRequest) + assert called_req.sessionId == "sid" + assert called_req.sessionState == b"state" + + def test_release_session(actions, mock_client): + actions.release_session("sid") + called_req = mock_client.ReleaseSession.call_args[0][0] + assert isinstance(called_req, pb2.ReleaseSessionRequest) + assert called_req.sessionId == "sid" + + def test_renew_session_lock_success(actions, mock_client): + ts = Timestamp() + ts.GetCurrentTime() + mock_client.RenewSessionLock.return_value.lockedUntil.CopyFrom(ts) + + result = actions.renew_session_lock("sid") + + called_req = mock_client.RenewSessionLock.call_args[0][0] + assert isinstance(called_req, pb2.RenewSessionLockRequest) + assert called_req.sessionId == "sid" + assert isinstance(result, Timestamp) # raw proto object returned + assert result == ts + + def test_renew_session_lock_failure(actions, mock_client): + # No response + mock_client.RenewSessionLock.return_value = None + with pytest.raises(RuntimeError): + actions.renew_session_lock("sid") + + # Empty response + empty_resp = pb2.RenewSessionLockResponse() + mock_client.RenewSessionLock.return_value = empty_resp + with pytest.raises(RuntimeError): + actions.renew_session_lock("sid") + + def test_validate_lock_token_raises(actions): + msg = DummyMessage(None) + with pytest.raises(ValueError): + actions._validate_lock_token(msg) From 937d573427d67846f0061aea5ae4c06e4bae05e6 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Fri, 12 Sep 2025 13:42:48 -0500 Subject: [PATCH 05/14] merge fix --- .../azurefunctions/extensions/base/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/azurefunctions-extensions-base/azurefunctions/extensions/base/__init__.py b/azurefunctions-extensions-base/azurefunctions/extensions/base/__init__.py index f83e240..8b28ef1 100644 --- a/azurefunctions-extensions-base/azurefunctions/extensions/base/__init__.py +++ b/azurefunctions-extensions-base/azurefunctions/extensions/base/__init__.py @@ -28,7 +28,6 @@ "_BaseConverter", "InConverter", "OutConverter", - "GrpcClientType", "SdkType", "get_binding_registry", "ModuleTrackerMeta", From 75b1e3d12eba0c7e4b6c4e2e2090ff7abe694dcf Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Fri, 12 Sep 2025 14:40:48 -0500 Subject: [PATCH 06/14] test fixes --- .../tests/test_grpc_client.py | 20 +- .../tests/test_servicebus_message_actions.py | 201 ++++++++++-------- 2 files changed, 124 insertions(+), 97 deletions(-) diff --git a/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py b/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py index 6553f1b..b7e79a3 100644 --- a/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py +++ b/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py @@ -11,13 +11,13 @@ class TestGrpcClient(unittest.TestCase): - def test_create_client_insecure_channel(): + def test_create_client_insecure_channel(self): # Dummy stub class to verify it receives a channel class DummyStub: def __init__(self, channel): self._channel = channel - with patch("mypackage.grpcClient.grpc.insecure_channel") as mock_insecure: + with patch("azurefunctions.extensions.bindings.servicebus.grpcClient.grpc.insecure_channel") as mock_insecure: # noqa fake_channel = MagicMock() mock_insecure.return_value = fake_channel @@ -37,13 +37,13 @@ def __init__(self, channel): assert isinstance(client, DummyStub) assert client._channel == fake_channel - def test_create_client_secure_channel_with_root_certs(): + def test_create_client_secure_channel_with_root_certs(self): class DummyStub: def __init__(self, channel): self._channel = channel - with (patch("mypackage.grpcClient.grpc.secure_channel") as mock_secure, - patch("mypackage.grpcClient.grpc.ssl_channel_credentials") as mock_creds): + with (patch("azurefunctions.extensions.bindings.servicebus.grpcClient.grpc.secure_channel") as mock_secure, # noqa + patch("azurefunctions.extensions.bindings.servicebus.grpcClient.grpc.ssl_channel_credentials") as mock_creds): # noqa fake_channel = MagicMock() fake_creds = MagicMock() mock_secure.return_value = fake_channel @@ -70,7 +70,7 @@ def __init__(self, channel): class TestGrpcUtils(unittest.TestCase): - def test_build_grpc_uri_valid_args(): + def test_build_grpc_uri_valid_args(self): argv = [ "--host", "localhost", "--port", "50051", @@ -80,7 +80,7 @@ def test_build_grpc_uri_valid_args(): assert uri == "localhost:50051" assert max_len == 4096 - def test_build_grpc_uri_missing_host(): + def test_build_grpc_uri_missing_host(self): argv = [ "--port", "50051", "--functions-grpc-max-message-length", "4096" @@ -89,7 +89,7 @@ def test_build_grpc_uri_missing_host(): build_grpc_uri(argv) assert "host" in str(excinfo.value) - def test_build_grpc_uri_missing_port(): + def test_build_grpc_uri_missing_port(self): argv = [ "--host", "localhost", "--functions-grpc-max-message-length", "4096" @@ -98,7 +98,7 @@ def test_build_grpc_uri_missing_port(): build_grpc_uri(argv) assert "port" in str(excinfo.value) - def test_build_grpc_uri_missing_message_length(): + def test_build_grpc_uri_missing_message_length(self): argv = [ "--host", "localhost", "--port", "50051", @@ -107,7 +107,7 @@ def test_build_grpc_uri_missing_message_length(): build_grpc_uri(argv) assert "functions-grpc-max-message-length" in str(excinfo.value) - def test_build_grpc_uri_multiple_missing(): + def test_build_grpc_uri_multiple_missing(self): argv = [] with pytest.raises(ArgumentError) as excinfo: build_grpc_uri(argv) diff --git a/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py b/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py index 0ae7926..1caf5d2 100644 --- a/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py +++ b/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py @@ -1,7 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -import pytest import unittest from unittest.mock import patch, MagicMock @@ -12,114 +11,142 @@ from azurefunctions.extensions.bindings.protos import settlement_pb2 as pb2 -@pytest.fixture -def mock_client(): - """Patch the GrpcClientFactory to return a mock SettlementStub.""" - with patch("azurefunctions.extensions.bindings.servicebus.GrpcClientFactory.create_client") as mock_factory: # noqa - client = MagicMock() - mock_factory.return_value = client - yield client - - -@pytest.fixture -def actions(mock_client): - """Return a fresh ServiceBusMessageActions instance with mocked gRPC client.""" - # Clear singleton - ServiceBusMessageActions._instance = None - return ServiceBusMessageActions.get_instance() - - class DummyMessage: - def __init__(self, lock_token=None): - self.lock_token = lock_token + def __init__(self, locktoken): + self.lock_token = locktoken class TestServiceBusMessageActions(unittest.TestCase): - def test_complete_calls_grpc(actions, mock_client): + def setUp(self): + ServiceBusMessageActions._instance = None + # Patch create_client so we control what gets returned + patcher = patch( + "azurefunctions.extensions.bindings.servicebus.serviceBusMessageActions.GrpcClientFactory.create_client") # noqa + self.addCleanup(patcher.stop) + self.mock_create_client = patcher.start() + + # Patch build_grpc_uri so we don't need CLI args + patcher_uri = patch( + "azurefunctions.extensions.bindings.servicebus.serviceBusMessageActions.build_grpc_uri", # noqa + return_value=("localhost:50051", 4 * 1024 * 1024)) + self.addCleanup(patcher_uri.stop) + patcher_uri.start() + + # The fake gRPC client returned by create_client + self.mock_client = MagicMock() + self.mock_client.Complete = MagicMock() + self.mock_client.Abandon = MagicMock() + self.mock_client.Deadletter = MagicMock() + self.mock_client.Defer = MagicMock() + self.mock_client.RenewMessageLock = MagicMock() + self.mock_client.SetSessionState = MagicMock() + self.mock_client.ReleaseSession = MagicMock() + self.mock_client.RenewSessionLock = MagicMock() + self.mock_create_client.return_value = self.mock_client + + # Now actions will use our patched client + self.actions = ServiceBusMessageActions().get_instance() + + def tearDown(self): + # Ensure singleton is cleared after each test too + ServiceBusMessageActions._instance = None + + def test_complete_calls_grpc(self): msg = DummyMessage("lock123") - actions.complete(msg) - # Check that gRPC method was called - called_req = mock_client.Complete.call_args[0][0] - assert isinstance(called_req, pb2.CompleteRequest) - assert called_req.locktoken == "lock123" + self.actions.complete(msg) - def test_abandon_calls_grpc(actions, mock_client): + self.mock_client.Complete.assert_called_once() + called_req = self.mock_client.Complete.call_args[0][0] + self.assertIsInstance(called_req, pb2.CompleteRequest) + self.assertEqual(called_req.locktoken, "lock123") + + def test_abandon_calls_grpc(self): msg = DummyMessage("lock123") - actions.abandon(msg, properties_to_modify=b"foo") - called_req = mock_client.Abandon.call_args[0][0] - assert isinstance(called_req, pb2.AbandonRequest) - assert called_req.locktoken == "lock123" - assert called_req.propertiesToModify == b"foo" + self.actions.abandon(msg, properties_to_modify=b"foo") + + self.mock_client.Abandon.assert_called_once() + called_req = self.mock_client.Abandon.call_args[0][0] + self.assertIsInstance(called_req, pb2.AbandonRequest) + self.assertEqual(called_req.locktoken, "lock123") + self.assertEqual(called_req.propertiesToModify, b"foo") - def test_deadletter_with_reasons(actions, mock_client): + def test_deadletter_with_reasons(self): msg = DummyMessage("lock123") - actions.deadletter( + self.actions.deadletter( msg, properties_to_modify=b"p", deadletter_reason="reason", deadletter_error_description="desc" ) - called_req = mock_client.Deadletter.call_args[0][0] - assert isinstance(called_req, pb2.DeadletterRequest) - assert called_req.locktoken == "lock123" - assert called_req.propertiesToModify == b"p" - assert called_req.deadletterReason.value == "reason" - assert called_req.deadletterErrorDescription.value == "desc" - - def test_defer_calls_grpc(actions, mock_client): + + self.mock_client.Deadletter.assert_called_once() + called_req = self.mock_client.Deadletter.call_args[0][0] + self.assertIsInstance(called_req, pb2.DeadletterRequest) + self.assertEqual(called_req.locktoken, "lock123") + self.assertEqual(called_req.propertiesToModify, b"p") + self.assertEqual(called_req.deadletterReason.value, "reason") + self.assertEqual(called_req.deadletterErrorDescription.value, "desc") + + def test_defer_calls_grpc(self): msg = DummyMessage("lock123") - actions.defer(msg, properties_to_modify=b"defer") - called_req = mock_client.Defer.call_args[0][0] - assert isinstance(called_req, pb2.DeferRequest) - assert called_req.locktoken == "lock123" - assert called_req.propertiesToModify == b"defer" + self.actions.defer(msg, properties_to_modify=b"defer") - def test_renew_message_lock_calls_grpc(actions, mock_client): + self.mock_client.Defer.assert_called_once() + called_req = self.mock_client.Defer.call_args[0][0] + self.assertIsInstance(called_req, pb2.DeferRequest) + self.assertEqual(called_req.locktoken, "lock123") + self.assertEqual(called_req.propertiesToModify, b"defer") + + def test_renew_message_lock_calls_grpc(self): msg = DummyMessage("lock123") - actions.renew_message_lock(msg) - called_req = mock_client.RenewMessageLock.call_args[0][0] - assert isinstance(called_req, pb2.RenewMessageLockRequest) - assert called_req.locktoken == "lock123" - - def test_set_session_state(actions, mock_client): - actions.set_session_state("sid", b"state") - called_req = mock_client.SetSessionState.call_args[0][0] - assert isinstance(called_req, pb2.SetSessionStateRequest) - assert called_req.sessionId == "sid" - assert called_req.sessionState == b"state" - - def test_release_session(actions, mock_client): - actions.release_session("sid") - called_req = mock_client.ReleaseSession.call_args[0][0] - assert isinstance(called_req, pb2.ReleaseSessionRequest) - assert called_req.sessionId == "sid" - - def test_renew_session_lock_success(actions, mock_client): + self.actions.renew_message_lock(msg) + + self.mock_client.RenewMessageLock.assert_called_once() + called_req = self.mock_client.RenewMessageLock.call_args[0][0] + self.assertIsInstance(called_req, pb2.RenewMessageLockRequest) + self.assertEqual(called_req.locktoken, "lock123") + + def test_set_session_state(self): + self.actions.set_session_state("sid", b"state") + + self.mock_client.SetSessionState.assert_called_once() + called_req = self.mock_client.SetSessionState.call_args[0][0] + self.assertIsInstance(called_req, pb2.SetSessionStateRequest) + self.assertEqual(called_req.sessionId, "sid") + self.assertEqual(called_req.sessionState, b"state") + + def test_release_session(self): + self.actions.release_session("sid") + + self.mock_client.ReleaseSession.assert_called_once() + called_req = self.mock_client.ReleaseSession.call_args[0][0] + self.assertIsInstance(called_req, pb2.ReleaseSessionRequest) + self.assertEqual(called_req.sessionId, "sid") + + def test_renew_session_lock_success(self): ts = Timestamp() ts.GetCurrentTime() - mock_client.RenewSessionLock.return_value.lockedUntil.CopyFrom(ts) + # Mock gRPC response + resp = pb2.RenewSessionLockResponse() + resp.lockedUntil.CopyFrom(ts) + self.mock_client.RenewSessionLock.return_value = resp - result = actions.renew_session_lock("sid") + result = self.actions.renew_session_lock("sid") - called_req = mock_client.RenewSessionLock.call_args[0][0] - assert isinstance(called_req, pb2.RenewSessionLockRequest) - assert called_req.sessionId == "sid" - assert isinstance(result, Timestamp) # raw proto object returned - assert result == ts + self.mock_client.RenewSessionLock.assert_called_once() + called_req = self.mock_client.RenewSessionLock.call_args[0][0] + self.assertIsInstance(called_req, pb2.RenewSessionLockRequest) + self.assertEqual(called_req.sessionId, "sid") + self.assertIsInstance(result, Timestamp) + self.assertEqual(result, ts) - def test_renew_session_lock_failure(actions, mock_client): + def test_renew_session_lock_failure(self): # No response - mock_client.RenewSessionLock.return_value = None - with pytest.raises(RuntimeError): - actions.renew_session_lock("sid") - - # Empty response - empty_resp = pb2.RenewSessionLockResponse() - mock_client.RenewSessionLock.return_value = empty_resp - with pytest.raises(RuntimeError): - actions.renew_session_lock("sid") + self.mock_client.RenewSessionLock.return_value = None + with self.assertRaises(RuntimeError): + self.actions.renew_session_lock("sid") - def test_validate_lock_token_raises(actions): + def test_validate_lock_token_raises(self): msg = DummyMessage(None) - with pytest.raises(ValueError): - actions._validate_lock_token(msg) + with self.assertRaises(ValueError): + self.actions._validate_lock_token(msg) From a74d3029753e3d188ad8a710ffca9b4b90780bb8 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Mon, 15 Sep 2025 14:36:46 -0500 Subject: [PATCH 07/14] feedback --- .../bindings/servicebus/grpcClient.py | 24 ++- .../bindings/servicebus/grpc_utils.py | 43 ++-- .../servicebus/serviceBusMessageActions.py | 196 ++++++++++-------- .../function_app.py | 14 +- .../local.settings.json | 4 +- .../tests/test_grpc_client.py | 103 ++++++--- .../tests/test_servicebus_message_actions.py | 93 ++++++++- 7 files changed, 320 insertions(+), 157 deletions(-) diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py index f44c99f..d9fa64f 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py @@ -2,7 +2,11 @@ # Licensed under the MIT License. import grpc -from typing import Any, Type +from typing import Any, Optional, Type + + +class GrpcChannelError(Exception): + """Exception raised when gRPC channel creation fails.""" class GrpcClientFactory: @@ -30,7 +34,7 @@ def create_client( address: str, grpc_max_message_length: int = 4 * 1024 * 1024, secure: bool = False, - root_certificates: bytes | None = None, + root_certificates: Optional[bytes] = None, ) -> Any: """ Creates and returns a gRPC client for the given service stub. @@ -51,11 +55,15 @@ def create_client( ("grpc.max_receive_message_length", grpc_max_message_length), ] - if secure: - credentials = grpc.ssl_channel_credentials( - root_certificates=root_certificates) - channel = grpc.secure_channel(address, credentials, options=options) - else: - channel = grpc.insecure_channel(address, options=options) + try: + if secure: + credentials = grpc.ssl_channel_credentials( + root_certificates=root_certificates) + channel = grpc.secure_channel(address, credentials, options=options) + else: + channel = grpc.insecure_channel(address, options=options) + except Exception as e: + raise GrpcChannelError(f"Failed to create gRPC channel. URL: {address}," + f" Options: {options}, Error: {e}") return service_stub(channel) diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpc_utils.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpc_utils.py index 474a807..ddc7c9c 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpc_utils.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpc_utils.py @@ -2,7 +2,7 @@ # Licensed under the MIT License. import argparse -from typing import Tuple, List +from typing import List, Optional class ArgumentError(Exception): @@ -10,26 +10,20 @@ class ArgumentError(Exception): pass -def build_grpc_uri(argv: List[str] | None = None) -> Tuple[str, int]: +def parse_grpc_args(argv: Optional[List[str]] = None): """ - Builds a gRPC URI and retrieves the max message length from CLI args. - - Expected CLI arguments: - --host HOST - --port PORT - --functions-grpc-max-message-length LENGTH + Parses CLI arguments for gRPC connection. Args: argv: Optional list of CLI arguments (defaults to sys.argv[1:]). Returns: - (uri, grpc_max_message_length) + args: Namespace with host, port, and functions_grpc_max_message_length Raises: ArgumentError if required arguments are missing or invalid. """ parser = argparse.ArgumentParser(add_help=False) - parser.add_argument("--host", help="gRPC server host") parser.add_argument("--port", help="gRPC server port") parser.add_argument( @@ -37,19 +31,30 @@ def build_grpc_uri(argv: List[str] | None = None) -> Tuple[str, int]: type=int, help="Maximum gRPC message size in bytes", ) - args, _ = parser.parse_known_args(argv) - missing = [] + missing_args = [] if not args.host: - missing.append("'host'") + missing_args.append("'host'") if not args.port: - missing.append("'port'") + missing_args.append("'port'") if not args.functions_grpc_max_message_length: - missing.append("'functions-grpc-max-message-length'") + missing_args.append("'functions-grpc-max-message-length'") + + if missing_args: + raise ArgumentError(f"Missing required arguments: {', '.join(missing_args)}") + return args + + +def get_grpc_uri(args) -> str: + """ + Returns the gRPC URI from CLI args. + """ + return f"{args.host}:{args.port}" - if missing: - raise ArgumentError(f"Missing required arguments: {', '.join(missing)}") - uri = f"{args.host}:{args.port}" - return uri, args.functions_grpc_max_message_length +def get_grpc_max_message_length(args) -> int: + """ + Returns the gRPC max message length from CLI args. + """ + return args.functions_grpc_max_message_length diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py index 6d8fa4d..b75928b 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py @@ -1,5 +1,6 @@ # Copyright (c) .NET Foundation. All rights reserved. # Licensed under the MIT License. +import threading from typing import Optional @@ -19,7 +20,16 @@ from ..protos.settlement_pb2_grpc import SettlementStub from .grpcClient import GrpcClientFactory -from .grpc_utils import build_grpc_uri +from .grpc_utils import get_grpc_uri, get_grpc_max_message_length, parse_grpc_args + + +class SettlementError(Exception): + """Custom exception for ServiceBusMessageActions errors.""" + def __init__(self, method: str, details: str, original: Exception): + super().__init__(f"[{method}] {details}. Underlying error: {original}") + self.method = method + self.details = details + self.original = original class ServiceBusMessageActions(GrpcClientType): @@ -30,21 +40,25 @@ class ServiceBusMessageActions(GrpcClientType): """ _instance: Optional["ServiceBusMessageActions"] = None + _lock = threading.Lock() # class-level lock def __init__(self) -> None: - self._uri, self._grpc_max_message_length = build_grpc_uri() + args = parse_grpc_args() + self._uri = get_grpc_uri(args) + self._grpc_max_message_length = get_grpc_max_message_length(args) self._client: SettlementStub = GrpcClientFactory.create_client( service_stub=SettlementStub, address=self._uri, grpc_max_message_length=self._grpc_max_message_length, - secure=False, + secure=True, ) @classmethod def get_instance(cls) -> "ServiceBusMessageActions": - if cls._instance is None: - cls._instance = ServiceBusMessageActions() + with cls._lock: + if cls._instance is None: + cls._instance = ServiceBusMessageActions() return cls._instance def _validate_lock_token(self, message) -> str: @@ -57,85 +71,101 @@ def _validate_lock_token(self, message) -> str: # Settlement methods # ------------------------------- - def complete(self, - message - ) -> None: - locktoken = self._validate_lock_token(message) - request = CompleteRequest() - request.locktoken = str(locktoken) - self._client.Complete(request) - - def abandon(self, - message, - properties_to_modify: bytes = b"" - ) -> None: - locktoken = self._validate_lock_token(message) - request = AbandonRequest() - request.locktoken = str(locktoken) - request.propertiesToModify = properties_to_modify - self._client.Abandon(request) - - def deadletter(self, - message, - properties_to_modify: bytes = b"", - deadletter_reason: Optional[str] = None, - deadletter_error_description: Optional[str] = None, - ) -> None: - locktoken = self._validate_lock_token(message) - request = DeadletterRequest() - request.locktoken = str(locktoken) - request.propertiesToModify = properties_to_modify - - if deadletter_reason: - request.deadletterReason.CopyFrom(StringValue(value=deadletter_reason)) - - if deadletter_error_description: - request.deadletterErrorDescription.CopyFrom( - StringValue(value=deadletter_error_description)) - self._client.Deadletter(request) - - def defer(self, - message, - properties_to_modify: bytes = b"" - ) -> None: - locktoken = self._validate_lock_token(message) - request = DeferRequest() - request.locktoken = str(locktoken) - request.propertiesToModify = properties_to_modify - self._client.Defer(request) - - def renew_message_lock(self, - message - ) -> None: - locktoken = self._validate_lock_token(message) - request = RenewMessageLockRequest() - request.locktoken = str(locktoken) - self._client.RenewMessageLock(request) - - def set_session_state(self, - session_id: str, - session_state: bytes - ) -> None: - request = SetSessionStateRequest() - request.sessionId = session_id - request.sessionState = session_state - self._client.SetSessionState(request) - - def release_session(self, - session_id: str - ) -> None: - request = ReleaseSessionRequest() - request.sessionId = session_id - self._client.ReleaseSession(request) - - def renew_session_lock(self, - session_id: str): - request = RenewSessionLockRequest() - request.sessionId = session_id - response = self._client.RenewSessionLock(request) + def complete(self, message) -> None: + try: + locktoken = self._validate_lock_token(message) + request = CompleteRequest() + request.locktoken = str(locktoken) + self._client.Complete(request) + except Exception as e: + raise SettlementError("complete", + f"Failed to complete message {locktoken}", e) + + def abandon(self, message, properties_to_modify: bytes = b"") -> None: + try: + locktoken = self._validate_lock_token(message) + request = AbandonRequest() + request.locktoken = str(locktoken) + request.propertiesToModify = properties_to_modify + self._client.Abandon(request) + except Exception as e: + raise SettlementError("abandon", + f"Failed to abandon message {locktoken}", e) + + def deadletter( + self, + message, + properties_to_modify: bytes = b"", + deadletter_reason: Optional[str] = None, + deadletter_error_description: Optional[str] = None) -> None: + try: + locktoken = self._validate_lock_token(message) + request = DeadletterRequest() + request.locktoken = str(locktoken) + request.propertiesToModify = properties_to_modify + + if deadletter_reason: + request.deadletterReason.CopyFrom(StringValue(value=deadletter_reason)) + if deadletter_error_description: + request.deadletterErrorDescription.CopyFrom( + StringValue(value=deadletter_error_description)) + + self._client.Deadletter(request) + except Exception as e: + raise SettlementError("deadletter", + f"Failed to deadletter message {locktoken}", e) + + def defer(self, message, properties_to_modify: bytes = b"") -> None: + try: + locktoken = self._validate_lock_token(message) + request = DeferRequest() + request.locktoken = str(locktoken) + request.propertiesToModify = properties_to_modify + + self._client.Defer(request) + except Exception as e: + raise SettlementError("defer", f"Failed to defer message {locktoken}", e) + + def renew_message_lock(self, message) -> None: + try: + locktoken = self._validate_lock_token(message) + request = RenewMessageLockRequest() + request.locktoken = str(locktoken) + self._client.RenewMessageLock(request) + except Exception as e: + raise SettlementError("renew_message_lock", + f"Failed to renew lock for {locktoken}", e) + + def set_session_state(self, session_id: str, session_state: bytes) -> None: + try: + request = SetSessionStateRequest() + request.sessionId = session_id + request.sessionState = session_state + self._client.SetSessionState(request) + except Exception as e: + raise SettlementError("set_session_state", + f"Failed to set state for session {session_id}", e) + + def release_session(self, session_id: str) -> None: + try: + request = ReleaseSessionRequest() + request.sessionId = session_id + self._client.ReleaseSession(request) + except Exception as e: + raise SettlementError("release_session", + f"Failed to release session {session_id}", e) + + def renew_session_lock(self, session_id: str): + try: + request = RenewSessionLockRequest() + request.sessionId = session_id + response = self._client.RenewSessionLock(request) + except Exception as e: + raise SettlementError("renew_session_lock", f"Failed to renew " + f"lock for session {session_id}", e) if not response or not response.lockedUntil: - raise RuntimeError("No response or lockedUntil " - "returned from renewSessionLock") + raise RuntimeError("No response or lockedUntil returned " + "from renewSessionLock") return response.lockedUntil diff --git a/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/function_app.py b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/function_app.py index b1baf85..752f988 100644 --- a/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/function_app.py +++ b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/function_app.py @@ -16,18 +16,14 @@ """ FOLDER: servicebus_samples DESCRIPTION: - These samples demonstrate how to obtain a ServiceBusReceivedMessage - from a ServiceBus Trigger. + These samples demonstrate how to complete a message using the + optional ServiceBusMessageActions argument. USAGE: Set the environment variables with your own values before running the sample: For running the ServiceBus queue trigger function: 1) QUEUE_NAME - the name of the ServiceBus queue 2) SERVICEBUS_CONNECTION - the connection string for the ServiceBus entity - For running the ServiceBus topic trigger function: - 1) TOPIC_NAME - the name of the ServiceBus topic - 2) SERVICEBUS_CONNECTION - the connection string for the ServiceBus entity - 3) SUBSCRIPTION_NAME - the name of the Subscription """ @@ -35,7 +31,7 @@ queue_name="QUEUE_NAME", connection="SERVICEBUS_CONNECTION", auto_complete_messages=False) -def servicebus_queue_trigger(receivedmessage: servicebus.ServiceBusReceivedMessage, message: servicebus.ServiceBusMessageActions): - logging.info(f"Python ServiceBus queue trigger processed message. Message: {receivedmessage}") - message.complete(receivedmessage) +def servicebus_queue_trigger(received_message: servicebus.ServiceBusReceivedMessage, message_actions: servicebus.ServiceBusMessageActions): + logging.info(f"Python ServiceBus queue trigger processed message. Message: {received_message}") + message_actions.complete(received_message) logging.info("Completed message.") diff --git a/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/local.settings.json b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/local.settings.json index 967f18b..8efd1ba 100644 --- a/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/local.settings.json +++ b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/local.settings.json @@ -4,8 +4,6 @@ "FUNCTIONS_WORKER_RUNTIME": "python", "AzureWebJobsStorage": "UseDevelopmentStorage=true", "QUEUE_NAME": "", - "SERVICEBUS_CONNECTION": "", - "TOPIC_NAME": "", - "SUBSCRIPTION_NAME": "" + "SERVICEBUS_CONNECTION": "" } } \ No newline at end of file diff --git a/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py b/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py index b7e79a3..0937655 100644 --- a/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py +++ b/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py @@ -4,19 +4,22 @@ import unittest from unittest.mock import patch, MagicMock -from azurefunctions.extensions.bindings.servicebus.grpcClient import GrpcClientFactory -from azurefunctions.extensions.bindings.servicebus.grpc_utils import (build_grpc_uri, - ArgumentError) -import pytest +from azurefunctions.extensions.bindings.servicebus.grpcClient import (GrpcClientFactory, + GrpcChannelError) +from azurefunctions.extensions.bindings.servicebus.grpc_utils import ( + get_grpc_uri, + get_grpc_max_message_length, + parse_grpc_args, + ArgumentError) + + +class DummyStub: + def __init__(self, channel): + self._channel = channel class TestGrpcClient(unittest.TestCase): def test_create_client_insecure_channel(self): - # Dummy stub class to verify it receives a channel - class DummyStub: - def __init__(self, channel): - self._channel = channel - with patch("azurefunctions.extensions.bindings.servicebus.grpcClient.grpc.insecure_channel") as mock_insecure: # noqa fake_channel = MagicMock() mock_insecure.return_value = fake_channel @@ -38,10 +41,6 @@ def __init__(self, channel): assert client._channel == fake_channel def test_create_client_secure_channel_with_root_certs(self): - class DummyStub: - def __init__(self, channel): - self._channel = channel - with (patch("azurefunctions.extensions.bindings.servicebus.grpcClient.grpc.secure_channel") as mock_secure, # noqa patch("azurefunctions.extensions.bindings.servicebus.grpcClient.grpc.ssl_channel_credentials") as mock_creds): # noqa fake_channel = MagicMock() @@ -68,49 +67,87 @@ def __init__(self, channel): assert isinstance(client, DummyStub) assert client._channel == fake_channel + @patch("azurefunctions.extensions.bindings.servicebus.grpcClient." + "grpc.insecure_channel") + def test_create_client_raises_on_insecure_channel_failure(self, + mock_insecure_channel): + # Arrange: force grpc.insecure_channel to throw + mock_insecure_channel.side_effect = RuntimeError("connection failed") + + # Act + Assert + with self.assertRaises(GrpcChannelError) as ctx: + GrpcClientFactory.create_client(DummyStub, "localhost:1234", secure=False) + + # Ensure exception contains useful context + self.assertIn("Failed to create gRPC channel", str(ctx.exception)) + self.assertIn("localhost:1234", str(ctx.exception)) + + @patch("azurefunctions.extensions.bindings.servicebus.grpcClient." + "grpc.secure_channel") + @patch("azurefunctions.extensions.bindings.servicebus.grpcClient." + "grpc.ssl_channel_credentials") + def test_create_client_raises_on_secure_channel_failure(self, + mock_ssl_creds, + mock_secure_channel): + # Arrange: make secure_channel raise + mock_secure_channel.side_effect = ValueError("SSL handshake failed") + + # Act + Assert + with self.assertRaises(GrpcChannelError) as ctx: + GrpcClientFactory.create_client(DummyStub, + "localhost:5678", + secure=True, + root_certificates=b"dummy") + + self.assertIn("Failed to create gRPC channel", str(ctx.exception)) + self.assertIn("localhost:5678", str(ctx.exception)) + class TestGrpcUtils(unittest.TestCase): - def test_build_grpc_uri_valid_args(self): + def test_get_grpc_uri_and_max_message_length_valid_args(self): argv = [ "--host", "localhost", "--port", "50051", "--functions-grpc-max-message-length", "4096" ] - uri, max_len = build_grpc_uri(argv) + args = parse_grpc_args(argv) + uri = get_grpc_uri(args) + max_len = get_grpc_max_message_length(args) assert uri == "localhost:50051" assert max_len == 4096 - def test_build_grpc_uri_missing_host(self): + def test_get_grpc_uri_missing_host(self): argv = [ "--port", "50051", "--functions-grpc-max-message-length", "4096" ] - with pytest.raises(ArgumentError) as excinfo: - build_grpc_uri(argv) - assert "host" in str(excinfo.value) + with self.assertRaises(ArgumentError) as excinfo: + parse_grpc_args(argv) + self.assertIn("host", str(excinfo.exception)) - def test_build_grpc_uri_missing_port(self): + def test_get_grpc_uri_missing_port(self): argv = [ "--host", "localhost", "--functions-grpc-max-message-length", "4096" ] - with pytest.raises(ArgumentError) as excinfo: - build_grpc_uri(argv) - assert "port" in str(excinfo.value) + with self.assertRaises(ArgumentError) as excinfo: + parse_grpc_args(argv) + self.assertIn("port", str(excinfo.exception)) - def test_build_grpc_uri_missing_message_length(self): + def test_get_grpc_max_message_length_missing(self): argv = [ "--host", "localhost", "--port", "50051", ] - with pytest.raises(ArgumentError) as excinfo: - build_grpc_uri(argv) - assert "functions-grpc-max-message-length" in str(excinfo.value) + with self.assertRaises(ArgumentError) as excinfo: + parse_grpc_args(argv) + self.assertIn("functions-grpc-max-message-length", str(excinfo.exception)) - def test_build_grpc_uri_multiple_missing(self): + def test_get_grpc_uri_and_max_message_length_multiple_missing(self): argv = [] - with pytest.raises(ArgumentError) as excinfo: - build_grpc_uri(argv) - msg = str(excinfo.value) - assert ("host" in msg and "port" in msg - and "functions-grpc-max-message-length" in msg) + with self.assertRaises(ArgumentError) as excinfo: + parse_grpc_args(argv) + msg = str(excinfo.exception) + self.assertIn("host", msg) + self.assertIn("port", msg) + self.assertIn("functions-grpc-max-message-length", msg) diff --git a/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py b/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py index 1caf5d2..c9c44ec 100644 --- a/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py +++ b/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py @@ -1,6 +1,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +import grpc import unittest from unittest.mock import patch, MagicMock @@ -8,6 +9,7 @@ from google.protobuf.timestamp_pb2 import Timestamp from azurefunctions.extensions.bindings.servicebus import ServiceBusMessageActions +from azurefunctions.extensions.bindings.servicebus.serviceBusMessageActions import SettlementError # noqa from azurefunctions.extensions.bindings.protos import settlement_pb2 as pb2 @@ -25,13 +27,23 @@ def setUp(self): self.addCleanup(patcher.stop) self.mock_create_client = patcher.start() - # Patch build_grpc_uri so we don't need CLI args + patcher_args = patch( + "azurefunctions.extensions.bindings.servicebus.serviceBusMessageActions.parse_grpc_args") # noqa + self.addCleanup(patcher_args.stop) + patcher_args.start() + + # Patch get_grpc_uri so we don't need CLI args patcher_uri = patch( - "azurefunctions.extensions.bindings.servicebus.serviceBusMessageActions.build_grpc_uri", # noqa + "azurefunctions.extensions.bindings.servicebus.serviceBusMessageActions.get_grpc_uri", # noqa return_value=("localhost:50051", 4 * 1024 * 1024)) self.addCleanup(patcher_uri.stop) patcher_uri.start() + patcher_message_length = patch( + "azurefunctions.extensions.bindings.servicebus.serviceBusMessageActions.get_grpc_max_message_length") # noqa + self.addCleanup(patcher_message_length.stop) + patcher_message_length.start() + # The fake gRPC client returned by create_client self.mock_client = MagicMock() self.mock_client.Complete = MagicMock() @@ -150,3 +162,80 @@ def test_validate_lock_token_raises(self): msg = DummyMessage(None) with self.assertRaises(ValueError): self.actions._validate_lock_token(msg) + + def test_complete_raises_SettlementError(self): + msg = DummyMessage("lt1") + self.mock_client.Complete.side_effect = grpc.RpcError("boom") + + with self.assertRaises(SettlementError) as cm: + self.actions.complete(msg) + + self.assertIn("complete", str(cm.exception)) + self.assertIn("lt1", str(cm.exception)) + + def test_abandon_raises_SettlementError(self): + msg = DummyMessage("lt2") + self.mock_client.Abandon.side_effect = grpc.RpcError("fail") + + with self.assertRaises(SettlementError) as cm: + self.actions.abandon(msg) + + self.assertIn("abandon", str(cm.exception)) + self.assertIn("lt2", str(cm.exception)) + + def test_deadletter_raises_SettlementError(self): + msg = DummyMessage("lt3") + self.mock_client.Deadletter.side_effect = grpc.RpcError("oops") + + with self.assertRaises(SettlementError) as cm: + self.actions.deadletter(msg, deadletter_reason="reason") + + self.assertIn("deadletter", str(cm.exception)) + self.assertIn("lt3", str(cm.exception)) + + def test_defer_raises_SettlementError(self): + msg = DummyMessage("lt4") + self.mock_client.Defer.side_effect = grpc.RpcError("bad") + + with self.assertRaises(SettlementError) as cm: + self.actions.defer(msg) + + self.assertIn("defer", str(cm.exception)) + self.assertIn("lt4", str(cm.exception)) + + def test_renew_message_lock_raises_SettlementError(self): + msg = DummyMessage("lt5") + self.mock_client.RenewMessageLock.side_effect = grpc.RpcError("err") + + with self.assertRaises(SettlementError) as cm: + self.actions.renew_message_lock(msg) + + self.assertIn("renew_message_lock", str(cm.exception)) + self.assertIn("lt5", str(cm.exception)) + + def test_set_session_state_raises_SettlementError(self): + self.mock_client.SetSessionState.side_effect = grpc.RpcError("nope") + + with self.assertRaises(SettlementError) as cm: + self.actions.set_session_state("sid1", b"state") + + self.assertIn("set_session_state", str(cm.exception)) + self.assertIn("sid1", str(cm.exception)) + + def test_release_session_raises_SettlementError(self): + self.mock_client.ReleaseSession.side_effect = grpc.RpcError("denied") + + with self.assertRaises(SettlementError) as cm: + self.actions.release_session("sid2") + + self.assertIn("release_session", str(cm.exception)) + self.assertIn("sid2", str(cm.exception)) + + def test_renew_session_lock_raises_SettlementError(self): + self.mock_client.RenewSessionLock.side_effect = grpc.RpcError("boom") + + with self.assertRaises(SettlementError) as cm: + self.actions.renew_session_lock("sid3") + + self.assertIn("renew_session_lock", str(cm.exception)) + self.assertIn("sid3", str(cm.exception)) From c6692cea98dce6ec3dba71f9e28bc46e3ed40dfd Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Mon, 15 Sep 2025 14:37:54 -0500 Subject: [PATCH 08/14] missed arg name --- .../samples/servicebus_samples_settlement/function_app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/function_app.py b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/function_app.py index 752f988..9d97870 100644 --- a/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/function_app.py +++ b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/function_app.py @@ -27,7 +27,7 @@ """ -@app.service_bus_queue_trigger(arg_name="receivedmessage", +@app.service_bus_queue_trigger(arg_name="received_message", queue_name="QUEUE_NAME", connection="SERVICEBUS_CONNECTION", auto_complete_messages=False) From 04f42936c40a0b5d0bde02d09566d8decee876b7 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Tue, 16 Sep 2025 10:17:41 -0500 Subject: [PATCH 09/14] always secure --- .../bindings/servicebus/grpcClient.py | 11 ++---- .../servicebus/serviceBusMessageActions.py | 2 +- .../tests/test_grpc_client.py | 38 ------------------- 3 files changed, 4 insertions(+), 47 deletions(-) diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py index d9fa64f..73bcc65 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py @@ -33,7 +33,6 @@ def create_client( service_stub: Type[Any], address: str, grpc_max_message_length: int = 4 * 1024 * 1024, - secure: bool = False, root_certificates: Optional[bytes] = None, ) -> Any: """ @@ -43,7 +42,6 @@ def create_client( service_stub: The generated service stub class (e.g. `MyServiceStub`). address: The server address (e.g., "localhost:50051"). grpc_max_message_length: Max message size for send/receive. - secure: If True, use a secure channel; otherwise, insecure. root_certificates: Optional root certificates for TLS. Returns: @@ -56,12 +54,9 @@ def create_client( ] try: - if secure: - credentials = grpc.ssl_channel_credentials( - root_certificates=root_certificates) - channel = grpc.secure_channel(address, credentials, options=options) - else: - channel = grpc.insecure_channel(address, options=options) + credentials = grpc.ssl_channel_credentials( + root_certificates=root_certificates) + channel = grpc.secure_channel(address, credentials, options=options) except Exception as e: raise GrpcChannelError(f"Failed to create gRPC channel. URL: {address}," f" Options: {options}, Error: {e}") diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py index b75928b..db5a3cc 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py @@ -1,5 +1,6 @@ # Copyright (c) .NET Foundation. All rights reserved. # Licensed under the MIT License. + import threading from typing import Optional @@ -51,7 +52,6 @@ def __init__(self) -> None: service_stub=SettlementStub, address=self._uri, grpc_max_message_length=self._grpc_max_message_length, - secure=True, ) @classmethod diff --git a/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py b/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py index 0937655..cf0fac3 100644 --- a/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py +++ b/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py @@ -19,27 +19,6 @@ def __init__(self, channel): class TestGrpcClient(unittest.TestCase): - def test_create_client_insecure_channel(self): - with patch("azurefunctions.extensions.bindings.servicebus.grpcClient.grpc.insecure_channel") as mock_insecure: # noqa - fake_channel = MagicMock() - mock_insecure.return_value = fake_channel - - client = GrpcClientFactory.create_client( - service_stub=DummyStub, - address="localhost:1234", - grpc_max_message_length=1024, - secure=False, - ) - - mock_insecure.assert_called_once() - args, kwargs = mock_insecure.call_args - assert args[0] == "localhost:1234" - assert ("grpc.max_send_message_length", 1024) in kwargs["options"] - assert ("grpc.max_receive_message_length", 1024) in kwargs["options"] - - assert isinstance(client, DummyStub) - assert client._channel == fake_channel - def test_create_client_secure_channel_with_root_certs(self): with (patch("azurefunctions.extensions.bindings.servicebus.grpcClient.grpc.secure_channel") as mock_secure, # noqa patch("azurefunctions.extensions.bindings.servicebus.grpcClient.grpc.ssl_channel_credentials") as mock_creds): # noqa @@ -52,7 +31,6 @@ def test_create_client_secure_channel_with_root_certs(self): service_stub=DummyStub, address="securehost:9999", grpc_max_message_length=2048, - secure=True, root_certificates=b"fakecerts", ) @@ -67,21 +45,6 @@ def test_create_client_secure_channel_with_root_certs(self): assert isinstance(client, DummyStub) assert client._channel == fake_channel - @patch("azurefunctions.extensions.bindings.servicebus.grpcClient." - "grpc.insecure_channel") - def test_create_client_raises_on_insecure_channel_failure(self, - mock_insecure_channel): - # Arrange: force grpc.insecure_channel to throw - mock_insecure_channel.side_effect = RuntimeError("connection failed") - - # Act + Assert - with self.assertRaises(GrpcChannelError) as ctx: - GrpcClientFactory.create_client(DummyStub, "localhost:1234", secure=False) - - # Ensure exception contains useful context - self.assertIn("Failed to create gRPC channel", str(ctx.exception)) - self.assertIn("localhost:1234", str(ctx.exception)) - @patch("azurefunctions.extensions.bindings.servicebus.grpcClient." "grpc.secure_channel") @patch("azurefunctions.extensions.bindings.servicebus.grpcClient." @@ -96,7 +59,6 @@ def test_create_client_raises_on_secure_channel_failure(self, with self.assertRaises(GrpcChannelError) as ctx: GrpcClientFactory.create_client(DummyStub, "localhost:5678", - secure=True, root_certificates=b"dummy") self.assertIn("Failed to create gRPC channel", str(ctx.exception)) From b149335cb77536f3fa7a58dc6e1d7719dd218bb9 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Tue, 16 Sep 2025 10:47:46 -0500 Subject: [PATCH 10/14] Revert "always secure" This reverts commit 04f42936c40a0b5d0bde02d09566d8decee876b7. --- .../bindings/servicebus/grpcClient.py | 11 ++++-- .../servicebus/serviceBusMessageActions.py | 2 +- .../tests/test_grpc_client.py | 38 +++++++++++++++++++ 3 files changed, 47 insertions(+), 4 deletions(-) diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py index 73bcc65..d9fa64f 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py @@ -33,6 +33,7 @@ def create_client( service_stub: Type[Any], address: str, grpc_max_message_length: int = 4 * 1024 * 1024, + secure: bool = False, root_certificates: Optional[bytes] = None, ) -> Any: """ @@ -42,6 +43,7 @@ def create_client( service_stub: The generated service stub class (e.g. `MyServiceStub`). address: The server address (e.g., "localhost:50051"). grpc_max_message_length: Max message size for send/receive. + secure: If True, use a secure channel; otherwise, insecure. root_certificates: Optional root certificates for TLS. Returns: @@ -54,9 +56,12 @@ def create_client( ] try: - credentials = grpc.ssl_channel_credentials( - root_certificates=root_certificates) - channel = grpc.secure_channel(address, credentials, options=options) + if secure: + credentials = grpc.ssl_channel_credentials( + root_certificates=root_certificates) + channel = grpc.secure_channel(address, credentials, options=options) + else: + channel = grpc.insecure_channel(address, options=options) except Exception as e: raise GrpcChannelError(f"Failed to create gRPC channel. URL: {address}," f" Options: {options}, Error: {e}") diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py index db5a3cc..b75928b 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py @@ -1,6 +1,5 @@ # Copyright (c) .NET Foundation. All rights reserved. # Licensed under the MIT License. - import threading from typing import Optional @@ -52,6 +51,7 @@ def __init__(self) -> None: service_stub=SettlementStub, address=self._uri, grpc_max_message_length=self._grpc_max_message_length, + secure=True, ) @classmethod diff --git a/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py b/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py index cf0fac3..0937655 100644 --- a/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py +++ b/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py @@ -19,6 +19,27 @@ def __init__(self, channel): class TestGrpcClient(unittest.TestCase): + def test_create_client_insecure_channel(self): + with patch("azurefunctions.extensions.bindings.servicebus.grpcClient.grpc.insecure_channel") as mock_insecure: # noqa + fake_channel = MagicMock() + mock_insecure.return_value = fake_channel + + client = GrpcClientFactory.create_client( + service_stub=DummyStub, + address="localhost:1234", + grpc_max_message_length=1024, + secure=False, + ) + + mock_insecure.assert_called_once() + args, kwargs = mock_insecure.call_args + assert args[0] == "localhost:1234" + assert ("grpc.max_send_message_length", 1024) in kwargs["options"] + assert ("grpc.max_receive_message_length", 1024) in kwargs["options"] + + assert isinstance(client, DummyStub) + assert client._channel == fake_channel + def test_create_client_secure_channel_with_root_certs(self): with (patch("azurefunctions.extensions.bindings.servicebus.grpcClient.grpc.secure_channel") as mock_secure, # noqa patch("azurefunctions.extensions.bindings.servicebus.grpcClient.grpc.ssl_channel_credentials") as mock_creds): # noqa @@ -31,6 +52,7 @@ def test_create_client_secure_channel_with_root_certs(self): service_stub=DummyStub, address="securehost:9999", grpc_max_message_length=2048, + secure=True, root_certificates=b"fakecerts", ) @@ -45,6 +67,21 @@ def test_create_client_secure_channel_with_root_certs(self): assert isinstance(client, DummyStub) assert client._channel == fake_channel + @patch("azurefunctions.extensions.bindings.servicebus.grpcClient." + "grpc.insecure_channel") + def test_create_client_raises_on_insecure_channel_failure(self, + mock_insecure_channel): + # Arrange: force grpc.insecure_channel to throw + mock_insecure_channel.side_effect = RuntimeError("connection failed") + + # Act + Assert + with self.assertRaises(GrpcChannelError) as ctx: + GrpcClientFactory.create_client(DummyStub, "localhost:1234", secure=False) + + # Ensure exception contains useful context + self.assertIn("Failed to create gRPC channel", str(ctx.exception)) + self.assertIn("localhost:1234", str(ctx.exception)) + @patch("azurefunctions.extensions.bindings.servicebus.grpcClient." "grpc.secure_channel") @patch("azurefunctions.extensions.bindings.servicebus.grpcClient." @@ -59,6 +96,7 @@ def test_create_client_raises_on_secure_channel_failure(self, with self.assertRaises(GrpcChannelError) as ctx: GrpcClientFactory.create_client(DummyStub, "localhost:5678", + secure=True, root_certificates=b"dummy") self.assertIn("Failed to create gRPC channel", str(ctx.exception)) From 1bf9e33fae80268eee3213525aba8f50069c6c0a Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Tue, 16 Sep 2025 10:49:53 -0500 Subject: [PATCH 11/14] secure fix --- .../bindings/servicebus/grpcClient.py | 10 +--- .../servicebus/serviceBusMessageActions.py | 1 - .../tests/test_grpc_client.py | 48 ------------------- 3 files changed, 1 insertion(+), 58 deletions(-) diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py index d9fa64f..1899fa3 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py @@ -24,7 +24,6 @@ class GrpcClientFactory: service_stub=MyServiceStub, address="localhost:50051", grpc_max_message_length=4 * 1024 * 1024, # 4 MB - secure=False, ) """ @@ -33,7 +32,6 @@ def create_client( service_stub: Type[Any], address: str, grpc_max_message_length: int = 4 * 1024 * 1024, - secure: bool = False, root_certificates: Optional[bytes] = None, ) -> Any: """ @@ -43,7 +41,6 @@ def create_client( service_stub: The generated service stub class (e.g. `MyServiceStub`). address: The server address (e.g., "localhost:50051"). grpc_max_message_length: Max message size for send/receive. - secure: If True, use a secure channel; otherwise, insecure. root_certificates: Optional root certificates for TLS. Returns: @@ -56,12 +53,7 @@ def create_client( ] try: - if secure: - credentials = grpc.ssl_channel_credentials( - root_certificates=root_certificates) - channel = grpc.secure_channel(address, credentials, options=options) - else: - channel = grpc.insecure_channel(address, options=options) + channel = grpc.insecure_channel(address, options=options) except Exception as e: raise GrpcChannelError(f"Failed to create gRPC channel. URL: {address}," f" Options: {options}, Error: {e}") diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py index b75928b..0cd2156 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py @@ -51,7 +51,6 @@ def __init__(self) -> None: service_stub=SettlementStub, address=self._uri, grpc_max_message_length=self._grpc_max_message_length, - secure=True, ) @classmethod diff --git a/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py b/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py index 0937655..4b94b1e 100644 --- a/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py +++ b/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py @@ -28,7 +28,6 @@ def test_create_client_insecure_channel(self): service_stub=DummyStub, address="localhost:1234", grpc_max_message_length=1024, - secure=False, ) mock_insecure.assert_called_once() @@ -40,33 +39,6 @@ def test_create_client_insecure_channel(self): assert isinstance(client, DummyStub) assert client._channel == fake_channel - def test_create_client_secure_channel_with_root_certs(self): - with (patch("azurefunctions.extensions.bindings.servicebus.grpcClient.grpc.secure_channel") as mock_secure, # noqa - patch("azurefunctions.extensions.bindings.servicebus.grpcClient.grpc.ssl_channel_credentials") as mock_creds): # noqa - fake_channel = MagicMock() - fake_creds = MagicMock() - mock_secure.return_value = fake_channel - mock_creds.return_value = fake_creds - - client = GrpcClientFactory.create_client( - service_stub=DummyStub, - address="securehost:9999", - grpc_max_message_length=2048, - secure=True, - root_certificates=b"fakecerts", - ) - - mock_creds.assert_called_once_with(root_certificates=b"fakecerts") - mock_secure.assert_called_once() - args, kwargs = mock_secure.call_args - assert args[0] == "securehost:9999" - assert args[1] == fake_creds - assert ("grpc.max_send_message_length", 2048) in kwargs["options"] - assert ("grpc.max_receive_message_length", 2048) in kwargs["options"] - - assert isinstance(client, DummyStub) - assert client._channel == fake_channel - @patch("azurefunctions.extensions.bindings.servicebus.grpcClient." "grpc.insecure_channel") def test_create_client_raises_on_insecure_channel_failure(self, @@ -82,26 +54,6 @@ def test_create_client_raises_on_insecure_channel_failure(self, self.assertIn("Failed to create gRPC channel", str(ctx.exception)) self.assertIn("localhost:1234", str(ctx.exception)) - @patch("azurefunctions.extensions.bindings.servicebus.grpcClient." - "grpc.secure_channel") - @patch("azurefunctions.extensions.bindings.servicebus.grpcClient." - "grpc.ssl_channel_credentials") - def test_create_client_raises_on_secure_channel_failure(self, - mock_ssl_creds, - mock_secure_channel): - # Arrange: make secure_channel raise - mock_secure_channel.side_effect = ValueError("SSL handshake failed") - - # Act + Assert - with self.assertRaises(GrpcChannelError) as ctx: - GrpcClientFactory.create_client(DummyStub, - "localhost:5678", - secure=True, - root_certificates=b"dummy") - - self.assertIn("Failed to create gRPC channel", str(ctx.exception)) - self.assertIn("localhost:5678", str(ctx.exception)) - class TestGrpcUtils(unittest.TestCase): def test_get_grpc_uri_and_max_message_length_valid_args(self): From 63f563bc76ac6f3781425556444f7ab67c2941c6 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Tue, 16 Sep 2025 11:14:59 -0500 Subject: [PATCH 12/14] test fix --- .../tests/test_grpc_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py b/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py index 4b94b1e..c8bb7ef 100644 --- a/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py +++ b/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py @@ -48,7 +48,7 @@ def test_create_client_raises_on_insecure_channel_failure(self, # Act + Assert with self.assertRaises(GrpcChannelError) as ctx: - GrpcClientFactory.create_client(DummyStub, "localhost:1234", secure=False) + GrpcClientFactory.create_client(DummyStub, "localhost:1234") # Ensure exception contains useful context self.assertIn("Failed to create gRPC channel", str(ctx.exception)) From e0107fbdb6572448b3f36ce75e3808f2dc17f6ff Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Wed, 17 Sep 2025 10:01:31 -0500 Subject: [PATCH 13/14] remove properties_to_modify --- .../bindings/servicebus/serviceBusMessageActions.py | 11 +++++------ .../tests/test_servicebus_message_actions.py | 5 ++--- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py index 0cd2156..ecc1a14 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py @@ -80,12 +80,12 @@ def complete(self, message) -> None: raise SettlementError("complete", f"Failed to complete message {locktoken}", e) - def abandon(self, message, properties_to_modify: bytes = b"") -> None: + def abandon(self, message) -> None: try: locktoken = self._validate_lock_token(message) request = AbandonRequest() request.locktoken = str(locktoken) - request.propertiesToModify = properties_to_modify + request.propertiesToModify = b"" self._client.Abandon(request) except Exception as e: raise SettlementError("abandon", @@ -94,14 +94,13 @@ def abandon(self, message, properties_to_modify: bytes = b"") -> None: def deadletter( self, message, - properties_to_modify: bytes = b"", deadletter_reason: Optional[str] = None, deadletter_error_description: Optional[str] = None) -> None: try: locktoken = self._validate_lock_token(message) request = DeadletterRequest() request.locktoken = str(locktoken) - request.propertiesToModify = properties_to_modify + request.propertiesToModify = b"" if deadletter_reason: request.deadletterReason.CopyFrom(StringValue(value=deadletter_reason)) @@ -114,12 +113,12 @@ def deadletter( raise SettlementError("deadletter", f"Failed to deadletter message {locktoken}", e) - def defer(self, message, properties_to_modify: bytes = b"") -> None: + def defer(self, message) -> None: try: locktoken = self._validate_lock_token(message) request = DeferRequest() request.locktoken = str(locktoken) - request.propertiesToModify = properties_to_modify + request.propertiesToModify = b"" self._client.Defer(request) except Exception as e: diff --git a/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py b/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py index c9c44ec..a6cd281 100644 --- a/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py +++ b/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py @@ -74,7 +74,7 @@ def test_complete_calls_grpc(self): def test_abandon_calls_grpc(self): msg = DummyMessage("lock123") - self.actions.abandon(msg, properties_to_modify=b"foo") + self.actions.abandon(msg) self.mock_client.Abandon.assert_called_once() called_req = self.mock_client.Abandon.call_args[0][0] @@ -86,7 +86,6 @@ def test_deadletter_with_reasons(self): msg = DummyMessage("lock123") self.actions.deadletter( msg, - properties_to_modify=b"p", deadletter_reason="reason", deadletter_error_description="desc" ) @@ -101,7 +100,7 @@ def test_deadletter_with_reasons(self): def test_defer_calls_grpc(self): msg = DummyMessage("lock123") - self.actions.defer(msg, properties_to_modify=b"defer") + self.actions.defer(msg) self.mock_client.Defer.assert_called_once() called_req = self.mock_client.Defer.call_args[0][0] From 2c115ac66e3944648acb1359bf53da6513869b5e Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Wed, 17 Sep 2025 10:23:17 -0500 Subject: [PATCH 14/14] fix tests --- .../tests/test_servicebus_message_actions.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py b/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py index a6cd281..5bfb2ee 100644 --- a/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py +++ b/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py @@ -80,7 +80,7 @@ def test_abandon_calls_grpc(self): called_req = self.mock_client.Abandon.call_args[0][0] self.assertIsInstance(called_req, pb2.AbandonRequest) self.assertEqual(called_req.locktoken, "lock123") - self.assertEqual(called_req.propertiesToModify, b"foo") + self.assertEqual(called_req.propertiesToModify, b"") def test_deadletter_with_reasons(self): msg = DummyMessage("lock123") @@ -94,7 +94,7 @@ def test_deadletter_with_reasons(self): called_req = self.mock_client.Deadletter.call_args[0][0] self.assertIsInstance(called_req, pb2.DeadletterRequest) self.assertEqual(called_req.locktoken, "lock123") - self.assertEqual(called_req.propertiesToModify, b"p") + self.assertEqual(called_req.propertiesToModify, b"") self.assertEqual(called_req.deadletterReason.value, "reason") self.assertEqual(called_req.deadletterErrorDescription.value, "desc") @@ -106,7 +106,7 @@ def test_defer_calls_grpc(self): called_req = self.mock_client.Defer.call_args[0][0] self.assertIsInstance(called_req, pb2.DeferRequest) self.assertEqual(called_req.locktoken, "lock123") - self.assertEqual(called_req.propertiesToModify, b"defer") + self.assertEqual(called_req.propertiesToModify, b"") def test_renew_message_lock_calls_grpc(self): msg = DummyMessage("lock123")