Skip to content

Commit

Permalink
[WORKING]: switched to O(n) approach
Browse files Browse the repository at this point in the history
[BUG]: Caching fails at HASH_FINd_PTR when adding a new cache entry

[STILL BUGGED]: Working with custom script

[HACK]: working with pgbench -S

[BUG]: Mult-client bug

[WORKING]: Everything works

increase cache size

[Race Condition]: Added locking & proper string null termination

uncomment get locking

[BUGGED]: fix some mem leaks

[WORKING]: switched to O(n) approach

[NEEDS CLEAN-UP]: Working O(n) Cache

[FOUND PROBLEM]: Empty Keys

[WORKING]: switched to local memory segment

[WORKING]: Remove atomic operations
  • Loading branch information
SudeepRed committed Nov 5, 2023
1 parent 306d142 commit 8cc81e6
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 152 deletions.
32 changes: 29 additions & 3 deletions src/include/pgagroal.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ extern "C" {
#define MISC_LENGTH 128
#define NUMBER_OF_SERVERS 64
#ifdef DEBUG
#define MAX_NUMBER_OF_CONNECTIONS 8
#define MAX_NUMBER_OF_CONNECTIONS 100
#else
#define MAX_NUMBER_OF_CONNECTIONS 10000
#endif
Expand Down Expand Up @@ -253,6 +253,11 @@ extern void* prometheus_cache_shmem;
* response cache.
*/
extern void* query_cache_shmem;
/**
* Shared memory used to contain the Prometheus
* response cache.
*/
extern void* client_server_shmem;

/** @struct
* Defines a server
Expand Down Expand Up @@ -359,6 +364,13 @@ struct prometheus_cache
size_t size; /**< size of the cache */
char data[]; /**< the payload */
} __attribute__ ((aligned (64)));
struct client_server_cache
{
char kind;
atomic_schar lock;
size_t key_length;
char key[];
} __attribute__ ((aligned (64)));

/** @struct
* Defines the Prometheus metrics
Expand Down Expand Up @@ -424,14 +436,28 @@ struct query_cache
atomic_schar lock; /**< lock to protect the cache */
size_t size; /**< size of the cache */
struct hashTable* table;
int max_elements;
struct cachev2
{
struct hashEntry* key;
struct hashEntry* data;
} cache[100000];

} __attribute__ ((aligned (64)));

struct hashTable
{
void* key;
struct hashEntry* key;
time_t valid_until;
void* data;
struct hashEntry* data;
UT_hash_handle hh;
} __attribute__ ((aligned (64)));
struct hashEntry
{
void* value;
size_t length;
char key[1024];
} __attribute__ ((aligned (64)));

/** @struct
* Defines the configuration and state of pgagroal
Expand Down
10 changes: 5 additions & 5 deletions src/include/query_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ pgagroal_query_cache_init(size_t* p_size, void** p_shmem);
*
* @warning The caller should ensure the validity of the 'Table' pointer and the 'key' pointer.
*/
struct hashTable*
pgagroal_query_cache_get(struct hashTable** Table, void* key);
struct hashEntry*
pgagroal_query_cache_get(struct query_cache* cache, struct hashTable** Table, struct hashEntry* key);

/**
* Invalidate a cache entry and remove it from the hash table.
Expand All @@ -100,7 +100,7 @@ pgagroal_query_cache_get(struct hashTable** Table, void* key);
* @warning The memory associated with the removed entry is freed within the function.
*/
int
pgagroal_query_cache_invalidate(struct hashTable** Table, void* key);
pgagroal_query_cache_invalidate(struct hashTable** Table, struct hashEntry* key);

/**
* Update the data associated with a cache entry in the hash table.
Expand All @@ -120,7 +120,7 @@ pgagroal_query_cache_invalidate(struct hashTable** Table, void* key);
* @warning The function only updates the 'data' pointer and does not free any previously allocated memory.
*/
int
pgagroal_query_cache_update(struct hashTable** Table, void* key, void* data);
pgagroal_query_cache_update(struct hashTable** Table, struct hashEntry* key, struct hashEntry* data);

/**
* Add a new cache entry to the hash table.
Expand All @@ -142,7 +142,7 @@ pgagroal_query_cache_update(struct hashTable** Table, void* key, void* data);
* The caller should manage memory to avoid leaks.
*/
int
pgagroal_query_cache_add(struct hashTable** Table, void* data, void* key);
pgagroal_query_cache_add(struct query_cache* cache, struct hashTable** Table, struct hashEntry* data, struct hashEntry* key, int flag);

/**
* Clear all cache entries from the hash table and free associated memory.
Expand Down
146 changes: 144 additions & 2 deletions src/libpgagroal/pipeline_session.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
#include <shmem.h>
#include <utils.h>
#include <worker.h>
#include <uthash.h>
#include <query_cache.h>

/* system */
#include <errno.h>
Expand Down Expand Up @@ -66,7 +68,7 @@ static bool saw_x = false;
#define CLIENT_IDLE 1
#define CLIENT_ACTIVE 2
#define CLIENT_CHECK 3

