-
Notifications
You must be signed in to change notification settings - Fork 60
/
connection.cc
2710 lines (2367 loc) · 92.4 KB
/
connection.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2015 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "connection.h"
#include "buckets.h"
#include "connections.h"
#include "cookie.h"
#include "external_auth_manager_thread.h"
#include "front_end_thread.h"
#include "listening_port.h"
#include "mc_time.h"
#include "mcaudit.h"
#include "memcached.h"
#include "protocol/mcbp/engine_wrapper.h"
#include "runtime.h"
#include "server_event.h"
#include "settings.h"
#include <logger/logger.h>
#include <mcbp/dcp_snapshot_marker_codec.h>
#include <mcbp/mcbp.h>
#include <mcbp/protocol/framebuilder.h>
#include <mcbp/protocol/header.h>
#include <memcached/durability_spec.h>
#include <nlohmann/json.hpp>
#include <phosphor/phosphor.h>
#include <platform/cbassert.h>
#include <platform/checked_snprintf.h>
#include <platform/socket.h>
#include <platform/strerror.h>
#include <platform/string_hex.h>
#include <platform/timeutils.h>
#include <utilities/logtags.h>
#include <gsl/gsl>
#include <cctype>
#include <exception>
#ifndef WIN32
#include <netinet/tcp.h> // For TCP_NODELAY etc
#endif
/// The TLS packet is using the following format:
/// Byte 0 - Content type
/// Byte 1 and 2 - Version
/// Byte 3 and 4 - length
/// n bytes of user payload
/// m bytes of MAC
/// o bytes of padding for block ciphers
/// Create a constant which represents the maximum amount of data we
/// may put in a single TLS frame
static constexpr std::size_t TlsFrameSize = 16 * 1024;
/**
 * Get a textual representation of a connection priority.
 *
 * @param priority the priority to convert
 * @return "High", "Medium" or "Low"
 * @throws std::invalid_argument if the value isn't one of the priorities
 *         handled below
 */
std::string to_string(Connection::Priority priority) {
    const char* text = nullptr;
    switch (priority) {
    case Connection::Priority::High:
        text = "High";
        break;
    case Connection::Priority::Medium:
        text = "Medium";
        break;
    case Connection::Priority::Low:
        text = "Low";
        break;
    }

    if (text == nullptr) {
        throw std::invalid_argument("No such priority: " +
                                    std::to_string(int(priority)));
    }
    return text;
}
/**
 * Enable or disable TCP_NODELAY on the socket used by the connection.
 *
 * The cached `nodelay` member is updated to `enable` on success and
 * cleared if setsockopt fails.
 *
 * @param enable true to set TCP_NODELAY, false to clear it
 * @return true if the socket option was set, false if there is no socket
 *         or setsockopt failed
 */
bool Connection::setTcpNoDelay(bool enable) {
    if (socketDescriptor == INVALID_SOCKET) {
        // The unit tests run without a connected socket. Calling
        // setsockopt on an invalid socket would just fail and flood the
        // console with error messages, so bail out silently.
        return false;
    }

    const int value = enable ? 1 : 0;
    const int rc = cb::net::setsockopt(socketDescriptor,
                                       IPPROTO_TCP,
                                       TCP_NODELAY,
                                       reinterpret_cast<const void*>(&value),
                                       sizeof(value));
    if (rc == 0) {
        nodelay = enable;
        return true;
    }

    std::string errmsg = cb_strerror(cb::net::get_socket_error());
    LOG_WARNING("setsockopt(TCP_NODELAY): {}", errmsg);
    nodelay = false;
    return false;
}
/**
 * Build a JSON description of a libevent event mask.
 *
 * @param mask the mask to convert to JSON
 * @return an object containing "raw" (the mask formatted with cb::to_hex)
 *         and "decoded" (an array naming each of EV_READ, EV_WRITE,
 *         EV_PERSIST and EV_TIMEOUT which is set in the mask)
 */
static nlohmann::json event_mask_to_json(const short mask) {
    nlohmann::json decoded = nlohmann::json::array();

    // Append the name to the array if the flag is part of the mask
    const auto decode = [&decoded, mask](int flag, const char* name) {
        if (mask & flag) {
            decoded.push_back(name);
        }
    };
    decode(EV_READ, "read");
    decode(EV_WRITE, "write");
    decode(EV_PERSIST, "persist");
    decode(EV_TIMEOUT, "timeout");

    nlohmann::json ret;
    ret["raw"] = cb::to_hex(uint16_t(mask));
    ret["decoded"] = decoded;
    return ret;
}
/**
 * Create a JSON representation of the connection's state.
 *
 * If the connection doesn't have a socket, only the "connection" and
 * "socket" fields are populated.
 *
 * @return the JSON object describing the connection
 */
