-
Notifications
You must be signed in to change notification settings - Fork 271
/
director-connection.c
2280 lines (2008 loc) · 66.4 KB
/
director-connection.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* Copyright (c) 2010-2017 Dovecot authors, see the included COPYING file */
/*
Handshaking:
Incoming director connections send:
VERSION
ME
<wait for DONE from remote handshake>
DONE
<make this connection our "left" connection, potentially disconnecting
another one>
Outgoing director connections send:
VERSION
ME
[0..n] DIRECTOR
HOST-HAND-START
[0..n] HOST
HOST-HAND-END
[0..n] USER
<possibly other non-handshake commands between USERs>
DONE
<wait for DONE from remote>
<make this connection our "right" connection, potentially disconnecting
another one>
*/
#include "lib.h"
#include "ioloop.h"
#include "array.h"
#include "net.h"
#include "istream.h"
#include "ostream.h"
#include "str.h"
#include "strescape.h"
#include "time-util.h"
#include "master-service.h"
#include "mail-host.h"
#include "director.h"
#include "director-host.h"
#include "director-request.h"
#include "director-connection.h"
#include <unistd.h>
#define MAX_INBUF_SIZE 1024
#define MAX_OUTBUF_SIZE (1024*1024*10)
#define OUTBUF_FLUSH_THRESHOLD (1024*128)
/* Max time to wait for connect() to finish before aborting */
#define DIRECTOR_CONNECTION_CONNECT_TIMEOUT_MSECS (10*1000)
/* Max idling time before "ME" command must have been received,
   or we'll disconnect. */
#define DIRECTOR_CONNECTION_ME_TIMEOUT_MSECS (10*1000)
/* Max time to wait for USERs in handshake to be sent. With a lot of users the
   kernel may quickly eat up everything we send, while the receiver is busy
   parsing the data. */
#define DIRECTOR_CONNECTION_SEND_USERS_TIMEOUT_MSECS (30*1000)
/* Max idling time before "DONE" command must have been received,
   or we'll disconnect. */
#define DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS (30*1000)
/* How long to wait for PONG for an idling connection */
#define DIRECTOR_CONNECTION_PING_IDLE_TIMEOUT_MSECS (10*1000)
/* Maximum time to wait for PONG reply */
#define DIRECTOR_CONNECTION_PONG_TIMEOUT_MSECS (60*1000)
/* How long to wait to send PING when connection is idle */
#define DIRECTOR_CONNECTION_PING_INTERVAL_MSECS (15*1000)
/* How long to wait before sending PING while waiting for SYNC reply */
#define DIRECTOR_CONNECTION_PING_SYNC_INTERVAL_MSECS 1000
/* If outgoing director connection exists for less than this many seconds,
   mark the host as failed so we won't try to reconnect to it immediately */
#define DIRECTOR_SUCCESS_MIN_CONNECT_SECS 40
/* How long to wait for the remote to disconnect after we've sent it a
   CONNECT command, before we drop the connection ourselves. */
#define DIRECTOR_WAIT_DISCONNECT_SECS 10
#define DIRECTOR_HANDSHAKE_WARN_SECS 29
#define DIRECTOR_HANDSHAKE_BYTES_LOG_MIN_SECS (60*30)
#define DIRECTOR_MAX_SYNC_SEQ_DUPLICATES 4
/* If we receive SYNCs with a timestamp this many seconds higher than the last
   valid received SYNC timestamp, assume that we lost the director's restart
   notification and reset the last_sync_seq */
#define DIRECTOR_SYNC_STALE_TIMESTAMP_RESET_SECS (60*2)
/* Log a warning when the timestamp in a remote's ME command differs from our
   clock by more than this many seconds. */
#define DIRECTOR_MAX_CLOCK_DIFF_WARN_SECS 1

/* Compile-time sanity checks for the timeouts above. The first check used to
   name DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS, which isn't among the macros
   defined here. An undefined identifier evaluates to 0 inside #if, so that
   check could never trigger. Compare against the idle PING timeout instead. */
#if DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS <= DIRECTOR_CONNECTION_PING_IDLE_TIMEOUT_MSECS
#  error DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS is too low
#endif
#if DIRECTOR_CONNECTION_PONG_TIMEOUT_MSECS <= DIRECTOR_CONNECTION_PING_IDLE_TIMEOUT_MSECS
#  error DIRECTOR_CONNECTION_PONG_TIMEOUT_MSECS is too low
#endif
/* TRUE if the USER command's parameters are in the handshake form (more than
   the two parameters that the non-handshake USER command takes). */
#define CMD_IS_USER_HANDHAKE(args) \
	(str_array_length(args) > 2)
