Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
a612d99
[FLINK-25774][network] Restrict the maximum number of buffers can be …
Jan 24, 2022
dfa672f
[FLINK-25780][network] Reduce the maximum size of data output buffer …
Jan 24, 2022
34d0972
[FLINK-25781][network] Adjust the maximum number of buffers can be us…
Jan 24, 2022
8bd62ec
[FLINK-25786][network] Adjust the generation of subpartition data sto…
Jan 24, 2022
f4fd4f8
[FLINK-25035][runtime] Move consumedSubpartitionIndex from SingleInpu…
wanglijie95 Jan 26, 2022
5c2ec72
[FLINK-25035][runtime] SingleInputGate supports consuming subpartitio…
wanglijie95 Jan 26, 2022
9df1980
[FLINK-25810][connector/kinesis] Adding Kinesis data streams SQL clie…
vahmed-hamdy Jan 17, 2022
c31452b
[FLINK-24703][connectors][format] Adds CSV File System decoding forma…
afedulov Nov 26, 2021
ca58a70
[FLINK-24703][connectors][format] Add CSV File System encoding format…
afedulov Nov 26, 2021
8d5a4dd
[hotfix] Adds fail to DispatcherTest
XComp Dec 22, 2021
bdb3657
[hotfix] Adds missing JavaDoc
XComp Nov 15, 2021
3814867
[hotfix] Makes intention of comment clearer
XComp Nov 15, 2021
cfeaff1
[hotfix] Migrates ApplicationStatusTest to JUnit5 and AssertJ
XComp Dec 30, 2021
bf1a310
[hotfix] Adds TestLogger extension to ApplicationDispatcherBootstrapTest
XComp Jan 21, 2022
46952ef
[hotfix] Introduces TestingPartialDispatcherServices
XComp Jan 21, 2022
8d235d8
[hotfix] Removes @Nonnull annotations from DispatcherServices and Par…
XComp Nov 29, 2021
5c39a7e
[FLINK-25430][runtime] Add JobResultStore
XComp Jan 25, 2022
01b14fc
[FLINK-25430][runtime] Integrates JobResultStoreinto the Dispatcher
XComp Jan 25, 2022
adbe872
[FLINK-25430][runtime] Renames JobGraphStoreFactory into JobPersisten…
XComp Nov 29, 2021
6c5ae20
[FLINK-25430][runtime] Renames PartialDispatcherServicesWithJobGraphS…
XComp Nov 29, 2021
31d9caa
[FLINK-25430][runtime] Integrates JobResultStore initialization along…
XComp Nov 29, 2021
8c54900
[hotfix] Migrates SessionDispatcherLeaderProcessTest to JUnit5/AssertJ
XComp Jan 25, 2022
68948ae
[hotfix] Migrates ApplicationDispatcherBootstrapITCase to JUnit5/AssertJ
XComp Jan 20, 2022
4d98230
[hotfix] Makes FileSystemBlobStore.(delete|deleteAll) comply to the B…
XComp Jan 22, 2022
2514d97
[hotfix] Removes unused classloader parameter from CheckpointRecovery…
XComp Nov 25, 2021
f422135
[hotfix] Refactors nested if statements
XComp Dec 8, 2021
99c371c
[FLINK-25432] Adds generic interfaces for cleaning up Job-related data
XComp Dec 15, 2021
57d3177
[FLINK-25432] Refactors JobGraphWriter interface to implement Locally…
XComp Dec 15, 2021
7a895e8
[FLINK-25432] Makes BlobServer implement LocallyCleanableResource and…
XComp Dec 15, 2021
d05c9a9
[FLINK-25432] Makes HighAvailabilityServices implement LocallyCleanab…
XComp Dec 15, 2021
a011193
[FLINK-25432] Makes JobManagerMetricGroup implement LocallyCleanableR…
XComp Dec 15, 2021
febcf75
[FLINK-25432] Adds JobManagerRunnerRegistry and integrates it into th…
XComp Dec 15, 2021
c769cc2
[FLINK-25432] Refactors Dispatcher constructor signature
XComp Dec 15, 2021
6c5e878
[FLINK-25432] Adds ResourceCleaner (+ default implementation) and int…
XComp Dec 15, 2021
ecb5d0e
[hotfix] Migrates BlobServerCleanupTest to JUnit5/AssertJ
XComp Jan 22, 2022
836f747
[FLINK-25432][runtime] Makes TestingCompletedCheckpointStore more gen…
XComp Jan 13, 2022
9e2461a
[FLINK-25432][runtime] Makes TestingCheckpointIDCounter more generic
XComp Jan 13, 2022
183efa3
[FLINK-25432][runtime] Renames createFromInitializingJob into more ge…
XComp Dec 9, 2021
65a96bc
[FLINK-25432][runtime] Moves maximum retained checkpoints parameter e…
XComp Nov 26, 2021
129f3c7
[FLINK-25432][runtime] Adds mapping from ApplicationStatus to JobStatus
XComp Nov 29, 2021
64ef052
[FLINK-25432][runtime] Introduces TestingDispatcher.Builder
XComp Jan 28, 2022
ab85e7b
[FLINK-25432] Adds CheckpointResourcesCleanupRunner
XComp Dec 9, 2021
abb7f1d
[FLINK-25432] Refactors createJobManagerRunner to be an initializing …
XComp Dec 15, 2021
06ca371
[FLINK-25432] Integrates CheckpointResourcesCleanupRunner into Dispat…
XComp Dec 15, 2021
6f16fe8
[FLINK-25432] Integrates CleanupRunnerFactory into test infrastructure
XComp Jan 28, 2022
16afda4
[FLINK-25432] Adds test cases for checking whether the CleanupRunner …
XComp Jan 13, 2022
dea89fc
[FLINK-25432][runtime] Moves BlobServer cleanup tests into BlobServer…
XComp Jan 6, 2022
9fbdb09
[FLINK-25432] Removes custom TestingHighAvailabilityServices
XComp Jan 6, 2022
965dc9a
[FLINK-25432][runtime] Moves cleanup test from DispatcherTest into Di…
XComp Jan 28, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import org.apache.flink.runtime.dispatcher.DispatcherFactory;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobGraphStore;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobPersistenceComponents;
import org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherGatewayService;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.FlinkRuntimeException;

