-
Notifications
You must be signed in to change notification settings - Fork 497
/
impala-server.cc
3266 lines (2981 loc) · 143 KB
/
impala-server.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "service/impala-server.h"
#include <netdb.h>
#include <unistd.h>
#include <algorithm>
#include <exception>
#include <fstream>
#include <sstream>
#ifdef CALLONCEHACK
#include <calloncehack.h>
#endif
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/filesystem.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/unordered_set.hpp>
#include <gperftools/malloc_extension.h>
#include <gutil/strings/numbers.h>
#include <gutil/strings/split.h>
#include <gutil/strings/substitute.h>
#include <gutil/walltime.h>
#include <openssl/err.h>
#include <openssl/evp.h>
#include <rapidjson/rapidjson.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <rapidjson/error/en.h>
#include <sys/socket.h>
#include <sys/types.h>
#include "catalog/catalog-server.h"
#include "catalog/catalog-util.h"
#include "common/compiler-util.h"
#include "common/logging.h"
#include "common/object-pool.h"
#include "common/thread-debug-info.h"
#include "common/version.h"
#include "exec/external-data-source-executor.h"
#include "exprs/timezone_db.h"
#include "gen-cpp/CatalogService_constants.h"
#include "gen-cpp/admission_control_service.proxy.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/security/security_flags.h"
#include "kudu/util/random_util.h"
#include "rpc/authentication.h"
#include "rpc/rpc-mgr.h"
#include "rpc/rpc-trace.h"
#include "rpc/thrift-thread.h"
#include "rpc/thrift-util.h"
#include "runtime/client-cache.h"
#include "runtime/coordinator.h"
#include "runtime/exec-env.h"
#include "runtime/lib-cache.h"
#include "runtime/query-driver.h"
#include "runtime/timestamp-value.h"
#include "runtime/timestamp-value.inline.h"
#include "runtime/tmp-file-mgr.h"
#include "scheduling/admission-control-service.h"
#include "scheduling/admission-controller.h"
#include "service/cancellation-work.h"
#include "service/client-request-state.h"
#include "service/frontend.h"
#include "service/impala-http-handler.h"
#include "util/auth-util.h"
#include "util/bit-util.h"
#include "util/coding-util.h"
#include "util/debug-util.h"
#include "util/error-util.h"
#include "util/histogram-metric.h"
#include "util/impalad-metrics.h"
#include "util/jwt-util.h"
#include "util/metrics.h"
#include "util/network-util.h"
#include "util/openssl-util.h"
#include "util/parse-util.h"
#include "util/pretty-printer.h"
#include "util/redactor.h"
#include "util/runtime-profile-counters.h"
#include "util/runtime-profile.h"
#include "util/simple-logger.h"
#include "util/string-parser.h"
#include "util/summary-util.h"
#include "util/test-info.h"
#include "util/time.h"
#include "util/uid-util.h"
#include "gen-cpp/Types_types.h"
#include "gen-cpp/ImpalaService.h"
#include "gen-cpp/DataSinks_types.h"
#include "gen-cpp/ImpalaService_types.h"
#include "gen-cpp/LineageGraph_types.h"
#include "gen-cpp/Frontend_types.h"
#include "common/names.h"
using boost::adopt_lock_t;
using boost::algorithm::is_any_of;
using boost::algorithm::istarts_with;
using boost::algorithm::join;
using boost::algorithm::replace_all_copy;
using boost::algorithm::split;
using boost::algorithm::token_compress_on;
using boost::get_system_time;
using boost::system_time;
using boost::uuids::random_generator;
using boost::uuids::uuid;
using google::protobuf::RepeatedPtrField;
using kudu::GetRandomSeed32;
using kudu::rpc::RpcContext;
using kudu::security::SecurityDefaults;
using namespace apache::hive::service::cli::thrift;
using namespace apache::thrift;
using namespace apache::thrift::transport;
using namespace beeswax;
using namespace boost::posix_time;
using namespace rapidjson;
using namespace strings;
DECLARE_string(nn);
DECLARE_int32(nn_port);
DECLARE_string(authorized_proxy_user_config);
DECLARE_string(authorized_proxy_user_config_delimiter);
DECLARE_string(authorized_proxy_group_config);
DECLARE_string(authorized_proxy_group_config_delimiter);
DECLARE_string(debug_actions);
DECLARE_bool(abort_on_config_error);
DECLARE_bool(disk_spill_encryption);
DECLARE_bool(enable_ldap_auth);
DECLARE_bool(gen_experimental_profile);
DECLARE_bool(use_local_catalog);
// Deprecated Beeswax client-facing port. NOTE: adjacent string literals are
// concatenated, so the first literal must end with a space (previously the
// rendered help text read "served.If 0 or less...").
DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are served. "
    "If 0 or less, the Beeswax server is not started. This interface is deprecated and "
    "will be removed in a future version.");
// HiveServer2 client-facing port. NOTE: a separator space was added at the
// string-literal join (previously the help text read "served.If 0 or less...").
DEFINE_int32(hs2_port, 21050, "port on which HiveServer2 client requests are served. "
    "If 0 or less, the HiveServer2 server is not started.");
DEFINE_int32(hs2_http_port, 28000, "port on which HiveServer2 HTTP(s) client "
    "requests are served. If 0 or less, the HiveServer2 http server is not started.");
// NOTE(review): per the help text, this port currently performs no
// authentication at all — confirm deployment guidance before enabling.
DEFINE_int32(external_fe_port, 0, "port on which External Frontend requests are served. "
    "If 0 or less, the External Frontend server is not started. Careful consideration "
    "must be taken when enabling due to the fact that this port is currently always "
    "unauthenticated.");
// Size of the thrift worker pool shared by the client-facing services.
DEFINE_int32(fe_service_threads, 64,
    "number of threads available to serve client requests");
DEFINE_string(default_query_options, "", "key=value pair of default query options for"
    " impalad, separated by ','");
// --- In-memory query log retention (serves the debug web UI) ---
DEFINE_int32(query_log_size, 100, "Number of queries to retain in the query log. If -1, "
    "the query log has unbounded size.");
DEFINE_int32(query_stmt_size, 250, "length of the statements in the query log. If <=0, "
    "the full statement is displayed in the query log without trimming.");
DEFINE_bool(log_query_to_file, true, "if true, logs completed query profiles to file.");
DEFINE_int64(max_result_cache_size, 100000L, "Maximum number of query results a client "
    "may request to be cached on a per-query basis to support restarting fetches. This "
    "option guards against unreasonably large result caches requested by clients. "
    "Requests exceeding this maximum will be rejected.");
