@@ -250,7 +250,7 @@ private <K, T, OUT> DataStream<OUT> readWindowOperator(
throws IOException {
KeyedStateInputFormat<K, W, OUT> format =
new KeyedStateInputFormat<>(
metadata.getOperatorState(uid),
metadata.getOperatorState(OperatorIdentifier.forUid(uid)),
stateBackend,
MutableConfig.of(env.getConfiguration()),
operator);
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.api;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.state.api.runtime.OperatorIDGenerator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;

/** Identifies an operator, either based on a {@code uid} or {@code uidHash}. */
@Internal
public class OperatorIdentifier implements Serializable {
// this is only used for logging purposes
@Nullable private final String uid;
// this is the runtime representation of a uid hash
private final OperatorID operatorId;

private OperatorIdentifier(OperatorID operatorId, @Nullable String uid) {
this.operatorId = operatorId;
this.uid = uid;
}

public static OperatorIdentifier forUidHash(String uidHash) {
Preconditions.checkNotNull(uidHash);
return new OperatorIdentifier(new OperatorID(StringUtils.hexStringToByte(uidHash)), null);
}

public static OperatorIdentifier forUid(String uid) {
Preconditions.checkNotNull(uid);
return new OperatorIdentifier(OperatorIDGenerator.fromUid(uid), uid);
}

public Optional<String> getUid() {
return Optional.ofNullable(uid);
}

public OperatorID getOperatorId() {
return operatorId;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OperatorIdentifier that = (OperatorIdentifier) o;
return Objects.equals(operatorId, that.operatorId);
}

@Override
public int hashCode() {
return Objects.hash(operatorId);
}
}
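
For context, a minimal usage sketch of the new class (not part of this change): the uid value is invented for illustration, and equality is driven purely by the underlying OperatorID, as the test file further down also shows.

// Illustrative sketch only; assumes the OperatorIdentifier class above is on the classpath.
import org.apache.flink.state.api.OperatorIdentifier;

class OperatorIdentifierUsageSketch {
    static void sketch() {
        // "my-operator" is an invented uid used only for this example.
        OperatorIdentifier byUid = OperatorIdentifier.forUid("my-operator");
        OperatorIdentifier byHash =
                OperatorIdentifier.forUidHash(byUid.getOperatorId().toHexString());

        // The uid is only retained when the identifier was created from one.
        assert byUid.getUid().isPresent();
        assert !byHash.getUid().isPresent();

        // Both resolve to the same OperatorID, so equals() treats them as the same operator.
        assert byUid.equals(byHash);
    }
}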
@@ -181,7 +181,7 @@ private <T> DataStream<T> readListState(
String uid, TypeInformation<T> typeInfo, ListStateDescriptor<T> descriptor)
throws IOException {

OperatorState operatorState = metadata.getOperatorState(uid);
OperatorState operatorState = metadata.getOperatorState(OperatorIdentifier.forUid(uid));
ListStateInputFormat<T> inputFormat =
new ListStateInputFormat<>(
operatorState,
@@ -229,7 +229,7 @@ private <T> DataStream<T> readUnionState(
String uid, TypeInformation<T> typeInfo, ListStateDescriptor<T> descriptor)
throws IOException {

OperatorState operatorState = metadata.getOperatorState(uid);
OperatorState operatorState = metadata.getOperatorState(OperatorIdentifier.forUid(uid));
UnionStateInputFormat<T> inputFormat =
new UnionStateInputFormat<>(
operatorState,
@@ -302,7 +302,7 @@ private <K, V> DataStream<Tuple2<K, V>> readBroadcastState(
MapStateDescriptor<K, V> descriptor)
throws IOException {

OperatorState operatorState = metadata.getOperatorState(uid);
OperatorState operatorState = metadata.getOperatorState(OperatorIdentifier.forUid(uid));
BroadcastStateInputFormat<K, V> inputFormat =
new BroadcastStateInputFormat<>(
operatorState,
@@ -380,7 +380,7 @@ public <K, OUT> DataStream<OUT> readKeyedState(
TypeInformation<OUT> outTypeInfo)
throws IOException {

OperatorState operatorState = metadata.getOperatorState(uid);
OperatorState operatorState = metadata.getOperatorState(OperatorIdentifier.forUid(uid));
KeyedStateInputFormat<K, VoidNamespace, OUT> inputFormat =
new KeyedStateInputFormat<>(
operatorState,
@@ -180,7 +180,7 @@ private SavepointWriter(SavepointMetadataV2 metadata, @Nullable StateBackend sta
* @return A modified savepoint.
*/
public SavepointWriter removeOperator(String uid) {
metadata.removeOperator(uid);
metadata.removeOperator(OperatorIdentifier.forUid(uid));
return this;
}

@@ -193,7 +193,7 @@ public SavepointWriter removeOperator(String uid) {
*/
public <T> SavepointWriter withOperator(
String uid, StateBootstrapTransformation<T> transformation) {
metadata.addOperator(uid, transformation);
metadata.addOperator(OperatorIdentifier.forUid(uid), transformation);
return this;
}

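As a hedged illustration of how these writer methods chain together after the change: the sketch below uses placeholder savepoint paths and uid values, assumes a StateBootstrapTransformation is available, and relies on SavepointWriter's existing fromExistingSavepoint/write entry points, none of which are touched by this diff.

// Hedged sketch, not part of this change; paths and uids are placeholders.
static void rewriteSavepoint(StateBootstrapTransformation<String> transformation) throws Exception {
    SavepointWriter
            .fromExistingSavepoint("/tmp/existing-savepoint") // placeholder path
            .removeOperator("old-uid")                        // internally wrapped via OperatorIdentifier.forUid
            .withOperator("new-uid", transformation)          // likewise converted to an OperatorIdentifier
            .write("/tmp/new-savepoint");                     // placeholder path
}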
@@ -237,7 +237,7 @@ private <K, T, OUT> DataStream<OUT> readWindowOperator(
throws IOException {
KeyedStateInputFormat<K, W, OUT> format =
new KeyedStateInputFormat<>(
metadata.getOperatorState(uid),
metadata.getOperatorState(OperatorIdentifier.forUid(uid)),
stateBackend,
MutableConfig.of(env.getConfiguration()),
operator);
@@ -22,8 +22,8 @@
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.StateBootstrapTransformation;
import org.apache.flink.state.api.runtime.OperatorIDGenerator;
import org.apache.flink.state.api.runtime.StateBootstrapTransformationWithID;
import org.apache.flink.util.Preconditions;

@@ -82,27 +82,38 @@ public Collection<MasterState> getMasterStates() {
* @return Operator state for the given UID.
* @throws IOException If the savepoint does not contain operator state with the given uid.
*/
public OperatorState getOperatorState(String uid) throws IOException {
OperatorID operatorID = OperatorIDGenerator.fromUid(uid);
public OperatorState getOperatorState(OperatorIdentifier identifier) throws IOException {
OperatorID operatorID = identifier.getOperatorId();

OperatorStateSpecV2 operatorState = operatorStateIndex.get(operatorID);
if (operatorState == null || operatorState.isNewStateTransformation()) {
throw new IOException("Savepoint does not contain state with operator uid " + uid);
throw new IOException(
"Savepoint does not contain state with operator "
+ identifier
.getUid()
.map(uid -> "uid " + uid)
.orElse("hash " + operatorID.toHexString()));
}

return operatorState.asExistingState();
}

public void removeOperator(String uid) {
operatorStateIndex.remove(OperatorIDGenerator.fromUid(uid));
public void removeOperator(OperatorIdentifier identifier) {
operatorStateIndex.remove(identifier.getOperatorId());
}

public void addOperator(String uid, StateBootstrapTransformation<?> transformation) {
OperatorID id = OperatorIDGenerator.fromUid(uid);
public void addOperator(
OperatorIdentifier identifier, StateBootstrapTransformation<?> transformation) {
OperatorID id = identifier.getOperatorId();

if (operatorStateIndex.containsKey(id)) {
throw new IllegalArgumentException(
"The savepoint already contains uid " + uid + ". All uid's must be unique");
"The savepoint already contains "
+ identifier
.getUid()
.map(uid -> "uid " + uid)
.orElse("hash " + id.toHexString())
+ ". All uid's/hashes must be unique.");
}

operatorStateIndex.put(
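To make the new error wording of getOperatorState concrete, a hypothetical failure path (the uid is invented, and the metadata instance is assumed to be an already-loaded SavepointMetadataV2):

try {
    // "missing-uid" is an invented value that is not expected to be present in the savepoint.
    metadata.getOperatorState(OperatorIdentifier.forUid("missing-uid"));
} catch (IOException e) {
    // With a uid-based identifier the message names the uid:
    //   "Savepoint does not contain state with operator uid missing-uid"
    // With an identifier from forUidHash(...) it names the hex hash instead.
}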
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.api;

import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.state.api.runtime.OperatorIDGenerator;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class OperatorIdentifierTest {

@Test
void testForUidHash() {
final OperatorID operatorId = new OperatorID();
final String hash = operatorId.toHexString();

assertThat(OperatorIdentifier.forUidHash(hash).getOperatorId()).isEqualTo(operatorId);
}

@Test
void testForUid() {
final String uid = "uid";
final OperatorID operatorId = OperatorIDGenerator.fromUid(uid);

assertThat(OperatorIdentifier.forUid(uid).getOperatorId()).isEqualTo(operatorId);
}

@Test
void testEqualityBasedOnOperatorId() {
final String uid = "uid";
final OperatorID operatorId = OperatorIDGenerator.fromUid(uid);

assertThat(OperatorIdentifier.forUid(uid))
.isEqualTo(OperatorIdentifier.forUidHash(operatorId.toHexString()));
}
}
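
A hashCode consistency check would be a natural companion to the equality test above; the method below is a hedged suggestion that reuses the test class's existing imports and is not part of this change.

@Test
void testHashCodeConsistentWithEquals() {
    final String uid = "uid";
    final OperatorID operatorId = OperatorIDGenerator.fromUid(uid);

    // equals() is based on the OperatorID, so hashCode() should match as well.
    assertThat(OperatorIdentifier.forUid(uid).hashCode())
            .isEqualTo(OperatorIdentifier.forUidHash(operatorId.toHexString()).hashCode());
}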