/
JobVertex.java
671 lines (562 loc) · 22.9 KB
/
JobVertex.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobgraph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** The base class for job vertexes. */
public class JobVertex implements java.io.Serializable {
private static final long serialVersionUID = 1L;
/** Fallback name used when a vertex is created with a {@code null} name. */
private static final String DEFAULT_NAME = "(unnamed vertex)";
/** Sentinel value meaning that the maximum parallelism has not been set. */
public static final int MAX_PARALLELISM_DEFAULT = -1;
// --------------------------------------------------------------------------------------------
// Members that define the structure / topology of the graph
// --------------------------------------------------------------------------------------------
/** The ID of the vertex. */
private final JobVertexID id;
/**
* The IDs of all operators contained in this vertex.
*
* <p>The ID pairs are stored depth-first post-order; for the forking chain below the IDs would
* be stored as [D, E, B, C, A].
*
* <pre>
* A - B - D
* \ \
* C E
* </pre>
*
* <p>This is the same order that operators are stored in the {@code StreamTask}.
*/
private final List<OperatorIDPair> operatorIDs;
/** Produced data sets, one per writer. */
private final Map<IntermediateDataSetID, IntermediateDataSet> results = new LinkedHashMap<>();
/** List of edges with incoming data. One per Reader. */
private final List<JobEdge> inputs = new ArrayList<>();
/** The list of factories for operator coordinators. */
private final List<SerializedValue<OperatorCoordinator.Provider>> operatorCoordinators =
new ArrayList<>();
/** Number of subtasks to split this task into at runtime. */
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
/** Maximum number of subtasks to split this task into at runtime. */
private int maxParallelism = MAX_PARALLELISM_DEFAULT;
/** The minimum resource of the vertex. */
private ResourceSpec minResources = ResourceSpec.DEFAULT;
/** The preferred resource of the vertex. */
private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
/** Custom configuration passed to the assigned task at runtime. Lazily created on access. */
private Configuration configuration;
/** The class of the invokable. */
private String invokableClassName;
/** Indicates whether this job vertex is stoppable or not. */
private boolean isStoppable = false;
/** Optionally, a source of input splits. */
private InputSplitSource<?> inputSplitSource;
/**
* The name of the vertex. This will be shown in runtime logs and will be in the runtime
* environment.
*/
private String name;
/**
* Optionally, a sharing group that allows subtasks from different job vertices to run
* concurrently in one slot.
*/
@Nullable private SlotSharingGroup slotSharingGroup;
/** The group inside which the vertex subtasks share slots. */
@Nullable private CoLocationGroupImpl coLocationGroup;
/**
* Optional, the name of the operator, such as 'Flat Map' or 'Join', to be included in the JSON
* plan.
*/
private String operatorName;
/**
* Optional, the description of the operator, like 'Hash Join', or 'Sorted Group Reduce', to be
* included in the JSON plan.
*/
private String operatorDescription;
/** Optional, pretty name of the operator, to be displayed in the JSON plan. */
private String operatorPrettyName;
/**
* Optional, the JSON for the optimizer properties of the operator result, to be included in the
* JSON plan.
*/
private String resultOptimizerProperties;
/**
* The intermediateDataSetId of the cached intermediate dataset that the job vertex consumes.
*/
private final List<IntermediateDataSetID> intermediateDataSetIdsToConsume = new ArrayList<>();
/**
* Indicates whether this job vertex supports multiple attempts of the same subtask executing at
* the same time.
*/
private boolean supportsConcurrentExecutionAttempts = true;
/** True as soon as any produced data set uses a (persistent) blocking partition type. */
private boolean anyOutputBlocking = false;
/** Whether the parallelism was explicitly configured during JobGraph generation. */
private boolean parallelismConfigured = false;
// --------------------------------------------------------------------------------------------
/**
* Constructs a new job vertex and assigns it with the given name.
*
* <p>A fresh {@link JobVertexID} is generated for the vertex.
*
* @param name The name of the new job vertex; {@code null} falls back to a default name.
*/
public JobVertex(String name) {
this(name, null);
}
/**
 * Constructs a new job vertex with the given name and (optionally) a fixed ID.
 *
 * @param name The name of the new job vertex; {@code null} falls back to a default name.
 * @param id The id of the job vertex; {@code null} generates a fresh ID.
 */
public JobVertex(String name, JobVertexID id) {
    if (name == null) {
        this.name = DEFAULT_NAME;
    } else {
        this.name = name;
    }
    if (id == null) {
        this.id = new JobVertexID();
    } else {
        this.id = id;
    }
    // A plain vertex contains exactly one operator whose ID is derived from the vertex ID.
    this.operatorIDs =
            Collections.singletonList(
                    OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(this.id)));
}
/**
 * Constructs a new job vertex and assigns it with the given name.
 *
 * @param name The name of the new job vertex; {@code null} falls back to a default name.
 * @param primaryId The id of the job vertex; {@code null} generates a fresh ID.
 * @param operatorIDPairs The operator ID pairs of the job vertex; must not be {@code null}.
 */
