Skip to content

Commit

Permalink
job restart from checkpoint bug fixed #2438 (#2440)
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed Mar 14, 2023
1 parent f503000 commit ad44867
Show file tree
Hide file tree
Showing 13 changed files with 342 additions and 410 deletions.
6 changes: 6 additions & 0 deletions streampark-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@

<build>
<plugins>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
</plugin>

<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.common.enums;

import java.io.Serializable;

public enum ApplicationType implements Serializable {
/** StreamPark Flink */
STREAMPARK_FLINK(1, "StreamPark Flink"),
/** Apache Flink */
APACHE_FLINK(2, "Apache Flink"),
/** StreamPark Spark */
STREAMPARK_SPARK(3, "StreamPark Spark"),
/** Apache Spark */
APACHE_SPARK(4, "Apache Spark");

private final int type;
private final String name;

ApplicationType(int type, String name) {
this.type = type;
this.name = name;
}

public int getType() {
return type;
}

public String getName() {
return name;
}

public static ApplicationType of(int type) {
for (ApplicationType appType : ApplicationType.values()) {
if (appType.getType() == type) {
return appType;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,34 @@

import java.io.Serializable;

/**
* @since: 1.2.3
*/
/** @since: 1.2.3 */
public enum ClusterState implements Serializable {
/**
* The cluster was just created but not started
*/
CREATED(0),
/**
* cluster started
*/
STARTED(1),
/**
* cluster stopped
*/
STOPED(2),

/**
* cluster lost
*/
LOST(3);

private final Integer value;

ClusterState(Integer value) {
this.value = value;
}

public static ClusterState of(Integer value) {
for (ClusterState clusterState : values()) {
if (clusterState.value.equals(value)) {
return clusterState;
}
}
return null;
/** The cluster was just created but not started */
CREATED(0),
/** cluster started */
STARTED(1),
/** cluster stopped */
STOPED(2),

/** cluster lost */
LOST(3);

private final Integer value;

ClusterState(Integer value) {
this.value = value;
}

public static ClusterState of(Integer value) {
for (ClusterState clusterState : values()) {
if (clusterState.value.equals(value)) {
return clusterState;
}
}
return null;
}

public Integer getValue() {
return value;
}
public Integer getValue() {
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,31 @@

public enum DevelopmentMode implements Serializable {

/**
* custom code
*/
CUSTOMCODE("Custom Code", 1),
/** custom code */
CUSTOMCODE("Custom Code", 1),

/**
* Flink SQL
*/
FLINKSQL("Flink SQL", 2);
/** Flink SQL */
FLINKSQL("Flink SQL", 2);

private final String mode;

private final String mode;
private final Integer value;

private final Integer value;
DevelopmentMode(String mode, Integer value) {
this.mode = mode;
this.value = value;
}

DevelopmentMode(String mode, Integer value) {
this.mode = mode;
this.value = value;
}

public static DevelopmentMode of(Integer value) {
for (DevelopmentMode mode : values()) {
if (mode.value.equals(value)) {
return mode;
}
}
return null;
}

public Integer getValue() {
return value;
public static DevelopmentMode of(Integer value) {
for (DevelopmentMode mode : values()) {
if (mode.value.equals(value)) {
return mode;
}
}
return null;
}

public Integer getValue() {
return value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.common.enums;

import com.google.common.collect.Lists;

import java.io.Serializable;
import java.util.List;

public enum ExecutionMode implements Serializable {

/** Local mode */
LOCAL(0, "local"),
/** remote */
REMOTE(1, "remote"),
/** yarn-per-job mode */
YARN_PER_JOB(2, "yarn-per-job"),
/** yarn session */
YARN_SESSION(3, "yarn-session"),
/** yarn application */
YARN_APPLICATION(4, "yarn-application"),
/** kubernetes session */
KUBERNETES_NATIVE_SESSION(5, "kubernetes-session"),
/** kubernetes application */
KUBERNETES_NATIVE_APPLICATION(6, "kubernetes-application");

private final Integer mode;

private final String name;

ExecutionMode(Integer mode, String name) {
this.mode = mode;
this.name = name;
}

public static ExecutionMode of(Integer value) {
for (ExecutionMode executionMode : values()) {
if (executionMode.mode.equals(value)) {
return executionMode;
}
}
return null;
}

public static ExecutionMode of(String name) {
for (ExecutionMode executionMode : values()) {
if (executionMode.name.equals(name)) {
return executionMode;
}
}
return null;
}

public int getMode() {
return mode;
}

public String getName() {
return name;
}

public static boolean isYarnMode(ExecutionMode mode) {
return YARN_PER_JOB.equals(mode) || YARN_APPLICATION.equals(mode) || YARN_SESSION.equals(mode);
}

public static boolean isYarnSessionMode(ExecutionMode mode) {
return YARN_SESSION.equals(mode);
}

public static boolean isYarnMode(Integer value) {
return isYarnMode(of(value));
}

public static boolean isKubernetesSessionMode(Integer value) {
return KUBERNETES_NATIVE_SESSION.equals(of(value));
}

public static boolean isKubernetesMode(ExecutionMode mode) {
return KUBERNETES_NATIVE_SESSION.equals(mode) || KUBERNETES_NATIVE_APPLICATION.equals(mode);
}

public static boolean isKubernetesMode(Integer value) {
return isKubernetesMode(of(value));
}

public static boolean isKubernetesApplicationMode(Integer value) {
return KUBERNETES_NATIVE_APPLICATION.equals(of(value));
}

public static List<Integer> getKubernetesMode() {
return Lists.newArrayList(
KUBERNETES_NATIVE_SESSION.getMode(), KUBERNETES_NATIVE_APPLICATION.getMode());
}

public static boolean isSessionMode(ExecutionMode mode) {
return KUBERNETES_NATIVE_SESSION.equals(mode) || YARN_SESSION.equals(mode);
}

public static boolean isRemoteMode(Integer value) {
return isRemoteMode(of(value));
}

public static boolean isRemoteMode(ExecutionMode mode) {
return REMOTE.equals(mode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,41 @@

import java.io.Serializable;

/**
* kubernetes.rest-service.exposed.type
*/
/** kubernetes.rest-service.exposed.type */
public enum FlinkK8sRestExposedType implements Serializable {

/**
* LoadBalancer
*/
LoadBalancer("LoadBalancer", 0),
/** LoadBalancer */
LoadBalancer("LoadBalancer", 0),

/**
* ClusterIP
*/
ClusterIP("ClusterIP", 1),
/** ClusterIP */
ClusterIP("ClusterIP", 1),

/**
* NodePort
*/
NodePort("NodePort", 2);
/** NodePort */
NodePort("NodePort", 2);

private final String name;
private final String name;

private final Integer value;
private final Integer value;

FlinkK8sRestExposedType(String name, Integer value) {
this.name = name;
this.value = value;
}
FlinkK8sRestExposedType(String name, Integer value) {
this.name = name;
this.value = value;
}

public static FlinkK8sRestExposedType of(Integer value) {
for (FlinkK8sRestExposedType order : values()) {
if (order.value.equals(value)) {
return order;
}
}
return null;
public static FlinkK8sRestExposedType of(Integer value) {
for (FlinkK8sRestExposedType order : values()) {
if (order.value.equals(value)) {
return order;
}
}
return null;
}

public String getName() {
return name;
}
public String getName() {
return name;
}

public Integer getValue() {
return value;
}
public Integer getValue() {
return value;
}
}
Loading

0 comments on commit ad44867

Please sign in to comment.