Skip to content

Commit

Permalink
[CARBONDATA-2491] Fix the error when reader read twice with SDK carbonReader
Browse files Browse the repository at this point in the history
  • Loading branch information
xubo245 committed May 21, 2018
1 parent f184de8 commit 53f9522
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ public TableDataMap getDataMap(CarbonTable table, DataMapSchema dataMapSchema) {
}
}
}
} else {
dataMap.clear();
}

if (dataMap == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,29 @@
package org.apache.carbondata.sdk.file;

import java.io.File;
import java.io.FileFilter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.List;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.path.CarbonTablePath;

import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CarbonReaderTest {
public class CarbonReaderTest extends TestCase {

@Before
public void cleanFile() {
Expand Down Expand Up @@ -77,6 +85,24 @@ public void testWriteAndReadFiles() throws IOException, InterruptedException {
Assert.assertEquals(i, 100);

reader.close();

// Read again
CarbonReader reader2 = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age"})
.build();

i = 0;
while (reader2.hasNext()) {
Object[] row = (Object[]) reader2.readNextRow();
// Default sort column is applied for dimensions. So, need to validate accordingly
Assert.assertEquals(name[i], row[0]);
Assert.assertEquals(age[i], row[1]);
i++;
}
Assert.assertEquals(i, 100);
reader2.close();

FileUtils.deleteDirectory(new File(path));
}

Expand Down Expand Up @@ -177,4 +203,134 @@ public void testWriteAndReadFilesNonTransactional() throws IOException, Interrup
reader.close();
FileUtils.deleteDirectory(new File(path));
}

// CarbonProperties handle used by the tests below; assigned from
// CarbonProperties.getInstance() in setUp().
CarbonProperties carbonProperties;

/**
 * Fetches the CarbonProperties instance and keeps it in the
 * {@code carbonProperties} field for the test methods to use.
 */
@Override
public void setUp() {
  this.carbonProperties = CarbonProperties.getInstance();
}

// Logger named after this test class; testTimeStampAndBadRecord uses it to
// record the bad-record location.
private static final LogService LOGGER =
LogServiceFactory.getLogService(CarbonReaderTest.class.getName());

/**
 * Writes 200 CSV rows through the SDK writer and reads them back with CarbonReader.
 *
 * <p>For every {@code i} in [0, 100) two rows are written: one whose {@code timeField} is
 * {@code "2018-05-12"} and one whose {@code timeField} is {@code "2019-02-12 03:03:34"}.
 * The timestamp format is set to {@code "yyyy-MM-dd hh:mm:ss"} and the bad-record action to
 * {@code REDIRECT}; the test then expects to read exactly 100 rows, all carrying the
 * 2019 timestamp (presumably the date-only rows are redirected as bad records).
 *
 * <p>The three CarbonProperties entries changed here, and the output directory, are always
 * restored/removed in a {@code finally} block so a failing assertion cannot leak
 * configuration into other tests.
 *
 * @throws IOException if reading, or cleaning up the output directory, fails
 * @throws InterruptedException if building or iterating the reader is interrupted
 */
@Test
public void testTimeStampAndBadRecord() throws IOException, InterruptedException {
  // Remember the current values so they can be put back afterwards.
  String timestampFormat = carbonProperties.getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
  String badRecordAction = carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
  String badRecordLoc = carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
      CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL);
  String rootPath = new File(this.getClass().getResource("/").getPath()
      + "../../").getCanonicalPath();
  String storeLocation = rootPath + "/target/";
  String path = "./testWriteFiles";

  try {
    carbonProperties
        .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, storeLocation)
        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd hh:mm:ss")
        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "REDIRECT");
    FileUtils.deleteDirectory(new File(path));

    Field[] fields = new Field[9];
    fields[0] = new Field("stringField", DataTypes.STRING);
    fields[1] = new Field("intField", DataTypes.INT);
    fields[2] = new Field("shortField", DataTypes.SHORT);
    fields[3] = new Field("longField", DataTypes.LONG);
    fields[4] = new Field("doubleField", DataTypes.DOUBLE);
    fields[5] = new Field("boolField", DataTypes.BOOLEAN);
    fields[6] = new Field("dateField", DataTypes.DATE);
    fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
    fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));

    try {
      CarbonWriterBuilder builder = CarbonWriter.builder()
          .withSchema(new Schema(fields))
          .isTransactionalTable(true)
          .persistSchemaFile(true)
          .outputPath(path);

      CarbonWriter writer = builder.buildWriterForCSVInput();

      for (int i = 0; i < 100; i++) {
        // Row whose timeField does not carry a time-of-day part.
        String[] row = new String[]{
            "robot" + (i % 10),
            String.valueOf(i),
            String.valueOf(i),
            String.valueOf(Long.MAX_VALUE - i),
            String.valueOf((double) i / 2),
            String.valueOf(true),
            "2018-05-12",
            "2018-05-12",
            "12.345"
        };
        writer.write(row);
        // Row whose timeField matches the configured timestamp format.
        String[] row2 = new String[]{
            "robot" + (i % 10),
            String.valueOf(i),
            String.valueOf(i),
            String.valueOf(Long.MAX_VALUE - i),
            String.valueOf((double) i / 2),
            String.valueOf(true),
            "2019-03-02",
            "2019-02-12 03:03:34",
            "12.345"
        };
        writer.write(row2);
      }
      writer.close();
    } catch (Exception e) {
      // Fail the test but keep the original exception as the cause.
      throw new AssertionError("Failed to write test data: " + e.getMessage(), e);
    }
    LOGGER.audit("Bad record location:" + storeLocation);
    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
    Assert.assertTrue(segmentFolder.exists());

    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
      @Override
      public boolean accept(File pathname) {
        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
      }
    });
    Assert.assertNotNull(dataFiles);
    Assert.assertTrue(dataFiles.length > 0);

    CarbonReader reader = CarbonReader.builder(path, "_temp")
        .projection(new String[]{
            "stringField"
            , "shortField"
            , "intField"
            , "longField"
            , "doubleField"
            , "boolField"
            , "dateField"
            , "timeField"
            , "decimalField"}).build();
    try {
      int i = 0;
      while (reader.hasNext()) {
        Object[] row = (Object[]) reader.readNextRow();
        // Projection order puts intField at index 2; the other columns are derived from it.
        int id = (int) row[2];
        Assert.assertEquals("robot" + (id % 10), row[0]);
        Assert.assertEquals(Short.parseShort(String.valueOf(id)), row[1]);
        Assert.assertEquals(Long.MAX_VALUE - id, row[3]);
        Assert.assertEquals((double) id / 2, row[4]);
        Assert.assertEquals(true, (boolean) row[5]);
        // TODO: verify date
        // NOTE(review): the stored value is divided by 1000 before building the Timestamp,
        // so it is presumably in microseconds -- confirm against the reader contract.
        Assert.assertEquals("2019-02-12 03:03:34.0", new Timestamp((long) row[7] / 1000).toString());
        i++;
      }
      Assert.assertEquals(100, i);
    } finally {
      // Release the reader even when an assertion above fails.
      reader.close();
    }
  } finally {
    // Restore the properties first: directory deletion may throw and must not prevent it.
    carbonProperties.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
        timestampFormat);
    carbonProperties.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
        badRecordAction);
    carbonProperties.addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
        badRecordLoc);
    FileUtils.deleteDirectory(new File(path));
  }
}

}

0 comments on commit 53f9522

Please sign in to comment.