// --- Audit event logging (enabled by setting --audit_event_log_dir) ---
DEFINE_int32(max_audit_event_log_file_size, 5000, "The maximum size (in queries) of the "
    "audit event log file before a new one is created (if event logging is enabled)");
DEFINE_string(audit_event_log_dir, "", "The directory in which audit event log files are "
    "written. Setting this flag will enable audit event logging.");
DEFINE_bool(abort_on_failed_audit_event, true, "Shutdown Impala if there is a problem "
    "recording an audit event.");
DEFINE_int32(max_audit_event_log_files, 0, "Maximum number of audit event log files "
    "to retain. The most recent audit event log files are retained. If set to 0, "
    "all audit event log files are retained.");
// --- Lineage event logging (enabled by setting --lineage_event_log_dir) ---
DEFINE_int32(max_lineage_log_file_size, 5000, "The maximum size (in queries) of "
    "the lineage event log file before a new one is created (if lineage logging is "
    "enabled)");
// Setting a directory here enables lineage logging (see
// ImpalaServer::IsLineageLoggingEnabled()). Help-text typo fixed:
// "with enable" -> "will enable".
DEFINE_string(lineage_event_log_dir, "", "The directory in which lineage event log "
    "files are written. Setting this flag will enable lineage logging.");
DEFINE_bool(abort_on_failed_lineage_event, true, "Shutdown Impala if there is a problem "
    "recording a lineage record.");
// --- Query profile archival ---
DEFINE_string(profile_log_dir, "", "The directory in which profile log files are"
    " written. If blank, defaults to <log_file_dir>/profiles");
DEFINE_int32(max_profile_log_file_size, 5000, "The maximum size (in queries) of the "
    "profile log file before a new one is created");
DEFINE_int32(max_profile_log_files, 10, "Maximum number of profile log files to "
    "retain. The most recent log files are retained. If set to 0, all log files "
    "are retained.");
// --- Background worker pool sizing ---
DEFINE_int32(cancellation_thread_pool_size, 5,
    "(Advanced) Size of the thread-pool processing cancellations due to node failure");
DEFINE_int32(unregistration_thread_pool_size, 4,
    "(Advanced) Size of the thread-pool for unregistering queries, including "
    "finalizing runtime profiles");
// Limit the number of queries that can be queued for unregistration to avoid holding
// too many queries in memory unnecessarily. The default is set fairly low so that if
// queries are finishing faster than they can be unregistered, there will be backpressure
// on query execution before too much memory fills up with queries pending unregistration.
DEFINE_int32(unregistration_thread_pool_queue_depth, 16,
    "(Advanced) Max number of queries that can be queued for unregistration.");
// --- TLS configuration for the client-facing thrift services ---
DEFINE_string(ssl_server_certificate, "", "The full path to the SSL certificate file used"
    " to authenticate Impala to clients. If set, both Beeswax and HiveServer2 ports will "
    "only accept SSL connections");
DEFINE_string(ssl_private_key, "", "The full path to the private key used as a "
    "counterpart to the public key contained in --ssl_server_certificate. If "
    "--ssl_server_certificate is set, this option must be set as well.");
DEFINE_string(ssl_client_ca_certificate, "", "(Advanced) The full path to a certificate "
    "used by Thrift clients to check the validity of a server certificate. May either be "
    "a certificate for a third-party Certificate Authority, or a copy of the certificate "
    "the client expects to receive from the server.");
DEFINE_string(ssl_private_key_password_cmd, "", "A Unix command whose output returns the "
    "password used to decrypt the certificate private key file specified in "
    "--ssl_private_key. If the .PEM key file is not password-protected, this command "
    "will not be invoked. The output of the command will be truncated to 1024 bytes, and "
    "then all trailing whitespace will be trimmed before it is used to decrypt the "
    "private key");
// This defaults ssl_cipher_list to the same set of ciphers used by Kudu,
// which is based on Mozilla's intermediate compatibility recommendations
// from https://wiki.mozilla.org/Security/Server_Side_TLS
DEFINE_string(ssl_cipher_list, SecurityDefaults::kDefaultTlsCiphers,
    "The cipher suite preferences to use for TLS-secured "
    "Thrift RPC connections. Uses the OpenSSL cipher preference list format. See man (1) "
    "ciphers for more information. If empty, the default cipher list for your platform "
    "is used");
// TLSv1.3 cipher suites (distinct from the TLSv1.2-and-below --ssl_cipher_list).
// Help-text typo fixed: "ciper" -> "cipher".
DEFINE_string(tls_ciphersuites,
    kudu::security::SecurityDefaults::kDefaultTlsCipherSuites,
    "The TLSv1.3 cipher suites to use for TLS-secured Thrift RPC and KRPC connections. "
    "TLSv1.3 uses a new way to specify cipher suites that is independent of the older "
    "TLSv1.2 and below cipher lists. See 'man (1) ciphers' for more information. "
    "This flag is only effective if Impala is built with OpenSSL v1.1.1 or newer.");
// Shared help text so the default below stays consistent with the description.
const string SSL_MIN_VERSION_HELP = "The minimum SSL/TLS version that Thrift "
    "services should use for both client and server connections. Supported versions are "
    "TLSv1.0, TLSv1.1 and TLSv1.2 (as long as the system OpenSSL library supports them)";
DEFINE_string(ssl_minimum_version, "tlsv1.2", SSL_MIN_VERSION_HELP.c_str());
// --- Session / query idle timeouts ---
DEFINE_int32(idle_session_timeout, 0, "The time, in seconds, that a session may be idle"
    " for before it is closed (and all running queries cancelled) by Impala. If 0, idle"
    " sessions are never expired. It can be overridden by the query option"
    " 'idle_session_timeout' for specific sessions");
DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be idle for"
    " (i.e. no processing work is done and no updates are received from the client) "
    "before it is cancelled. If 0, idle queries are never expired. The query option "
    "QUERY_TIMEOUT_S overrides this setting, but, if set, --idle_query_timeout represents"
    " the maximum allowable timeout.");
DEFINE_int32(disconnected_session_timeout, 15 * 60, "The time, in seconds, that a "
    "hiveserver2 session will be maintained after the last connection that it has been "
    "used over is disconnected.");
DEFINE_int32(idle_client_poll_period_s, 30, "The poll period, in seconds, after "
    "no activity from an Impala client which an Impala service thread (beeswax and HS2) "
    "wakes up to check if the connection should be closed. If --idle_session_timeout is "
    "also set, a client connection will be closed if all the sessions associated with it "
    "have become idle. Set this to 0 to disable the polling behavior and clients' "
    "connection will remain opened until they are explicitly closed.");