nlohmann::json Connection::toJSON() const {
    nlohmann::json ret;
    ret["connection"] = cb::to_hex(uint64_t(this));
    if (socketDescriptor == INVALID_SOCKET) {
        ret["socket"] = "disconnected";
        return ret;
    }

    // Basic identification of the connection
    ret["socket"] = socketDescriptor;
    ret["yields"] = yields.load();
    ret["protocol"] = "memcached";
    ret["peername"] = getPeername().c_str();
    ret["sockname"] = getSockname().c_str();
    ret["parent_port"] = listening_port->port;
    ret["bucket_index"] = getBucketIndex();
    ret["internal"] = isInternal();

    if (authenticated) {
        if (!internal) {
            ret["username"] = cb::tagUserData(username);
        } else {
            // We want to be able to map these connections, and given
            // that it is internal we don't reveal any user data
            ret["username"] = username;
        }
    }

    ret["refcount"] = refcount;

    // The features enabled on the connection
    nlohmann::json features = nlohmann::json::array();
    const auto addFeature = [&features](bool enabled, const char* name) {
        if (enabled) {
            features.push_back(name);
        }
    };
    addFeature(isSupportsMutationExtras(), "mutation extras");
    addFeature(isXerrorSupport(), "xerror");
    addFeature(nodelay, "tcp nodelay");
    addFeature(allowUnorderedExecution(), "unordered execution");
    addFeature(tracingEnabled, "tracing");
    addFeature(isCollectionsSupported(), "collections");
    addFeature(isDuplexSupported(), "duplex");
    addFeature(isClustermapChangeNotificationSupported(), "CCN");
    ret["features"] = features;

    ret["thread"] = getThread().index;
    ret["priority"] = to_string(priority);

    // A revision number of -2 is reported as "unknown"
    if (clustermap_revno != -2) {
        ret["clustermap_revno"] = clustermap_revno;
    } else {
        ret["clustermap_revno"] = "unknown";
    }

    ret["total_cpu_time"] = std::to_string(total_cpu_time.count());
    ret["min_sched_time"] = std::to_string(min_sched_time.count());
    ret["max_sched_time"] = std::to_string(max_sched_time.count());

    nlohmann::json cookieArray = nlohmann::json::array();
    for (const auto& cookie : cookies) {
        cookieArray.push_back(cookie->toJSON());
    }
    ret["cookies"] = cookieArray;

    // The agent name and connection id are only included when non-empty
    if (agentName.front() != '\0') {
        ret["agent_name"] = std::string(agentName.data());
    }
    if (connectionId.front() != '\0') {
        ret["connection_id"] = std::string(connectionId.data());
    }

    ret["sasl_enabled"] = saslAuthEnabled;
    ret["dcp"] = isDCP();
    ret["dcp_xattr_aware"] = isDcpXattrAware();
    ret["dcp_deleted_user_xattr"] = isDcpDeletedUserXattr();
    ret["dcp_no_value"] = isDcpNoValue();
    ret["max_reqs_per_event"] = max_reqs_per_event;
    ret["nevents"] = numEvents;
    ret["state"] = getStateName();

    // The libevent registration
    ret["libevent"]["registered"] = isRegisteredInLibevent();
    ret["libevent"]["ev_flags"] = event_mask_to_json(ev_flags);
    ret["libevent"]["which"] = event_mask_to_json(currentEvent);

    // The read and write pipes (only if present)
    if (read) {
        ret["read"] = read->to_json();
    }
    if (write) {
        ret["write"] = write->to_json();
    }

    ret["write_and_go"] = std::string(stateMachine.getStateName(write_and_go));

    ret["iov"]["size"] = iov.size();
    ret["iov"]["used"] = iovused;

    ret["msglist"]["used"] = msglist.size();
    ret["msglist"]["curr"] = msgcurr;
    ret["msglist"]["bytes"] = msgbytes;

    ret["itemlist"]["size"] = reservedItems.size();
    ret["temp_alloc_list"]["size"] = temp_alloc.size();

    ret["ssl"] = ssl.toJSON();
    ret["total_recv"] = totalRecv;
    ret["total_send"] = totalSend;
    ret["datatype"] = mcbp::datatype::to_string(datatype.getRaw()).c_str();
    return ret;
}
void Connection::setDCP(bool dcp) {
Connection::dcp = dcp;
if (isSslEnabled()) {
try {
// Make sure that we have space for up to a single TLS frame
// in our send buffer (so that we can stick all of the mutations
// in that buffer)
write->ensureCapacity(TlsFrameSize);
} catch (const std::bad_alloc&) {
}
}
}
/**
 * Reset the authentication state of the connection.
 *
 * A connection authenticated in the external domain is logged off from
 * the external auth manager first. After that the SASL connection is
 * reset, the internal flag is cleared, the connection is marked as not
 * authenticated and the username is emptied.
 */
