From 82088bf83eff5ff45fdf41e92bc86a471afe31c1 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 28 Oct 2024 14:59:52 +0800 Subject: [PATCH 1/5] builtin-functions --- python/.gitignore => .gitignore | 3 + python/databend_udf/udf.py | 103 ++++++++++++++++++++++++++++++-- python/example/server.py | 3 +- python/pyproject.toml | 6 +- 4 files changed, 107 insertions(+), 8 deletions(-) rename python/.gitignore => .gitignore (66%) diff --git a/python/.gitignore b/.gitignore similarity index 66% rename from python/.gitignore rename to .gitignore index a5ff56a..2a5aa9a 100644 --- a/python/.gitignore +++ b/.gitignore @@ -4,3 +4,6 @@ dist/ __pycache__/ Pipfile.lock .ruff_cache/ +.vscode +python/example/test.py + diff --git a/python/databend_udf/udf.py b/python/databend_udf/udf.py index a7890fb..c723b75 100644 --- a/python/databend_udf/udf.py +++ b/python/databend_udf/udf.py @@ -17,6 +17,9 @@ import inspect from concurrent.futures import ThreadPoolExecutor from typing import Iterator, Callable, Optional, Union, List, Dict +from prometheus_client import Counter, Gauge, Histogram +from prometheus_client import start_http_server +import threading import pyarrow as pa from pyarrow.flight import FlightServerBase, FlightInfo @@ -229,11 +232,65 @@ class UDFServer(FlightServerBase): _location: str _functions: Dict[str, UserDefinedFunction] - def __init__(self, location="0.0.0.0:8815", **kwargs): + def __init__(self, location="0.0.0.0:8815", metric_location = None, **kwargs): super(UDFServer, self).__init__("grpc://" + location, **kwargs) self._location = location + self._metric_location = metric_location self._functions = {} + # Initialize Prometheus metrics + self.requests_count = Counter( + 'udf_server_requests_count', + 'Total number of UDF requests processed', + ['function_name'] + ) + self.rows_count = Counter( + 'udf_server_rows_count', + 'Total number of rows processed', + ['function_name'] + ) + self.running_requests = Gauge( + 'udf_server_running_requests_count', + 'Number of currently running UDF requests', + ['function_name'] + ) + self.running_rows = Gauge( + 'udf_server_running_rows_count', + 'Number of currently processing rows', + ['function_name'] + ) + self.response_duration = Histogram( + 'udf_server_response_duration_seconds', + 'Time spent processing UDF requests', + ['function_name'], + buckets=(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0) + ) + + self.error_count = Counter( + 'udf_server_errors_count', + 'Total number of UDF processing errors', + ['function_name', 'error_type'] + ) + + self.add_function(builtin_echo) + self.add_function(builtin_healthy) + + def _start_metrics_server(self): + """Start Prometheus metrics HTTP server if metric_location is provided""" + try: + host, port = self._metric_location.split(':') + port = int(port) + + def start_server(): + start_http_server(port, host) + logger.info(f"Prometheus metrics server started on {self._metric_location}") + + metrics_thread = threading.Thread(target=start_server, daemon=True) + metrics_thread.start() + except Exception as e: + logger.error(f"Failed to start metrics server: {e}") + raise + def get_flight_info(self, context, descriptor): """Return the result schema of a function.""" func_name = descriptor.path[0].decode("utf-8") @@ -257,13 +314,37 @@ def do_exchange(self, context, descriptor, reader, writer): raise ValueError(f"Function {func_name} does not exists") udf = self._functions[func_name] writer.begin(udf._result_schema) + + # Increment request counter + self.requests_count.labels(function_name=func_name).inc() + # Increment running requests gauge + self.running_requests.labels(function_name=func_name).inc() + try: - for batch in reader: - for output_batch in udf.eval_batch(batch.data): - writer.write_batch(output_batch) + with self.response_duration.labels(function_name=func_name).time(): + for batch in reader: + # Update row metrics + batch_rows = batch.data.num_rows + self.rows_count.labels(function_name=func_name).inc(batch_rows) + self.running_rows.labels(function_name=func_name).inc(batch_rows) + + try: + for output_batch in udf.eval_batch(batch.data): + writer.write_batch(output_batch) + finally: + # Decrease running rows gauge after processing + self.running_rows.labels(function_name=func_name).dec(batch_rows) + except Exception as e: + self.error_count.labels( + function_name=func_name, + error_type=e.__class__.__name__ + ).inc() logger.exception(e) raise e + finally: + # Decrease running requests gauge + self.running_requests.labels(function_name=func_name).dec() def add_function(self, udf: UserDefinedFunction): """Add a function to the server.""" @@ -284,7 +365,10 @@ def add_function(self, udf: UserDefinedFunction): def serve(self): """Start the server.""" - logger.info(f"listening on {self._location}") + logger.info(f"UDF server listening on {self._location}") + if self._metric_location: + self._start_metrics_server() + logger.info(f"Prometheus metrics available at http://{self._metric_location}/metrics") super(UDFServer, self).serve() @@ -586,3 +670,12 @@ def _field_type_to_string(field: pa.Field) -> str: return f"TUPLE({args_str})" else: raise ValueError(f"Unsupported type: {t}") + + +@udf(input_types=["VARCHAR"], result_type="VARCHAR") +def builtin_echo(a): + return a + +@udf(input_types=[], result_type="INT") +def builtin_healthy(): + return 1 \ No newline at end of file diff --git a/python/example/server.py b/python/example/server.py index d58cc84..d825b32 100644 --- a/python/example/server.py +++ b/python/example/server.py @@ -19,6 +19,7 @@ from typing import List, Dict, Any, Tuple, Optional from databend_udf import udf, UDFServer +# from test import udf, UDFServer logging.basicConfig(level=logging.INFO) @@ -313,7 +314,7 @@ def wait_concurrent(x): if __name__ == "__main__": - udf_server = UDFServer("0.0.0.0:8815") + udf_server = UDFServer("0.0.0.0:8815", metric_location = "0.0.0.0:8816") udf_server.add_function(add_signed) udf_server.add_function(add_unsigned) udf_server.add_function(add_float) diff --git a/python/pyproject.toml b/python/pyproject.toml index 36c0c8e..4ac5226 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -10,8 +10,10 @@ name = "databend-udf" version = "0.2.5" readme = "README.md" requires-python = ">=3.7" -dependencies = ["pyarrow"] - +ependencies = [ + "pyarrow", + "prometheus-client>=0.17.0" +] [project.optional-dependencies] lint = ["ruff"] From 7a4500318535de8373767eaecff506c2faf156db Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 28 Oct 2024 15:08:49 +0800 Subject: [PATCH 2/5] builtin-functions --- python/databend_udf/udf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/databend_udf/udf.py b/python/databend_udf/udf.py index c723b75..692e21d 100644 --- a/python/databend_udf/udf.py +++ b/python/databend_udf/udf.py @@ -369,6 +369,7 @@ def serve(self): if self._metric_location: self._start_metrics_server() logger.info(f"Prometheus metrics available at http://{self._metric_location}/metrics") + super(UDFServer, self).serve() From aba8a8007ef81ec830c117fed2424e4ca216744b Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 28 Oct 2024 15:21:42 +0800 Subject: [PATCH 3/5] builtin-functions --- .github/workflows/python.yml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index c033d92..61ef598 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -22,17 +22,17 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - name: Install pipenv - run: curl https://raw.githubusercontent.com/pypa/pipenv/master/get-pipenv.py | python - - uses: actions/setup-python@v5 + - name: Setup Python + uses: actions/setup-python@v5 with: - python-version: '3.12' - - name: Lint + python-version: "3.12" + - name: Install dependencies + working-directory: python + run: pip install ruff + - name: Check format working-directory: python run: | - pipenv install --dev - pipenv run ruff check databend_udf - pipenv run ruff format --check databend_udf + ruff format --check . - name: build working-directory: python run: | From 713dd21b13c9c9f55ab1c57c42884e807256d966 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 28 Oct 2024 15:26:34 +0800 Subject: [PATCH 4/5] builtin-functions --- python/databend_udf/udf.py | 77 ++++++++++++++++++++++++-------------- python/example/server.py | 2 +- 2 files changed, 49 insertions(+), 30 deletions(-) diff --git a/python/databend_udf/udf.py b/python/databend_udf/udf.py index 692e21d..ee13ac8 100644 --- a/python/databend_udf/udf.py +++ b/python/databend_udf/udf.py @@ -232,44 +232,57 @@ class UDFServer(FlightServerBase): _location: str _functions: Dict[str, UserDefinedFunction] - def __init__(self, location="0.0.0.0:8815", metric_location = None, **kwargs): + def __init__(self, location="0.0.0.0:8815", metric_location=None, **kwargs): super(UDFServer, self).__init__("grpc://" + location, **kwargs) self._location = location self._metric_location = metric_location self._functions = {} - # Initialize Prometheus metrics + # Initialize Prometheus metrics self.requests_count = Counter( - 'udf_server_requests_count', - 'Total number of UDF requests processed', - ['function_name'] + "udf_server_requests_count", + "Total number of UDF requests processed", + ["function_name"], ) self.rows_count = Counter( - 'udf_server_rows_count', - 'Total number of rows processed', - ['function_name'] + "udf_server_rows_count", "Total number of rows processed", ["function_name"] ) self.running_requests = Gauge( - 'udf_server_running_requests_count', - 'Number of currently running UDF requests', - ['function_name'] + "udf_server_running_requests_count", + "Number of currently running UDF requests", + ["function_name"], ) self.running_rows = Gauge( - 'udf_server_running_rows_count', - 'Number of currently processing rows', - ['function_name'] + "udf_server_running_rows_count", + "Number of currently processing rows", + ["function_name"], ) self.response_duration = Histogram( - 'udf_server_response_duration_seconds', - 'Time spent processing UDF requests', - ['function_name'], - buckets=(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0) + "udf_server_response_duration_seconds", + "Time spent processing UDF requests", + ["function_name"], + buckets=( + 0.005, + 0.01, + 0.025, + 0.05, + 0.075, + 0.1, + 0.25, + 0.5, + 0.75, + 1.0, + 2.5, + 5.0, + 7.5, + 10.0, + ), ) self.error_count = Counter( - 'udf_server_errors_count', - 'Total number of UDF processing errors', - ['function_name', 'error_type'] + "udf_server_errors_count", + "Total number of UDF processing errors", + ["function_name", "error_type"], ) self.add_function(builtin_echo) @@ -278,12 +291,14 @@ def __init__(self, location="0.0.0.0:8815", metric_location = None, **kwargs): def _start_metrics_server(self): """Start Prometheus metrics HTTP server if metric_location is provided""" try: - host, port = self._metric_location.split(':') + host, port = self._metric_location.split(":") port = int(port) def start_server(): start_http_server(port, host) - logger.info(f"Prometheus metrics server started on {self._metric_location}") + logger.info( + f"Prometheus metrics server started on {self._metric_location}" + ) metrics_thread = threading.Thread(target=start_server, daemon=True) metrics_thread.start() @@ -315,7 +330,7 @@ def do_exchange(self, context, descriptor, reader, writer): udf = self._functions[func_name] writer.begin(udf._result_schema) - # Increment request counter + # Increment request counter self.requests_count.labels(function_name=func_name).inc() # Increment running requests gauge self.running_requests.labels(function_name=func_name).inc() @@ -333,12 +348,13 @@ def do_exchange(self, context, descriptor, reader, writer): writer.write_batch(output_batch) finally: # Decrease running rows gauge after processing - self.running_rows.labels(function_name=func_name).dec(batch_rows) + self.running_rows.labels(function_name=func_name).dec( + batch_rows + ) except Exception as e: self.error_count.labels( - function_name=func_name, - error_type=e.__class__.__name__ + function_name=func_name, error_type=e.__class__.__name__ ).inc() logger.exception(e) raise e @@ -368,7 +384,9 @@ def serve(self): logger.info(f"UDF server listening on {self._location}") if self._metric_location: self._start_metrics_server() - logger.info(f"Prometheus metrics available at http://{self._metric_location}/metrics") + logger.info( + f"Prometheus metrics available at http://{self._metric_location}/metrics" + ) super(UDFServer, self).serve() @@ -677,6 +695,7 @@ def _field_type_to_string(field: pa.Field) -> str: def builtin_echo(a): return a + @udf(input_types=[], result_type="INT") def builtin_healthy(): - return 1 \ No newline at end of file + return 1 diff --git a/python/example/server.py b/python/example/server.py index d825b32..0820241 100644 --- a/python/example/server.py +++ b/python/example/server.py @@ -314,7 +314,7 @@ def wait_concurrent(x): if __name__ == "__main__": - udf_server = UDFServer("0.0.0.0:8815", metric_location = "0.0.0.0:8816") + udf_server = UDFServer("0.0.0.0:8815", metric_location="0.0.0.0:8816") udf_server.add_function(add_signed) udf_server.add_function(add_unsigned) udf_server.add_function(add_float) From adfcd44ac57d4092e23d80723004bc4467d66408 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 28 Oct 2024 15:28:14 +0800 Subject: [PATCH 5/5] builtin-functions --- python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index 4ac5226..4077b9f 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -10,7 +10,7 @@ name = "databend-udf" version = "0.2.5" readme = "README.md" requires-python = ">=3.7" -ependencies = [ +dependencies = [ "pyarrow", "prometheus-client>=0.17.0" ]