[FLINK-5163] Ports the production functions to the new state abstraction. #2871

Closed · wants to merge 4 commits
16 changes: 16 additions & 0 deletions flink-connectors/flink-connector-rabbitmq/pom.xml
@@ -55,6 +55,22 @@ under the License.
<version>${rabbitmq.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.10</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

</dependencies>

</project>
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -23,16 +23,19 @@
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.SerializedCheckpointData;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.junit.After;
import org.junit.Before;
@@ -53,6 +56,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;


/**
@@ -83,7 +87,13 @@ public class RMQSourceTest {
@Before
public void beforeTest() throws Exception {

OperatorStateStore mockStore = Mockito.mock(OperatorStateStore.class);
FunctionInitializationContext mockContext = Mockito.mock(FunctionInitializationContext.class);
Mockito.when(mockContext.getOperatorStateStore()).thenReturn(mockStore);
Mockito.when(mockStore.getSerializableListState(any(String.class))).thenReturn(null);

source = new RMQTestSource();
source.initializeState(mockContext);
source.open(config);

messageId = 0;
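
The mocked FunctionInitializationContext above mirrors the CheckpointedFunction contract that the ported sources now follow. A minimal, hypothetical sketch of that pattern follows; the class name, state name, and field layout are illustrative rather than the actual RMQSource code, and only the Flink interfaces and the getOperatorStateStore()/getSerializableListState() call chain are taken from the test above.

import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

/** Illustrative only: a source that keeps its un-acknowledged message ids in operator state. */
public abstract class CheckpointedSourceSketch extends RichSourceFunction<String>
        implements CheckpointedFunction {

    // message ids emitted since the last checkpoint, grouped by checkpoint id
    protected transient ArrayDeque<Tuple2<Long, List<String>>> pendingCheckpoints;

    private transient ListState<Serializable> checkpointedState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // the exact call chain that beforeTest() mocks; the state name is a placeholder
        checkpointedState = context.getOperatorStateStore()
                .getSerializableListState("pending-checkpoints");

        pendingCheckpoints = new ArrayDeque<>();
        if (context.isRestored()) {
            for (Serializable entry : checkpointedState.get()) {
                @SuppressWarnings("unchecked")
                Tuple2<Long, List<String>> restored = (Tuple2<Long, List<String>>) entry;
                pendingCheckpoints.add(restored);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // re-write the pending ids into managed operator state on every checkpoint
        checkpointedState.clear();
        for (Tuple2<Long, List<String>> entry : pendingCheckpoints) {
            checkpointedState.add(entry);
        }
    }
}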
@@ -128,6 +138,12 @@ public void throwExceptionIfConnectionFactoryReturnNull() throws Exception {
@Test
public void testCheckpointing() throws Exception {
source.autoAck = false;

StreamSource<String, RMQSource<String>> src = new StreamSource<>(source);
AbstractStreamOperatorTestHarness<String> testHarness =
new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
testHarness.open();

sourceThread.start();

Thread.sleep(5);
@@ -141,10 +157,10 @@ public void testCheckpointing() throws Exception {

for (int i=0; i < numSnapshots; i++) {
long snapshotId = random.nextLong();
SerializedCheckpointData[] data;
OperatorStateHandles data;

synchronized (DummySourceContext.lock) {
data = source.snapshotState(snapshotId, System.currentTimeMillis());
data = testHarness.snapshot(snapshotId, System.currentTimeMillis());
previousSnapshotId = lastSnapshotId;
lastSnapshotId = messageId;
}
@@ -153,15 +169,25 @@

// check if the correct number of messages have been snapshotted
final long numIds = lastSnapshotId - previousSnapshotId;
assertEquals(numIds, data[0].getNumIds());
// deserialize and check if the last id equals the last snapshotted id
ArrayDeque<Tuple2<Long, List<String>>> deque = SerializedCheckpointData.toDeque(data, new StringSerializer());

RMQTestSource sourceCopy = new RMQTestSource();
StreamSource<String, RMQTestSource> srcCopy = new StreamSource<>(sourceCopy);
AbstractStreamOperatorTestHarness<String> testHarnessCopy =
new AbstractStreamOperatorTestHarness<>(srcCopy, 1, 1, 0);

testHarnessCopy.setup();
testHarnessCopy.initializeState(data);
testHarnessCopy.open();

ArrayDeque<Tuple2<Long, List<String>>> deque = sourceCopy.getRestoredState();
List<String> messageIds = deque.getLast().f1;

assertEquals(numIds, messageIds.size());
if (messageIds.size() > 0) {
assertEquals(lastSnapshotId, (long) Long.valueOf(messageIds.get(messageIds.size() - 1)));
}

// check if the messages are being acknowledged and the transaction comitted
// check if the messages are being acknowledged and the transaction committed
synchronized (DummySourceContext.lock) {
source.notifyCheckpointComplete(snapshotId);
}
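
Condensed, the rewritten loop above performs the usual harness-based snapshot-and-restore roundtrip. The fragment below is a sketch that reuses identifiers from the surrounding test (source, snapshotId, RMQTestSource) and is not additional test code:

// wrap the running source so the harness can produce an OperatorStateHandles snapshot
StreamSource<String, RMQSource<String>> operator = new StreamSource<>(source);
AbstractStreamOperatorTestHarness<String> harness =
        new AbstractStreamOperatorTestHarness<>(operator, 1, 1, 0);
harness.open();

OperatorStateHandles snapshot = harness.snapshot(snapshotId, System.currentTimeMillis());

// feed the snapshot into a fresh source instance through a second harness
RMQTestSource restored = new RMQTestSource();
AbstractStreamOperatorTestHarness<String> restoredHarness =
        new AbstractStreamOperatorTestHarness<>(new StreamSource<>(restored), 1, 1, 0);
restoredHarness.setup();
restoredHarness.initializeState(snapshot);
restoredHarness.open();

// the restored pending ids are then visible via RMQTestSource#getRestoredState()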
@@ -313,12 +339,24 @@ public TypeInformation<String> getProducedType() {

private class RMQTestSource extends RMQSource<String> {

private ArrayDeque<Tuple2<Long, List<String>>> restoredState;

public RMQTestSource() {
super(new RMQConnectionConfig.Builder().setHost("hostTest")
.setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/").build()
, "queueDummy", true, new StringDeserializationScheme());
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
super.initializeState(context);
this.restoredState = this.pendingCheckpoints;
}

public ArrayDeque<Tuple2<Long, List<String>>> getRestoredState() {
return this.restoredState;
}

@Override
public void open(Configuration config) throws Exception {
super.open(config);
@@ -124,7 +124,7 @@ protected void testProgram() throws Exception {
env.setParallelism(PARALLELISM);

ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, hdfsURI,
new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_CONTINUOUSLY,
env.getParallelism(), INTERVAL);
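
This hunk, the matching changes below, and the createFileInput() call site at the end of this diff all drop the separate path argument: ContinuousFileMonitoringFunction now takes the path to monitor from the FileInputFormat it is given (cf. the format.getFilePath() assertion below). A rough sketch of the changed call site, assuming the surrounding test constants (hdfsURI, INTERVAL):

TextInputFormat format = new TextInputFormat(new Path(hdfsURI));

// old (removed) signature passed the path twice, inside the format and again as a String:
//   new ContinuousFileMonitoringFunction<>(format, hdfsURI, mode, parallelism, interval)
// new signature reads it from format.getFilePath():
ContinuousFileMonitoringFunction<String> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(format,
                FileProcessingMode.PROCESS_CONTINUOUSLY, env.getParallelism(), INTERVAL);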

@@ -35,9 +35,11 @@
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -117,10 +119,10 @@ public static void destroyHDFS() {
public void testInvalidPathSpecification() throws Exception {

String invalidPath = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/invalid/";
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
TextInputFormat format = new TextInputFormat(new Path(invalidPath));

ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, invalidPath,
new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
try {
monitoringFunction.run(new DummySourceContext() {
@@ -135,7 +137,7 @@ public void collect(TimestampedFileInputSplit element) {
Assert.fail("Test passed with an invalid path.");

} catch (FileNotFoundException e) {
Assert.assertEquals("The provided file path " + invalidPath + " does not exist.", e.getMessage());
Assert.assertEquals("The provided file path " + format.getFilePath().toString() + " does not exist.", e.getMessage());
}
}

@@ -491,6 +493,8 @@ private FileInputSplit createSplitFromTimestampedSplit(TimestampedFileInputSplit

private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {

private static final long serialVersionUID = -6727603565381560267L;

private final OneShotLatch latch;

private FileInputSplit split;
@@ -556,14 +560,17 @@ public void testFilePathFiltering() throws Exception {

TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
format.setFilesFilter(new FilePathFilter() {

private static final long serialVersionUID = 2611449927338589804L;

@Override
public boolean filterPath(Path filePath) {
return filePath.getName().startsWith("**");
}
});

ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, hdfsURI,
new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);

final FileVerifyingSourceContext context =
@@ -601,7 +608,7 @@ public void testSortingOnModTime() throws Exception {
FileInputSplit[] splits = format.createInputSplits(1);

ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, hdfsURI,
new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);

ModTimeVerifyingSourceContext context = new ModTimeVerifyingSourceContext(modTimes);
@@ -633,7 +640,7 @@ public void testProcessOnce() throws Exception {
format.setFilesFilter(FilePathFilter.createDefaultFilter());

final ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, hdfsURI,
new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);

final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch, monitoringFunction);
@@ -682,6 +689,80 @@ public void run() {
}
}

@Test
public void testFunctionRestore() throws Exception {

org.apache.hadoop.fs.Path path = null;
long fileModTime = Long.MIN_VALUE;
for (int i = 0; i < 1; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
path = file.f0;
fileModTime = hdfs.getFileStatus(file.f0).getModificationTime();
}

TextInputFormat format = new TextInputFormat(new Path(hdfsURI));

final ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
new StreamSource<>(monitoringFunction);

final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
testHarness.open();

final Throwable[] error = new Throwable[1];

final OneShotLatch latch = new OneShotLatch();

// run the source asynchronously
Thread runner = new Thread() {
@Override
public void run() {
try {
monitoringFunction.run(new DummySourceContext() {
@Override
public void collect(TimestampedFileInputSplit element) {
latch.trigger();
}
});
}
catch (Throwable t) {
t.printStackTrace();
error[0] = t;
}
}
};
runner.start();

if (!latch.isTriggered()) {
latch.await();
}

OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
monitoringFunction.cancel();
runner.join();

testHarness.close();

final ContinuousFileMonitoringFunction<String> monitoringFunctionCopy =
new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> srcCopy =
new StreamSource<>(monitoringFunctionCopy);

AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarnessCopy =
new AbstractStreamOperatorTestHarness<>(srcCopy, 1, 1, 0);
testHarnessCopy.initializeState(snapshot);
testHarnessCopy.open();

Assert.assertNull(error[0]);
Assert.assertEquals(fileModTime, monitoringFunctionCopy.getGlobalModificationTime());

hdfs.delete(path, false);
}

@Test
public void testProcessContinuously() throws Exception {
final OneShotLatch latch = new OneShotLatch();
@@ -698,7 +779,7 @@ public void testProcessContinuously() throws Exception {
format.setFilesFilter(FilePathFilter.createDefaultFilter());

final ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, hdfsURI,
new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

final int totalNoOfFilesToBeRead = NO_OF_FILES + 1; // 1 for the bootstrap + NO_OF_FILES
@@ -1351,9 +1351,7 @@ private <OUT> DataStreamSource<OUT> createFileInput(FileInputFormat<OUT> inputFormat,
ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");

ContinuousFileMonitoringFunction<OUT> monitoringFunction =
new ContinuousFileMonitoringFunction<>(
inputFormat, inputFormat.getFilePath().toString(),
monitoringMode, getParallelism(), interval);
new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);

ContinuousFileReaderOperator<OUT> reader =
new ContinuousFileReaderOperator<>(inputFormat);