Expand Down Expand Up @@ -83,7 +85,9 @@ public ApplicationDispatcherGatewayServiceFactory(
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
JobGraphWriter jobGraphWriter) {
Collection<JobResult> recoveredDirtyJobResults,
JobGraphWriter jobGraphWriter,
JobResultStore jobResultStore) {

final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);

Expand All @@ -94,6 +98,7 @@ public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
rpcService,
fencingToken,
recoveredJobs,
recoveredDirtyJobResults,
(dispatcherGateway, scheduledExecutor, errorHandler) ->
new ApplicationDispatcherBootstrap(
application,
Expand All @@ -102,8 +107,8 @@ public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
dispatcherGateway,
scheduledExecutor,
errorHandler),
PartialDispatcherServicesWithJobGraphStore.from(
partialDispatcherServices, jobGraphWriter));
PartialDispatcherServicesWithJobPersistenceComponents.from(
partialDispatcherServices, jobGraphWriter, jobResultStore));
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcessFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcessFactoryFactory;
import org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcessFactory;
import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory;
import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;

Expand Down Expand Up @@ -59,7 +59,7 @@ private ApplicationDispatcherLeaderProcessFactoryFactory(

@Override
public DispatcherLeaderProcessFactory createFactory(
JobGraphStoreFactory jobGraphStoreFactory,
JobPersistenceComponentFactory jobPersistenceComponentFactory,
Executor ioExecutor,
RpcService rpcService,
PartialDispatcherServices partialDispatcherServices,
Expand All @@ -74,7 +74,10 @@ public DispatcherLeaderProcessFactory createFactory(
partialDispatcherServices);

return new SessionDispatcherLeaderProcessFactory(
dispatcherServiceFactory, jobGraphStoreFactory, ioExecutor, fatalErrorHandler);
dispatcherServiceFactory,
jobPersistenceComponentFactory,
ioExecutor,
fatalErrorHandler);
}

