Skip to content

Commit

Permalink
[Plugin] Support apache hadoop hdfs (#334)
Browse files Browse the repository at this point in the history
  • Loading branch information
qianmoQ committed Apr 28, 2023
2 parents 8e760c7 + 47513e9 commit 8e39640
Show file tree
Hide file tree
Showing 43 changed files with 1,096 additions and 60 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/bofore_checker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
java-version: '11'
distribution: 'temurin'
- run: chmod 755 ./mvnw
- run: ./mvnw -T 1C clean install checkstyle:checkstyle -Dfindbugs.skip -Dgpg.skip -Dskip.yarn -DskipTests=true
- run: ./mvnw clean install checkstyle:checkstyle -Dfindbugs.skip -Dgpg.skip -Dskip.yarn -DskipTests=true

before_checker_bugs:
runs-on: ubuntu-latest
Expand All @@ -42,7 +42,7 @@ jobs:
java-version: '11'
distribution: 'temurin'
- run: chmod 755 ./mvnw
- run: ./mvnw -T 1C clean install findbugs:findbugs -Dcheckstyle.skip -Dgpg.skip -Dskip.yarn -DskipTests=true
- run: ./mvnw clean install findbugs:findbugs -Dcheckstyle.skip -Dgpg.skip -Dskip.yarn -DskipTests=true

before_checker_package:
runs-on: ubuntu-latest
Expand Down
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,7 @@ list

# shaded #
shaded/*/dependency-reduced-pom.xml

# datacap #
cache/
config/
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ Here are some of the major database solutions that are supported:
</a>&nbsp;
<a href="https://www.alibabacloud.com/product/hologres" target="_blank" class="connector-logo-index">
<img src="docs/docs/assets/plugin/hologres.png" alt="Hologres" height=60" />
</a>&nbsp;
<a href="https://hadoop.apache.org/" target="_blank" class="connector-logo-index">
<img src="docs/docs/assets/plugin/hdfs.png" alt="Apache Hdfs" height=60" />
</a>
</p>

Expand Down
5 changes: 5 additions & 0 deletions core/datacap-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,11 @@
<artifactId>datacap-jdbc-hologres</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-native-hdfs</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Executor -->
<dependency>
<groupId>io.edurt.datacap</groupId>
Expand Down
4 changes: 4 additions & 0 deletions core/datacap-server/src/main/etc/conf/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@ spring.redis.database=0
### If this directory is not set, the system will get the project root directory to build the data subdirectory
datacap.executor.data=
datacap.executor.seatunnel.home=/opt/lib/seatunnel

################################ Upload configure #################################
datacap.config.data=
datacap.cache.data=
26 changes: 26 additions & 0 deletions core/datacap-server/src/main/etc/conf/plugins/native/hdfs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: HDFS
supportTime: '2023-04-27'
configures:
- field: name
type: String
required: true
message: name is a required field, please be sure to enter
- field: host
type: String
required: true
value: '-'
disabled: true
message: host is a required field, please be sure to enter
- field: port
type: Number
required: true
min: 1
max: 65535
value: 1
disabled: true
message: port is a required field, please be sure to enter
- field: file
type: File
required: true
value: []
group: advanced
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public static Configure preparedConfigure(List<IConfigureField> configures)
case configures:
configure.setEnv(Optional.ofNullable(IConfigureCommon.getMapValue(configures, IConfigureFieldName.configures)));
break;
case file:
configure.setUsedConfig(true);
break;
}
});
configure.setFormat(FormatType.JSON);
Expand Down Expand Up @@ -149,6 +152,9 @@ public static SourceEntity preparedSourceEntity(List<IConfigureField> configures
case configures:
configure.setConfigure(JSON.toJSON(IConfigureCommon.getMapValue(configures, IConfigureFieldName.configures)));
break;
case file:
configure.setUsedConfig(true);
break;
}
});
configure.setCreateTime(Timestamp.from(Instant.now()));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package io.edurt.datacap.server.controller.user;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.edurt.datacap.server.body.SharedSourceBody;
import io.edurt.datacap.server.common.Response;
import io.edurt.datacap.server.entity.PageEntity;
import io.edurt.datacap.server.entity.PluginEntity;
import io.edurt.datacap.server.entity.SourceEntity;
import io.edurt.datacap.server.entity.UserEntity;
import io.edurt.datacap.server.security.UserDetailsService;
import io.edurt.datacap.server.service.SourceService;
import io.edurt.datacap.server.validation.ValidationGroup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.env.Environment;
import org.springframework.http.MediaType;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated;
Expand All @@ -16,22 +23,30 @@
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;

@SuppressFBWarnings(value = {"RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"})
@RestController()
@RequestMapping(value = "/api/v1/source")
@Slf4j
public class SourceController
{
private final SourceService sourceService;
private final Environment environment;

public SourceController(SourceService sourceService)
public SourceController(SourceService sourceService, Environment environment)
{
this.sourceService = sourceService;
this.environment = environment;
}

@Deprecated
Expand All @@ -49,8 +64,7 @@ public Response<SourceEntity> update(@RequestBody @Validated(ValidationGroup.Cru
}

@GetMapping
public Response<PageEntity<SourceEntity>> getAll(@RequestParam(value = "page", defaultValue = "1") int start,
@RequestParam(value = "size", defaultValue = "10") int end)
public Response<PageEntity<SourceEntity>> getAll(@RequestParam(value = "page", defaultValue = "1") int start, @RequestParam(value = "size", defaultValue = "10") int end)
{
return this.sourceService.getAll(start, end);
}
Expand Down Expand Up @@ -86,4 +100,38 @@ public Response<Object> shared(@RequestBody SharedSourceBody configure)
{
return this.sourceService.shared(configure);
}

@SneakyThrows
@PostMapping("uploadFile")
public Response<String> uploadFile(@RequestParam("file") MultipartFile file, @RequestHeader("PluginType") String pluginType)
{
UserEntity user = UserDetailsService.getUser();

String cacheHome = environment.getProperty("datacap.cache.data");
if (StringUtils.isEmpty(cacheHome)) {
cacheHome = String.join(File.separator, System.getProperty("user.dir"), "cache");
}
String userCacheHome = String.join(File.separator, cacheHome, user.getUsername(), pluginType);

String originalFilename = file.getOriginalFilename();
File targetFile = new File(String.join(File.separator, userCacheHome, originalFilename));
if (!targetFile.getParentFile().exists()) {
targetFile.getParentFile().mkdirs();
}
else {
// If you already have cache files, clean and delete all files
File[] files = targetFile.getParentFile().listFiles();
for (File f : files) {
log.info("Removing cache file {} state {}", f.getName(), f.delete());
}
}

try {
file.transferTo(targetFile);
}
catch (IOException e) {
log.warn("File upload exception on user {} by type {} ", user.getUsername(), pluginType, e);
}
return Response.success(targetFile.getPath());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ public class SourceEntity
@Transient
private IConfigure schema;

@Column(name = "used_config")
private boolean usedConfig;

@ManyToOne
@JoinColumn(name = "user_id")
@JsonIncludeProperties(value = {"id", "username"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ public enum IConfigureFieldName
ssl,
catalog,
database,
file,
configures
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ public enum IConfigureFieldType
List,
Boolean,
Array,
Map
Map,
File
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import io.edurt.datacap.spi.Plugin;
import io.edurt.datacap.spi.model.Configure;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;

import java.io.File;
import java.util.Optional;

@Service
Expand All @@ -26,11 +28,13 @@ public class ExecuteServiceImpl
{
private final Injector injector;
private final SourceRepository sourceRepository;
private final Environment environment;

public ExecuteServiceImpl(Injector injector, SourceRepository sourceRepository)
public ExecuteServiceImpl(Injector injector, SourceRepository sourceRepository, Environment environment)
{
this.injector = injector;
this.sourceRepository = sourceRepository;
this.environment = environment;
}

@AuditPlugin
Expand Down Expand Up @@ -59,6 +63,16 @@ public Response<Object> execute(ExecuteEntity configure)
_configure.setSsl(Optional.ofNullable(entity.getSsl()));
_configure.setEnv(Optional.ofNullable(entity.getConfigures()));
_configure.setFormat(configure.getFormat());
_configure.setUsedConfig(entity.isUsedConfig());
if (entity.isUsedConfig()) {
_configure.setUsername(Optional.of(entity.getUser().getUsername()));
String configHome = environment.getProperty("datacap.config.data");
if (StringUtils.isEmpty(configHome)) {
configHome = String.join(File.separator, System.getProperty("user.dir"), "config");
}
_configure.setHome(configHome);
_configure.setId(String.valueOf(entity.getId()));
}
plugin.connect(_configure);
io.edurt.datacap.spi.model.Response response = plugin.execute(configure.getContent());
plugin.destroy();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.edurt.datacap.server.service.impl;

import com.google.common.io.Files;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.edurt.datacap.server.adapter.PageRequestAdapter;
import io.edurt.datacap.server.body.SharedSourceBody;
import io.edurt.datacap.server.body.SourceBody;
Expand All @@ -24,12 +26,15 @@
import io.edurt.datacap.spi.FormatType;
import io.edurt.datacap.spi.Plugin;
import io.edurt.datacap.spi.model.Configure;
import lombok.SneakyThrows;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.env.Environment;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -39,6 +44,7 @@
import java.util.stream.Collectors;

@Service
@SuppressFBWarnings(value = {"RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"})
public class SourceServiceImpl
implements SourceService
{
Expand Down Expand Up @@ -72,10 +78,23 @@ public Response<PageEntity<SourceEntity>> getAll(int offset, int limit)
return Response.success(PageEntity.build(this.sourceRepository.findAllByUserOrPublishIsTrue(user, pageable)));
}

@SneakyThrows
@Override
public Response<Long> delete(Long id)
{
this.sourceRepository.deleteById(id);
Optional<SourceEntity> entityOptional = this.sourceRepository.findById(id);
if (entityOptional.isPresent()) {
SourceEntity source = entityOptional.get();
if (source.isUsedConfig()) {
String configHome = environment.getProperty("datacap.config.data");
if (StringUtils.isEmpty(configHome)) {
configHome = String.join(File.separator, System.getProperty("user.dir"), "config");
}
String destination = String.join(File.separator, configHome, source.getUser().getUsername(), source.getType(), String.valueOf(source.getId()));
FileUtils.deleteDirectory(new File(destination));
}
this.sourceRepository.deleteById(id);
}
return Response.success(id);
}

Expand Down Expand Up @@ -183,6 +202,15 @@ public Response<Object> testConnectionV2(SourceBody configure)
// The filter parameter value is null data
List<IConfigureField> applyConfigures = IConfigureCommon.filterNotEmpty(configure.getConfigure().getConfigures());
Configure _configure = IConfigureCommon.preparedConfigure(applyConfigures);
// Adapter file configure
if (_configure.isUsedConfig()) {
String cacheHome = environment.getProperty("datacap.cache.data");
if (StringUtils.isEmpty(cacheHome)) {
cacheHome = String.join(File.separator, System.getProperty("user.dir"), "cache");
}
_configure.setHome(cacheHome);
_configure.setUsername(Optional.of(UserDetailsService.getUser().getUsername()));
}
plugin.connect(_configure);
io.edurt.datacap.spi.model.Response response = plugin.execute(plugin.validator());
if (response.getIsSuccessful()) {
Expand Down Expand Up @@ -218,10 +246,38 @@ public Response<SourceEntity> saveOrUpdateV2(SourceBody configure)
source.setProtocol(configure.getType());
source.setType(configure.getName());
source.setUser(UserDetailsService.getUser());
if (ObjectUtils.isNotEmpty(configure.getId())) {
if (ObjectUtils.isNotEmpty(configure.getId()) && configure.getId() > 0) {
source.setId(configure.getId());
}
return Response.success(this.sourceRepository.save(source));

this.sourceRepository.save(source);
// Copy file to local data
if (source.isUsedConfig()) {
String cacheHome = environment.getProperty("datacap.cache.data");
if (StringUtils.isEmpty(cacheHome)) {
cacheHome = String.join(File.separator, System.getProperty("user.dir"), "cache");
}
String configHome = environment.getProperty("datacap.config.data");
if (StringUtils.isEmpty(configHome)) {
configHome = String.join(File.separator, System.getProperty("user.dir"), "config");
}
File sourceFile = new File(String.join(File.separator, cacheHome, source.getUser().getUsername(), source.getType()));
String destination = String.join(File.separator, configHome, source.getUser().getUsername(), source.getType(), String.valueOf(source.getId()));
File directory = new File(destination);
try {
if (!directory.exists()) {
directory.mkdirs();
}
for (File file : sourceFile.listFiles()) {
Files.copy(file, new File(String.join(File.separator, destination, file.getName())));
}
FileUtils.deleteDirectory(sourceFile);
}
catch (Exception e) {
return Response.failure("Copy failed: " + e.getMessage());
}
}
return Response.success(source);
}

@Override
Expand Down
Loading

0 comments on commit 8e39640

Please sign in to comment.