[CARBONDATA-2994] unify badrecordpath property name for create and load
Problem:
Currently, the bad records path can be specified in both create and load. In create the property name is bad_records_path, while in load it is bad_record_path. This inconsistency can confuse the user.

Solution: Use bad_record_path as the property name for create so that both load and create use the same name.
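As a sketch of the unified usage (table, columns, and paths below are illustrative, not taken from the patch), the same property name now works in both statements:

```
-- illustrative table and paths; the property name is the same in CREATE and LOAD
CREATE TABLE sales(id BIGINT, country STRING)
STORED BY 'carbondata'
TBLPROPERTIES('BAD_RECORD_PATH'='/opt/badrecords')

LOAD DATA INPATH '/data/sales.csv' INTO TABLE sales
OPTIONS('BAD_RECORDS_ACTION'='REDIRECT', 'BAD_RECORD_PATH'='/opt/badrecords')
```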

This closes #2799
kunal642 authored and ravipesala committed Oct 8, 2018
1 parent fa08825 commit 19097f2
Showing 10 changed files with 30 additions and 20 deletions.
11 changes: 11 additions & 0 deletions docs/ddl-of-carbondata.md
@@ -33,6 +33,7 @@ CarbonData DDL statements are documented here, which includes:
* [Hive/Parquet folder Structure](#support-flat-folder-same-as-hiveparquet)
* [Extra Long String columns](#string-longer-than-32000-characters)
* [Compression for Table](#compression-for-table)
* [Bad Records Path](#bad-records-path)
* [CREATE TABLE AS SELECT](#create-table-as-select)
* [CREATE EXTERNAL TABLE](#create-external-table)
* [External Table on Transactional table location](#create-external-table-on-managed-table-data-location)
@@ -454,6 +455,16 @@ CarbonData DDL statements are documented here, which includes:
```
carbon.column.compressor=zstd
```
- ##### Bad Records Path
This property is used to specify the location where bad records are written.
As the table path remains the same after a rename, the user can set this property at
the time of creation so that the same path can later be viewed in the table
description for reference.

```
TBLPROPERTIES('BAD_RECORD_PATH'='/opt/badrecords')
```
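
A minimal end-to-end sketch of a create statement carrying this property (table and column names are illustrative):

```
-- illustrative schema; BAD_RECORD_PATH is stored in the table properties
CREATE TABLE IF NOT EXISTS products(
  id BIGINT,
  name STRING,
  price DOUBLE)
STORED BY 'carbondata'
TBLPROPERTIES('BAD_RECORD_PATH'='/opt/badrecords')
```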

## CREATE TABLE AS SELECT
This function allows the user to create a Carbon table from any Parquet/Hive/Carbon table. This is beneficial when the user wants to create a Carbon table from another Parquet/Hive table and use the Carbon query engine to achieve better query performance in cases where Carbon is faster than other file formats. This feature can also be used for backing up the data.
8 changes: 0 additions & 8 deletions docs/dml-of-carbondata.md
@@ -240,14 +240,6 @@ CarbonData DML statements are documented here, which includes:
* The bad records path can be specified in create, load, and carbon properties.
The value specified in load has the highest priority, and the value specified in carbon properties has the least priority (as the sketch below illustrates).
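
For instance, a minimal sketch of the override behaviour (table name and paths are placeholders):

```
-- create-time default, stored in the table properties
CREATE TABLE t(id INT) STORED BY 'carbondata'
TBLPROPERTIES('BAD_RECORD_PATH'='/opt/badrecords/default')

-- the load-time option wins for this load only
LOAD DATA INPATH '/data/t.csv' INTO TABLE t
OPTIONS('BAD_RECORDS_ACTION'='REDIRECT', 'BAD_RECORD_PATH'='/opt/badrecords/override')
```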

**Bad Records Path:**
This property is used to specify the location where bad records would be written.


```
TBLPROPERTIES('BAD_RECORDS_PATH'='/opt/badrecords'')
```

Example:

```
@@ -374,7 +374,7 @@ public static CarbonLoadModel getLoadModel(Configuration conf) throws IOException
String badRecordsPath = conf.get(BAD_RECORD_PATH);
if (StringUtils.isEmpty(badRecordsPath)) {
badRecordsPath =
carbonTable.getTableInfo().getFactTable().getTableProperties().get("bad_records_path");
carbonTable.getTableInfo().getFactTable().getTableProperties().get("bad_record_path");
if (StringUtils.isEmpty(badRecordsPath)) {
badRecordsPath = carbonProperty
.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, carbonProperty
@@ -44,7 +44,7 @@ class StandardPartitionBadRecordLoggerTest extends QueryTest with BeforeAndAfter
test("test partition redirect") {
sql(
s"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp,
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORDS_PATH'='$warehouse')""")
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORD_PATH'='$warehouse')""")

val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS"
@@ -56,7 +56,7 @@ class StreamingOption(val userInputMap: Map[String, String]) {

lazy val badRecordsPath: String =
userInputMap
.getOrElse("bad_records_path", CarbonProperties.getInstance()
.getOrElse("bad_record_path", CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))

@@ -844,7 +844,7 @@ class TableNewProcessor(cm: TableModel) {
cm.tableName,
tableSchema.getTableId,
cm.databaseNameOp.getOrElse("default"))
tablePropertiesMap.put("bad_records_path", badRecordsPath)
tablePropertiesMap.put("bad_record_path", badRecordsPath)
tableSchema.setTableProperties(tablePropertiesMap)
if (cm.bucketFields.isDefined) {
val bucketCols = cm.bucketFields.get.bucketColumns.map { b =>
@@ -898,7 +898,7 @@
tableId: String,
databaseName: String): String = {
val badRecordsPath = tablePropertiesMap.asScala
.getOrElse("bad_records_path", CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)
.getOrElse("bad_record_path", CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)
if (badRecordsPath == null || badRecordsPath.isEmpty) {
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL
} else {
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command.table

import scala.collection.JavaConverters._

import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -217,6 +218,12 @@ private[sql] case class CarbonDescribeFormattedCommand(
results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns(
relation.carbonTable.getTableName).asScala
.map(column => column).mkString(","), ""))

val bad_record_path = relation.carbonTable.getTableInfo.getFactTable
.getTableProperties.get("bad_record_path")
if (!StringUtils.isEmpty(bad_record_path)) {
results ++= Seq(("BAD_RECORD_PATH", bad_record_path, ""))
}
// add columns configured in column meta cache
if (null != tblProps.get(CarbonCommonConstants.COLUMN_META_CACHE)) {
results ++=
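
With this addition, a configured path is expected to surface in the table description output; a hedged usage sketch (salestest matches the table created in the test below):

```
-- output is expected to include a row like: BAD_RECORD_PATH  /opt/badrecords  (path illustrative)
DESCRIBE FORMATTED salestest
```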
@@ -44,7 +44,7 @@ class BadRecordPathLoadOptionTest extends Spark2QueryTest with BeforeAndAfterAll
test("data load log file and csv file written at the configured location") {
sql(
s"""CREATE TABLE IF NOT EXISTS salestest(ID BigInt, date Timestamp, country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORDS_PATH'='$warehouse')""")
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORD_PATH'='$warehouse')""")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
@@ -64,7 +64,7 @@
def isFilesWrittenAtBadStoreLocation: Boolean = {
val badStorePath =
CarbonEnv.getCarbonTable(Some("default"), "salestest")(sqlContext.sparkSession).getTableInfo
.getFactTable.getTableProperties.get("bad_records_path") + "/0/0"
.getFactTable.getTableProperties.get("bad_record_path") + "/0/0"
val carbonFile: CarbonFile = FileFactory
.getCarbonFile(badStorePath, FileFactory.getFileType(badStorePath))
var exists: Boolean = carbonFile.exists()
@@ -1880,7 +1880,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
| 'interval'='1 seconds',
| 'BAD_RECORDS_LOGGER_ENABLE' = 'FALSE',
| 'BAD_RECORDS_ACTION' = 'FORCE',
| 'BAD_RECORDS_PATH'='$warehouse')
| 'BAD_RECORD_PATH'='$warehouse')
|AS
| SELECT *
| FROM source
@@ -1894,7 +1894,7 @@
| 'interval'='1 seconds',
| 'BAD_RECORDS_LOGGER_ENABLE' = 'FALSE',
| 'BAD_RECORDS_ACTION' = 'FORCE',
| 'BAD_RECORDS_PATH'='$warehouse')
| 'BAD_RECORD_PATH'='$warehouse')
|AS
| SELECT *
| FROM source
@@ -2554,7 +2554,7 @@
| )
| STORED BY 'carbondata'
| TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
| 'sort_columns'='name', 'dictionary_include'='city,register', 'BAD_RECORDS_PATH'='$badRecordFilePath')
| 'sort_columns'='name', 'dictionary_include'='city,register', 'BAD_RECORD_PATH'='$badRecordFilePath')
| """.stripMargin)

if (withBatchLoad) {
@@ -2583,7 +2583,7 @@
| )
| STORED BY 'carbondata'
| TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
| 'sort_columns'='name', 'dictionary_include'='id,name,salary,tax,percent,updated', 'BAD_RECORDS_PATH'='$badRecordFilePath')
| 'sort_columns'='name', 'dictionary_include'='id,name,salary,tax,percent,updated', 'BAD_RECORD_PATH'='$badRecordFilePath')
| """.stripMargin)

if (withBatchLoad) {
@@ -130,7 +130,7 @@ public static boolean hasBadRecord(CarbonLoadModel loadModel) {
public static String getBadRecordsPath(Map<String, String> loadOptions, CarbonTable table) {
String badRecordsFromLoad = loadOptions.get("bad_record_path");
String badRecordsFromCreate =
table.getTableInfo().getFactTable().getTableProperties().get("bad_records_path");
table.getTableInfo().getFactTable().getTableProperties().get("bad_record_path");
String badRecordsPath;
if (StringUtils.isNotEmpty(badRecordsFromLoad)) {
badRecordsPath =
