-
Notifications
You must be signed in to change notification settings - Fork 13k
/
ArchivedExecutionGraph.java
473 lines (407 loc) · 19 KB
/
ArchivedExecutionGraph.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TernaryBoolean;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
/** An archived execution graph represents a serializable form of an {@link ExecutionGraph}. */
public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializable {
private static final long serialVersionUID = 7231383912742578428L;
// --------------------------------------------------------------------------------------------
/** The ID of the job this graph has been built for. */
private final JobID jobID;
/** The name of the original job graph. */
private final String jobName;
/** All job vertices that are part of this graph. */
private final Map<JobVertexID, ArchivedExecutionJobVertex> tasks;
/** All vertices, in the order in which they were created. */
private final List<ArchivedExecutionJobVertex> verticesInCreationOrder;
/**
* Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()}) when the
* execution graph transitioned into a certain state. The index into this array is the ordinal
* of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is at {@code
* stateTimestamps[RUNNING.ordinal()]}.
*/
private final long[] stateTimestamps;
// ------ Configuration of the Execution -------
// ------ Execution status and progress. These fields are final and are assigned once, in the
// constructor -------
/** Current status of the job execution. */
private final JobStatus state;
/**
* The exception that caused the job to fail. This is set to the first root exception that was
* not recoverable and triggered job failure.
*/
@Nullable private final ErrorInfo failureCause;
// ------ Fields that are only relevant for archived execution graphs ------------
private final String jsonPlan;
private final StringifiedAccumulatorResult[] archivedUserAccumulators;
private final ArchivedExecutionConfig archivedExecutionConfig;
private final boolean isStoppable;
private final Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserAccumulators;
@Nullable private final CheckpointCoordinatorConfiguration jobCheckpointingConfiguration;
@Nullable private final CheckpointStatsSnapshot checkpointStatsSnapshot;
@Nullable private final String stateBackendName;
@Nullable private final String checkpointStorageName;
@Nullable private final TernaryBoolean stateChangelogEnabled;
@Nullable private final String changelogStorageName;
/**
 * Creates an archived execution graph from already-archived parts.
 *
 * <p>Every argument that is not annotated {@code @Nullable} (other than the primitive {@code
 * isStoppable}) is passed through {@link Preconditions#checkNotNull(Object)}. The references
 * are stored as given; no defensive copies of the map, list or arrays are made.
 *
 * @param jobID the ID of the job this graph has been built for
 * @param jobName the name of the original job graph
 * @param tasks all job vertices that are part of this graph, keyed by their ID
 * @param verticesInCreationOrder all vertices, in the order in which they were created
 * @param stateTimestamps timestamps indexed by {@link JobStatus#ordinal()}
 * @param state the status of the job execution
 * @param failureCause the error that caused the job to fail, or {@code null}
 * @param jsonPlan the value returned by {@link #getJsonPlan()}
 * @param archivedUserAccumulators the value returned by {@link
 *     #getAccumulatorResultsStringified()}
 * @param serializedUserAccumulators the value returned by {@link #getAccumulatorsSerialized()}
 * @param executionConfig the value returned by {@link #getArchivedExecutionConfig()}
 * @param isStoppable the value returned by {@link #isStoppable()}
 * @param jobCheckpointingConfiguration the checkpoint coordinator configuration, or {@code
 *     null}
 * @param checkpointStatsSnapshot the checkpoint statistics snapshot, or {@code null}
 * @param stateBackendName the state backend name, or {@code null}
 * @param checkpointStorageName the checkpoint storage name, or {@code null}
 * @param stateChangelogEnabled the changelog-enabled flag, or {@code null}
 * @param changelogStorageName the changelog storage name, or {@code null}
 */
public ArchivedExecutionGraph(
JobID jobID,
String jobName,
Map<JobVertexID, ArchivedExecutionJobVertex> tasks,
List<ArchivedExecutionJobVertex> verticesInCreationOrder,
long[] stateTimestamps,
JobStatus state,
@Nullable ErrorInfo failureCause,
String jsonPlan,
StringifiedAccumulatorResult[] archivedUserAccumulators,
Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserAccumulators,
ArchivedExecutionConfig executionConfig,
boolean isStoppable,
@Nullable CheckpointCoordinatorConfiguration jobCheckpointingConfiguration,
@Nullable CheckpointStatsSnapshot checkpointStatsSnapshot,
@Nullable String stateBackendName,
@Nullable String checkpointStorageName,
@Nullable TernaryBoolean stateChangelogEnabled,
@Nullable String changelogStorageName) {
// Required arguments: rejected with a NullPointerException when null.
this.jobID = Preconditions.checkNotNull(jobID);
this.jobName = Preconditions.checkNotNull(jobName);
this.tasks = Preconditions.checkNotNull(tasks);
this.verticesInCreationOrder = Preconditions.checkNotNull(verticesInCreationOrder);
this.stateTimestamps = Preconditions.checkNotNull(stateTimestamps);
this.state = Preconditions.checkNotNull(state);
// Optional: stored without a null check.
this.failureCause = failureCause;
this.jsonPlan = Preconditions.checkNotNull(jsonPlan);
this.archivedUserAccumulators = Preconditions.checkNotNull(archivedUserAccumulators);
this.serializedUserAccumulators = Preconditions.checkNotNull(serializedUserAccumulators);
this.archivedExecutionConfig = Preconditions.checkNotNull(executionConfig);
this.isStoppable = isStoppable;
// Checkpointing-related values are all optional and stored without a null check.
this.jobCheckpointingConfiguration = jobCheckpointingConfiguration;
this.checkpointStatsSnapshot = checkpointStatsSnapshot;
this.stateBackendName = stateBackendName;
this.checkpointStorageName = checkpointStorageName;
this.stateChangelogEnabled = stateChangelogEnabled;
this.changelogStorageName = changelogStorageName;
}
// --------------------------------------------------------------------------------------------
/** Returns the JSON plan string this archived graph was constructed with. */
@Override
public String getJsonPlan() {
    return this.jsonPlan;
}
/** Returns the ID of the job this graph has been built for. */
@Override
public JobID getJobID() {
    return this.jobID;
}
/** Returns the name of the original job graph. */
@Override
public String getJobName() {
    return this.jobName;
}
/** Returns the job status stored in this archived graph. */
@Override
public JobStatus getState() {
    return this.state;
}
/**
 * Returns the failure cause this archived graph was constructed with.
 *
 * @return the stored {@link ErrorInfo}, or {@code null} if none was supplied
 */
@Nullable
@Override
public ErrorInfo getFailureInfo() {
    return this.failureCause;
}
/**
 * Looks up the archived job vertex stored under the given ID.
 *
 * @param id ID of the job vertex to look up
 * @return the archived job vertex, or {@code null} if the task map has no entry for {@code id}
 */
@Nullable
@Override
public ArchivedExecutionJobVertex getJobVertex(JobVertexID id) {
    // Map#get yields null for an unknown key, hence the @Nullable contract on this accessor.
    return this.tasks.get(id);
}
/**
 * Returns a read-only view of all archived job vertices, keyed by their ID.
 *
 * @return an unmodifiable map wrapping the internal task map
 */
@Override
public Map<JobVertexID, AccessExecutionJobVertex> getAllVertices() {
    // The wrapper widens the value type to the access interface without copying the map.
    final Map<JobVertexID, AccessExecutionJobVertex> readOnlyView =
            Collections.unmodifiableMap(tasks);
    return readOnlyView;
}
/**
 * Returns the archived job vertices in the order of {@code verticesInCreationOrder}.
 *
 * <p>The iterators handed out walk the list by index instead of using the list's own iterator,
 * and they do not support {@link Iterator#remove()}.
 *
 * @return an iterable over the archived job vertices
 */
@Override
public Iterable<ArchivedExecutionJobVertex> getVerticesTopologically() {
    // The element count is captured once, when this method is called; every iterator created
    // from the returned Iterable visits exactly that many positions.
    final int limit = verticesInCreationOrder.size();

    return () ->
            new Iterator<ArchivedExecutionJobVertex>() {
                private int cursor = 0;

                @Override
                public boolean hasNext() {
                    return cursor < limit;
                }

                @Override
                public ArchivedExecutionJobVertex next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    // Advance first, then read the element at the previous position.
                    final int index = cursor++;
                    return verticesInCreationOrder.get(index);
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException();
                }
            };
}
/**
 * Returns an iterable over all archived execution vertices.
 *
 * <p>Each iterator is an {@code AllVerticesIterator} built on a fresh iterator from {@link
 * #getVerticesTopologically()}.
 *
 * @return an iterable over the archived execution vertices
 */
