
[FLINK-19166][table-runtime] StreamingFileWriter should register Listener before the initialization of buckets
JingsongLi committed Sep 9, 2020
1 parent 99cd44f commit beb8f3f
Showing 2 changed files with 170 additions and 8 deletions.
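Context for the fix: StreamingFileSinkHelper restores the Buckets state in its constructor, and restored buckets can fire life-cycle callbacks (such as bucketInactive) during that restore. If the BucketLifeCycleListener is registered only afterwards, those notifications are silently dropped and the affected partitions are never reported for commit. Below is a minimal, self-contained sketch of this ordering hazard; the classes and method names are hypothetical toys, not Flink's API:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class ToyBuckets {
	private Consumer<String> inactiveListener; // null until registered

	void setInactiveListener(Consumer<String> listener) {
		this.inactiveListener = listener;
	}

	// Restoring state may immediately mark restored buckets inactive.
	void initializeFromState(List<String> restoredBuckets) {
		for (String bucketId : restoredBuckets) {
			if (inactiveListener != null) { // notification is lost when no listener is set yet
				inactiveListener.accept(bucketId);
			}
		}
	}
}

public class ListenerOrderingSketch {
	public static void main(String[] args) {
		List<String> reported = new ArrayList<>();

		// Buggy order: initialize first, register later -> "p1" is never reported.
		ToyBuckets buggy = new ToyBuckets();
		buggy.initializeFromState(Arrays.asList("p1"));
		buggy.setInactiveListener(reported::add);
		System.out.println(reported); // prints []

		// Fixed order (what this commit does): register the listener first.
		ToyBuckets fixed = new ToyBuckets();
		fixed.setInactiveListener(reported::add);
		fixed.initializeFromState(Arrays.asList("p1"));
		System.out.println(reported); // prints [p1]
	}
}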
StreamingFileWriter.java

@@ -79,17 +79,10 @@ public StreamingFileWriter(
 	public void initializeState(StateInitializationContext context) throws Exception {
 		super.initializeState(context);
 		buckets = bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());
-		helper = new StreamingFileSinkHelper<>(
-				buckets,
-				context.isRestored(),
-				context.getOperatorStateStore(),
-				getRuntimeContext().getProcessingTimeService(),
-				bucketCheckInterval);
-
+		// Set listener before the initialization of Buckets.
 		inactivePartitions = new HashSet<>();
-		currentWatermark = Long.MIN_VALUE;
 		buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {
 
 			@Override
 			public void bucketCreated(Bucket<RowData, String> bucket) {
 			}
@@ -99,6 +92,14 @@ public void bucketInactive(Bucket<RowData, String> bucket) {
 				inactivePartitions.add(bucket.getBucketId());
 			}
 		});
+
+		helper = new StreamingFileSinkHelper<>(
+				buckets,
+				context.isRestored(),
+				context.getOperatorStateStore(),
+				getRuntimeContext().getProcessingTimeService(),
+				bucketCheckInterval);
+		currentWatermark = Long.MIN_VALUE;
 	}
 
 	@Override
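The second change adds a regression test, StreamingFileWriterTest, exercising the failover/restore path: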
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.filesystem.stream;

import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMessage;

import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Test for {@link StreamingFileWriter}.
 */
public class StreamingFileWriterTest {

	@ClassRule
	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

	private Path path;

	@Before
	public void before() throws IOException {
		File file = TEMPORARY_FOLDER.newFile();
		file.delete();
		path = new Path(file.toURI());
	}

	@Test
	public void testFailover() throws Exception {
		OperatorSubtaskState state;
		try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
			harness.setup();
			harness.initializeEmptyState();
			harness.open();
			harness.processElement(row("1"), 0);
			harness.processElement(row("2"), 0);
			harness.processElement(row("2"), 0);
			state = harness.snapshot(1, 1);
			harness.processElement(row("3"), 0);
			harness.processElement(row("4"), 0);
			harness.notifyOfCompletedCheckpoint(1);
			List<String> partitions = collect(harness);
			Assert.assertEquals(Arrays.asList("1", "2"), partitions);
		}

		// first retry: records for partitions {1, 2} are not replayed
		try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
			harness.setup();
			harness.initializeState(state);
			harness.open();
			harness.processElement(row("3"), 0);
			harness.processElement(row("4"), 0);
			state = harness.snapshot(2, 2);
			harness.notifyOfCompletedCheckpoint(2);
			List<String> partitions = collect(harness);
			Assert.assertEquals(Arrays.asList("1", "2", "3", "4"), partitions);
		}

		// second retry: the record for partition {4} is replayed
		try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
			harness.setup();
			harness.initializeState(state);
			harness.open();
			harness.processElement(row("4"), 0);
			harness.processElement(row("5"), 0);
			state = harness.snapshot(3, 3);
			harness.notifyOfCompletedCheckpoint(3);
			List<String> partitions = collect(harness);
			Assert.assertEquals(Arrays.asList("3", "4", "5"), partitions);
		}

		// third retry: multiple snapshots, but only checkpoint 5 completes
		try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
			harness.setup();
			harness.initializeState(state);
			harness.open();
			harness.processElement(row("6"), 0);
			harness.processElement(row("7"), 0);
			harness.snapshot(4, 4);
			harness.processElement(row("8"), 0);
			harness.snapshot(5, 5);
			harness.processElement(row("9"), 0);
			harness.snapshot(6, 6);
			harness.notifyOfCompletedCheckpoint(5);
			List<String> partitions = collect(harness);
			// should not contain partition {9}
			Assert.assertEquals(Arrays.asList("4", "5", "6", "7", "8"), partitions);
		}
	}

	private static RowData row(String s) {
		return GenericRowData.of(StringData.fromString(s));
	}

	private static List<String> collect(
			OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness) {
		List<String> parts = new ArrayList<>();
		harness.extractOutputValues().forEach(m -> parts.addAll(m.partitions));
		return parts;
	}

	private OneInputStreamOperatorTestHarness<RowData, CommitMessage> create() throws Exception {
		StreamingFileWriter writer = new StreamingFileWriter(1000, StreamingFileSink.forRowFormat(
				path, (Encoder<RowData>) (element, stream) ->
						stream.write((element.getString(0) + "\n").getBytes(StandardCharsets.UTF_8)))
				.withBucketAssigner(new BucketAssigner<RowData, String>() {
					@Override
					public String getBucketId(RowData element, Context context) {
						return element.getString(0).toString();
					}

					@Override
					public SimpleVersionedSerializer<String> getSerializer() {
						return SimpleVersionedStringSerializer.INSTANCE;
					}
				})
				.withRollingPolicy(OnCheckpointRollingPolicy.build()));
		OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = new OneInputStreamOperatorTestHarness<>(
				writer, 1, 1, 0);
		harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
		return harness;
	}
}
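To run the test locally, something like the following should work with the Flink repository's standard Maven/Surefire setup; the module path is an assumption based on where this package lived at the time of the commit, so adjust it to your checkout:

# -Dtest selects the Surefire test class; -pl limits the build to one module (assumed path)
mvn test -Dtest=StreamingFileWriterTest -pl flink-table/flink-table-runtime-blink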
