Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -185,7 +186,7 @@ public RocksDBKeyedStateBackend(
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
List<KeyGroupsStateHandle> restoreState
Collection<KeyGroupsStateHandle> restoreState
) throws Exception {

this(jobId,
Expand Down Expand Up @@ -603,7 +604,7 @@ public RocksDBRestoreOperation(RocksDBKeyedStateBackend<?> rocksDBKeyedStateBack
* @throws ClassNotFoundException
* @throws RocksDBException
*/
public void doRestore(List<KeyGroupsStateHandle> keyGroupsStateHandles)
public void doRestore(Collection<KeyGroupsStateHandle> keyGroupsStateHandles)
throws IOException, ClassNotFoundException, RocksDBException {

for (KeyGroupsStateHandle keyGroupsStateHandle : keyGroupsStateHandles) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.io.ObjectOutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.UUID;
Expand Down Expand Up @@ -258,7 +259,7 @@ public <K> AbstractKeyedStateBackend<K> restoreKeyedStateBackend(
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
List<KeyGroupsStateHandle> restoredState,
Collection<KeyGroupsStateHandle> restoredState,
TaskKvStateRegistry kvStateRegistry) throws Exception {

lazyInitializeForJob(env, operatorIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.CheckpointStateHandles;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
Expand Down Expand Up @@ -70,7 +70,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

/**
* Tests for asynchronous RocksDB Key/Value state checkpoints.
Expand Down Expand Up @@ -136,7 +136,7 @@ public String getKey(String value) throws Exception {
@Override
public void acknowledgeCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointStateHandles checkpointStateHandles) {
SubtaskState checkpointStateHandles) {

super.acknowledgeCheckpoint(checkpointMetaData);

Expand All @@ -148,8 +148,8 @@ public void acknowledgeCheckpoint(
e.printStackTrace();
}

// should be only one k/v state
assertEquals(1, checkpointStateHandles.getKeyGroupsStateHandle().size());
// should be one k/v state
assertNotNull(checkpointStateHandles.getManagedKeyedState());

// we now know that the checkpoint went through
ensureCheckpointLatch.trigger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public interface RuntimeContext {
* @return The distributed cache of the worker executing this instance.
*/
DistributedCache getDistributedCache();

// ------------------------------------------------------------------------
// Methods for accessing state
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -266,7 +266,7 @@ public interface RuntimeContext {
* Gets a handle to the system's key/value list state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
* holds lists. One can adds elements to the list, or retrieve the list as a whole.
*
*
* <p>This state is only accessible if the function is executed on a KeyedStream.
*
* <pre>{@code
Expand Down Expand Up @@ -331,7 +331,7 @@ public interface RuntimeContext {
* return new Tuple2<>(value, sum.get());
* }
* });
*
*
* }</pre>
*
* @param stateProperties The descriptor defining the properties of the stats.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.state;

import org.apache.flink.annotation.PublicEvolving;

/**
* This interface contains methods for registering keyed state with a managed store.
*/
@PublicEvolving
public interface KeyedStateStore {

/**
* Gets a handle to the system's key/value state. The key/value state is only accessible
* if the function is executed on a KeyedStream. On each access, the state exposes the value
* for the the key of the element currently processed by the function.
* Each function may have multiple partitioned states, addressed with different names.
*
* <p>Because the scope of each value is the key of the currently processed element,
* and the elements are distributed by the Flink runtime, the system can transparently
* scale out and redistribute the state and KeyedStream.
*
* <p>The following code example shows how to implement a continuous counter that counts
* how many times elements of a certain key occur, and emits an updated count for that
* element on each occurrence.
*
* <pre>{@code
* DataStream<MyType> stream = ...;
* KeyedStream<MyType> keyedStream = stream.keyBy("id");
*
* keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
*
* private ValueState<Long> count;
*
* public void open(Configuration cfg) {
* state = getRuntimeContext().getState(
* new ValueStateDescriptor<Long>("count", LongSerializer.INSTANCE, 0L));
* }
*
* public Tuple2<MyType, Long> map(MyType value) {
* long count = state.value() + 1;
* state.update(value);
* return new Tuple2<>(value, count);
* }
* });
* }</pre>
*
* @param stateProperties The descriptor defining the properties of the stats.
*
* @param <T> The type of value stored in the state.
*
* @return The partitioned state object.
*
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
@PublicEvolving
<T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties);

/**
* Gets a handle to the system's key/value list state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
* holds lists. One can adds elements to the list, or retrieve the list as a whole.
*
* <p>This state is only accessible if the function is executed on a KeyedStream.
*
* <pre>{@code
* DataStream<MyType> stream = ...;
* KeyedStream<MyType> keyedStream = stream.keyBy("id");
*
* keyedStream.map(new RichFlatMapFunction<MyType, List<MyType>>() {
*
* private ListState<MyType> state;
*
* public void open(Configuration cfg) {
* state = getRuntimeContext().getListState(
* new ListStateDescriptor<>("myState", MyType.class));
* }
*
* public void flatMap(MyType value, Collector<MyType> out) {
* if (value.isDivider()) {
* for (MyType t : state.get()) {
* out.collect(t);
* }
* } else {
* state.add(value);
* }
* }
* });
* }</pre>
*
* @param stateProperties The descriptor defining the properties of the stats.
*
* @param <T> The type of value stored in the state.
*
* @return The partitioned state object.
*
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part os a KeyedStream).
*/
@PublicEvolving
<T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);

/**
* Gets a handle to the system's key/value list state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
* aggregates values.
*
* <p>This state is only accessible if the function is executed on a KeyedStream.
*
* <pre>{@code
* DataStream<MyType> stream = ...;
* KeyedStream<MyType> keyedStream = stream.keyBy("id");
*
* keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
*
* private ReducingState<Long> sum;
*
* public void open(Configuration cfg) {
* state = getRuntimeContext().getReducingState(
* new ReducingStateDescriptor<>("sum", MyType.class, 0L, (a, b) -> a + b));
* }
*
* public Tuple2<MyType, Long> map(MyType value) {
* sum.add(value.count());
* return new Tuple2<>(value, sum.get());
* }
* });
*
* }</pre>
*
* @param stateProperties The descriptor defining the properties of the stats.
*
* @param <T> The type of value stored in the state.
*
* @return The partitioned state object.
*
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
@PublicEvolving
<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@

package org.apache.flink.api.common.state;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;
import java.util.Set;

/**
* Interface for a backend that manages operator state.
* This interface contains methods for registering operator state with a managed store.
*/
@PublicEvolving
public interface OperatorStateStore {

String DEFAULT_OPERATOR_STATE_NAME = "_default_";

/**
* Creates a state descriptor of the given name that uses Java serialization to persist the
* state.
Expand All @@ -39,7 +40,7 @@ public interface OperatorStateStore {
* @return A list state using Java serialization to serialize state objects.
* @throws Exception
*/
ListState<Serializable> getSerializableListState(String stateName) throws Exception;
<T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;

/**
* Creates (or restores) a list state. Each state is registered under a unique name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

package org.apache.flink.core.fs.local;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FSDataInputStream;

import javax.annotation.Nonnull;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

/**
* The <code>LocalDataInputStream</code> class is a wrapper class for a data
Expand All @@ -36,6 +36,7 @@ public class LocalDataInputStream extends FSDataInputStream {

/** The file input stream used to read data from.*/
private final FileInputStream fis;
private final FileChannel fileChannel;

/**
* Constructs a new <code>LocalDataInputStream</code> object from a given {@link File} object.
Expand All @@ -46,16 +47,19 @@ public class LocalDataInputStream extends FSDataInputStream {
*/
public LocalDataInputStream(File file) throws IOException {
this.fis = new FileInputStream(file);
this.fileChannel = fis.getChannel();
}

@Override
public void seek(long desired) throws IOException {
this.fis.getChannel().position(desired);
if (desired != getPos()) {
this.fileChannel.position(desired);
}
}

@Override
public long getPos() throws IOException {
return this.fis.getChannel().position();
return this.fileChannel.position();
}

@Override
Expand All @@ -70,6 +74,7 @@ public int read(@Nonnull byte[] buffer, int offset, int length) throws IOExcepti

@Override
public void close() throws IOException {
// Accoring to javadoc, this also closes the channel
this.fis.close();
}

Expand Down
37 changes: 37 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

import java.util.Collection;
import java.util.Map;

public final class CollectionUtil {

private CollectionUtil() {
throw new AssertionError();
}

public static boolean isNullOrEmpty(Collection<?> collection) {
return collection == null || collection.isEmpty();
}

public static boolean isNullOrEmpty(Map<?, ?> map) {
return map == null || map.isEmpty();
}
}
Loading