Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
MJY-HUST committed May 22, 2024
1 parent b601c89 commit b339a7b
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 41 deletions.
100 changes: 65 additions & 35 deletions src/bthread/key.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <pthread.h>
#include "butil/macros.h"
#include "butil/atomicops.h"
#include "butil/thread_key.h"
#include "bvar/passive_status.h"
#include "bthread/errno.h" // EAGAIN
#include "bthread/task_group.h" // TaskGroup
Expand Down Expand Up @@ -204,14 +205,54 @@ class BAIDU_CACHELINE_ALIGNMENT KeyTable {
SubKeyTable* _subs[KEY_1STLEVEL_SIZE];
};

class KeyTableList {
public:
KeyTableList() {
keytable = NULL;
}
~KeyTableList() {
bthread::TaskGroup* const g = bthread::tls_task_group;
bthread::KeyTable* old_kt = bthread::tls_bls.keytable;
while (keytable) {
bthread::KeyTable* kt = keytable;
keytable = kt->next;
bthread::tls_bls.keytable = kt;
if (g) {
g->current_task()->local_storage.keytable = kt;
}
delete kt;
if (old_kt == kt) {
old_kt = NULL;
}
}
bthread::tls_bls.keytable = old_kt;
if(g) {
g->current_task()->local_storage.keytable = old_kt;
}
}
KeyTable* keytable;
};

static KeyTable* borrow_keytable(bthread_keytable_pool_t* pool) {
if (pool != NULL && pool->free_keytables) {
BAIDU_SCOPED_LOCK(pool->mutex);
KeyTable* p = (KeyTable*)pool->free_keytables;
if (pool != NULL && (pool->list->get()->keytable || pool->free_keytables)) {
pthread_rwlock_rdlock(&pool->rwlock);
KeyTable* p = pool->list->get()->keytable;
if (p) {
pool->free_keytables = p->next;
pool->list->get()->keytable = p->next;
pthread_rwlock_unlock(&pool->rwlock);
return p;
}
pthread_rwlock_unlock(&pool->rwlock);
if (pool->free_keytables) {
pthread_rwlock_wrlock(&pool->rwlock);
p = (KeyTable*)pool->free_keytables;
if (p) {
pool->free_keytables = p->next;
pthread_rwlock_unlock(&pool->rwlock);
return p;
}
pthread_rwlock_unlock(&pool->rwlock);
}
}
return NULL;
}
Expand All @@ -226,14 +267,15 @@ void return_keytable(bthread_keytable_pool_t* pool, KeyTable* kt) {
delete kt;
return;
}
std::unique_lock<pthread_mutex_t> mu(pool->mutex);
pthread_rwlock_rdlock(&pool->rwlock);
if (pool->destroyed) {
mu.unlock();
pthread_rwlock_unlock(&pool->rwlock);
delete kt;
return;
}
kt->next = (KeyTable*)pool->free_keytables;
pool->free_keytables = kt;
kt->next = pool->list->get()->keytable;
pool->list->get()->keytable = kt;
pthread_rwlock_unlock(&pool->rwlock);
}