public JobVertex(String name, JobVertexID primaryId, List<OperatorIDPair> operatorIDPairs) {
    this.name = name == null ? DEFAULT_NAME : name;
    this.id = primaryId == null ? new JobVertexID() : primaryId;
    // Defensive copy: wrapping the caller's list directly would let later external
    // mutation of that list silently change this supposedly unmodifiable view.
    this.operatorIDs = Collections.unmodifiableList(new ArrayList<>(operatorIDPairs));
}
// --------------------------------------------------------------------------------------------
/**
* Returns the ID of this job vertex.
*
* @return The ID of this job vertex
*/
public JobVertexID getID() {
return this.id;
}
/**
* Returns the name of the vertex.
*
* @return The name of the vertex.
*/
public String getName() {
return this.name;
}
/**
* Sets the name of the vertex.
*
* @param name The new name; {@code null} resets the vertex to the default name.
*/
public void setName(String name) {
this.name = name == null ? DEFAULT_NAME : name;
}
/**
* Returns the number of produced intermediate data sets.
*
* @return The number of produced intermediate data sets.
*/
public int getNumberOfProducedIntermediateDataSets() {
return this.results.size();
}
/**
* Returns the number of inputs.
*
* @return The number of inputs.
*/
public int getNumberOfInputs() {
return this.inputs.size();
}
/**
* Returns the IDs of all operators contained in this vertex, in depth-first post-order (same
* order the operators are stored in the {@code StreamTask}). The returned list is unmodifiable.
*/
public List<OperatorIDPair> getOperatorIDs() {
return operatorIDs;
}
/**
* Returns the vertex's configuration object which can be used to pass custom settings to the
* task at runtime. The configuration object is created lazily on first access.
*
* @return the vertex's configuration object
*/
public Configuration getConfiguration() {
if (this.configuration == null) {
this.configuration = new Configuration();
}
return this.configuration;
}
/**
* Sets the class of the invokable that carries out the task of this vertex.
*
* @param invokable The invokable class; must not be {@code null}.
*/
public void setInvokableClass(Class<? extends TaskInvokable> invokable) {
Preconditions.checkNotNull(invokable);
this.invokableClassName = invokable.getName();
}
// NOTE: expected to be called only once, when the JobGraph is generated.
public void setParallelismConfigured(boolean parallelismConfigured) {
this.parallelismConfigured = parallelismConfigured;
}
/** Returns whether the parallelism of this vertex was explicitly configured. */
public boolean isParallelismConfigured() {
return parallelismConfigured;
}
/**
* Returns the name of the invokable class which represents the task of this vertex.
*
* @return The name of the invokable class, <code>null</code> if not set.
*/
public String getInvokableClassName() {
return this.invokableClassName;
}
/**
 * Resolves and returns the invokable class which represents the task of this vertex.
 *
 * @param cl The classloader used to resolve user-defined classes
 * @return The invokable class, <code>null</code> if it is not set
 * @throws RuntimeException if the class cannot be found or is not a {@link TaskInvokable}.
 */
