Skip to content

Commit

Permalink
[CARBONDATA-2359][SDK] Support applicable load options and table properties for Non-Transactional table
Browse files Browse the repository at this point in the history

Support read multiple sdk writer placed at same path

This closes #2190
  • Loading branch information
ajantha-bhat authored and gvramana committed Apr 23, 2018
1 parent 4a47630 commit 5f32647
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public TableSchema build() {
if (blockletSize > 0) {
property.put(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, String.valueOf(blockletSize));
}
// TODO: check other table properties
if (property.size() != 0) {
schema.setTableProperties(property);
}
Expand All @@ -119,7 +118,14 @@ public TableSchemaBuilder addColumn(StructField field, boolean isSortColumn) {
}
newColumn.setSchemaOrdinal(ordinal++);
newColumn.setColumnar(true);
newColumn.setColumnUniqueId(UUID.randomUUID().toString());

// For NonTransactionalTable, multiple sdk writer output with same column name can be placed in
// single folder for query.
// That time many places in code, columnId check will fail. To avoid that
// keep column ID as same as column name.
// Anyhow Alter table is not supported for NonTransactionalTable.
// SO, this will not have any impact.
newColumn.setColumnUniqueId(field.getFieldName());
newColumn.setColumnReferenceId(newColumn.getColumnUniqueId());
newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn));
if (DataTypes.isDecimal(field.getDataType())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2349,6 +2349,7 @@ public static org.apache.carbondata.format.TableInfo inferSchema(String carbonDa
fistFilePath = filePaths.get(0);
} catch (Exception e) {
LOGGER.error("CarbonData file is not present in the table location");
throw new IOException("CarbonData file is not present in the table location");
}
CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(fistFilePath);
List<ColumnSchema> columnSchemaList = carbonHeaderReader.readSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class RestructureUtilTest {
List<ProjectionDimension> result = null;
result = RestructureUtil
.createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo, queryDimensions,
tableBlockDimensions, tableComplexDimensions, queryMeasures.size(), false);
tableBlockDimensions, tableComplexDimensions, queryMeasures.size(), true);
List<CarbonDimension> resultDimension = new ArrayList<>(result.size());
for (ProjectionDimension queryDimension : result) {
resultDimension.add(queryDimension.getDimension());
Expand Down Expand Up @@ -127,7 +127,7 @@ public class RestructureUtilTest {
List<ProjectionMeasure> queryMeasures = Arrays.asList(queryMeasure1, queryMeasure2, queryMeasure3);
BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
RestructureUtil.createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo, queryMeasures,
currentBlockMeasures, false);
currentBlockMeasures, true);
MeasureInfo measureInfo = blockExecutionInfo.getMeasureInfo();
boolean[] measuresExist = { true, true, false };
assertThat(measureInfo.getMeasureExists(), is(equalTo(measuresExist)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.carbondata.spark.testsuite.createTable

import java.io.{File, FileFilter}
import java.util

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.Row
Expand All @@ -32,6 +33,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

import scala.collection.JavaConverters._

class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {

Expand All @@ -45,22 +47,51 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {

// Writes a fresh, small (3-row) SDK writer output into writerPath with default
// load options, clearing any previous output first.
// Return type narrowed from Any to Unit: this is a side-effecting setup helper.
def buildTestDataSingleFile(): Unit = {
  FileUtils.deleteDirectory(new File(writerPath))
  buildTestData(3, false, null)
}

// Writes a large (1,000,000-row) SDK writer output into writerPath so that the
// writer produces multiple files, clearing any previous output first.
// Return type narrowed from Any to Unit: this is a side-effecting setup helper.
def buildTestDataMultipleFiles(): Unit = {
  FileUtils.deleteDirectory(new File(writerPath))
  buildTestData(1000000, false, null)
}

// Clears writerPath, then runs the SDK writer twice into the same folder,
// producing two writer outputs with the same schema side by side.
// Return type narrowed from Any to Unit: this is a side-effecting setup helper.
def buildTestDataTwice(): Unit = {
  FileUtils.deleteDirectory(new File(writerPath))
  buildTestData(3, false, null)
  buildTestData(3, false, null)
}

// Runs the SDK writer into writerPath WITHOUT clearing it, so the new output is
// added alongside any data already present in the same directory.
// Return type narrowed from Any to Unit: this is a side-effecting setup helper.
def buildTestDataSameDirectory(): Unit = {
  buildTestData(3, false, null)
}

// Writes SDK output with bad_records_action = FORCE. The mixed-case option key
// also exercises case-insensitive handling of load-option names.
// val instead of var: the options map is never reassigned.
// Return type narrowed from Any to Unit: this is a side-effecting setup helper.
def buildTestDataWithBadRecordForce(): Unit = {
  FileUtils.deleteDirectory(new File(writerPath))
  val options = Map("bAd_RECords_action" -> "FORCE").asJava
  buildTestData(3, false, options)
}

// Writes SDK output with bad_records_action = FAIL. The mixed-case option key
// also exercises case-insensitive handling of load-option names.
// val instead of var: the options map is never reassigned.
// Return type narrowed from Any to Unit: this is a side-effecting setup helper.
def buildTestDataWithBadRecordFail(): Unit = {
  FileUtils.deleteDirectory(new File(writerPath))
  val options = Map("bAd_RECords_action" -> "FAIL").asJava
  buildTestData(3, false, options)
}

// Writes SDK output with bad_records_action = IGNORE. The mixed-case option key
// also exercises case-insensitive handling of load-option names.
// val instead of var: the options map is never reassigned.
// Return type narrowed from Any to Unit: this is a side-effecting setup helper.
def buildTestDataWithBadRecordIgnore(): Unit = {
  FileUtils.deleteDirectory(new File(writerPath))
  val options = Map("bAd_RECords_action" -> "IGNORE").asJava
  buildTestData(3, false, options)
}

// Writes SDK output with bad_records_action = REDIRECT. The mixed-case option
// key also exercises case-insensitive handling of load-option names.
// val instead of var: the options map is never reassigned.
// Return type narrowed from Any to Unit: this is a side-effecting setup helper.
def buildTestDataWithBadRecordRedirect(): Unit = {
  FileUtils.deleteDirectory(new File(writerPath))
  val options = Map("bAd_RECords_action" -> "REDIRECT").asJava
  buildTestData(3, false, options)
}


// prepare sdk writer output
def buildTestData(rows:Int, persistSchema:Boolean): Any = {
def buildTestData(rows: Int, persistSchema: Boolean, options: util.Map[String, String]): Any = {
val schema = new StringBuilder()
.append("[ \n")
.append(" {\"name\":\"string\"},\n")
Expand All @@ -80,17 +111,34 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
.uniqueIdentifier(System.currentTimeMillis)
.buildWriterForCSVInput()
} else {
builder.withSchema(Schema.parseJson(schema))
.outputPath(writerPath)
.isTransactionalTable(false)
.uniqueIdentifier(System.currentTimeMillis).withBlockSize(2)
.buildWriterForCSVInput()
if (options != null) {
builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath)
.isTransactionalTable(false)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
.buildWriterForCSVInput()
} else {
builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath)
.isTransactionalTable(false)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2)
.buildWriterForCSVInput()
}
}
var i = 0
while (i < rows) {
writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
if (options != null){
// writing a bad record
writer.write(Array[String]( "robot" + i, String.valueOf(i.toDouble / 2), "robot"))
} else {
writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
}
i += 1
}
if (options != null) {
// Keep one valid record, else the carbon data file will not be generated
writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
}
writer.close()
} catch {
case ex: Exception => None
Expand Down Expand Up @@ -150,7 +198,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {

test("test create External Table with insert into feature")
{
buildTestData(3, false)
buildTestDataSingleFile()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql("DROP TABLE IF EXISTS t1")
Expand Down Expand Up @@ -183,7 +231,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {

test("test create External Table with insert overwrite")
{
buildTestData(3, false)
buildTestDataSingleFile()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql("DROP TABLE IF EXISTS t1")
Expand Down Expand Up @@ -222,7 +270,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {

test("test create External Table with Load")
{
buildTestData(3, false)
buildTestDataSingleFile()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql("DROP TABLE IF EXISTS t1")
Expand Down Expand Up @@ -391,6 +439,14 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
assert(exception.getMessage()
.contains("Unsupported operation on non transactional table"))

//14. Block clean files
exception = intercept[MalformedCarbonCommandException] {
sql("clean files for table sdkOutputTable")
}
assert(exception.getMessage()
.contains("Unsupported operation on non transactional table"))


sql("DROP TABLE sdkOutputTable")
//drop table should not delete the files
assert(new File(writerPath).exists())
Expand Down Expand Up @@ -433,8 +489,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
assert(exception.getMessage()
.contains("Operation not allowed: Invalid table path provided:"))

// drop table should not delete the files
assert(new File(writerPath).exists())
cleanTestData()
}

Expand All @@ -457,8 +511,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
assert(exception.getMessage()
.contains("Operation not allowed: Invalid table path provided:"))

// drop table should not delete the files
assert(new File(writerPath).exists())
cleanTestData()
}

Expand All @@ -484,13 +536,15 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {

checkAnswer(sql("select count(*) from sdkOutputTable"), Seq(Row(1000000)))

sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
assert(new File(writerPath).exists())
cleanTestData()
}

test("Read two sdk writer outputs with same column name placed in same folder") {
buildTestDataTwice()
buildTestDataSingleFile()

assert(new File(writerPath).exists())

sql("DROP TABLE IF EXISTS sdkOutputTable")
Expand All @@ -500,17 +554,86 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
|'$writerPath' """.stripMargin)


checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
checkAnswer(sql("select * from sdkOutputTable"), Seq(
Row("robot0", 0, 0.0),
Row("robot1", 1, 0.5),
Row("robot2", 2, 1.0)))

buildTestDataSameDirectory()

checkAnswer(sql("select * from sdkOutputTable"), Seq(
Row("robot0", 0, 0.0),
Row("robot1", 1, 0.5),
Row("robot2", 2, 1.0),
Row("robot0", 0, 0.0),
Row("robot1", 1, 0.5),
Row("robot2", 2, 1.0)))

//test filter query
checkAnswer(sql("select * from sdkOutputTable where age = 1"), Seq(
Row("robot1", 1, 0.5),
Row("robot1", 1, 0.5)))

// test the default sort column behavior in Nontransactional table
checkExistence(sql("describe formatted sdkOutputTable"), true,
"SORT_COLUMNS name")

sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
assert(new File(writerPath).exists())
cleanTestData()
}

test("test bad records form sdk writer") {

//1. Action = FORCE
buildTestDataWithBadRecordForce()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
|'$writerPath' """.stripMargin)
checkAnswer(sql("select * from sdkOutputTable"), Seq(
Row("robot0", null, null),
Row("robot1", null, null),
Row("robot2", null, null),
Row("robot3", 3, 1.5)))

sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
assert(new File(writerPath).exists())


//2. Action = REDIRECT
buildTestDataWithBadRecordRedirect()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
|'$writerPath' """.stripMargin)

checkAnswer(sql("select * from sdkOutputTable"), Seq(
Row("robot3", 3, 1.5)))

sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
assert(new File(writerPath).exists())

//3. Action = IGNORE
buildTestDataWithBadRecordIgnore()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
|'$writerPath' """.stripMargin)
checkAnswer(sql("select * from sdkOutputTable"), Seq(
Row("robot3", 3, 1.5)))

sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
assert(new File(writerPath).exists())

cleanTestData()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, Checker, D
import org.apache.spark.sql.optimizer.CarbonFilters

import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.exception.ConcurrentOperationException
Expand Down Expand Up @@ -117,6 +118,9 @@ case class CarbonCleanFilesCommand(

private def cleanGarbageData(sparkSession: SparkSession,
databaseNameOp: Option[String], tableName: String): Unit = {
if (!carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
Seq.empty[Expression],
sparkSession,
Expand Down

0 comments on commit 5f32647

Please sign in to comment.