Skip to content

Commit

Permalink
[Bug] refactor pdml
Browse files Browse the repository at this point in the history
Conflicts:
	src/backend/px_optimizer/libgpopt/include/gpopt/engine/CHint.h
  • Loading branch information
HBKO committed Nov 14, 2023
1 parent 0695ead commit e92e032
Show file tree
Hide file tree
Showing 17 changed files with 161 additions and 307 deletions.
2 changes: 1 addition & 1 deletion src/backend/executor/execUtils_px.c
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ FillSliceGangInfo(ExecSlice *slice, int numsegments, DirectDispatchInfo *dd)
slice->segments = NIL;

for(k = 0;k < numsegments; k++)
slice->segments = lappend_int(slice->segments,RW_SEGMENT);
slice->segments = lappend_int(slice->segments, k);
break;
case GANGTYPE_PRIMARY_READER:
slice->planNumSegments = numsegments;
Expand Down
22 changes: 9 additions & 13 deletions src/backend/px/dispatcher/px_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ pxconn_createWorkerDescriptor(struct PxNodeInfo *pxinfo, int identifier,
pxWorkerDesc->identifier = identifier;
pxWorkerDesc->logicalWorkerInfo.total_count = logicalTotalWorkers;
/* POLAR px */
if (MASTER_CONTENT_ID == logicalWorkerIdx && RW_COUNTER_START <= identifier)
pxWorkerDesc->logicalWorkerInfo.idx = (identifier % RW_COUNTER_START) % logicalTotalWorkers;
else
pxWorkerDesc->logicalWorkerInfo.idx = logicalWorkerIdx;
pxWorkerDesc->logicalWorkerInfo.idx = logicalWorkerIdx;
/* POLAR end */

MemoryContextSwitchTo(oldContext);
Expand All @@ -118,10 +115,7 @@ pxconn_termWorkerDescriptor(PxWorkerDescriptor *pxWorkerDesc)
px_nodes = pxWorkerDesc->pxNodeInfo->px_nodes;

/* put px identifier to free list for reuse */
if (RW_COUNTER_START > pxWorkerDesc->identifier)
{
px_nodes->freeCounterList = lappend_int(px_nodes->freeCounterList, pxWorkerDesc->identifier);
}
px_nodes->freeCounterList = lappend_int(px_nodes->freeCounterList, pxWorkerDesc->identifier);

pxconn_disconnect(pxWorkerDesc);

Expand All @@ -139,7 +133,8 @@ pxconn_termWorkerDescriptor(PxWorkerDescriptor *pxWorkerDesc)
void
pxconn_doConnectStart(PxWorkerDescriptor *pxWorkerDesc,
const char *pxid,
const char *options)
const char *options,
SegmentType segmentType)
{
#define MAX_KEYWORDS 10
#define MAX_INT_STRING_LEN 20
Expand All @@ -148,6 +143,7 @@ pxconn_doConnectStart(PxWorkerDescriptor *pxWorkerDesc,
const char *values[MAX_KEYWORDS];
char portstr[MAX_INT_STRING_LEN];
int nkeywords = 0;
bool should_be_localhost = false;

keywords[nkeywords] = "pxid";
values[nkeywords] = pxid;
Expand All @@ -170,10 +166,10 @@ pxconn_doConnectStart(PxWorkerDescriptor *pxWorkerDesc,
*
* For other PX connections, we set "hostaddr". "host" is not used.
*/
if ((px_role == PX_ROLE_QC &&
(pxWorkerDesc->logicalWorkerInfo.idx == MASTER_CONTENT_ID ||
pxWorkerDesc->identifier >= RW_COUNTER_START)) ||
FAULT_COND(SIMPLE_FAULT_INJECTOR("hostaddr_info") == FaultInjectorTypeEnable))
/* When EntryDB or pdml, hostaddr should be localhost(127.0.0.1) */
should_be_localhost = (pxWorkerDesc->logicalWorkerInfo.idx == MASTER_CONTENT_ID)
|| (segmentType == SEGMENTTYPE_EXPLICT_WRITER);
if (px_role == PX_ROLE_QC && should_be_localhost)
{
keywords[nkeywords] = "hostaddr";
values[nkeywords] = "127.0.0.1";
Expand Down
4 changes: 2 additions & 2 deletions src/backend/px/dispatcher/px_gang.c
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ makeOptions(void)

Assert(px_role == PX_ROLE_QC);

qdinfo = pxnode_getPxNodeInfo(MASTER_CONTENT_ID);
qdinfo = pxnode_getPxNodeInfo(MASTER_CONTENT_ID, SEGMENTTYPE_ANY);
appendStringInfo(&string, " -c polar_px_qc_hostname=%s", qdinfo->config->hostip);
appendStringInfo(&string, " -c polar_px_qc_port=%d", qdinfo->config->port);

Expand Down Expand Up @@ -639,7 +639,7 @@ getPxProcessesForQC(int isPrimary)
elog(FATAL, "getPxProcessesForQC: unsupported request for master mirror process");
}

qdinfo = pxnode_getPxNodeInfo(MASTER_CONTENT_ID);
qdinfo = pxnode_getPxNodeInfo(MASTER_CONTENT_ID, SEGMENTTYPE_ANY);

Assert(qdinfo->config->node_idx == -1);
Assert(qdinfo->config->hostip != NULL);
Expand Down
2 changes: 1 addition & 1 deletion src/backend/px/dispatcher/px_gang_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pxgang_createGang_async(List *segments, SegmentType segmentType)
options = makeOptions();

/* start connection in asynchronous way */
pxconn_doConnectStart(pxWorkerDesc, pxid, options);
pxconn_doConnectStart(pxWorkerDesc, pxid, options, segmentType);

if (pxconn_isBadConnection(pxWorkerDesc))
ereport(ERROR, (errcode(ERRCODE_PX_INTERCONNECTION_ERROR),
Expand Down
54 changes: 17 additions & 37 deletions src/backend/px/px_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ static int px_node_configs_generation = POLAR_CLUSTER_INFO_INVALID_GENERATION
static MemoryContext px_worker_context = NULL;
static PxNodeConfigEntry *px_node_configs = NULL;

static int nextPXIdentifer(PxNodes *px_nodes, bool isRW);
static int nextPXIdentifer(PxNodes *px_nodes);
static void GeneratePxNodeConfigs(void);

/* NB: all extern function should switch to this context */
Expand Down Expand Up @@ -249,7 +249,7 @@ GeneratePxNodeConfigs(void)
qc_config->node_idx = -1;
qc_config->dop = 1;
qc_config->port = PostPortNumber;
qc_config->hostip = "127.0.0.1";
qc_config->hostip = "";

px_node_configs = configs;
px_node_configs_size = idx;
Expand Down Expand Up @@ -305,7 +305,6 @@ pxnode_getPxNodes()
px_nodes->numActivePXs = 0;
px_nodes->numIdlePXs = 0;
px_nodes->pxCounter = 0;
px_nodes->rwCounter = RW_COUNTER_START;
px_nodes->freeCounterList = NIL;

px_nodes->pxInfo =
Expand Down Expand Up @@ -368,10 +367,10 @@ pxnode_allocateIdlePX(int logicalWorkerIdx, int logicalTotalWorkers, SegmentType

if (logicalWorkerIdx == -1)
{
pxinfo = pxnode_getPxNodeInfo(-1);
pxinfo = pxnode_getPxNodeInfo(-1, segmentType);
logicalTotalWorkers = getPxWorkerCount();
} else
pxinfo = pxnode_getPxNodeInfo(logicalWorkerIdx);
pxinfo = pxnode_getPxNodeInfo(logicalWorkerIdx, segmentType);

if (pxinfo == NULL)
{
Expand Down Expand Up @@ -414,21 +413,10 @@ pxnode_allocateIdlePX(int logicalWorkerIdx, int logicalTotalWorkers, SegmentType
/* POLAR px */
if (!pxWorkerDesc)
{
if (RW_SEGMENT == logicalWorkerIdx)
{
/* RW */
pxWorkerDesc = pxconn_createWorkerDescriptor(pxinfo,
nextPXIdentifer(pxinfo->px_nodes, true),
MASTER_CONTENT_ID,
logicalTotalWorkers);
}
else
{
pxWorkerDesc = pxconn_createWorkerDescriptor(pxinfo,
nextPXIdentifer(pxinfo->px_nodes, false),
logicalWorkerIdx,
logicalTotalWorkers);
}
pxWorkerDesc = pxconn_createWorkerDescriptor(pxinfo,
nextPXIdentifer(pxinfo->px_nodes),
logicalWorkerIdx,
logicalTotalWorkers);
}
/* POLAR end */

Expand Down Expand Up @@ -504,33 +492,25 @@ pxnode_recycleIdlePX(PxWorkerDescriptor *pxWorkerDesc, bool forceDestroy)
}

static int
nextPXIdentifer(PxNodes *px_nodes, bool isRW)
nextPXIdentifer(PxNodes *px_nodes)
{
int result;
if (isRW)
{
return px_nodes->rwCounter++;
}
else
{
if (!px_nodes->freeCounterList)
{
result = px_nodes->pxCounter;
px_nodes->pxCounter = (px_nodes->pxCounter + 1) % RW_COUNTER_START;
return result;
}

result = linitial_int(px_nodes->freeCounterList);
px_nodes->freeCounterList = list_delete_first(px_nodes->freeCounterList);
if (!px_nodes->freeCounterList)
{
result = px_nodes->pxCounter++;
return result;
}
result = linitial_int(px_nodes->freeCounterList);
px_nodes->freeCounterList = list_delete_first(px_nodes->freeCounterList);
return result;
}

/*
* Find PxNodeInfo in the array by segment index.
*/
PxNodeInfo *
pxnode_getPxNodeInfo(int contentId)
pxnode_getPxNodeInfo(int contentId, SegmentType segmentType)
{
PxNodeInfo *pxInfo = NULL;
PxNodes *px_nodes;
Expand All @@ -546,7 +526,7 @@ pxnode_getPxNodeInfo(int contentId)
* Because the IP and Port of the RW and QC nodes are exactly the same
* , qcInfo can be used directly.
*/
if (RW_SEGMENT == contentId)
if (SEGMENTTYPE_EXPLICT_WRITER == segmentType)
return px_nodes->qcInfo;

if (contentId < 0)
Expand Down
67 changes: 3 additions & 64 deletions src/backend/px_optimizer/libgpopt/include/gpopt/engine/CHint.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@

/* POLAR px */
#define MAX_INSERT_DOP_NUM ULONG(128)
#define MAX_UPDATE_DOP_NUM ULONG(128)
#define MAX_SELECT_DOP_NUM ULONG(128)
#define MAX_DELETE_DOP_NUM ULONG(128)
/* POLAR px */

namespace gpopt
Expand Down Expand Up @@ -59,17 +56,6 @@ class CHint : public CRefCount
/* POLAR px */
ULONG m_ulInsertDopNum;

ULONG m_ulUpdateDopNum;

ULONG m_ulSelectDopNum;

ULONG m_ulDeleteDopNum;

BOOL m_fRemoveUpdateRedundantMotion;

BOOL m_fRemoveDeleteRedundantMotion;
/* POLAR px */

public:
CHint(const CHint &) = delete;

Expand All @@ -80,12 +66,7 @@ class CHint : public CRefCount
ULONG broadcast_threshold, BOOL enforce_constraint_on_dml,
ULONG push_group_by_below_setop_threshold,
/* POLAR px */
ULONG insert_dop_num,
ULONG update_dop_num,
ULONG select_dop_num,
ULONG delete_dop_num,
BOOL remove_update_redundant_motion,
BOOL remove_delete_redundant_motion)
ULONG insert_dop_num)
: m_ulMinNumOfPartsToRequireSortOnInsert(
min_num_of_parts_to_require_sort_on_insert),
m_ulJoinArityForAssociativityCommutativity(
Expand All @@ -97,12 +78,7 @@ class CHint : public CRefCount
m_ulPushGroupByBelowSetopThreshold(
push_group_by_below_setop_threshold),
/* POLAR px */
m_ulInsertDopNum(insert_dop_num),
m_ulUpdateDopNum(update_dop_num),
m_ulSelectDopNum(select_dop_num),
m_ulDeleteDopNum(delete_dop_num),
m_fRemoveUpdateRedundantMotion(remove_update_redundant_motion),
m_fRemoveDeleteRedundantMotion(remove_delete_redundant_motion)
m_ulInsertDopNum(insert_dop_num)
{
}

Expand Down Expand Up @@ -175,37 +151,6 @@ class CHint : public CRefCount
return m_ulInsertDopNum;
}

ULONG
UlUpdateDopNum() const
{
return m_ulUpdateDopNum;
}

ULONG
UlSelectDopNum() const
{
return m_ulSelectDopNum;
}

ULONG
UlDeleteDopNum() const
{
return m_ulDeleteDopNum;
}

BOOL
FRemoveUpdateRedundantMotion() const
{
return m_fRemoveUpdateRedundantMotion;
}

BOOL
FRemoveDeleteRedundantMotion() const
{
return m_fRemoveDeleteRedundantMotion;
}
/* POLAR px */

// generate default hint configurations, which disables sort during insert on
// append only row-oriented partitioned tables by default
static CHint *
Expand All @@ -220,13 +165,7 @@ class CHint : public CRefCount
true, /* enforce_constraint_on_dml */
PUSH_GROUP_BY_BELOW_SETOP_THRESHOLD, /* push_group_by_below_setop_threshold */
/* POLAR px */
MAX_INSERT_DOP_NUM,
MAX_UPDATE_DOP_NUM,
MAX_SELECT_DOP_NUM,
MAX_DELETE_DOP_NUM,
true,
true
/* POLAR px */
MAX_INSERT_DOP_NUM
);
}

Expand Down
38 changes: 3 additions & 35 deletions src/backend/px_optimizer/libgpopt/src/operators/CPhysicalDML.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,42 +284,10 @@ CPhysicalDML::PdsRequired(CMemoryPool *mp,
else
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
}
else if (CLogicalDML::EdmlUpdate == m_edmlop)
// Update and Delete
else if(CLogicalDML::EdmlUpdate == m_edmlop || CLogicalDML::EdmlDelete == m_edmlop)
{
bool remove_redundant_motion = optimizer_config->GetHint()->FRemoveUpdateRedundantMotion();
ULONG update_dop_num = optimizer_config->GetHint()->UlUpdateDopNum();
ULONG select_dop_num = optimizer_config->GetHint()->UlSelectDopNum();
if (!remove_redundant_motion)
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
else
{
if (update_dop_num == select_dop_num)
{
m_pds->AddRef();
return m_pds;
}
else
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
}
}
else if (CLogicalDML::EdmlDelete == m_edmlop)
{
/* delete */
bool remove_redundant_motion = optimizer_config->GetHint()->FRemoveDeleteRedundantMotion();
ULONG delete_dop_num = optimizer_config->GetHint()->UlDeleteDopNum();
ULONG select_dop_num = optimizer_config->GetHint()->UlSelectDopNum();
if (!remove_redundant_motion)
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
else
{
if (delete_dop_num == select_dop_num)
{
m_pds->AddRef();
return m_pds;
}
else
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
}
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ using namespace gpopt;

/* POLAR px */
#define DEFAULT_INSERT_DOP_NUM ULONG(1)
#define DEFAULT_UPDATE_DOP_NUM ULONG(1)
#define DEFAULT_SELECT_DOP_NUM ULONG(1)
#define DEFAULT_DELETE_DOP_NUM ULONG(1)
/* POLAR px */

XERCES_CPP_NAMESPACE_USE
Expand Down Expand Up @@ -125,12 +122,7 @@ CParseHandlerHint::StartElement(const XMLCh *const, //element_uri,
join_order_dp_threshold, broadcast_threshold, enforce_constraint_on_dml,
push_group_by_below_setop_threshold,
/* POLAR px */
DEFAULT_INSERT_DOP_NUM,
DEFAULT_UPDATE_DOP_NUM,
DEFAULT_SELECT_DOP_NUM,
DEFAULT_DELETE_DOP_NUM,
true,
true);
DEFAULT_INSERT_DOP_NUM);
}

//---------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit e92e032

Please sign in to comment.