Merge branch 'master' into watermarks_func
zilder committed Apr 12, 2019
2 parents 00a29de + 14ed751 commit e34034a
Showing 3 changed files with 152 additions and 164 deletions.
kafka_fdw--0.0.1.sql: 0 additions, 9 deletions

@@ -20,12 +20,3 @@ LANGUAGE C STRICT;
 CREATE FOREIGN DATA WRAPPER kafka_fdw
   HANDLER kafka_fdw_handler
   VALIDATOR kafka_fdw_validator;
-
-CREATE FUNCTION kafka_get_watermarks(IN rel regclass,
-                                     OUT partition int,
-                                     OUT offset_low int,
-                                     OUT offset_high int)
-RETURNS SETOF record
-AS 'MODULE_PATHNAME', 'kafka_get_watermarks'
-LANGUAGE C STRICT PARALLEL SAFE;
-
src/connection.c: 4 additions, 2 deletions

@@ -58,8 +58,10 @@ KafkaFdwGetConnection(KafkaOptions *k_options,
 void
 kafkaCloseConnection(KafkaFdwExecutionState *festate)
 {
-    rd_kafka_topic_destroy(festate->kafka_topic_handle);
-    rd_kafka_destroy(festate->kafka_handle);
+    if (festate->kafka_topic_handle)
+        rd_kafka_topic_destroy(festate->kafka_topic_handle);
+    if (festate->kafka_handle)
+        rd_kafka_destroy(festate->kafka_handle);
     festate->kafka_topic_handle = NULL;
     festate->kafka_handle = NULL;
 }
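The NULL guards make kafkaCloseConnection() idempotent and safe to call from any error path, including one that fires before a connection was ever established (the handles are also pre-set to NULL in makeKafkaExecutionState, see the next file). A minimal standalone sketch of the same pattern, with made-up stand-in types rather than the real librdkafka handles:

#include <stddef.h>

/* A stand-in resource type; the real code holds rd_kafka_t and
 * rd_kafka_topic_t handles inside KafkaFdwExecutionState. */
typedef struct Conn { int unused; } Conn;

typedef struct State
{
    Conn *handle;   /* NULL until a connection is established */
} State;

/* Stub for the real destructor (rd_kafka_destroy etc.). */
static void
conn_destroy(Conn *c)
{
    (void) c;
}

/* Safe to call at any point, any number of times: the NULL guard
 * covers the "never connected" case, and the reset makes a second
 * call a no-op, mirroring kafkaCloseConnection() above. */
static void
close_connection(State *state)
{
    if (state->handle)
        conn_destroy(state->handle);
    state->handle = NULL;
}

int
main(void)
{
    static Conn c;
    State state = { &c };

    close_connection(&state);   /* releases the handle */
    close_connection(&state);   /* harmless no-op, no double free */
    return 0;
}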
src/kafka_fdw.c: 148 additions, 153 deletions

@@ -446,6 +446,8 @@ makeKafkaExecutionState(Relation relation, KafkaOptions *kafka_options, ParseOpt
     /* setup execution state */
     festate = (KafkaFdwExecutionState *) palloc0(sizeof(KafkaFdwExecutionState));
     festate->param_values = NULL;
+    festate->kafka_handle = NULL;
+    festate->kafka_topic_handle = NULL;
     festate->scan_data = makeScanPData();
 
     /* we we get a parallel scan_data_desc will point to a shared mem segment by InitializeDSMForeignScan */
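Pre-setting both handles to NULL here is what keeps the cleanup path honest: if an error is raised before KafkaFdwGetConnection() has run, the guarded kafkaCloseConnection() in src/connection.c above finds nothing stale to destroy. Since palloc0 already zeroes the struct, these assignments mostly document that invariant.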
Expand Down Expand Up @@ -1529,13 +1531,6 @@ kafkaAcquireSampleRowsFunc(Relation relation,
double * totalrows,
double * totaldeadrows)
 {
-#define STORE_ERROR(...)                                  \
-    do                                                    \
-    {                                                     \
-        snprintf(errstr, KAFKA_MAX_ERR_MSG, __VA_ARGS__); \
-        catched_error = true;                             \
-    } while (0)
-
     KafkaFdwExecutionState *festate;
     rd_kafka_message_t **   messages;
     int                     p;
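With the sampling body about to be wrapped in PG_TRY (next hunk), the macro's job of stashing an error message and limping to the cleanup label via catched_error and goto is taken over by ordinary elog(ERROR, ...) calls; see the sketch after this diff.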
@@ -1554,177 +1549,177 @@ kafkaAcquireSampleRowsFunc(Relation relation,
     kafkaGetOptions(RelationGetRelid(relation), &kafka_options, &parse_options);
     festate = makeKafkaExecutionState(relation, &kafka_options, &parse_options);
 
-    /* Establish connection */
-    KafkaFdwGetConnection(&kafka_options,
-                          &festate->kafka_handle,
-                          &festate->kafka_topic_handle);
-
-    festate->partition_list = getPartitionList(festate->kafka_handle,
-                                               festate->kafka_topic_handle);
-    partnum = festate->partition_list->partition_cnt;
-
-    /* Allocate memory for partition bounds */
-    low = palloc(sizeof(int64_t) * partnum);
-    high = palloc(sizeof(int64_t) * partnum);
-
-    /* Obtain lower and upper bounds for partitions */
-    for (p = 0; p < partnum; p++)
-    {
-        rd_kafka_resp_err_t err;
-
-        err = rd_kafka_query_watermark_offsets(
-            festate->kafka_handle, festate->kafka_options.topic, p, &low[p], &high[p], WARTERMARK_TIMEOUT);
-        if (err != RD_KAFKA_RESP_ERR_NO_ERROR && err != RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
-        {
-            STORE_ERROR("Failed to get watermarks %s", rd_kafka_err2str(err));
-            goto finish_acquire_sample;
-        }
-        total += high[p] - low[p];
-    }
-    *totaldeadrows = 0;
-    *totalrows = total;
-
-    /* Empty topic */
-    if (total == 0)
-        goto finish_acquire_sample;
-
-    /* Allocate memory for batch and tuple data */
-    messages = palloc(kafka_options.batch_size * sizeof(rd_kafka_message_t *));
-    values = palloc(sizeof(Datum) * RelationGetDescr(relation)->natts);
-    nulls = palloc(sizeof(bool) * RelationGetDescr(relation)->natts);
-
-    /* Get a sample from each partition */
-    for (p = 0; p < partnum; p++)
-    {
-        MemoryContext oldcontext = CurrentMemoryContext;
-        int64 partrows, rows_to_read, step;
-        int64 batch_size = kafka_options.batch_size;
-        int batches;
-        double share;
-        volatile int64 offset = low[p];
-        volatile int m;
-        volatile bool done = false;
-
-        /*
-         * Ideally we need to peak individual messages from the partition evenly for
-         * statistics to be more accurate. Unfortunatelly it leads to a very slow
-         * execution. As an alternative we read data with batches.
-         *
-         * Calculate how many batches should we read from this partition and how big
-         * steps between those batches should be.
-         */
-        partrows = high[p] - low[p]; /* rows in current partition */
-        share = partrows / (double) total;
-        rows_to_read = share * targrows; /* rows to read from partition */
-        batches = rows_to_read / batch_size; /* batches number to read */
-        if (batches <= 0)
-            continue;
-        step = batch_size + (partrows - rows_to_read) / batches;
-
-        /* Restrict the minimum step size */
-        if (step < batch_size * STEP_FACTOR)
-            step = batch_size * STEP_FACTOR;
-
-        /* Start consuming batches */
-        while (offset < high[p])
-        {
-            int rows_fetched;
-
-            if (rd_kafka_consume_start(festate->kafka_topic_handle, p, offset) == -1)
-            {
-                rd_kafka_resp_err_t err = rd_kafka_last_error();
-
-                STORE_ERROR("Failed to start consuming: %s", rd_kafka_err2str(err));
-                goto finish_acquire_sample;
-            }
-
-            /* Read next batch */
-            rows_fetched =
-                rd_kafka_consume_batch(festate->kafka_topic_handle, p, kafka_options.buffer_delay, messages, batch_size);
-            /* Not empty dataset obtained */
-            if (rows_fetched > 0)
-            {
-                PG_TRY();
-                {
-                    for (m = 0; m < rows_fetched; m++)
-                    {
-                        rd_kafka_resp_err_t err = messages[m]->err;
-
-                        if (err == RD_KAFKA_RESP_ERR_NO_ERROR)
-                        {
-                            ReadKafkaMessage(relation, festate, messages[m], CurrentMemoryContext, &values, &nulls);
-
-                            Assert(cnt <= targrows);
-                            rows[cnt++] = heap_form_tuple(RelationGetDescr(relation), values, nulls);
-                        }
-                        else if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
-                        {
-                            elog(LOG, "kafka_fdw has reached the end of the queue");
-                            done = true; /* finish scan for this partition */
-                            break;
-                        }
-                        else if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
-                        {
-                            ereport(ERROR,
-                                    (errcode(ERRCODE_FDW_ERROR),
-                                     errmsg_internal("kafka_fdw got an error %s when fetching a message from queue",
-                                                     rd_kafka_err2str(err))));
-                        }
-
-                        rd_kafka_message_destroy(messages[m]);
-                    }
-                }
-                PG_CATCH();
-                {
-                    ErrorData *edata;
-
-                    /*
-                     * If any error occurs during parsing messages we should correctly
-                     * release all kafka-related resources and close connection because
-                     * they are not maintaied by postgres' resource manager.
-                     */
-                    catched_error = true;
-
-                    while (m < rows_fetched)
-                        rd_kafka_message_destroy(messages[m++]);
-
-                    /* Store original error message */
-                    MemoryContextSwitchTo(oldcontext);
-                    edata = CopyErrorData();
-                    FlushErrorState();
-                    STORE_ERROR("%s", edata->message);
-                }
-                PG_END_TRY();
-            }
-            /* Error */
-            else if (rows_fetched < 0)
-            {
-                STORE_ERROR("Failed to consuming a batch");
-            }
-            /*
-             * And rows_fetched == 0 means that the request is timed out. We can just
-             * skip it as loosing one single batch during ANALYZE doesn't make much
-             * difference
-             */
-
-            /* Finish reading */
-            if (rd_kafka_consume_stop(festate->kafka_topic_handle, p) == -1)
-            {
-                rd_kafka_resp_err_t err = rd_kafka_last_error();
-
-                STORE_ERROR("Failed to stop consuming: %s", rd_kafka_err2str(err));
-            }
-
-            if (catched_error)
-                goto finish_acquire_sample;
-
-            /* Proceed to the next partition */
-            if (done)
-                break;
-
-            offset += step;
-        } /* iterate over batches */
-    }     /* iterate over partitions */
+    PG_TRY();
+    {
+        /* Establish connection */
+        KafkaFdwGetConnection(&kafka_options,
+                              &festate->kafka_handle,
+                              &festate->kafka_topic_handle);
+
+        festate->partition_list = getPartitionList(festate->kafka_handle,
+                                                   festate->kafka_topic_handle);
+        partnum = festate->partition_list->partition_cnt;
+
+        /* Allocate memory for partition bounds */
+        low = palloc(sizeof(int64_t) * partnum);
+        high = palloc(sizeof(int64_t) * partnum);
+
+        /* Obtain lower and upper bounds for partitions */
+        for (p = 0; p < partnum; p++)
+        {
+            rd_kafka_resp_err_t err;
+
+            err = rd_kafka_query_watermark_offsets(festate->kafka_handle,
+                                                   festate->kafka_options.topic,
+                                                   p, &low[p], &high[p],
+                                                   WARTERMARK_TIMEOUT);
+
+            if (err != RD_KAFKA_RESP_ERR_NO_ERROR && err != RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
+            {
+                elog(ERROR, "Failed to get watermarks %s", rd_kafka_err2str(err));
+            }
+            total += high[p] - low[p];
+        }
+
+        *totaldeadrows = 0;
+        *totalrows = total;
+
+        /* Empty topic */
+        if (total == 0)
+            goto finish_acquire_sample;
+
+        /* Allocate memory for batch and tuple data */
+        messages = palloc(kafka_options.batch_size * sizeof(rd_kafka_message_t *));
+        values = palloc(sizeof(Datum) * RelationGetDescr(relation)->natts);
+        nulls = palloc(sizeof(bool) * RelationGetDescr(relation)->natts);
+
+        /* Get a sample from each partition */
+        for (p = 0; p < partnum; p++)
+        {
+            int64 partrows, rows_to_read, step;
+            int64 batch_size = kafka_options.batch_size;
+            int batches;
+            double share;
+            volatile int64 offset = low[p];
+            volatile int m;
+            volatile bool done = false;
+
+            /*
+             * Ideally we need to peak individual messages from the partition evenly for
+             * statistics to be more accurate. Unfortunatelly it leads to a very slow
+             * execution. As an alternative we read data with batches.
+             *
+             * Calculate how many batches should we read from this partition and how big
+             * steps between those batches should be.
+             */
+            partrows = high[p] - low[p]; /* rows in current partition */
+            share = partrows / (double) total;
+            rows_to_read = share * targrows; /* rows to read from partition */
+            batches = rows_to_read / batch_size; /* batches number to read */
+            if (batches <= 0)
+                continue;
+            step = batch_size + (partrows - rows_to_read) / batches;
+
+            /* Restrict the minimum step size */
+            if (step < batch_size * STEP_FACTOR)
+                step = batch_size * STEP_FACTOR;
+
+            /* Start consuming batches */
+            while (offset < high[p])
+            {
+                int rows_fetched;
+
+                if (rd_kafka_consume_start(festate->kafka_topic_handle, p, offset) == -1)
+                {
+                    rd_kafka_resp_err_t err = rd_kafka_last_error();
+
+                    elog(ERROR, "Failed to start consuming: %s", rd_kafka_err2str(err));
+                }
+
+                /* Read next batch */
+                rows_fetched =
+                    rd_kafka_consume_batch(festate->kafka_topic_handle, p, kafka_options.buffer_delay, messages, batch_size);
+                /* Not empty dataset obtained */
+                if (rows_fetched > 0)
+                {
+                    PG_TRY();
+                    {
+                        for (m = 0; m < rows_fetched; m++)
+                        {
+                            rd_kafka_resp_err_t err = messages[m]->err;
+
+                            if (err == RD_KAFKA_RESP_ERR_NO_ERROR)
+                            {
+                                ReadKafkaMessage(relation, festate, messages[m], CurrentMemoryContext, &values, &nulls);
+
+                                Assert(cnt <= targrows);
+                                rows[cnt++] = heap_form_tuple(RelationGetDescr(relation), values, nulls);
+                            }
+                            else if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
+                            {
+                                elog(LOG, "kafka_fdw has reached the end of the queue");
+                                done = true; /* finish scan for this partition */
+                                break;
+                            }
+                            else if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
+                            {
+                                ereport(ERROR,
+                                        (errcode(ERRCODE_FDW_ERROR),
+                                         errmsg_internal("kafka_fdw got an error %s when fetching a message from queue",
+                                                         rd_kafka_err2str(err))));
+                            }
+
+                            rd_kafka_message_destroy(messages[m]);
+                        }
+                    }
+                    PG_CATCH();
+                    {
+                        /*
+                         * If any error occurs during parsing messages we should
+                         * correctly release all kafka-related resources and
+                         * close connection because they are not maintaied by
+                         * postgres' resource manager.
+                         */
+                        while (m < rows_fetched)
+                            rd_kafka_message_destroy(messages[m++]);
+
+                        PG_RE_THROW();
+                    }
+                    PG_END_TRY();
+                }
+                /* Error */
+                else if (rows_fetched < 0)
+                {
+                    elog(ERROR, "Failed to consuming a batch");
+                }
+                /*
+                 * And rows_fetched == 0 means that the request is timed out.
+                 * We can just skip it as loosing one single batch during
+                 * ANALYZE doesn't make much difference.
+                 */
+
+                /* Finish reading */
+                if (rd_kafka_consume_stop(festate->kafka_topic_handle, p) == -1)
+                {
+                    rd_kafka_resp_err_t err = rd_kafka_last_error();
+
+                    elog(ERROR, "Failed to stop consuming: %s", rd_kafka_err2str(err));
+                }
+
+                /* Proceed to the next partition */
+                if (done)
+                    break;
+
+                offset += step;
+            } /* iterate over batches */
+        }     /* iterate over partitions */
+    }
+    PG_CATCH();
+    {
+        kafkaCloseConnection(festate);
+        PG_RE_THROW();
+    }
+    PG_END_TRY();
 
 finish_acquire_sample:
     /* Finalize connection and quit */
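The shape of this refactoring is the standard PostgreSQL pattern for resources the backend's resource owner does not track: do the work inside PG_TRY, release the foreign resources in PG_CATCH, and re-throw. A minimal sketch, with placeholder functions standing in for the kafka_fdw calls:

#include "postgres.h"   /* PG_TRY / PG_CATCH / PG_RE_THROW */

/* Placeholders for the librdkafka-facing calls made by kafka_fdw. */
extern void acquire_external_resource(void);
extern void do_work_that_may_elog(void);
extern void release_external_resource(void);

static void
sample_with_cleanup(void)
{
    PG_TRY();
    {
        acquire_external_resource();   /* cf. KafkaFdwGetConnection() */
        do_work_that_may_elog();       /* any elog(ERROR) jumps to PG_CATCH */
    }
    PG_CATCH();
    {
        /* PostgreSQL's resource owner knows nothing about these
         * handles, so the error path must release them by hand. */
        release_external_resource();   /* cf. kafkaCloseConnection() */
        PG_RE_THROW();                 /* propagate the original error */
    }
    PG_END_TRY();
}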
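The batch-step arithmetic in the sampling loop is easy to sanity-check in isolation. A standalone sketch with made-up numbers (STEP_FACTOR's real value is defined elsewhere in kafka_fdw.c; 2 here is an assumption for illustration):

#include <stdint.h>
#include <stdio.h>

#define STEP_FACTOR 2   /* assumed value, see kafka_fdw.c for the real one */

int
main(void)
{
    int64_t total      = 1000000;  /* messages across all partitions */
    int64_t partrows   = 250000;   /* messages in this partition */
    int64_t targrows   = 30000;    /* sample size ANALYZE asked for */
    int64_t batch_size = 100;

    double  share        = partrows / (double) total;          /* 0.25 */
    int64_t rows_to_read = (int64_t) (share * targrows);       /* 7500 */
    int64_t batches      = rows_to_read / batch_size;          /* 75 */
    int64_t step         = batch_size
                         + (partrows - rows_to_read) / batches; /* 3333 */

    /* Clamp: never step less than STEP_FACTOR batches apart. */
    if (step < batch_size * STEP_FACTOR)
        step = batch_size * STEP_FACTOR;

    /* 75 batches of 100 rows, one starting every ~3333 offsets, span
     * the whole partition while reading only its share of the sample. */
    printf("batches=%lld step=%lld\n", (long long) batches, (long long) step);
    return 0;
}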
