Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue=1251, use clock_gettime(CLOCK_MONOTONIC) to get current time #1252

Merged
merged 4 commits into from
May 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ MONITOR_SRC := src/monitor/teramo_main.cc
MARK_SRC := src/benchmark/mark.cc src/benchmark/mark_main.cc
TEST_SRC := src/utils/test/prop_tree_test.cc src/utils/test/tprinter_test.cc \
src/io/test/tablet_io_test.cc src/io/test/tablet_scanner_test.cc \
src/master/test/master_impl_test.cc src/io/test/load_test.cc
src/master/test/master_impl_test.cc src/io/test/load_test.cc \
src/common/test/thread_pool_test.cc

TEST_OUTPUT := test_output
UNITTEST_OUTPUT := $(TEST_OUTPUT)/unittest
Expand Down Expand Up @@ -82,7 +83,8 @@ TERA_C_SO = libtera_c.so
JNILIBRARY = libjni_tera.so
BENCHMARK = tera_bench tera_mark
TESTS = prop_tree_test tprinter_test string_util_test tablet_io_test \
tablet_scanner_test fragment_test progress_bar_test master_impl_test load_test
tablet_scanner_test fragment_test progress_bar_test master_impl_test load_test \
thread_pool_test

.PHONY: all clean cleanall test

Expand Down Expand Up @@ -161,6 +163,9 @@ src/leveldb/libleveldb.a: FORCE
tera_bench:

# unit test
thread_pool_test: src/common/test/thread_pool_test.o $(LIBRARY)
$(CXX) -o $@ $^ $(LDFLAGS)

prop_tree_test: src/utils/test/prop_tree_test.o $(LIBRARY)
$(CXX) -o $@ $^ $(LDFLAGS)