@Override
public Iterable<ArchivedExecutionVertex> getAllExecutionVertices() {
    return () -> {
        final Iterator<ArchivedExecutionJobVertex> jobVertices =
                getVerticesTopologically().iterator();
        return new AllVerticesIterator<>(jobVertices);
    };
}
/**
 * Returns the timestamp recorded for the given job status.
 *
 * @param status the status whose timestamp is requested
 * @return the entry of the timestamp array at index {@code status.ordinal()}
 */
@Override
public long getStatusTimestamp(JobStatus status) {
    final int slot = status.ordinal();
    return stateTimestamps[slot];
}
/**
 * Returns the checkpoint coordinator configuration this graph was constructed with.
 *
 * @return the stored configuration, or {@code null} if none was supplied to the constructor
 */
@Nullable
@Override
public CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration() {
    // The backing field is @Nullable, so the accessor advertises the same contract.
    return jobCheckpointingConfiguration;
}
/**
 * Returns the checkpoint statistics snapshot this graph was constructed with.
 *
 * @return the stored snapshot, or {@code null} if none was supplied to the constructor
 */
@Nullable
@Override
public CheckpointStatsSnapshot getCheckpointStatsSnapshot() {
    // The backing field is @Nullable, so the accessor advertises the same contract.
    return checkpointStatsSnapshot;
}
/** Returns the archived execution config this graph was constructed with. */
@Override
public ArchivedExecutionConfig getArchivedExecutionConfig() {
    return this.archivedExecutionConfig;
}
/** Returns the {@code isStoppable} flag this graph was constructed with. */
@Override
public boolean isStoppable() {
    return this.isStoppable;
}
/**
 * Returns the stringified user accumulators.
 *
 * @return the internal array itself, not a copy
 */
