Skip to content

Commit

Permalink
[FLINK-2976] [streaming-contrib] Use ApplicationID in DbStateBackend …
Browse files Browse the repository at this point in the history
…instead of JobID

[comments] Set larger timeout for future when triggering savepoint
  • Loading branch information
uce committed Jan 11, 2016
1 parent 52a287a commit d974334
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,23 @@

package org.apache.flink.client;

import java.io.File;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import akka.actor.ActorSystem;

import org.apache.commons.cli.CommandLine;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.CancelOptions;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.cli.InfoOptions;
import org.apache.flink.client.cli.ListOptions;
import org.apache.flink.client.cli.ProgramOptions;
import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.cli.SavepointOptions;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
Expand All @@ -69,28 +51,52 @@
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
import org.apache.flink.util.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
import static org.apache.flink.runtime.messages.JobManagerMessages.getRequestRunningJobsStatus;

/**
* Implementation of a simple command line frontend for executing programs.
*/
Expand Down Expand Up @@ -486,7 +492,7 @@ protected int list(String[] args) {

LOG.info("Connecting to JobManager to retrieve list of jobs");
Future<Object> response = jobManagerGateway.ask(
JobManagerMessages.getRequestRunningJobsStatus(),
getRequestRunningJobsStatus(),
askTimeout);

Object result;
Expand Down Expand Up @@ -694,12 +700,15 @@ protected int savepoint(String[] args) {
private int triggerSavepoint(SavepointOptions options, JobID jobId) {
try {
ActorGateway jobManager = getJobManagerGateway(options);
Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId), askTimeout);

logAndSysout("Triggering savepoint for job " + jobId + ".");
Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId),
new FiniteDuration(1, TimeUnit.HOURS));

