[CARBONDATA-2909] Multi user support for SDK on S3
Added support for multiple users with different secret/access keys (SK/AK) writing concurrently to S3.
Made it mandatory for users to supply a Hadoop configuration when creating an SDK writer or reader.
Passed the Hadoop configuration down to the core layer so that FileFactory can access it.
Fixed various SK/AK-not-found exceptions in CarbonSparkFileFormat.

This closes #2678
kunal642 authored and ravipesala committed Sep 11, 2018
1 parent 7c827c0 commit 8f1a029
Showing 81 changed files with 616 additions and 349 deletions.
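
For context, a minimal sketch of the workflow this commit enables. It is illustrative rather than part of the change: the builder call follows the 1.5.x-era SDK signature buildWriterForCSVInput(schema, conf), which may differ in other versions, and the bucket, credentials, and schema below are placeholders.

import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;

public class S3MultiUserWriteSketch {
  public static void main(String[] args) throws Exception {
    // Each user builds a private Configuration carrying their own SK/AK, so
    // two writers with different credentials can run concurrently in one JVM
    // instead of sharing a single static, last-writer-wins configuration.
    Configuration userConf = new Configuration();
    userConf.set("fs.s3a.access.key", "<this-user-access-key>");  // placeholder
    userConf.set("fs.s3a.secret.key", "<this-user-secret-key>");  // placeholder

    Schema schema = new Schema(new Field[] {
        new Field("name", "string"),
        new Field("age", "int")
    });

    // After this change the Hadoop configuration is a required argument
    // when the writer is built.
    CarbonWriter writer = CarbonWriter.builder()
        .outputPath("s3a://example-bucket/carbon/table1")  // placeholder path
        .buildWriterForCSVInput(schema, userConf);

    writer.write(new String[] { "alice", "29" });
    writer.close();
  }
}

Because the credentials travel inside the per-call Configuration rather than a JVM-wide static, a second writer built with a different Configuration in the same process no longer clobbers the first.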
DataMapUtil.java
@@ -24,6 +24,7 @@
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -99,7 +100,7 @@ public static void executeDataMapJobForClearingDataMaps(CarbonTable carbonTable)
     }
     String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
     SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
-        getValidAndInvalidSegments(carbonTable);
+        getValidAndInvalidSegments(carbonTable, FileFactory.getConfiguration());
     List<Segment> validSegments = validAndInvalidSegmentsInfo.getValidSegments();
     List<Segment> invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments();
     DataMapExprWrapper dataMapExprWrapper = null;
@@ -140,7 +141,7 @@ public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
       List<PartitionSpec> partitionsToPrune) throws IOException {
     String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
     SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
-        getValidAndInvalidSegments(carbonTable);
+        getValidAndInvalidSegments(carbonTable, validSegments.get(0).getConfiguration());
     List<Segment> invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments();
     DistributableDataMapFormat dataMapFormat =
         createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, invalidSegments,
@@ -152,8 +153,9 @@ public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
   }
 
   private static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
-      CarbonTable carbonTable) throws IOException {
-    SegmentStatusManager ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
+      CarbonTable carbonTable, Configuration configuration) throws IOException {
+    SegmentStatusManager ssm =
+        new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(), configuration);
     return ssm.getValidAndInvalidSegments();
   }
 
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -32,6 +32,8 @@
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Represents one load of carbondata
  */
@@ -64,6 +66,11 @@ public Segment(String segmentNo) {
     this.segmentNo = segmentNo;
   }
 
+  public Segment(String segmentNo, ReadCommittedScope readCommittedScope) {
+    this.segmentNo = segmentNo;
+    this.readCommittedScope = readCommittedScope;
+  }
+
   /**
    * ReadCommittedScope will be null. So getCommittedIndexFile will not work and will throw
    * a NullPointerException. In case getCommittedIndexFile is need to be accessed then
@@ -202,6 +209,10 @@ public static Segment getSegment(String segmentNo, LoadMetadataDetails[] loadMetadataDetails)
     return null;
   }
 
+  public Configuration getConfiguration() {
+    return readCommittedScope.getConfiguration();
+  }
+
   public Set<String> getFilteredIndexShardNames() {
     return filteredIndexShardNames;
   }
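
Why the new constructor and getter matter, as a hedged sketch (the helper below is illustrative, not from this commit): a Segment built with a ReadCommittedScope can hand the scope's per-user Configuration to file-system calls, instead of reaching for the process-wide default.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.carbondata.core.datamap.Segment;

final class SegmentFileSystemSketch {
  // Resolve the segment's file system with the configuration carried by its
  // ReadCommittedScope; note that getConfiguration() throws a
  // NullPointerException if the segment was created without a scope.
  static FileSystem fileSystemFor(Segment segment, String segmentPath) throws IOException {
    Configuration conf = segment.getConfiguration();
    return new Path(segmentPath).getFileSystem(conf);
  }
}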
DataMap.java
@@ -37,7 +37,8 @@ public interface DataMap<T extends Blocklet> {
   /**
    * It is called to load the data map to memory or to initialize it.
    */
-  void init(DataMapModel dataMapModel) throws MemoryException, IOException;
+  void init(DataMapModel dataMapModel)
+      throws MemoryException, IOException;
 
   /**
    * Prune the datamap with resolved filter expression and partition information.
DataMapModel.java
@@ -17,18 +17,28 @@
 
 package org.apache.carbondata.core.datamap.dev;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Information required to build datamap
  */
 public class DataMapModel {
 
   private String filePath;
 
-  public DataMapModel(String filePath) {
+  private Configuration configuration;
+
+  public DataMapModel(String filePath, Configuration configuration) {
     this.filePath = filePath;
+    this.configuration = configuration;
   }
 
   public String getFilePath() {
     return filePath;
   }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
 }
AbstractDFSCarbonFile.java
@@ -282,7 +282,12 @@ public boolean delete() {
   @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
       int bufferSize, Configuration hadoopConf) throws IOException {
     return getDataInputStream(path, fileType, bufferSize,
-        CarbonUtil.inferCompressorFromFileName(path));
+        CarbonUtil.inferCompressorFromFileName(path), hadoopConf);
   }
 
+  @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, String compressor) throws IOException {
+    return getDataInputStream(path, fileType, bufferSize, FileFactory.getConfiguration());
+  }
+
   /**
@@ -305,12 +310,12 @@ public boolean delete() {
     return new DataInputStream(new BufferedInputStream(stream));
   }
 
-  @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
-      int bufferSize, String compressor) throws IOException {
+  private DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, String compressor, Configuration configuration) throws IOException {
     path = path.replace("\\", "/");
     Path pt = new Path(path);
    InputStream inputStream;
-    FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+    FileSystem fs = pt.getFileSystem(configuration);
     if (bufferSize <= 0) {
       inputStream = fs.open(pt);
     } else {
@@ -509,7 +514,7 @@ public List<CarbonFile> listFiles(Boolean recursive) throws IOException {
     RemoteIterator<LocatedFileStatus> listStatus = null;
     if (null != fileStatus && fileStatus.isDirectory()) {
       Path path = fileStatus.getPath();
-      listStatus = path.getFileSystem(FileFactory.getConfiguration()).listFiles(path, recursive);
+      listStatus = fs.listFiles(path, recursive);
     } else {
       return new ArrayList<CarbonFile>();
     }
@@ -521,8 +526,7 @@ public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException {
     if (null != fileStatus && fileStatus.isDirectory()) {
       List<FileStatus> listStatus = new ArrayList<>();
       Path path = fileStatus.getPath();
-      RemoteIterator<LocatedFileStatus> iter =
-          path.getFileSystem(FileFactory.getConfiguration()).listLocatedStatus(path);
+      RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
       while (iter.hasNext()) {
         LocatedFileStatus fileStatus = iter.next();
         if (pathFilter.accept(fileStatus.getPath()) && fileStatus.getLen() > 0) {
FileFactory.java
@@ -111,6 +111,11 @@ public static DataInputStream getDataInputStream(String path, FileType fileType)
     return getDataInputStream(path, fileType, -1);
   }
 
+  public static DataInputStream getDataInputStream(String path, FileType fileType,
+      Configuration configuration) throws IOException {
+    return getDataInputStream(path, fileType, -1, configuration);
+  }
+
   public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize)
       throws IOException {
     return getDataInputStream(path, fileType, bufferSize, getConfiguration());
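
A short sketch of the new overload in use (the helper is illustrative): callers thread their own Configuration through explicitly, while the older overloads keep falling back to FileFactory.getConfiguration().

import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.core.datastore.impl.FileFactory;

final class FileFactoryConfSketch {
  // Open a stream with an explicit, per-user Configuration; the credentials
  // in userConf, not the JVM-wide defaults, are used to reach the store.
  static DataInputStream open(String path, Configuration userConf) throws IOException {
    return FileFactory.getDataInputStream(path, FileFactory.getFileType(path), userConf);
  }
}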
BlockletDataMapIndexStore.java
@@ -40,6 +40,8 @@
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.BlockletDataMapUtil;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Class to handle loading, unloading,clearing,storing of the table
  * blocks
@@ -87,7 +89,8 @@ private BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
     List<BlockDataMap> dataMaps = new ArrayList<>();
     if (blockletDataMapIndexWrapper == null) {
       try {
-        SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+        SegmentIndexFileStore indexFileStore =
+            new SegmentIndexFileStore(identifierWrapper.getConfiguration());
         Set<String> filesRead = new HashSet<>();
         String segmentFilePath = identifier.getIndexFilePath();
         if (segInfoCache == null) {
@@ -97,7 +100,8 @@ private BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
             segInfoCache.get(segmentFilePath);
         if (carbonDataFileBlockMetaInfoMapping == null) {
           carbonDataFileBlockMetaInfoMapping =
-              BlockletDataMapUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath);
+              BlockletDataMapUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath,
+                  identifierWrapper.getConfiguration());
           segInfoCache.put(segmentFilePath, carbonDataFileBlockMetaInfoMapping);
         }
         // if the identifier is not a merge file we can directly load the datamaps
@@ -107,10 +111,12 @@ private BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
               carbonDataFileBlockMetaInfoMapping);
           BlockDataMap blockletDataMap =
               loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap,
-                  identifierWrapper.getCarbonTable(), identifierWrapper.isAddTableBlockToUnsafe());
+                  identifierWrapper.getCarbonTable(), identifierWrapper.isAddTableBlockToUnsafe(),
+                  identifierWrapper.getConfiguration());
           dataMaps.add(blockletDataMap);
           blockletDataMapIndexWrapper =
-              new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps);
+              new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps,
+                  identifierWrapper.getConfiguration());
         } else {
           // if the identifier is a merge file then collect the index files and load the datamaps
           List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
@@ -125,12 +131,14 @@ private BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
             BlockDataMap blockletDataMap =
                 loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
                     identifierWrapper.getCarbonTable(),
-                    identifierWrapper.isAddTableBlockToUnsafe());
+                    identifierWrapper.isAddTableBlockToUnsafe(),
+                    identifierWrapper.getConfiguration());
             dataMaps.add(blockletDataMap);
           }
         }
         blockletDataMapIndexWrapper =
-            new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps);
+            new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps,
+                identifierWrapper.getConfiguration());
       }
       lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletDataMapIndexWrapper,
           blockletDataMapIndexWrapper.getMemorySize());
@@ -265,7 +273,7 @@ public void put(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIdentifierWrapper,
    */
  private BlockDataMap loadAndGetDataMap(TableBlockIndexUniqueIdentifier identifier,
      SegmentIndexFileStore indexFileStore, Map<String, BlockMetaInfo> blockMetaInfoMap,
-      CarbonTable carbonTable, boolean addTableBlockToUnsafe)
+      CarbonTable carbonTable, boolean addTableBlockToUnsafe, Configuration configuration)
      throws IOException, MemoryException {
    String uniqueTableSegmentIdentifier =
        identifier.getUniqueTableSegmentIdentifier();
@@ -279,7 +287,7 @@ private BlockDataMap loadAndGetDataMap(TableBlockIndexUniqueIdentifier identifier,
      dataMap.init(new BlockletDataMapModel(carbonTable,
          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
              .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
-          blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe));
+          blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe, configuration));
    }
    return dataMap;
  }
BlockletDataMapIndexWrapper.java
@@ -24,19 +24,27 @@
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * A cacheable wrapper of datamaps
  */
 public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {
 
+  private static final long serialVersionUID = -2859075086955465810L;
+
   private List<BlockDataMap> dataMaps;
 
   private String segmentId;
 
+  private transient Configuration configuration;
+
   // size of the wrapper. basically the total size of the datamaps this wrapper is holding
   private long wrapperSize;
 
-  public BlockletDataMapIndexWrapper(String segmentId,List<BlockDataMap> dataMaps) {
+  public BlockletDataMapIndexWrapper(String segmentId,List<BlockDataMap> dataMaps, Configuration
+      configuration) {
+    this.configuration = configuration;
     this.dataMaps = dataMaps;
     this.wrapperSize = 0L;
     this.segmentId = segmentId;
@@ -72,4 +80,8 @@ public List<BlockDataMap> getDataMaps() {
   public String getSegmentId() {
     return segmentId;
   }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
 }
TableBlockIndexUniqueIdentifierWrapper.java
@@ -19,8 +19,11 @@
 
 import java.io.Serializable;
 
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Class holds reference to TableBlockIndexUniqueIdentifier and carbonTable related info
  * This is just a wrapper passed between methods like a context, This object must never be cached.
@@ -35,6 +38,8 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {
 
   // holds the reference to CarbonTable
   private CarbonTable carbonTable;
+
+  private transient Configuration configuration;
   /**
    * flag to specify whether to load table block metadata in unsafe or safe. Default value is true
    */
@@ -44,6 +49,15 @@ public TableBlockIndexUniqueIdentifierWrapper(
       TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable) {
     this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier;
     this.carbonTable = carbonTable;
+    this.configuration = FileFactory.getConfiguration();
   }
 
+  public TableBlockIndexUniqueIdentifierWrapper(
+      TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable,
+      Configuration configuration) {
+    this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier;
+    this.carbonTable = carbonTable;
+    this.configuration = configuration;
+  }
+
   // Note: The constructor is getting used in extensions with other functionalities.
@@ -53,6 +67,7 @@ public TableBlockIndexUniqueIdentifierWrapper(
       boolean addTableBlockToUnsafe) {
     this(tableBlockIndexUniqueIdentifier, carbonTable);
     this.addTableBlockToUnsafe = addTableBlockToUnsafe;
+    this.configuration = FileFactory.getConfiguration();
   }
 
 
@@ -67,4 +82,8 @@ public CarbonTable getCarbonTable() {
   public boolean isAddTableBlockToUnsafe() {
     return addTableBlockToUnsafe;
   }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
 }
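
A hedged sketch of choosing between the two constructors (the helper is illustrative): query paths that already hold a per-user Configuration should pass it through explicitly, since the two-argument form silently falls back to the static default.

import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

final class WrapperConfSketch {
  // Prefer the three-argument constructor so the wrapper carries the caller's
  // credentials; the two-argument form uses FileFactory.getConfiguration().
  static TableBlockIndexUniqueIdentifierWrapper wrap(TableBlockIndexUniqueIdentifier identifier,
      CarbonTable table, Configuration userConf) {
    return new TableBlockIndexUniqueIdentifierWrapper(identifier, table, userConf);
  }
}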
BlockDataMap.java
@@ -103,11 +103,13 @@ public class BlockDataMap extends CoarseGrainDataMap
    */
   protected boolean isFilePathStored;
 
-  @Override public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
+  @Override public void init(DataMapModel dataMapModel)
+      throws IOException, MemoryException {
     long startTime = System.currentTimeMillis();
     assert (dataMapModel instanceof BlockletDataMapModel);
     BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) dataMapModel;
-    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    DataFileFooterConverter fileFooterConverter =
+        new DataFileFooterConverter(dataMapModel.getConfiguration());
     List<DataFileFooter> indexInfo = fileFooterConverter
         .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData(),
             blockletDataMapInfo.getCarbonTable().isTransactionalTable());
BlockletDataMapFactory.java
@@ -137,7 +137,7 @@ public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments)
       for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
         tableBlockIndexUniqueIdentifierWrappers.add(
             new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
-                this.getCarbonTable()));
+                this.getCarbonTable(), segment.getConfiguration()));
       }
     }
     List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =