-
Notifications
You must be signed in to change notification settings - Fork 3
Table filter with better behaving "hasNext()" #20
Conversation
@@ -17,10 +17,8 @@ | |||
|
|||
public final class MetricsConstant { | |||
|
|||
public static final String EMITTER_FAILURES = "shunting_yard_emitter_failures"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We no longer have emitters so I cleaned these up.
public static final String RECEIVER_FAILURES = "shunting_yard_receiver_failures"; | ||
public static final String RECEIVER_EMPTY = "shunting_yard_receiver_empty"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I figured we may as well count these so we can plot how many times they happen, should probably "never" happen since the SqsMessageReader
sits in a loop waiting for messages to arrive before returning the call to next()
.
@@ -48,15 +50,16 @@ public static void main(String[] args) throws Exception { | |||
|
|||
int exitCode = -1; | |||
try { | |||
exitCode = SpringApplication.exit(new SpringApplicationBuilder(MetaStoreEventReplication.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just a formatting change, not sure why.
@@ -94,6 +97,11 @@ private static void logVersionInfo() { | |||
LOG.info("MetaStoreEventReplication"); | |||
} | |||
|
|||
@Bean | |||
public TaskExecutor taskExecutor() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added this so we have a simple way to fire off a thread to run the main app.
} | ||
|
||
@Override | ||
public void run(ApplicationArguments args) { | ||
while (eventReader.hasNext()) { | ||
executor.execute(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This effectively runs this class in a separate thread.
Pull Request Test Coverage Report for Build 177
💛 - Coveralls |
return 0; | ||
} | ||
|
||
public void stop() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will probably only ever be used by the unit test as Spring will handle stopping the thread if the application context is closed some other way. We will want to revisit this stop/start logic and should use a fancier executor that can stop the receiver from receiving new messages but wait until any messages in flight are completed before shutting down. For now this does the job and shouldn't be any worse than what we had before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this does all look a bit odd with the two run methods. Perhaps we should introduce a ReplicationRunnerTask
. Or if we have a ticket for above we can rethink this a bit, I'm fine either way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I agree it's not ideal but I'd rather not spend the time on it now. I think we need to sort out overall how this app runs and particularly how it shuts down so should sort all of that out at the same time and can improve this then. I'll make a note to create a ticket for this.
while (startTime + maxExecTime > System.currentTimeMillis() && delegate.hasNext()) { | ||
events.add(delegate.next()); | ||
while (startTime + maxExecTime > System.currentTimeMillis()) { | ||
// TODO: since delegate.next() effectively blocks until some messages arrive, the below means we could exceed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing to do with this PR but we noticed this potential bug in CR so I've put a TODO here to capture it.
runner.run(args); | ||
int attempts = 0; | ||
int maxAttempts = 1; | ||
while (executor.getThreadPoolExecutor().getActiveCount() < 1 || attempts < maxAttempts) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a crude attempt at waiting for the runner to have processed at least one event, I tried a few other things and this was as good as I could do for now. If/when we revisit the start/stop logic of the runner we can hopefully make this more robust.
@@ -0,0 +1,35 @@ | |||
<?xml version="1.0" encoding="UTF-8" ?> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed nothing was being logged by the tests so added this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That filter looks much better, made one comment, but approving we'll probably address that some other time
This turned into a much bigger change than I had anticipated but the basic idea is that since the
FilteringMessageReader
can filter messages out we'd rather return these as empty Optionals than block and wait for something that passes the filter to arrive. While making these changes I also removed the two implementations ofIterator
since these weren't true iterators any more and were abusing the contract (e.g. always returning true for hasNext(), throwing Exceptions for unimplemented remove() methods etc.). I then noticed that the main loop controlling theReplicationRunner
could be improved by firing it off in a separate thread and running while true