added configuration to SDK reader/writer
kunal642 committed Sep 10, 2018
1 parent 9ebab57 commit ead37b8
Showing 80 changed files with 566 additions and 302 deletions.
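The change that repeats across these files: instead of silently falling back to the static default from FileFactory.getConfiguration(), file-reading code now accepts an explicit Hadoop Configuration through constructors and method parameters. A minimal caller-side sketch of the pattern, based on the two-argument SegmentStatusManager constructor used below; the property key/value are placeholders and carbonTable is assumed to be in scope:

    // Configuration is org.apache.hadoop.conf.Configuration
    Configuration conf = new Configuration();
    conf.set("some.hadoop.property", "value"); // placeholder: any job-specific setting, e.g. filesystem credentials
    SegmentStatusManager ssm =
        new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(), conf);
    SegmentStatusManager.ValidAndInvalidSegmentsInfo info = ssm.getValidAndInvalidSegments();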
@@ -24,6 +24,7 @@
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -99,7 +100,7 @@ public static void executeDataMapJobForClearingDataMaps(CarbonTable carbonTable)
}
String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
getValidAndInvalidSegments(carbonTable);
getValidAndInvalidSegments(carbonTable, FileFactory.getConfiguration());
List<Segment> validSegments = validAndInvalidSegmentsInfo.getValidSegments();
List<Segment> invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments();
DataMapExprWrapper dataMapExprWrapper = null;
@@ -140,7 +141,7 @@ public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
List<PartitionSpec> partitionsToPrune) throws IOException {
String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
getValidAndInvalidSegments(carbonTable);
getValidAndInvalidSegments(carbonTable, validSegments.get(0).getConfiguration());
List<Segment> invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments();
DistributableDataMapFormat dataMapFormat =
createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, invalidSegments,
@@ -152,8 +153,9 @@ public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
}

private static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
CarbonTable carbonTable) throws IOException {
SegmentStatusManager ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
CarbonTable carbonTable, Configuration configuration) throws IOException {
SegmentStatusManager ssm =
new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(), configuration);
return ssm.getValidAndInvalidSegments();
}

11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -32,6 +32,8 @@
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;

import org.apache.hadoop.conf.Configuration;

/**
* Represents one load of carbondata
*/
@@ -64,6 +66,11 @@ public Segment(String segmentNo) {
this.segmentNo = segmentNo;
}

public Segment(String segmentNo, ReadCommittedScope readCommittedScope) {
this.segmentNo = segmentNo;
this.readCommittedScope = readCommittedScope;
}

/**
* ReadCommittedScope will be null. So getCommittedIndexFile will not work and will throw
* a NullPointerException. In case getCommittedIndexFile is need to be accessed then
@@ -202,6 +209,10 @@ public static Segment getSegment(String segmentNo, LoadMetadataDetails[] loadMet
return null;
}

public Configuration getConfiguration() {
return readCommittedScope.getConfiguration();
}

public Set<String> getFilteredIndexShardNames() {
return filteredIndexShardNames;
}
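With the constructor and getter added above, the Hadoop Configuration travels with each segment through its ReadCommittedScope; that is how the executeDataMapJob change earlier in this commit can call validSegments.get(0).getConfiguration(). A hedged sketch, assuming readCommittedScope is an already-constructed ReadCommittedScope:

    Segment segment = new Segment("2", readCommittedScope);
    Configuration conf = segment.getConfiguration(); // delegates to readCommittedScope.getConfiguration()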
@@ -37,7 +37,8 @@ public interface DataMap<T extends Blocklet> {
/**
* It is called to load the data map to memory or to initialize it.
*/
void init(DataMapModel dataMapModel) throws MemoryException, IOException;
void init(DataMapModel dataMapModel)
throws MemoryException, IOException;

/**
* Prune the datamap with resolved filter expression and partition information.
@@ -17,18 +17,28 @@

package org.apache.carbondata.core.datamap.dev;

import org.apache.hadoop.conf.Configuration;

/**
* Information required to build datamap
*/
public class DataMapModel {

private String filePath;

public DataMapModel(String filePath) {
private Configuration configuration;

public DataMapModel(String filePath, Configuration configuration) {
this.filePath = filePath;
this.configuration = configuration;
}

public String getFilePath() {
return filePath;
}

public Configuration getConfiguration() {
return configuration;
}

}
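A hedged sketch of how the extended model is intended to be used: whatever Configuration is handed to the constructor is the one DataMap.init() should use for its file I/O (BlockDataMap, further down in this commit, passes it to DataFileFooterConverter). The index file path here is a placeholder:

    DataMapModel model = new DataMapModel(
        "/store/db/t1/Fact/Part0/Segment_0/0.carbonindex", FileFactory.getConfiguration());
    Configuration conf = model.getConfiguration(); // available to any DataMap during init(model)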
@@ -282,7 +282,12 @@ public boolean delete() {
@Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
int bufferSize, Configuration hadoopConf) throws IOException {
return getDataInputStream(path, fileType, bufferSize,
CarbonUtil.inferCompressorFromFileName(path));
CarbonUtil.inferCompressorFromFileName(path), hadoopConf);
}

@Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
int bufferSize, String compressor) throws IOException {
return getDataInputStream(path, fileType, bufferSize, FileFactory.getConfiguration());
}

/**
@@ -305,12 +310,12 @@ public boolean delete() {
return new DataInputStream(new BufferedInputStream(stream));
}

@Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
int bufferSize, String compressor) throws IOException {
private DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
int bufferSize, String compressor, Configuration configuration) throws IOException {
path = path.replace("\\", "/");
Path pt = new Path(path);
InputStream inputStream;
FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
FileSystem fs = pt.getFileSystem(configuration);
if (bufferSize <= 0) {
inputStream = fs.open(pt);
} else {
@@ -509,7 +514,7 @@ public List<CarbonFile> listFiles(Boolean recursive) throws IOException {
RemoteIterator<LocatedFileStatus> listStatus = null;
if (null != fileStatus && fileStatus.isDirectory()) {
Path path = fileStatus.getPath();
listStatus = path.getFileSystem(FileFactory.getConfiguration()).listFiles(path, recursive);
listStatus = fs.listFiles(path, recursive);
} else {
return new ArrayList<CarbonFile>();
}
@@ -521,8 +526,7 @@ public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOExcep
if (null != fileStatus && fileStatus.isDirectory()) {
List<FileStatus> listStatus = new ArrayList<>();
Path path = fileStatus.getPath();
RemoteIterator<LocatedFileStatus> iter =
path.getFileSystem(FileFactory.getConfiguration()).listLocatedStatus(path);
RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
while (iter.hasNext()) {
LocatedFileStatus fileStatus = iter.next();
if (pathFilter.accept(fileStatus.getPath()) && fileStatus.getLen() > 0) {
@@ -111,6 +111,11 @@ public static DataInputStream getDataInputStream(String path, FileType fileType)
return getDataInputStream(path, fileType, -1);
}

public static DataInputStream getDataInputStream(String path, FileType fileType,
Configuration configuration) throws IOException {
return getDataInputStream(path, fileType, -1, configuration);
}

public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize)
throws IOException {
return getDataInputStream(path, fileType, bufferSize, getConfiguration());
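A hedged usage sketch of the new overload; the path is a placeholder and the FileType constant is assumed to match where the file actually lives:

    Configuration conf = new Configuration();
    DataInputStream in = FileFactory.getDataInputStream(
        "/store/db/t1/Fact/Part0/Segment_0/part-0-0.carbondata",
        FileFactory.FileType.HDFS, conf);
    // buffer size defaults to -1, and the FileSystem is resolved from conf rather than the static default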
@@ -40,6 +40,8 @@
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.BlockletDataMapUtil;

import org.apache.hadoop.conf.Configuration;

/**
* Class to handle loading, unloading,clearing,storing of the table
* blocks
@@ -87,7 +89,8 @@ private BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper i
List<BlockDataMap> dataMaps = new ArrayList<>();
if (blockletDataMapIndexWrapper == null) {
try {
SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
SegmentIndexFileStore indexFileStore =
new SegmentIndexFileStore(identifierWrapper.getConfiguration());
Set<String> filesRead = new HashSet<>();
String segmentFilePath = identifier.getIndexFilePath();
if (segInfoCache == null) {
@@ -97,7 +100,8 @@ private BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper i
segInfoCache.get(segmentFilePath);
if (carbonDataFileBlockMetaInfoMapping == null) {
carbonDataFileBlockMetaInfoMapping =
BlockletDataMapUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath);
BlockletDataMapUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath,
identifierWrapper.getConfiguration());
segInfoCache.put(segmentFilePath, carbonDataFileBlockMetaInfoMapping);
}
// if the identifier is not a merge file we can directly load the datamaps
@@ -107,10 +111,12 @@ private BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper i
carbonDataFileBlockMetaInfoMapping);
BlockDataMap blockletDataMap =
loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap,
identifierWrapper.getCarbonTable(), identifierWrapper.isAddTableBlockToUnsafe());
identifierWrapper.getCarbonTable(), identifierWrapper.isAddTableBlockToUnsafe(),
identifierWrapper.getConfiguration());
dataMaps.add(blockletDataMap);
blockletDataMapIndexWrapper =
new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps);
new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps,
identifierWrapper.getConfiguration());
} else {
// if the identifier is a merge file then collect the index files and load the datamaps
List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
@@ -125,12 +131,14 @@ private BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper i
BlockDataMap blockletDataMap =
loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
identifierWrapper.getCarbonTable(),
identifierWrapper.isAddTableBlockToUnsafe());
identifierWrapper.isAddTableBlockToUnsafe(),
identifierWrapper.getConfiguration());
dataMaps.add(blockletDataMap);
}
}
blockletDataMapIndexWrapper =
new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps);
new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps,
identifierWrapper.getConfiguration());
}
lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletDataMapIndexWrapper,
blockletDataMapIndexWrapper.getMemorySize());
@@ -265,7 +273,7 @@ public void put(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIden
*/
private BlockDataMap loadAndGetDataMap(TableBlockIndexUniqueIdentifier identifier,
SegmentIndexFileStore indexFileStore, Map<String, BlockMetaInfo> blockMetaInfoMap,
CarbonTable carbonTable, boolean addTableBlockToUnsafe)
CarbonTable carbonTable, boolean addTableBlockToUnsafe, Configuration configuration)
throws IOException, MemoryException {
String uniqueTableSegmentIdentifier =
identifier.getUniqueTableSegmentIdentifier();
Expand All @@ -279,7 +287,7 @@ private BlockDataMap loadAndGetDataMap(TableBlockIndexUniqueIdentifier identifie
dataMap.init(new BlockletDataMapModel(carbonTable,
identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
.getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe));
blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe, configuration));
}
return dataMap;
}
@@ -23,19 +23,27 @@
import org.apache.carbondata.core.cache.Cacheable;
import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;

import org.apache.hadoop.conf.Configuration;

/**
* A cacheable wrapper of datamaps
*/
public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {

private static final long serialVersionUID = -2859075086955465810L;

private List<BlockDataMap> dataMaps;

private String segmentId;

private transient Configuration configuration;

// size of the wrapper. basically the total size of the datamaps this wrapper is holding
private long wrapperSize;

public BlockletDataMapIndexWrapper(String segmentId,List<BlockDataMap> dataMaps) {
public BlockletDataMapIndexWrapper(String segmentId,List<BlockDataMap> dataMaps, Configuration
configuration) {
this.configuration = configuration;
this.dataMaps = dataMaps;
this.wrapperSize = 0L;
this.segmentId = segmentId;
@@ -64,4 +72,8 @@ public List<BlockDataMap> getDataMaps() {
public String getSegmentId() {
return segmentId;
}

public Configuration getConfiguration() {
return configuration;
}
}
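The wrapper now carries the Configuration next to the datamaps it caches, and the field is declared transient: Hadoop's Configuration is not java.io.Serializable, so keeping it transient keeps the cached wrapper serializable and simply means getConfiguration() returns null after deserialization. A hedged construction sketch, assuming segmentId, dataMaps and conf are in scope:

    BlockletDataMapIndexWrapper wrapper =
        new BlockletDataMapIndexWrapper(segmentId, dataMaps, conf);
    Configuration sameConf = wrapper.getConfiguration();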
@@ -21,6 +21,8 @@

import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

import org.apache.hadoop.conf.Configuration;

/**
* Class holds reference to TableBlockIndexUniqueIdentifier and carbonTable related info
* This is just a wrapper passed between methods like a context, This object must never be cached.
@@ -35,6 +37,8 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {

// holds the reference to CarbonTable
private CarbonTable carbonTable;

private transient Configuration configuration;
/**
* flag to specify whether to load table block metadata in unsafe or safe. Default value is true
*/
@@ -46,6 +50,14 @@ public TableBlockIndexUniqueIdentifierWrapper(
this.carbonTable = carbonTable;
}

public TableBlockIndexUniqueIdentifierWrapper(
TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable,
Configuration configuration) {
this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier;
this.carbonTable = carbonTable;
this.configuration = configuration;
}

// Note: The constructor is getting used in extensions with other functionalities.
// Kindly do not remove
public TableBlockIndexUniqueIdentifierWrapper(
@@ -67,4 +79,8 @@ public CarbonTable getCarbonTable() {
public boolean isAddTableBlockToUnsafe() {
return addTableBlockToUnsafe;
}

public Configuration getConfiguration() {
return configuration;
}
}
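As with the cacheable wrapper above, the configuration here is transient and is only meant to ride along while the wrapper is passed between methods as a context object. A hedged sketch of the caller side, mirroring the BlockletDataMapFactory change later in this commit where the segment's configuration is forwarded:

    TableBlockIndexUniqueIdentifierWrapper identifierWrapper =
        new TableBlockIndexUniqueIdentifierWrapper(
            tableBlockIndexUniqueIdentifier, carbonTable, segment.getConfiguration());
    Configuration conf = identifierWrapper.getConfiguration();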
@@ -103,11 +103,13 @@ public class BlockDataMap extends CoarseGrainDataMap
*/
protected boolean isFilePathStored;

@Override public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
@Override public void init(DataMapModel dataMapModel)
throws IOException, MemoryException {
long startTime = System.currentTimeMillis();
assert (dataMapModel instanceof BlockletDataMapModel);
BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) dataMapModel;
DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
DataFileFooterConverter fileFooterConverter =
new DataFileFooterConverter(dataMapModel.getConfiguration());
List<DataFileFooter> indexInfo = fileFooterConverter
.getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData(),
blockletDataMapInfo.getCarbonTable().isTransactionalTable());
@@ -137,7 +137,7 @@ public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments
for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
tableBlockIndexUniqueIdentifierWrappers.add(
new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
this.getCarbonTable()));
this.getCarbonTable(), segment.getConfiguration()));
}
}
List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
@@ -22,6 +22,8 @@
import org.apache.carbondata.core.indexstore.BlockMetaInfo;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

import org.apache.hadoop.conf.Configuration;

/**
* It is the model object to keep the information to build or initialize BlockletDataMap.
*/
@@ -37,9 +39,9 @@ public class BlockletDataMapModel extends DataMapModel {

private boolean addToUnsafe = true;

public BlockletDataMapModel(CarbonTable carbonTable, String filePath,
byte[] fileData, Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId) {
super(filePath);
public BlockletDataMapModel(CarbonTable carbonTable, String filePath, byte[] fileData,
Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId, Configuration configuration) {
super(filePath, configuration);
this.fileData = fileData;
this.blockMetaInfoMap = blockMetaInfoMap;
this.segmentId = segmentId;
@@ -48,8 +50,8 @@ public BlockletDataMapModel(CarbonTable carbonTable, String filePath,

public BlockletDataMapModel(CarbonTable carbonTable, String filePath,
byte[] fileData, Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId,
boolean addToUnsafe) {
this(carbonTable, filePath, fileData, blockMetaInfoMap, segmentId);
boolean addToUnsafe, Configuration configuration) {
this(carbonTable, filePath, fileData, blockMetaInfoMap, segmentId, configuration);
this.addToUnsafe = addToUnsafe;
}

