
[FLINK-20357][docs] Split HA documentation up into a general overview and the specific implementations #14254

Closed · wants to merge 2 commits

@@ -18,7 +18,7 @@
<td><h5>kubernetes.cluster-id</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The cluster-id, which should be no more than 45 characters, is used for identifying a unique Flink cluster. If not set, the client will automatically generate it with a random ID.</td>
<td>The cluster-id, which should be no more than 45 characters, is used for identifying a unique Flink cluster. The id must only contain lowercase alphanumeric characters and "-". The required format is <span markdown="span">`[a-z]([-a-z0-9]*[a-z0-9])`</span>. If not set, the client will automatically generate it with a random ID.</td>
</tr>
<tr>
<td><h5>kubernetes.config.file</h5></td>
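For illustration only (a hypothetical shell helper, not part of this PR), the `kubernetes.cluster-id` format described above can be checked up front:

{% highlight bash %}
# Hypothetical helper: check a candidate kubernetes.cluster-id against the
# documented pattern [a-z]([-a-z0-9]*[a-z0-9]) and the 45-character limit.
CLUSTER_ID="my-flink-cluster"
if [ "${#CLUSTER_ID}" -le 45 ] && printf '%s' "$CLUSTER_ID" | grep -Eq '^[a-z]([-a-z0-9]*[a-z0-9])$'; then
  echo "valid cluster-id: $CLUSTER_ID"
else
  echo "invalid cluster-id: $CLUSTER_ID" >&2
fi
{% endhighlight %}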
322 changes: 39 additions & 283 deletions docs/deployment/ha/index.md

Large diffs are not rendered by default.

322 changes: 39 additions & 283 deletions docs/deployment/ha/index.zh.md

Large diffs are not rendered by default.

97 changes: 35 additions & 62 deletions docs/deployment/ha/kubernetes_ha.md
@@ -23,77 +23,50 @@ specific language governing permissions and limitations
under the License.
-->

## Kubernetes Cluster High Availability
The Kubernetes high availability service can support both [standalone Flink on Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) and the [native Kubernetes integration]({% link deployment/resource-providers/native_kubernetes.md %}).
Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) for high availability services.
Contributor

Are there any restrictions on the K8s versions supported? Or on the required K8s features? (I guess the answer is that we only need ConfigMaps?)
I'm asking so that users can evaluate whether it also works with implementations such as https://k3s.io/

Contributor Author

I need @wangyang0918 to answer this question here.

Contributor

ConfigMaps and resource versions have been supported since the very beginning (well before 1.9). I believe very few users are still running a K8s version lower than 1.9, since the latest stable version is now 1.19.

BTW, the native K8s integration requires Kubernetes 1.9 or above.

Contributor Author

I guess to answer whether https://k3s.io/ works, one needs to try it out. I wouldn't block the PR on this, though. Thanks for the answer @wangyang0918.


When running the Flink JobManager as a Kubernetes Deployment, the replica count should be configured to 1 or greater.
* The value `1` means that a new JobManager will be launched to take over leadership if the current one terminates exceptionally.
* The value `N` (greater than 1) means that multiple JobManagers will be launched simultaneously, with one active and the others on standby. Starting more than one JobManager makes recovery faster (see the Deployment sketch below).
* Toc
{:toc}
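The following is a minimal, illustrative sketch of how the replica count is expressed on a JobManager Deployment; the names, labels, and image are hypothetical placeholders rather than values from this PR:

{% highlight yaml %}
# Illustrative fragment only: a JobManager Deployment with one active and one standby JobManager.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager        # hypothetical name
spec:
  replicas: 2                   # 1 is also HA: a replacement JobManager is started on failure
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: flink:latest   # placeholder image
          args: ["jobmanager"]
{% endhighlight %}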

### Configuration
{% highlight yaml %}
kubernetes.cluster-id: <ClusterId>
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: hdfs:///flink/recovery
{% endhighlight %}
Kubernetes high availability services can only be used when deploying to Kubernetes.
Consequently, they can be configured when using [standalone Flink on Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) or the [native Kubernetes integration]({% link deployment/resource-providers/native_kubernetes.md %}).

#### Example: Highly Available Standalone Flink Cluster on Kubernetes
Both session and job/application clusters support using the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml]({% link deployment/resource-providers/standalone/kubernetes.md %}#common-cluster-resource-definitions). The other YAML files do not need to be updated.

<span class="label label-info">Note</span> The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to [custom Flink image]({% link deployment/resource-providers/standalone/docker.md %}#customize-flink-image) and [enable plugins]({% link deployment/resource-providers/standalone/docker.md %}#using-plugins) for more information.

{% highlight yaml %}
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    ...
    kubernetes.cluster-id: <ClusterId>
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: hdfs:///flink/recovery
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
    ...
{% endhighlight %}
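Regarding the note above: if the HA storage directory uses a scheme such as `s3://`, the corresponding filesystem plugin has to be available at runtime. A hedged sketch (the container spec fields shown are only an illustration of where the `ENABLE_BUILT_IN_PLUGINS` environment variable could go in your own deployment YAMLs):

{% highlight yaml %}
# Illustrative fragment: enable the S3 filesystem plugin in the JobManager
# container so that an s3:// HA storage directory is accessible at runtime.
containers:
  - name: jobmanager
    image: flink:{{site.version}}   # placeholder image tag
    env:
      - name: ENABLE_BUILT_IN_PLUGINS
        value: flink-s3-fs-hadoop-{{site.version}}.jar
{% endhighlight %}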
## Configuration

#### Example: Highly Available Native Kubernetes Cluster
Use the following command to start a native Flink application cluster on Kubernetes with high availability configured.
{% highlight bash %}
$ ./bin/flink run-application -p 8 -t kubernetes-application \
-Dkubernetes.cluster-id=<ClusterId> \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image=<CustomImageName> \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=s3://flink/flink-ha \
-Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
local:///opt/flink/examples/streaming/StateMachineExample.jar
{% endhighlight %}
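After the application cluster is up, one way to sanity-check that the HA metadata has been written is to look for the ConfigMaps created under the cluster id. This is only an illustrative check; the exact names of the HA ConfigMaps are an implementation detail and may change between Flink versions:

{% highlight bash %}
# Illustrative check only: list the ConfigMaps that belong to the given cluster id.
$ kubectl get configmaps | grep <ClusterId>
{% endhighlight %}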
In order to start an HA cluster, you have to configure the following configuration keys:

### High Availability Data Clean Up
Currently, when a Flink job reaches a terminal state (`FAILED`, `CANCELED`, `FINISHED`), all the HA data, including the metadata in the Kubernetes ConfigMaps and the HA state on the DFS, will be cleaned up.
- [high-availability]({% link deployment/config.md %}#high-availability-1) (required):
The `high-availability` option has to be set to `KubernetesHaServicesFactory`.

So the following command will only shut down the Flink session cluster and leave all the HA-related ConfigMaps and state untouched.
{% highlight bash %}
$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
{% endhighlight %}
<pre>high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory</pre>

- [high-availability.storageDir]({% link deployment/config.md %}#high-availability-storagedir) (required):
JobManager metadata is persisted in the file system `high-availability.storageDir` and only a pointer to this state is stored in Kubernetes.

<pre>high-availability.storageDir: s3:///flink/recovery</pre>

The `storageDir` stores all metadata needed to recover from a JobManager failure.

- [kubernetes.cluster-id]({% link deployment/config.md %}#kubernetes-cluster-id) (required):
In order to identify the Flink cluster, you have to specify a `kubernetes.cluster-id`.

<pre>kubernetes.cluster-id: cluster1337</pre>

### Example configuration

Configure high availability mode in `conf/flink-conf.yaml`:

{% highlight yaml %}
kubernetes.cluster-id: <cluster-id>
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: hdfs:///flink/recovery
{% endhighlight %}

The following commands will cancel the job in an application or session cluster and effectively remove all of its HA data.
{% highlight bash %}
# Cancel a Flink job in the existing session
$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID> <JobID>
# Cancel a Flink application
$ ./bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=<ClusterID> <JobID>
{% endhighlight %}

To keep HA data while restarting the Flink cluster, simply delete the deployment (via `kubectl delete deploy <ClusterID>`).
## High availability data clean up

To keep HA data while restarting the Flink cluster, simply delete the deployment (via `kubectl delete deploy <cluster-id>`).
All Flink cluster related resources will be deleted (e.g. the JobManager Deployment, TaskManager pods, services, and the Flink conf ConfigMap).
HA-related ConfigMaps will be retained because they do not set the owner reference.
When restarting the session / application using `kubernetes-session.sh` or `flink run-application`, all previously running jobs will be recovered and restarted from the latest successful checkpoint.
When restarting the cluster, all previously running jobs will be recovered and restarted from the latest successful checkpoint.
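As an illustrative sequence (re-using the configuration keys shown above; a sketch rather than a prescribed procedure), keeping the HA data across a restart of a session cluster can look like this:

{% highlight bash %}
# Delete the Flink cluster deployment but keep the HA metadata
# (the HA ConfigMaps do not set the owner reference and are retained).
$ kubectl delete deploy <cluster-id>

# Re-start the session cluster under the same cluster id; previously running
# jobs are recovered and restarted from the latest successful checkpoint.
$ ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=<cluster-id> \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
    -Dhigh-availability.storageDir=hdfs:///flink/recovery
{% endhighlight %}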

{% top %}
97 changes: 35 additions & 62 deletions docs/deployment/ha/kubernetes_ha.zh.md
@@ -23,77 +23,50 @@ specific language governing permissions and limitations
under the License.
-->

## Kubernetes Cluster High Availability
The Kubernetes high availability service can support both [standalone Flink on Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.zh.md %}) and the [native Kubernetes integration]({% link deployment/resource-providers/native_kubernetes.zh.md %}).
Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) for high availability services.

When running the Flink JobManager as a Kubernetes Deployment, the replica count should be configured to 1 or greater.
* The value `1` means that a new JobManager will be launched to take over leadership if the current one terminates exceptionally.
* The value `N` (greater than 1) means that multiple JobManagers will be launched simultaneously, with one active and the others on standby. Starting more than one JobManager makes recovery faster.
* Toc
{:toc}

### Configuration
{% highlight yaml %}
kubernetes.cluster-id: <ClusterId>
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: hdfs:///flink/recovery
{% endhighlight %}
Kubernetes high availability services can only be used when deploying to Kubernetes.
Consequently, they can be configured when using [standalone Flink on Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.zh.md %}) or the [native Kubernetes integration]({% link deployment/resource-providers/native_kubernetes.zh.md %}).

#### Example: Highly Available Standalone Flink Cluster on Kubernetes
Both session and job/application clusters support using the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml]({% link deployment/resource-providers/standalone/kubernetes.zh.md %}#common-cluster-resource-definitions). The other YAML files do not need to be updated.

<span class="label label-info">Note</span> The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to [custom Flink image]({% link deployment/resource-providers/standalone/docker.zh.md %}#customize-flink-image) and [enable plugins]({% link deployment/resource-providers/standalone/docker.zh.md %}#using-plugins) for more information.

{% highlight yaml %}
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    ...
    kubernetes.cluster-id: <ClusterId>
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: hdfs:///flink/recovery
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
    ...
{% endhighlight %}
## Configuration

#### Example: Highly Available Native Kubernetes Cluster
Use the following command to start a native Flink application cluster on Kubernetes with high availability configured.
{% highlight bash %}
$ ./bin/flink run-application -p 8 -t kubernetes-application \
-Dkubernetes.cluster-id=<ClusterId> \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image=<CustomImageName> \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=s3://flink/flink-ha \
-Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
local:///opt/flink/examples/streaming/StateMachineExample.jar
{% endhighlight %}
In order to start an HA cluster, you have to configure the following configuration keys:

### High Availability Data Clean Up
Currently, when a Flink job reaches a terminal state (`FAILED`, `CANCELED`, `FINISHED`), all the HA data, including the metadata in the Kubernetes ConfigMaps and the HA state on the DFS, will be cleaned up.
- [high-availability]({% link deployment/config.zh.md %}#high-availability-1) (required):
The `high-availability` option has to be set to `KubernetesHaServicesFactory`.

So the following command will only shut down the Flink session cluster and leave all the HA-related ConfigMaps and state untouched.
{% highlight bash %}
$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
{% endhighlight %}
<pre>high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory</pre>

- [high-availability.storageDir]({% link deployment/config.zh.md %}#high-availability-storagedir) (required):
JobManager metadata is persisted in the file system `high-availability.storageDir` and only a pointer to this state is stored in Kubernetes.

<pre>high-availability.storageDir: s3:///flink/recovery</pre>

The `storageDir` stores all metadata needed to recover from a JobManager failure.

- [kubernetes.cluster-id]({% link deployment/config.zh.md %}#kubernetes-cluster-id) (required):
In order to identify the Flink cluster, you have to specify a `kubernetes.cluster-id`.

<pre>kubernetes.cluster-id: cluster1337</pre>

### Example configuration

Configure high availability mode in `conf/flink-conf.yaml`:

{% highlight yaml %}
kubernetes.cluster-id: <cluster-id>
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: hdfs:///flink/recovery
{% endhighlight %}

The following commands will cancel the job in an application or session cluster and effectively remove all of its HA data.
{% highlight bash %}
# Cancel a Flink job in the existing session
$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID> <JobID>
# Cancel a Flink application
$ ./bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=<ClusterID> <JobID>
{% endhighlight %}

To keep HA data while restarting the Flink cluster, simply delete the deployment (via `kubectl delete deploy <ClusterID>`).
## High availability data clean up

To keep HA data while restarting the Flink cluster, simply delete the deployment (via `kubectl delete deploy <cluster-id>`).
All Flink cluster related resources will be deleted (e.g. the JobManager Deployment, TaskManager pods, services, and the Flink conf ConfigMap).
HA-related ConfigMaps will be retained because they do not set the owner reference.
When restarting the session / application using `kubernetes-session.sh` or `flink run-application`, all previously running jobs will be recovered and restarted from the latest successful checkpoint.
When restarting the cluster, all previously running jobs will be recovered and restarted from the latest successful checkpoint.

{% top %}