[CARBONDATA-2520] Clean and close datamap writers on any task failure during load

Problem: The datamap writers registered to the listener are closed or finished only when the load succeeds, never on failure. While testing the Lucene datamap it was found that when a task fails, the writer is never closed, so the write.lock file in the Lucene index folder remains; the next task that tries to write an index in the same directory then fails with a "lock file already exists" error.

Solution: close the writers if any load task fails.
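
In essence the fix has two parts: each writer's finish() becomes idempotent behind a new isWritingFinished flag, and the writer step's close(), which runs on success and on failure, finishes all registered writers. A minimal sketch of the guard pattern (an illustrative class, not the actual CarbonData code):

import java.io.IOException;

// Illustrative stand-in for a datamap writer, not the CarbonData class.
abstract class GuardedWriter {
  private boolean writingFinished;

  // finish() may now be called twice: once on the success path and once
  // from failure cleanup. The flag makes the second call a no-op, so
  // resources such as Lucene's write.lock are released exactly once.
  public final void finish() throws IOException {
    if (!writingFinished) {
      releaseResources();
      writingFinished = true;
    }
  }

  protected abstract void releaseResources() throws IOException;
}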

This closes #2321
akashrn5 authored and jackylk committed May 28, 2018
1 parent 1b6ce8c commit 7f4bd3d
Showing 8 changed files with 87 additions and 21 deletions.
@@ -46,6 +46,8 @@ public abstract class DataMapWriter {
 
   private List<CarbonColumn> indexColumns;
 
+  private boolean isWritingFinished;
+
   public DataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
       Segment segment, String shardName) {
     this.tablePath = tablePath;
@@ -133,4 +135,12 @@ public static String getDefaultDataMapPath(
       String tablePath, String segmentId, String dataMapName) {
     return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName;
   }
+
+  public boolean isWritingFinished() {
+    return isWritingFinished;
+  }
+
+  public void setWritingFinished(boolean writingFinished) {
+    isWritingFinished = writingFinished;
+  }
 }
@@ -196,10 +196,13 @@ protected void writeBloomDataMapFile() {
 
   @Override
   public void finish() throws IOException {
-    if (indexBloomFilters.size() > 0) {
-      writeBloomDataMapFile();
+    if (!isWritingFinished()) {
+      if (indexBloomFilters.size() > 0) {
+        writeBloomDataMapFile();
+      }
+      releaseResouce();
+      setWritingFinished(true);
     }
-    releaseResouce();
   }
 
   protected void releaseResouce() {
@@ -447,10 +447,14 @@ public static void flushCache(Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>>
    * class.
    */
   public void finish() throws IOException {
-    flushCache(cache, getIndexColumns(), indexWriter, storeBlockletWise);
-    // finished a file , close this index writer
-    if (indexWriter != null) {
-      indexWriter.close();
+    if (!isWritingFinished()) {
+      flushCache(cache, getIndexColumns(), indexWriter, storeBlockletWise);
+      // finished a file, close this index writer
+      if (indexWriter != null) {
+        indexWriter.close();
+        indexWriter = null;
+      }
+      setWritingFinished(true);
     }
   }
@@ -25,8 +25,12 @@
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
 
 /**
  * This base abstract class for data loading.
@@ -149,6 +153,26 @@ protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch) {
    */
   protected abstract String getStepName();
 
+  /**
+   * Registers all datamap writers for the given bucket and returns the listener.
+   * @param bucketId bucket id
+   * @return the DataMapWriterListener with all writers registered
+   */
+  protected DataMapWriterListener getDataMapWriterListener(int bucketId) {
+    CarbonDataFileAttributes carbonDataFileAttributes =
+        new CarbonDataFileAttributes(Long.parseLong(configuration.getTaskNo()),
+            (Long) configuration.getDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP));
+    DataMapWriterListener listener = new DataMapWriterListener();
+    listener.registerAllWriter(
+        configuration.getTableSpec().getCarbonTable(),
+        configuration.getSegmentId(),
+        CarbonTablePath.getShardName(
+            carbonDataFileAttributes.getTaskId(),
+            bucketId,
+            0,
+            String.valueOf(carbonDataFileAttributes.getFactTimeStamp())));
+    return listener;
+  }
 
   /**
    * Close all resources. This method is called after execute() is finished.
@@ -33,6 +33,7 @@
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
@@ -156,8 +157,9 @@ private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
 
   private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) throws IOException {
     String[] storeLocation = getStoreLocation(tableIdentifier);
+    DataMapWriterListener listener = getDataMapWriterListener(0);
     CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(
-        configuration, storeLocation, 0, iteratorIndex);
+        configuration, storeLocation, 0, iteratorIndex, listener);
     CarbonFactHandler dataHandler = null;
     boolean rowsNotExist = true;
     while (iterator.hasNext()) {
@@ -25,6 +25,7 @@
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
@@ -85,8 +86,9 @@ private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
       CarbonRowBatch next = iterator.next();
       // If no rows from merge sorter, then don't create a file in fact column handler
       if (next.hasNext()) {
+        DataMapWriterListener listener = getDataMapWriterListener(0);
         CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-            .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, k++);
+            .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, k++, listener);
         CarbonFactHandler dataHandler = CarbonFactHandlerFactory
             .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
         dataHandler.initialise();
@@ -36,6 +36,7 @@
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
@@ -57,6 +58,8 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
   private long readCounter;
 
+  private DataMapWriterListener listener;
+
   public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
     super(configuration, child);
@@ -88,8 +91,9 @@ public CarbonFactDataHandlerModel getDataHandlerModel() {
     CarbonTableIdentifier tableIdentifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
     String[] storeLocation = getStoreLocation(tableIdentifier);
+    listener = getDataMapWriterListener(0);
     return CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(configuration,
-        storeLocation, 0, 0);
+        storeLocation, 0, 0, listener);
   }
 
   @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
@@ -162,8 +166,9 @@ private void processRange(Iterator<CarbonRowBatch> insideRangeIterator,
       CarbonTableIdentifier tableIdentifier, int rangeId) {
     String[] storeLocation = getStoreLocation(tableIdentifier);
 
+    listener = getDataMapWriterListener(rangeId);
     CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-        .createCarbonFactDataHandlerModel(configuration, storeLocation, rangeId, 0);
+        .createCarbonFactDataHandlerModel(configuration, storeLocation, rangeId, 0, listener);
     CarbonFactHandler dataHandler = null;
     boolean rowsNotExist = true;
     while (insideRangeIterator.hasNext()) {
@@ -247,4 +252,18 @@ public void processRow(CarbonRow row, CarbonFactHandler dataHandler) throws KeyG
     return null;
   }
 
+  @Override public void close() {
+    if (!closed) {
+      super.close();
+      if (listener != null) {
+        try {
+          LOGGER.info("closing all the DataMap writers registered to DataMap writer listener");
+          listener.finish();
+        } catch (IOException e) {
+          LOGGER.error(e, "error while closing the datamap writers");
+          // ignoring the exception
+        }
+      }
+    }
+  }
 }
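
The overridden close() above is the heart of the failure handling: the load pipeline calls close() on every processor step whether or not execute() succeeded, so the listener is finished either way. A hedged sketch of the calling side (the real driver lives in the load task code and is not part of this diff; buildWriterStep is a hypothetical helper):

AbstractDataLoadProcessorStep step = buildWriterStep(configuration);  // hypothetical helper
try {
  Iterator<CarbonRowBatch>[] outputs = step.execute();
  // ... drain outputs, write carbondata files ...
} finally {
  // Runs on success and on failure; with this patch the writer step's
  // close() also finishes the datamap writers, releasing Lucene's write.lock.
  step.close();
}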
@@ -168,7 +168,7 @@ public void setBlockSizeInMB(int blockSize) {
    */
   public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel(
       CarbonDataLoadConfiguration configuration, String[] storeLocation, int bucketId,
-      int taskExtension) {
+      int taskExtension, DataMapWriterListener listener) {
     CarbonTableIdentifier identifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
     boolean[] isUseInvertedIndex =
@@ -258,15 +258,17 @@ public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel(
     carbonFactDataHandlerModel.tableSpec = configuration.getTableSpec();
     carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
 
-    DataMapWriterListener listener = new DataMapWriterListener();
-    listener.registerAllWriter(
-        configuration.getTableSpec().getCarbonTable(),
-        configuration.getSegmentId(),
-        CarbonTablePath.getShardName(
-            carbonDataFileAttributes.getTaskId(),
-            bucketId,
-            0,
-            String.valueOf(carbonDataFileAttributes.getFactTimeStamp())));
+    if (listener == null) {
+      listener = new DataMapWriterListener();
+      listener.registerAllWriter(
+          configuration.getTableSpec().getCarbonTable(),
+          configuration.getSegmentId(),
+          CarbonTablePath.getShardName(
+              carbonDataFileAttributes.getTaskId(),
+              bucketId,
+              0,
+              String.valueOf(carbonDataFileAttributes.getFactTimeStamp())));
+    }
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
 
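The null check above keeps unmigrated call sites working: passing null still gets a freshly registered listener inside the model, while the updated processor steps pass in a listener they own so they can finish it from close(). A sketch of the two call paths, assuming the surrounding step context:

// New path: the step owns the listener, so it can finish it on failure.
DataMapWriterListener listener = getDataMapWriterListener(0);
CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
    .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, 0, listener);

// Legacy path: passing null makes the model register its own listener,
// which nothing outside the fact handler can finish early.
CarbonFactDataHandlerModel legacy = CarbonFactDataHandlerModel
    .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, 0, null);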
