Skip to content

Commit

Permalink
Merge branch 'dev-1.4.0' into dev-1.4.0-HiveEngineConn-support-concur…
Browse files Browse the repository at this point in the history
…rent

* dev-1.4.0:
  support map string value (apache#4409)
  Improve exception information and add path information (apache#4351)
  [fix bug] The s3a file cannot be written because FileSystem is closed prematurely (apache#4375)
  Optimization of upload file interface in FsRestfulApi.java (apache#4357)
  • Loading branch information
yyc35 committed Mar 28, 2023
2 parents a518042 + 1e25954 commit ecdf1c6
Show file tree
Hide file tree
Showing 12 changed files with 90 additions and 32 deletions.
Expand Up @@ -61,6 +61,7 @@ public static String excelToCsv(
throws Exception {
String hdfsPath =
"/tmp/" + StorageUtils.getJvmUser() + "/" + System.currentTimeMillis() + ".csv";
LOG.info("The excel to csv with hdfsPath:" + hdfsPath);
ExcelXlsReader xlsReader = new ExcelXlsReader();
RowToCsvDeal rowToCsvDeal = new RowToCsvDeal();
OutputStream out = null;
Expand Down
Expand Up @@ -24,8 +24,13 @@
import java.io.File;
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class FileSystem implements Fs {

private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);

protected String user;
private String defaultFilePerm = "rwxr-----"; // 740
private String defaultFolderPerm = "rwxr-x---"; // 750
Expand Down Expand Up @@ -94,6 +99,7 @@ protected FsPath getParentPath(String path) {
} else {
parentPath = path.substring(0, path.lastIndexOf("/"));
}
LOG.info("Get Parent Path:" + parentPath);
return new FsPath(parentPath);
}

Expand Down
Expand Up @@ -75,12 +75,14 @@ public String listRoot() throws IOException {
@Override
public long getTotalSpace(FsPath dest) throws IOException {
String path = dest.getPath();
LOG.info("Get total space with path:" + path);
return new File(path).getTotalSpace();
}

@Override
public long getFreeSpace(FsPath dest) throws IOException {
String path = dest.getPath();
LOG.info("Get free space with path:" + path);
return new File(path).getFreeSpace();
}

Expand Down Expand Up @@ -117,6 +119,7 @@ public boolean setOwner(FsPath dest, String user, String group) throws IOExcepti

@Override
public boolean setOwner(FsPath dest, String user) throws IOException {
LOG.info("Set owner with path:" + dest.getPath() + "and user:" + user);
if (!StorageUtils.isIOProxy()) {
LOG.info("io not proxy, setOwner skip");
return true;
Expand All @@ -133,6 +136,7 @@ public boolean setOwner(FsPath dest, String user) throws IOException {

@Override
public boolean setGroup(FsPath dest, String group) throws IOException {
LOG.info("Set group with path:" + dest.getPath() + "and group:" + user);
if (!StorageUtils.isIOProxy()) {
LOG.info("io not proxy, setGroup skip");
return true;
Expand All @@ -155,6 +159,7 @@ public boolean mkdir(FsPath dest) throws IOException {
@Override
public boolean mkdirs(FsPath dest) throws IOException {
String path = dest.getPath();
LOG.info("Try to mkdirs with path:" + path);
File file = new File(path);
// Create parent directories one by one and set their permissions to rwxrwxrwx.
Stack<File> dirsToMake = new Stack<File>();
Expand Down Expand Up @@ -182,6 +187,7 @@ public boolean mkdirs(FsPath dest) throws IOException {
}

public boolean canMkdir(FsPath destParentDir) throws IOException {
LOG.info("Try to check if the directory can be created with path:" + destParentDir.getPath());
if (!StorageUtils.isIOProxy()) {
LOG.debug("io not proxy, not check owner, just check if have write permission ");
return this.canWrite(destParentDir);
Expand All @@ -203,6 +209,7 @@ public boolean canMkdir(FsPath destParentDir) throws IOException {
@Override
public boolean copy(String origin, String dest) throws IOException {
File file = new File(dest);
LOG.info("Try to copy file from:" + origin + " to dest:" + dest);
if (!isOwner(file.getParent())) {
throw new IOException("you have on permission to create file " + dest);
}
Expand All @@ -225,6 +232,7 @@ public boolean copy(String origin, String dest) throws IOException {

@Override
public boolean setPermission(FsPath dest, String permission) throws IOException {
LOG.info("Try to set permission dest with path:" + dest.getPath());
if (!StorageUtils.isIOProxy()) {
LOG.info("io not proxy, setPermission as parent.");
try {
Expand All @@ -251,6 +259,7 @@ public boolean setPermission(FsPath dest, String permission) throws IOException
public FsPathListWithError listPathWithError(FsPath path) throws IOException {
File file = new File(path.getPath());
File[] files = file.listFiles();
LOG.info("Try to list path:" + path.getPath() + " with error msg");
if (files != null) {
List<FsPath> rtn = new ArrayList();
String message = "";
Expand Down Expand Up @@ -294,6 +303,7 @@ public void init(Map<String, String> properties) throws IOException {
String groupInfo;
try {
groupInfo = Utils.exec(new String[] {"id", user});
LOG.info("Get groupinfo:" + groupInfo + " with shell command: id " + user);
} catch (RuntimeException e) {
group = user;
return;
Expand Down Expand Up @@ -322,7 +332,7 @@ public FsPath get(String dest) throws IOException {
} else {
fsPath = new FsPath(dest);
}

LOG.info("Try to get FsPath with path:" + fsPath.getPath());
PosixFileAttributes attr = null;
try {
attr = Files.readAttributes(Paths.get(fsPath.getPath()), PosixFileAttributes.class);
Expand Down Expand Up @@ -365,7 +375,7 @@ public OutputStream write(FsPath dest, boolean overwrite) throws IOException {

@Override
public boolean create(String dest) throws IOException {

LOG.info("try to create file with path:" + dest);
File file = new File(dest);
if (!isOwner(file.getParent())) {
throw new IOException("you have on permission to create file " + dest);
Expand All @@ -391,6 +401,7 @@ public boolean create(String dest) throws IOException {
public List<FsPath> list(FsPath path) throws IOException {
File file = new File(path.getPath());
File[] files = file.listFiles();
LOG.info("Try to get file list with path:" + path.getPath());
if (files != null) {
List<FsPath> rtn = new ArrayList();
for (File f : files) {
Expand Down
Expand Up @@ -108,7 +108,7 @@ class DefaultResultSetFactory extends ResultSetFactory with Logging {
if (StringUtils.isEmpty(resultSetType)) {
throw new StorageWarnException(
THE_FILE_IS_EMPTY.getErrorCode,
s"The file (${fsPath.getPath}) is empty(文件(${fsPath.getPath}) 为空)"
MessageFormat.format(THE_FILE_IS_EMPTY.getErrorDesc, fsPath.getPath)
)
}
Utils.tryQuietly(inputStream.close())
Expand Down
Expand Up @@ -19,14 +19,15 @@ package org.apache.linkis.storage.resultset

import org.apache.linkis.common.io.{FsPath, MetaData, Record}
import org.apache.linkis.common.io.resultset.{ResultSet, ResultSetReader}
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.storage.FSFactory
import org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary.TABLE_ARE_NOT_SUPPORTED
import org.apache.linkis.storage.exception.StorageErrorException
import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord, TableResultSet}

import java.io.InputStream

object ResultSetReader {
object ResultSetReader extends Logging {

def getResultSetReader[K <: MetaData, V <: Record](
resultSet: ResultSet[K, V],
Expand Down Expand Up @@ -83,6 +84,7 @@ object ResultSetReader {
)
}
val fs = FSFactory.getFs(resPath)
logger.info("Try to init Fs with path:" + resPath.getPath)
fs.init(null)
ResultSetReader.getResultSetReader(resultSet.asInstanceOf[TableResultSet], fs.read(resPath))
}
Expand Down
Expand Up @@ -34,6 +34,7 @@ abstract class StorageResultSet[K <: MetaData, V <: Record] extends ResultSet[K,
} else {
parentDir.toPath + "/" + fileName + Dolphin.DOLPHIN_FILE_SUFFIX
}
logger.info(s"Get result set path:${path}")
new FsPath(path)
}

Expand Down
Expand Up @@ -86,6 +86,7 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](
WRITER_LOCK_CREATE.synchronized {
if (!fileCreated) {
if (storePath != null && outputStream == null) {
logger.info(s"Try to create a new file:${storePath}, with proxy user:${proxyUser}")
fs = FSFactory.getFsByProxyUser(storePath, proxyUser)
fs.init(null)
FileSystemUtils.createNewFile(storePath, proxyUser, true)
Expand Down Expand Up @@ -187,11 +188,11 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](
}
}
Utils.tryFinally(if (outputStream != null) flush()) {
closeFs
if (outputStream != null) {
IOUtils.closeQuietly(outputStream)
outputStream = null
}
closeFs
}
}

Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.linkis.manager.label.entity.Feature;
import org.apache.linkis.manager.label.entity.GenericLabel;
import org.apache.linkis.manager.label.entity.annon.ValueSerialNum;
import org.apache.linkis.manager.label.utils.LabelUtils;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -77,16 +78,27 @@ public Boolean isEmpty() {

@Override
protected void setStringValue(String stringValue) {
String version;
String engineType = stringValue.split("-")[0];

if (engineType.equals("*")) {
version = stringValue.replaceFirst("[" + engineType + "]-", "");
if (StringUtils.isNotBlank(stringValue)) {
HashMap<String, String> valueMap = LabelUtils.Jackson.fromJson(stringValue, HashMap.class);
if (valueMap == null) {
String version;
String engineType = stringValue.split("-")[0];

if (engineType.equals("*")) {
version = stringValue.replaceFirst("[" + engineType + "]-", "");
} else {
version = stringValue.replaceFirst(engineType + "-", "");
}

setEngineType(engineType);
setVersion(version);
} else {
setEngineType(valueMap.get("engineType"));
setVersion(valueMap.get("version"));
}
} else {
version = stringValue.replaceFirst(engineType + "-", "");
setEngineType("*");
setVersion("*");
}

setEngineType(engineType);
setVersion(version);
}
}
Expand Up @@ -28,20 +28,38 @@ public class EngineTypeLabelTest {

@Test
public void testSetStringValue() {
String engineType = "hive";
String version = "1.1.0-cdh5.12.0";

String engineType1 = "*";
String version1 = "*";

LabelBuilderFactory labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory();
EngineTypeLabel engineTypeLabel = labelBuilderFactory.createLabel(EngineTypeLabel.class);
engineTypeLabel.setStringValue(engineType + "-" + version);
Assertions.assertEquals(engineTypeLabel.getEngineType(), engineType);
Assertions.assertEquals(engineTypeLabel.getVersion(), version);

engineTypeLabel.setStringValue(engineType1 + "-" + version1);
Assertions.assertEquals(engineTypeLabel.getEngineType(), engineType1);
Assertions.assertEquals(engineTypeLabel.getVersion(), version1);
// str value
String hiveEngineType = "hive";
String hiveVersion = "1.1.0-cdh5.12.0";
engineTypeLabel.setStringValue(hiveEngineType + "-" + hiveVersion);
Assertions.assertEquals(engineTypeLabel.getEngineType(), hiveEngineType);
Assertions.assertEquals(engineTypeLabel.getVersion(), hiveVersion);

// any value
String anyEngineType = "*";
String anyVersion = "*";
engineTypeLabel.setStringValue(anyEngineType + "-" + anyVersion);
Assertions.assertEquals(engineTypeLabel.getEngineType(), anyEngineType);
Assertions.assertEquals(engineTypeLabel.getVersion(), anyVersion);

// map value
String mapStringValue = "{\"engineType\":\"shell\",\"version\":\"1\"}";
engineTypeLabel.setStringValue(mapStringValue);
Assertions.assertEquals(engineTypeLabel.getEngineType(), "shell");
Assertions.assertEquals(engineTypeLabel.getVersion(), "1");

// empty value will treat as *
String emptyStringValue = "";
engineTypeLabel.setStringValue(emptyStringValue);
Assertions.assertEquals(engineTypeLabel.getEngineType(), "*");
Assertions.assertEquals(engineTypeLabel.getVersion(), "*");

// null value will treat as *
engineTypeLabel.setStringValue(null);
Assertions.assertEquals(engineTypeLabel.getEngineType(), "*");
Assertions.assertEquals(engineTypeLabel.getVersion(), "*");
}
}
4 changes: 3 additions & 1 deletion linkis-dist/package/conf/linkis.properties
Expand Up @@ -87,4 +87,6 @@ linkis.session.redis.port=6379
# redis password
linkis.session.redis.password=test123
# redis sso switch
linkis.session.redis.cache.enabled=false
linkis.session.redis.cache.enabled=false
wds.linkis.workspace.filesystem.owner.check=true
wds.linkis.workspace.filesystem.path.check=true
Expand Up @@ -319,8 +319,8 @@ public Message upload(
FileSystem fileSystem = fsService.getFileSystem(userName, fsPath);
for (MultipartFile p : files) {
String fileName = p.getOriginalFilename();
WorkspaceUtil.charCheckFileName(fileName);
FsPath fsPathNew = new FsPath(fsPath.getPath() + "/" + fileName);
WorkspaceUtil.fileAndDirNameSpecialCharCheck(fsPathNew.getPath());
fileSystem.createNewFile(fsPathNew);
try (InputStream is = p.getInputStream();
OutputStream outputStream = fileSystem.write(fsPathNew, true)) {
Expand Down Expand Up @@ -442,7 +442,7 @@ public void download(
// downloaded(判断目录,目录不能下载)
FileSystem fileSystem = fsService.getFileSystem(userName, fsPath);
if (!fileSystem.exists(fsPath)) {
throw WorkspaceExceptionManager.createException(8011, path);
throw WorkspaceExceptionManager.createException(80011, path);
}
inputStream = fileSystem.read(fsPath);
byte[] buffer = new byte[1024];
Expand Down
Expand Up @@ -84,14 +84,18 @@ public static String suffixTuning(String path) {

public static void fileAndDirNameSpecialCharCheck(String path) throws WorkSpaceException {
String name = new File(path).getName();
int i = name.lastIndexOf(".");
charCheckFileName(name);
}

public static void charCheckFileName(String fileName) throws WorkSpaceException {
int i = fileName.lastIndexOf(".");
if (i != -1) {
name = name.substring(0, i);
fileName = fileName.substring(0, i);
}
// Only support numbers, uppercase letters, underscores, Chinese(只支持数字,字母大小写,下划线,中文)
String specialRegEx = "^[\\w\\u4e00-\\u9fa5]{1,200}$";
Pattern specialPattern = Pattern.compile(specialRegEx);
if (!specialPattern.matcher(name).find()) {
if (!specialPattern.matcher(fileName).find()) {
WorkspaceExceptionManager.createException(80028);
}
}
Expand Down

0 comments on commit ecdf1c6

Please sign in to comment.