-
Notifications
You must be signed in to change notification settings - Fork 4.1k
/
skynet_multicast.c
237 lines (210 loc) · 5.23 KB
/
skynet_multicast.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#include "skynet.h"
#include "skynet_multicast.h"
#include "skynet_server.h"
#include "skynet_handle.h"
#include <stdlib.h>
#include <string.h>
struct skynet_multicast_message {
int ref;
const void * msg;
size_t sz;
uint32_t source;
};
struct skynet_multicast_message *
skynet_multicast_create(const void * msg, size_t sz, uint32_t source) {
struct skynet_multicast_message * mc = malloc(sizeof(*mc));
mc->ref = 0;
mc->msg = msg;
mc->sz = sz;
mc->source = source;
return mc;
}
void
skynet_multicast_copy(struct skynet_multicast_message *mc, int copy) {
int r = __sync_add_and_fetch(&mc->ref, copy);
if (r == 0) {
free((void *)mc->msg);
free(mc);
}
}
void
skynet_multicast_dispatch(struct skynet_multicast_message * msg, void * ud, skynet_multicast_func func) {
if (func) {
func(ud, msg->source, msg->msg, msg->sz);
}
int ref = __sync_sub_and_fetch(&msg->ref, 1);
if (ref == 0) {
free((void *)msg->msg);
free(msg);
}
}
struct array {
int cap;
int number;
uint32_t *data;
};
struct skynet_multicast_group {
struct array enter_queue;
struct array leave_queue;
int cap;
int number;
uint32_t * data;
};
struct skynet_multicast_group *
skynet_multicast_newgroup() {
struct skynet_multicast_group * g = malloc(sizeof(*g));
memset(g,0,sizeof(*g));
return g;
}
void
skynet_multicast_deletegroup(struct skynet_multicast_group * g) {
free(g->data);
free(g->enter_queue.data);
free(g->leave_queue.data);
free(g);
}
static void
push_array(struct array * a, uint32_t v) {
if (a->number >= a->cap) {
a->cap *= 2;
if (a->cap == 0) {
a->cap = 4;
}
a->data = realloc(a->data, a->cap * sizeof(uint32_t));
}
a->data[a->number++] = v;
}
void
skynet_multicast_entergroup(struct skynet_multicast_group * group, uint32_t handle) {
push_array(&group->enter_queue, handle);
}
void
skynet_multicast_leavegroup(struct skynet_multicast_group * group, uint32_t handle) {
push_array(&group->leave_queue, handle);
}
static int
compar_uint(const void *a, const void *b) {
const uint32_t * aa = a;
const uint32_t * bb = b;
return (int)(*aa - *bb);
}
static void
combine_queue(struct skynet_context * from, struct skynet_multicast_group * group) {
qsort(group->enter_queue.data, group->enter_queue.number, sizeof(uint32_t), compar_uint);
qsort(group->leave_queue.data, group->leave_queue.number, sizeof(uint32_t), compar_uint);
int i;
int enter = group->enter_queue.number;
uint32_t last = 0;
int new_size = group->number + enter;
if (new_size > group->cap) {
group->data = realloc(group->data, new_size * sizeof(uint32_t));
group->cap = new_size;
}
// combine enter queue
int old_index = group->number - 1;
int new_index = new_size - 1;
for (i= enter - 1;i >=0 ; i--) {
uint32_t handle = group->enter_queue.data[i];
if (handle == last)
continue;
last = handle;
if (old_index < 0) {
group->data[new_index] = handle;
} else {
uint32_t p = group->data[old_index];
if (handle == p)
continue;
if (handle > p) {
group->data[new_index] = handle;
} else {
group->data[new_index] = group->data[old_index];
--old_index;
last = 0;
++i;
}
}
--new_index;
}
while (old_index >= 0) {
group->data[new_index] = group->data[old_index];
--old_index;
--new_index;
}
group->enter_queue.number = 0;
// remove leave queue
old_index = new_index + 1;
new_index = 0;
int count = new_size - old_index;
int leave = group->leave_queue.number;
for (i=0;i<leave;i++) {
if (old_index >= new_size) {
count = 0;
break;
}
uint32_t handle = group->leave_queue.data[i];
uint32_t p = group->data[old_index];
if (handle == p) {
--count;
++old_index;
} else if ( handle > p) {
group->data[new_index] = group->data[old_index];
++new_index;
++old_index;
--i;
} else {
skynet_error(from, "Try to remove a none exist handle : %x", handle);
}
}
while (new_index < count) {
group->data[new_index] = group->data[old_index];
++new_index;
++old_index;
}
group->leave_queue.number = 0;
group->number = new_index;
}
int
skynet_multicast_castgroup(struct skynet_context * from, struct skynet_multicast_group * group, struct skynet_multicast_message *msg) {
combine_queue(from, group);
int release = 0;
if (group->number > 0) {
uint32_t source = skynet_context_handle(from);
skynet_multicast_copy(msg, group->number);
int i;
for (i=0;i<group->number;i++) {
uint32_t p = group->data[i];
struct skynet_context * ctx = skynet_handle_grab(p);
if (ctx) {
skynet_context_send(ctx, msg, 0 , source, PTYPE_MULTICAST , 0);
skynet_context_release(ctx);
} else {
skynet_multicast_leavegroup(group, p);
++release;
}
}
}
skynet_multicast_copy(msg, -release);
return group->number - release;
}
void
skynet_multicast_cast(struct skynet_context * from, struct skynet_multicast_message *msg, const uint32_t *dests, int n) {
uint32_t source = skynet_context_handle(from);
skynet_multicast_copy(msg, n);
if (n == 0)
return;
int i;
int release = 0;
for (i=0;i<n;i++) {
uint32_t p = dests[i];
struct skynet_context * ctx = skynet_handle_grab(p);
if (ctx) {
skynet_context_send(ctx, msg, 0 , source, PTYPE_MULTICAST , 0);
skynet_context_release(ctx);
} else {
++release;
}
}
if (release != 0) {
skynet_multicast_copy(msg, -release);
}
}