Skip to content

Commit

Permalink
[CARBONDATA-2491] Fix the error when reader read twice with SDK carbo…
Browse files Browse the repository at this point in the history
…nReader

This PR includes:
1. Fix the error out of bound when reader read twice with SDK carbonReader
2. Fix the java.lang.NegativeArraySizeException
3. Add timestamp and bad record test case
4. support parallel read of two readers

This closes #2318
  • Loading branch information
xubo245 authored and kunal642 committed May 24, 2018
1 parent 6cc86db commit a7ac656
Show file tree
Hide file tree
Showing 5 changed files with 263 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ public CarbonTable getCarbonTable(AbsoluteTableIdentifier identifier) {
}

/**
* this methos clears the datamap of table from memory
* this methods clears the datamap of table from memory
*/
public void clearDataMaps(String tableUniqName) {
List<TableDataMap> tableIndices = allDataMaps.get(tableUniqName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ public class ChunkRowIterator extends CarbonIterator<Object[]> {
private CarbonIterator<RowBatch> iterator;

/**
* currect chunk
* current chunk
*/
private RowBatch currentchunk;
private RowBatch currentChunk;

public ChunkRowIterator(CarbonIterator<RowBatch> iterator) {
this.iterator = iterator;
if (iterator.hasNext()) {
currentchunk = iterator.next();
currentChunk = iterator.next();
}
}

Expand All @@ -50,13 +50,13 @@ public ChunkRowIterator(CarbonIterator<RowBatch> iterator) {
* @return {@code true} if the iteration has more elements
*/
@Override public boolean hasNext() {
if (null != currentchunk) {
if ((currentchunk.hasNext())) {
if (null != currentChunk) {
if ((currentChunk.hasNext())) {
return true;
} else if (!currentchunk.hasNext()) {
} else if (!currentChunk.hasNext()) {
while (iterator.hasNext()) {
currentchunk = iterator.next();
if (currentchunk != null && currentchunk.hasNext()) {
currentChunk = iterator.next();
if (currentChunk != null && currentChunk.hasNext()) {
return true;
}
}
Expand All @@ -71,7 +71,7 @@ public ChunkRowIterator(CarbonIterator<RowBatch> iterator) {
* @return the next element in the iteration
*/
@Override public Object[] next() {
return currentchunk.next();
return currentChunk.next();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.scan.executor.QueryExecutor;
import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
Expand Down Expand Up @@ -118,6 +119,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
CarbonUtil.clearDictionaryCache(entry.getValue());
}
}
// Clear the datamap cache
DataMapStoreManager.getInstance().getDefaultDataMap(queryModel.getTable()).clear();
// close read support
readSupport.close();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ public class CarbonReader<T> {

private int index;

private boolean initialise;

/**
* Call {@link #builder(String)} to construct an instance
*/
CarbonReader(List<RecordReader<Void, T>> readers) {
if (readers.size() == 0) {
throw new IllegalArgumentException("no reader");
}
this.initialise = true;
this.readers = readers;
this.index = 0;
this.currentReader = readers.get(0);
Expand All @@ -60,6 +63,7 @@ public class CarbonReader<T> {
* Return true if has next row
*/
public boolean hasNext() throws IOException, InterruptedException {
validateReader();
if (currentReader.nextKeyValue()) {
return true;
} else {
Expand All @@ -78,6 +82,7 @@ public boolean hasNext() throws IOException, InterruptedException {
* Read and return next row object
*/
public T readNextRow() throws IOException, InterruptedException {
validateReader();
return currentReader.getCurrentValue();
}

Expand Down Expand Up @@ -111,6 +116,18 @@ public static TableInfo readSchemaFile(String schemaFilePath) throws IOException
* @throws IOException
*/
public void close() throws IOException {
validateReader();
this.currentReader.close();
this.initialise = false;
}

/**
* Validate the reader
*/
private void validateReader() {
if (!this.initialise) {
throw new RuntimeException(this.getClass().getSimpleName() +
" not initialise, please create it first.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,30 @@
package org.apache.carbondata.sdk.file;

import java.io.File;
import java.io.FileFilter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.List;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.path.CarbonTablePath;

import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CarbonReaderTest {
public class CarbonReaderTest extends TestCase {

@Before
public void cleanFile() {
Expand Down Expand Up @@ -77,6 +86,99 @@ public void testWriteAndReadFiles() throws IOException, InterruptedException {
Assert.assertEquals(i, 100);

reader.close();

// Read again
CarbonReader reader2 = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age"})
.build();

i = 0;
while (reader2.hasNext()) {
Object[] row = (Object[]) reader2.readNextRow();
// Default sort column is applied for dimensions. So, need to validate accordingly
Assert.assertEquals(name[i], row[0]);
Assert.assertEquals(age[i], row[1]);
i++;
}
Assert.assertEquals(i, 100);
reader2.close();

FileUtils.deleteDirectory(new File(path));
}

@Test
public void testReadFilesParallel() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));

Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);

TestUtil.writeFilesAndVerify(new Schema(fields), path, true);

CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age"})
.build();
// Reader 2
CarbonReader reader2 = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age"})
.build();

while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
Object[] row2 = (Object[]) reader2.readNextRow();
// parallel compare
Assert.assertEquals(row[0], row2[0]);
Assert.assertEquals(row[1], row2[1]);
}

reader.close();
reader2.close();

FileUtils.deleteDirectory(new File(path));
}

@Test
public void testReadAfterClose() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));

Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);

TestUtil.writeFilesAndVerify(new Schema(fields), path, true);

CarbonReader reader = CarbonReader.builder(path, "_temp")
.projection(new String[]{"name", "age"}).build();

reader.close();
String msg = "CarbonReader not initialise, please create it first.";
try {
reader.hasNext();
assert (false);
} catch (RuntimeException e) {
assert (e.getMessage().equals(msg));
}

try {
reader.readNextRow();
assert (false);
} catch (RuntimeException e) {
assert (e.getMessage().equals(msg));
}

try {
reader.close();
assert (false);
} catch (RuntimeException e) {
assert (e.getMessage().equals(msg));
}

FileUtils.deleteDirectory(new File(path));
}

Expand Down Expand Up @@ -177,4 +279,134 @@ public void testWriteAndReadFilesNonTransactional() throws IOException, Interrup
reader.close();
FileUtils.deleteDirectory(new File(path));
}

CarbonProperties carbonProperties;

@Override
public void setUp() {
carbonProperties = CarbonProperties.getInstance();
}

private static final LogService LOGGER =
LogServiceFactory.getLogService(CarbonReaderTest.class.getName());

@Test
public void testTimeStampAndBadRecord() throws IOException, InterruptedException {
String timestampFormat = carbonProperties.getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
String badRecordAction = carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
String badRecordLoc = carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL);
String rootPath = new File(this.getClass().getResource("/").getPath()
+ "../../").getCanonicalPath();
String storeLocation = rootPath + "/target/";
carbonProperties
.addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, storeLocation)
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd hh:mm:ss")
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "REDIRECT");
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));

Field[] fields = new Field[9];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("intField", DataTypes.INT);
fields[2] = new Field("shortField", DataTypes.SHORT);
fields[3] = new Field("longField", DataTypes.LONG);
fields[4] = new Field("doubleField", DataTypes.DOUBLE);
fields[5] = new Field("boolField", DataTypes.BOOLEAN);
fields[6] = new Field("dateField", DataTypes.DATE);
fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));

try {
CarbonWriterBuilder builder = CarbonWriter.builder()
.isTransactionalTable(true)
.persistSchemaFile(true)
.outputPath(path);

CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));

for (int i = 0; i < 100; i++) {
String[] row = new String[]{
"robot" + (i % 10),
String.valueOf(i),
String.valueOf(i),
String.valueOf(Long.MAX_VALUE - i),
String.valueOf((double) i / 2),
String.valueOf(true),
"2018-05-12",
"2018-05-12",
"12.345"
};
writer.write(row);
String[] row2 = new String[]{
"robot" + (i % 10),
String.valueOf(i),
String.valueOf(i),
String.valueOf(Long.MAX_VALUE - i),
String.valueOf((double) i / 2),
String.valueOf(true),
"2019-03-02",
"2019-02-12 03:03:34",
"12.345"
};
writer.write(row2);
}
writer.close();
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
LOGGER.audit("Bad record location:" + storeLocation);
File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
Assert.assertTrue(segmentFolder.exists());

File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
}
});
Assert.assertNotNull(dataFiles);
Assert.assertTrue(dataFiles.length > 0);

CarbonReader reader = CarbonReader.builder(path, "_temp")
.projection(new String[]{
"stringField"
, "shortField"
, "intField"
, "longField"
, "doubleField"
, "boolField"
, "dateField"
, "timeField"
, "decimalField"}).build();

int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
int id = (int) row[2];
Assert.assertEquals("robot" + (id % 10), row[0]);
Assert.assertEquals(Short.parseShort(String.valueOf(id)), row[1]);
Assert.assertEquals(Long.MAX_VALUE - id, row[3]);
Assert.assertEquals((double) id / 2, row[4]);
Assert.assertEquals(true, (boolean) row[5]);
long day = 24L * 3600 * 1000;
Assert.assertEquals("2019-03-02", new Date((day * ((int) row[6]))).toString());
Assert.assertEquals("2019-02-12 03:03:34.0", new Timestamp((long) row[7] / 1000).toString());
i++;
}
Assert.assertEquals(i, 100);

reader.close();
FileUtils.deleteDirectory(new File(path));
carbonProperties.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
timestampFormat);
carbonProperties.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
badRecordAction);
carbonProperties.addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
badRecordLoc);
}

}

0 comments on commit a7ac656

Please sign in to comment.