#define DIRECTOR_OPT_CONSISTENT_HASHING "consistent-hashing"
/* State of a single connection to another director in the ring. */
struct director_connection {
/* the director this connection belongs to */
struct director *dir;
/* name used in log messages. replaced with "<host>/left" or "<host>/right"
   once the connection is assigned as the left or right side. */
char *name;
/* used as the start time when logging how long a handshake has taken */
time_t created;
unsigned int minor_version;
struct timeval last_input, last_output;
/* for incoming connections the director host isn't known until
ME-line is received */
struct director_host *host;
/* this is set only for wrong connections: */
struct director_host *connect_request_to;
int fd;
struct io *io;
struct istream *input;
struct ostream *output;
/* to_disconnect is set after we've sent CONNECT to the remote and are
   waiting for it to disconnect. to_ping is the PING timer, but it's also
   used for the handshake timeouts after ME has been received. */
struct timeout *to_disconnect, *to_ping, *to_pong;
struct director_user_iter *user_iter;
/* set during command execution */
const char *cur_cmd, *cur_line;
/* TRUE for incoming connections, FALSE for outgoing ones */
unsigned int in:1;
unsigned int connected:1;
unsigned int version_received:1;
/* a valid ME command has been received */
unsigned int me_received:1;
unsigned int handshake_received:1;
/* remote is joining our already completed ring: its handshake HOSTs
   are ignored */
unsigned int ignore_host_events:1;
/* HOST-HAND-START has been received */
unsigned int handshake_sending_hosts:1;
unsigned int ping_waiting:1;
unsigned int synced:1;
/* outgoing connection was found to be to an incorrect host, because we
   already have the correct right side connection */
unsigned int wrong_host:1;
/* set for the current left connection while it's being PINGed to verify
   that it still works */
unsigned int verifying_left:1;
/* set when a handshake USER raised an existing user's timestamp */
unsigned int users_unsorted:1;
};
/* Prototypes for static helpers that are used before they are defined. */
static void director_finish_sending_handshake(struct director_connection *conn);
static void director_connection_disconnected(struct director_connection **conn,
const char *reason);
static void director_connection_reconnect(struct director_connection **conn,
const char *reason);
static void
director_connection_log_disconnect(struct director_connection *conn, int err,
const char *errstr);
static int director_connection_send_done(struct director_connection *conn);
/* Log an error about the command that is currently being handled on this
   connection. The printf-style message is combined with the connection name,
   the command name and the raw input line. If the connection's host is
   already known, the failure time is recorded in the host. */
static void ATTR_FORMAT(2, 3)
director_cmd_error(struct director_connection *conn, const char *fmt, ...)
{
	const char *reason;
	va_list va;

	va_start(va, fmt);
	reason = t_strdup_vprintf(fmt, va);
	va_end(va);

	i_error("director(%s): Command %s: %s (input: %s)", conn->name,
		conn->cur_cmd, reason, conn->cur_line);

	if (conn->host == NULL)
		return;
	conn->host->last_protocol_failure = ioloop_time;
}
/* Timeout callback for a connection whose connect/handshake didn't finish in
   time. Logs which phase the connection was stuck in, together with the
   connection's age in seconds, and then disconnects it. */
static void
director_connection_init_timeout(struct director_connection *conn)
{
	unsigned int elapsed = ioloop_time - conn->created;

	if (conn->connected && conn->io != NULL) {
		/* connected and reading input: we're missing ME or DONE */
		if (conn->me_received) {
			i_error("director(%s): Handshaking DONE timed out (%u secs)",
				conn->name, elapsed);
		} else {
			i_error("director(%s): Handshaking ME timed out (%u secs)",
				conn->name, elapsed);
		}
	} else if (conn->connected) {
		/* connected, but there is no input io */
		i_error("director(%s): Sending handshake (%u secs)",
			conn->name, elapsed);
	} else {
		i_error("director(%s): Connect timed out (%u secs)",
			conn->name, elapsed);
	}
	director_connection_disconnected(&conn, "Handshake timeout");
}
/* (Re)start the PING timer. The short SYNC interval is used only when the
   handshake has been received but the connection isn't synced; otherwise the
   normal idle interval is used. */
static void
director_connection_set_ping_timeout(struct director_connection *conn)
{
	unsigned int interval_msecs =
		DIRECTOR_CONNECTION_PING_SYNC_INTERVAL_MSECS;

	if (conn->synced || !conn->handshake_received)
		interval_msecs = DIRECTOR_CONNECTION_PING_INTERVAL_MSECS;

	timeout_remove(&conn->to_ping);
	conn->to_ping = timeout_add(interval_msecs,
				    director_connection_ping, conn);
}
/* Timeout callback: the remote didn't disconnect by itself after we sent it
   CONNECT. Log the disconnect as a timeout and close the connection. */
static void director_connection_wait_timeout(struct director_connection *conn)
{
	const char *reason = "Timeout waiting for disconnect after CONNECT";

	director_connection_log_disconnect(conn, ETIMEDOUT, "");
	director_connection_deinit(&conn, reason);
}
/* Tell the remote to connect to the given host instead of us by sending it a
   CONNECT command. Does nothing if a disconnect is already pending for this
   connection. */