public Class<? extends TaskInvokable> getInvokableClass(ClassLoader cl) {
    // Same NullPointerException (and message) as an explicit null check would throw.
    checkNotNull(cl, "The classloader must not be null.");
    final String className = this.invokableClassName;
    if (className == null) {
        return null;
    }
    try {
        final Class<?> resolved = Class.forName(className, true, cl);
        return resolved.asSubclass(TaskInvokable.class);
    } catch (ClassNotFoundException e) {
        throw new RuntimeException("The user-code class could not be resolved.", e);
    } catch (ClassCastException e) {
        throw new RuntimeException(
                "The user-code class is no subclass of " + TaskInvokable.class.getName(), e);
    }
}
/**
 * Gets the parallelism of the task.
 *
 * @return The parallelism of the task.
 */
public int getParallelism() {
    return this.parallelism;
}
/**
 * Sets the parallelism for the task.
 *
 * @param parallelism The parallelism for the task.
 * @throws IllegalArgumentException if the value is neither positive nor the "unset" sentinel.
 */
public void setParallelism(int parallelism) {
    // Accept any positive value, or the sentinel meaning "not set".
    final boolean valid =
            parallelism >= 1 || parallelism == ExecutionConfig.PARALLELISM_DEFAULT;
    if (!valid) {
        throw new IllegalArgumentException(
                "The parallelism must be at least one, or "
                        + ExecutionConfig.PARALLELISM_DEFAULT
                        + " (unset).");
    }
    this.parallelism = parallelism;
}
/**
* Gets the maximum parallelism for the task.
*
* @return The maximum parallelism for the task.
*/
public int getMaxParallelism() {
return maxParallelism;
}
/**
* Sets the maximum parallelism for the task.
*
* <p>NOTE(review): the documented bounds are not enforced here — presumably validation happens
* elsewhere (and {@link #MAX_PARALLELISM_DEFAULT} must remain assignable); confirm before
* relying on this setter to reject out-of-range values.
*
* @param maxParallelism The maximum parallelism to be set. must be between 1 and
* Short.MAX_VALUE + 1.
*/
public void setMaxParallelism(int maxParallelism) {
this.maxParallelism = maxParallelism;
}
/**
* Gets the minimum resource for the task.
*
* @return The minimum resource for the task.
*/
public ResourceSpec getMinResources() {
return minResources;
}
/**
* Gets the preferred resource for the task.
*
* @return The preferred resource for the task.
*/
public ResourceSpec getPreferredResources() {
return preferredResources;
}
/**
* Sets the minimum and preferred resources for the task.
*
* @param minResources The minimum resource for the task; must not be {@code null}.
* @param preferredResources The preferred resource for the task; must not be {@code null}.
* @throws NullPointerException if either argument is {@code null}.
*/
public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
this.minResources = checkNotNull(minResources);
this.preferredResources = checkNotNull(preferredResources);
}
/** Returns the source of input splits for this vertex, or {@code null} if none is set. */
public InputSplitSource<?> getInputSplitSource() {
return inputSplitSource;
}
/** Sets the source of input splits for this vertex. */
public void setInputSplitSource(InputSplitSource<?> inputSplitSource) {
this.inputSplitSource = inputSplitSource;
}
/** Returns a fresh snapshot list of the data sets produced by this vertex. */
public List<IntermediateDataSet> getProducedDataSets() {
return new ArrayList<>(results.values());
}
/**
* Returns the incoming edges of this vertex.
*
* <p>NOTE(review): this exposes the internal mutable list — callers may depend on mutating it,
* so it is intentionally not wrapped; confirm call sites before changing.
*/
public List<JobEdge> getInputs() {
return this.inputs;
}
/** Returns an unmodifiable view of the serialized operator coordinator providers. */
public List<SerializedValue<OperatorCoordinator.Provider>> getOperatorCoordinators() {
return Collections.unmodifiableList(operatorCoordinators);
}
/** Registers a serialized operator coordinator provider with this vertex. */
public void addOperatorCoordinator(
SerializedValue<OperatorCoordinator.Provider> serializedCoordinatorProvider) {
operatorCoordinators.add(serializedCoordinatorProvider);
}
/**
 * Associates this vertex with a slot sharing group for scheduling. Different vertices in the
 * same slot sharing group can run one subtask each in the same slot.
 *
 * @param grp The slot sharing group to associate the vertex with; must not be {@code null}.
 */
