Skip to content

Commit

Permalink
[FLINK-8965][tests] Port TimestampITCase to flip6
Browse files Browse the repository at this point in the history
This closes #5728.
  • Loading branch information
zentol committed Mar 23, 2018
1 parent f0bd7b6 commit 81d809a
Showing 1 changed file with 38 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand All @@ -45,17 +45,18 @@
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.TestLogger;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
Expand All @@ -75,34 +76,24 @@ public class TimestampITCase extends TestLogger {
// this is used in some tests to synchronize
static MultiShotLatch latch;

private static LocalFlinkMiniCluster cluster;
@ClassRule
public static final MiniClusterResource CLUSTER = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getConfiguration(),
NUM_TASK_MANAGERS,
NUM_TASK_SLOTS),
true);

@Before
public void setupLatch() {
// ensure that we get a fresh latch for each test
latch = new MultiShotLatch();
}

@BeforeClass
public static void startCluster() {
private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);

cluster = new LocalFlinkMiniCluster(config, false);

cluster.start();

TestStreamEnvironment.setAsContext(cluster, PARALLELISM);
return config;
}

@AfterClass
public static void shutdownCluster() {
cluster.stop();
cluster = null;

TestStreamEnvironment.unsetAsContext();
@Before
public void setupLatch() {
// ensure that we get a fresh latch for each test
latch = new MultiShotLatch();
}

/**
Expand Down Expand Up @@ -162,7 +153,8 @@ public void testWatermarkPropagation() throws Exception {
public void testWatermarkPropagationNoFinalWatermarkOnStop() throws Exception {

// for this test to work, we need to be sure that no other jobs are being executed
while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
final ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
while (!getRunningJobs(clusterClient).isEmpty()) {
Thread.sleep(100);
}

Expand All @@ -185,22 +177,23 @@ public void testWatermarkPropagationNoFinalWatermarkOnStop() throws Exception {
.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
.addSink(new DiscardingSink<Integer>());

new Thread("stopper") {
Thread t = new Thread("stopper") {
@Override
public void run() {
try {
// try until we get the running jobs
List<JobID> running;
while ((running = cluster.getCurrentlyRunningJobsJava()).isEmpty()) {
List<JobID> running = getRunningJobs(clusterClient);
while (running.isEmpty()) {
Thread.sleep(10);
running = getRunningJobs(clusterClient);
}

JobID id = running.get(0);

// send stop until the job is stopped
do {
try {
cluster.stopJob(id);
clusterClient.stop(id);
}
catch (Exception e) {
if (e.getCause() instanceof IllegalStateException) {
Expand All @@ -214,13 +207,14 @@ public void run() {
}
Thread.sleep(10);
}
while (!cluster.getCurrentlyRunningJobsJava().isEmpty());
while (!getRunningJobs(clusterClient).isEmpty());
}
catch (Throwable t) {
t.printStackTrace();
}
}
}.start();
};
t.start();

env.execute();

Expand All @@ -246,6 +240,7 @@ public void run() {
subtaskWatermarks.get(subtaskWatermarks.size() - 1));
}
}
t.join();
}

/**
Expand Down Expand Up @@ -855,4 +850,12 @@ public void run(SourceContext<Integer> ctx) throws Exception {
@Override
public void cancel() {}
}

private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
Collection<JobStatusMessage> statusMessages = client.listJobs().get();
return statusMessages.stream()
.filter(status -> !status.getJobState().isGloballyTerminalState())
.map(JobStatusMessage::getJobId)
.collect(Collectors.toList());
}
}

0 comments on commit 81d809a

Please sign in to comment.