void Connection::restartAuthentication() {
    const bool externalUser =
            authenticated && domain == cb::sasl::Domain::External;
    if (externalUser) {
        externalAuthManager->logoff(username);
    }

    sasl_conn.reset();
    setInternal(false);
    authenticated = false;
    username.clear();
}
/**
 * Drop a privilege from the connection's privilege context.
 *
 * @param privilege the privilege to drop
 * @return cb::engine_errc::success if the privilege context reported the
 *         privilege as dropped, cb::engine_errc::no_access otherwise
 */
cb::engine_errc Connection::dropPrivilege(cb::rbac::Privilege privilege) {
    const bool dropped = privilegeContext.dropPrivilege(privilege);
    return dropped ? cb::engine_errc::success : cb::engine_errc::no_access;
}
/// @return the port number of the listening port the connection refers to
in_port_t Connection::getParentPort() const {
    const auto& parent = *listening_port;
    return parent.port;
}
/**
 * Check if the connection holds the requested privilege.
 *
 * If the privilege context is stale it is rebuilt (up to 100 times)
 * before giving up and returning the stale status to the caller. A
 * failed check is logged and an error context is set on the cookie,
 * unless privilege debug is enabled in the settings, in which case the
 * failure is audited and logged and access is granted.
 *
 * @param privilege the privilege to check for
 * @param cookie the cookie representing the command (used to look up the
 *               opcode and to store the error context)
 * @return the result of the privilege check
 */
cb::rbac::PrivilegeAccess Connection::checkPrivilege(
        cb::rbac::Privilege privilege, Cookie& cookie) {
    // Look up the textual name of the command being executed
    const auto commandName = [&cookie]() {
        const auto opcode = cookie.getRequest(Cookie::PacketContent::Header)
                                    .getClientOpcode();
        return std::string(to_string(opcode));
    };

    const unsigned int max_retries = 100;
    unsigned int retries = 0;

    cb::rbac::PrivilegeAccess ret = privilegeContext.check(privilege);
    while (ret == cb::rbac::PrivilegeAccess::Stale && retries < max_retries) {
        ++retries;
        const std::string command = commandName();

        // The privilege context we had could have been a dummy entry
        // (created when the client connected, and used until the
        // connection authenticates). Let's try to automatically update it,
        // but let the client deal with whatever happens after
        // a single update.
        try {
            privilegeContext = cb::rbac::createContext(
                    getUsername(), getDomain(), all_buckets[bucketIndex].name);
        } catch (const cb::rbac::NoSuchBucketException&) {
            // Remove all access to the bucket
            privilegeContext =
                    cb::rbac::createContext(getUsername(), getDomain(), "");
            LOG_INFO(
                    "{}: RBAC: Connection::checkPrivilege({}) {} No access "
                    "to "
                    "bucket [{}]. command: [{}] new privilege set: {}",
                    getId(),
                    to_string(privilege),
                    getDescription(),
                    all_buckets[bucketIndex].name,
                    command,
                    privilegeContext.to_string());
        } catch (const cb::rbac::Exception& error) {
            LOG_WARNING(
                    "{}: RBAC: Connection::checkPrivilege({}) {}: An "
                    "exception occurred. command: [{}] bucket: [{}] UUID:"
                    "[{}] message: {}",
                    getId(),
                    to_string(privilege),
                    getDescription(),
                    command,
                    all_buckets[bucketIndex].name,
                    cookie.getEventId(),
                    error.what());
            // Add a textual error as well
            cookie.setErrorContext("An exception occurred. command: [" +
                                   command + "]");
            return cb::rbac::PrivilegeAccess::Fail;
        }

        // Try again with the rebuilt context
        ret = privilegeContext.check(privilege);
    }

    if (retries == max_retries) {
        LOG_INFO(
                "{}: RBAC: Gave up rebuilding privilege context after {} "
                "times. Let the client handle the stale authentication "
                "context",
                getId(),
                retries);
    } else if (retries > 1) {
        LOG_INFO("{}: RBAC: Had to rebuild privilege context {} times",
                 getId(),
                 retries);
    }

    if (ret != cb::rbac::PrivilegeAccess::Fail) {
        return ret;
    }

    // The check failed; collect the details to report
    const std::string command = commandName();
    const std::string privilege_string = cb::rbac::to_string(privilege);
    const std::string context = privilegeContext.to_string();

    if (!Settings::instance().isPrivilegeDebug()) {
        LOG_INFO(
                "{} RBAC {} missing privilege {} for {} in bucket:[{}] "
                "with context: "
                "{} UUID:[{}]",
                getId(),
                getDescription(),
                privilege_string,
                command,
                all_buckets[bucketIndex].name,
                context,
                cookie.getEventId());
        // Add a textual error as well
        cookie.setErrorContext("Authorization failure: can't execute " +
                               command + " operation without the " +
                               privilege_string + " privilege");
        return ret;
    }

    // Privilege debug is enabled: record the missing privilege, but let
    // the command through
    audit_privilege_debug(*this,
                          command,
                          all_buckets[bucketIndex].name,
                          privilege_string,
                          context);
    LOG_INFO(
            "{}: RBAC privilege debug:{} command:[{}] bucket:[{}] "
            "privilege:[{}] context:{}",
            getId(),
            getDescription(),
            command,
            all_buckets[bucketIndex].name,
            privilege_string,
            context);
    return cb::rbac::PrivilegeAccess::Ok;
}
/// @return the entry in all_buckets at the connection's bucket index
Bucket& Connection::getBucket() const {
    const auto index = getBucketIndex();
    return all_buckets[index];
}
/// @return the engine of the bucket returned by getBucket()
EngineIface* Connection::getBucketEngine() const {
    auto& bucket = getBucket();
    return bucket.getEngine();
}
/**
 * Remap an engine error code to one the client is able to handle.
 *
 * Clients with xerror support receive every code unchanged. For other
 * clients a fixed set of codes is passed through, some are translated to
 * an equivalent from that set, and for the rest the connection gets a
 * termination reason set and ENGINE_DISCONNECT is returned.
 *
 * @param code the error code to remap
 * @return the error code to use for this connection
 */
