Skip to content
Permalink
Browse files
[Flink-26112][k8s]Refactor the FlinkKubeClient interface to deal with…
… service name directly
  • Loading branch information
Aitozi authored and wangyang0918 committed May 9, 2022
1 parent ff7d619 commit ab3eb40d920fa609f49164a0bbb5fcbb3004a808
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 76 deletions.
@@ -42,9 +42,9 @@
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
@@ -177,7 +177,8 @@ public ClusterClientProvider<String> deployApplicationCluster(
final ClusterSpecification clusterSpecification,
final ApplicationConfiguration applicationConfiguration)
throws ClusterDeploymentException {
if (client.getService(KubernetesService.ServiceType.REST_SERVICE, clusterId).isPresent()) {
if (client.getService(ExternalServiceDecorator.getExternalServiceName(clusterId))
.isPresent()) {
throw new ClusterDeploymentException(
"The Flink cluster " + clusterId + " already exists.");
}
@@ -27,10 +27,11 @@
import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerDriverConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.kubernetes.utils.Constants;
@@ -257,25 +258,22 @@ private void updateKubernetesServiceTargetPortIfNecessary() throws Exception {
ResourceManagerUtils.parseRestBindPortFromWebInterfaceUrl(webInterfaceUrl);
Preconditions.checkArgument(
restPort > 0, "Failed to parse rest port from " + webInterfaceUrl);
final String restServiceName = ExternalServiceDecorator.getExternalServiceName(clusterId);
flinkKubeClient
.updateServiceTargetPort(
KubernetesService.ServiceType.REST_SERVICE,
clusterId,
Constants.REST_PORT_NAME,
restPort)
.updateServiceTargetPort(restServiceName, Constants.REST_PORT_NAME, restPort)
.get();
if (!HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {
final String internalServiceName =
InternalServiceDecorator.getInternalServiceName(clusterId);
flinkKubeClient
.updateServiceTargetPort(
KubernetesService.ServiceType.INTERNAL_SERVICE,
clusterId,
internalServiceName,
Constants.BLOB_SERVER_PORT_NAME,
Integer.parseInt(flinkConfig.getString(BlobServerOptions.PORT)))
.get();
flinkKubeClient
.updateServiceTargetPort(
KubernetesService.ServiceType.INTERNAL_SERVICE,
clusterId,
internalServiceName,
Constants.JOB_MANAGER_RPC_PORT_NAME,
flinkConfig.getInteger(JobManagerOptions.PORT))
.get();
@@ -36,7 +36,7 @@
import org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutor;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.util.FlinkException;

@@ -109,7 +109,7 @@ private int run(String[] args) throws FlinkException, CliArgsException {
// Retrieve or create a session cluster.
if (clusterId != null
&& kubeClient
.getService(KubernetesService.ServiceType.REST_SERVICE, clusterId)
.getService(ExternalServiceDecorator.getExternalServiceName(clusterId))
.isPresent()) {
clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();
} else {
@@ -22,7 +22,6 @@
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
@@ -180,7 +179,7 @@ public CompletableFuture<Void> stopPod(String podName) {
@Override
public Optional<Endpoint> getRestEndpoint(String clusterId) {
Optional<KubernetesService> restService =
getService(KubernetesService.ServiceType.REST_SERVICE, clusterId);
getService(ExternalServiceDecorator.getExternalServiceName(clusterId));
if (!restService.isPresent()) {
return Optional.empty();
}
@@ -224,9 +223,7 @@ public void stopAndCleanupCluster(String clusterId) {
}

@Override
public Optional<KubernetesService> getService(
KubernetesService.ServiceType serviceType, String clusterId) {
final String serviceName = getServiceName(serviceType, clusterId);
public Optional<KubernetesService> getService(String serviceName) {
final Service service =
this.internalClient.services().withName(serviceName).fromServer().get();
if (service == null) {
@@ -388,14 +385,11 @@ public KubernetesPod loadPodFromTemplateFile(File file) {

@Override
public CompletableFuture<Void> updateServiceTargetPort(
KubernetesService.ServiceType serviceType,
String clusterId,
String portName,
int targetPort) {
String serviceName, String portName, int targetPort) {
LOG.debug("Update {} target port to {}", portName, targetPort);
return CompletableFuture.runAsync(
() ->
getService(serviceType, clusterId)
getService(serviceName)
.ifPresent(
service -> {
final Service updatedService =
@@ -416,32 +410,12 @@ public CompletableFuture<Void> updateServiceTargetPort(
.build();
this.internalClient
.services()
.withName(
getServiceName(serviceType, clusterId))
.withName(serviceName)
.replace(updatedService);
}),
kubeClientExecutorService);
}

/**
* Get the Kubernetes service name.
*
* @param serviceType The service type
* @param clusterId The cluster id
* @return Return the Kubernetes service name if the service type is known.
*/
private String getServiceName(KubernetesService.ServiceType serviceType, String clusterId) {
switch (serviceType) {
case REST_SERVICE:
return ExternalServiceDecorator.getExternalServiceName(clusterId);
case INTERNAL_SERVICE:
return InternalServiceDecorator.getInternalServiceName(clusterId);
default:
throw new IllegalArgumentException(
"Unrecognized service type: " + serviceType.name());
}
}

private void setOwnerReference(Deployment deployment, List<HasMetadata> resources) {
final OwnerReference deploymentOwnerReference =
new OwnerReferenceBuilder()
@@ -76,12 +76,10 @@ public interface FlinkKubeClient extends AutoCloseable {
/**
* Get the kubernetes service of the given flink clusterId.
*
* @param serviceType Internal/Rest
* @param clusterId cluster id
* @return Return the optional rest service of the specified cluster id.
* @param serviceName the name of the service
* @return Return the optional kubernetes service of the specified name.
*/
Optional<KubernetesService> getService(
KubernetesService.ServiceType serviceType, String clusterId);
Optional<KubernetesService> getService(String serviceName);

/**
* Get the rest endpoint for access outside cluster.
@@ -207,16 +205,13 @@ CompletableFuture<Boolean> checkAndUpdateConfigMap(
/**
* Update the target ports of the given Kubernetes service.
*
* @param serviceType The service type which needs to be updated
* @param serviceName The name of the service which needs to be updated
* @param portName The port name which needs to be updated
* @param targetPort The updated target port
* @return Return the update service target port future
*/
CompletableFuture<Void> updateServiceTargetPort(
KubernetesService.ServiceType serviceType,
String clusterId,
String portName,
int targetPort);
String serviceName, String portName, int targetPort);

/** Callback handler for kubernetes resources. */
interface WatchCallbackHandler<T> {
@@ -26,10 +26,4 @@ public class KubernetesService extends KubernetesResource<Service> {
public KubernetesService(Service internalResource) {
super(internalResource);
}

/** The flink service type. */
public enum ServiceType {
REST_SERVICE,
INTERNAL_SERVICE,
}
}
@@ -30,11 +30,11 @@
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -172,12 +172,10 @@ void testUpdateRestServicePort() throws Exception {
this.flinkKubeClient.createJobManagerComponent(this.kubernetesJobManagerSpecification);

final int expectedRestPort = 9081;
final String restServiceName = ExternalServiceDecorator.getExternalServiceName(CLUSTER_ID);
flinkKubeClient
.updateServiceTargetPort(
KubernetesService.ServiceType.REST_SERVICE,
CLUSTER_ID,
Constants.REST_PORT_NAME,
expectedRestPort)
restServiceName, Constants.REST_PORT_NAME, expectedRestPort)
.get();
final int updatedRestPort =
getServiceTargetPort(
@@ -190,12 +188,11 @@ void testUpdateInternalServicePort() throws Exception {
this.flinkKubeClient.createJobManagerComponent(this.kubernetesJobManagerSpecification);

final int expectedBlobPort = 9082;
final String internalServiceName =
InternalServiceDecorator.getInternalServiceName(CLUSTER_ID);
flinkKubeClient
.updateServiceTargetPort(
KubernetesService.ServiceType.INTERNAL_SERVICE,
CLUSTER_ID,
Constants.BLOB_SERVER_PORT_NAME,
expectedBlobPort)
internalServiceName, Constants.BLOB_SERVER_PORT_NAME, expectedBlobPort)
.get();
final int updatedBlobPort =
getServiceTargetPort(CLUSTER_ID, Constants.BLOB_SERVER_PORT_NAME);
@@ -138,8 +138,7 @@ public void stopAndCleanupCluster(String clusterId) {
}

@Override
public Optional<KubernetesService> getService(
KubernetesService.ServiceType serviceType, String clusterId) {
public Optional<KubernetesService> getService(String serviceName) {
throw new UnsupportedOperationException();
}

@@ -211,10 +210,7 @@ public KubernetesPod loadPodFromTemplateFile(File file) {

@Override
public CompletableFuture<Void> updateServiceTargetPort(
KubernetesService.ServiceType serviceType,
String clusterId,
String portName,
int targetPort) {
String serviceName, String portName, int targetPort) {
throw new UnsupportedOperationException();
}

0 comments on commit ab3eb40

Please sign in to comment.