Skip to content

Commit

Permalink
[pgmoneta#123] Receive and parse DataRow messages in stream buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Jubilee101 committed Jun 23, 2023
1 parent f440f5c commit f05dd09
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 8 deletions.
12 changes: 11 additions & 1 deletion src/include/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ pgmoneta_read_copy_stream(int socket, struct stream_buffer* buffer);

/**
* Consume the data in copy stream buffer, get the next valid message in the copy stream buffer
* Recognized message types are DataRow, CopyOutResponse, CopyInResponse, CopyData, CopyDone, CopyFail and ErrorResponse
* Recognized message types are DataRow, CopyOutResponse, CopyInResponse, CopyData, CopyDone, CopyFail, RowDescription, CommandComplete and ErrorResponse
* Other message will be ignored
* @param socket The socket
* @param buffer The stream buffer
Expand All @@ -407,6 +407,16 @@ pgmoneta_read_copy_stream(int socket, struct stream_buffer* buffer);
int
pgmoneta_consume_copy_stream(int socket, struct stream_buffer* buffer, struct message** message);

/**
* Receive and parse the DataRow messages into tuples
* @param socket The socket
* @param buffer The stream buffer holding the messages
* @param response The query response
* @return 0 upon success, otherwise 1
*/
int
pgmoneta_consume_data_row_messages(int socket, struct stream_buffer* buffer, struct query_response** response);

#ifdef __cplusplus
}
#endif
Expand Down
108 changes: 101 additions & 7 deletions src/libpgmoneta/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -1528,8 +1528,8 @@ pgmoneta_consume_copy_stream(int socket, struct stream_buffer* buffer, struct me
goto error;
}
}
if (m->kind != 'D' && m->kind != 'H' && m->kind != 'W' &&
m->kind != 'c' && m->kind != 'f' && m->kind != 'E' && m->kind != 'd')
if (m->kind != 'D' && m->kind != 'H' && m->kind != 'W' && m->kind != 'T' &&
m->kind != 'c' && m->kind != 'f' && m->kind != 'E' && m->kind != 'd' && m->kind != 'C')
{
// skip this message
keep_read = true;
Expand All @@ -1538,12 +1538,25 @@ pgmoneta_consume_copy_stream(int socket, struct stream_buffer* buffer, struct me
continue;
}

m->data = (void*) malloc(length - 4 + 1);
m->length = length - 4;
memset(m->data, 0, m->length + 1);
memcpy(m->data, buffer->buffer + (buffer->cursor + 4), m->length);
*message = m;
if (m->kind != 'D' && m->kind != 'T')
{
m->data = (void*) malloc(length - 4 + 1);
m->length = length - 4;
memset(m->data, 0, m->length + 1);
memcpy(m->data, buffer->buffer + (buffer->cursor + 4), m->length);
}
else
{
/** include all the data in message's data buffer, i.e. include type and length info,
* if it's a DataRow or RowDescription message
* This is to accommodate our existing message parsing APIs for these two types of messages
*/
m->data = (void*) malloc(length + 1);
m->length = length + 1;
memcpy(m->data, buffer->buffer + buffer->cursor - 1, m->length);
}

*message = m;
buffer->cursor += length;
buffer->start = buffer->cursor;

Expand All @@ -1558,4 +1571,85 @@ pgmoneta_consume_copy_stream(int socket, struct stream_buffer* buffer, struct me
pgmoneta_free_copy_message(m);
*message = NULL;
return status;
}

int
pgmoneta_consume_data_row_messages(int socket, struct stream_buffer* buffer, struct query_response** response)
{
int cols;
int status;
char* name = NULL;
struct message* msg = NULL;
struct tuple* current = NULL;
struct query_response* r = NULL;

// consume DataRow messages from stream buffer until CommandComplete
while (msg == NULL || msg->kind != 'C')
{
status = pgmoneta_consume_copy_stream(socket, buffer, &msg);

if (status != MESSAGE_STATUS_OK)
{
goto error;
}

if (msg == NULL || msg->kind == 'E' || msg->kind == 'F')
{
pgmoneta_log_message(msg);
goto error;
}

if (msg->kind == 'T')
{
cols = get_number_of_columns(msg);
r = (struct query_response*)malloc(sizeof(struct query_response));
memset(r, 0, sizeof(struct query_response));

r->number_of_columns = cols;
for (int i = 0; i < cols; i++)
{
if (get_column_name(msg, i, &name))
{
goto error;
}

memcpy(&r->names[i][0], name, strlen(name));

free(name);
name = NULL;
}
}
else if (msg->kind == 'D')
{
if (r == NULL)
{
// we should have received the RowDescription message
goto error;
}
struct tuple* dtuple = NULL;

create_D_tuple(cols, msg, &dtuple);

if (r->tuples == NULL)
{
r->tuples = dtuple;
}
else
{
current->next = dtuple;
}

current = dtuple;
}
}
*response = r;
pgmoneta_free_copy_message(msg);
msg = NULL;

return 0;
error:
pgmoneta_free_copy_message(msg);
pgmoneta_disconnect(socket);
pgmoneta_free_query_response(r);
return 1;
}

0 comments on commit f05dd09

Please sign in to comment.