diff --git a/ams/server/src/main/java/com/netease/arctic/server/dashboard/controller/TerminalController.java b/ams/server/src/main/java/com/netease/arctic/server/dashboard/controller/TerminalController.java index b6a7703eb9..ea8c6d5b31 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/dashboard/controller/TerminalController.java +++ b/ams/server/src/main/java/com/netease/arctic/server/dashboard/controller/TerminalController.java @@ -22,9 +22,11 @@ import com.netease.arctic.server.dashboard.model.SessionInfo; import com.netease.arctic.server.dashboard.model.SqlExample; import com.netease.arctic.server.dashboard.model.SqlResult; +import com.netease.arctic.server.dashboard.response.ErrorResponse; import com.netease.arctic.server.dashboard.response.OkResponse; import com.netease.arctic.server.terminal.TerminalManager; import io.javalin.http.Context; +import io.javalin.http.HttpCode; import java.util.Arrays; import java.util.List; @@ -65,8 +67,15 @@ public void executeScript(Context ctx) { String catalog = ctx.pathParam("catalog"); Map bodyParams = ctx.bodyAsClass(Map.class); String sql = bodyParams.get("sql"); + String proxyUser = bodyParams.get("proxyUser"); String terminalId = ctx.cookie("JSESSIONID"); - String sessionId = terminalManager.executeScript(terminalId, catalog, sql); + String sessionId; + try { + sessionId = terminalManager.executeScript(terminalId, catalog, sql, proxyUser); + } catch (IllegalStateException e) { + ctx.json(ErrorResponse.of(HttpCode.CONFLICT, e.getMessage())); + return; + } ctx.json(OkResponse.of(new SessionInfo(sessionId))); } diff --git a/ams/server/src/main/java/com/netease/arctic/server/dashboard/response/ErrorResponse.java b/ams/server/src/main/java/com/netease/arctic/server/dashboard/response/ErrorResponse.java index a9fbc4ecb1..afecd296b8 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/dashboard/response/ErrorResponse.java +++ b/ams/server/src/main/java/com/netease/arctic/server/dashboard/response/ErrorResponse.java @@ -35,14 +35,17 @@ public ErrorResponse(HttpCode httpStatus, String message, String requestId) { } public ErrorResponse(String message) { - super(HttpCode.BAD_REQUEST.getStatus(), message); - this.requestId = null; + this(HttpCode.BAD_REQUEST.getStatus(), message, null); } public static ErrorResponse of(String message) { return new ErrorResponse(message); } + public static ErrorResponse of(HttpCode httpStatus, String message) { + return new ErrorResponse(httpStatus, message, null); + } + public String getRequestId() { return requestId; } diff --git a/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalManager.java b/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalManager.java index 0fdc60ab94..cd71213c7e 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalManager.java +++ b/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalManager.java @@ -99,9 +99,17 @@ public TerminalManager(Configurations conf, TableService tableService) { * @param terminalId - id to mark different terminal windows * @param catalog - current catalog to execute script * @param script - sql script to be executed + * @param proxyUser - proxy user to execute script * @return - sessionId, session refer to a sql execution context */ - public String executeScript(String terminalId, String catalog, String script) { + public String executeScript(String terminalId, String catalog, String script, String proxyUser) { + LOG.debug( + "current active sessions: {}, queue size {}", + sessionMap.size(), + executionPool.getQueue().size()); + LOG.debug( + "execute script, terminalId: {}, catalog: {}, script: {}", terminalId, catalog, script); + CatalogMeta catalogMeta = tableService.getCatalogMeta(catalog); TableMetaStore metaStore = getCatalogTableMetaStore(catalogMeta); String sessionId = getSessionId(terminalId, metaStore, catalog); @@ -134,7 +142,7 @@ public String executeScript(String terminalId, String catalog, String script) { throw new IllegalStateException( "current session is not ready to execute script. status:" + context.getStatus()); } - context.submit(catalog, script, resultLimits, stopOnError); + context.submit(catalog, metaStore, proxyUser, script, resultLimits, stopOnError); return sessionId; } diff --git a/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalSessionContext.java b/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalSessionContext.java index 54a76ef488..c7062aa929 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalSessionContext.java +++ b/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalSessionContext.java @@ -25,6 +25,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.LineNumberReader; @@ -84,8 +86,14 @@ private boolean isStatusReadyToExecute(ExecutionStatus status) { } public synchronized void submit( - String catalog, String script, int fetchLimit, boolean stopOnError) { - ExecutionTask task = new ExecutionTask(catalog, script, fetchLimit, stopOnError); + String catalog, + TableMetaStore metaStore, + String proxyUser, + String script, + int fetchLimit, + boolean stopOnError) { + ExecutionTask task = + new ExecutionTask(catalog, metaStore, proxyUser, script, fetchLimit, stopOnError); if (!isReadyToExecute()) { throw new IllegalStateException( "current session is not ready to execute. status: " + status.get().name()); @@ -178,9 +186,20 @@ private class ExecutionTask implements Supplier { private final int fetchLimits; private final boolean stopOnError; private final String catalog; + private final TableMetaStore tableMetaStore; + + @Nullable private final String proxyUser; - public ExecutionTask(String catalog, String script, int fetchLimits, boolean stopOnError) { + public ExecutionTask( + String catalog, + TableMetaStore tableMetaStore, + String proxyUser, + String script, + int fetchLimits, + boolean stopOnError) { this.catalog = catalog; + this.tableMetaStore = tableMetaStore; + this.proxyUser = proxyUser; if (script.trim().endsWith(";")) { this.script = script; } else { @@ -193,7 +212,8 @@ public ExecutionTask(String catalog, String script, int fetchLimits, boolean sto @Override public ExecutionStatus get() { try { - return metaStore.doAs( + return metaStore.doAsImpersonating( + proxyUser, () -> { TerminalSession session = lazyLoadSession(this); executionResult.appendLog("fetch terminal session: " + sessionId); @@ -278,7 +298,9 @@ boolean executeStatement(TerminalSession session, String statement, int lineNo) TerminalSession.ResultSet rs = null; long begin = System.currentTimeMillis(); try { - rs = session.executeStatement(catalog, statement); + rs = + tableMetaStore.doAsImpersonating( + proxyUser, () -> session.executeStatement(catalog, statement)); executionResult.appendLogs(session.logs()); } catch (Throwable t) { executionResult.appendLogs(session.logs()); diff --git a/core/src/main/java/com/netease/arctic/table/TableMetaStore.java b/core/src/main/java/com/netease/arctic/table/TableMetaStore.java index 83ed7a3e73..2cf51fe3b3 100644 --- a/core/src/main/java/com/netease/arctic/table/TableMetaStore.java +++ b/core/src/main/java/com/netease/arctic/table/TableMetaStore.java @@ -218,15 +218,22 @@ public T doAsImpersonating(String proxyUser, Callable callable) { if (disableAuth) { return call(callable); } - // create proxy user ugi and execute - UserGroupInformation proxyUgi = - UserGroupInformation.createProxyUser(proxyUser, Objects.requireNonNull(getUGI())); - LOG.debug( - "Access through the proxy account {}, proxy ugi {}, original ugi {}.", - proxyUser, - proxyUgi, - getUGI()); - return proxyUgi.doAs((PrivilegedAction) () -> call(callable)); + if (StringUtils.isBlank(proxyUser)) { + // use the catalog ugi to execute + UserGroupInformation catalogUgi = getUGI(); + LOG.debug("Access through the catalog ugi {}.", catalogUgi); + return catalogUgi.doAs((PrivilegedAction) () -> call(callable)); + } else { + // create proxy user ugi and execute + UserGroupInformation proxyUgi = + UserGroupInformation.createProxyUser(proxyUser, Objects.requireNonNull(getUGI())); + LOG.debug( + "Access through the proxy account {}, proxy ugi {}, original ugi {}.", + proxyUser, + proxyUgi, + getUGI()); + return proxyUgi.doAs((PrivilegedAction) () -> call(callable)); + } } private T call(Callable callable) {