/
Plan.java
393 lines (347 loc) · 12.8 KB
/
Plan.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Visitable;
import org.apache.flink.util.Visitor;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* This class represents Flink programs, in the form of dataflow plans.
*
* <p>The dataflow is referenced by the data sinks, from which all connected
* operators of the data flow can be reached via backwards traversal</p>.
*/
@Internal
public class Plan implements Visitable<Operator<?>> {
/**
* A collection of all sinks in the plan. Since the plan is traversed from the sinks to the sources, this
* collection must contain all the sinks.
*/
protected final List<GenericDataSinkBase<?>> sinks = new ArrayList<>(4);
/** The name of the job. */
protected String jobName;
/** The default parallelism to use for nodes that have no explicitly specified parallelism. */
protected int defaultParallelism = ExecutionConfig.PARALLELISM_DEFAULT;
/** Hash map for files in the distributed cache: registered name to cache entry. */
protected HashMap<String, DistributedCacheEntry> cacheFile = new HashMap<>();
/** Config object for runtime execution parameters. */
protected ExecutionConfig executionConfig;
/** The ID of the Job that this dataflow plan belongs to */
private JobID jobId;
private long sessionTimeout;
// ------------------------------------------------------------------------
/**
* Creates a new dataflow plan with the given name, describing the data flow that ends at the
* given data sinks.
*
* <p>If not all of the sinks of a data flow are given to the plan, the flow might
* not be translated entirely.</p>
*
* @param sinks The collection will the sinks of the job's data flow.
* @param jobName The name to display for the job.
*/
public Plan(Collection<? extends GenericDataSinkBase<?>> sinks, String jobName) {
this(sinks, jobName, ExecutionConfig.PARALLELISM_DEFAULT);
}
/**
* Creates a new program plan with the given name and default parallelism, describing the data flow that ends
* at the given data sinks.
* <p>
* If not all of the sinks of a data flow are given to the plan, the flow might
* not be translated entirely.
*
* @param sinks The collection will the sinks of the job's data flow.
* @param jobName The name to display for the job.
* @param defaultParallelism The default parallelism for the job.
*/
public Plan(Collection<? extends GenericDataSinkBase<?>> sinks, String jobName, int defaultParallelism) {
this.sinks.addAll(sinks);
this.jobName = jobName;
this.defaultParallelism = defaultParallelism;
}
/**
* Creates a new program plan with the given name, containing initially a single data sink.
* <p>
* If not all of the sinks of a data flow are given, the flow might
* not be translated entirely, but only the parts of the flow reachable by traversing backwards
* from the given data sinks.
*
* @param sink The data sink of the data flow.
* @param jobName The name to display for the job.
*/
public Plan(GenericDataSinkBase<?> sink, String jobName) {
this(sink, jobName, ExecutionConfig.PARALLELISM_DEFAULT);
}
/**
* Creates a new program plan with the given name and default parallelism, containing initially a single data
* sink.
* <p>
* If not all of the sinks of a data flow are given, the flow might
* not be translated entirely, but only the parts of the flow reachable by traversing backwards
* from the given data sinks.
*
* @param sink The data sink of the data flow.
* @param jobName The name to display for the job.
* @param defaultParallelism The default parallelism for the job.
*/
public Plan(GenericDataSinkBase<?> sink, String jobName, int defaultParallelism) {
this(Collections.<GenericDataSinkBase<?>>singletonList(sink), jobName, defaultParallelism);
}
/**
* Creates a new program plan, describing the data flow that ends at the
* given data sinks. The display name for the job is generated using a timestamp.
* <p>
* If not all of the sinks of a data flow are given, the flow might
* not be translated entirely, but only the parts of the flow reachable by traversing backwards
* from the given data sinks.
*
* @param sinks The collection will the sinks of the data flow.
*/
public Plan(Collection<? extends GenericDataSinkBase<?>> sinks) {
this(sinks, ExecutionConfig.PARALLELISM_DEFAULT);
}
/**
* Creates a new program plan with the given default parallelism, describing the data flow that ends at the
* given data sinks. The display name for the job is generated using a timestamp.
* <p>
* If not all of the sinks of a data flow are given, the flow might
* not be translated entirely, but only the parts of the flow reachable by traversing backwards
* from the given data sinks.
*
* @param sinks The collection will the sinks of the data flow.
* @param defaultParallelism The default parallelism for the job.
*/
public Plan(Collection<? extends GenericDataSinkBase<?>> sinks, int defaultParallelism) {
this(sinks, "Flink Job at " + Calendar.getInstance().getTime(), defaultParallelism);
}
/**
* Creates a new program plan with single data sink.
* The display name for the job is generated using a timestamp.
* <p>
* If not all of the sinks of a data flow are given to the plan, the flow might
* not be translated entirely.
*
* @param sink The data sink of the data flow.
*/
public Plan(GenericDataSinkBase<?> sink) {
this(sink, ExecutionConfig.PARALLELISM_DEFAULT);
}
/**
* Creates a new program plan with single data sink and the given default parallelism.
* The display name for the job is generated using a timestamp.
* <p>
* If not all of the sinks of a data flow are given to the plan, the flow might
* not be translated entirely.
*
* @param sink The data sink of the data flow.
* @param defaultParallelism The default parallelism for the job.
*/
public Plan(GenericDataSinkBase<?> sink, int defaultParallelism) {
this(sink, "Flink Job at " + Calendar.getInstance().getTime(), defaultParallelism);
}
// ------------------------------------------------------------------------
/**
* Adds a data sink to the set of sinks in this program.
*
* @param sink The data sink to add.
*/
public void addDataSink(GenericDataSinkBase<?> sink) {
checkNotNull(jobName, "The data sink must not be null.");
if (!this.sinks.contains(sink)) {
this.sinks.add(sink);
}
}
/**
* Gets all the data sinks of this job.
*
* @return All sinks of the program.
*/
public Collection<? extends GenericDataSinkBase<?>> getDataSinks() {
return this.sinks;
}
/**
* Gets the name of this job.
*
* @return The name of the job.
*/
public String getJobName() {
return this.jobName;
}
/**
* Sets the jobName for this Plan.
*
* @param jobName The jobName to set.
*/
public void setJobName(String jobName) {
checkNotNull(jobName, "The job name must not be null.");
this.jobName = jobName;
}
/**
* Gets the ID of the job that the dataflow plan belongs to.
* If this ID is not set, then the dataflow represents its own
* independent job.
*
* @return The ID of the job that the dataflow plan belongs to.
*/
public JobID getJobId() {
return jobId;
}
/**
* Sets the ID of the job that the dataflow plan belongs to.
* If this ID is set to {@code null}, then the dataflow represents its own
* independent job.
*
* @param jobId The ID of the job that the dataflow plan belongs to.
*/
public void setJobId(JobID jobId) {
this.jobId = jobId;
}
public void setSessionTimeout(long sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
public long getSessionTimeout() {
return sessionTimeout;
}
/**
* Gets the default parallelism for this job. That degree is always used when an operator
* is not explicitly given a parallelism.
*
* @return The default parallelism for the plan.
*/
public int getDefaultParallelism() {
return this.defaultParallelism;
}
/**
* Sets the default parallelism for this plan. That degree is always used when an operator
* is not explicitly given a parallelism.
*
* @param defaultParallelism The default parallelism for the plan.
*/
public void setDefaultParallelism(int defaultParallelism) {
checkArgument(defaultParallelism >= 1 || defaultParallelism == ExecutionConfig.PARALLELISM_DEFAULT,
"The default parallelism must be positive, or ExecutionConfig.PARALLELISM_DEFAULT if the system should use the globally configured default.");
this.defaultParallelism = defaultParallelism;
}
/**
* Gets the optimizer post-pass class for this job. The post-pass typically creates utility classes
* for data types and is specific to a particular data model (record, tuple, Scala, ...)
*
* @return The name of the class implementing the optimizer post-pass.
*/
public String getPostPassClassName() {
return "org.apache.flink.optimizer.postpass.JavaApiPostPass";
}
/**
* Gets the execution config object.
*
* @return The execution config object.
*/
public ExecutionConfig getExecutionConfig() {
if(executionConfig == null) {
throw new RuntimeException("Execution config has not been set properly for this plan");
}
return executionConfig;
}
/**
* Sets the runtime config object defining execution parameters.
*
* @param executionConfig The execution config to use.
*/
public void setExecutionConfig(ExecutionConfig executionConfig) {
this.executionConfig = executionConfig;
}
// ------------------------------------------------------------------------
/**
* Traverses the job depth first from all data sinks on towards the sources.
*
* @see Visitable#accept(Visitor)
*/
@Override
public void accept(Visitor<Operator<?>> visitor) {
for (GenericDataSinkBase<?> sink : this.sinks) {
sink.accept(visitor);
}
}
/**
* register cache files in program level
* @param entry contains all relevant information
* @param name user defined name of that file
* @throws java.io.IOException
*/
public void registerCachedFile(String name, DistributedCacheEntry entry) throws IOException {
if (!this.cacheFile.containsKey(name)) {
try {
URI u = new URI(entry.filePath);
if (!u.getPath().startsWith("/")) {
u = new File(entry.filePath).toURI();
}
FileSystem fs = FileSystem.get(u);
if (fs.exists(new Path(u.getPath()))) {
this.cacheFile.put(name, new DistributedCacheEntry(u.toString(), entry.isExecutable));
} else {
throw new IOException("File " + u.toString() + " doesn't exist.");
}
} catch (URISyntaxException ex) {
throw new IOException("Invalid path: " + entry.filePath, ex);
}
} else {
throw new IOException("cache file " + name + "already exists!");
}
}
/**
* return the registered caches files
* @return Set of (name, filePath) pairs
*/
public Set<Entry<String,DistributedCacheEntry>> getCachedFiles() {
return this.cacheFile.entrySet();
}
public int getMaximumParallelism() {
MaxDopVisitor visitor = new MaxDopVisitor();
accept(visitor);
return Math.max(visitor.maxDop, this.defaultParallelism);
}
// --------------------------------------------------------------------------------------------
private static final class MaxDopVisitor implements Visitor<Operator<?>> {
private int maxDop = -1;
@Override
public boolean preVisit(Operator<?> visitable) {
this.maxDop = Math.max(this.maxDop, visitable.getParallelism());
return true;
}
@Override
public void postVisit(Operator<?> visitable) {}
}
}