Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 65 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -769,16 +769,74 @@ public void setName(String name) {
}

public synchronized void addFunction(Function function, boolean ifNotExists) throws UserException {
function.checkWritable();
if (FunctionUtil.addFunctionImpl(function, ifNotExists, false, name2Function)) {
Env.getCurrentEnv().getEditLog().logAddFunction(function);
try {
addFunctions(ImmutableList.of(function), ifNotExists, false);
}

public synchronized void addTableFunction(Function function, boolean ifNotExists) throws UserException {
// Doris table functions are registered as two functions: the normal function and its outer variant.
Function outerFunction = function.clone();
FunctionName name = outerFunction.getFunctionName();
name.setFn(name.getFunction() + "_outer");
if (hasSameTableFunctionPair(function, outerFunction, ifNotExists)) {
return;
}
addFunctions(ImmutableList.of(function, outerFunction), false, true);
}

private boolean hasSameTableFunctionPair(Function function, Function outerFunction, boolean ifNotExists)
throws UserException {
Function existingFunction = getExistingFunction(function);
Function existingOuterFunction = getExistingFunction(outerFunction);
if (existingFunction == null && existingOuterFunction == null) {
return false;
}
if (ifNotExists && existingFunction != null && existingOuterFunction != null
&& existingFunction.isUDTFunction() && existingOuterFunction.isUDTFunction()) {
return true;
}
throw new UserException("function already exists");
}

private Function getExistingFunction(Function function) {
try {
return getFunction(getFunctionSearchDesc(function));
} catch (AnalysisException e) {
return null;
}
}

private void addFunctions(List<Function> functions, boolean ifNotExists, boolean logAsBatch) throws UserException {
List<Function> addedFunctions = Lists.newArrayList();
try {
for (Function function : functions) {
function.checkWritable();
if (FunctionUtil.addFunctionImpl(function, ifNotExists, false, name2Function)) {
Comment thread
morrySnow marked this conversation as resolved.
addedFunctions.add(function);
}
}
for (Function function : addedFunctions) {
FunctionUtil.translateToNereidsThrows(this.getFullName(), function);
} catch (Exception e) {
name2Function.remove(function.getFunctionName().getFunction());
throw e;
}
} catch (Exception e) {
for (Function function : addedFunctions) {
FunctionUtil.dropFromNereids(this.getFullName(), getFunctionSearchDesc(function));
Comment thread
morrySnow marked this conversation as resolved.
}
for (int i = addedFunctions.size() - 1; i >= 0; i--) {
FunctionUtil.removeFunctionImpl(addedFunctions.get(i), name2Function);
}
throw e;
}
if (logAsBatch) {
Env.getCurrentEnv().getEditLog().logAddFunctions(addedFunctions);
} else {
for (Function function : addedFunctions) {
Env.getCurrentEnv().getEditLog().logAddFunction(function);
}
}
}

private FunctionSearchDesc getFunctionSearchDesc(Function function) {
return new FunctionSearchDesc(function.getFunctionName(), function.getArgs(), function.hasVarArgs());
}

public synchronized void replayAddFunction(Function function) {
Expand Down
7 changes: 7 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@
import org.apache.doris.persist.BinlogGcInfo;
import org.apache.doris.persist.CleanQueryStatsInfo;
import org.apache.doris.persist.CreateDbInfo;
import org.apache.doris.persist.CreateFunctionInfo;
import org.apache.doris.persist.DropDbInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.EditLog;
Expand Down Expand Up @@ -6774,6 +6775,12 @@ public void replayCreateFunction(Function function) throws MetaNotFoundException
db.replayAddFunction(function);
}

public void replayCreateFunctions(CreateFunctionInfo info) throws MetaNotFoundException {
for (Function function : info.getFunctions()) {
replayCreateFunction(function);
}
}

public void replayCreateGlobalFunction(Function function) {
globalFunctionMgr.replayAddFunction(function);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,14 +306,18 @@ public void addUdf(String dbName, String name, UdfBuilder builder) {
}
}

public void dropUdf(String dbName, String name, List<DataType> argTypes) {
public void dropUdf(String dbName, String name, List<DataType> argTypes, boolean hasVarArgs) {
if (dbName == null) {
dbName = GLOBAL_FUNCTION;
}
synchronized (name2UdfBuilders) {
Map<String, List<FunctionBuilder>> builders = name2UdfBuilders.getOrDefault(dbName, ImmutableMap.of());
builders.getOrDefault(name, Lists.newArrayList())
.removeIf(builder -> ((UdfBuilder) builder).getArgTypes().equals(argTypes));
.removeIf(builder -> {
UdfBuilder udfBuilder = (UdfBuilder) builder;
return udfBuilder.getArgTypes().equals(argTypes)
&& udfBuilder.hasVarArguments() == hasVarArgs;
});

// the name will be used when show functions, so remove the name when it's dropped
if (builders.getOrDefault(name, Lists.newArrayList()).isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.nereids.trees.expressions.functions.udf.AliasUdf;
import org.apache.doris.nereids.trees.expressions.functions.udf.JavaUdaf;
import org.apache.doris.nereids.trees.expressions.functions.udf.JavaUdf;
Expand Down Expand Up @@ -136,6 +137,13 @@ public static boolean addFunctionImpl(Function function, boolean ifNotExists, bo
return true;
}

public static boolean removeFunctionImpl(Function function,
ConcurrentMap<String, ImmutableList<Function>> name2Function) throws UserException {
FunctionSearchDesc functionSearchDesc = new FunctionSearchDesc(function.getFunctionName(), function.getArgs(),
function.hasVarArgs());
return dropFunctionImpl(functionSearchDesc, false, name2Function);
}

public static Function getFunction(FunctionSearchDesc function,
ConcurrentMap<String, ImmutableList<Function>> name2Function) throws AnalysisException {
String functionName = function.getName().getFunction();
Expand Down Expand Up @@ -179,6 +187,9 @@ public static boolean translateToNereids(String dbName, Function function) {
}

public static boolean translateToNereidsThrows(String dbName, Function function) {
if (DebugPointUtil.isEnable("FunctionUtil.translateToNereidsThrows.exception")) {
throw new RuntimeException("debug point FunctionUtil.translateToNereidsThrows.exception");
}
try {
translateToNereidsImpl(dbName, function);
} catch (Exception e) {
Expand Down Expand Up @@ -220,7 +231,7 @@ public static boolean dropFromNereids(String dbName, FunctionSearchDesc function
String fnName = function.getName().getFunction();
List<DataType> argTypes = Arrays.stream(function.getArgTypes()).map(DataType::fromCatalogType)
.collect(Collectors.toList());
Env.getCurrentEnv().getFunctionRegistry().dropUdf(dbName, fnName, argTypes);
Env.getCurrentEnv().getFunctionRegistry().dropUdf(dbName, fnName, argTypes, function.isVariadic());
} catch (Exception e) {
LOG.warn("Nereids drop function {}:{} failed, caused by: {}", dbName == null ? "_global_" : dbName,
function.getName(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ public synchronized void addFunction(Function function, boolean ifNotExists) thr
function.setGlobal(true);
function.checkWritable();
if (FunctionUtil.addFunctionImpl(function, ifNotExists, false, name2Function)) {
Env.getCurrentEnv().getEditLog().logAddGlobalFunction(function);
try {
FunctionUtil.translateToNereidsThrows(null, function);
} catch (Exception e) {
LOG.warn("Nereids add function failed", e);
name2Function.remove(function.getFunctionName().getFunction());
FunctionUtil.removeFunctionImpl(function, name2Function);
throw e;
}
Env.getCurrentEnv().getEditLog().logAddGlobalFunction(function);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.doris.persist.ConsistencyCheckInfo;
import org.apache.doris.persist.CreateDbInfo;
import org.apache.doris.persist.CreateDictionaryPersistInfo;
import org.apache.doris.persist.CreateFunctionInfo;
import org.apache.doris.persist.CreateTableInfo;
import org.apache.doris.persist.DatabaseInfo;
import org.apache.doris.persist.DictionaryDecreaseVersionInfo;
Expand Down Expand Up @@ -499,6 +500,11 @@ public void readFields(DataInput in) throws IOException {
isRead = true;
break;
}
case OperationType.OP_ADD_FUNCTIONS: {
data = CreateFunctionInfo.read(in);
isRead = true;
break;
}
case OperationType.OP_DROP_FUNCTION: {
data = FunctionSearchDesc.read(in);
isRead = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class AliasUdf extends ScalarFunction implements ExplicitlyCastableSignat
private final Expression unboundFunction;
private final List<String> parameters;
private final List<DataType> argTypes;
private final boolean hasVarArguments;
private final Map<String, String> sessionVariables;

/**
Expand All @@ -55,14 +56,28 @@ public AliasUdf(String name, List<DataType> argTypes, Expression unboundFunction
List<String> parameters, Map<String, String> sessionVariables, Expression... arguments) {
super(name, arguments);
this.argTypes = argTypes;
this.hasVarArguments = false;
this.unboundFunction = unboundFunction;
this.parameters = parameters;
this.sessionVariables = sessionVariables;
}

/**
* constructor with session variables.
*/
public AliasUdf(String name, List<DataType> argTypes, boolean hasVarArguments, Expression unboundFunction,
List<String> parameters, Map<String, String> sessionVariables, Expression... arguments) {
super(name, arguments);
this.argTypes = argTypes;
this.hasVarArguments = hasVarArguments;
this.unboundFunction = unboundFunction;
this.parameters = parameters;
this.sessionVariables = sessionVariables;
}

@Override
public List<FunctionSignature> getSignatures() {
return ImmutableList.of(FunctionSignature.of(NullType.INSTANCE, argTypes));
return ImmutableList.of(FunctionSignature.of(NullType.INSTANCE, hasVarArguments, argTypes));
}

public List<String> getParameters() {
Expand Down Expand Up @@ -101,6 +116,7 @@ public static void translateToNereidsFunction(String dbName, AliasFunction funct
AliasUdf aliasUdf = new AliasUdf(
function.functionName(),
Arrays.stream(function.getArgs()).map(DataType::fromCatalogType).collect(Collectors.toList()),
function.hasVarArgs(),
parsedFunction,
function.getParameters(),
sessionVariables);
Expand All @@ -116,7 +132,7 @@ public int arity() {

@Override
public Expression withChildren(List<Expression> children) {
return new AliasUdf(getName(), argTypes, unboundFunction, parameters, sessionVariables,
return new AliasUdf(getName(), argTypes, hasVarArguments, unboundFunction, parameters, sessionVariables,
children.toArray(new Expression[0]));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,8 @@ public abstract class UdfBuilder extends FunctionBuilder {
public abstract List<DataType> getArgTypes();

public abstract List<FunctionSignature> getSignatures();

public boolean hasVarArguments() {
return getSignatures().get(0).hasVarArgs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,15 +233,10 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
functionName.setDb(dbName);
}
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
db.addFunction(function, ifNotExists);
if (function.isUDTFunction()) {
// all of the table function in doris will have two function
// one is the noraml, and another is outer, the different of them is deal with
// empty: whether need to insert NULL result value
Function outerFunction = function.clone();
FunctionName name = outerFunction.getFunctionName();
name.setFn(name.getFunction() + "_outer");
db.addFunction(outerFunction, ifNotExists);
db.addTableFunction(function, ifNotExists);
} else {
db.addFunction(function, ifNotExists);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.persist;

import org.apache.doris.catalog.Function;
import org.apache.doris.common.io.Writable;

import com.google.common.collect.ImmutableList;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;

public class CreateFunctionInfo implements Writable {
private final List<Function> functions;

public CreateFunctionInfo(List<Function> functions) {
this.functions = ImmutableList.copyOf(functions);
}

public List<Function> getFunctions() {
return functions;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(functions.size());
for (Function function : functions) {
function.write(out);
}
}

public static CreateFunctionInfo read(DataInput in) throws IOException {
ImmutableList.Builder<Function> builder = ImmutableList.builder();
int functionSize = in.readInt();
for (int i = 0; i < functionSize; i++) {
builder.add(Function.read(in));
}
return new CreateFunctionInfo(builder.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,11 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
Env.getCurrentEnv().replayCreateFunction(function);
break;
}
case OperationType.OP_ADD_FUNCTIONS: {
final CreateFunctionInfo info = (CreateFunctionInfo) journal.getData();
Env.getCurrentEnv().replayCreateFunctions(info);
break;
}
case OperationType.OP_DROP_FUNCTION: {
FunctionSearchDesc function = (FunctionSearchDesc) journal.getData();
Env.getCurrentEnv().replayDropFunction(function);
Expand Down Expand Up @@ -2135,6 +2140,10 @@ public void logAddFunction(Function function) {
logEdit(OperationType.OP_ADD_FUNCTION, function);
}

public void logAddFunctions(List<Function> functions) {
logEdit(OperationType.OP_ADD_FUNCTIONS, new CreateFunctionInfo(functions));
}

public void logAddGlobalFunction(Function function) {
logEdit(OperationType.OP_ADD_GLOBAL_FUNCTION, function);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ public class OperationType {
public static final short OP_DROP_FUNCTION = 131;
public static final short OP_ADD_GLOBAL_FUNCTION = 132;
public static final short OP_DROP_GLOBAL_FUNCTION = 133;
public static final short OP_ADD_FUNCTIONS = 134;

// modify database/table/tablet/replica meta
public static final short OP_SET_REPLICA_VERSION = 141;
Expand Down
Loading
Loading