ENGINE_ERROR_CODE Connection::remapErrorCode(ENGINE_ERROR_CODE code) {
    if (xerror_support) {
        return code;
    }

    // Note: no default label, so that every enum value is listed explicitly
    switch (code) {
    // The codes every client is expected to know about
    case ENGINE_SUCCESS:
    case ENGINE_KEY_ENOENT:
    case ENGINE_KEY_EEXISTS:
    case ENGINE_ENOMEM:
    case ENGINE_NOT_STORED:
    case ENGINE_EINVAL:
    case ENGINE_ENOTSUP:
    case ENGINE_EWOULDBLOCK:
    case ENGINE_E2BIG:
    case ENGINE_DISCONNECT:
    case ENGINE_NOT_MY_VBUCKET:
    case ENGINE_TMPFAIL:
    case ENGINE_ERANGE:
    case ENGINE_ROLLBACK:
    case ENGINE_EBUSY:
    case ENGINE_DELTA_BADVAL:
    case ENGINE_PREDICATE_FAILED:
    case ENGINE_FAILED:
        return code;

    case ENGINE_LOCKED:
        return ENGINE_KEY_EEXISTS;

    case ENGINE_LOCKED_TMPFAIL:
    case ENGINE_SYNC_WRITE_IN_PROGRESS:
    case ENGINE_SYNC_WRITE_RECOMMIT_IN_PROGRESS:
        // we can return tmpfail to old clients and have them retry the
        // operation
        return ENGINE_TMPFAIL;

    case ENGINE_UNKNOWN_COLLECTION:
    case ENGINE_COLLECTIONS_MANIFEST_IS_AHEAD:
        if (isCollectionsSupported()) {
            return code;
        }
        return ENGINE_EINVAL;

    // No suitable replacement exists for these; disconnect below
    case ENGINE_EACCESS:
    case ENGINE_NO_BUCKET:
    case ENGINE_AUTH_STALE:
    case ENGINE_DURABILITY_INVALID_LEVEL:
    case ENGINE_DURABILITY_IMPOSSIBLE:
    case ENGINE_SYNC_WRITE_PENDING:
    case ENGINE_SYNC_WRITE_AMBIGUOUS:
    case ENGINE_DCP_STREAMID_INVALID:
        break;
    }

    // Seems like the rest of the components in our system aren't
    // prepared to receive access denied or authentication stale.
    // For now we should just disconnect them
    const auto errc = cb::make_error_condition(cb::engine_errc(code));
    LOG_WARNING(
            "{} - Client {} not aware of extended error code ({}). "
            "Disconnecting",
            getId(),
            getDescription().c_str(),
            errc.message().c_str());
    setTerminationReason("XError not enabled on client");
    return ENGINE_DISCONNECT;
}
/**
 * Refresh the cached username and domain.
 *
 * The values are taken from the SASL connection when it is initialized;
 * otherwise the username is set to "unknown" in the local domain. The
 * description is updated afterwards.
 */
