Permalink
Browse files

Use hash for message store to speed up loading.

Comes at the expense of increased memory usage. This could be countered
by using a hash just for loading (increased memory usage during loading,
reduced afterwards) but this approach does allow the immediate removal
of messages from the store.
  • Loading branch information...
1 parent 94cd34c commit 4374170c40ef9abfd864c5fcbc3af28019c39bdf @ralight ralight committed Nov 17, 2014
Showing with 31 additions and 48 deletions.
  1. +3 −0 .gitignore
  2. +15 −25 src/database.c
  3. +1 −1 src/mosquitto_broker.h
  4. +12 −22 src/persist.c
View
@@ -36,5 +36,8 @@ lib/libmosquitto.a
test/ssl/*.csr
+test/lib/c/*.test
+test/lib/cpp/*.test
+
build/
dist/
View
@@ -438,7 +438,6 @@ int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t
temp = _mosquitto_malloc(sizeof(struct mosquitto_msg_store));
if(!temp) return MOSQ_ERR_NOMEM;
- temp->next = db->msg_store;
temp->ref_count = 0;
if(source){
temp->source_id = _mosquitto_strdup(source);
@@ -490,7 +489,6 @@ int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t
temp->dest_ids = NULL;
temp->dest_id_count = 0;
db->msg_store_count++;
- db->msg_store = temp;
(*stored) = temp;
if(!store_id){
@@ -499,6 +497,8 @@ int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t
temp->db_id = store_id;
}
+ HASH_ADD(hh, db->msg_store, db_id, sizeof(dbid_t), temp);
+
return MOSQ_ERR_SUCCESS;
}
@@ -841,35 +841,25 @@ int mqtt3_db_message_write(struct mosquitto *context)
void mqtt3_db_store_clean(struct mosquitto_db *db)
{
/* FIXME - this may not be necessary if checks are made when messages are removed. */
- struct mosquitto_msg_store *tail, *last = NULL;
+ struct mosquitto_msg_store *msg_store, *msg_tmp;
int i;
assert(db);
- tail = db->msg_store;
- while(tail){
- if(tail->ref_count == 0){
- if(tail->source_id) _mosquitto_free(tail->source_id);
- if(tail->dest_ids){
- for(i=0; i<tail->dest_id_count; i++){
- if(tail->dest_ids[i]) _mosquitto_free(tail->dest_ids[i]);
+ HASH_ITER(hh, db->msg_store, msg_store, msg_tmp){
+ if(msg_store->ref_count == 0){
+ HASH_DELETE(hh, db->msg_store, msg_store);
+
+ if(msg_store->source_id) _mosquitto_free(msg_store->source_id);
+ if(msg_store->dest_ids){
+ for(i=0; i<msg_store->dest_id_count; i++){
+ if(msg_store->dest_ids[i]) _mosquitto_free(msg_store->dest_ids[i]);
}
- _mosquitto_free(tail->dest_ids);
- }
- if(tail->msg.topic) _mosquitto_free(tail->msg.topic);
- if(tail->msg.payload) _mosquitto_free(tail->msg.payload);
- if(last){
- last->next = tail->next;
- _mosquitto_free(tail);
- tail = last->next;
- }else{
- db->msg_store = tail->next;
- _mosquitto_free(tail);
- tail = db->msg_store;
+ _mosquitto_free(msg_store->dest_ids);
}
+ if(msg_store->msg.topic) _mosquitto_free(msg_store->msg.topic);
+ if(msg_store->msg.payload) _mosquitto_free(msg_store->msg.payload);
+ _mosquitto_free(msg_store);
db->msg_store_count--;
- }else{
- last = tail;
- tail = tail->next;
}
}
}
@@ -143,7 +143,7 @@ struct _mosquitto_subhier {
};
struct mosquitto_msg_store{
- struct mosquitto_msg_store *next;
+ UT_hash_handle hh;
dbid_t db_id;
int ref_count;
char *source_id;
View
@@ -131,14 +131,13 @@ static int mqtt3_db_message_store_write(struct mosquitto_db *db, FILE *db_fptr)
uint32_t i32temp;
uint16_t i16temp, slen;
uint8_t i8temp;
- struct mosquitto_msg_store *stored;
+ struct mosquitto_msg_store *stored, *stored_tmp;
bool force_no_retain;
assert(db);
assert(db_fptr);
- stored = db->msg_store;
- while(stored){
+ HASH_ITER(hh, db->msg_store, stored, stored_tmp){
if(!strncmp(stored->msg.topic, "$SYS", 4)){
/* Don't save $SYS messages as retained otherwise they can give
* misleading information when reloaded. They should still be saved
@@ -193,8 +192,6 @@ static int mqtt3_db_message_store_write(struct mosquitto_db *db, FILE *db_fptr)
if(stored->msg.payloadlen){
write_e(db_fptr, stored->msg.payload, (unsigned int)stored->msg.payloadlen);
}
-
- stored = stored->next;
}
return MOSQ_ERR_SUCCESS;
@@ -428,20 +425,14 @@ static int _db_client_msg_restore(struct mosquitto_db *db, const char *client_id
cmsg->state = state;
cmsg->dup = dup;
- store = db->msg_store;
- while(store){
- if(store->db_id == store_id){
- cmsg->store = store;
- cmsg->store->ref_count++;
- break;
- }
- store = store->next;
- }
- if(!cmsg->store){
+ HASH_FIND(hh, db->msg_store, &store_id, sizeof(dbid_t), store);
+ if(!store){
_mosquitto_free(cmsg);
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error restoring persistent database, message store corrupt.");
return 1;
}
+ cmsg->store = store;
+
context = _db_find_or_add_context(db, client_id, 0);
if(!context){
_mosquitto_free(cmsg);
@@ -655,13 +646,12 @@ static int _db_retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
return 1;
}
store_id = i64temp;
- store = db->msg_store;
- while(store){
- if(store->db_id == store_id){
- mqtt3_db_messages_queue(db, NULL, store->msg.topic, store->msg.qos, store->msg.retain, store);
- break;
- }
- store = store->next;
+ HASH_FIND(hh, db->msg_store, &store_id, sizeof(dbid_t), store);
+ if(store){
+ mqtt3_db_messages_queue(db, NULL, store->msg.topic, store->msg.qos, store->msg.retain, store);
+ }else{
+ _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Corrupt database whilst restoring a retained message.");
+ return MOSQ_ERR_INVAL;
}
return MOSQ_ERR_SUCCESS;
}

0 comments on commit 4374170

Please sign in to comment.