Commit 4ede9cb

Merge 0ed151c into 24753a9
kunal642 committed May 10, 2019
2 parents 24753a9 + 0ed151c commit 4ede9cb
Showing 58 changed files with 1,881 additions and 448 deletions.
56 changes: 56 additions & 0 deletions bin/start-indexserver.sh
@@ -0,0 +1,56 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# Shell script for starting the Distributed Index Server

# Enter posix mode for bash
set -o posix

if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
CLASS="org.apache.carbondata.indexserver.IndexServer"

function usage {
  echo "Usage: ./bin/start-indexserver.sh [options] [index server options]"
  pattern="usage"
  pattern+="\|Spark assembly has been built with Hive"
  pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
  pattern+="\|Spark Command: "
  pattern+="\|======="
  pattern+="\|--help"

  "${SPARK_HOME}"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
  echo
  echo "Index server options:"
  "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
usage
exit 0
fi

export SUBMIT_USAGE_FUNCTION=usage

exec "${SPARK_HOME}"/sbin/spark-daemon.sh submit $CLASS 1 --name "DistributedIndexServer" "$@"
26 changes: 26 additions & 0 deletions bin/stop-indexserver.sh
@@ -0,0 +1,26 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Stops the Distributed Index Server on the machine this script is executed on.

if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

"${SPARK_HOME}/sbin"/spark-daemon.sh stop org.apache.carbondata.indexserver.IndexServer 1
@@ -217,6 +217,7 @@ public boolean put(String columnIdentifier, Cacheable cacheInfo, long requiredSize
} else {
synchronized (lruCacheMap) {
addEntryToLRUCacheMap(columnIdentifier, cacheInfo);
currentSize = currentSize + requiredSize;
}
columnKeyAddedSuccessfully = true;
}
@@ -358,4 +359,11 @@ private double getPartOfXmx() {
long mSizeMB = Runtime.getRuntime().maxMemory() / BYTE_CONVERSION_CONSTANT;
return mSizeMB * CarbonCommonConstants.CARBON_LRU_CACHE_PERCENT_OVER_MAX_SIZE;
}

/**
* @return current size of the cache in memory.
*/
public long getCurrentSize() {
return currentSize;
}
}
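A small usage sketch for the new accessor, assuming a CarbonLRUCache instance obtained elsewhere (for example from the cache provider); only the put(...) and getCurrentSize() methods shown above are relied on, and the byte interpretation of the size is an assumption.

import org.apache.carbondata.core.cache.CarbonLRUCache;

// Illustrative helper: report how much memory the LRU cache believes it is holding.
public final class CacheSizeReporter {

  private CacheSizeReporter() {
  }

  public static String describe(CarbonLRUCache lruCache) {
    // getCurrentSize() reflects every successful put(), which now also bumps
    // currentSize while holding the lruCacheMap lock (see the first hunk above).
    long currentSize = lruCache.getCurrentSize();
    // Treated as bytes purely for illustration; the unit is whatever requiredSize
    // was expressed in when put() was called.
    return "LRU cache currently holds about " + (currentSize / (1024 * 1024)) + " MB";
  }
}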
@@ -2125,4 +2125,38 @@ private CarbonCommonConstants() {
*/
public static final String CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL = "512";

/**
* The IP on which Index Server will be started.
*/
@CarbonProperty
public static final String CARBON_INDEX_SERVER_IP = "carbon.index.server.ip";

/**
* The Port to be used to start Index Server.
*/
@CarbonProperty
public static final String CARBON_INDEX_SERVER_PORT = "carbon.index.server.port";

/**
* Whether to use the index server for caching and pruning.
* This property can be used for:
* 1. the whole application (carbon.properties)
* 2. the whole session (set carbon.enable.index.server)
* 3. a specific table for one session (set carbon.enable.index.server.<dbName>.<tableName>)
*/
@CarbonProperty(dynamicConfigurable = true)
public static final String CARBON_ENABLE_INDEX_SERVER = "carbon.enable.index.server";

/**
* Property is used to enable/disable fallback for indexserver.
* Used for testing purposes only.
*/
public static final String CARBON_DISABLE_INDEX_SERVER_FALLBACK =
"carbon.disable.index.server.fallback";

public static final String CARBON_INDEX_SERVER_WORKER_THREADS =
"carbon.index.server.max.worker.threads";

public static final int CARBON_INDEX_SERVER_WORKER_THREADS_DEFAULT = 500;
}
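Taken together these keys amount to a simple deployment sketch; the host, port, and table names below are placeholders, and the session-level overrides restate the SET syntax quoted in the CARBON_ENABLE_INDEX_SERVER javadoc.

# Illustrative carbon.properties entries; every value below is a placeholder.
cat >> /path/to/carbon.properties <<'EOF'
carbon.enable.index.server=true
carbon.index.server.ip=192.168.0.10
carbon.index.server.port=9998
# 500 is the compiled-in default for the worker thread cap shown above.
carbon.index.server.max.worker.threads=500
EOF

# Session / table level overrides, restating the SET syntax from the javadoc
# (dbName and tableName are placeholders):
#   set carbon.enable.index.server=true;
#   set carbon.enable.index.server.dbName.tableName=true;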
@@ -120,7 +120,7 @@ public DataMapExprWrapper chooseCGDataMap(FilterResolverIntf resolverIntf) {
return chooseDataMap(DataMapLevel.CG, resolverIntf);
}

private DataMapExprWrapper chooseDataMap(DataMapLevel level, FilterResolverIntf resolverIntf) {
DataMapExprWrapper chooseDataMap(DataMapLevel level, FilterResolverIntf resolverIntf) {
if (resolverIntf != null) {
Expression expression = resolverIntf.getFilterExpression();
List<TableDataMap> datamaps = level == DataMapLevel.CG ? cgDataMaps : fgDataMaps;
@@ -22,7 +22,6 @@
import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

@@ -34,7 +33,6 @@ public interface DataMapJob extends Serializable {

void execute(CarbonTable carbonTable, FileInputFormat<Void, BlockletDataMapIndexWrapper> format);

List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
FilterResolverIntf filter);
List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat);

}
@@ -454,10 +454,11 @@ private TableDataMap getTableDataMap(String dataMapName, List<TableDataMap> tabl

/**
* Clear the invalid segments from all the datamaps of the table
* @param carbonTable
* @param segments
*
* @param carbonTable table for which the operation has to be performed.
* @param segments segments which have to be cleared from cache.
*/
public void clearInvalidSegments(CarbonTable carbonTable, List<Segment> segments)
public void clearInvalidSegments(CarbonTable carbonTable, List<String> segments)
throws IOException {
getDefaultDataMap(carbonTable).clear(segments);
List<TableDataMap> allDataMap = getAllDataMap(carbonTable);
@@ -467,6 +468,30 @@ public void clearInvalidSegments(CarbonTable carbonTable, List<Segment> segments

}

public List<String> getSegmentsToBeRefreshed(CarbonTable carbonTable,
SegmentUpdateStatusManager updateStatusManager, List<Segment> filteredSegmentToAccess)
throws IOException {
List<String> toBeCleanedSegments = new ArrayList<>();
for (Segment filteredSegment : filteredSegmentToAccess) {
boolean refreshNeeded = getTableSegmentRefresher(carbonTable).isRefreshNeeded(filteredSegment,
updateStatusManager.getInvalidTimestampRange(filteredSegment.getSegmentNo()));
if (refreshNeeded) {
toBeCleanedSegments.add(filteredSegment.getSegmentNo());
}
}
return toBeCleanedSegments;
}

public void refreshSegmentCacheIfRequired(CarbonTable carbonTable,
SegmentUpdateStatusManager updateStatusManager, List<Segment> filteredSegmentToAccess)
throws IOException {
List<String> toBeCleanedSegments =
getSegmentsToBeRefreshed(carbonTable, updateStatusManager, filteredSegmentToAccess);
if (toBeCleanedSegments.size() > 0) {
clearInvalidSegments(carbonTable, toBeCleanedSegments);
}
}

/**
* Clear the datamap/datamaps of a table from memory
*
@@ -483,29 +508,44 @@ public void clearDataMaps(AbsoluteTableIdentifier identifier) {
*/
public void clearDataMaps(AbsoluteTableIdentifier identifier, boolean launchJob) {
String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
if (tableIndices == null) {
String keyUsingTablePath = getKeyUsingTablePath(identifier.getTablePath());
if (keyUsingTablePath != null) {
tableUniqueName = keyUsingTablePath;
tableIndices = allDataMaps.get(tableUniqueName);
}
}
if (launchJob && tableIndices != null) {
CarbonTable carbonTable = getCarbonTable(identifier);
CarbonTable carbonTable = getCarbonTable(identifier);
if (launchJob && CarbonProperties.getInstance()
.isDistributedPruningEnabled(identifier.getDatabaseName(), identifier.getTableName())) {
if (null != carbonTable) {
try {
DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable);
DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME);
} catch (IOException e) {
LOGGER.error("clear dataMap job failed", e);
// ignoring the exception
}
}
} else {
List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
if (tableIndices == null) {
String keyUsingTablePath = getKeyUsingTablePath(identifier.getTablePath());
if (keyUsingTablePath != null) {
tableUniqueName = keyUsingTablePath;
}
}
if (launchJob && null != carbonTable) {
try {
DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.EMBEDDED_JOB_NAME);
} catch (IOException e) {
LOGGER.error("clear dataMap job failed", e);
// ignoring the exception
}
}
// remove carbon table from meta cache if launchJob is false as this would be called in
// executor side.
if (!launchJob) {
CarbonMetadata.getInstance()
.removeTable(identifier.getDatabaseName(), identifier.getTableName());
}
segmentRefreshMap.remove(identifier.uniqueName());
clearDataMaps(tableUniqueName);
allDataMaps.remove(tableUniqueName);
tablePathMap.remove(tableUniqueName);
}
segmentRefreshMap.remove(identifier.uniqueName());
clearDataMaps(tableUniqueName);
allDataMaps.remove(tableUniqueName);
tablePathMap.remove(tableUniqueName);
}

/**
Expand Down Expand Up @@ -554,29 +594,41 @@ public void clearDataMaps(String tableUniqName) {
*
* @param identifier Table identifier
*/
public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
public void deleteDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
CarbonTable carbonTable = getCarbonTable(identifier);
String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
if (tableIndices != null) {
int i = 0;
for (TableDataMap tableDataMap : tableIndices) {
if (carbonTable != null && tableDataMap != null && dataMapName
.equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) {
try {
DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable);
tableDataMap.clear();
} catch (IOException e) {
LOGGER.error("clear dataMap job failed", e);
// ignoring the exception
if (CarbonProperties.getInstance()
.isDistributedPruningEnabled(identifier.getDatabaseName(), identifier.getTableName())) {
try {
DataMapUtil
.executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME, dataMapName);
} catch (IOException e) {
LOGGER.error("clear dataMap job failed", e);
// ignoring the exception
}
} else {
List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
if (tableIndices != null) {
int i = 0;
for (TableDataMap tableDataMap : tableIndices) {
if (carbonTable != null && tableDataMap != null && dataMapName
.equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) {
try {
DataMapUtil
.executeClearDataMapJob(carbonTable, DataMapUtil.EMBEDDED_JOB_NAME, dataMapName);
tableDataMap.clear();
} catch (IOException e) {
LOGGER.error("clear dataMap job failed", e);
// ignoring the exception
}
tableDataMap.deleteDatamapData();
tableIndices.remove(i);
break;
}
tableDataMap.deleteDatamapData();
tableIndices.remove(i);
break;
i++;
}
i++;
allDataMaps.put(tableUniqueName, tableIndices);
}
allDataMaps.put(tableUniqueName, tableIndices);
}
}
