[CARBONDATA-3337][CARBONDATA-3306] Distributed index server
Implement a Distributed Index Server that caches and prunes the datamaps using executors.

Implemented the Hadoop RPC framework for communication with the index server.
Implemented DistributedPruneRDD, which prunes and caches the datamaps.
Implemented DistributedShowCacheRDD.

This closes #3177
kunal642 authored and ravipesala committed May 20, 2019
1 parent 789c97e commit 24fe230
Showing 58 changed files with 1,917 additions and 481 deletions.
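
The commit message above mentions a Hadoop RPC framework for talking to the index server. As rough orientation only, here is a minimal, self-contained sketch of the Hadoop RPC server/client pattern; the protocol name PruneProtocol, its method, and the bind address/port are invented for illustration and are not the actual org.apache.carbondata.indexserver API.

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.VersionedProtocol;

// Hypothetical protocol; the real interface lives in the indexserver module and is not shown here.
interface PruneProtocol extends VersionedProtocol {
  long versionID = 1L;
  String prune(String tableUniqueName, String filterExpression) throws IOException;
}

public class PruneRpcSketch implements PruneProtocol {

  @Override
  public String prune(String tableUniqueName, String filterExpression) {
    // A real server would prune the cached datamaps on its executors and return blocklets.
    return "pruned blocklets for " + tableUniqueName + " with filter " + filterExpression;
  }

  @Override
  public long getProtocolVersion(String protocol, long clientVersion) {
    return versionID;
  }

  @Override
  public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
      int clientMethodsHash) throws IOException {
    return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Server side: bind the protocol implementation on a host/port (placeholders here).
    RPC.Server server = new RPC.Builder(conf)
        .setProtocol(PruneProtocol.class)
        .setInstance(new PruneRpcSketch())
        .setBindAddress("127.0.0.1")
        .setPort(10020)
        .setNumHandlers(4)
        .build();
    server.start();

    // Client side: obtain a proxy and invoke the remote call as if it were local.
    PruneProtocol proxy = RPC.getProxy(PruneProtocol.class, PruneProtocol.versionID,
        new InetSocketAddress("127.0.0.1", 10020), conf);
    System.out.println(proxy.prune("default_t1", "colA = 5"));
    RPC.stopProxy(proxy);
    server.stop();
  }
}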
56 changes: 56 additions & 0 deletions bin/start-indexserver.sh
@@ -0,0 +1,56 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# Shell script for starting the Distributed Index Server

# Enter posix mode for bash
set -o posix

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
CLASS="org.apache.carbondata.indexserver.IndexServer"

function usage {
  echo "Usage: ./bin/start-indexserver.sh [options] [index server options]"
  pattern="usage"
  pattern+="\|Spark assembly has been built with Hive"
  pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
  pattern+="\|Spark Command: "
  pattern+="\|======="
  pattern+="\|--help"

  "${SPARK_HOME}"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
  echo
  echo "Index server options:"
  "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  usage
  exit 0
fi

export SUBMIT_USAGE_FUNCTION=usage

exec "${SPARK_HOME}"/sbin/spark-daemon.sh submit $CLASS 1 --name "DistributedIndexServer" "$@"
26 changes: 26 additions & 0 deletions bin/stop-indexserver.sh
@@ -0,0 +1,26 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Stops the Distributed Index Server on the machine this script is executed on.

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

"${SPARK_HOME}/sbin"/spark-daemon.sh stop org.apache.carbondata.indexserver.IndexServer 1
@@ -217,6 +217,7 @@ public boolean put(String columnIdentifier, Cacheable cacheInfo, long requiredSi
    } else {
      synchronized (lruCacheMap) {
        addEntryToLRUCacheMap(columnIdentifier, cacheInfo);
        currentSize = currentSize + requiredSize;
      }
      columnKeyAddedSuccessfully = true;
    }
@@ -358,4 +359,11 @@ private double getPartOfXmx() {
    long mSizeMB = Runtime.getRuntime().maxMemory() / BYTE_CONVERSION_CONSTANT;
    return mSizeMB * CarbonCommonConstants.CARBON_LRU_CACHE_PERCENT_OVER_MAX_SIZE;
  }

  /**
   * @return current size of the cache in memory.
   */
  public long getCurrentSize() {
    return currentSize;
  }
}
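
The two hunks above add size accounting to the LRU cache: the running total is bumped inside the same synchronized block that inserts the entry, and a getter exposes it, presumably for the cache-size reporting paths this commit adds. A toy sketch of that pattern (not the CarbonData class itself; types are simplified):

import java.util.LinkedHashMap;
import java.util.Map;

final class SizedLruCacheSketch<K, V> {
  // Access-ordered LinkedHashMap gives LRU iteration order.
  private final Map<K, V> lruCacheMap = new LinkedHashMap<>(16, 0.75f, true);
  private long currentSize;

  public void put(K key, V value, long requiredSize) {
    synchronized (lruCacheMap) {
      lruCacheMap.put(key, value);
      // Updated under the same lock as the map so the total never drifts from the contents.
      currentSize = currentSize + requiredSize;
    }
  }

  /** @return current size of the cache in memory (mirrors the new getter above). */
  public long getCurrentSize() {
    return currentSize;
  }
}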
@@ -2129,4 +2129,38 @@ private CarbonCommonConstants() {
   */
  public static final String CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL = "512";

  /**
   * The IP on which the Index Server will be started.
   */
  @CarbonProperty
  public static final String CARBON_INDEX_SERVER_IP = "carbon.index.server.ip";

  /**
   * The port to be used to start the Index Server.
   */
  @CarbonProperty
  public static final String CARBON_INDEX_SERVER_PORT = "carbon.index.server.port";

  /**
   * Whether to use the index server for caching and pruning or not.
   * This property can be set for:
   * 1. the whole application (carbon.properties)
   * 2. the whole session (set carbon.enable.index.server)
   * 3. a specific table for one session (set carbon.enable.index.server.<dbName>.<tableName>)
   */
  @CarbonProperty(dynamicConfigurable = true)
  public static final String CARBON_ENABLE_INDEX_SERVER = "carbon.enable.index.server";

  /**
   * Property used to enable/disable fallback for the index server.
   * Used for testing purposes only.
   */
  public static final String CARBON_DISABLE_INDEX_SERVER_FALLBACK =
      "carbon.disable.index.server.fallback";

  public static final String CARBON_INDEX_SERVER_WORKER_THREADS =
      "carbon.index.server.max.worker.threads";

  public static final int CARBON_INDEX_SERVER_WORKER_THREADS_DEFAULT = 500;
}
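
A hedged sketch of wiring the new keys from application code, assuming the usual CarbonProperties entry point in org.apache.carbondata.core.util; the host and port values are placeholders, and per the javadoc above the same keys can instead go into carbon.properties or be set per session or per table.

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

public final class IndexServerConfigSketch {
  public static void main(String[] args) {
    CarbonProperties props = CarbonProperties.getInstance();
    // Where the index server is reachable (placeholder host/port).
    props.addProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_IP, "192.168.1.10");
    props.addProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_PORT, "10020");
    // Route pruning/caching through the index server for the whole application;
    // a per-table override would use the carbon.enable.index.server.<dbName>.<tableName> form.
    props.addProperty(CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER, "true");
  }
}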
@@ -120,7 +120,7 @@ public DataMapExprWrapper chooseCGDataMap(FilterResolverIntf resolverIntf) {
    return chooseDataMap(DataMapLevel.CG, resolverIntf);
  }

  private DataMapExprWrapper chooseDataMap(DataMapLevel level, FilterResolverIntf resolverIntf) {
  DataMapExprWrapper chooseDataMap(DataMapLevel level, FilterResolverIntf resolverIntf) {
    if (resolverIntf != null) {
      Expression expression = resolverIntf.getFilterExpression();
      List<TableDataMap> datamaps = level == DataMapLevel.CG ? cgDataMaps : fgDataMaps;
@@ -22,7 +22,6 @@
import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

@@ -34,7 +33,6 @@ public interface DataMapJob extends Serializable {

  void execute(CarbonTable carbonTable, FileInputFormat<Void, BlockletDataMapIndexWrapper> format);

  List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
      FilterResolverIntf filter);
  List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat);

}
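
With this change the filter resolver travels inside DistributableDataMapFormat rather than as a second argument, so a prune request is a single call on whichever DataMapJob is in use (distributed via the index server, or embedded). A minimal, hypothetical caller sketch; the package names are assumed from the surrounding imports and are not confirmed by the hunk:

import java.util.List;

import org.apache.carbondata.core.datamap.DataMapJob;
import org.apache.carbondata.core.datamap.DistributableDataMapFormat;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;

public final class DataMapJobCallerSketch {
  // Hypothetical helper: the format is expected to carry the table, segments and filter already.
  static List<ExtendedBlocklet> prune(DataMapJob job, DistributableDataMapFormat format) {
    return job.execute(format);
  }
}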
@@ -454,10 +454,11 @@ private TableDataMap getTableDataMap(String dataMapName, List<TableDataMap> tabl

  /**
   * Clear the invalid segments from all the datamaps of the table
   * @param carbonTable
   * @param segments
   *
   * @param carbonTable table for which the operation has to be performed.
   * @param segments segments which have to be cleared from cache.
   */
  public void clearInvalidSegments(CarbonTable carbonTable, List<Segment> segments)
  public void clearInvalidSegments(CarbonTable carbonTable, List<String> segments)
      throws IOException {
    getDefaultDataMap(carbonTable).clear(segments);
    List<TableDataMap> allDataMap = getAllDataMap(carbonTable);
@@ -467,6 +468,30 @@ public void clearInvalidSegments(CarbonTable carbonTable, List<Segment> segments

  }

  public List<String> getSegmentsToBeRefreshed(CarbonTable carbonTable,
      SegmentUpdateStatusManager updateStatusManager, List<Segment> filteredSegmentToAccess)
      throws IOException {
    List<String> toBeCleanedSegments = new ArrayList<>();
    for (Segment filteredSegment : filteredSegmentToAccess) {
      boolean refreshNeeded = getTableSegmentRefresher(carbonTable).isRefreshNeeded(filteredSegment,
          updateStatusManager.getInvalidTimestampRange(filteredSegment.getSegmentNo()));
      if (refreshNeeded) {
        toBeCleanedSegments.add(filteredSegment.getSegmentNo());
      }
    }
    return toBeCleanedSegments;
  }

  public void refreshSegmentCacheIfRequired(CarbonTable carbonTable,
      SegmentUpdateStatusManager updateStatusManager, List<Segment> filteredSegmentToAccess)
      throws IOException {
    List<String> toBeCleanedSegments =
        getSegmentsToBeRefreshed(carbonTable, updateStatusManager, filteredSegmentToAccess);
    if (toBeCleanedSegments.size() > 0) {
      clearInvalidSegments(carbonTable, toBeCleanedSegments);
    }
  }

  /**
   * Clear the datamap/datamaps of a table from memory
   *
@@ -483,29 +508,44 @@ public void clearDataMaps(AbsoluteTableIdentifier identifier) {
   */
  public void clearDataMaps(AbsoluteTableIdentifier identifier, boolean launchJob) {
    String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
    List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
    if (tableIndices == null) {
      String keyUsingTablePath = getKeyUsingTablePath(identifier.getTablePath());
      if (keyUsingTablePath != null) {
        tableUniqueName = keyUsingTablePath;
        tableIndices = allDataMaps.get(tableUniqueName);
      }
    }
    if (launchJob && tableIndices != null) {
      CarbonTable carbonTable = getCarbonTable(identifier);
    CarbonTable carbonTable = getCarbonTable(identifier);
    if (launchJob && CarbonProperties.getInstance()
        .isDistributedPruningEnabled(identifier.getDatabaseName(), identifier.getTableName())) {
      if (null != carbonTable) {
        try {
          DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable);
          DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME);
        } catch (IOException e) {
          LOGGER.error("clear dataMap job failed", e);
          // ignoring the exception
        }
      }
    } else {
      List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
      if (tableIndices == null) {
        String keyUsingTablePath = getKeyUsingTablePath(identifier.getTablePath());
        if (keyUsingTablePath != null) {
          tableUniqueName = keyUsingTablePath;
        }
      }
      if (launchJob && null != carbonTable) {
        try {
          DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.EMBEDDED_JOB_NAME);
        } catch (IOException e) {
          LOGGER.error("clear dataMap job failed", e);
          // ignoring the exception
        }
      }
      // remove carbon table from meta cache if launchJob is false as this would be called in
      // executor side.
      if (!launchJob) {
        CarbonMetadata.getInstance()
            .removeTable(identifier.getDatabaseName(), identifier.getTableName());
      }
      segmentRefreshMap.remove(identifier.uniqueName());
      clearDataMaps(tableUniqueName);
      allDataMaps.remove(tableUniqueName);
      tablePathMap.remove(tableUniqueName);
    }
    segmentRefreshMap.remove(identifier.uniqueName());
    clearDataMaps(tableUniqueName);
    allDataMaps.remove(tableUniqueName);
    tablePathMap.remove(tableUniqueName);
  }

  /**
@@ -554,29 +594,41 @@ public void clearDataMaps(String tableUniqName) {
   *
   * @param identifier Table identifier
   */
  public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
  public void deleteDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
    CarbonTable carbonTable = getCarbonTable(identifier);
    String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
    List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
    if (tableIndices != null) {
      int i = 0;
      for (TableDataMap tableDataMap : tableIndices) {
        if (carbonTable != null && tableDataMap != null && dataMapName
            .equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) {
          try {
            DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable);
            tableDataMap.clear();
          } catch (IOException e) {
            LOGGER.error("clear dataMap job failed", e);
            // ignoring the exception
    if (CarbonProperties.getInstance()
        .isDistributedPruningEnabled(identifier.getDatabaseName(), identifier.getTableName())) {
      try {
        DataMapUtil
            .executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME, dataMapName);
      } catch (IOException e) {
        LOGGER.error("clear dataMap job failed", e);
        // ignoring the exception
      }
    } else {
      List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
      if (tableIndices != null) {
        int i = 0;
        for (TableDataMap tableDataMap : tableIndices) {
          if (carbonTable != null && tableDataMap != null && dataMapName
              .equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) {
            try {
              DataMapUtil
                  .executeClearDataMapJob(carbonTable, DataMapUtil.EMBEDDED_JOB_NAME, dataMapName);
              tableDataMap.clear();
            } catch (IOException e) {
              LOGGER.error("clear dataMap job failed", e);
              // ignoring the exception
            }
            tableDataMap.deleteDatamapData();
            tableIndices.remove(i);
            break;
          }
          tableDataMap.deleteDatamapData();
          tableIndices.remove(i);
          break;
          i++;
        }
        i++;
        allDataMaps.put(tableUniqueName, tableIndices);
      }
      allDataMaps.put(tableUniqueName, tableIndices);
    }
  }
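
The new getSegmentsToBeRefreshed / refreshSegmentCacheIfRequired pair shown earlier lets callers drop cached datamap entries for segments whose update timestamps have changed. A hypothetical caller sketch; the import paths for Segment and SegmentUpdateStatusManager are assumptions, as they are not shown in the hunks:

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;

public final class SegmentCacheRefreshSketch {
  // Hypothetical helper: table, status manager and the query's segment list come from planning.
  static void refreshIfStale(CarbonTable table, SegmentUpdateStatusManager statusManager,
      List<Segment> filteredSegments) throws IOException {
    DataMapStoreManager manager = DataMapStoreManager.getInstance();
    // Segment numbers whose update timestamps invalidate the cached datamap entries.
    List<String> stale = manager.getSegmentsToBeRefreshed(table, statusManager, filteredSegments);
    if (!stale.isEmpty()) {
      manager.clearInvalidSegments(table, stale);
    }
    // refreshSegmentCacheIfRequired(...) performs these two steps in one call.
  }
}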

Expand Down