void Connection::resetUsernameCache() {
    if (!sasl_conn.isInitialized()) {
        username = "unknown";
        domain = cb::sasl::Domain::Local;
    } else {
        username = sasl_conn.getUsername();
        domain = sasl_conn.getDomain();
    }

    updateDescription();
}
/**
 * Rebuild the textual description of the connection.
 *
 * The description has the form "[ peer - sock (details) ]", where the
 * details are "not authenticated" or the tagged username, prefixed with
 * "System, " for internal connections and followed by " (LDAP)" for
 * users in the external domain.
 */
void Connection::updateDescription() {
    description.assign("[ " + getPeername() + " - " + getSockname());

    if (!authenticated) {
        description += " (not authenticated)";
    } else {
        description += " (";
        if (isInternal()) {
            description += "System, ";
        }
        description += cb::tagUserData(getUsername());

        const bool external = (domain == cb::sasl::Domain::External);
        if (external) {
            description += " (LDAP)";
        }
        description += ")";
    }

    description += " ]";
}
/**
 * Set the index of the bucket the connection is associated with, and
 * create a privilege context matching the new bucket.
 *
 * @param bucketIndex the index into all_buckets
 */
void Connection::setBucketIndex(int bucketIndex) {
    Connection::bucketIndex.store(bucketIndex, std::memory_order_relaxed);

    // Update the privilege context. If a problem occurs within the RBAC
    // module we'll assign an empty privilege context to the connection.
    try {
        // We've connected to the _default_ bucket, _AND_ the client is
        // unknown. The "default bucket" concept is kept for backwards
        // compatibility: look up a profile named "default" and assign
        // that. It should only contain access to the default bucket.
        const bool useDefaultProfile =
                !authenticated && is_default_bucket_enabled() &&
                strcmp("default", all_buckets[bucketIndex].name) == 0;

        if (authenticated) {
            // The user has logged in, so we should create a context
            // representing the user's context in the desired bucket.
            privilegeContext = cb::rbac::createContext(
                    username, getDomain(), all_buckets[bucketIndex].name);
        } else if (useDefaultProfile) {
            privilegeContext = cb::rbac::createContext(
                    "default", getDomain(), all_buckets[bucketIndex].name);
        } else {
            // The user has not authenticated, and this isn't for the
            // "default bucket". Assign an empty profile which won't give
            // you any privileges.
            privilegeContext = cb::rbac::PrivilegeContext{getDomain()};
        }
    } catch (const cb::rbac::Exception&) {
        privilegeContext = cb::rbac::PrivilegeContext{getDomain()};
    }

    const bool noBucket = (bucketIndex == 0);
    if (noBucket) {
        // If we're connected to the no bucket we should return
        // no bucket instead of EACCESS. Let's give the connection all
        // possible bucket privileges
        privilegeContext.setBucketPrivileges();
    }
}
/**
 * Account for a slice of CPU time spent serving the connection.
 *
 * The duration is added to the total and the shortest / longest slice
 * seen so far is updated.
 *
 * @param ns the duration of the slice
 */
void Connection::addCpuTime(std::chrono::nanoseconds ns) {
    total_cpu_time += ns;
    min_sched_time = std::min(min_sched_time, ns);
    // Compare against the previous maximum (not the minimum); otherwise
    // the maximum would always end up being the latest slice
    max_sched_time = std::max(max_sched_time, ns);
}
/**
 * Add a server event to the connection's collection of server events.
 *
 * @param event the event to add (ownership is transferred)
 */
void Connection::enqueueServerEvent(std::unique_ptr<ServerEvent> event) {
    auto& pending = server_events;
    pending.push(std::move(event));
}
/**
 * Remove the connection's event from libevent.
 *
 * @return true if the event was removed, false if the connection wasn't
 *         registered or event_del failed
 */
bool Connection::unregisterEvent() {
    if (registered_in_libevent) {
        cb_assert(socketDescriptor != INVALID_SOCKET);

        if (event_del(event.get()) != -1) {
            registered_in_libevent = false;
            return true;
        }

        LOG_WARNING("Failed to remove connection to libevent: {}",
                    cb_strerror());
        return false;
    }

    LOG_WARNING(
            "Connection::unregisterEvent: Not registered in libevent - "
            "ignoring unregister attempt");
    return false;
}
/**
 * Add the connection's event to libevent.
 *
 * @return true if the event was added, false if the connection was
 *         already registered or event_add failed
 */
