Skip to content

Commit

Permalink
review comments handled
Browse files Browse the repository at this point in the history
  • Loading branch information
akashrn5 committed Oct 22, 2018
1 parent edd3c57 commit b4a15d6
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 24 deletions.
Expand Up @@ -1845,6 +1845,16 @@ public final class CarbonCommonConstants {
public static final int CARBON_MINMAX_ALLOWED_BYTE_COUNT_MIN = 10;
public static final int CARBON_MINMAX_ALLOWED_BYTE_COUNT_MAX = 1000;

/**
* "written by" detail to be recorded in the carbondata footer for better maintainability
*/
public static final String CARBON_WRITTEN_BY_FOOTER_INFO = "written_by";

/**
* carbon version detail to be recorded in the carbondata footer for better maintainability
*/
public static final String CARBON_VERSION_FOOTER_INFO = "version";

// Private constructor: this class is a constants holder only and must never be instantiated.
private CarbonCommonConstants() {
}
}
3 changes: 1 addition & 2 deletions format/src/main/thrift/carbondata.thrift
Expand Up @@ -206,8 +206,7 @@ struct FileFooter3{
4: optional list<BlockletInfo3> blocklet_info_list3; // Information about blocklets of all columns in this file for V3 format
5: optional dictionary.ColumnDictionaryChunk dictionary; // Blocklet local dictionary
6: optional bool is_sort; // True if the data is sorted in this file, it is used for compaction to decide whether to use merge sort or not
7: optional string written_by; // written by is used to write who wrote the file, it can be LOAD, or SDK etc
8: optional string version; // version in which this carbondata file is written
7: optional map<string, string> extra_info; // extra_info records details such as who wrote the file (it can be an application name, SDK etc.) and the carbondata version in which this file is written
}

/**
Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonVersionConstants;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory;
Expand Down Expand Up @@ -174,6 +175,8 @@ public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String fac
loadModel.setSegmentId("0");
loadModel.setFactTimeStamp(System.currentTimeMillis());
loadModel.setMaxColumns("10");
loadModel.setWrittenBy("StoreCreator7");
loadModel.setVersion(CarbonVersionConstants.CARBONDATA_VERSION);
return loadModel;
}

Expand Down
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.sql.util.CarbonException

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonVersionConstants}
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.block._
import org.apache.carbondata.core.datastore.impl.FileFactory
Expand Down Expand Up @@ -71,6 +71,8 @@ class CarbonMergerRDD[K, V](
ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL")
ss.sparkContext.setLocalProperty("spark.job.interruptOnCancel", "true")

var writtenBy = ss.sparkContext.getConf.get("spark.app.name")

private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
var storeLocation: String = null
var mergeResult: String = null
Expand All @@ -93,6 +95,8 @@ class CarbonMergerRDD[K, V](
} else {
carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
}
carbonLoadModel.setWrittenBy(writtenBy)
carbonLoadModel.setVersion(CarbonVersionConstants.CARBONDATA_VERSION)
val partitionSpec = if (carbonTable.isHivePartitionTable) {
carbonSparkPartition.partitionSpec.get
} else {
Expand Down
Expand Up @@ -44,7 +44,7 @@ import org.apache.spark.util.{SerializableConfiguration, TaskCompletionListener}
import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonVersionConstants}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.BlockletDetailInfo
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnarFormatVersion}
Expand Down Expand Up @@ -123,6 +123,8 @@ class SparkCarbonFileFormat extends FileFormat
val conf = job.getConfiguration

val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
model.setWrittenBy(sparkSession.sparkContext.getConf.get("spark.app.name"))
model.setVersion(CarbonVersionConstants.CARBONDATA_VERSION)
model.setLoadWithoutConverterStep(true)
CarbonTableOutputFormat.setLoadModel(conf, model)

Expand Down
Expand Up @@ -50,7 +50,7 @@ import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants, CarbonVersionConstants}
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
Expand Down Expand Up @@ -213,6 +213,7 @@ case class CarbonLoadDataCommand(
CompressorFactory.getInstance().getCompressor.getName)
carbonLoadModel.setColumnCompressor(columnCompressor)
carbonLoadModel.setWrittenBy(sparkSession.sparkContext.getConf.get("spark.app.name"))
carbonLoadModel.setVersion(CarbonVersionConstants.CARBONDATA_VERSION)

val javaPartition = mutable.Map[String, String]()
partition.foreach { case (k, v) =>
Expand Down
Expand Up @@ -127,7 +127,7 @@ public class CarbonDataLoadConfiguration {
*/
private String columnCompressor;

private String appName;
private String writtenBy;

private String version;

Expand Down Expand Up @@ -465,12 +465,12 @@ public void setColumnCompressor(String columnCompressor) {
this.columnCompressor = columnCompressor;
}