public void setSlotSharingGroup(SlotSharingGroup grp) {
    final SlotSharingGroup newGroup = checkNotNull(grp);
    final SlotSharingGroup previousGroup = this.slotSharingGroup;
    // Deregister from the old group (if any) before joining the new one.
    if (previousGroup != null) {
        previousGroup.removeVertexFromGroup(this.getID());
    }
    newGroup.addVertexToGroup(this.getID());
    this.slotSharingGroup = newGroup;
}
/**
 * Gets the slot sharing group that this vertex is associated with. Different vertices in the
 * same slot sharing group can run one subtask each in the same slot.
 *
 * @return The slot sharing group to associate the vertex with
 */
public SlotSharingGroup getSlotSharingGroup() {
    if (this.slotSharingGroup == null) {
        // Lazily create a dedicated group for this vertex. The production code path always
        // assigns a group before this getter is used; this branch is expected only in tests.
        setSlotSharingGroup(new SlotSharingGroup());
    }
    return this.slotSharingGroup;
}
/**
 * Tells this vertex to strictly co locate its subtasks with the subtasks of the given vertex.
 * Strict co-location implies that the n'th subtask of this vertex will run on the same parallel
 * computing instance (TaskManager) as the n'th subtask of the given vertex.
 *
 * <p>NOTE: Co-location is only possible between vertices in a slot sharing group.
 *
 * <p>NOTE: This vertex must (transitively) depend on the vertex to be co-located with. That
 * means that the respective vertex must be a (transitive) input of this vertex.
 *
 * @param strictlyCoLocatedWith The vertex whose subtasks to co-locate this vertex's subtasks
 *     with.
 * @throws IllegalArgumentException Thrown, if this vertex and the vertex to co-locate with are
 *     not in a common slot sharing group.
 * @see #setSlotSharingGroup(SlotSharingGroup)
 */
public void setStrictlyCoLocatedWith(JobVertex strictlyCoLocatedWith) {
    // Co-location is only defined within a shared (non-null) slot sharing group.
    if (this.slotSharingGroup == null
            || this.slotSharingGroup != strictlyCoLocatedWith.slotSharingGroup) {
        throw new IllegalArgumentException(
                "Strict co-location requires that both vertices are in the same slot sharing group.");
    }
    final CoLocationGroupImpl thisGroup = this.coLocationGroup;
    final CoLocationGroupImpl otherGroup = strictlyCoLocatedWith.coLocationGroup;
    if (thisGroup == null && otherGroup == null) {
        // Neither vertex is co-located yet: start a fresh group containing both.
        final CoLocationGroupImpl sharedGroup =
                new CoLocationGroupImpl(this, strictlyCoLocatedWith);
        this.coLocationGroup = sharedGroup;
        strictlyCoLocatedWith.coLocationGroup = sharedGroup;
    } else if (otherGroup == null) {
        // Only this vertex has a group: pull the other vertex into it.
        thisGroup.addVertex(strictlyCoLocatedWith);
        strictlyCoLocatedWith.coLocationGroup = thisGroup;
    } else if (thisGroup == null) {
        // Only the other vertex has a group: join it.
        otherGroup.addVertex(this);
        this.coLocationGroup = otherGroup;
    } else {
        // Both had distinct groups already: merge this vertex's group into the other.
        thisGroup.mergeInto(otherGroup);
    }
}
/** Returns the co-location group of this vertex, or {@code null} if none is set. */
@Nullable
public CoLocationGroup getCoLocationGroup() {
return coLocationGroup;
}
/** Replaces the co-location group reference of this vertex. */
public void updateCoLocationGroup(CoLocationGroupImpl group) {
this.coLocationGroup = group;
}
// --------------------------------------------------------------------------------------------
/**
 * Returns the produced data set registered under the given ID, creating it with the given
 * partition type if it does not exist yet.
 *
 * <p>NOTE(review): if the ID already exists, the given partitionType is ignored — presumably
 * callers always pass a consistent type for the same ID; confirm at call sites.
 */