bool Connection::registerEvent() {
    if (!registered_in_libevent) {
        if (event_add(event.get(), nullptr) != -1) {
            registered_in_libevent = true;
            return true;
        }

        LOG_WARNING("Failed to add connection to libevent: {}", cb_strerror());
        return false;
    }

    LOG_WARNING(
            "Connection::registerEvent: Already registered in"
            " libevent - ignoring register attempt");
    return false;
}
/**
 * Update the set of events the connection wants to be notified about.
 *
 * @param new_flags the new event mask
 * @return true on success (or if nothing had to be changed), false if
 *         the event could not be unregistered, re-assigned or registered
 */
bool Connection::updateEvent(const short new_flags) {
    /*
     * If we want more data and we have SSL, that data might be inside
     * SSL's internal buffers rather than inside the socket buffer. In
     * that case signal an EV_READ event without actually polling the
     * socket.
     */
    const bool sslReadPending = ssl.isEnabled() && ssl.isConnected() &&
                                (new_flags & EV_READ) &&
                                ssl.havePendingInputData();
    if (sslReadPending) {
        // signal a call to the handler
        event_active(event.get(), EV_READ, 0);
        return true;
    }

    // We do "cache" the current libevent state (using EV_PERSIST) to avoid
    // having to re-register it when it doesn't change (which it mostly
    // doesn't).
    if (new_flags == ev_flags) {
        return true;
    }

    // The mask changed: unregister, re-assign and register the event
    if (!unregisterEvent()) {
        LOG_WARNING(
                "{}: Failed to remove connection from event notification "
                "library. Shutting down connection {}",
                getId(),
                getDescription());
        return false;
    }

    const int assigned = event_assign(event.get(),
                                      base,
                                      socketDescriptor,
                                      new_flags,
                                      event_handler,
                                      reinterpret_cast<void*>(this));
    if (assigned == -1) {
        LOG_WARNING(
                "{}: Failed to set up event notification. "
                "Shutting down connection {}",
                getId(),
                getDescription());
        return false;
    }
    ev_flags = new_flags;

    if (registerEvent()) {
        return true;
    }

    LOG_WARNING(
            "{}: Failed to add connection to the event notification "
            "library. Shutting down connection {}",
            getId(),
            getDescription());
    return false;
}
/**
 * Create the libevent event for the connection (EV_READ | EV_PERSIST)
 * and register it.
 *
 * @return the result of registerEvent()
 * @throws std::bad_alloc if the event could not be created
 */
bool Connection::initializeEvent() {
    const short event_flags = (EV_READ | EV_PERSIST);

    auto* created = event_new(base,
                              socketDescriptor,
                              event_flags,
                              event_handler,
                              reinterpret_cast<void*>(this));
    event.reset(created);
    if (!event) {
        throw std::bad_alloc();
    }

    ev_flags = event_flags;
    return registerEvent();
}
/**
 * Shrink the msglist and the iov list back to their initial sizes if
 * they have grown beyond their high watermarks. Failures to shrink are
 * logged and otherwise ignored.
 */
void Connection::shrinkBuffers() {
    // We share the buffers with the thread, so we don't need to worry
    // about the read and write buffer.

    const bool msglistTooBig = msglist.size() > MSG_LIST_HIGHWAT;
    if (msglistTooBig) {
        try {
            msglist.resize(MSG_LIST_INITIAL);
            msglist.shrink_to_fit();
        } catch (const std::bad_alloc&) {
            LOG_WARNING("{}: Failed to shrink msglist down to {} elements.",
                        getId(),
                        MSG_LIST_INITIAL);
        }
    }

    const bool iovTooBig = iov.size() > IOV_LIST_HIGHWAT;
    if (iovTooBig) {
        try {
            iov.resize(IOV_LIST_INITIAL);
            iov.shrink_to_fit();
        } catch (const std::bad_alloc&) {
            LOG_WARNING("{}: Failed to shrink iov down to {} elements.",
                        getId(),
                        IOV_LIST_INITIAL);
        }
    }
}
/**
 * Update the authentication state of the connection and install a
 * matching privilege context.
 *
 * @param authenticated true if the connection is now authenticated
 */
void Connection::setAuthenticated(bool authenticated) {
    Connection::authenticated = authenticated;

    if (!authenticated) {
        // Refresh the cached username / domain and install a context
        // constructed from just the domain
        resetUsernameCache();
        privilegeContext = cb::rbac::PrivilegeContext{getDomain()};
        return;
    }

    updateDescription();
    privilegeContext = cb::rbac::createContext(username, getDomain(), "");
}
/**
 * Try to authenticate the connection as the provided user (the name
 * picked out of the client's X.509 certificate).
 *
 * @param userName the name of the user to authenticate as
 * @return true if the connection is authenticated, false if the user
 *         isn't defined
 */
