Skip to content

Commit

Permalink
set different shm_key to each consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
unixliang committed Sep 16, 2017
1 parent d58ad23 commit c899588
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 18 deletions.
4 changes: 2 additions & 2 deletions etc/consumer_server.0.conf
Expand Up @@ -4,10 +4,10 @@
"ip": "127.0.0.1",
"port": "8001",
"topic": "test",
"nproc": "50",
"nproc": "2",
"lock_path_base": "phxqueueconsumer.lock.",
"phxqueue_global_config_path": "./etc/globalconfig.conf",
"shm_key_base": "53452"
"shm_key_base": "53460"
},
"log":
{
Expand Down
4 changes: 2 additions & 2 deletions etc/consumer_server.1.conf
Expand Up @@ -4,10 +4,10 @@
"ip": "127.0.0.1",
"port": "8002",
"topic": "test",
"nproc": "50",
"nproc": "2",
"lock_path_base": "phxqueueconsumer.lock.",
"phxqueue_global_config_path": "./etc/globalconfig.conf",
"shm_key_base": "53452"
"shm_key_base": "53461"
},
"log":
{
Expand Down
4 changes: 2 additions & 2 deletions etc/consumer_server.2.conf
Expand Up @@ -4,10 +4,10 @@
"ip": "127.0.0.1",
"port": "8003",
"topic": "test",
"nproc": "50",
"nproc": "2",
"lock_path_base": "phxqueueconsumer.lock.",
"phxqueue_global_config_path": "./etc/globalconfig.conf",
"shm_key_base": "53452"
"shm_key_base": "53462"
},
"log":
{
Expand Down
8 changes: 4 additions & 4 deletions etc/consumerconfig.conf
Expand Up @@ -8,7 +8,7 @@
"port": "8001"
},
"scale": "1000",
"sub_ids": [1, 2]
"sub_ids": [1]
},
{
"addr":
Expand All @@ -17,7 +17,7 @@
"port": "8002"
},
"scale": "1000",
"sub_ids": [1, 2]
"sub_ids": [1]
},
{
"addr":
Expand All @@ -26,7 +26,7 @@
"port": "8003"
},
"scale": "1000",
"sub_ids": [1, 2]
"sub_ids": [1]
}
]
}
}
6 changes: 3 additions & 3 deletions etc/topicconfig.conf
Expand Up @@ -2,15 +2,15 @@
"topic":
{
"topic_id": "1000",
"handle_ids": "1",
"handle_ids": ["1", "2"],
"store_paxos_batch_count": "80",
"store_paxos_batch_delay_time_ms": "30"
},
"queue_infos":
[
{
"queue_info_id": "1",
"ranges": ["0-99"]
"ranges": ["0"]
}
],
"pubs":
Expand All @@ -29,4 +29,4 @@
"skip_lock": "1"
}
]
}
}
20 changes: 15 additions & 5 deletions phxqueue/consumer/hblock.cpp
Expand Up @@ -211,16 +211,18 @@ void HeartBeatLock::DistubePendingQueues(const map<int, vector<Queue_t>> &sub_id
Queue_t *queue{&impl_->buf->queues[vpid]};

if (LOCK_ITEM_MAGIC == queue->magic && sub_id == queue->sub_id) {
idx = 0;
for (; idx < pending_queues.size(); ++idx) {
if (done[idx]) continue;
for (idx = 0; idx < pending_queues.size(); ++idx) {
auto &&pending_queue = pending_queues[idx];
if (queue->sub_id == pending_queue.sub_id &&
queue->store_id == pending_queue.store_id &&
queue->queue_id == pending_queue.queue_id) {
done[idx] = true;

QLInfo("QUEUEINFO: vpid %d keep sub %u store %u queue %u", vpid, queue->sub_id, queue->store_id, queue->queue_id);
if (!done[idx]) {
done[idx] = true;
QLInfo("QUEUEINFO: vpid %d keep sub %u store %u queue %u", vpid, queue->sub_id, queue->store_id, queue->queue_id);
} else {
memset(queue, 0, sizeof(Queue_t));
}

break;
}
Expand Down Expand Up @@ -257,6 +259,14 @@ void HeartBeatLock::DistubePendingQueues(const map<int, vector<Queue_t>> &sub_id
}
}

for (int vpid{0}; vpid < impl_->nproc; ++vpid) {
Queue_t *queue{&impl_->buf->queues[vpid]};
if (LOCK_ITEM_MAGIC == queue->magic) {
QLInfo("QUEUEINFO: vpid %d match sub %u store %u queue %u", vpid, queue->sub_id, queue->store_id, queue->queue_id);
}
}


impl_->buf->magic = LOCK_BUF_MAGIC;


Expand Down
1 change: 1 addition & 0 deletions phxqueue_phxrpc/test/consumer_main.cpp
Expand Up @@ -87,6 +87,7 @@ int main(int argc, char ** argv) {
opt.lock_path_base = config.GetProto().consumer().lock_path_base();
opt.use_store_master_client_on_get = 1;
opt.use_store_master_client_on_add = 1;
opt.shm_key_base = config.GetProto().consumer().shm_key_base();


string phxqueue_global_config_path(config.GetProto().consumer().phxqueue_global_config_path());
Expand Down

0 comments on commit c899588

Please sign in to comment.