Object result;
try {
logAndSysout("Triggering savepoint for job " + jobId + ". Waiting for response...");
result = Await.result(response, askTimeout);
logAndSysout("Waiting for response...");
result = Await.result(response, FiniteDuration.Inf());
}
catch (Exception e) {
throw new Exception("Triggering a savepoint for the job " + jobId + " failed.", e);
Expand Down Expand Up @@ -733,11 +742,12 @@ else if (result instanceof TriggerSavepointFailure) {
private int disposeSavepoint(SavepointOptions options, String savepointPath) {
try {
ActorGateway jobManager = getJobManagerGateway(options);
logAndSysout("Disposing savepoint '" + savepointPath + "'.");
Future<Object> response = jobManager.ask(new DisposeSavepoint(savepointPath), askTimeout);

Object result;
try {
logAndSysout("Disposing savepoint '" + savepointPath + "'. Waiting for response...");
logAndSysout("Waiting for response...");
result = Await.result(response, askTimeout);
}
catch (Exception e) {
Expand Down Expand Up @@ -1118,7 +1128,8 @@ public Integer run() throws Exception {
return CliFrontend.this.run(params);
}
});
} catch (Exception e) {
}
catch (Exception e) {
return handleError(e);
}
}
Expand All @@ -1130,7 +1141,7 @@ public Integer run() throws Exception {
case ACTION_CANCEL:
return cancel(params);
case ACTION_SAVEPOINT:
return savepoint(params)
return savepoint(params);
case "-h":
case "--help":
CliFrontendParser.printHelp();
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,30 @@
/**
* Adapter interface for executing different checkpointing related operations on
* the underlying database.
*
*/
public interface DbAdapter extends Serializable {

/**
* Initialize tables for storing non-partitioned checkpoints for the given
* job id and database connection.
* application id and database connection.
*
*/
void createCheckpointsTable(String jobId, Connection con) throws SQLException;
void createCheckpointsTable(String appId, Connection con) throws SQLException;

/**
* Checkpoints will be inserted in the database using prepared statements.
* This method should prepare and return the statement that will be used
* later to insert using the given connection.
*
*/
PreparedStatement prepareCheckpointInsert(String jobId, Connection con) throws SQLException;
PreparedStatement prepareCheckpointInsert(String appId, Connection con) throws SQLException;

/**
* Set the {@link PreparedStatement} parameters for the statement returned
* by {@link #prepareCheckpointInsert(String, Connection)}.
*
* @param jobId
* Id of the current job.
* @param appId
* Id of the current application.
* @param insertStatement
* Statement returned by
* {@link #prepareCheckpointInsert(String, Connection)}.
Expand All @@ -68,14 +67,14 @@ public interface DbAdapter extends Serializable {
* The serialized checkpoint.
* @throws SQLException
*/
void setCheckpointInsertParams(String jobId, PreparedStatement insertStatement, long checkpointId,
void setCheckpointInsertParams(String appId, PreparedStatement insertStatement, long checkpointId,
long timestamp, long handleId, byte[] checkpoint) throws SQLException;

/**
* Retrieve the serialized checkpoint data from the database.
*
* @param jobId
* Id of the current job.
* @param appId
* Id of the current application.
* @param con
* Database connection
* @param checkpointId
Expand All @@ -88,14 +87,14 @@ void setCheckpointInsertParams(String jobId, PreparedStatement insertStatement,
* @return The byte[] corresponding to the checkpoint or null if missing.
* @throws SQLException
*/
byte[] getCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
byte[] getCheckpoint(String appId, Connection con, long checkpointId, long checkpointTs, long handleId)
throws SQLException;

/**
* Remove the given checkpoint from the database.
*
* @param jobId
* Id of the current job.
* @param appId
* Id of the current application.
* @param con
* Database connection
* @param checkpointId
Expand All @@ -108,16 +107,16 @@ byte[] getCheckpoint(String jobId, Connection con, long checkpointId, long check
* @return The byte[] corresponding to the checkpoint or null if missing.
* @throws SQLException
*/
void deleteCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
void deleteCheckpoint(String appId, Connection con, long checkpointId, long checkpointTs, long handleId)
throws SQLException;

/**
* Remove all states for the given JobId, by for instance dropping the
* entire table.
* Remove all states for the given {@link org.apache.flink.api.common.ApplicationID},
* for instance by dropping the entire table.
*
* @throws SQLException
*/
void disposeAllStateForJob(String jobId, Connection con) throws SQLException;
void disposeAllStateForJob(String appId, Connection con) throws SQLException;

/**
* Initialize the necessary tables for the given stateId. The state id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,20 @@ public DbStateHandle<S> call() throws Exception {
// We create a unique long id for each handle, but we also
// store the checkpoint id and timestamp for bookkeeping
long handleId = rnd.nextLong();
String jobIdShort = env.getJobID().toShortString();

// We use the ApplicationID here, because it is restored when
// the job is started from a savepoint (whereas the job ID
// changes with each submission).
String appIdShort = env.getApplicationID().toShortString();

byte[] serializedState = InstantiationUtil.serializeObject(state);
dbAdapter.setCheckpointInsertParams(jobIdShort, insertStatement,
dbAdapter.setCheckpointInsertParams(appIdShort, insertStatement,
checkpointID, timestamp, handleId,
serializedState);

insertStatement.executeUpdate();

return new DbStateHandle<S>(jobIdShort, checkpointID, timestamp, handleId,
return new DbStateHandle<S>(appIdShort, checkpointID, timestamp, handleId,
dbConfig, serializedState.length);
}
}, numSqlRetries, sqlRetrySleep);
Expand All @@ -181,7 +185,7 @@ public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkp
public <K, V> LazyDbKvState<K, V> createKvState(String stateId, String stateName,
TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws IOException {
return new LazyDbKvState<K, V>(
stateId + "_" + env.getJobID().toShortString(),
stateId + "_" + env.getApplicationID().toShortString(),
env.getTaskInfo().getIndexOfThisSubtask() == 0,
getConnections(),
getConfiguration(),
Expand Down Expand Up @@ -211,8 +215,8 @@ public void initializeForJob(final Environment env) throws Exception {
if (nonPartitionedStateBackend == null) {
insertStatement = retry(new Callable<PreparedStatement>() {
public PreparedStatement call() throws SQLException {
dbAdapter.createCheckpointsTable(env.getJobID().toShortString(), getConnections().getFirst());
return dbAdapter.prepareCheckpointInsert(env.getJobID().toShortString(),
dbAdapter.createCheckpointsTable(env.getApplicationID().toShortString(), getConnections().getFirst());
return dbAdapter.prepareCheckpointInsert(env.getApplicationID().toShortString(),
getConnections().getFirst());
}
}, numSqlRetries, sqlRetrySleep);
Expand Down Expand Up @@ -241,7 +245,7 @@ public void close() throws Exception {
@Override
public void disposeAllStateForCurrentJob() throws Exception {
if (nonPartitionedStateBackend == null) {
dbAdapter.disposeAllStateForJob(env.getJobID().toShortString(), connections.getFirst());
dbAdapter.disposeAllStateForJob(env.getApplicationID().toShortString(), connections.getFirst());
} else {
nonPartitionedStateBackend.disposeAllStateForCurrentJob();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(DbStateHandle.class);

private final String jobId;
private final String appId;
private final DbBackendConfig dbConfig;

private final long checkpointId;
Expand All @@ -49,7 +49,7 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
private final long stateSize;

public DbStateHandle(
String jobId,
String appId,
long checkpointId,
long checkpointTs,
long handleId,
Expand All @@ -58,7 +58,7 @@ public DbStateHandle(

this.checkpointId = checkpointId;
this.handleId = handleId;
this.jobId = jobId;
this.appId = appId;
this.dbConfig = dbConfig;
this.checkpointTs = checkpointTs;
this.stateSize = stateSize;
Expand All @@ -68,7 +68,7 @@ protected byte[] getBytes() throws IOException {
return retry(new Callable<byte[]>() {
public byte[] call() throws Exception {
try (ShardedConnection con = dbConfig.createShardedConnection()) {
return dbConfig.getDbAdapter().getCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId);
return dbConfig.getDbAdapter().getCheckpoint(appId, con.getFirst(), checkpointId, checkpointTs, handleId);
}
}
}, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
Expand All @@ -80,7 +80,7 @@ public void discardState() {
retry(new Callable<Boolean>() {
public Boolean call() throws Exception {
try (ShardedConnection con = dbConfig.createShardedConnection()) {
dbConfig.getDbAdapter().deleteCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId);
dbConfig.getDbAdapter().deleteCheckpoint(appId, con.getFirst(), checkpointId, checkpointTs, handleId);
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, Check

// ------------------------------------------------------

// Unique id for this state (jobID_operatorID_stateName)
// Unique id for this state (appID_operatorID_stateName)
private final String kvStateId;
private final boolean compact;

Expand Down

0 comments on commit d974334

Please sign in to comment.