static void cleanup_pthread(void* arg) {
Expand Down Expand Up @@ -279,7 +321,8 @@ int bthread_keytable_pool_init(bthread_keytable_pool_t* pool) {
LOG(ERROR) << "Param[pool] is NULL";
return EINVAL;
}
pthread_mutex_init(&pool->mutex, NULL);
pthread_rwlock_init(&pool->rwlock, NULL);
pool->list = new butil::ThreadLocal<bthread::KeyTableList>();
pool->free_keytables = NULL;
pool->destroyed = 0;
return 0;
Expand All @@ -291,33 +334,18 @@ int bthread_keytable_pool_destroy(bthread_keytable_pool_t* pool) {
return EINVAL;
}
bthread::KeyTable* saved_free_keytables = NULL;
{
BAIDU_SCOPED_LOCK(pool->mutex);
if (pool->free_keytables) {
saved_free_keytables = (bthread::KeyTable*)pool->free_keytables;
pool->free_keytables = NULL;
}
pool->destroyed = 1;
}
// Cheat get/setspecific and destroy the keytables.
bthread::TaskGroup* const g = bthread::tls_task_group;
bthread::KeyTable* old_kt = bthread::tls_bls.keytable;
while (saved_free_keytables) {
pthread_rwlock_wrlock(&pool->rwlock);
pool->destroyed = 1;
delete pool->list;
saved_free_keytables = (bthread::KeyTable*)pool->free_keytables;
pool->free_keytables = NULL;
pthread_rwlock_unlock(&pool->rwlock);
while(saved_free_keytables) {
bthread::KeyTable* kt = saved_free_keytables;
saved_free_keytables = kt->next;
bthread::tls_bls.keytable = kt;
if (g) {
g->current_task()->local_storage.keytable = kt;
}
delete kt;
if (old_kt == kt) {
old_kt = NULL;
}
}
bthread::tls_bls.keytable = old_kt;
if (g) {
g->current_task()->local_storage.keytable = old_kt;
}
return 0;
// TODO: return_keytable may race with this function, we don't destroy
// the mutex right now.
// pthread_mutex_destroy(&pool->mutex);
Expand All @@ -330,11 +358,12 @@ int bthread_keytable_pool_getstat(bthread_keytable_pool_t* pool,
LOG(ERROR) << "Param[pool] or Param[stat] is NULL";
return EINVAL;
}
std::unique_lock<pthread_mutex_t> mu(pool->mutex);
pthread_rwlock_wrlock(&pool->rwlock);
size_t count = 0;
bthread::KeyTable* p = (bthread::KeyTable*)pool->free_keytables;
for (; p; p = p->next, ++count) {}
stat->nfree = count;
pthread_rwlock_unlock(&pool->rwlock);
return 0;
}

Expand Down Expand Up @@ -365,14 +394,15 @@ void bthread_keytable_pool_reserve(bthread_keytable_pool_t* pool,
kt->set_data(key, data);
} // else append kt w/o data.

std::unique_lock<pthread_mutex_t> mu(pool->mutex);
pthread_rwlock_wrlock(&pool->rwlock);
if (pool->destroyed) {
mu.unlock();
pthread_rwlock_unlock(&pool->rwlock);
delete kt;
break;
}
kt->next = (bthread::KeyTable*)pool->free_keytables;
pool->free_keytables = kt;
pthread_rwlock_unlock(&pool->rwlock);
if (data == NULL) {
break;
}
Expand Down
11 changes: 10 additions & 1 deletion src/bthread/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,17 @@ inline std::ostream& operator<<(std::ostream& os, bthread_key_t key) {
}
#endif // __cplusplus

namespace bthread{
class KeyTableList;
}

namespace butil {
template <typename T> class ThreadLocal;
}

typedef struct {
pthread_mutex_t mutex;
pthread_rwlock_t rwlock;
butil::ThreadLocal<bthread::KeyTableList>* list;
void* free_keytables;
int destroyed;
} bthread_keytable_pool_t;
Expand Down
7 changes: 2 additions & 5 deletions test/bthread_key_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,12 @@ TEST(KeyTest, set_tls_before_creating_any_bthread) {

struct PoolData {
bthread_key_t key;
PoolData* expected_data;
PoolData* data;
int seq;
int end_seq;
};

static void pool_thread_impl(PoolData* data) {
ASSERT_EQ(data->expected_data, (PoolData*)bthread_getspecific(data->key));
if (NULL == bthread_getspecific(data->key)) {
ASSERT_EQ(0, bthread_setspecific(data->key, data));
}
Expand Down Expand Up @@ -385,19 +384,17 @@ TEST(KeyTest, using_pool) {
ASSERT_EQ(0, bthread_start_urgent(&bth, &attr, pool_thread, &bth_data));
ASSERT_EQ(0, bthread_join(bth, NULL));
ASSERT_EQ(0, bth_data.seq);
ASSERT_EQ(1, bthread_keytable_pool_size(&pool));

PoolData bth2_data = { key, &bth_data, 0, 3 };
bthread_t bth2;
ASSERT_EQ(0, bthread_start_urgent(&bth2, &attr2, pool_thread, &bth2_data));
ASSERT_EQ(0, bthread_join(bth2, NULL));
ASSERT_EQ(0, bth2_data.seq);
ASSERT_EQ(1, bthread_keytable_pool_size(&pool));

ASSERT_EQ(0, bthread_keytable_pool_destroy(&pool));

EXPECT_EQ(bth_data.end_seq, bth_data.seq);
EXPECT_EQ(0, bth2_data.seq);
EXPECT_EQ(bth_data.end_seq, bth2_data.seq);

ASSERT_EQ(0, bthread_key_delete(key));
}
Expand Down

0 comments on commit b339a7b

Please sign in to comment.