From 1450fa9dafacb5d78cada13e184fd5c12bbb99f2 Mon Sep 17 00:00:00 2001 From: Prafulla T Date: Sun, 1 Jun 2014 15:37:48 -0700 Subject: [PATCH 1/5] TAJO-858 - Support for hadoop kerberos authentication in Tajo Added parameters to accept kerberos principle, keytab in tajo conf. Added new class FileSystemUtil to do hadoop authentication work while getting instance of FileSystem. Changed couple of places to use this new interface to get FileSystem. This is still work in progress. TajoMaster authentication works. TODO: Need new protocol message to exchange hadoop dfs delegation token. --- .../java/org/apache/tajo/conf/TajoConf.java | 9 + .../org/apache/tajo/master/TajoMaster.java | 12 +- .../master/querymaster/QueryMasterTask.java | 8 +- .../apache/tajo/storage/FileSystemUtil.java | 157 ++++++++++++++++++ 4 files changed, 180 insertions(+), 6 deletions(-) create mode 100644 tajo-storage/src/main/java/org/apache/tajo/storage/FileSystemUtil.java diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index e5f6ca1ceb..b12e093915 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -303,6 +303,15 @@ public static enum ConfVars { ////////////////////////////////// HIVE_QUERY_MODE("tajo.hive.query.mode", false), + ////////////////////////////////// + // HDFS Authentication Configuration + ////////////////////////////////// + HADOOP_SECURTY_AUTH_TYPE("tajo.hadoop.security.authentication", "simple"), + HADOOP_DFS_NAMENODE_KERBEROS_PRINCIPAL("tajo.dfs.namenode.kerberos.principal", "null"), + HADOOP_DFS_NAMENODE_KERBEROS_KEYTAB_LOC("tajo.dfs.namenode.keytab.file", "null"), + HADOOP_DFS_DELEGATION_TOKEN("tajo.dfs.delegation.token","null"), + + ////////////////////////////////// // Task Configuration TASK_DEFAULT_MEMORY("tajo.task.memory-slot-mb.default", 512), diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index dfae300b64..461b4c1899 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -19,6 +19,7 @@ package org.apache.tajo.master; import com.google.protobuf.ServiceException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -28,6 +29,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -53,6 +57,7 @@ import org.apache.tajo.master.session.SessionManager; import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.storage.AbstractStorageManager; +import org.apache.tajo.storage.FileSystemUtil; import org.apache.tajo.storage.StorageManagerFactory; import org.apache.tajo.util.ClassUtil; import org.apache.tajo.util.CommonTestingUtil; @@ -69,6 +74,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Modifier; import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -219,19 +225,21 @@ private void initWebServer() throws Exception { } } - private void checkAndInitializeSystemDirectories() throws IOException { + private void checkAndInitializeSystemDirectories() throws Exception { // Get Tajo root dir this.tajoRootPath = TajoConf.getTajoRootDir(systemConf); LOG.info("Tajo Root Directory: " + tajoRootPath); + this.defaultFS = FileSystemUtil.getFileSystem(tajoRootPath, systemConf, true); // Check and Create Tajo root dir - this.defaultFS = tajoRootPath.getFileSystem(systemConf); systemConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString()); LOG.info("FileSystem (" + this.defaultFS.getUri() + ") is initialized."); if (!defaultFS.exists(tajoRootPath)) { defaultFS.mkdirs(tajoRootPath, new FsPermission(TAJO_ROOT_DIR_PERMISSION)); LOG.info("Tajo Root Directory '" + tajoRootPath + "' is created."); } + String delegationToken = systemConf.getVar(ConfVars.HADOOP_DFS_DELEGATION_TOKEN); + LOG.info("Delegation token :" + delegationToken); // Check and Create system and system resource dir Path systemPath = TajoConf.getSystemDir(systemConf); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index f81271521d..21eb0021ba 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -58,6 +58,7 @@ import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.storage.AbstractStorageManager; +import org.apache.tajo.storage.FileSystemUtil; import org.apache.tajo.util.metrics.TajoMetrics; import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter; import org.apache.tajo.worker.AbstractResourceAllocator; @@ -364,11 +365,10 @@ public synchronized void startQuery() { } } - private void initStagingDir() throws IOException { + private void initStagingDir() throws Exception { Path stagingDir = null; Path outputDir = null; - FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf); - + FileSystem defaultFS = FileSystemUtil.getFileSystem(TajoConf.getWarehouseDir(systemConf), systemConf); try { stagingDir = initStagingDir(systemConf, defaultFS, queryId.toString()); @@ -574,4 +574,4 @@ public TajoMetrics getQueryMetrics() { return queryMetrics; } } -} \ No newline at end of file +} diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileSystemUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileSystemUtil.java new file mode 100644 index 0000000000..478e6be1fb --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileSystemUtil.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import javax.security.auth.Subject; +import javax.security.auth.kerberos.KerberosPrincipal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.security.token.Token; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; + +public class FileSystemUtil { + private static final Log LOG = LogFactory.getLog(FileSystemUtil.class); + + private static FileSystem getDFSUsingDelegationToken(Path path, TajoConf systemConf, + String rootUriParam) throws Exception { + String delegationToken = systemConf.getVar(ConfVars.HADOOP_DFS_DELEGATION_TOKEN); + LOG.info("Delegation token :" + delegationToken); + if(delegationToken.equals("null")) + throw new Exception("Hadoop DFS delegationToken is null, It should have been set in TajoMaster"); + + Token dfsToken = + new Token(); + dfsToken.decodeFromUrlString(delegationToken); + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + Configuration conf = new Configuration(); + SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf ); + conf.set("hadoop.security.authentication", "kerberos"); + UserGroupInformation.setConfiguration(conf); + + if(ugi.addToken(dfsToken)) + LOG.info("DFS Token added to hadoop usergroup information"); + final String kerberosPrincipal = systemConf.getVar(ConfVars.HADOOP_DFS_NAMENODE_KERBEROS_PRINCIPAL); + if(kerberosPrincipal.equals("null")) + throw new Exception("Wrong value for "+ConfVars.HADOOP_DFS_NAMENODE_KERBEROS_PRINCIPAL+" : null"); + + final String rootUri = rootUriParam; + FileSystem fs = ugi.doAs(new PrivilegedExceptionAction() { + @Override + public FileSystem run() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.set("fs.defaultFs", rootUri); + conf.set("dfs.namenode.kerberos.principal",kerberosPrincipal); + return FileSystem.get(conf); + } + }); + return fs; + } + private static FileSystem getDFSUsingKeyTab(Path path, TajoConf systemConf, + String rootUriParam) throws Exception { + Configuration conf = new Configuration(); + SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf ); + UserGroupInformation.setConfiguration(conf); + final String kerberosPrincipal = systemConf.getVar(ConfVars.HADOOP_DFS_NAMENODE_KERBEROS_PRINCIPAL); + final String kerberosKeyTabLocation = systemConf.getVar(ConfVars.HADOOP_DFS_NAMENODE_KERBEROS_KEYTAB_LOC); + if(kerberosPrincipal.equals("null")) + throw new Exception("Wrong value for "+ConfVars.HADOOP_DFS_NAMENODE_KERBEROS_PRINCIPAL+" : null"); + if(kerberosKeyTabLocation.equals("null")) + throw new Exception("Wrong value for "+ConfVars.HADOOP_DFS_NAMENODE_KERBEROS_KEYTAB_LOC+" : null"); + UserGroupInformation ugi = UserGroupInformation. + loginUserFromKeytabAndReturnUGI(kerberosPrincipal, kerberosKeyTabLocation); + final String rootUri = rootUriParam; + FileSystem fs = ugi.doAs(new PrivilegedExceptionAction() { + @Override + public FileSystem run() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.set("fs.defaultFs", rootUri); + conf.set("dfs.namenode.kerberos.principal",kerberosPrincipal); + return FileSystem.get(conf); + } + }); + Token dfsToken = fs.getDelegationToken(ugi.getShortUserName()); + String dfsTokenString = dfsToken.encodeToUrlString(); + systemConf.setVar(ConfVars.HADOOP_DFS_DELEGATION_TOKEN, dfsTokenString); + return fs; + } + + public static FileSystem getFileSystem(Path path, TajoConf systemConf + ) throws Exception { + return getFileSystem(path, systemConf, false); + } + + /** + * Returns file system object from the path. + * Does the authentication with underlying filesystem if it is required + * (Eg. Hadoop kerberose/tokens) + */ + public static FileSystem getFileSystem(Path path, TajoConf systemConf, + boolean callFromTajoMaster + ) throws Exception { + path = new Path(path.toString().toLowerCase()); + if(path.toUri().getScheme() != null && path.toUri().getScheme().equals("hdfs")) { + String hdfsSecurtyType = systemConf.getVar(ConfVars.HADOOP_SECURTY_AUTH_TYPE); + LOG.info("HDFS securty type "+ hdfsSecurtyType); + if(hdfsSecurtyType.equals("simple")) { + return path.getFileSystem(systemConf); + } + if(hdfsSecurtyType.equals("kerberos") == false) { + throw new Exception("Unsupported value for " + ConfVars.HADOOP_SECURTY_AUTH_TYPE + + " , Supported values : kerberos, simple"); + } + + if(path.toUri().getPort() == -1) { + throw new Exception("Port missing in hdfs path :"+path); + } + if(path.toUri().getHost() == null) { + throw new Exception("Host missing/malformed in hdfs path :" + path); + } + String rootUri = path.toUri().getScheme() + "://" + path.toUri().getHost() + ":" + + path.toUri().getPort(); + if(callFromTajoMaster) + return getDFSUsingKeyTab(path, systemConf, rootUri); + return getDFSUsingDelegationToken(path, systemConf, rootUri); + } + return path.getFileSystem(systemConf); + } +} From 665013a0a4396157f79479ca1ac01dc434e549c6 Mon Sep 17 00:00:00 2001 From: Prafulla T Date: Sun, 1 Jun 2014 21:46:00 -0700 Subject: [PATCH 2/5] TAJO-858 - Support for hadoop kerberos authentication in Tajo Added new protocol message to send hdfs delegation token from TajoMaster to TajoWorker. With this match, TajoWorker/QueryMaster starts up correctly. TajoClient does not yet have delegation token so end-to-end query does not work yet. --- .../org/apache/tajo/jdbc/TajoResultSet.java | 11 +-- .../org/apache/tajo/master/TajoMaster.java | 3 - .../apache/tajo/master/TajoMasterService.java | 9 +++ .../org/apache/tajo/worker/TajoWorker.java | 36 +++++++++- .../src/main/proto/TajoMasterProtocol.proto | 3 +- .../apache/tajo/storage/FileSystemUtil.java | 69 +++++++++++-------- 6 files changed, 93 insertions(+), 38 deletions(-) diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java index 8595970a7e..517f73dab8 100644 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java @@ -30,6 +30,7 @@ import org.apache.tajo.client.TajoClient; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.storage.FileScanner; +import org.apache.tajo.storage.FileSystemUtil; import org.apache.tajo.storage.MergeScanner; import org.apache.tajo.storage.Scanner; import org.apache.tajo.storage.Tuple; @@ -60,7 +61,7 @@ public TajoResultSet(TajoClient tajoClient, QueryId queryId) { init(); } - public TajoResultSet(TajoClient tajoClient, QueryId queryId, TajoConf conf, TableDesc table) throws IOException { + public TajoResultSet(TajoClient tajoClient, QueryId queryId, TajoConf conf, TableDesc table) throws Exception { this.tajoClient = tajoClient; this.queryId = queryId; this.conf = conf; @@ -70,17 +71,17 @@ public TajoResultSet(TajoClient tajoClient, QueryId queryId, TajoConf conf, Tabl } public TajoResultSet(TajoClient tajoClient, QueryId queryId, TajoConf conf, TableDesc table, long maxRowNum) - throws IOException { + throws Exception { this(tajoClient, queryId, conf, table); this.maxRowNum = maxRowNum; initScanner(); init(); } - private void initScanner() throws IOException { + private void initScanner() throws Exception { if(desc != null) { schema = desc.getSchema(); - fs = FileScanner.getFileSystem(conf, desc.getPath()); + fs = FileSystemUtil.getFileSystem(desc.getPath(), conf); if (maxRowNum != null) { this.totalRow = maxRowNum; } else { @@ -172,7 +173,7 @@ public void beforeFirst() throws SQLException { initScanner(); } init(); - } catch (IOException e) { + } catch (Exception e) { e.printStackTrace(); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index 461b4c1899..fca3383d11 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -238,9 +238,6 @@ private void checkAndInitializeSystemDirectories() throws Exception { defaultFS.mkdirs(tajoRootPath, new FsPermission(TAJO_ROOT_DIR_PERMISSION)); LOG.info("Tajo Root Directory '" + tajoRootPath + "' is created."); } - String delegationToken = systemConf.getVar(ConfVars.HADOOP_DFS_DELEGATION_TOKEN); - LOG.info("Delegation token :" + delegationToken); - // Check and Create system and system resource dir Path systemPath = TajoConf.getSystemDir(systemConf); if (!defaultFS.exists(systemPath)) { diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java index 5e9f729dbc..c3be4162f8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java @@ -27,6 +27,7 @@ import org.apache.tajo.QueryId; import org.apache.tajo.TajoIdProtos; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.querymaster.QueryJobManager; import org.apache.tajo.master.rm.Worker; @@ -34,6 +35,7 @@ import org.apache.tajo.rpc.AsyncRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto; import org.apache.tajo.util.NetUtils; import java.net.InetSocketAddress; @@ -168,5 +170,12 @@ public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullP } done.run(builder.build()); } + + @Override + public void getHDFSDelegationToken(RpcController controller, PrimitiveProtos.NullProto request, + RpcCallback done) { + String delegationToken = conf.getVar(ConfVars.HADOOP_DFS_DELEGATION_TOKEN); + done.run(StringProto.newBuilder().setValue(delegationToken).build()); + } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 3768edf605..60991dca5e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -48,6 +48,10 @@ import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.util.metrics.TajoSystemMetrics; import org.apache.tajo.webapp.StaticHttpServer; +import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto; import java.io.*; import java.lang.management.ManagementFactory; @@ -58,6 +62,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; import static org.apache.tajo.conf.TajoConf.ConfVars; @@ -160,7 +165,7 @@ private void setWorkerMode(String[] args) { System.exit(0); } } - + @Override public void init(Configuration conf) { Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook())); @@ -230,7 +235,6 @@ public void init(Configuration conf) { } } } - LOG.info("Tajo Worker started: queryMaster=" + queryMasterMode + " taskRunner=" + taskRunnerMode + ", qmRpcPort=" + qmManagerPort + ",yarnContainer=" + yarnContainerMode + ", clientPort=" + clientPort + @@ -256,6 +260,34 @@ public void init(Configuration conf) { workerHeartbeatThread = new WorkerHeartbeatService(workerContext); workerHeartbeatThread.init(conf); addIfService(workerHeartbeatThread); + + LOG.info("--------------------- Delegation token start ----------"); + getDelegationToken(); + LOG.info("--------------------- Delegation token end ----------"); + } + + private void getDelegationToken() { + NettyClientBase rpc = null; + try { + rpc = connPool.getConnection(tajoMasterAddress, + TajoMasterProtocol.class, true); + TajoMasterProtocol.TajoMasterProtocolService masterService = rpc.getStub(); + + CallFuture callBack = + new CallFuture(); + + masterService.getHDFSDelegationToken(callBack.getController(), + PrimitiveProtos.NullProto.getDefaultInstance(), callBack); + + PrimitiveProtos.StringProto dfsTokenString = callBack.get(2, TimeUnit.SECONDS); + LOG.info("Got following delegation token: " + dfsTokenString.getValue()); + systemConf.setVar(ConfVars.HADOOP_DFS_DELEGATION_TOKEN, dfsTokenString.getValue()); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } finally { + connPool.releaseConnection(rpc); + } + return; } private void initWorkerMetrics() { diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto index 8fccbaf976..4bdff61e23 100644 --- a/tajo-core/src/main/proto/TajoMasterProtocol.proto +++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto @@ -151,4 +151,5 @@ service TajoMasterProtocolService { rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto); rpc stopQueryMaster(QueryIdProto) returns (BoolProto); rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest); -} \ No newline at end of file + rpc getHDFSDelegationToken(NullProto) returns (StringProto); +} diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileSystemUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileSystemUtil.java index 478e6be1fb..184e35e8b5 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileSystemUtil.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileSystemUtil.java @@ -19,15 +19,10 @@ package org.apache.tajo.storage; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; import java.security.PrivilegedExceptionAction; -import javax.security.auth.Subject; -import javax.security.auth.kerberos.KerberosPrincipal; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -36,22 +31,21 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.token.Token; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.HdfsConfiguration; - public class FileSystemUtil { private static final Log LOG = LogFactory.getLog(FileSystemUtil.class); + /** + * Returns instance of DistributedFileSystem object pointing to secure hadoop using + * hadoop delegation token. + * @param path Path for which fileSystem is requested + * @param systemConf TajoConf + * @param rootUriParam URI string param pointing to root of filesytem + * @return Returns filesystem object + * + */ private static FileSystem getDFSUsingDelegationToken(Path path, TajoConf systemConf, String rootUriParam) throws Exception { String delegationToken = systemConf.getVar(ConfVars.HADOOP_DFS_DELEGATION_TOKEN); @@ -86,8 +80,19 @@ public FileSystem run() throws Exception { }); return fs; } + + /** + * Returns instance of DistributedFileSystem object of secure hadoop cluster using + * user principal and keytab file. + * + * @param path Path for which fileSystem is requested + * @param systemConf TajoConf + * @param rootUriParam URI string param pointing to root of filesytem + * @return Returns filesystem object + * @throws Exception Throws exception for wrong config values. + */ private static FileSystem getDFSUsingKeyTab(Path path, TajoConf systemConf, - String rootUriParam) throws Exception { + String rootUriParam) throws Exception { Configuration conf = new Configuration(); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf ); UserGroupInformation.setConfiguration(conf); @@ -101,20 +106,24 @@ private static FileSystem getDFSUsingKeyTab(Path path, TajoConf systemConf, loginUserFromKeytabAndReturnUGI(kerberosPrincipal, kerberosKeyTabLocation); final String rootUri = rootUriParam; FileSystem fs = ugi.doAs(new PrivilegedExceptionAction() { - @Override - public FileSystem run() throws Exception { - Configuration conf = new HdfsConfiguration(); - conf.set("fs.defaultFs", rootUri); - conf.set("dfs.namenode.kerberos.principal",kerberosPrincipal); - return FileSystem.get(conf); - } + @Override + public FileSystem run() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.set("fs.defaultFs", rootUri); + conf.set("dfs.namenode.kerberos.principal",kerberosPrincipal); + return FileSystem.get(conf); + } }); - Token dfsToken = fs.getDelegationToken(ugi.getShortUserName()); + Token dfsToken = fs.getDelegationToken(ugi.getShortUserName()); String dfsTokenString = dfsToken.encodeToUrlString(); systemConf.setVar(ConfVars.HADOOP_DFS_DELEGATION_TOKEN, dfsTokenString); return fs; } + /** + * This method is helper method to be called from worker code. + * + */ public static FileSystem getFileSystem(Path path, TajoConf systemConf ) throws Exception { return getFileSystem(path, systemConf, false); @@ -124,6 +133,12 @@ public static FileSystem getFileSystem(Path path, TajoConf systemConf * Returns file system object from the path. * Does the authentication with underlying filesystem if it is required * (Eg. Hadoop kerberose/tokens) + * + * @param path Input path for which FileSystem object is requested + * @param systemConf TajoConf object + * @param callFromTajoMaster boolean value indicating if this call is being made form master + * @return Returns filesystem object corresponding to input path + * */ public static FileSystem getFileSystem(Path path, TajoConf systemConf, boolean callFromTajoMaster @@ -147,7 +162,7 @@ public static FileSystem getFileSystem(Path path, TajoConf systemConf, throw new Exception("Host missing/malformed in hdfs path :" + path); } String rootUri = path.toUri().getScheme() + "://" + path.toUri().getHost() + ":" - + path.toUri().getPort(); + + path.toUri().getPort(); if(callFromTajoMaster) return getDFSUsingKeyTab(path, systemConf, rootUri); return getDFSUsingDelegationToken(path, systemConf, rootUri); From aa2674750255cbe36d7490aba03af9dcf01898a2 Mon Sep 17 00:00:00 2001 From: Prafulla T Date: Sat, 7 Jun 2014 20:53:47 -0700 Subject: [PATCH 3/5] TAJO-858 - Support for hadoop kerberos authentication in Tajo Added new protocol message to exchange hdfs delegation token from TajoMaster to client. This is needed when client needs to read from HDFS. --- .../java/org/apache/tajo/cli/TajoCli.java | 6 +++++ .../org/apache/tajo/client/TajoClient.java | 27 ++++++++++++++----- .../main/proto/TajoMasterClientProtocol.proto | 3 +++ .../tajo/master/TajoMasterClientService.java | 9 +++++++ .../apache/tajo/LocalTajoTestingUtility.java | 2 +- .../org/apache/tajo/jdbc/TajoConnection.java | 3 +++ 6 files changed, 42 insertions(+), 8 deletions(-) diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java index e0ca62aaee..6ab2cec9b0 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java @@ -187,6 +187,11 @@ public TajoCli(TajoConf c, String [] args, InputStream in, OutputStream out) thr client = new TajoClient(conf, baseDatabase); } + String dfsTokenString = client.getHDFSDelegationToken(); + System.err.println("HDFS Delegation token " + dfsTokenString); + conf.setVar(ConfVars.HADOOP_DFS_DELEGATION_TOKEN, dfsTokenString); + + context = new TajoCliContext(); context.setCurrentDatabase(client.getCurrentDatabase()); initHistory(); @@ -215,6 +220,7 @@ public TajoCli(TajoConf c, String [] args, InputStream in, OutputStream out) thr } addShutdownHook(); + } public TajoCliContext getContext() { diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java index 2f9e1386bd..b114b782cb 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java @@ -19,6 +19,7 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -43,6 +44,7 @@ import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.jdbc.TajoResultSet; import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.util.KeyValueSet; @@ -213,6 +215,17 @@ public String call(NettyClientBase client) throws ServiceException { }.withRetries(); } + public String getHDFSDelegationToken() throws ServiceException { + return new ServerCallable(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) { + public String call(NettyClientBase client) throws ServiceException { + checkSessionAndGet(client); + + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.getHDFSDelegationToken(null, NullProto.getDefaultInstance()).getValue(); + } + }.withRetries(); + } + public Boolean selectDatabase(final String databaseName) throws ServiceException { return new ServerCallable(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) { @@ -343,7 +356,7 @@ public SubmitQueryResponse call(NettyClientBase client) throws ServiceException * @return If failed, return null. */ public ResultSet executeQueryAndGetResult(final String sql) - throws ServiceException, IOException { + throws Exception { SubmitQueryResponse response = executeQuery(sql); QueryId queryId = new QueryId(response.getQueryId()); @@ -367,7 +380,7 @@ public ResultSet executeQueryAndGetResult(final String sql) } } - public ResultSet executeJsonQueryAndGetResult(final String json) throws ServiceException, IOException { + public ResultSet executeJsonQueryAndGetResult(final String json) throws Exception { SubmitQueryResponse response = executeQueryWithJson(json); QueryId queryId = new QueryId(response.getQueryId()); @@ -449,7 +462,7 @@ public static boolean isQueryRunnning(QueryState state) { } public ResultSet getQueryResult(QueryId queryId) - throws ServiceException, IOException { + throws Exception { if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { return createNullResultSet(queryId); } @@ -460,14 +473,14 @@ public ResultSet getQueryResult(QueryId queryId) } public static ResultSet createResultSet(TajoClient client, QueryId queryId, GetQueryResultResponse response) - throws IOException { + throws Exception { TableDesc desc = CatalogUtil.newTableDesc(response.getTableDesc()); TajoConf conf = new TajoConf(client.getConf()); conf.setVar(ConfVars.USERNAME, response.getTajoUserName()); return new TajoResultSet(client, queryId, conf, desc); } - public static ResultSet createResultSet(TajoClient client, SubmitQueryResponse response) throws IOException { + public static ResultSet createResultSet(TajoClient client, SubmitQueryResponse response) throws Exception { if (response.hasTableDesc()) { TajoConf conf = new TajoConf(client.getConf()); conf.setVar(ConfVars.USERNAME, response.getUserName()); @@ -495,7 +508,7 @@ public static ResultSet createResultSet(TajoClient client, SubmitQueryResponse r } private ResultSet getQueryResultAndWait(QueryId queryId) - throws ServiceException, IOException { + throws Exception { if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { return createNullResultSet(queryId); } @@ -526,7 +539,7 @@ private ResultSet getQueryResultAndWait(QueryId queryId) } } - public ResultSet createNullResultSet(QueryId queryId) throws IOException { + public ResultSet createNullResultSet(QueryId queryId) throws Exception { return new TajoResultSet(this, queryId); } diff --git a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto index 9495fb15dd..f699071742 100644 --- a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto +++ b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto @@ -65,4 +65,7 @@ service TajoMasterClientProtocolService { rpc getTableList(GetTableListRequest) returns (GetTableListResponse); rpc getTableDesc(GetTableDescRequest) returns (TableResponse); rpc getFunctionList(SessionedStringProto) returns (FunctionResponse); + + // HDFS Authentication related API + rpc getHDFSDelegationToken(NullProto) returns (StringProto); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 97f59ef734..da9dc15592 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -751,5 +751,14 @@ public FunctionResponse getFunctionList(RpcController controller, SessionedStrin throw new ServiceException(t); } } + + @Override + public StringProto getHDFSDelegationToken(RpcController controller, + PrimitiveProtos.NullProto request) throws ServiceException + { + String delegationToken = conf.getVar(ConfVars.HADOOP_DFS_DELEGATION_TOKEN); + return (StringProto.newBuilder().setValue(delegationToken).build()); + } + } } diff --git a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java index 84522e1942..ec11724fb4 100644 --- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java +++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java @@ -122,7 +122,7 @@ public TajoTestingCluster getTestingCluster() { return util; } - public ResultSet execute(String query) throws IOException, ServiceException { + public ResultSet execute(String query) throws Exception { return client.executeQueryAndGetResult(query); } diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoConnection.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoConnection.java index 9439f0f1ff..9dc73be1da 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoConnection.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoConnection.java @@ -268,6 +268,9 @@ public boolean isValid(int timeout) throws SQLException { } catch (IOException e) { LOG.error("JDBC connection is not valid."); return false; + } catch(Exception e) { + LOG.error("General error while accessing tajomaster"); + return false; } } From d0c09ba06383ae14e5d4cf2b3c50873861395c00 Mon Sep 17 00:00:00 2001 From: Prafulla T Date: Sat, 7 Jun 2014 22:42:20 -0700 Subject: [PATCH 4/5] TAJO-858 - Support for hadoop kerberos authentication in Tajo Added new class TajoConfException for execeptions caused by wrong value in configuration. Modified FileSystemUtil.getFileSystem code to wrap above exception in IOException and rethrow it. This will help not change any try/catch/throw code in client classes where FileSystemUtil.getFileSystem is used. --- .../java/org/apache/tajo/cli/TajoCli.java | 2 - .../org/apache/tajo/client/TajoClient.java | 14 +-- .../org/apache/tajo/jdbc/TajoResultSet.java | 8 +- .../apache/tajo/conf/TajoConfException.java | 51 ++++++++ .../org/apache/tajo/master/TajoMaster.java | 3 +- .../master/querymaster/QueryMasterTask.java | 2 +- .../org/apache/tajo/worker/TajoWorker.java | 6 +- .../apache/tajo/LocalTajoTestingUtility.java | 2 +- .../apache/tajo/storage/FileSystemUtil.java | 111 +++++++++++------- 9 files changed, 133 insertions(+), 66 deletions(-) create mode 100644 tajo-common/src/main/java/org/apache/tajo/conf/TajoConfException.java diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java index 6ab2cec9b0..0d5c83edc0 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java @@ -191,7 +191,6 @@ public TajoCli(TajoConf c, String [] args, InputStream in, OutputStream out) thr System.err.println("HDFS Delegation token " + dfsTokenString); conf.setVar(ConfVars.HADOOP_DFS_DELEGATION_TOKEN, dfsTokenString); - context = new TajoCliContext(); context.setCurrentDatabase(client.getCurrentDatabase()); initHistory(); @@ -220,7 +219,6 @@ public TajoCli(TajoConf c, String [] args, InputStream in, OutputStream out) thr } addShutdownHook(); - } public TajoCliContext getContext() { diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java index b114b782cb..bf95ed0e95 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java @@ -356,7 +356,7 @@ public SubmitQueryResponse call(NettyClientBase client) throws ServiceException * @return If failed, return null. */ public ResultSet executeQueryAndGetResult(final String sql) - throws Exception { + throws ServiceException, IOException { SubmitQueryResponse response = executeQuery(sql); QueryId queryId = new QueryId(response.getQueryId()); @@ -380,7 +380,7 @@ public ResultSet executeQueryAndGetResult(final String sql) } } - public ResultSet executeJsonQueryAndGetResult(final String json) throws Exception { + public ResultSet executeJsonQueryAndGetResult(final String json) throws ServiceException, IOException { SubmitQueryResponse response = executeQueryWithJson(json); QueryId queryId = new QueryId(response.getQueryId()); @@ -462,7 +462,7 @@ public static boolean isQueryRunnning(QueryState state) { } public ResultSet getQueryResult(QueryId queryId) - throws Exception { + throws ServiceException, IOException { if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { return createNullResultSet(queryId); } @@ -473,14 +473,14 @@ public ResultSet getQueryResult(QueryId queryId) } public static ResultSet createResultSet(TajoClient client, QueryId queryId, GetQueryResultResponse response) - throws Exception { + throws IOException { TableDesc desc = CatalogUtil.newTableDesc(response.getTableDesc()); TajoConf conf = new TajoConf(client.getConf()); conf.setVar(ConfVars.USERNAME, response.getTajoUserName()); return new TajoResultSet(client, queryId, conf, desc); } - public static ResultSet createResultSet(TajoClient client, SubmitQueryResponse response) throws Exception { + public static ResultSet createResultSet(TajoClient client, SubmitQueryResponse response) throws IOException { if (response.hasTableDesc()) { TajoConf conf = new TajoConf(client.getConf()); conf.setVar(ConfVars.USERNAME, response.getUserName()); @@ -508,7 +508,7 @@ public static ResultSet createResultSet(TajoClient client, SubmitQueryResponse r } private ResultSet getQueryResultAndWait(QueryId queryId) - throws Exception { + throws ServiceException, IOException { if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { return createNullResultSet(queryId); } @@ -539,7 +539,7 @@ private ResultSet getQueryResultAndWait(QueryId queryId) } } - public ResultSet createNullResultSet(QueryId queryId) throws Exception { + public ResultSet createNullResultSet(QueryId queryId) throws IOException { return new TajoResultSet(this, queryId); } diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java index 517f73dab8..ba0271984e 100644 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java @@ -61,7 +61,7 @@ public TajoResultSet(TajoClient tajoClient, QueryId queryId) { init(); } - public TajoResultSet(TajoClient tajoClient, QueryId queryId, TajoConf conf, TableDesc table) throws Exception { + public TajoResultSet(TajoClient tajoClient, QueryId queryId, TajoConf conf, TableDesc table) throws IOException { this.tajoClient = tajoClient; this.queryId = queryId; this.conf = conf; @@ -71,14 +71,14 @@ public TajoResultSet(TajoClient tajoClient, QueryId queryId, TajoConf conf, Tabl } public TajoResultSet(TajoClient tajoClient, QueryId queryId, TajoConf conf, TableDesc table, long maxRowNum) - throws Exception { + throws IOException { this(tajoClient, queryId, conf, table); this.maxRowNum = maxRowNum; initScanner(); init(); } - private void initScanner() throws Exception { + private void initScanner() throws IOException { if(desc != null) { schema = desc.getSchema(); fs = FileSystemUtil.getFileSystem(desc.getPath(), conf); @@ -173,7 +173,7 @@ public void beforeFirst() throws SQLException { initScanner(); } init(); - } catch (Exception e) { + } catch (IOException e) { e.printStackTrace(); } } diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConfException.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConfException.java new file mode 100644 index 0000000000..ec67f116b5 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConfException.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.conf; + + +/** + * Exception caused for wrong value in Tajo configuration + */ +public class TajoConfException extends Exception { + + public TajoConfException() { + super(); + } + + public TajoConfException(String message) { + super(message); + } + + public TajoConfException(String message, Throwable cause) { + super(message, cause); + } + + public TajoConfException(Throwable cause) { + super(cause); + } + + public TajoConfException(String format, Object... args) { + super(String.format(format, args)); + } + + public TajoConfException(Throwable cause, String format, Object... args) { + super(String.format(format, args), cause); + } + +} diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index fca3383d11..504df597fa 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -74,7 +74,6 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Modifier; import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -225,7 +224,7 @@ private void initWebServer() throws Exception { } } - private void checkAndInitializeSystemDirectories() throws Exception { + private void checkAndInitializeSystemDirectories() throws IOException { // Get Tajo root dir this.tajoRootPath = TajoConf.getTajoRootDir(systemConf); LOG.info("Tajo Root Directory: " + tajoRootPath); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 21eb0021ba..ffe4d73c4d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -365,7 +365,7 @@ public synchronized void startQuery() { } } - private void initStagingDir() throws Exception { + private void initStagingDir() throws IOException { Path stagingDir = null; Path outputDir = null; FileSystem defaultFS = FileSystemUtil.getFileSystem(TajoConf.getWarehouseDir(systemConf), systemConf); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 60991dca5e..63977ef19f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -261,12 +261,10 @@ public void init(Configuration conf) { workerHeartbeatThread.init(conf); addIfService(workerHeartbeatThread); - LOG.info("--------------------- Delegation token start ----------"); - getDelegationToken(); - LOG.info("--------------------- Delegation token end ----------"); + getAndSetDelegationToken(); } - private void getDelegationToken() { + private void getAndSetDelegationToken() { NettyClientBase rpc = null; try { rpc = connPool.getConnection(tajoMasterAddress, diff --git a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java index ec11724fb4..84522e1942 100644 --- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java +++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java @@ -122,7 +122,7 @@ public TajoTestingCluster getTestingCluster() { return util; } - public ResultSet execute(String query) throws Exception { + public ResultSet execute(String query) throws IOException, ServiceException { return client.executeQueryAndGetResult(query); } diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileSystemUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileSystemUtil.java index 184e35e8b5..1f02289d28 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileSystemUtil.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileSystemUtil.java @@ -19,6 +19,7 @@ package org.apache.tajo.storage; +import java.io.IOException; import java.security.PrivilegedExceptionAction; import org.apache.commons.logging.Log; @@ -33,6 +34,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.conf.TajoConfException; public class FileSystemUtil { private static final Log LOG = LogFactory.getLog(FileSystemUtil.class); @@ -47,11 +49,11 @@ public class FileSystemUtil { * */ private static FileSystem getDFSUsingDelegationToken(Path path, TajoConf systemConf, - String rootUriParam) throws Exception { + String rootUriParam) throws IOException, TajoConfException { String delegationToken = systemConf.getVar(ConfVars.HADOOP_DFS_DELEGATION_TOKEN); LOG.info("Delegation token :" + delegationToken); if(delegationToken.equals("null")) - throw new Exception("Hadoop DFS delegationToken is null, It should have been set in TajoMaster"); + throw new TajoConfException("Hadoop DFS delegationToken is null, It should have been set in TajoMaster"); Token dfsToken = new Token(); @@ -66,18 +68,24 @@ private static FileSystem getDFSUsingDelegationToken(Path path, TajoConf systemC LOG.info("DFS Token added to hadoop usergroup information"); final String kerberosPrincipal = systemConf.getVar(ConfVars.HADOOP_DFS_NAMENODE_KERBEROS_PRINCIPAL); if(kerberosPrincipal.equals("null")) - throw new Exception("Wrong value for "+ConfVars.HADOOP_DFS_NAMENODE_KERBEROS_PRINCIPAL+" : null"); + throw new TajoConfException("Wrong value for "+ConfVars.HADOOP_DFS_NAMENODE_KERBEROS_PRINCIPAL+" : null"); final String rootUri = rootUriParam; - FileSystem fs = ugi.doAs(new PrivilegedExceptionAction() { - @Override - public FileSystem run() throws Exception { - Configuration conf = new HdfsConfiguration(); - conf.set("fs.defaultFs", rootUri); - conf.set("dfs.namenode.kerberos.principal",kerberosPrincipal); - return FileSystem.get(conf); - } - }); + FileSystem fs = null; + try { + fs = ugi.doAs(new PrivilegedExceptionAction() { + @Override + public FileSystem run() throws InterruptedException, IOException { + Configuration conf = new HdfsConfiguration(); + conf.set("fs.defaultFs", rootUri); + conf.set("dfs.namenode.kerberos.principal",kerberosPrincipal); + return FileSystem.get(conf); + } + }); + } + catch(InterruptedException e) { + throw new IOException(e); + } return fs; } @@ -92,28 +100,35 @@ public FileSystem run() throws Exception { * @throws Exception Throws exception for wrong config values. */ private static FileSystem getDFSUsingKeyTab(Path path, TajoConf systemConf, - String rootUriParam) throws Exception { + String rootUriParam) throws IOException, TajoConfException { Configuration conf = new Configuration(); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf ); UserGroupInformation.setConfiguration(conf); final String kerberosPrincipal = systemConf.getVar(ConfVars.HADOOP_DFS_NAMENODE_KERBEROS_PRINCIPAL); final String kerberosKeyTabLocation = systemConf.getVar(ConfVars.HADOOP_DFS_NAMENODE_KERBEROS_KEYTAB_LOC); if(kerberosPrincipal.equals("null")) - throw new Exception("Wrong value for "+ConfVars.HADOOP_DFS_NAMENODE_KERBEROS_PRINCIPAL+" : null"); + throw new TajoConfException("Wrong value for "+ConfVars.HADOOP_DFS_NAMENODE_KERBEROS_PRINCIPAL+" : null"); if(kerberosKeyTabLocation.equals("null")) - throw new Exception("Wrong value for "+ConfVars.HADOOP_DFS_NAMENODE_KERBEROS_KEYTAB_LOC+" : null"); + throw new TajoConfException("Wrong value for "+ConfVars.HADOOP_DFS_NAMENODE_KERBEROS_KEYTAB_LOC+" : null"); UserGroupInformation ugi = UserGroupInformation. loginUserFromKeytabAndReturnUGI(kerberosPrincipal, kerberosKeyTabLocation); final String rootUri = rootUriParam; - FileSystem fs = ugi.doAs(new PrivilegedExceptionAction() { - @Override - public FileSystem run() throws Exception { - Configuration conf = new HdfsConfiguration(); - conf.set("fs.defaultFs", rootUri); - conf.set("dfs.namenode.kerberos.principal",kerberosPrincipal); - return FileSystem.get(conf); - } - }); + FileSystem fs = null; + try { + fs = ugi.doAs(new PrivilegedExceptionAction() { + @Override + public FileSystem run() throws InterruptedException, IOException { + Configuration conf = new HdfsConfiguration(); + conf.set("fs.defaultFs", rootUri); + conf.set("dfs.namenode.kerberos.principal",kerberosPrincipal); + return FileSystem.get(conf); + } + }); + } + catch(InterruptedException e) { + throw new IOException(e); + } + Token dfsToken = fs.getDelegationToken(ugi.getShortUserName()); String dfsTokenString = dfsToken.encodeToUrlString(); systemConf.setVar(ConfVars.HADOOP_DFS_DELEGATION_TOKEN, dfsTokenString); @@ -125,8 +140,9 @@ public FileSystem run() throws Exception { * */ public static FileSystem getFileSystem(Path path, TajoConf systemConf - ) throws Exception { - return getFileSystem(path, systemConf, false); + ) throws IOException { + FileSystem fs = getFileSystem(path, systemConf, false); + return fs; } /** @@ -142,30 +158,35 @@ public static FileSystem getFileSystem(Path path, TajoConf systemConf */ public static FileSystem getFileSystem(Path path, TajoConf systemConf, boolean callFromTajoMaster - ) throws Exception { + ) throws IOException { path = new Path(path.toString().toLowerCase()); if(path.toUri().getScheme() != null && path.toUri().getScheme().equals("hdfs")) { - String hdfsSecurtyType = systemConf.getVar(ConfVars.HADOOP_SECURTY_AUTH_TYPE); - LOG.info("HDFS securty type "+ hdfsSecurtyType); - if(hdfsSecurtyType.equals("simple")) { - return path.getFileSystem(systemConf); - } - if(hdfsSecurtyType.equals("kerberos") == false) { - throw new Exception("Unsupported value for " + ConfVars.HADOOP_SECURTY_AUTH_TYPE + - " , Supported values : kerberos, simple"); - } + try { + String hdfsSecurtyType = systemConf.getVar(ConfVars.HADOOP_SECURTY_AUTH_TYPE); + LOG.info("HDFS securty type "+ hdfsSecurtyType); + if(hdfsSecurtyType.equals("simple")) { + return path.getFileSystem(systemConf); + } + if(hdfsSecurtyType.equals("kerberos") == false) { + throw new TajoConfException("Unsupported value for " + ConfVars.HADOOP_SECURTY_AUTH_TYPE + + " , Supported values : kerberos, simple"); + } - if(path.toUri().getPort() == -1) { - throw new Exception("Port missing in hdfs path :"+path); + if(path.toUri().getPort() == -1) { + throw new TajoConfException("Port missing in hdfs path :"+path); + } + if(path.toUri().getHost() == null) { + throw new TajoConfException("Host missing/malformed in hdfs path :" + path); + } + String rootUri = path.toUri().getScheme() + "://" + path.toUri().getHost() + ":" + + path.toUri().getPort(); + if(callFromTajoMaster) + return getDFSUsingKeyTab(path, systemConf, rootUri); + return getDFSUsingDelegationToken(path, systemConf, rootUri); } - if(path.toUri().getHost() == null) { - throw new Exception("Host missing/malformed in hdfs path :" + path); + catch(TajoConfException e) { + throw new IOException(e); } - String rootUri = path.toUri().getScheme() + "://" + path.toUri().getHost() + ":" - + path.toUri().getPort(); - if(callFromTajoMaster) - return getDFSUsingKeyTab(path, systemConf, rootUri); - return getDFSUsingDelegationToken(path, systemConf, rootUri); } return path.getFileSystem(systemConf); } From ce497808d1ca4e77eb7d0d285da5f6b87b6a4b44 Mon Sep 17 00:00:00 2001 From: Prafulla T Date: Sun, 8 Jun 2014 18:05:12 -0700 Subject: [PATCH 5/5] TAJO-858 - Support for hadoop kerberos authentication in Tajo Replaced normal getFileSystem method with FileSystemUtil.getFileSystem at few places in the codebase. --- .../apache/tajo/catalog/store/HCatalogStore.java | 5 +++-- .../planner/physical/ColPartitionStoreExec.java | 3 ++- .../physical/HashBasedColPartitionStoreExec.java | 5 +++-- .../physical/HashShuffleFileWriteExec.java | 16 ++++++++-------- .../physical/SortBasedColPartitionStoreExec.java | 3 ++- .../rewrite/PartitionedTableRewriter.java | 5 +++-- .../org/apache/tajo/master/GlobalEngine.java | 10 +++++----- .../tajo/master/TajoMasterClientService.java | 3 ++- .../apache/tajo/master/querymaster/Query.java | 9 +++++---- .../java/org/apache/tajo/worker/TaskRunner.java | 4 +++- .../org/apache/tajo/storage/FileScanner.java | 9 +++++++-- .../tajo/storage/StorageManagerFactory.java | 3 ++- 12 files changed, 45 insertions(+), 30 deletions(-) diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java index 7924af167e..075970223d 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java @@ -46,6 +46,7 @@ import org.apache.tajo.common.exception.NotImplementedException; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.exception.InternalException; +import org.apache.tajo.storage.FileSystemUtil; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; @@ -219,7 +220,7 @@ public final CatalogProtos.TableDescProto getTable(String databaseName, final St totalSize = Long.parseLong(properties.getProperty("totalSize")); } else { try { - FileSystem fs = path.getFileSystem(conf); + FileSystem fs = FileSystemUtil.getFileSystem(path, conf); if (fs.exists(path)) { totalSize = fs.getContentSummary(path).getLength(); } @@ -438,7 +439,7 @@ public final void createTable(final CatalogProtos.TableDescProto tableDescProto) table.setTableType(TableType.EXTERNAL_TABLE.name()); table.putToParameters("EXTERNAL", "TRUE"); - FileSystem fs = tableDesc.getPath().getFileSystem(conf); + FileSystem fs = FileSystemUtil.getFileSystem(tableDesc.getPath(), conf); if (fs.isFile(tableDesc.getPath())) { LOG.warn("A table path is a file, but HCatalog does not allow a file path."); sd.setLocation(tableDesc.getPath().getParent().toString()); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index fe3690523a..c55f640d01 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -28,6 +28,7 @@ import org.apache.tajo.engine.planner.logical.CreateTableNode; import org.apache.tajo.engine.planner.logical.NodeType; import org.apache.tajo.engine.planner.logical.StoreTableNode; +import org.apache.tajo.storage.FileSystemUtil; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.worker.TaskAttemptContext; @@ -87,7 +88,7 @@ public void init() throws IOException { super.init(); storeTablePath = context.getOutputPath(); - FileSystem fs = storeTablePath.getFileSystem(context.getConf()); + FileSystem fs = FileSystemUtil.getFileSystem(storeTablePath, context.getConf()); if (!fs.exists(storeTablePath.getParent())) { fs.mkdirs(storeTablePath.getParent()); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java index df32d0b400..96e9af931d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java @@ -28,6 +28,7 @@ import org.apache.tajo.datum.Datum; import org.apache.tajo.engine.planner.logical.StoreTableNode; import org.apache.tajo.storage.Appender; +import org.apache.tajo.storage.FileSystemUtil; import org.apache.tajo.storage.StorageManagerFactory; import org.apache.tajo.storage.Tuple; import org.apache.tajo.worker.TaskAttemptContext; @@ -60,7 +61,7 @@ private Appender getAppender(String partition) throws IOException { if (appender == null) { Path dataFile = getDataFile(partition); - FileSystem fs = dataFile.getFileSystem(context.getConf()); + FileSystem fs = FileSystemUtil.getFileSystem(dataFile, context.getConf()); if (fs.exists(dataFile.getParent())) { LOG.info("Path " + dataFile.getParent() + " already exists!"); @@ -129,4 +130,4 @@ public Tuple next() throws IOException { public void rescan() throws IOException { // nothing to do } -} \ No newline at end of file +} diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java index 678b745efe..acc44bcc58 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java @@ -53,7 +53,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { private Map appenderMap = new HashMap(); private final int numShuffleOutputs; private final int [] shuffleKeyIds; - + public HashShuffleFileWriteExec(TaskAttemptContext context, final AbstractStorageManager sm, final ShuffleFileWriteNode plan, final PhysicalExec child) throws IOException { super(context, plan.getInSchema(), plan.getOutSchema(), child); @@ -82,13 +82,13 @@ public void init() throws IOException { FileSystem fs = new RawLocalFileSystem(); fs.mkdirs(storeTablePath); } - + private Appender getAppender(int partId) throws IOException { Appender appender = appenderMap.get(partId); if (appender == null) { Path dataFile = getDataFile(partId); - FileSystem fs = dataFile.getFileSystem(context.getConf()); + FileSystem fs = FileSystemUtil.getFileSystem(dataFile, context.getConf()); if (fs.exists(dataFile)) { LOG.info("File " + dataFile + " already exists!"); FileStatus status = fs.getFileStatus(dataFile); @@ -119,7 +119,7 @@ public Tuple next() throws IOException { appender = getAppender(partId); appender.addTuple(tuple); } - + List statSet = new ArrayList(); for (Map.Entry entry : appenderMap.entrySet()) { int partNum = entry.getKey(); @@ -131,17 +131,17 @@ public Tuple next() throws IOException { context.addShuffleFileOutput(partNum, getDataFile(partNum).getName()); } } - + // Collect and aggregated statistics data TableStats aggregated = StatisticsUtil.aggregateTableStat(statSet); context.setResultStats(aggregated); - + return null; } @Override public void rescan() throws IOException { - // nothing to do + // nothing to do } @Override @@ -157,4 +157,4 @@ public void close() throws IOException{ progress = 1.0f; } -} \ No newline at end of file +} diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java index 8c55d7f503..66534de6d1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java @@ -32,6 +32,7 @@ import org.apache.tajo.engine.planner.logical.StoreTableNode; import org.apache.tajo.storage.Appender; import org.apache.tajo.storage.StorageManagerFactory; +import org.apache.tajo.storage.FileSystemUtil; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; @@ -65,7 +66,7 @@ public void init() throws IOException { private Appender getAppender(String partition) throws IOException { Path dataFile = getDataFile(partition); - FileSystem fs = dataFile.getFileSystem(context.getConf()); + FileSystem fs = FileSystemUtil.getFileSystem(dataFile, context.getConf()); if (fs.exists(dataFile.getParent())) { LOG.info("Path " + dataFile.getParent() + " already exists!"); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java index e6373414e3..f95e837033 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java @@ -35,6 +35,7 @@ import org.apache.tajo.engine.planner.PlanningException; import org.apache.tajo.engine.planner.logical.*; import org.apache.tajo.engine.utils.TupleUtil; +import org.apache.tajo.storage.FileSystemUtil; import org.apache.tajo.storage.Tuple; import java.io.IOException; @@ -133,7 +134,7 @@ public String toString() { private Path [] findFilteredPaths(Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(systemConf); + FileSystem fs = FileSystemUtil.getFileSystem(tablePath, systemConf); PathFilter [] filters; if (conjunctiveForms == null) { @@ -327,7 +328,7 @@ private boolean checkIfDisjunctiveButOneVariable(EvalNode evalNode) { private void updateTableStat(PartitionedTableScanNode scanNode) throws PlanningException { if (scanNode.getInputPaths().length > 0) { try { - FileSystem fs = scanNode.getInputPaths()[0].getFileSystem(systemConf); + FileSystem fs = FileSystemUtil.getFileSystem(scanNode.getInputPaths()[0], systemConf); long totalVolume = 0; for (Path input : scanNode.getInputPaths()) { diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 3b81ce2ea1..5509b53e70 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -305,7 +305,7 @@ private void insertNonFromQuery(InsertNode insertNode, SubmitQueryResponse.Build String nodeUniqName = insertNode.getTableName() == null ? insertNode.getPath().getName() : insertNode.getTableName(); String queryId = nodeUniqName + "_" + System.currentTimeMillis(); - FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf()); + FileSystem fs = FileSystemUtil.getFileSystem(TajoConf.getWarehouseDir(context.getConf()), context.getConf()); Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), fs, queryId.toString()); Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); @@ -560,7 +560,7 @@ public void alterTable(final Session session, final AlterTableNode alterTable) t databaseName, simpleTableName); Path newPath = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR), databaseName, alterTable.getNewTableName()); - FileSystem fs = oldPath.getFileSystem(context.getConf()); + FileSystem fs = FileSystemUtil.getFileSystem(oldPath , context.getConf()); if (!fs.exists(oldPath)) { throw new IOException("No such a table directory: " + oldPath); @@ -654,7 +654,7 @@ public TableDesc createTableOnPath(Session session, String tableName, Schema sch } } - FileSystem fs = path.getFileSystem(context.getConf()); + FileSystem fs = FileSystemUtil.getFileSystem(path, context.getConf()); if (isExternal) { if(!fs.exists(path)) { @@ -716,7 +716,7 @@ public boolean createDatabase(@Nullable Session session, String databaseName, if (catalog.createDatabase(databaseName, tablespaceName)) { String normalized = databaseName; Path databaseDir = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR), normalized); - FileSystem fs = databaseDir.getFileSystem(context.getConf()); + FileSystem fs = FileSystemUtil.getFileSystem(databaseDir, context.getConf()); fs.mkdirs(databaseDir); } @@ -780,7 +780,7 @@ public boolean dropTable(Session session, String tableName, boolean ifExists, bo if (purge) { try { - FileSystem fs = path.getFileSystem(context.getConf()); + FileSystem fs = FileSystemUtil.getFileSystem(path, context.getConf()); fs.delete(path, true); } catch (IOException e) { throw new InternalError(e.getMessage()); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index da9dc15592..e75b11d114 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -52,6 +52,7 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto; +import org.apache.tajo.storage.FileSystemUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; @@ -674,7 +675,7 @@ public TableResponse createExternalTable(RpcController controller, CreateTableRe Session session = context.getSessionManager().getSession(request.getSessionId().getId()); Path path = new Path(request.getPath()); - FileSystem fs = path.getFileSystem(conf); + FileSystem fs = FileSystemUtil.getFileSystem(path, conf); if (!fs.exists(path)) { throw new IOException("No such a directory: " + path); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index 04e82cabcb..4a7a2443cf 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -47,6 +47,7 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.event.*; import org.apache.tajo.storage.AbstractStorageManager; +import org.apache.tajo.storage.FileSystemUtil; import org.apache.tajo.util.TUtil; import java.io.IOException; @@ -303,11 +304,11 @@ public MasterPlan getPlan() { public StateMachine getStateMachine() { return stateMachine; } - + public void addSubQuery(SubQuery subquery) { subqueries.put(subquery.getId(), subquery); } - + public QueryId getId() { return this.id; } @@ -396,7 +397,7 @@ public Path commitOutputData(Query query) { if (queryContext.hasOutputPath()) { finalOutputDir = queryContext.getOutputPath(); try { - FileSystem fs = stagingResultDir.getFileSystem(query.systemConf); + FileSystem fs = FileSystemUtil.getFileSystem(stagingResultDir, query.systemConf); if (queryContext.isOutputOverwrite()) { // INSERT OVERWRITE INTO @@ -579,7 +580,7 @@ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryCo } public static long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(systemConf); + FileSystem fs = FileSystemUtil.getFileSystem(tablePath, systemConf); ContentSummary directorySummary = fs.getContentSummary(tablePath); return directorySummary.getLength(); } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index 9e904cdaad..e688fbe923 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -46,6 +46,7 @@ import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.storage.FileSystemUtil; import org.apache.tajo.util.TajoIdUtils; import java.net.InetSocketAddress; @@ -172,7 +173,8 @@ public void init(Configuration conf) { try { // initialize DFS and LocalFileSystems - defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(conf); + defaultFS = FileSystemUtil.getFileSystem(TajoConf.getTajoRootDir(systemConf), + this.systemConf); localFS = FileSystem.getLocal(conf); // the base dir for an output dir diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java index f15c4c97a5..74e3b75fc6 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java @@ -30,6 +30,7 @@ import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.FileSystemUtil; import java.io.IOException; @@ -102,10 +103,14 @@ public static FileSystem getFileSystem(TajoConf tajoConf, Path path) throws IOEx fs = FileSystem.get(path.toUri(), tajoConf, tajoUser); } catch (InterruptedException e) { LOG.warn("Occur InterruptedException while FileSystem initiating with user[" + tajoUser + "]"); - fs = FileSystem.get(path.toUri(), tajoConf); + fs = FileSystemUtil.getFileSystem(path, tajoConf); + } + catch (IOException e) { + LOG.warn("IOException while getting FileSystem object, Tring FileSystemUtil.getFileSystem to check secure access"); + fs = FileSystemUtil.getFileSystem(path, tajoConf); } } else { - fs = FileSystem.get(path.toUri(), tajoConf); + fs = FileSystemUtil.getFileSystem(path, tajoConf); } return fs; diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java index 85bb8619a4..ee400a1e2f 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java @@ -26,6 +26,7 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.FileSystemUtil; import org.apache.tajo.storage.v2.StorageManagerV2; import java.io.IOException; @@ -89,7 +90,7 @@ public static synchronized SeekableScanner getSeekableScanner( public static synchronized SeekableScanner getSeekableScanner( TajoConf conf, TableMeta meta, Schema schema, Path path) throws IOException { - FileSystem fs = path.getFileSystem(conf); + FileSystem fs = FileSystemUtil.getFileSystem(path, conf); FileStatus status = fs.getFileStatus(path); FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());