diff --git a/src/clickhousedb_adjust.c b/src/clickhousedb_adjust.c index e06d2e2..72925bd 100644 --- a/src/clickhousedb_adjust.c +++ b/src/clickhousedb_adjust.c @@ -62,11 +62,11 @@ create_custom_columns_cache(void) return hash_create("clickhouse_fdw custom functions", 20, &ctl, HASH_ELEM | HASH_BLOBS); } -CustomObjectDef *checkForCustomFunction(Oid funcid) +CustomObjectDef *chfdw_check_for_custom_function(Oid funcid) { CustomObjectDef *entry; - if (is_builtin(funcid)) + if (chfdw_is_builtin(funcid)) { switch (funcid) { @@ -131,7 +131,7 @@ CustomObjectDef *checkForCustomFunction(Oid funcid) return entry; } -CustomObjectDef *checkForCustomType(Oid typeoid) +CustomObjectDef *chfdw_check_for_custom_type(Oid typeoid) { const char *proname; @@ -139,7 +139,7 @@ CustomObjectDef *checkForCustomType(Oid typeoid) if (!custom_objects_cache) custom_objects_cache = create_custom_objects_cache(); - if (is_builtin(typeoid)) + if (chfdw_is_builtin(typeoid)) return NULL; entry = hash_search(custom_objects_cache, (void *) &typeoid, HASH_FIND, NULL); @@ -166,7 +166,7 @@ CustomObjectDef *checkForCustomType(Oid typeoid) return entry; } -CustomObjectDef *checkForCustomOperator(Oid opoid, Form_pg_operator form) +CustomObjectDef *chfdw_check_for_custom_operator(Oid opoid, Form_pg_operator form) { HeapTuple tuple = NULL; const char *proname; @@ -175,7 +175,7 @@ CustomObjectDef *checkForCustomOperator(Oid opoid, Form_pg_operator form) if (!custom_objects_cache) custom_objects_cache = create_custom_objects_cache(); - if (is_builtin(opoid)) + if (chfdw_is_builtin(opoid)) { switch (opoid) { /* timestamptz + interval */ @@ -229,7 +229,7 @@ CustomObjectDef *checkForCustomOperator(Oid opoid, Form_pg_operator form) * New options might also require tweaking merge_fdw_options(). */ void -ApplyCustomTableOptions(CHFdwRelationInfo *fpinfo, Oid relid) +chfdw_apply_custom_table_options(CHFdwRelationInfo *fpinfo, Oid relid) { ListCell *lc; TupleDesc tupdesc; @@ -312,7 +312,7 @@ ApplyCustomTableOptions(CHFdwRelationInfo *fpinfo, Oid relid) /* Get foreign relation options */ CustomColumnInfo * -GetCustomColumnInfo(Oid relid, uint16 varattno) +chfdw_get_custom_column_info(Oid relid, uint16 varattno) { CustomColumnInfo entry_key, *entry; diff --git a/src/clickhousedb_connection.c b/src/clickhousedb_connection.c index 7653794..721c116 100644 --- a/src/clickhousedb_connection.c +++ b/src/clickhousedb_connection.c @@ -45,9 +45,9 @@ clickhouse_connect(ForeignServer *server, UserMapping *user) /* default settings */ ch_connection_details details = {"127.0.0.1", 8123, NULL, NULL, "default"}; - ExtractConnectionOptions(server->options, &driver, &details.host, + chfdw_extract_options(server->options, &driver, &details.host, &details.port, &details.dbname, &details.username, &details.password); - ExtractConnectionOptions(user->options, &driver, &details.host, + chfdw_extract_options(user->options, &driver, &details.host, &details.port, &details.dbname, &details.username, &details.password); if (strcmp(driver, "http") == 0) @@ -64,7 +64,7 @@ clickhouse_connect(ForeignServer *server, UserMapping *user) else connstring = psprintf("http://%s:%d/", details.host, details.port); - conn = http_connect(connstring); + conn = chfdw_http_connect(connstring); pfree(connstring); return conn; } @@ -73,14 +73,14 @@ clickhouse_connect(ForeignServer *server, UserMapping *user) if (details.port == 8123) details.port = 9000; - return binary_connect(&details); + return chfdw_binary_connect(&details); } else elog(ERROR, "invalid ClickHouse connection driver"); } ch_connection -GetConnection(UserMapping *user) +chfdw_get_connection(UserMapping *user) { bool found; ConnCacheEntry *entry; diff --git a/src/clickhousedb_deparse.c b/src/clickhousedb_deparse.c index 048acc8..122ccf0 100644 --- a/src/clickhousedb_deparse.c +++ b/src/clickhousedb_deparse.c @@ -172,7 +172,7 @@ static void get_relation_column_alias_ids(Var *node, RelOptInfo *foreignrel, * - local_conds contains expressions that can't be evaluated remotely */ void -classifyConditions(PlannerInfo *root, +chfdw_classify_conditions(PlannerInfo *root, RelOptInfo *baserel, List *input_conds, List **remote_conds, @@ -187,7 +187,7 @@ classifyConditions(PlannerInfo *root, { RestrictInfo *ri = lfirst_node(RestrictInfo, lc); - if (is_foreign_expr(root, baserel, ri->clause)) + if (chfdw_is_foreign_expr(root, baserel, ri->clause)) *remote_conds = lappend(*remote_conds, ri); else *local_conds = lappend(*local_conds, ri); @@ -198,7 +198,7 @@ classifyConditions(PlannerInfo *root, * Returns true if given expr is safe to evaluate on the foreign server. */ bool -is_foreign_expr(PlannerInfo *root, +chfdw_is_foreign_expr(PlannerInfo *root, RelOptInfo *baserel, Expr *expr) { @@ -232,7 +232,7 @@ is_foreign_expr(PlannerInfo *root, /* 1: '=', 2: '<>', 0 - false */ int -is_equal_op(Oid opno) +chfdw_is_equal_op(Oid opno) { Form_pg_operator operform; HeapTuple opertup; @@ -359,7 +359,7 @@ foreign_expr_walker(Node *node, * can't be sent to remote because it might have incompatible * semantics on remote side. */ - if (!is_shippable(fe->funcid, ProcedureRelationId, fpinfo, &cdef)) + if (!chfdw_is_shippable(fe->funcid, ProcedureRelationId, fpinfo, &cdef)) return false; /* @@ -380,7 +380,7 @@ foreign_expr_walker(Node *node, * (If the operator is shippable, we assume its underlying * function is too.) */ - if (!is_shippable(oe->opno, OperatorRelationId, fpinfo, NULL)) + if (!chfdw_is_shippable(oe->opno, OperatorRelationId, fpinfo, NULL)) return false; /* @@ -395,7 +395,7 @@ foreign_expr_walker(Node *node, { ScalarArrayOpExpr *oe = (ScalarArrayOpExpr *) node; - if (!is_equal_op(oe->opno)) + if (!chfdw_is_equal_op(oe->opno)) return false; /* @@ -497,7 +497,7 @@ foreign_expr_walker(Node *node, return false; /* As usual, it must be shippable. */ - if (!is_shippable(agg->aggfnoid, ProcedureRelationId, fpinfo, NULL)) + if (!chfdw_is_shippable(agg->aggfnoid, ProcedureRelationId, fpinfo, NULL)) return false; /* Features that ClickHouse doesn't support */ @@ -590,7 +590,7 @@ foreign_expr_walker(Node *node, * If result type of given expression is not shippable, it can't be sent * to remote because it might have incompatible semantics on remote side. */ - if (check_type && !is_shippable(exprType(node), TypeRelationId, fpinfo, NULL)) + if (check_type && !chfdw_is_shippable(exprType(node), TypeRelationId, fpinfo, NULL)) { return false; } @@ -759,7 +759,7 @@ ch_format_type_extended(Oid type_oid, int32 typemod, bits16 flags) CustomObjectDef *cdef; char *typname; - cdef = checkForCustomType(type_oid); + cdef = chfdw_check_for_custom_type(type_oid); if (cdef && cdef->custom_name[0] != '\0') buf = pstrdup(cdef->custom_name); else @@ -796,7 +796,7 @@ deparse_type_name(Oid type_oid, int32 typemod) { bits16 flags = FORMAT_TYPE_TYPEMOD_GIVEN; - if (!is_builtin(type_oid)) + if (!chfdw_is_builtin(type_oid)) flags |= FORMAT_TYPE_FORCE_QUALIFY; return ch_format_type_extended(type_oid, typemod, flags); @@ -811,7 +811,7 @@ deparse_type_name(Oid type_oid, int32 typemod) * foreign server. */ List * -build_tlist_to_deparse(RelOptInfo *foreignrel) +chfdw_build_tlist_to_deparse(RelOptInfo *foreignrel) { List *tlist = NIL; CHFdwRelationInfo *fpinfo = (CHFdwRelationInfo *) foreignrel->fdw_private; @@ -868,7 +868,7 @@ build_tlist_to_deparse(RelOptInfo *foreignrel) * List of columns selected is returned in retrieved_attrs. */ void -deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, +chfdw_deparse_select_stmt_for_rel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, List *tlist, List *remote_conds, List *pathkeys, bool is_subquery, List **retrieved_attrs, List **params_list) @@ -942,7 +942,7 @@ deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, * * tlist is the list of desired columns. is_subquery is the flag to * indicate whether to deparse the specified relation as a subquery. - * Read prologue of deparseSelectStmtForRel() for details. + * Read prologue of chfdw_deparse_select_stmt_for_rel() for details. */ static void deparseSelectSql(List *tlist, bool is_subquery, List **retrieved_attrs, @@ -1077,7 +1077,7 @@ deparseTargetList(StringInfo buf, first = false; - cdef = checkForCustomType(attr->atttypid); + cdef = chfdw_check_for_custom_type(attr->atttypid); deparseColumnRef(buf, cdef, rtindex, i, rte, qualify_col); *retrieved_attrs = lappend_int(*retrieved_attrs, i); @@ -1142,7 +1142,7 @@ appendConditions(List *exprs, deparse_expr_cxt *context) /* Output join name for given join type */ const char * -get_jointype_name(JoinType jointype) +chfdw_get_jointype_name(JoinType jointype) { switch (jointype) { @@ -1355,7 +1355,7 @@ deparseFromExprForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel, * ((outer relation) (inner relation) ON (joinclauses)) */ appendStringInfo(buf, " %s ALL %s JOIN %s ON ", join_sql_o.data, - get_jointype_name(fpinfo->jointype), join_sql_i.data); + chfdw_get_jointype_name(fpinfo->jointype), join_sql_i.data); /* Append join clause; (TRUE) if no join clause */ if (fpinfo->joinclauses) @@ -1435,7 +1435,7 @@ deparseRangeTblRef(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel, /* Deparse the subquery representing the relation. */ appendStringInfoChar(buf, '('); - deparseSelectStmtForRel(buf, root, foreignrel, NIL, + chfdw_deparse_select_stmt_for_rel(buf, root, foreignrel, NIL, fpinfo->remote_conds, NIL, true, &retrieved_attrs, params_list); appendStringInfoChar(buf, ')'); @@ -1476,7 +1476,7 @@ deparseRangeTblRef(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel, * deparse remote INSERT statement */ void -deparseInsertSql(StringInfo buf, RangeTblEntry *rte, +chfdw_deparse_insert_sql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs) { @@ -1506,92 +1506,6 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, } } - -/* - * Construct SELECT statement to acquire size in blocks of given relation. - * - * Note: we use local definition of block size, not remote definition. - * This is perhaps debatable. - * - * Note: pg_relation_size() exists in 8.1 and later. - */ -void -deparseAnalyzeSizeSql(StringInfo buf, Relation rel) -{ - StringInfoData relname; - - /* We'll need the remote relation name as a literal. */ - initStringInfo(&relname); - deparseRelation(&relname, rel); -} - -/* - * Construct SELECT statement to acquire sample rows of given relation. - * - * SELECT command is appended to buf, and list of columns retrieved - * is returned to *retrieved_attrs. - */ -void -deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs) -{ - Oid relid = RelationGetRelid(rel); - TupleDesc tupdesc = RelationGetDescr(rel); - int i; - char *colname; - List *options; - ListCell *lc; - bool first = true; - - *retrieved_attrs = NIL; - - appendStringInfoString(buf, "SELECT "); - for (i = 0; i < tupdesc->natts; i++) - { - /* Ignore dropped columns. */ - if (TupleDescAttr(tupdesc, i)->attisdropped) - { - continue; - } - - if (!first) - { - appendStringInfoString(buf, ", "); - } - first = false; - - /* Use attribute name or column_name option. */ - colname = NameStr(TupleDescAttr(tupdesc, i)->attname); - options = GetForeignColumnOptions(relid, i + 1); - - foreach (lc, options) - { - DefElem *def = (DefElem *) lfirst(lc); - - if (strcmp(def->defname, "column_name") == 0) - { - colname = defGetString(def); - break; - } - } - - appendStringInfoString(buf, quote_identifier(colname)); - - *retrieved_attrs = lappend_int(*retrieved_attrs, i + 1); - } - - /* Don't generate bad syntax for zero-column relation. */ - if (first) - { - appendStringInfoString(buf, "NULL"); - } - - /* - * Construct FROM clause - */ - appendStringInfoString(buf, " FROM "); - deparseRelation(buf, rel); -} - /* * Construct name to use for given column, and emit it into buf. * If it has a column_name FDW option, use that instead of attribute name. @@ -1613,7 +1527,7 @@ deparseColumnRef(StringInfo buf, CustomObjectDef *cdef, elog(ERROR, "ClickHouse does not support system attributes"); /* Get FDW specific options for this column */ - cinfo = GetCustomColumnInfo(rte->relid, varattno); + cinfo = chfdw_get_custom_column_info(rte->relid, varattno); if (cinfo) colname = cinfo->colname; @@ -1646,10 +1560,10 @@ deparseRelation(StringInfo buf, Relation rel) char *username; char *password; char *dbname; - ForeignServer *server = get_foreign_server(rel); + ForeignServer *server = chfdw_get_foreign_server(rel); ListCell *lc; - ExtractConnectionOptions(server->options, &driver, &host, &port, &dbname, + chfdw_extract_options(server->options, &driver, &host, &port, &dbname, &username, &password); /* obtain additional catalog information. */ @@ -1819,7 +1733,7 @@ deparseVar(Var *node, deparse_expr_cxt *context) cdef = context->func; if (!cdef) - cdef = checkForCustomType(node->vartype); + cdef = chfdw_check_for_custom_type(node->vartype); if (bms_is_member(node->varno, relids) && node->varlevelsup == 0) deparseColumnRef(context->buf, cdef, @@ -2361,7 +2275,7 @@ deparseOpExpr(OpExpr *node, deparse_expr_cxt *context) (oprkind == 'l' && list_length(node->args) == 1) || (oprkind == 'b' && list_length(node->args) == 2)); - cdef = checkForCustomOperator(node->opno, form); + cdef = chfdw_check_for_custom_operator(node->opno, form); if (cdef) { switch (cdef->cf_type) @@ -2496,7 +2410,7 @@ deparseScalarArrayOpExpr(ScalarArrayOpExpr *node, deparse_expr_cxt *context) Expr *arg2; /* Retrieve information about the operator from system catalog. */ - int optype = is_equal_op(node->opno); + int optype = chfdw_is_equal_op(node->opno); /* Sanity check. */ Assert(list_length(node->args) == 2); @@ -2997,7 +2911,7 @@ appendOrderByClause(List *pathkeys, deparse_expr_cxt *context) PathKey *pathkey = lfirst(lcell); Expr *em_expr; - em_expr = find_em_expr_for_rel(pathkey->pk_eclass, baserel); + em_expr = chfdw_find_em_expr(pathkey->pk_eclass, baserel); Assert(em_expr != NULL); appendStringInfoString(buf, delim); @@ -3026,7 +2940,7 @@ appendFunctionName(Oid funcid, deparse_expr_cxt *context) CustomObjectDef *cdef; CHFdwRelationInfo *fpinfo = context->scanrel->fdw_private; - cdef = checkForCustomFunction(funcid); + cdef = chfdw_check_for_custom_function(funcid); if (cdef && cdef->custom_name[0] != '\0') { if (cdef->custom_name[0] != '\1') @@ -3042,7 +2956,7 @@ appendFunctionName(Oid funcid, deparse_expr_cxt *context) proname = NameStr(procform->proname); /* we have some additional conditions on aggregation functions */ - if (is_builtin(funcid) && procform->prokind == PROKIND_AGGREGATE + if (chfdw_is_builtin(funcid) && procform->prokind == PROKIND_AGGREGATE && fpinfo->ch_table_engine == CH_COLLAPSING_MERGE_TREE) { cdef = palloc(sizeof(CustomObjectDef)); diff --git a/src/clickhousedb_fdw.c b/src/clickhousedb_fdw.c index 9062062..874f26a 100644 --- a/src/clickhousedb_fdw.c +++ b/src/clickhousedb_fdw.c @@ -273,9 +273,9 @@ clickhousedb_raw_query(PG_FUNCTION_ARGS) char *connstring = TextDatumGetCString(PG_GETARG_TEXT_P(1)), *query = TextDatumGetCString(PG_GETARG_TEXT_P(0)); - ch_connection conn = http_connect(connstring); + ch_connection conn = chfdw_http_connect(connstring); ch_cursor *cursor = conn.methods->simple_query(conn.conn, query); - text *res = http_fetch_raw_data(cursor); + text *res = chfdw_http_fetch_raw_data(cursor); MemoryContextDelete(cursor->memcxt); conn.methods->disconnect(conn.conn); @@ -326,7 +326,7 @@ clickhouseGetForeignRelSize(PlannerInfo *root, fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST; fpinfo->shippable_extensions = NIL; - ApplyCustomTableOptions(fpinfo, foreigntableid); + chfdw_apply_custom_table_options(fpinfo, foreigntableid); fpinfo->user = NULL; @@ -334,7 +334,7 @@ clickhouseGetForeignRelSize(PlannerInfo *root, * Identify which baserestrictinfo clauses can be sent to the remote * server and which can't. */ - classifyConditions(root, baserel, baserel->baserestrictinfo, + chfdw_classify_conditions(root, baserel, baserel->baserestrictinfo, &fpinfo->remote_conds, &fpinfo->local_conds); /* @@ -545,12 +545,12 @@ get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel) * end up resorting the entire data set. So, unless we can push * down all of the query pathkeys, forget it. * - * is_foreign_expr would detect volatile expressions as well, but + * chfdw_is_foreign_expr would detect volatile expressions as well, but * checking ec_has_volatile here saves some cycles. */ if (pathkey_ec->ec_has_volatile || - !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) || - !is_foreign_expr(root, rel, em_expr)) + !(em_expr = chfdw_find_em_expr(pathkey_ec, rel)) || + !chfdw_is_foreign_expr(root, rel, em_expr)) { query_pathkeys_ok = false; break; @@ -604,8 +604,8 @@ get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel) continue; /* If no pushable expression for this rel, skip it. */ - em_expr = find_em_expr_for_rel(cur_ec, rel); - if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr)) + em_expr = chfdw_find_em_expr(cur_ec, rel); + if (em_expr == NULL || !chfdw_is_foreign_expr(root, rel, em_expr)) continue; /* Looks like we can generate a pathkey, so let's do it. */ @@ -691,7 +691,7 @@ clickhouseGetForeignPlan(PlannerInfo *root, * * Separate the scan_clauses into those that can be executed remotely * and those that can't. baserestrictinfo clauses that were - * previously determined to be safe or unsafe by classifyConditions + * previously determined to be safe or unsafe by chfdw_classify_conditions * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything * else in the scan_clauses list will be a join clause, which we have * to check for remote-safety. @@ -717,7 +717,7 @@ clickhouseGetForeignPlan(PlannerInfo *root, remote_exprs = lappend(remote_exprs, rinfo->clause); else if (list_member_ptr(fpinfo->local_conds, rinfo)) local_exprs = lappend(local_exprs, rinfo->clause); - else if (is_foreign_expr(root, foreignrel, rinfo->clause)) + else if (chfdw_is_foreign_expr(root, foreignrel, rinfo->clause)) remote_exprs = lappend(remote_exprs, rinfo->clause); else local_exprs = lappend(local_exprs, rinfo->clause); @@ -762,7 +762,7 @@ clickhouseGetForeignPlan(PlannerInfo *root, */ /* Build the list of columns to be fetched from the foreign server. */ - fdw_scan_tlist = build_tlist_to_deparse(foreignrel); + fdw_scan_tlist = chfdw_build_tlist_to_deparse(foreignrel); /* * Ensure that the outer plan produces a tuple whose descriptor @@ -808,7 +808,7 @@ clickhouseGetForeignPlan(PlannerInfo *root, * expressions to be sent as parameters. */ initStringInfo(&sql); - deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, + chfdw_deparse_select_stmt_for_rel(&sql, root, foreignrel, fdw_scan_tlist, remote_exprs, best_path->path.pathkeys, false, &retrieved_attrs, ¶ms_list); @@ -893,7 +893,7 @@ clickhouseBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - fsstate->conn = GetConnection(user); + fsstate->conn = chfdw_get_connection(user); /* Get private info created by planner functions. */ fsstate->query = strVal(list_nth(fsplan->fdw_private, @@ -1157,7 +1157,7 @@ clickhousePlanForeignModify(PlannerInfo *root, switch (operation) { case CMD_INSERT: - deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs); + chfdw_deparse_insert_sql(&sql, rte, resultRelation, rel, targetAttrs); break; case CMD_UPDATE: elog(ERROR, "ClickHouse does not support updates"); @@ -1242,7 +1242,7 @@ clickhouseExecForeignInsert(EState *estate, } ForeignServer * -get_foreign_server(Relation rel) +chfdw_get_foreign_server(Relation rel) { ForeignServer *server; ForeignTable *table; @@ -1284,7 +1284,7 @@ clickhouseBeginForeignInsert(ModifyTableState *mtstate, /* * If the foreign table is a partition, we need to create a new RTE - * describing the foreign table for use by deparseInsertSql and + * describing the foreign table for use by chfdw_deparse_insert_sql and * create_foreign_modify() below, after first copying the parent's RTE and * modifying some fields to describe the foreign partition to work on. * However, if this is invoked by UPDATE, the existing RTE may already @@ -1300,7 +1300,7 @@ clickhouseBeginForeignInsert(ModifyTableState *mtstate, } initStringInfo(&sql); - deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs); + chfdw_deparse_insert_sql(&sql, rte, resultRelation, rel, targetAttrs); /* Construct an execution state. */ fmstate = create_foreign_modify(mtstate->ps.state, @@ -1456,7 +1456,7 @@ create_foreign_modify(EState *estate, table = GetForeignTable(RelationGetRelid(rel)); user = GetUserMapping(userid, table->serverid); - fmstate->conn = GetConnection(user); + fmstate->conn = chfdw_get_connection(user); /* Set up remote query information. */ fmstate->query = query; @@ -1726,7 +1726,7 @@ is_simple_join_clause(Expr *expr) if (IsA(expr, OpExpr)) { OpExpr *opexpr = (OpExpr *) expr; - if (is_equal_op(opexpr->opno) == 1 + if (chfdw_is_equal_op(opexpr->opno) == 1 && list_length(opexpr->args) == 2 && IsA(list_nth(opexpr->args, 0), Var) && IsA(list_nth(opexpr->args, 1), Var)) @@ -1829,7 +1829,7 @@ foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, foreach(lc, extra->restrictlist) { RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); - bool is_remote_clause = is_foreign_expr(root, joinrel, + bool is_remote_clause = chfdw_is_foreign_expr(root, joinrel, rinfo->clause); if (IS_OUTER_JOIN(jointype) && @@ -2005,7 +2005,7 @@ foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, fpinfo->relation_name = makeStringInfo(); appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)", fpinfo_o->relation_name->data, - get_jointype_name(fpinfo->jointype), + chfdw_get_jointype_name(fpinfo->jointype), fpinfo_i->relation_name->data); /* @@ -2299,7 +2299,7 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, * If any GROUP BY expression is not shippable, then we cannot * push down aggregation to the foreign server. */ - if (!is_foreign_expr(root, grouped_rel, expr)) + if (!chfdw_is_foreign_expr(root, grouped_rel, expr)) return false; /* @@ -2319,7 +2319,7 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, /* * Non-grouping expression we need to compute. Is it shippable? */ - if (is_foreign_expr(root, grouped_rel, expr)) + if (chfdw_is_foreign_expr(root, grouped_rel, expr)) { /* Yes, so add to tlist as-is; OK to suppress duplicates */ tlist = add_to_flat_tlist(tlist, list_make1(expr)); @@ -2334,7 +2334,7 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, * If any aggregate expression is not shippable, then we * cannot push down aggregation to the foreign server. */ - if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars)) + if (!chfdw_is_foreign_expr(root, grouped_rel, (Expr *) aggvars)) return false; /* @@ -2386,7 +2386,7 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, grouped_rel->relids, NULL, NULL); - if (is_foreign_expr(root, grouped_rel, expr)) + if (chfdw_is_foreign_expr(root, grouped_rel, expr)) fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo); else fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo); @@ -2423,7 +2423,7 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, */ if (IsA(expr, Aggref)) { - if (!is_foreign_expr(root, grouped_rel, expr)) + if (!chfdw_is_foreign_expr(root, grouped_rel, expr)) return false; tlist = add_to_flat_tlist(tlist, list_make1(expr)); @@ -2586,7 +2586,7 @@ add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, * the indicated relation. */ Expr * -find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel) +chfdw_find_em_expr(EquivalenceClass *ec, RelOptInfo *rel) { ListCell *lc_em; @@ -2616,7 +2616,7 @@ clickhouseImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) ForeignServer *server; server = GetForeignServer(serverOid); - return construct_create_tables(stmt, server); + return chfdw_construct_create_tables(stmt, server); } diff --git a/src/clickhousedb_fdw.h b/src/clickhousedb_fdw.h index 6c6b659..ee14c10 100644 --- a/src/clickhousedb_fdw.h +++ b/src/clickhousedb_fdw.h @@ -66,10 +66,10 @@ typedef struct { char *dbname; } ch_connection_details; -ch_connection http_connect(char *connstring); -ch_connection binary_connect(ch_connection_details *details); -text *http_fetch_raw_data(ch_cursor *cursor); -List *construct_create_tables(ImportForeignSchemaStmt *stmt, ForeignServer *server); +ch_connection chfdw_http_connect(char *connstring); +ch_connection chfdw_binary_connect(ch_connection_details *details); +text *chfdw_http_fetch_raw_data(ch_cursor *cursor); +List *chfdw_construct_create_tables(ImportForeignSchemaStmt *stmt, ForeignServer *server); typedef enum { CH_DEFAULT, @@ -171,50 +171,42 @@ typedef struct CHFdwRelationInfo } CHFdwRelationInfo; /* in clickhouse_fdw.c */ -extern ForeignServer *get_foreign_server(Relation rel); +extern ForeignServer *chfdw_get_foreign_server(Relation rel); -/* in connection.c */ -extern ch_connection GetConnection(UserMapping *user); -extern unsigned int GetCursorNumber(ch_connection conn); -extern unsigned int GetPrepStmtNumber(ch_connection conn); +/* in clickhousedb_connection.c */ +extern ch_connection chfdw_get_connection(UserMapping *user); extern void chfdw_exec_query(ch_connection conn, const char *query); extern void chfdw_report_error(int elevel, ch_connection conn, bool clear, const char *sql); -/* in option.c */ +/* in clickhousedb_option.c */ extern void -ExtractConnectionOptions(List *defelems, char **driver, char **host, int *port, +chfdw_extract_options(List *defelems, char **driver, char **host, int *port, char **dbname, char **username, char **password); -extern List *ExtractExtensionList(const char *extensionsString, - bool warnOnMissing); - /* in deparse.c */ -extern void classifyConditions(PlannerInfo *root, +extern void chfdw_classify_conditions(PlannerInfo *root, RelOptInfo *baserel, List *input_conds, List **remote_conds, List **local_conds); -extern bool is_foreign_expr(PlannerInfo *root, +extern bool chfdw_is_foreign_expr(PlannerInfo *root, RelOptInfo *baserel, Expr *expr); -extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte, +extern void chfdw_deparse_insert_sql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs); -extern void deparseAnalyzeSizeSql(StringInfo buf, Relation rel); -extern void deparseAnalyzeSql(StringInfo buf, Relation rel, - List **retrieved_attrs); -extern Expr *find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel); -extern List *build_tlist_to_deparse(RelOptInfo *foreignrel); -extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, +extern Expr *chfdw_find_em_expr(EquivalenceClass *ec, RelOptInfo *rel); +extern List *chfdw_build_tlist_to_deparse(RelOptInfo *foreignrel); +extern void chfdw_deparse_select_stmt_for_rel(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel, List *tlist, List *remote_conds, List *pathkeys, bool is_subquery, List **retrieved_attrs, List **params_list); -extern const char *get_jointype_name(JoinType jointype); +extern const char *chfdw_get_jointype_name(JoinType jointype); /* in shippable.c */ -extern bool is_builtin(Oid objectId); -extern int is_equal_op(Oid opno); +extern bool chfdw_is_builtin(Oid objectId); +extern int chfdw_is_equal_op(Oid opno); /* * Connection cache hash table entry @@ -267,18 +259,18 @@ typedef struct CustomColumnInfo char signfield[NAMEDATALEN]; } CustomColumnInfo; -extern CustomObjectDef *checkForCustomFunction(Oid funcid); -extern CustomObjectDef *checkForCustomType(Oid typeoid); +extern CustomObjectDef *chfdw_check_for_custom_function(Oid funcid); +extern CustomObjectDef *chfdw_check_for_custom_type(Oid typeoid); extern void modifyCustomVar(CustomObjectDef *def, Node *node); -extern void ApplyCustomTableOptions(CHFdwRelationInfo *fpinfo, Oid relid); -extern CustomColumnInfo *GetCustomColumnInfo(Oid relid, uint16 varattno); -extern CustomObjectDef *checkForCustomOperator(Oid opoid, Form_pg_operator form); +extern void chfdw_apply_custom_table_options(CHFdwRelationInfo *fpinfo, Oid relid); +extern CustomColumnInfo *chfdw_get_custom_column_info(Oid relid, uint16 varattno); +extern CustomObjectDef *chfdw_check_for_custom_operator(Oid opoid, Form_pg_operator form); extern Datum ch_timestamp_out(PG_FUNCTION_ARGS); extern Datum ch_date_out(PG_FUNCTION_ARGS); extern Datum ch_time_out(PG_FUNCTION_ARGS); -extern bool is_shippable(Oid objectId, Oid classId, CHFdwRelationInfo *fpinfo, +extern bool chfdw_is_shippable(Oid objectId, Oid classId, CHFdwRelationInfo *fpinfo, CustomObjectDef **outcdef); /* compat */ diff --git a/src/clickhousedb_option.c b/src/clickhousedb_option.c index 4ccfa23..fcd0fb6 100644 --- a/src/clickhousedb_option.c +++ b/src/clickhousedb_option.c @@ -242,7 +242,7 @@ is_ch_option(const char *keyword) * allocated large-enough arrays. Returns number of options found. */ void -ExtractConnectionOptions(List *defelems, char **driver, char **host, int *port, +chfdw_extract_options(List *defelems, char **driver, char **host, int *port, char **dbname, char **username, char **password) { ListCell *lc; @@ -272,42 +272,3 @@ ExtractConnectionOptions(List *defelems, char **driver, char **host, int *port, } } } - -/* - * Parse a comma-separated string and return a List of the OIDs of the - * extensions named in the string. If any names in the list cannot be - * found, report a warning if warnOnMissing is true, else just silently - * ignore them. - */ -List * -ExtractExtensionList(const char *extensionsString, bool warnOnMissing) -{ - List *extensionOids = NIL; - List *extlist; - ListCell *lc; - - /* SplitIdentifierString scribbles on its input, so pstrdup first */ - if (!SplitIdentifierString(pstrdup(extensionsString), ',', &extlist)) - /* syntax error in name list */ - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("parameter \"%s\" must be a list of extension names", - "extensions"))); - - foreach (lc, extlist) - { - const char *extension_name = (const char *) lfirst(lc); - Oid extension_oid = get_extension_oid(extension_name, true); - - if (OidIsValid(extension_oid)) - extensionOids = lappend_oid(extensionOids, extension_oid); - else if (warnOnMissing) - ereport(WARNING, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("extension \"%s\" is not installed", - extension_name))); - } - - list_free(extlist); - return extensionOids; -} diff --git a/src/clickhousedb_shipable.c b/src/clickhousedb_shipable.c index 9f3cfe6..bc4f991 100644 --- a/src/clickhousedb_shipable.c +++ b/src/clickhousedb_shipable.c @@ -149,37 +149,37 @@ lookup_shippable(Oid objectId, Oid classId, CHFdwRelationInfo *fpinfo) * track of that would be a huge exercise. */ bool -is_builtin(Oid objectId) +chfdw_is_builtin(Oid objectId) { return (objectId < FirstBootstrapObjectId); } /* - * is_shippable + * chfdw_is_shippable * Is this object (function/operator/type) shippable to foreign server? */ bool -is_shippable(Oid objectId, Oid classId, CHFdwRelationInfo *fpinfo, +chfdw_is_shippable(Oid objectId, Oid classId, CHFdwRelationInfo *fpinfo, CustomObjectDef **outcdef) { ShippableCacheKey key; ShippableCacheEntry *entry; /* Built-in objects are presumed shippable. */ - if (is_builtin(objectId)) + if (chfdw_is_builtin(objectId)) return true; if (classId == ProcedureRelationId) { - CustomObjectDef *cdef = checkForCustomFunction(objectId); + CustomObjectDef *cdef = chfdw_check_for_custom_function(objectId); if (outcdef != NULL) *outcdef = cdef; return (cdef && cdef->cf_type != CF_UNSHIPPABLE); } - else if (classId == TypeRelationId && checkForCustomType(objectId) != NULL) + else if (classId == TypeRelationId && chfdw_check_for_custom_type(objectId) != NULL) return true; - else if (classId == OperatorRelationId && checkForCustomOperator(objectId, NULL) != NULL) + else if (classId == OperatorRelationId && chfdw_check_for_custom_operator(objectId, NULL) != NULL) return true; /* Otherwise, give up if user hasn't specified any shippable extensions. */ diff --git a/src/libclickhouse_link.c b/src/libclickhouse_link.c index 55ad778..d79e479 100644 --- a/src/libclickhouse_link.c +++ b/src/libclickhouse_link.c @@ -66,7 +66,7 @@ static bool is_canceled(void) } ch_connection -http_connect(char *connstring) +chfdw_http_connect(char *connstring) { ch_connection res; ch_http_connection_t *conn = ch_http_connection(connstring); @@ -280,7 +280,7 @@ http_fetch_row(ch_cursor *cursor, List *attrs, TupleDesc tupdesc, Datum *v, bool } text * -http_fetch_raw_data(ch_cursor *cursor) +chfdw_http_fetch_raw_data(ch_cursor *cursor) { ch_http_read_state *state = cursor->read_state; if (state->data == NULL) @@ -292,7 +292,7 @@ http_fetch_raw_data(ch_cursor *cursor) /*** BINARY PROTOCOL ***/ ch_connection -binary_connect(ch_connection_details *details) +chfdw_binary_connect(ch_connection_details *details) { char *ch_error = NULL; ch_connection res; @@ -729,11 +729,11 @@ static char *str_types_map[STR_TYPES_COUNT][2] = { }; List * -construct_create_tables(ImportForeignSchemaStmt *stmt, ForeignServer *server) +chfdw_construct_create_tables(ImportForeignSchemaStmt *stmt, ForeignServer *server) { Oid userid = GetUserId(); UserMapping *user = GetUserMapping(userid, server->serverid); - ch_connection conn = GetConnection(user); + ch_connection conn = chfdw_get_connection(user); ch_cursor *cursor; char *query, *driver; @@ -744,7 +744,7 @@ construct_create_tables(ImportForeignSchemaStmt *stmt, ForeignServer *server) ch_connection_details details; details.dbname = "default"; - ExtractConnectionOptions(server->options, &driver, &details.host, + chfdw_extract_options(server->options, &driver, &details.host, &details.port, &details.dbname, &details.username, &details.password); query = psprintf("select name from system.tables where database='%s'", details.dbname);