Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-17707][k8s] Support configuring replicas of JobManager deployment when HA enabled #15286

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -292,6 +292,10 @@ For more details see the [official Kubernetes documentation](https://kubernetes.

For high availability on Kubernetes, you can use the [existing high availability services]({{< ref "docs/deployment/ha/overview" >}}).

Configure the value of <a href="{{< ref "docs/deployment/config" >}}#kubernetes-jobmanager-replicas">kubernetes.jobmanager.replicas</a> to greater than 1 to start standby JobManagers.
It will help to achieve faster recovery.
Notice that high availability should be enabled when starting standby JobManagers.

### Manual Resource Cleanup

Flink uses [Kubernetes OwnerReference's](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/) to clean up all cluster components.
Expand Down
Expand Up @@ -292,6 +292,10 @@ For more details see the [official Kubernetes documentation](https://kubernetes.

For high availability on Kubernetes, you can use the [existing high availability services]({{< ref "docs/deployment/ha/overview" >}}).

Configure the value of <a href="{{< ref "docs/deployment/config" >}}#kubernetes-jobmanager-replicas">kubernetes.jobmanager.replicas</a> to greater than 1 to start standby JobManagers.
It will help to achieve faster recovery.
Notice that high availability should be enabled when starting standby JobManagers.

### Manual Resource Cleanup

Flink uses [Kubernetes OwnerReference's](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/) to clean up all cluster components.
Expand Down
Expand Up @@ -116,6 +116,12 @@
<td>List&lt;Map&gt;</td>
<td>The user-specified <a href="https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/native_kubernetes.html#manual-resource-cleanup">Owner References</a> to be set to the JobManager Deployment. When all the owner resources are deleted, the JobManager Deployment will be deleted automatically, which also deletes all the resources created by this Flink cluster. The value should be formatted as a semicolon-separated list of owner references, where each owner reference is a comma-separated list of `key:value` pairs. E.g., apiVersion:v1,blockOwnerDeletion:true,controller:true,kind:FlinkApplication,name:flink-app-name,uid:flink-app-uid;apiVersion:v1,kind:Deployment,name:deploy-name,uid:deploy-uid</td>
</tr>
<tr>
<td><h5>kubernetes.jobmanager.replicas</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>Specify how many JobManager pods will be started simultaneously. Configure the value to greater than 1 to start standby JobManagers. It will help to achieve faster recovery. Notice that high availability should be enabled when starting standby JobManagers.</td>
</tr>
<tr>
<td><h5>kubernetes.jobmanager.service-account</h5></td>
<td style="word-wrap: break-word;">"default"</td>
Expand Down
Expand Up @@ -422,6 +422,16 @@ public class KubernetesConfigOptions {
+ "(e.g. start/stop TaskManager pods, update leader related ConfigMaps, etc.). "
+ "Increasing the pool size allows to run more IO operations concurrently.");

public static final ConfigOption<Integer> KUBERNETES_JOBMANAGER_REPLICAS =
key("kubernetes.jobmanager.replicas")
.intType()
.defaultValue(1)
.withDescription(
"Specify how many JobManager pods will be started simultaneously. "
+ "Configure the value to greater than 1 to start standby JobManagers. "
+ "It will help to achieve faster recovery. "
+ "Notice that high availability should be enabled when starting standby JobManagers.");

private static String getDefaultFlinkImage() {
// The default container image that ties to the exact needed versions of both Flink and
// Scala.
Expand Down
Expand Up @@ -113,7 +113,7 @@ private static Deployment createJobManagerDeployment(
.collect(Collectors.toList()))
.endMetadata()
.editOrNewSpec()
.withReplicas(1)
.withReplicas(kubernetesJobManagerParameters.getReplicas())
.editOrNewTemplate()
.withMetadata(resolvedPod.getMetadata())
.withSpec(resolvedPod.getSpec())
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
Expand Down Expand Up @@ -160,4 +161,20 @@ public KubernetesConfigOptions.ServiceExposedType getRestServiceExposedType() {
public boolean isInternalServiceEnabled() {
return !HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig);
}

public int getReplicas() {
final int replicas =
flinkConfig.get(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS);
if (replicas < 1) {
throw new IllegalConfigurationException(
String.format(
"'%s' should not be configured less than one.",
KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS.key()));
} else if (replicas > 1
&& !HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {
throw new IllegalConfigurationException(
"High availability should be enabled when starting standby JobManagers.");
}
return replicas;
}
}
Expand Up @@ -19,12 +19,14 @@
package org.apache.flink.kubernetes.kubeclient.factory;

import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.kubernetes.KubernetesTestUtils;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint;
import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase;
Expand Down Expand Up @@ -60,6 +62,7 @@
import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -89,6 +92,8 @@ public class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBas
"testapp",
"e3c9aa3f-cc42-4178-814a-64aa15c82373"));

private static final int JOBMANAGER_REPLICAS = 2;

private final FlinkPod flinkPod = new FlinkPod.Builder().build();

protected KubernetesJobManagerSpecification kubernetesJobManagerSpecification;
Expand Down Expand Up @@ -462,4 +467,19 @@ public void testEmptyHadoopConfDirectory() throws IOException {
.getHadoopConfConfigMapName(
CLUSTER_ID))));
}

@Test
public void testSetJobManagerDeploymentReplicas() throws Exception {
flinkConfig.set(
HighAvailabilityOptions.HA_MODE,
KubernetesHaServicesFactory.class.getCanonicalName());
flinkConfig.set(
KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, JOBMANAGER_REPLICAS);
kubernetesJobManagerSpecification =
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
flinkPod, kubernetesJobManagerParameters);
assertThat(
kubernetesJobManagerSpecification.getDeployment().getSpec().getReplicas(),
is(JOBMANAGER_REPLICAS));
}
}
Expand Up @@ -20,12 +20,15 @@

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.KubernetesTestBase;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.util.FlinkRuntimeException;

Expand Down Expand Up @@ -213,4 +216,25 @@ public void testPrioritizeBuiltInLabels() {
expectedLabels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER);
assertThat(kubernetesJobManagerParameters.getLabels(), is(equalTo(expectedLabels)));
}

@Test(expected = IllegalConfigurationException.class)
public void testGetReplicasWithTwoShouldFailWhenHAIsNotEnabled() {
flinkConfig.set(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, 2);
kubernetesJobManagerParameters.getReplicas();
}

@Test(expected = IllegalConfigurationException.class)
public void testGetReplicasWithInvalidValue() {
flinkConfig.set(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, 0);
kubernetesJobManagerParameters.getReplicas();
}

@Test
public void testGetReplicas() {
flinkConfig.set(
HighAvailabilityOptions.HA_MODE,
KubernetesHaServicesFactory.class.getCanonicalName());
flinkConfig.set(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, 2);
assertThat(kubernetesJobManagerParameters.getReplicas(), is(2));
}
}