static void director_connection_send_connect(struct director_connection *conn,
					     struct director_host *host)
{
	if (conn->to_disconnect != NULL) {
		/* CONNECT was already sent */
		return;
	}

	director_connection_send(conn,
		t_strdup_printf("CONNECT\t%s\t%u\n",
				net_ip2addr(&host->ip), host->port));
	o_stream_uncork(conn->output);

	/* give the remote some time to see our CONNECT and disconnect by
	   itself. the warning about it is logged later, so that we don't
	   log multiple lines about the same thing. */
	conn->connect_request_to = host;
	director_host_ref(conn->connect_request_to);
	conn->to_disconnect =
		timeout_add(DIRECTOR_WAIT_DISCONNECT_SECS*1000,
			    director_connection_wait_timeout, conn);
}
static void director_connection_assigned(struct director_connection *conn)
{
struct director *dir = conn->dir;
if (dir->left != NULL && dir->right != NULL) {
/* we're connected to both directors. see if the ring is
finished by sending a SYNC. if we get it back, it's done. */
dir->sync_seq++;
director_set_ring_unsynced(dir);
director_sync_send(dir, dir->self_host, dir->sync_seq,
DIRECTOR_VERSION_MINOR, ioloop_time,
mail_hosts_hash(dir->mail_hosts));
}
director_connection_set_ping_timeout(conn);
}
/* Try to make this incoming connection our left side.

   Returns FALSE only if the connection came from ourself and must be
   dropped by the caller. Returns TRUE otherwise, whether or not the
   connection actually became the left side: it may just be kept waiting
   while the existing left is still in use. */
static bool director_connection_assign_left(struct director_connection *conn)
{
	struct director *dir = conn->dir;

	i_assert(conn->in);
	i_assert(dir->left != conn);

	/* make sure this is the correct incoming connection */
	if (conn->host->self) {
		i_error("Connection from self, dropping");
		return FALSE;
	}

	if (dir->left != NULL) {
		if (dir->left->host != conn->host) {
			/* There's a left connection to another host already.
			   Either
			   - we're still verifying whether the current left
			     works. Without a PONG it gets disconnected and a
			     new left is assigned. With a PONG we wait for the
			     current left to disconnect us and reassign then.
			   - the old connection is the correct one, so refer
			     the remote there (FIXME: do we ever get here?)
			   - this new connection is the correct one, but we
			     wait for the old one to get disconnected first.
			     That guarantees the director inserting itself
			     into the ring has finished handshaking its left
			     side, so the switch will be fast. */
			if (!dir->left->verifying_left &&
			    director_host_cmp_to_self(dir->left->host,
						      conn->host,
						      dir->self_host) < 0) {
				director_connection_send_connect(conn,
							dir->left->host);
			}
			return TRUE;
		}
		/* same host connected to us again - use the new connection */
		i_warning("Replacing left director connection %s with %s",
			  dir->left->host->name, conn->host->name);
		director_connection_deinit(&dir->left, t_strdup_printf(
			"Replacing with %s", conn->host->name));
	}

	dir->left = conn;
	i_free(conn->name);
	conn->name = i_strdup_printf("%s/left", conn->host->name);
	director_connection_assigned(conn);
	return TRUE;
}
/* Go through the incoming connections that have finished their handshake and
   have no pending disconnect, and offer each as the new left side. When a
   connection gets rejected it's closed, and since that changes the
   connections array the scan is restarted from the beginning. */
static void director_assign_left(struct director *dir)
{
	struct director_connection *const *connp;

	array_foreach(&dir->connections, connp) {
		struct director_connection *candidate = *connp;

		if (!candidate->in || !candidate->handshake_received)
			continue;
		if (candidate->to_disconnect != NULL || candidate == dir->left)
			continue;

		/* either use this or disconnect it */
		if (director_connection_assign_left(candidate))
			continue;

		/* we don't want this */
		director_connection_deinit(&candidate,
					   "Unwanted incoming connection");
		director_assign_left(dir);
		return;
	}
}
/* Returns TRUE if there is at least one outgoing connection that isn't
   waiting to be disconnected. */
static bool director_has_outgoing_connections(struct director *dir)
{
	struct director_connection *const *connp;

	array_foreach(&dir->connections, connp) {
		const struct director_connection *c = *connp;

		if (c->in || c->to_disconnect != NULL)
			continue;
		return TRUE;
	}
	return FALSE;
}
/* Send the SYNCs that were stored in the director hosts as delayed, and
   clear each host's delayed state. The right side connection must exist. */
