Skip to content

Commit

Permalink
[pgmoneta#123] Receive and consume copy stream messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Jubilee101 committed Jun 17, 2023
1 parent d024137 commit cf4a7f5
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 24 deletions.
34 changes: 34 additions & 0 deletions src/include/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ extern "C" {
#include <pgmoneta.h>

#include <stdlib.h>
/** @struct
* Defines a streaming buffer
*/
struct stream_buffer
{
char* buf; /**< allocated buffer holding streaming data */
int size; /**< allocated buffer size */
int start; /**< offset to the first unconsumed data in buffer */
int end; /**< offset to the first position after available data */
int cursor; /**< next byte to consume */
} __attribute__ ((aligned (64)));

/**
* Initialize a memory segment for the process local message structure
Expand Down Expand Up @@ -96,6 +107,29 @@ pgmoneta_memory_dynamic_destroy(void* data);
void*
pgmoneta_memory_dynamic_append(void* orig, size_t orig_size, void* append, size_t append_size, size_t* new_size);

/**
* Initialize a stream buffer
* @param stream_buf The stream buffer to be initialized
*/
void
pgmoneta_memory_stream_buffer_init(struct stream_buffer** stream_buf);

/**
* Enlarge the buffer, doesn't guarantee success
* @param stream_buf The stream buffer
* @param bytes_needed The number of bytes needed
* @return 0 upon success, otherwise 1
*/
int
pgmoneta_memory_stream_buffer_enlarge(struct stream_buffer* stream_buf, int bytes_needed);

/**
* Free a stream buffer
* @param stream_buf The stream buffer to be freed
*/
void
pgmoneta_memory_stream_buffer_free(struct stream_buffer* stream_buf);

#ifdef __cplusplus
}
#endif
Expand Down
22 changes: 22 additions & 0 deletions src/include/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ extern "C" {
#endif

#include <pgmoneta.h>
#include <memory.h>

#include <stdbool.h>
#include <stdlib.h>
Expand Down Expand Up @@ -385,6 +386,27 @@ pgmoneta_free_query_response(struct query_response* response);
void
pgmoneta_query_response_debug(struct query_response* response);

/**
* Read the copy stream into the streaming buffer in blocking mode
* @param socket The socket
* @param stream_buf The streaming buffer
* @return 1 upon success, 0 if no data received, otherwise 2
*/
int
pgmoneta_read_copy_stream(int socket, struct stream_buffer* stream_buf);

/**
* Consume the data in copy stream buffer, get the next valid message in the copy stream buffer
* Recognized message types are DataRow, CopyOutResponse, CopyInResponse, CopyData, CopyDone, CopyFail and ErrorResponse
* Other message will be ignored
* @param socket The socket
* @param stream_buf The stream buffer
* @param message [out] The message
* @return 1 upon success, 0 if no data to consume, otherwise 2
*/
int
pgmoneta_consume_copy_stream(int socket, struct stream_buffer* stream_buf, struct message** message);

#ifdef __cplusplus
}
#endif
Expand Down
42 changes: 21 additions & 21 deletions src/include/pgmoneta.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ extern "C" {
#define unlikely(x) __builtin_expect (!!(x), 0)

#define MAX(a, b) \
({ __typeof__ (a) _a = (a); \
__typeof__ (b) _b = (b); \
_a > _b ? _a : _b; })
({ __typeof__ (a) _a = (a); \
__typeof__ (b) _b = (b); \
_a > _b ? _a : _b; })

#define MIN(a, b) \
({ __typeof__ (a) _a = (a); \
__typeof__ (b) _b = (b); \
_a < _b ? _a : _b; })
({ __typeof__ (a) _a = (a); \
__typeof__ (b) _b = (b); \
_a < _b ? _a : _b; })

/*
* Common piece of code to perform a sleeping.
Expand All @@ -127,13 +127,13 @@ extern "C" {
*
*/
#define SLEEP(zzz) \
do \
{ \
struct timespec ts_private; \
ts_private.tv_sec = 0; \
ts_private.tv_nsec = zzz; \
nanosleep(&ts_private, NULL); \
} while (0);
do \
{ \
struct timespec ts_private; \
ts_private.tv_sec = 0; \
ts_private.tv_nsec = zzz; \
nanosleep(&ts_private, NULL); \
} while (0);

/*
* Commonly used block of code to sleep
Expand All @@ -150,14 +150,14 @@ extern "C" {
SLEEP_AND_GOTO(100000L, retry)
*/
#define SLEEP_AND_GOTO(zzz, goto_to) \
do \
{ \
struct timespec ts_private; \
ts_private.tv_sec = 0; \
ts_private.tv_nsec = zzz; \
nanosleep(&ts_private, NULL); \
goto goto_to; \
} while (0);
do \
{ \
struct timespec ts_private; \
ts_private.tv_sec = 0; \
ts_private.tv_nsec = zzz; \
nanosleep(&ts_private, NULL); \
goto goto_to; \
} while (0);

/**
* The shared memory segment
Expand Down
41 changes: 41 additions & 0 deletions src/libpgmoneta/memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,44 @@ pgmoneta_memory_dynamic_append(void* orig, size_t orig_size, void* append, size_

return d;
}

void
pgmoneta_memory_stream_buffer_init(struct stream_buffer** stream_buf)
{
struct stream_buffer* b = malloc(sizeof(struct stream_buffer));
b->size = DEFAULT_BUFFER_SIZE;
b->start = b->end = b->cursor = 0;
b->buf = malloc(DEFAULT_BUFFER_SIZE);
*stream_buf = b;
}

int
pgmoneta_memory_stream_buffer_enlarge(struct stream_buffer* stream_buf, int bytes_needed)
{
char* new_buf = NULL;
// subtract the space we have left to avoid wasting space
bytes_needed -= (stream_buf->size - stream_buf->end);
stream_buf->size += bytes_needed;
new_buf = realloc(stream_buf->buf, stream_buf->size);
if (new_buf == NULL)
{
return 1;
}
stream_buf->buf = new_buf;
return 0;
}

void
pgmoneta_memory_stream_buffer_free(struct stream_buffer* stream_buf)
{
if (stream_buf == NULL)
{
return;
}
if (stream_buf->buf != NULL)
{
free(stream_buf->buf);
stream_buf->buf = NULL;
}
free(stream_buf);
}
163 changes: 163 additions & 0 deletions src/libpgmoneta/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -1396,3 +1396,166 @@ ssl_write_message(SSL* ssl, struct message* msg)

return MESSAGE_STATUS_ERROR;
}

int
pgmoneta_read_copy_stream(int socket, struct stream_buffer* stream_buf)
{
int numbytes = 0;
bool keep_read = false;
// left shift unconsumed data to reuse space
if (stream_buf->start < stream_buf->end)
{
if (stream_buf->start > 0)
{
memmove(stream_buf->buf, stream_buf->buf + stream_buf->start, stream_buf->end - stream_buf->start);
stream_buf->end -= stream_buf->start;
stream_buf->cursor -= stream_buf->start;
stream_buf->start = 0;
}
}
else
{
stream_buf->start = stream_buf->end = stream_buf->cursor = 0;
}
/*
* if buffer is still too full,
* try enlarging it to be at least big enough for one TCP packet (I'm using 1500B here)
* we don't expect it to absolutely work
*/
if (stream_buf->size - stream_buf->end < 1500)
{
pgmoneta_log_info("Stream buffer too full");
if (pgmoneta_memory_stream_buffer_enlarge(stream_buf, 1500))
{
pgmoneta_log_error("Fail to enlarge stream buffer");
}
}
if (stream_buf->end >= stream_buf->size)
{
pgmoneta_log_error("Not enough space to read new copy-out data");
goto error;
}
do
{
numbytes = read(socket, stream_buf->buf + stream_buf->end, stream_buf->size - stream_buf->end);

if (likely(numbytes > 0))
{
stream_buf->end += numbytes;
return MESSAGE_STATUS_OK;
}
else if (numbytes == 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
keep_read = true;
errno = 0;
}
else
{
return MESSAGE_STATUS_ZERO;
}
}
else
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
keep_read = true;
errno = 0;
}
else
{
keep_read = false;
}
}
}
while (keep_read);
return MESSAGE_STATUS_ERROR;

