Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 58 additions & 6 deletions api/src/main/java/org/apache/flink/agents/api/Event.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,62 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

/** Base class for all event types in the system. */
public abstract class Event {
public class Event {
Comment thread
addu390 marked this conversation as resolved.

private static final ObjectMapper MAPPER = new ObjectMapper();

private final UUID id;
private final String type;
private final Map<String, Object> attributes;
/** The timestamp of the source record. */

/**
* Runtime-internal timestamp from the source record. Not part of the cross-language event
* contract; used by the Flink runtime for timestamp propagation.
*/
private Long sourceTimestamp;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove this field to keep the align between python Event and java Event

Copy link
Copy Markdown
Contributor Author

@addu390 addu390 Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sourceTimestamp seems to be used in runtime for timestamp propagation (ActionExecutionOperator, RunnerContextImpl, JavaActionTask, etc.). Safe to remove?

I could however, specifically not use in the cross-language event contract, without any repercussions.


public Event() {
this(UUID.randomUUID(), new HashMap<>());
/** Unified event with user-defined type and attributes. */
public Event(String type, Map<String, Object> attributes) {
this(UUID.randomUUID(), type, attributes);
}

/** Unified event with user-defined type and empty attributes. */
public Event(String type) {
this(type, new HashMap<>());
}

@JsonCreator
public Event(
@JsonProperty("id") UUID id,
@JsonProperty("type") String type,
@JsonProperty("attributes") Map<String, Object> attributes) {
if (type == null || type.isEmpty()) {
throw new IllegalArgumentException("Event 'type' must not be null or empty.");
}
this.id = id;
this.attributes = attributes;
this.type = type;
this.attributes = attributes != null ? attributes : new HashMap<>();
}

public UUID getId() {
return id;
}

/** Returns the event type string used for routing. */
@JsonProperty("type")
public String getType() {
return type;
}

public Map<String, Object> getAttributes() {
return attributes;
}
Expand All @@ -73,17 +100,42 @@ public void setSourceTimestamp(long timestamp) {
this.sourceTimestamp = timestamp;
}

/**
* Creates a base Event from another Event, copying id, type, and attributes. Subclasses
* override this to reconstruct typed event objects with proper field deserialization.
*/
public static Event fromEvent(Event event) {
Event copy =
new Event(event.getId(), event.getType(), new HashMap<>(event.getAttributes()));
if (event.hasSourceTimestamp()) {
copy.setSourceTimestamp(event.getSourceTimestamp());
}
return copy;
}

/**
* Creates an Event from a JSON string.
*
* @param json the JSON string to deserialize
* @return the deserialized Event
* @throws IOException if JSON parsing fails or the 'type' field is missing or empty
*/
public static Event fromJson(String json) throws IOException {
return MAPPER.readValue(json, Event.class);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Event other = (Event) o;
return Objects.equals(this.id, other.id)
&& Objects.equals(this.getType(), other.getType())
&& Objects.equals(this.attributes, other.attributes);
}

@Override
public int hashCode() {
return Objects.hash(id, attributes);
return Objects.hash(id, getType(), attributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@

/** Contextual information about an event, such as its type and timestamp. */
public class EventContext {
// Type of the event, the class name of the event
/** The routing key for the event, matching the {@code EVENT_TYPE} constant or type string. */
private final String eventType;

// Timestamp of when the event occurred
private final String timestamp;

public EventContext(Event event) {
this(event.getClass().getName(), Instant.now().toString());
this(event.getType(), Instant.now().toString());
}

@JsonCreator
Expand Down
32 changes: 24 additions & 8 deletions api/src/main/java/org/apache/flink/agents/api/InputEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,46 @@
package org.apache.flink.agents.api;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Event generated by the framework, carrying an input data that arrives at the agent. */
public class InputEvent extends Event {
private final Object input;

public static final String EVENT_TYPE = "_input_event";

public InputEvent(Object input) {
super();
this.input = input;
super(EVENT_TYPE);
setAttr("input", input);
}

@JsonCreator
public InputEvent(
@JsonProperty("id") UUID id,
@JsonProperty("attributes") Map<String, Object> attributes,
@JsonProperty("input") Object input) {
super(id, attributes);
this.input = input;
@JsonProperty("attributes") Map<String, Object> attributes) {
super(id, EVENT_TYPE, attributes);
}

/**
* Reconstructs a typed InputEvent from a base Event.
*
* @param event the base event containing the input data in attributes
* @return a typed InputEvent
*/
public static InputEvent fromEvent(Event event) {
InputEvent result = new InputEvent(event.getId(), new HashMap<>(event.getAttributes()));
if (event.hasSourceTimestamp()) {
result.setSourceTimestamp(event.getSourceTimestamp());
}
return result;
}

@JsonIgnore
public Object getInput() {
return input;
return getAttr("input");
}
}
32 changes: 24 additions & 8 deletions api/src/main/java/org/apache/flink/agents/api/OutputEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.flink.agents.api;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

Expand All @@ -29,23 +31,37 @@
* data.
*/
public class OutputEvent extends Event {
private final Object output;

public static final String EVENT_TYPE = "_output_event";

public OutputEvent(Object output) {
super();
this.output = output;
super(EVENT_TYPE);
setAttr("output", output);
}

@JsonCreator
public OutputEvent(
@JsonProperty("id") UUID id,
@JsonProperty("attributes") Map<String, Object> attributes,
@JsonProperty("output") Object output) {
super(id, attributes);
this.output = output;
@JsonProperty("attributes") Map<String, Object> attributes) {
super(id, EVENT_TYPE, attributes);
}

/**
* Reconstructs a typed OutputEvent from a base Event.
*
* @param event the base event containing the output data in attributes
* @return a typed OutputEvent
*/
public static OutputEvent fromEvent(Event event) {
OutputEvent result = new OutputEvent(event.getId(), new HashMap<>(event.getAttributes()));
if (event.hasSourceTimestamp()) {
result.setSourceTimestamp(event.getSourceTimestamp());
}
return result;
}

@JsonIgnore
public Object getOutput() {
return output;
return getAttr("output");
}
}
18 changes: 8 additions & 10 deletions api/src/main/java/org/apache/flink/agents/api/agents/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.agents.api.agents;

import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.resource.SerializableResource;
Expand All @@ -32,8 +31,7 @@

/** Base class for defining agent logic. */
public class Agent {
private final Map<String, Tuple3<Class<? extends Event>[], Method, Map<String, Object>>>
actions;
private final Map<String, Tuple3<String[], Method, Map<String, Object>>> actions;

private final Map<ResourceType, Map<String, Object>> resources;

Expand All @@ -45,7 +43,7 @@ public Agent() {
this.actions = new HashMap<>();
}

public Map<String, Tuple3<Class<? extends Event>[], Method, Map<String, Object>>> getActions() {
public Map<String, Tuple3<String[], Method, Map<String, Object>>> getActions() {
return actions;
}

Expand All @@ -56,28 +54,28 @@ public Map<ResourceType, Map<String, Object>> getResources() {
/**
* Add action to agent.
*
* @param events The event types this action listened.
* @param eventTypes The event type strings this action listens to.
* @param method The method of this action, should be static method.
* @param config The optional config can be used by this action.
*/
public Agent addAction(
Class<? extends Event>[] events, Method method, @Nullable Map<String, Object> config) {
String[] eventTypes, Method method, @Nullable Map<String, Object> config) {
String name = method.getName();
if (actions.containsKey(name)) {
throw new IllegalArgumentException(String.format("Action %s already defined.", name));
}
actions.put(name, new Tuple3<>(events, method, config));
actions.put(name, new Tuple3<>(eventTypes, method, config));
return this;
}

/**
* Add action to agent.
*
* @param events The event types this action listened.
* @param eventTypes The event type strings this action listens to.
* @param method The method of this action, should be static method.
*/
public Agent addAction(Class<? extends Event>[] events, Method method) {
return addAction(events, method, null);
public Agent addAction(String[] eventTypes, Method method) {
return addAction(eventTypes, method, null);
}

public void addResourcesIfAbsent(Map<ResourceType, Map<String, Object>> resources) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.ClassUtils;
import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.InputEvent;
import org.apache.flink.agents.api.OutputEvent;
import org.apache.flink.agents.api.annotation.Action;
Expand Down Expand Up @@ -92,16 +93,17 @@ public ReActAgent(

try {
Method method =
this.getClass().getMethod("startAction", InputEvent.class, RunnerContext.class);
this.addAction(new Class[] {InputEvent.class}, method, actionConfig);
this.getClass().getMethod("startAction", Event.class, RunnerContext.class);
this.addAction(new String[] {InputEvent.EVENT_TYPE}, method, actionConfig);
} catch (NoSuchMethodException e) {
throw new IllegalStateException(
"Can't find the method stopAction, this must be a bug.");
}
}

public static void startAction(InputEvent event, RunnerContext ctx) {
Object input = event.getInput();
public static void startAction(Event event, RunnerContext ctx) {
InputEvent inputEvent = InputEvent.fromEvent(event);
Object input = inputEvent.getInput();

Prompt userPrompt;
try {
Expand Down Expand Up @@ -166,9 +168,10 @@ public static void startAction(InputEvent event, RunnerContext ctx) {
ctx.sendEvent(new ChatRequestEvent(DEFAULT_CHAT_MODEL, inputMessages, outputSchema));
}

@Action(listenEvents = {ChatResponseEvent.class})
public static void stopAction(ChatResponseEvent event, RunnerContext ctx) {
ChatMessage response = event.getResponse();
@Action(listenEventTypes = {ChatResponseEvent.EVENT_TYPE})
public static void stopAction(Event event, RunnerContext ctx) {
ChatResponseEvent chatResponse = ChatResponseEvent.fromEvent(event);
ChatMessage response = chatResponse.getResponse();

Object output;
if (response.getExtraArgs().containsKey(STRUCTURED_OUTPUT)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.flink.agents.api.annotation;

import org.apache.flink.agents.api.Event;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
Expand All @@ -31,23 +29,27 @@
* <p>This annotation specifies which event types the action should respond to. The annotated method
* will be triggered when any of the specified event types occur.
*
* <p>Events are specified as type strings via {@link #listenEventTypes()}. Use the {@code
* EVENT_TYPE} constants on built-in event classes for standard events, or plain strings for custom
* events.
*
* <p>Example usage:
*
* <pre>{@code
* @Action(listenEvents = {InputEvent.class, CustomEvent.class})
* public void handleEvents(Event event) {
* // Action logic here
* }
* @Action(listenEventTypes = {InputEvent.EVENT_TYPE})
* public void handleInput(Event event, RunnerContext ctx) { ... }
*
* @Action(listenEventTypes = {InputEvent.EVENT_TYPE, "MyCustomEvent"})
* public void handleMultiple(Event event, RunnerContext ctx) { ... }
* }</pre>
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Action {
/**
* List of event types that this action should respond to. At least one event type must be
* specified.
* List of event type strings that this action should respond to.
*
* @return Array of Event classes that this action listens to
* @return Array of event type strings
*/
Class<? extends Event>[] listenEvents();
String[] listenEventTypes();
}
Loading
Loading