static void director_send_delayed_syncs(struct director *dir)
{
	struct director_host *const *hostp;

	i_assert(dir->right != NULL);

	dir_debug("director(%s): Sending delayed SYNCs", dir->right->name);
	array_foreach(&dir->dir_hosts, hostp) {
		struct director_host *host = *hostp;

		if (host->delayed_sync_seq != 0) {
			director_sync_send(dir, host, host->delayed_sync_seq,
					   host->delayed_sync_minor_version,
					   host->delayed_sync_timestamp,
					   host->delayed_sync_hosts_hash);
			host->delayed_sync_seq = 0;
		}
	}
}
/* Try to make this outgoing connection our right side.

   Returns FALSE if the existing right side connection is the correct one; in
   that case the connection is also marked as being to a wrong host.
   Otherwise any existing right side is replaced, the delayed SYNCs are sent
   and TRUE is returned. */
static bool director_connection_assign_right(struct director_connection *conn)
{
	struct director *dir = conn->dir;

	i_assert(!conn->in);

	if (dir->right != NULL) {
		/* see if we should disconnect or keep the existing
		   connection. */
		bool old_is_correct =
			director_host_cmp_to_self(conn->host, dir->right->host,
						  dir->self_host) <= 0;

		if (old_is_correct) {
			i_warning("Aborting incorrect outgoing connection to %s "
				  "(already connected to correct one: %s)",
				  conn->host->name, dir->right->host->name);
			conn->wrong_host = TRUE;
			return FALSE;
		}
		i_warning("Replacing right director connection %s with %s",
			  dir->right->host->name, conn->host->name);
		director_connection_deinit(&dir->right, t_strdup_printf(
			"Replacing with %s", conn->host->name));
	}

	dir->right = conn;
	i_free(conn->name);
	conn->name = i_strdup_printf("%s/right", conn->host->name);
	director_connection_assigned(conn);
	director_send_delayed_syncs(dir);
	return TRUE;
}
/* Parse args[0] as an IP address and args[1] as a port.

   Returns TRUE and fills ip_r and port_r on success. On failure a command
   error is logged for the connection and FALSE is returned. */
static bool
director_args_parse_ip_port(struct director_connection *conn,
			    const char *const *args,
			    struct ip_addr *ip_r, in_port_t *port_r)
{
	if (args[0] == NULL || args[1] == NULL)
		director_cmd_error(conn, "Missing IP+port parameters");
	else if (net_addr2ip(args[0], ip_r) < 0)
		director_cmd_error(conn, "Invalid IP address: %s", args[0]);
	else if (net_str2port(args[1], port_r) < 0)
		director_cmd_error(conn, "Invalid port: %s", args[1]);
	else
		return TRUE;
	return FALSE;
}
static bool director_cmd_me(struct director_connection *conn,
const char *const *args)
{
struct director *dir = conn->dir;
const char *connect_str;
struct ip_addr ip;
in_port_t port;
time_t next_comm_attempt;
if (!director_args_parse_ip_port(conn, args, &ip, &port))
return FALSE;
if (conn->me_received) {
director_cmd_error(conn, "Duplicate ME");
return FALSE;
}
if (!conn->in && (!net_ip_compare(&conn->host->ip, &ip) ||
conn->host->port != port)) {
i_error("Remote director thinks it's someone else "
"(connected to %s:%u, remote says it's %s:%u)",
net_ip2addr(&conn->host->ip), conn->host->port,
net_ip2addr(&ip), port);
return FALSE;
}
conn->me_received = TRUE;
if (args[2] != NULL) {
time_t remote_time;
int diff;
if (str_to_time(args[2], &remote_time) < 0) {
director_cmd_error(conn, "Invalid ME timestamp");
return FALSE;
}
diff = ioloop_time - remote_time;
if (diff > DIRECTOR_MAX_CLOCK_DIFF_WARN_SECS ||
(diff < 0 && -diff > DIRECTOR_MAX_CLOCK_DIFF_WARN_SECS)) {
i_warning("Director %s clock differs from ours by %d secs",
conn->name, diff);
}
}
timeout_remove(&conn->to_ping);
if (conn->in) {
conn->to_ping = timeout_add(DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS,
director_connection_init_timeout, conn);
} else {
conn->to_ping = timeout_add(DIRECTOR_CONNECTION_SEND_USERS_TIMEOUT_MSECS,
director_connection_init_timeout, conn);
}
if (!conn->in)
return TRUE;
/* Incoming connection:
a) we don't have an established ring yet. make sure we're connecting
to our right side (which might become our left side).
b) it's our current "left" connection. the previous connection
is most likely dead.
c) we have an existing ring. tell our current "left" to connect to
it with CONNECT command.
d) the incoming connection doesn't belong to us at all, refer it
elsewhere with CONNECT. however, before disconnecting it verify
first that our left side is actually still functional.
*/
i_assert(conn->host == NULL);
conn->host = director_host_get(dir, &ip, port);
/* the host shouldn't be removed at this point, but if for some
reason it is we don't want to crash */
conn->host->removed = FALSE;
director_host_ref(conn->host);
/* make sure we don't keep old sequence values across restarts */
director_host_restarted(conn->host);
next_comm_attempt = conn->host->last_protocol_failure +
DIRECTOR_PROTOCOL_FAILURE_RETRY_SECS;
if (next_comm_attempt > ioloop_time) {
/* the director recently sent invalid protocol data,
don't try retrying yet */
i_error("director(%s): Remote sent invalid protocol data recently, "
"waiting %u secs before allowing further communication",
conn->name, (unsigned int)(next_comm_attempt-ioloop_time));
return FALSE;
} else if (dir->left == NULL) {
/* a) - just in case the left is also our right side reset
its failed state, so we can connect to it */
conn->host->last_network_failure = 0;
if (!director_has_outgoing_connections(dir))
director_connect(dir);
} else if (dir->left->host == conn->host) {
/* b) */
i_assert(dir->left != conn);
director_connection_deinit(&dir->left,
"Replacing with new incoming connection");
} else if (director_host_cmp_to_self(conn->host, dir->left->host,
dir->self_host) < 0) {
/* c) */
connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
net_ip2addr(&conn->host->ip),
conn->host->port);
director_connection_send(dir->left, connect_str);
} else {
/* d) */
dir->left->verifying_left = TRUE;
director_connection_ping(dir->left);
}
return TRUE;
}
/* Add or update the user with the given username hash according to an update
   received from another director.

   conn:          connection the update was received from
   username_hash: hash identifying the user
   host:          backend the sender says the user belongs to
   timestamp:     timestamp given for the update
   weak:          TRUE if the update marks the user as weak
   forced_r:      set to TRUE only when a host conflict was resolved by
                  switching the user to the given host, FALSE otherwise
   user_r:        set to the added or updated user

   Returns TRUE if the user was added or if any of the cases below flagged the
   update as a change. The callers visible in this file forward the user
   update to other directors when TRUE is returned. */
