Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions proto/pg_logicaldec.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,16 @@ message DatumMessage {
}
}

message TypeInfo {
required string modifier = 1;
}

message RowMessage {
optional uint32 transaction_id = 1;
optional uint64 commit_time = 2;
optional string table = 3;
optional Op op = 4;
repeated DatumMessage new_tuple = 5;
repeated DatumMessage old_tuple = 6;
repeated TypeInfo new_typeinfo = 7;
}
46 changes: 46 additions & 0 deletions src/decoderbufs.c
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,38 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
}
}

/* provide a metadata for new tuple */
static void add_metadata_to_msg(Decoderbufs__TypeInfo **tmsg,
Relation relation, HeapTuple tuple,
TupleDesc tupdesc) {
int natt;
int valid_attr_cnt = 0;
elog(DEBUG1, "Adding metadata for %d columns", tupdesc->natts);
/* build column names and values */
for (natt = 0; natt < tupdesc->natts; natt++) {
Form_pg_attribute attr;
char *typ_mod;
Decoderbufs__TypeInfo typeinfo = DECODERBUFS__TYPE_INFO__INIT;

attr = tupdesc->attrs[natt];

/* skip dropped columns and system columns */
if (attr->attisdropped || attr->attnum < 0) {
elog(DEBUG1, "skipping column %d because %s", natt + 1, attr->attisdropped ? "it's a dropped column" : "it's a system column");
continue;
}

typ_mod = TextDatumGetCString(DirectFunctionCall2(format_type, attr->atttypid, attr->atttypmod));
elog(DEBUG1, "Adding typemodifier '%s' for column %d", typ_mod, natt);

typeinfo.modifier = typ_mod;
tmsg[valid_attr_cnt] = palloc(sizeof(typeinfo));
memcpy(tmsg[valid_attr_cnt], &typeinfo, sizeof(typeinfo));

valid_attr_cnt++;
}
}

/* callback for individual changed tuples */
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change) {
Expand Down Expand Up @@ -676,11 +708,18 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (change->data.tp.newtuple != NULL) {
elog(DEBUG1, "decoding new tuple information");
tupdesc = RelationGetDescr(relation);

rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation,
&change->data.tp.newtuple->tuple, tupdesc);

rmsg.n_new_typeinfo = rmsg.n_new_tuple;
rmsg.new_typeinfo =
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
add_metadata_to_msg(rmsg.new_typeinfo, relation,
&change->data.tp.newtuple->tuple, tupdesc);
}
break;
case REORDER_BUFFER_CHANGE_UPDATE:
Expand All @@ -700,11 +739,18 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (change->data.tp.newtuple != NULL) {
elog(DEBUG1, "decoding new tuple information");
tupdesc = RelationGetDescr(relation);

rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation,
&change->data.tp.newtuple->tuple, tupdesc);

rmsg.n_new_typeinfo = rmsg.n_new_tuple;
rmsg.new_typeinfo =
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
add_metadata_to_msg(rmsg.new_typeinfo, relation,
&change->data.tp.newtuple->tuple, tupdesc);
}
}
break;
Expand Down
100 changes: 97 additions & 3 deletions src/proto/pg_logicaldec.pb-c.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,49 @@ void decoderbufs__datum_message__free_unpacked
assert(message->base.descriptor == &decoderbufs__datum_message__descriptor);
protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
}
void decoderbufs__type_info__init
(Decoderbufs__TypeInfo *message)
{
static Decoderbufs__TypeInfo init_value = DECODERBUFS__TYPE_INFO__INIT;
*message = init_value;
}
size_t decoderbufs__type_info__get_packed_size
(const Decoderbufs__TypeInfo *message)
{
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
}
size_t decoderbufs__type_info__pack
(const Decoderbufs__TypeInfo *message,
uint8_t *out)
{
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
}
size_t decoderbufs__type_info__pack_to_buffer
(const Decoderbufs__TypeInfo *message,
ProtobufCBuffer *buffer)
{
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
}
Decoderbufs__TypeInfo *
decoderbufs__type_info__unpack
(ProtobufCAllocator *allocator,
size_t len,
const uint8_t *data)
{
return (Decoderbufs__TypeInfo *)
protobuf_c_message_unpack (&decoderbufs__type_info__descriptor,
allocator, len, data);
}
void decoderbufs__type_info__free_unpacked
(Decoderbufs__TypeInfo *message,
ProtobufCAllocator *allocator)
{
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
}
void decoderbufs__row_message__init
(Decoderbufs__RowMessage *message)
{
Expand Down Expand Up @@ -342,7 +385,45 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
(ProtobufCMessageInit) decoderbufs__datum_message__init,
NULL,NULL,NULL /* reserved[123] */
};
static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[6] =
static const ProtobufCFieldDescriptor decoderbufs__type_info__field_descriptors[1] =
{
{
"modifier",
1,
PROTOBUF_C_LABEL_REQUIRED,
PROTOBUF_C_TYPE_STRING,
0, /* quantifier_offset */
offsetof(Decoderbufs__TypeInfo, modifier),
NULL,
NULL,
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
};
static const unsigned decoderbufs__type_info__field_indices_by_name[] = {
0, /* field[0] = modifier */
};
static const ProtobufCIntRange decoderbufs__type_info__number_ranges[1 + 1] =
{
{ 1, 0 },
{ 0, 1 }
};
const ProtobufCMessageDescriptor decoderbufs__type_info__descriptor =
{
PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
"decoderbufs.TypeInfo",
"TypeInfo",
"Decoderbufs__TypeInfo",
"decoderbufs",
sizeof(Decoderbufs__TypeInfo),
1,
decoderbufs__type_info__field_descriptors,
decoderbufs__type_info__field_indices_by_name,
1, decoderbufs__type_info__number_ranges,
(ProtobufCMessageInit) decoderbufs__type_info__init,
NULL,NULL,NULL /* reserved[123] */
};
static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[7] =
{
{
"transaction_id",
Expand Down Expand Up @@ -416,10 +497,23 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
"new_typeinfo",
7,
PROTOBUF_C_LABEL_REPEATED,
PROTOBUF_C_TYPE_MESSAGE,
offsetof(Decoderbufs__RowMessage, n_new_typeinfo),
offsetof(Decoderbufs__RowMessage, new_typeinfo),
&decoderbufs__type_info__descriptor,
NULL,
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
};
static const unsigned decoderbufs__row_message__field_indices_by_name[] = {
1, /* field[1] = commit_time */
4, /* field[4] = new_tuple */
6, /* field[6] = new_typeinfo */
5, /* field[5] = old_tuple */
3, /* field[3] = op */
2, /* field[2] = table */
Expand All @@ -428,7 +522,7 @@ static const unsigned decoderbufs__row_message__field_indices_by_name[] = {
static const ProtobufCIntRange decoderbufs__row_message__number_ranges[1 + 1] =
{
{ 1, 0 },
{ 0, 6 }
{ 0, 7 }
};
const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
{
Expand All @@ -438,7 +532,7 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
"Decoderbufs__RowMessage",
"decoderbufs",
sizeof(Decoderbufs__RowMessage),
6,
7,
decoderbufs__row_message__field_descriptors,
decoderbufs__row_message__field_indices_by_name,
1, decoderbufs__row_message__number_ranges,
Expand Down
42 changes: 39 additions & 3 deletions src/proto/pg_logicaldec.pb-c.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.