Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-2692] Support proxy user in terminal #2693

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import com.netease.arctic.server.dashboard.model.SessionInfo;
import com.netease.arctic.server.dashboard.model.SqlExample;
import com.netease.arctic.server.dashboard.model.SqlResult;
import com.netease.arctic.server.dashboard.response.ErrorResponse;
import com.netease.arctic.server.dashboard.response.OkResponse;
import com.netease.arctic.server.terminal.TerminalManager;
import io.javalin.http.Context;
import io.javalin.http.HttpCode;

import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -65,8 +67,15 @@ public void executeScript(Context ctx) {
String catalog = ctx.pathParam("catalog");
Map<String, String> bodyParams = ctx.bodyAsClass(Map.class);
String sql = bodyParams.get("sql");
String proxyUser = bodyParams.get("proxyUser");
String terminalId = ctx.cookie("JSESSIONID");
String sessionId = terminalManager.executeScript(terminalId, catalog, sql);
String sessionId;
try {
sessionId = terminalManager.executeScript(terminalId, catalog, sql, proxyUser);
} catch (IllegalStateException e) {
ctx.json(ErrorResponse.of(HttpCode.CONFLICT, e.getMessage()));
return;
}

ctx.json(OkResponse.of(new SessionInfo(sessionId)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@ public ErrorResponse(HttpCode httpStatus, String message, String requestId) {
}

public ErrorResponse(String message) {
super(HttpCode.BAD_REQUEST.getStatus(), message);
this.requestId = null;
this(HttpCode.BAD_REQUEST.getStatus(), message, null);
}

public static ErrorResponse of(String message) {
return new ErrorResponse(message);
}

public static ErrorResponse of(HttpCode httpStatus, String message) {
return new ErrorResponse(httpStatus, message, null);
}

public String getRequestId() {
return requestId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,17 @@ public TerminalManager(Configurations conf, TableService tableService) {
* @param terminalId - id to mark different terminal windows
* @param catalog - current catalog to execute script
* @param script - sql script to be executed
* @param proxyUser - proxy user to execute script
* @return - sessionId, session refer to a sql execution context
*/
public String executeScript(String terminalId, String catalog, String script) {
public String executeScript(String terminalId, String catalog, String script, String proxyUser) {
LOG.debug(
"current active sessions: {}, queue size {}",
sessionMap.size(),
executionPool.getQueue().size());
LOG.debug(
"execute script, terminalId: {}, catalog: {}, script: {}", terminalId, catalog, script);

CatalogMeta catalogMeta = tableService.getCatalogMeta(catalog);
TableMetaStore metaStore = getCatalogTableMetaStore(catalogMeta);
String sessionId = getSessionId(terminalId, metaStore, catalog);
Expand Down Expand Up @@ -134,7 +142,7 @@ public String executeScript(String terminalId, String catalog, String script) {
throw new IllegalStateException(
"current session is not ready to execute script. status:" + context.getStatus());
}
context.submit(catalog, script, resultLimits, stopOnError);
context.submit(catalog, metaStore, proxyUser, script, resultLimits, stopOnError);
return sessionId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.LineNumberReader;
Expand Down Expand Up @@ -84,8 +86,14 @@ private boolean isStatusReadyToExecute(ExecutionStatus status) {
}

public synchronized void submit(
String catalog, String script, int fetchLimit, boolean stopOnError) {
ExecutionTask task = new ExecutionTask(catalog, script, fetchLimit, stopOnError);
String catalog,
TableMetaStore metaStore,
String proxyUser,
String script,
int fetchLimit,
boolean stopOnError) {
ExecutionTask task =
new ExecutionTask(catalog, metaStore, proxyUser, script, fetchLimit, stopOnError);
if (!isReadyToExecute()) {
throw new IllegalStateException(
"current session is not ready to execute. status: " + status.get().name());
Expand Down Expand Up @@ -178,9 +186,20 @@ private class ExecutionTask implements Supplier<ExecutionStatus> {
private final int fetchLimits;
private final boolean stopOnError;
private final String catalog;
private final TableMetaStore tableMetaStore;

@Nullable private final String proxyUser;

public ExecutionTask(String catalog, String script, int fetchLimits, boolean stopOnError) {
public ExecutionTask(
String catalog,
TableMetaStore tableMetaStore,
String proxyUser,
String script,
int fetchLimits,
boolean stopOnError) {
this.catalog = catalog;
this.tableMetaStore = tableMetaStore;
this.proxyUser = proxyUser;
if (script.trim().endsWith(";")) {
this.script = script;
} else {
Expand All @@ -193,7 +212,8 @@ public ExecutionTask(String catalog, String script, int fetchLimits, boolean sto
@Override
public ExecutionStatus get() {
try {
return metaStore.doAs(
return metaStore.doAsImpersonating(
proxyUser,
() -> {
TerminalSession session = lazyLoadSession(this);
executionResult.appendLog("fetch terminal session: " + sessionId);
Expand Down Expand Up @@ -278,7 +298,9 @@ boolean executeStatement(TerminalSession session, String statement, int lineNo)
TerminalSession.ResultSet rs = null;
long begin = System.currentTimeMillis();
try {
rs = session.executeStatement(catalog, statement);
rs =
tableMetaStore.doAsImpersonating(
proxyUser, () -> session.executeStatement(catalog, statement));
Comment on lines +301 to +303
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to check whether to call doAs or doAsImpersonating here, and try not to modify the code of the TableMetastore.

executionResult.appendLogs(session.logs());
} catch (Throwable t) {
executionResult.appendLogs(session.logs());
Expand Down
25 changes: 16 additions & 9 deletions core/src/main/java/com/netease/arctic/table/TableMetaStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,15 +218,22 @@ public <T> T doAsImpersonating(String proxyUser, Callable<T> callable) {
if (disableAuth) {
return call(callable);
}
// create proxy user ugi and execute
UserGroupInformation proxyUgi =
UserGroupInformation.createProxyUser(proxyUser, Objects.requireNonNull(getUGI()));
LOG.debug(
"Access through the proxy account {}, proxy ugi {}, original ugi {}.",
proxyUser,
proxyUgi,
getUGI());
return proxyUgi.doAs((PrivilegedAction<T>) () -> call(callable));
if (StringUtils.isBlank(proxyUser)) {
// use the catalog ugi to execute
UserGroupInformation catalogUgi = getUGI();
LOG.debug("Access through the catalog ugi {}.", catalogUgi);
return catalogUgi.doAs((PrivilegedAction<T>) () -> call(callable));
} else {
// create proxy user ugi and execute
UserGroupInformation proxyUgi =
UserGroupInformation.createProxyUser(proxyUser, Objects.requireNonNull(getUGI()));
LOG.debug(
"Access through the proxy account {}, proxy ugi {}, original ugi {}.",
proxyUser,
proxyUgi,
getUGI());
return proxyUgi.doAs((PrivilegedAction<T>) () -> call(callable));
}
}

private <T> T call(Callable<T> callable) {
Expand Down