static bool
director_user_refresh(struct director_connection *conn,
unsigned int username_hash, struct mail_host *host,
time_t timestamp, bool weak, bool *forced_r,
struct user **user_r)
{
struct director *dir = conn->dir;
struct user *user;
bool ret = FALSE, unset_weak_user = FALSE;
/* users are looked up from the directory of the given host's tag */
struct user_directory *users = host->tag->users;
*forced_r = FALSE;
user = user_directory_lookup(users, username_hash);
if (user == NULL) {
/* new user - nothing to merge with */
*user_r = user_directory_add(users, username_hash,
host, timestamp);
(*user_r)->weak = weak;
dir_debug("user refresh: %u added", username_hash);
return TRUE;
}
/* Existing user. From here on "host" means the host that the user will end
   up with: the branches that keep the user's current host assign
   user->host to it. */
if (user->weak) {
if (!weak) {
/* removing user's weakness */
dir_debug("user refresh: %u weakness removed",
username_hash);
unset_weak_user = TRUE;
user->weak = FALSE;
ret = TRUE;
} else {
/* weak user marked again as weak */
}
} else if (weak &&
!user_directory_user_is_recently_updated(users, user)) {
/* mark the user as weak */
dir_debug("user refresh: %u set weak", username_hash);
user->weak = TRUE;
ret = TRUE;
} else if (weak) {
/* weak update for a user that we've recently updated ourself:
   keep our host, but still report it as a change */
dir_debug("user refresh: %u weak update to %s ignored, "
"we recently changed it to %s",
username_hash, net_ip2addr(&host->ip),
net_ip2addr(&user->host->ip));
host = user->host;
ret = TRUE;
} else if (user->host == host) {
/* update to the same host */
} else if (user_directory_user_is_near_expiring(users, user)) {
/* host conflict for a user that is already near expiring. we can
assume that the other director had already dropped this user
and we should have as well. use the new host. */
dir_debug("user refresh: %u is nearly expired, "
"replacing host %s with %s", username_hash,
net_ip2addr(&user->host->ip), net_ip2addr(&host->ip));
ret = TRUE;
} else if (USER_IS_BEING_KILLED(user)) {
/* user is still being moved - ignore conflicting host updates
from other directors who don't yet know about the move. */
dir_debug("user refresh: %u is being moved, "
"preserve its host %s instead of replacing with %s",
username_hash, net_ip2addr(&user->host->ip),
net_ip2addr(&host->ip));
host = user->host;
} else {
/* non-weak user received a non-weak update with
conflicting host. this shouldn't happen. */
string_t *str = t_str_new(128);
str_printfa(str, "User hash %u "
"is being redirected to two hosts: %s and %s",
username_hash, net_ip2addr(&user->host->ip),
net_ip2addr(&host->ip));
str_printfa(str, " (old_ts=%ld", (long)user->timestamp);
if (!conn->handshake_received) {
str_printfa(str, ",handshaking,recv_ts=%ld",
(long)timestamp);
}
/* NOTE(review): this check looks unreachable, because the previous
   "else if" branch already handles USER_IS_BEING_KILLED(user) and nothing
   in between appears to change the user's kill state - confirm before
   removing. */
if (USER_IS_BEING_KILLED(user)) {
if (user->kill_ctx->to_move != NULL)
str_append(str, ",moving");
str_printfa(str, ",kill_state=%s",
user_kill_state_names[user->kill_ctx->kill_state]);
}
str_append_c(str, ')');
i_error("%s", str_c(str));
/* we want all the directors to redirect the user to same
server, but we don't want two directors fighting over which
server it belongs to, so always use the lower IP address */
if (net_ip_cmp(&user->host->ip, &host->ip) > 0) {
/* change the host. we'll also need to remove the user
from the old host's user_count, because we can't
keep track of the user for more than one host.
send the updated USER back to the sender as well. */
*forced_r = TRUE;
} else {
/* keep the host */
host = user->host;
}
/* especially IMAP connections can take a long time to die.
make sure we kill off the connections in the wrong
backends. */
director_kick_user_hash(dir, dir->self_host, NULL,
username_hash, &host->ip);
ret = TRUE;
}
/* move the user to the chosen host, keeping the hosts' user counts
   up-to-date */
if (user->host != host) {
user->host->user_count--;
user->host = host;
user->host->user_count++;
ret = TRUE;
}
/* the user's timeout is refreshed only if the update's timestamp is the
   current time and the user's timestamp differs from it */
if (timestamp == ioloop_time && (time_t)user->timestamp != timestamp) {
user_directory_refresh(users, user);
ret = TRUE;
}
dir_debug("user refresh: %u refreshed timeout to %ld",
username_hash, (long)user->timestamp);
if (unset_weak_user) {
/* user is no longer weak. handle pending requests for
this user if there are any */
director_set_state_changed(conn->dir);
}
*user_r = user;
return ret;
}
/* Handle a USER command received during the handshake:
   <username hash> <backend ip> <timestamp> [w...]

   A 4th parameter beginning with 'w' marks the user as weak. The user is
   added or merged with director_user_refresh(). If the received timestamp is
   newer than the user's resulting timestamp, it's taken into use and the
   connection is flagged as having received users in unsorted order.

   Returns FALSE if the parameters are invalid or the backend is unknown. */
