From 594c42f8ba48df97cd46d598a33caf56faf53912 Mon Sep 17 00:00:00 2001
From: Jacky Li
Date: Wed, 15 Nov 2017 00:05:03 +0800
Subject: [PATCH] show file format for segment

fix testcase
---
 .../core/statusmanager/FileFormat.java            | 17 +++++++++++------
 .../core/statusmanager/LoadMetadataDetails.java   |  2 +-
 .../carbondata/hadoop/CarbonInputSplit.java       |  2 +-
 .../hadoop/CarbonMultiBlockSplit.java             |  2 +-
 .../hadoop/api/CarbonTableInputFormat.java        |  6 +++---
 .../streaming/CarbonStreamInputFormatTest.java    |  2 +-
 .../org/apache/carbondata/api/CarbonStore.scala   |  3 ++-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala    | 12 ++++++------
 .../carbondata/spark/rdd/CarbonScanRDD.scala      |  6 +++---
 .../carbondata/spark/util/CommonUtil.scala        |  4 ++--
 .../spark/sql/CarbonCatalystOperators.scala       |  1 +
 .../segmentreading/TestSegmentReading.scala       |  2 +-
 .../TestStreamingTableOperation.scala             |  7 ++++---
 .../streaming/segment/StreamSegment.java          |  6 +++---
 14 files changed, 40 insertions(+), 32 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
index 83a4813ee8d..c154c5f65dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
@@ -18,23 +18,28 @@
 package org.apache.carbondata.core.statusmanager;
 
 /**
- * the data file format which was supported
+ * The data file format supported in carbondata project
  */
 public enum FileFormat {
-  carbondata, rowformat;
+
+  // carbondata columnar file format, optimized for read
+  COLUMNAR_V3,
+
+  // carbondata row file format, optimized for write
+  ROW_V1;
 
   public static FileFormat getByOrdinal(int ordinal) {
     if (ordinal < 0 || ordinal >= FileFormat.values().length) {
-      return carbondata;
+      return COLUMNAR_V3;
     }
 
     switch (ordinal) {
       case 0:
-        return carbondata;
+        return COLUMNAR_V3;
       case 1:
-        return rowformat;
+        return ROW_V1;
     }
 
-    return carbondata;
+    return COLUMNAR_V3;
   }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index b282d53ffec..bb7fc9d0050 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -98,7 +98,7 @@ public void setIndexSize(String indexSize) {
   /**
    * the file format of this segment
    */
-  private FileFormat fileFormat = FileFormat.carbondata;
+  private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
 
   public String getPartitionCount() {
     return partitionCount;
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index f7b372f1e60..e89c2d618ad 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -82,7 +82,7 @@ public class CarbonInputSplit extends FileSplit
 
   private BlockletDetailInfo detailInfo;
 
-  private FileFormat fileFormat = FileFormat.carbondata;
+  private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
 
   public CarbonInputSplit() {
     segmentId = null;
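Note for reviewers - a minimal usage sketch (not part of the patch; the class
name is illustrative) showing how the renamed constants round-trip through
getByOrdinal, with the out-of-range fallback to the columnar default:

  import org.apache.carbondata.core.statusmanager.FileFormat;

  public class FileFormatOrdinalDemo {
    public static void main(String[] args) {
      // the ordinal is what a split writes during serialization (assumption)
      int wire = FileFormat.ROW_V1.ordinal();            // 1
      System.out.println(FileFormat.getByOrdinal(wire)); // ROW_V1
      // any out-of-range ordinal decodes to the columnar default
      System.out.println(FileFormat.getByOrdinal(99));   // COLUMNAR_V3
    }
  }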
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index d3fa2c23bf9..96fe909c5ed 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -45,7 +45,7 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
    */
   private String[] locations;
 
-  private FileFormat fileFormat = FileFormat.carbondata;
+  private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
 
   public CarbonMultiBlockSplit() {
     splitList = null;
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 552455a807e..8bf779e70ab 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -475,19 +475,19 @@ private List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdent
           int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
           splits.add(makeSplit(segmentId, path, length - bytesRemaining, splitSize,
               blkLocations[blkIndex].getHosts(),
-              blkLocations[blkIndex].getCachedHosts(), FileFormat.rowformat));
+              blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
           bytesRemaining -= splitSize;
         }
         if (bytesRemaining != 0) {
           int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
           splits.add(makeSplit(segmentId, path, length - bytesRemaining, bytesRemaining,
               blkLocations[blkIndex].getHosts(),
-              blkLocations[blkIndex].getCachedHosts(), FileFormat.rowformat));
+              blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
         }
       } else {
         //Create empty hosts array for zero length files
         splits.add(makeSplit(segmentId, path, 0, length, new String[0],
-            FileFormat.rowformat));
+            FileFormat.ROW_V1));
       }
     }
   } finally {
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
index 9970c50a3f3..4f81518246d 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
@@ -73,7 +73,7 @@ private InputSplit buildInputSplit() throws IOException {
     List<CarbonInputSplit> splitList = new ArrayList<>();
     splitList.add(carbonInputSplit);
     return new CarbonMultiBlockSplit(identifier, splitList, new String[] { "localhost" },
-        FileFormat.rowformat);
+        FileFormat.ROW_V1);
   }
 
   @Test public void testCreateRecordReader() {
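Note for reviewers - the while-loop guard of getSplitsOfStreaming sits outside
the hunk above, so the 1.1 slop factor below is an assumption borrowed from
Hadoop's FileInputFormat; the rest mirrors the visible arithmetic. Every split
produced on this path is tagged FileFormat.ROW_V1:

  // self-contained sketch of the streaming split arithmetic (illustrative)
  public class StreamSplitMath {
    public static void main(String[] args) {
      long length = 300L * 1024 * 1024;    // example file size
      long splitSize = 128L * 1024 * 1024; // example block size
      long bytesRemaining = length;
      int splits = 0;
      while (((double) bytesRemaining) / splitSize > 1.1) { // assumed guard
        bytesRemaining -= splitSize;
        splits++; // full-size split, tagged FileFormat.ROW_V1
      }
      if (bytesRemaining != 0) {
        splits++; // tail split carries the leftover bytes
      }
      System.out.println(splits); // prints 3 for a 300MB file, 128MB blocks
    }
  }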
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index a2c9c6d706d..6c2490e2512 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -29,7 +29,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -90,6 +90,7 @@ object CarbonStore {
           load.getSegmentStatus.getMessage,
           startTime,
           endTime,
+          load.getFileFormat.toString,
           mergedTo)
       }.toSeq
     } else {
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 2a7ca475e6c..aaeedb48aab 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -306,10 +306,10 @@ class CarbonMergerRDD[K, V](
       val splits = format.getSplits(job)
 
       // keep on assigning till last one is reached.
-      if (null != splits && splits.size > 0) {
-        splitsOfLastSegment = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
-          .filter { split => FileFormat.carbondata.equals(split.getFileFormat) }.toList.asJava
-      }
+      if (null != splits && splits.size > 0) splitsOfLastSegment =
+        splits.asScala
+          .map(_.asInstanceOf[CarbonInputSplit])
+          .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
 
       carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter(entry => {
         val blockInfo = new TableBlockInfo(entry.getPath.toString,
@@ -317,10 +317,10 @@ class CarbonMergerRDD[K, V](
           entry.getLocations, entry.getLength, entry.getVersion,
           updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString)
         )
-        (((!updated) || ((updated) && (!CarbonUtil
+        ((!updated) || (updated && (!CarbonUtil
           .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
             updateDetails, updateStatusManager)))) &&
-          FileFormat.carbondata.equals(entry.getFileFormat))
+          FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
       })
     }
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index a84b04059c4..b24562c3a2b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -94,7 +94,7 @@ class CarbonScanRDD(
       val streamSplits = new ArrayBuffer[InputSplit]()
       splits.asScala.foreach { split =>
         val carbonInputSplit = split.asInstanceOf[CarbonInputSplit]
-        if (FileFormat.rowformat == carbonInputSplit.getFileFormat) {
+        if (FileFormat.ROW_V1 == carbonInputSplit.getFileFormat) {
           streamSplits += split
         } else {
           columnarSplits.add(split)
@@ -111,7 +111,7 @@ class CarbonScanRDD(
           new CarbonMultiBlockSplit(identifier,
             Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
             splitWithIndex._1.getLocations,
-            FileFormat.rowformat)
+            FileFormat.ROW_V1)
         new CarbonSparkPartition(id, splitWithIndex._2 + index, multiBlockSplit)
       }
       if (batchPartitions.isEmpty) {
@@ -250,7 +250,7 @@ class CarbonScanRDD(
     val model = format.getQueryModel(inputSplit, attemptContext)
     // get RecordReader by FileFormat
     val reader: RecordReader[Void, Object] = inputSplit.getFileFormat match {
-      case FileFormat.rowformat =>
+      case FileFormat.ROW_V1 =>
         // create record reader for row format
         DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
         val inputFormat = new CarbonStreamInputFormat
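Note for reviewers - a hedged Java sketch (hypothetical helper, not in the
patch) of the dispatch the CarbonScanRDD hunks above implement: ROW_V1 splits
are routed to the streaming record reader, everything else takes the columnar
batch path:

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.carbondata.core.statusmanager.FileFormat;
  import org.apache.carbondata.hadoop.CarbonInputSplit;

  class SplitBuckets {
    final List<CarbonInputSplit> streamSplits = new ArrayList<>();
    final List<CarbonInputSplit> columnarSplits = new ArrayList<>();

    void add(CarbonInputSplit split) {
      if (FileFormat.ROW_V1 == split.getFileFormat()) {
        streamSplits.add(split);   // read by CarbonStreamInputFormat
      } else {
        columnarSplits.add(split); // batch/columnar read path
      }
    }
  }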
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index f0b33f48492..a922a0749c6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -859,14 +859,14 @@ object CommonUtil {
           CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
           CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
         new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(tablePath,
-          carbonTable.getDatabaseName, carbonTable.getFactTableName).getTablePath,
+          carbonTable.getDatabaseName, carbonTable.getTableName).getTablePath,
           segmentIds).collect()
       }
     } catch {
       case _: Exception =>
         if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
           new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(tablePath,
-            carbonTable.getDatabaseName, carbonTable.getFactTableName).getTablePath,
+            carbonTable.getDatabaseName, carbonTable.getTableName).getTablePath,
             segmentIds).collect()
         }
     }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 62632df4dd0..f8a54048b0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -123,6 +123,7 @@ case class ShowLoadsCommand(
       AttributeReference("Status", StringType, nullable = false)(),
       AttributeReference("Load Start Time", TimestampType, nullable = false)(),
       AttributeReference("Load End Time", TimestampType, nullable = true)(),
+      AttributeReference("File Format", StringType, nullable = false)(),
       AttributeReference("Merged To", StringType, nullable = false)())
   }
 }
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
index ac3fa5c5977..b23ba2ca116 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
@@ -232,7 +232,7 @@ class TestSegmentReading extends QueryTest with BeforeAndAfterAll {
           |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
       val df = sql("SHOW SEGMENTS for table carbon_table_show_seg")
       val col = df.collect().map{
-        row => Row(row.getString(0),row.getString(1),row.getString(4))
+        row => Row(row.getString(0),row.getString(1),row.getString(5))
       }.toSeq
       assert(col.equals(Seq(Row("2","Success","NA"),
         Row("1","Compacted","0.1"),
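Note for reviewers - the test change from getString(4) to getString(5) follows
directly from the widened ShowLoadsCommand schema: "File Format" is inserted
at index 4, pushing "Merged To" to index 5 (index 0 is the segment id, defined
above the hunk's context). A sketch of reading the new column, assuming a
carbon-enabled SparkSession and an illustrative table name:

  import java.util.List;
  import org.apache.spark.sql.Row;
  import org.apache.spark.sql.SparkSession;

  public class ShowSegmentsDemo {
    public static void main(String[] args) {
      SparkSession spark = SparkSession.builder().getOrCreate();
      List<Row> rows = spark
          .sql("SHOW SEGMENTS FOR TABLE streaming.stream_table_compact")
          .collectAsList();
      for (Row row : rows) {
        String fileFormat = row.getString(4); // "COLUMNAR_V3" or "ROW_V1"
        String mergedTo = row.getString(5);   // shifted from index 4
        System.out.println(row.getString(0) + " " + fileFormat + " " + mergedTo);
      }
    }
  }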
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index b29cca4364d..33aa2c986dd 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -32,8 +32,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.types.StructType
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.statusmanager.SegmentStatus
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -561,11 +560,13 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     }
 
     sql("alter table streaming.stream_table_compact compact 'minor'")
+    sql("show segments for table streaming.stream_table_compact").show
 
     val result = sql("show segments for table streaming.stream_table_compact").collect()
     result.foreach { row =>
       if (row.getString(0).equals("1")) {
         assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1))
+        assertResult(FileFormat.ROW_V1.toString)(row.getString(4))
       }
     }
   }
@@ -583,7 +584,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       badRecordAction = "force",
       handoffSize = 1024L * 200
     )
-    assert(sql("show segments for table streaming.stream_table_new").count() == 4)
+    assert(sql("show segments for table streaming.stream_table_new").count() > 1)
 
     checkAnswer(
       sql("select count(*) from streaming.stream_table_new"),
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 76824375cef..01875978fe2 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -73,7 +73,7 @@ public static String open(CarbonTable table) throws IOException {
         SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
     LoadMetadataDetails streamSegment = null;
     for (LoadMetadataDetails detail : details) {
-      if (FileFormat.rowformat == detail.getFileFormat()) {
+      if (FileFormat.ROW_V1 == detail.getFileFormat()) {
         if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
           streamSegment = detail;
           break;
@@ -85,7 +85,7 @@ public static String open(CarbonTable table) throws IOException {
       LoadMetadataDetails newDetail = new LoadMetadataDetails();
       newDetail.setPartitionCount("0");
       newDetail.setLoadName("" + segmentId);
-      newDetail.setFileFormat(FileFormat.rowformat);
+      newDetail.setFileFormat(FileFormat.ROW_V1);
       newDetail.setLoadStartTime(System.currentTimeMillis());
       newDetail.setSegmentStatus(SegmentStatus.STREAMING);
@@ -149,7 +149,7 @@ public static String close(CarbonTable table, String segmentId)
       LoadMetadataDetails newDetail = new LoadMetadataDetails();
       newDetail.setPartitionCount("0");
       newDetail.setLoadName("" + newSegmentId);
-      newDetail.setFileFormat(FileFormat.rowformat);
+      newDetail.setFileFormat(FileFormat.ROW_V1);
       newDetail.setLoadStartTime(System.currentTimeMillis());
       newDetail.setSegmentStatus(SegmentStatus.STREAMING);
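Note for reviewers - a hedged sketch (hypothetical helper, not in the patch)
of the lookup the StreamSegment.open() hunk performs: the active streaming
segment is the ROW_V1 entry whose status is still STREAMING:

  import org.apache.carbondata.core.statusmanager.FileFormat;
  import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
  import org.apache.carbondata.core.statusmanager.SegmentStatus;

  class StreamSegmentLookup {
    static LoadMetadataDetails findStreamingSegment(LoadMetadataDetails[] details) {
      for (LoadMetadataDetails detail : details) {
        if (FileFormat.ROW_V1 == detail.getFileFormat()
            && SegmentStatus.STREAMING == detail.getSegmentStatus()) {
          return detail;
        }
      }
      return null; // open() then creates a fresh ROW_V1 segment
    }
  }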