[CARBONDATA-4242]Improve cdc performance and introduce new APIs for U…

Why is this PR needed?
1. In the existing solution, when joining the source and target datasets to tag records for delete, update and insert, we scanned all the data of the target table before joining it with the source dataset. However, the source data is often small, and its range may cover only a few hundred Carbondata files out of the thousands in the target table. The missing pruning is the main bottleneck here: scanning all records and involving them in the join causes a large amount of shuffle and reduces performance.
2. The source data was not cached. The source dataset is scanned multiple times and is usually small, so persisting it speeds up the repeated scans.
3. When performing the join, we first obtained the Row object and operated on it; each value was then cast to the Spark datatype and converted to an InternalRow object for further processing of the joined data. This adds extra deserializeToObject and map nodes to the DAG and increases execution time.
4. Initially, while tagging records (the join operation), we prepared a new projection of the required columns. This involves preparing an InternalRow object as explained in point 3 and then applying the eval function on each row to build the projection. In effect, the same expression evaluation is applied again on the joined data, which is repeated work and increases execution time.
5. In the join operation we used all the columns of the source dataset and the required columns of the target table, such as the join key column and other columns like tupleID and status_on_mergeds. When the table has many columns, this increases the execution time because of the large amount of data shuffled.
6. The current merge APIs are somewhat complex, generalized and confusing to users for simple upsert, delete and insert operations.

What changes were proposed in this PR?
1. Add pruning logic before the join operation. Compare each incoming row with an interval-based tree data structure that holds the Carbondata file path and the min and max values, to identify the Carbondata files in which the incoming row can be present. In some use cases, which will be explained in a later section, this helps to scan fewer files rather than blindly scanning all the Carbondata files in the target table.
2. Cache the incoming source dataset with srcDS.cache(), so that the cached data is used in all the operations and speed is improved. Uncache (unpersist) the dataset after the merge operation.
3. Instead of operating on the Row object and then converting to InternalRow, operate directly on the InternalRow object to avoid the datatype conversions.
4. Instead of evaluating the expression again based on the required projection columns on matching conditions and building a new projection, identify the indexes required for the output row and access these indexes directly on the incoming InternalRow object after step 3. Evaluation is avoided, and array access by index gives O(1) performance.
5. During the join, that is, the tagging of records, do not include all the column data. Include only the join key columns and identify the tupleIDs to delete and the rows to insert. This avoids a lot of shuffle and improves performance significantly.
6. Introduce new APIs for UPSERT, UPDATE, DELETE and INSERT and keep the user-facing APIs simple. The user now only needs to give the key column for the join, the source dataset and the operation type as mentioned above. These new APIs make use of all the improvements mentioned above and avoid the unnecessary operations of the existing merge APIs.
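
The pruning idea in change 1 can be illustrated with a minimal Java sketch. It is not the code from this PR: the class `MinMaxPruningSketch`, the holder `FileRange` and the method `filesToScan` are hypothetical names, the join key is assumed to be a `long`, and a plain linear scan over the file ranges stands in for the interval-based tree.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class MinMaxPruningSketch {

  /** Hypothetical holder: one Carbondata file and the min/max of the join key stored in it. */
  static final class FileRange {
    final String filePath;
    final long min;
    final long max;

    FileRange(String filePath, long min, long max) {
      this.filePath = filePath;
      this.min = min;
      this.max = max;
    }

    /** A key can only be present in the file if it lies inside [min, max]. */
    boolean mayContain(long key) {
      return key >= min && key <= max;
    }
  }

  /**
   * Returns the files whose [min, max] range covers at least one source key.
   * Linear scan for brevity; an interval tree would answer each lookup faster.
   */
  static Set<String> filesToScan(List<FileRange> targetFiles, List<Long> sourceKeys) {
    Set<String> selected = new LinkedHashSet<>();
    for (long key : sourceKeys) {
      for (FileRange range : targetFiles) {
        if (range.mayContain(key)) {
          selected.add(range.filePath);
        }
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    List<FileRange> files = Arrays.asList(
        new FileRange("part-0.carbondata", 0, 99),
        new FileRange("part-1.carbondata", 100, 199),
        new FileRange("part-2.carbondata", 200, 299));
    // Only the files whose range covers a source key need to take part in the join.
    System.out.println(filesToScan(files, Arrays.asList(105L, 150L, 250L)));
  }
}
```

Files whose range covers no source key are never scanned, which is where the saving comes from when the source data touches only a small part of the target table.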

Does this PR introduce any user interface change?

Is any new testcase added?

This closes #4148
akashrn5 authored and ajantha-bhat committed Jul 29, 2021
1 parent feb0521 commit 1e2fc4c367c5d56d48ef21d749d662c9f6f68aec
Showing 38 changed files with 2,439 additions and 221 deletions.
@@ -2670,4 +2670,14 @@ private CarbonCommonConstants() {
public static final String CARBON_SPARK3_VERSION = "2.2.0";

* This property is to enable the min max pruning of target carbon table based on input/source
* data
public static final String CARBON_CDC_MINMAX_PRUNING_ENABLED =

public static final String CARBON_CDC_MINMAX_PRUNING_ENABLED_DEFAULT = "false";

@@ -36,6 +36,7 @@
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CdcVO;
import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
@@ -102,6 +103,8 @@ public class IndexInputFormat extends FileInputFormat<Void, ExtendedBlocklet>

private Set<String> missingSISegments;

private CdcVO cdcVO;

IndexInputFormat() {

@@ -275,6 +278,10 @@ public void write(DataOutput out) throws IOException {
out.writeBoolean(cdcVO != null);
if (cdcVO != null) {

@@ -330,6 +337,11 @@ public void readFields(DataInput in) throws IOException {
boolean isCDCJob = in.readBoolean();
if (isCDCJob) {
this.cdcVO = new CdcVO();

private void initReadCommittedScope() throws IOException {
@@ -353,6 +365,14 @@ public boolean isFallbackJob() {
return isFallbackJob;

public CdcVO getCdcVO() {
return cdcVO;

public void setCdcVO(CdcVO cdcVO) {
this.cdcVO = cdcVO;

* @return Whether asyncCall to the IndexServer.
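
The `write` and `readFields` hunks above guard the optional `cdcVO` field with a boolean presence flag. A minimal, self-contained sketch of that pattern follows; the class `OptionalFieldSketch` and its `String` payload are hypothetical stand-ins, since the actual `CdcVO` serialization is not visible in this excerpt.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class OptionalFieldSketch {

  /** Writes a presence flag first, then the payload only when it is present. */
  static byte[] write(String payloadOrNull) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bos)) {
      out.writeBoolean(payloadOrNull != null);
      if (payloadOrNull != null) {
        out.writeUTF(payloadOrNull); // hypothetical payload standing in for CdcVO
      }
    }
    return bos.toByteArray();
  }

  /** Reads the flag and consumes the payload only when the writer emitted one. */
  static String read(byte[] bytes) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
      return in.readBoolean() ? in.readUTF() : null;
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(read(write("join_key")));
    System.out.println(read(write(null)));
  }
}
```

Because the reader consumes exactly what the writer produced, non-CDC jobs pay only one extra byte for the flag.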
@@ -31,7 +31,7 @@
public class Blocklet implements Writable, Serializable {

/** file path of this blocklet */
private String filePath;
protected String filePath;

/** id to identify the blocklet inside the block (it is a sequential number) */
private String blockletId;
@@ -21,15 +21,21 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.index.IndexInputFormat;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexRowIndexes;
import org.apache.carbondata.core.indexstore.row.IndexRow;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.mutate.CdcVO;
import org.apache.carbondata.core.mutate.FilePathMinMaxVO;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.hadoop.CarbonInputSplit;

@@ -47,6 +53,8 @@ public class ExtendedBlocklet extends Blocklet {

private boolean isCgIndexPresent = false;

private Map<String, List<FilePathMinMaxVO>> columnToMinMaxMapping;

public ExtendedBlocklet() {

@@ -163,20 +171,36 @@ public void setColumnSchema(List<ColumnSchema> columnSchema) {
* Method to serialize extended blocklet and input split for index server
* DataFormat
* <Extended Blocklet data><Carbon input split serializeData length><CarbonInputSplitData>
* @param out
* @param uniqueLocation
* @param out data output to write the primitives to extended blocklet
* @param uniqueLocation location to write the blocklet in case of distributed pruning, ex: Lucene
* @param isExternalPath identification for the external segment
* @throws IOException
public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boolean isCountJob,
boolean isExternalPath)
public void serializeData(DataOutput out, Map<String, Short> uniqueLocation,
IndexInputFormat indexInputFormat, boolean isExternalPath)
throws IOException {
if (isCountJob) {
if (indexInputFormat.isCountStarJob()) {
// In CarbonInputSplit, getDetailInfo() is a lazy call. we want to avoid this during
// countStar query. As rowCount is filled inside getDetailInfo(). In countStar case we may
// not have proper row count. So, always take row count from indexRow.
} else if (indexInputFormat.getCdcVO() != null) {
// In case of CDC, we just need the filepath and the min max of the blocklet, so serialize only
// this data to reduce network transfer cost and speed up cache access from the index server.
List<Integer> indexesToFetch = indexInputFormat.getCdcVO().getIndexesToFetch();
for (Integer indexToFetch : indexesToFetch) {
byte[] minValues = CarbonUtil.getMinMaxValue(inputSplit.getIndexRow(),
byte[] maxValues = CarbonUtil.getMinMaxValue(inputSplit.getIndexRow(),
} else {
if (indexUniqueId == null) {
@@ -207,19 +231,35 @@ public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boo

* Method to deserialize extended blocklet and input split for index server
* @param in
* @param locations
* @param tablePath
* @param in data input stream to read the primitives of extended blocklet
* @param locations locations of the input split
* @param tablePath carbon table path
* @throws IOException
public void deserializeFields(DataInput in, String[] locations, String tablePath,
boolean isCountJob)
boolean isCountJob, CdcVO cdcVO)
throws IOException {
if (isCountJob) {
count = in.readLong();
segmentNo = in.readUTF();
} else if (cdcVO != null) {
filePath = in.readUTF();
this.columnToMinMaxMapping = new HashMap<>();
for (String column : cdcVO.getColumnToIndexMap().keySet()) {
List<FilePathMinMaxVO> minMaxOfColumnInList = new ArrayList<>();
int minLength = in.readInt();
byte[] minValuesForBlocklets = new byte[minLength];
int maxLength = in.readInt();
byte[] maxValuesForBlocklets = new byte[maxLength];
.add(new FilePathMinMaxVO(filePath, minValuesForBlocklets, maxValuesForBlocklets));
this.columnToMinMaxMapping.put(column, minMaxOfColumnInList);
if (in.readBoolean()) {
indexUniqueId = in.readUTF();
@@ -243,4 +283,8 @@ public void deserializeFields(DataInput in, String[] locations, String tablePath
public void setCgIndexPresent(boolean cgIndexPresent) {
isCgIndexPresent = cgIndexPresent;

public Map<String, List<FilePathMinMaxVO>> getColumnToMinMaxMapping() {
return columnToMinMaxMapping;
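
For CDC jobs the hunks above exchange only the file path and the min/max bytes per blocklet. The round trip can be sketched as below, assuming a wire layout of file path, then length-prefixed min bytes, then length-prefixed max bytes for a single column. That layout is inferred from the visible read side only, and `MinMaxWireSketch` and `Entry` are hypothetical names, not classes from this commit.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class MinMaxWireSketch {

  /** Hypothetical counterpart of a file path plus the min/max bytes of one column. */
  static final class Entry {
    final String filePath;
    final byte[] min;
    final byte[] max;

    Entry(String filePath, byte[] min, byte[] max) {
      this.filePath = filePath;
      this.min = min;
      this.max = max;
    }
  }

  /** Assumed layout: filePath, min length, min bytes, max length, max bytes. */
  static byte[] write(Entry entry) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bos)) {
      out.writeUTF(entry.filePath);
      out.writeInt(entry.min.length);
      out.write(entry.min);
      out.writeInt(entry.max.length);
      out.write(entry.max);
    }
    return bos.toByteArray();
  }

  /** Reads the fields back in the same order they were written. */
  static Entry read(byte[] bytes) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
      String filePath = in.readUTF();
      byte[] min = new byte[in.readInt()];
      in.readFully(min);
      byte[] max = new byte[in.readInt()];
      in.readFully(max);
      return new Entry(filePath, min, max);
    }
  }

  public static void main(String[] args) throws IOException {
    Entry original = new Entry("/Fact/Part0/Segment_0/part-0.carbondata",
        new byte[] {1}, new byte[] {9, 9});
    Entry copy = read(write(original));
    System.out.println(copy.filePath + " min=" + copy.min.length + "B max=" + copy.max.length + "B");
  }
}
```

Sending only these fields keeps the payload far smaller than a full blocklet with its input split, which matches the intent stated in the code comment of the CDC branch.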
@@ -36,7 +36,9 @@
import org.apache.carbondata.core.datastore.compression.SnappyCompressor;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.index.IndexInputFormat;
import org.apache.carbondata.core.metadata.schema.table.Writable;
import org.apache.carbondata.core.mutate.CdcVO;
@@ -65,20 +67,20 @@ public ExtendedBlockletWrapper() {


public ExtendedBlockletWrapper(List<ExtendedBlocklet> extendedBlockletList, String tablePath,
String queryId, boolean isWriteToFile, boolean isCountJob) {
public ExtendedBlockletWrapper(List<ExtendedBlocklet> extendedBlockletList,
IndexInputFormat indexInputFormat) {
Map<String, Short> uniqueLocations = new HashMap<>();
byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList, isCountJob);
byte[] bytes = convertToBytes(indexInputFormat, uniqueLocations, extendedBlockletList);
int serializeAllowedSize = Integer.parseInt(CarbonProperties.getInstance()
DataOutputStream stream = null;
// if data size is more then data will be written in file and file name will be sent from
// executor to driver, in case of any failure data will send through network
if (bytes.length > serializeAllowedSize && isWriteToFile) {
if (bytes.length > serializeAllowedSize && indexInputFormat.isWriteToFile()) {
final String fileName = UUID.randomUUID().toString();
String folderPath = CarbonUtil.getIndexServerTempPath()
+ CarbonCommonConstants.FILE_SEPARATOR + queryId;
+ CarbonCommonConstants.FILE_SEPARATOR + indexInputFormat.getQueryId();
try {
final CarbonFile carbonFile = FileFactory.getCarbonFile(folderPath);
boolean isFolderExists = true;
@@ -115,15 +117,16 @@ public ExtendedBlockletWrapper(List<ExtendedBlocklet> extendedBlockletList, Stri

private byte[] convertToBytes(String tablePath, Map<String, Short> uniqueLocations,
List<ExtendedBlocklet> extendedBlockletList, boolean isCountJob) {
private byte[] convertToBytes(IndexInputFormat indexInputFormat,
Map<String, Short> uniqueLocations, List<ExtendedBlocklet> extendedBlockletList) {
ByteArrayOutputStream bos = new ExtendedByteArrayOutputStream();
DataOutputStream stream = new DataOutputStream(bos);
String tablePath = indexInputFormat.getCarbonTable().getTablePath();
try {
for (ExtendedBlocklet extendedBlocklet : extendedBlockletList) {
boolean isExternalPath = !extendedBlocklet.getFilePath().startsWith(tablePath);
extendedBlocklet.setFilePath(extendedBlocklet.getFilePath().replace(tablePath, ""));
extendedBlocklet.serializeData(stream, uniqueLocations, isCountJob, isExternalPath);
extendedBlocklet.serializeData(stream, uniqueLocations, indexInputFormat, isExternalPath);
byte[] input = bos.toByteArray();
return new SnappyCompressor().compressByte(input, input.length);
@@ -171,8 +174,8 @@ private void writeBlockletToStream(DataOutputStream stream, byte[] data,
* @return
* @throws IOException
public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId, boolean isCountJob)
throws IOException {
public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId, boolean isCountJob,
CdcVO cdcVO) throws IOException {
byte[] data;
if (bytes != null) {
if (isWrittenToFile) {
@@ -216,7 +219,7 @@ public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId, boo
try {
for (int i = 0; i < numberOfBlocklet; i++) {
ExtendedBlocklet extendedBlocklet = new ExtendedBlocklet();
extendedBlocklet.deserializeFields(eDIS, locations, tablePath, isCountJob);
extendedBlocklet.deserializeFields(eDIS, locations, tablePath, isCountJob, cdcVO);
} finally {
@@ -31,6 +31,8 @@
import java.util.concurrent.TimeUnit;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.index.IndexInputFormat;
import org.apache.carbondata.core.mutate.CdcVO;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;

@@ -62,8 +64,8 @@ public ExtendedBlockletWrapperContainer(ExtendedBlockletWrapper[] extendedBlockl
this.isFallbackJob = isFallbackJob;

public List<ExtendedBlocklet> getExtendedBlocklets(String tablePath, String queryId,
boolean isCountJob) throws IOException {
public List<ExtendedBlocklet> getExtendedBlocklets(IndexInputFormat indexInputFormat)
throws IOException {
if (!isFallbackJob) {
int numOfThreads = CarbonProperties.getNumOfThreadsForPruning();
ExecutorService executorService = Executors
@@ -85,8 +87,9 @@ public List<ExtendedBlocklet> getExtendedBlocklets(String tablePath, String quer
List<Future<List<ExtendedBlocklet>>> futures = new ArrayList<>();
for (int value : split) {
end += value;
new ExtendedBlockletDeserializerThread(start, end, tablePath, queryId, isCountJob)));
futures.add(executorService.submit(new ExtendedBlockletDeserializerThread(start, end,
indexInputFormat.getCarbonTable().getTablePath(), indexInputFormat.getQueryId(),
indexInputFormat.isCountStarJob(), indexInputFormat.getCdcVO())));
start += value;
@@ -109,8 +112,10 @@ public List<ExtendedBlocklet> getExtendedBlocklets(String tablePath, String quer
} else {
List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
for (ExtendedBlockletWrapper extendedBlockletWrapper: extendedBlockletWrappers) {
.addAll(extendedBlockletWrapper.readBlocklet(tablePath, queryId, isCountJob));
indexInputFormat.getQueryId(), indexInputFormat.isCountStarJob(),
return extendedBlocklets;
@@ -128,21 +133,24 @@ private class ExtendedBlockletDeserializerThread implements Callable<List<Extend

private boolean isCountJob;

private CdcVO cdcVO;

public ExtendedBlockletDeserializerThread(int start, int end, String tablePath,
String queryId, boolean isCountJob) {
String queryId, boolean isCountJob, CdcVO cdcVO) {
this.start = start;
this.end = end;
this.tablePath = tablePath;
this.queryId = queryId;
this.isCountJob = isCountJob;
this.cdcVO = cdcVO;

public List<ExtendedBlocklet> call() throws Exception {
List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
for (int i = start; i < end; i++) {
extendedBlocklets.addAll(extendedBlockletWrappers[i].readBlocklet(tablePath, queryId,
isCountJob, cdcVO));
return extendedBlocklets;
@@ -67,6 +67,8 @@
import org.apache.carbondata.core.util.DataFileFooterConverter;
import org.apache.carbondata.core.util.path.CarbonTablePath;

import static org.apache.carbondata.core.util.CarbonUtil.getMinMaxValue;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
@@ -886,15 +888,6 @@ public String getTableTaskInfo(int index) {

private byte[][] getMinMaxValue(IndexRow row, int index) {
IndexRow minMaxRow = row.getRow(index);
byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
for (int i = 0; i < minMax.length; i++) {
minMax[i] = minMaxRow.getByteArray(i);
return minMax;

private boolean[] getMinMaxFlag(IndexRow row, int index) {
IndexRow minMaxFlagRow = row.getRow(index);
boolean[] minMaxFlag = new boolean[minMaxFlagRow.getColumnCount()];
