Skip to content

Commit

Permalink
KYLIN-4921: New rowkey for streamv2 source config and fix bug that ca…
Browse files Browse the repository at this point in the history
…n't create same name table in diff project (apache#1604)

* support diff project with same stream name

* fix bug: when create duplicated table, the original stream table can't be deleted
  • Loading branch information
liukun4515 authored and hit-lacus committed Apr 19, 2021
1 parent ae8e656 commit d5de6e5
Show file tree
Hide file tree
Showing 14 changed files with 168 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ public void before() throws Exception {

final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
final String streamingTableName = cubeInstance.getRootFactTable();
final StreamingSourceConfig sourceConfig = StreamingSourceConfigManager.getInstance(kylinConfig).getConfig(streamingTableName);
final String projectName = cubeInstance.getProject();
final StreamingSourceConfig sourceConfig = StreamingSourceConfigManager.getInstance(kylinConfig).getConfig(streamingTableName, projectName);

topicName = KafkaSource.getTopicName(sourceConfig.getProperties());
String bootstrapServers = KafkaSource.getBootstrapServers(sourceConfig.getProperties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.UUID;

import org.apache.commons.lang.StringUtils;
import org.apache.directory.api.util.Strings;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
Expand Down Expand Up @@ -104,10 +105,21 @@ public class StreamingV2Controller extends BasicController {
@RequestMapping(value = "/getConfig", method = { RequestMethod.GET })
@ResponseBody
public List<StreamingSourceConfig> getStreamings(@RequestParam(value = "table", required = false) String table,
@RequestParam(value = "project", required = false) String project,
@RequestParam(value = "limit", required = false) Integer limit,
@RequestParam(value = "offset", required = false) Integer offset) {
try {
return streamingService.getStreamingConfigs(table, limit, offset);
// query all streaming config or query one streaming config
if (!Strings.isEmpty(table) && !Strings.isEmpty(project)) {
// check the table metadata
if (tableService.getTableDescByName(table, false, project) == null) {
// the table metadata doesn't exist
throw new InternalErrorException(String.format(Locale.ROOT,
"The table %s of project %s doesn't exist, please make the stream table exists",
table, project));
}
}
return streamingService.getStreamingConfigs(table, project, limit, offset);
} catch (IOException e) {
logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
Expand Down Expand Up @@ -140,10 +152,15 @@ public StreamingRequestV2 saveStreamingConfig(@RequestBody StreamingRequestV2 st
try {
try {
tableDesc.setUuid(UUID.randomUUID().toString());
if (tableService.getTableDescByName(tableDesc.getIdentity(), false, project) != null) {
throw new IOException(String.format(Locale.ROOT,
"The table %s of project %s exists",
tableDesc.getIdentity(), project));
}
tableService.loadTableToProject(tableDesc, null, project);
saveTableSuccess = true;
} catch (IOException e) {
throw new BadRequestException("Failed to add streaming table.");
throw new BadRequestException("Failed to add streaming table, because of " + e.getMessage());
}
try {
streamingSourceConfig.setName(tableDesc.getIdentity());
Expand All @@ -159,7 +176,8 @@ public StreamingRequestV2 saveStreamingConfig(@RequestBody StreamingRequestV2 st
if (!saveTableSuccess || !saveStreamingSuccess) {
if (saveTableSuccess) {
try {
tableService.unloadHiveTable(tableDesc.getIdentity(), project);
// just drop the table metadata and don't drop the stream source config info
tableService.unloadHiveTable(tableDesc.getIdentity(), project, false);
} catch (IOException e) {
shouldThrow = new InternalErrorException(
"Action failed and failed to rollback the create table " + e.getLocalizedMessage(), e);
Expand Down Expand Up @@ -280,7 +298,7 @@ public StreamingRequest updateStreamingConfig(@RequestBody StreamingRequest stre
final String user = SecurityContextHolder.getContext().getAuthentication().getName();
logger.info("{} try to updateStreamingConfig.", user);
try {
streamingSourceConfig = streamingService.updateStreamingConfig(streamingSourceConfig);
streamingService.updateStreamingConfig(streamingSourceConfig);
} catch (AccessDeniedException accessDeniedException) {
throw new ForbiddenException("You don't have right to update this StreamingSourceConfig.");
} catch (Exception e) {
Expand All @@ -292,10 +310,13 @@ public StreamingRequest updateStreamingConfig(@RequestBody StreamingRequest stre
return streamingRequest;
}

@RequestMapping(value = "/{configName}", method = { RequestMethod.DELETE })
@Deprecated
@RequestMapping(value = "/{project}/{configName}", method = { RequestMethod.DELETE }, produces = {
"application/json" })
@ResponseBody
public void deleteConfig(@PathVariable String configName) throws IOException {
StreamingSourceConfig config = streamingService.getStreamingManagerV2().getConfig(configName);
public void deleteConfig(@PathVariable String project, @PathVariable String configName) throws IOException {
// This method will never be called by the frontend.
StreamingSourceConfig config = streamingService.getStreamingManagerV2().getConfig(configName, project);
if (null == config) {
throw new NotFoundException("StreamingSourceConfig with name " + configName + " not found..");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public Map<String, String[]> unLoadHiveTables(@PathVariable String tables, @Path
try {
for (String tableName : StringUtil.splitByComma(tables)) {
tableACLService.deleteFromTableACLByTbl(project, tableName);
if (tableService.unloadHiveTable(tableName, project)) {
if (tableService.unloadHiveTable(tableName, project, true)) {
unLoadSuccess.add(tableName);
} else {
unLoadFail.add(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public List<ProjectInstance> listAllProjects(final Integer limit, final Integer
public void deleteProject(String projectName, ProjectInstance project) throws IOException {
Set<String> tables = project.getTables();
for (String table : Sets.newTreeSet(tables)) {
tableService.unloadHiveTable(table, projectName);
tableService.unloadHiveTable(table, projectName, true);
getTableManager().removeTableExt(table, projectName);
getTableACLManager().deleteTableACLByTbl(projectName, table);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ public StreamingV2Service() {
receiverAdminClient = adminClient;
}

public List<StreamingSourceConfig> listAllStreamingConfigs(final String table) throws IOException {
public List<StreamingSourceConfig> listAllStreamingConfigs(final String table, final String projectName) throws IOException {
List<StreamingSourceConfig> streamingSourceConfigs = Lists.newArrayList();
if (StringUtils.isEmpty(table)) {
if (StringUtils.isEmpty(table) || StringUtils.isEmpty(projectName)) {
streamingSourceConfigs = getStreamingManagerV2().listAllStreaming();
} else {
StreamingSourceConfig config = getStreamingManagerV2().getConfig(table);
StreamingSourceConfig config = getStreamingManagerV2().getConfig(table, projectName);
if (config != null) {
streamingSourceConfigs.add(config);
}
Expand All @@ -119,10 +119,10 @@ public List<StreamingSourceConfig> listAllStreamingConfigs(final String table) t
return streamingSourceConfigs;
}

public List<StreamingSourceConfig> getStreamingConfigs(final String table, final Integer limit, final Integer offset)
public List<StreamingSourceConfig> getStreamingConfigs(final String table, final String projectName, final Integer limit, final Integer offset)
throws IOException {
List<StreamingSourceConfig> streamingSourceConfigs;
streamingSourceConfigs = listAllStreamingConfigs(table);
streamingSourceConfigs = listAllStreamingConfigs(table, projectName);

if (limit == null || offset == null) {
return streamingSourceConfigs;
Expand All @@ -138,7 +138,7 @@ public List<StreamingSourceConfig> getStreamingConfigs(final String table, final
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN
+ " or hasPermission(#project, 'ADMINISTRATION')")
public StreamingSourceConfig createStreamingConfig(StreamingSourceConfig config, ProjectInstance project) throws IOException {
if (getStreamingManagerV2().getConfig(config.getName()) != null) {
if (getStreamingManagerV2().getConfigMustWithProject(config.getName(), config.getProjectName()) != null) {
throw new InternalErrorException("The streamingSourceConfig named " + config.getName() + " already exists");
}
StreamingSourceConfig streamingSourceConfig = getStreamingManagerV2().saveStreamingConfig(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,10 @@ protected void removeTableFromProject(String tableName, String projectName) thro
* that's why we have two if statement here.
* @param tableName
* @param project
* @param needRemoveStreamInfo
* @return
*/
public boolean unloadHiveTable(String tableName, String project) throws IOException {
public boolean unloadHiveTable(String tableName, String project, boolean needRemoveStreamInfo) throws IOException {
aclEvaluate.checkProjectAdminPermission(project);
Message msg = MsgPicker.getMsg();

Expand Down Expand Up @@ -319,7 +320,9 @@ public boolean unloadHiveTable(String tableName, String project) throws IOExcept
// remove streaming info
SourceManager sourceManager = SourceManager.getInstance(KylinConfig.getInstanceFromEnv());
ISource source = sourceManager.getCachedSource(desc);
source.unloadTable(tableName, project);
if (!desc.isStreamingTable() || needRemoveStreamInfo) {
source.unloadTable(tableName, project);
}
return rtn;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import java.io.IOException;
import java.util.Map;

import org.apache.directory.api.util.Strings;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.common.persistence.Serializer;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.shaded.com.google.common.collect.Maps;

/**
Expand All @@ -53,10 +55,28 @@ public class StreamingSourceConfig extends RootPersistentEntity {
@JsonProperty("properties")
private Map<String, String> properties = Maps.newLinkedHashMap();

public static String concatResourcePath(String name) {
return ResourceStore.STREAMING_V2_RESOURCE_ROOT + "/" + name + ".json";
@JsonProperty("project_name")
private String projectName;

@Deprecated
static String concatResourcePath(String name) {
return ResourceStore.STREAMING_V2_RESOURCE_ROOT + "/" + name + MetadataConstants.FILE_SURFIX;
}

public static String concatResourcePathWithProjName(String name, String projectName) {
if (Strings.isEmpty(projectName)) {
return concatResourcePath(name);
} else {
// like table desc
return ResourceStore.STREAMING_V2_RESOURCE_ROOT + "/" + name + "--" + projectName + MetadataConstants.FILE_SURFIX;
}
}

public String getResourcePathWithProjName() {
return concatResourcePathWithProjName(name, projectName);
}

@Deprecated
public String getResourcePath() {
return concatResourcePath(name);
}
Expand Down Expand Up @@ -85,6 +105,14 @@ public void setParserInfo(MessageParserInfo parserInfo) {
this.parserInfo = parserInfo;
}

public void setProjectName(String projectName) {
this.projectName = projectName;
}

public String getProjectName() {
return projectName;
}

@Override
public StreamingSourceConfig clone() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.StringUtils;
import org.apache.directory.api.util.Strings;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
Expand Down Expand Up @@ -83,7 +84,6 @@ public List<StreamingSourceConfig> listAllStreaming() throws IOException {
ResourceStore store = getStore();
logger.info("Load all streaming metadata from folder "
+ store.getReadableResourcePath(ResourceStore.STREAMING_V2_RESOURCE_ROOT));

List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_V2_RESOURCE_ROOT,
MetadataConstants.FILE_SURFIX);
for (String path : paths) {
Expand All @@ -94,7 +94,10 @@ public List<StreamingSourceConfig> listAllStreaming() throws IOException {
logger.error("Error loading streaming desc " + path, e);
continue;
}
if (path.equals(streamingSourceConfig.getResourcePath()) == false) {
// check path without project name
// check path with project name
if (path.equals(streamingSourceConfig.getResourcePath()) == false &&
path.equals(streamingSourceConfig.getResourcePathWithProjName()) == false) {
logger.error("Skip suspicious desc at " + path + ", " + streamingSourceConfig + " should be at "
+ streamingSourceConfig.getResourcePath());
continue;
Expand All @@ -113,59 +116,98 @@ public List<StreamingSourceConfig> listAllStreaming() throws IOException {
* @param name
* @throws IOException
*/
public StreamingSourceConfig reloadStreamingConfigLocal(String name) throws IOException {

// Save Source
String path = StreamingSourceConfig.concatResourcePath(name);

public StreamingSourceConfig reloadStreamingConfigLocal(String name, String projectName) throws IOException {
if (Strings.isEmpty(name) || Strings.isEmpty(projectName)) {
throw new StreamingException(String.format(Locale.ROOT,
"the table name %s or project name %s is null", name, projectName));
}
// path with project name
String path = StreamingSourceConfig.concatResourcePathWithProjName(name, projectName);
// Reload the StreamingSourceConfig
StreamingSourceConfig ndesc = loadStreamingConfigAt(path);
return ndesc;
StreamingSourceConfig config = loadStreamingConfigAt(path);
if (config == null) {
// the path with project name doesn't contain the source config, and check the old path without project name.
path = StreamingSourceConfig.concatResourcePath(name);
config = loadStreamingConfigAt(path);
if (config != null) {
config.setProjectName(projectName);
// remove from the old path, and save the source config to the new path
removeStreamingConfig(config);
saveStreamingConfig(config);
}
}
return config;
}

// remove streamingSourceConfig
public void removeStreamingConfig(StreamingSourceConfig streamingSourceConfig) throws IOException {
String path = streamingSourceConfig.getResourcePath();
getStore().deleteResource(path);
// path with project name
String path = streamingSourceConfig.getResourcePathWithProjName();
if (loadStreamingConfigAt(path) != null) {
getStore().deleteResource(path);
} else {
// The source is stored in the old path which is prefix + table name + suffix
path = streamingSourceConfig.getResourcePath();
getStore().deleteResource(path);
}
}

public StreamingSourceConfig getConfig(String name, String projectName) {
name = name.toUpperCase(Locale.ROOT);
try {
return reloadStreamingConfigLocal(name, projectName);
} catch (IOException e) {
throw new StreamingException(e);
}
}

public StreamingSourceConfig getConfig(String name) {
public StreamingSourceConfig getConfigMustWithProject(String name, String projectName) {
name = name.toUpperCase(Locale.ROOT);
if (Strings.isEmpty(name) || Strings.isEmpty(projectName)) {
throw new StreamingException(String.format(Locale.ROOT,
"the table name %s or project name %s is null", name, projectName));
}
// path with project name
String path = StreamingSourceConfig.concatResourcePathWithProjName(name, projectName);
// Reload the StreamingSourceConfig
try {
return reloadStreamingConfigLocal(name);
return loadStreamingConfigAt(path);
} catch (IOException e) {
throw new StreamingException(e);
}
}

/**
*
* @param desc
* @param streamingSourceConfig
* @return
* @throws IOException
*/
public StreamingSourceConfig updateStreamingConfig(StreamingSourceConfig desc) throws IOException {
public StreamingSourceConfig updateStreamingConfig(StreamingSourceConfig streamingSourceConfig) throws IOException {
// Validate CubeDesc
if (desc.getUuid() == null || desc.getName() == null) {
if (streamingSourceConfig.getUuid() == null || streamingSourceConfig.getName() == null) {
throw new IllegalArgumentException("SteamingConfig Illegal.");
}

// Save Source
String path = desc.getResourcePath();
getStore().putResource(path, desc, System.currentTimeMillis(), STREAMING_SERIALIZER);
// remove Source
removeStreamingConfig(streamingSourceConfig);

// Save Source, the path with project name
String path = streamingSourceConfig.getResourcePathWithProjName();
getStore().putResource(path, streamingSourceConfig, System.currentTimeMillis(), STREAMING_SERIALIZER);

// Reload the StreamingSourceConfig
StreamingSourceConfig ndesc = loadStreamingConfigAt(path);
StreamingSourceConfig newStreamingSourceConfig = loadStreamingConfigAt(path);

return ndesc;
return newStreamingSourceConfig;
}

public StreamingSourceConfig saveStreamingConfig(StreamingSourceConfig streamingSourceConfig) throws IOException {
if (streamingSourceConfig == null || StringUtils.isEmpty(streamingSourceConfig.getName())) {
throw new IllegalArgumentException();
}

String path = StreamingSourceConfig.concatResourcePath(streamingSourceConfig.getName());
// path = prefix + /table name---project name + suffix
String path = streamingSourceConfig.getResourcePathWithProjName();
getStore().putResource(path, streamingSourceConfig, System.currentTimeMillis(),
StreamingSourceConfig.SERIALIZER);
return streamingSourceConfig;
Expand Down
Loading

0 comments on commit d5de6e5

Please sign in to comment.