Skip to content

Commit a3cf047

Browse files
committed
Fixed bugs in sync_timer.c and sync_waiter.c when multiple fibers in the same thread to wake up others.
1 parent 4c821fd commit a3cf047

File tree

7 files changed

+53
-18
lines changed

7 files changed

+53
-18
lines changed

lib_fiber/c/src/common/mbox.c

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@ void mbox_free(MBOX *mbox, void (*free_fn)(void*))
105105
}
106106

107107
int mbox_send(MBOX *mbox, void *msg)
108+
{
109+
return mbox_send2(mbox, mbox->out, msg);
110+
}
111+
112+
int mbox_send2(MBOX *mbox, socket_t out, void *msg)
108113
{
109114
int ret;
110115
long long n = 1;
@@ -134,9 +139,9 @@ int mbox_send(MBOX *mbox, void *msg)
134139
mbox->nsend++;
135140

136141
#if defined(_WIN32) || defined(_WIN64)
137-
ret = (int) acl_fiber_send(mbox->out, (const char*) &n, (int) sizeof(n), 0);
142+
ret = (int) acl_fiber_send(out, (const char*) &n, (int) sizeof(n), 0);
138143
#else
139-
ret = (int) acl_fiber_write(mbox->out, &n, sizeof(n));
144+
ret = (int) acl_fiber_write(out, &n, sizeof(n));
140145
#endif
141146

142147
#if !defined(HAS_EVENTFD)
@@ -147,7 +152,7 @@ int mbox_send(MBOX *mbox, void *msg)
147152

148153
if (ret == -1) {
149154
msg_error("%s(%d), %s: mbox write %d error %s", __FILE__,
150-
__LINE__, __FUNCTION__, mbox->out, last_serror());
155+
__LINE__, __FUNCTION__, (int) out, last_serror());
151156
return -1;
152157
}
153158

lib_fiber/c/src/common/mbox.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ void mbox_free(MBOX *mbox, void (*free_fn)(void*));
2727
* @return {int} 发送成功返回 0,否则返回 -1
2828
*/
2929
int mbox_send(MBOX *mbox, void *msg);
30+
int mbox_send2(MBOX *mbox, socket_t out, void *msg);
3031

3132
/**
3233
* 从消息队列中读取消息

lib_fiber/c/src/event.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ struct FILE_EVENT {
130130
#define STATUS_WRITEWAIT (unsigned) (1 << 6) // Wait for Writable
131131
#define STATUS_CLOSING (unsigned) (1 << 7) // In closing status
132132
#define STATUS_CLOSED (unsigned) (1 << 8) // In closed status
133+
#define STATUS_BUFFED (unsigned) (1 << 9)
133134

134135
#define SET_CONNECTING(x) ((x)->status |= STATUS_CONNECTING)
135136
#define SET_READABLE(x) ((x)->status |= STATUS_READABLE)

lib_fiber/c/src/fiber_io.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ void fiber_timer_add(ACL_FIBER *fb, size_t milliseconds)
178178
long long now = event_get_stamp(ev);
179179
TIMER_CACHE_NODE *timer;
180180

181-
fb->when = now + (ssize_t) milliseconds;
181+
fb->when = now + (long long) milliseconds;
182182
ring_detach(&fb->me); // Detach the previous binding.
183183
timer_cache_add(__thread_fiber->ev_timer, fb->when, &fb->me);
184184

@@ -849,6 +849,7 @@ FILE_EVENT *fiber_file_cache_get(socket_t fd)
849849
fe = file_event_alloc(fd);
850850
} else {
851851
file_event_init(fe, fd);
852+
fe->status &= ~STATUS_BUFFED;
852853
}
853854

854855
#ifdef HAS_IO_URING
@@ -866,7 +867,10 @@ void fiber_file_cache_put(FILE_EVENT *fe)
866867
fe->fd = INVALID_SOCKET;
867868

868869
if (array_size(__thread_fiber->cache) < __thread_fiber->cache_max) {
869-
array_push_back(__thread_fiber->cache, fe);
870+
if (!(fe->status & STATUS_BUFFED)) {
871+
array_push_back(__thread_fiber->cache, fe);
872+
fe->status |= STATUS_BUFFED;
873+
}
870874
} else {
871875
file_event_unrefer(fe);
872876
}

lib_fiber/c/src/sync/fiber_cond.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ static int fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex,
144144

145145
WAITER_INC(ev);
146146

147-
// Hang the current fiber and will wakeup if the timer arrives or
147+
// Hang the current fiber and will wake up if the timer arrives or
148148
// be awakened by the other fiber or thread.
149149
acl_fiber_switch();
150150

lib_fiber/c/src/sync/sync_timer.c

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ struct SYNC_TIMER {
1111
ACL_FIBER *fb;
1212
MBOX *box;
1313
int stop;
14-
TIMER_CACHE *waiters;
14+
// TIMER_CACHE *waiters;
1515
long tid;
1616
};
1717

@@ -23,7 +23,7 @@ static SYNC_TIMER *sync_timer_new(void)
2323

2424
pthread_mutex_init(&timer->lock, NULL);
2525
timer->box = mbox_create(MBOX_T_MPSC);
26-
timer->waiters = timer_cache_create();
26+
// timer->waiters = timer_cache_create();
2727
timer->tid = thread_self();
2828

2929
out = mbox_out(timer->box);
@@ -47,7 +47,7 @@ static void sync_timer_free(SYNC_TIMER *timer)
4747
{
4848
pthread_mutex_destroy(&timer->lock);
4949
mbox_free(timer->box, NULL);
50-
timer_cache_free(timer->waiters);
50+
// timer_cache_free(timer->waiters);
5151
mem_free(timer);
5252
}
5353

@@ -69,7 +69,7 @@ static void thread_init(void)
6969

7070
static void wakeup_waiter(SYNC_TIMER *timer UNUSED, SYNC_OBJ *obj)
7171
{
72-
// The fiber must has been awakened by the other fiber or thread.
72+
// The fiber must have been awakened by the other fiber or thread.
7373

7474
if (obj->delay < 0) {
7575
// No timer has been set if delay < 0,
@@ -117,9 +117,11 @@ static void fiber_waiting(ACL_FIBER *fiber fiber_unused, void *ctx)
117117
sync_obj_unrefer(obj);
118118
mem_free(msg);
119119

120+
/*
120121
if (timer_cache_size(timer->waiters) == 0) {
121122
delay = -1;
122123
}
124+
*/
123125
}
124126
}
125127

