Skip to content

Commit

Permalink
[FLINK-7535] Port DashboardConfigHandler to new REST endpoint
Browse files Browse the repository at this point in the history
Lets DashboardConfigHandler implement the LegacyRestHandler. Moreover, this
commit defines the appropriate DashboardConfigurationHeaders.

The DispatcherRestEndpoint registers the DashboardConfigHandler.

This closes #4604.
  • Loading branch information
tillrohrmann committed Sep 21, 2017
1 parent dbabdb1 commit c6243b8
Show file tree
Hide file tree
Showing 14 changed files with 406 additions and 51 deletions.
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.history.FsJobArchivist; import org.apache.flink.runtime.history.FsJobArchivist;
import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
Expand All @@ -47,6 +48,7 @@
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException; import java.lang.reflect.UndeclaredThrowableException;
import java.nio.file.Files; import java.nio.file.Files;
import java.time.ZonedDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
Expand Down Expand Up @@ -289,7 +291,7 @@ static FileWriter createOrGetFile(File folder, String name) throws IOException {


private void createDashboardConfigFile() throws IOException { private void createDashboardConfigFile() throws IOException {
try (FileWriter fw = createOrGetFile(webDir, "config")) { try (FileWriter fw = createOrGetFile(webDir, "config")) {
fw.write(DashboardConfigHandler.createConfigJson(webRefreshIntervalMillis)); fw.write(DashboardConfigHandler.createConfigJson(DashboardConfiguration.from(webRefreshIntervalMillis, ZonedDateTime.now())));
fw.flush(); fw.flush();
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.error("Failed to write config file."); LOG.error("Failed to write config file.");
Expand Down
Expand Up @@ -20,15 +20,19 @@


import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
import org.apache.flink.runtime.rest.RestServerEndpoint; import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter; import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler; import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler; import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification; import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
Expand All @@ -51,26 +55,25 @@
public class DispatcherRestEndpoint extends RestServerEndpoint { public class DispatcherRestEndpoint extends RestServerEndpoint {


private final GatewayRetriever<DispatcherGateway> leaderRetriever; private final GatewayRetriever<DispatcherGateway> leaderRetriever;
private final Time timeout; private final RestHandlerConfiguration restConfiguration;
private final File tmpDir;
private final Executor executor; private final Executor executor;


public DispatcherRestEndpoint( public DispatcherRestEndpoint(
RestServerEndpointConfiguration configuration, RestServerEndpointConfiguration configuration,
GatewayRetriever<DispatcherGateway> leaderRetriever, GatewayRetriever<DispatcherGateway> leaderRetriever,
Time timeout, RestHandlerConfiguration restConfiguration,
File tmpDir,
Executor executor) { Executor executor) {
super(configuration); super(configuration);
this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever); this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
this.timeout = Preconditions.checkNotNull(timeout); this.restConfiguration = Preconditions.checkNotNull(restConfiguration);
this.tmpDir = Preconditions.checkNotNull(tmpDir);
this.executor = Preconditions.checkNotNull(executor); this.executor = Preconditions.checkNotNull(executor);
} }


@Override @Override
protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) { protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(2); ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(3);

final Time timeout = restConfiguration.getTimeout();


LegacyRestHandlerAdapter<DispatcherGateway, StatusOverviewWithVersion, EmptyMessageParameters> clusterOverviewHandler = new LegacyRestHandlerAdapter<>( LegacyRestHandlerAdapter<DispatcherGateway, StatusOverviewWithVersion, EmptyMessageParameters> clusterOverviewHandler = new LegacyRestHandlerAdapter<>(
restAddressFuture, restAddressFuture,
Expand All @@ -81,7 +84,16 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
executor, executor,
timeout)); timeout));


handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler)); LegacyRestHandlerAdapter<DispatcherGateway, DashboardConfiguration, EmptyMessageParameters> dashboardConfigurationHandler = new LegacyRestHandlerAdapter<>(
restAddressFuture,
leaderRetriever,
timeout,
DashboardConfigurationHeaders.getInstance(),
new DashboardConfigHandler(
executor,
restConfiguration.getRefreshInterval()));

final File tmpDir = restConfiguration.getTmpDir();


Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent; Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;


Expand All @@ -96,6 +108,9 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
optWebContent = Optional.empty(); optWebContent = Optional.empty();
} }


handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigurationHandler));

optWebContent.ifPresent( optWebContent.ifPresent(
webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent))); webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));


Expand All @@ -106,6 +121,8 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
public void shutdown(Time timeout) { public void shutdown(Time timeout) {
super.shutdown(timeout); super.shutdown(timeout);


final File tmpDir = restConfiguration.getTmpDir();

try { try {
log.info("Removing cache directory {}", tmpDir); log.info("Removing cache directory {}", tmpDir);
FileUtils.deleteDirectory(tmpDir); FileUtils.deleteDirectory(tmpDir);
Expand Down
Expand Up @@ -20,7 +20,6 @@


import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.Dispatcher; import org.apache.flink.runtime.dispatcher.Dispatcher;
Expand All @@ -34,14 +33,14 @@
import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkException;


import java.io.File;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;


Expand Down Expand Up @@ -157,14 +156,10 @@ protected DispatcherRestEndpoint createDispatcherRestEndpoint(
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever, LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
Executor executor) throws Exception { Executor executor) throws Exception {


Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR));

return new DispatcherRestEndpoint( return new DispatcherRestEndpoint(
RestServerEndpointConfiguration.fromConfiguration(configuration), RestServerEndpointConfiguration.fromConfiguration(configuration),
dispatcherGatewayRetriever, dispatcherGatewayRetriever,
timeout, RestHandlerConfiguration.fromConfiguration(configuration),
tmpDir,
executor); executor);
} }


Expand Down
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.util.Preconditions;

import java.io.File;

/**
* Configuration object containing values for the rest handler configuration.
*/
public class RestHandlerConfiguration {

private final long refreshInterval;

private final Time timeout;

private final File tmpDir;

public RestHandlerConfiguration(long refreshInterval, Time timeout, File tmpDir) {
Preconditions.checkArgument(refreshInterval > 0L, "The refresh interval (ms) should be larger than 0.");
this.refreshInterval = refreshInterval;

this.timeout = Preconditions.checkNotNull(timeout);
this.tmpDir = Preconditions.checkNotNull(tmpDir);
}

public long getRefreshInterval() {
return refreshInterval;
}

public Time getTimeout() {
return timeout;
}

public File getTmpDir() {
return tmpDir;
}

public static RestHandlerConfiguration fromConfiguration(Configuration configuration) {
final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL);

final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));

final File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR));

return new RestHandlerConfiguration(refreshInterval, timeout, tmpDir);
}
}
Expand Up @@ -25,9 +25,9 @@
import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview; import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview; import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.LegacyRestHandler; import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.EnvironmentInformation;
Expand Down
Expand Up @@ -18,15 +18,20 @@


package org.apache.flink.runtime.rest.handler.legacy; package org.apache.flink.runtime.rest.handler.legacy;


import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;


import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonGenerator;


import java.io.IOException; import java.io.IOException;
import java.io.StringWriter; import java.io.StringWriter;
import java.time.ZonedDateTime;
import java.util.Map; import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;


Expand All @@ -35,16 +40,21 @@
* against this web server should behave. It defines for example the refresh interval, * against this web server should behave. It defines for example the refresh interval,
* and time zone of the server timestamps. * and time zone of the server timestamps.
*/ */
public class DashboardConfigHandler extends AbstractJsonRequestHandler { public class DashboardConfigHandler extends AbstractJsonRequestHandler implements LegacyRestHandler<DispatcherGateway, DashboardConfiguration, EmptyMessageParameters> {


private static final String DASHBOARD_CONFIG_REST_PATH = "/config"; public static final String DASHBOARD_CONFIG_REST_PATH = "/config";


private final String configString; private final String configString;


private final DashboardConfiguration dashboardConfiguration;

public DashboardConfigHandler(Executor executor, long refreshInterval) { public DashboardConfigHandler(Executor executor, long refreshInterval) {
super(executor); super(executor);

dashboardConfiguration = DashboardConfiguration.from(refreshInterval, ZonedDateTime.now());

try { try {
this.configString = createConfigJson(refreshInterval); this.configString = createConfigJson(dashboardConfiguration);
} }
catch (Exception e) { catch (Exception e) {
// should never happen // should never happen
Expand All @@ -57,29 +67,26 @@ public String[] getPaths() {
return new String[]{DASHBOARD_CONFIG_REST_PATH}; return new String[]{DASHBOARD_CONFIG_REST_PATH};
} }


@Override
public CompletableFuture<DashboardConfiguration> handleRequest(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, DispatcherGateway gateway) {
return CompletableFuture.completedFuture(dashboardConfiguration);
}

@Override @Override
public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
return CompletableFuture.completedFuture(configString); return CompletableFuture.completedFuture(configString);
} }


public static String createConfigJson(long refreshInterval) throws IOException { public static String createConfigJson(DashboardConfiguration dashboardConfiguration) throws IOException {
StringWriter writer = new StringWriter(); StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);


TimeZone timeZone = TimeZone.getDefault();
String timeZoneName = timeZone.getDisplayName();
long timeZoneOffset = timeZone.getRawOffset();

gen.writeStartObject(); gen.writeStartObject();
gen.writeNumberField("refresh-interval", refreshInterval); gen.writeNumberField(DashboardConfiguration.FIELD_NAME_REFRESH_INTERVAL, dashboardConfiguration.getRefreshInterval());
gen.writeNumberField("timezone-offset", timeZoneOffset); gen.writeNumberField(DashboardConfiguration.FIELD_NAME_TIMEZONE_OFFSET, dashboardConfiguration.getTimeZoneOffset());
gen.writeStringField("timezone-name", timeZoneName); gen.writeStringField(DashboardConfiguration.FIELD_NAME_TIMEZONE_NAME, dashboardConfiguration.getTimeZoneName());
gen.writeStringField("flink-version", EnvironmentInformation.getVersion()); gen.writeStringField(DashboardConfiguration.FIELD_NAME_FLINK_VERSION, dashboardConfiguration.getFlinkVersion());

gen.writeStringField(DashboardConfiguration.FIELD_NAME_FLINK_REVISION, dashboardConfiguration.getFlinkRevision());
EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
if (revision != null) {
gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate);
}


gen.writeEndObject(); gen.writeEndObject();


Expand Down

0 comments on commit c6243b8

Please sign in to comment.