/
HiveServer2.java
1631 lines (1502 loc) · 68.2 KB
/
HiveServer2.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hive.service.server;
import com.google.common.base.Joiner;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.hadoop.hive.common.JvmPauseMonitor;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.common.ZKDeRegisterWatcher;
import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.conf.HiveServer2TransportMode;
import org.apache.hadoop.hive.conf.Validator;
import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
import org.apache.hadoop.hive.metastore.api.WMPool;
import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry;
import org.apache.hadoop.hive.ql.metadata.HiveMetaStoreClientWithLocalCache;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.events.NotificationEventPoll;
import org.apache.hadoop.hive.ql.parse.CalcitePlanner;
import org.apache.hadoop.hive.ql.parse.repl.metric.MetricSink;
import org.apache.hadoop.hive.ql.plan.mapper.StatsSources;
import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionService;
import org.apache.hadoop.hive.ql.security.authorization.HiveMetastoreAuthorizationProvider;
import org.apache.hadoop.hive.ql.security.authorization.PolicyProviderContainer;
import org.apache.hadoop.hive.ql.security.authorization.PrivilegeSynchronizer;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
import org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorThread;
import org.apache.hadoop.hive.ql.txn.compactor.Worker;
import org.apache.hadoop.hive.registry.impl.ZookeeperUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.hive.http.HttpServer;
import org.apache.hive.http.JdbcJarDownloadServlet;
import org.apache.hive.http.LlapServlet;
import org.apache.hive.http.security.PamAuthenticator;
import org.apache.hive.service.CompositeService;
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.AuthType;
import org.apache.hive.service.auth.saml.HiveSaml2Client;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.session.HiveSession;
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
import org.apache.hive.service.servlet.HS2LeadershipStatus;
import org.apache.hive.service.servlet.HS2Peers;
import org.apache.hive.service.servlet.QueriesRESTfulAPIServlet;
import org.apache.hive.service.servlet.QueryProfileServlet;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import javax.servlet.jsp.JspFactory;
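/**
 * HiveServer2 is the top-level {@link CompositeService} of the Hive server process.
 *
 * <p>It owns the {@link CLIService} and the Thrift front-end(s) (binary and/or HTTP), optionally
 * builds the web UI, and optionally registers this instance in ZooKeeper, either for dynamic
 * service discovery or for active/passive high availability.
 */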
public class HiveServer2 extends CompositeService {
// Class-wide logger.
private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class);
// Configuration key under which this instance's "host:port" URI is stored and published.
public static final String INSTANCE_URI_CONFIG = "hive.server2.instance.uri";
// NOTE(review): not referenced in the visible part of the file; presumably a shutdown wait in seconds - confirm.
private static final int SHUTDOWN_TIME = 60;
// NOTE(review): the two zkDelete* fields are not used in the visible part of the file; presumably they
// coordinate an asynchronous ZooKeeper znode delete - confirm against the deregistration code.
private static CountDownLatch zkDeleteSignal;
private static volatile KeeperException.Code zkDeleteResultCode;
// CLI service created in init(); null until then.
private CLIService cliService;
// Thrift front-end created last in init(); its address and port identify this instance.
private ThriftCLIService thriftCLIService;
// NOTE(review): not used in the visible part of the file; presumably the ZooKeeper client of the
// privilege synchronizer - confirm.
private CuratorFramework zKClientForPrivSync = null;
private HttpServer webServer; // Web UI
// NOTE(review): tezSessionPoolManager and wm are not assigned in the visible part of the file.
private TezSessionPoolManager tezSessionPoolManager;
private WorkloadManager wm;
// Optional PAM authenticator for the web UI; when null, init() creates one from the configuration.
private PamAuthenticator pamAuthenticator;
// Key/value pairs filled by addConfsToPublish() and published to ZooKeeper by start().
private Map<String, String> confsToPublish = new HashMap<String, String>();
// "host:port" of this instance; set in init() only when service discovery is enabled.
private String serviceUri;
// Value of HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY, read in init().
private boolean serviceDiscovery;
// Value of HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE, read in init().
private boolean activePassiveHA;
// The next three fields are created in init() only when both service discovery and active/passive HA are on.
private LeaderLatchListener leaderLatchListener;
private ExecutorService leaderActionsExecutorService;
private HS2ActivePassiveHARegistry hs2HARegistry;
// Hive object obtained through Hive.get(hiveConf) in init().
private Hive sessionHive;
// Trimmed value of HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE ("" when unset).
private String wmQueue;
// Leadership flag returned by isLeader() and handed to the web UI as "hs2.isLeader".
private AtomicBoolean isLeader = new AtomicBoolean(false);
// used for testing
private SettableFuture<Boolean> isLeaderTestFuture = SettableFuture.create();
private SettableFuture<Boolean> notLeaderTestFuture = SettableFuture.create();
// Set in start() when registering for service discovery without active/passive HA; null otherwise.
private ZooKeeperHiveHelper zooKeeperHelper = null;
// Started in init() when HIVE_SCHEDULED_QUERIES_EXECUTOR_ENABLED is true.
private ScheduledQueryExecutionService scheduledQueryService;
/**
 * Creates a server without a preset PAM authenticator; one is created from the configuration
 * during {@code init} if the web UI needs it.
 */
public HiveServer2() {
  // Explicit cast keeps the delegation unambiguous.
  this((PamAuthenticator) null);
}

/**
 * Creates a server that uses the supplied PAM authenticator for the web UI.
 *
 * @param pamAuthenticator authenticator to use, or {@code null} to have one created on demand
 */
@VisibleForTesting
public HiveServer2(PamAuthenticator pamAuthenticator) {
  super(HiveServer2.class.getSimpleName());
  // Ask HiveConf to load the HiveServer2-specific configuration.
  HiveConf.setLoadHiveServer2Config(true);
  this.pamAuthenticator = pamAuthenticator;
}
/**
 * Exposes the CLI service owned by this server.
 *
 * @return the {@link CLIService} created by {@code init}, or {@code null} before initialization
 */
@VisibleForTesting
public CLIService getCliService() {
  return cliService;
}
/**
 * Replaces the PAM authenticator that {@code init} hands to the web UI builder.
 *
 * @param authenticator authenticator to use; {@code null} makes {@code init} create its own
 */
@VisibleForTesting
public void setPamAuthenticator(PamAuthenticator authenticator) {
  pamAuthenticator = authenticator;
}
/**
 * Test hook giving access to the current "is leader" future.
 *
 * @return the future currently held in {@code isLeaderTestFuture}
 */
@VisibleForTesting
public SettableFuture<Boolean> getIsLeaderTestFuture() {
  return this.isLeaderTestFuture;
}
/**
 * Test hook giving access to the current "not leader" future.
 *
 * @return the future currently held in {@code notLeaderTestFuture}
 */
@VisibleForTesting
public SettableFuture<Boolean> getNotLeaderTestFuture() {
  return this.notLeaderTestFuture;
}
/** Swaps in a fresh, unset "is leader" test future. */
private void resetIsLeaderTestFuture() {
  this.isLeaderTestFuture = SettableFuture.create();
}
/** Swaps in a fresh, unset "not leader" test future. */
private void resetNotLeaderTestFuture() {
  this.notLeaderTestFuture = SettableFuture.create();
}
/**
 * Initializes this server and its child services from the given configuration.
 *
 * <p>In order: initializes metrics (best effort), creates the {@link CLIService} with client sessions
 * disallowed, registers the Thrift front-end(s) for the configured transport mode, runs the parent
 * {@code init}, and then sets up the optional subsystems (LLAP coordinator and registry client, planner
 * warm-up, metastore connection, materialized views registry, scheduled queries, query results cache,
 * metastore client cache, notification event poll, service discovery / active-passive HA state,
 * compactor threads and the web UI). Finally a graceful shutdown hook is registered.
 *
 * @param hiveConf server configuration; the Thrift bind host, the "startcode" and (for HA) the
 *     instance URI are written back into it
 * @throws Error if the server host cannot be determined
 * @throws ServiceException if service discovery / HA setup fails or the web UI cannot be built
 * @throws IllegalArgumentException if an enabled web UI feature (SSL, SPNEGO, CORS, PAM) lacks a
 *     required setting
 * @throws RuntimeException if one of the other subsystems fails to initialize
 */
@Override
public synchronized void init(HiveConf hiveConf) {
  // Initialize metrics first, as some metrics are for initialization stuff.
  try {
    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
      MetricsFactory.init(hiveConf);
    }
  } catch (Throwable t) {
    LOG.warn("Could not initiate the HiveServer2 Metrics system. Metrics may not be reported.", t);
  }

  // Do not allow sessions - leader election or initialization will allow them for an active HS2.
  cliService = new CLIService(this, false);
  addService(cliService);

  boolean isHttpTransportMode = isHttpTransportMode(hiveConf);
  boolean isAllTransportMode = isAllTransportMode(hiveConf);
  if (isHttpTransportMode || isAllTransportMode) {
    thriftCLIService = new ThriftHttpCLIService(cliService);
    addService(thriftCLIService);
  }
  if (!isHttpTransportMode || isAllTransportMode) {
    // In "all" mode this overwrites the HTTP reference: both services stay registered, but the
    // field (used for zookeeper purposes) ends up pointing at the binary service.
    thriftCLIService = new ThriftBinaryCLIService(cliService);
    addService(thriftCLIService);
  }
  super.init(hiveConf);

  // Set host name in conf
  try {
    hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, getServerHost());
  } catch (Throwable t) {
    throw new Error("Unable to initialize HiveServer2", t);
  }

  if (HiveConf.getBoolVar(hiveConf, ConfVars.LLAP_HS2_ENABLE_COORDINATOR)) {
    // See the comment on LlapCoordinator.initializeInstance.
    try {
      LlapCoordinator.initializeInstance(hiveConf);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  // Trigger the creation of LLAP registry client, if in use. Clients may be using a different
  // cluster than the default one, but at least for the default case we'd have it covered.
  String llapHosts = HiveConf.getVar(hiveConf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
  if (llapHosts != null && !llapHosts.isEmpty()) {
    LlapRegistryService.getClient(hiveConf);
  }

  // Initialize metadata provider class and trimmer
  CalcitePlanner.warmup();

  try {
    sessionHive = Hive.get(hiveConf);
  } catch (HiveException e) {
    throw new RuntimeException("Failed to get metastore connection", e);
  }

  // Create views registry
  HiveMaterializedViewsRegistry.get().init();

  StatsSources.initialize(hiveConf);

  if (hiveConf.getBoolVar(ConfVars.HIVE_SCHEDULED_QUERIES_EXECUTOR_ENABLED)) {
    scheduledQueryService = ScheduledQueryExecutionService.startScheduledQueryExecutorService(hiveConf);
  }

  // Setup cache if enabled.
  if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
    try {
      QueryResultsCache.initialize(hiveConf);
    } catch (Exception err) {
      throw new RuntimeException("Error initializing the query results cache", err);
    }
  }

  // setup metastore client cache
  if (hiveConf.getBoolVar(ConfVars.MSC_CACHE_ENABLED)) {
    HiveMetaStoreClientWithLocalCache.init(hiveConf);
  }

  try {
    NotificationEventPoll.initialize(hiveConf);
  } catch (Exception err) {
    throw new RuntimeException("Error initializing notification event poll", err);
  }

  wmQueue = hiveConf.get(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE.varname, "").trim();

  this.zooKeeperAclProvider = getACLProvider(hiveConf);
  this.serviceDiscovery = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY);
  this.activePassiveHA = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE);

  try {
    if (serviceDiscovery) {
      serviceUri = getServerInstanceURI();
      addConfsToPublish(hiveConf, confsToPublish, serviceUri);
      if (activePassiveHA) {
        hiveConf.set(INSTANCE_URI_CONFIG, serviceUri);
        leaderLatchListener = new HS2LeaderLatchListener(this, SessionState.get());
        leaderActionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
            .setNameFormat("Leader Actions Handler Thread").build());
        hs2HARegistry = HS2ActivePassiveHARegistry.create(hiveConf, false);
      }
    }
  } catch (Exception e) {
    throw new ServiceException(e);
  }

  try {
    logCompactionParameters(hiveConf);
    maybeStartCompactorThreads(hiveConf);
  } catch (Exception e) {
    throw new RuntimeException(e);
  }

  // Setup web UI
  final int webUIPort;
  final String webHost;
  try {
    webUIPort = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT);
    webHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_BIND_HOST);
    // We disable web UI in tests unless the test is explicitly setting a
    // unique web ui port so that we don't mess up ptests.
    boolean uiDisabledInTest = hiveConf.getBoolVar(ConfVars.HIVE_IN_TEST)
        && (webUIPort == Integer.parseInt(ConfVars.HIVE_SERVER2_WEBUI_PORT.getDefaultValue()));
    if (uiDisabledInTest) {
      LOG.info("Web UI is disabled in test mode since webui port was not specified");
    } else {
      if (webUIPort <= 0) {
        LOG.info("Web UI is disabled since port is set to " + webUIPort);
      } else {
        LOG.info("Starting Web UI on port " + webUIPort);
        if (JspFactory.getDefaultFactory() == null) {
          // Set the default JspFactory to avoid NPE while opening the home page
          JspFactory.setDefaultFactory(new org.apache.jasper.runtime.JspFactoryImpl());
        }
        HttpServer.Builder builder = new HttpServer.Builder("hiveserver2");
        builder.setPort(webUIPort).setConf(hiveConf);
        builder.setHost(webHost);
        builder.setMaxThreads(
            hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_THREADS));
        builder.setAdmins(hiveConf.getVar(ConfVars.USERS_IN_ADMIN_ROLE));
        // SessionManager is initialized
        builder.setContextAttribute("hive.sm",
            cliService.getSessionManager());
        hiveConf.set("startcode",
            String.valueOf(System.currentTimeMillis()));

        if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_USE_SSL)) {
          String keyStorePath = hiveConf.getVar(
              ConfVars.HIVE_SERVER2_WEBUI_SSL_KEYSTORE_PATH);
          if (StringUtils.isBlank(keyStorePath)) {
            throw new IllegalArgumentException(
                ConfVars.HIVE_SERVER2_WEBUI_SSL_KEYSTORE_PATH.varname
                    + " Not configured for SSL connection");
          }
          builder.setKeyStorePassword(ShimLoader.getHadoopShims().getPassword(
              hiveConf, ConfVars.HIVE_SERVER2_WEBUI_SSL_KEYSTORE_PASSWORD.varname));
          builder.setKeyStorePath(keyStorePath);
          builder.setKeyStoreType(hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_SSL_KEYSTORE_TYPE));
          builder.setKeyManagerFactoryAlgorithm(
              hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_SSL_KEYMANAGERFACTORY_ALGORITHM));
          builder.setExcludeCiphersuites(
              hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_SSL_EXCLUDE_CIPHERSUITES));
          builder.setUseSSL(true);
        }

        if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_USE_SPNEGO)) {
          String spnegoPrincipal = hiveConf.getVar(
              ConfVars.HIVE_SERVER2_WEBUI_SPNEGO_PRINCIPAL);
          String spnegoKeytab = hiveConf.getVar(
              ConfVars.HIVE_SERVER2_WEBUI_SPNEGO_KEYTAB);
          if (StringUtils.isBlank(spnegoPrincipal) || StringUtils.isBlank(spnegoKeytab)) {
            throw new IllegalArgumentException(
                ConfVars.HIVE_SERVER2_WEBUI_SPNEGO_PRINCIPAL.varname
                    + "/" + ConfVars.HIVE_SERVER2_WEBUI_SPNEGO_KEYTAB.varname
                    + " Not configured for SPNEGO authentication");
          }
          builder.setSPNEGOPrincipal(spnegoPrincipal);
          builder.setSPNEGOKeytab(spnegoKeytab);
          builder.setUseSPNEGO(true);
        }

        if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS)) {
          builder.setEnableCORS(true);
          String allowedOrigins = hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_CORS_ALLOWED_ORIGINS);
          String allowedMethods = hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_CORS_ALLOWED_METHODS);
          String allowedHeaders = hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_CORS_ALLOWED_HEADERS);
          if (StringUtils.isBlank(allowedOrigins) || StringUtils.isBlank(allowedMethods)
              || StringUtils.isBlank(allowedHeaders)) {
            // The key list is joined with "/" only between entries (no trailing separator).
            throw new IllegalArgumentException("CORS enabled. But " +
                ConfVars.HIVE_SERVER2_WEBUI_CORS_ALLOWED_ORIGINS.varname + "/" +
                ConfVars.HIVE_SERVER2_WEBUI_CORS_ALLOWED_METHODS.varname + "/" +
                ConfVars.HIVE_SERVER2_WEBUI_CORS_ALLOWED_HEADERS.varname +
                " is not configured");
          }
          builder.setAllowedOrigins(allowedOrigins);
          builder.setAllowedMethods(allowedMethods);
          builder.setAllowedHeaders(allowedHeaders);
          LOG.info("CORS enabled - allowed-origins: {} allowed-methods: {} allowed-headers: {}", allowedOrigins,
              allowedMethods, allowedHeaders);
        }

        if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_XFRAME_ENABLED)) {
          builder.configureXFrame(true).setXFrameOption(hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_XFRAME_VALUE));
        }

        if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_USE_PAM)) {
          // PAM is accepted only together with SSL, except in test mode.
          if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_USE_SSL)) {
            String hiveServer2PamServices = hiveConf.getVar(ConfVars.HIVE_SERVER2_PAM_SERVICES);
            if (hiveServer2PamServices == null || hiveServer2PamServices.isEmpty()) {
              throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_PAM_SERVICES.varname + " are not configured.");
            }
            builder.setPAMAuthenticator(pamAuthenticator == null ? new PamAuthenticator(hiveConf) : pamAuthenticator);
            builder.setUsePAM(true);
          } else if (hiveConf.getBoolVar(ConfVars.HIVE_IN_TEST)) {
            builder.setPAMAuthenticator(pamAuthenticator == null ? new PamAuthenticator(hiveConf) : pamAuthenticator);
            builder.setUsePAM(true);
          } else {
            throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_WEBUI_USE_SSL.varname + " has false value. It is recommended to set to true when PAM is used.");
          }
        }

        if (serviceDiscovery && activePassiveHA) {
          builder.setContextAttribute("hs2.isLeader", isLeader);
          builder.setContextAttribute("hs2.failover.callback", new FailoverHandlerCallback(hs2HARegistry));
          builder.setContextAttribute("hiveconf", hiveConf);
          builder.addServlet("leader", HS2LeadershipStatus.class);
          builder.addServlet("peers", HS2Peers.class);
        }
        builder.addServlet("llap", LlapServlet.class);
        builder.addServlet("jdbcjar", JdbcJarDownloadServlet.class);
        builder.setContextRootRewriteTarget("/hiveserver2.jsp");

        webServer = builder.build();
        webServer.addServlet("query_page", "/query_page.html", QueryProfileServlet.class);
        webServer.addServlet("api", "/api/*", QueriesRESTfulAPIServlet.class);
      }
    }
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }

  // Add a shutdown hook for catching SIGTERM & SIGINT
  long timeout = HiveConf.getTimeVar(getHiveConf(),
      HiveConf.ConfVars.HIVE_SERVER2_GRACEFUL_STOP_TIMEOUT, TimeUnit.SECONDS);
  // Extra time for releasing the resources if timeout sets to 0
  ShutdownHookManager.addGracefulShutDownHook(() -> graceful_stop(), timeout == 0 ? 30 : timeout);
}
/**
 * Logs the compaction settings relevant to HiveServer2 and warns when compactor workers are
 * configured to run in HS2 with fewer than one worker thread.
 *
 * @param hiveConf configuration the metastore settings are read from
 */
private void logCompactionParameters(HiveConf hiveConf) {
  LOG.info("Compaction HS2 parameters:");

  final String workerLocation =
      MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN);
  LOG.info("hive.metastore.runworker.in = {}", workerLocation);

  final int workerThreads =
      MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS);
  LOG.info("metastore.compactor.worker.threads = {}", workerThreads);

  final boolean workersRunInHs2 = "hs2".equals(workerLocation);
  if (workersRunInHs2 && workerThreads < 1) {
    LOG.warn("Invalid number of Compactor Worker threads({}) on HS2", workerThreads);
  }
}
/**
 * Builds a minimal resource plan named "testDefault" containing a single pool.
 *
 * <p>The pool is constructed from ("testDefault", "llap"), takes the whole allocation fraction,
 * allows a query parallelism of 1, and the plan's default pool path is set to "testDefault".
 *
 * @return the newly built full resource plan
 */
private WMFullResourcePlan createTestResourcePlan() {
  final String planName = "testDefault";

  WMPool onlyPool = new WMPool(planName, "llap");
  onlyPool.setAllocFraction(1f);
  onlyPool.setQueryParallelism(1);

  WMFullResourcePlan fullPlan =
      new WMFullResourcePlan(new WMResourcePlan(planName), Lists.newArrayList(onlyPool));
  fullPlan.getPlan().setDefaultPoolPath(planName);
  return fullPlan;
}
/**
 * Tells whether the effective transport mode is {@code http}.
 *
 * @param hiveConf configuration consulted when the HIVE_SERVER2_TRANSPORT_MODE environment
 *     variable is not set
 * @return {@code true} if the effective transport mode equals "http", ignoring case
 */
public static boolean isHttpTransportMode(HiveConf hiveConf) {
  return isConfiguredTransportMode(hiveConf, HiveServer2TransportMode.http);
}

/**
 * Tells whether the effective transport mode is {@code all}.
 *
 * @param hiveConf configuration consulted when the HIVE_SERVER2_TRANSPORT_MODE environment
 *     variable is not set
 * @return {@code true} if the effective transport mode equals "all", ignoring case
 */
public static boolean isAllTransportMode(HiveConf hiveConf) {
  return isConfiguredTransportMode(hiveConf, HiveServer2TransportMode.all);
}

/**
 * Resolves the effective transport mode and compares it with {@code expected}.
 *
 * <p>The HIVE_SERVER2_TRANSPORT_MODE environment variable takes precedence; the configuration
 * value is used only when the variable is unset.
 */
private static boolean isConfiguredTransportMode(HiveConf hiveConf, HiveServer2TransportMode expected) {
  String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE");
  if (transportMode == null) {
    transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE);
  }
  return transportMode != null && transportMode.equalsIgnoreCase(expected.toString());
}
/**
 * ACLProvider supplying the ACLs for the znodes this server creates. It is assigned in
 * {@code init} from {@code getACLProvider(HiveConf)} and passed to the ZooKeeper helper when the
 * instance is registered in {@code start}.
 */
private ACLProvider zooKeeperAclProvider;
/**
 * Creates the ACL provider for this server's znodes.
 *
 * <p>Whether Kerberos is enabled is evaluated once, when the provider is created. With Kerberos
 * every path gets world-readable access plus all permissions for the authenticated user; without
 * it every path gets the open (world, all permissions) ACL.
 *
 * @param hiveConf configuration used to detect whether ZooKeeper access is kerberized
 * @return a provider returning a new mutable ACL list on each call
 */
private ACLProvider getACLProvider(HiveConf hiveConf) {
  final boolean kerberized = ZookeeperUtils.isKerberosEnabled(hiveConf);
  return new ACLProvider() {
    @Override
    public List<ACL> getDefaultAcl() {
      if (!kerberized) {
        // ACLs for znodes on a non-kerberized cluster:
        // Create/Read/Delete/Write/Admin to the world
        return new ArrayList<>(Ids.OPEN_ACL_UNSAFE);
      }
      // Read all to the world
      List<ACL> acls = new ArrayList<>(Ids.READ_ACL_UNSAFE);
      // Create/Delete/Write/Admin to the authenticated user
      acls.add(new ACL(Perms.ALL, Ids.AUTH_IDS));
      return acls;
    }

    @Override
    public List<ACL> getAclForPath(String path) {
      // Every path uses the default ACL.
      return getDefaultAcl();
    }
  };
}
/**
 * Collects the configuration entries that HiveServer2 publishes to ZooKeeper.
 *
 * @param hiveConf configuration the published values are read from
 * @param confsToPublish map that receives the entries; existing keys are overwritten
 * @param serviceUri "host:port" of this instance, stored under {@link #INSTANCE_URI_CONFIG}
 */
private void addConfsToPublish(HiveConf hiveConf, Map<String, String> confsToPublish, String serviceUri) {
  final boolean httpMode = isHttpTransportMode(hiveConf);
  final boolean allMode = isAllTransportMode(hiveConf);
  final boolean publishHttp = httpMode || allMode;
  final boolean publishBinary = !httpMode || allMode;

  // Hostname
  String bindHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
  confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, bindHost);

  // Hostname:port
  confsToPublish.put(INSTANCE_URI_CONFIG, serviceUri);

  // Transport mode
  String transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE);
  confsToPublish.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, transportMode);

  // Transport specific confs
  if (publishHttp) {
    int httpPort = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT);
    confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, Integer.toString(httpPort));
    String httpPath = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH);
    confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, httpPath);
  }
  if (publishBinary) {
    int binaryPort = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
    confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, Integer.toString(binaryPort));
    String saslQop = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP);
    confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname, saslQop);
  }

  // Auth specific confs
  String authentication = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION);
  confsToPublish.put(ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, authentication);
  if (AuthType.isKerberosAuthMode(hiveConf)) {
    String kerberosPrincipal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
    confsToPublish.put(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname, kerberosPrincipal);
  }

  // SSL conf
  boolean useSsl = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL);
  confsToPublish.put(ConfVars.HIVE_SERVER2_USE_SSL.varname, Boolean.toString(useSsl));
}
/**
 * For a kerberized cluster, dynamically installs the ZooKeeper client's JAAS configuration.
 * Does nothing when Kerberos is not enabled for ZooKeeper.
 *
 * @param hiveConf configuration providing the HiveServer2 Kerberos principal and keytab
 * @throws IOException if the configured principal or keytab is empty
 * @throws Exception if installing the JAAS configuration fails
 */
private static void setUpZooKeeperAuth(HiveConf hiveConf) throws Exception {
  if (!ZookeeperUtils.isKerberosEnabled(hiveConf)) {
    return;
  }

  final String kerberosPrincipal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
  if (kerberosPrincipal.isEmpty()) {
    throw new IOException("HiveServer2 Kerberos principal is empty");
  }

  final String keytabPath = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
  if (keytabPath.isEmpty()) {
    throw new IOException("HiveServer2 Kerberos keytab is empty");
  }

  // Install the JAAS Configuration for the runtime
  Utils.setZookeeperClientKerberosJaasConfig(kerberosPrincipal, keytabPath);
}
/**
 * Reports the current value of this instance's leadership flag.
 *
 * @return {@code true} if the flag is currently set
 */
public boolean isLeader() {
  final boolean leaderNow = isLeader.get();
  return leaderNow;
}
/**
 * Reports how many client sessions are currently open.
 *
 * @return the session manager's open session count, or 0 when the CLI service has not been created
 */
public int getOpenSessionsCount() {
  if (cliService == null) {
    return 0;
  }
  return cliService.getSessionManager().getOpenSessionCount();
}
/** Callback that triggers a failover of this HiveServer2 instance. */
interface FailoverHandler {
/**
 * Performs the failover.
 *
 * @throws Exception if the failover cannot be carried out
 */
void failover() throws Exception;
}
/**
 * {@link FailoverHandler} that delegates to the active/passive HA registry. An instance is handed
 * to the web UI as the "hs2.failover.callback" context attribute.
 */
public static class FailoverHandlerCallback implements FailoverHandler {
  // Assigned once in the constructor and never replaced, hence final.
  private final HS2ActivePassiveHARegistry hs2HARegistry;

  /**
   * @param hs2HARegistry registry whose {@code failover()} is invoked by {@link #failover()}
   */
  FailoverHandlerCallback(HS2ActivePassiveHARegistry hs2HARegistry) {
    this.hs2HARegistry = hs2HARegistry;
  }

  /**
   * Asks the HA registry to fail over.
   *
   * @throws Exception whatever the registry's failover throws
   */
  @Override
  public void failover() throws Exception {
    hs2HARegistry.failover();
  }
}
/**
 * ZooKeeper watcher that stops this server when a 'NodeDeleted' notification arrives while no
 * client session is open. If sessions are still open at that moment, this watcher does nothing
 * further.
 */
public class DeRegisterWatcher extends ZKDeRegisterWatcher {

  /**
   * @param zooKeeperHiveHelper helper passed on to the parent watcher
   */
  public DeRegisterWatcher(ZooKeeperHiveHelper zooKeeperHiveHelper) {
    super(zooKeeperHiveHelper);
  }

  @Override
  public void process(WatchedEvent event) {
    // Let the parent watcher handle the event first.
    super.process(event);

    if (!event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
      return;
    }
    // Only stop the server if there are no more active client sessions.
    if (cliService.getSessionManager().getOpenSessionCount() != 0) {
      return;
    }
    LOG.warn("This instance of HiveServer2 has been removed from the list of server "
        + "instances available for dynamic service discovery. "
        + "The last client session has ended - will shutdown now.");
    HiveServer2.this.stop();
  }
}
/**
 * Tells whether this instance has been deregistered from ZooKeeper.
 *
 * <p>The method may be called even when the instance was never registered. In that case, or when
 * service discovery is off or active/passive HA is on, it returns {@code false} because no
 * deregistration has really happened.
 *
 * @return {@code true} only if service discovery without active/passive HA is in use, a ZooKeeper
 *     helper exists and that helper reports the instance as deregistered
 */
public boolean isDeregisteredWithZooKeeper() {
  final boolean plainServiceDiscovery = serviceDiscovery && !activePassiveHA;
  return plainServiceDiscovery
      && zooKeeperHelper != null
      && zooKeeperHelper.isDeregisteredWithZooKeeper();
}
/**
 * Builds the "host:port" URI identifying this server instance.
 *
 * @return the Thrift service's canonical host name and port number joined by ':'
 * @throws Exception if the Thrift service or its server address is not available yet
 */
public String getServerInstanceURI() throws Exception {
  final boolean addressKnown =
      thriftCLIService != null && thriftCLIService.getServerIPAddress() != null;
  if (!addressKnown) {
    throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
  }
  String canonicalHost = thriftCLIService.getServerIPAddress().getCanonicalHostName();
  return canonicalHost + ":" + thriftCLIService.getPortNumber();
}
/**
 * Returns the canonical host name of the address the Thrift service is bound to.
 *
 * @return the canonical host name of the Thrift service's server address
 * @throws Exception if the Thrift service or its server address is not available yet
 */
public String getServerHost() throws Exception {
  final boolean addressKnown =
      thriftCLIService != null && thriftCLIService.getServerIPAddress() != null;
  if (!addressKnown) {
    throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
  }
  String canonicalHost = thriftCLIService.getServerIPAddress().getCanonicalHostName();
  return canonicalHost;
}
/**
 * Starts the child services and then the optional pieces prepared by {@code init}.
 *
 * <p>Client sessions are allowed immediately unless both service discovery and active/passive HA
 * are enabled. With service discovery, the instance either starts the HA registry (HA mode) or
 * adds itself to ZooKeeper as a znode. Afterwards the privilege synchronizer, the web UI (if one
 * was built) and, for the "tez" execution engine without HA, the Tez sessions are started.
 *
 * @throws ServiceException if ZooKeeper registration, the privilege synchronizer, the web UI or
 *     the Tez sessions fail to start
 */
@Override
public synchronized void start() {
  super.start();
  HiveConf hiveConf = getHiveConf();

  // With service discovery plus active/passive HA, sessions are not allowed here.
  if (!serviceDiscovery || !activePassiveHA) {
    allowClientSessions();
  }

  // If we're supporting dynamic service discovery, we'll add the service uri for this
  // HiveServer2 instance to Zookeeper as a znode.
  if (serviceDiscovery) {
    try {
      if (activePassiveHA) {
        hs2HARegistry.registerLeaderLatchListener(leaderLatchListener, leaderActionsExecutorService);
        hs2HARegistry.start();
        LOG.info("HS2 HA registry started");
      } else {
        boolean publishConfigs =
            hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_PUBLISH_CONFIGS);
        String instanceURI = getServerInstanceURI();
        String znodeData;
        if (publishConfigs) {
          // HiveServer2 configs that this instance will publish to ZooKeeper,
          // so that the clients can read these and configure themselves properly.
          // Reuse instanceURI so the published URI always matches the znode name and the
          // canonical host name is not looked up a second time.
          addConfsToPublish(hiveConf, confsToPublish, instanceURI);
          znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confsToPublish);
        } else {
          znodeData = instanceURI;
        }
        // Add a Znode to the specified ZooKeeper with name: serverUri=host:port;
        // version=versionInfo; sequence=sequenceNumber
        zooKeeperHelper = hiveConf.getZKConfig();
        String znodePathPrefix = "serverUri=" + instanceURI + ";" +
            "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
        zooKeeperHelper.addServerInstanceToZooKeeper(znodePathPrefix, znodeData,
            zooKeeperAclProvider, new DeRegisterWatcher(zooKeeperHelper));
      }
    } catch (Exception e) {
      LOG.error("Error adding this HiveServer2 instance to ZooKeeper: ", e);
      throw new ServiceException(e);
    }
  }

  try {
    startPrivilegeSynchronizer(hiveConf);
  } catch (Exception e) {
    LOG.error("Error starting priviledge synchronizer: ", e);
    throw new ServiceException(e);
  }

  // webServer is null when init() decided that the web UI is disabled.
  if (webServer != null) {
    try {
      webServer.start();
      LOG.info("Web UI has started on port " + webServer.getPort());
    } catch (Exception e) {
      LOG.error("Error starting Web UI: ", e);
      throw new ServiceException(e);
    }
  }

  if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
    if (!activePassiveHA) {
      LOG.info("HS2 interactive HA not enabled. Starting tez sessions..");
      try {
        startOrReconnectTezSessions();
      } catch (Exception e) {
        LOG.error("Error starting Tez sessions: ", e);
        throw new ServiceException(e);
      }
    } else {
      LOG.info("HS2 interactive HA enabled. Tez sessions will be started/reconnected by the leader.");
    }
  }
}
/**
 * Leader latch listener that starts tez sessions and opens the server to clients when this
 * instance gains leadership, and closes hive sessions and stops tez sessions when it loses it.
 */
private static class HS2LeaderLatchListener implements LeaderLatchListener {
  private final HiveServer2 hiveServer2;
  private final SessionState parentSession;

  HS2LeaderLatchListener(final HiveServer2 hs2, final SessionState parentSession) {
    this.hiveServer2 = hs2;
    this.parentSession = parentSession;
  }

  /** Reads the HIVE_IN_TEST flag from the owning server's configuration. */
  private boolean runningInTest() {
    return HiveConf.getBoolVar(hiveServer2.getHiveConf(), ConfVars.HIVE_IN_TEST);
  }

  // Leadership changes are made inside the synchronized LeaderLatch.setLeadership() methods, and
  // notifications are handled by a single threaded executor service, which keeps them ordered.
  // So if leadership is lost while tez sessions are still being created, the notLeader
  // notification simply waits in the executor service's queue.
  @Override
  public void isLeader() {
    LOG.info("HS2 instance {} became the LEADER. Starting/Reconnecting tez sessions..", hiveServer2.serviceUri);
    hiveServer2.isLeader.set(true);
    if (parentSession != null) {
      // Install the parent session state on the current thread before tez sessions are started.
      SessionState.setCurrentSessionState(parentSession);
    }
    hiveServer2.startOrReconnectTezSessions();
    LOG.info("Started/Reconnected tez sessions.");
    hiveServer2.allowClientSessions();
    if (!runningInTest()) {
      return;
    }
    // resolve futures used for testing
    hiveServer2.isLeaderTestFuture.set(true);
    hiveServer2.resetNotLeaderTestFuture();
  }

  @Override
  public void notLeader() {
    LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri);
    hiveServer2.isLeader.set(false);
    hiveServer2.closeAndDisallowHiveSessions();
    hiveServer2.stopOrDisconnectTezSessions();
    LOG.info("Stopped/Disconnected tez sessions.");
    if (!runningInTest()) {
      return;
    }
    // resolve futures used for testing
    hiveServer2.notLeaderTestFuture.set(true);
    hiveServer2.resetIsLeaderTestFuture();
  }
}
/**
 * Fetches the active resource plan and uses it to initialize and start the tez session pool
 * manager and then the workload manager.
 *
 * @throws RuntimeException if the active resource plan cannot be fetched and HIVE_IN_TEST_SSL
 *         is not set
 */
private void startOrReconnectTezSessions() {
  LOG.info("Starting/Reconnecting tez sessions..");
  // TODO: add tez session reconnect after TEZ-3875
  WMFullResourcePlan activePlan = null;
  try {
    activePlan = sessionHive.getActiveResourcePlan();
  } catch (HiveException e) {
    // Ignore errors in SSL tests where the connection is misconfigured (the plan stays null);
    // everywhere else the failure is fatal.
    final boolean inSslTest = HiveConf.getBoolVar(getHiveConf(), ConfVars.HIVE_IN_TEST_SSL);
    if (!inSslTest) {
      throw new RuntimeException(e);
    }
  }
  if (activePlan == null && HiveConf.getBoolVar(getHiveConf(), ConfVars.HIVE_IN_TEST)) {
    LOG.info("Creating a default resource plan for test");
    activePlan = createTestResourcePlan();
  }
  initAndStartTezSessionPoolManager(activePlan);
  initAndStartWorkloadManager(activePlan);
}
/** Calls {@code allowSessions(true)} on the CLI service's session manager. */
private void allowClientSessions() {
  final boolean sessionsAllowed = true;
  cliService.getSessionManager().allowSessions(sessionsAllowed);
}
/**
 * Obtains the tez session pool manager singleton, sets it up in pool or non-pool mode
 * depending on HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, and starts it.
 *
 * @param resourcePlan the active resource plan handed to the pool on start; may be null
 * @throws ServiceException wrapping any failure during setup or start
 */
private void initAndStartTezSessionPoolManager(final WMFullResourcePlan resourcePlan) {
  // The Tez session pool is started from start() so that the parent session state is
  // initialized on the CliService state first; otherwise SessionState.get() can return null
  // during createTezDir.
  try {
    // TezTask would trigger this anyway; doing it early initializes triggers for non-pool
    // tez sessions.
    final String activePlanName = (resourcePlan == null || resourcePlan.getPlan() == null)
        ? "null"
        : resourcePlan.getPlan().getName();
    LOG.info("Initializing tez session pool manager. Active resource plan: {}", activePlanName);
    tezSessionPoolManager = TezSessionPoolManager.getInstance();
    final HiveConf conf = getHiveConf();
    final boolean initDefaultSessions =
        conf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS);
    if (!initDefaultSessions) {
      tezSessionPoolManager.setupNonPool(conf);
    } else {
      tezSessionPoolManager.setupPool(conf);
    }
    tezSessionPoolManager.startPool(conf, resourcePlan);
    LOG.info("Tez session pool manager initialized.");
  } catch (Exception e) {
    throw new ServiceException("Unable to setup tez session pool", e);
  }
}
/**
 * Creates and starts the workload manager for the configured queue, or only logs that
 * workload management is disabled when no queue is configured.
 *
 * @param resourcePlan the active resource plan passed to the workload manager; may be null
 * @throws ServiceException if the workload manager cannot be created or started
 */
private void initAndStartWorkloadManager(final WMFullResourcePlan resourcePlan) {
  if (StringUtils.isEmpty(wmQueue)) {
    LOG.info("Workload management is not enabled as {} config is not set",
        ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE.varname);
    return;
  }
  // A queue is configured: initialize workload management.
  final String activePlanName = (resourcePlan == null || resourcePlan.getPlan() == null)
      ? "null"
      : resourcePlan.getPlan().getName();
  LOG.info("Initializing workload management. Active resource plan: {}", activePlanName);
  try {
    wm = WorkloadManager.create(wmQueue, getHiveConf(), resourcePlan);
    wm.start();
    LOG.info("Workload manager initialized.");
  } catch (Exception e) {
    throw new ServiceException("Unable to instantiate and start Workload Manager", e);
  }
}
/**
 * Stops the session manager from accepting new sessions and then closes every session it
 * currently reports. Does nothing beyond logging when the CLI service has not been created.
 *
 * <p>Closing is best-effort per session: a failure to close one session is logged and the
 * remaining sessions are still closed, instead of the first failure aborting the whole sweep.
 */
private void closeAndDisallowHiveSessions() {
  LOG.info("Closing all open hive sessions.");
  if (cliService == null) {
    return;
  }
  cliService.getSessionManager().allowSessions(false);
  // No sessions can be opened after the above call. Close the existing ones if any.
  int failedToClose = 0;
  for (HiveSession session : cliService.getSessionManager().getSessions()) {
    try {
      cliService.getSessionManager().closeSession(session.getSessionHandle());
    } catch (HiveSQLException e) {
      // Keep going: one stuck session must not leave all the others open.
      failedToClose++;
      LOG.error("Unable to close session {}.", session.getSessionHandle(), e);
    }
  }
  if (failedToClose == 0) {
    LOG.info("Closed all open hive sessions");
  } else {
    LOG.error("Unable to close all open sessions. {} session(s) could not be closed.", failedToClose);
  }
}
/**
 * Stops the tez session pool manager and then the workload manager, when they exist.
 * Failures are logged and never propagated.
 */
private void stopOrDisconnectTezSessions() {
  LOG.info("Stopping/Disconnecting tez sessions.");
  stopTezSessionPoolManagerQuietly();
  stopWorkloadManagerQuietly();
}

/** Stops the tez session pool manager if one was created, logging any failure. */
private void stopTezSessionPoolManagerQuietly() {
  // A session pool manager instance is expected by now; if there is none,
  // skipping it is fine while HiveServer2 is stopping.
  if (tezSessionPoolManager == null) {
    return;
  }
  try {
    tezSessionPoolManager.stop();
    LOG.info("Stopped tez session pool manager.");
  } catch (Exception e) {
    LOG.error("Error while stopping tez session pool manager.", e);
  }
}

/** Stops the workload manager if one was created, logging any failure. */
private void stopWorkloadManagerQuietly() {
  if (wm == null) {
    return;
  }
  try {
    wm.stop();
    LOG.info("Stopped workload manager.");
  } catch (Exception e) {
    LOG.error("Error while stopping workload manager.", e);
  }
}
/**
 * Decommissions HiveServer2. The intended consequence is that SessionManager stops opening
 * new sessions and OperationManager refuses to run new queries, while queries that are
 * already running are not affected.
 *
 * <p>This override first removes the server's znode from ZooKeeper (when a ZooKeeper helper
 * exists and the server is not already deregistered), then deletes the
 * {@code hiveserver2.pid} file from the directory named by {@code HIVESERVER2_PID_DIR}
 * (falling back to {@code HIVE_CONF_DIR}), and finally delegates to
 * {@code super.decommission()}.
 */
public synchronized void decommission() {
  LOG.info("Decommissioning HiveServer2");
  // Remove this server instance from ZooKeeper if dynamic service discovery is set
  final boolean stillRegistered = zooKeeperHelper != null && !isDeregisteredWithZooKeeper();
  if (stillRegistered) {
    try {
      zooKeeperHelper.removeServerInstanceFromZooKeeper();
    } catch (Exception e) {
      LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e);
    }
  }
  final String pidDirOverride = System.getenv("HIVESERVER2_PID_DIR");
  final String confDir = System.getenv("HIVE_CONF_DIR");
  final String pidDir = StringUtils.defaultIfEmpty(pidDirOverride, confDir);
  if (!StringUtils.isEmpty(pidDir)) {
    final File pidFile = new File(pidDir, "hiveserver2.pid");
    LOG.info("Deleting the tmp HiveServer2 pid file: {}", pidFile);
    FileUtils.deleteQuietly(pidFile);
  }
  super.decommission();
}
/**
 * Decommissions the server, waits for live operations to finish for up to the configured
 * graceful-stop timeout minus 30 seconds, and then always calls {@link #stop()}.
 *
 * <p>If the waiting thread is interrupted, the wait ends early and the thread's interrupt
 * flag is restored before stopping.
 */
public synchronized void graceful_stop() {
  try {
    decommission();
    // Need 30s for stop() to release server's resources
    final long waitBudgetMs = HiveConf.getTimeVar(getHiveConf(),
        HiveConf.ConfVars.HIVE_SERVER2_GRACEFUL_STOP_TIMEOUT, TimeUnit.MILLISECONDS) - 30000;
    final long waitStartMs = System.currentTimeMillis();
    try {
      // decommission() would have thrown IllegalStateException if the service had not been
      // started, so neither cliService nor its sessionManager should be null at this point.
      for (long remainingMs = waitBudgetMs;
          remainingMs > 0 && !getCliService().getSessionManager().getOperations().isEmpty();
          remainingMs = waitBudgetMs + waitStartMs - System.currentTimeMillis()) {
        // Polling with a short sleep is cheap here: at most 100ms is wasted once the
        // OperationManager is done, and this path only runs while HS2 is being terminated.
        Thread.sleep(Math.min(100, remainingMs));
      }
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while waiting for all live operations to be done");
      Thread.currentThread().interrupt();
    }
    LOG.info("Spent {}ms waiting for live operations to be done, current live operations: {}",
        System.currentTimeMillis() - waitStartMs,
        getCliService().getSessionManager().getOperations().size());
  } finally {
    stop();
  }
}
@Override
public synchronized void stop() {
LOG.info("Shutting down HiveServer2");
HiveConf hiveConf = this.getHiveConf();
super.stop();
if (scheduledQueryService != null) {
try {
scheduledQueryService.close();
} catch (Exception e) {
LOG.error("Error stopping schq", e);
}
}
//Shutdown metric collection
MetricSink.getInstance().tearDown();
if (hs2HARegistry != null) {
hs2HARegistry.stop();
shutdownExecutor(leaderActionsExecutorService);
LOG.info("HS2 HA registry stopped");
hs2HARegistry = null;
}
if (webServer != null) {
try {
webServer.stop();
LOG.info("Web UI has stopped");
} catch (Exception e) {
LOG.error("Error stopping Web UI: ", e);
}
}
// Shutdown Metrics
if (MetricsFactory.getInstance() != null) {
try {
MetricsFactory.close();
} catch (Exception e) {
LOG.error("error in Metrics deinit: " + e.getClass().getName() + " "
+ e.getMessage(), e);
}
}
// Remove this server instance from ZooKeeper if dynamic service discovery is set
if (serviceDiscovery && !activePassiveHA) {
try {
if (zooKeeperHelper != null) {
zooKeeperHelper.removeServerInstanceFromZooKeeper();