public static ApplicationDispatcherLeaderProcessFactoryFactory create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,25 @@
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.testjar.BlockingJob;
import org.apache.flink.client.testjar.ErrorHandlingSubmissionJob;
import org.apache.flink.client.testjar.FailingJob;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
Expand All @@ -48,11 +54,13 @@
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rest.JobRestEndpointFactory;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TestLoggerExtension;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.time.Duration;
import java.util.UUID;
Expand All @@ -61,11 +69,10 @@
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/** Integration tests related to {@link ApplicationDispatcherBootstrap}. */
public class ApplicationDispatcherBootstrapITCase extends TestLogger {
@ExtendWith(TestLoggerExtension.class)
public class ApplicationDispatcherBootstrapITCase {

private static final Duration TIMEOUT = Duration.ofMinutes(10);

Expand Down Expand Up @@ -127,7 +134,9 @@ public void testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exc
cluster.requestJobResult(ApplicationDispatcherBootstrap.ZERO_JOB_ID);
haServices.revokeDispatcherLeadership();
// make sure the leadership is revoked to avoid race conditions
assertEquals(ApplicationStatus.UNKNOWN, firstJobResult.get().getApplicationStatus());
assertThat(firstJobResult.get())
.extracting(JobResult::getApplicationStatus)
.isEqualTo(ApplicationStatus.UNKNOWN);
haServices.grantDispatcherLeadership();

// job is suspended, wait until it's running
Expand All @@ -141,10 +150,11 @@ public void testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exc
BlockingJob.unblock(blockId);

// and wait for it to actually finish
final CompletableFuture<JobResult> secondJobResult =
cluster.requestJobResult(ApplicationDispatcherBootstrap.ZERO_JOB_ID);
assertTrue(secondJobResult.get().isSuccess());
assertEquals(ApplicationStatus.SUCCEEDED, secondJobResult.get().getApplicationStatus());
final JobResult secondJobResult =
cluster.requestJobResult(ApplicationDispatcherBootstrap.ZERO_JOB_ID).get();
assertThat(secondJobResult.isSuccess()).isTrue();
assertThat(secondJobResult.getApplicationStatus())
.isEqualTo(ApplicationStatus.SUCCEEDED);

// the cluster should shut down automatically once the application completes
awaitClusterStopped(cluster, deadline);
Expand All @@ -153,6 +163,64 @@ public void testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exc
}
}

@Test
public void testDirtyJobResultRecoveryInApplicationMode() throws Exception {
final Deadline deadline = Deadline.fromNow(TIMEOUT);
final Configuration configuration = new Configuration();
configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
configuration.set(ClientOptions.CLIENT_RETRY_PERIOD, Duration.ofMillis(100));
final TestingMiniClusterConfiguration clusterConfiguration =
TestingMiniClusterConfiguration.newBuilder()
.setConfiguration(configuration)
.build();

// having a dirty entry in the JobResultStore should make the ApplicationDispatcherBootstrap
// implementation fail to submit the job
final JobResultStore jobResultStore = new EmbeddedJobResultStore();
jobResultStore.createDirtyResult(
new JobResultEntry(
TestingJobResultStore.createSuccessfulJobResult(
ApplicationDispatcherBootstrap.ZERO_JOB_ID)));
final EmbeddedHaServicesWithLeadershipControl haServices =
new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()) {

@Override
public JobResultStore getJobResultStore() {
return jobResultStore;
}
};