@Override
public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
    return this.archivedUserAccumulators;
}
/**
 * Returns the serialized user accumulators.
 *
 * @return the map passed to the constructor, not a copy
 */
@Override
public Map<String, SerializedValue<OptionalFailure<Object>>> getAccumulatorsSerialized() {
    return this.serializedUserAccumulators;
}
/**
 * Returns the state backend name, if one was supplied.
 *
 * @return the stored name, or an empty {@link Optional} when the field is {@code null}
 */
@Override
public Optional<String> getStateBackendName() {
    return stateBackendName == null ? Optional.empty() : Optional.of(stateBackendName);
}
/**
 * Returns the checkpoint storage name, if one was supplied.
 *
 * @return the stored name, or an empty {@link Optional} when the field is {@code null}
 */
@Override
public Optional<String> getCheckpointStorageName() {
    return checkpointStorageName == null
            ? Optional.empty()
            : Optional.of(checkpointStorageName);
}
/**
 * Returns the changelog-enabled flag this graph was constructed with.
 *
 * @return the stored {@link TernaryBoolean}, or {@code null} if {@code null} was supplied to
 *     the constructor
 */
@Nullable
@Override
public TernaryBoolean isChangelogStateBackendEnabled() {
    // The backing field is @Nullable, so the accessor advertises the same contract.
    return stateChangelogEnabled;
}
/**
 * Returns the changelog storage name, if one was supplied.
 *
 * @return the stored name, or an empty {@link Optional} when the field is {@code null}
 */
