Added documentation for index server
kunal642 committed Jul 3, 2019
1 parent 785cc6c commit 9ef3369
Showing 3 changed files with 234 additions and 2 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -63,7 +63,8 @@ CarbonData is built using Apache Maven, to [build CarbonData](https://github.com
* [SDK Guide](https://github.com/apache/carbondata/blob/master/docs/sdk-guide.md)
* [C++ SDK Guide](https://github.com/apache/carbondata/blob/master/docs/csdk-guide.md)
* [Performance Tuning](https://github.com/apache/carbondata/blob/master/docs/performance-tuning.md)
-* [S3 Storage](https://github.com/apache/carbondata/blob/master/docs/s3-guide.md)
+* [S3 Storage](https://github.com/apache/carbondata/blob/master/docs/s3-guide.md)
+* [Distributed Index Server](https://github.com/apache/carbondata/blob/master/docs/index-server.md)
* [Carbon as Spark's Datasource](https://github.com/apache/carbondata/blob/master/docs/carbon-as-spark-datasource-guide.md)
* [FAQs](https://github.com/apache/carbondata/blob/master/docs/faq.md)

@@ -2223,7 +2223,7 @@ private CarbonCommonConstants() {
/**
* min value for in memory serialization size
*/
-public static final int CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_MIN = 100;
+public static final int CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_MIN = 0;

/**
* max value for in memory serialization size
231 changes: 231 additions & 0 deletions docs/index-server.md
@@ -0,0 +1,231 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to you under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# Distributed Index Server

## Background

Carbon currently caches all block/blocklet datamap index information in the driver. For the bloom
datamap, the splits can already be pruned in a distributed way. The first approach has limitations:
the driver memory has to scale up, and the cache cannot be shared between multiple applications.
The second approach has its own limitation: there is no guarantee that the next query goes to the
same executor to reuse the cache, so the cache may end up duplicated across multiple executors.
The Distributed Index Cache Server aims to solve the above mentioned problems.

## Distribution
When enabled, any query on a carbon table is routed to the index server application in the form of
a request over the Hadoop RPC framework. The request consists of the table name, segments,
filter expression and other information used for pruning.

In the IndexServer application, a pruning RDD is fired which takes care of the pruning for that
request. This RDD creates its tasks based on the number of segments that are applicable for
pruning. If the user has specified the segments to access for that table, only those segments are
considered for pruning.

The IndexServer driver has two important tasks: distributing the segments equally among the
available executors, and keeping track of the cache location (where each segment's cache is present).

To achieve this, two separate mappings are maintained as follows.
1. segment to executor location:
This mapping will be maintained for each table and will enable the index server to track the
cache location for each segment.
```
tableToExecutorMapping = Map(tableName -> Map(segmentNo -> uniqueExecutorIdentifier))
```
2. Cache size held by each executor:
This mapping will be used to distribute the segments equally (on the basis of size) among the
executors.
```
executorToCacheMapping = Map(HostAddress -> Map(ExecutorId -> cacheSize))
```

Once a request is received, each segment is iterated over and
checked against tableToExecutorMapping to find whether an executor is
already assigned. If a mapping already exists, then most
probably (if not evicted by LRU) the segment is already cached in that
executor, and the task for that segment is fired on this executor.

If no mapping is found, executorToCacheMapping is first checked against
the available executor list to find whether any unassigned executor is
present; if so, that executor is used for the current segment. If all the
executors are already assigned some segment, then the least loaded
executor (on the basis of cache size) is chosen.

Initially, the segment index size is used to distribute the
segments fairly among the executors, because the actual cache size
becomes known to the driver only when the segments are cached and the
appropriate information is returned to the driver.

**NOTE:** In the case of legacy segments the index size is not available,
therefore all the legacy segments are distributed in a round-robin
fashion.
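
A rough sketch of this selection logic is shown below. The two mapping names come from the
description above; everything else (the object, method and parameter names, and the size
bookkeeping) is illustrative and not the actual IndexServer implementation:
```
import scala.collection.mutable

// Illustrative sketch of the segment-to-executor assignment described above.
object SegmentAssigner {
  // tableName -> (segmentNo -> uniqueExecutorIdentifier, e.g. "host_executorId")
  val tableToExecutorMapping = mutable.Map[String, mutable.Map[String, String]]()
  // HostAddress -> (ExecutorId -> cache size held by that executor)
  val executorToCacheMapping = mutable.Map[String, mutable.Map[String, Long]]()

  // Returns the executor that should prune the given segment. Assumes the
  // available executors are already registered in executorToCacheMapping.
  def assign(tableName: String, segmentNo: String, segmentIndexSize: Long): String = {
    val tableMapping =
      tableToExecutorMapping.getOrElseUpdate(tableName, mutable.Map.empty)
    tableMapping.get(segmentNo) match {
      case Some(executor) =>
        executor // most probably still cached here, unless evicted by LRU
      case None =>
        // Pick the least loaded executor; an unassigned executor holds no
        // cache (size 0) and therefore wins automatically.
        val (host, executorId, size) = (for {
          (h, executors) <- executorToCacheMapping.toSeq
          (id, s) <- executors
        } yield (h, id, s)).minBy(_._3)
        val target = s"${host}_$executorId"
        tableMapping(segmentNo) = target
        // The index size is only an estimate; the real cache size is written
        // back once the pruning tasks report it.
        executorToCacheMapping(host)(executorId) = size + segmentIndexSize
        target
    }
  }
}
```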

After the job is completed, the tasks return the cache size held by
each executor, which is updated into executorToCacheMapping, along with
the pruned blocklets, which are further used for result fetching.

## Reallocation of executor
In case executor(s) become dead/unavailable, the segments that were
earlier handled by them are reassigned to other executors using the
distribution logic described above.

**Note:** Cache loading would be done again in the new executor for the
current query.

## MetaCache DDL
The show/drop metacache DDL commands have been modified to operate on the
executor-side cache as well. When the user fires a show cache command, a new
column called cache location indicates whether the cache resides on the
executor or the driver. For drop cache, the user has to enable/disable the
index server using the dynamic configuration to clear the cache of the
desired location.
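
For example, assuming the metacache DDL syntax from the CarbonData documentation (database and
table names are placeholders):
```
SHOW METACACHE ON TABLE db1.t1;

-- clears the executor-side cache when the index server is enabled for the session
SET carbon.enable.index.server = true;
DROP METACACHE ON TABLE db1.t1;
```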

## Fallback
In case of any failure, the index server falls back to embedded mode,
which means that the JDBCServer takes care of the distributed pruning.
A similar job is fired by the JDBCServer, which prunes using its own
executors. If for any reason the embedded mode also fails to prune the
datamaps, the job is passed on to the driver.

**NOTE:** In case of embedded mode a job is fired to clear the
cache, as data cached in the JDBCServer executors would be of no use.

## Writing splits to a file
If the response is too huge, it is better to write the splits to a file so that the driver can
read this file and create the splits. This can be controlled using the property 'carbon.index.server
.inmemory.serialization.threshold.inKB'. The minimum value for this property is 0, meaning that
the splits are written to file no matter how small they are; the maximum is 102400KB, which means
that the splits for an executor are written to file once their size crosses this value.

The user can set the location for these files by using 'carbon.indexserver.temp.path'. By default
the table path is used to write the files.
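
For example, the following carbon.properties entries (the values and the HDFS path are
illustrative) would keep responses up to 1024KB in memory and write anything larger to files
under a custom path:
```
carbon.index.server.inmemory.serialization.threshold.inKB = 1024
carbon.indexserver.temp.path = hdfs://namenode:8020/tmp/indexserver
```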

## Configurations

##### carbon.properties(JDBCServer)

| Name | Default Value | Description |
|:----------:|:-------------:|:------: |
| carbon.enable.index.server | false | Enable the use of index server for pruning for the whole application. |
| carbon.index.server.ip | NA | Specify the IP/HOST on which the server would be started. Better to specify the private IP. |
| carbon.index.server.port | NA | The port on which the index server has to be started. |
| carbon.disable.index.server.fallback | false | Whether to enable/disable fallback for the index server. Should be used for testing purposes only. |
|carbon.index.server.max.worker.threads| 500 | Number of RPC handlers to open for accepting the requests from the JDBC driver. Max accepted value is Integer.MAX_VALUE. |
|carbon.index.server.max.jobname.length|NA|The max length of the job to show in the index server application UI. For bigger queries this may impact performance as the whole string would be sent from JDBCServer to IndexServer.|


##### carbon.properties(IndexServer)

| Name | Default Value | Description |
|:----------:|:-------------:|:------: |
| carbon.enable.index.server | false | Enable the use of index server for pruning for the whole application. |
| carbon.index.server.ip | NA | Specify the IP/HOST on which the server would be started. Better to specify the private IP. |
| carbon.index.server.port | NA | The port on which the index server has to be started. |
|carbon.index.server.max.worker.threads| 500 | Number of RPC handlers to open for accepting the requests from the JDBC driver. Max accepted value is Integer.MAX_VALUE. |
|carbon.max.executor.lru.cache.size| NA | Used to specify the max size for the executor LRU cache. Mandatory for the user to set. |
|carbon.index.server.max.jobname.length|NA|The max length of the job to show in the index server application UI. For bigger queries this may impact performance as the whole string would be sent from JDBCServer to IndexServer.|
|carbon.max.executor.threads.for.block.pruning|4| Max executor threads used for block pruning. |
|carbon.index.server.inmemory.serialization.threshold.inKB|300|Max in-memory serialization size; after reaching the threshold, data will be written to file. Min value that the user can set is 0KB and max is 102400KB. |
|carbon.indexserver.temp.path|tablePath|Used to write split serialization data when the in-memory threshold crosses the limit.|
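
Putting the mandatory entries together, a minimal carbon.properties for the IndexServer
application could look like the following (the host, port and cache size values are illustrative):
```
carbon.enable.index.server = true
carbon.index.server.ip = 192.168.1.10
carbon.index.server.port = 9998
# mandatory: max LRU cache size for the executors
carbon.max.executor.lru.cache.size = 4096
```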


##### spark-defaults.conf(only for secure mode)

| Name | Default Value | Description |
|:----------:|:-------------:|:------: |
| spark.carbon.indexserver.principal | NA | Used for authentication, to check whether a valid application is trying to connect to the server or not. Set in both IndexServer and JDBCServer. |
| spark.carbon.indexserver.keytab | NA | Specify the path to the keytab file through which authentication would happen. Set in both IndexServer and JDBCServer. |
| spark.dynamicAllocation.enabled | true | Set to false so that Spark does not kill the executor; if executors are killed, the cache would be lost. Applicable only for the Index Server. |
| spark.yarn.principal | NA | Should be set to the same user as used for JDBCServer. Required only for IndexServer. |
|spark.yarn.keytab| NA | Should be set to the same keytab as JDBCServer. |
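
For example, a secure-mode spark-defaults.conf for the IndexServer might contain entries like the
following (the principal and keytab values are placeholders):
```
spark.carbon.indexserver.principal   indexserver/_HOST@EXAMPLE.COM
spark.carbon.indexserver.keytab      /etc/security/keytabs/indexserver.keytab
spark.dynamicAllocation.enabled      false
spark.yarn.principal                 indexserver/_HOST@EXAMPLE.COM
spark.yarn.keytab                    /etc/security/keytabs/indexserver.keytab
```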

##### spark-defaults.conf(non-secure mode)
| Name | Default Value | Description |
|:----------:|:-------------:|:------: |
| spark.dynamicAllocation.enabled | true | Set to false so that Spark does not kill the executor; if executors are killed, the cache would be lost. Applicable only for the Index Server. |


**NOTE:** It is better to create a new, dedicated user for the indexserver
principal, so that only this user is authenticated to access the index
server and no other application.

##### core-site.xml

| Name | Default Value | Description |
|:----------:|:-------------:|:------: |
| ipc.client.rpc-timeout.ms | NA | Set this property to an appropriate value based on your estimated query time. The best option is to set it to the same value as spark.network.timeout. |
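
For example, to align the RPC timeout with a spark.network.timeout of 120s:
```
<property>
  <name>ipc.client.rpc-timeout.ms</name>
  <!-- illustrative value; match your spark.network.timeout -->
  <value>120000</value>
</property>
```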

##### dynamic-properties(set command)

| Name | Default Value | Description |
|:----------:|:-------------:|:------: |
| carbon.enable.index.server | false | Enable the use of index server for pruning for the current session. |
| carbon.enable.index.server.dbName.tableName | false | Enable the use of index server for the specified table in the current session. |
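
For example (the database and table names are placeholders):
```
SET carbon.enable.index.server = true;
SET carbon.enable.index.server.db1.t1 = true;
```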


## Starting the Server
```
./bin/spark-submit --master [yarn/local] --[optional parameters] --class org.apache.carbondata.indexserver.IndexServer [path to carbondata-spark2-<version>.jar]
```
Or
```
./sbin/start-indexserver.sh --master yarn --num-executors 2 /<absolute path>/carbondata-spark2-1.6.0.0100.jar
```

## FAQ

Q. **Index Server is throwing Large response size exception.**

A. The exception shows the size of the response it is trying to send over the
network. Set ipc.maximum.response.length to a value bigger than the
response size.
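
For example, in core-site.xml (the value is illustrative; choose one larger than the size
reported in the exception):
```
<property>
  <name>ipc.maximum.response.length</name>
  <!-- 256 MB; must exceed the response size from the exception -->
  <value>268435456</value>
</property>
```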

Q. **Index server is throwing Kerberos principal not set exception**

A. Set spark.carbon.indexserver.principal to the correct principal in both IndexServer and
JDBCServer configurations.

Q. **Unable to connect to index server**

A. Check whether the carbon.properties configurations are set in JDBCServer as well as the index
server.

Q. **IndexServer is throwing FileNotFoundException for index files.**

A. Check whether the Index server and JDBCServer are connected to the
same NameNode. The store should be shared by both.

Q. **OutOfMemoryException in DirectMemoryBuffer**

A. Increase -XX:MaxDirectMemorySize in spark.driver.extraJavaOptions to
accommodate the large response in the driver.
