Skip to content

Commit

Permalink
Merge 98172df into 2fb7dc9
Browse files Browse the repository at this point in the history
  • Loading branch information
jackylk committed Sep 15, 2018
2 parents 2fb7dc9 + 98172df commit eb3fb99
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 10 deletions.
1 change: 1 addition & 0 deletions format/src/main/thrift/carbondata.thrift
Expand Up @@ -204,6 +204,7 @@ struct FileFooter3{
3: required list<BlockletIndex> blocklet_index_list; // Blocklet index of all blocklets in this file
4: optional list<BlockletInfo3> blocklet_info_list3; // Information about blocklets of all columns in this file for V3 format
5: optional dictionary.ColumnDictionaryChunk dictionary; // Blocklet local dictionary
6: optional bool is_sort; // True if the data is sorted in this file, it is used for compaction to decide whether to use merge sort or not
}

/**
Expand Down
Expand Up @@ -43,9 +43,9 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA
assertResult(2)(result.length)
assertResult("table_info1")(result(0).getString(0))
// 2087 is the size of carbon table. Note that since 1.5.0, we add additional compressor name in metadata
assertResult(2187)(result(0).getLong(1))
assertResult(2188)(result(0).getLong(1))
assertResult("table_info2")(result(1).getString(0))
assertResult(2187)(result(1).getLong(1))
assertResult(2188)(result(1).getLong(1))
}

override def afterAll: Unit = {
Expand Down
Expand Up @@ -365,7 +365,7 @@ public void closeHandler() throws CarbonDataWriterException {
}
consumerExecutorService.shutdownNow();
processWriteTaskSubmitList(consumerExecutorServiceTaskList);
this.dataWriter.writeFooterToFile();
this.dataWriter.writeFooter();
LOGGER.info("All blocklets have been finished writing");
// close all the open stream for both the files
this.dataWriter.closeWriter();
Expand Down
Expand Up @@ -229,7 +229,7 @@ protected void createNewFileIfReachThreshold(long blockletSizeToBeAdded)
LOGGER.info("Writing data to file as max file size reached for file: "
+ activeFile + ". Data block size: " + currentFileSize);
// write meta data to end of the existing file
writeBlockletInfoToFile();
writeFooterToFile();
this.currentFileSize = 0;
this.dataChunksOffsets = new ArrayList<>();
this.dataChunksLength = new ArrayList<>();
Expand Down Expand Up @@ -324,7 +324,7 @@ public void initializeWriter() throws CarbonDataWriterException {
/**
* This method will write the metadata footer at the end of the file in thrift format
*/
protected abstract void writeBlockletInfoToFile() throws CarbonDataWriterException;
protected abstract void writeFooterToFile() throws CarbonDataWriterException;

/**
* Below method will be used to fill the block info details
Expand Down
Expand Up @@ -35,7 +35,7 @@ public interface CarbonFactDataWriter {
*
* @throws CarbonDataWriterException
*/
void writeFooterToFile() throws CarbonDataWriterException;
void writeFooter() throws CarbonDataWriterException;

/**
* Below method will be used to initialise the writer
Expand Down
Expand Up @@ -45,6 +45,7 @@
import static org.apache.carbondata.core.constants.CarbonCommonConstants.TABLE_BLOCKLET_SIZE;
import static org.apache.carbondata.core.constants.CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB;
import static org.apache.carbondata.core.constants.CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB_DEFAULT_VALUE;
import static org.apache.carbondata.processing.loading.sort.SortScopeOptions.SortScope.NO_SORT;

/**
* Below class will be used to write the data in V3 format
Expand All @@ -68,6 +69,11 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
*/
private long blockletSizeThreshold;

/**
* True if this file is sorted
*/
private boolean isSorted;

public CarbonFactDataWriterImplV3(CarbonFactDataHandlerModel model) {
super(model);
String blockletSize =
Expand All @@ -83,17 +89,19 @@ public CarbonFactDataWriterImplV3(CarbonFactDataHandlerModel model) {
LOGGER.info("Blocklet size configure for table is: " + blockletSizeThreshold);
}
blockletDataHolder = new BlockletDataHolder(fallbackExecutorService, model);
isSorted = model.getSortScope() != NO_SORT;
}

@Override protected void writeBlockletInfoToFile()
throws CarbonDataWriterException {
@Override
protected void writeFooterToFile() throws CarbonDataWriterException {
try {
// get the current file position
long currentPosition = currentOffsetInFile;
// get thrift file footer instance
FileFooter3 convertFileMeta = CarbonMetadataUtil
.convertFileFooterVersion3(blockletMetadata, blockletIndex, localCardinality,
thriftColumnSchemaList.size());
convertFileMeta.setIs_sort(isSorted);
// fill the carbon index details
fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), carbonDataFileName, currentPosition);
// write the footer
Expand Down Expand Up @@ -376,9 +384,10 @@ public void closeWriter() throws CarbonDataWriterException {
}
}

@Override public void writeFooterToFile() throws CarbonDataWriterException {
@Override
public void writeFooter() throws CarbonDataWriterException {
if (this.blockletMetadata.size() > 0) {
writeBlockletInfoToFile();
writeFooterToFile();
}
}
}

0 comments on commit eb3fb99

Please sign in to comment.