Skip to content

Commit

Permalink
[FLINK-26290] Introduce serviceAccount as direct field, remove taskSlots
Browse files Browse the repository at this point in the history
Closes apache#10
  • Loading branch information
tweise authored and gyfora committed Feb 22, 2022
1 parent 29ef2f5 commit cdfc7a2
Show file tree
Hide file tree
Showing 12 changed files with 36 additions and 31 deletions.
4 changes: 2 additions & 2 deletions e2e-tests/data/cr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ spec:
image: flink:1.14.3
flinkVersion: 1.14.3
flinkConfiguration:
kubernetes.service-account: flink-operator
taskmanager.numberOfTaskSlots: "2"
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
serviceAccount: flink-operator
podTemplate:
apiVersion: v1
kind: Pod
Expand Down Expand Up @@ -70,7 +71,6 @@ spec:
memory: "1024m"
cpu: 0.5
taskManager:
taskSlots: 2
resource:
memory: "1024m"
cpu: 0.5
Expand Down
2 changes: 1 addition & 1 deletion examples/basic-checkpoint-ha.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ spec:
image: flink:1.14.3
flinkVersion: 1.14.3
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
state.savepoints.dir: file:///flink-data/savepoints
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///flink-data/ha
Expand All @@ -34,7 +35,6 @@ spec:
memory: "2048m"
cpu: 1
taskManager:
taskSlots: 2
resource:
memory: "2048m"
cpu: 1
Expand Down
4 changes: 2 additions & 2 deletions examples/basic-ingress.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ spec:
flinkConfiguration:
# rest.address: basic-example.flink.k8s.io
# rest.port: "80"
kubernetes.jobmanager.service-account: flink-operator
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink-operator
jobManager:
replicas: 1
resource:
memory: "2048m"
cpu: 1
taskManager:
taskSlots: 2
resource:
memory: "2048m"
cpu: 1
Expand Down
4 changes: 2 additions & 2 deletions examples/basic-session.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ spec:
image: flink:1.14.3
flinkVersion: 1.14.3
flinkConfiguration:
kubernetes.jobmanager.service-account: flink-operator
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink-operator
jobManager:
replicas: 1
resource:
memory: "2048m"
cpu: 1
taskManager:
taskSlots: 2
resource:
memory: "2048m"
cpu: 1
4 changes: 2 additions & 2 deletions examples/basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ spec:
image: flink:1.14.3
flinkVersion: 1.14.3
flinkConfiguration:
kubernetes.jobmanager.service-account: flink-operator
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink-operator
jobManager:
replicas: 1
resource:
memory: "2048m"
cpu: 1
taskManager:
taskSlots: 2
resource:
memory: "2048m"
cpu: 1
Expand Down
3 changes: 2 additions & 1 deletion examples/pod-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ metadata:
spec:
image: flink:1.14.3
flinkVersion: 1.14.3
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
podTemplate:
apiVersion: v1
kind: Pod
Expand Down Expand Up @@ -53,7 +55,6 @@ spec:
memory: "2048m"
cpu: 1
taskManager:
taskSlots: 2
resource:
memory: "2048m"
cpu: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
public class FlinkDeploymentSpec {
private String image;
private String imagePullPolicy;
private String serviceAccount;
private String flinkVersion;
private String ingressDomain;
private Map<String, String> flinkConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,5 @@
@AllArgsConstructor
public class TaskManagerSpec {
private Resource resource;
private int taskSlots;
private Pod podTemplate;
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ public FlinkConfigBuilder applyIngressDomain() {
return this;
}

public FlinkConfigBuilder applyServiceAccount() {
if (spec.getServiceAccount() != null) {
effectiveConfig.set(
KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, spec.getServiceAccount());
}
return this;
}

public FlinkConfigBuilder applyJobManagerSpec() throws IOException {
if (spec.getJobManager() != null) {
if (spec.getJobManager() != null) {
Expand All @@ -124,10 +132,6 @@ public FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
spec.getTaskManager().getPodTemplate(),
effectiveConfig,
false);
if (spec.getTaskManager().getTaskSlots() > 0) {
effectiveConfig.set(
TaskManagerOptions.NUM_TASK_SLOTS, spec.getTaskManager().getTaskSlots());
}
}
return this;
}
Expand Down Expand Up @@ -166,6 +170,7 @@ public static Configuration buildFrom(FlinkDeployment dep)
.applyFlinkConfiguration()
.applyImage()
.applyImagePullPolicy()
.applyServiceAccount()
.applyCommonPodTemplate()
.applyIngressDomain()
.applyJobManagerSpec()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

import static java.util.concurrent.TimeUnit.MINUTES;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
Expand Down Expand Up @@ -110,9 +107,7 @@ private static FlinkDeployment buildSessionCluster() {
FlinkDeploymentSpec spec = new FlinkDeploymentSpec();
spec.setImage(IMAGE);
spec.setFlinkVersion(FLINK_VERSION);
Map config = new HashMap<String, String>();
config.put("kubernetes.jobmanager.service-account", SERVICE_ACCOUNT);
spec.setFlinkConfiguration(config);
spec.setServiceAccount(SERVICE_ACCOUNT);
Resource resource = new Resource();
resource.setMemory("2048m");
resource.setCpu(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.flink.kubernetes.operator;

import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
Expand Down Expand Up @@ -86,13 +86,12 @@ public static FlinkDeploymentSpec getTestFlinkDeploymentSpec() {
return FlinkDeploymentSpec.builder()
.image(IMAGE)
.imagePullPolicy(IMAGE_POLICY)
.serviceAccount(SERVICE_ACCOUNT)
.flinkVersion(FLINK_VERSION)
.flinkConfiguration(
Collections.singletonMap(
KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT.key(),
SERVICE_ACCOUNT))
Collections.singletonMap(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2"))
.jobManager(new JobManagerSpec(new Resource(1, "2048m"), 1, null))
.taskManager(new TaskManagerSpec(new Resource(1, "2048m"), 2, null))
.taskManager(new TaskManagerSpec(new Resource(1, "2048m"), null))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ public void testApplyImagePolicy() {
public void testApplyFlinkConfiguration() {
final Configuration configuration =
new FlinkConfigBuilder(flinkDeployment).applyFlinkConfiguration().build();
Assert.assertEquals(
SERVICE_ACCOUNT,
configuration.get(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT));
Assert.assertEquals(2, (int) configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
}

@Test
Expand Down Expand Up @@ -125,6 +123,15 @@ public void testApplyIngressDomain() {
configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
}

@Test
public void testApplyServiceAccount() {
final Configuration configuration =
new FlinkConfigBuilder(flinkDeployment).applyServiceAccount().build();
Assert.assertEquals(
SERVICE_ACCOUNT,
configuration.get(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT));
}

@Test
public void testApplyJobManagerSpec() throws Exception {
final Configuration configuration =
Expand Down Expand Up @@ -158,8 +165,6 @@ public void testApplyTaskManagerSpec() throws Exception {
configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
Double.valueOf(1), configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU));
Assert.assertEquals(
Integer.valueOf(2), configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
Assert.assertEquals("pod2 api version", tmPod.getApiVersion());
}

Expand Down

0 comments on commit cdfc7a2

Please sign in to comment.