[CARBONDATA-2359] Support applicable load options and table properties for Non-Transactional table
ajantha-bhat committed Apr 20, 2018
1 parent 3664156 commit bf9ee9f
Showing 4 changed files with 199 additions and 30 deletions.
File: TestUnmanagedCarbonTable.scala
@@ -18,6 +18,7 @@
package org.apache.carbondata.spark.testsuite.createTable

import java.io.{File, FileFilter}
import java.util

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.Row
@@ -32,6 +33,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

import scala.collection.JavaConverters._

class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {

@@ -45,22 +47,51 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {

def buildTestDataSingleFile(): Any = {
FileUtils.deleteDirectory(new File(writerPath))
buildTestData(3,false)
buildTestData(3, false, null)
}

def buildTestDataMultipleFiles(): Any = {
FileUtils.deleteDirectory(new File(writerPath))
buildTestData(1000000,false)
buildTestData(1000000, false, null)
}

def buildTestDataTwice(): Any = {
FileUtils.deleteDirectory(new File(writerPath))
buildTestData(3,false)
buildTestData(3,false)
buildTestData(3, false, null)
buildTestData(3, false, null)
}

def buildTestDataSameDirectory(): Any = {
buildTestData(3, false, null)
}

def buildTestDataWithBadRecordForce(): Any = {
FileUtils.deleteDirectory(new File(writerPath))
val options = Map("bAd_RECords_action" -> "FORCE").asJava
buildTestData(3, false, options)
}

def buildTestDataWithBadRecordFail(): Any = {
FileUtils.deleteDirectory(new File(writerPath))
val options = Map("bAd_RECords_action" -> "FAIL").asJava
buildTestData(3, false, options)
}

def buildTestDataWithBadRecordIgnore(): Any = {
FileUtils.deleteDirectory(new File(writerPath))
val options = Map("bAd_RECords_action" -> "IGNORE").asJava
buildTestData(3, false, options)
}

def buildTestDataWithBadRecordRedirect(): Any = {
FileUtils.deleteDirectory(new File(writerPath))
val options = Map("bAd_RECords_action" -> "REDIRECT").asJava
buildTestData(3, false, options)
}


// prepare sdk writer output
def buildTestData(rows:Int, persistSchema:Boolean): Any = {
def buildTestData(rows: Int, persistSchema: Boolean, options: util.Map[String, String]): Any = {
val schema = new StringBuilder()
.append("[ \n")
.append(" {\"name\":\"string\"},\n")
@@ -79,16 +110,32 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
System.currentTimeMillis)
.buildWriterForCSVInput()
} else {
builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).unManagedTable(true)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2)
.buildWriterForCSVInput()
if (options != null) {
builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).unManagedTable(true)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
.buildWriterForCSVInput()
} else {
builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).unManagedTable(true)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2)
.buildWriterForCSVInput()
}

}
var i = 0
while (i < rows) {
writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
if (options != null) {
writer.write(Array[String]("robot" + i, String.valueOf(i.toDouble / 2), "robot"))
} else {
writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
}
i += 1
}
if (options != null) {
// Keep one valid record, else the carbondata file will not be generated
writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
}
writer.close()
} catch {
case ex: Exception => None
@@ -284,6 +331,14 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
assert(exception.getMessage()
.contains("Unsupported operation on unmanaged table"))

//14. Block clean files
exception = intercept[MalformedCarbonCommandException] {
sql("clean files for table sdkOutputTable")
}
assert(exception.getMessage()
.contains("Unsupported operation on unmanaged table"))


sql("DROP TABLE sdkOutputTable")
//drop table should not delete the files
assert(new File(writerPath).exists())
@@ -326,8 +381,6 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
assert(exception.getMessage()
.contains("Operation not allowed: Invalid table path provided:"))

// drop table should not delete the files
assert(new File(writerPath).exists())
cleanTestData()
}

@@ -350,8 +403,6 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
assert(exception.getMessage()
.contains("Operation not allowed: Invalid table path provided:"))

// drop table should not delete the files
assert(new File(writerPath).exists())
cleanTestData()
}

@@ -376,13 +427,15 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {

checkAnswer(sql("select count(*) from sdkOutputTable"), Seq(Row(1000000)))

sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
assert(new File(writerPath).exists())
cleanTestData()
}

test("Read two sdk writer outputs with same column name placed in same folder") {
buildTestDataTwice()
buildTestDataSingleFile()

assert(new File(writerPath).exists())

sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -392,17 +445,81 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
|'$writerPath' """.stripMargin)


checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
checkAnswer(sql("select * from sdkOutputTable"), Seq(
Row("robot0", 0, 0.0),
Row("robot1", 1, 0.5),
Row("robot2", 2, 1.0)))

buildTestDataSameDirectory()

checkAnswer(sql("select * from sdkOutputTable"), Seq(
Row("robot0", 0, 0.0),
Row("robot1", 1, 0.5),
Row("robot2", 2, 1.0),
Row("robot0", 0, 0.0),
Row("robot1", 1, 0.5),
Row("robot2", 2, 1.0)))

// test the default sort column behavior in unmanaged table
checkExistence(sql("describe formatted sdkOutputTable"), true,
"SORT_COLUMNS name")

sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
assert(new File(writerPath).exists())
cleanTestData()
}

test("test bad records form sdk writer") {

//1. Action = FORCE
buildTestDataWithBadRecordForce()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
|'$writerPath' """.stripMargin)
checkAnswer(sql("select * from sdkOutputTable"), Seq(
Row("robot0", null, null),
Row("robot1", null, null),
Row("robot2", null, null),
Row("robot3", 3, 1.5)))

sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
assert(new File(writerPath).exists())


//2. Action = REDIRECT
buildTestDataWithBadRecordRedirect()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
|'$writerPath' """.stripMargin)

checkAnswer(sql("select * from sdkOutputTable"), Seq(
Row("robot3", 3, 1.5)))

sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
assert(new File(writerPath).exists())

//3. Action = IGNORE
buildTestDataWithBadRecordIgnore()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
|'$writerPath' """.stripMargin)
checkAnswer(sql("select * from sdkOutputTable"), Seq(
Row("robot3", 3, 1.5)))

sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
assert(new File(writerPath).exists())

cleanTestData()
}

}
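
A minimal sketch (Scala) of the writer-side call the bad-record tests above exercise, using the SDK builder API added in this commit. The output path and the "age"/"height" field names are illustrative assumptions; the test's full schema JSON is collapsed in the hunk above.

import scala.collection.JavaConverters._
import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

// Hypothetical three-column schema matching the rows the tests expect.
val json = """[{"name":"string"}, {"age":"int"}, {"height":"double"}]"""
val writer = CarbonWriter.builder()
  .withSchema(Schema.parseJson(json))
  .outputPath("/tmp/carbon-sdk-out") // illustrative path
  .unManagedTable(true)
  .uniqueIdentifier(System.currentTimeMillis)
  .withBlockSize(2)
  .withLoadOptions(Map("bad_records_action" -> "FORCE").asJava)
  .buildWriterForCSVInput()

writer.write(Array("robot0", "not-an-int", "0.0")) // bad record: age is not an int
writer.write(Array("robot1", "1", "0.5"))          // valid record
writer.close()
// FORCE loads bad values as null; REDIRECT and IGNORE drop the row; FAIL aborts the load.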
File: CarbonCleanFilesCommand.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.spark.sql.optimizer.CarbonFilters

import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.exception.ConcurrentOperationException
@@ -90,6 +91,9 @@ case class CarbonCleanFilesCommand(
private def cleanGarbageData(sparkSession: SparkSession,
databaseNameOp: Option[String], tableName: String): Unit = {
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
if (carbonTable.getTableInfo.isUnManagedTable) {
throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
}
val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
Seq.empty[Expression],
sparkSession,
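
With this guard in place, CLEAN FILES now fails fast on unmanaged tables, matching check #14 added to the test file above. A short sketch (Scala) of the expected behavior, assuming a QueryTest context with an unmanaged external table named sdkOutputTable:

val exception = intercept[MalformedCarbonCommandException] {
  sql("clean files for table sdkOutputTable")
}
assert(exception.getMessage.contains("Unsupported operation on unmanaged table"))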
File: CarbonWriterBuilder.java
@@ -18,21 +18,22 @@
package org.apache.carbondata.sdk.file;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

import org.apache.carbondata.common.Strings;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
@@ -57,6 +58,7 @@ public class CarbonWriterBuilder {
private int blockSize;
private boolean isUnManagedTable;
private long UUID;
private Map<String, String> options;

/**
* prepares the builder with the schema provided
Expand Down Expand Up @@ -123,6 +125,43 @@ public CarbonWriterBuilder uniqueIdentifier(long UUID) {
return this;
}

/**
 * Sets the load options for the SDK writer.
 * @param options key-value pairs of load options.
 * Supported keys and values are:
 * a. bad_records_logger_enable -- true, false
 * b. bad_records_action -- FAIL, FORCE, IGNORE, REDIRECT
 * c. bad_record_path -- path
 * d. dateformat -- same as Java SimpleDateFormat
 * e. timestampformat -- same as Java SimpleDateFormat
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder withLoadOptions(Map<String, String> options) {
Objects.requireNonNull(options, "Load options should not be null");
//validate the options.
if (options.size() > 5) {
throw new IllegalArgumentException("Supports only five options now. "
+ "Refer method header or documentation");
}

for (String option: options.keySet()) {
if (!option.equalsIgnoreCase("bad_records_logger_enable") &&
!option.equalsIgnoreCase("bad_records_action") &&
!option.equalsIgnoreCase("bad_record_path") &&
!option.equalsIgnoreCase("dateformat") &&
!option.equalsIgnoreCase("timestampformat")) {
throw new IllegalArgumentException("Unsupported options. "
+ "Refer method header or documentation");
}
}

// copy into a case-insensitive TreeMap so option keys match regardless of case
Map<String, String> optionsTreeMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
optionsTreeMap.putAll(options);
this.options = optionsTreeMap;
return this;
}

/**
* To set the carbondata file size in MB between 1MB-2048MB
* @param blockSize is size in MB between 1MB to 2048 MB
@@ -180,7 +219,7 @@ private CarbonLoadModel createLoadModel() throws IOException, InvalidLoadOptionException {
}

// build LoadModel
return buildLoadModel(table, UUID);
return buildLoadModel(table, UUID, options);
}

/**
@@ -196,11 +235,22 @@ private CarbonTable buildCarbonTable() {
tableSchemaBuilder = tableSchemaBuilder.blockletSize(blockletSize);
}

List<String> sortColumnsList;
if (sortColumns != null) {
sortColumnsList = Arrays.asList(sortColumns);
List<String> sortColumnsList = new ArrayList<>();
if (sortColumns == null) {
// If sort columns are not specified, default all dimension columns
// (string, date, timestamp) to sort columns; sort columns get
// inverted-index support by default.
for (Field field : schema.getFields()) {
if (field.getDataType() == DataTypes.STRING ||
field.getDataType() == DataTypes.DATE ||
field.getDataType() == DataTypes.TIMESTAMP) {
sortColumnsList.add(field.getFieldName());
}
}
sortColumns = new String[sortColumnsList.size()];
sortColumns = sortColumnsList.toArray(sortColumns);
} else {
sortColumnsList = new LinkedList<>();
sortColumnsList = Arrays.asList(sortColumns);
}
for (Field field : schema.getFields()) {
tableSchemaBuilder.addColumn(
@@ -261,11 +311,10 @@ private void persistSchemaFile(CarbonTable table, String persistFilePath) throws IOException {
/**
* Build a {@link CarbonLoadModel}
*/
private CarbonLoadModel buildLoadModel(CarbonTable table, long UUID)
private CarbonLoadModel buildLoadModel(CarbonTable table, long UUID, Map<String, String> options)
throws InvalidLoadOptionException, IOException {
Map<String, String> options = new HashMap<>();
if (sortColumns != null) {
options.put("sort_columns", Strings.mkString(sortColumns, ","));
if (options == null) {
options = new HashMap<>();
}
CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
return builder.build(options, UUID);
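
Because withLoadOptions copies the caller's map into a TreeMap ordered by String.CASE_INSENSITIVE_ORDER, option keys are matched regardless of case; this is why the tests can pass "bAd_RECords_action". A small sketch (Scala) of that lookup behavior:

import java.util.TreeMap

val options = new TreeMap[String, String](java.lang.String.CASE_INSENSITIVE_ORDER)
options.put("bAd_RECords_action", "FORCE")
assert(options.get("bad_records_action") == "FORCE") // lookup ignores key case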
