Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HAWQ-1409. Send AGG-TYPE header to PXF #1189

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/backend/access/external/fileam.c
Original file line number Diff line number Diff line change
Expand Up @@ -459,12 +459,20 @@ external_stopscan(FileScanDesc scan)
* ----------------
*/
ExternalSelectDesc
external_getnext_init(PlanState *state) {
external_getnext_init(PlanState *state, ExternalScanState *es_state) {
ExternalSelectDesc desc = (ExternalSelectDesc) palloc0(sizeof(ExternalSelectDescData));
Plan *rootPlan;

if (state != NULL)
{
desc->projInfo = state->ps_ProjInfo;
/*
* If we have an agg type then our parent is an Agg node
*/
rootPlan = state->state->es_plannedstmt->planTree;
if (rootPlan->type == T_Agg && es_state->parent_agg_type) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend to use IsA(rootPlan, Agg)

desc->agg_type = es_state->parent_agg_type;
}
}
return desc;
}
Expand Down
14 changes: 14 additions & 0 deletions src/backend/access/external/pxfheaders.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,20 @@ void build_http_header(PxfInputData *input)
else
churl_headers_append(headers, "X-GP-HAS-FILTER", "0");

/* Aggregate information */
if (input->agg_type) {
char agg_groups_str[sizeof(int32)];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this var used?


switch(input->agg_type) {
case EXEC_FLAG_EXTERNAL_AGG_COUNT:
churl_headers_append(headers, "X-GP-AGG-TYPE", "count");
break;
default:
churl_headers_append(headers, "X-GP-AGG-TYPE", "unknown");
break;
}
}

add_delegation_token_headers(headers, input);
add_remote_credentials(headers);
}
Expand Down
15 changes: 15 additions & 0 deletions src/backend/executor/nodeAgg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1950,6 +1950,21 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
* initialize child nodes
*/
outerPlan = outerPlan(node);
if (outerPlan->type == T_ExternalScan) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better use IsA(outerPlan, ExternalScan)

/*
* Hack to indicate to PXF when there is an external scan
*/
if (aggstate->aggs && list_length(aggstate->aggs) == 1) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need check aggstate->aggs is null or not? if it is null, list_length will return 0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh ok, I thought list_length() on a null value would cause an error

foreach(l, aggstate->aggs) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the size of aggstate->aggs is already 1, do we need use foreach to iterate?

AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just by changing lfirst(l) to linitial(aggstate->aggs), you can remove foreach loop.

Aggref *aggref = (Aggref *) aggrefstate->xprstate.expr;
//Only dealing with one agg
if (aggref->aggfnoid == 2147 || aggref->aggfnoid == 2803) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be better use COUNT_ANY_OID and COUNT_STAR_OID...

eflags |= EXEC_FLAG_EXTERNAL_AGG_COUNT;
}
}
}
}
outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);

/*
Expand Down
7 changes: 6 additions & 1 deletion src/backend/executor/nodeExternalscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ ExternalNext(ExternalScanState *node)
/*
* get the next tuple from the file access methods
*/
externalSelectDesc = external_getnext_init(&(node->ss.ps));
externalSelectDesc = external_getnext_init(&(node->ss.ps), node);
tuple = external_getnext(scandesc, direction, externalSelectDesc);

/*
Expand Down Expand Up @@ -237,6 +237,11 @@ ExecInitExternalScan(ExternalScan *node, EState *estate, int eflags)
externalstate->ss.ps.delayEagerFree =
((eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) != 0);

/*
* If eflag contains EXEC_FLAG_EXTERNAL_AGG_COUNT then notify the underlying storage level
*/
externalstate->parent_agg_type = (eflags & EXEC_FLAG_EXTERNAL_AGG_COUNT);

initGpmonPktForExternalScan((Plan *)node, &externalstate->ss.ps.gpmon_pkt, estate);