static bool
director_handshake_cmd_user(struct director_connection *conn,
			    const char *const *args)
{
	unsigned int username_hash, timestamp;
	struct ip_addr ip;
	struct mail_host *host;
	struct user *user;
	bool weak, forced;

	/* the timestamp argument had been mangled into a non-ASCII character
	   sequence; it must be the address of the local timestamp variable */
	if (str_array_length(args) < 3 ||
	    str_to_uint(args[0], &username_hash) < 0 ||
	    net_addr2ip(args[1], &ip) < 0 ||
	    str_to_uint(args[2], &timestamp) < 0) {
		director_cmd_error(conn, "Invalid parameters");
		return FALSE;
	}
	weak = args[3] != NULL && args[3][0] == 'w';

	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
	if (host == NULL) {
		i_error("director(%s): USER used unknown host %s in handshake",
			conn->name, args[1]);
		return FALSE;
	}

	/* the return value is ignored: handshake users aren't forwarded
	   from here */
	(void)director_user_refresh(conn, username_hash, host,
				    timestamp, weak, &forced, &user);
	if (user->timestamp < timestamp) {
		conn->users_unsorted = TRUE;
		user->timestamp = timestamp;
	}
	return TRUE;
}
static bool
director_cmd_user(struct director_connection *conn,
const char *const *args)
{
unsigned int username_hash;
struct ip_addr ip;
struct mail_host *host;
struct user *user;
bool forced;
/* NOTE: if more parameters are added, update also
CMD_IS_USER_HANDHAKE() macro */
if (str_array_length(args) != 2 ||
str_to_uint(args[0], &username_hash) < 0 ||
net_addr2ip(args[1], &ip) < 0) {
director_cmd_error(conn, "Invalid parameters");
return FALSE;
}
host = mail_host_lookup(conn->dir->mail_hosts, &ip);
if (host == NULL) {
/* we probably just removed this host. */
return TRUE;
}
if (director_user_refresh(conn, username_hash,
host, ioloop_time, FALSE, &forced, &user)) {
struct director_host *src_host =
forced ? conn->dir->self_host : conn->host;
i_assert(!user->weak);
director_update_user(conn->dir, src_host, user);
}
return TRUE;
}
static bool director_cmd_director(struct director_connection *conn,
const char *const *args)
{
struct director_host *host;
struct ip_addr ip;
in_port_t port;
if (!director_args_parse_ip_port(conn, args, &ip, &port))
return FALSE;
host = director_host_lookup(conn->dir, &ip, port);
if (host != NULL) {
if (host == conn->dir->self_host) {
/* ignore updates to ourself */
return TRUE;
}
if (host->removed) {
/* ignore re-adds of removed directors */
return TRUE;
}
/* already have this. just reset its last_network_failure
timestamp, since it might be up now. */
host->last_network_failure = 0;
/* it also may have been restarted, reset its state */
director_host_restarted(host);
} else {
/* save the director and forward it */
host = director_host_add(conn->dir, &ip, port);
}
/* just forward this to the entire ring until it reaches back to
itself. some hosts may see this twice, but that's the only way to
guarantee that it gets seen by everyone. reseting the host multiple
times may cause us to handle its commands multiple times, but the
commands can handle that. however, we need to also handle a
situation where the added director never comes back - we don't want
to send the director information in a loop forever. */
if (conn->dir->right != NULL &&
director_host_cmp_to_self(host, conn->dir->right->host,
conn->dir->self_host) > 0) {
dir_debug("Received DIRECTOR update for a host where we should be connected to. "
"Not forwarding it since it's probably crashed.");
} else {
director_notify_ring_added(host,
director_connection_get_host(conn));
}
return TRUE;
}
/* Handle the DIRECTOR-REMOVE command: <ip> <port>

   Removes the director from the ring if it's known and not already removed.
   Returns FALSE only for invalid parameters. */
