Skip to content

Commit

Permalink
feat: udf select form (#3449)
Browse files Browse the repository at this point in the history
Signed-off-by: sunlichao11 <sunlichao11@jd.com>
Signed-off-by: Zzm0809 <934230207@qq.com>
Co-authored-by: sunlichao11 <sunlichao11@jd.com>
Co-authored-by: Zzm0809 <934230207@qq.com>
Co-authored-by: Zzm0809 <Zzm0809@users.noreply.github.com>
Co-authored-by: ufoe <guiyinzhang@qq.com>
Co-authored-by: XiuhongTang <t_spider@aliyun.com>
Co-authored-by: 唐修红 <tangxiuhong@qgs-china.com>
Co-authored-by: ZackYoung <zackyoungh@163.com>
Co-authored-by: zackyoungh <zackyoungh@users.noreply.github.com>
  • Loading branch information
9 people committed May 24, 2024
1 parent 6dd39fb commit 64fe0da
Show file tree
Hide file tree
Showing 49 changed files with 680 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ public class JarController {
@PostMapping("/udf/generateJar")
@ApiOperation("Generate jar")
public Result<Map<String, List<String>>> generateJar() {
List<Task> allUDF = taskService.getAllUDF();
List<Task> allUDF = taskService.getReleaseUDF();
List<UDF> udfCodes = allUDF.stream()
.map(task -> UDF.builder()
.code(task.getStatement())
.className(task.getSavePointPath())
.className(task.getConfigJson().getUdfConfig().getClassName())
.functionLanguage(
FunctionLanguage.valueOf(task.getDialect().toUpperCase()))
.build())
Expand Down
20 changes: 20 additions & 0 deletions dinky-admin/src/main/java/org/dinky/controller/UDFController.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@
import org.dinky.data.model.Resources;
import org.dinky.data.model.udf.UDFManage;
import org.dinky.data.result.Result;
import org.dinky.data.vo.CascaderVO;
import org.dinky.data.vo.UDFManageVO;
import org.dinky.function.data.model.UDF;
import org.dinky.service.TaskService;
import org.dinky.service.UDFService;
import org.dinky.utils.UDFUtils;

import java.util.List;
import java.util.stream.Collectors;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
Expand All @@ -36,6 +41,7 @@

import cn.dev33.satoken.annotation.SaCheckLogin;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -50,6 +56,7 @@
@RequiredArgsConstructor
public class UDFController {
private final UDFService udfService;
private final TaskService taskService;

/**
* update udf name by id
Expand Down Expand Up @@ -94,4 +101,17 @@ public Result<Void> addOrUpdateByResourceId(@RequestBody CommonDTO<List<Integer>
udfService.addOrUpdateByResourceId(dto.getData());
return Result.succeed();
}

/**
* get all udf and convert its to cascader
* @return {@link Result} of {@link List} of {@link CascaderVO}
*/
@GetMapping("/getAllUdfs")
@ApiOperation("Get All UDFs")
public Result<List<CascaderVO>> getAllUdfsToCascader() {
// get all UDFs of dynamic UDFs(user defined UDFs in the task)
List<UDF> userDefinedReleaseUdfs =
taskService.getReleaseUDF().stream().map(UDFUtils::taskToUDF).collect(Collectors.toList());
return Result.succeed(udfService.getAllUdfsToCascader(userDefinedReleaseUdfs));
}
}
4 changes: 2 additions & 2 deletions dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@

/**
* StudioExecuteDTO
*
*/
@Getter
@Setter
Expand Down Expand Up @@ -237,10 +236,11 @@ public JobConfig getJobConfig() {

Map<String, String> parsedConfig =
this.configJson == null ? new HashMap<>(0) : this.configJson.getCustomConfigMaps();

Map<String, String> udfRefers = this.configJson == null ? new HashMap<>(0) : this.configJson.getUdfReferMaps();
JobConfig jobConfig = new JobConfig();
BeanUtil.copyProperties(this, jobConfig);
jobConfig.setConfigJson(parsedConfig);
jobConfig.setUdfRefer(udfRefers);
jobConfig.setTaskId(id);
jobConfig.setJobName(name);

Expand Down
3 changes: 3 additions & 0 deletions dinky-admin/src/main/java/org/dinky/data/model/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ public class Task extends SuperEntity<Task> {
notes = "ID of the version associated with the task")
private Integer versionId;

@ApiModelProperty(value = "Enabled", dataType = "Boolean", example = "true", notes = "Whether the task is enabled")
private Boolean enabled;

@ApiModelProperty(value = "Statement", dataType = "String", notes = "SQL statement for the task")
private String statement;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.dinky.assertion.Asserts;
import org.dinky.data.ext.ConfigItem;

import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -49,6 +51,12 @@ public class TaskExtConfig implements Serializable {
notes = "UDF (User-Defined Function) configuration for the task")
private TaskUdfConfig udfConfig;

@ApiModelProperty(
value = "UDF Refer",
dataType = "TaskUdfRefer",
notes = "UDF (User-Defined Function) reference for the task")
private List<TaskUdfRefer> udfRefer;

@ApiModelProperty(
value = "Custom Config",
dataType = "List<ConfigItem>",
Expand Down Expand Up @@ -80,6 +88,25 @@ public Map<String, String> getCustomConfigMaps() {
: new HashMap<>();
}

// udfRefer-value的所有key-value
@JsonIgnore
public Map<String, String> getUdfReferMaps() {
return Asserts.isNotNullCollection(udfRefer)
? udfRefer.stream()
.filter(item -> item.getClassName() != null)
.map(t -> {
if (StringUtils.isEmpty(t.getName())) {
String name = t.getClassName()
.substring(t.getClassName().lastIndexOf(".") + 1);
name = name.substring(0, 1).toLowerCase() + name.substring(1);
t.setName(name);
}
return t;
})
.collect(Collectors.toConcurrentMap(TaskUdfRefer::getClassName, TaskUdfRefer::getName))
: new HashMap<>();
}

// 是否包含某个key
public boolean containsKey(String key) {
return customConfig.stream().anyMatch(item -> item.getKey().equals(key));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
@AllArgsConstructor
@NoArgsConstructor
public class TaskUdfConfig implements Serializable {
private static final long serialVersionUID = 1L;
private static final long serialVersionUID = -5981544561742928810L;

@ApiModelProperty(value = "Template ID", dataType = "Integer", example = "1", notes = "ID of the UDF template")
private Integer templateId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.data.model.ext;

import java.io.Serializable;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@ApiModel(value = "TaskUdfRefer", description = "UDF (User-Defined Function) refer for Task")
@AllArgsConstructor
@NoArgsConstructor
public class TaskUdfRefer implements Serializable {

@ApiModelProperty(value = "function name", dataType = "String", example = "add", notes = "Nmae of the UDF function")
private String name;

@ApiModelProperty(value = "Class Name", dataType = "String", notes = "Name of the UDF class")
private String className;
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class UDFManage extends SuperEntity<UDFManage> {
@ApiModelProperty(value = "Class Name", dataType = "String", notes = "Class Name")
private String className;

@ApiModelProperty(value = "Language", dataType = "String", notes = "Language")
private String language;

@ApiModelProperty(value = "Task Id", dataType = "Integer", notes = "Task Id")
private Integer taskId;

Expand Down
12 changes: 11 additions & 1 deletion dinky-admin/src/main/java/org/dinky/data/vo/CascaderVO.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,19 @@ public CascaderVO(String label) {
this.value = label;
}

public CascaderVO(String label, String value) {
this.label = label;
this.value = value;
}

public CascaderVO(String label, List<CascaderVO> children) {
this.label = label;
this.value = label;
this.children = children;
}

public CascaderVO(String label, String value, List<CascaderVO> children) {
this.label = label;
this.value = value;
this.children = children;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class UDFManageVO implements Serializable {
private String name;
private Boolean enabled;
private String className;
private String language;
private Integer taskId;
private Integer resourcesId;
/**
Expand Down
1 change: 0 additions & 1 deletion dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ public class SystemInit implements ApplicationRunner {
private final TenantService tenantService;
private final GitProjectService gitProjectService;
private final ScheduleThreadPool schedule;

private static Project project;

@Override
Expand Down
7 changes: 0 additions & 7 deletions dinky-admin/src/main/java/org/dinky/service/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,6 @@ public interface TaskService extends ISuperService<Task> {
*/
Task initDefaultFlinkSQLEnv(Integer tenantId);

/**
* Get a list of all user-defined functions (UDFs) in the system.
*
* @return A list of {@link Task} objects representing the UDFs.
*/
List<Task> getAllUDF();

/**
* Get a list of all release user-defined functions (UDFs) in the system.
* @return A list of {@link Task} objects representing the release UDFs.
Expand Down
14 changes: 14 additions & 0 deletions dinky-admin/src/main/java/org/dinky/service/UDFService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import org.dinky.data.model.Resources;
import org.dinky.data.model.udf.UDFManage;
import org.dinky.data.vo.CascaderVO;
import org.dinky.data.vo.UDFManageVO;
import org.dinky.function.data.model.UDF;

import java.util.List;

Expand Down Expand Up @@ -55,4 +57,16 @@ public interface UDFService extends IService<UDFManage> {
*/
@Transactional(rollbackFor = Exception.class)
void addOrUpdateByResourceId(List<Integer> resourceIds);

/**
* get udf from udfManage
* @return List
*/
List<UDFManage> getUDFFromUdfManage();

/**
* get all udf to cascader list
* @return List
*/
List<CascaderVO> getAllUdfsToCascader(List<UDF> userDefinedReleaseUdfs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ public List<CascaderVO> loadConfigOptions() {
List<FlinkConfigOption> flinkConfigOptions = FlinkConfigOptionsUtils.loadOptionsByClassName(name);
String binlogGroup = FlinkConfigOptionsUtils.parsedBinlogGroup(name);
List<CascaderVO> child = flinkConfigOptions.stream()
.map(conf -> new CascaderVO(conf.getKey()))
.map(conf -> new CascaderVO(conf.getKey(), conf.getKey()))
.collect(Collectors.toList());
CascaderVO cascaderVO = new CascaderVO(binlogGroup, child);
dataList.add(cascaderVO);
}

List<CascaderVO> voList = documentService.lambdaQuery().eq(Document::getType, "FLINK_OPTIONS").list().stream()
.map(d -> new CascaderVO(d.getName().replace("set ", "")))
.map(d -> new CascaderVO(
d.getName().replace("set ", ""), d.getName().replace("set ", "")))
.collect(Collectors.toList());

CascaderVO cascaderVO = new CascaderVO();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,11 @@
import org.dinky.service.SavepointsService;
import org.dinky.service.TaskService;
import org.dinky.service.TaskVersionService;
import org.dinky.service.UDFService;
import org.dinky.service.UDFTemplateService;
import org.dinky.service.UserService;
import org.dinky.service.catalogue.CatalogueService;
import org.dinky.service.resource.ResourcesService;
import org.dinky.service.task.BaseTask;
import org.dinky.utils.FragmentVariableUtils;
import org.dinky.utils.JsonUtils;
Expand All @@ -109,6 +111,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import javax.annotation.Resource;

Expand Down Expand Up @@ -156,6 +159,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private final DataSourceProperties dsProperties;
private final UserService userService;
private final ApplicationContext applicationContext;
private final UDFService udfService;
private final ResourcesService resourcesService;

@Resource
@Lazy
Expand Down Expand Up @@ -716,21 +721,20 @@ public JobModelOverview getJobStreamingOrBatchModelOverview() {
return baseMapper.getJobStreamingOrBatchModelOverview();
}

@Override
public List<Task> getAllUDF() {
return list(new QueryWrapper<Task>()
.in("dialect", Dialect.JAVA.getValue(), Dialect.SCALA.getValue(), Dialect.PYTHON.getValue())
.eq("enabled", 1)
.isNotNull("save_point_path"));
}

@Override
public List<Task> getReleaseUDF() {
return list(new LambdaQueryWrapper<Task>()
.in(Task::getDialect, Dialect.JAVA.getValue(), Dialect.SCALA.getValue(), Dialect.PYTHON.getValue())
.eq(Task::getEnabled, 1)
.eq(Task::getStep, JobLifeCycle.PUBLISH.getValue())
.isNotNull(Task::getSavePointPath));
.in(
Task::getDialect,
Dialect.JAVA.getValue(),
Dialect.SCALA.getValue(),
Dialect.PYTHON.getValue())
.eq(Task::getEnabled, 1)
.eq(Task::getStep, JobLifeCycle.PUBLISH.getValue()))
.stream()
.filter(task -> Asserts.isNotNullString(
task.getConfigJson().getUdfConfig().getClassName()))
.collect(Collectors.toList());
}

@Override
Expand Down
Loading

0 comments on commit 64fe0da

Please sign in to comment.