Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HAWQ-1409. Send AGG-TYPE header to PXF #1189

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/backend/access/external/fileam.c
Original file line number Diff line number Diff line change
Expand Up @@ -459,12 +459,20 @@ external_stopscan(FileScanDesc scan)
* ----------------
*/
ExternalSelectDesc
external_getnext_init(PlanState *state) {
external_getnext_init(PlanState *state, ExternalScanState *es_state) {
ExternalSelectDesc desc = (ExternalSelectDesc) palloc0(sizeof(ExternalSelectDescData));
Plan *rootPlan;

if (state != NULL)
{
desc->projInfo = state->ps_ProjInfo;
/*
* If we have an agg type then our parent is an Agg node
*/
rootPlan = state->state->es_plannedstmt->planTree;
if (IsA(rootPlan, Agg) && es_state->parent_agg_type) {
desc->agg_type = es_state->parent_agg_type;
}
}
return desc;
}
Expand Down
12 changes: 12 additions & 0 deletions src/backend/access/external/pxfheaders.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ void build_http_header(PxfInputData *input)
else
churl_headers_append(headers, "X-GP-HAS-FILTER", "0");

/* Aggregate information */
if (input->agg_type) {
switch(input->agg_type) {
case EXEC_FLAG_EXTERNAL_AGG_COUNT:
churl_headers_append(headers, "X-GP-AGG-TYPE", "count");
break;
default:
churl_headers_append(headers, "X-GP-AGG-TYPE", "unknown");
break;
}
}

add_delegation_token_headers(headers, input);
add_remote_credentials(headers);
}
Expand Down
13 changes: 13 additions & 0 deletions src/backend/executor/nodeAgg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1950,6 +1950,19 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
* initialize child nodes
*/
outerPlan = outerPlan(node);
if (IsA(outerPlan, ExternalScan)) {
/*
* Hack to indicate to PXF when there is an external scan
*/
if (list_length(aggstate->aggs) == 1) {
AggrefExprState *aggrefstate = (AggrefExprState *) linitial(aggstate->aggs);
Aggref *aggref = (Aggref *) aggrefstate->xprstate.expr;
//Only dealing with one agg
if (aggref->aggfnoid == COUNT_ANY_OID || aggref->aggfnoid == COUNT_STAR_OID) {
eflags |= EXEC_FLAG_EXTERNAL_AGG_COUNT;
}
}
}
outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);

/*
Expand Down
7 changes: 6 additions & 1 deletion src/backend/executor/nodeExternalscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ ExternalNext(ExternalScanState *node)
/*
* get the next tuple from the file access methods
*/
externalSelectDesc = external_getnext_init(&(node->ss.ps));
externalSelectDesc = external_getnext_init(&(node->ss.ps), node);
tuple = external_getnext(scandesc, direction, externalSelectDesc);

/*
Expand Down Expand Up @@ -237,6 +237,11 @@ ExecInitExternalScan(ExternalScan *node, EState *estate, int eflags)
externalstate->ss.ps.delayEagerFree =
((eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) != 0);

/*
* If eflag contains EXEC_FLAG_EXTERNAL_AGG_COUNT then notify the underlying storage level
*/
externalstate->parent_agg_type = (eflags & EXEC_FLAG_EXTERNAL_AGG_COUNT);

initGpmonPktForExternalScan((Plan *)node, &externalstate->ss.ps.gpmon_pkt, estate);

return externalstate;
Expand Down
7 changes: 6 additions & 1 deletion src/bin/gpfusion/gpbridgeapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,13 @@ void add_querydata_to_http_header(gphadoop_context* context, PG_FUNCTION_ARGS)
inputData.rel = EXTPROTOCOL_GET_RELATION(fcinfo);
inputData.quals = EXTPROTOCOL_GET_SCANQUALS(fcinfo);
inputData.filterstr = serializePxfFilterQuals(EXTPROTOCOL_GET_SCANQUALS(fcinfo));
if (EXTPROTOCOL_GET_SELECTDESC(fcinfo))
if (EXTPROTOCOL_GET_SELECTDESC(fcinfo)) {
inputData.proj_info = EXTPROTOCOL_GET_PROJINFO(fcinfo);
int agg_type = EXTPROTOCOL_GET_AGG_TYPE(fcinfo);
if (agg_type) {
inputData.agg_type = agg_type;
}
}
add_delegation_token(&inputData);

build_http_header(&inputData);
Expand Down
2 changes: 1 addition & 1 deletion src/include/access/extprotocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ typedef ExtProtocolData *ExtProtocol;
#define EXTPROTOCOL_GET_USER_CTX(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_user_ctx)
#define EXTPROTOCOL_GET_SELECTDESC(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc)
#define EXTPROTOCOL_GET_PROJINFO(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc->projInfo)
#define EXTPROTOCOL_GET_AGG_TYPE(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc->agg_type)
#define EXTPROTOCOL_IS_LAST_CALL(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_last_call)

#define EXTPROTOCOL_SET_LAST_CALL(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_last_call = true)
#define EXTPROTOCOL_SET_USER_CTX(fcinfo, p) \
(((ExtProtocolData*) fcinfo->context)->prot_user_ctx = p)
Expand Down
4 changes: 3 additions & 1 deletion src/include/access/fileam.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ typedef ExternalInsertDescData *ExternalInsertDesc;
typedef struct ExternalSelectDescData
{
ProjectionInfo *projInfo;
// Information needed for aggregate pushdown
int agg_type;
} ExternalSelectDescData;

typedef enum DataLineStatus
Expand All @@ -89,7 +91,7 @@ extern FileScanDesc external_beginscan(Relation relation, Index scanrelid,
extern void external_rescan(FileScanDesc scan);
extern void external_endscan(FileScanDesc scan);
extern void external_stopscan(FileScanDesc scan);
extern ExternalSelectDesc external_getnext_init(PlanState *state);
extern ExternalSelectDesc external_getnext_init(PlanState *state, ExternalScanState *es_state);
extern HeapTuple external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc desc);
extern ExternalInsertDesc external_insert_init(Relation rel, int errAosegno);
extern Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup);
Expand Down
2 changes: 2 additions & 0 deletions src/include/access/pxfheaders.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ typedef struct sPxfInputData
PxfHdfsToken token;
ProjectionInfo *proj_info;
List *quals;
int agg_type;
int agg_groups;
} PxfInputData;

void build_http_header(PxfInputData *input);
Expand Down
2 changes: 2 additions & 0 deletions src/include/executor/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ struct ChunkTransportState; /* #include "cdb/cdbinterconnect.h" */
#define EXEC_FLAG_BACKWARD 0x0004 /* need backward scan */
#define EXEC_FLAG_MARK 0x0008 /* need mark/restore */

#define EXEC_FLAG_EXTERNAL_AGG_COUNT 0x0010 /* can support external agg */


/*
* ExecEvalExpr was formerly a function containing a switch statement;
Expand Down
1 change: 1 addition & 0 deletions src/include/nodes/execnodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -1846,6 +1846,7 @@ typedef struct ExternalScanState
struct FileScanDescData *ess_ScanDesc;
bool cdb_want_ctid;
ItemPointerData cdb_fake_ctid;
int parent_agg_type;
} ExternalScanState;

/* ----------------
Expand Down