Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.security.UserGroupInformation;
import javax.security.auth.Subject;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedExceptionAction;
/**
* A MapReduce/Hive input format for ORC files.
* <p>
Expand Down Expand Up @@ -493,13 +500,21 @@ static final class FileGenerator implements Runnable {
private final Context context;
private final FileSystem fs;
private final Path dir;
private UserGroupInformation ugi;

FileGenerator(Context context, FileSystem fs, Path dir) {
this.context = context;
this.fs = fs;
this.dir = dir;
}

FileGenerator(Context context, FileSystem fs, Path dir, UserGroupInformation ugi) {
this.context = context;
this.fs = fs;
this.dir = dir;
this.ugi = ugi;
}

private void scheduleSplits(FileStatus file,
boolean isOriginal,
boolean hasBase,
Expand All @@ -517,6 +532,32 @@ private void scheduleSplits(FileStatus file,
*/
@Override
public void run() {
if(ugi == null)
runInternal();
else{
AccessControlContext accessControlContext = AccessController.getContext();
Subject subject = Subject.getSubject(accessControlContext);
Set<Principal> principals = subject.getPrincipals();

try {
SplitStrategy strategy = this.ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {

AccessControlContext accessControlContext = AccessController.getContext();
Subject subject = Subject.getSubject(accessControlContext);
Set<Principal> principals = subject.getPrincipals();
runInternal();
return true;
}
});
}catch(Exception e){
throw e;
}
}
}

private void runInternal(){
try {
AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
context.conf, context.transactionList);
Expand Down Expand Up @@ -559,7 +600,7 @@ public void run() {
for (int b = 0; b < context.numBuckets; ++b) {
if (!covered[b]) {
context.splits.add(new OrcSplit(dir, b, 0, new String[0], null,
false, false, deltas));
false, false, deltas));
}
}
}
Expand Down Expand Up @@ -933,9 +974,10 @@ static List<OrcSplit> generateSplitsInfo(Configuration conf)
throws IOException {
// use threads to resolve directories into splits
Context context = new Context(conf);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
for(Path dir: getInputPaths(conf)) {
FileSystem fs = dir.getFileSystem(conf);
context.schedule(new FileGenerator(context, fs, dir));
context.schedule(new FileGenerator(context, fs, dir, ugi));
}
context.waitForTasks();
// deal with exceptions
Expand Down