diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index 5cc275b88b4..623ef835896 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; @@ -460,27 +461,29 @@ public static class CarbonMultiRecordWriter extends CarbonRecordWriter { private CarbonOutputIteratorWrapper[] iterators; - private int counter; + private AtomicLong counter; CarbonMultiRecordWriter(CarbonOutputIteratorWrapper[] iterators, DataLoadExecutor dataLoadExecutor, CarbonLoadModel loadModel, Future future, ExecutorService executorService) { super(null, dataLoadExecutor, loadModel, future, executorService); this.iterators = iterators; + counter = new AtomicLong(0); } - @Override public synchronized void write(NullWritable aVoid, ObjectArrayWritable objects) + @Override public void write(NullWritable aVoid, ObjectArrayWritable objects) throws InterruptedException { - iterators[counter].write(objects.get()); - if (++counter == iterators.length) { - //round robin reset - counter = 0; + int hash = (int) (counter.incrementAndGet() % iterators.length); + synchronized (iterators[hash]) { + iterators[hash].write(objects.get()); } } @Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException { - for (CarbonOutputIteratorWrapper itr : iterators) { - itr.closeWriter(false); + for (int i = 0; i < iterators.length; i++) { + synchronized (iterators[i]) { + iterators[i].closeWriter(false); + } } 
super.close(taskAttemptContext); } diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java index 15d0994e88d..076f71e21a7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java @@ -19,7 +19,6 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -64,10 +63,13 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce private Map dataFieldsWithComplexDataType; + private short sdkUserCore; + public InputProcessorStepWithNoConverterImpl(CarbonDataLoadConfiguration configuration, CarbonIterator[] inputIterators) { super(configuration, null); this.inputIterators = inputIterators; + sdkUserCore = configuration.getWritingCoresCount(); } @Override public DataField[] getOutput() { @@ -133,7 +135,8 @@ private int[] arrangeData(DataField[] dataFields, String[] header) { @Override public Iterator[] execute() { int batchSize = CarbonProperties.getInstance().getBatchSize(); - List>[] readerIterators = partitionInputReaderIterators(); + List>[] readerIterators = + CarbonDataProcessorUtil.partitionInputReaderIterators(this.inputIterators, sdkUserCore); Iterator[] outIterators = new Iterator[readerIterators.length]; for (int i = 0; i < outIterators.length; i++) { outIterators[i] = @@ -144,29 +147,6 @@ private int[] arrangeData(DataField[] dataFields, String[] header) { return outIterators; } - /** - * Partition input iterators equally as per the number of threads. 
- * - * @return - */ - private List>[] partitionInputReaderIterators() { - // Get the number of cores configured in property. - int numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); - // Get the minimum of number of cores and iterators size to get the number of parallel threads - // to be launched. - int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores); - - List>[] iterators = new List[parallelThreadNumber]; - for (int i = 0; i < parallelThreadNumber; i++) { - iterators[i] = new ArrayList<>(); - } - // Equally partition the iterators as per number of threads - for (int i = 0; i < inputIterators.length; i++) { - iterators[i % parallelThreadNumber].add(inputIterators[i]); - } - return iterators; - } - @Override public void close() { if (!closed) { super.close(); diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java new file mode 100644 index 00000000000..9a6e8e05081 --- /dev/null +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.sdk.file; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.avro.generic.GenericData; +import org.apache.commons.io.FileUtils; +import org.junit.Assert; +import org.junit.Test; + +/** + * Multi-thread test suite for the Avro-input {@link CarbonWriter} (see {@link AvroCarbonWriter}) + */ +public class ConcurrentAvroSdkWriterTest { + + private static final int recordsPerItr = 10; + private static final short numOfThreads = 4; + + @Test public void testWriteFiles() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + String mySchema = + "{" + " \"name\": \"address\", " + " \"type\": \"record\", " + " \"fields\": [ " + + " { \"name\": \"name\", \"type\": \"string\"}, " + + " { \"name\": \"age\", \"type\": \"int\"}, " + " { " + " \"name\": \"address\", " + + " \"type\": { " + " \"type\" : \"record\", " + + " \"name\" : \"my_address\", " + " \"fields\" : [ " + + " {\"name\": \"street\", \"type\": \"string\"}, " + + " {\"name\": \"city\", \"type\": \"string\"} " + " ]} " + " } " + "] " + "}"; + + String json = + "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}}"; + + // conversion to GenericData.Record + org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(mySchema); + GenericData.Record record = TestUtil.jsonToAvro(json, mySchema); + + ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads); + try { + CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path); + CarbonWriter writer = builder.buildThreadSafeWriterForAvroInput(avroSchema, numOfThreads); + // write in multi-thread + for (int i = 0; i < numOfThreads; i++) { + executorService.submit(new WriteLogic(writer, record)); + } + executorService.shutdown(); + executorService.awaitTermination(2, TimeUnit.HOURS); + writer.close(); + }
catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + // read the files and verify the count + CarbonReader reader; + try { + reader = + CarbonReader.builder(path, "_temp").projection(new String[] { "name", "age" }).build(); + int i = 0; + while (reader.hasNext()) { + Object[] row = (Object[]) reader.readNextRow(); + i++; + } + Assert.assertEquals(i, numOfThreads * recordsPerItr); + reader.close(); + } catch (InterruptedException e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + FileUtils.deleteDirectory(new File(path)); + } + + class WriteLogic implements Runnable { + CarbonWriter writer; + GenericData.Record record; + + WriteLogic(CarbonWriter writer, GenericData.Record record) { + this.writer = writer; + this.record = record; + } + + @Override public void run() { + try { + for (int i = 0; i < recordsPerItr; i++) { + writer.write(record); + } + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + } + +}