[CARBONDATA-3512]Index Server enhancement
What changes are proposed

1. Remove the keytab dependency for the IndexServer. Currently the IndexServer requires a keytab and principal to be configured on
both the client side and the server side. But the IndexServer is a super user, and keeping the super user's keytab and principal on
the client is not correct (especially for spark-submit). Since the IndexServer is wrapped inside a Spark application, there is no
need to ask the user for a keytab for the IndexServer.

2. Authentication: this happens in org.apache.hadoop.security.SaslRpcClient#createSaslClient. It checks getServerPrincipal
(spark.carbon.indexserver.principal) against the server protocol (the UGI of the IndexServer). The user needs to configure
spark.carbon.indexserver.principal correctly.
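
A minimal sketch of the client-side setting this relies on, assuming a placeholder principal value (the real value depends on the deployment):

```scala
// Illustrative only: configure the Index Server principal that the RPC client
// validates during SASL/Kerberos authentication. The principal below is a
// placeholder, not a value taken from this change.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.carbon.indexserver.principal", "indexserver/_HOST@EXAMPLE.COM")
```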

3. Authorization (ACL): control which users can access the IndexServer. Authorization is governed by the hadoop.security.authorization
parameter. The IndexServer has the scenarios below (see the sketch after this list).
	1. spark-submit, spark-shell, spark-sql: these Spark applications have a UGI in which the current user and the login user are
	the same, based either on kinit or on spark.yarn.principal. Authorization is done in org.apache.hadoop.ipc.Server#authorize
	using the IndexServer protocol class and the ACL list prepared by org.apache.hadoop.security.authorize.PolicyProvider
	(generally hadoop-policy.xml with the key security.indexserver.protocol.acl, default *).
	2. Spark JDBCServer: it has a proxy-user UGI such as user1 (auth:PROXY) via spark, where user1 is the current user and spark is
	the login user (the UGI that started the JDBCServer). This type of authorization happens in
	org.apache.hadoop.security.authorize.ProxyUsers#authorize against the proxy-user ACL list prepared from
	hadoop.proxyuser.<INDEXSERVER_UGI>.users, hadoop.proxyuser.<INDEXSERVER_UGI>.hosts and
	hadoop.proxyuser.<INDEXSERVER_UGI>.groups.
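
A hedged sketch of the Hadoop settings behind the two scenarios above; the property names come from this change, while the user, host and group values are placeholders:

```scala
// Illustrative only: in a real deployment these keys normally live in
// hadoop-policy.xml / core-site.xml rather than being set programmatically.
import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// Enable Hadoop service-level authorization (server side).
conf.set("hadoop.security.authorization", "true")
// Scenario 1: ACL for direct Spark applications (spark-submit, spark-shell, spark-sql).
conf.set("security.indexserver.protocol.acl", "user1,user2")
// Scenario 2: proxy-user rules for JDBCServer-style access, keyed by the
// Index Server's login user ("sparkuser" is a placeholder).
conf.set("hadoop.proxyuser.sparkuser.users", "user1")
conf.set("hadoop.proxyuser.sparkuser.hosts", "*")
conf.set("hadoop.proxyuser.sparkuser.groups", "*")
```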

TokenRenewer: the IndexServer is NOT a token-based Hadoop service. It does not require a delegation token, because both the index
client and the IndexServer run inside the Spark application and do not need to connect to the KDC themselves, and the implementation
takes advantage of this.

This closes #3375
BJangir authored and akashrn5 committed Nov 13, 2019
1 parent 2cec58b commit 15b88ef
Showing 5 changed files with 74 additions and 60 deletions.
@@ -113,18 +113,10 @@ private static void executeClearDataMapJob(DataMapJob dataMapJob,
for (Segment segment : validAndInvalidSegmentsInfo.getInvalidSegments()) {
invalidSegment.add(segment.getSegmentNo());
}
DistributableDataMapFormat dataMapFormat = new DistributableDataMapFormat(carbonTable,
validAndInvalidSegmentsInfo.getValidSegments(), invalidSegment, true,
dataMapToClear);
try {
dataMapJob.execute(dataMapFormat);
} catch (Exception e) {
if (dataMapJob.getClass().getName().equalsIgnoreCase(DISTRIBUTED_JOB_NAME)) {
LOGGER.warn("Failed to clear distributed cache.", e);
} else {
throw e;
}
}
DistributableDataMapFormat dataMapFormat =
new DistributableDataMapFormat(carbonTable, validAndInvalidSegmentsInfo.getValidSegments(),
invalidSegment, true, dataMapToClear);
dataMapJob.execute(dataMapFormat);
}

public static void executeClearDataMapJob(CarbonTable carbonTable, String jobClassName)
23 changes: 6 additions & 17 deletions docs/index-server.md
@@ -119,16 +119,6 @@ be written to file.
The user can set the location for these files by using 'carbon.indexserver.temp.path'. By default
the table path would be used to write the files.

## Security
The security for the index server is controlled through 'spark.carbon.indexserver.keytab' and 'spark
.carbon.indexserver.principal'. These allow the RPC framework to login using the principal. It is
recommended that the principal should be a super user, and the user should be exclusive for index
server so that it does not grant access to any other service. Internally the operations would be
executed as a Privileged Action using the login user.

The Index Server is a long running service therefore the 'spark.yarn.keytab' and 'spark.yarn
.principal' should be configured.

## Configurations

##### carbon.properties(JDBCServer)
@@ -160,8 +150,6 @@ The Index Server is a long running service therefore the 'spark.yarn.keytab' and

| Name | Default Value | Description |
|:----------:|:-------------:|:------: |
| spark.carbon.indexserver.principal | NA | Used for authentication, whether a valid service is trying to connect to the server or not. Set in both IndexServer and JDBCServer. |
| spark.carbon.indexserver.keytab | NA | Specify the path to the keytab file through which authentication would happen. Set in both IndexServer and JDBCServer. |
| spark.dynamicAllocation.enabled | true | Set to false, so that spark does not kill the executor, If executors are killed, cache would be lost. Applicable only for Index Server. |
| spark.yarn.principal | NA | Should be set to the same user used for JDBCServer. Required only for IndexServer. |
|spark.yarn.keytab| NA | Should be set to the same as JDBCServer. |
@@ -180,6 +168,12 @@ that will authenticate the user to access the index server and no other service.
| Name | Default Value | Description |
|:----------:|:-------------:|:------: |
| ipc.client.rpc-timeout.ms | NA | Set the above property to some appropriate value based on your estimated query time. The best option is to set this to the same value as spark.network.timeout. |
| hadoop.security.authorization | false | Property to enable Hadoop service-level authorization. Required only on the server side. |
| hadoop.proxyuser.<indexserver_user>.users | NA | Property to set the list of proxy users allowed to access the Index Server. See https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html |
| hadoop.proxyuser.<indexserver_user>.hosts | NA | Property to set the list of hosts from which proxy access to the Index Server is allowed. See https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html |
| hadoop.proxyuser.<indexserver_user>.groups | NA | Property to set the list of groups allowed proxy access to the Index Server. See https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html |
| security.indexserver.protocol.acl | * | Property to set the list of users authorized to access the Index Server from non-proxy Spark applications (spark-submit, spark-shell, spark-sql). |


##### dynamic-properties(set command)

@@ -206,11 +200,6 @@ A. The exception would show the size of response it is trying to send over the
network. Set ipc.maximum.response.length to a value bigger than the
response size.
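
The sketch below shows the two IPC settings from this FAQ set programmatically; the values are placeholders and should be tuned to the observed response size and query time:

```scala
// Illustrative only: placeholder values for the IPC limits discussed above.
import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
conf.setInt("ipc.maximum.response.length", 256 * 1024 * 1024) // 256 MB
conf.setInt("ipc.client.rpc-timeout.ms", 600000)              // align with spark.network.timeout
```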

Q. **Index server is throwing Kerberos principal not set exception**

A. Set spark.carbon.indexserver.principal to the correct principal in both IndexServer and
JDBCServer configurations.

Q. **Unable to connect to index server**

A. Check whether the carbon.properties configurations are set in JDBCServer as well as the index
@@ -25,5 +25,9 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
case class IndexServerLoadEvent(sparkSession: SparkSession,
carbonTable: CarbonTable,
segment: List[Segment],
invalidsegment: List[String])
invalidsegment: List[String]) extends Event with IndexServerEventInfo

case class IndexServerEvent(sparkSession: SparkSession,
carbonTable: CarbonTable,
username: String)
extends Event with IndexServerEventInfo
@@ -130,7 +130,7 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
.getCarbonTable.getTablePath, dataMapFormat.getQueryId, dataMapFormat.isCountStarJob)
// Fire a job to clear the cache from executors as Embedded mode does not maintain the cache.
IndexServer.invalidateSegmentCache(dataMapFormat.getCarbonTable, dataMapFormat
.getValidSegmentIds.asScala.toArray)
.getValidSegmentIds.asScala.toArray, isFallBack = true)
spark.sparkContext.setLocalProperty("spark.job.description", originalJobDesc)
splits
}
@@ -25,9 +25,10 @@ import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.ipc.{ProtocolInfo, RPC}
import org.apache.hadoop.ipc.{ProtocolInfo, RPC, Server}
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.{KerberosInfo, SecurityUtil, UserGroupInformation}
import org.apache.hadoop.security.{KerberosInfo, UserGroupInformation}
import org.apache.hadoop.security.authorize.{PolicyProvider, Service}
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.{CarbonSession, SparkSession}
@@ -38,10 +39,13 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DistributableDataMapFormat
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.ExtendedBlockletWrapperContainer
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.{IndexServerEvent, OperationContext, OperationListenerBus}

@ProtocolInfo(protocolName = "Server", protocolVersion = 1)
@ProtocolInfo(protocolName = "org.apache.carbondata.indexserver.ServerInterface",
protocolVersion = 1)
@KerberosInfo(serverPrincipal = "spark.carbon.indexserver.principal",
clientPrincipal = "spark.carbon.indexserver.principal")
trait ServerInterface {
@@ -59,7 +63,7 @@ trait ServerInterface {
* Invalidate the cache for the specified segments only. Used in case of compaction/Update/Delete.
*/
def invalidateSegmentCache(carbonTable: CarbonTable,
segmentIds: Array[String], jobGroupId: String = ""): Unit
segmentIds: Array[String], jobGroupId: String = "", isFallBack: Boolean = false): Unit

def getCount(request: DistributableDataMapFormat): LongWritable

@@ -96,14 +100,17 @@ object IndexServer extends ServerInterface {
private val isExecutorLRUConfigured: Boolean =
CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE) != null

private val operationContext: OperationContext = new OperationContext
/**
* Getting sparkSession from ActiveSession because in case of embedded mode the session would
* have already been created whereas in case of distributed mode the session would be
* created by the main method after some validations.
*/
private lazy val sparkSession: SparkSession = SparkSQLUtil.getSparkSession

/**
* Perform the operation 'f' on behalf of the login user.
*/
private def doAs[T](f: => T): T = {
UserGroupInformation.getLoginUser.doAs(new PrivilegedAction[T] {
override def run(): T = {
@@ -134,6 +141,10 @@ object IndexServer extends ServerInterface {
}
new LongWritable(splits.map(_._2.toLong).sum)
}
// Fire a generic event (e.g. ACL check) for this request
val indexServerEvent = IndexServerEvent(sparkSession, request.getCarbonTable,
Server.getRemoteUser.getShortUserName)
OperationListenerBus.getInstance().fireEvent(indexServerEvent, operationContext)
if (request.ifAsyncCall) {
submitAsyncTask(getCountTask)
new LongWritable(0)
@@ -149,6 +160,10 @@ object IndexServer extends ServerInterface {
sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
sparkSession.sparkContext
.setLocalProperty("spark.job.description", request.getTaskGroupDesc)
// Fire a generic event (e.g. ACL check) for this request
val indexServerEvent = IndexServerEvent(sparkSession, request.getCarbonTable,
Server.getRemoteUser.getShortUserName)
OperationListenerBus.getInstance().fireEvent(indexServerEvent, operationContext)
}
if (!request.getInvalidSegments.isEmpty) {
DistributedRDDUtils
@@ -167,18 +182,26 @@ object IndexServer extends ServerInterface {
}

override def invalidateSegmentCache(carbonTable: CarbonTable,
segmentIds: Array[String], jobGroupId: String = ""): Unit = doAs {
val databaseName = carbonTable.getDatabaseName
val tableName = carbonTable.getTableName
val jobgroup: String = " Invalided Segment Cache for " + databaseName + "." + tableName
sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", jobGroupId)
new InvalidateSegmentCacheRDD(sparkSession, carbonTable, segmentIds.toList)
.collect()
if (segmentIds.nonEmpty) {
DistributedRDDUtils
.invalidateSegmentMapping(s"${databaseName}_$tableName",
segmentIds)
segmentIds: Array[String], jobGroupId: String = "", isFallBack: Boolean = false): Unit = {
doAs {
val databaseName = carbonTable.getDatabaseName
val tableName = carbonTable.getTableName
val jobgroup: String = " Invalidated Segment Cache for " + databaseName + "." + tableName
sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", jobGroupId)
if (!isFallBack) {
val indexServerEvent = IndexServerEvent(sparkSession,
carbonTable,
Server.getRemoteUser.getShortUserName)
OperationListenerBus.getInstance().fireEvent(indexServerEvent, operationContext)
}
new InvalidateSegmentCacheRDD(sparkSession, carbonTable, segmentIds.toList)
.collect()
if (segmentIds.nonEmpty) {
DistributedRDDUtils
.invalidateSegmentMapping(s"${databaseName}_$tableName",
segmentIds)
}
}
}

@@ -208,8 +231,8 @@ object IndexServer extends ServerInterface {
.setNumHandlers(numHandlers)
.setProtocol(classOf[ServerInterface]).build
server.start()
SecurityUtil.login(sparkSession.asInstanceOf[CarbonSession].sessionState.newHadoopConf(),
"spark.carbon.indexserver.keytab", "spark.carbon.indexserver.principal")
// Define the Authorization Policy provider
server.refreshServiceAcl(conf, new IndexServerPolicyProvider)
sparkSession.sparkContext.addSparkListener(new SparkListener {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
LOGGER.info("Spark Application has ended. Stopping the Index Server")
@@ -244,16 +267,22 @@ object IndexServer extends ServerInterface {
def getClient: ServerInterface = {
val configuration = SparkSQLUtil.sessionState(sparkSession).newHadoopConf()
import org.apache.hadoop.ipc.RPC
val indexServerUser = sparkSession.sparkContext.getConf
.get("spark.carbon.indexserver.principal", "")
val indexServerKeyTab = sparkSession.sparkContext.getConf
.get("spark.carbon.indexserver.keytab", "")
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(indexServerUser,
indexServerKeyTab)
LOGGER.info("Login successful for user " + indexServerUser)
RPC.getProxy(classOf[ServerInterface],
RPC.getProtocolProxy(classOf[ServerInterface],
RPC.getProtocolVersion(classOf[ServerInterface]),
new InetSocketAddress(serverIp, serverPort), ugi,
FileFactory.getConfiguration, NetUtils.getDefaultSocketFactory(configuration))
new InetSocketAddress(serverIp, serverPort),
UserGroupInformation.getLoginUser, configuration,
NetUtils.getDefaultSocketFactory(configuration)).getProxy
}

/**
* Defines the ACL for the Index Server, similar to HDFSPolicyProvider.
* The key in the Service can be configured in hadoop-policy.xml or in Configuration().
* This ACL is used for authorization in
* org.apache.hadoop.security.authorize.ServiceAuthorizationManager#authorize.
*/
class IndexServerPolicyProvider extends PolicyProvider {
override def getServices: Array[Service] = {
Array(new Service("security.indexserver.protocol.acl", classOf[ServerInterface]))
}
}
}
