Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ public class ConfigurationKeys {
public static final String FLOW_EDGE_ID_KEY = "flow.edgeId";
public static final String FLOW_DESCRIPTION_KEY = "flow.description";
public static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";
/**
* Epoch milliseconds at which the LAUNCH {@link org.apache.gobblin.service.modules.orchestration.DagActionStore.DagAction}
* for this flow execution was inserted into the {@link org.apache.gobblin.service.modules.orchestration.DagActionStore}.
* Set on the {@link org.apache.gobblin.runtime.api.FlowSpec} during {@code LaunchDagProc} initialization so that
* downstream {@link org.apache.gobblin.runtime.api.SpecProducer} implementations can compute end-to-end launch latency.
* Absent if the underlying store cannot report it.
*/
public static final String DAG_ACTION_INSERT_TIME_MILLIS_KEY = "flow.dagAction.insertTimeMillis";
public static final String FLOW_FAILURE_OPTION = "flow.failureOption";
public static final String FLOW_APPLY_RETENTION = "flow.applyRetention";
public static final String FLOW_APPLY_INPUT_RETENTION = "flow.applyInputRetention";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Optional;

import lombok.Data;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -197,4 +198,21 @@ default void addFlowDagAction(String flowGroup, String flowName, long flowExecut
* @throws IOException Exception in retrieving {@link DagAction}s.
*/
Collection<DagAction> getDagActions() throws IOException;

/**
* Returns the time the given {@link DagAction} was inserted into the store, in epoch milliseconds, or
* {@link Optional#empty()} if the row is not present (e.g. already deleted by retention or post-processing
* cleanup). Implementations that cannot supply this information may return {@link Optional#empty()}.
*
* <p>The returned timestamp is read directly from the underlying store, so it is consistent across all
* processes regardless of local clock skew. Note that the underlying column may be persisted at second
* precision by some implementations.
*
* @param dagAction the action to look up
* @return the insert time in epoch milliseconds, or empty if unknown
* @throws IOException on lookup failure
*/
default Optional<Long> getDagActionInsertTimeMillis(DagAction dagAction) throws IOException {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,4 +229,12 @@ default void addFlowDagAction(String flowGroup, String flowName, long flowExecut
* @throws IOException Exception in retrieving {@link DagActionStore.DagAction}s.
*/
Collection<DagActionStore.DagAction> getDagActions() throws IOException;

/**
* See {@link DagActionStore#getDagActionInsertTimeMillis(DagActionStore.DagAction)}.
* @throws IOException on lookup failure
*/
default Optional<Long> getDagActionInsertTimeMillis(DagActionStore.DagAction dagAction) throws IOException {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,9 @@ public boolean deleteDagAction(DagActionStore.DagAction dagAction) throws IOExce
public Collection<DagActionStore.DagAction> getDagActions() throws IOException {
return this.dagActionStore.getDagActions();
}

@Override
public Optional<Long> getDagActionInsertTimeMillis(DagActionStore.DagAction dagAction) throws IOException {
return this.dagActionStore.getDagActionInsertTimeMillis(dagAction);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import com.google.inject.Inject;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class MysqlDagActionStore implements DagActionStore {
+ "VALUES (?, ?, ?, ?, ?)";
private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ? AND job_name = ? AND dag_action = ?";
private static final String GET_ALL_STATEMENT = "SELECT flow_group, flow_name, flow_execution_id, job_name, dag_action FROM %s";
private static final String GET_INSERT_TIME_STATEMENT = "SELECT UNIX_TIMESTAMP(modified_time) * 1000 FROM %s WHERE flow_group = ? AND flow_name = ? AND flow_execution_id = ? AND job_name = ? AND dag_action = ?";
private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" +
"flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, "
+ "flow_execution_id varchar(" + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
Expand Down Expand Up @@ -157,6 +159,24 @@ public Collection<DagAction> getDagActions() throws IOException {
}, true);
}

@Override
public Optional<Long> getDagActionInsertTimeMillis(DagAction dagAction) throws IOException {
return dbStatementExecutor.withPreparedStatement(String.format(GET_INSERT_TIME_STATEMENT, tableName), getStatement -> {
fillPreparedStatement(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId(),
dagAction.getJobName(), dagAction.getDagActionType(), getStatement);
try (ResultSet rs = getStatement.executeQuery()) {
if (rs.next()) {
long millis = rs.getLong(1);
return rs.wasNull() ? Optional.<Long>empty() : Optional.of(millis);
}
return Optional.<Long>empty();
} catch (SQLException e) {
throw new IOException(String.format("Failure looking up insert time for DagAction: %s in table %s",
dagAction, tableName), e);
}
}, true);
}

private static void fillPreparedStatement(String flowGroup, String flowName, long flowExecutionId, String jobName,
DagActionType dagActionType, PreparedStatement statement)
throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dag
try {
FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(getDagId().getFlowId()));
flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, getDagId().getFlowExecutionId());
stampDagActionInsertTimeOnFlowSpec(flowSpec, dagManagementStateStore);
Optional<Dag<JobExecutionPlan>> dag = this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
if (dag.isPresent()) {
dagManagementStateStore.addDag(dag.get());
Expand All @@ -82,6 +83,25 @@ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dag
}
}

/**
* Records the LAUNCH DagAction's persisted insert time (read from the {@link DagManagementStateStore}) onto the
* {@link FlowSpec}, so that compiled {@link JobExecutionPlan}s carry it forward for downstream latency
* instrumentation in {@code SpecProducer}s. Trigger type (scheduled vs ad-hoc) can be inferred downstream from
* the existing {@link ConfigurationKeys#JOB_SCHEDULE_KEY}; no companion key is added here. Best effort: any
* lookup failure is logged and swallowed so it cannot break flow launch.
*/
private void stampDagActionInsertTimeOnFlowSpec(FlowSpec flowSpec, DagManagementStateStore dagManagementStateStore) {
try {
Optional<Long> insertTimeMillis =
dagManagementStateStore.getDagActionInsertTimeMillis(getDagTask().getDagAction());
insertTimeMillis.ifPresent(millis ->
flowSpec.addProperty(ConfigurationKeys.DAG_ACTION_INSERT_TIME_MILLIS_KEY, millis));
} catch (Exception e) {
log.warn("Failed to stamp DagAction insert time on FlowSpec for dagId {}; latency metric will be skipped",
getDagId(), e);
}
}

@Override
protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag,
DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;

import org.mockito.Mockito;
import org.testng.Assert;
Expand Down Expand Up @@ -113,7 +114,40 @@ public void testDeleteAction() throws IOException {
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.KILL));
Assert.assertEquals(this.mysqlDagActionStore.getDagActions().size(), 2);
Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.KILL));
Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.RESUME));
Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId_2, jobName, DagActionStore.DagActionType.KILL));
Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.RESUME));
}

