diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index 5cc275b88b4..623ef835896 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; @@ -460,27 +461,29 @@ public static class CarbonMultiRecordWriter extends CarbonRecordWriter { private CarbonOutputIteratorWrapper[] iterators; - private int counter; + private AtomicLong counter; CarbonMultiRecordWriter(CarbonOutputIteratorWrapper[] iterators, DataLoadExecutor dataLoadExecutor, CarbonLoadModel loadModel, Future future, ExecutorService executorService) { super(null, dataLoadExecutor, loadModel, future, executorService); this.iterators = iterators; + counter = new AtomicLong(0); } - @Override public synchronized void write(NullWritable aVoid, ObjectArrayWritable objects) + @Override public void write(NullWritable aVoid, ObjectArrayWritable objects) throws InterruptedException { - iterators[counter].write(objects.get()); - if (++counter == iterators.length) { - //round robin reset - counter = 0; + int hash = (int) (counter.incrementAndGet() % iterators.length); + synchronized (iterators[hash]) { + iterators[hash].write(objects.get()); } } @Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException { - for (CarbonOutputIteratorWrapper itr : iterators) { - itr.closeWriter(false); + for (int i = 0; i < iterators.length; i++) { + synchronized (iterators[i]) { + iterators[i].closeWriter(false); + } } super.close(taskAttemptContext); } diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java index 15d0994e88d..076f71e21a7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java @@ -19,7 +19,6 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -64,10 +63,13 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce private Map dataFieldsWithComplexDataType; + private short sdkUserCore; + public InputProcessorStepWithNoConverterImpl(CarbonDataLoadConfiguration configuration, CarbonIterator[] inputIterators) { super(configuration, null); this.inputIterators = inputIterators; + sdkUserCore = configuration.getWritingCoresCount(); } @Override public DataField[] getOutput() { @@ -133,7 +135,8 @@ private int[] arrangeData(DataField[] dataFields, String[] header) { @Override public Iterator[] execute() { int batchSize = CarbonProperties.getInstance().getBatchSize(); - List>[] readerIterators = partitionInputReaderIterators(); + List>[] readerIterators = + CarbonDataProcessorUtil.partitionInputReaderIterators(this.inputIterators, sdkUserCore); Iterator[] outIterators = new Iterator[readerIterators.length]; for (int i = 0; i < outIterators.length; i++) { outIterators[i] = @@ -144,29 +147,6 @@ private int[] arrangeData(DataField[] dataFields, String[] header) { return outIterators; } - /** - * Partition input iterators equally as per the number of threads. - * - * @return - */ - private List>[] partitionInputReaderIterators() { - // Get the number of cores configured in property. - int numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); - // Get the minimum of number of cores and iterators size to get the number of parallel threads - // to be launched. - int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores); - - List>[] iterators = new List[parallelThreadNumber]; - for (int i = 0; i < parallelThreadNumber; i++) { - iterators[i] = new ArrayList<>(); - } - // Equally partition the iterators as per number of threads - for (int i = 0; i < inputIterators.length; i++) { - iterators[i % parallelThreadNumber].add(inputIterators[i]); - } - return iterators; - } - @Override public void close() { if (!closed) { super.close();