@Override
public Optional<String> getChangelogStorageName() {
    return changelogStorageName == null
            ? Optional.empty()
            : Optional.of(changelogStorageName);
}
/**
 * Archives the given {@link ExecutionGraph} without overriding its job status.
 *
 * <p>Delegates to {@link #createFrom(ExecutionGraph, JobStatus)} with a {@code null} override.
 *
 * @param executionGraph to create the ArchivedExecutionGraph from
 * @return ArchivedExecutionGraph created from the given ExecutionGraph
 */
public static ArchivedExecutionGraph createFrom(ExecutionGraph executionGraph) {
    final JobStatus noStatusOverride = null;
    return createFrom(executionGraph, noStatusOverride);
}
/**
 * Archives the given {@link ExecutionGraph}, optionally replacing its job status.
 *
 * <p>When {@code statusOverride} is given, the archived graph reports that status, and the
 * timestamps of all globally-terminal states are left at {@code 0} instead of being copied.
 *
 * @param executionGraph to create the ArchivedExecutionGraph from
 * @param statusOverride optional non-globally-terminal status to report instead of the
 *     ExecutionGraph's own status
 * @return ArchivedExecutionGraph created from the given ExecutionGraph
 * @throws IllegalArgumentException presumably, via {@link Preconditions#checkArgument}, if
 *     {@code statusOverride} is a globally-terminal state
 */
public static ArchivedExecutionGraph createFrom(
        ExecutionGraph executionGraph, @Nullable JobStatus statusOverride) {
    final boolean hasOverride = statusOverride != null;
    Preconditions.checkArgument(
            !hasOverride || !statusOverride.isGloballyTerminalState(),
            "Status override is only allowed for non-globally-terminal states.");

    // Archive every job vertex once and index it both by ID and by iteration order.
    final Map<JobVertexID, ArchivedExecutionJobVertex> tasksById = new HashMap<>();
    final List<ArchivedExecutionJobVertex> tasksInOrder = new ArrayList<>();
    for (ExecutionJobVertex jobVertex : executionGraph.getVerticesTopologically()) {
        final ArchivedExecutionJobVertex archived = jobVertex.archive();
        tasksInOrder.add(archived);
        tasksById.put(jobVertex.getJobVertexId(), archived);
    }

    final Map<String, SerializedValue<OptionalFailure<Object>>> accumulators =
            executionGraph.getAccumulatorsSerialized();

    final JobStatus[] allStatuses = JobStatus.values();
    final long[] timestampsByOrdinal = new long[allStatuses.length];
    for (JobStatus candidate : allStatuses) {
        if (hasOverride && candidate.isGloballyTerminalState()) {
            // An overridden (non-globally-terminal) status must not be accompanied by
            // traces of globally-terminal states, so the slot keeps its default of 0.
            continue;
        }
        timestampsByOrdinal[candidate.ordinal()] = executionGraph.getStatusTimestamp(candidate);
    }

    return new ArchivedExecutionGraph(
            executionGraph.getJobID(),
            executionGraph.getJobName(),
            tasksById,
            tasksInOrder,
            timestampsByOrdinal,
            hasOverride ? statusOverride : executionGraph.getState(),
            executionGraph.getFailureInfo(),
            executionGraph.getJsonPlan(),
            executionGraph.getAccumulatorResultsStringified(),
            accumulators,
            executionGraph.getArchivedExecutionConfig(),
            executionGraph.isStoppable(),
            executionGraph.getCheckpointCoordinatorConfiguration(),
            executionGraph.getCheckpointStatsSnapshot(),
            executionGraph.getStateBackendName().orElse(null),
            executionGraph.getCheckpointStorageName().orElse(null),
            executionGraph.isChangelogStateBackendEnabled(),
            executionGraph.getChangelogStorageName().orElse(null));
}
/**
 * Creates a sparse ArchivedExecutionGraph for a job, without any job vertices. Most fields
 * will be empty; only the job status and error-related fields are set.
 *
 * @param jobId ID of the job
 * @param jobName name of the job
 * @param jobStatus status the archived graph reports
 * @param throwable optional failure cause
 * @param checkpointingSettings optional checkpointing settings of the job
 * @param initializationTimestamp timestamp stored for {@link JobStatus#INITIALIZING}
 * @return the sparse archived execution graph
 */
