[FLINK-3357] [core] Drop AbstractID#toShortString()
This closes #1601
uce authored and StephanEwen committed Feb 8, 2016
1 parent 5c47f38 commit 28feede
Showing 7 changed files with 28 additions and 42 deletions.
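Most call sites in this commit simply swap the removed AbstractID#toShortString() for toString(); the DB state backend and its tests instead keep a short key by taking the first 16 characters of toString(). The snippet below is an illustrative sketch only (plain Java, no Flink types); the ShortIdExample class and the UUID-backed ID are assumptions for demonstration, not part of this commit.

```java
import java.util.UUID;

public class ShortIdExample {

    /** Builds a 32-character lowercase hex ID, roughly the shape of AbstractID#toString(). */
    static String fullHexId() {
        UUID uuid = UUID.randomUUID();
        return String.format("%016x%016x",
                uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
    }

    public static void main(String[] args) {
        String id = fullHexId();

        // Call sites that previously used toShortString() now pass the full string ...
        String fullId = id;

        // ... while the DB state backend keeps a short, fixed-length key by truncating it,
        // mirroring appId = env.getApplicationID().toString().substring(0, 16) in this diff.
        String appId = id.substring(0, 16);

        System.out.println("full id : " + fullId);
        System.out.println("short id: " + appId);
    }
}
```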
@@ -81,11 +81,11 @@ public void close() throws Exception {
}

private File getDbPath(String stateName) {
- return new File(new File(new File(new File(dbBasePath), jobId.toShortString()), operatorIdentifier), stateName);
+ return new File(new File(new File(new File(dbBasePath), jobId.toString()), operatorIdentifier), stateName);
}

private String getCheckpointPath(String stateName) {
- return checkpointDirectory + "/" + jobId.toShortString() + "/" + operatorIdentifier + "/" + stateName;
+ return checkpointDirectory + "/" + jobId.toString() + "/" + operatorIdentifier + "/" + stateName;
}

@Override
@@ -17,14 +17,6 @@

package org.apache.flink.contrib.streaming.state;

- import java.io.Serializable;
- import java.sql.Connection;
- import java.sql.PreparedStatement;
- import java.sql.SQLException;
- import java.util.ArrayList;
- import java.util.Random;
- import java.util.concurrent.Callable;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
@@ -42,6 +34,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+ import java.io.Serializable;
+ import java.sql.Connection;
+ import java.sql.PreparedStatement;
+ import java.sql.SQLException;
+ import java.util.ArrayList;
+ import java.util.Random;
+ import java.util.concurrent.Callable;

import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;

/**
@@ -76,6 +76,8 @@ public class DbStateBackend extends AbstractStateBackend {

private transient Environment env;

+ private transient String appId;

// ------------------------------------------------------

private final DbBackendConfig dbConfig;
@@ -159,19 +161,14 @@ public DbStateHandle<S> call() throws Exception {
// store the checkpoint id and timestamp for bookkeeping
long handleId = rnd.nextLong();

- // We use the ApplicationID here, because it is restored when
- // the job is started from a savepoint (whereas the job ID
- // changes with each submission).
- String appIdShort = env.getApplicationID().toShortString();

byte[] serializedState = InstantiationUtil.serializeObject(state);
- dbAdapter.setCheckpointInsertParams(appIdShort, insertStatement,
+ dbAdapter.setCheckpointInsertParams(appId, insertStatement,
checkpointID, timestamp, handleId,
serializedState);

insertStatement.executeUpdate();

- return new DbStateHandle<>(appIdShort, checkpointID, timestamp, handleId,
+ return new DbStateHandle<>(appId, checkpointID, timestamp, handleId,
dbConfig, serializedState.length);
}
}, numSqlRetries, sqlRetrySleep);
@@ -253,6 +250,7 @@ public void initializeForJob(final Environment env,

this.rnd = new Random();
this.env = env;
+ this.appId = env.getApplicationID().toString().substring(0, 16);

connections = dbConfig.createShardedConnection();

@@ -270,8 +268,8 @@
if (nonPartitionedStateBackend == null) {
insertStatement = retry(new Callable<PreparedStatement>() {
public PreparedStatement call() throws SQLException {
- dbAdapter.createCheckpointsTable(env.getApplicationID().toShortString(), getConnections().getFirst());
- return dbAdapter.prepareCheckpointInsert(env.getApplicationID().toShortString(),
+ dbAdapter.createCheckpointsTable(appId, getConnections().getFirst());
+ return dbAdapter.prepareCheckpointInsert(appId,
getConnections().getFirst());
}
}, numSqlRetries, sqlRetrySleep);
@@ -300,9 +298,10 @@ public void close() throws Exception {
@Override
public void disposeAllStateForCurrentJob() throws Exception {
if (nonPartitionedStateBackend == null) {
- dbAdapter.disposeAllStateForJob(env.getApplicationID().toShortString(), connections.getFirst());
+ dbAdapter.disposeAllStateForJob(appId, connections.getFirst());
} else {
nonPartitionedStateBackend.disposeAllStateForCurrentJob();
}
}

}
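The DbStateBackend change above is more than a mechanical rename: the removed comment notes that the ApplicationID is restored when a job starts from a savepoint (whereas the job ID changes with each submission), and the shortened ID is now computed once in initializeForJob and stored in the new appId field instead of being re-derived via toShortString() at every use. Below is a compressed, non-authoritative sketch of that flow; AppEnv, FakeDbAdapter, and DbStateBackendSketch are hypothetical stand-ins, not Flink classes.

```java
import java.util.UUID;

// Hypothetical stand-in for Flink's Environment: exposes a stable application ID string.
final class AppEnv {
    private final String applicationId = UUID.randomUUID().toString().replace("-", "");
    String getApplicationIDString() { return applicationId; }
}

// Hypothetical stand-in for the DB adapter: keys tables by the short application ID.
final class FakeDbAdapter {
    void createCheckpointsTable(String appId) { System.out.println("CREATE TABLE checkpoints_" + appId); }
    void disposeAllStateForJob(String appId)  { System.out.println("DROP TABLE checkpoints_" + appId); }
}

public class DbStateBackendSketch {
    private String appId; // derived once, reused everywhere (previously toShortString() per call)
    private final FakeDbAdapter dbAdapter = new FakeDbAdapter();

    void initializeForJob(AppEnv env) {
        // Mirrors env.getApplicationID().toString().substring(0, 16) in the diff.
        this.appId = env.getApplicationIDString().substring(0, 16);
        dbAdapter.createCheckpointsTable(appId);
    }

    void disposeAllStateForCurrentJob() {
        dbAdapter.disposeAllStateForJob(appId);
    }

    public static void main(String[] args) {
        DbStateBackendSketch backend = new DbStateBackendSketch();
        backend.initializeForJob(new AppEnv());
        backend.disposeAllStateForCurrentJob();
    }
}
```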
@@ -136,14 +136,15 @@ public void testSetupAndSerialization() throws Exception {

Environment env = new DummyEnvironment("test", 1, 0);
backend.initializeForJob(env, "dummy-setup-ser", StringSerializer.INSTANCE);
+ String appId = env.getApplicationID().toString().substring(0, 16);

assertNotNull(backend.getConnections());
assertTrue(
- isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString()));
+ isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + appId));

backend.disposeAllStateForCurrentJob();
assertFalse(
- isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString()));
+ isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + appId));
backend.close();

assertTrue(backend.getConnections().getFirst().isClosed());
@@ -153,6 +154,7 @@
public void testSerializableState() throws Exception {
Environment env = new DummyEnvironment("test", 1, 0);
DbStateBackend backend = new DbStateBackend(conf);
+ String appId = env.getApplicationID().toString().substring(0, 16);

backend.initializeForJob(env, "dummy-ser-state", StringSerializer.INSTANCE);

@@ -173,12 +175,12 @@ public void testSerializableState() throws Exception {
assertEquals(state2, handle2.getState(getClass().getClassLoader()));
handle2.discardState();

- assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString()));
+ assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + appId));

assertEquals(state3, handle3.getState(getClass().getClassLoader()));
handle3.discardState();

- assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString()));
+ assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + appId));

backend.close();

15 changes: 0 additions & 15 deletions flink-core/src/main/java/org/apache/flink/util/AbstractID.java
@@ -51,9 +51,6 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
/** The memoized value returned by toString() */
private String toString;

- /** The memoized value returned by toShortString() */
- private String toShortString;

// --------------------------------------------------------------------------------------------

/**
@@ -145,7 +142,6 @@ public void read(DataInputView in) throws IOException {
this.upperPart = in.readLong();

this.toString = null;
- this.toShortString = null;
}

@Override
@@ -188,17 +184,6 @@ public String toString() {

return this.toString;
}

- public String toShortString() {
- if (this.toShortString == null) {
- final byte[] ba = new byte[SIZE_OF_LONG];
- longToByteArray(upperPart, ba, 0);
-
- this.toShortString = StringUtils.byteToHexString(ba);
- }
-
- return this.toShortString;
- }

@Override
public int compareTo(AbstractID o) {
@@ -48,7 +48,7 @@ public String handleRequest(Execution execAttempt, Map<String, String> params) t

gen.writeNumberField("subtask", execAttempt.getVertex().getParallelSubtaskIndex());
gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
- gen.writeStringField("id", execAttempt.getAttemptId().toShortString());
+ gen.writeStringField("id", execAttempt.getAttemptId().toString());

gen.writeArrayFieldStart("user-accumulators");
for (StringifiedAccumulatorResult acc : accs) {
@@ -83,7 +83,7 @@ public InputChannelDeploymentDescriptor[] getInputChannelDeploymentDescriptors()
public String toString() {
return String.format("InputGateDeploymentDescriptor [result id: %s, " +
"consumed subpartition index: %d, input channels: %s]",
- consumedResultId.toShortString(), consumedSubpartitionIndex,
+ consumedResultId.toString(), consumedSubpartitionIndex,
Arrays.toString(inputChannels));
}
}
@@ -72,6 +72,6 @@ public int hashCode() {

@Override
public String toString() {
- return partitionId.toShortString() + "@" + producerId.toShortString();
+ return partitionId.toString() + "@" + producerId.toString();
}
}