error:
return MESSAGE_STATUS_ERROR;
}

int
pgmoneta_consume_copy_stream(int socket, struct stream_buffer* stream_buf, struct message** message)
{
struct message* m = NULL;
bool keep_read = false;
int status;
int length;

pgmoneta_free_copy_message(*message);
do
{
while (stream_buf->cursor >= stream_buf->end)
{
status = pgmoneta_read_copy_stream(socket, stream_buf);
if (status == MESSAGE_STATUS_ZERO)
{
SLEEP(1000000L);
}
else if (status != MESSAGE_STATUS_OK)
{
goto error;
}
}
m = (struct message*)malloc(sizeof(struct message));
m->kind = stream_buf->buf[stream_buf->cursor++];
// try to get message length
while (stream_buf->cursor + 4 >= stream_buf->end)
{
status = pgmoneta_read_copy_stream(socket, stream_buf);
if (status == MESSAGE_STATUS_ZERO)
{
SLEEP(1000000L);
}
else if (status != MESSAGE_STATUS_OK)
{
goto error;
}
}
length = pgmoneta_read_int32(stream_buf->buf + stream_buf->cursor);
// receive the whole message even if we are going to skip it
while (stream_buf->cursor + length >= stream_buf->end)
{
status = pgmoneta_read_copy_stream(socket, stream_buf);
if (status == MESSAGE_STATUS_ZERO)
{
SLEEP(1000000L);
}
else if (status != MESSAGE_STATUS_OK)
{
goto error;
}
}
if (m->kind != 'D' && m->kind != 'H' && m->kind != 'W' &&
m->kind != 'c' && m->kind != 'f' && m->kind != 'E' && m->kind != 'd')
{
// skip this message
keep_read = true;
stream_buf->cursor += length;
stream_buf->start = stream_buf->cursor;
continue;
}

m->data = (void*) malloc(length - 4 + 1);
m->length = length - 4;
memset(m->data, 0, m->length + 1);
memcpy(m->data, stream_buf->buf + (stream_buf->cursor + 4), m->length);
*message = m;

stream_buf->cursor += length;
stream_buf->start = stream_buf->cursor;

keep_read = false;

}
while (keep_read);

return MESSAGE_STATUS_OK;

error:
pgmoneta_free_copy_message(m);
*message = NULL;
return status;
}
6 changes: 3 additions & 3 deletions src/libpgmoneta/wf_archive.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ static void int_to_oct(int num, char* oct, size_t octlen);

#define th_get_size(t) oct_to_size((t)->th_buf.size)
#define th_set_size(t, fsize) \
int_to_oct_nonull((fsize), (t)->th_buf.size, 12)
int_to_oct_nonull((fsize), (t)->th_buf.size, 12)

typedef unsigned int (* libtar_hashfunc_t)(void*, unsigned int);
typedef void (* libtar_freefunc_t)(void*);
Expand Down Expand Up @@ -224,7 +224,7 @@ typedef struct
tartype_t;

#define tar_block_write(t, buf) \
(*((t)->type->writefunc))((t)->fd, (char*)(buf), T_BLOCKSIZE)
(*((t)->type->writefunc))((t)->fd, (char*)(buf), T_BLOCKSIZE)

static tartype_t default_type = {open, close, read, write};

Expand Down Expand Up @@ -281,7 +281,7 @@ static int dev_hash(dev_t* dev);
static int ino_hash(ino_t* inode);

#define th_set_mtime(t, fmtime) \
int_to_oct_nonull((fmtime), (t)->th_buf.mtime, 12)
int_to_oct_nonull((fmtime), (t)->th_buf.mtime, 12)

static TAR* tar = NULL;

Expand Down

0 comments on commit cf4a7f5

Please sign in to comment.