@@ -160,16 +162,35 @@ void sync_timer_wakeup(SYNC_TIMER *timer, SYNC_OBJ *obj)
160162
// message with the temporary FILE_EVENT.
161163

162164
SYNC_MSG *msg = (SYNC_MSG*) mem_malloc(sizeof(SYNC_MSG));
165+
socket_t out, out2 = INVALID_SOCKET;
166+
FILE_EVENT *fe;
167+
163168
msg->obj = obj;
164169
msg->action = SYNC_ACTION_WAKEUP;
165-
socket_t out = mbox_out(timer->box);
166-
FILE_EVENT *fe = fiber_file_cache_get(out);
170+
sync_obj_refer(obj);
167171

168-
fe->mask |= EVENT_SYSIO;
172+
out = mbox_out(timer->box);
173+
174+
// Check if the out fd has been bound by the other fiber that
175+
// it was yield when calling acl_fiber_write in mbox_send2,
176+
// if so, we should duplicate another out fd to bind the new one.
177+
fe = fiber_file_get(out);
178+
if (fe) {
179+
out2 = dup(out);
180+
msg_warn("%s(%d): fe exists for out=%d, dup's out2=%d",
181+
__FUNCTION__, __LINE__, (int) out, (int) out2);
182+
fe = fiber_file_cache_get(out2);
183+
} else {
184+
fe = fiber_file_cache_get(out);
185+
}
169186

170-
sync_obj_refer(obj);
171-
mbox_send(timer->box, msg);
187+
fe->mask |= EVENT_SYSIO;
188+
mbox_send2(timer->box, out2 != INVALID_SOCKET ? out2 : out, msg);
172189
fiber_file_cache_put(fe);
190+
191+
if (out2 != INVALID_SOCKET) {
192+
acl_fiber_close(out2);
193+
}
173194
} else {
174195
// If the current notifier is a fiber in the same thread with
175196
// the one to be awakened, just wakeup it directly.

lib_fiber/c/src/sync/sync_waiter.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ SYNC_WAITER *sync_waiter_get(void)
9797

9898
void sync_waiter_wakeup(SYNC_WAITER *waiter, ACL_FIBER *fb)
9999
{
100-
if (var_hook_sys_api) {
100+
if (!var_hook_sys_api) {
101+
mbox_send(waiter->box, fb);
102+
} else {
101103
// When using io_uring, we should call the system API of write
102104
// to send data, because the fd is shared by multiple threads
103105
// and which can't use io_uring directly, so we set the mask
@@ -123,6 +125,8 @@ void sync_waiter_wakeup(SYNC_WAITER *waiter, ACL_FIBER *fb)
123125
assert(fe->mbox_wsem);
124126
}
125127

128+
file_event_refer(fe);
129+
126130
// Reduce the sem number and maybe be suspended if sem is 0.
127131
acl_fiber_sem_wait(fe->mbox_wsem);
128132

@@ -136,7 +140,6 @@ void sync_waiter_wakeup(SYNC_WAITER *waiter, ACL_FIBER *fb)
136140
if (acl_fiber_sem_post(fe->mbox_wsem) == 1) {
137141
fiber_file_cache_put(fe);
138142
}
139-
} else {
140-
mbox_send(waiter->box, fb);
143+
file_event_unrefer(fe);
141144
}
142145
}

0 commit comments

Comments
 (0)