return externalstate;
Expand Down
7 changes: 6 additions & 1 deletion src/bin/gpfusion/gpbridgeapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,13 @@ void add_querydata_to_http_header(gphadoop_context* context, PG_FUNCTION_ARGS)
inputData.rel = EXTPROTOCOL_GET_RELATION(fcinfo);
inputData.quals = EXTPROTOCOL_GET_SCANQUALS(fcinfo);
inputData.filterstr = serializePxfFilterQuals(EXTPROTOCOL_GET_SCANQUALS(fcinfo));
if (EXTPROTOCOL_GET_SELECTDESC(fcinfo))
if (EXTPROTOCOL_GET_SELECTDESC(fcinfo)) {
inputData.proj_info = EXTPROTOCOL_GET_PROJINFO(fcinfo);
int agg_type = EXTPROTOCOL_GET_AGG_TYPE(fcinfo);
if (agg_type) {
inputData.agg_type = agg_type;
}
}
add_delegation_token(&inputData);

build_http_header(&inputData);
Expand Down
2 changes: 1 addition & 1 deletion src/include/access/extprotocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ typedef ExtProtocolData *ExtProtocol;
#define EXTPROTOCOL_GET_USER_CTX(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_user_ctx)
#define EXTPROTOCOL_GET_SELECTDESC(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc)
#define EXTPROTOCOL_GET_PROJINFO(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc->projInfo)
#define EXTPROTOCOL_GET_AGG_TYPE(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc->agg_type)
#define EXTPROTOCOL_IS_LAST_CALL(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_last_call)

#define EXTPROTOCOL_SET_LAST_CALL(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_last_call = true)
#define EXTPROTOCOL_SET_USER_CTX(fcinfo, p) \
(((ExtProtocolData*) fcinfo->context)->prot_user_ctx = p)
Expand Down
4 changes: 3 additions & 1 deletion src/include/access/fileam.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ typedef ExternalInsertDescData *ExternalInsertDesc;
typedef struct ExternalSelectDescData
{
ProjectionInfo *projInfo;
// Information needed for aggregate pushdown
int agg_type;
} ExternalSelectDescData;

typedef enum DataLineStatus
Expand All @@ -89,7 +91,7 @@ extern FileScanDesc external_beginscan(Relation relation, Index scanrelid,
extern void external_rescan(FileScanDesc scan);
extern void external_endscan(FileScanDesc scan);
extern void external_stopscan(FileScanDesc scan);
extern ExternalSelectDesc external_getnext_init(PlanState *state);
extern ExternalSelectDesc external_getnext_init(PlanState *state, ExternalScanState *es_state);
extern HeapTuple external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc desc);
extern ExternalInsertDesc external_insert_init(Relation rel, int errAosegno);
extern Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup);
Expand Down
2 changes: 2 additions & 0 deletions src/include/access/pxfheaders.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ typedef struct sPxfInputData
PxfHdfsToken token;
ProjectionInfo *proj_info;
List *quals;
int agg_type;
int agg_groups;
} PxfInputData;

void build_http_header(PxfInputData *input);
Expand Down
2 changes: 2 additions & 0 deletions src/include/executor/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ struct ChunkTransportState; /* #include "cdb/cdbinterconnect.h" */
#define EXEC_FLAG_BACKWARD 0x0004 /* need backward scan */
#define EXEC_FLAG_MARK 0x0008 /* need mark/restore */

#define EXEC_FLAG_EXTERNAL_AGG_COUNT 0x0010 /* can support external agg */


/*
* ExecEvalExpr was formerly a function containing a switch statement;
Expand Down
1 change: 1 addition & 0 deletions src/include/nodes/execnodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -1846,6 +1846,7 @@ typedef struct ExternalScanState
struct FileScanDescData *ess_ScanDesc;
bool cdb_want_ctid;
ItemPointerData cdb_fake_ctid;
int parent_agg_type;
} ExternalScanState;

/* ----------------
Expand Down