Skip to content

Commit

Permalink
StatsWatcher, bugfix: fixup unexpected thread holding occured on call…
Browse files Browse the repository at this point in the history
…ing `fs.watchFile`. (#624)
  • Loading branch information
richardo2016 committed Nov 26, 2020
1 parent 8905dcb commit 1c1930f
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 105 deletions.
162 changes: 77 additions & 85 deletions fibjs/include/StatsWatcher.h
Expand Up @@ -67,15 +67,6 @@ class StatsWatcher : public StatsWatcher_base {
EVENT_FUNC(error);

public:
static bool isTargetBeingWatched(exlib::string& target)
{
s_TargetWatcherMapLock.lock();
bool watching = (bool)s_TargetWatcherMap.count(target);
s_TargetWatcherMapLock.unlock();

return watching;
}

static bool setTargetWatcher(exlib::string& target, StatsWatcher* watcher)
{
s_TargetWatcherMapLock.lock();
Expand All @@ -102,88 +93,69 @@ class StatsWatcher : public StatsWatcher_base {

static void removeTargetWatcher(exlib::string& target)
{
if (!isTargetBeingWatched(target))
return;
s_TargetWatcherMapLock.lock();

s_TargetWatcherMap.erase(target);
if ((bool)s_TargetWatcherMap.count(target))
s_TargetWatcherMap.erase(target);

s_TargetWatcherMapLock.unlock();
}

public:
class AsyncMonitorStatsChangeProc : public AsyncUVTask {
public:
AsyncMonitorStatsChangeProc(StatsWatcher* watcher)
: m_watcher(watcher)
{
}
~AsyncMonitorStatsChangeProc()
{
}

public:
virtual int32_t post(int32_t v);

virtual void invoke();

static void freeSelfAfterUVHandleStop(uv_handle_t* handle)
{
StatsWatcher* proc = (StatsWatcher*)(handle->data);
delete proc;
}
void onError(result_t hr, const char* msg)
{
_emit("error", new EventInfo(this, "error", hr, msg));
}

public:
StatsWatcher* m_watcher;
uv_timer_t m_timer_req;
public:
static void timer_callback(uv_timer_t* timer_req)
{
StatsWatcher* pThis = NULL;
pThis = container_of(timer_req, StatsWatcher, m_timer_req);

private:
static void timer_callback(uv_timer_t* timer_req)
{
AsyncMonitorStatsChangeProc* p = NULL;
p = ((AsyncMonitorStatsChangeProc*)((intptr_t)timer_req - (intptr_t)&p->m_timer_req));
assert(&pThis->m_timer_req == timer_req);

assert(&p->m_timer_req == timer_req);
if (!pThis)
return;

if (!p)
return;
uv_handle_t* handle = (uv_handle_t*)timer_req;
if (pThis->m_closed) {
handle->data = (void*)pThis;

uv_handle_t* handle = (uv_handle_t*)timer_req;
if (p->m_watcher->m_closed) {
handle->data = (void*)p;
uv_close(handle, AsyncMonitorStatsChangeProc::freeSelfAfterUVHandleStop);
} else
p->m_watcher->checkStatsChangeInUVThread();
uv_close(handle, NULL);
} else {
pThis->checkStatsChangeOnTimerCb();
}
};

public:
void onError(result_t hr, const char* msg)
{
_emit("error", new EventInfo(this, "error", hr, msg));
}

public:
void start()
result_t start()
{
if (m_closed)
return;
return 0;

m_vholder = new ValueHolder(wrap());

if (m_Persistent)
isolate_ref();

asyncCall(startWatchInNativeThread, this);
watcherReadyWaitor.wait();
};
int ret = uv_call([&] {
uv_timer_init(s_uv_loop, &m_timer_req);
int32_t uv_err_no = uv_timer_start(&m_timer_req, timer_callback, 0, getIntervalMS());
if (uv_err_no != 0) {
onError(CALL_E_INVALID_CALL, uv_strerror(uv_err_no));
close();

static result_t startWatchInNativeThread(StatsWatcher* watcher)
{
(new AsyncMonitorStatsChangeProc(watcher))->post(0);
return uv_err_no;
}

return 0;
});

if (ret != 0)
return CHECK_ERROR(Runtime::setError(uv_strerror(ret)));

return 0;
}
};

public:
void bindChangeHandler(v8::Local<v8::Function> callback)
Expand Down Expand Up @@ -254,41 +226,59 @@ class StatsWatcher : public StatsWatcher_base {
};

public:
result_t checkStatsChangeInUVThread()
static result_t asyncQueryStat(StatsWatcher* m_pThis)
{
Isolate* isolate = holder();
if (m_pThis->m_closed)
return 0;

result_t hr;
bool existed;
bool notReady = !watcherReadyWaitor.isSet();
if (notReady)
watcherReadyWaitor.set();
bool ready = m_pThis->m_bStarted;
if (!ready)
m_pThis->m_bStarted = true;

hr = fs_base::cc_exists(m_target, existed, isolate);
if (hr < 0)
return hr;
hr = fs_base::cc_exists(m_pThis->m_target, existed, m_pThis->holder());
if (hr < 0) {
return CHECK_ERROR(hr);
}

if (existed) {
obj_ptr<Stat_base> fileStat;
fs_base::cc_stat(m_target, fileStat, isolate);
hr = fs_base::cc_stat(m_pThis->m_target, fileStat, m_pThis->holder());

prev = cur;
cur = fileStat;
if (hr < 0) {
return CHECK_ERROR(hr);
}

m_pThis->prev = m_pThis->cur;
m_pThis->cur = fileStat;

double pt;
prev->get_mtimeMs(pt);
m_pThis->prev->get_mtimeMs(pt);
double ct;
cur->get_mtimeMs(ct);
if (pt == ct)
m_pThis->cur->get_mtimeMs(ct);

if (pt == ct) {
return 0;
} else if (!notReady)
}
} else if (ready)
return 0;

Variant args[2];
args[0] = cur;
args[1] = prev;
args[0] = m_pThis->cur;
args[1] = m_pThis->prev;

m_pThis->_emit("change", args, 2);

return 0;
}

_emit("change", args, 2);
result_t checkStatsChangeOnTimerCb()
{
if (m_closed)
return 0;

asyncCall(asyncQueryStat, this);

return 0;
}
Expand All @@ -299,8 +289,7 @@ class StatsWatcher : public StatsWatcher_base {
bool isUseBigInt() { return m_useBigInt; }

exlib::atomic m_closed;

exlib::Event watcherReadyWaitor;
exlib::atomic m_bStarted;

protected:
exlib::string m_target;
Expand All @@ -311,6 +300,9 @@ class StatsWatcher : public StatsWatcher_base {
obj_ptr<ValueHolder> m_vholder;
obj_ptr<Stat_base> prev;
obj_ptr<Stat_base> cur;

private:
uv_timer_t m_timer_req;
};
}

Expand Down
18 changes: 0 additions & 18 deletions fibjs/src/fs/AsyncFSIO.cpp
Expand Up @@ -35,24 +35,6 @@ void FSWatcher::AsyncWatchFSProc::invoke()
}
}

int32_t StatsWatcher::AsyncMonitorStatsChangeProc::post(int32_t v)
{
s_uvAsyncUVTasks.putTail(this);
uv_async_send(&s_uv_asyncWatcher);

return 0;
}

void StatsWatcher::AsyncMonitorStatsChangeProc::invoke()
{
uv_timer_init(s_uv_loop, &m_timer_req);
int32_t uv_err_no = uv_timer_start(&m_timer_req, timer_callback, 0, m_watcher->getIntervalMS());
if (uv_err_no != 0) {
m_watcher->onError(CALL_E_INVALID_CALL, uv_strerror(uv_err_no));
m_watcher->close();
}
}

int uv_call(std::function<int(void)> proc)
{
class UVCall : public AsyncEvent {
Expand Down
7 changes: 5 additions & 2 deletions fibjs/src/fs/fs.cpp
Expand Up @@ -634,14 +634,17 @@ result_t fs_base::watchFile(exlib::string fname, v8::Local<v8::Object> options,
pSW = new StatsWatcher(safe_name, persistent, interval, useBigInt);
retVal = pSW;

pSW->start();
hr = pSW->start();

if (hr < 0)
return hr;
}

pSW->bindChangeHandler(callback);

retVal = pSW;

return 0;
return hr;
}

result_t fs_base::unwatchFile(exlib::string fname)
Expand Down
3 changes: 3 additions & 0 deletions test/fswatch_test.js
Expand Up @@ -457,8 +457,10 @@ describe('fs.watch*', () => {

it("fs.watchFile nil", () => {
let nilTuple = []
let onlyOnceCount = 0;
const watcher = fs.watchFile(relfile, { interval: 50 }, (cur, prev) => {
triggeredThoughWatchNil = true;
onlyOnceCount++;
nilTuple = [cur, prev]
})

Expand All @@ -469,6 +471,7 @@ describe('fs.watch*', () => {
assert.ok(nilTuple.length === 2);
assert.ok(nilTuple[0].ctimeMs === 0);
assert.ok(nilTuple[1].ctimeMs === 0);
assert.equal(onlyOnceCount, 1);
});

it("fs.unwatchFile nil", () => {
Expand Down

0 comments on commit 1c1930f

Please sign in to comment.