public String getAppName() {
return appName;
public String getWrittenBy() {
return writtenBy;
}

public void setAppName(String appName) {
this.appName = appName;
public void setWrittenBy(String writtenBy) {
this.writtenBy = writtenBy;
}

public String getVersion() {
Expand Down
Expand Up @@ -316,7 +316,7 @@ public static CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel lo
configuration.setWritingCoresCount(loadModel.getSdkWriterCores());
}
configuration.setColumnCompressor(loadModel.getColumnCompressor());
configuration.setAppName(loadModel.getAppName());
configuration.setWrittenBy(loadModel.getWrittenBy());
configuration.setVersion(CarbonVersionConstants.CARBONDATA_VERSION);
return configuration;
}
Expand Down
Expand Up @@ -73,7 +73,9 @@ public class CarbonLoadModel implements Serializable {

private String blocksID;

private String appName;
private String writtenBy;

private String version;

/**
* Map from carbon dimension to pre defined dict file path
Expand Down Expand Up @@ -481,7 +483,8 @@ public CarbonLoadModel getCopyWithTaskNo(String taskNo) {
copy.parentTablePath = parentTablePath;
copy.sdkWriterCores = sdkWriterCores;
copy.columnCompressor = columnCompressor;
copy.appName = appName;
copy.writtenBy = writtenBy;
copy.version = version;
return copy;
}

Expand Down Expand Up @@ -539,7 +542,8 @@ public CarbonLoadModel getCopyWithPartition(String header, String delimiter) {
copyObj.parentTablePath = parentTablePath;
copyObj.sdkWriterCores = sdkWriterCores;
copyObj.columnCompressor = columnCompressor;
copyObj.appName = appName;
copyObj.writtenBy = writtenBy;
copyObj.version = version;
return copyObj;
}

Expand Down Expand Up @@ -941,12 +945,19 @@ public void setColumnCompressor(String columnCompressor) {
this.columnCompressor = columnCompressor;
}

public String getAppName() {
return appName;
public String getWrittenBy() {
return writtenBy;
}

/**
 * Sets the "written by" detail for this load (e.g. the application or SDK name that is
 * producing the carbondata file), later recorded in the file footer.
 *
 * @param writtenBy identifier of the writer of the file
 */
public void setWrittenBy(String writtenBy) {
this.writtenBy = writtenBy;
}

public void setAppName(String appName) {
this.appName = appName;
/**
 * Returns the carbondata version detail set on this load model (see {@link #setVersion}),
 * or null if it was never set.
 *
 * @return the version string, possibly null
 */
public String getVersion() {
return version;
}

/**
 * Sets the carbondata version detail for this load, later recorded in the file footer.
 *
 * @param version the carbondata version string
 */
public void setVersion(String version) {
this.version = version;
}
}
Expand Up @@ -187,7 +187,7 @@ public void setBlockSizeInMB(int blockSize) {

private String columnCompressor;

private String appName;
private String writtenBy;

private String version;

Expand Down Expand Up @@ -317,7 +317,7 @@ public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel(
}
carbonFactDataHandlerModel.dataMapWriterlistener = listener;
carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
carbonFactDataHandlerModel.appName = configuration.getAppName();
carbonFactDataHandlerModel.writtenBy = configuration.getWrittenBy();
carbonFactDataHandlerModel.version = configuration.getVersion();
setNumberOfCores(carbonFactDataHandlerModel);
carbonFactDataHandlerModel.setVarcharDimIdxInNoDict(varcharDimIdxInNoDict);
Expand Down Expand Up @@ -393,6 +393,8 @@ public static CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoa
carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
carbonFactDataHandlerModel.setColumnCompressor(loadModel.getColumnCompressor());
carbonFactDataHandlerModel.writtenBy = loadModel.getWrittenBy();
carbonFactDataHandlerModel.version = loadModel.getVersion();

carbonFactDataHandlerModel.tableSpec = new TableSpec(carbonTable);
DataMapWriterListener listener = new DataMapWriterListener();
Expand Down Expand Up @@ -745,8 +747,8 @@ public void setNoDictAndComplexColumns(CarbonColumn[] noDictAndComplexColumns) {
this.noDictAndComplexColumns = noDictAndComplexColumns;
}

public String getAppName() {
return appName;
public String getWrittenBy() {
return writtenBy;
}

public String getVersion() {
Expand Down
Expand Up @@ -102,8 +102,10 @@ protected void writeFooterToFile() throws CarbonDataWriterException {
.convertFileFooterVersion3(blockletMetadata, blockletIndex, localCardinality,
thriftColumnSchemaList.size());
convertFileMeta.setIs_sort(isSorted);
convertFileMeta.setWritten_by(model.getAppName());
convertFileMeta.setVersion(model.getVersion());
convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO,
model.getWrittenBy());
convertFileMeta
.putToExtra_info(CarbonCommonConstants.CARBON_VERSION_FOOTER_INFO, model.getVersion());
// fill the carbon index details
fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), carbonDataFileName, currentPosition);
// write the footer
Expand Down

0 comments on commit b4a15d6

Please sign in to comment.