Skip to content

Commit

Permalink
Rename row-by-row fetcher to COPY fetcher
Browse files Browse the repository at this point in the history
This name better reflects its characteristics, and I'm thinking about
resurrecting the old row-by-row fetcher later, because it can be useful
for parameterized queries.
  • Loading branch information
akuzm committed Oct 14, 2022
1 parent 38878be commit 066bcbe
Show file tree
Hide file tree
Showing 22 changed files with 187 additions and 187 deletions.
4 changes: 2 additions & 2 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ static const struct config_enum_entry telemetry_level_options[] = {
#endif

static const struct config_enum_entry remote_data_fetchers[] = {
{ "rowbyrow", RowByRowFetcherType, false },
{ "copy", CopyFetcherType, false },
{ "cursor", CursorFetcherType, false },
{ "auto", AutoFetcherType, false },
{ NULL, 0, false }
Expand Down Expand Up @@ -373,7 +373,7 @@ _guc_init(void)
DefineCustomEnumVariable("timescaledb.remote_data_fetcher",
"Set remote data fetcher type",
"Pick data fetcher type based on type of queries you plan to run "
"(rowbyrow or cursor)",
"(copy or cursor)",
(int *) &ts_guc_remote_data_fetcher,
AutoFetcherType,
remote_data_fetchers,
Expand Down
2 changes: 1 addition & 1 deletion src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ extern TSDLLEXPORT bool ts_guc_enable_remote_explain;
typedef enum DataFetcherType
{
CursorFetcherType,
RowByRowFetcherType,
CopyFetcherType,
AutoFetcherType,
} DataFetcherType;

Expand Down
10 changes: 5 additions & 5 deletions src/planner/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ timescaledb_planner(Query *parse, int cursor_opts, ParamListInfo bound_params)

/*
* Determine which type of fetcher to use. If set by GUC, use what
* is set. If the GUC says 'auto', use the row-by-row fetcher if we
* is set. If the GUC says 'auto', use the COPY fetcher if we
* have at most one distributed table in the query. This enables
* parallel plans on data nodes, which speeds up the query.
* We can't use parallel plans with the cursor fetcher, because the
Expand All @@ -518,12 +518,12 @@ timescaledb_planner(Query *parse, int cursor_opts, ParamListInfo bound_params)

if (context.num_distributed_tables >= 2)
{
if (ts_guc_remote_data_fetcher == RowByRowFetcherType)
if (ts_guc_remote_data_fetcher == CopyFetcherType)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("row-by-row fetcher not supported"),
errhint("Row-by-row fetching of data is not supported in "
errmsg("COPY fetcher not supported"),
errhint("COPY fetching of data is not supported in "
"queries with multiple distributed hypertables."
" Use cursor fetcher instead.")));
}
Expand All @@ -533,7 +533,7 @@ timescaledb_planner(Query *parse, int cursor_opts, ParamListInfo bound_params)
{
if (ts_guc_remote_data_fetcher == AutoFetcherType)
{
ts_data_node_fetcher_scan_type = RowByRowFetcherType;
ts_data_node_fetcher_scan_type = CopyFetcherType;
}
else
{
Expand Down
22 changes: 11 additions & 11 deletions tsl/src/fdw/scan_exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "scan_exec.h"
#include "utils.h"
#include "remote/data_fetcher.h"
#include "remote/row_by_row_fetcher.h"
#include "remote/copy_fetcher.h"
#include "remote/cursor_fetcher.h"
#include "guc.h"
#include "planner.h"
Expand Down Expand Up @@ -131,13 +131,13 @@ create_data_fetcher(ScanState *ss, TsFdwScanState *fsstate)

TupleFactory *tf = tuplefactory_create_for_scan(ss, fsstate->retrieved_attrs);

if (!tuplefactory_is_binary(tf) && fsstate->planned_fetcher_type == RowByRowFetcherType)
if (!tuplefactory_is_binary(tf) && fsstate->planned_fetcher_type == CopyFetcherType)
{
if (ts_guc_remote_data_fetcher == AutoFetcherType)
{
/*
* The user-set fetcher type was auto, and the planner decided to
* use row-by-row fetcher, but at execution time (now) we found out
* use COPY fetcher, but at execution time (now) we found out
* there is no binary serialization for some data types. In this
* case we can revert to cursor fetcher which supports text
* serialization.
Expand All @@ -147,17 +147,17 @@ create_data_fetcher(ScanState *ss, TsFdwScanState *fsstate)
else
{
ereport(ERROR,
(errmsg("cannot use row-by-row fetcher because some of the column types do not "
(errmsg("cannot use COPY fetcher because some of the column types do not "
"have binary serialization")));
}
}

/*
* Row-by-row fetcher uses COPY statement that don't work with prepared
* COPY fetcher uses COPY statement that don't work with prepared
* statements. If this plan is parameterized, this means we'll have to
* revert to cursor fetcher.
*/
if (num_params > 0 && fsstate->planned_fetcher_type == RowByRowFetcherType)
if (num_params > 0 && fsstate->planned_fetcher_type == CopyFetcherType)
{
if (ts_guc_remote_data_fetcher == AutoFetcherType)
{
Expand All @@ -166,7 +166,7 @@ create_data_fetcher(ScanState *ss, TsFdwScanState *fsstate)
else
{
ereport(ERROR,
(errmsg("cannot use row-by-row fetcher because the plan is parameterized"),
(errmsg("cannot use COPY fetcher because the plan is parameterized"),
errhint("Set \"timescaledb.remote_data_fetcher\" to \"cursor\" to explicitly "
"set the fetcher type or use \"auto\" to select the fetcher type "
"automatically.")));
Expand All @@ -183,8 +183,8 @@ create_data_fetcher(ScanState *ss, TsFdwScanState *fsstate)
* The fetcher type must have been determined by the planner at this
* point, so we shouldn't see 'auto' here.
*/
Assert(fsstate->planned_fetcher_type == RowByRowFetcherType);
fetcher = row_by_row_fetcher_create_for_scan(fsstate->conn, fsstate->query, params, tf);
Assert(fsstate->planned_fetcher_type == CopyFetcherType);
fetcher = copy_fetcher_create_for_scan(fsstate->conn, fsstate->query, params, tf);
}

fsstate->fetcher = fetcher;
Expand Down Expand Up @@ -439,8 +439,8 @@ explain_fetcher_type(DataFetcherType type)
{
case AutoFetcherType:
return "Auto";
case RowByRowFetcherType:
return "Row by row";
case CopyFetcherType:
return "COPY";
case CursorFetcherType:
return "Cursor";
default:
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/fdw/scan_exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ typedef struct TsFdwScanState
int fetch_size; /* number of tuples per fetch */
/*
* The type of data fetcher to use. Note that we still can revert to
* cursor fetcher if row-by-row fetcher was chosen automatically, but binary
* cursor fetcher if COPY fetcher was chosen automatically, but binary
* serialization turns out to be unavailable for some of the data types. We
* only check this when we execute the query.
*/
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/remote/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/dist_commands.c
${CMAKE_CURRENT_SOURCE_DIR}/dist_copy.c
${CMAKE_CURRENT_SOURCE_DIR}/dist_ddl.c
${CMAKE_CURRENT_SOURCE_DIR}/row_by_row_fetcher.c
${CMAKE_CURRENT_SOURCE_DIR}/copy_fetcher.c
${CMAKE_CURRENT_SOURCE_DIR}/stmt_params.c
${CMAKE_CURRENT_SOURCE_DIR}/tuplefactory.c
${CMAKE_CURRENT_SOURCE_DIR}/txn.c
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/remote/async.c
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ async_request_discard_response(AsyncRequest *req)
AsyncResponseResult *result = NULL;
do
{
/* for row-by-row fetching we need to loop until we consume the whole response */
/* for COPY fetching we need to loop until we consume the whole response */
result = async_request_set_wait_any_result(&set);
if (result != NULL)
async_response_result_close(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,66 +7,66 @@
#include <postgres.h>
#include <port/pg_bswap.h>

#include "row_by_row_fetcher.h"
#include "copy_fetcher.h"
#include "tuplefactory.h"
#include "async.h"

typedef struct RowByRowFetcher
typedef struct CopyFetcher
{
DataFetcher state;

/* Data for virtual tuples of the current retrieved batch. */
Datum *batch_values;
bool *batch_nulls;
} RowByRowFetcher;
} CopyFetcher;

static void row_by_row_fetcher_send_fetch_request(DataFetcher *df);
static void row_by_row_fetcher_reset(RowByRowFetcher *fetcher);
static int row_by_row_fetcher_fetch_data(DataFetcher *df);
static void row_by_row_fetcher_set_fetch_size(DataFetcher *df, int fetch_size);
static void row_by_row_fetcher_set_tuple_memcontext(DataFetcher *df, MemoryContext mctx);
static void row_by_row_fetcher_store_next_tuple(DataFetcher *df, TupleTableSlot *slot);
static void row_by_row_fetcher_rescan(DataFetcher *df);
static void row_by_row_fetcher_close(DataFetcher *df);
static void copy_fetcher_send_fetch_request(DataFetcher *df);
static void copy_fetcher_reset(CopyFetcher *fetcher);
static int copy_fetcher_fetch_data(DataFetcher *df);
static void copy_fetcher_set_fetch_size(DataFetcher *df, int fetch_size);
static void copy_fetcher_set_tuple_memcontext(DataFetcher *df, MemoryContext mctx);
static void copy_fetcher_store_next_tuple(DataFetcher *df, TupleTableSlot *slot);
static void copy_fetcher_rescan(DataFetcher *df);
static void copy_fetcher_close(DataFetcher *df);

static DataFetcherFuncs funcs = {
.send_fetch_request = row_by_row_fetcher_send_fetch_request,
.fetch_data = row_by_row_fetcher_fetch_data,
.set_fetch_size = row_by_row_fetcher_set_fetch_size,
.set_tuple_mctx = row_by_row_fetcher_set_tuple_memcontext,
.store_next_tuple = row_by_row_fetcher_store_next_tuple,
.rewind = row_by_row_fetcher_rescan,
.close = row_by_row_fetcher_close,
.send_fetch_request = copy_fetcher_send_fetch_request,
.fetch_data = copy_fetcher_fetch_data,
.set_fetch_size = copy_fetcher_set_fetch_size,
.set_tuple_mctx = copy_fetcher_set_tuple_memcontext,
.store_next_tuple = copy_fetcher_store_next_tuple,
.rewind = copy_fetcher_rescan,
.close = copy_fetcher_close,
};

static void
row_by_row_fetcher_set_fetch_size(DataFetcher *df, int fetch_size)
copy_fetcher_set_fetch_size(DataFetcher *df, int fetch_size)
{
RowByRowFetcher *fetcher = cast_fetcher(RowByRowFetcher, df);
CopyFetcher *fetcher = cast_fetcher(CopyFetcher, df);

data_fetcher_set_fetch_size(&fetcher->state, fetch_size);
}

static void
row_by_row_fetcher_set_tuple_memcontext(DataFetcher *df, MemoryContext mctx)
copy_fetcher_set_tuple_memcontext(DataFetcher *df, MemoryContext mctx)
{
RowByRowFetcher *fetcher = cast_fetcher(RowByRowFetcher, df);
CopyFetcher *fetcher = cast_fetcher(CopyFetcher, df);
data_fetcher_set_tuple_mctx(&fetcher->state, mctx);
}

static void
row_by_row_fetcher_reset(RowByRowFetcher *fetcher)
copy_fetcher_reset(CopyFetcher *fetcher)
{
fetcher->state.open = false;
data_fetcher_reset(&fetcher->state);
}

static void
row_by_row_fetcher_send_fetch_request(DataFetcher *df)
copy_fetcher_send_fetch_request(DataFetcher *df)
{
AsyncRequest *volatile req = NULL;
MemoryContext oldcontext;
RowByRowFetcher *fetcher = cast_fetcher(RowByRowFetcher, df);
CopyFetcher *fetcher = cast_fetcher(CopyFetcher, df);

if (fetcher->state.open)
{
Expand All @@ -76,7 +76,7 @@ row_by_row_fetcher_send_fetch_request(DataFetcher *df)
}

/* make sure to have a clean state */
row_by_row_fetcher_reset(fetcher);
copy_fetcher_reset(fetcher);

StringInfoData copy_query;
initStringInfo(&copy_query);
Expand Down Expand Up @@ -244,7 +244,7 @@ copy_data_check_header(StringInfo copy_data)
* Process response for ongoing async request
*/
static int
row_by_row_fetcher_complete(RowByRowFetcher *fetcher)
copy_fetcher_complete(CopyFetcher *fetcher)
{
/* Marked as volatile since it's modified in PG_TRY used in PG_CATCH */
AsyncResponseResult *volatile response = NULL;
Expand Down Expand Up @@ -480,23 +480,23 @@ row_by_row_fetcher_complete(RowByRowFetcher *fetcher)
}

static int
row_by_row_fetcher_fetch_data(DataFetcher *df)
copy_fetcher_fetch_data(DataFetcher *df)
{
RowByRowFetcher *fetcher = cast_fetcher(RowByRowFetcher, df);
CopyFetcher *fetcher = cast_fetcher(CopyFetcher, df);

if (fetcher->state.eof)
return 0;

if (!fetcher->state.open)
row_by_row_fetcher_send_fetch_request(df);
copy_fetcher_send_fetch_request(df);

return row_by_row_fetcher_complete(fetcher);
return copy_fetcher_complete(fetcher);
}

static void
row_by_row_fetcher_store_tuple(DataFetcher *df, int row, TupleTableSlot *slot)
copy_fetcher_store_tuple(DataFetcher *df, int row, TupleTableSlot *slot)
{
RowByRowFetcher *fetcher = cast_fetcher(RowByRowFetcher, df);
CopyFetcher *fetcher = cast_fetcher(CopyFetcher, df);

ExecClearTuple(slot);

Expand All @@ -522,9 +522,9 @@ row_by_row_fetcher_store_tuple(DataFetcher *df, int row, TupleTableSlot *slot)
}

static void
row_by_row_fetcher_store_next_tuple(DataFetcher *df, TupleTableSlot *slot)
copy_fetcher_store_next_tuple(DataFetcher *df, TupleTableSlot *slot)
{
row_by_row_fetcher_store_tuple(df, df->next_tuple_idx, slot);
copy_fetcher_store_tuple(df, df->next_tuple_idx, slot);

if (!TupIsNull(slot))
df->next_tuple_idx++;
Expand All @@ -533,22 +533,22 @@ row_by_row_fetcher_store_next_tuple(DataFetcher *df, TupleTableSlot *slot)
}

DataFetcher *
row_by_row_fetcher_create_for_scan(TSConnection *conn, const char *stmt, StmtParams *params,
TupleFactory *tf)
copy_fetcher_create_for_scan(TSConnection *conn, const char *stmt, StmtParams *params,
TupleFactory *tf)
{
RowByRowFetcher *fetcher = palloc0(sizeof(RowByRowFetcher));
CopyFetcher *fetcher = palloc0(sizeof(CopyFetcher));

data_fetcher_init(&fetcher->state, conn, stmt, params, tf);
fetcher->state.type = RowByRowFetcherType;
fetcher->state.type = CopyFetcherType;
fetcher->state.funcs = &funcs;

return &fetcher->state;
}

static void
row_by_row_fetcher_close(DataFetcher *df)
copy_fetcher_close(DataFetcher *df)
{
RowByRowFetcher *fetcher = cast_fetcher(RowByRowFetcher, df);
CopyFetcher *fetcher = cast_fetcher(CopyFetcher, df);

/*
* The fetcher state might not be open if the fetcher got initialized but
Expand Down Expand Up @@ -585,17 +585,17 @@ row_by_row_fetcher_close(DataFetcher *df)
pfree(fetcher->state.data_req);
fetcher->state.data_req = NULL;
}
row_by_row_fetcher_reset(fetcher);
copy_fetcher_reset(fetcher);
}

static void
row_by_row_fetcher_rescan(DataFetcher *df)
copy_fetcher_rescan(DataFetcher *df)
{
RowByRowFetcher *fetcher = cast_fetcher(RowByRowFetcher, df);
CopyFetcher *fetcher = cast_fetcher(CopyFetcher, df);

if (fetcher->state.batch_count > 1)
/* we're over the first batch so we need to close fetcher and restart from clean state */
row_by_row_fetcher_close(df);
copy_fetcher_close(df);
else
/* we can reuse current batch of results */
fetcher->state.next_tuple_idx = 0;
Expand Down
16 changes: 16 additions & 0 deletions tsl/src/remote/copy_fetcher.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
 * This file and its contents are licensed under the Timescale License.
 * Please see the included NOTICE for copyright information and
 * LICENSE-TIMESCALE for a copy of the license.
 */
#ifndef TIMESCALEDB_TSL_COPY_FETCHER_H
#define TIMESCALEDB_TSL_COPY_FETCHER_H

#include <postgres.h>

#include "data_fetcher.h"

/*
 * Create a COPY-based data fetcher for a remote scan.
 *
 * Allocates a CopyFetcher, initializes its embedded DataFetcher state with
 * the given connection, statement text, statement parameters, and tuple
 * factory, tags it as CopyFetcherType, and returns the DataFetcher
 * interface pointer. The returned fetcher is owned by the caller's scan
 * state and is released through the fetcher's close callback.
 *
 * NOTE(review): this fetcher runs the query via a COPY statement, so it is
 * not usable with parameterized (prepared) plans — callers revert to the
 * cursor fetcher in that case.
 */
extern DataFetcher *copy_fetcher_create_for_scan(TSConnection *conn, const char *stmt,
StmtParams *params, TupleFactory *tf);

#endif /* TIMESCALEDB_TSL_COPY_FETCHER_H */
2 changes: 1 addition & 1 deletion tsl/src/remote/data_fetcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

#include "data_fetcher.h"
#include "cursor_fetcher.h"
#include "row_by_row_fetcher.h"
#include "copy_fetcher.h"
#include "guc.h"
#include "errors.h"

Expand Down

0 comments on commit 066bcbe

Please sign in to comment.