@Test(dependsOnMethods = "testDeleteAction")
public void testGetDagActionInsertTimeMillisReturnsTimestampForExistingRow() throws Exception {
long beforeMillis = currentSecondAlignedMillis();
this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName, 99999L, jobName, DagActionStore.DagActionType.LAUNCH);
long afterMillis = System.currentTimeMillis();

Optional<Long> insertTimeMillis = this.mysqlDagActionStore.getDagActionInsertTimeMillis(
new DagActionStore.DagAction(flowGroup, flowName, 99999L, jobName, DagActionStore.DagActionType.LAUNCH));

Assert.assertTrue(insertTimeMillis.isPresent(), "Expected insert time to be returned for existing row");
long observed = insertTimeMillis.get();
// MySQL TIMESTAMP is second precision by default, so allow a 1-second floor.
Assert.assertTrue(observed >= beforeMillis - 1000,
"Expected " + observed + " to be >= " + (beforeMillis - 1000));
Assert.assertTrue(observed <= afterMillis + 1000,
"Expected " + observed + " to be <= " + (afterMillis + 1000));
}

@Test(dependsOnMethods = "testDeleteAction")
public void testGetDagActionInsertTimeMillisReturnsEmptyForMissingRow() throws Exception {
Optional<Long> insertTimeMillis = this.mysqlDagActionStore.getDagActionInsertTimeMillis(
new DagActionStore.DagAction("noSuchGroup", "noSuchName", 1L, "noJob",
DagActionStore.DagActionType.LAUNCH));
Assert.assertFalse(insertTimeMillis.isPresent(), "Expected empty Optional for missing row");
}

// The TIMESTAMP column truncates sub-second precision. Align the lower bound to the start of the current second
// so a row inserted at e.g. 12:00:00.700 (rounded to 12:00:00.000) does not register as "before" the test started.
private static long currentSecondAlignedMillis() {
long now = System.currentTimeMillis();
return now - (now % 1000);
}
}