public IntermediateDataSet getOrCreateResultDataSet(
        IntermediateDataSetID id, ResultPartitionType partitionType) {
    // Remember whether any output of this vertex uses a (persistent) blocking partition.
    if (partitionType.isBlockingOrBlockingPersistentResultPartition()) {
        anyOutputBlocking = true;
    }
    IntermediateDataSet dataSet = this.results.get(id);
    if (dataSet == null) {
        dataSet = new IntermediateDataSet(id, partitionType, this);
        this.results.put(id, dataSet);
    }
    return dataSet;
}
/**
* Connects this vertex to the given upstream vertex with a fresh intermediate data set ID and a
* non-broadcast edge. Delegates to the full overload.
*/
public JobEdge connectNewDataSetAsInput(
JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType) {
return connectNewDataSetAsInput(input, distPattern, partitionType, false);
}
/**
* Connects this vertex to the given upstream vertex with a fresh intermediate data set ID.
* Delegates to the full overload.
*/
public JobEdge connectNewDataSetAsInput(
JobVertex input,
DistributionPattern distPattern,
ResultPartitionType partitionType,
boolean isBroadcast) {
return connectNewDataSetAsInput(
input, distPattern, partitionType, new IntermediateDataSetID(), isBroadcast);
}
/**
 * Connects this vertex to the given upstream vertex via a new {@link JobEdge}, producing (or
 * reusing) the upstream intermediate data set with the given ID.
 *
 * @param input The upstream vertex that produces the data set.
 * @param distPattern The distribution pattern of the new edge.
 * @param partitionType The partition type of the produced data set.
 * @param intermediateDataSetId The ID of the data set to create or reuse on the producer.
 * @param isBroadcast Whether the new edge is a broadcast edge.
 * @return the newly created edge
 */