bool Connection::tryAuthFromSslCert(const std::string& userName) {
    username.assign(userName);
    domain = cb::sasl::Domain::Local;

    try {
        auto context =
                cb::rbac::createInitialContext(getUsername(), getDomain());
        setAuthenticated(true);
        setInternal(context.second);
        audit_auth_success(*this);
        LOG_INFO(
                "{}: Client {} authenticated as '{}' via X509 "
                "certificate",
                getId(),
                getPeername(),
                cb::UserDataView(getUsername()));
        // Connections authenticated by using X.509 certificates should not
        // be able to use SASL to change their identity.
        saslAuthEnabled = false;
        return true;
    } catch (const cb::rbac::NoSuchUserException& e) {
        setAuthenticated(false);
        LOG_WARNING("{}: User [{}] is not defined as a user in Couchbase",
                    getId(),
                    cb::UserDataView(e.what()));
    }

    return false;
}
/**
 * Log the error reported by an SSL method together with every entry in
 * the OpenSSL error queue (the queue is drained as a side effect).
 *
 * @param method the name of the SSL method which failed
 * @param rval the value returned from the method
 */
void Connection::logSslErrorInfo(const std::string& method, int rval) {
    const int error = ssl.getError(rval);
    unsigned long code = ERR_peek_error();
    if (code == 0) {
        // Nothing in the error queue; just report the return value
        LOG_WARNING("{}: ERROR: {} returned {} with error {}",
                    getId(),
                    method,
                    rval,
                    error);
        return;
    }

    try {
        const std::string errmsg(method + "() returned " +
                                 std::to_string(rval) + " with error " +
                                 std::to_string(error));

        // Allocate the buffer once and reuse it for every entry in the
        // queue (ERR_error_string_n always NUL-terminates the output)
        std::vector<char> ssl_err(1024);
        while ((code = ERR_get_error()) != 0) {
            ERR_error_string_n(code, ssl_err.data(), ssl_err.size());
            LOG_WARNING("{}: {}: {}", getId(), errmsg, ssl_err.data());
        }
    } catch (const std::bad_alloc&) {
        // unable to print error message; continue.
        LOG_WARNING("{}: {}() returned {} with error {}",
                    getId(),
                    method,
                    rval,
                    error);
    }
}
/**
 * Run the SSL accept handshake, draining the BIO pipes and retrying
 * for as long as OpenSSL reports that it wants to read or write.
 *
 * @return 1 when the handshake completed, -1 on failure (in which case
 *         cb::net::set_econnreset() has been called)
 */
int Connection::sslAcceptWithRetry() {
    for (;;) {
        const int rc = ssl.accept();
        if (rc == 1) {
            // handshake completed.
            return rc;
        }

        const auto sslError = ssl.getError(rc);
        const bool retry = (sslError == SSL_ERROR_WANT_READ ||
                            sslError == SSL_ERROR_WANT_WRITE);
        if (!retry) {
            logSslErrorInfo("SSL_accept", rc);
            cb::net::set_econnreset();
            return -1;
        }

        // Drain send and receive pipes.
        // Note: This is somewhat of a naive implementation - ideally we
        // would only drain the specific pipe direction based on the status
        // code, repeating any drains until there is no more data ready
        // to transfer. However that requires a more expressive interface on
        // drainBio{Send,Recv}Pipe. Given SSL_accept() only occurs once
        // per connect at the start, having a simpler (but technically
        // sub-optimal) handling of errors here seems reasonable.
        ssl.drainBioSendPipe(socketDescriptor);
        if (ssl.hasError()) {
            cb::net::set_econnreset();
            return -1;
        }

        ssl.drainBioRecvPipe(socketDescriptor);
        if (ssl.hasError()) {
            cb::net::set_econnreset();
            return -1;
        }

        // Go around again to continue the SSL accept handshake.
    }
    folly::assume_unreachable();
}
/**
 * Complete the SSL handshake and evaluate the client certificate.
 *
 * @return the value returned from sslAcceptWithRetry(), or -1 if the
 *         connection should be disconnected due to the client
 *         certificate (in which case cb::net::set_econnreset() has been
 *         called)
 */
