Skip to content

Commit

Permalink
common,mds,mgr,mon,osd: store event only if it's added
Browse files Browse the repository at this point in the history
otherwise
* we will try to cancel it even it's never been added
* we will keep a dangling pointer around. which is, well,
  scaring.
* static analyzer will yell at us:
  Memory - illegal accesses  (USE_AFTER_FREE)

Signed-off-by: Kefu Chai <kchai@redhat.com>
  • Loading branch information
tchaikov committed Jul 13, 2017
1 parent 9cb7e07 commit e704073
Show file tree
Hide file tree
Showing 26 changed files with 161 additions and 190 deletions.
23 changes: 7 additions & 16 deletions src/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5945,19 +5945,6 @@ void Client::unmount()
ldout(cct, 2) << "unmounted." << dendl;
}



class C_C_Tick : public Context {
Client *client;
public:
explicit C_C_Tick(Client *c) : client(c) {}
void finish(int r) override {
// Called back via Timer, which takes client_lock for us
assert(client->client_lock.is_locked_by_me());
client->tick();
}
};

void Client::flush_cap_releases()
{
// send any cap releases
Expand Down Expand Up @@ -5986,9 +5973,13 @@ void Client::tick()
}

ldout(cct, 21) << "tick" << dendl;
tick_event = new C_C_Tick(this);
timer.add_event_after(cct->_conf->client_tick_interval, tick_event);

tick_event = timer.add_event_after(
cct->_conf->client_tick_interval,
new FunctionContext([this](int) {
// Called back via Timer, which takes client_lock for us
assert(client_lock.is_locked_by_me());
tick();
}));
utime_t now = ceph_clock_now();