static bool director_cmd_director_remove(struct director_connection *conn,
					 const char *const *args)
{
	struct director_host *target;
	struct ip_addr ip;
	in_port_t port;

	if (!director_args_parse_ip_port(conn, args, &ip, &port))
		return FALSE;

	target = director_host_lookup(conn->dir, &ip, port);
	if (target == NULL || target->removed) {
		/* unknown or already removed - nothing to do */
		return TRUE;
	}
	director_ring_remove(target, director_connection_get_host(conn));
	return TRUE;
}
static bool
director_cmd_host_hand_start(struct director_connection *conn,
const char *const *args)
{
const ARRAY_TYPE(mail_host) *hosts;
struct mail_host *const *hostp;
unsigned int remote_ring_completed;
if (args[0] == NULL ||
str_to_uint(args[0], &remote_ring_completed) < 0) {
director_cmd_error(conn, "Invalid parameters");
return FALSE;
}
if (remote_ring_completed && !conn->dir->ring_handshaked) {
/* clear everything we have and use only what remote sends us */
dir_debug("%s: We're joining a ring - replace all hosts",
conn->name);
hosts = mail_hosts_get(conn->dir->mail_hosts);
while (array_count(hosts) > 0) {
hostp = array_idx(hosts, 0);
director_remove_host(conn->dir, NULL, NULL, *hostp);
}
} else if (!remote_ring_completed && conn->dir->ring_handshaked) {
/* ignore whatever remote sends */
dir_debug("%s: Remote is joining our ring - "
"ignore all remote HOSTs", conn->name);
conn->ignore_host_events = TRUE;
} else {
dir_debug("%s: Merge rings' hosts", conn->name);
}
conn->handshake_sending_hosts = TRUE;
return TRUE;
}
/* Parse the "<ip> <port> <seq>" prefix that identifies a command's
   originating director and sequence, and check whether we've already seen
   the command.

   On success *_args is advanced past the three parsed parameters, *seq_r is
   set to the sequence and *host_r to the originating director (NULL if it's
   unknown or removed).

   Returns -1 if the parameters are invalid, 1 if the sequence isn't newer
   than the last one seen from the host, and 0 if the command is new (the
   host's last seen sequence is updated) or the host is unknown/removed. */
static int
director_cmd_is_seen_full(struct director_connection *conn,
			  const char *const **_args, unsigned int *seq_r,
			  struct director_host **host_r)
{
	const char *const *params = *_args;
	struct director_host *origin;
	struct ip_addr ip;
	in_port_t port;
	unsigned int seq;

	if (str_array_length(params) < 3 ||
	    net_addr2ip(params[0], &ip) < 0 ||
	    net_str2port(params[1], &port) < 0 ||
	    str_to_uint(params[2], &seq) < 0) {
		director_cmd_error(conn, "Invalid parameters");
		return -1;
	}
	*_args = params + 3;
	*seq_r = seq;

	origin = director_host_lookup(conn->dir, &ip, port);
	if (origin == NULL || origin->removed) {
		/* director is already gone, but we can't be sure if this
		   command was sent everywhere. re-send it as if it was from
		   ourself. */
		*host_r = NULL;
		return 0;
	}

	*host_r = origin;
	if (seq <= origin->last_seq) {
		/* already seen this */
		return 1;
	}
	origin->last_seq = seq;
	return 0;
}
/* Same as director_cmd_is_seen_full(), for callers that don't need the
   parsed sequence number. */
static int
director_cmd_is_seen(struct director_connection *conn,
		     const char *const **_args,
		     struct director_host **host_r)
{
	unsigned int ignored_seq;
	int ret;

