AbstractYarnClusterDescriptor.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.yarn;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_PLUGINS_DIR;
import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties;
import static org.apache.flink.yarn.configuration.YarnConfigOptions.APPLICATION_STAGINGDIR;
/**
* The descriptor with deployment information for deploying a Flink cluster on Yarn.
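*
* <p>A minimal usage sketch (the concrete subclass {@code YarnClusterDescriptor} and all
* paths below are placeholders/assumptions, not prescribed by this class):
*
* <pre>{@code
* YarnClient yarnClient = YarnClient.createYarnClient();
* yarnClient.init(yarnConfiguration);
* yarnClient.start();
*
* AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(
*     flinkConfiguration, yarnConfiguration, "/path/to/conf", yarnClient, false);
* descriptor.setLocalJarPath(new Path("/path/to/flink-dist.jar"));
*
* ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
*     .setMasterMemoryMB(1024)
*     .setTaskManagerMemoryMB(4096)
*     .setNumberTaskManagers(2)
*     .setSlotsPerTaskManager(4)
*     .createClusterSpecification();
*
* ClusterClient<ApplicationId> client = descriptor.deploySessionCluster(spec);
* }</pre>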
*/
public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnClusterDescriptor.class);
private final YarnConfiguration yarnConfiguration;
private final YarnClient yarnClient;
/** True if the descriptor must not shut down the YarnClient. */
private final boolean sharedYarnClient;
private String yarnQueue;
private String configurationDirectory;
private Path flinkJarPath;
private String dynamicPropertiesEncoded;
/** Lazily initialized list of files to ship. */
protected List<File> shipFiles = new LinkedList<>();
private final Configuration flinkConfiguration;
private boolean detached;
private String customName;
private String zookeeperNamespace;
private String nodeLabel;
private String applicationType;
private YarnConfigOptions.UserJarInclusion userJarInclusion;
public AbstractYarnClusterDescriptor(
Configuration flinkConfiguration,
YarnConfiguration yarnConfiguration,
String configurationDirectory,
YarnClient yarnClient,
boolean sharedYarnClient) {
this.yarnConfiguration = Preconditions.checkNotNull(yarnConfiguration);
this.yarnClient = Preconditions.checkNotNull(yarnClient);
this.sharedYarnClient = sharedYarnClient;
this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);
userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
}
public YarnClient getYarnClient() {
return yarnClient;
}
/**
* The class to start the application master with. This class runs the main
* method in the case of a session cluster.
*/
protected abstract String getYarnSessionClusterEntrypoint();
/**
* The class to start the application master with. This class runs the main
* method in the case of a job cluster.
*/
protected abstract String getYarnJobClusterEntrypoint();
public Configuration getFlinkConfiguration() {
return flinkConfiguration;
}
public void setQueue(String queue) {
this.yarnQueue = queue;
}
public void setLocalJarPath(Path localJarPath) {
if (!localJarPath.toString().endsWith(".jar")) {
throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the '.jar' extension");
}
this.flinkJarPath = localJarPath;
}
/**
* Adds the given files to the list of files to ship.
*
* <p>Note that any file matching "<tt>flink-dist*.jar</tt>" will be excluded from the upload by
* {@link #uploadAndRegisterFiles(Collection, FileSystem, Path, ApplicationId, List, Map, StringBuilder)}
* since we upload the Flink uber jar ourselves and do not need to deploy it multiple times.
*
* @param shipFiles files to ship
*/
public void addShipFiles(List<File> shipFiles) {
this.shipFiles.addAll(shipFiles);
}
public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
}
public String getDynamicPropertiesEncoded() {
return this.dynamicPropertiesEncoded;
}
private void isReadyForDeployment(ClusterSpecification clusterSpecification) throws YarnDeploymentException {
if (clusterSpecification.getNumberTaskManagers() <= 0) {
throw new YarnDeploymentException("Taskmanager count must be positive");
}
if (this.flinkJarPath == null) {
throw new YarnDeploymentException("The Flink jar path is null");
}
if (this.configurationDirectory == null) {
throw new YarnDeploymentException("Configuration directory not set");
}
if (this.flinkConfiguration == null) {
throw new YarnDeploymentException("Flink configuration object has not been set");
}
// Check that we don't exceed YARN's maximum virtual cores.
// Fetch numYarnMaxVcores from all the RUNNING nodes via yarnClient
final int numYarnMaxVcores;
try {
numYarnMaxVcores = yarnClient.getNodeReports(NodeState.RUNNING)
.stream()
.mapToInt(report -> report.getCapability().getVirtualCores())
.max()
.orElse(0);
} catch (Exception e) {
throw new YarnDeploymentException("Couldn't get cluster description, please check on the YarnConfiguration", e);
}
int configuredAmVcores = flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES);
if (configuredAmVcores > numYarnMaxVcores) {
throw new IllegalConfigurationException(
String.format("The number of requested virtual cores for application master %d" +
" exceeds the maximum number of virtual cores %d available in the Yarn Cluster.",
configuredAmVcores, numYarnMaxVcores));
}
int configuredVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
// don't configure more than the maximum configured number of vcores
if (configuredVcores > numYarnMaxVcores) {
throw new IllegalConfigurationException(
String.format("The number of requested virtual cores per node %d" +
" exceeds the maximum number of virtual cores %d available in the Yarn Cluster." +
" Please note that the number of virtual cores is set to the number of task slots by default" +
" unless configured in the Flink config with '%s.'",
configuredVcores, numYarnMaxVcores, YarnConfigOptions.VCORES.key()));
}
// check if required Hadoop environment variables are set. If not, warn user
if (System.getenv("HADOOP_CONF_DIR") == null &&
System.getenv("YARN_CONF_DIR") == null) {
LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. " +
"The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
"configuration for accessing YARN.");
}
}
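/**
 * Greedy first-fit: subtracts {@code toAllocate} MB from the first NodeManager in
 * {@code nodeManagers} that has enough free memory and reports whether one was found.
 * Only used for the best-effort capacity warnings in
 * {@link #validateClusterResources}, not for actual scheduling decisions.
 */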
private static boolean allocateResource(int[] nodeManagers, int toAllocate) {
for (int i = 0; i < nodeManagers.length; i++) {
if (nodeManagers[i] >= toAllocate) {
nodeManagers[i] -= toAllocate;
return true;
}
}
return false;
}
/**
* @deprecated The cluster descriptor should not know about this option.
*/
@Deprecated
public void setDetachedMode(boolean detachedMode) {
this.detached = detachedMode;
}
/**
* @deprecated The cluster descriptor should not know about this option.
*/
@Deprecated
public boolean isDetachedMode() {
return detached;
}
public String getZookeeperNamespace() {
return zookeeperNamespace;
}
public void setZookeeperNamespace(String zookeeperNamespace) {
this.zookeeperNamespace = zookeeperNamespace;
}
public String getNodeLabel() {
return nodeLabel;
}
public void setNodeLabel(String nodeLabel) {
this.nodeLabel = nodeLabel;
}
// -------------------------------------------------------------
// Lifecycle management
// -------------------------------------------------------------
@Override
public void close() {
if (!sharedYarnClient) {
yarnClient.stop();
}
}
// -------------------------------------------------------------
// ClusterClient overrides
// -------------------------------------------------------------
@Override
public ClusterClient<ApplicationId> retrieve(ApplicationId applicationId) throws ClusterRetrieveException {
try {
// check if required Hadoop environment variables are set. If not, warn user
if (System.getenv("HADOOP_CONF_DIR") == null &&
System.getenv("YARN_CONF_DIR") == null) {
LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set." +
"The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
"configuration for accessing YARN.");
}
final ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
if (appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
// Flink cluster is not running anymore
LOG.error("The application {} doesn't run anymore. It has previously completed with final status: {}",
applicationId, appReport.getFinalApplicationStatus());
throw new RuntimeException("The Yarn application " + applicationId + " doesn't run anymore.");
}
final String host = appReport.getHost();
final int rpcPort = appReport.getRpcPort();
LOG.info("Found application JobManager host name '{}' and port '{}' from supplied application id '{}'",
host, rpcPort, applicationId);
flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(JobManagerOptions.PORT, rpcPort);
flinkConfiguration.setString(RestOptions.ADDRESS, host);
flinkConfiguration.setInteger(RestOptions.PORT, rpcPort);
return createYarnClusterClient(
this,
-1, // we don't know the number of task managers of a started Flink cluster
-1, // we don't know how many slots each task manager has for a started Flink cluster
appReport,
flinkConfiguration,
false);
} catch (Exception e) {
throw new ClusterRetrieveException("Couldn't retrieve Yarn cluster", e);
}
}
@Override
public ClusterClient<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
try {
return deployInternal(
clusterSpecification,
"Flink session cluster",
getYarnSessionClusterEntrypoint(),
null,
false);
} catch (Exception e) {
throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
}
}
@Override
public void killCluster(ApplicationId applicationId) throws FlinkException {
try {
yarnClient.killApplication(applicationId);
Utils.deleteApplicationFiles(Collections.singletonMap(
YarnConfigKeys.FLINK_YARN_FILES,
getYarnFilesDir(applicationId).toUri().toString()));
} catch (YarnException | IOException e) {
throw new FlinkException("Could not kill the Yarn Flink cluster with id " + applicationId + '.', e);
}
}
/**
* Validates the cluster specification before deploying it; throws a
* {@link FlinkException} if the {@link ClusterSpecification} is invalid.
*
* @param clusterSpecification cluster specification to check against the configuration of the
* AbstractYarnClusterDescriptor
* @throws FlinkException if the cluster cannot be started with the provided {@link ClusterSpecification}
*/
private void validateClusterSpecification(ClusterSpecification clusterSpecification) throws FlinkException {
try {
final long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB();
// We do the validation by calling the calculation methods here
// Internally these methods will check whether the cluster can be started with the provided
// ClusterSpecification and the configured memory requirements
final long cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize);
TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration);
} catch (IllegalArgumentException iae) {
throw new FlinkException("Cannot fulfill the minimum memory requirements with the provided " +
"cluster specification. Please increase the memory of the cluster.", iae);
}
}
/**
* This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
*
* @param clusterSpecification Initial cluster specification for the Flink cluster to be deployed
* @param applicationName name of the Yarn application to start
* @param yarnClusterEntrypoint Class name of the Yarn cluster entry point.
* @param jobGraph A job graph which is deployed with the Flink cluster, {@code null} if none
* @param detached True if the cluster should be started in detached mode
*/
protected ClusterClient<ApplicationId> deployInternal(
ClusterSpecification clusterSpecification,
String applicationName,
String yarnClusterEntrypoint,
@Nullable JobGraph jobGraph,
boolean detached) throws Exception {
// ------------------ Check if configuration is valid --------------------
validateClusterSpecification(clusterSpecification);
if (UserGroupInformation.isSecurityEnabled()) {
// note: UGI::hasKerberosCredentials inaccurately reports false
// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
// so we check only in ticket cache scenario.
boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
&& useTicketCache && !loginUser.hasKerberosCredentials()) {
LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
"does not have Kerberos credentials");
}
}
isReadyForDeployment(clusterSpecification);
// ------------------ Check if the specified queue exists --------------------
checkYarnQueues(yarnClient);
// ------------------ Add dynamic properties to local flinkConfiguration ------
Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
}
// ------------------ Check if the YARN ClusterClient has the requested resources --------------
// Create application via yarnClient
final YarnClientApplication yarnApplication = yarnClient.createApplication();
final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
Resource maxRes = appResponse.getMaximumResourceCapability();
final ClusterResourceDescription freeClusterMem;
try {
freeClusterMem = getCurrentFreeClusterResources(yarnClient);
} catch (YarnException | IOException e) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
}
final int yarnMinAllocationMB = yarnConfiguration.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
final ClusterSpecification validClusterSpecification;
try {
validClusterSpecification = validateClusterResources(
clusterSpecification,
yarnMinAllocationMB,
maxRes,
freeClusterMem);
} catch (YarnDeploymentException yde) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw yde;
}
LOG.info("Cluster specification: {}", validClusterSpecification);
final ClusterEntrypoint.ExecutionMode executionMode = detached ?
ClusterEntrypoint.ExecutionMode.DETACHED
: ClusterEntrypoint.ExecutionMode.NORMAL;
flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());
ApplicationReport report = startAppMaster(
flinkConfiguration,
applicationName,
yarnClusterEntrypoint,
jobGraph,
yarnClient,
yarnApplication,
validClusterSpecification);
String host = report.getHost();
int port = report.getRpcPort();
// Correctly initialize the Flink config
flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
flinkConfiguration.setString(RestOptions.ADDRESS, host);
flinkConfiguration.setInteger(RestOptions.PORT, port);
// the Flink cluster is deployed in YARN. Represent it with a cluster client
return createYarnClusterClient(
this,
validClusterSpecification.getNumberTaskManagers(),
validClusterSpecification.getSlotsPerTaskManager(),
report,
flinkConfiguration,
true);
}
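/**
 * Normalizes the requested resources against the YARN scheduler limits: requests below
 * 'yarn.scheduler.minimum-allocation-mb' are rounded up to it, requests above the maximum
 * container capability fail fast, and requests that merely exceed the currently free
 * resources only log warnings, since YARN may free up capacity after submission.
 */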
protected ClusterSpecification validateClusterResources(
ClusterSpecification clusterSpecification,
int yarnMinAllocationMB,
Resource maximumResourceCapability,
ClusterResourceDescription freeClusterResources) throws YarnDeploymentException {
int taskManagerCount = clusterSpecification.getNumberTaskManagers();
int jobManagerMemoryMb = clusterSpecification.getMasterMemoryMB();
int taskManagerMemoryMb = clusterSpecification.getTaskManagerMemoryMB();
if (jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
+ "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size." +
"YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " +
"you requested will start.");
}
// set the memory to minAllocationMB to do the next checks correctly
if (jobManagerMemoryMb < yarnMinAllocationMB) {
jobManagerMemoryMb = yarnMinAllocationMB;
}
if (taskManagerMemoryMb < yarnMinAllocationMB) {
taskManagerMemoryMb = yarnMinAllocationMB;
}
final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) {
throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
+ "Maximum Memory: " + maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
}
if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) {
throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
+ "Maximum Memory: " + maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
}
final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
"connecting from the beginning because the resources are currently not available in the cluster. " +
"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
"the resources become available.";
int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
if (freeClusterResources.totalFreeMemory < totalMemoryRequired) {
LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
+ "There are currently only " + freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
}
if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
+ "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
}
if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
+ "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
}
// ----------------- check if the requested containers fit into the cluster.
int[] nmFree = Arrays.copyOf(freeClusterResources.nodeManagersFree, freeClusterResources.nodeManagersFree.length);
// first, allocate the jobManager somewhere.
if (!allocateResource(nmFree, jobManagerMemoryMb)) {
LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
Arrays.toString(freeClusterResources.nodeManagersFree) + noteRsc);
}
// allocate TaskManagers
for (int i = 0; i < taskManagerCount; i++) {
if (!allocateResource(nmFree, taskManagerMemoryMb)) {
LOG.warn("There is not enough memory available in the YARN cluster. " +
"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
"NodeManagers available: " + Arrays.toString(freeClusterResources.nodeManagersFree) + "\n" +
"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
"the following NodeManagers are available: " + Arrays.toString(nmFree) + noteRsc);
}
}
return new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(jobManagerMemoryMb)
.setTaskManagerMemoryMB(taskManagerMemoryMb)
.setNumberTaskManagers(clusterSpecification.getNumberTaskManagers())
.setSlotsPerTaskManager(clusterSpecification.getSlotsPerTaskManager())
.createClusterSpecification();
}
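/**
 * Best-effort check that the configured queue exists on the cluster. A missing queue only
 * logs a warning (submission may still be rejected by YARN later), and any error while
 * listing queues is swallowed because some clusters restrict queue information.
 */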
private void checkYarnQueues(YarnClient yarnClient) {
try {
List<QueueInfo> queues = yarnClient.getAllQueues();
if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
boolean queueFound = false;
for (QueueInfo queue : queues) {
if (queue.getQueueName().equals(this.yarnQueue)) {
queueFound = true;
break;
}
}
if (!queueFound) {
String queueNames = "";
for (QueueInfo queue : queues) {
queueNames += queue.getQueueName() + ", ";
}
LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
"Available queues: " + queueNames);
}
} else {
LOG.debug("The YARN cluster does not have any queues configured");
}
} catch (Throwable e) {
LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Error details", e);
}
}
}
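/**
 * Stages all resources needed by the ApplicationMaster (Flink uber jar, flink-conf.yaml,
 * ship files, an optionally serialized job graph, and Kerberos/keytab artifacts), builds
 * the AM container launch context and environment, and submits the application to YARN.
 */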
public ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
JobGraph jobGraph,
YarnClient yarnClient,
YarnClientApplication yarnApplication,
ClusterSpecification clusterSpecification) throws Exception {
// ------------------ Initialize the file systems -------------------------
org.apache.flink.core.fs.FileSystem.initialize(
configuration,
PluginUtils.createPluginManagerFromRootFolder(configuration));
// Copy the application master jar to the filesystem and create a local
// resource to point to the destination jar path
final FileSystem fs = FileSystem.get(yarnConfiguration);
final Path homeDir = getAppStagingDir(configuration, fs);
// hard-coded check for the GoogleHDFS client because it's not overriding the getScheme() method.
if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
fs.getScheme().startsWith("file")) {
LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+ "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
+ "The Flink YARN client needs to store its files in a distributed file system");
}
ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
for (File file : shipFiles) {
systemShipFiles.add(file.getAbsoluteFile());
}
//check if there is a logback or log4j file
File logbackFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
final boolean hasLogback = logbackFile.exists();
if (hasLogback) {
systemShipFiles.add(logbackFile);
}
File log4jFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
final boolean hasLog4j = log4jFile.exists();
if (hasLog4j) {
systemShipFiles.add(log4jFile);
if (hasLogback) {
// this means there is already a logback configuration file --> warn the user about the conflict
LOG.warn("The configuration directory ('" + configurationDirectory + "') contains both LOG4J and " +
"Logback configuration files. Please delete or rename one of them.");
}
}
addEnvironmentFoldersToShipFiles(systemShipFiles);
// Set-up ApplicationSubmissionContext for the application
final ApplicationId appId = appContext.getApplicationId();
// ------------------ Add Zookeeper namespace to local flinkConfiguration ------
String zkNamespace = getZookeeperNamespace();
// no user specified cli argument for namespace?
if (zkNamespace == null || zkNamespace.isEmpty()) {
// namespace defined in config? else use applicationId as default.
zkNamespace = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
setZookeeperNamespace(zkNamespace);
}
configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
// activate re-execution of failed applications
appContext.setMaxAppAttempts(
configuration.getInteger(
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
activateHighAvailabilitySupport(appContext);
} else {
// set number of application retries to 1 in the default case
appContext.setMaxAppAttempts(
configuration.getInteger(
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
1));
}
final Set<File> userJarFiles = (jobGraph == null)
// not per-job submission
? Collections.emptySet()
// add user code jars from the provided JobGraph
: jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet());
// local resource map for Yarn
final Map<String, LocalResource> localResources = new HashMap<>(2 + systemShipFiles.size() + userJarFiles.size());
// list of remote paths (after upload)
final List<Path> paths = new ArrayList<>(2 + systemShipFiles.size() + userJarFiles.size());
// ship list that enables reuse of resources for task manager containers
StringBuilder envShipFileList = new StringBuilder();
// upload and register ship files
List<String> systemClassPaths = uploadAndRegisterFiles(
systemShipFiles,
fs,
homeDir,
appId,
paths,
localResources,
envShipFileList);
final List<String> userClassPaths = uploadAndRegisterFiles(
userJarFiles,
fs,
homeDir,
appId,
paths,
localResources,
envShipFileList);
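// Where the user jars end up on the classpath is controlled by the configured
// UserJarInclusion mode: ORDER merges them into the system classpath (sorted together
// below), FIRST prepends them, and LAST appends them after the Flink jars.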
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
systemClassPaths.addAll(userClassPaths);
}
// normalize classpath by sorting
Collections.sort(systemClassPaths);
Collections.sort(userClassPaths);
// classpath assembler
StringBuilder classPathBuilder = new StringBuilder();
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
for (String classPath : systemClassPaths) {
classPathBuilder.append(classPath).append(File.pathSeparator);
}
// Setup jar for ApplicationMaster
Path remotePathJar = setupSingleLocalResource(
"flink.jar",
fs,
appId,
flinkJarPath,
localResources,
homeDir,
"");
// set the right configuration values for the TaskManager
configuration.setInteger(
TaskManagerOptions.NUM_TASK_SLOTS,
clusterSpecification.getSlotsPerTaskManager());
configuration.setString(
TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY,
clusterSpecification.getTaskManagerMemoryMB() + "m");
// Upload the flink configuration
// write out configuration file
File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
tmpConfigurationFile.deleteOnExit();
BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);
Path remotePathConf = setupSingleLocalResource(
"flink-conf.yaml",
fs,
appId,
new Path(tmpConfigurationFile.getAbsolutePath()),
localResources,
homeDir,
"");
paths.add(remotePathJar);
classPathBuilder.append("flink.jar").append(File.pathSeparator);
paths.add(remotePathConf);
classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
// write job graph to tmp file and add it to local resource
// TODO: the server should use the user's main method to generate the job graph
if (jobGraph != null) {
try {
File fp = File.createTempFile(appId.toString(), null);
fp.deleteOnExit();
try (FileOutputStream output = new FileOutputStream(fp);
ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
obOutput.writeObject(jobGraph);
}
final String jobGraphFilename = "job.graph";
configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);
Path pathFromYarnURL = setupSingleLocalResource(
jobGraphFilename,
fs,
appId,
new Path(fp.toURI()),
localResources,
homeDir,
"");
paths.add(pathFromYarnURL);
classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
} catch (Exception e) {
LOG.warn("Add job graph to local resource fail");
throw e;
}
}
final Path yarnFilesDir = getYarnFilesDir(appId);
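// restrict the staging directory to the submitting user (owner rwx only)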
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
fs.setPermission(yarnFilesDir, permission); // set permission for path.
//To support Yarn Secure Integration Test Scenario
//In the integration test setup, the Yarn containers created by the YarnMiniCluster do not have the Yarn site XML
//and KRB5 configuration files. We add these files as container local resources so that the container
//applications (JM/TMs) have a proper secure cluster setup
Path remoteKrb5Path = null;
Path remoteYarnSiteXmlPath = null;
boolean hasKrb5 = false;
if (System.getenv("IN_TESTS") != null) {
File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
Path yarnSitePath = new Path(f.getAbsolutePath());
remoteYarnSiteXmlPath = setupSingleLocalResource(
Utils.YARN_SITE_FILE_NAME,
fs,
appId,
yarnSitePath,
localResources,
homeDir,
"");
String krb5Config = System.getProperty("java.security.krb5.conf");
if (krb5Config != null && krb5Config.length() != 0) {
File krb5 = new File(krb5Config);
LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", krb5.getAbsolutePath());
Path krb5ConfPath = new Path(krb5.getAbsolutePath());
remoteKrb5Path = setupSingleLocalResource(
Utils.KRB5_FILE_NAME,
fs,
appId,
krb5ConfPath,
localResources,
homeDir,
"");
hasKrb5 = true;
}
}
// setup security tokens
Path remotePathKeytab = null;
String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if (keytab != null) {
LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
remotePathKeytab = setupSingleLocalResource(
Utils.KEYTAB_FILE_NAME,
fs,
appId,
new Path(keytab),
localResources,
homeDir,
"");
}
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
yarnClusterEntrypoint,
hasLogback,
hasLog4j,
hasKrb5,
clusterSpecification.getMasterMemoryMB());
if (UserGroupInformation.isSecurityEnabled()) {
// set HDFS delegation tokens when security is enabled
LOG.info("Adding delegation token to the AM container..");
Utils.setTokensFor(amContainer, paths, yarnConfiguration);
}
amContainer.setLocalResources(localResources);
fs.close();
// Setup CLASSPATH and environment variables for ApplicationMaster
final Map<String, String> appMasterEnv = new HashMap<>();
// set user specified app master environment variables
appMasterEnv.putAll(Utils.getEnvironmentVariables(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
// set Flink app class path
appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());
// set Flink on YARN internal configuration values
appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(clusterSpecification.getNumberTaskManagers()));
appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(clusterSpecification.getTaskManagerMemoryMB()));
appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString());
appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, homeDir.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(clusterSpecification.getSlotsPerTaskManager()));
appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES, yarnFilesDir.toUri().toString());
// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
if (remotePathKeytab != null) {
appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString());
String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
}
//To support Yarn Secure Integration Test Scenario
if (remoteYarnSiteXmlPath != null) {
appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
}
if (remoteKrb5Path != null) {
appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
}
if (dynamicPropertiesEncoded != null) {
appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
}
// set classpath from YARN configuration
Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);
amContainer.setEnvironment(appMasterEnv);
// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(clusterSpecification.getMasterMemoryMB());
capability.setVirtualCores(configuration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));
final String customApplicationName = customName != null ? customName : applicationName;
appContext.setApplicationName(customApplicationName);
appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
if (yarnQueue != null) {
appContext.setQueue(yarnQueue);
}
setApplicationNodeLabel(appContext);
setApplicationTags(appContext);
// add a hook to clean up in case deployment fails