final TestingMiniCluster.Builder clusterBuilder =
TestingMiniCluster.newBuilder(clusterConfiguration)
.setHighAvailabilityServicesSupplier(() -> haServices)
.setDispatcherResourceManagerComponentFactorySupplier(
createApplicationModeDispatcherResourceManagerComponentFactorySupplier(
clusterConfiguration.getConfiguration(),
ErrorHandlingSubmissionJob.createPackagedProgram()));
try (final MiniCluster cluster = clusterBuilder.build()) {
// start mini cluster and submit the job
cluster.start();

// the cluster should shut down automatically once the application completes
awaitClusterStopped(cluster, deadline);
}

FlinkAssertions.assertThatChainOfCauses(ErrorHandlingSubmissionJob.getSubmissionException())
.as(
"The job's main method shouldn't have been succeeded due to a DuplicateJobSubmissionException.")
.hasAtLeastOneElementOfType(DuplicateJobSubmissionException.class);

assertThat(
jobResultStore.hasDirtyJobResultEntry(
ApplicationDispatcherBootstrap.ZERO_JOB_ID))
.isFalse();
assertThat(
jobResultStore.hasCleanJobResultEntry(
ApplicationDispatcherBootstrap.ZERO_JOB_ID))
.isTrue();
}

@Test
public void testSubmitFailedJobOnApplicationError() throws Exception {
final Deadline deadline = Deadline.fromNow(TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

Expand All @@ -79,7 +80,8 @@
import static org.junit.jupiter.api.Assertions.fail;

/** Tests for the {@link ApplicationDispatcherBootstrap}. */
public class ApplicationDispatcherBootstrapTest extends TestLogger {
@ExtendWith(TestLoggerExtension.class)
public class ApplicationDispatcherBootstrapTest {

private static final int TIMEOUT_SECONDS = 10;

Expand Down Expand Up @@ -641,7 +643,7 @@ public void testDuplicateJobSubmissionWithTerminatedJobId() throws Throwable {
* In this scenario, job result is no longer present in the {@link
* org.apache.flink.runtime.dispatcher.Dispatcher dispatcher} (job has terminated and job
* manager failed over), but we know that job has already terminated from {@link
* org.apache.flink.runtime.highavailability.RunningJobsRegistry running jobs registry}.
* org.apache.flink.runtime.highavailability.JobResultStore}.
*/
@Test
public void testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResult() throws Throwable {
Expand Down Expand Up @@ -675,7 +677,7 @@ public void testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResult() thr
* In this scenario, job result is no longer present in the {@link
* org.apache.flink.runtime.dispatcher.Dispatcher dispatcher} (job has terminated and job
* manager failed over), but we know that job has already terminated from {@link
* org.apache.flink.runtime.highavailability.RunningJobsRegistry running jobs registry}.
* org.apache.flink.runtime.highavailability.JobResultStore}.
*/
@Test
public void testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResultAttached()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.testjar;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.cli.CliFrontendTestUtils;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.util.FlinkException;

import java.io.File;
import java.io.FileNotFoundException;
import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;

/**
* {@code ErrorHandlingSubmissionJob} provides a factory method for creating a {@link
* PackagedProgram} that monitors the job submission within the job's {@code main} method.
*/
public class ErrorHandlingSubmissionJob {

private static final AtomicReference<Exception> SUBMISSION_EXCEPTION = new AtomicReference<>();

public static PackagedProgram createPackagedProgram() throws FlinkException {
try {
return PackagedProgram.newBuilder()
.setUserClassPaths(
Collections.singletonList(
new File(CliFrontendTestUtils.getTestJarPath())
.toURI()
.toURL()))
.setEntryPointClassName(ErrorHandlingSubmissionJob.class.getName())
.build();
} catch (ProgramInvocationException | FileNotFoundException | MalformedURLException e) {
throw new FlinkException("Could not load the provided entrypoint class.", e);
}
}

public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromCollection(Arrays.asList(1, 2, 3))
.map(element -> element + 1)
.output(new DiscardingOutputFormat<>());

try {
env.execute();
} catch (Exception e) {
SUBMISSION_EXCEPTION.set(e);
throw e;
}
}

public static Exception getSubmissionException() {
return SUBMISSION_EXCEPTION.get();
}
}
Loading