// --- Backend status reporting (see the consistency checks in the constructor) ---
DEFINE_int32(status_report_interval_ms, 5000, "(Advanced) Interval between profile "
    "reports in milliseconds. If set to <= 0, periodic reporting is disabled and only "
    "the final report is sent.");
DEFINE_int32(status_report_max_retry_s, 600, "(Advanced) Max amount of time in seconds "
    "for a backend to attempt to send a status report before cancelling. This must be > "
    "--status_report_interval_ms. Effective only if --status_report_interval_ms > 0.");
DEFINE_int32(status_report_cancellation_padding, 20, "(Advanced) The coordinator will "
    "wait --status_report_max_retry_s * (1 + --status_report_cancellation_padding / 100) "
    "without receiving a status report before deciding that a backend is unresponsive "
    "and the query should be cancelled. This must be > 0.");
// --- Daemon role and executor group membership ---
DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and coordinate "
    "queries from clients. If false, it will refuse client connections.");
DEFINE_bool(is_executor, true, "If true, this Impala daemon will execute query "
    "fragments.");
// Parsed by SetExecutorGroups() below; currently limited to a single group.
DEFINE_string(executor_groups, "",
    "List of executor groups, separated by comma. Each executor group specification can "
    "optionally contain a minimum size, separated by a ':', e.g. --executor_groups "
    "default-pool-1:3. Default minimum size is 1. Only when the cluster membership "
    "contains at least that number of executors for the group will it be considered "
    "healthy for admission. Currently only a single group may be specified.");
DEFINE_int32(num_expected_executors, 20,
    "The number of executors that are expected to "
    "be available for the execution of a single query. This value is used during "
    "planning if no executors have started yet. Once a healthy executor group has "
    "started, its size is used instead. NOTE: This flag is overridden by "
    "'expected_executor_group_sets' which is a more expressive way of specifying "
    "multiple executor group sets");
DEFINE_string(expected_executor_group_sets, "",
    "Only used by the coordinator. List of expected executor group sets, separated by "
    "comma in the following format: <executor_group_name_prefix>:<expected_group_size> . "
    "For eg. “prefix1:10”, this set will include executor groups named like "
    "prefix1-group1, prefix1-group2, etc. The expected group size (number of executors "
    "in each group) is used during planning when no healthy executor group is available. "
    "If this flag is used then any executor groups that do not map to the specified group"
    " sets will never be used to schedule queries.");
// --- Graceful shutdown ---
// TODO: can we automatically choose a startup grace period based on the max admission
// control queue timeout + some margin for error?
DEFINE_int64(shutdown_grace_period_s, 120, "Shutdown startup grace period in seconds. "
    "When the shutdown process is started for this daemon, it will wait for at least the "
    "startup grace period before shutting down. This gives time for updated cluster "
    "membership information to propagate to all coordinators and for fragment instances "
    "that were scheduled based on old cluster membership to start executing (and "
    "therefore be reflected in the metrics used to detect quiescence).");
DEFINE_int64(shutdown_deadline_s, 60 * 60, "Default time limit in seconds for the shut "
    "down process. If this duration elapses after the shut down process is started, "
    "the daemon shuts down regardless of any running queries.");
#ifndef NDEBUG
// Debug-build-only fault injection. A separator space was added at the
// string-literal join (previously the help text read "Simulates metadata
// loadingfor a given query...").
DEFINE_int64(stress_metadata_loading_pause_injection_ms, 0, "Simulates metadata loading "
    "for a given query by injecting a sleep equivalent to this configuration in "
    "milliseconds. Only used for testing.");
#endif
// Timeout for connections sitting in the post-accept, pre-setup queue.
DEFINE_int64(accepted_client_cnxn_timeout, 300000,
    "(Advanced) The amount of time in milliseconds an accepted connection will wait in "
    "the post-accept, pre-setup connection queue before it is timed out and the "
    "connection request is rejected. A value of 0 means there is no timeout.");
// --- Java QueryEventHook integration ---
DEFINE_string(query_event_hook_classes, "", "Comma-separated list of java QueryEventHook "
    "implementation classes to load and register at Impala startup. Class names should "
    "be fully-qualified and on the classpath. Whitespace acceptable around delimiters.");
DEFINE_int32(query_event_hook_nthreads, 1, "Number of threads to use for "
    "QueryEventHook execution. If this number is >1 then hooks will execute "
    "concurrently.");
// Dumps used for debugging and diffing ExecRequests in text form.
DEFINE_string(dump_exec_request_path, "",
    "If set, dump TExecRequest structures to {dump_exec_request_path}/"
    "TExecRequest-{internal|external}.{query_id.hi}-{query_id.lo}");
DECLARE_bool(compact_catalog_topic);
// Controls the time zone used for TIMESTAMP <-> Unix time conversions.
// Help-text typo fixed: "overriden" -> "overridden".
DEFINE_bool(use_local_tz_for_unix_timestamp_conversions, false,
    "When true, TIMESTAMPs are interpreted in the local time zone when converting to "
    "and from Unix times. When false, TIMESTAMPs are interpreted in the UTC time zone. "
    "Set to true for Hive compatibility. "
    "Can be overridden with the query option with the same name.");
// Provide a workaround for IMPALA-1658.
// Help-text typo fixed: "overriden" -> "overridden".
DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
    "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
    "be converted from UTC to local time. Writes are unaffected. "
    "Can be overridden with the query option with the same name.");
// Heartbeat cadence used by AdmissionHeartbeatThread (started in the
// constructor when the admission service is enabled).
DEFINE_int32(admission_heartbeat_frequency_ms, 1000,
    "(Advanced) The time in milliseconds to wait between sending heartbeats to the "
    "admission service, if enabled. Heartbeats are used to ensure resources are properly "
    "accounted for even if rpcs to the admission service occasionally fail.");
DEFINE_bool(auto_check_compaction, false,
    "When true, compaction checking will be conducted for each query in local catalog "
    "mode. Note that this checking introduces additional overhead because Impala makes "
    "additional RPCs to hive metastore for each table in a query during the query "
    "compilation.");
// Flags for JWT token based authentication.
DECLARE_bool(jwt_token_auth);
DECLARE_bool(jwt_validate_signature);
DECLARE_string(jwks_file_path);
DECLARE_string(jwks_url);
namespace {
using namespace impala;
void SetExecutorGroups(const string& flag, BackendDescriptorPB* be_desc) {
vector<StringPiece> groups;
groups = Split(flag, ",", SkipEmpty());
if (groups.empty()) groups.push_back(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
DCHECK_EQ(1, groups.size());
// Name and optional minimum group size are separated by ':'.
for (const StringPiece& group : groups) {
int colon_idx = group.find_first_of(':');
ExecutorGroupDescPB* group_desc = be_desc->add_executor_groups();
group_desc->set_name(group.substr(0, colon_idx).as_string());
if (colon_idx != StringPiece::npos) {
StringParser::ParseResult result;
group_desc->set_min_size(StringParser::StringToInt<int64_t>(
group.data() + colon_idx + 1, group.length() - colon_idx - 1, &result));
if (result != StringParser::PARSE_SUCCESS) {
LOG(FATAL) << "Failed to parse minimum executor group size from group: "
<< group.ToString();
}
} else {
group_desc->set_min_size(1);
}
}
}
} // end anonymous namespace
namespace impala {
// Prefix of profile, event and lineage log filenames. The version number is
// internal, and does not correspond to an Impala release - it should
// be changed only when the file format changes.
//
// In the 1.0 version of the profile log, the timestamp at the beginning of each entry
// was relative to the local time zone. In log version 1.1, this was changed to be
// relative to UTC. The same time zone change was made for the audit log, but the
// version was kept at 1.0 because there is no known consumer of the timestamp.
const string PROFILE_LOG_FILE_PREFIX = "impala_profile_log_1.1-";
const string AUDIT_EVENT_LOG_FILE_PREFIX = "impala_audit_event_log_1.0-";
const string LINEAGE_LOG_FILE_PREFIX = "impala_lineage_log_1.0-";
// Max queued entries in the cancellation thread pool; set high enough that it
// should never fill (see the comment where the pool is created below).
const uint32_t MAX_CANCELLATION_QUEUE_SIZE = 65536;
// Names used when registering the client-facing thrift services.
const string BEESWAX_SERVER_NAME = "beeswax-frontend";
const string HS2_SERVER_NAME = "hiveserver2-frontend";
const string HS2_HTTP_SERVER_NAME = "hiveserver2-http-frontend";
// Group used when --executor_groups does not name one (see SetExecutorGroups).
const string ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME = "default";
// Standard SQLSTATE codes returned to clients.
const char* ImpalaServer::SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION = "42000";
const char* ImpalaServer::SQLSTATE_GENERAL_ERROR = "HY000";
const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
// Substitute() template: $0 = original failure detail, $1 = retry query id.
const char* ImpalaServer::GET_LOG_QUERY_RETRY_INFO_FORMAT =
    "Original query failed:\n$0\nQuery has been retried using query id: $1\n";
// Interval between checks for query expiration.
const int64_t EXPIRATION_CHECK_INTERVAL_MS = 1000L;
// Template to return error messages for client requests that could not be found, belonged
// to the wrong session, or had a mismatched secret. We need to use this particular string
// in some places because the shell has a regex for it.
// TODO: Make consistent "Invalid or unknown query handle: $0" template used elsewhere.
// TODO: this should be turned into a proper error code and used throughout ImpalaServer.
static const char* LEGACY_INVALID_QUERY_HANDLE_TEMPLATE = "Query id $0 not found.";
// Shared RNG used e.g. for generating session/query secrets.
ThreadSafeRandom ImpalaServer::rng_(GetRandomSeed32());
/// Constructs the Impala server: validates frontend settings and scratch
/// directories (exiting if --abort_on_config_error is set), initializes the
/// profile/audit/lineage loggers, parses proxy user/group delegation flags,
/// performs one-time OpenSSL init for disk-spill encryption, registers the
/// statestore catalog-topic callback (real-cluster coordinators only), and
/// starts the background worker pools and maintenance threads.
///
/// Fixes in this revision: the cancellation-padding error message named a
/// nonexistent flag ("--status_report_cancellationn_padding"); the proxy-user
/// error message lacked a space after "configuration." (now consistent with
/// the proxy-group variant); the inner 'Status status' locals shadowed the
/// outer declaration and were renamed.
ImpalaServer::ImpalaServer(ExecEnv* exec_env)
  : exec_env_(exec_env),
    services_started_(false) {
  // Initialize default config
  InitializeConfigVariables();

  Status status = exec_env_->frontend()->ValidateSettings();
  if (!status.ok()) {
    LOG(ERROR) << status.GetDetail();
    if (FLAGS_abort_on_config_error) {
      CLEAN_EXIT_WITH_ERROR(
          "Aborting Impala Server startup due to improper configuration");
    }
  }

  status = exec_env_->tmp_file_mgr()->Init(exec_env_->metrics());
  if (!status.ok()) {
    LOG(ERROR) << status.GetDetail();
    if (FLAGS_abort_on_config_error) {
      CLEAN_EXIT_WITH_ERROR("Aborting Impala Server startup due to improperly "
          "configured scratch directories.");
    }
  }

  // Profile-log init failure is non-fatal: archival is simply disabled.
  if (!InitProfileLogging().ok()) {
    LOG(ERROR) << "Query profile archival is disabled";
    FLAGS_log_query_to_file = false;
  }

  if (!InitAuditEventLogging().ok()) {
    CLEAN_EXIT_WITH_ERROR("Aborting Impala Server startup due to failure initializing "
        "audit event logging");
  }

  if (!InitLineageLogging().ok()) {
    CLEAN_EXIT_WITH_ERROR("Aborting Impala Server startup due to failure initializing "
        "lineage logging");
  }

  if (!FLAGS_authorized_proxy_user_config.empty()) {
    Status proxy_status = PopulateAuthorizedProxyConfig(
        FLAGS_authorized_proxy_user_config,
        FLAGS_authorized_proxy_user_config_delimiter, &authorized_proxy_user_config_);
    if (!proxy_status.ok()) {
      CLEAN_EXIT_WITH_ERROR(Substitute("Invalid proxy user configuration. "
          "No mapping value specified for the proxy user. For more information review "
          "usage of the --authorized_proxy_user_config flag: $0",
          proxy_status.GetDetail()));
    }
  }

  if (!FLAGS_authorized_proxy_group_config.empty()) {
    Status proxy_status = PopulateAuthorizedProxyConfig(
        FLAGS_authorized_proxy_group_config,
        FLAGS_authorized_proxy_group_config_delimiter, &authorized_proxy_group_config_);
    if (!proxy_status.ok()) {
      CLEAN_EXIT_WITH_ERROR(Substitute("Invalid proxy group configuration. "
          "No mapping value specified for the proxy group. For more information review "
          "usage of the --authorized_proxy_group_config flag: $0",
          proxy_status.GetDetail()));
    }
  }

  if (FLAGS_disk_spill_encryption) {
    // Initialize OpenSSL for spilling encryption. This is not thread-safe so we
    // initialize it once on startup.
    // TODO: Set OpenSSL callbacks to provide locking to make the library thread-safe.
    OpenSSL_add_all_algorithms();
    ERR_load_crypto_strings();
  }

  ABORT_IF_ERROR(ExternalDataSourceExecutor::InitJNI(exec_env_->metrics()));

  // Register the catalog update callback if running in a real cluster as a coordinator.
  if (!TestInfo::is_test() && FLAGS_is_coordinator) {
    auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
        vector<TTopicDelta>* topic_updates) {
      this->CatalogUpdateCallback(state, topic_updates);
    };
    // The 'local-catalog' implementation only needs minimal metadata to
    // trigger cache invalidations.
    // The legacy implementation needs full metadata objects.
    string filter_prefix = FLAGS_use_local_catalog ?
        g_CatalogService_constants.CATALOG_TOPIC_V2_PREFIX :
        g_CatalogService_constants.CATALOG_TOPIC_V1_PREFIX;
    ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
        CatalogServer::IMPALA_CATALOG_TOPIC, /* is_transient=*/ true,
        /* populate_min_subscriber_topic_version=*/ true,
        filter_prefix, catalog_cb));
  }

  // Initialise the cancellation thread pool with 5 (by default) threads. The max queue
  // size is deliberately set so high that it should never fill; if it does the
  // cancellations will get ignored and retried on the next statestore heartbeat.
  cancellation_thread_pool_.reset(new ThreadPool<CancellationWork>(
      "impala-server", "cancellation-worker",
      FLAGS_cancellation_thread_pool_size, MAX_CANCELLATION_QUEUE_SIZE,
      bind<void>(&ImpalaServer::CancelFromThreadPool, this, _2)));
  ABORT_IF_ERROR(cancellation_thread_pool_->Init());

  unreg_thread_pool_.reset(new ThreadPool<QueryHandle>("impala-server",
      "unregistration-worker", FLAGS_unregistration_thread_pool_size,
      FLAGS_unregistration_thread_pool_queue_depth,
      bind<void>(&ImpalaServer::FinishUnregisterQuery, this, _2)));
  ABORT_IF_ERROR(unreg_thread_pool_->Init());

  // Initialize a session expiry thread which blocks indefinitely until the first session
  // with non-zero timeout value is opened. Note that a session which doesn't specify any
  // idle session timeout value will use the default value FLAGS_idle_session_timeout.
  ABORT_IF_ERROR(Thread::Create("impala-server", "session-maintenance",
      bind<void>(&ImpalaServer::SessionMaintenance, this), &session_maintenance_thread_));

  ABORT_IF_ERROR(Thread::Create("impala-server", "query-expirer",
      bind<void>(&ImpalaServer::ExpireQueries, this), &query_expiration_thread_));

  // Only enable the unresponsive backend thread if periodic status reporting is enabled.
  if (FLAGS_status_report_interval_ms > 0) {
    if (FLAGS_status_report_max_retry_s * 1000 <= FLAGS_status_report_interval_ms) {
      const string& err = "Since --status_report_max_retry_s <= "
          "--status_report_interval_ms, most queries will likely be cancelled.";
      LOG(ERROR) << err;
      if (FLAGS_abort_on_config_error) {
        CLEAN_EXIT_WITH_ERROR(Substitute("Aborting Impala Server startup: $0", err));
      }
    }
    if (FLAGS_status_report_cancellation_padding <= 0) {
      const string& err = "--status_report_cancellation_padding should be > 0.";
      LOG(ERROR) << err;
      if (FLAGS_abort_on_config_error) {
        CLEAN_EXIT_WITH_ERROR(Substitute("Aborting Impala Server startup: $0", err));
      }
    }
    ABORT_IF_ERROR(Thread::Create("impala-server", "unresponsive-backend-thread",
        bind<void>(&ImpalaServer::UnresponsiveBackendThread, this),
        &unresponsive_backend_thread_));
  }

  if (exec_env_->AdmissionServiceEnabled()) {
    ABORT_IF_ERROR(Thread::Create("impala-server", "admission-heartbeat-thread",
        bind<void>(&ImpalaServer::AdmissionHeartbeatThread, this),
        &admission_heartbeat_thread_));
  }

  is_coordinator_ = FLAGS_is_coordinator;
  is_executor_ = FLAGS_is_executor;
}
/// Parses a proxy delegation configuration string into
/// 'authorized_proxy_config_map'. The input is a ';'-separated list of entries
/// of the form:
///   <proxy user>=<delimiter-separated list of users/groups allowed to delegate>
/// See FLAGS_authorized_proxy_user_config or FLAGS_authorized_proxy_group_config
/// for more details. Returns a non-OK Status (carrying the offending entry as
/// its detail) if an entry has no '=' mapping value.
Status ImpalaServer::PopulateAuthorizedProxyConfig(
    const string& authorized_proxy_config,
    const string& authorized_proxy_config_delimiter,
    AuthorizedProxyMap* authorized_proxy_config_map) {
  vector<string> entries;
  split(entries, authorized_proxy_config, is_any_of(";"), token_compress_on);
  // Iterating an empty vector is a no-op, so no explicit size check is needed.
  for (const string& entry : entries) {
    size_t eq_pos = entry.find("=");
    // An entry without '=' has no mapping value; surface it as the error detail.
    if (eq_pos == string::npos) return Status(entry);
    string proxy_user = entry.substr(0, eq_pos);
    boost::trim(proxy_user);
    string allowed_list = entry.substr(eq_pos + 1);
    boost::trim(allowed_list);
    vector<string> allowed_users_or_groups;
    split(allowed_users_or_groups, allowed_list,
        is_any_of(authorized_proxy_config_delimiter), token_compress_on);
    unordered_set<string> allowed_set(
        allowed_users_or_groups.begin(), allowed_users_or_groups.end());
    authorized_proxy_config_map->insert({proxy_user, allowed_set});
  }
  return Status::OK();
}
// Returns true iff this daemon runs as a coordinator; 'is_coordinator_' is
// cached from FLAGS_is_coordinator during server startup.
bool ImpalaServer::IsCoordinator() { return is_coordinator_; }
// Returns true iff this daemon runs as an executor; 'is_executor_' is cached
// from FLAGS_is_executor during server startup.
bool ImpalaServer::IsExecutor() { return is_executor_; }
// Health check: the server is considered healthy once 'services_started_'
// (an atomic flag, set when all services have come up) reads true.
bool ImpalaServer::IsHealthy() { return services_started_.load(); }
// Returns the port the Beeswax service is bound to. Must only be called after
// the Beeswax server has been created (enforced by DCHECK in debug builds).
int ImpalaServer::GetBeeswaxPort() {
  DCHECK(beeswax_server_ != nullptr);
  return beeswax_server_->port();
}
// Returns the port the HiveServer2 service is bound to. Must only be called
// after the HS2 server has been created (enforced by DCHECK in debug builds).
int ImpalaServer::GetHS2Port() {
  DCHECK(hs2_server_ != nullptr);
  return hs2_server_->port();
}
// Lineage logging is enabled iff a destination directory was supplied via
// --lineage_event_log_dir.
bool ImpalaServer::IsLineageLoggingEnabled() {
  return !FLAGS_lineage_event_log_dir.empty();
}
// Query event hooks are enabled iff --query_event_hook_classes names at least
// one hook class.
bool ImpalaServer::AreQueryHooksEnabled() {
  return !FLAGS_query_event_hook_classes.empty();
}
// Sets up lineage event logging: creates the rolling log file and starts the
// background flush thread. No-op when lineage logging is not configured.
Status ImpalaServer::InitLineageLogging() {
  if (!IsLineageLoggingEnabled()) {
    LOG(INFO) << "Lineage logging is disabled";
    return Status::OK();
  }
  // Rolling file logger for lineage events; Init() opens the first log file
  // and fails if the directory is unusable.
  lineage_logger_.reset(new SimpleLogger(
      FLAGS_lineage_event_log_dir, LINEAGE_LOG_FILE_PREFIX,
      FLAGS_max_lineage_log_file_size));
  RETURN_IF_ERROR(lineage_logger_->Init());
  // Background thread that periodically flushes buffered lineage entries.
  RETURN_IF_ERROR(Thread::Create("impala-server", "lineage-log-flush",
      &ImpalaServer::LineageLoggerFlushThread, this, &lineage_logger_flush_thread_));
  return Status::OK();
}
// Audit event logging is enabled iff a destination directory was supplied via
// --audit_event_log_dir.
bool ImpalaServer::IsAuditEventLoggingEnabled() {
  return !FLAGS_audit_event_log_dir.empty();
}
// Sets up audit event logging: creates the rolling, size- and count-capped log
// and starts the background flush thread. No-op when auditing is not configured.
Status ImpalaServer::InitAuditEventLogging() {
  if (!IsAuditEventLoggingEnabled()) {
    LOG(INFO) << "Event logging is disabled";
    return Status::OK();
  }
  // Rolling file logger, bounded both in per-file size and file count;
  // Init() opens the first log file and fails if the directory is unusable.
  audit_event_logger_.reset(new SimpleLogger(
      FLAGS_audit_event_log_dir, AUDIT_EVENT_LOG_FILE_PREFIX,
      FLAGS_max_audit_event_log_file_size, FLAGS_max_audit_event_log_files));
  RETURN_IF_ERROR(audit_event_logger_->Init());
  // Background thread that periodically flushes buffered audit entries.
  RETURN_IF_ERROR(Thread::Create("impala-server", "audit-event-log-flush",
      &ImpalaServer::AuditEventLoggerFlushThread, this,
      &audit_event_logger_flush_thread_));
  return Status::OK();
}
// Sets up query profile logging: picks a default log directory if none was
// configured, creates the rolling profile log, and starts the flush thread.
// No-op when --log_query_to_file is false.
Status ImpalaServer::InitProfileLogging() {
  if (!FLAGS_log_query_to_file) return Status::OK();
  if (FLAGS_profile_log_dir.empty()) {
    // Default to a "profiles" subdirectory of the main log directory. Plain
    // string concatenation replaces the stringstream round-trip.
    FLAGS_profile_log_dir = FLAGS_log_dir + "/profiles/";
  }
  profile_logger_.reset(new SimpleLogger(FLAGS_profile_log_dir,
      PROFILE_LOG_FILE_PREFIX, FLAGS_max_profile_log_file_size,
      FLAGS_max_profile_log_files));
  RETURN_IF_ERROR(profile_logger_->Init());
  // Background thread that periodically flushes buffered profile entries.
  RETURN_IF_ERROR(Thread::Create("impala-server", "log-flush-thread",
      &ImpalaServer::LogFileFlushThread, this, &profile_log_file_flush_thread_));
  return Status::OK();
}
// Appends a single audit event record to the audit event log. Callers must
// ensure audit event logging is enabled (DCHECK-enforced in debug builds).
Status ImpalaServer::AppendAuditEntry(const string& entry) {
  DCHECK(IsAuditEventLoggingEnabled());
  return audit_event_logger_->AppendEntry(entry);
}
// Appends a single lineage event record to the lineage log. Callers must
// ensure lineage logging is enabled (DCHECK-enforced in debug builds).
Status ImpalaServer::AppendLineageEntry(const string& entry) {
  DCHECK(IsLineageLoggingEnabled());
  return lineage_logger_->AppendEntry(entry);
}
// Renders the runtime profile of an in-memory query into 'profile' using the
// representation selected by 'format'. Fails for queries whose plan is not
// ready yet, and enforces profile access control for 'user'.
Status ImpalaServer::GetRuntimeProfileOutput(const string& user,
    const QueryHandle& query_handle, TRuntimeProfileFormat::type format,
    RuntimeProfileOutput* profile) {
  // For queries in INITIALIZED state, the profile information isn't populated yet.
  if (query_handle->exec_state() == ClientRequestState::ExecState::INITIALIZED) {
    return Status::Expected("Query plan is not ready.");
  }
  lock_guard<mutex> l(*query_handle->lock());
  RETURN_IF_ERROR(CheckProfileAccess(
      user, query_handle->effective_user(), query_handle->user_has_profile_access()));
  // Refresh the exec summary when the coordinator already exists.
  if (query_handle->GetCoordinator() != nullptr) UpdateExecSummary(query_handle);
  switch (format) {
    case TRuntimeProfileFormat::BASE64:
      RETURN_IF_ERROR(
          query_handle->profile()->SerializeToArchiveString(profile->string_output));
      break;
    case TRuntimeProfileFormat::THRIFT:
      query_handle->profile()->ToThrift(profile->thrift_output);
      break;
    case TRuntimeProfileFormat::JSON:
      query_handle->profile()->ToJson(profile->json_output);
      break;
    default:
      DCHECK_EQ(format, TRuntimeProfileFormat::STRING);
      query_handle->profile()->PrettyPrint(profile->string_output);
  }
  return Status::OK();
}
// Looks up 'query_id' in the completed-query log and returns its record via
// the out-iterator 'query_record'. Returns an expected (stack-trace-free)
// error Status when the id is not present. Acquires 'query_log_lock_' for the
// lookup itself.
// NOTE(review): callers dereference the returned iterator after this lock is
// released — presumably records are only evicted under the same lock; verify.
Status ImpalaServer::GetQueryRecord(
    const TUniqueId& query_id, QueryLogIndex::const_iterator* query_record) {
  lock_guard<mutex> l(query_log_lock_);
  *query_record = query_log_index_.find(query_id);
  if (*query_record == query_log_index_.end()) {
    // Common error, so logging explicitly and eliding Status's stack trace.
    string err =
        strings::Substitute(LEGACY_INVALID_QUERY_HANDLE_TEMPLATE, PrintId(query_id));
    VLOG(1) << err;
    return Status::Expected(err);
  }
  return Status::OK();
}
// Fetches the runtime profile(s) for 'query_id' in the requested 'format'.
// The first attempt's profile goes to 'original_profile'; if the query was
// transparently retried, the latest attempt's profile additionally goes to
// 'retried_profile' and '*was_retried' is set to true ('was_retried' is never
// set to false here — callers are expected to initialize it). The active query
// map is consulted first, then the completed-query log. Profile access control
// for 'user' is enforced on every profile returned.
Status ImpalaServer::GetRuntimeProfileOutput(const TUniqueId& query_id,
    const string& user, TRuntimeProfileFormat::type format,
    RuntimeProfileOutput* original_profile, RuntimeProfileOutput* retried_profile,
    bool* was_retried) {
  DCHECK(original_profile != nullptr);
  DCHECK(original_profile->string_output != nullptr);
  DCHECK(retried_profile != nullptr);
  DCHECK(retried_profile->string_output != nullptr);
  // Search for the query id in the active query map
  {
    // QueryHandle of the active query and original query. If the query was retried the
    // active handle points to the most recent query attempt and the original handle
    // points to the original query attempt (the one that failed). If the query was not
    // retried the active handle == the original handle.
    QueryHandle active_query_handle;
    QueryHandle original_query_handle;
    Status status =
        GetAllQueryHandles(query_id, &active_query_handle, &original_query_handle,
        /*return_unregistered=*/ true);
    if (status.ok()) {
      // If the query was retried, then set the retried profile using the active query
      // handle. The active query handle corresponds to the most recent query attempt,
      // so it should be used to set the retried profile.
      if (original_query_handle->WasRetried()) {
        *was_retried = true;
        RETURN_IF_ERROR(
            GetRuntimeProfileOutput(user, active_query_handle, format, retried_profile));
      }
      // Set the profile for the original query.
      RETURN_IF_ERROR(
          GetRuntimeProfileOutput(user, original_query_handle, format, original_profile));
      return Status::OK();
    }
  }
  // The query was not found in the active query map, search the query log.
  {
    // Set the profile for the original query.
    QueryLogIndex::const_iterator query_record;
    RETURN_IF_ERROR(GetQueryRecord(query_id, &query_record));
    RETURN_IF_ERROR(CheckProfileAccess(user, query_record->second->effective_user,
        query_record->second->user_has_profile_access));
    RETURN_IF_ERROR(DecompressToProfile(format, query_record, original_profile));
    // Set the profile for the retried query.
    if (query_record->second->was_retried) {
      *was_retried = true;
      DCHECK(query_record->second->retried_query_id != nullptr);
      QueryLogIndex::const_iterator retried_query_record;
      // The profile of the retried profile should always be earlier in the query log
      // compared to the original profile. Since the query log is a FIFO queue, this
      // means that if the original profile is in the log, then the retried profile
      // must be in the log as well.
      Status status =
          GetQueryRecord(*query_record->second->retried_query_id, &retried_query_record);
      // The lookup is expected to always succeed (see FIFO argument above); the
      // DCHECK documents that, while RETURN_IF_ERROR keeps release builds safe.
      DCHECK(status.ok());
      RETURN_IF_ERROR(status);
      // If the original profile was accessible by the user, then the retried profile
      // must be accessible by the user as well.
      status = CheckProfileAccess(user, retried_query_record->second->effective_user,
          retried_query_record->second->user_has_profile_access);
      DCHECK(status.ok());
      RETURN_IF_ERROR(status);
      RETURN_IF_ERROR(DecompressToProfile(format, retried_query_record, retried_profile));
    }
  }
  return Status::OK();
}
// Fetches the runtime profile for 'query_id' (the queried attempt only) in the
// requested 'format' into 'profile'. Consults the active query map first and
// falls back to the completed-query log. Profile access control for 'user' is
// enforced in both paths.
Status ImpalaServer::GetRuntimeProfileOutput(const TUniqueId& query_id,
    const string& user, TRuntimeProfileFormat::type format,
    RuntimeProfileOutput* profile) {
  DCHECK(profile != nullptr);
  DCHECK(profile->string_output != nullptr);
  // Search for the query id in the active query map
  {
    QueryHandle query_handle;
    Status status = GetQueryHandle(query_id, &query_handle,
        /*return_unregistered=*/ true);
    if (status.ok()) {
      RETURN_IF_ERROR(GetRuntimeProfileOutput(user, query_handle, format, profile));
      return Status::OK();
    }
  }
  // The query was not found in the active query map, search the query log.
  {
    // Fix: the iterator is populated by GetQueryRecord(), which takes
    // 'query_log_lock_'. The previous direct query_log_index_.find() here was
    // dead code (immediately overwritten) and, worse, an unsynchronized read
    // of the log index that raced with concurrent eviction.
    QueryLogIndex::const_iterator query_record;
    RETURN_IF_ERROR(GetQueryRecord(query_id, &query_record));
    RETURN_IF_ERROR(CheckProfileAccess(user, query_record->second->effective_user,
        query_record->second->user_has_profile_access));
    RETURN_IF_ERROR(DecompressToProfile(format, query_record, profile));
  }
  return Status::OK();
}
// Converts the compressed profile stored in a completed-query log record into
// the representation selected by 'format', writing the result into 'profile'.
// BASE64 re-encodes the compressed bytes directly; THRIFT decompresses to the
// thrift form; JSON and STRING both rebuild a full RuntimeProfile first.
Status ImpalaServer::DecompressToProfile(TRuntimeProfileFormat::type format,
    QueryLogIndex::const_iterator query_record, RuntimeProfileOutput* profile) {
  // Hoist the repeated member access; the record is not modified here.
  const auto& compressed_profile = query_record->second->compressed_profile;
  if (format == TRuntimeProfileFormat::BASE64) {
    Base64Encode(compressed_profile, profile->string_output);
  } else if (format == TRuntimeProfileFormat::THRIFT) {
    RETURN_IF_ERROR(RuntimeProfile::DecompressToThrift(
        compressed_profile, profile->thrift_output));
  } else {
    // JSON and STRING share the expensive decompression step instead of
    // duplicating the pool/profile boilerplate per branch.
    DCHECK(format == TRuntimeProfileFormat::JSON
        || format == TRuntimeProfileFormat::STRING);
    ObjectPool tmp_pool;
    RuntimeProfile* tmp_profile;
    RETURN_IF_ERROR(RuntimeProfile::DecompressToProfile(
        compressed_profile, &tmp_pool, &tmp_profile));
    if (format == TRuntimeProfileFormat::JSON) {
      tmp_profile->ToJson(profile->json_output);
    } else {
      tmp_profile->PrettyPrint(profile->string_output);
    }
  }
  return Status::OK();
}
// Returns the exec summary for 'query_id' in 'result'. If the query was
// transparently retried, 'result' holds the latest attempt's summary,
// 'original_result' (must be non-null in that case) holds the failed first
// attempt's summary, and '*was_retried' is set to true. Searches the active
// query map first, then the completed-query log. Enforces profile access
// control for 'user' in both paths.
Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& user,
    TExecSummary* result, TExecSummary* original_result, bool* was_retried) {
  if (was_retried != nullptr) *was_retried = false;
  // Search for the query id in the active query map.
  {
    // QueryHandle of the current query.
    QueryHandle query_handle;
    // QueryHandle of the original query if the query is retried.
    QueryHandle original_query_handle;
    Status status = GetAllQueryHandles(query_id, &query_handle, &original_query_handle,
        /*return_unregistered=*/ true);
    if (status.ok()) {
      lock_guard<mutex> l(*query_handle->lock());
      RETURN_IF_ERROR(CheckProfileAccess(user, query_handle->effective_user(),
          query_handle->user_has_profile_access()));
      if (query_handle->exec_state() == ClientRequestState::ExecState::PENDING) {
        // Query is still in admission control: report queueing state/reason
        // from the summary profile instead of an exec summary.
        const string* admission_result = query_handle->summary_profile()->GetInfoString(
            AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT);
        if (admission_result != nullptr) {
          if (*admission_result == AdmissionController::PROFILE_INFO_VAL_QUEUED) {
            result->__set_is_queued(true);
            const string* queued_reason = query_handle->summary_profile()->GetInfoString(
                AdmissionController::PROFILE_INFO_KEY_LAST_QUEUED_REASON);
            if (queued_reason != nullptr) {
              result->__set_queued_reason(*queued_reason);
            }
          }
        }
      } else if (query_handle->GetCoordinator() != nullptr) {
        // Query is executing: include scan-range progress with the summary.
        query_handle->GetCoordinator()->GetTExecSummary(result);
        TExecProgress progress;
        progress.__set_num_completed_scan_ranges(
            query_handle->GetCoordinator()->progress().num_complete());
        progress.__set_total_scan_ranges(
            query_handle->GetCoordinator()->progress().total());
        // TODO: does this not need to be synchronized?
        result->__set_progress(progress);
      } else {
        // No coordinator yet (and not PENDING): return an empty summary.
        *result = TExecSummary();
      }
      if (query_handle->IsRetriedQuery()) {
        // Don't need to acquire lock on original_query_handle since the query is
        // finished. There are no concurrent updates on its status.
        result->error_logs.push_back(original_query_handle->query_status().GetDetail());
        result->error_logs.push_back(Substitute("Retrying query using query id: $0",
            PrintId(query_handle->query_id())));
        result->__isset.error_logs = true;
        if (was_retried != nullptr) {
          *was_retried = true;
          DCHECK(original_result != nullptr);
          // The original query cannot be in the PENDING state because it has
          // already failed. Handle the other two cases as above.
          if (original_query_handle->GetCoordinator() != nullptr) {
            original_query_handle->GetCoordinator()->GetTExecSummary(original_result);
          } else {
            *original_result = TExecSummary();
          }
        }
      }
      return Status::OK();
    }
  }
  // Look for the query in completed query log.
  // IMPALA-5275: Don't create Status while holding query_log_lock_
  {
    // Copy everything needed out of the log under the lock, then build any
    // Status objects and outputs after releasing it.
    string effective_user;
    bool user_has_profile_access = false;
    bool is_query_missing = false;
    TExecSummary exec_summary;
    TExecSummary retried_exec_summary;
    {
      lock_guard<mutex> l(query_log_lock_);
      QueryLogIndex::const_iterator query_record = query_log_index_.find(query_id);
      is_query_missing = query_record == query_log_index_.end();
      if (!is_query_missing) {
        effective_user = query_record->second->effective_user;
        user_has_profile_access = query_record->second->user_has_profile_access;
        exec_summary = query_record->second->exec_summary;
        if (query_record->second->was_retried) {
          if (was_retried != nullptr) *was_retried = true;
          DCHECK(query_record->second->retried_query_id != nullptr);
          QueryLogIndex::const_iterator retried_query_record =
              query_log_index_.find(*query_record->second->retried_query_id);
          // The retried query ran later than the original query. We should be able to
          // find it in the query log since we have found the original query.
          DCHECK(retried_query_record != query_log_index_.end());
          retried_exec_summary = retried_query_record->second->exec_summary;
        }
      }
    }
    if (is_query_missing) {
      // Common error, so logging explicitly and eliding Status's stack trace.
      string err =
          strings::Substitute(LEGACY_INVALID_QUERY_HANDLE_TEMPLATE, PrintId(query_id));
      VLOG(1) << err;
      return Status::Expected(err);
    }
    RETURN_IF_ERROR(CheckProfileAccess(user, effective_user, user_has_profile_access));
    if (was_retried != nullptr && *was_retried) {
      DCHECK(original_result != nullptr);
      // 'result' returns the latest summary so it's the retried one.
      *result = retried_exec_summary;
      *original_result = exec_summary;
    } else {
      *result = exec_summary;
    }
  }
  return Status::OK();
}
[[noreturn]] void ImpalaServer::LogFileFlushThread() {
while (true) {
sleep(5);
const Status status = profile_logger_->Flush();
if (!status.ok()) {
LOG(WARNING) << "Error flushing profile log: " << status.GetDetail();
}
}
}
[[noreturn]] void ImpalaServer::AuditEventLoggerFlushThread() {
while (true) {