public JobEdge connectNewDataSetAsInput(
        JobVertex input,
        DistributionPattern distPattern,
        ResultPartitionType partitionType,
        IntermediateDataSetID intermediateDataSetId,
        boolean isBroadcast) {
    final IntermediateDataSet producedDataSet =
            input.getOrCreateResultDataSet(intermediateDataSetId, partitionType);
    final JobEdge newEdge = new JobEdge(producedDataSet, this, distPattern, isBroadcast);
    // Register the edge on both ends: as an input here, as a consumer on the data set.
    this.inputs.add(newEdge);
    producedDataSet.addConsumer(newEdge);
    return newEdge;
}
// --------------------------------------------------------------------------------------------
/** Returns {@code true} if this vertex has no incoming edges. */
public boolean isInputVertex() {
return this.inputs.isEmpty();
}
/** Returns whether this vertex is stoppable. */
public boolean isStoppable() {
return this.isStoppable;
}
/** Returns {@code true} if this vertex produces no intermediate data sets. */
public boolean isOutputVertex() {
return this.results.isEmpty();
}
/** Same check as {@link #isInputVertex()}: {@code true} if there are no incoming edges. */
public boolean hasNoConnectedInputs() {
return inputs.isEmpty();
}
/** Sets whether multiple attempts of the same subtask may execute at the same time. */
public void setSupportsConcurrentExecutionAttempts(
boolean supportsConcurrentExecutionAttempts) {
this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
}
/** Returns whether multiple attempts of the same subtask may execute at the same time. */
public boolean isSupportsConcurrentExecutionAttempts() {
return supportsConcurrentExecutionAttempts;
}
/** Returns {@code true} if any produced data set uses a (persistent) blocking partition. */
public boolean isAnyOutputBlocking() {
return anyOutputBlocking;
}
// --------------------------------------------------------------------------------------------
/**
* A hook that can be overwritten by sub classes to implement logic that is called by the master
* when the job starts. The default implementation does nothing.
*
* @param context Provides contextual information for the initialization
* @throws Exception The method may throw exceptions which cause the job to fail immediately.
*/
public void initializeOnMaster(InitializeOnMasterContext context) throws Exception {}
/**
* A hook that can be overwritten by sub classes to implement logic that is called by the master
* after the job completed. The default implementation does nothing.
*
* @param context Provides contextual information for the finalization
* @throws Exception The method may throw exceptions which cause the job to fail immediately.
*/
public void finalizeOnMaster(FinalizeOnMasterContext context) throws Exception {}
/** The context exposes some runtime infos for the initialization. */
public interface InitializeOnMasterContext {
/** The class loader for user defined code. */
ClassLoader getClassLoader();
/**
* The actual parallelism this vertex will be run with. In contrast, the {@link
* #getParallelism()} is the original parallelism set when creating the {@link JobGraph} and
* might be updated e.g. by the {@link
* org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler}.
*/
int getExecutionParallelism();
}
/** The context exposes some runtime infos for finalization. */
public interface FinalizeOnMasterContext {
/** The class loader for user defined code. */
ClassLoader getClassLoader();
/**
* The actual parallelism this vertex will be run with. In contrast, the {@link
* #getParallelism()} is the original parallelism set when creating the {@link JobGraph} and
* might be updated e.g. by the {@link
* org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler}.
*/
int getExecutionParallelism();
/**
* Get the finished attempt number of subtask.
*
* @param subtaskIndex the subtask index.
* @return the finished attempt.
* @throws IllegalArgumentException Thrown, if subtaskIndex is invalid.
*/
int getFinishedAttempt(int subtaskIndex);
}
// --------------------------------------------------------------------------------------------
/** Returns the operator name to be included in the JSON plan, or {@code null} if unset. */
public String getOperatorName() {
return operatorName;
}
/** Sets the operator name to be included in the JSON plan. */
public void setOperatorName(String operatorName) {
this.operatorName = operatorName;
}
/** Returns the operator description to be included in the JSON plan, or {@code null}. */
public String getOperatorDescription() {
return operatorDescription;
}
/** Sets the operator description to be included in the JSON plan. */
public void setOperatorDescription(String operatorDescription) {
this.operatorDescription = operatorDescription;
}
/** Sets the pretty name of the operator, to be displayed in the JSON plan. */
public void setOperatorPrettyName(String operatorPrettyName) {
this.operatorPrettyName = operatorPrettyName;
}
/** Returns the pretty name of the operator, or {@code null} if unset. */
public String getOperatorPrettyName() {
return operatorPrettyName;
}
/** Returns the JSON optimizer properties of the operator result, or {@code null}. */
public String getResultOptimizerProperties() {
return resultOptimizerProperties;
}
/** Sets the JSON optimizer properties of the operator result. */
public void setResultOptimizerProperties(String resultOptimizerProperties) {
this.resultOptimizerProperties = resultOptimizerProperties;
}
/** Registers the ID of a cached intermediate data set that this vertex consumes. */
public void addIntermediateDataSetIdToConsume(IntermediateDataSetID intermediateDataSetId) {
intermediateDataSetIdsToConsume.add(intermediateDataSetId);
}
/**
* Returns the IDs of the cached intermediate data sets this vertex consumes.
*
* <p>NOTE(review): exposes the internal mutable list; confirm callers before wrapping it.
*/
public List<IntermediateDataSetID> getIntermediateDataSetIdsToConsume() {
return intermediateDataSetIdsToConsume;
}
// --------------------------------------------------------------------------------------------
/** Returns the vertex name followed by the invokable class name in parentheses. */
@Override
public String toString() {
    // Identical output to string concatenation, including "null" for an unset invokable.
    return new StringBuilder(this.name)
            .append(" (")
            .append(this.invokableClassName)
            .append(')')
            .toString();
}
}