From 02b9a794b9c84b78c202ca2750cd9f33eff41225 Mon Sep 17 00:00:00 2001 From: EvanWave Date: Tue, 12 Aug 2025 20:43:47 +0800 Subject: [PATCH 1/3] Add python doc Signed-off-by: EvanWave --- functions/example-external.yaml | 4 +- functions/example-functions.yaml | 4 +- operator/Makefile | 17 +++ operator/deploy/chart/values.yaml | 2 +- sdks/fs-python/Makefile | 3 + sdks/fs-python/function_stream/__init__.py | 2 +- sdks/fs-python/function_stream/context.py | 54 +++++++- sdks/fs-python/function_stream/function.py | 147 +++++++++++++++++---- sdks/fs-python/tests/test_context.py | 45 +++++++ sdks/fs-python/tests/test_function.py | 113 ++++++++++++---- 10 files changed, 334 insertions(+), 57 deletions(-) diff --git a/functions/example-external.yaml b/functions/example-external.yaml index 2e20f4a6..732bc62b 100644 --- a/functions/example-external.yaml +++ b/functions/example-external.yaml @@ -13,14 +13,14 @@ # limitations under the License. name: external-function -namespace: fs +namespace: function-stream runtime: type: "external" sources: - config: inputs: - "external-input" - subscription-name: "fs" + subscription-name: "function-stream" type: "memory" sink: config: diff --git a/functions/example-functions.yaml b/functions/example-functions.yaml index 084375d2..45767572 100644 --- a/functions/example-functions.yaml +++ b/functions/example-functions.yaml @@ -13,7 +13,7 @@ # limitations under the License. name: function-sample -namespace: fs +namespace: function-stream runtime: type: "wasm" config: @@ -22,7 +22,7 @@ sources: - config: inputs: - "input" - subscription-name: "fs" + subscription-name: "function-stream" type: "memory" sink: config: diff --git a/operator/Makefile b/operator/Makefile index 986e60f5..3c557d76 100644 --- a/operator/Makefile +++ b/operator/Makefile @@ -178,10 +178,12 @@ KUSTOMIZE ?= $(LOCALBIN)/kustomize CONTROLLER_GEN ?= $(LOCALBIN)/controller-gen ENVTEST ?= $(LOCALBIN)/setup-envtest GOLANGCI_LINT = $(LOCALBIN)/golangci-lint +CODEGEN ?= $(LOCALBIN)/code-generator ## Tool Versions KUSTOMIZE_VERSION ?= v5.6.0 CONTROLLER_TOOLS_VERSION ?= v0.17.2 +CODEGEN_VERSION ?= v0.32.1 #ENVTEST_VERSION is the version of controller-runtime release branch to fetch the envtest setup script (i.e. release-0.20) ENVTEST_VERSION ?= $(shell go list -m -f "{{ .Version }}" sigs.k8s.io/controller-runtime | awk -F'[v.]' '{printf "release-%d.%d", $$2, $$3}') #ENVTEST_K8S_VERSION is the version of Kubernetes to use for setting up ENVTEST binaries (i.e. 1.31) @@ -216,6 +218,21 @@ golangci-lint: $(GOLANGCI_LINT) ## Download golangci-lint locally if necessary. $(GOLANGCI_LINT): $(LOCALBIN) $(call go-install-tool,$(GOLANGCI_LINT),github.com/golangci/golangci-lint/cmd/golangci-lint,$(GOLANGCI_LINT_VERSION)) +.PHONY: code-generator +code-generator: $(CODEGEN) ## Download code-generator locally if necessary. +$(CODEGEN): $(LOCALBIN) + $(call go-install-tool,$(CODEGEN),k8s.io/code-generator/cmd/...,$(CODEGEN_VERSION)) + +.PHONY: generate-client +generate-client: ## Generate client SDK using code-generator + @echo "Generating client SDK..." + @mkdir -p pkg/client + @go install k8s.io/code-generator/cmd/client-gen@v0.32.1 + @go install k8s.io/code-generator/cmd/lister-gen@v0.32.1 + @go install k8s.io/code-generator/cmd/informer-gen@v0.32.1 + @go install k8s.io/code-generator/cmd/deepcopy-gen@v0.32.1 + @hack/update-codegen.sh + # go-install-tool will 'go install' any package with custom target and name of binary, if it doesn't exist # $1 - target path with name of binary # $2 - package url which can be installed diff --git a/operator/deploy/chart/values.yaml b/operator/deploy/chart/values.yaml index 45dee591..a7756d8e 100644 --- a/operator/deploy/chart/values.yaml +++ b/operator/deploy/chart/values.yaml @@ -5,7 +5,7 @@ controllerManager: image: repository: functionstream/operator tag: latest - imagePullPolicy: IfNotPresent + imagePullPolicy: Always args: - "--leader-elect" - "--metrics-bind-address=:8443" diff --git a/sdks/fs-python/Makefile b/sdks/fs-python/Makefile index 70ab0ba9..1092e8d1 100644 --- a/sdks/fs-python/Makefile +++ b/sdks/fs-python/Makefile @@ -3,5 +3,8 @@ build-image: docker build -t functionstream/fs-python-base . +docker-buildx: + docker buildx build --platform linux/amd64,linux/arm64 -t functionstream/fs-python-base . + test: PYTHONPATH=. python -m pytest \ No newline at end of file diff --git a/sdks/fs-python/function_stream/__init__.py b/sdks/fs-python/function_stream/__init__.py index c479b0de..163bc509 100644 --- a/sdks/fs-python/function_stream/__init__.py +++ b/sdks/fs-python/function_stream/__init__.py @@ -4,7 +4,7 @@ from .metrics import Metrics, MetricsServer from .module import FSModule -__version__ = "0.6.0rc1" +__version__ = "0.6.0rc2" __all__ = [ # Core classes "FSFunction", diff --git a/sdks/fs-python/function_stream/context.py b/sdks/fs-python/function_stream/context.py index 5c81383f..f28c826f 100644 --- a/sdks/fs-python/function_stream/context.py +++ b/sdks/fs-python/function_stream/context.py @@ -1,9 +1,14 @@ """ FSContext module provides a context object that manages configuration access for FunctionStream SDK. + +This module defines the FSContext class which serves as a wrapper around the Config object, +providing a clean interface for accessing configuration values and handling any potential +errors during access. It also provides methods for metadata access and data production. """ import logging from typing import Any, Dict +from datetime import datetime from .config import Config @@ -17,6 +22,7 @@ class FSContext: This class serves as a wrapper around the Config object, providing a clean interface for accessing configuration values and handling any potential errors during access. + It also provides methods for metadata access and data production capabilities. Attributes: config (Config): The configuration object containing all settings. @@ -25,11 +31,10 @@ class FSContext: def __init__(self, config: Config): """ - Initialize the FSContext with a configuration object and optional FSFunction reference. + Initialize the FSContext with a configuration object. Args: config (Config): The configuration object to be used by this context. - function (FSFunction, optional): The parent FSFunction instance. """ self.config = config @@ -53,8 +58,53 @@ def get_config(self, config_name: str) -> Any: logger.error(f"Error getting config {config_name}: {str(e)}") return "" + def get_metadata(self, key: str) -> Any: + """ + Get metadata value by key. + + This method retrieves metadata associated with the current message. + + Args: + key (str): The metadata key to retrieve. + + Returns: + Any: The metadata value, currently always None. + """ + return None + + def produce(self, data: Dict[str, Any], event_time: datetime = None) -> None: + """ + Produce data to the output stream. + + This method is intended to send processed data to the output stream. + + Args: + data (Dict[str, Any]): The data to produce. + event_time (datetime, optional): The timestamp for the event. Defaults to None. + + Returns: + None: Currently always returns None. + """ + return None + def get_configs(self) -> Dict[str, Any]: + """ + Get all configuration values. + + Returns a dictionary containing all configuration key-value pairs. + + Returns: + Dict[str, Any]: A dictionary containing all configuration values. + """ return self.config.config def get_module(self) -> str: + """ + Get the current module name. + + Returns the name of the module currently being executed. + + Returns: + str: The name of the current module. + """ return self.config.module diff --git a/sdks/fs-python/function_stream/function.py b/sdks/fs-python/function_stream/function.py index d735f450..dc4c4a9d 100644 --- a/sdks/fs-python/function_stream/function.py +++ b/sdks/fs-python/function_stream/function.py @@ -3,6 +3,7 @@ This module provides the core functionality for creating and managing FunctionStream functions. It handles message processing, request/response flow, and resource management. +The module includes classes for function execution, message handling, and Pulsar integration. """ import asyncio import dataclasses @@ -33,12 +34,20 @@ def _validate_process_func(func: Callable, module_name: str): """ Validate the structure of a process function. + This function performs comprehensive validation of a process function to ensure + it meets the requirements for FunctionStream processing. It checks parameter + count, types, and return types including support for async functions. + Args: func (Callable): The function to validate module_name (str): Name of the module for error messages Raises: - ValueError: If the function structure is invalid + ValueError: If the function structure is invalid, including: + - Incorrect number of parameters + - Missing type hints + - Invalid parameter types + - Invalid return types """ # Get function signature sig = inspect.signature(func) @@ -59,12 +68,14 @@ def _validate_process_func(func: Callable, module_name: str): ) def unwrap_annotated(annotation): + """Helper function to unwrap Annotated types.""" origin = typing.get_origin(annotation) if origin is typing.Annotated: return unwrap_annotated(typing.get_args(annotation)[0]) return annotation def is_dict_str_any(annotation): + """Check if annotation represents Dict[str, Any] or dict[str, Any].""" ann = unwrap_annotated(annotation) origin = typing.get_origin(ann) args = typing.get_args(ann) @@ -82,28 +93,33 @@ def is_dict_str_any(annotation): return_type = type_hints.get('return') def is_dict_return(annotation): + """Check if annotation represents Dict[str, Any] or dict[str, Any].""" ann = unwrap_annotated(annotation) origin = typing.get_origin(ann) args = typing.get_args(ann) return (origin in (dict, typing.Dict)) and args == (str, Any) def is_none_type(annotation): + """Check if annotation represents None type.""" ann = unwrap_annotated(annotation) return ann is type(None) def is_awaitable_dict(annotation): + """Check if annotation represents Awaitable[Dict[str, Any]].""" ann = unwrap_annotated(annotation) origin = typing.get_origin(ann) args = typing.get_args(ann) return origin in (typing.Awaitable,) and len(args) == 1 and is_dict_return(args[0]) def is_awaitable_none(annotation): + """Check if annotation represents Awaitable[None].""" ann = unwrap_annotated(annotation) origin = typing.get_origin(ann) args = typing.get_args(ann) return origin in (typing.Awaitable,) and len(args) == 1 and is_none_type(args[0]) def is_union_of_dict_and_none(annotation): + """Check if annotation represents Union[Dict[str, Any], None].""" ann = unwrap_annotated(annotation) origin = typing.get_origin(ann) args = typing.get_args(ann) @@ -112,6 +128,7 @@ def is_union_of_dict_and_none(annotation): return False def is_awaitable_union_dict_none(annotation): + """Check if annotation represents Awaitable[Union[Dict[str, Any], None]].""" ann = unwrap_annotated(annotation) origin = typing.get_origin(ann) args = typing.get_args(ann) @@ -135,6 +152,16 @@ def is_awaitable_union_dict_none(annotation): @dataclasses.dataclass class MsgWrapper: + """ + Wrapper class for message data and event timing. + + This class encapsulates message data along with its associated event timestamp, + providing a structured way to handle messages throughout the processing pipeline. + + Attributes: + data (Dict[str, Any]): The message data payload. + event_time (Optional[datetime]): The timestamp when the event occurred. + """ data: Dict[str, Any] event_time: Optional[datetime] = None @@ -147,14 +174,26 @@ class FSFunction: messages from multiple Pulsar topics. It handles message consumption, processing, and response generation, while managing resources and providing metrics. + The FSFunction class is the main entry point for creating FunctionStream functions. + It manages the entire lifecycle of message processing, including: + - Pulsar client and consumer setup + - Message processing with configurable concurrency limits + - Response handling and error management + - Metrics collection and monitoring + - Graceful shutdown and resource cleanup + Attributes: - config (Config): Configuration object containing function settings - process_funcs (Dict[str, Union[Callable, FSModule]]): Dictionary of process functions or modules by module name - client (Client): Pulsar client instance - semaphore (asyncio.Semaphore): Semaphore for controlling concurrent requests - metrics (Metrics): Metrics collection object - metrics_server (MetricsServer): Server for exposing metrics - context (FSContext): Context object for accessing configuration + config (Config): Configuration object containing function settings. + process_funcs (Dict[str, Union[Callable, FSModule]]): Dictionary of process functions or modules by module name. + client (Client): Pulsar client instance for message consumption and production. + semaphore (asyncio.Semaphore): Semaphore for controlling concurrent requests. + metrics (Metrics): Metrics collection object for monitoring function performance. + metrics_server (MetricsServer): Server for exposing metrics via HTTP endpoint. + context (FSContext): Context object for accessing configuration and runtime information. + _shutdown_event (asyncio.Event): Event flag for graceful shutdown coordination. + _current_tasks (Set[asyncio.Task]): Set of currently running processing tasks. + _tasks_lock (asyncio.Lock): Lock for thread-safe task management. + _consumer: Pulsar consumer for message consumption. """ def __init__( @@ -167,17 +206,22 @@ def __init__( """ Initialize the FS Function. + This method sets up the FunctionStream function with the provided process functions + and configuration. It performs validation of the module configuration and sets up + the Pulsar client, consumer, and other resources needed for message processing. + Args: process_funcs (Dict[str, Union[Callable, FSModule]]): Dictionary mapping module names to their process functions or modules. Each function must accept two parameters: (context: FSContext, data: Dict[str, Any]) and return either a Dict[str, Any] or an Awaitable[Dict[str, Any]]. Each module must be an instance of FSModule. - config_path (str): Path to the configuration file. + config_path (str): Path to the configuration file. If None, uses FS_CONFIG_PATH environment variable or defaults to "config.yaml". Raises: ValueError: If no module is specified in config or if the specified module doesn't have a corresponding process function, or if the function structure is invalid. + Exception: If there are errors during Pulsar client setup or consumer creation. """ if config_path is None: config_path = os.getenv("FS_CONFIG_PATH", "config.yaml") @@ -230,7 +274,11 @@ def _setup_consumer(self): This method creates a Pulsar consumer that subscribes to multiple topics specified in the configuration. It collects topics from both regular sources - and the request source. + and the request source, creating a single consumer that can handle messages + from all configured topics. + + The consumer is configured with shared subscription type and appropriate + timeout settings for non-ordering guarantee workloads. Raises: ValueError: If no subscription name is set or if no valid sources are found. @@ -268,6 +316,9 @@ async def _add_task(self, task: asyncio.Task): """ Thread-safe method to add a task to the tracking set. + This method safely adds a task to the internal tracking set using a lock + to ensure thread safety when multiple tasks are being processed concurrently. + Args: task (asyncio.Task): The task to add to tracking. """ @@ -278,6 +329,10 @@ async def _remove_task(self, task: asyncio.Task): """ Thread-safe method to remove a task from the tracking set. + This method safely removes a task from the internal tracking set using a lock + to ensure thread safety. It handles any exceptions that might occur during + the removal process. + Args: task (asyncio.Task): The task to remove from tracking. """ @@ -291,6 +346,9 @@ async def _get_tasks(self) -> Set[asyncio.Task]: """ Thread-safe method to get a copy of current tasks. + This method returns a copy of the current tasks set to avoid race conditions + when iterating over the tasks. The copy is made while holding the lock. + Returns: Set[asyncio.Task]: A copy of the current tasks set. """ @@ -302,6 +360,10 @@ def _get_producer(self, topic: str) -> Producer: """ Get a producer for the specified topic. + This method uses an LRU cache to efficiently manage Pulsar producers. + Producers are cached by topic to avoid creating new ones for each message, + improving performance and resource utilization. + Args: topic (str): The topic to create a producer for. @@ -314,11 +376,15 @@ async def process_request(self, message): """ Process an incoming request and send a response. - This method: + This method is the core message processing function that: 1. Records metrics for the request 2. Processes the request using the configured module 3. Sends the response back to the appropriate topic 4. Handles any errors that occur during processing + 5. Manages message acknowledgment + + The method supports both synchronous and asynchronous process functions, + and handles various types of responses including error responses. Args: message: The incoming Pulsar message to process. @@ -351,10 +417,21 @@ async def process_request(self, message): resp_msgs: List[MsgWrapper] = [] def produce(data: Dict[str, Any], event_time: datetime = None): + """Local produce function to collect response messages.""" resp_msgs.append(MsgWrapper(data=data, event_time=event_time)) context.produce = produce + def get_metadata(key: str) -> Any: + """Local metadata function to provide message metadata.""" + if key == "topic": + return message.topic_name() + if key == "message_id": + return message.message_id() + raise KeyError(key) + + context.get_metadata = get_metadata + # Call the function with context as first argument and handle both sync and async results response_data = None try: @@ -417,19 +494,27 @@ async def _send_response(self, response_topic: str, request_id: str, msg: List[M """ Send a response message using cached producer asynchronously. + This method sends response messages to the specified topic using the cached + Pulsar producer. It handles multiple messages in parallel and provides + proper error handling and logging for failed sends. + + The method converts datetime objects to ISO format strings for JSON serialization + and sets appropriate event timestamps for Pulsar messages. + Args: - response_topic (str): The topic to send the response to - request_id (str): The ID of the request being responded to - msg (List[MsgWrapper]): The list of messages to send + response_topic (str): The topic to send the response to. + request_id (str): The ID of the request being responded to. + msg (List[MsgWrapper]): The list of messages to send. Raises: - Exception: If there's an error sending the response + Exception: If there's an error sending the response. """ loop = asyncio.get_event_loop() try: producer = self._get_producer(response_topic) def default_serializer(o): + """Custom JSON serializer for datetime objects.""" if isinstance(o, datetime): return o.isoformat() return str(o) @@ -440,6 +525,7 @@ def default_serializer(o): message_data = json.dumps(m.data, default=default_serializer).encode('utf-8') def create_callback(f): + """Create a callback function for async message sending.""" def callback(res, msg_id): if res != pulsar.Result.Ok: err = Exception(f"Error producing: {res}") @@ -475,10 +561,15 @@ async def start(self): """ Start processing requests from all consumers. - This method: - 1. Starts the metrics server + This method is the main entry point for starting the FunctionStream function. + It: + 1. Starts the metrics server for monitoring 2. Enters a loop to process incoming messages 3. Handles graceful shutdown when requested + 4. Manages the consumer receive loop with proper error handling + + The method runs indefinitely until a shutdown signal is received, either + through cancellation or keyboard interrupt. """ module = self.config.module logger.info(f"Starting FS Function with module: {module}") @@ -514,11 +605,14 @@ async def close(self): """ Close the service and clean up resources. - This method: - 1. Stops the metrics server - 2. Closes the consumer - 3. Clears the producer cache - 4. Closes the Pulsar client + This method performs a graceful shutdown of the FunctionStream function by: + 1. Stopping the metrics server + 2. Closing the Pulsar consumer + 3. Clearing the producer cache + 4. Closing the Pulsar client + + The method ensures that all resources are properly cleaned up and handles + any errors that might occur during the shutdown process. """ logger.info("Closing FS Function resources...") @@ -550,7 +644,8 @@ def __del__(self): Ensure resources are cleaned up when the object is destroyed. This finalizer ensures that all resources are properly closed when the - object is garbage collected. + object is garbage collected. It provides a safety net for resource cleanup + in case the explicit close() method is not called. """ if self._consumer is not None: try: @@ -571,6 +666,9 @@ def get_metrics(self) -> Dict[str, Any]: """ Get current metrics for monitoring. + This method returns the current metrics collected by the FunctionStream function, + providing insights into performance, throughput, and error rates. + Returns: Dict[str, Any]: A dictionary containing the current metrics. """ @@ -580,6 +678,9 @@ def get_context(self) -> FSContext: """ Get the FSContext instance associated with this function. + This method provides access to the context object that contains configuration + and runtime information for the function. + Returns: FSContext: The context object containing configuration and runtime information. """ diff --git a/sdks/fs-python/tests/test_context.py b/sdks/fs-python/tests/test_context.py index 20f9e786..5005cbc8 100644 --- a/sdks/fs-python/tests/test_context.py +++ b/sdks/fs-python/tests/test_context.py @@ -3,6 +3,7 @@ """ from unittest.mock import Mock +from datetime import datetime import pytest @@ -63,3 +64,47 @@ def test_get_config_non_string_value(self, context, mock_config): # Verify mock_config.get_config_value.assert_called_once_with("test_key") assert result == 123 + + def test_get_metadata_default_implementation(self, context): + """Test that get_metadata returns None by default.""" + result = context.get_metadata("any_key") + assert result is None + + def test_produce_default_implementation(self, context): + """Test that produce does nothing by default.""" + test_data = {"key": "value"} + test_time = datetime.utcnow() + + # Should not raise any exception + result = context.produce(test_data, test_time) + assert result is None + + def test_produce_without_event_time(self, context): + """Test produce method without event_time parameter.""" + test_data = {"key": "value"} + + # Should not raise any exception + result = context.produce(test_data) + assert result is None + + def test_get_configs(self, context, mock_config): + """Test get_configs method.""" + # Setup + mock_config.config = {"key1": "value1", "key2": "value2"} + + # Execute + result = context.get_configs() + + # Verify + assert result == {"key1": "value1", "key2": "value2"} + + def test_get_module(self, context, mock_config): + """Test get_module method.""" + # Setup + mock_config.module = "test_module" + + # Execute + result = context.get_module() + + # Verify + assert result == "test_module" diff --git a/sdks/fs-python/tests/test_function.py b/sdks/fs-python/tests/test_function.py index b124edf9..d5b525d6 100644 --- a/sdks/fs-python/tests/test_function.py +++ b/sdks/fs-python/tests/test_function.py @@ -3,7 +3,9 @@ """ import asyncio +import inspect import json +from typing import Dict, Any from unittest.mock import Mock, patch, AsyncMock import pulsar @@ -100,52 +102,47 @@ def function(self, mock_config, mock_client, mock_consumer, mock_client.subscribe.return_value = mock_consumer mock_client.create_producer.return_value = mock_producer - from typing import Dict, Any - async def process_func(context: FSContext, data: Dict[str, Any]) -> Dict[str, Any]: - return {"result": "test_result"} + return {"processed": data} process_funcs = {"test_module": process_func} - function = FSFunction( + return FSFunction( process_funcs=process_funcs, config_path="test_config.yaml" ) - return function @pytest.mark.asyncio async def test_init(self): """Test FSFunction initialization.""" - import inspect - from typing import Dict, Any - - async def process_func(context: FSContext, data: Dict[str, Any]) -> Dict[str, Any]: - return {"result": "test_result"} - - process_funcs = {"test_module": process_func} with patch('function_stream.function.Config.from_yaml') as mock_from_yaml, \ - patch('function_stream.function.Client'), \ - patch('function_stream.function.Metrics'), \ - patch('function_stream.function.MetricsServer'): - mock_config = Mock() + patch('function_stream.function.Client') as mock_client, \ + patch('function_stream.function.Metrics') as mock_metrics, \ + patch('function_stream.function.MetricsServer') as mock_metrics_server: + mock_config = Mock(spec=Config) mock_config.module = "test_module" mock_config.subscriptionName = "test_subscription" - - class PulsarConfig: - authPlugin = "" - authParams = "" - max_concurrent_requests = 10 - serviceUrl = "pulsar://localhost:6650" - - mock_config.pulsar = PulsarConfig() + mock_config.pulsar = PulsarConfig( + serviceUrl="pulsar://localhost:6650", + authPlugin="", + authParams="", + max_concurrent_requests=10 + ) mock_config.sources = [SourceSpec(pulsar=PulsarSourceConfig(topic="test_topic"))] - mock_config.requestSource = None - mock_config.sink = None + mock_config.requestSource = SourceSpec(pulsar=PulsarSourceConfig(topic="request_topic")) + mock_config.sink = SinkSpec(pulsar=PulsarSourceConfig(topic="response_topic")) metric_mock = Mock() metric_mock.port = 8080 mock_config.metric = metric_mock mock_from_yaml.return_value = mock_config + mock_client.return_value.subscribe.return_value = Mock() + mock_client.return_value.create_producer.return_value = Mock() + + async def process_func(context: FSContext, data: Dict[str, Any]) -> Dict[str, Any]: + return {"processed": data} + + process_funcs = {"test_module": process_func} function = FSFunction( process_funcs=process_funcs, config_path="test_config.yaml" @@ -173,6 +170,70 @@ async def test_process_request_success(self, function): # that the consumer acknowledge was called function._consumer.acknowledge.assert_called_once_with(message) + @pytest.mark.asyncio + async def test_process_request_with_metadata_access(self, function): + """Test request processing with metadata access through context.""" + message = Mock() + message.data.return_value = json.dumps({"test": "data"}).encode('utf-8') + message.properties.return_value = { + "request_id": "test_id", + "response_topic": "response_topic" + } + message.message_id.return_value = "test_message_id" + message.topic_name.return_value = "test_topic" + + # Mock the consumer acknowledge method + function._consumer.acknowledge = Mock() + + # Create a process function that accesses metadata + async def process_func_with_metadata(context: FSContext, data: Dict[str, Any]) -> Dict[str, Any]: + topic = context.get_metadata("topic") + message_id = context.get_metadata("message_id") + return { + "processed": data, + "metadata": { + "topic": topic, + "message_id": message_id + } + } + + function.process_funcs["test_module"] = process_func_with_metadata + + await function.process_request(message) + + # Verify that the message was processed successfully + function._consumer.acknowledge.assert_called_once_with(message) + + @pytest.mark.asyncio + async def test_process_request_metadata_invalid_key(self, function): + """Test request processing with invalid metadata key access.""" + message = Mock() + message.data.return_value = json.dumps({"test": "data"}).encode('utf-8') + message.properties.return_value = { + "request_id": "test_id", + "response_topic": "response_topic" + } + message.message_id.return_value = "test_message_id" + message.topic_name.return_value = "test_topic" + + # Mock the consumer acknowledge method + function._consumer.acknowledge = Mock() + + # Create a process function that accesses invalid metadata + async def process_func_with_invalid_metadata(context: FSContext, data: Dict[str, Any]) -> Dict[str, Any]: + try: + context.get_metadata("invalid_key") + return {"error": "Should have raised KeyError"} + except KeyError: + return {"error": "KeyError raised as expected"} + + function.process_funcs["test_module"] = process_func_with_invalid_metadata + + await function.process_request(message) + + # Verify that the message was processed successfully + function._consumer.acknowledge.assert_called_once_with(message) + @pytest.mark.asyncio async def test_process_request_json_error(self, function, mock_metrics): """Test request processing with JSON decode error.""" From c1ee39b3dca9b65d20397b265468a879059e9cc7 Mon Sep 17 00:00:00 2001 From: EvanWave Date: Tue, 12 Aug 2025 21:11:49 +0800 Subject: [PATCH 2/3] fix --- sdks/fs-python/function_stream/function.py | 8 ++-- sdks/fs-python/function_stream/module.py | 45 +++++++++++++++++----- sdks/fs-python/tests/test_context.py | 8 ++-- sdks/fs-python/tests/test_function.py | 4 +- 4 files changed, 45 insertions(+), 20 deletions(-) diff --git a/sdks/fs-python/function_stream/function.py b/sdks/fs-python/function_stream/function.py index dc4c4a9d..887db9a4 100644 --- a/sdks/fs-python/function_stream/function.py +++ b/sdks/fs-python/function_stream/function.py @@ -429,7 +429,7 @@ def get_metadata(key: str) -> Any: if key == "message_id": return message.message_id() raise KeyError(key) - + context.get_metadata = get_metadata # Call the function with context as first argument and handle both sync and async results @@ -449,7 +449,7 @@ def get_metadata(key: str) -> Any: logger.error(f"Error invoking process function: {str(e)}") raise Exception(f"Error invoking process function: {str(e)}") from e if response_data: - resp_msgs.append(MsgWrapper(data=response_data, event_time=datetime.utcnow())) + resp_msgs.append(MsgWrapper(data=response_data, event_time=datetime.now(timezone.utc))) if not response_topic: logger.warning("No response_topic provided and no sink topic available. Skip messages") @@ -481,7 +481,7 @@ def get_metadata(key: str) -> Any: await self._send_response( response_topic, request_id, - [MsgWrapper(data={'error': str(e)}, event_time=datetime.utcnow())] + [MsgWrapper(data={'error': str(e)}, event_time=datetime.now(timezone.utc))] ) self.metrics.record_request_end(False, time.time() - start_time) self.metrics.record_event(False) @@ -642,7 +642,7 @@ async def close(self): def __del__(self): """ Ensure resources are cleaned up when the object is destroyed. - + This finalizer ensures that all resources are properly closed when the object is garbage collected. It provides a safety net for resource cleanup in case the explicit close() method is not called. diff --git a/sdks/fs-python/function_stream/module.py b/sdks/fs-python/function_stream/module.py index 778997a1..e0ad7377 100644 --- a/sdks/fs-python/function_stream/module.py +++ b/sdks/fs-python/function_stream/module.py @@ -1,3 +1,11 @@ +""" +FSModule module provides the base class for all FunctionStream modules. + +This module defines the abstract base class FSModule that all FunctionStream modules +must inherit from. It provides a common interface for module initialization and +data processing, ensuring consistency across different module implementations. +""" + from abc import ABC, abstractmethod from typing import Dict, Any @@ -8,32 +16,51 @@ class FSModule(ABC): """ Base class for all FunctionStream modules. - This class provides a common interface for all modules in the FunctionStream SDK. - Each module must implement the process method to handle incoming data. + This abstract base class provides a common interface for all modules in the + FunctionStream SDK. Each module must implement the init and process methods + to handle module initialization and incoming data processing. Attributes: - name (str): The name of the module + name (str): The name of the module (to be set during initialization). """ @abstractmethod def init(self, context: FSContext): """ - Initialize the module with a name. + Initialize the module with the provided context. + + This method is called during module initialization to set up the module + with the necessary context and configuration. Subclasses must implement + this method to handle any required setup. Args: - name (str): The name of the module + context (FSContext): The context object containing configuration and + runtime information for the module. """ + pass @abstractmethod async def process(self, context: FSContext, data: Dict[str, Any]) -> Dict[str, Any]: """ - Process incoming data. + Process incoming data asynchronously. + + This method is the core processing function that handles incoming data. + Subclasses must implement this method to define the specific data processing + logic for their module. The method should be asynchronous to support + non-blocking operations. Args: - context (FSContext): The context object containing configuration and runtime information - data (Dict[str, Any]): The input data to process + context (FSContext): The context object containing configuration and + runtime information. + data (Dict[str, Any]): The input data to process. This is typically + a dictionary containing the message payload + and any associated metadata. Returns: - Union[Dict[str, Any], Awaitable[Dict[str, Any]]]: The processed data or an awaitable that will resolve to the processed data + Dict[str, Any]: The processed data that should be returned as the + result of the processing operation. + + Raises: + NotImplementedError: This method must be implemented by subclasses. """ raise NotImplementedError("Subclasses must implement process method") diff --git a/sdks/fs-python/tests/test_context.py b/sdks/fs-python/tests/test_context.py index 5005cbc8..fc8293cd 100644 --- a/sdks/fs-python/tests/test_context.py +++ b/sdks/fs-python/tests/test_context.py @@ -3,7 +3,7 @@ """ from unittest.mock import Mock -from datetime import datetime +from datetime import datetime, timezone import pytest @@ -73,8 +73,8 @@ def test_get_metadata_default_implementation(self, context): def test_produce_default_implementation(self, context): """Test that produce does nothing by default.""" test_data = {"key": "value"} - test_time = datetime.utcnow() - + test_time = datetime.now(timezone.utc) + # Should not raise any exception result = context.produce(test_data, test_time) assert result is None @@ -82,7 +82,7 @@ def test_produce_default_implementation(self, context): def test_produce_without_event_time(self, context): """Test produce method without event_time parameter.""" test_data = {"key": "value"} - + # Should not raise any exception result = context.produce(test_data) assert result is None diff --git a/sdks/fs-python/tests/test_function.py b/sdks/fs-python/tests/test_function.py index d5b525d6..3cb19468 100644 --- a/sdks/fs-python/tests/test_function.py +++ b/sdks/fs-python/tests/test_function.py @@ -115,9 +115,7 @@ async def process_func(context: FSContext, data: Dict[str, Any]) -> Dict[str, An async def test_init(self): """Test FSFunction initialization.""" with patch('function_stream.function.Config.from_yaml') as mock_from_yaml, \ - patch('function_stream.function.Client') as mock_client, \ - patch('function_stream.function.Metrics') as mock_metrics, \ - patch('function_stream.function.MetricsServer') as mock_metrics_server: + patch('function_stream.function.Client') as mock_client: mock_config = Mock(spec=Config) mock_config.module = "test_module" mock_config.subscriptionName = "test_subscription" From 773d41bddeb52270964be6d7db6e7e5aa570d110 Mon Sep 17 00:00:00 2001 From: EvanWave Date: Tue, 12 Aug 2025 21:19:30 +0800 Subject: [PATCH 3/3] Fix --- sdks/fs-python/function_stream/module.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/fs-python/function_stream/module.py b/sdks/fs-python/function_stream/module.py index e0ad7377..7c60d0cf 100644 --- a/sdks/fs-python/function_stream/module.py +++ b/sdks/fs-python/function_stream/module.py @@ -37,7 +37,6 @@ def init(self, context: FSContext): context (FSContext): The context object containing configuration and runtime information for the module. """ - pass @abstractmethod async def process(self, context: FSContext, data: Dict[str, Any]) -> Dict[str, Any]: