Skip to content

Commit

Permalink
[FLINK-27609][Fix] Optimize REST call frequency
Browse files Browse the repository at this point in the history
  • Loading branch information
morhidi committed May 17, 2022
1 parent aa7bd6b commit 3886e66
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,25 +88,31 @@ public void observe(FlinkDeployment flinkApp, Context context) {
}

if (isJmDeploymentReady(flinkApp)) {
observeClusterInfo(flinkApp, observeConfig);
if (observeFlinkCluster(flinkApp, context, observeConfig)) {
if (reconciliationStatus.getState() != ReconciliationState.ROLLED_BACK) {
reconciliationStatus.markReconciledSpecAsStable();
if (observeClusterInfo(flinkApp, observeConfig)) {
if (observeFlinkCluster(flinkApp, context, observeConfig)) {
if (reconciliationStatus.getState() != ReconciliationState.ROLLED_BACK) {
reconciliationStatus.markReconciledSpecAsStable();
}
}
}
}

clearErrorsIfDeploymentIsHealthy(flinkApp);
}

private void observeClusterInfo(FlinkDeployment flinkApp, Configuration configuration) {
private boolean observeClusterInfo(FlinkDeployment flinkApp, Configuration configuration) {
if (flinkApp.getStatus().getClusterInfo() != null) {
return true;
}
try {
Map<String, String> clusterInfo = flinkService.getClusterinfo(configuration);
Map<String, String> clusterInfo = flinkService.getClusterInfo(configuration);
flinkApp.getStatus().setClusterInfo(clusterInfo);
logger.debug("ClusterInfo: {}", clusterInfo);
} catch (Exception e) {
logger.warn("Exception while fetching cluster info", e);
logger.error("Exception while fetching cluster info", e);
return false;
}
return true;
}

protected void observeJmDeployment(
Expand All @@ -120,6 +126,8 @@ protected void observeJmDeployment(
return;
}

flinkApp.getStatus().setClusterInfo(null);

logger.info(
"Observing JobManager deployment. Previous status: {}", previousJmStatus.name());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.service;

import org.apache.flink.runtime.rest.messages.ResponseBody;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import lombok.Data;
import lombok.NoArgsConstructor;

/** Custom Response for handling dashboard configs. */
@JsonIgnoreProperties(ignoreUnknown = true)
@Data
@NoArgsConstructor
public class CustomDashboardConfiguration implements ResponseBody {

public static final String FIELD_NAME_FLINK_VERSION = "flink-version";
public static final String FIELD_NAME_FLINK_REVISION = "flink-revision";

@JsonProperty(FIELD_NAME_FLINK_VERSION)
private String flinkVersion;

@JsonProperty(FIELD_NAME_FLINK_REVISION)
private String flinkRevision;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.apache.flink.kubernetes.operator.service;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/** Message headers for the {@link CustomDashboardConfiguration}. */
public final class CustomDashboardConfigurationHeaders
implements MessageHeaders<
EmptyRequestBody, CustomDashboardConfiguration, EmptyMessageParameters> {

public static final CustomDashboardConfigurationHeaders INSTANCE =
new CustomDashboardConfigurationHeaders();

// make the constructor private since we want it to be a singleton
private CustomDashboardConfigurationHeaders() {}

@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}

@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}

@Override
public String getTargetRestEndpointURL() {
return "/config";
}

@Override
public Class<CustomDashboardConfiguration> getResponseClass() {
return CustomDashboardConfiguration.class;
}

@Override
public HttpResponseStatus getResponseStatusCode() {
return HttpResponseStatus.OK;
}

@Override
public EmptyMessageParameters getUnresolvedMessageParameters() {
return EmptyMessageParameters.getInstance();
}

public static CustomDashboardConfigurationHeaders getInstance() {
return INSTANCE;
}

@Override
public String getDescription() {
return "Returns the configuration of the WebUI.";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
Expand Down Expand Up @@ -677,16 +676,16 @@ public SavepointFetchResult fetchSavepointInfo(
}
}

public Map<String, String> getClusterinfo(Configuration conf) throws Exception {
public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
Map<String, String> runtimeVersion = new HashMap<>();

try (RestClusterClient<String> clusterClient =
(RestClusterClient<String>) getClusterClient(conf)) {

DashboardConfiguration dashboardConfiguration =
CustomDashboardConfiguration dashboardConfiguration =
clusterClient
.sendRequest(
DashboardConfigurationHeaders.getInstance(),
CustomDashboardConfigurationHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
.get(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ public SubmittedJobInfo(
}

@Override
public Map<String, String> getClusterinfo(Configuration conf) throws Exception {
public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
return CLUSTER_INFO;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,11 @@ public void observeApplicationCluster() throws Exception {
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
deployment.getStatus().getJobManagerDeploymentStatus());
assertNull(deployment.getStatus().getReconciliationStatus().getLastStableSpec());
assertNull(deployment.getStatus().getClusterInfo());

// Stable ready
observer.observe(deployment, readyContext);
assertEquals(TestingFlinkService.CLUSTER_INFO, deployment.getStatus().getClusterInfo());
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
Expand Down Expand Up @@ -274,19 +276,4 @@ public void observeListJobsError() {
});
assertEquals(podFailedMessage, exception.getMessage());
}

@Test
public void observeClusterInfo() {
TestingFlinkService flinkService = new TestingFlinkService();
ApplicationObserver observer =
new ApplicationObserver(
null, flinkService, configManager, new TestingStatusHelper<>());
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
bringToReadyStatus(deployment);
observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
assertEquals(TestingFlinkService.CLUSTER_INFO, deployment.getStatus().getClusterInfo());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import org.apache.flink.runtime.rest.util.RestMapperUtils;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -209,7 +210,7 @@ public void testTriggerSavepoint() throws Exception {

@Test
public void testGetLastSavepointRestCompatibility() throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
String flink14Response =
"{\"counts\":{\"restored\":0,\"total\":2,\"in_progress\":0,\"completed\":2,\"failed\":0},\"summary\":{\"state_size\":{\"min\":8646,\"max\":25626,\"avg\":17136},\"end_to_end_duration\":{\"min\":95,\"max\":420,\"avg\":257},\"alignment_buffered\":{\"min\":0,\"max\":0,\"avg\":0},\"processed_data\":{\"min\":0,\"max\":70,\"avg\":35},\"persisted_data\":{\"min\":0,\"max\":0,\"avg\":0}},\"latest\":{\"completed\":{\"@class\":\"completed\",\"id\":1,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1652347653972,\"latest_ack_timestamp\":1652347654392,\"state_size\":8646,\"end_to_end_duration\":420,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/opt/flink/volume/flink-cp/9f096f515d5d66dbda0d854b5d5a7af2/chk-1\",\"discarded\":true},\"savepoint\":{\"@class\":\"completed\",\"id\":2,\"status\":\"COMPLETED\",\"is_savepoint\":true,\"trigger_timestamp\":1652347655184,\"latest_ack_timestamp\":1652347655279,\"state_size\":25626,\"end_to_end_duration\":95,\"alignment_buffered\":0,\"processed_data\":70,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"SYNC_SAVEPOINT\",\"tasks\":{},\"external_path\":\"file:/opt/flink/volume/flink-sp/savepoint-9f096f-cebc9a861a41\",\"discarded\":false},\"failed\":null,\"restored\":null},\"history\":[{\"@class\":\"completed\",\"id\":2,\"status\":\"COMPLETED\",\"is_savepoint\":true,\"trigger_timestamp\":1652347655184,\"latest_ack_timestamp\":1652347655279,\"state_size\":25626,\"end_to_end_duration\":95,\"alignment_buffered\":0,\"processed_data\":70,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"SYNC_SAVEPOINT\",\"tasks\":{},\"external_path\":\"file:/opt/flink/volume/flink-sp/savepoint-9f096f-cebc9a861a41\",\"discarded\":false},{\"@class\":\"completed\",\"id\":1,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1652347653972,\"latest_ack_timestamp\":1652347654392,\"state_size\":8646,\"end_to_end_duration\":420,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/opt/flink/volume/flink-cp/9f096f515d5d66dbda0d854b5d5a7af2/chk-1\",\"discarded\":true}]}";
String flink15Response =
Expand All @@ -219,6 +220,25 @@ public void testGetLastSavepointRestCompatibility() throws JsonProcessingExcepti
objectMapper.readValue(flink15Response, CheckpointHistoryWrapper.class);
}

@Test
public void testClusterInfoRestCompatibility() throws JsonProcessingException {
ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();

String flink13Response =
"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.13.6\",\"flink-revision\":\"b2ca390 @ 2022-02-03T14:54:22+01:00\",\"features\":{\"web-submit\":false}}";
String flink14Response =
"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.14.4\",\"flink-revision\":\"895c609 @ 2022-02-25T11:57:14+01:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
String flink15Response =
"{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.15.0\",\"flink-revision\":\"3a4c113 @ 2022-04-20T19:50:32+02:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";

var dashboardConfiguration =
objectMapper.readValue(flink13Response, CustomDashboardConfiguration.class);
dashboardConfiguration =
objectMapper.readValue(flink14Response, CustomDashboardConfiguration.class);
dashboardConfiguration =
objectMapper.readValue(flink15Response, CustomDashboardConfiguration.class);
}

private FlinkService createFlinkService(ClusterClient<String> clusterClient) {
return new FlinkService(client, new FlinkConfigManager(configuration)) {
@Override
Expand Down

0 comments on commit 3886e66

Please sign in to comment.