Skip to content

Commit

Permalink
GG-37994 Session attributes propagated to task event.
Browse files Browse the repository at this point in the history
  • Loading branch information
antkr committed Dec 26, 2023
1 parent d50f221 commit 83839d5
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class TaskEvent extends EventAdapter {
* @param subjId Subject ID.
*/
public TaskEvent(ClusterNode node, String msg, int type, IgniteUuid sesId, String taskName, String taskClsName,
boolean internal, @Nullable UUID subjId) {
boolean internal, @Nullable UUID subjId) {
super(node, msg, type);

this.sesId = sesId;
Expand Down
110 changes: 110 additions & 0 deletions modules/core/src/main/java/org/apache/ignite/events/TaskEventV2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2023 GridGain Systems, Inc. and Contributors.
*
* Licensed under the GridGain Community Edition License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.events;

import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;

import java.util.Map;
import java.util.UUID;

/**
* Grid task event.
* <p>
* Grid events are used for notification about what happens within the grid. Note that by
* design Ignite keeps all events generated on the local node locally and it provides
* APIs for performing a distributed queries across multiple nodes:
* <ul>
* <li>
* {@link org.apache.ignite.IgniteEvents#remoteQuery(org.apache.ignite.lang.IgnitePredicate, long, int...)} -
* asynchronously querying events occurred on the nodes specified, including remote nodes.
* </li>
* <li>
* {@link org.apache.ignite.IgniteEvents#localQuery(org.apache.ignite.lang.IgnitePredicate, int...)} -
* querying only local events stored on this local node.
* </li>
* <li>
* {@link org.apache.ignite.IgniteEvents#localListen(org.apache.ignite.lang.IgnitePredicate, int...)} -
* listening to local grid events (events from remote nodes not included).
* </li>
* </ul>
* User can also wait for events using method {@link org.apache.ignite.IgniteEvents#waitForLocal(org.apache.ignite.lang.IgnitePredicate, int...)}.
* <h1 class="header">Events and Performance</h1>
* Note that by default all events in Ignite are enabled and therefore generated and stored
* by whatever event storage SPI is configured. Ignite can and often does generate thousands events per seconds
* under the load and therefore it creates a significant additional load on the system. If these events are
* not needed by the application this load is unnecessary and leads to significant performance degradation.
* <p>
* It is <b>highly recommended</b> to enable only those events that your application logic requires
* by using {@link org.apache.ignite.configuration.IgniteConfiguration#getIncludeEventTypes()} method in Ignite configuration. Note that certain
* events are required for Ignite's internal operations and such events will still be generated but not stored by
* event storage SPI if they are disabled in Ignite configuration.
* @see EventType#EVT_TASK_FAILED
* @see EventType#EVT_TASK_FINISHED
* @see EventType#EVT_TASK_REDUCED
* @see EventType#EVT_TASK_STARTED
* @see EventType#EVT_TASK_SESSION_ATTR_SET
* @see EventType#EVT_TASK_TIMEDOUT
* @see EventType#EVTS_TASK_EXECUTION
*/
public class TaskEventV2 extends TaskEvent {
/** */
private static final long serialVersionUID = 0L;

/** */
private Map<Object, Object> attributes;

/**
*
* @param node Node.
* @param msg Optional message.
* @param type Event type.
* @param sesId Task session ID.
* @param taskName Task name.
* @param taskClsName Task class name.
* @param internal Task is internal.
* @param subjId Subject ID.
* @param attributes Task session attributes.
*/
public TaskEventV2(ClusterNode node, String msg, int type, IgniteUuid sesId, String taskName, String taskClsName,
boolean internal, @Nullable UUID subjId, Map<Object, Object> attributes) {
super(node, msg, type, sesId, taskName, taskClsName, internal, subjId);
this.attributes = attributes;
}

/**
* Gets map of attributes from attached task session.
*
* @return Attributes from task session.
*/
public Map<Object, Object> attributes() {
return attributes;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TaskEventV2.class, this,
"nodeId8", U.id8(node().id()),
"msg", message(),
"type", name(),
"attributes", attributes(),
"tstamp", timestamp());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,10 @@ public enum IgniteFeatures {
NEW_DR_FST_COMMANDS(67),

/** This feature enables checking a cache generation for incoming cache messages (see GridCacheIdMessage). */
CHECK_CACHE_GENERATION(68);
CHECK_CACHE_GENERATION(68),

/** This feature enables attributes to be included into TaskEvents. */
TASK_EVT_ATTRIBUTE_SUPPORT(69);

/**
* Unique feature identifier.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.ignite.events.Event;
import org.apache.ignite.events.JobEvent;
import org.apache.ignite.events.TaskEvent;
import org.apache.ignite.events.TaskEventV2;
import org.apache.ignite.internal.GridJobCancelRequest;
import org.apache.ignite.internal.GridJobContextImpl;
import org.apache.ignite.internal.GridJobExecuteRequest;
Expand Down Expand Up @@ -114,6 +115,8 @@
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_CANCEL;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_SIBLINGS;
import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
import static org.apache.ignite.internal.IgniteFeatures.TASK_EVT_ATTRIBUTE_SUPPORT;
import static org.apache.ignite.internal.IgniteFeatures.allNodesSupport;
import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.makeUpdateListener;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
Expand Down Expand Up @@ -1742,17 +1745,7 @@ private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest req)
U.resolveClassLoader(ses.getClassLoader(), ctx.config()));

if (ctx.event().isRecordable(EVT_TASK_SESSION_ATTR_SET)) {
Event evt = new TaskEvent(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
false,
null);

ctx.event().record(evt);
recordTaskEvent(attrs, ses);
}

synchronized (ses) {
Expand Down Expand Up @@ -2442,4 +2435,37 @@ else if (jobId == null)
public long computeJobWorkerInterruptTimeout() {
return computeJobWorkerInterruptTimeout.getOrDefault(ctx.config().getFailureDetectionTimeout());
}

/**
* @param attrs Attributes set.
* @param ses Internal session.
*/
public void recordTaskEvent(Map<?, ?> attrs, GridTaskSessionImpl ses) {
Event evt;

if (allNodesSupport(ctx, TASK_EVT_ATTRIBUTE_SUPPORT)) {
evt = new TaskEventV2(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
false,
null,
ses.isFullSupport() ? ses.getAttributes() : null);
} else {
evt = new TaskEvent(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
false,
null);
}

ctx.event().record(evt);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.TaskEvent;
import org.apache.ignite.events.TaskEventV2;
import org.apache.ignite.internal.ComputeTaskInternalFuture;
import org.apache.ignite.internal.GridJobExecuteResponse;
import org.apache.ignite.internal.GridJobSiblingImpl;
Expand Down Expand Up @@ -101,6 +102,8 @@
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_SIBLINGS;
import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
import static org.apache.ignite.internal.GridTopic.TOPIC_TASK_CANCEL;
import static org.apache.ignite.internal.IgniteFeatures.TASK_EVT_ATTRIBUTE_SUPPORT;
import static org.apache.ignite.internal.IgniteFeatures.allNodesSupport;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistenceEnabled;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.SYS_METRICS;
Expand Down Expand Up @@ -834,17 +837,34 @@ else if (task != null) {
if (ctx.event().isRecordable(EVT_MANAGEMENT_TASK_STARTED) && dep.visorManagementTask(task, taskCls)) {
VisorTaskArgument visorTaskArgument = (VisorTaskArgument)arg;

Event evt = new TaskEvent(
ctx.discovery().localNode(),
visorTaskArgument != null && visorTaskArgument.getArgument() != null
? visorTaskArgument.getArgument().toString() : "[]",
EVT_MANAGEMENT_TASK_STARTED,
ses.getId(),
taskCls == null ? null : taskCls.getSimpleName(),
"VisorManagementTask",
false,
subjId
);
Event evt;

if (allNodesSupport(ctx, TASK_EVT_ATTRIBUTE_SUPPORT)) {
evt = new TaskEventV2(
ctx.discovery().localNode(),
visorTaskArgument != null && visorTaskArgument.getArgument() != null
? visorTaskArgument.getArgument().toString() : "[]",
EVT_MANAGEMENT_TASK_STARTED,
ses.getId(),
taskCls == null ? null : taskCls.getSimpleName(),
"VisorManagementTask",
false,
subjId,
null
);
} else {
evt = new TaskEvent(
ctx.discovery().localNode(),
visorTaskArgument != null && visorTaskArgument.getArgument() != null
? visorTaskArgument.getArgument().toString() : "[]",
EVT_MANAGEMENT_TASK_STARTED,
ses.getId(),
taskCls == null ? null : taskCls.getSimpleName(),
"VisorManagementTask",
false,
subjId
);
}

ctx.event().record(evt);
}
Expand Down Expand Up @@ -1074,17 +1094,7 @@ private void sendSessionAttributes(Map<?, ?> attrs, GridTaskSessionImpl ses)
}

if (ctx.event().isRecordable(EVT_TASK_SESSION_ATTR_SET)) {
Event evt = new TaskEvent(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
false,
null);

ctx.event().record(evt);
recordTaskEvent(attrs, ses);
}

notifyTaskStatusMonitors(ComputeTaskStatus.snapshot(ses), false);
Expand Down Expand Up @@ -1690,4 +1700,37 @@ public Map<ComputeJobStatusEnum, Long> jobStatuses(IgniteUuid sesId) {
else
return taskWorker.jobStatuses();
}

/**
* @param attrs Attributes set.
* @param ses Internal session.
*/
public void recordTaskEvent(Map<?, ?> attrs, GridTaskSessionImpl ses) {
Event evt;

if (allNodesSupport(ctx, TASK_EVT_ATTRIBUTE_SUPPORT)) {
evt = new TaskEventV2(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
false,
null,
ses.isFullSupport() ? ses.getAttributes() : null);
} else {
evt = new TaskEvent(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
false,
null);
}

ctx.event().record(evt);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.ignite.events.Event;
import org.apache.ignite.events.JobEvent;
import org.apache.ignite.events.TaskEvent;
import org.apache.ignite.events.TaskEventV2;
import org.apache.ignite.internal.ComputeTaskInternalFuture;
import org.apache.ignite.internal.GridInternalException;
import org.apache.ignite.internal.GridJobCancelRequest;
Expand Down Expand Up @@ -105,6 +106,8 @@
import static org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_CANCEL;
import static org.apache.ignite.internal.IgniteFeatures.TASK_EVT_ATTRIBUTE_SUPPORT;
import static org.apache.ignite.internal.IgniteFeatures.allNodesSupport;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.CANCELLED;
Expand Down Expand Up @@ -1576,15 +1579,29 @@ void onNodeLeft(UUID nodeId) {
*/
private void recordTaskEvent(int evtType, String msg) {
if (!internal && ctx.event().isRecordable(evtType)) {
Event evt = new TaskEvent(
ctx.discovery().localNode(),
msg,
evtType,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
internal,
subjId);
Event evt;
if (allNodesSupport(ctx, TASK_EVT_ATTRIBUTE_SUPPORT)) {
evt = new TaskEventV2(
ctx.discovery().localNode(),
msg,
evtType,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
internal,
subjId,
ses.isFullSupport() ? ses.getAttributes() : null);
} else {
evt = new TaskEvent(
ctx.discovery().localNode(),
msg,
evtType,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
internal,
subjId);
}

ctx.event().record(evt);
}
Expand Down

0 comments on commit 83839d5

Please sign in to comment.