Support incremental data load for MV (single tables)
Indhumathi27 committed May 20, 2019
1 parent 24fe230 commit 410078c
Showing 34 changed files with 1,519 additions and 137 deletions.
@@ -2163,4 +2163,15 @@ private CarbonCommonConstants() {

public static final int CARBON_INDEX_SERVER_WORKER_THREADS_DEFAULT =
500;

/**
* This property will be used to store the datamap name
*/
public static final String DATAMAP_NAME = "datamap_name";

/**
* This property will be used to store the parent table names associated with the datamap
*/
public static final String PARENT_TABLES = "parent_tables";

}
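
These two keys are stored in the datamap schema's properties map. A rough illustration of how they might be read back (the map contents and the comma-separated parent list below are invented for this sketch, not taken from the commit):

    import java.util.HashMap;
    import java.util.Map;

    public class DataMapPropertiesSketch {
      public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        // keys mirror CarbonCommonConstants.DATAMAP_NAME / PARENT_TABLES
        properties.put("datamap_name", "datamap_sales_agg");
        properties.put("parent_tables", "sales");

        String[] parents = properties.get("parent_tables").split(",");
        System.out.println("datamap " + properties.get("datamap_name")
            + " has " + parents.length + " parent table(s)");
      }
    }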
@@ -18,13 +18,27 @@
package org.apache.carbondata.core.datamap;

import java.io.IOException;
import java.util.*;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.datamap.status.DataMapSegmentStatusUtil;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;

import com.google.gson.Gson;
import org.apache.log4j.Logger;

/**
* DataMap is an accelerator for certain types of queries. Developers can add new DataMap
@@ -58,6 +72,8 @@ public abstract class DataMapProvider {
private CarbonTable mainTable;
private DataMapSchema dataMapSchema;

private Logger LOGGER = LogServiceFactory.getLogService(this.getClass().getCanonicalName());

public DataMapProvider(CarbonTable mainTable, DataMapSchema dataMapSchema) {
this.mainTable = mainTable;
this.dataMapSchema = dataMapSchema;
@@ -106,13 +122,214 @@ public void initData() { }
* 1. after datamap creation, if `autoRefreshDataMap` is set to true
* 2. when the user manually triggers the REBUILD DATAMAP command
*/
public abstract void rebuild() throws IOException, NoSuchDataMapException;
public boolean rebuild() throws IOException, NoSuchDataMapException {
if (null == dataMapSchema.getRelationIdentifier()) {
return false;
}
String newLoadName = "";
String segmentMap = "";
AbsoluteTableIdentifier dataMapTableAbsoluteTableIdentifier = AbsoluteTableIdentifier
.from(dataMapSchema.getRelationIdentifier().getTablePath(),
dataMapSchema.getRelationIdentifier().getDatabaseName(),
dataMapSchema.getRelationIdentifier().getTableName());
SegmentStatusManager segmentStatusManager =
new SegmentStatusManager(dataMapTableAbsoluteTableIdentifier);
Map<String, List<String>> segmentMapping = new HashMap<>();
// Acquire the table status lock to handle concurrent data loading
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
try {
if (carbonLock.lockWithRetries()) {
LOGGER.info(
"Acquired lock for table" + dataMapSchema.getRelationIdentifier().getDatabaseName()
+ "." + dataMapSchema.getRelationIdentifier().getTableName()
+ " for table status updation");
String dataMapTableMetadataPath =
CarbonTablePath.getMetadataPath(dataMapSchema.getRelationIdentifier().getTablePath());
LoadMetadataDetails[] loadMetaDataDetails =
SegmentStatusManager.readLoadMetadata(dataMapTableMetadataPath);
List<LoadMetadataDetails> listOfLoadFolderDetails =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
Collections.addAll(listOfLoadFolderDetails, loadMetaDataDetails);
if (dataMapSchema.isLazy()) {
// check if a rebuild of the datamap is already in progress and throw an exception
if (!listOfLoadFolderDetails.isEmpty()) {
for (LoadMetadataDetails loadMetaDetail : loadMetaDataDetails) {
if ((loadMetaDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
|| loadMetaDetail.getSegmentStatus()
== SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) && SegmentStatusManager
.isLoadInProgress(dataMapTableAbsoluteTableIdentifier,
loadMetaDetail.getLoadName())) {
throw new RuntimeException("Rebuild to datamap " + dataMapSchema.getDataMapName()
+ " is already in progress");
}
}
}
}
boolean isFullRefresh = false;
if (null != dataMapSchema.getProperties().get("full_refresh")) {
isFullRefresh = Boolean.valueOf(dataMapSchema.getProperties().get("full_refresh"));
}
if (!isFullRefresh) {
if (!getSpecificSegmentsTobeLoaded(segmentMapping, listOfLoadFolderDetails)) {
return false;
}
segmentMap = new Gson().toJson(segmentMapping);
} else {
List<RelationIdentifier> relationIdentifiers = dataMapSchema.getParentTables();
for (RelationIdentifier relationIdentifier : relationIdentifiers) {
List<String> mainTableSegmentList =
DataMapUtil.getMainTableValidSegmentList(relationIdentifier);
if (mainTableSegmentList.isEmpty()) {
return false;
}
segmentMapping.put(relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+ relationIdentifier.getTableName(), mainTableSegmentList);
}
segmentMap = new Gson().toJson(segmentMapping);
}

// To handle concurrent data loading to the datamap, create a new loadMetaEntry,
// set segmentMap on the new loadMetaEntry and pass the new segmentId with the load command
LoadMetadataDetails loadMetadataDetail = new LoadMetadataDetails();
String segmentId =
String.valueOf(SegmentStatusManager.createNewSegmentId(loadMetaDataDetails));
loadMetadataDetail.setLoadName(segmentId);
loadMetadataDetail.setSegmentStatus(SegmentStatus.INSERT_IN_PROGRESS);
loadMetadataDetail.setExtraInfo(segmentMap);
listOfLoadFolderDetails.add(loadMetadataDetail);
newLoadName = segmentId;

SegmentStatusManager.writeLoadDetailsIntoFile(CarbonTablePath
.getTableStatusFilePath(dataMapSchema.getRelationIdentifier().getTablePath()),
listOfLoadFolderDetails
.toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
} else {
LOGGER.error(
"Not able to acquire the lock for Table status updation for table " + dataMapSchema
.getRelationIdentifier().getDatabaseName() + "." + dataMapSchema
.getRelationIdentifier().getTableName());
}
} finally {
if (carbonLock.unlock()) {
LOGGER.info("Table unlocked successfully after table status updation" + dataMapSchema
.getRelationIdentifier().getDatabaseName() + "." + dataMapSchema.getRelationIdentifier()
.getTableName());
} else {
LOGGER.error("Unable to unlock Table lock for table" + dataMapSchema.getRelationIdentifier()
.getDatabaseName() + "." + dataMapSchema.getRelationIdentifier().getTableName()
+ " during table status updation");
}
}
return rebuildInternal(newLoadName, segmentMapping);
}
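
The extraInfo written above is simply the Gson-serialized db.table -> segment-list map. A minimal standalone sketch of that round trip (table and segment names invented):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import com.google.gson.Gson;
    import com.google.gson.reflect.TypeToken;

    public class SegmentMapSketch {
      public static void main(String[] args) {
        Map<String, List<String>> segmentMapping = new HashMap<>();
        segmentMapping.put("default.maintable", Arrays.asList("0", "1", "2"));

        // what rebuild() stores via LoadMetadataDetails.setExtraInfo(...)
        String segmentMap = new Gson().toJson(segmentMapping);
        System.out.println(segmentMap); // {"default.maintable":["0","1","2"]}

        // what readers like DataMapSegmentStatusUtil.getSegmentMap(...) recover
        Map<String, List<String>> restored = new Gson().fromJson(segmentMap,
            new TypeToken<Map<String, List<String>>>() { }.getType());
        System.out.println(restored.get("default.maintable")); // [0, 1, 2]
      }
    }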

/**
* Build the datamap incrementally by loading only the specified segments' data.
* This method compares the mainTable and dataMapTable segment lists and loads only the
* newly added segments from the main table to the dataMap table.
* If the main table has been compacted, the dataMap is loaded based on the dataMap to
* mainTable segmentMapping.
* Eg:
* case 1: Consider mainTableSegmentList: {0, 1, 2}, dataMapToMainTable segmentMap:
* {0 -> 0, 1 -> 1,2}. If segments (1, 2) of the main table are compacted to 1.1 and a new
* segment (3) is loaded to the main table, then mainTableSegmentList is updated to
* {0, 1.1, 3}. In this case, segment (1) of the dataMap table is marked for delete, and a
* new segment {2 -> 1.1, 3} is loaded to the dataMap table.
* case 2: Consider mainTableSegmentList: {0, 1, 2, 3}, dataMapToMainTable segmentMap:
* {0 -> 0,1,2, 1 -> 3}. If segments (1, 2) of the main table are compacted to 1.1 and a new
* segment (4) is loaded to the main table, then mainTableSegmentList is updated to
* {0, 1.1, 3, 4}. In this case, segment (0) of the dataMap table is marked for delete and
* segment (0) of the main table is added to the validSegmentList, to be loaded again. A new
* dataMap table segment (2) with main table segmentList {2 -> 1.1, 4, 0} is then loaded to
* the dataMap table, and the dataMapToMainTable segmentMap is updated to
* {1 -> 3, 2 -> 1.1, 4, 0} after rebuild.
*/
public void incrementalBuild(String[] segmentIds) {
throw new UnsupportedOperationException();
}

private boolean getSpecificSegmentsTobeLoaded(Map<String, List<String>> segmentMapping,
List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
List<RelationIdentifier> relationIdentifiers = dataMapSchema.getParentTables();
// invalidDataMapSegmentList holds the datamap segments which need to be marked for delete
HashSet<String> invalidDataMapSegmentList = new HashSet<>();
if (listOfLoadFolderDetails.isEmpty()) {
// If the segment map is empty, load all valid segments from the main tables to the dataMap
for (RelationIdentifier relationIdentifier : relationIdentifiers) {
List<String> mainTableSegmentList =
DataMapUtil.getMainTableValidSegmentList(relationIdentifier);
// If mainTableSegmentList is empty, no need to trigger load command
// TODO: handle in case of multiple tables load to datamap table
if (mainTableSegmentList.isEmpty()) {
return false;
}
segmentMapping.put(
relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT + relationIdentifier
.getTableName(), mainTableSegmentList);
}
} else {
for (RelationIdentifier relationIdentifier : relationIdentifiers) {
List<String> dataMapTableSegmentList = new ArrayList<>();
for (LoadMetadataDetails loadMetaDetail : listOfLoadFolderDetails) {
if (loadMetaDetail.getSegmentStatus() == SegmentStatus.SUCCESS
|| loadMetaDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
Map<String, List<String>> segmentMaps =
DataMapSegmentStatusUtil.getSegmentMap(loadMetaDetail.getExtraInfo());
dataMapTableSegmentList.addAll(segmentMaps.get(
relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+ relationIdentifier.getTableName()));
}
}
List<String> dataMapSegmentList = new ArrayList<>(dataMapTableSegmentList);
// Get all segments for parent relationIdentifier
List<String> mainTableSegmentList =
DataMapUtil.getMainTableValidSegmentList(relationIdentifier);
dataMapTableSegmentList.removeAll(mainTableSegmentList);
mainTableSegmentList.removeAll(dataMapSegmentList);
if (mainTableSegmentList.isEmpty()) {
return false;
}
if (!dataMapTableSegmentList.isEmpty()) {
List<String> invalidMainTableSegmentList = new ArrayList<>();
// validMainTableSegmentList holds the main table segments which need to be loaded again
HashSet<String> validMainTableSegmentList = new HashSet<>();

// For dataMap segments which are not in the main table segment list (if the main table
// is compacted), iterate over those segments and collect the dataMap segments which need
// to be marked for delete and the main table segments which need to be loaded again
for (String segmentId : dataMapTableSegmentList) {
for (LoadMetadataDetails loadMetaDetail : listOfLoadFolderDetails) {
if (loadMetaDetail.getSegmentStatus() == SegmentStatus.SUCCESS
|| loadMetaDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
Map<String, List<String>> segmentMaps =
DataMapSegmentStatusUtil.getSegmentMap(loadMetaDetail.getExtraInfo());
List<String> segmentIds = segmentMaps.get(
relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+ relationIdentifier.getTableName());
if (segmentIds.contains(segmentId)) {
segmentIds.remove(segmentId);
validMainTableSegmentList.addAll(segmentIds);
invalidMainTableSegmentList.add(segmentId);
invalidDataMapSegmentList.add(loadMetaDetail.getLoadName());
}
}
}
}
// remove invalid segments from validMainTableSegmentList if present
validMainTableSegmentList.removeAll(invalidMainTableSegmentList);
// Add all valid segments of the main table which need to be loaded again
mainTableSegmentList.addAll(validMainTableSegmentList);
segmentMapping.put(relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+ relationIdentifier.getTableName(), mainTableSegmentList);
} else {
segmentMapping.put(relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+ relationIdentifier.getTableName(), mainTableSegmentList);
}
}
}
// Remove invalid datamap segments
if (!invalidDataMapSegmentList.isEmpty()) {
for (LoadMetadataDetails loadMetadataDetail : listOfLoadFolderDetails) {
if (invalidDataMapSegmentList.contains(loadMetadataDetail.getLoadName())) {
loadMetadataDetail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
}
}
}
return true;
}
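
The compaction handling above boils down to two removeAll passes over segment-id lists. A self-contained sketch of case 1 from the javadoc, with plain lists standing in for the LoadMetadataDetails bookkeeping:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class CompactionDiffSketch {
      public static void main(String[] args) {
        // case 1: main table after compacting (1, 2) to 1.1 and loading segment 3
        List<String> mainTableSegments = new ArrayList<>(Arrays.asList("0", "1.1", "3"));
        // main table segments already covered by existing datamap segments
        List<String> coveredSegments = new ArrayList<>(Arrays.asList("0", "1", "2"));
        List<String> coveredCopy = new ArrayList<>(coveredSegments);

        // covered entries no longer valid in the main table (compacted away)
        coveredSegments.removeAll(mainTableSegments); // -> [1, 2]
        // main table segments not yet covered, i.e. to be loaded now
        mainTableSegments.removeAll(coveredCopy);     // -> [1.1, 3]

        System.out.println("stale mappings:   " + coveredSegments);
        System.out.println("segments to load: " + mainTableSegments);
        // the datamap segment mapped to (1, 2) is marked for delete and a new
        // datamap segment is built from {1.1, 3}, matching case 1 above
      }
    }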

/**
@@ -126,4 +343,6 @@ public DataMapCatalog createDataMapCatalog() {
public abstract DataMapFactory getDataMapFactory();

public abstract boolean supportRebuild();

public abstract boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap);
}
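
rebuildInternal is the new extension point each provider must implement. A hypothetical subclass (class name invented, the other abstract methods elided) showing the contract:

    import java.util.List;
    import java.util.Map;

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;

    public abstract class ExampleDataMapProvider extends DataMapProvider {
      public ExampleDataMapProvider(CarbonTable mainTable, DataMapSchema dataMapSchema) {
        super(mainTable, dataMapSchema);
      }

      @Override
      public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap) {
        // load only the parent-table segments named in segmentMap into the new
        // datamap segment (newLoadName); a real provider fires the actual load
        // job here and returns whether it succeeded
        return true;
      }
    }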
@@ -30,7 +30,9 @@
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
@@ -250,4 +252,22 @@ public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvali
return ssm.getValidAndInvalidSegments();
}

/**
* Returns the valid segment list for a given RelationIdentifier
*
* @param relationIdentifier relation identifier of the table whose segments are listed
* @return list of valid segment ids
* @throws IOException
*/
public static List<String> getMainTableValidSegmentList(RelationIdentifier relationIdentifier)
throws IOException {
List<String> segmentList = new ArrayList<>();
List<Segment> validSegments = new SegmentStatusManager(AbsoluteTableIdentifier
.from(relationIdentifier.getTablePath(), relationIdentifier.getDatabaseName(),
relationIdentifier.getTableName())).getValidAndInvalidSegments().getValidSegments();
for (Segment segment : validSegments) {
segmentList.add(segment.getSegmentNo());
}
return segmentList;
}
}
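
A hypothetical caller (class and method names invented) that gathers the valid segments of every parent table, mirroring how the rebuild flow builds its segment mapping:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.carbondata.core.datamap.DataMapUtil;
    import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
    import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;

    public class ParentSegmentCollector {
      /** Maps db.table to the valid segment ids of every parent of the datamap. */
      static Map<String, List<String>> collect(DataMapSchema schema) throws IOException {
        Map<String, List<String>> segmentMapping = new HashMap<>();
        for (RelationIdentifier parent : schema.getParentTables()) {
          segmentMapping.put(parent.getDatabaseName() + "." + parent.getTableName(),
              DataMapUtil.getMainTableValidSegmentList(parent));
        }
        return segmentMapping;
      }
    }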
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.core.datamap.dev;


import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapUtil;
import org.apache.carbondata.core.datamap.status.DataMapSegmentStatusUtil;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;

/**
* Checks whether a datamap can be enabled, i.e. whether it is in sync with its main table
*/
@InterfaceAudience.Developer("DataMap")
public abstract class DataMapSyncStatus {

/**
* This method checks whether the main table and the datamap table are in sync. If they are
* in sync, it returns true so that the datamap can be enabled.
*
* @param dataMapSchema schema of the datamap to be enabled or disabled
* @return flag to enable or disable the datamap
* @throws IOException
*/
public static boolean canDataMapBeEnabled(DataMapSchema dataMapSchema) throws IOException {
boolean isDataMapInSync = true;
String metaDataPath =
CarbonTablePath.getMetadataPath(dataMapSchema.getRelationIdentifier().getTablePath());
LoadMetadataDetails[] dataMapLoadMetadataDetails =
SegmentStatusManager.readLoadMetadata(metaDataPath);
Map<String, List<String>> dataMapSegmentMap = new HashMap<>();
for (LoadMetadataDetails loadMetadataDetail : dataMapLoadMetadataDetails) {
Map<String, List<String>> segmentMap =
DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetail.getExtraInfo());
if (dataMapSegmentMap.isEmpty()) {
dataMapSegmentMap.putAll(segmentMap);
} else {
for (Map.Entry<String, List<String>> entry : segmentMap.entrySet()) {
if (null != dataMapSegmentMap.get(entry.getKey())) {
dataMapSegmentMap.get(entry.getKey()).addAll(entry.getValue());
}
}
}
}
List<RelationIdentifier> parentTables = dataMapSchema.getParentTables();
for (RelationIdentifier parentTable : parentTables) {
List<String> mainTableValidSegmentList =
DataMapUtil.getMainTableValidSegmentList(parentTable);
if (!mainTableValidSegmentList.isEmpty() && !dataMapSegmentMap.isEmpty()) {
isDataMapInSync = dataMapSegmentMap.get(
parentTable.getDatabaseName() + CarbonCommonConstants.POINT + parentTable
.getTableName()).containsAll(mainTableValidSegmentList);
} else if (dataMapSegmentMap.isEmpty() && !mainTableValidSegmentList.isEmpty()) {
isDataMapInSync = false;
}
}
return isDataMapInSync;
}
}
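
The sync decision reduces to a containsAll check of the segments covered by the datamap against the currently valid main-table segments. A toy example with invented segment ids:

    import java.util.Arrays;
    import java.util.List;

    public class SyncCheckSketch {
      public static void main(String[] args) {
        // union of main-table segments covered by existing datamap loads
        List<String> covered = Arrays.asList("0", "1");
        // currently valid segments of the parent table
        List<String> mainTableValid = Arrays.asList("0", "1", "2");

        boolean inSync = covered.containsAll(mainTableValid); // false: 2 not covered
        System.out.println("datamap can be enabled: " + inSync);
        // once segment 2 is also loaded into the datamap, the check passes and
        // canDataMapBeEnabled returns true
      }
    }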
