Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ private String initiatorId(IgniteEx node, String sqlMatch, int timeout) throws E
fail("Timeout. Cannot find query with: " + sqlMatch);

List<List<?>> res = node.context().query().querySqlFields(
new SqlFieldsQuery("SELECT sql, initiator_id FROM SYS.SQL_QUERIES"), false).getAll();
new SqlFieldsQuery("SELECT sql, initiator_id FROM SYS.SQL_QUERIES WHERE MAP_QUERY = FALSE"), false).getAll();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like an issue. Users use initiatorId like label. Maybe it's worth to pass it together with map query request (maybe by another ticket)


for (List<?> row : res) {
if (((String)row.get(0)).toUpperCase().contains(sqlMatch.toUpperCase()))
Expand All @@ -356,7 +356,7 @@ private void checkRunningQueriesCount(IgniteEx node, int expectedQryCount, int t

while (true) {
List<List<?>> res = node.context().query().querySqlFields(
new SqlFieldsQuery("SELECT * FROM SYS.SQL_QUERIES"), false).getAll();
new SqlFieldsQuery("SELECT * FROM SYS.SQL_QUERIES WHERE MAP_QUERY = FALSE"), false).getAll();

res.stream().forEach(System.out::println);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,7 @@ public void testGetAllColumns() throws Exception {
"SYS.SQL_QUERIES.DURATION.null",
"SYS.SQL_QUERIES.ORIGIN_NODE_ID.null",
"SYS.SQL_QUERIES.INITIATOR_ID.null",
"SYS.SQL_QUERIES.MAP_QUERY.null",
"SYS.SQL_QUERIES.SUBJECT_ID.null",
"SYS.SCAN_QUERIES.START_TIME.null",
"SYS.SCAN_QUERIES.TRANSFORMER.null",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public class GridRunningQueryInfo {
/** Originator. */
private final String qryInitiatorId;

/** Map query flag. */
private final boolean mapQry;

/** Enforce join order flag. */
private final boolean enforceJoinOrder;

Expand All @@ -89,6 +92,7 @@ public class GridRunningQueryInfo {
* @param cancel Query cancel.
* @param loc Local query flag.
* @param qryInitiatorId Query's initiator identifier.
* @param mapQry Map query flag.
* @param enforceJoinOrder Enforce join order flag.
* @param distributedJoins Distributed joins flag.
* @param subjId Subject ID.
Expand All @@ -104,6 +108,7 @@ public GridRunningQueryInfo(
GridQueryCancel cancel,
boolean loc,
String qryInitiatorId,
boolean mapQry,
boolean enforceJoinOrder,
boolean distributedJoins,
UUID subjId
Expand All @@ -119,6 +124,7 @@ public GridRunningQueryInfo(
this.loc = loc;
this.span = MTC.span();
this.qryInitiatorId = qryInitiatorId;
this.mapQry = mapQry;
this.enforceJoinOrder = enforceJoinOrder;
this.distributedJoins = distributedJoins;
this.subjId = subjId;
Expand Down Expand Up @@ -224,6 +230,13 @@ public String queryInitiatorId() {
return qryInitiatorId;
}

/**
* @return {@code true} if query executes map phase.
*/
public boolean mapQuery() {
return mapQry;
}

/**
* @return Distributed joins.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,76 @@
public long register(String qry, GridCacheQueryType qryType, String schemaName, boolean loc,
@Nullable GridQueryCancel cancel,
String qryInitiatorId, boolean enforceJoinOrder, boolean distributedJoins) {
return register(
qry,
qryType,
schemaName,
loc,
cancel,
qryInitiatorId,
enforceJoinOrder,
distributedJoins,
localNodeId,
false
);
}

/**
* Registers map-side running query and returns an id associated with the query on the current node.
*
* @param qry Query text.
* @param schemaName Schema name.
* @param cancel Query cancel.
* @param qryInitiatorId Query initiator ID.
* @param originNodeId Query origin node ID.
* @param enforceJoinOrder Enforce join order flag.
* @param distributedJoins Distributed joins flag.
* @return Id of registered query.
*/
public long registerMapQuery(
String qry,
String schemaName,
@Nullable GridQueryCancel cancel,
String qryInitiatorId,
UUID originNodeId,
boolean enforceJoinOrder,
boolean distributedJoins
) {
return register(
qry,
SQL_FIELDS,
schemaName,
false,
cancel,
qryInitiatorId,
enforceJoinOrder,
distributedJoins,
originNodeId,
true
);
}

/** Registers running query and returns an id associated with the query. */
private long register(

Check warning on line 340 in modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Method has 10 parameters, which is greater than 7 authorized.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ4qWnp8W4Q6haNCo6Xx&open=AZ4qWnp8W4Q6haNCo6Xx&pullRequest=13091
String qry,
GridCacheQueryType qryType,
String schemaName,
boolean loc,
@Nullable GridQueryCancel cancel,
String qryInitiatorId,
boolean enforceJoinOrder,
boolean distributedJoins,
UUID nodeId,
boolean mapQry
) {
long qryId = qryIdGen.incrementAndGet();

if (qryInitiatorId == null)
qryInitiatorId = SqlFieldsQuery.threadedQueryInitiatorId();

final GridRunningQueryInfo run = new GridRunningQueryInfo(
qryId,
localNodeId,
nodeId,
qry,
qryType,
schemaName,
Expand All @@ -303,6 +365,7 @@
cancel,
loc,
qryInitiatorId,
mapQry,
enforceJoinOrder,
distributedJoins,
securitySubjectId(ctx)
Expand All @@ -314,7 +377,7 @@

run.span().addTag(SQL_QRY_ID, run::globalQueryId);

if (!qryStartedListeners.isEmpty()) {
if (!mapQry && !qryStartedListeners.isEmpty()) {
GridQueryStartedInfo info = new GridQueryStartedInfo(
run.id(),
localNodeId,
Expand Down Expand Up @@ -375,10 +438,13 @@
if (failed)
qrySpan.addTag(ERROR, failReason::getMessage);

//We need to collect query history and metrics only for SQL queries.
Comment thread
alex-plekhanov marked this conversation as resolved.
if (isSqlQuery(qry)) {
qry.runningFuture().onDone();

if (qry.mapQuery())
return;

// We need to collect query history and metrics only for SQL queries initiated by user.
qryHistTracker.collectHistory(qry, failed);

if (!failed)
Expand Down Expand Up @@ -553,7 +619,7 @@
long curTime = U.currentTimeMillis();

for (GridRunningQueryInfo runningQryInfo : runs.values()) {
if (curTime - runningQryInfo.startTime() > duration)
if (!runningQryInfo.mapQuery() && curTime - runningQryInfo.startTime() > duration)
res.add(runningQryInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ public String initiatorId() {
return qry.queryInitiatorId();
}

/** @return {@code True} if query executes map phase. */
@Order(8)
public boolean mapQuery() {
return qry.mapQuery();
}

/** @return {@code True} if query is local. */
public boolean local() {
return qry.local();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2177,6 +2177,7 @@ private UpdateResult executeUpdate0(
distributedPlan.getCacheIds(),
qryDesc.sql(),
qryParams.arguments(),
qryDesc.queryInitiatorId(),
qryDesc.enforceJoinOrder(),
qryParams.pageSize(),
qryParams.timeout(),
Expand Down Expand Up @@ -2205,6 +2206,7 @@ private UpdateResult executeUpdate0(
.setEnforceJoinOrder(qryDesc.enforceJoinOrder())
.setLocal(qryDesc.local())
.setPageSize(qryParams.pageSize())
.setQueryInitiatorId(qryDesc.queryInitiatorId())
.setTimeout(qryParams.timeout(), TimeUnit.MILLISECONDS);

Iterable<List<?>> cur;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,46 @@
/** Segment. */
private final int segment;

/** Local query id. */
private final long locQryId;

/**
* @param stmt Query statement.
* @param sql Query statement.
* @param nodeId Originator node id.
* @param qryId Query id.
* @param locQryId Local query id.
* @param initiatorId Query initiator id.
* @param reqId Request ID.
* @param segment Segment.
*/
public MapH2QueryInfo(

Check warning on line 46 in modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Constructor has 8 parameters, which is greater than 7 authorized.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ4qWnxgW4Q6haNCo6Xy&open=AZ4qWnxgW4Q6haNCo6Xy&pullRequest=13091
PreparedStatement stmt,
String sql,
UUID nodeId,
long qryId,
long locQryId,
String initiatorId,
long reqId,
int segment
) {
super(QueryType.MAP, stmt, sql, nodeId, qryId, initiatorId);

this.locQryId = locQryId;
this.reqId = reqId;
this.segment = segment;
}

/** @return Local query id. */
public long localQueryId() {
return locQryId;
}

/** {@inheritDoc} */
@Override protected void printInfo(StringBuilder msg) {
msg.append(", reqId=").append(reqId)
msg.append(", mapQuery=true")
.append(", originNodeId=").append(nodeId())
.append(", reqId=").append(reqId)
.append(", segment=").append(segment);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
import org.apache.ignite.internal.processors.query.running.RunningQueryManager;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.processors.tracing.Span;
Expand Down Expand Up @@ -454,6 +455,8 @@ private void onQueryRequest0(

MapH2QueryInfo qryInfo = null;

long runningQryId = RunningQueryManager.UNDEFINED_QUERY_ID;

try {
res.lock();

Expand All @@ -468,7 +471,28 @@ private void onQueryRequest0(

H2Utils.bindParameters(stmt, params0);

qryInfo = new MapH2QueryInfo(stmt, qry.query(), node.id(), qryId, qryInitiatorId, reqId, segmentId);
GridQueryCancel qryCancel = qryResults.queryCancel(qryIdx);

runningQryId = h2.runningQueryManager().registerMapQuery(
sql,
schemaName,
qryCancel,
qryInitiatorId,
node.id(),
enforceJoinOrder,
distributedJoins
);

qryInfo = new MapH2QueryInfo(
stmt,
qry.query(),
node.id(),
qryId,
runningQryId,
qryInitiatorId,
reqId,
segmentId
);

h2.heavyQueriesTracker().startTracking(qryInfo);

Expand All @@ -482,8 +506,6 @@ private void onQueryRequest0(
);
}

GridQueryCancel qryCancel = qryResults.queryCancel(qryIdx);

ResultSet rs = h2.executeWithResumableTimeTracking(
() -> h2.executeSqlQueryWithTimer(
stmt,
Expand Down Expand Up @@ -567,6 +589,8 @@ private void onQueryRequest0(
if (qryInfo != null)
h2.heavyQueriesTracker().stopTracking(qryInfo, e);

h2.runningQueryManager().unregister(runningQryId, e);
Comment thread
alex-plekhanov marked this conversation as resolved.

throw e;
}
finally {
Expand Down Expand Up @@ -684,6 +708,8 @@ public void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) {

MapNodeResults nodeResults = resultsForNode(node.id());

long runningQryId = RunningQueryManager.UNDEFINED_QUERY_ID;

// We don't use try with resources on purpose - the catch block must also be executed in the context of this span.
TraceSurroundings trace = MTC.support(ctx.tracing()
.create(SpanType.SQL_DML_QRY_EXEC_REQ, MTC.span())
Expand Down Expand Up @@ -720,6 +746,7 @@ public void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) {

fldsQry.setEnforceJoinOrder(req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER));
fldsQry.setPageSize(req.pageSize());
fldsQry.setQueryInitiatorId(req.queryInitiatorId());
fldsQry.setLocal(true);

if (req.timeout() > 0 || req.explicitTimeout())
Expand All @@ -736,6 +763,16 @@ public void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) {
loc = false;
}

runningQryId = h2.runningQueryManager().registerMapQuery(
req.query(),
req.schemaName(),
cancel,
req.queryInitiatorId(),
node.id(),
req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER),
fldsQry.isDistributedJoins()
);

UpdateResult updRes = h2.executeUpdateOnDataNode(req.schemaName(), fldsQry, filter, cancel, loc);

GridCacheContext<?, ?> mainCctx =
Expand All @@ -760,13 +797,17 @@ public void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) {
}

sendUpdateResponse(node, reqId, updRes, null);

h2.runningQueryManager().unregister(runningQryId, null);
}
catch (Exception e) {
MTC.span().addTag(ERROR, e::getMessage);

U.error(log, "Error processing dml request. [localNodeId=" + ctx.localNodeId() +
", nodeId=" + node.id() + ", req=" + req + ']', e);

h2.runningQueryManager().unregister(runningQryId, e);

sendUpdateResponse(node, reqId, null, e.getMessage());
}
finally {
Expand Down
Loading
Loading