Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ public void put(K key, V value, long expirationTimeMs) {
public V get(K key) {
Long expirationTime = expirationMap.get(key);
if (expirationTime == null || System.currentTimeMillis() > expirationTime) {
map.remove(key);
expirationMap.remove(key);
ttlMap.remove(key);
remove(key);
return null;
}
// reset time again
Expand All @@ -64,18 +62,25 @@ private void startExpirationTask() {
long now = System.currentTimeMillis();
for (K key : expirationMap.keySet()) {
if (expirationMap.get(key) <= now) {
map.remove(key);
expirationMap.remove(key);
ttlMap.remove(key);
remove(key);
}
}
}, DEFAULT_INTERVAL_TIME, DEFAULT_INTERVAL_TIME, TimeUnit.MINUTES);
}

public void remove(K key) {
map.remove(key);
V value = map.remove(key);
expirationMap.remove(key);
ttlMap.remove(key);

// Uniformly release resources for any AutoCloseable value,
if (value instanceof AutoCloseable) {
try {
((AutoCloseable) value).close();
} catch (Exception e) {
LOG.warn("Failed to close cached resource: " + key, e);
}
}
}

public int size() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
package org.apache.doris.common.jni.utils;

import com.esotericsoftware.reflectasm.MethodAccess;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URLClassLoader;
import java.util.HashMap;

/**
* This class is used for caching the class of UDF.
*/
public class UdfClassCache {
public class UdfClassCache implements AutoCloseable {
private static final Logger LOG = Logger.getLogger(UdfClassCache.class);
public Class<?> udfClass;
// the index of evaluate() method in the class
public MethodAccess methodAccess;
Expand All @@ -42,4 +46,22 @@ public class UdfClassCache {
// for java-udf index is evaluate method index
// for java-udaf index is add method index
public int methodIndex;

// Keep a reference to the ClassLoader for static load mode
// This ensures the ClassLoader is not garbage collected and can load dependent classes
// Note: classLoader may be null when jarPath is empty (UDF loaded from custom_lib via
// system class loader), which must not be closed — null is intentional in that case.
public URLClassLoader classLoader;

@Override
public void close() {
if (classLoader != null) {
try {
classLoader.close();
} catch (IOException e) {
LOG.warn("Failed to close ClassLoader", e);
}
classLoader = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.thrift.protocol.TBinaryProtocol;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.MalformedURLException;
import java.net.URLClassLoader;
Expand Down Expand Up @@ -139,6 +138,10 @@ public UdfClassCache getClassCache(String jarPath, String signature, long expira
UdfClassCache cache = null;
if (isStaticLoad) {
cache = ScannerLoader.getUdfClassLoader(signature);
if (cache != null && cache.classLoader != null) {
// Reuse the cached classLoader to ensure dependent classes can be loaded
classLoader = cache.classLoader;
}
}
if (cache == null) {
ClassLoader loader;
Expand All @@ -156,6 +159,7 @@ public UdfClassCache getClassCache(String jarPath, String signature, long expira
cache.allMethods = new HashMap<>();
cache.udfClass = Class.forName(className, true, loader);
cache.methodAccess = MethodAccess.get(cache.udfClass);
cache.classLoader = classLoader;
checkAndCacheUdfClass(cache, funcRetType, parameterTypes);
if (isStaticLoad) {
ScannerLoader.cacheClassLoader(signature, cache, expirationTime);
Expand All @@ -171,24 +175,17 @@ protected abstract void checkAndCacheUdfClass(UdfClassCache cache, Type funcRetT
* Close the class loader we may have created.
*/
public void close() {
if (classLoader != null) {
try {
classLoader.close();
} catch (IOException e) {
// Log and ignore.
if (LOG.isDebugEnabled()) {
LOG.debug("Error closing the URLClassloader.", e);
}
}
}
// Close the output table if it exists.
if (outputTable != null) {
outputTable.close();
}
// We are now un-usable (because the class loader has been
// closed), so null out method_ and classLoader_.
classLoader = null;
objCache.methodAccess = null;
if (!isStaticLoad) {
// close classLoader via UdfClassCache.close() if not in static load mode.
// In static load mode, the classLoader is cached and should not be closed here.
objCache.close();
objCache.methodAccess = null;
classLoader = null;
}
}

protected ColumnValueConverter getInputConverter(TPrimitiveType primitiveType, Class clz)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ public UdafExecutor(byte[] thriftParams) throws Exception {
*/
@Override
public void close() {
if (!isStaticLoad) {
super.close();
}
// Call parent's close method which handles classLoader and outputTable properly
// It will only close classLoader if not in static load mode
super.close();
// Clear the state map
stateObjMap = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,9 @@ public UdfExecutor(byte[] thriftParams) throws Exception {
*/
@Override
public void close() {
// We are now un-usable (because the class loader has been
// closed), so null out method_ and classLoader_.
if (!isStaticLoad) {
super.close();
} else if (outputTable != null) {
outputTable.close();
}
// Call parent's close method which handles classLoader properly
// It will only close classLoader if not in static load mode
super.close();
}

public long evaluate(Map<String, String> inputParams, Map<String, String> outputParams) throws UdfRuntimeException {
Expand Down
Loading