Skip to content

Commit

Permalink
fix bug: when create duplicated table, the original stream table can'…
Browse files Browse the repository at this point in the history
…t be deleted
  • Loading branch information
liukun4515 committed Mar 8, 2021
1 parent 66e70c9 commit cbaa2a8
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.UUID;

import org.apache.commons.lang.StringUtils;
import org.apache.directory.api.util.Strings;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
Expand Down Expand Up @@ -109,6 +110,16 @@ public List<StreamingSourceConfig> getStreamings(@RequestParam(value = "table",
@RequestParam(value = "limit", required = false) Integer limit,
@RequestParam(value = "offset", required = false) Integer offset) {
try {
// query all streaming config or query one streaming config
if (!Strings.isEmpty(table) && !Strings.isEmpty(project)) {
// check the table metadata
if (tableService.getTableDescByName(table, false, project) == null) {
// the table metadata doesn't exist
throw new InternalErrorException(String.format(Locale.ROOT,
"The table %s of project %s doesn't exist, please make the stream table exists",
table, project));
}
}
return streamingService.getStreamingConfigs(table, project, limit, offset);
} catch (IOException e) {
logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
Expand Down Expand Up @@ -142,10 +153,15 @@ public StreamingRequestV2 saveStreamingConfig(@RequestBody StreamingRequestV2 st
try {
try {
tableDesc.setUuid(UUID.randomUUID().toString());
if (tableService.getTableDescByName(tableDesc.getIdentity(), false, project) != null) {
throw new IOException(String.format(Locale.ROOT,
"The table %s of project %s exists",
tableDesc.getIdentity(), project));
}
tableService.loadTableToProject(tableDesc, null, project);
saveTableSuccess = true;
} catch (IOException e) {
throw new BadRequestException("Failed to add streaming table.");
throw new BadRequestException("Failed to add streaming table, because of " + e.getMessage());
}
try {
streamingSourceConfig.setName(tableDesc.getIdentity());
Expand All @@ -161,7 +177,8 @@ public StreamingRequestV2 saveStreamingConfig(@RequestBody StreamingRequestV2 st
if (!saveTableSuccess || !saveStreamingSuccess) {
if (saveTableSuccess) {
try {
tableService.unloadHiveTable(tableDesc.getIdentity(), project);
// just drop the table metadata and don't drop the stream source config info
tableService.unloadHiveTable(tableDesc.getIdentity(), project, false);
} catch (IOException e) {
shouldThrow = new InternalErrorException(
"Action failed and failed to rollback the create table " + e.getLocalizedMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public Map<String, String[]> unLoadHiveTables(@PathVariable String tables, @Path
try {
for (String tableName : StringUtil.splitByComma(tables)) {
tableACLService.deleteFromTableACLByTbl(project, tableName);
if (tableService.unloadHiveTable(tableName, project)) {
if (tableService.unloadHiveTable(tableName, project, true)) {
unLoadSuccess.add(tableName);
} else {
unLoadFail.add(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public List<ProjectInstance> listAllProjects(final Integer limit, final Integer
public void deleteProject(String projectName, ProjectInstance project) throws IOException {
Set<String> tables = project.getTables();
for (String table : Sets.newTreeSet(tables)) {
tableService.unloadHiveTable(table, projectName);
tableService.unloadHiveTable(table, projectName, true);
getTableManager().removeTableExt(table, projectName);
getTableACLManager().deleteTableACLByTbl(projectName, table);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public List<StreamingSourceConfig> getStreamingConfigs(final String table, final
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN
+ " or hasPermission(#project, 'ADMINISTRATION')")
public StreamingSourceConfig createStreamingConfig(StreamingSourceConfig config, ProjectInstance project) throws IOException {
if (getStreamingManagerV2().getConfig(config.getName(), config.getProjectName()) != null) {
if (getStreamingManagerV2().getConfigMustWithProject(config.getName(), config.getProjectName()) != null) {
throw new InternalErrorException("The streamingSourceConfig named " + config.getName() + " already exists");
}
StreamingSourceConfig streamingSourceConfig = getStreamingManagerV2().saveStreamingConfig(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,10 @@ protected void removeTableFromProject(String tableName, String projectName) thro
* that's why we have two if statement here.
* @param tableName
* @param project
* @param needRemoveStreamInfo
* @return
*/
public boolean unloadHiveTable(String tableName, String project) throws IOException {
public boolean unloadHiveTable(String tableName, String project, boolean needRemoveStreamInfo) throws IOException {
aclEvaluate.checkProjectAdminPermission(project);
Message msg = MsgPicker.getMsg();

Expand Down Expand Up @@ -319,7 +320,9 @@ public boolean unloadHiveTable(String tableName, String project) throws IOExcept
// remove streaming info
SourceManager sourceManager = SourceManager.getInstance(KylinConfig.getInstanceFromEnv());
ISource source = sourceManager.getCachedSource(desc);
source.unloadTable(tableName, project);
if (!desc.isStreamingTable() || needRemoveStreamInfo) {
source.unloadTable(tableName, project);
}
return rtn;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ public void removeStreamingConfig(StreamingSourceConfig streamingSourceConfig) t
if (loadStreamingConfigAt(path) != null) {
getStore().deleteResource(path);
} else {
// if the source is stored in the old path which is prefix + tablename + suffix
// path without project name
// The source is stored in the old path which is prefix + table name + suffix
path = streamingSourceConfig.getResourcePath();
getStore().deleteResource(path);
}
Expand All @@ -162,6 +161,22 @@ public StreamingSourceConfig getConfig(String name, String projectName) {
}
}

public StreamingSourceConfig getConfigMustWithProject(String name, String projectName) {
name = name.toUpperCase(Locale.ROOT);
if (Strings.isEmpty(name) || Strings.isEmpty(projectName)) {
throw new StreamingException(String.format(Locale.ROOT,
"the table name %s or project name %s is null", name, projectName));
}
// path with project name
String path = StreamingSourceConfig.concatResourcePathWithProjName(name, projectName);
// Reload the StreamingSourceConfig
try {
return loadStreamingConfigAt(path);
} catch (IOException e) {
throw new StreamingException(e);
}
}

/**
*
* @param streamingSourceConfig
Expand Down Expand Up @@ -191,7 +206,7 @@ public StreamingSourceConfig saveStreamingConfig(StreamingSourceConfig streaming
if (streamingSourceConfig == null || StringUtils.isEmpty(streamingSourceConfig.getName())) {
throw new IllegalArgumentException();
}
// path = prefix + /tableanme---projectname + suffix
// path = prefix + /table name---project name + suffix
String path = streamingSourceConfig.getResourcePathWithProjName();
getStore().putResource(path, streamingSourceConfig, System.currentTimeMillis(),
StreamingSourceConfig.SERIALIZER);
Expand Down

0 comments on commit cbaa2a8

Please sign in to comment.