int Connection::sslPreConnection() {
    const int r = sslAcceptWithRetry();
    if (r != 1) {
        return r;
    }

    ssl.drainBioSendPipe(socketDescriptor);
    ssl.setConnected();

    auto certResult = ssl.getCertUserName();
    bool disconnect = false;
    switch (certResult.first) {
    case cb::x509::Status::NoMatch:
    case cb::x509::Status::Error:
        disconnect = true;
        break;
    case cb::x509::Status::NotPresent:
        // No certificate: only acceptable when certificates are optional
        if (Settings::instance().getClientCertMode() ==
            cb::x509::Mode::Mandatory) {
            disconnect = true;
        } else if (is_default_bucket_enabled()) {
            associate_bucket(*this, "default");
        }
        break;
    case cb::x509::Status::Success:
        if (!tryAuthFromSslCert(certResult.second)) {
            disconnect = true;
            // Don't print an error message... already logged
            certResult.second.resize(0);
        }
        break;
    }

    if (!disconnect) {
        LOG_INFO("{}: Using SSL cipher:{}",
                 getId(),
                 ssl.getCurrentCipherName());
        return r;
    }

    // Audit the failure with a reason matching the certificate status
    if (certResult.first == cb::x509::Status::NotPresent) {
        audit_auth_failure(*this,
                           "Client did not provide an X.509 certificate");
    } else {
        audit_auth_failure(
                *this, "Failed to use client provided X.509 certificate");
    }
    cb::net::set_econnreset();

    if (!certResult.second.empty()) {
        LOG_WARNING(
                "{}: SslPreConnection: disconnection client due to"
                " error [{}]",
                getId(),
                certResult.second);
    }
    return -1;
}
/**
 * Receive data from the connection (over SSL if it is enabled).
 *
 * @param dest where to store the data
 * @param nbytes the maximum number of bytes to read (must not be 0)
 * @return the value returned from the underlying read, or -1 on error
 * @throws std::logic_error if nbytes is 0
 */
int Connection::recv(char* dest, size_t nbytes) {
    if (nbytes == 0) {
        throw std::logic_error("Connection::recv: Can't read 0 bytes");
    }

    if (!ssl.isEnabled()) {
        // Plain socket: read directly and account for the bytes received
        const int received = static_cast<int>(
                ::cb::net::recv(socketDescriptor, dest, nbytes, 0));
        if (received > 0) {
            totalRecv += received;
        }
        return received;
    }

    ssl.drainBioRecvPipe(socketDescriptor);
    if (ssl.hasError()) {
        cb::net::set_econnreset();
        return -1;
    }

    int res = -1;
    if (!ssl.isConnected()) {
        res = sslPreConnection();
        if (res == -1) {
            return -1;
        }
    }

    /* The SSL negotiation might be complete at this time */
    if (ssl.isConnected()) {
        res = sslRead(dest, nbytes);
    }
    return res;
}
ssize_t Connection::sendmsg(struct msghdr* m) {
ssize_t res = 0;
if (ssl.isEnabled()) {
for (int ii = 0; ii < int(m->msg_iovlen); ++ii) {
int n = sslWrite(reinterpret_cast<char*>(m->msg_iov[ii].iov_base),
m->msg_iov[ii].iov_len);
if (n > 0) {
res += n;
if (n != int(m->msg_iov[ii].iov_len)) {
// We didnt' send the entire chunk. return the number
// of bytes sent so far to the caller and let them
// deal with adjusting the pointers and retry
return res;
}
} else {
// We failed to send the data over ssl. it might be
// because the underlying socket buffer is full, or
// if there is a real error with the socket or inside
// OpenSSL. If the error is because we the network
// is full we'll return the number of bytes we've sent
// so far (so that it may adjust the iov_base and iov_len
// fields before it'll try to call us again and the
// send will most likely fail again, but this we'll
// return -1 and when the caller checks it'll see it is
// because the network buffer is full).
auto error = cb::net::get_socket_error();
if (cb::net::is_blocking(error) && res > 0) {
return res;
}
return -1;
}
}
ssl.drainBioSendPipe(socketDescriptor);
return res;
} else {
res = cb::net::sendmsg(socketDescriptor, m, 0);
if (res > 0) {
totalSend += res;
}
}
return res;
}
/**
* Adjust the msghdr by "removing" n bytes of data from it.
*
* @param m the msgheader to update
* @param nbytes
* @return the number of bytes left in the current iov entry
*/
size_t adjust_msghdr(cb::Pipe& pipe, struct msghdr* m, ssize_t nbytes) {
auto rbuf = pipe.rdata();
// We've written some of the data. Remove the completed
// iovec entries from the list of pending writes.
while (m->msg_iovlen > 0 && nbytes >= ssize_t(m->msg_iov->iov_len)) {
if (rbuf.data() == static_cast<const uint8_t*>(m->msg_iov->iov_base)) {
pipe.consumed(m->msg_iov->iov_len);
rbuf = pipe.rdata();
}
nbytes -= (ssize_t)m->msg_iov->iov_len;
m->msg_iovlen--;
m->msg_iov++;
}
// Might have written just part of the last iovec entry;
// adjust it so the next write will do the rest.
if (nbytes > 0) {
if (rbuf.data() == static_cast<const uint8_t*>(m->msg_iov->iov_base)) {
pipe.consumed(nbytes);
}