Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

common,mds,mgr,mon,osd: store event only if it's added #16312

Merged
merged 1 commit into from
Aug 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 7 additions & 16 deletions src/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5944,19 +5944,6 @@ void Client::unmount()
ldout(cct, 2) << "unmounted." << dendl;
}



class C_C_Tick : public Context {
Client *client;
public:
explicit C_C_Tick(Client *c) : client(c) {}
void finish(int r) override {
// Called back via Timer, which takes client_lock for us
assert(client->client_lock.is_locked_by_me());
client->tick();
}
};

void Client::flush_cap_releases()
{
// send any cap releases
Expand Down Expand Up @@ -5985,9 +5972,13 @@ void Client::tick()
}

ldout(cct, 21) << "tick" << dendl;
tick_event = new C_C_Tick(this);
timer.add_event_after(cct->_conf->client_tick_interval, tick_event);

tick_event = timer.add_event_after(
cct->_conf->client_tick_interval,
new FunctionContext([this](int) {
// Called back via Timer, which takes client_lock for us
assert(client_lock.is_locked_by_me());
tick();
}));
utime_t now = ceph_clock_now();

if (!mounted && !mds_requests.empty()) {
Expand Down
1 change: 0 additions & 1 deletion src/client/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,6 @@ class Client : public Dispatcher, public md_config_obs_t {
friend class C_Client_CacheInvalidate; // calls ino_invalidate_cb
friend class C_Client_DentryInvalidate; // calls dentry_invalidate_cb
friend class C_Block_Sync; // Calls block map and protected helpers
friend class C_C_Tick; // Asserts on client_lock
friend class C_Client_RequestInterrupt;
friend class C_Client_Remount;
friend void intrusive_ptr_release(Inode *in);
Expand Down
8 changes: 4 additions & 4 deletions src/common/Timer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void SafeTimer::timer_thread()
lock.Unlock();
}

bool SafeTimer::add_event_after(double seconds, Context *callback)
Context* SafeTimer::add_event_after(double seconds, Context *callback)
{
assert(lock.is_locked());

Expand All @@ -123,14 +123,14 @@ bool SafeTimer::add_event_after(double seconds, Context *callback)
return add_event_at(when, callback);
}

bool SafeTimer::add_event_at(utime_t when, Context *callback)
Context* SafeTimer::add_event_at(utime_t when, Context *callback)
{
assert(lock.is_locked());
ldout(cct,10) << __func__ << " " << when << " -> " << callback << dendl;
if (stopping) {
ldout(cct,5) << __func__ << " already shutdown, event not added" << dendl;
delete callback;
return false;
return nullptr;
}
scheduled_map_t::value_type s_val(when, callback);
scheduled_map_t::iterator i = schedule.insert(s_val);
Expand All @@ -145,7 +145,7 @@ bool SafeTimer::add_event_at(utime_t when, Context *callback)
* adjust our timeout. */
if (i == schedule.begin())
cond.Signal();
return true;
return callback;
}

bool SafeTimer::cancel_event(Context *callback)
Expand Down
4 changes: 2 additions & 2 deletions src/common/Timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ class SafeTimer

/* Schedule an event in the future
* Call with the event_lock LOCKED */
bool add_event_after(double seconds, Context *callback);
bool add_event_at(utime_t when, Context *callback);
Context* add_event_after(double seconds, Context *callback);
Context* add_event_at(utime_t when, Context *callback);

/* Cancel an event.
* Call with the event_lock LOCKED
Expand Down
6 changes: 3 additions & 3 deletions src/journal/JournalMetadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -802,9 +802,9 @@ void JournalMetadata::schedule_commit_task() {
assert(m_lock.is_locked());
assert(m_commit_position_ctx != nullptr);
if (m_commit_position_task_ctx == NULL) {
m_commit_position_task_ctx = new C_CommitPositionTask(this);
m_timer->add_event_after(m_settings.commit_interval,
m_commit_position_task_ctx);
m_commit_position_task_ctx =
m_timer->add_event_after(m_settings.commit_interval,
new C_CommitPositionTask(this));
}
}

Expand Down
13 changes: 6 additions & 7 deletions src/journal/ObjectPlayer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,12 @@ void ObjectPlayer::schedule_watch() {
}

ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl;
assert(m_watch_task == NULL);
m_watch_task = new C_WatchTask(this);
m_timer.add_event_after(m_watch_interval, m_watch_task);
assert(m_watch_task == nullptr);
m_watch_task = m_timer.add_event_after(
m_watch_interval,
new FunctionContext([this](int) {
handle_watch_task();
}));
}

bool ObjectPlayer::cancel_watch() {
Expand Down Expand Up @@ -301,10 +304,6 @@ void ObjectPlayer::C_Fetch::finish(int r) {
on_finish->complete(r);
}

void ObjectPlayer::C_WatchTask::finish(int r) {
object_player->handle_watch_task();
}

void ObjectPlayer::C_WatchFetch::finish(int r) {
object_player->handle_watch_fetched(r);
}
Expand Down
6 changes: 0 additions & 6 deletions src/journal/ObjectPlayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,6 @@ class ObjectPlayer : public RefCountedObject {
}
void finish(int r) override;
};
struct C_WatchTask : public Context {
ObjectPlayerPtr object_player;
C_WatchTask(ObjectPlayer *o) : object_player(o) {
}
void finish(int r) override;
};
struct C_WatchFetch : public Context {
ObjectPlayerPtr object_player;
C_WatchFetch(ObjectPlayer *o) : object_player(o) {
Expand Down
10 changes: 6 additions & 4 deletions src/journal/ObjectRecorder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
m_timer_lock(timer_lock), m_handler(handler), m_order(order),
m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this),
m_append_task(NULL), m_lock(lock), m_append_tid(0), m_pending_bytes(0),
m_lock(lock), m_append_tid(0), m_pending_bytes(0),
m_size(0), m_overflowed(false), m_object_closed(false),
m_in_flight_flushes(false), m_aio_scheduled(false) {
m_ioctx.dup(ioctx);
Expand Down Expand Up @@ -194,9 +194,11 @@ void ObjectRecorder::cancel_append_task() {

void ObjectRecorder::schedule_append_task() {
Mutex::Locker locker(m_timer_lock);
if (m_append_task == NULL && m_flush_age > 0) {
m_append_task = new C_AppendTask(this);
m_timer.add_event_after(m_flush_age, m_append_task);
if (m_append_task == nullptr && m_flush_age > 0) {
m_append_task = m_timer.add_event_after(
m_flush_age, new FunctionContext([this](int) {
handle_append_task();
}));
}
}

Expand Down
10 changes: 1 addition & 9 deletions src/journal/ObjectRecorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,6 @@ class ObjectRecorder : public RefCountedObject, boost::noncopyable {
object_recorder->flush(future);
}
};
struct C_AppendTask : public Context {
ObjectRecorder *object_recorder;
C_AppendTask(ObjectRecorder *o) : object_recorder(o) {
}
void finish(int r) override {
object_recorder->handle_append_task();
}
};
struct C_AppendFlush : public Context {
ObjectRecorder *object_recorder;
uint64_t tid;
Expand Down Expand Up @@ -132,7 +124,7 @@ class ObjectRecorder : public RefCountedObject, boost::noncopyable {

FlushHandler m_flush_handler;

C_AppendTask *m_append_task;
Context *m_append_task = nullptr;

mutable std::shared_ptr<Mutex> m_lock;
AppendBuffers m_append_buffers;
Expand Down
22 changes: 7 additions & 15 deletions src/mds/Beacon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,13 @@
#define dout_prefix *_dout << "mds.beacon." << name << ' '


class Beacon::C_MDS_BeaconSender : public Context {
public:
explicit C_MDS_BeaconSender(Beacon *beacon_) : beacon(beacon_) {}
void finish(int r) override {
assert(beacon->lock.is_locked_by_me());
beacon->sender = NULL;
beacon->_send();
}
private:
Beacon *beacon;
};

Beacon::Beacon(CephContext *cct_, MonClient *monc_, std::string name_) :
Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock),
name(name_), standby_for_rank(MDS_RANK_NONE),
standby_for_fscid(FS_CLUSTER_ID_NONE), want_state(MDSMap::STATE_BOOT),
awaiting_seq(-1)
{
last_seq = 0;
sender = NULL;
was_laggy = false;

epoch = 0;
Expand Down Expand Up @@ -191,8 +178,13 @@ void Beacon::_send()
if (sender) {
timer.cancel_event(sender);
}
sender = new C_MDS_BeaconSender(this);
timer.add_event_after(g_conf->mds_beacon_interval, sender);
sender = timer.add_event_after(
g_conf->mds_beacon_interval,
new FunctionContext([this](int) {
assert(lock.is_locked_by_me());
sender = nullptr;
_send();
}));

if (!cct->get_heartbeat_map()->is_healthy()) {
/* If anything isn't progressing, let avoid sending a beacon so that
Expand Down
3 changes: 1 addition & 2 deletions src/mds/Beacon.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ class Beacon : public Dispatcher
MDSHealth health;

// Ticker
class C_MDS_BeaconSender;
C_MDS_BeaconSender *sender;
Context *sender = nullptr;

version_t awaiting_seq;
Cond waiting_cond;
Expand Down
21 changes: 6 additions & 15 deletions src/mds/MDSDaemon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,6 @@
#undef dout_prefix
#define dout_prefix *_dout << "mds." << name << ' '


class MDSDaemon::C_MDS_Tick : public Context {
protected:
MDSDaemon *mds_daemon;
public:
explicit C_MDS_Tick(MDSDaemon *m) : mds_daemon(m) {}
void finish(int r) override {
assert(mds_daemon->mds_lock.is_locked_by_me());
mds_daemon->tick();
}
};

// cons/des
MDSDaemon::MDSDaemon(const std::string &n, Messenger *m, MonClient *mc) :
Dispatcher(m->cct),
Expand All @@ -102,7 +90,6 @@ MDSDaemon::MDSDaemon(const std::string &n, Messenger *m, MonClient *mc) :
mgrc(m->cct, m),
log_client(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS),
mds_rank(NULL),
tick_event(0),
asok_hook(NULL)
{
orig_argc = 0;
Expand Down Expand Up @@ -539,8 +526,12 @@ void MDSDaemon::reset_tick()
if (tick_event) timer.cancel_event(tick_event);

// schedule
tick_event = new C_MDS_Tick(this);
timer.add_event_after(g_conf->mds_tick_interval, tick_event);
tick_event = timer.add_event_after(
g_conf->mds_tick_interval,
new FunctionContext([this](int) {
assert(mds_lock.is_locked_by_me());
tick();
}));
}

void MDSDaemon::tick()
Expand Down
3 changes: 1 addition & 2 deletions src/mds/MDSDaemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ class MDSDaemon : public Dispatcher, public md_config_obs_t {
const std::set <std::string> &changed) override;
protected:
// tick and other timer fun
class C_MDS_Tick;
C_MDS_Tick *tick_event;
Context *tick_event = nullptr;
void reset_tick();

void wait_for_omap_osds();
Expand Down
11 changes: 6 additions & 5 deletions src/mgr/MgrClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,12 @@ void MgrClient::reconnect()
when += cct->_conf->mgr_connect_retry_interval;
if (now < when) {
if (!connect_retry_callback) {
connect_retry_callback = new FunctionContext([this](int r){
connect_retry_callback = nullptr;
reconnect();
});
timer.add_event_at(when, connect_retry_callback);
connect_retry_callback = timer.add_event_at(
when,
new FunctionContext([this](int r){
connect_retry_callback = nullptr;
reconnect();
}));
}
ldout(cct, 4) << "waiting to retry connect until " << when << dendl;
return;
Expand Down
10 changes: 5 additions & 5 deletions src/mon/Elector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ void Elector::reset_timer(double plus)
* as far as we know, we may even be dead); so, just propose ourselves as the
* Leader.
*/
expire_event = new C_MonContext(mon, [this](int) {
expire();
});
mon->timer.add_event_after(g_conf->mon_election_timeout + plus,
expire_event);
expire_event = mon->timer.add_event_after(
g_conf->mon_election_timeout + plus,
new C_MonContext(mon, [this](int) {
expire();
}));
}


Expand Down
7 changes: 4 additions & 3 deletions src/mon/MgrMonitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -453,10 +453,11 @@ void MgrMonitor::send_digests()
sub->session->con->send_message(mdigest);
}

digest_event = new C_MonContext(mon, [this](int){
digest_event = mon->timer.add_event_after(
g_conf->mon_mgr_digest_period,
new C_MonContext(mon, [this](int) {
send_digests();
});
mon->timer.add_event_after(g_conf->mon_mgr_digest_period, digest_event);
}));
}

void MgrMonitor::cancel_timer()
Expand Down