From d407c53eff1dec329c2f608254c3e788f83d5580 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Tue, 6 Feb 2024 17:06:04 +0800 Subject: [PATCH] [Model Monitoring] Enable explicit ack for V3IO streams (#5071) [ML-5666](https://jira.iguazeng.com/browse/ML-5666) --- server/api/crud/model_monitoring/deployment.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/server/api/crud/model_monitoring/deployment.py b/server/api/crud/model_monitoring/deployment.py index 558327ecf7a..2d113014ee6 100644 --- a/server/api/crud/model_monitoring/deployment.py +++ b/server/api/crud/model_monitoring/deployment.py @@ -669,15 +669,18 @@ def _apply_stream_trigger( function_name=function_name, ) if stream_path.startswith("v3io://"): + kwargs = {} + if function_name != mm_constants.MonitoringFunctionNames.STREAM: + kwargs["access_key"] = model_monitoring_access_key + if mlrun.mlconf.is_explicit_ack(): + kwargs["explicit_ack_mode"] = "explicitOnly" + kwargs["workerAllocationMode"] = "static" + # Generate V3IO stream trigger function.add_v3io_stream_trigger( stream_path=stream_path, - name="monitoring_stream_trigger" - if function_name is None - else f"monitoring_{function_name}_trigger", - access_key=model_monitoring_access_key - if function_name != mm_constants.MonitoringFunctionNames.STREAM - else None, + name=f"monitoring_{function_name or 'stream'}_trigger", + **kwargs, ) # Add the default HTTP source http_source = mlrun.datastore.sources.HttpSource()