public static ArchivedExecutionGraph createSparseArchivedExecutionGraph(
        JobID jobId,
        String jobName,
        JobStatus jobStatus,
        @Nullable Throwable throwable,
        @Nullable JobCheckpointingSettings checkpointingSettings,
        long initializationTimestamp) {
    final Map<JobVertexID, ArchivedExecutionJobVertex> noTasks = Collections.emptyMap();
    final List<ArchivedExecutionJobVertex> noVertices = Collections.emptyList();

    return createSparseArchivedExecutionGraph(
            jobId,
            jobName,
            jobStatus,
            noTasks,
            noVertices,
            throwable,
            checkpointingSettings,
            initializationTimestamp);
}
/**
 * Creates a sparse ArchivedExecutionGraph that additionally lists the given job vertices.
 *
 * <p>Each {@link JobVertex} becomes an {@link ArchivedExecutionJobVertex} with an empty array
 * of execution vertices and an empty array of accumulators. Parallelism and max parallelism
 * are read from {@code initialParallelismStore}.
 *
 * @param jobId ID of the job
 * @param jobName name of the job
 * @param jobStatus status the archived graph reports
 * @param throwable optional failure cause
 * @param checkpointingSettings optional checkpointing settings of the job
 * @param initializationTimestamp timestamp stored for {@link JobStatus#INITIALIZING}
 * @param jobVertices job vertices to include, in iteration order
 * @param initialParallelismStore source of the parallelism information per job vertex
 * @return the sparse archived execution graph
 */
public static ArchivedExecutionGraph createSparseArchivedExecutionGraphWithJobVertices(
        JobID jobId,
        String jobName,
        JobStatus jobStatus,
        @Nullable Throwable throwable,
        @Nullable JobCheckpointingSettings checkpointingSettings,
        long initializationTimestamp,
        Iterable<JobVertex> jobVertices,
        VertexParallelismStore initialParallelismStore) {
    final Map<JobVertexID, ArchivedExecutionJobVertex> verticesById = new HashMap<>();
    final List<ArchivedExecutionJobVertex> verticesInOrder = new ArrayList<>();

    for (JobVertex vertex : jobVertices) {
        final VertexParallelismInformation parallelism =
                initialParallelismStore.getParallelismInfo(vertex.getID());

        // A sparse vertex carries neither subtasks nor accumulators.
        final ArchivedExecutionVertex[] noSubtasks = new ArchivedExecutionVertex[0];
        final StringifiedAccumulatorResult[] noAccumulators =
                new StringifiedAccumulatorResult[0];

        final ArchivedExecutionJobVertex sparseVertex =
                new ArchivedExecutionJobVertex(
                        noSubtasks,
                        vertex.getID(),
                        vertex.getName(),
                        parallelism.getParallelism(),
                        parallelism.getMaxParallelism(),
                        vertex.getSlotSharingGroup(),
                        ResourceProfile.fromResourceSpec(
                                vertex.getMinResources(), MemorySize.ZERO),
                        noAccumulators);

        verticesInOrder.add(sparseVertex);
        verticesById.put(sparseVertex.getJobVertexId(), sparseVertex);
    }

    return createSparseArchivedExecutionGraph(
            jobId,
            jobName,
            jobStatus,
            verticesById,
            verticesInOrder,
            throwable,
            checkpointingSettings,
            initializationTimestamp);
}
/**
 * Shared implementation of the sparse factory methods.
 *
 * <p>The archived graph gets an empty JSON plan ({@code "{}"}), no accumulators, the archived
 * form of a fresh {@link ExecutionConfig}, and {@code isStoppable == false}. Only the {@link
 * JobStatus#INITIALIZING} timestamp is always set. If a {@code throwable} is given, the slot
 * of {@code jobStatus} is additionally set to the current time, which is also used for the
 * {@link ErrorInfo}.
 *
 * @param jobId ID of the job
 * @param jobName name of the job
 * @param jobStatus status the archived graph reports; must be FAILED or SUSPENDED when {@code
 *     throwable} is given
 * @param archivedTasks archived job vertices keyed by ID
 * @param archivedVerticesInCreationOrder archived job vertices in creation order
 * @param throwable optional failure cause
 * @param checkpointingSettings optional checkpointing settings; when {@code null}, all
 *     checkpoint-related values are {@code null} and the changelog flag is {@link
 *     TernaryBoolean#UNDEFINED}
 * @param initializationTimestamp timestamp stored for {@link JobStatus#INITIALIZING}
 * @return the sparse archived execution graph
 */
