diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java index e6a39a5f1a048..b828534bea571 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java @@ -33,6 +33,7 @@ import org.apache.flink.core.execution.PipelineExecutorServiceLoader; import org.apache.flink.runtime.client.JobInitializationException; import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.rest.HttpHeader; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkUserCodeClassLoaders; import org.apache.flink.util.SerializedThrowable; @@ -43,6 +44,8 @@ import java.net.URL; import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.Executors; @@ -185,4 +188,24 @@ public static ScheduledExecutorService reportHeartbeatPeriodically( TimeUnit.MILLISECONDS); return scheduledExecutor; } + + public static Collection readHeadersFromEnvironmentVariable(String envVarName) { + List headers = new ArrayList<>(); + String rawHeaders = System.getenv(envVarName); + + if (rawHeaders != null) { + String[] lines = rawHeaders.split("\n"); + for (String line : lines) { + String[] keyValue = line.split(":", 2); + if (keyValue.length == 2) { + headers.add(new HttpHeader(keyValue[0], keyValue[1])); + } else { + LOG.info( + "Skipped a malformed header {} from FLINK_REST_CLIENT_HEADERS env variable. Expecting newline-separated headers in format header_name:header_value.", + line); + } + } + } + return headers; + } } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/UrlPrefixDecorator.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java similarity index 91% rename from flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/UrlPrefixDecorator.java rename to flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java index 9e905029fc4d6..bd6398176c574 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/UrlPrefixDecorator.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.gateway.rest.header.util; +package org.apache.flink.client.program.rest; import org.apache.flink.runtime.rest.HttpMethodWrapper; import org.apache.flink.runtime.rest.RestClient; @@ -24,10 +24,12 @@ import org.apache.flink.runtime.rest.messages.MessageParameters; import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.runtime.rest.messages.ResponseBody; -import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders; +import org.apache.flink.runtime.rest.versioning.RestAPIVersion; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import java.util.Collection; + import static org.apache.flink.shaded.guava31.com.google.common.base.Preconditions.checkNotNull; /** @@ -41,7 +43,7 @@ */ public class UrlPrefixDecorator< R extends RequestBody, P extends ResponseBody, M extends MessageParameters> - implements SqlGatewayMessageHeaders { + implements MessageHeaders { private final String prefixedUrl; private final MessageHeaders decorated; @@ -100,4 +102,9 @@ public Class getRequestClass() { public M getUnresolvedMessageParameters() { return decorated.getUnresolvedMessageParameters(); } + + @Override + public Collection> getSupportedAPIVersions() { + return decorated.getSupportedAPIVersions(); + } } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java index b97653221ebc1..98cef5df1f688 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java @@ -19,6 +19,8 @@ package org.apache.flink.table.client.gateway; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.client.ClientUtils; +import org.apache.flink.client.program.rest.UrlPrefixDecorator; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; @@ -52,7 +54,6 @@ import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders; import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders; import org.apache.flink.table.gateway.rest.header.util.GetApiVersionHeaders; -import org.apache.flink.table.gateway.rest.header.util.UrlPrefixDecorator; import org.apache.flink.table.gateway.rest.message.operation.OperationMessageParameters; import org.apache.flink.table.gateway.rest.message.operation.OperationStatusResponseBody; import org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody; @@ -84,7 +85,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URL; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -178,7 +178,8 @@ public ExecutorImpl(DefaultContext defaultContext, URL gatewayUrl, String sessio this.gatewayUrl = gatewayUrl; this.rowFormat = rowFormat; this.customHttpHeaders = - readHeadersFromEnvironmentVariable(ConfigConstants.FLINK_REST_CLIENT_HEADERS); + ClientUtils.readHeadersFromEnvironmentVariable( + ConfigConstants.FLINK_REST_CLIENT_HEADERS); try { // register required resource this.executorService = Executors.newCachedThreadPool(); @@ -595,26 +596,6 @@ private void closeSession() throws SqlExecutionException { } } - private static Collection readHeadersFromEnvironmentVariable(String envVarName) { - List headers = new ArrayList<>(); - String rawHeaders = System.getenv(envVarName); - - if (rawHeaders != null) { - String[] lines = rawHeaders.split("\n"); - for (String line : lines) { - String[] keyValue = line.split(":", 2); - if (keyValue.length == 2) { - headers.add(new HttpHeader(keyValue[0], keyValue[1])); - } else { - LOG.info( - "Skipped a malformed header {} from FLINK_REST_CLIENT_HEADERS env variable. Expecting newline-separated headers in format header_name:header_value.", - line); - } - } - } - return headers; - } - @VisibleForTesting Collection getCustomHttpHeaders() { return customHttpHeaders;