diff --git a/.flake8 b/.flake8 index 4bb29cf..b8f9c95 100644 --- a/.flake8 +++ b/.flake8 @@ -6,6 +6,7 @@ ignore = W503,E402,E731 exclude = .git, __pycache__, build, dist, .eggs, .github, .local, docs/, - Samples, .env*, .vscode, venv*, *.venv* + Samples, .env*, .vscode, venv*, *.venv*, + azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos, max-line-length = 88 diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/__init__.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/__init__.py new file mode 100644 index 0000000..edd79eb --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. \ No newline at end of file diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement.proto b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement.proto new file mode 100644 index 0000000..34f9a03 --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement.proto @@ -0,0 +1,99 @@ +syntax = "proto3"; + +import "google/protobuf/empty.proto"; +import "google/protobuf/wrappers.proto"; +import "google/protobuf/timestamp.proto"; + +// this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic +option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc"; + +// The settlement service definition. +service Settlement { + // Completes a message + rpc Complete (CompleteRequest) returns (google.protobuf.Empty) {} + + // Abandons a message + rpc Abandon (AbandonRequest) returns (google.protobuf.Empty) {} + + // Deadletters a message + rpc Deadletter (DeadletterRequest) returns (google.protobuf.Empty) {} + + // Defers a message + rpc Defer (DeferRequest) returns (google.protobuf.Empty) {} + + // Renew message lock + rpc RenewMessageLock (RenewMessageLockRequest) returns (google.protobuf.Empty) {} + + // Get session state + rpc GetSessionState (GetSessionStateRequest) returns (GetSessionStateResponse) {} + + // Set session state + rpc SetSessionState (SetSessionStateRequest) returns (google.protobuf.Empty) {} + + // Release session + rpc ReleaseSession (ReleaseSessionRequest) returns (google.protobuf.Empty) {} + + // Renew session lock + rpc RenewSessionLock (RenewSessionLockRequest) returns (RenewSessionLockResponse) {} +} + +// The complete message request containing the locktoken. +message CompleteRequest { + string locktoken = 1; +} + +// The abandon message request containing the locktoken and properties to modify. +message AbandonRequest { + string locktoken = 1; + bytes propertiesToModify = 2; +} + +// The deadletter message request containing the locktoken and properties to modify along with the reason/description. +message DeadletterRequest { + string locktoken = 1; + bytes propertiesToModify = 2; + google.protobuf.StringValue deadletterReason = 3; + google.protobuf.StringValue deadletterErrorDescription = 4; +} + +// The defer message request containing the locktoken and properties to modify. +message DeferRequest { + string locktoken = 1; + bytes propertiesToModify = 2; +} + +// The renew message lock request containing the locktoken. +message RenewMessageLockRequest { + string locktoken = 1; +} + +// The get message request. +message GetSessionStateRequest { + string sessionId = 1; +} + +// The set message request. +message SetSessionStateRequest { + string sessionId = 1; + bytes sessionState = 2; +} + +// Get response containing the session state. +message GetSessionStateResponse { + bytes sessionState = 1; +} + +// Release session. +message ReleaseSessionRequest { + string sessionId = 1; +} + +// Renew session lock. +message RenewSessionLockRequest { + string sessionId = 1; +} + +// Renew session lock. +message RenewSessionLockResponse { + google.protobuf.Timestamp lockedUntil = 1; +} \ No newline at end of file diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement_pb2.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement_pb2.py new file mode 100644 index 0000000..94fd154 --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement_pb2.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: azurefunctions/extensions/bindings/protos/settlement.proto +# Protobuf Python Version: 6.31.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 6, + 31, + 1, + '', + 'azurefunctions/extensions/bindings/protos/settlement.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 +from google.protobuf import wrappers_pb2 as google_dot_protobuf_dot_wrappers__pb2 +from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n:azurefunctions/extensions/bindings/protos/settlement.proto\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1egoogle/protobuf/wrappers.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"$\n\x0f\x43ompleteRequest\x12\x11\n\tlocktoken\x18\x01 \x01(\t\"?\n\x0e\x41\x62\x61ndonRequest\x12\x11\n\tlocktoken\x18\x01 \x01(\t\x12\x1a\n\x12propertiesToModify\x18\x02 \x01(\x0c\"\xbc\x01\n\x11\x44\x65\x61\x64letterRequest\x12\x11\n\tlocktoken\x18\x01 \x01(\t\x12\x1a\n\x12propertiesToModify\x18\x02 \x01(\x0c\x12\x36\n\x10\x64\x65\x61\x64letterReason\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12@\n\x1a\x64\x65\x61\x64letterErrorDescription\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"=\n\x0c\x44\x65\x66\x65rRequest\x12\x11\n\tlocktoken\x18\x01 \x01(\t\x12\x1a\n\x12propertiesToModify\x18\x02 \x01(\x0c\",\n\x17RenewMessageLockRequest\x12\x11\n\tlocktoken\x18\x01 \x01(\t\"+\n\x16GetSessionStateRequest\x12\x11\n\tsessionId\x18\x01 \x01(\t\"A\n\x16SetSessionStateRequest\x12\x11\n\tsessionId\x18\x01 \x01(\t\x12\x14\n\x0csessionState\x18\x02 \x01(\x0c\"/\n\x17GetSessionStateResponse\x12\x14\n\x0csessionState\x18\x01 \x01(\x0c\"*\n\x15ReleaseSessionRequest\x12\x11\n\tsessionId\x18\x01 \x01(\t\",\n\x17RenewSessionLockRequest\x12\x11\n\tsessionId\x18\x01 \x01(\t\"K\n\x18RenewSessionLockResponse\x12/\n\x0blockedUntil\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp2\xcd\x04\n\nSettlement\x12\x36\n\x08\x43omplete\x12\x10.CompleteRequest\x1a\x16.google.protobuf.Empty\"\x00\x12\x34\n\x07\x41\x62\x61ndon\x12\x0f.AbandonRequest\x1a\x16.google.protobuf.Empty\"\x00\x12:\n\nDeadletter\x12\x12.DeadletterRequest\x1a\x16.google.protobuf.Empty\"\x00\x12\x30\n\x05\x44\x65\x66\x65r\x12\r.DeferRequest\x1a\x16.google.protobuf.Empty\"\x00\x12\x46\n\x10RenewMessageLock\x12\x18.RenewMessageLockRequest\x1a\x16.google.protobuf.Empty\"\x00\x12\x46\n\x0fGetSessionState\x12\x17.GetSessionStateRequest\x1a\x18.GetSessionStateResponse\"\x00\x12\x44\n\x0fSetSessionState\x12\x17.SetSessionStateRequest\x1a\x16.google.protobuf.Empty\"\x00\x12\x42\n\x0eReleaseSession\x12\x16.ReleaseSessionRequest\x1a\x16.google.protobuf.Empty\"\x00\x12I\n\x10RenewSessionLock\x12\x18.RenewSessionLockRequest\x1a\x19.RenewSessionLockResponse\"\x00\x42\"\xaa\x02\x1fMicrosoft.Azure.ServiceBus.Grpcb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'azurefunctions.extensions.bindings.protos.settlement_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + _globals['DESCRIPTOR']._loaded_options = None + _globals['DESCRIPTOR']._serialized_options = b'\252\002\037Microsoft.Azure.ServiceBus.Grpc' + _globals['_COMPLETEREQUEST']._serialized_start=156 + _globals['_COMPLETEREQUEST']._serialized_end=192 + _globals['_ABANDONREQUEST']._serialized_start=194 + _globals['_ABANDONREQUEST']._serialized_end=257 + _globals['_DEADLETTERREQUEST']._serialized_start=260 + _globals['_DEADLETTERREQUEST']._serialized_end=448 + _globals['_DEFERREQUEST']._serialized_start=450 + _globals['_DEFERREQUEST']._serialized_end=511 + _globals['_RENEWMESSAGELOCKREQUEST']._serialized_start=513 + _globals['_RENEWMESSAGELOCKREQUEST']._serialized_end=557 + _globals['_GETSESSIONSTATEREQUEST']._serialized_start=559 + _globals['_GETSESSIONSTATEREQUEST']._serialized_end=602 + _globals['_SETSESSIONSTATEREQUEST']._serialized_start=604 + _globals['_SETSESSIONSTATEREQUEST']._serialized_end=669 + _globals['_GETSESSIONSTATERESPONSE']._serialized_start=671 + _globals['_GETSESSIONSTATERESPONSE']._serialized_end=718 + _globals['_RELEASESESSIONREQUEST']._serialized_start=720 + _globals['_RELEASESESSIONREQUEST']._serialized_end=762 + _globals['_RENEWSESSIONLOCKREQUEST']._serialized_start=764 + _globals['_RENEWSESSIONLOCKREQUEST']._serialized_end=808 + _globals['_RENEWSESSIONLOCKRESPONSE']._serialized_start=810 + _globals['_RENEWSESSIONLOCKRESPONSE']._serialized_end=885 + _globals['_SETTLEMENT']._serialized_start=888 + _globals['_SETTLEMENT']._serialized_end=1477 +# @@protoc_insertion_point(module_scope) diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement_pb2_grpc.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement_pb2_grpc.py new file mode 100644 index 0000000..0dc374b --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/protos/settlement_pb2_grpc.py @@ -0,0 +1,454 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + +from azurefunctions.extensions.bindings.protos import settlement_pb2 as azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2 +from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 + +GRPC_GENERATED_VERSION = '1.74.0' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in azurefunctions/extensions/bindings/protos/settlement_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) + + +class SettlementStub(object): + """The settlement service definition. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.Complete = channel.unary_unary( + '/Settlement/Complete', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.CompleteRequest.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + _registered_method=True) + self.Abandon = channel.unary_unary( + '/Settlement/Abandon', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.AbandonRequest.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + _registered_method=True) + self.Deadletter = channel.unary_unary( + '/Settlement/Deadletter', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.DeadletterRequest.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + _registered_method=True) + self.Defer = channel.unary_unary( + '/Settlement/Defer', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.DeferRequest.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + _registered_method=True) + self.RenewMessageLock = channel.unary_unary( + '/Settlement/RenewMessageLock', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewMessageLockRequest.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + _registered_method=True) + self.GetSessionState = channel.unary_unary( + '/Settlement/GetSessionState', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.GetSessionStateRequest.SerializeToString, + response_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.GetSessionStateResponse.FromString, + _registered_method=True) + self.SetSessionState = channel.unary_unary( + '/Settlement/SetSessionState', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.SetSessionStateRequest.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + _registered_method=True) + self.ReleaseSession = channel.unary_unary( + '/Settlement/ReleaseSession', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.ReleaseSessionRequest.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + _registered_method=True) + self.RenewSessionLock = channel.unary_unary( + '/Settlement/RenewSessionLock', + request_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewSessionLockRequest.SerializeToString, + response_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewSessionLockResponse.FromString, + _registered_method=True) + + +class SettlementServicer(object): + """The settlement service definition. + """ + + def Complete(self, request, context): + """Completes a message + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def Abandon(self, request, context): + """Abandons a message + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def Deadletter(self, request, context): + """Deadletters a message + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def Defer(self, request, context): + """Defers a message + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def RenewMessageLock(self, request, context): + """Renew message lock + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def GetSessionState(self, request, context): + """Get session state + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def SetSessionState(self, request, context): + """Set session state + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ReleaseSession(self, request, context): + """Release session + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def RenewSessionLock(self, request, context): + """Renew session lock + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_SettlementServicer_to_server(servicer, server): + rpc_method_handlers = { + 'Complete': grpc.unary_unary_rpc_method_handler( + servicer.Complete, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.CompleteRequest.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'Abandon': grpc.unary_unary_rpc_method_handler( + servicer.Abandon, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.AbandonRequest.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'Deadletter': grpc.unary_unary_rpc_method_handler( + servicer.Deadletter, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.DeadletterRequest.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'Defer': grpc.unary_unary_rpc_method_handler( + servicer.Defer, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.DeferRequest.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'RenewMessageLock': grpc.unary_unary_rpc_method_handler( + servicer.RenewMessageLock, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewMessageLockRequest.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'GetSessionState': grpc.unary_unary_rpc_method_handler( + servicer.GetSessionState, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.GetSessionStateRequest.FromString, + response_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.GetSessionStateResponse.SerializeToString, + ), + 'SetSessionState': grpc.unary_unary_rpc_method_handler( + servicer.SetSessionState, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.SetSessionStateRequest.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'ReleaseSession': grpc.unary_unary_rpc_method_handler( + servicer.ReleaseSession, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.ReleaseSessionRequest.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'RenewSessionLock': grpc.unary_unary_rpc_method_handler( + servicer.RenewSessionLock, + request_deserializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewSessionLockRequest.FromString, + response_serializer=azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewSessionLockResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'Settlement', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('Settlement', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. +class Settlement(object): + """The settlement service definition. + """ + + @staticmethod + def Complete(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/Complete', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.CompleteRequest.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def Abandon(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/Abandon', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.AbandonRequest.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def Deadletter(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/Deadletter', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.DeadletterRequest.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def Defer(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/Defer', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.DeferRequest.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def RenewMessageLock(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/RenewMessageLock', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewMessageLockRequest.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def GetSessionState(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/GetSessionState', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.GetSessionStateRequest.SerializeToString, + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.GetSessionStateResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def SetSessionState(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/SetSessionState', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.SetSessionStateRequest.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def ReleaseSession(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/ReleaseSession', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.ReleaseSessionRequest.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def RenewSessionLock(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Settlement/RenewSessionLock', + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewSessionLockRequest.SerializeToString, + azurefunctions_dot_extensions_dot_bindings_dot_protos_dot_settlement__pb2.RenewSessionLockResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/__init__.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/__init__.py index 99a4710..a6e25bb 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/__init__.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/__init__.py @@ -1,12 +1,16 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +from .serviceBusMessageActions import ServiceBusMessageActions from .serviceBusReceivedMessage import ServiceBusReceivedMessage from .serviceBusConverter import ServiceBusConverter +from .serviceBusClientConverter import ServiceBusClientConverter __all__ = [ "ServiceBusReceivedMessage", "ServiceBusConverter", + "ServiceBusMessageActions", + "ServiceBusClientConverter", ] __version__ = '1.0.0b1' diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py new file mode 100644 index 0000000..1899fa3 --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpcClient.py @@ -0,0 +1,61 @@ +# Copyright (c) .NET Foundation. All rights reserved. +# Licensed under the MIT License. + +import grpc +from typing import Any, Optional, Type + + +class GrpcChannelError(Exception): + """Exception raised when gRPC channel creation fails.""" + + +class GrpcClientFactory: + """ + Factory class for creating gRPC clients from generated Python stubs. + + Python requires `.proto` files to be compiled into + `_pb2.py` and `_pb2_grpc.py` modules before use. This factory assumes + those files are already generated and importable. + + Example: + from my_service_pb2_grpc import MyServiceStub + + client = GrpcClientFactory.create_client( + service_stub=MyServiceStub, + address="localhost:50051", + grpc_max_message_length=4 * 1024 * 1024, # 4 MB + ) + """ + + @staticmethod + def create_client( + service_stub: Type[Any], + address: str, + grpc_max_message_length: int = 4 * 1024 * 1024, + root_certificates: Optional[bytes] = None, + ) -> Any: + """ + Creates and returns a gRPC client for the given service stub. + + Args: + service_stub: The generated service stub class (e.g. `MyServiceStub`). + address: The server address (e.g., "localhost:50051"). + grpc_max_message_length: Max message size for send/receive. + root_certificates: Optional root certificates for TLS. + + Returns: + An instance of the gRPC client stub. + """ + + options = [ + ("grpc.max_send_message_length", grpc_max_message_length), + ("grpc.max_receive_message_length", grpc_max_message_length), + ] + + try: + channel = grpc.insecure_channel(address, options=options) + except Exception as e: + raise GrpcChannelError(f"Failed to create gRPC channel. URL: {address}," + f" Options: {options}, Error: {e}") + + return service_stub(channel) diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpc_utils.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpc_utils.py new file mode 100644 index 0000000..ddc7c9c --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/grpc_utils.py @@ -0,0 +1,60 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import argparse +from typing import List, Optional + + +class ArgumentError(Exception): + """Custom exception for missing or invalid arguments.""" + pass + + +def parse_grpc_args(argv: Optional[List[str]] = None): + """ + Parses CLI arguments for gRPC connection. + + Args: + argv: Optional list of CLI arguments (defaults to sys.argv[1:]). + + Returns: + args: Namespace with host, port, and functions_grpc_max_message_length + + Raises: + ArgumentError if required arguments are missing or invalid. + """ + parser = argparse.ArgumentParser(add_help=False) + parser.add_argument("--host", help="gRPC server host") + parser.add_argument("--port", help="gRPC server port") + parser.add_argument( + "--functions-grpc-max-message-length", + type=int, + help="Maximum gRPC message size in bytes", + ) + args, _ = parser.parse_known_args(argv) + + missing_args = [] + if not args.host: + missing_args.append("'host'") + if not args.port: + missing_args.append("'port'") + if not args.functions_grpc_max_message_length: + missing_args.append("'functions-grpc-max-message-length'") + + if missing_args: + raise ArgumentError(f"Missing required arguments: {', '.join(missing_args)}") + return args + + +def get_grpc_uri(args) -> str: + """ + Returns the gRPC URI from CLI args. + """ + return f"{args.host}:{args.port}" + + +def get_grpc_max_message_length(args) -> int: + """ + Returns the gRPC max message length from CLI args. + """ + return args.functions_grpc_max_message_length diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusClientConverter.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusClientConverter.py new file mode 100644 index 0000000..db9e90a --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusClientConverter.py @@ -0,0 +1,20 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Any + +from azurefunctions.extensions.base import InConverter +from .serviceBusMessageActions import ServiceBusMessageActions + + +class ServiceBusClientConverter( + InConverter, + binding='serviceBusClient' +): + + @classmethod + def get_client(cls) -> Any: + """ + Returns an instance of ServiceBusMessageActions. + """ + return ServiceBusMessageActions.get_instance() diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py new file mode 100644 index 0000000..ecc1a14 --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/serviceBusMessageActions.py @@ -0,0 +1,169 @@ +# Copyright (c) .NET Foundation. All rights reserved. +# Licensed under the MIT License. +import threading + +from typing import Optional + +from azurefunctions.extensions.base import GrpcClientType +from google.protobuf.wrappers_pb2 import StringValue + +from ..protos.settlement_pb2 import ( + AbandonRequest, + CompleteRequest, + DeadletterRequest, + DeferRequest, + ReleaseSessionRequest, + RenewMessageLockRequest, + RenewSessionLockRequest, + SetSessionStateRequest, +) +from ..protos.settlement_pb2_grpc import SettlementStub + +from .grpcClient import GrpcClientFactory +from .grpc_utils import get_grpc_uri, get_grpc_max_message_length, parse_grpc_args + + +class SettlementError(Exception): + """Custom exception for ServiceBusMessageActions errors.""" + def __init__(self, method: str, details: str, original: Exception): + super().__init__(f"[{method}] {details}. Underlying error: {original}") + self.method = method + self.details = details + self.original = original + + +class ServiceBusMessageActions(GrpcClientType): + """ + ServiceBusMessageActions class. + Provides async methods for message settlement over gRPC. + Implements a singleton pattern. + """ + + _instance: Optional["ServiceBusMessageActions"] = None + _lock = threading.Lock() # class-level lock + + def __init__(self) -> None: + args = parse_grpc_args() + self._uri = get_grpc_uri(args) + self._grpc_max_message_length = get_grpc_max_message_length(args) + + self._client: SettlementStub = GrpcClientFactory.create_client( + service_stub=SettlementStub, + address=self._uri, + grpc_max_message_length=self._grpc_max_message_length, + ) + + @classmethod + def get_instance(cls) -> "ServiceBusMessageActions": + with cls._lock: + if cls._instance is None: + cls._instance = ServiceBusMessageActions() + return cls._instance + + def _validate_lock_token(self, message) -> str: + locktoken = message.lock_token + if not locktoken: + raise ValueError("lockToken is required in ServiceBusReceivedMessage.") + return locktoken + + # ------------------------------- + # Settlement methods + # ------------------------------- + + def complete(self, message) -> None: + try: + locktoken = self._validate_lock_token(message) + request = CompleteRequest() + request.locktoken = str(locktoken) + self._client.Complete(request) + except Exception as e: + raise SettlementError("complete", + f"Failed to complete message {locktoken}", e) + + def abandon(self, message) -> None: + try: + locktoken = self._validate_lock_token(message) + request = AbandonRequest() + request.locktoken = str(locktoken) + request.propertiesToModify = b"" + self._client.Abandon(request) + except Exception as e: + raise SettlementError("abandon", + f"Failed to abandon message {locktoken}", e) + + def deadletter( + self, + message, + deadletter_reason: Optional[str] = None, + deadletter_error_description: Optional[str] = None) -> None: + try: + locktoken = self._validate_lock_token(message) + request = DeadletterRequest() + request.locktoken = str(locktoken) + request.propertiesToModify = b"" + + if deadletter_reason: + request.deadletterReason.CopyFrom(StringValue(value=deadletter_reason)) + if deadletter_error_description: + request.deadletterErrorDescription.CopyFrom( + StringValue(value=deadletter_error_description)) + + self._client.Deadletter(request) + except Exception as e: + raise SettlementError("deadletter", + f"Failed to deadletter message {locktoken}", e) + + def defer(self, message) -> None: + try: + locktoken = self._validate_lock_token(message) + request = DeferRequest() + request.locktoken = str(locktoken) + request.propertiesToModify = b"" + + self._client.Defer(request) + except Exception as e: + raise SettlementError("defer", f"Failed to defer message {locktoken}", e) + + def renew_message_lock(self, message) -> None: + try: + locktoken = self._validate_lock_token(message) + request = RenewMessageLockRequest() + request.locktoken = str(locktoken) + self._client.RenewMessageLock(request) + except Exception as e: + raise SettlementError("renew_message_lock", + f"Failed to renew lock for {locktoken}", e) + + def set_session_state(self, session_id: str, session_state: bytes) -> None: + try: + request = SetSessionStateRequest() + request.sessionId = session_id + request.sessionState = session_state + self._client.SetSessionState(request) + except Exception as e: + raise SettlementError("set_session_state", + f"Failed to set state for session {session_id}", e) + + def release_session(self, session_id: str) -> None: + try: + request = ReleaseSessionRequest() + request.sessionId = session_id + self._client.ReleaseSession(request) + except Exception as e: + raise SettlementError("release_session", + f"Failed to release session {session_id}", e) + + def renew_session_lock(self, session_id: str): + try: + request = RenewSessionLockRequest() + request.sessionId = session_id + response = self._client.RenewSessionLock(request) + except Exception as e: + raise SettlementError("renew_session_lock", f"Failed to renew " + f"lock for session {session_id}", e) + + if not response or not response.lockedUntil: + raise RuntimeError("No response or lockedUntil returned " + "from renewSessionLock") + + return response.lockedUntil diff --git a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/utils.py b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/utils.py index 0d7fea9..d33b872 100644 --- a/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/utils.py +++ b/azurefunctions-extensions-bindings-servicebus/azurefunctions/extensions/bindings/servicebus/utils.py @@ -13,7 +13,8 @@ def get_lock_token(message: bytes, index: int) -> str: lock_token_encoded = message[:index] # Convert the lock token to a UUID using the first 16 bytes - lock_token_uuid = uuid.UUID(bytes=lock_token_encoded[:16]) + # Use little-endian to match SDK + lock_token_uuid = uuid.UUID(bytes_le=lock_token_encoded[:16]) return lock_token_uuid diff --git a/azurefunctions-extensions-bindings-servicebus/pyproject.toml b/azurefunctions-extensions-bindings-servicebus/pyproject.toml index 54af6f6..f528a8e 100644 --- a/azurefunctions-extensions-bindings-servicebus/pyproject.toml +++ b/azurefunctions-extensions-bindings-servicebus/pyproject.toml @@ -26,9 +26,12 @@ classifiers= [ 'Development Status :: 5 - Production/Stable', ] dependencies = [ - 'azurefunctions-extensions-base', + 'azurefunctions-extensions-base>=1.1.0', 'azure-servicebus>=7.14.2,<8.0', - 'uamqp>=1.6.11,<2.0' + 'uamqp>=1.6.11,<2.0', + 'protobuf>=6.32.0,<7.0', + 'grpcio>=1.74.0,<2.0', + 'grpcio-tools>=1.74.0,<2.0' ] [project.optional-dependencies] diff --git a/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/function_app.py b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/function_app.py new file mode 100644 index 0000000..9d97870 --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/function_app.py @@ -0,0 +1,37 @@ +# coding: utf-8 + +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- + +import logging + +import azure.functions as func +import azurefunctions.extensions.bindings.servicebus as servicebus + +app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION) + +""" +FOLDER: servicebus_samples +DESCRIPTION: + These samples demonstrate how to complete a message using the + optional ServiceBusMessageActions argument. +USAGE: + Set the environment variables with your own values before running the + sample: + For running the ServiceBus queue trigger function: + 1) QUEUE_NAME - the name of the ServiceBus queue + 2) SERVICEBUS_CONNECTION - the connection string for the ServiceBus entity +""" + + +@app.service_bus_queue_trigger(arg_name="received_message", + queue_name="QUEUE_NAME", + connection="SERVICEBUS_CONNECTION", + auto_complete_messages=False) +def servicebus_queue_trigger(received_message: servicebus.ServiceBusReceivedMessage, message_actions: servicebus.ServiceBusMessageActions): + logging.info(f"Python ServiceBus queue trigger processed message. Message: {received_message}") + message_actions.complete(received_message) + logging.info("Completed message.") diff --git a/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/host.json b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/host.json new file mode 100644 index 0000000..9df9136 --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/host.json @@ -0,0 +1,15 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request" + } + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" + } +} \ No newline at end of file diff --git a/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/local.settings.json b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/local.settings.json new file mode 100644 index 0000000..8efd1ba --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/local.settings.json @@ -0,0 +1,9 @@ +{ + "IsEncrypted": false, + "Values": { + "FUNCTIONS_WORKER_RUNTIME": "python", + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "QUEUE_NAME": "", + "SERVICEBUS_CONNECTION": "" + } +} \ No newline at end of file diff --git a/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/requirements.txt b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/requirements.txt new file mode 100644 index 0000000..7b4c53b --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/samples/servicebus_samples_settlement/requirements.txt @@ -0,0 +1,6 @@ +# DO NOT include azure-functions-worker in this file +# The Python Worker is managed by Azure Functions platform +# Manually managing azure-functions-worker may cause unexpected issues + +azure-functions +azurefunctions-extensions-bindings-servicebus \ No newline at end of file diff --git a/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py b/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py new file mode 100644 index 0000000..c8bb7ef --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/tests/test_grpc_client.py @@ -0,0 +1,105 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import unittest +from unittest.mock import patch, MagicMock + +from azurefunctions.extensions.bindings.servicebus.grpcClient import (GrpcClientFactory, + GrpcChannelError) +from azurefunctions.extensions.bindings.servicebus.grpc_utils import ( + get_grpc_uri, + get_grpc_max_message_length, + parse_grpc_args, + ArgumentError) + + +class DummyStub: + def __init__(self, channel): + self._channel = channel + + +class TestGrpcClient(unittest.TestCase): + def test_create_client_insecure_channel(self): + with patch("azurefunctions.extensions.bindings.servicebus.grpcClient.grpc.insecure_channel") as mock_insecure: # noqa + fake_channel = MagicMock() + mock_insecure.return_value = fake_channel + + client = GrpcClientFactory.create_client( + service_stub=DummyStub, + address="localhost:1234", + grpc_max_message_length=1024, + ) + + mock_insecure.assert_called_once() + args, kwargs = mock_insecure.call_args + assert args[0] == "localhost:1234" + assert ("grpc.max_send_message_length", 1024) in kwargs["options"] + assert ("grpc.max_receive_message_length", 1024) in kwargs["options"] + + assert isinstance(client, DummyStub) + assert client._channel == fake_channel + + @patch("azurefunctions.extensions.bindings.servicebus.grpcClient." + "grpc.insecure_channel") + def test_create_client_raises_on_insecure_channel_failure(self, + mock_insecure_channel): + # Arrange: force grpc.insecure_channel to throw + mock_insecure_channel.side_effect = RuntimeError("connection failed") + + # Act + Assert + with self.assertRaises(GrpcChannelError) as ctx: + GrpcClientFactory.create_client(DummyStub, "localhost:1234") + + # Ensure exception contains useful context + self.assertIn("Failed to create gRPC channel", str(ctx.exception)) + self.assertIn("localhost:1234", str(ctx.exception)) + + +class TestGrpcUtils(unittest.TestCase): + def test_get_grpc_uri_and_max_message_length_valid_args(self): + argv = [ + "--host", "localhost", + "--port", "50051", + "--functions-grpc-max-message-length", "4096" + ] + args = parse_grpc_args(argv) + uri = get_grpc_uri(args) + max_len = get_grpc_max_message_length(args) + assert uri == "localhost:50051" + assert max_len == 4096 + + def test_get_grpc_uri_missing_host(self): + argv = [ + "--port", "50051", + "--functions-grpc-max-message-length", "4096" + ] + with self.assertRaises(ArgumentError) as excinfo: + parse_grpc_args(argv) + self.assertIn("host", str(excinfo.exception)) + + def test_get_grpc_uri_missing_port(self): + argv = [ + "--host", "localhost", + "--functions-grpc-max-message-length", "4096" + ] + with self.assertRaises(ArgumentError) as excinfo: + parse_grpc_args(argv) + self.assertIn("port", str(excinfo.exception)) + + def test_get_grpc_max_message_length_missing(self): + argv = [ + "--host", "localhost", + "--port", "50051", + ] + with self.assertRaises(ArgumentError) as excinfo: + parse_grpc_args(argv) + self.assertIn("functions-grpc-max-message-length", str(excinfo.exception)) + + def test_get_grpc_uri_and_max_message_length_multiple_missing(self): + argv = [] + with self.assertRaises(ArgumentError) as excinfo: + parse_grpc_args(argv) + msg = str(excinfo.exception) + self.assertIn("host", msg) + self.assertIn("port", msg) + self.assertIn("functions-grpc-max-message-length", msg) diff --git a/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py b/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py new file mode 100644 index 0000000..5bfb2ee --- /dev/null +++ b/azurefunctions-extensions-bindings-servicebus/tests/test_servicebus_message_actions.py @@ -0,0 +1,240 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import grpc +import unittest + +from unittest.mock import patch, MagicMock + +from google.protobuf.timestamp_pb2 import Timestamp + +from azurefunctions.extensions.bindings.servicebus import ServiceBusMessageActions +from azurefunctions.extensions.bindings.servicebus.serviceBusMessageActions import SettlementError # noqa +from azurefunctions.extensions.bindings.protos import settlement_pb2 as pb2 + + +class DummyMessage: + def __init__(self, locktoken): + self.lock_token = locktoken + + +class TestServiceBusMessageActions(unittest.TestCase): + def setUp(self): + ServiceBusMessageActions._instance = None + # Patch create_client so we control what gets returned + patcher = patch( + "azurefunctions.extensions.bindings.servicebus.serviceBusMessageActions.GrpcClientFactory.create_client") # noqa + self.addCleanup(patcher.stop) + self.mock_create_client = patcher.start() + + patcher_args = patch( + "azurefunctions.extensions.bindings.servicebus.serviceBusMessageActions.parse_grpc_args") # noqa + self.addCleanup(patcher_args.stop) + patcher_args.start() + + # Patch get_grpc_uri so we don't need CLI args + patcher_uri = patch( + "azurefunctions.extensions.bindings.servicebus.serviceBusMessageActions.get_grpc_uri", # noqa + return_value=("localhost:50051", 4 * 1024 * 1024)) + self.addCleanup(patcher_uri.stop) + patcher_uri.start() + + patcher_message_length = patch( + "azurefunctions.extensions.bindings.servicebus.serviceBusMessageActions.get_grpc_max_message_length") # noqa + self.addCleanup(patcher_message_length.stop) + patcher_message_length.start() + + # The fake gRPC client returned by create_client + self.mock_client = MagicMock() + self.mock_client.Complete = MagicMock() + self.mock_client.Abandon = MagicMock() + self.mock_client.Deadletter = MagicMock() + self.mock_client.Defer = MagicMock() + self.mock_client.RenewMessageLock = MagicMock() + self.mock_client.SetSessionState = MagicMock() + self.mock_client.ReleaseSession = MagicMock() + self.mock_client.RenewSessionLock = MagicMock() + self.mock_create_client.return_value = self.mock_client + + # Now actions will use our patched client + self.actions = ServiceBusMessageActions().get_instance() + + def tearDown(self): + # Ensure singleton is cleared after each test too + ServiceBusMessageActions._instance = None + + def test_complete_calls_grpc(self): + msg = DummyMessage("lock123") + self.actions.complete(msg) + + self.mock_client.Complete.assert_called_once() + called_req = self.mock_client.Complete.call_args[0][0] + self.assertIsInstance(called_req, pb2.CompleteRequest) + self.assertEqual(called_req.locktoken, "lock123") + + def test_abandon_calls_grpc(self): + msg = DummyMessage("lock123") + self.actions.abandon(msg) + + self.mock_client.Abandon.assert_called_once() + called_req = self.mock_client.Abandon.call_args[0][0] + self.assertIsInstance(called_req, pb2.AbandonRequest) + self.assertEqual(called_req.locktoken, "lock123") + self.assertEqual(called_req.propertiesToModify, b"") + + def test_deadletter_with_reasons(self): + msg = DummyMessage("lock123") + self.actions.deadletter( + msg, + deadletter_reason="reason", + deadletter_error_description="desc" + ) + + self.mock_client.Deadletter.assert_called_once() + called_req = self.mock_client.Deadletter.call_args[0][0] + self.assertIsInstance(called_req, pb2.DeadletterRequest) + self.assertEqual(called_req.locktoken, "lock123") + self.assertEqual(called_req.propertiesToModify, b"") + self.assertEqual(called_req.deadletterReason.value, "reason") + self.assertEqual(called_req.deadletterErrorDescription.value, "desc") + + def test_defer_calls_grpc(self): + msg = DummyMessage("lock123") + self.actions.defer(msg) + + self.mock_client.Defer.assert_called_once() + called_req = self.mock_client.Defer.call_args[0][0] + self.assertIsInstance(called_req, pb2.DeferRequest) + self.assertEqual(called_req.locktoken, "lock123") + self.assertEqual(called_req.propertiesToModify, b"") + + def test_renew_message_lock_calls_grpc(self): + msg = DummyMessage("lock123") + self.actions.renew_message_lock(msg) + + self.mock_client.RenewMessageLock.assert_called_once() + called_req = self.mock_client.RenewMessageLock.call_args[0][0] + self.assertIsInstance(called_req, pb2.RenewMessageLockRequest) + self.assertEqual(called_req.locktoken, "lock123") + + def test_set_session_state(self): + self.actions.set_session_state("sid", b"state") + + self.mock_client.SetSessionState.assert_called_once() + called_req = self.mock_client.SetSessionState.call_args[0][0] + self.assertIsInstance(called_req, pb2.SetSessionStateRequest) + self.assertEqual(called_req.sessionId, "sid") + self.assertEqual(called_req.sessionState, b"state") + + def test_release_session(self): + self.actions.release_session("sid") + + self.mock_client.ReleaseSession.assert_called_once() + called_req = self.mock_client.ReleaseSession.call_args[0][0] + self.assertIsInstance(called_req, pb2.ReleaseSessionRequest) + self.assertEqual(called_req.sessionId, "sid") + + def test_renew_session_lock_success(self): + ts = Timestamp() + ts.GetCurrentTime() + # Mock gRPC response + resp = pb2.RenewSessionLockResponse() + resp.lockedUntil.CopyFrom(ts) + self.mock_client.RenewSessionLock.return_value = resp + + result = self.actions.renew_session_lock("sid") + + self.mock_client.RenewSessionLock.assert_called_once() + called_req = self.mock_client.RenewSessionLock.call_args[0][0] + self.assertIsInstance(called_req, pb2.RenewSessionLockRequest) + self.assertEqual(called_req.sessionId, "sid") + self.assertIsInstance(result, Timestamp) + self.assertEqual(result, ts) + + def test_renew_session_lock_failure(self): + # No response + self.mock_client.RenewSessionLock.return_value = None + with self.assertRaises(RuntimeError): + self.actions.renew_session_lock("sid") + + def test_validate_lock_token_raises(self): + msg = DummyMessage(None) + with self.assertRaises(ValueError): + self.actions._validate_lock_token(msg) + + def test_complete_raises_SettlementError(self): + msg = DummyMessage("lt1") + self.mock_client.Complete.side_effect = grpc.RpcError("boom") + + with self.assertRaises(SettlementError) as cm: + self.actions.complete(msg) + + self.assertIn("complete", str(cm.exception)) + self.assertIn("lt1", str(cm.exception)) + + def test_abandon_raises_SettlementError(self): + msg = DummyMessage("lt2") + self.mock_client.Abandon.side_effect = grpc.RpcError("fail") + + with self.assertRaises(SettlementError) as cm: + self.actions.abandon(msg) + + self.assertIn("abandon", str(cm.exception)) + self.assertIn("lt2", str(cm.exception)) + + def test_deadletter_raises_SettlementError(self): + msg = DummyMessage("lt3") + self.mock_client.Deadletter.side_effect = grpc.RpcError("oops") + + with self.assertRaises(SettlementError) as cm: + self.actions.deadletter(msg, deadletter_reason="reason") + + self.assertIn("deadletter", str(cm.exception)) + self.assertIn("lt3", str(cm.exception)) + + def test_defer_raises_SettlementError(self): + msg = DummyMessage("lt4") + self.mock_client.Defer.side_effect = grpc.RpcError("bad") + + with self.assertRaises(SettlementError) as cm: + self.actions.defer(msg) + + self.assertIn("defer", str(cm.exception)) + self.assertIn("lt4", str(cm.exception)) + + def test_renew_message_lock_raises_SettlementError(self): + msg = DummyMessage("lt5") + self.mock_client.RenewMessageLock.side_effect = grpc.RpcError("err") + + with self.assertRaises(SettlementError) as cm: + self.actions.renew_message_lock(msg) + + self.assertIn("renew_message_lock", str(cm.exception)) + self.assertIn("lt5", str(cm.exception)) + + def test_set_session_state_raises_SettlementError(self): + self.mock_client.SetSessionState.side_effect = grpc.RpcError("nope") + + with self.assertRaises(SettlementError) as cm: + self.actions.set_session_state("sid1", b"state") + + self.assertIn("set_session_state", str(cm.exception)) + self.assertIn("sid1", str(cm.exception)) + + def test_release_session_raises_SettlementError(self): + self.mock_client.ReleaseSession.side_effect = grpc.RpcError("denied") + + with self.assertRaises(SettlementError) as cm: + self.actions.release_session("sid2") + + self.assertIn("release_session", str(cm.exception)) + self.assertIn("sid2", str(cm.exception)) + + def test_renew_session_lock_raises_SettlementError(self): + self.mock_client.RenewSessionLock.side_effect = grpc.RpcError("boom") + + with self.assertRaises(SettlementError) as cm: + self.actions.renew_session_lock("sid3") + + self.assertIn("renew_session_lock", str(cm.exception)) + self.assertIn("sid3", str(cm.exception))