Skip to content

Commit

Permalink
[CARBONDATA-3307] Fix Performance Issue in No Sort
Browse files Browse the repository at this point in the history
When creating the table without sort_columns and loading the data into it, it is generating more carbondata
files than expected. Now the no. of carbondata files is being generated based on the no. of threads launched.
Each thread is initialising its own writer and writing data.

Now we pass the same writer instance to all the threads, so all the threads will write the data to same file.

This closes #3140
  • Loading branch information
shivamasn authored and kumarvishal09 committed Mar 12, 2019
1 parent 182eeb0 commit f5e4793
Showing 1 changed file with 29 additions and 32 deletions.
Expand Up @@ -18,9 +18,7 @@

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -83,16 +81,17 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces

private Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap;

private List<CarbonFactHandler> carbonFactHandlers;
private CarbonFactHandler dataHandler;

private ExecutorService executorService = null;

private static final Object lock = new Object();

public CarbonRowDataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
AbstractDataLoadProcessorStep child) {
super(configuration, child);
this.localDictionaryGeneratorMap =
CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
this.carbonFactHandlers = new CopyOnWriteArrayList<>();
}

@Override public void initialize() throws IOException {
Expand Down Expand Up @@ -129,20 +128,31 @@ private String[] getStoreLocation() {
.recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PARTITION_ID,
System.currentTimeMillis());

//Creating a Instance of CarbonFacthandler that will be passed to all the threads
String[] storeLocation = getStoreLocation();
DataMapWriterListener listener = getDataMapWriterListener(0);
CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
.createCarbonFactDataHandlerModel(configuration, storeLocation, 0, 0, listener);
model.setColumnLocalDictGenMap(localDictionaryGeneratorMap);
dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
dataHandler.initialise();

if (iterators.length == 1) {
doExecute(iterators[0], 0);
doExecute(iterators[0], 0, dataHandler);
} else {
executorService = Executors.newFixedThreadPool(iterators.length,
new CarbonThreadFactory("NoSortDataWriterPool:" + configuration.getTableIdentifier()
.getCarbonTableIdentifier().getTableName()));
Future[] futures = new Future[iterators.length];
for (int i = 0; i < iterators.length; i++) {
futures[i] = executorService.submit(new DataWriterRunnable(iterators[i], i));
futures[i] = executorService.submit(new DataWriterRunnable(iterators[i], i, dataHandler));
}
for (Future future : futures) {
future.get();
}
}
finish(dataHandler, 0);
dataHandler = null;
} catch (CarbonDataWriterException e) {
LOGGER.error("Failed for table: " + tableName + " in DataWriterProcessorStepImpl", e);
throw new CarbonDataLoadingException(
Expand All @@ -157,31 +167,15 @@ private String[] getStoreLocation() {
return null;
}

private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) throws IOException {
String[] storeLocation = getStoreLocation();
DataMapWriterListener listener = getDataMapWriterListener(0);
CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(
configuration, storeLocation, 0, iteratorIndex, listener);
model.setColumnLocalDictGenMap(localDictionaryGeneratorMap);
CarbonFactHandler dataHandler = null;
private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex,
CarbonFactHandler dataHandler) throws IOException {
boolean rowsNotExist = true;
while (iterator.hasNext()) {
if (rowsNotExist) {
rowsNotExist = false;
dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
this.carbonFactHandlers.add(dataHandler);
dataHandler.initialise();
}
processBatch(iterator.next(), dataHandler, iteratorIndex);
}
try {
if (!rowsNotExist) {
finish(dataHandler, iteratorIndex);
}
} finally {
carbonFactHandlers.remove(dataHandler);
}


}

Expand Down Expand Up @@ -306,7 +300,9 @@ private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler, i
while (batch.hasNext()) {
CarbonRow row = batch.next();
CarbonRow converted = convertRow(row);
dataHandler.addDataToStore(converted);
synchronized (lock) {
dataHandler.addDataToStore(converted);
}
readCounter[iteratorIndex]++;
}
writeCounter[iteratorIndex] += batch.getSize();
Expand All @@ -320,15 +316,18 @@ class DataWriterRunnable implements Runnable {

private Iterator<CarbonRowBatch> iterator;
private int iteratorIndex = 0;
private CarbonFactHandler dataHandler = null;

DataWriterRunnable(Iterator<CarbonRowBatch> iterator, int iteratorIndex) {
DataWriterRunnable(Iterator<CarbonRowBatch> iterator, int iteratorIndex,
CarbonFactHandler dataHandler) {
this.iterator = iterator;
this.iteratorIndex = iteratorIndex;
this.dataHandler = dataHandler;
}

@Override public void run() {
try {
doExecute(this.iterator, iteratorIndex);
doExecute(this.iterator, iteratorIndex, dataHandler);
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
throw new RuntimeException(e);
Expand All @@ -342,11 +341,9 @@ class DataWriterRunnable implements Runnable {
if (null != executorService) {
executorService.shutdownNow();
}
if (null != this.carbonFactHandlers && !this.carbonFactHandlers.isEmpty()) {
for (CarbonFactHandler carbonFactHandler : this.carbonFactHandlers) {
carbonFactHandler.finish();
carbonFactHandler.closeHandler();
}
if (null != dataHandler) {
dataHandler.finish();
dataHandler.closeHandler();
}
}
}
Expand Down

0 comments on commit f5e4793

Please sign in to comment.