-
Notifications
You must be signed in to change notification settings - Fork 507
METRON-1704 Message Timestamp Logic Should be Shared [Feature Branch] #1146
METRON-1704 Message Timestamp Logic Should be Shared [Feature Branch] #1146
Conversation
…eded by both Storm and Spark
* @param route The message route. | ||
* @param context The Stellar execution context. | ||
*/ | ||
@Override | ||
public void distribute(JSONObject message, long timestamp, MessageRoute route, Context context) { | ||
public void distribute(MessageRoute route, Context context) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The MessageRoute
abstraction now contains both the message (a JSONObject) and the timestamp (a Long). Previously this was passed along separately. It is much simpler for all of the ports (Spark included) to wrap this all into the MessageRoute
.
/** | ||
* Responsible for creating the {@link Clock}. | ||
*/ | ||
private ClockFactory clockFactory; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ClockFactory
helps with the timestamp logic. This used to live in the ProfileSplitterBolt
as that is where the timestamp extract logic lived. Now the MessageRouter
does this for all Profiler ports.
Optional<Long> timestamp = clock.currentTimeMillis(message); | ||
|
||
// can only route the message, if we have a timestamp | ||
if(timestamp.isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here I removed the timestamp logic from the REPL. The MessageRouter
does this for us now.
|
||
// what time is it? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here I removed the timestamp logic from Storm. The MessageRouter
does this for us now.
// what is the name of the entity in this message? | ||
String entity = executor.execute(profile.getForeach(), state, String.class); | ||
route = Optional.of(new MessageRoute(profile, entity)); | ||
// what time is is? could be either system or event time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The timestamp logic now lives here. This allows it to be shared by all of the Profiler ports; Storm, REPL, and Spark.
lgtm @nickwallen. +1 via inspection. |
Thanks for the review. Merged. |
The Profiler can operate using either processing time or event time. This is controlled by the user by defining the "timestampField" option in their Profiler configuration.
There is logic that determines the timestamp of a message. If the Profiler is configured to use processing time, then system time is returned by this logic. If the Profiler is configured to use event time, then the timestamp is extracted from a message field.
This logic is currently duplicated across both ports of the Profiler; the REPL and Storm. This should be pulled into
metron-profiler-common/
so that the logic can be shared and also used by the Spark port.This is a pull request against the
METRON-1699-create-batch-profiler
feature branch.Testing
Pull Request Checklist