Skip to content

Commit

Permalink
Use dynamic PQExpBuffer strings to build SQL queries. (#765)
Browse files Browse the repository at this point in the history
  • Loading branch information
DimCitus committed Jul 8, 2021
1 parent 5e8a48e commit 0fecd65
Showing 1 changed file with 61 additions and 74 deletions.
135 changes: 61 additions & 74 deletions src/bin/pg_autoctl/pgsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -1528,12 +1528,25 @@ typedef struct nodesArraysValuesParams
static bool
BuildNodesArrayValues(NodeAddressArray *nodeArray,
nodesArraysValuesParams *sqlParams,
char *values, int size)
PQExpBuffer values)
{
char buffer[BUFSIZE] = { 0 };
int nodeIndex = 0;
int paramIndex = 0;
int valuesIndex = 0;

/* when we didn't find any node to process, return our empty set */
if (nodeArray->count == 0)
{
appendPQExpBufferStr(
values,
"SELECT id, lsn "
"FROM (values (null::int, null::pg_lsn)) as t(id, lsn) "
"where false");

return true;
}

/* we start the VALUES subquery with the values SQL keyword */
appendPQExpBufferStr(values, "values ");

/*
* Build a SQL VALUES statement for every other node registered in the
Expand All @@ -1560,23 +1573,15 @@ BuildNodesArrayValues(NodeAddressArray *nodeArray,
/* store the (char *) pointer to the data in values */
sqlParams->values[lsnParamIndex] = sqlParams->lsns[nodeIndex];

valuesIndex += sformat(buffer + valuesIndex, BUFSIZE - valuesIndex,
"%s($%d, $%d%s)",
valuesIndex == 0 ? "" : ",",
appendPQExpBuffer(values,
"%s($%d, $%d%s)",
paramIndex == 0 ? "" : ",",

/* we begin at $1 here: intentional off-by-one */
idParamIndex + 1, lsnParamIndex + 1,
/* we begin at $1 here: intentional off-by-one */
idParamIndex + 1, lsnParamIndex + 1,

/* cast only the first row */
valuesIndex == 0 ? "::pg_lsn" : "");

if (valuesIndex > BUFSIZE)
{
/* shouldn't happen because we only support up to 12 nodes */
log_error("Failed to prepare the SQL query for "
"pgsql_replication_slot_maintain");
return false;
}
/* cast only the first row */
paramIndex == 0 ? "::pg_lsn" : "");

/* prepare next round */
paramIndex += 2;
Expand All @@ -1585,28 +1590,6 @@ BuildNodesArrayValues(NodeAddressArray *nodeArray,
/* count how many parameters where appended to the VALUES() parts */
sqlParams->count = paramIndex;

/* when we didn't find any node to process, return our empty set */
if (paramIndex == 0)
{
/* we know it fits, size is BUFSIZE or more */
sformat(values, size,
"SELECT id, lsn "
"FROM (values (null::int, null::pg_lsn)) as t(id, lsn) "
"where false");
}
else
{
int bytes = sformat(values, size, "values %s", buffer);

if (bytes > size)
{
/* shouldn't happen because we only support up to 12 nodes */
log_error("Failed to prepare the SQL query for "
"pgsql_replication_slot_maintain");
return false;
}
}

return true;
}

Expand All @@ -1624,8 +1607,8 @@ BuildNodesArrayValues(NodeAddressArray *nodeArray,
bool
pgsql_replication_slot_create_and_drop(PGSQL *pgsql, NodeAddressArray *nodeArray)
{
char sql[2 * BUFSIZE] = { 0 };
char values[BUFSIZE] = { 0 };
PQExpBuffer query = createPQExpBuffer();
PQExpBuffer values = createPQExpBuffer();

/* *INDENT-OFF* */
char *sqlTemplate =
Expand Down Expand Up @@ -1662,29 +1645,31 @@ pgsql_replication_slot_create_and_drop(PGSQL *pgsql, NodeAddressArray *nodeArray
nodesArraysValuesParams sqlParams = { 0 };
ReplicationSlotMaintainContext context = { 0 };

if (!BuildNodesArrayValues(nodeArray, &sqlParams, values, BUFSIZE))
if (!BuildNodesArrayValues(nodeArray, &sqlParams, values))
{
/* errors have already been logged */
PQfreemem(query);
PQfreemem(values);

return false;
}

/* add the computed ($1,$2), ... string to the query "template" */
int bytes = sformat(sql, 2 * BUFSIZE, sqlTemplate, values);
appendPQExpBuffer(query, sqlTemplate, values->data);

if (bytes > 2 * BUFSIZE)
{
/* shouldn't happen because we only support up to 12 nodes */
log_error("Failed to prepare the SQL query for "
"pgsql_replication_slot_maintain");
return false;
}
bool success =
pgsql_execute_with_params(pgsql,
query->data,
sqlParams.count,
sqlParams.types,
(const char **) sqlParams.values,
&context,
parseReplicationSlotMaintain);

return pgsql_execute_with_params(pgsql, sql,
sqlParams.count,
sqlParams.types,
(const char **) sqlParams.values,
&context,
parseReplicationSlotMaintain);
PQfreemem(query);
PQfreemem(values);

return success;
}


Expand All @@ -1697,8 +1682,8 @@ pgsql_replication_slot_create_and_drop(PGSQL *pgsql, NodeAddressArray *nodeArray
bool
pgsql_replication_slot_maintain(PGSQL *pgsql, NodeAddressArray *nodeArray)
{
char sql[2 * BUFSIZE] = { 0 };
char values[BUFSIZE] = { 0 };
PQExpBuffer query = createPQExpBuffer();
PQExpBuffer values = createPQExpBuffer();

/* *INDENT-OFF* */
char *sqlTemplate =
Expand Down Expand Up @@ -1736,29 +1721,31 @@ pgsql_replication_slot_maintain(PGSQL *pgsql, NodeAddressArray *nodeArray)
nodesArraysValuesParams sqlParams = { 0 };
ReplicationSlotMaintainContext context = { 0 };

if (!BuildNodesArrayValues(nodeArray, &sqlParams, values, BUFSIZE))
if (!BuildNodesArrayValues(nodeArray, &sqlParams, values))
{
/* errors have already been logged */
PQfreemem(query);
PQfreemem(values);

return false;
}

/* add the computed ($1,$2), ... string to the query "template" */
int bytes = sformat(sql, 2 * BUFSIZE, sqlTemplate, values);
appendPQExpBuffer(query, sqlTemplate, values->data);

if (bytes > 2 * BUFSIZE)
{
/* shouldn't happen because we only support up to 12 nodes */
log_error("Failed to prepare the SQL query for "
"pgsql_replication_slot_maintain");
return false;
}
bool success =
pgsql_execute_with_params(pgsql,
query->data,
sqlParams.count,
sqlParams.types,
(const char **) sqlParams.values,
&context,
parseReplicationSlotMaintain);

return pgsql_execute_with_params(pgsql, sql,
sqlParams.count,
sqlParams.types,
(const char **) sqlParams.values,
&context,
parseReplicationSlotMaintain);
PQfreemem(query);
PQfreemem(values);

return success;
}


Expand Down

0 comments on commit 0fecd65

Please sign in to comment.