[CARBONDATA-2557] [CARBONDATA-2472] [CARBONDATA-2570] Improve CarbonReader performance on S3 and fix datamap clear issue in reader

[CARBONDATA-2557] [CARBONDATA-2472] Problem: CarbonReaderBuilder.build() is slow on S3; it takes around 8 seconds to finish.
Solution: S3 is slow for the listFiles, open, fileExists, and getCarbonFile operations, so list every call to those APIs in the reader flow and remove the redundant checks.
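The pattern behind most of these savings, sketched here with plain java.io rather than Carbon's actual FileFactory code (names like allExistSlow/allExistFast are illustrative only): on an S3-backed store every exists() or list() is a remote round trip, so listing once and answering checks from memory beats probing per file.

import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class RedundantCheckSketch {

  // Before: one remote existence probe per index file.
  static boolean allExistSlow(File dir, String[] indexFiles) {
    for (String name : indexFiles) {
      if (!new File(dir, name).exists()) { // one remote round trip per file on S3-like stores
        return false;
      }
    }
    return true;
  }

  // After: list the directory once, then answer every check in memory.
  static boolean allExistFast(File dir, String[] indexFiles) {
    Set<String> present = new HashSet<>();
    String[] listed = dir.list(); // a single remote round trip in total
    if (listed != null) {
      present.addAll(Arrays.asList(listed));
    }
    for (String name : indexFiles) {
      if (!present.contains(name)) {
        return false;
      }
    }
    return true;
  }
}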

[CARBONDATA-2570] Problem: With the Carbon SDK reader, a second reader instance fails in cluster tests.
Solution: The first reader's blocklet datamaps are not cleared properly in the cluster, so the API used to clear them needs to change.

So change
DataMapStoreManager.getInstance().getDefaultDataMap(queryModel.getTable()).clear();
to
DataMapStoreManager.getInstance().clearDataMaps(queryModel.getTable().getAbsoluteTableIdentifier());
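With that, the reader's close path can release the table's datamaps by AbsoluteTableIdentifier. A minimal sketch, assuming a reader that holds a QueryModel; the surrounding method name is illustrative, not the exact CarbonReader code:

void closeReader(QueryModel queryModel) throws IOException {
  // Clearing by AbsoluteTableIdentifier drops this table's blocklet datamaps
  // from the store itself, so a second reader on the same table starts clean
  // instead of reusing stale entries left behind by the first reader.
  DataMapStoreManager.getInstance()
      .clearDataMaps(queryModel.getTable().getAbsoluteTableIdentifier());
}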

This closes #2345
ajantha-bhat authored and ravipesala committed Jun 5, 2018
1 parent 2f23486 commit 5f68a79
Showing 22 changed files with 375 additions and 298 deletions.
@@ -22,7 +22,7 @@
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
-import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper;
 import org.apache.carbondata.core.memory.MemoryException;
 
 /**
@@ -33,10 +33,10 @@ public interface CacheableDataMap {
   /**
    * Add the blockletDataMapIndexWrapper to cache for key tableBlockIndexUniqueIdentifier
    *
-   * @param tableBlockIndexUniqueIdentifier
+   * @param tableBlockIndexUniqueIdentifierWrapper
    * @param blockletDataMapIndexWrapper
    */
-  void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+  void cache(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIdentifierWrapper,
       BlockletDataMapIndexWrapper blockletDataMapIndexWrapper) throws IOException, MemoryException;
 
   /**
@@ -91,7 +91,7 @@ public SegmentTaskIndexWrapper get(TableSegmentUniqueIdentifier tableSegmentUniq
         segmentTaskIndexWrapper =
             loadAndGetTaskIdToSegmentsMap(
                 tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos(),
-                CarbonTable.buildFromTablePath("name", "path", false),
+                CarbonTable.buildDummyTable("path"),
                 tableSegmentUniqueIdentifier);
       } catch (IndexBuilderException e) {
         throw new IOException(e.getMessage(), e);
@@ -41,7 +41,7 @@
  * blocks
  */
 public class BlockletDataMapIndexStore
-    implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> {
+    implements Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletDataMapIndexWrapper> {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDataMapIndexStore.class.getName());
   /**
@@ -68,8 +68,10 @@ public BlockletDataMapIndexStore(CarbonLRUCache lruCache) {
   }
 
   @Override
-  public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifier identifier)
+  public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper)
       throws IOException {
+    TableBlockIndexUniqueIdentifier identifier =
+        identifierWrapper.getTableBlockIndexUniqueIdentifier();
     String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
     BlockletDataMapIndexWrapper blockletDataMapIndexWrapper =
         (BlockletDataMapIndexWrapper) lruCache.get(lruCacheKey);
@@ -84,7 +86,7 @@ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifie
       // if the identifier is not a merge file we can directly load the datamaps
       if (identifier.getMergeIndexFileName() == null) {
         Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
-            .getBlockMetaInfoMap(identifier, indexFileStore, filesRead,
+            .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead,
                 carbonDataFileBlockMetaInfoMapping);
         BlockletDataMap blockletDataMap =
             loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
@@ -96,9 +98,10 @@ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifie
           BlockletDataMapUtil.getIndexFileIdentifiersFromMergeFile(identifier, indexFileStore);
       for (TableBlockIndexUniqueIdentifier blockIndexUniqueIdentifier :
           tableBlockIndexUniqueIdentifiers) {
-        Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
-            .getBlockMetaInfoMap(blockIndexUniqueIdentifier, indexFileStore, filesRead,
-                carbonDataFileBlockMetaInfoMapping);
+        Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil.getBlockMetaInfoMap(
+            new TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier,
+                identifierWrapper.getCarbonTable()), indexFileStore, filesRead,
+            carbonDataFileBlockMetaInfoMapping);
         BlockletDataMap blockletDataMap =
             loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap);
         dataMaps.add(blockletDataMap);
@@ -119,26 +122,28 @@ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifie
     return blockletDataMapIndexWrapper;
   }
 
-  @Override
-  public List<BlockletDataMapIndexWrapper> getAll(
-      List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
+  @Override public List<BlockletDataMapIndexWrapper> getAll(
+      List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiers)
+      throws IOException {
     List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
         new ArrayList<>(tableSegmentUniqueIdentifiers.size());
-    List<TableBlockIndexUniqueIdentifier> missedIdentifiers = new ArrayList<>();
+    List<TableBlockIndexUniqueIdentifierWrapper> missedIdentifiersWrapper = new ArrayList<>();
     BlockletDataMapIndexWrapper blockletDataMapIndexWrapper = null;
     // Get the datamaps for each indexfile from cache.
     try {
-      for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
-        BlockletDataMapIndexWrapper dataMapIndexWrapper = getIfPresent(identifier);
+      for (TableBlockIndexUniqueIdentifierWrapper
+               identifierWrapper : tableSegmentUniqueIdentifiers) {
+        BlockletDataMapIndexWrapper dataMapIndexWrapper =
+            getIfPresent(identifierWrapper);
         if (dataMapIndexWrapper != null) {
           blockletDataMapIndexWrappers.add(dataMapIndexWrapper);
         } else {
-          missedIdentifiers.add(identifier);
+          missedIdentifiersWrapper.add(identifierWrapper);
        }
       }
-      if (missedIdentifiers.size() > 0) {
-        for (TableBlockIndexUniqueIdentifier identifier : missedIdentifiers) {
-          blockletDataMapIndexWrapper = get(identifier);
+      if (missedIdentifiersWrapper.size() > 0) {
+        for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : missedIdentifiersWrapper) {
+          blockletDataMapIndexWrapper = get(identifierWrapper);
           blockletDataMapIndexWrappers.add(blockletDataMapIndexWrapper);
         }
       }
@@ -151,37 +156,40 @@ public List<BlockletDataMapIndexWrapper> getAll(
       }
       throw new IOException("Problem in loading segment blocks.", e);
     }
+
     return blockletDataMapIndexWrappers;
   }
 
   /**
    * returns the SegmentTaskIndexWrapper
    *
-   * @param tableSegmentUniqueIdentifier
+   * @param tableSegmentUniqueIdentifierWrapper
    * @return
    */
-  @Override
-  public BlockletDataMapIndexWrapper getIfPresent(
-      TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
+  @Override public BlockletDataMapIndexWrapper getIfPresent(
+      TableBlockIndexUniqueIdentifierWrapper tableSegmentUniqueIdentifierWrapper) {
     return (BlockletDataMapIndexWrapper) lruCache.get(
-        tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+        tableSegmentUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
+            .getUniqueTableSegmentIdentifier());
   }
 
   /**
    * method invalidate the segment cache for segment
    *
-   * @param tableSegmentUniqueIdentifier
+   * @param tableSegmentUniqueIdentifierWrapper
    */
-  @Override
-  public void invalidate(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
-    lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+  @Override public void invalidate(
+      TableBlockIndexUniqueIdentifierWrapper tableSegmentUniqueIdentifierWrapper) {
+    lruCache.remove(tableSegmentUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
+        .getUniqueTableSegmentIdentifier());
   }
 
   @Override
-  public void put(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+  public void put(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIdentifierWrapper,
       BlockletDataMapIndexWrapper wrapper) throws IOException, MemoryException {
     String uniqueTableSegmentIdentifier =
-        tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier();
+        tableBlockIndexUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
+            .getUniqueTableSegmentIdentifier();
     Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
     if (lock == null) {
       lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
@@ -190,16 +198,16 @@ public void put(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
     // as in that case clearing unsafe memory need to be taken care of. If at all datamap entry
     // in the cache need to be overwritten then use the invalidate interface
     // and then use the put interface
-    if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
+    if (null == getIfPresent(tableBlockIndexUniqueIdentifierWrapper)) {
       synchronized (lock) {
-        if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
+        if (null == getIfPresent(tableBlockIndexUniqueIdentifierWrapper)) {
           List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
           try {
             for (BlockletDataMap blockletDataMap: dataMaps) {
              blockletDataMap.convertToUnsafeDMStore();
             }
-            lruCache.put(tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(), wrapper,
-                wrapper.getMemorySize());
+            lruCache.put(tableBlockIndexUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
+                .getUniqueTableSegmentIdentifier(), wrapper, wrapper.getMemorySize());
           } catch (Throwable e) {
             // clear all the memory acquired by data map in case of any failure
             for (DataMap blockletDataMap : dataMaps) {
@@ -264,14 +272,14 @@ private synchronized Object addAndGetSegmentLock(String uniqueIdentifier) {
   /**
    * The method clears the access count of table segments
    *
-   * @param tableSegmentUniqueIdentifiers
+   * @param tableSegmentUniqueIdentifiersWrapper
    */
-  @Override
-  public void clearAccessCount(
-      List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) {
-    for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
-      BlockletDataMap cacheable =
-          (BlockletDataMap) lruCache.get(identifier.getUniqueTableSegmentIdentifier());
+  @Override public void clearAccessCount(
+      List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiersWrapper) {
+    for (TableBlockIndexUniqueIdentifierWrapper
+             identifierWrapper : tableSegmentUniqueIdentifiersWrapper) {
+      BlockletDataMap cacheable = (BlockletDataMap) lruCache.get(
+          identifierWrapper.getTableBlockIndexUniqueIdentifier().getUniqueTableSegmentIdentifier());
       cacheable.clear();
     }
   }
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.core.indexstore;

import java.io.Serializable;

import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

/**
 * Holds a reference to a TableBlockIndexUniqueIdentifier and the related CarbonTable.
 * This is just a wrapper passed between methods, like a context; this object must never be cached.
*
*/
public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {

private static final long serialVersionUID = 1L;

// holds the reference to tableBlockIndexUniqueIdentifier
private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;

// holds the reference to CarbonTable
private CarbonTable carbonTable;

public TableBlockIndexUniqueIdentifierWrapper(
TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable) {
this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier;
this.carbonTable = carbonTable;
}

public TableBlockIndexUniqueIdentifier getTableBlockIndexUniqueIdentifier() {
return tableBlockIndexUniqueIdentifier;
}

public CarbonTable getCarbonTable() {
return carbonTable;
}
}
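For reference, a condensed usage sketch matching the call sites in the diff above (variable names are illustrative): the wrapper pairs an identifier with its CarbonTable so downstream lookups can reach table metadata without re-reading it from the (slow) file system.

// cache is the Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletDataMapIndexWrapper>
// implemented by BlockletDataMapIndexStore above.
TableBlockIndexUniqueIdentifierWrapper identifierWrapper =
    new TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier, carbonTable);
BlockletDataMapIndexWrapper dataMaps = cache.get(identifierWrapper);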
