Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: udf select form #3449

Merged
merged 51 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
d46d0a7
fix: result not have column.
leechor Dec 14, 2022
2cd3f75
feat: udf select form
May 4, 2024
f87e778
chore: reset
May 4, 2024
85f938b
feat: select udf and change name
May 4, 2024
8d3be48
feat: revert
May 4, 2024
736c3c5
feat: name assign from className
May 4, 2024
82be2b0
feat: pass to back
May 5, 2024
19ca45a
feat: insert udfRefer to statement
May 5, 2024
be1675e
style: spotless apply
May 5, 2024
517671d
refactor: move staticUdf function
May 5, 2024
eb9cbf8
refactor: move staticUdf function
May 5, 2024
59bc0dd
feat: add java custom udf cache
May 5, 2024
aeb10d3
feat: change udf combine
May 6, 2024
0cd6c46
Merge branch 'udev' into udfExtracter
May 6, 2024
ebf9d5b
feat: add scan all UserDefinedFunction sub class
May 6, 2024
6507dde
feat: switch server and cache
May 6, 2024
d6101b9
feat: switch server and cache
May 7, 2024
e1f1365
style: spotless apply
May 7, 2024
7fbbe1c
chore: change pages.datastudio.label.udf.tip
May 7, 2024
6615737
Implement automatic injection and detection of duplicate injections
Zzm0809 May 7, 2024
e6d5261
优化下拉框移除udf逻辑
Zzm0809 May 7, 2024
1bb6e2f
Optimize UDF display logic and add list display in UDF management
Zzm0809 May 8, 2024
d6e19a4
added udf manage table column
Zzm0809 May 8, 2024
de3601b
added change sql file
Zzm0809 May 8, 2024
e2fd68c
distinct udf
Zzm0809 May 8, 2024
1ff6bc0
Update JarController.java
Zzm0809 May 10, 2024
5e08fbc
remove some method
Zzm0809 May 10, 2024
a7fcb1f
modify code
Zzm0809 May 10, 2024
db1b464
modify code
Zzm0809 May 10, 2024
79ac1cb
Optimize code
Zzm0809 May 10, 2024
07be131
Optimize code
Zzm0809 May 10, 2024
83f6b1e
Optimize code
Zzm0809 May 10, 2024
ac4970e
Optimize code
Zzm0809 May 10, 2024
4053217
Optimize code
Zzm0809 May 10, 2024
e745b99
Merge branch 'dev' into udfExtracter
Zzm0809 May 10, 2024
8af6505
added confirm tips
Zzm0809 May 14, 2024
7c778b5
format
Zzm0809 May 14, 2024
f9f8e47
Merge branch 'udev' into udfExtracter
May 20, 2024
c05f730
Modify SQL file and README.md
Zzm0809 May 20, 2024
9bfd972
Add Meta Information Table for Upgrading to 1.0.0
Zzm0809 May 20, 2024
aade05a
Optimize the length of resource names (#3513)
Zzm0809 May 20, 2024
95c4045
[Bug] Fix inability to login when tenant is not set up (#3512)
Zzm0809 May 20, 2024
f2a6d77
[Doc] Jar package task submission Demo bug fix (#3517)
ufoe May 22, 2024
9b3e8d8
[feature] add kubernetes deploy (#3516)
tgluon May 22, 2024
654f684
[BugFix]Fix udf compiler problem (#3518)
zackyoungh May 23, 2024
6d3bfcb
[BugFix] [UDF] Fix the issue where UDF cannot find classes in Flink S…
zackyoungh May 23, 2024
a8ec027
Optimize the prompt properties of the front-end dropdown menu and upg…
Zzm0809 May 23, 2024
9312504
[Fix] Fix the issue of cluster instances being unable to be deleted (…
Zzm0809 May 23, 2024
b895052
[BugFix] [UDF] Fix the issue where UDF cannot find classes in Flink S…
zackyoungh May 23, 2024
9fa403d
Merge branch 'refs/heads/dev' into fork/udfExtracter
Zzm0809 May 24, 2024
3ac287c
Optimizing MySQL stored procedures
Zzm0809 May 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ public class JarController {
@PostMapping("/udf/generateJar")
@ApiOperation("Generate jar")
public Result<Map<String, List<String>>> generateJar() {
List<Task> allUDF = taskService.getAllUDF();
List<Task> allUDF = taskService.getReleaseUDF();
List<UDF> udfCodes = allUDF.stream()
.map(task -> UDF.builder()
.code(task.getStatement())
.className(task.getSavePointPath())
.className(task.getConfigJson().getUdfConfig().getClassName())
.functionLanguage(
FunctionLanguage.valueOf(task.getDialect().toUpperCase()))
.build())
Expand Down
20 changes: 20 additions & 0 deletions dinky-admin/src/main/java/org/dinky/controller/UDFController.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@
import org.dinky.data.model.Resources;
import org.dinky.data.model.udf.UDFManage;
import org.dinky.data.result.Result;
import org.dinky.data.vo.CascaderVO;
import org.dinky.data.vo.UDFManageVO;
import org.dinky.function.data.model.UDF;
import org.dinky.service.TaskService;
import org.dinky.service.UDFService;
import org.dinky.utils.UDFUtils;

import java.util.List;
import java.util.stream.Collectors;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
Expand All @@ -36,6 +41,7 @@

import cn.dev33.satoken.annotation.SaCheckLogin;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -50,6 +56,7 @@
@RequiredArgsConstructor
public class UDFController {
private final UDFService udfService;
private final TaskService taskService;

/**
* update udf name by id
Expand Down Expand Up @@ -94,4 +101,17 @@ public Result<Void> addOrUpdateByResourceId(@RequestBody CommonDTO<List<Integer>
udfService.addOrUpdateByResourceId(dto.getData());
return Result.succeed();
}

/**
* get all udf and convert its to cascader
* @return {@link Result} of {@link List} of {@link CascaderVO}
*/
@GetMapping("/getAllUdfs")
@ApiOperation("Get All UDFs")
public Result<List<CascaderVO>> getAllUdfsToCascader() {
// get all UDFs of dynamic UDFs(user defined UDFs in the task)
List<UDF> userDefinedReleaseUdfs =
taskService.getReleaseUDF().stream().map(UDFUtils::taskToUDF).collect(Collectors.toList());
return Result.succeed(udfService.getAllUdfsToCascader(userDefinedReleaseUdfs));
}
}
4 changes: 2 additions & 2 deletions dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@

/**
* StudioExecuteDTO
*
*/
@Getter
@Setter
Expand Down Expand Up @@ -237,10 +236,11 @@ public JobConfig getJobConfig() {

Map<String, String> parsedConfig =
this.configJson == null ? new HashMap<>(0) : this.configJson.getCustomConfigMaps();

Map<String, String> udfRefers = this.configJson == null ? new HashMap<>(0) : this.configJson.getUdfReferMaps();
JobConfig jobConfig = new JobConfig();
BeanUtil.copyProperties(this, jobConfig);
jobConfig.setConfigJson(parsedConfig);
jobConfig.setUdfRefer(udfRefers);
jobConfig.setTaskId(id);
jobConfig.setJobName(name);

Expand Down
3 changes: 3 additions & 0 deletions dinky-admin/src/main/java/org/dinky/data/model/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ public class Task extends SuperEntity<Task> {
notes = "ID of the version associated with the task")
private Integer versionId;

@ApiModelProperty(value = "Enabled", dataType = "Boolean", example = "true", notes = "Whether the task is enabled")
private Boolean enabled;

@ApiModelProperty(value = "Statement", dataType = "String", notes = "SQL statement for the task")
private String statement;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.dinky.assertion.Asserts;
import org.dinky.data.ext.ConfigItem;

import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -49,6 +51,12 @@ public class TaskExtConfig implements Serializable {
notes = "UDF (User-Defined Function) configuration for the task")
private TaskUdfConfig udfConfig;

@ApiModelProperty(
value = "UDF Refer",
dataType = "TaskUdfRefer",
notes = "UDF (User-Defined Function) reference for the task")
private List<TaskUdfRefer> udfRefer;

@ApiModelProperty(
value = "Custom Config",
dataType = "List<ConfigItem>",
Expand Down Expand Up @@ -80,6 +88,25 @@ public Map<String, String> getCustomConfigMaps() {
: new HashMap<>();
}

// udfRefer-value的所有key-value
@JsonIgnore
public Map<String, String> getUdfReferMaps() {
return Asserts.isNotNullCollection(udfRefer)
? udfRefer.stream()
.filter(item -> item.getClassName() != null)
.map(t -> {
if (StringUtils.isEmpty(t.getName())) {
String name = t.getClassName()
.substring(t.getClassName().lastIndexOf(".") + 1);
name = name.substring(0, 1).toLowerCase() + name.substring(1);
t.setName(name);
}
return t;
})
.collect(Collectors.toConcurrentMap(TaskUdfRefer::getClassName, TaskUdfRefer::getName))
: new HashMap<>();
}

// 是否包含某个key
public boolean containsKey(String key) {
return customConfig.stream().anyMatch(item -> item.getKey().equals(key));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
@AllArgsConstructor
@NoArgsConstructor
public class TaskUdfConfig implements Serializable {
private static final long serialVersionUID = 1L;
private static final long serialVersionUID = -5981544561742928810L;

@ApiModelProperty(value = "Template ID", dataType = "Integer", example = "1", notes = "ID of the UDF template")
private Integer templateId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.data.model.ext;

import java.io.Serializable;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@ApiModel(value = "TaskUdfRefer", description = "UDF (User-Defined Function) refer for Task")
@AllArgsConstructor
@NoArgsConstructor
public class TaskUdfRefer implements Serializable {

@ApiModelProperty(value = "function name", dataType = "String", example = "add", notes = "Nmae of the UDF function")
private String name;

@ApiModelProperty(value = "Class Name", dataType = "String", notes = "Name of the UDF class")
private String className;
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class UDFManage extends SuperEntity<UDFManage> {
@ApiModelProperty(value = "Class Name", dataType = "String", notes = "Class Name")
private String className;

@ApiModelProperty(value = "Language", dataType = "String", notes = "Language")
private String language;

@ApiModelProperty(value = "Task Id", dataType = "Integer", notes = "Task Id")
private Integer taskId;

Expand Down
12 changes: 11 additions & 1 deletion dinky-admin/src/main/java/org/dinky/data/vo/CascaderVO.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,19 @@ public CascaderVO(String label) {
this.value = label;
}

public CascaderVO(String label, String value) {
this.label = label;
this.value = value;
}

public CascaderVO(String label, List<CascaderVO> children) {
this.label = label;
this.value = label;
this.children = children;
}

public CascaderVO(String label, String value, List<CascaderVO> children) {
this.label = label;
this.value = value;
this.children = children;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class UDFManageVO implements Serializable {
private String name;
private Boolean enabled;
private String className;
private String language;
private Integer taskId;
private Integer resourcesId;
/**
Expand Down
1 change: 0 additions & 1 deletion dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ public class SystemInit implements ApplicationRunner {
private final TenantService tenantService;
private final GitProjectService gitProjectService;
private final ScheduleThreadPool schedule;

private static Project project;

@Override
Expand Down
7 changes: 0 additions & 7 deletions dinky-admin/src/main/java/org/dinky/service/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,6 @@ public interface TaskService extends ISuperService<Task> {
*/
Task initDefaultFlinkSQLEnv(Integer tenantId);

/**
* Get a list of all user-defined functions (UDFs) in the system.
*
* @return A list of {@link Task} objects representing the UDFs.
*/
List<Task> getAllUDF();

/**
* Get a list of all release user-defined functions (UDFs) in the system.
* @return A list of {@link Task} objects representing the release UDFs.
Expand Down
14 changes: 14 additions & 0 deletions dinky-admin/src/main/java/org/dinky/service/UDFService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import org.dinky.data.model.Resources;
import org.dinky.data.model.udf.UDFManage;
import org.dinky.data.vo.CascaderVO;
import org.dinky.data.vo.UDFManageVO;
import org.dinky.function.data.model.UDF;

import java.util.List;

Expand Down Expand Up @@ -55,4 +57,16 @@ public interface UDFService extends IService<UDFManage> {
*/
@Transactional(rollbackFor = Exception.class)
void addOrUpdateByResourceId(List<Integer> resourceIds);

/**
* get udf from udfManage
* @return List
*/
List<UDFManage> getUDFFromUdfManage();

/**
* get all udf to cascader list
* @return List
*/
List<CascaderVO> getAllUdfsToCascader(List<UDF> userDefinedReleaseUdfs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ public List<CascaderVO> loadConfigOptions() {
List<FlinkConfigOption> flinkConfigOptions = FlinkConfigOptionsUtils.loadOptionsByClassName(name);
String binlogGroup = FlinkConfigOptionsUtils.parsedBinlogGroup(name);
List<CascaderVO> child = flinkConfigOptions.stream()
.map(conf -> new CascaderVO(conf.getKey()))
.map(conf -> new CascaderVO(conf.getKey(), conf.getKey()))
.collect(Collectors.toList());
CascaderVO cascaderVO = new CascaderVO(binlogGroup, child);
dataList.add(cascaderVO);
}

List<CascaderVO> voList = documentService.lambdaQuery().eq(Document::getType, "FLINK_OPTIONS").list().stream()
.map(d -> new CascaderVO(d.getName().replace("set ", "")))
.map(d -> new CascaderVO(
d.getName().replace("set ", ""), d.getName().replace("set ", "")))
.collect(Collectors.toList());

CascaderVO cascaderVO = new CascaderVO();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,11 @@
import org.dinky.service.SavepointsService;
import org.dinky.service.TaskService;
import org.dinky.service.TaskVersionService;
import org.dinky.service.UDFService;
import org.dinky.service.UDFTemplateService;
import org.dinky.service.UserService;
import org.dinky.service.catalogue.CatalogueService;
import org.dinky.service.resource.ResourcesService;
import org.dinky.service.task.BaseTask;
import org.dinky.utils.FragmentVariableUtils;
import org.dinky.utils.JsonUtils;
Expand All @@ -109,6 +111,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import javax.annotation.Resource;

Expand Down Expand Up @@ -156,6 +159,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private final DataSourceProperties dsProperties;
private final UserService userService;
private final ApplicationContext applicationContext;
private final UDFService udfService;
private final ResourcesService resourcesService;

@Resource
@Lazy
Expand Down Expand Up @@ -716,21 +721,20 @@ public JobModelOverview getJobStreamingOrBatchModelOverview() {
return baseMapper.getJobStreamingOrBatchModelOverview();
}

@Override
public List<Task> getAllUDF() {
return list(new QueryWrapper<Task>()
.in("dialect", Dialect.JAVA.getValue(), Dialect.SCALA.getValue(), Dialect.PYTHON.getValue())
.eq("enabled", 1)
.isNotNull("save_point_path"));
}

@Override
public List<Task> getReleaseUDF() {
return list(new LambdaQueryWrapper<Task>()
.in(Task::getDialect, Dialect.JAVA.getValue(), Dialect.SCALA.getValue(), Dialect.PYTHON.getValue())
.eq(Task::getEnabled, 1)
.eq(Task::getStep, JobLifeCycle.PUBLISH.getValue())
.isNotNull(Task::getSavePointPath));
.in(
Task::getDialect,
Dialect.JAVA.getValue(),
Dialect.SCALA.getValue(),
Dialect.PYTHON.getValue())
.eq(Task::getEnabled, 1)
.eq(Task::getStep, JobLifeCycle.PUBLISH.getValue()))
.stream()
.filter(task -> Asserts.isNotNullString(
task.getConfigJson().getUdfConfig().getClassName()))
.collect(Collectors.toList());
}

@Override
Expand Down