brach-4.1: [Feat](udf) Support Python UDF/UDAF/UDTF#63387
Conversation
…0466) In some python_udf p0 tests, two tables with the same name exist under the same database, and one was just created when the other was tested and dropped. ```text 2026-02-02 18:53:05.163 INFO [suite-thread-16] (Suite.groovy:572) - Execute sql: DROP TABLE IF EXISTS test_conditional_module; ... Exception in pythonudtf_p0/test_pythonudtf_exceptions_module.groovy(line 445): sql """ INSERT INTO test_conditional_module VALUES (1, -10), (2, 0), (3, 50), (4, 200), (5, NULL); """ qt_conditional_process """ ^^^^^^^^^^^^^^^^^^^^^^^^^^ERROR LINE^^^^^^^^^^^^^^^^^^^^^^^^^^ SELECT id, tmp.input, tmp.category, tmp.result FROM test_conditional_module LATERAL VIEW udtf_conditional_process_module(val) tmp AS input, category, result ORDER BY id; """ // Test 6.2: Yield Control - No Output Case sql """ DROP FUNCTION IF EXISTS udtf_filter_yield_module(INT); """ sql """ CREATE TABLES FUNCTION udtf_filter_yield_module(INT) Exception: java.sql.SQLException: errCode = 2, detailMessage = Table [test_conditional_module] does not exist in database [regression_test_pythonudtf_p0].(line 3, pos 17) ```
Some versions of pyarrow do not support directly converting Python's
`dict` type to the `arrow Map` type, so we need to manually convert the
`dict` type to a `list of tuple` before converting it to `arrow Map`
(consistent with the type mapping in `convert_arrow_field_to_python`).
```python
def convert_arrow_field_to_python(field, column_metadata=None):
if pa.types.is_map(field.type):
# pyarrow.lib.MapScalar's as_py() returns a list of tuples, convert to dict
list_of_tuples = field.as_py()
return dict(list_of_tuples) if list_of_tuples is not None else None
```
…e#60690) Before ```text +-----------+-------------+---------------+-------------------+------------+ | Signature | Return Type | Function Type | Intermediate Type | Properties | +-----------+-------------+---------------+-------------------+------------+ | same2 | | | | | +-----------+-------------+---------------+-------------------+------------+ ``` After ```text +------------------------------------+-------------+----------------------+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | lzq(int) | text | SCALAR/PYTHON_UDF | | ID=1770634239619, CHECKSUM=8c0e13ec9d1f3447ec2dd0efa0251b76, OBJECT_FILE=file:///mnt/disk7/linzhenqi/doooooris/lzq.zip, NULLABLE_MODE=ALWAYS_NOT_NULLABLE, RUNTIME_VERSION=3.9.18, SYMBOL=lzq_py_test.lzq_print | | py_add(int, int) | int | SCALAR/PYTHON_UDF | | ID=1770634239620, NULLABLE_MODE=ALWAYS_NULLABLE, RUNTIME_VERSION=3.10.12, SYMBOL=evaluate, INLINE_CODE= def evaluate(a, b): return a + b | | py_split_string_module(text) | text | UDTF/PYTHON_UDF | | ID=1770871349701, CHECKSUM=c6230f1580bfdb4639ebcf7042119dcf, OBJECT_FILE=file:///mnt/disk7/linzhenqi/d4/doris/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf.zip, NULLABLE_MODE=ALWAYS_NULLABLE, RUNTIME_VERSION=3.8.10, SYMBOL=pyudtf_module.basic_udtf.split_string_udtf | | py_split_string_module_outer(text) | text | UDTF/PYTHON_UDF | | ID=1770871349702, CHECKSUM=c6230f1580bfdb4639ebcf7042119dcf, OBJECT_FILE=file:///mnt/disk7/linzhenqi/d4/doris/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf.zip, NULLABLE_MODE=ALWAYS_NULLABLE, RUNTIME_VERSION=3.8.10, SYMBOL=pyudtf_module.basic_udtf.split_string_udtf | | python_udaf_sum_int(int) | bigint | AGGREGATE/PYTHON_UDF | | ID=1770871349699, CHECKSUM=8968fc20fc629d1e193359c235927bed, OBJECT_FILE=file:///mnt/disk7/linzhenqi/ddoooooris/doris/regression-test/suites/pythonudaf_p0/udaf_scripts/pyudaf.zip, NULLABLE_MODE=ALWAYS_NULLABLE, RUNTIME_VERSION=3.8.10, INIT_FN=, UPDATE_FN=, MERGE_FN=, SYMBOL=sum_int.SumInt | +------------------------------------+-------------+----------------------+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ```
…60751) doc: apache/doris-website#3402 Add the auxiliary commands `SHOW PYTHON VERSIONS` and `SHOW PYTHON PACKAGES IN '<VERSION>'` to display more PythonUDF-related information. ```sql Doris> show python versions; +---------+---------+---------+---------------------------------------------+--------------------------------------------------------+ | Version | EnvName | EnvType | BasePath | ExecutablePath | +---------+---------+---------+---------------------------------------------+--------------------------------------------------------+ | 3.9.18 | py39 | conda | /mnt/disk7/linzhenqi/miniconda3/envs/py39 | /mnt/disk7/linzhenqi/miniconda3/envs/py39/bin/python | | 3.8.10 | py3810 | conda | /mnt/disk7/linzhenqi/miniconda3/envs/py3810 | /mnt/disk7/linzhenqi/miniconda3/envs/py3810/bin/python | | 3.12.11 | py312 | conda | /mnt/disk7/linzhenqi/miniconda3/envs/py312 | /mnt/disk7/linzhenqi/miniconda3/envs/py312/bin/python | +---------+---------+---------+---------------------------------------------+--------------------------------------------------------+ 3 rows in set (0.02 sec) Doris> show python packages in '3.9.18'; +-----------------+-------------+ | Package | Version | +-----------------+-------------+ | pyarrow | 21.0.0 | | Bottleneck | 1.4.2 | | jieba | 0.42.1 | | six | 1.17.0 | | wheel | 0.45.1 | | python-dateutil | 2.9.0.post0 | | tzdata | 2025.3 | | setuptools | 80.9.0 | | numpy | 2.0.1 | | psutil | 7.0.0 | | pandas | 2.3.3 | | mkl_random | 1.2.8 | | pip | 25.3 | | snownlp | 0.12.3 | | pytz | 2025.2 | | mkl_fft | 1.3.11 | | mkl-service | 2.4.0 | | numexpr | 2.10.1 | +-----------------+-------------+ ```
1. drop python udf file cache when drop function
2. Fixed the issue where the same-named module from two different paths
would incorrectly use the cache.
```sql
-- same1(x) -> x + 1
-- same2(x) -> x + 2
Doris> CREATE FUNCTION same1(INT)
-> RETURNS INT
-> PROPERTIES (
-> "type" = "PYTHON_UDF",
-> "file" = "file:///mnt/disk7/linzhenqi/d2/same_name_py_test/1/1.zip",
-> "symbol" = "same_name.same_name",
-> "runtime_version" = "3.9.18",
-> "always_nullable" = "false"
-> );
Query OK, 0 rows affected (0.04 sec)Doris> select same1(1);
+----------+
| same1(1) |
+----------+
| 2 |
+----------+
1 row in set (6.25 sec)
Doris> CREATE FUNCTION same2(INT)
-> RETURNS INT
-> PROPERTIES (
-> "type" = "PYTHON_UDF",
-> "file" = "file:///mnt/disk7/linzhenqi/d2/same_name_py_test/2/2.zip",
-> "symbol" = "same_name.same_name",
-> "runtime_version" = "3.9.18",
-> "always_nullable" = "false"
-> );
Query OK, 0 rows affected (0.00 sec)
-- due to the same module name, it incorrectly hit the cache of same1.
Doris> select same2(1);
+----------+
| same2(1) |
+----------+
| 2 |
+----------+
1 row in set (0.09 sec)
```
…n_udf_support` is not set (apache#61196) When `enable_python_udf_support` is not set to true, a null pointer dereference occurs when invoking helper commands (`SHOW PYTHON VERSION/ PACKAGES IN`). ```text /home/zcp/repo_center/doris_master/doris/be/src/udf/python/python_env.cpp:305:38: runtime error: member call on null pointer of type 'doris::PythonEnvScanner' #0 0x559edede162c in doris::PythonVersionManager::env_infos_to_thrift() const /home/zcp/repo_center/doris_master/doris/be/src/udf/python/python_env.cpp:305:38 apache#1 0x559ede63acd4 in doris::BaseBackendService::get_python_envs(std::vector>&) /home/zcp/repo_center/doris_master/doris/be/src/service/backend_service.cpp:1316:47 apache#2 0x559edf99796a in doris::BackendServiceProcessor::process_get_python_envs(int, apache::thrift::protocol::TProtocol*, apache::thrift::protocol::TProtocol*, void*) /home/zcp/repo_center/doris_master/doris/gensrc/build/gen_cpp/BackendService.cpp:7789:13 apache#3 0x559edf94a69c in doris::BackendServiceProcessor::dispatchCall(apache::thrift::protocol::TProtocol*, apache::thrift::protocol::TProtocol*, std::__cxx11::basic_string, std::allocator> const&, int, void*) /home/zcp/repo_center/doris_master/doris/gensrc/build/gen_cpp/BackendService.cpp:6466:3 apache#4 0x559eac6bad06 in apache::thrift::TDispatchProcessor::process(std::shared_ptr, std::shared_ptr, void*) /home/zcp/repo_center/doris_master/doris/thirdparty/installed/include/thrift/TDispatchProcessor.h:121:12 apache#5 0x559ee47c071a in apache::thrift::server::TConnectedClient::run() (/mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be+0x8a1c871a) apache#6 0x559ee47c1a45 in apache::thrift::server::TThreadedServer::TConnectedClientRunner::run() (/mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be+0x8a1c9a45) apache#7 0x559ee47c557f in apache::thrift::concurrency::Thread::threadMain(std::shared_ptr) (/mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be+0x8a1cd57f) apache#8 0x559ee47c52a5 in void std::__invoke_impl), std::shared_ptr>(std::__invoke_other, void (*&&)(std::shared_ptr), std::shared_ptr&&) (/mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be+0x8a1cd2a5) apache#9 0x559ee47c521c in std::__invoke_result), std::shared_ptr>::type std::__invoke), std::shared_ptr>(void (*&&)(std::shared_ptr), std::shared_ptr&&) (/mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be+0x8a1cd21c) apache#10 0x559ee47c51f1 in void std::thread::_Invoker), std::shared_ptr>>::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) (/mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be+0x8a1cd1f1) apache#11 0x559ee47c51b4 in std::thread::_Invoker), std::shared_ptr>>::operator()() (/mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be+0x8a1cd1b4) apache#12 0x559ee47c4fd8 in std::thread::_State_impl), std::shared_ptr>>>::_M_run() (/mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be+0x8a1ccfd8) apache#13 0x559ef61b5f3f in execute_native_thread_routine archive64.c apache#14 0x559eac535d26 in asan_thread_start(void*) (/mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be+0x51f3dd26) apache#15 0x7fcba62bfac2 in start_thread nptl/pthread_create.c:442:8 apache#16 0x7fcba635184f misc/../sysdeps/unix/sysv/linux/x86_64/clone3.S:81 SUMMARY: UndefinedBehaviorSanitizer: undefined-behavior /home/zcp/repo_center/doris_master/doris/be/src/udf/python/python_env.cpp:305:38 ``` now: ```text Doris> show python packages in '3.19.18'; ERROR 1105 (HY000): errCode = 2, detailMessage = Failed to get python packages from any backend: [E42] Set 'python_venv_interpreter_paths' in be.conf to enable PythonUDF feature Doris> show python versions; ERROR 1105 (HY000): errCode = 2, detailMessage = Failed to get python envs from any backend: [E-236] Set 'python_venv_interpreter_paths' in be.conf to enable PythonUDF feature ```
…#61280) ### What problem does this PR solve? Issue Number: close #xxx Related PR: apache#60630 Problem Summary: ### Release note 1. sys.modules uses `modules_name` as a key to distinguish imported modules, but the current behavior uses `location + modules_name` as a key to differentiate and acquire a lock for lib loading, which can cause a concurrent race condition when two threads import the same named module from different locations at the same time. 2. Currently, importing the same package with different file paths causes issues. ```text Doris> CREATE FUNCTION func1(INT) -> RETURNS INT -> PROPERTIES ( -> "type" = "PYTHON_UDF", -> "file" = "file:///mnt/disk7/linzhenqi/dv/path_a/pkg.zip", -> "symbol" = "pkg.mdu_a.func", -> "runtime_version" = "3.9.18", -> "always_nullable" = "false" -> ); Query OK, 0 rows affected (0.02 sec) Doris> select func1(1); +----------+ | func1(1) | +----------+ | 2 | +----------+ 1 row in set (2.70 sec) Doris> CREATE FUNCTION func10(INT) -> RETURNS INT -> PROPERTIES ( -> "type" = "PYTHON_UDF", -> "file" = "file:///mnt/disk7/linzhenqi/dv/path_b/pkg.zip", -> "symbol" = "pkg.mdu_b.func", -> "runtime_version" = "3.9.18", -> "always_nullable" = "false" -> ); Query OK, 0 rows affected (0.01 sec) Doris> select func10(1); ERROR 1105 (HY000): errCode = 2, detailMessage = (127.0.0.1)[RUNTIME_ERROR]Failed to load packaged UDF 'pkg.mdu_b.func': No module named 'pkg.mdu_b'. Detail: Python exception: Traceback (most recent call last): File "/mnt/disk7/linzhenqi/d4/doris/output/be/plugins/python_udf/python_server.py", line 1058, in _load_package_udf udf_module = self._get_or_import_module(location, full_module_name) File "/mnt/disk7/linzhenqi/d4/doris/output/be/plugins/python_udf/python_server.py", line 949, in _get_or_import_module modul -- pkg does not exist in path_b mdu_a Doris> CREATE FUNCTION func_no_module(INT) -> RETURNS INT -> PROPERTIES ( -> "type" = "PYTHON_UDF", -> "file" = "file:///mnt/disk7/linzhenqi/dv/path_b/pkg.zip", -> "symbol" = "pkg.mdu_a.func", -> "runtime_version" = "3.9.18", -> "always_nullable" = "false" -> ); Query OK, 0 rows affected (0.00 sec) -- The pkg cache was used by mistake, and the pkg of the func1 cache was found. Doris> select func_no_module(1); +-------------------+ | func_no_module(1) | +-------------------+ | 2 | +-------------------+ 1 row in set (0.04 sec) ``` In the `python_server`, the `location` is the cache directory, composed of the `function_id`. Thus, the internal `module_cache` only needs `location` as its key. After importing a cache, immediately remove the mapping maintained in `sys`. now ```text ```
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
1. `LargeInt` is mapped to arrow's `str` type, but no conversion is
performed in python_server which leads to an error due to the mismatch
between the final result and the expect return type(pa.str) after the
internal calculation is converted to int.
```text
Doris> CREATE FUNCTION py_largeint_inc(LARGEINT)
-> RETURNS LARGEINT
-> PROPERTIES (
-> "type"="PYTHON_UDF",
-> "symbol"="evaluate",
-> "runtime_version"="3.12.11",
-> "always_nullable"="true"
-> )
-> AS $$
-> def evaluate(x):
-> if x is None:
-> return None
-> return int(x) + 1
-> $$;
Query OK, 0 rows affected (0.024 sec)
Doris> SELECT py_largeint_inc(1);
ERROR 1105 (HY000): errCode = 2, detailMessage = (127.0.0.1)[RUNTIME_ERROR]Expected bytes, got a 'int' object. Detail: Python exception: Traceback (most recent call last):
File "pyarrow/_flight.pyx", line 2315, in pyarrow._flight._do_exchange
File "/mnt/disk7/linzhenqi/d1/doris/output/be/plugins/python_udf/python_server.py", line 2481, in do_exchange
self._handle_exchange_udf(python_udf_meta, reader, writer)
File "/mnt/disk7/linzhenqi/d1/doris/output/be/plugins/python_udf/python_server.py", line 2013, in _handle_exc
```
now:
```text
Doris> select py_largeint_inc(1);
+--------------------+
| py_largeint_inc(1) |
+--------------------+
| 2 |
+--------------------+
```
2. Udaf does not maintain metadata for special types
(IPv4/IPv6/LargeInt), causing the input_type to be passed and not
processed normally.
```text
Doris> CREATE AGGREGATE FUNCTION udaf_count_loopback_ipv6_inline(IPV6)
-> RETURNS BIGINT
-> PROPERTIES (
-> "type" = "PYTHON_UDF",
-> "symbol" = "CountLoopbackIPv6UDAF",
-> "runtime_version" = "3.9.18"
-> )
-> AS $$
-> class CountLoopbackIPv6UDAF:
-> def __init__(self):
-> self.count = 0
->
-> def accumulate(self, value):
-> if value is not None and value.is_loopback:
-> self.count += 1
->
-> def merge(self, other_state):
-> if other_state is not None:
-> self.count += other_state
->
-> def finish(self):
-> return self.count
->
-> @Property
-> def aggregate_state(self):
-> return self.count
-> $$;
Query OK, 0 rows affected (0.004 sec)
Doris> SELECT udaf_count_loopback_ipv6_inline(ip_v6) as loopback_ipv6_count
-> FROM test_pythonudaf_ipv6_table;
ERROR 1105 (HY000): errCode = 2, detailMessage = (127.0.0.1)[INTERNAL_ERROR][INTERNAL_ERROR]ACCUMULATE operation failed for place_id=140343167410176
0# doris::PythonUDAFClient::accumulate(long, bool, arrow::RecordBatch const&, long, long) at ../src/common/status.h:500
1# doris::AggregatePythonUDAFData::add(long, doris::IColumn const**, long, long, std::vector<std::shared_ptr<doris::IDataType const>, std::allocator<std::shared_ptr<doris::IDataType const> > > const&) at ./be/build_RELEASE/../src/exprs/aggregate/aggregate_f
```
now
```text
Doris> SELECT udaf_count_loopback_ipv6_inline(ip_v6) as loopback_ipv6_count
-> FROM test_pythonudaf_ipv6_table;
+---------------------+
| loopback_ipv6_count |
+---------------------+
| 1 |
+---------------------+
```
… name (apache#62167) Related PR: apache#61280 Problem Summary: When two imported modules share the same top-level package name, python_server imports them package by package, and clean up `sys.modules` after import, which causes incorrect lock key assignment and leads to thread contention or interference between imported states. For example, the symbols `pkg.mod.func1` and `pkg.mod.func2` may conflict when importing `pkg` or `pkg.mod` ```text 2026-04-07 16:35:30.108 ERROR [suite-thread-1] (ScriptContext.groovy:122) - Run test_pythonudtf_data_types_module in /mnt/disk7/linzhenqi/dv/doris/regression-test/suites/pythonudtf_p0/test_pythonudtf_data_types_module.groovy failed java.sql.SQLException: errCode = 2, detailMessage = (127.0.0.1)[RUNTIME_ERROR]Flight stream finish failed with message: 'pyudtf_module'. Detail: Python exception: Traceback (most recent call last): File "pyarrow/_flight.pyx", line 2315, in pyarrow._flight._do_exchange File "/mnt/disk7/linzhenqi/dv/doris/output/be/plugins/python_udf/python_server.py", line 2499, in do_exchange self._handle_exchange_udtf(python_udf_meta, reader, writer) File "/mnt/disk7/linzhenqi/dv/doris/output/be/plugins/python_udf/python_server.py", line 2316, in _handle_exchange_udtf adaptive_udtf = loader.load() ^^^^^^^^^^^^^ File "/mnt/disk7/linzhenqi/dv/doris/output/be/plugins/python_udf/python_server.py", line 877, in load func = self.load_udf_from_module(location, package_name, module_name, func_name) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/mnt/disk7/linzhenqi/dv/doris/output/be/plugins/python_udf/python_server.py", line 1135, in load_udf_from_module return self._load_package_udf( ^^^^^^^^^^^^^^^^^^^^^^^ File "/mnt/disk7/linzhenqi/dv/doris/output/be/plugins/python_udf/python_server.py", line 1088, in _load_package_udf udf_module = self._get_or_import_module(location, full_module_name) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/mnt/disk7/linzhenqi/dv/doris/output/be/plugins/python_udf/python_server.py", line 978, in _get_or_import_module module = importlib.import_module(full_module_name) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/mnt/disk7/linzhenqi/miniconda3/envs/py312/lib/python3.12/importlib/__init__.py", line 90, in import_module return _bootstrap._gcd_import(name[level:], package, level) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "<frozen importlib._bootstrap>", line 1387, in _gcd_import File "<frozen importlib._bootstrap>", line 1360, in _find_and_load File "<frozen importlib._bootstrap>", line 1310, in _find_and_load_unlocked File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed File "<frozen importlib._bootstrap>", line 1387, in _gcd_import File "<frozen importlib._bootstrap>", line 1360, in _find_and_load File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked File "<frozen importlib._bootstrap>", line 946, in _load_unlocked KeyError: 'pyudtf_module' ... 2026-04-07 16:35:30.169 ERROR [suite-thread-6] (ScriptContext.groovy:122) - Run test_pythonudtf_exceptions_module in /mnt/disk7/linzhenqi/dv/doris/regression-test/suites/pythonudtf_p0/test_pythonudtf_exceptions_module.groovy failed java.sql.SQLException: errCode = 2, detailMessage = (127.0.0.1)[RUNTIME_ERROR]Flight stream finish failed with message: 'pyudtf_module'. Detail: Python exception: Traceback (most recent call last): File "pyarrow/_flight.pyx", line 2315, in pyarrow._flight._do_exchange File "/mnt/disk7/linzhenqi/dv/doris/output/be/plugins/python_udf/python_server.py", line 2499, in do_exchange self._handle_exchange_udtf(python_udf_meta, reader, writer) File "/mnt/disk7/linzhenqi/dv/doris/output/be/plugins/python_udf/python_server.py", line 2316, in _handle_exchange_udtf adaptive_udtf = loader.load() ^^^^^^^^^^^^^ File "/mnt/disk7/linzhenqi/dv/doris/output/be/plugins/python_udf/python_server.py", line 877, in load func = self.load_udf_from_module(location, package_name, module_name, func_name) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/mnt/disk7/linzhenqi/dv/doris/output/be/plugins/python_udf/python_server.py", line 1135, in load_udf_from_module return self._load_package_udf( ^^^^^^^^^^^^^^^^^^^^^^^ File "/mnt/disk7/linzhenqi/dv/doris/output/be/plugins/python_udf/python_server.py", line 1088, in _load_package_udf udf_module = self._get_or_import_module(location, full_module_name) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/mnt/disk7/linzhenqi/dv/doris/output/be/plugins/python_udf/python_server.py", line 978, in _get_or_import_module module = importlib.import_module(full_module_name) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/mnt/disk7/linzhenqi/miniconda3/envs/py312/lib/python3.12/importlib/__init__.py", line 90, in import_module return _bootstrap._gcd_import(name[level:], package, level) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "<frozen importlib._bootstrap>", line 1387, in _gcd_import File "<frozen importlib._bootstrap>", line 1360, in _find_and_load File "<frozen importlib._bootstrap>", line 1337, in _find_and_load_unlocked KeyError: 'pyudtf_module' ```
Newer versions of Conda can no longer keep Python pinned to `3.8.10` while remaining compatible with several required test dependencies (e.g. pyarrow, pandas) from conda-forge.(will automatically upgrade to `3.8.19`) To ensure the test environment can be successfully created and remains stable, This PR allows controlling the Python UDF version in p0 by modifying the `pythonUdfRuntimeVersion` in `regression_conf.groovy`.
…n-isolated (apache#62620) `PythonServerManager` does not check if the python process corresponding to the version is alive when retrieving the process, which may cause errors like: ```text java.lang.IllegalStateException: PYTHON_UDF_BLOCKED suite=python_udf_cross_feature_import_storage scenario=python_udf_cross_feature_import_storage.inline_runtime reason=inline probe failed. reason=errCode = 2, detailMessage = (172.20.49.73)[INTERNAL_ERROR]IOError: Flight stream finish failed with gRPC code 14, message: failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/doris_python_udf_55799.sock: Connection refused ``` After modification, before obtaining the Python process, it will check if the process is alive to ensure the availability of this feature.
…e#62624) Problem Summary: ```sql CREATE FUNCTION py_pkg_versions() RETURNS STRING PROPERTIES ( "type" = "PYTHON_UDF", "symbol" = "evaluate", "runtime_version" = "3.12.11", "always_nullable" = "true" ) AS $$ import json import sys def evaluate(): versions = {"python": sys.version} try: import numpy versions["numpy"] = numpy.__version__ except: versions["numpy"] = "not_found" try: import pandas versions["pandas"] = pandas.__version__ except: versions["pandas"] = "not_found" try: import jieba versions["jieba"] = jieba.__version__ except: versions["jieba"] = "not_found" return json.dumps(versions) $$; ``` before: ```sql SELECT py_pkg_versions(); -- errCode = 2, detailMessage = (172.20.49.73)[INVALID_ARGUMENT]Python UDF input types is empty ``` now: ```sql SELECT py_pkg_versions(); +------------------------------------------------------------------------------------------------------------------------------------------------------+ | py_pkg_versions() | +------------------------------------------------------------------------------------------------------------------------------------------------------+ | {"python": "3.12.11 | packaged by conda-forge | (main, Jun 4 2025, 14:45:31) [GCC 13.3.0]", "numpy": "2.4.3", "pandas": "3.0.1", "jieba": "0.42.1"} | +------------------------------------------------------------------------------------------------------------------------------------------------------+ ```
…2974) Improve observability for rare Python UDF process pool initialization stalls. Previously, when initialization was blocked or unusually slow, logs only showed the start of pool creation, making it difficult to tell whether BE was still waiting on process startup or where time was being spent. This adds lightweight progress and elapsed-time logs to help diagnose initialization hangs without increasing normal-case log volume. And reduce the `max_python_process_num` to 16 in regression test, we don't need so many processes for testing.
Nereids resolves UDF calls from FunctionRegistry, while SHOW FUNCTIONS
reads catalog
metadata. After DROP DATABASE and recreating the same database name,
catalog metadata
could be empty but FunctionRegistry still contained stale Python UDF
builders, causing
SELECT to bind and execute the old function body.
```sql
DROP DATABASE IF EXISTS registry_test_db
CREATE DATABASE registry_test_db;
USE registry_test_db;
DROP FUNCTION IF EXISTS py_exc_cache_test(INT);
CREATE FUNCTION py_exc_cache_test(INT)
RETURNS INT
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.12.11",
"always_nullable" = "true"
)
AS $$
def evaluate(x):
if x is None:
return None
return x + 1
$$;
-- Normal operation
SELECT py_exc_cache_test(10); -- 11
-- Directly delete, but the code didn't clean up the FunctionRegistry under db.
DROP DATABASE registry_test_db FORCE;
CREATE DATABASE registry_test_db;
USE registry_test_db;
-- show functions 走catalog的 db.getFunctions()
SHOW FUNCTIONS LIKE 'py_exc_cache_test';
-- empty
-- Function execution, go through FunctionRegistry, use the remaining FunctionRegistry, expected to be 11 (bug)
SELECT py_exc_cache_test(10);
-- Create a new function with the same name, but the execution logic is to append to the end of the Registry.
-- Normal call still goes through the previous version of x + 1 after drop dp (bug)
DROP FUNCTION IF EXISTS py_exc_cache_test(INT);
CREATE FUNCTION py_exc_cache_test(INT)
RETURNS INT
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.12.11",
"always_nullable" = "true"
)
AS $$
def evaluate(x):
if x is None:
return None
return x + 999
$$;
SELECT py_exc_cache_test(10); -- still 11 (bug)
```
Fix Python UDAF stale cache reuse after dropping and recreating an
inline UDAF with the same name/signature.
The Python server previously keyed `UDAF state` managers by `function
name and argument types`, so a recreated inline UDAF could reuse the old
loaded Python class. This fix includes the FE function id in the Python
UDAF metadata/cache key and clears `UDAF state` manager cache during
`DROP FUNCTION` cleanup.
```sql
set enable_sql_cache = 0;
DROP FUNCTION IF EXISTS py_udaf_bug_repro(INT);
drop database if exists db001;
create database db001;
use db001;
-- 0. Prepare test data
DROP TABLE IF EXISTS t_udaf_cache_bug_test;
CREATE TABLE t_udaf_cache_bug_test (
id INT,
val INT
) DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES("replication_num"="1");
INSERT INTO t_udaf_cache_bug_test VALUES (1, 10), (2, 20), (3, 30);
-- At this moment, the total of the entire table val is 60.
-- 1. Create V1 version of UDAF (Logic: Accumulate and multiply by 10)
DROP FUNCTION IF EXISTS py_udaf_bug_repro(INT);
select sleep(10);
CREATE AGGREGATE FUNCTION py_udaf_bug_repro(INT)
RETURNS BIGINT
PROPERTIES (
"type"="PYTHON_UDF",
"symbol"="RecreateUDAF",
"runtime_version"="3.12.11",
"always_nullable"="true"
)
AS $$
class RecreateUDAF:
def __init__(self):
self.total = 0
@Property
def aggregate_state(self):
return self.total
def accumulate(self, val):
if val is not None:
self.total += val
def merge(self, other):
self.total += other
def finish(self):
return self.total * 10 # V1: 乘以 10
$$;
-- 2. Verify V1 Logic
SELECT py_udaf_bug_repro(val) FROM t_udaf_cache_bug_test;
-- Expected Return: 600 (60 * 10)
-- Actual Return: 600 (Correct)
-- 3. Drop the old function and create a V2 version of the UDAF with the same name (logic: accumulate and multiply by 100)
DROP FUNCTION IF EXISTS py_udaf_bug_repro(INT);
select sleep(10);
select sleep(10);
CREATE AGGREGATE FUNCTION py_udaf_bug_repro(INT)
RETURNS BIGINT
PROPERTIES (
"type"="PYTHON_UDF",
"symbol"="RecreateUDAF",
"runtime_version"="3.12.11",
"always_nullable"="true"
)
AS $$
class RecreateUDAF:
def __init__(self):
self.total = 0
@Property
def aggregate_state(self):
return self.total
def accumulate(self, val):
if val is not None:
self.total += val
def merge(self, other):
self.total += other
def finish(self):
return self.total * 100 # V2: Logic modified to multiply by 100
$$;
-- 4. Verify V2 Logic
SELECT py_udaf_bug_repro(val) FROM t_udaf_cache_bug_test;
-- Expected Return: 6000 (60 * 100)
-- Actual Return: 600 ([Bug occurs] Still outputs the old cached 600)
```
Problem Summary:
before
```sql
CREATE FUNCTION py_err_stats_test(INT)
RETURNS INT
PROPERTIES (
"type"="PYTHON_UDF",
"symbol"="evaluate",
"runtime_version"="3.12.11",
"always_nullable"="true"
) AS $$
def evaluate(x):
raise TypeError("consistent_error_42")
$$;
SELECT py_err_stats_test(1);
+----------------------+
| py_err_stats_test(1) |
+----------------------+
| NULL |
+----------------------+
```
now
```sql
SELECT py_err_stats_test(1);
-- ERROR 1105 (HY000): errCode = 2, detailMessage = (127.0.0.1)[RUNTIME_ERROR]Flight stream finish failed with message: Error in scalar UDF execution at row 0: consistent_error_42. Detail: Python exception: Traceback (most recent call last):
File "/mnt/disk7/linzhenqi/dv/doris/output/be/plugins/python_udf/python_server.py", line 614, in _scalar_call
res = self._eval_func(*converted_args)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<string>", line 3, in evaluate
TypeError: consistent_error_42
```
Problem Summary:
prevent failures caused by conflicting function names across test
suites.
```text
Exception in pythonudf_p0/test_pythonudf_mixed_params.groovy(line 242):
sql """ DROP FUNCTION IF EXISTS py_vec_add_int(INT, INT); """
// If another suite creates a function with the same name at this time, it will cause the test to fail.
sql """
CREATE FUNCTION py_vec_add_int(INT, INT)
Exception:
java.sql.SQLException: errCode = 2, detailMessage = function already exists
```
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
wait for master test to be completed |
|
run buildall |
FE UT Coverage ReportIncrement line coverage |
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
FE Regression Coverage ReportIncrement line coverage |
0a1a1ff to
f0ebef8
Compare
|
run buildall |
Cloud UT Coverage ReportIncrement line coverage Increment coverage report
|
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
FE Regression Coverage ReportIncrement line coverage |
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
FE Regression Coverage ReportIncrement line coverage |
f0ebef8 to
d28462f
Compare
…#62698) Problem Summary: Previously, UDFs could be treated as deterministic in optimizer-related paths, which is unsafe for UDFs whose results are not stable across evaluations. That may cause invalid rewrite/planning decisions and lead to incorrect query semantics in some cases. Introduce `immutable`, `stable`, and `volatile` semantics through `"volatility" = "immutable|stable|volatile"`, persist the property in function metadata, and use it to drive deterministic and volatile-expression behavior in Nereids. Immutable UDFs are treated as deterministic, stable UDFs avoid volatile identity handling while remaining non-deterministic, and volatile UDFs receive per-call volatile identities to protect unsafe rewrites. ```sql CREATE TABLE cte_uuid_seed (id INT) ENGINE=OLAP DUPLICATE KEY(id) DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ("replication_num" = "1"); INSERT INTO cte_uuid_seed VALUES (1),(2),(3); DROP FUNCTION IF EXISTS py_uuid_token(INT); CREATE FUNCTION py_uuid_token(INT) RETURNS STRING PROPERTIES ( "type" = "PYTHON_UDF", "symbol" = "py_uuid_token_impl", "always_nullable" = "false", "runtime_version" = "3.12.11" ) AS $$ import uuid def py_uuid_token_impl(x): return f"{x}-{uuid.uuid4()}" $$; ``` before: ```sql SET enable_cte_materialize = true; SET inline_cte_referenced_threshold = 10; -- treated as volatile func(UniqueFunction), which caused wrong planning WITH cte AS (SELECT id, py_uuid_token(id) AS token FROM cte_uuid_seed) SELECT id, COUNT(DISTINCT token) AS distinct_tokens FROM (SELECT id, token FROM cte UNION ALL SELECT id, token FROM cte) u GROUP BY id ORDER BY id; +------+-----------------+ | id | distinct_tokens | +------+-----------------+ | 1 | 2 | | 2 | 2 | | 3 | 2 | +------+-----------------+ ``` now ```sql +------+-----------------+ | id | distinct_tokens | +------+-----------------+ | 1 | 1 | | 2 | 1 | | 3 | 1 | +------+-----------------+ ``` doc: apache/doris-website#3570
d28462f to
0d5d720
Compare
|
run buildall |
FE UT Coverage ReportIncrement line coverage |
FE Regression Coverage ReportIncrement line coverage |
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
pick pythonUdf from master