diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 07215bf320b..3d6ae78c9d1 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -179,6 +179,14 @@ public class ConfigurationKeys { public static final String FLOW_EDGE_ID_KEY = "flow.edgeId"; public static final String FLOW_DESCRIPTION_KEY = "flow.description"; public static final String FLOW_EXECUTION_ID_KEY = "flow.executionId"; + /** + * Epoch milliseconds at which the LAUNCH {@link org.apache.gobblin.service.modules.orchestration.DagActionStore.DagAction} + * for this flow execution was inserted into the {@link org.apache.gobblin.service.modules.orchestration.DagActionStore}. + * Set on the {@link org.apache.gobblin.runtime.api.FlowSpec} during {@code LaunchDagProc} initialization so that + * downstream {@link org.apache.gobblin.runtime.api.SpecProducer} implementations can compute end-to-end launch latency. + * Absent if the underlying store cannot report it. + */ + public static final String DAG_ACTION_INSERT_TIME_MILLIS_KEY = "flow.dagAction.insertTimeMillis"; public static final String FLOW_FAILURE_OPTION = "flow.failureOption"; public static final String FLOW_APPLY_RETENTION = "flow.applyRetention"; public static final String FLOW_APPLY_INPUT_RETENTION = "flow.applyInputRetention"; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java index ec8aa84281a..07803f1def7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.sql.SQLException; import java.util.Collection; +import java.util.Optional; import lombok.Data; import lombok.RequiredArgsConstructor; @@ -197,4 +198,21 @@ default void addFlowDagAction(String flowGroup, String flowName, long flowExecut * @throws IOException Exception in retrieving {@link DagAction}s. */ Collection getDagActions() throws IOException; + + /** + * Returns the time the given {@link DagAction} was inserted into the store, in epoch milliseconds, or + * {@link Optional#empty()} if the row is not present (e.g. already deleted by retention or post-processing + * cleanup). Implementations that cannot supply this information may return {@link Optional#empty()}. + * + *

The returned timestamp is read directly from the underlying store, so it is consistent across all + * processes regardless of local clock skew. Note that the underlying column may be persisted at second + * precision by some implementations. + * + * @param dagAction the action to look up + * @return the insert time in epoch milliseconds, or empty if unknown + * @throws IOException on lookup failure + */ + default Optional getDagActionInsertTimeMillis(DagAction dagAction) throws IOException { + return Optional.empty(); + } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java index 8059eab4e1c..a3572f130ee 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java @@ -229,4 +229,12 @@ default void addFlowDagAction(String flowGroup, String flowName, long flowExecut * @throws IOException Exception in retrieving {@link DagActionStore.DagAction}s. */ Collection getDagActions() throws IOException; + + /** + * See {@link DagActionStore#getDagActionInsertTimeMillis(DagActionStore.DagAction)}. + * @throws IOException on lookup failure + */ + default Optional getDagActionInsertTimeMillis(DagActionStore.DagAction dagAction) throws IOException { + return Optional.empty(); + } } \ No newline at end of file diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index b14d6bc85c2..ee2954a0d63 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -258,4 +258,9 @@ public boolean deleteDagAction(DagActionStore.DagAction dagAction) throws IOExce public Collection getDagActions() throws IOException { return this.dagActionStore.getDagActions(); } + + @Override + public Optional getDagActionInsertTimeMillis(DagActionStore.DagAction dagAction) throws IOException { + return this.dagActionStore.getDagActionInsertTimeMillis(dagAction); + } } \ No newline at end of file diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java index 0e4d10940ce..79de4fb8f28 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java @@ -24,6 +24,7 @@ import java.sql.SQLException; import java.util.Collection; import java.util.HashSet; +import java.util.Optional; import java.util.concurrent.TimeUnit; import com.google.inject.Inject; @@ -55,6 +56,7 @@ public class MysqlDagActionStore implements DagActionStore { + "VALUES (?, ?, ?, ?, ?)"; private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ? AND job_name = ? AND dag_action = ?"; private static final String GET_ALL_STATEMENT = "SELECT flow_group, flow_name, flow_execution_id, job_name, dag_action FROM %s"; + private static final String GET_INSERT_TIME_STATEMENT = "SELECT UNIX_TIMESTAMP(modified_time) * 1000 FROM %s WHERE flow_group = ? AND flow_name = ? AND flow_execution_id = ? AND job_name = ? AND dag_action = ?"; private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "flow_execution_id varchar(" + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, " @@ -157,6 +159,24 @@ public Collection getDagActions() throws IOException { }, true); } + @Override + public Optional getDagActionInsertTimeMillis(DagAction dagAction) throws IOException { + return dbStatementExecutor.withPreparedStatement(String.format(GET_INSERT_TIME_STATEMENT, tableName), getStatement -> { + fillPreparedStatement(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId(), + dagAction.getJobName(), dagAction.getDagActionType(), getStatement); + try (ResultSet rs = getStatement.executeQuery()) { + if (rs.next()) { + long millis = rs.getLong(1); + return rs.wasNull() ? Optional.empty() : Optional.of(millis); + } + return Optional.empty(); + } catch (SQLException e) { + throw new IOException(String.format("Failure looking up insert time for DagAction: %s in table %s", + dagAction, tableName), e); + } + }, true); + } + private static void fillPreparedStatement(String flowGroup, String flowName, long flowExecutionId, String jobName, DagActionType dagActionType, PreparedStatement statement) throws SQLException { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java index f577667af0c..e9adccd1994 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java @@ -69,6 +69,7 @@ protected Optional> initialize(DagManagementStateStore dag try { FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(getDagId().getFlowId())); flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, getDagId().getFlowExecutionId()); + stampDagActionInsertTimeOnFlowSpec(flowSpec, dagManagementStateStore); Optional> dag = this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil(); if (dag.isPresent()) { dagManagementStateStore.addDag(dag.get()); @@ -82,6 +83,25 @@ protected Optional> initialize(DagManagementStateStore dag } } + /** + * Records the LAUNCH DagAction's persisted insert time (read from the {@link DagManagementStateStore}) onto the + * {@link FlowSpec}, so that compiled {@link JobExecutionPlan}s carry it forward for downstream latency + * instrumentation in {@code SpecProducer}s. Trigger type (scheduled vs ad-hoc) can be inferred downstream from + * the existing {@link ConfigurationKeys#JOB_SCHEDULE_KEY}; no companion key is added here. Best effort: any + * lookup failure is logged and swallowed so it cannot break flow launch. + */ + private void stampDagActionInsertTimeOnFlowSpec(FlowSpec flowSpec, DagManagementStateStore dagManagementStateStore) { + try { + Optional insertTimeMillis = + dagManagementStateStore.getDagActionInsertTimeMillis(getDagTask().getDagAction()); + insertTimeMillis.ifPresent(millis -> + flowSpec.addProperty(ConfigurationKeys.DAG_ACTION_INSERT_TIME_MILLIS_KEY, millis)); + } catch (Exception e) { + log.warn("Failed to stamp DagAction insert time on FlowSpec for dagId {}; latency metric will be skipped", + getDagId(), e); + } + } + @Override protected void act(DagManagementStateStore dagManagementStateStore, Optional> dag, DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException { diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java index 5c5ea918c41..8feaef4cea7 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java @@ -21,6 +21,7 @@ import java.net.URISyntaxException; import java.util.Collection; import java.util.HashSet; +import java.util.Optional; import org.mockito.Mockito; import org.testng.Assert; @@ -113,7 +114,40 @@ public void testDeleteAction() throws IOException { new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.KILL)); Assert.assertEquals(this.mysqlDagActionStore.getDagActions().size(), 2); Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.KILL)); - Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.RESUME)); Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId_2, jobName, DagActionStore.DagActionType.KILL)); + Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.RESUME)); + } + + @Test(dependsOnMethods = "testDeleteAction") + public void testGetDagActionInsertTimeMillisReturnsTimestampForExistingRow() throws Exception { + long beforeMillis = currentSecondAlignedMillis(); + this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName, 99999L, jobName, DagActionStore.DagActionType.LAUNCH); + long afterMillis = System.currentTimeMillis(); + + Optional insertTimeMillis = this.mysqlDagActionStore.getDagActionInsertTimeMillis( + new DagActionStore.DagAction(flowGroup, flowName, 99999L, jobName, DagActionStore.DagActionType.LAUNCH)); + + Assert.assertTrue(insertTimeMillis.isPresent(), "Expected insert time to be returned for existing row"); + long observed = insertTimeMillis.get(); + // MySQL TIMESTAMP is second precision by default, so allow a 1-second floor. + Assert.assertTrue(observed >= beforeMillis - 1000, + "Expected " + observed + " to be >= " + (beforeMillis - 1000)); + Assert.assertTrue(observed <= afterMillis + 1000, + "Expected " + observed + " to be <= " + (afterMillis + 1000)); + } + + @Test(dependsOnMethods = "testDeleteAction") + public void testGetDagActionInsertTimeMillisReturnsEmptyForMissingRow() throws Exception { + Optional insertTimeMillis = this.mysqlDagActionStore.getDagActionInsertTimeMillis( + new DagActionStore.DagAction("noSuchGroup", "noSuchName", 1L, "noJob", + DagActionStore.DagActionType.LAUNCH)); + Assert.assertFalse(insertTimeMillis.isPresent(), "Expected empty Optional for missing row"); + } + + // The TIMESTAMP column truncates sub-second precision. Align the lower bound to the start of the current second + // so a row inserted at e.g. 12:00:00.700 (rounded to 12:00:00.000) does not register as "before" the test started. + private static long currentSecondAlignedMillis() { + long now = System.currentTimeMillis(); + return now - (now % 1000); } } \ No newline at end of file