if (!mounted && !mds_requests.empty()) {
Expand Down
1 change: 0 additions & 1 deletion src/client/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,6 @@ class Client : public Dispatcher, public md_config_obs_t {
friend class C_Client_CacheInvalidate; // calls ino_invalidate_cb
friend class C_Client_DentryInvalidate; // calls dentry_invalidate_cb
friend class C_Block_Sync; // Calls block map and protected helpers
friend class C_C_Tick; // Asserts on client_lock
friend class C_Client_RequestInterrupt;
friend class C_Client_Remount;
friend void intrusive_ptr_release(Inode *in);
Expand Down
8 changes: 4 additions & 4 deletions src/common/Timer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void SafeTimer::timer_thread()
lock.Unlock();
}

bool SafeTimer::add_event_after(double seconds, Context *callback)
Context* SafeTimer::add_event_after(double seconds, Context *callback)
{
assert(lock.is_locked());

Expand All @@ -123,14 +123,14 @@ bool SafeTimer::add_event_after(double seconds, Context *callback)
return add_event_at(when, callback);
}

bool SafeTimer::add_event_at(utime_t when, Context *callback)
Context* SafeTimer::add_event_at(utime_t when, Context *callback)
{
assert(lock.is_locked());
ldout(cct,10) << __func__ << " " << when << " -> " << callback << dendl;
if (stopping) {
ldout(cct,5) << __func__ << " already shutdown, event not added" << dendl;
delete callback;
return false;
return nullptr;
}
scheduled_map_t::value_type s_val(when, callback);
scheduled_map_t::iterator i = schedule.insert(s_val);
Expand All @@ -145,7 +145,7 @@ bool SafeTimer::add_event_at(utime_t when, Context *callback)
* adjust our timeout. */
if (i == schedule.begin())
cond.Signal();
return true;
return callback;
}

bool SafeTimer::cancel_event(Context *callback)
Expand Down
4 changes: 2 additions & 2 deletions src/common/Timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ class SafeTimer

/* Schedule an event in the future
* Call with the event_lock LOCKED */
bool add_event_after(double seconds, Context *callback);
bool add_event_at(utime_t when, Context *callback);
Context* add_event_after(double seconds, Context *callback);
Context* add_event_at(utime_t when, Context *callback);

/* Cancel an event.
* Call with the event_lock LOCKED
Expand Down
6 changes: 3 additions & 3 deletions src/journal/JournalMetadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -802,9 +802,9 @@ void JournalMetadata::schedule_commit_task() {
assert(m_lock.is_locked());
assert(m_commit_position_ctx != nullptr);
if (m_commit_position_task_ctx == NULL) {
m_commit_position_task_ctx = new C_CommitPositionTask(this);
m_timer->add_event_after(m_settings.commit_interval,
m_commit_position_task_ctx);
m_commit_position_task_ctx =
m_timer->add_event_after(m_settings.commit_interval,
new C_CommitPositionTask(this));
}
}

Expand Down
13 changes: 6 additions & 7 deletions src/journal/ObjectPlayer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,12 @@ void ObjectPlayer::schedule_watch() {
}

ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl;
assert(m_watch_task == NULL);
m_watch_task = new C_WatchTask(this);
m_timer.add_event_after(m_watch_interval, m_watch_task);
assert(m_watch_task == nullptr);
m_watch_task = m_timer.add_event_after(
m_watch_interval,
new FunctionContext([this](int) {
handle_watch_task();
}));
}

bool ObjectPlayer::cancel_watch() {
Expand Down Expand Up @@ -301,10 +304,6 @@ void ObjectPlayer::C_Fetch::finish(int r) {
on_finish->complete(r);
}

void ObjectPlayer::C_WatchTask::finish(int r) {
object_player->handle_watch_task();
}

void ObjectPlayer::C_WatchFetch::finish(int r) {
object_player->handle_watch_fetched(r);
}
Expand Down
6 changes: 0 additions & 6 deletions src/journal/ObjectPlayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,6 @@ class ObjectPlayer : public RefCountedObject {
}
void finish(int r) override;
};
struct C_WatchTask : public Context {
ObjectPlayerPtr object_player;
C_WatchTask(ObjectPlayer *o) : object_player(o) {
}
void finish(int r) override;
};
struct C_WatchFetch : public Context {
ObjectPlayerPtr object_player;
C_WatchFetch(ObjectPlayer *o) : object_player(o) {
Expand Down
10 changes: 6 additions & 4 deletions src/journal/ObjectRecorder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
m_timer_lock(timer_lock), m_handler(handler), m_order(order),
m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this),
m_append_task(NULL), m_lock(lock), m_append_tid(0), m_pending_bytes(0),
m_lock(lock), m_append_tid(0), m_pending_bytes(0),
m_size(0), m_overflowed(false), m_object_closed(false),
m_in_flight_flushes(false), m_aio_scheduled(false) {
m_ioctx.dup(ioctx);
Expand Down Expand Up @@ -194,9 +194,11 @@ void ObjectRecorder::cancel_append_task() {

void ObjectRecorder::schedule_append_task() {
Mutex::Locker locker(m_timer_lock);
if (m_append_task == NULL && m_flush_age > 0) {
m_append_task = new C_AppendTask(this);
m_timer.add_event_after(m_flush_age, m_append_task);
if (m_append_task == nullptr && m_flush_age > 0) {
m_append_task = m_timer.add_event_after(
m_flush_age, new FunctionContext([this](int) {
handle_append_task();
}));
}
}

Expand Down
10 changes: 1 addition & 9 deletions src/journal/ObjectRecorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,6 @@ class ObjectRecorder : public RefCountedObject, boost::noncopyable {
object_recorder->flush(future);
}
};
struct C_AppendTask : public Context {
ObjectRecorder *object_recorder;
C_AppendTask(ObjectRecorder *o) : object_recorder(o) {
}
void finish(int r) override {
object_recorder->handle_append_task();
}
};
struct C_AppendFlush : public Context {
ObjectRecorder *object_recorder;
uint64_t tid;
Expand Down Expand Up @@ -132,7 +124,7 @@ class ObjectRecorder : public RefCountedObject, boost::noncopyable {

FlushHandler m_flush_handler;

C_AppendTask *m_append_task;
Context *m_append_task = nullptr;

mutable std::shared_ptr<Mutex> m_lock;
AppendBuffers m_append_buffers;
Expand Down
22 changes: 7 additions & 15 deletions src/mds/Beacon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,13 @@
#define dout_prefix *_dout << "mds.beacon." << name << ' '


class Beacon::C_MDS_BeaconSender : public Context {
public:
explicit C_MDS_BeaconSender(Beacon *beacon_) : beacon(beacon_) {}
void finish(int r) override {
assert(beacon->lock.is_locked_by_me());
beacon->sender = NULL;
beacon->_send();
}
private:
Beacon *beacon;
};

Beacon::Beacon(CephContext *cct_, MonClient *monc_, std::string name_) :
Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock),
name(name_), standby_for_rank(MDS_RANK_NONE),
standby_for_fscid(FS_CLUSTER_ID_NONE), want_state(MDSMap::STATE_BOOT),
awaiting_seq(-1)
{
last_seq = 0;
sender = NULL;
was_laggy = false;

epoch = 0;
Expand Down Expand Up @@ -191,8 +178,13 @@ void Beacon::_send()
if (sender) {
timer.cancel_event(sender);
}
sender = new C_MDS_BeaconSender(this);
timer.add_event_after(g_conf->mds_beacon_interval, sender);
sender = timer.add_event_after(
g_conf->mds_beacon_interval,
new FunctionContext([this](int) {
assert(lock.is_locked_by_me());
sender = nullptr;
_send();
}));

if (!cct->get_heartbeat_map()->is_healthy()) {
/* If anything isn't progressing, let avoid sending a beacon so that
Expand Down
3 changes: 1 addition & 2 deletions src/mds/Beacon.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ class Beacon : public Dispatcher
MDSHealth health;

// Ticker
class C_MDS_BeaconSender;
C_MDS_BeaconSender *sender;
Context *sender = nullptr;

version_t awaiting_seq;
Cond waiting_cond;
Expand Down
24 changes: 7 additions & 17 deletions src/mds/MDSDaemon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,6 @@
#undef dout_prefix
#define dout_prefix *_dout << "mds." << name << ' '


class MDSDaemon::C_MDS_Tick : public Context {
protected:
MDSDaemon *mds_daemon;
public:
explicit C_MDS_Tick(MDSDaemon *m) : mds_daemon(m) {}
void finish(int r) override {
assert(mds_daemon->mds_lock.is_locked_by_me());

mds_daemon->tick_event = 0;
mds_daemon->tick();
}
};

// cons/des
MDSDaemon::MDSDaemon(const std::string &n, Messenger *m, MonClient *mc) :
Dispatcher(m->cct),
Expand All @@ -105,7 +91,6 @@ MDSDaemon::MDSDaemon(const std::string &n, Messenger *m, MonClient *mc) :
mgrc(m->cct, m),
log_client(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS),
mds_rank(NULL),
tick_event(0),
asok_hook(NULL)
{
orig_argc = 0;
Expand Down Expand Up @@ -542,8 +527,13 @@ void MDSDaemon::reset_tick()
if (tick_event) timer.cancel_event(tick_event);

// schedule
tick_event = new C_MDS_Tick(this);
timer.add_event_after(g_conf->mds_tick_interval, tick_event);
tick_event = timer.add_event_after(
g_conf->mds_tick_interval,
new FunctionContext([this](int) {
assert(mds_lock.is_locked_by_me());
tick_event = nullptr;
tick();
}));
}

void MDSDaemon::tick()
Expand Down
3 changes: 1 addition & 2 deletions src/mds/MDSDaemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ class MDSDaemon : public Dispatcher, public md_config_obs_t {
const std::set <std::string> &changed) override;
protected:
// tick and other timer fun
class C_MDS_Tick;
C_MDS_Tick *tick_event;
Context *tick_event = nullptr;
void reset_tick();

void wait_for_omap_osds();
Expand Down
11 changes: 6 additions & 5 deletions src/mgr/MgrClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,12 @@ void MgrClient::reconnect()
when += cct->_conf->mgr_connect_retry_interval;
if (now < when) {
if (!connect_retry_callback) {
connect_retry_callback = new FunctionContext([this](int r){
connect_retry_callback = nullptr;
reconnect();
});
timer.add_event_at(when, connect_retry_callback);
connect_retry_callback = timer.add_event_at(
when,
new FunctionContext([this](int r){
connect_retry_callback = nullptr;
reconnect();
}));
}
ldout(cct, 4) << "waiting to retry connect until " << when << dendl;
return;
Expand Down
10 changes: 5 additions & 5 deletions src/mon/Elector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,11 @@ void Elector::reset_timer(double plus)
* as far as we know, we may even be dead); so, just propose ourselves as the
* Leader.
*/
expire_event = new C_MonContext(mon, [this](int) {
expire();
});
mon->timer.add_event_after(g_conf->mon_election_timeout + plus,
expire_event);
expire_event = mon->timer.add_event_after(
g_conf->mon_election_timeout + plus,
new C_MonContext(mon, [this](int) {
expire();
}));
}


Expand Down
7 changes: 4 additions & 3 deletions src/mon/MgrMonitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,11 @@ void MgrMonitor::send_digests()
sub->session->con->send_message(mdigest);
}

digest_event = new C_MonContext(mon, [this](int){
digest_event = mon->timer.add_event_after(
g_conf->mon_mgr_digest_period,
new C_MonContext(mon, [this](int) {
send_digests();
});
mon->timer.add_event_after(g_conf->mon_mgr_digest_period, digest_event);
}));
}

void MgrMonitor::cancel_timer()
Expand Down

0 comments on commit e704073

Please sign in to comment.