diff --git a/docs/_includes/generated/kubernetes_config_configuration.html b/docs/_includes/generated/kubernetes_config_configuration.html
index 6b19cec476204..f757edc59754f 100644
--- a/docs/_includes/generated/kubernetes_config_configuration.html
+++ b/docs/_includes/generated/kubernetes_config_configuration.html
@@ -34,9 +34,9 @@
kubernetes.container.image |
- "flink:latest" |
+ The default value depends on the actually running version. In general it looks like "flink:<FLINK_VERSION>-scala_<SCALA_VERSION>" |
String |
- Image to use for Flink containers. |
+ Image to use for Flink containers. The specified image must be based upon the same Apache Flink and Scala versions as used by the application. Visit https://hub.docker.com/_/flink?tab=tags for the images provided by the Flink project. |
kubernetes.container.image.pull-policy |
diff --git a/docs/ops/deployment/docker.md b/docs/ops/deployment/docker.md
index a32bdad2c8a20..59526cba22f94 100644
--- a/docs/ops/deployment/docker.md
+++ b/docs/ops/deployment/docker.md
@@ -46,6 +46,11 @@ For example, you can use the following aliases:
* `flink:latest` → `flink:-scala_`
* `flink:1.11` → `flink:1.11.-scala_2.11`
+Note It is recommended to always use an explicit version tag of the docker image that specifies both the needed Flink and Scala
+versions (for example `flink:1.11-scala_2.12`).
+This will avoid some class conflicts that can occur if the Flink and/or Scala versions used in the application are different
+from the versions provided by the docker image.
+
Note Prior to Flink 1.5 version, Hadoop dependencies were always bundled with Flink.
You can see that certain tags include the version of Hadoop, e.g. (e.g. `-hadoop28`).
Beginning with Flink 1.5, image tags that omit the Hadoop version correspond to Hadoop-free releases of Flink
diff --git a/docs/ops/deployment/docker.zh.md b/docs/ops/deployment/docker.zh.md
index ab2f4a5e14e24..eb1ef37d66d6a 100644
--- a/docs/ops/deployment/docker.zh.md
+++ b/docs/ops/deployment/docker.zh.md
@@ -46,6 +46,11 @@ For example, you can use the following aliases:
* `flink:latest` → `flink:-scala_`
* `flink:1.11` → `flink:1.11.-scala_2.11`
+NoteIt is recommended to always use an explicit version tag of the docker image that specifies both the needed Flink and Scala
+versions (for example `flink:1.11-scala_2.12`).
+This will avoid some class conflicts that can occur if the Flink and/or Scala versions used in the application are different
+from the versions provided by the docker image.
+
Note Prior to Flink 1.5 version, Hadoop dependencies were always bundled with Flink.
You can see that certain tags include the version of Hadoop, e.g. (e.g. `-hadoop28`).
Beginning with Flink 1.5, image tags that omit the Hadoop version correspond to Hadoop-free releases of Flink
diff --git a/docs/ops/deployment/kubernetes.md b/docs/ops/deployment/kubernetes.md
index 54eb3d44abd06..72252d1e8d74f 100644
--- a/docs/ops/deployment/kubernetes.md
+++ b/docs/ops/deployment/kubernetes.md
@@ -262,7 +262,7 @@ spec:
spec:
containers:
- name: jobmanager
- image: flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif %}
+ image: flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest # The 'latest' tag contains the latest released version of Flink for a specific Scala version. Do not use the 'latest' tag in production as it will break your setup automatically when a new version is released.{% endif %}
args: ["jobmanager"]
ports:
- containerPort: 6123
@@ -314,7 +314,7 @@ spec:
spec:
containers:
- name: taskmanager
- image: flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif %}
+ image: flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest # The 'latest' tag contains the latest released version of Flink for a specific Scala version. Do not use the 'latest' tag in production as it will break your setup automatically when a new version is released.{% endif %}
args: ["taskmanager"]
ports:
- containerPort: 6122
@@ -363,7 +363,7 @@ spec:
restartPolicy: OnFailure
containers:
- name: jobmanager
- image: flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif %}
+ image: flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest # The 'latest' tag contains the latest released version of Flink for a specific Scala version. Do not use the 'latest' tag in production as it will break your setup automatically when a new version is released.{% endif %}
env:
args: ["standalone-job", "--job-classname", "com.job.ClassName", ["--job-id", "",] ["--fromSavepoint", "/path/to/savepoint", ["--allowNonRestoredState",]] [job arguments]]
ports:
@@ -421,7 +421,7 @@ spec:
spec:
containers:
- name: taskmanager
- image: flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif %}
+ image: flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest # The 'latest' tag contains the latest released version of Flink for a specific Scala version. Do not use the 'latest' tag in production as it will break your setup automatically when a new version is released.{% endif %}
env:
args: ["taskmanager"]
ports:
diff --git a/docs/ops/deployment/kubernetes.zh.md b/docs/ops/deployment/kubernetes.zh.md
index e2124bd65adca..84dac3ee363ff 100644
--- a/docs/ops/deployment/kubernetes.zh.md
+++ b/docs/ops/deployment/kubernetes.zh.md
@@ -262,7 +262,7 @@ spec:
spec:
containers:
- name: jobmanager
- image: flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif %}
+ image: flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest # The 'latest' tag contains the latest released version of Flink for a specific Scala version. Do not use the 'latest' tag in production as it will break your setup automatically when a new version is released.{% endif %}
args: ["jobmanager"]
ports:
- containerPort: 6123
@@ -314,7 +314,7 @@ spec:
spec:
containers:
- name: taskmanager
- image: flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif %}
+ image: flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest # The 'latest' tag contains the latest released version of Flink for a specific Scala version. Do not use the 'latest' tag in production as it will break your setup automatically when a new version is released.{% endif %}
args: ["taskmanager"]
ports:
- containerPort: 6122
@@ -363,7 +363,7 @@ spec:
restartPolicy: OnFailure
containers:
- name: jobmanager
- image: flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif %}
+ image: flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest # The 'latest' tag contains the latest released version of Flink for a specific Scala version. Do not use the 'latest' tag in production as it will break your setup automatically when a new version is released.{% endif %}
env:
args: ["standalone-job", "--job-classname", "com.job.ClassName", ["--job-id", "",] ["--fromSavepoint", "/path/to/savepoint", ["--allowNonRestoredState",]] [job arguments]]
ports:
@@ -421,7 +421,7 @@ spec:
spec:
containers:
- name: taskmanager
- image: flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif %}
+ image: flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest # The 'latest' tag contains the latest released version of Flink for a specific Scala version. Do not use the 'latest' tag in production as it will break your setup automatically when a new version is released.{% endif %}
env:
args: ["taskmanager"]
ports:
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index 0c4303f1dedba..8ed7352674621 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -19,10 +19,13 @@
package org.apache.flink.kubernetes.configuration;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.runtime.util.EnvironmentInformation;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import static org.apache.flink.configuration.ConfigOptions.key;
@@ -137,11 +140,14 @@ public class KubernetesConfigOptions {
.withDescription("The cluster-id, which should be no more than 45 characters, is used for identifying " +
"a unique Flink cluster. If not set, the client will automatically generate it with a random ID.");
+ @Documentation.OverrideDefault("The default value depends on the actually running version. In general it looks like \"flink:-scala_\"")
public static final ConfigOption CONTAINER_IMAGE =
key("kubernetes.container.image")
.stringType()
- .defaultValue("flink:latest")
- .withDescription("Image to use for Flink containers.");
+ .defaultValue(getDefaultFlinkImage())
+ .withDescription("Image to use for Flink containers. " +
+ "The specified image must be based upon the same Apache Flink and Scala versions as used by the application. " +
+ "Visit https://hub.docker.com/_/flink?tab=tags for the images provided by the Flink project.");
/**
* The following config options need to be set according to the image.
@@ -229,6 +235,13 @@ public class KubernetesConfigOptions {
.withDescription("If configured, Flink will add \"resources.limits.\" and \"resources.requests.\" " +
"to the main container of TaskExecutor and set the value to the value of " + ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT.key() + ".");
+ private static String getDefaultFlinkImage() {
+ // The default container image that ties to the exact needed versions of both Flink and Scala.
+ boolean snapshot = EnvironmentInformation.getVersion().toLowerCase(Locale.ENGLISH).contains("snapshot");
+ String tag = snapshot ? "latest" : EnvironmentInformation.getVersion() + "-scala_" + EnvironmentInformation.getScalaVersion();
+ return "flink:" + tag;
+ }
+
/**
* The flink rest service exposed type.
*/