private static ArchivedExecutionGraph createSparseArchivedExecutionGraph(
        JobID jobId,
        String jobName,
        JobStatus jobStatus,
        Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks,
        List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder,
        @Nullable Throwable throwable,
        @Nullable JobCheckpointingSettings checkpointingSettings,
        long initializationTimestamp) {
    final Map<String, SerializedValue<OptionalFailure<Object>>> noSerializedAccumulators =
            Collections.emptyMap();
    final StringifiedAccumulatorResult[] noStringifiedAccumulators =
            new StringifiedAccumulatorResult[] {};

    final long[] timestampsByOrdinal = new long[JobStatus.values().length];
    timestampsByOrdinal[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;

    final ErrorInfo failureInfo;
    if (throwable == null) {
        failureInfo = null;
    } else {
        Preconditions.checkState(
                jobStatus == JobStatus.FAILED || jobStatus == JobStatus.SUSPENDED);
        // One clock reading serves both the error info and the status timestamp.
        final long failureTime = System.currentTimeMillis();
        failureInfo = new ErrorInfo(throwable, failureTime);
        timestampsByOrdinal[jobStatus.ordinal()] = failureTime;
    }

    final ArchivedExecutionConfig defaultConfig = new ExecutionConfig().archive();

    // Checkpoint-related values exist only when checkpointing settings were supplied.
    final CheckpointCoordinatorConfiguration coordinatorConfig;
    final CheckpointStatsSnapshot statsSnapshot;
    final String placeholderName;
    final TernaryBoolean changelogEnabled;
    if (checkpointingSettings == null) {
        coordinatorConfig = null;
        statsSnapshot = null;
        placeholderName = null;
        changelogEnabled = TernaryBoolean.UNDEFINED;
    } else {
        coordinatorConfig = checkpointingSettings.getCheckpointCoordinatorConfiguration();
        statsSnapshot = CheckpointStatsSnapshot.empty();
        placeholderName = "Unknown";
        changelogEnabled = checkpointingSettings.isChangelogStateBackendEnabled();
    }

    return new ArchivedExecutionGraph(
            jobId,
            jobName,
            archivedTasks,
            archivedVerticesInCreationOrder,
            timestampsByOrdinal,
            jobStatus,
            failureInfo,
            "{}",
            noStringifiedAccumulators,
            noSerializedAccumulators,
            defaultConfig,
            false,
            coordinatorConfig,
            statsSnapshot,
            placeholderName,
            placeholderName,
            changelogEnabled,
            placeholderName);
}
}