	ret = director_cmd_is_seen_full(conn, _args, &ignored_seq, host_r);
	return ret;
}
static bool
director_cmd_user_weak(struct director_connection *conn,
const char *const *args)
{
struct director_host *dir_host;
struct ip_addr ip;
unsigned int username_hash;
struct mail_host *host;
struct user *user;
struct director_host *src_host = conn->host;
bool weak = TRUE, weak_forward = FALSE, forced;
int ret;
/* note that unlike other commands we don't want to just ignore
duplicate commands */
if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) < 0)
return FALSE;
if (str_array_length(args) != 2 ||
str_to_uint(args[0], &username_hash) < 0 ||
net_addr2ip(args[1], &ip) < 0) {
director_cmd_error(conn, "Invalid parameters");
return FALSE;
}
host = mail_host_lookup(conn->dir->mail_hosts, &ip);
if (host == NULL) {
/* we probably just removed this host. */
return TRUE;
}
if (ret == 0) {
/* First time we're seeing this - forward it to others also.
We'll want to do it even if the user was already marked as
weak, because otherwise if two directors mark the user weak
at the same time both the USER-WEAK notifications reach
only half the directors until they collide and neither one
finishes going through the whole ring marking the user
non-weak. */
weak_forward = TRUE;
} else if (dir_host == conn->dir->self_host) {
/* We originated this USER-WEAK request. The entire ring has seen
it and there weren't any conflicts. Make the user non-weak. */
dir_debug("user refresh: %u Our USER-WEAK seen by the entire ring",
username_hash);
src_host = conn->dir->self_host;
weak = FALSE;
} else {
/* The original USER-WEAK sender will send a new non-weak USER
update saying what really happened. We'll still need to forward
this around the ring to the origin so it also knows it has
travelled through the ring. */
dir_debug("user refresh: %u Remote USER-WEAK from %s seen by the entire ring, ignoring",
username_hash, net_ip2addr(&dir_host->ip));
weak_forward = TRUE;
}
if (director_user_refresh(conn, username_hash,
host, ioloop_time, weak, &forced, &user) ||
weak_forward) {
if (forced)
src_host = conn->dir->self_host;
if (!user->weak)
director_update_user(conn->dir, src_host, user);
else {
director_update_user_weak(conn->dir, src_host, conn,
dir_host, user);
}
}
return TRUE;
}
static bool ATTR_NULL(3)
director_cmd_host_int(struct director_connection *conn, const char *const *args,
struct director_host *dir_host)
{
struct director_host *src_host = conn->host;
struct mail_host *host;
struct ip_addr ip;
const char *tag = "", *host_tag, *hostname = NULL;
unsigned int arg_count, vhost_count;
bool update, down = FALSE;
time_t last_updown_change = 0;
arg_count = str_array_length(args);
if (arg_count < 2 ||
net_addr2ip(args[0], &ip) < 0 ||
str_to_uint(args[1], &vhost_count) < 0) {
director_cmd_error(conn, "Invalid parameters");
return FALSE;
}
if (arg_count >= 3)
tag = args[2];
if (arg_count >= 4) {
if ((args[3][0] != 'D' && args[3][0] != 'U') ||
str_to_time(args[3]+1, &last_updown_change) < 0) {
director_cmd_error(conn, "Invalid updown parameters");
return FALSE;
}
down = args[3][0] == 'D';
}
if (arg_count >= 5)
hostname = args[4];
if (conn->ignore_host_events) {
/* remote is sending hosts in a handshake, but it doesn't have
a completed ring and we do. */
i_assert(conn->handshake_sending_hosts);
return TRUE;
}
if (tag[0] != '\0' && conn->minor_version < DIRECTOR_VERSION_TAGS_V2) {
director_cmd_error(conn, "Received a host tag from older director version with incompatible tagging support");
return FALSE;
}
host = mail_host_lookup(conn->dir->mail_hosts, &ip);
if (host == NULL) {
host = mail_host_add_hostname(conn->dir->mail_hosts,
hostname, &ip, tag);
update = TRUE;
} else {
update = host->vhost_count != vhost_count ||
host->down != down;
host_tag = mail_host_get_tag(host);
if (strcmp(tag, host_tag) != 0) {
i_error("director(%s): Host %s changed tag from '%s' to '%s'",
conn->name, net_ip2addr(&host->ip),
host_tag, tag);
mail_host_set_tag(host, tag);
update = TRUE;
}
if (update && host->desynced) {
string_t *str = t_str_new(128);
str_printfa(str, "director(%s): Host %s is being updated before previous update had finished (",
conn->name, net_ip2addr(&host->ip));
if (host->down != down &&
host->last_updown_change > last_updown_change) {
/* our host has a newer change. preserve it. */
down = host->down;
}
if (host->down != down) {
if (host->down)
str_append(str, "down -> up");
else
str_append(str, "up -> down");
}
if (host->vhost_count != vhost_count) {