[CARBONDATA-3246] Fix SDK reader issue if batch size is given as zero and vectorRead is false.

Problem: The SDK reader fails if vectorRead is false and the detail query batch size is set to 0.
A stack overflow error occurs after ChunkRowIterator.hasNext gets stuck in recursion.

Solution: Since 0 is an invalid batch size, fall back to DETAIL_QUERY_BATCH_SIZE_DEFAULT as the batch size.

This closes #3097
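The failure mode is easiest to see from the SDK side. The following is a minimal, hedged sketch (not part of the commit) that mirrors the new testReadWithZeroBatchSize test added further down; the table path and the assumption that data has already been written there are illustrative. Before this fix, building a row-record reader with a batch size of 0 hung in ChunkRowIterator.hasNext; with the fix, the invalid size is validated and replaced by DETAIL_QUERY_BATCH_SIZE_DEFAULT (100).

```java
import java.io.IOException;

import org.apache.carbondata.sdk.file.CarbonReader;

public class ZeroBatchSizeReadExample {
  public static void main(String[] args) throws IOException, InterruptedException {
    // Row-record reader (vector read disabled) built with an invalid batch size of 0.
    // After this fix the size is validated and falls back to the default of 100.
    CarbonReader reader = CarbonReader.builder("./testWriteFiles")  // illustrative path
        .withRowRecordReader()
        .withBatch(0)
        .build();
    int rows = 0;
    while (reader.hasNext()) {
      Object[] row = (Object[]) reader.readNextRow();
      rows++;  // consume row[0], row[1], ... as the new SDK test does
    }
    reader.close();
    System.out.println("Read " + rows + " rows");
  }
}
```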
shardul-cr7 authored and kunal642 committed Jan 23, 2019
1 parent df3d4c8 commit 0f1d98f
Showing 6 changed files with 72 additions and 45 deletions.
@@ -1227,6 +1227,16 @@ private CarbonCommonConstants() {

  public static final int DETAIL_QUERY_BATCH_SIZE_DEFAULT = 100;

  /**
   * Maximum batch size of carbon.detail.batch.size property
   */
  public static final int DETAIL_QUERY_BATCH_SIZE_MAX = 1000;

  /**
   * Minimum batch size of carbon.detail.batch.size property
   */
  public static final int DETAIL_QUERY_BATCH_SIZE_MIN = 100;

  /**
   * max driver lru cache size upto which lru cache will be loaded in memory
   */
@@ -94,9 +94,6 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
    if (null != batchSizeString) {
      try {
        batchSize = Integer.parseInt(batchSizeString);
        if (0 == batchSize) {
          batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
        }
      } catch (NumberFormatException ne) {
        LOGGER.error("Invalid inmemory records size. Using default value");
        batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
@@ -56,6 +56,10 @@
import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.CSV_READ_BUFFER_SIZE;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_MAX;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_MIN;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_AUTO_HANDOFF;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_OFFHEAP_SORT;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT;
@@ -192,6 +196,9 @@ private void validateAndLoadDefaultProperties(String key) {
      case CARBON_MINMAX_ALLOWED_BYTE_COUNT:
        validateStringCharacterLimit();
        break;
      case DETAIL_QUERY_BATCH_SIZE:
        validateDetailQueryBatchSize();
        break;
      // TODO : Validation for carbon.lock.type should be handled for addProperty flow
      default:
        // none
@@ -256,6 +263,7 @@ private void validateAndLoadDefaultProperties() {
    validateEnableQueryStatistics();
    validateSortMemorySpillPercentage();
    validateStringCharacterLimit();
    validateDetailQueryBatchSize();
  }

  /**
@@ -1547,5 +1555,34 @@ private void validateStringCharacterLimit() {
    }
  }


  /**
   * This method validates the DETAIL_QUERY_BATCH_SIZE. If some invalid input is set, we use the
   * default value for this property
   */
  private void validateDetailQueryBatchSize() {
    String batchSizeString =
        carbonProperties.getProperty(DETAIL_QUERY_BATCH_SIZE);
    if (batchSizeString == null) {
      carbonProperties.setProperty(DETAIL_QUERY_BATCH_SIZE,
          Integer.toString(DETAIL_QUERY_BATCH_SIZE_DEFAULT));
      LOGGER.info(
          "Using default value for carbon.detail.batch.size " + DETAIL_QUERY_BATCH_SIZE_DEFAULT);
    } else {
      int batchSize;
      try {
        batchSize = Integer.parseInt(batchSizeString);
        if (batchSize < DETAIL_QUERY_BATCH_SIZE_MIN || batchSize > DETAIL_QUERY_BATCH_SIZE_MAX) {
          LOGGER.info("Invalid carbon.detail.batch.size.Using default value "
              + DETAIL_QUERY_BATCH_SIZE_DEFAULT);
          carbonProperties.setProperty(DETAIL_QUERY_BATCH_SIZE,
              Integer.toString(DETAIL_QUERY_BATCH_SIZE_DEFAULT));
        }
      } catch (NumberFormatException ne) {
        LOGGER.info("Invalid carbon.detail.batch.size.Using default value "
            + DETAIL_QUERY_BATCH_SIZE_DEFAULT);
        carbonProperties.setProperty(DETAIL_QUERY_BATCH_SIZE,
            Integer.toString(DETAIL_QUERY_BATCH_SIZE_DEFAULT));
      }
    }
  }
}
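For context, here is a small, hedged sketch (not part of the commit) of how the new validation behaves when the property is set through CarbonProperties, assuming addProperty routes the key through the per-key validation shown above; the value 5000 is simply an illustrative out-of-range input, and the class name is hypothetical.

```java
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

public class DetailQueryBatchSizeExample {
  public static void main(String[] args) {
    CarbonProperties properties = CarbonProperties.getInstance();
    // 5000 lies outside [DETAIL_QUERY_BATCH_SIZE_MIN, DETAIL_QUERY_BATCH_SIZE_MAX] = [100, 1000],
    // so validateDetailQueryBatchSize() resets it to DETAIL_QUERY_BATCH_SIZE_DEFAULT (100).
    properties.addProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE, "5000");
    // Expected to print 100 once the validation has run.
    System.out.println(properties.getProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE));
  }
}
```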
docs/configuration-parameters.md (2 changes: 1 addition & 1 deletion)
@@ -129,7 +129,7 @@ This section provides the details of all the configurations required for the Car
| carbon.search.master.port | 10020 | Port on which the search master listens for incoming query requests |
| carbon.search.worker.port | 10021 | Port on which search master communicates with the workers. |
| carbon.search.worker.workload.limit | 10 * *carbon.search.scan.thread* | Maximum number of active requests that can be sent to a worker. Beyond which the request needs to be rescheduled for later time or to a different worker. |
| carbon.detail.batch.size | 100 | The buffer size to store records, returned from the block scan. In limit scenario this parameter is very important. For example your query limit is 1000. But if we set this value to 3000 that means we get 3000 records from scan but spark will only take 1000 rows. So the 2000 remaining are useless. In one Finance test case after we set it to 100, in the limit 1000 scenario the performance increase about 2 times in comparison to if we set this value to 12000. |
| carbon.detail.batch.size | 100 | The buffer size to store records, returned from the block scan. In limit scenario this parameter is very important. For example your query limit is 1000. But if we set this value to 3000 that means we get 3000 records from scan but spark will only take 1000 rows. So the 2000 remaining are useless. In one Finance test case after we set it to 100, in the limit 1000 scenario the performance increase about 2 times in comparison to if we set this value to 12000.<br /><br /> **NOTE** The minimum batch size allowed is 100 and maximum batch size allowed by this property is 1000. |
| carbon.enable.vector.reader | true | Spark added vector processing to optimize cpu cache miss and there by increase the query performance. This configuration enables to fetch data as columnar batch of size 4*1024 rows instead of fetching data row by row and provide it to spark so that there is improvement in select queries performance. |
| carbon.task.distribution | block | CarbonData has its own scheduling algorithm to suggest to Spark on how many tasks needs to be launched and how much work each task need to do in a Spark cluster for any query on CarbonData. Each of these task distribution suggestions has its own advantages and disadvantages. Based on the customer use case, appropriate task distribution can be configured.**block**: Setting this value will launch one task per block. This setting is suggested in case of concurrent queries and queries having big shuffling scenarios. **custom**: Setting this value will group the blocks and distribute it uniformly to the available resources in the cluster. This enhances the query performance but not suggested in case of concurrent queries and queries having big shuffling scenarios. **blocklet**: Setting this value will launch one task per blocklet. This setting is suggested in case of concurrent queries and queries having big shuffling scenarios. **merge_small_files**: Setting this value will merge all the small carbondata files upto a bigger size configured by ***spark.sql.files.maxPartitionBytes*** (128 MB is the default value,it is configurable) during querying. The small carbondata files are combined to a map task to reduce the number of read task. This enhances the performance. |
| carbon.custom.block.distribution | false | CarbonData has its own scheduling algorithm to suggest to Spark on how many tasks needs to be launched and how much work each task need to do in a Spark cluster for any query on CarbonData. When this configuration is true, CarbonData would distribute the available blocks to be scanned among the available number of cores. For Example:If there are 10 blocks to be scanned and only 3 tasks can be run(only 3 executor cores available in the cluster), CarbonData would combine blocks as 4,3,3 and give it to 3 tasks to run. **NOTE:** When this configuration is false, as per the ***carbon.task.distribution*** configuration, each block/blocklet would be given to each task. |
@@ -104,6 +104,29 @@ public void testWriteAndReadFiles() throws IOException, InterruptedException {
    FileUtils.deleteDirectory(new File(path));
  }

  @Test public void testReadWithZeroBatchSize() throws IOException, InterruptedException {
    String path = "./testWriteFiles";
    FileUtils.deleteDirectory(new File(path));
    DataMapStoreManager.getInstance().clearDataMaps(AbsoluteTableIdentifier.from(path));
    Field[] fields = new Field[2];
    fields[0] = new Field("name", DataTypes.STRING);
    fields[1] = new Field("age", DataTypes.INT);

    TestUtil.writeFilesAndVerify(10, new Schema(fields), path);
    CarbonReader reader;
    reader = CarbonReader.builder(path).withRowRecordReader().withBatch(0).build();

    int i = 0;
    while (reader.hasNext()) {
      Object[] row = (Object[]) reader.readNextRow();
      Assert.assertEquals(("robot" + (i % 10)), row[0]);
      Assert.assertEquals(i, row[1]);
      i++;
    }
    Assert.assertEquals(i, 10);
    FileUtils.deleteDirectory(new File(path));
  }

  @Test
  public void testReadWithFilterOfNonTransactionalSimple() throws IOException, InterruptedException {
    String path = "./testWriteFiles";
@@ -131,46 +131,6 @@ private void writeDataMultipleFiles(int numFiles, long numRowsPerFile) {
      executorService.awaitTermination(10, TimeUnit.MINUTES);
    }
  }

  @Test public void testReadWithZeroBatchSize() throws InterruptedException {
    int numFiles = 5;
    int numRowsPerFile = 5;
    short numThreads = 4;
    writeDataMultipleFiles(numFiles, numRowsPerFile);

    // Concurrent Reading
    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
    try {
      long count;
      CarbonReader reader =
          CarbonReader.builder(dataDir).withRowRecordReader().withBatch(0).build();
      List<CarbonReader> multipleReaders = reader.split(numThreads);
      try {
        List<ReadLogic> tasks = new ArrayList<>();
        List<Future<Long>> results;
        count = 0;

        for (CarbonReader reader_i : multipleReaders) {
          tasks.add(new ReadLogic(reader_i));
        }
        results = executorService.invokeAll(tasks);
        for (Future result_i : results) {
          count += (long) result_i.get();
        }
        Assert.assertEquals(numFiles * numRowsPerFile, count);
      } catch (Exception e) {
        e.printStackTrace();
        Assert.fail(e.getMessage());
      }
    } catch (Exception e) {
      e.printStackTrace();
      Assert.fail(e.getMessage());
    } finally {
      executorService.shutdown();
      executorService.awaitTermination(10, TimeUnit.MINUTES);
    }
  }

  class ReadLogic implements Callable<Long> {
    CarbonReader reader;