Expand Down
22 changes: 14 additions & 8 deletions src/common/mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,14 @@ class MutexLock {
class CondVar {
public:
explicit CondVar(Mutex* mu) : mu_(mu) {
PthreadCall("init condvar", pthread_cond_init(&cond_, NULL));
// use monotonic clock
PthreadCall("condattr init ", pthread_condattr_init(&attr_));
PthreadCall("condattr setclock ", pthread_condattr_setclock(&attr_, CLOCK_MONOTONIC));
PthreadCall("condvar init with attr", pthread_cond_init(&cond_, &attr_));
}
~CondVar() {
PthreadCall("destroy condvar", pthread_cond_destroy(&cond_));
PthreadCall("condvar destroy", pthread_cond_destroy(&cond_));
PthreadCall("condattr destroy", pthread_condattr_destroy(&attr_));
}
void Wait(const char* msg = NULL) {
int64_t msg_threshold = mu_->msg_threshold_;
Expand All @@ -134,12 +138,13 @@ class CondVar {
// Time wait in us
// timeout < 0 would cause ETIMEOUT and return false immediately
bool TimeWaitInUs(int timeout, const char* msg = NULL) {
timespec ts;
struct timeval tv;
gettimeofday(&tv, NULL);
int64_t usec = tv.tv_usec + timeout;
ts.tv_sec = tv.tv_sec + usec / 1000000;
ts.tv_nsec = (usec % 1000000) * 1000;
// ref: http://www.qnx.com/developers/docs/6.5.0SP1.update/com.qnx.doc.neutrino_lib_ref/p/pthread_cond_timedwait.html
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
int64_t nsec = ((int64_t)timeout) * 1000 + ts.tv_nsec;
ts.tv_sec += nsec / 1000000000;
ts.tv_nsec = nsec % 1000000000;

int64_t msg_threshold = mu_->msg_threshold_;
mu_->BeforeUnlock();
int ret = pthread_cond_timedwait(&cond_, &mu_->mu_, &ts);
Expand All @@ -163,6 +168,7 @@ class CondVar {
void operator=(const CondVar&);
Mutex* mu_;
pthread_cond_t cond_;
pthread_condattr_t attr_;
};
} // namespace common

Expand Down
94 changes: 94 additions & 0 deletions src/common/test/thread_pool_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>

#include <functional>
#include <iostream>

#include "gtest/gtest.h"

#include "common/mutex.h"
#include "common/thread_pool.h"
#include "common/timer.h"

namespace tera {

TEST(TimerTest, Basic) {
struct timespec ts1, ts2, ts3;
struct timeval tv;

clock_gettime(CLOCK_MONOTONIC, &ts1);
clock_gettime(CLOCK_MONOTONIC_RAW, &ts3);
clock_gettime(CLOCK_REALTIME, &ts2);
gettimeofday(&tv, NULL);

std::cout << "ts1.tv_sec " << ts1.tv_sec
<< ", ts1.tv_nsec " << ts1.tv_nsec
<< std::endl;
std::cout << "ts2.tv_sec " << ts2.tv_sec
<< ", ts2.tv_nsec " << ts2.tv_nsec
<< std::endl;
std::cout << "ts3.tv_sec " << ts3.tv_sec
<< ", ts3.tv_nsec " << ts3.tv_nsec
<< std::endl;
std::cout << "tv.tv_sec " << tv.tv_sec
<< ", tv.tv_usec " << tv.tv_usec
<< std::endl;

int delta = 0;
delta = ts2.tv_sec - tv.tv_sec;
ASSERT_TRUE(-1 <= delta && delta <= 1);
ASSERT_TRUE(ts1.tv_sec < ts2.tv_sec);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ts1是MONOTONIC的;
ts2是REALTIME的;
二者不是同一个东东,比较是啥意思。。

Copy link
Collaborator Author

@caijieming-ng caijieming-ng May 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

第一个断言,想说明gettimeofday和CLOCK_REALTIME两个函数获得的时间是一致的;
第二个断言,说明monotonic函数确实是取了系统启动后的时间,应该恒小于CLOCK_REALTIME

ASSERT_TRUE(ts1.tv_sec < tv.tv_sec);
}

TEST(TimerTest, test1) {
struct timespec ts1;
struct timeval tv;

clock_gettime(CLOCK_REALTIME, &ts1);
gettimeofday(&tv, NULL);
int64_t ts = common::timer::get_micros();

int delta = 0;
delta = ts1.tv_sec - tv.tv_sec;
ASSERT_TRUE(-1 <= delta && delta <= 1);
delta = ts / 1000000 - tv.tv_sec;
ASSERT_TRUE(-1 <= delta && delta <= 1);
}

common::Mutex mu;
common::CondVar cv(&mu);

void DelayTask_issue1(int32_t time, int32_t time_ms) {
struct timespec ts1;
clock_gettime(CLOCK_MONOTONIC, &ts1);
int delta = ts1.tv_sec - (time + time_ms / 1000);
ASSERT_TRUE(-1 <= delta && delta <= 1);
cv.Signal();
return;
}

TEST(ThreadPoolTest, Basic) {
mu.Lock();
common::ThreadPool* pool = new common::ThreadPool(1000);
struct timespec ts1;
clock_gettime(CLOCK_MONOTONIC, &ts1);
ThreadPool::Task task =
std::bind(&DelayTask_issue1, ts1.tv_sec, 5000);
pool->DelayTask(5000, task);

cv.Wait();
mu.Unlock();
delete pool;
}

} // namespace tera
21 changes: 13 additions & 8 deletions src/common/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include <vector>

#include "mutex.h"
#include "timer.h"

namespace common {

Expand Down Expand Up @@ -84,19 +83,19 @@ class ThreadPool {
// Add a task to the thread pool.
void AddTask(const Task& task) {
MutexLock lock(&mutex_, "AddTask");
queue_.push_back(BGItem(0, timer::get_micros(), task));
queue_.push_back(BGItem(0, get_micros(), task));
++pending_num_;
work_cv_.Signal();
}
void AddPriorityTask(const Task& task) {
MutexLock lock(&mutex_);
queue_.push_front(BGItem(0, timer::get_micros(), task));
queue_.push_front(BGItem(0, get_micros(), task));
++pending_num_;
work_cv_.Signal();
}
int64_t DelayTask(int64_t delay, const Task& task) {
MutexLock lock(&mutex_);
int64_t now_time = timer::get_micros();
int64_t now_time = get_micros();
int64_t exe_time = now_time + delay * 1000;
BGItem bg_item(++last_task_id_, exe_time, task);
time_queue_.push(bg_item);
Expand Down Expand Up @@ -172,6 +171,12 @@ class ThreadPool {
ThreadPool(const ThreadPool&);
void operator=(const ThreadPool&);

int64_t get_micros() { // get us before machine reboot
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return static_cast<int64_t>(ts.tv_sec) * 1000000 + static_cast<int64_t>(ts.tv_nsec) / 1000;
}

static void* ThreadWrapper(void* arg) {
reinterpret_cast<ThreadPool*>(arg)->ThreadProc();
return NULL;
Expand All @@ -188,7 +193,7 @@ class ThreadPool {
}
// Timer task
if (!time_queue_.empty()) {
int64_t now_time = timer::get_micros();
int64_t now_time = get_micros();
BGItem bg_item = time_queue_.top();
int64_t wait_time = bg_item.exe_time - now_time; // in us
if (wait_time <= 0) {
Expand All @@ -203,7 +208,7 @@ class ThreadPool {
mutex_.Unlock();
task(bg_item.id);
mutex_.Lock("ThreadProcRelock");
task_cost_sum_ += timer::get_micros() - now_time;
task_cost_sum_ += get_micros() - now_time;
task_count_++;
running_task_ids_.erase(bg_item.id);
}
Expand All @@ -219,13 +224,13 @@ class ThreadPool {
int64_t exe_time = queue_.front().exe_time;
queue_.pop_front();
--pending_num_;
int64_t start_time = timer::get_micros();
int64_t start_time = get_micros();
schedule_cost_sum_ += start_time - exe_time;
schedule_count_++;
mutex_.Unlock();
task(0);
mutex_.Lock("ThreadProcRelock2");
task_cost_sum_ += timer::get_micros() - start_time;
task_cost_sum_ += get_micros() - start_time;
task_count_++;
}
}
Expand Down
10 changes: 4 additions & 6 deletions src/common/timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,15 @@ static inline std::string get_curtime_str() {
}

static inline int64_t get_micros() {
struct timeval tv;
gettimeofday(&tv, NULL);
return static_cast<int64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return static_cast<int64_t>(ts.tv_sec) * 1000000 + static_cast<int64_t>(ts.tv_nsec) / 1000;
}

static inline int64_t get_unique_micros(int64_t ref) {
struct timeval tv;
int64_t now;
do {
gettimeofday(&tv, NULL);
now = static_cast<int64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
now = get_micros();
} while (now == ref);
return now;
}
Expand Down
2 changes: 1 addition & 1 deletion src/leveldb/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ include build_config.mk
CFLAGS += -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
CXXFLAGS += -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT)

LDFLAGS += $(PLATFORM_LDFLAGS) -L$(SNAPPY_LIBDIR) -ldl -lsnappy
LDFLAGS += $(PLATFORM_LDFLAGS) -L$(SNAPPY_LIBDIR) -lrt -ldl -lsnappy
LIBS += $(PLATFORM_LIBS)

LIBOBJECTS = $(SOURCES:.cc=.o)
Expand Down
4 changes: 2 additions & 2 deletions src/leveldb/bench/tera_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ class Benchmark {
snprintf(msg, sizeof(msg), "(%d ops)", num_entries);
message_ = msg;
}
char ts[10];
snprintf(ts, sizeof(ts), "%d", FLAGS_value_seed);
char ts[20];
snprintf(ts, sizeof(ts), "%lld", ((long long int)FLAGS_value_seed) * 1000000);

// Write to database
int i = FLAGS_start_key;
Expand Down
29 changes: 18 additions & 11 deletions src/leveldb/port/port_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,32 +50,39 @@ void Mutex::Unlock() { PthreadCall("unlock", pthread_mutex_unlock(&mu_)); }

CondVar::CondVar(Mutex* mu)
: mu_(mu) {
PthreadCall("init cv", pthread_cond_init(&cv_, NULL));
// use monotonic clock
PthreadCall("condattr init ", pthread_condattr_init(&attr_));
PthreadCall("condattr setclock ", pthread_condattr_setclock(&attr_, CLOCK_MONOTONIC));
PthreadCall("condvar init with attr", pthread_cond_init(&cond_, &attr_));
}

CondVar::~CondVar() { PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); }
CondVar::~CondVar() {
PthreadCall("condvar destroy", pthread_cond_destroy(&cond_));
PthreadCall("condattr destroy", pthread_condattr_destroy(&attr_));
}

void CondVar::Wait() {
PthreadCall("wait", pthread_cond_wait(&cv_, &mu_->mu_));
PthreadCall("condvar wait", pthread_cond_wait(&cond_, &mu_->mu_));
}

// wait in ms
bool CondVar::Wait(int64_t wait_millisec) {
assert(wait_millisec >= 0);
// ref: http://www.qnx.com/developers/docs/6.5.0SP1.update/com.qnx.doc.neutrino_lib_ref/p/pthread_cond_timedwait.html
struct timespec ts;
struct timeval tp;
gettimeofday(&tp, NULL);
uint64_t usec = tp.tv_usec + wait_millisec * 1000;
ts.tv_sec = tp.tv_sec + usec / 1000000;
ts.tv_nsec = (usec % 1000000) * 1000;
return (0 == pthread_cond_timedwait(&cv_, &mu_->mu_, &ts));
clock_gettime(CLOCK_MONOTONIC, &ts);
int64_t nsec = ((int64_t)wait_millisec) * 1000000 + ts.tv_nsec;
ts.tv_sec += nsec / 1000000000;
ts.tv_nsec = nsec % 1000000000;
return (0 == pthread_cond_timedwait(&cond_, &mu_->mu_, &ts));
}

void CondVar::Signal() {
PthreadCall("signal", pthread_cond_signal(&cv_));
PthreadCall("signal", pthread_cond_signal(&cond_));
}

void CondVar::SignalAll() {
PthreadCall("broadcast", pthread_cond_broadcast(&cv_));
PthreadCall("broadcast", pthread_cond_broadcast(&cond_));
}

void InitOnce(OnceType* once, void (*initializer)()) {
Expand Down
3 changes: 2 additions & 1 deletion src/leveldb/port/port_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ class CondVar {
void Signal();
void SignalAll();
private:
pthread_cond_t cv_;
pthread_cond_t cond_;
pthread_condattr_t attr_;
Mutex* mu_;
};

Expand Down
6 changes: 3 additions & 3 deletions src/leveldb/util/env_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -809,9 +809,9 @@ class PosixEnv : public Env {
}

virtual uint64_t NowMicros() {
struct timeval tv;
gettimeofday(&tv, NULL);
return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return static_cast<int64_t>(ts.tv_sec) * 1000000 + static_cast<int64_t>(ts.tv_nsec) / 1000;
}

virtual void SleepForMicroseconds(int micros) {
Expand Down
6 changes: 3 additions & 3 deletions src/leveldb/util/raw_key_operator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ void print_bytes(const char* str, int len) {
}

int64_t get_micros() {
struct timeval tv;
gettimeofday(&tv, NULL);
return static_cast<int64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return static_cast<int64_t>(ts.tv_sec) * 1000000 + static_cast<int64_t>(ts.tv_nsec) / 1000;
}

class RawKeyOperatorTest {};
Expand Down
Loading