diff --git a/etc/consumer_server.0.conf b/etc/consumer_server.0.conf index 8dbbbf4..a6f7845 100644 --- a/etc/consumer_server.0.conf +++ b/etc/consumer_server.0.conf @@ -4,10 +4,10 @@ "ip": "127.0.0.1", "port": "8001", "topic": "test", - "nproc": "50", + "nproc": "2", "lock_path_base": "phxqueueconsumer.lock.", "phxqueue_global_config_path": "./etc/globalconfig.conf", - "shm_key_base": "53452" + "shm_key_base": "53460" }, "log": { diff --git a/etc/consumer_server.1.conf b/etc/consumer_server.1.conf index ea2ba5a..43f940c 100644 --- a/etc/consumer_server.1.conf +++ b/etc/consumer_server.1.conf @@ -4,10 +4,10 @@ "ip": "127.0.0.1", "port": "8002", "topic": "test", - "nproc": "50", + "nproc": "2", "lock_path_base": "phxqueueconsumer.lock.", "phxqueue_global_config_path": "./etc/globalconfig.conf", - "shm_key_base": "53452" + "shm_key_base": "53461" }, "log": { diff --git a/etc/consumer_server.2.conf b/etc/consumer_server.2.conf index 8dc1dc6..fdbb692 100644 --- a/etc/consumer_server.2.conf +++ b/etc/consumer_server.2.conf @@ -4,10 +4,10 @@ "ip": "127.0.0.1", "port": "8003", "topic": "test", - "nproc": "50", + "nproc": "2", "lock_path_base": "phxqueueconsumer.lock.", "phxqueue_global_config_path": "./etc/globalconfig.conf", - "shm_key_base": "53452" + "shm_key_base": "53462" }, "log": { diff --git a/etc/consumerconfig.conf b/etc/consumerconfig.conf index 5997d81..c98e9bd 100644 --- a/etc/consumerconfig.conf +++ b/etc/consumerconfig.conf @@ -8,7 +8,7 @@ "port": "8001" }, "scale": "1000", - "sub_ids": [1, 2] + "sub_ids": [1] }, { "addr": @@ -17,7 +17,7 @@ "port": "8002" }, "scale": "1000", - "sub_ids": [1, 2] + "sub_ids": [1] }, { "addr": @@ -26,7 +26,7 @@ "port": "8003" }, "scale": "1000", - "sub_ids": [1, 2] + "sub_ids": [1] } ] -} \ No newline at end of file +} diff --git a/etc/topicconfig.conf b/etc/topicconfig.conf index 863d918..ea4ea8c 100644 --- a/etc/topicconfig.conf +++ b/etc/topicconfig.conf @@ -2,7 +2,7 @@ "topic": { "topic_id": "1000", - "handle_ids": "1", + "handle_ids": ["1", "2"], "store_paxos_batch_count": "80", "store_paxos_batch_delay_time_ms": "30" }, @@ -10,7 +10,7 @@ [ { "queue_info_id": "1", - "ranges": ["0-99"] + "ranges": ["0"] } ], "pubs": @@ -29,4 +29,4 @@ "skip_lock": "1" } ] -} \ No newline at end of file +} diff --git a/phxqueue/consumer/hblock.cpp b/phxqueue/consumer/hblock.cpp index 2105c8a..802a6f3 100644 --- a/phxqueue/consumer/hblock.cpp +++ b/phxqueue/consumer/hblock.cpp @@ -211,16 +211,18 @@ void HeartBeatLock::DistubePendingQueues(const map> &sub_id Queue_t *queue{&impl_->buf->queues[vpid]}; if (LOCK_ITEM_MAGIC == queue->magic && sub_id == queue->sub_id) { - idx = 0; - for (; idx < pending_queues.size(); ++idx) { - if (done[idx]) continue; + for (idx = 0; idx < pending_queues.size(); ++idx) { auto &&pending_queue = pending_queues[idx]; if (queue->sub_id == pending_queue.sub_id && queue->store_id == pending_queue.store_id && queue->queue_id == pending_queue.queue_id) { - done[idx] = true; - QLInfo("QUEUEINFO: vpid %d keep sub %u store %u queue %u", vpid, queue->sub_id, queue->store_id, queue->queue_id); + if (!done[idx]) { + done[idx] = true; + QLInfo("QUEUEINFO: vpid %d keep sub %u store %u queue %u", vpid, queue->sub_id, queue->store_id, queue->queue_id); + } else { + memset(queue, 0, sizeof(Queue_t)); + } break; } @@ -257,6 +259,14 @@ void HeartBeatLock::DistubePendingQueues(const map> &sub_id } } + for (int vpid{0}; vpid < impl_->nproc; ++vpid) { + Queue_t *queue{&impl_->buf->queues[vpid]}; + if (LOCK_ITEM_MAGIC == queue->magic) { + QLInfo("QUEUEINFO: vpid %d match sub %u store %u queue %u", vpid, queue->sub_id, queue->store_id, queue->queue_id); + } + } + + impl_->buf->magic = LOCK_BUF_MAGIC; diff --git a/phxqueue_phxrpc/test/consumer_main.cpp b/phxqueue_phxrpc/test/consumer_main.cpp index f1c4c37..6320668 100644 --- a/phxqueue_phxrpc/test/consumer_main.cpp +++ b/phxqueue_phxrpc/test/consumer_main.cpp @@ -87,6 +87,7 @@ int main(int argc, char ** argv) { opt.lock_path_base = config.GetProto().consumer().lock_path_base(); opt.use_store_master_client_on_get = 1; opt.use_store_master_client_on_add = 1; + opt.shm_key_base = config.GetProto().consumer().shm_key_base(); string phxqueue_global_config_path(config.GetProto().consumer().phxqueue_global_config_path());