Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CARBONDATA-2491] Fix the error when reader read twice with SDK carbonReader #2318

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -411,7 +411,7 @@ public CarbonTable getCarbonTable(AbsoluteTableIdentifier identifier) {
}

/**
* this methos clears the datamap of table from memory
* this method clears the datamap of the table from memory
*/
public void clearDataMaps(String tableUniqName) {
List<TableDataMap> tableIndices = allDataMaps.get(tableUniqName);
Expand Down
Expand Up @@ -23,6 +23,7 @@

import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.scan.executor.QueryExecutor;
import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
Expand Down Expand Up @@ -118,6 +119,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
CarbonUtil.clearDictionaryCache(entry.getValue());
}
}
// Clear the datamap cache
DataMapStoreManager.getInstance().getDefaultDataMap(queryModel.getTable()).clear();
// close read support
readSupport.close();
try {
Expand Down
Expand Up @@ -18,21 +18,30 @@
package org.apache.carbondata.sdk.file;

import java.io.File;
import java.io.FileFilter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.List;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.path.CarbonTablePath;

import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CarbonReaderTest {
public class CarbonReaderTest extends TestCase {

@Before
public void cleanFile() {
Expand Down Expand Up @@ -77,6 +86,59 @@ public void testWriteAndReadFiles() throws IOException, InterruptedException {
Assert.assertEquals(i, 100);

reader.close();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case covers sequential reads: one reader is closed before the second one starts. What exactly happens when two readers read in parallel? Can we have a test case for that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will add it. Also, search mode already uses CarbonRecordReader, and there are some concurrent-run test cases in org.apache.carbondata.examples.SearchModeExample.

// Read again
CarbonReader reader2 = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age"})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test case with two sequential reads in which the 2nd reader starts without the 1st reader being closed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, done

.build();

i = 0;
while (reader2.hasNext()) {
Object[] row = (Object[]) reader2.readNextRow();
// Default sort column is applied for dimensions. So, need to validate accordingly
Assert.assertEquals(name[i], row[0]);
Assert.assertEquals(age[i], row[1]);
i++;
}
Assert.assertEquals(i, 100);
reader2.close();

FileUtils.deleteDirectory(new File(path));
}

/**
 * Verifies that two CarbonReaders opened on the same path at the same time
 * (neither closed before the other starts) return identical rows in the
 * same order, i.e. parallel reads do not interfere with each other.
 */
@Test
public void testReadFilesParallel() throws IOException, InterruptedException {
  String path = "./testWriteFiles";
  FileUtils.deleteDirectory(new File(path));

  Field[] fields = new Field[2];
  fields[0] = new Field("name", DataTypes.STRING);
  fields[1] = new Field("age", DataTypes.INT);

  TestUtil.writeFilesAndVerify(new Schema(fields), path, true);

  CarbonReader reader = CarbonReader
      .builder(path, "_temp")
      .projection(new String[]{"name", "age"})
      .build();
  // Reader 2: opened while reader 1 is still in use
  CarbonReader reader2 = CarbonReader
      .builder(path, "_temp")
      .projection(new String[]{"name", "age"})
      .build();

  int count = 0;
  while (reader.hasNext()) {
    Object[] row = (Object[]) reader.readNextRow();
    Object[] row2 = (Object[]) reader2.readNextRow();
    // parallel compare: both readers must produce the same row
    Assert.assertEquals(row[0], row2[0]);
    Assert.assertEquals(row[1], row2[1]);
    count++;
  }
  // Guard against a vacuously-passing loop: all 100 written rows must have
  // been read, and reader2 must be exhausted exactly when reader is.
  Assert.assertEquals(100, count);
  Assert.assertFalse(reader2.hasNext());

  reader.close();
  reader2.close();

  FileUtils.deleteDirectory(new File(path));
}

Expand Down Expand Up @@ -177,4 +239,134 @@ public void testWriteAndReadFilesNonTransactional() throws IOException, Interrup
reader.close();
FileUtils.deleteDirectory(new File(path));
}

// Process-wide singleton holding Carbon configuration; captured in setUp()
// so individual tests can tweak and later restore global properties.
CarbonProperties carbonProperties;

// JUnit 3 lifecycle hook (the class extends TestCase); runs before each test.
// NOTE(review): the class mixes JUnit 3 (TestCase#setUp) with JUnit 4
// annotations (@Before/@Test) — confirm both lifecycles fire as intended
// under the runner in use.
@Override
public void setUp() {
carbonProperties = CarbonProperties.getInstance();
}

// Logger for this test class; used to report the bad-record location.
private static final LogService LOGGER =
LogServiceFactory.getLogService(CarbonReaderTest.class.getName());

@Test
public void testTimeStampAndBadRecord() throws IOException, InterruptedException {
String timestampFormat = carbonProperties.getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
String badRecordAction = carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
String badRecordLoc = carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL);
String rootPath = new File(this.getClass().getResource("/").getPath()
+ "../../").getCanonicalPath();
String storeLocation = rootPath + "/target/";
carbonProperties
.addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, storeLocation)
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd hh:mm:ss")
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "REDIRECT");
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));