#define QUERY_KEY_SIZE 1024 * 1024
struct client_session
{
atomic_schar state; /**< The state */
Expand Down Expand Up @@ -99,11 +101,25 @@ session_initialize(void* shmem, void** pipeline_shmem, size_t* pipeline_shmem_si
size_t session_shmem_size;
struct client_session* client;
struct configuration* config;
struct client_server_cache* cache_key;

config = (struct configuration*)shmem;
client_server_shmem = NULL;

*pipeline_shmem = NULL;
*pipeline_shmem_size = 0;
if (pgagroal_create_shared_memory(sizeof(struct client_server_cache) + QUERY_KEY_SIZE, config->hugepage, (void*)&cache_key))
{
return 1;
}
memset(cache_key, 0, sizeof(struct client_server_cache) + QUERY_KEY_SIZE);
memset(cache_key->key, 0, QUERY_KEY_SIZE);

cache_key->kind = '\0';
cache_key->key_length = 0;
atomic_init(&cache_key->lock, STATE_FREE);

client_server_shmem = cache_key;

if (config->disconnect_client > 0)
{
Expand Down Expand Up @@ -287,9 +303,14 @@ session_client(struct ev_loop* loop, struct ev_io* watcher, int revents)
struct worker_io* wi = NULL;
struct message* msg = NULL;
struct configuration* config = NULL;
struct client_server_cache* client_query = NULL;

wi = (struct worker_io*)watcher;
config = (struct configuration*)shmem;
client_query = (struct client_server_cache*)client_server_shmem;

struct query_cache* cache;
cache = (struct query_cache*)query_cache_shmem;

client_active(wi->slot);

Expand All @@ -307,6 +328,54 @@ session_client(struct ev_loop* loop, struct ev_io* watcher, int revents)

if (likely(msg->kind != 'X'))
{

if (msg->kind == 'Q')
{
size_t key_length = strlen(msg->data + 5);

memset(client_query->key, 0, QUERY_KEY_SIZE);
memcpy(client_query->key, msg->data + 5, key_length);
client_query->key[key_length + 1] = '\0';

client_query->key_length = key_length;
client_query->kind = msg->kind;

struct hashEntry* key = NULL;
key = (struct hashEntry*)malloc(sizeof(struct hashEntry) + client_query->key_length + 1);

if (key == NULL)
{

client_inactive(wi->slot);
ev_break(loop, EVBREAK_ONE);
return;
}

memset(key->key, 0, client_query->key_length + 1);

memcpy(key->key, client_query->key, key_length);
key->key[client_query->key_length + 1] = '\0';

key->length = client_query->key_length;
struct hashEntry* s = pgagroal_query_cache_get(cache, &(cache->table), key);

if (s != NULL && s->value != NULL)
{
// log cache hit
struct message* result = NULL;

if (pgagroal_create_message(s->value, s->length, &result))
{
status = pgagroal_write_socket_message(wi->client_fd, result);
client_inactive(wi->slot);
ev_break(loop, EVBREAK_ONE);

return;
}
}

}

int offset = 0;

while (offset < msg->length)
Expand Down Expand Up @@ -453,6 +522,11 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents)
struct worker_io* wi = NULL;
struct message* msg = NULL;
struct configuration* config = NULL;
struct client_server_cache* client_query = NULL;
client_query = (struct client_server_cache*)client_server_shmem;

struct query_cache* cache;
cache = (struct query_cache*)query_cache_shmem;

wi = (struct worker_io*)watcher;

Expand All @@ -468,8 +542,77 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents)
}
if (likely(status == MESSAGE_STATUS_OK))
{

pgagroal_prometheus_network_received_add(msg->length);

if (msg->kind == 'T' && client_query->kind == 'Q')
{

struct message* tmp = NULL;
if (!pgagroal_extract_message('Z', msg, &tmp))
{

struct hashEntry* key, * data;
key = (struct hashEntry*)malloc(sizeof(struct hashEntry) + client_query->key_length + 1);
data = (struct hashEntry*)malloc(sizeof(struct hashEntry) + msg->length);

if (key == NULL || data == NULL)
{

if (key != NULL)
{
free(key);
key = NULL;
}
if (data != NULL)
{
free(data);
data = NULL;
}
client_inactive(wi->slot);
ev_break(loop, EVBREAK_ONE);

return;
}

data->value = malloc(msg->length);
memset(key->key, 0, client_query->key_length + 1);

if (data->value == NULL)
{

if (data->value != NULL)
{
free(data->value);
data->value = NULL;
}
if (data != NULL)
{
free(data);
data = NULL;
}
client_inactive(wi->slot);
ev_break(loop, EVBREAK_ONE);

return;
}

memcpy(key->key, client_query->key, client_query->key_length);
key->key[client_query->key_length + 1] = '\0';
key->length = client_query->key_length;

memset(data->value, 0, msg->length);
memcpy(data->value, msg->data, msg->length);
data->length = msg->length;

pgagroal_query_cache_add(cache, &(cache->table), data, key, 1);

}

}

client_query->kind = '\0';

int offset = 0;

while (offset < msg->length)
Expand All @@ -479,7 +622,6 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents)
char kind = pgagroal_read_byte(msg->data + offset);
int length = pgagroal_read_int32(msg->data + offset + 1);

/* The Z message tell us the transaction state */
if (kind == 'Z')
{
char tx_state = pgagroal_read_byte(msg->data + offset + 5);
Expand Down
Loading

0 comments on commit 8cc81e6

Please sign in to comment.