diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormatProperties.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormatProperties.java index 862c36c09e9..4372b447cbd 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormatProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormatProperties.java @@ -17,10 +17,27 @@ package org.apache.carbondata.core.statusmanager; +import java.util.HashSet; +import java.util.Set; + /** * Provides the constant name for the file format properties */ public class FileFormatProperties { + private static final Set SUPPORTED_EXTERNAL_FORMAT = new HashSet() { + { + add("csv"); + add("kafka"); + } + }; + + public static boolean isExternalFormatSupported(String format) { + return SUPPORTED_EXTERNAL_FORMAT.contains(format.toLowerCase()); + } + public static Set getSupportedExternalFormat() { + return SUPPORTED_EXTERNAL_FORMAT; + } + public static class CSV { public static final String HEADER = "csv.header"; public static final String DELIMITER = "csv.delimiter"; diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index 5fdc5229860..b6fc4b31ede 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -55,10 +55,10 @@ import org.apache.carbondata.core.stats.QueryStatistic; import org.apache.carbondata.core.stats.QueryStatisticsConstants; import org.apache.carbondata.core.stats.QueryStatisticsRecorder; -import org.apache.carbondata.core.util.BlockletDataMapUtil; import org.apache.carbondata.core.statusmanager.FileFormat; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.BlockletDataMapUtil; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.core.util.CarbonUtil; diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/externalformat/CsvBasedCarbonTableSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/externalformat/CsvBasedCarbonTableSuite.scala index 7f07878ccb3..85ccc108f85 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/externalformat/CsvBasedCarbonTableSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/externalformat/CsvBasedCarbonTableSuite.scala @@ -151,7 +151,7 @@ class CsvBasedCarbonTableSuite extends QueryTest ) } - assert(expectedException.getMessage.contains("Currently we only support csv as external file format")) + assert(expectedException.getMessage.contains("Unsupported external format parquet")) } test("test csv based carbon table: the sequence of header does not match schema") { diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index 56e91f9ab7a..2fdbba7b474 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ListBuffer +import org.apache.commons.lang3.StringUtils import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.TableIdentifier @@ -42,7 +43,7 @@ import org.apache.carbondata.core.metadata.schema._ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier, TableInfo, TableSchema} import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation} import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator -import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager} +import org.apache.carbondata.core.statusmanager.{FileFormatProperties, LoadMetadataDetails, SegmentUpdateStatusManager} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.processing.loading.model.CarbonLoadModel @@ -892,8 +893,11 @@ class TableNewProcessor(cm: TableModel) { tableInfo.setFactTable(tableSchema) val format = cm.tableProperties.get(CarbonCommonConstants.FORMAT) if (format.isDefined) { - if (!format.get.equalsIgnoreCase("csv")) { - CarbonException.analysisException(s"Currently we only support csv as external file format") + if (!FileFormatProperties.isExternalFormatSupported(format.get)) { + CarbonException.analysisException( + s"Unsupported external format ${format.get}, currently carbondata only support" + + s" ${FileFormatProperties.getSupportedExternalFormat.asScala.mkString(", ")}" + + s" as external file format") } tableInfo.setFormat(format.get) val formatProperties = cm.tableProperties.filter(pair =>