Field[] fields = new Field[9];
fields[0] = new Field("stringField", DataTypes.STRING);
fields[1] = new Field("intField", DataTypes.INT);
fields[2] = new Field("shortField", DataTypes.SHORT);
fields[3] = new Field("longField", DataTypes.LONG);
fields[4] = new Field("doubleField", DataTypes.DOUBLE);
fields[5] = new Field("boolField", DataTypes.BOOLEAN);
fields[6] = new Field("dateField", DataTypes.DATE);
fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));

try {
CarbonWriterBuilder builder = CarbonWriter.builder()
.isTransactionalTable(true)
.persistSchemaFile(true)
.outputPath(path);

CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));

for (int i = 0; i < 100; i++) {
String[] row = new String[]{
"robot" + (i % 10),
String.valueOf(i),
String.valueOf(i),
String.valueOf(Long.MAX_VALUE - i),
String.valueOf((double) i / 2),
String.valueOf(true),
"2018-05-12",
"2018-05-12",
"12.345"
};
writer.write(row);
String[] row2 = new String[]{
"robot" + (i % 10),
String.valueOf(i),
String.valueOf(i),
String.valueOf(Long.MAX_VALUE - i),
String.valueOf((double) i / 2),
String.valueOf(true),
"2019-03-02",
"2019-02-12 03:03:34",
"12.345"
};
writer.write(row2);
}
writer.close();
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
LOGGER.audit("Bad record location:" + storeLocation);
File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
Assert.assertTrue(segmentFolder.exists());

File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
}
});
Assert.assertNotNull(dataFiles);
Assert.assertTrue(dataFiles.length > 0);

CarbonReader reader = CarbonReader.builder(path, "_temp")
.projection(new String[]{
"stringField"
, "shortField"
, "intField"
, "longField"
, "doubleField"
, "boolField"
, "dateField"
, "timeField"
, "decimalField"}).build();

int i = 0;
while (reader.hasNext()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please try out another test case. Open the reader and then close the reader and then try to do readNextRow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, done

Object[] row = (Object[]) reader.readNextRow();
int id = (int) row[2];
Assert.assertEquals("robot" + (id % 10), row[0]);
Assert.assertEquals(Short.parseShort(String.valueOf(id)), row[1]);
Assert.assertEquals(Long.MAX_VALUE - id, row[3]);
Assert.assertEquals((double) id / 2, row[4]);
Assert.assertEquals(true, (boolean) row[5]);
long day = 24L * 3600 * 1000;
Assert.assertEquals("2019-03-02", new Date((day * ((int) row[6]))).toString());
Assert.assertEquals("2019-02-12 03:03:34.0", new Timestamp((long) row[7] / 1000).toString());
i++;
}
Assert.assertEquals(i, 100);

reader.close();
FileUtils.deleteDirectory(new File(path));
carbonProperties.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
timestampFormat);
carbonProperties.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
badRecordAction);
carbonProperties.addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
badRecordLoc);
}

}