[CARBONDATA-3088][Compaction] support prefetch for compaction
Current compaction performance is low. By adding logs to observe the compaction procedure, we found that `CarbonFactDataHandlerColumnar.addDataToStore(CarbonRow)` waits about 30 ms before submitting a new TablePage producer. Since `addDataToStore` is called from a single thread and collects 32000 records to form a TablePage, the wait recurs every 32000 records (for example, compacting 100 million rows incurs roughly 100,000,000 / 32,000 ≈ 3,100 such waits, or about 94 seconds of idle time).

To reduce the waiting time, we can prepare the 32000 records ahead of time. This can be achieved using prefetch.

We keep two buffers: one provides records to the downstream consumer (`addDataToStore`) while the other prepares records asynchronously. The first is called the working buffer and the second the backup buffer. Once the working buffer is exhausted, the two buffers swap roles: the backup buffer becomes the new working buffer, and the old working buffer becomes the new backup buffer and is refilled asynchronously, as the sketch below illustrates.
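
Here is a minimal sketch of the idea (hypothetical names, not the patch itself; the actual implementation is in the RawResultIterator diff further down, and termination handling is omitted for brevity):

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// One buffer serves rows to the consumer while a single background thread
// refills the other; the two swap whenever the working buffer runs dry.
class DoubleBufferSketch {
  private final ExecutorService executor = Executors.newFixedThreadPool(1);
  private final Supplier<List<Object[]>> fetchBatch; // stands in for fetching the next 32000 rows
  private List<Object[]> working;                    // rows currently handed downstream
  private Future<List<Object[]>> backup;             // rows being prepared ahead of time
  private int idx;

  DoubleBufferSketch(Supplier<List<Object[]>> fetchBatch) {
    this.fetchBatch = fetchBatch;
    this.working = fetchBatch.get();                       // first batch is fetched synchronously
    this.backup = executor.submit(() -> fetchBatch.get()); // next batch fills in the background
  }

  Object[] next() throws Exception {
    if (idx >= working.size()) {
      // Working buffer exhausted: swap in the backup and refill it asynchronously.
      working = backup.get();                              // blocks only if the backup still lags
      backup = executor.submit(() -> fetchBatch.get());
      idx = 0;
    }
    return working.get(idx++);
  }
}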

Two parameters are involved in this feature:

1. carbon.detail.batch.size: This is an existing parameter and its default value is 100. It controls the batch size of records returned to the client. For a normal query it is fine to keep it at 100, but during compaction every record is processed, so we suggest setting it to a larger value such as 32000 (32000 is the maximum number of rows in a TablePage, which is what the downstream consumer expects).

2. carbon.compaction.prefetch.enable: This is a new parameter and its default value is `false` (we may change it to `true` later). It controls whether records are prefetched for compaction. Example values for both settings are shown after this list.
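
For example (assuming the settings go into the standard carbon.properties configuration file), enabling prefetch for compaction could look like:

carbon.detail.batch.size=32000
carbon.compaction.prefetch.enable=true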

By using this prefetch feature, we can improve compaction performance. More test results can be found in the PR description.

This closes #2906
xuchuanyin authored and ravipesala committed Nov 21, 2018
1 parent 415635e commit fedba41
Showing 3 changed files with 125 additions and 78 deletions.
@@ -16,12 +16,21 @@
  */
 package org.apache.carbondata.core.scan.result.iterator;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.util.CarbonProperties;
 
 import org.apache.log4j.Logger;
 
@@ -40,103 +49,148 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
    */
   private CarbonIterator<RowBatch> detailRawQueryResultIterator;
 
-  /**
-   * Counter to maintain the row counter.
-   */
-  private int counter = 0;
-
-  private Object[] currentConveretedRawRow = null;
+  private boolean prefetchEnabled;
+  private List<Object[]> currentBuffer;
+  private List<Object[]> backupBuffer;
+  private int currentIdxInBuffer;
+  private ExecutorService executorService;
+  private Future<Void> fetchFuture;
+  private Object[] currentRawRow = null;
+  private boolean isBackupFilled = false;
 
   /**
    * LOGGER
    */
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(RawResultIterator.class.getName());
 
-  /**
-   * batch of the result.
-   */
-  private RowBatch batch;
-
   public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
-      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
+      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
+      boolean isStreamingHandoff) {
     this.detailRawQueryResultIterator = detailRawQueryResultIterator;
     this.sourceSegProperties = sourceSegProperties;
     this.destinationSegProperties = destinationSegProperties;
+    this.executorService = Executors.newFixedThreadPool(1);
+
+    if (!isStreamingHandoff) {
+      init();
+    }
   }
 
-  @Override
-  public boolean hasNext() {
-    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
-      if (detailRawQueryResultIterator.hasNext()) {
-        batch = null;
-        batch = detailRawQueryResultIterator.next();
-        counter = 0; // batch changed so reset the counter.
-      } else {
-        return false;
-      }
-    }
-    if (!checkIfBatchIsProcessedCompletely(batch)) {
-      return true;
-    } else {
-      return false;
+  private void init() {
+    this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
+        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
+    try {
+      new RowsFetcher(false).call();
+      if (prefetchEnabled) {
+        this.fetchFuture = executorService.submit(new RowsFetcher(true));
+      }
+    } catch (Exception e) {
+      LOGGER.error("Error occurs while fetching records", e);
+      throw new RuntimeException(e);
     }
   }
 
-  @Override
-  public Object[] next() {
-    if (null == batch) { // for 1st time
-      batch = detailRawQueryResultIterator.next();
-    }
-    if (!checkIfBatchIsProcessedCompletely(batch)) {
-      try {
-        if (null != currentConveretedRawRow) {
-          counter++;
-          Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
-          currentConveretedRawRow = null;
-          return currentConveretedRawRowTemp;
-        }
-        return convertRow(batch.getRawRow(counter++));
-      } catch (KeyGenException e) {
-        LOGGER.error(e.getMessage());
-        return null;
-      }
-    } else { // completed one batch.
-      batch = null;
-      batch = detailRawQueryResultIterator.next();
-      counter = 0;
-    }
-    try {
-      if (null != currentConveretedRawRow) {
-        counter++;
-        Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
-        currentConveretedRawRow = null;
-        return currentConveretedRawRowTemp;
-      }
-      return convertRow(batch.getRawRow(counter++));
-    } catch (KeyGenException e) {
-      LOGGER.error(e.getMessage());
-      return null;
+  /**
+   * fetch rows
+   */
+  private final class RowsFetcher implements Callable<Void> {
+    private boolean isBackupFilling;
+
+    private RowsFetcher(boolean isBackupFilling) {
+      this.isBackupFilling = isBackupFilling;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      if (isBackupFilling) {
+        backupBuffer = fetchRows();
+        isBackupFilled = true;
+      } else {
+        currentBuffer = fetchRows();
+      }
+      return null;
+    }
+  }
+
+  private List<Object[]> fetchRows() throws Exception {
+    List<Object[]> converted = new ArrayList<>();
+    if (detailRawQueryResultIterator.hasNext()) {
+      for (Object[] r : detailRawQueryResultIterator.next().getRows()) {
+        converted.add(convertRow(r));
+      }
+    }
+    return converted;
+  }
+
+  private void fillDataFromPrefetch() {
+    try {
+      if (currentIdxInBuffer >= currentBuffer.size() && 0 != currentIdxInBuffer) {
+        if (prefetchEnabled) {
+          if (!isBackupFilled) {
+            fetchFuture.get();
+          }
+          // copy backup buffer to current buffer and fill backup buffer asyn
+          currentIdxInBuffer = 0;
+          currentBuffer.clear();
+          currentBuffer = backupBuffer;
+          isBackupFilled = false;
+          fetchFuture = executorService.submit(new RowsFetcher(true));
+        } else {
+          currentIdxInBuffer = 0;
+          new RowsFetcher(false).call();
+        }
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * populate a row with index counter increased
+   */
+  private void popRow() {
+    fillDataFromPrefetch();
+    currentRawRow = currentBuffer.get(currentIdxInBuffer);
+    currentIdxInBuffer++;
+  }
+
+  /**
+   * populate a row with index counter unchanged
+   */
+  private void pickRow() {
+    fillDataFromPrefetch();
+    currentRawRow = currentBuffer.get(currentIdxInBuffer);
+  }
+
+  @Override
+  public boolean hasNext() {
+    fillDataFromPrefetch();
+    if (currentIdxInBuffer < currentBuffer.size()) {
+      return true;
+    }
+
+    return false;
+  }
+
+  @Override
+  public Object[] next() {
+    try {
+      popRow();
+      return this.currentRawRow;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
   }
 
   /**
    * for fetching the row with out incrementing counter.
    * @return
    */
   public Object[] fetchConverted() throws KeyGenException {
-    if (null != currentConveretedRawRow) {
-      return currentConveretedRawRow;
-    }
-    if (hasNext()) {
-      Object[] rawRow = batch.getRawRow(counter);
-      currentConveretedRawRow = convertRow(rawRow);
-      return currentConveretedRawRow;
-    }
-    else {
-      return null;
-    }
+    pickRow();
+    return this.currentRawRow;
   }
 
   private Object[] convertRow(Object[] rawRow) throws KeyGenException {
@@ -148,16 +202,9 @@ private Object[] convertRow(Object[] rawRow) throws KeyGenException {
     return rawRow;
   }
 
-  /**
-   * To check if the batch is processed completely
-   * @param batch
-   * @return
-   */
-  private boolean checkIfBatchIsProcessedCompletely(RowBatch batch) {
-    if (counter < batch.getSize()) {
-      return false;
-    } else {
-      return true;
+  public void close() {
+    if (null != executorService) {
+      executorService.shutdownNow();
     }
   }
 }
@@ -74,7 +74,7 @@ class HandoffPartition(
  */
 class StreamingRawResultIterator(
     recordReader: RecordReader[Void, Any]
-) extends RawResultIterator(null, null, null) {
+) extends RawResultIterator(null, null, null, true) {
 
   override def hasNext: Boolean = {
     recordReader.nextKeyValue()
@@ -136,7 +136,7 @@ public List<RawResultIterator> processTableBlocks(Configuration configuration) t
         resultList.add(
             new RawResultIterator(executeBlockList(list, segmentId, task, configuration),
                 sourceSegProperties,
-                destinationSegProperties));
+                destinationSegProperties, false));
       }
     }
     return resultList;