Skip to content

Conversation

@isaacreath
Copy link
Contributor

@isaacreath isaacreath commented Dec 19, 2022

[CASSANDRA-16325] Update streaming metrics incrementally

Currently the inbound and outbound streamed bytes metrics are incremented after each file is streamed, what doesn't represent the current number of bytes streamed since it can take a long time for a large file to be streamed. This patch changes the StreamSession class to increment the total bytes streamed every time the StreamSesson makes progress instead of when a file finishes streaming.

patch by Isaac Reath and Dejan Gvozdenac ; reviewed by Paulo Motta for CASSANDRA-16325

Co-authored-by: Isaac Reath
Co-authored-by: Dejan Gvozdenac

The Cassandra Jira

@isaacreath
Copy link
Contributor Author

isaacreath commented Jan 6, 2023

@smiklosovic After thinking about this issue for a little bit, @pauloricardomg and I came up with a different approach.

We moved the logic which increments the incoming / outgoing bytes metrics from the StreamSession to a dedicated class called the FileStreamMetricsListener. FileStreamMetricsListener objects will then be created by each CassandraIncomingFile and CassandraOutgoingFile. These then pass the FileStreamMetricsListener to the appropriate stream reader / writer. As those reader / writers transfer bytes, FileStreamMetricsListener::onBytesTransferred will be called to update the metrics. When file transfer is complete, we call FileStreamMetricsListener::onStreamSuccessful to verify that we've tracked the size appropriately (thus ensuring that any new stream reader / writers are calling onBytesTransferred).

This approach addresses the concerns of having an ever-growing map of ProgressInfo objects and it neatly encapsulates the logic for updating these counter objects. There are some aspects of this approach of which I am not a fan, namely the fact that there is little ensuring that new Stream reader / writer objects actually create a FileStreamMetricsListener and update it properly; but, from looking at the code, there is some refactoring which needs to be done to unify all of the stream reader / writer classes in a single hierarchy that should probably be handled outside the scope of this ticket before we could consider cleaning up this approach.

Copy link
Contributor

@smiklosovic smiklosovic left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @isaacreath . I left couple comments again.

Copy link
Contributor

@smiklosovic smiklosovic Jan 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if onStreamSuccessful returned boolean so it would be called isStreamSuccessful and if it is not it would throw an exception? This method already throws IOException so it seems to be quite logical to me that if streaming was not successful we would throw as well. Right now we are doing assert in it which can throw AssertionError but that is not subclass of IOException.

Another option is to throw IOException directly from onStreamSuccessful with accompanying error message but that feels strange.

Copy link
Contributor

@pauloricardomg pauloricardomg Jan 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think FileStreamMetricsListener should be responsible for determining whether a stream is successful, since this class is meant to be a dumb consumer. For instance, I think we should update the number of incoming files streamed on this method (metrics.countStreamedIn(isEntireSSTable)), since we can assume the stream is completed successfully when onStreamSuccessful() is called. The assertion assert lastSeenBytes == totalSize; on onStreamSuccessful() is just to verify that the behavior of the class FileStreamMetricsListener is correct, not to check whether a stream is successful or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried making the change to isStreamSuccessful from onStreamSuccessful and it seems to make the FileStreamMetricsListener a very leaky abstraction to provide a proper error message and we end up duplicating the checking logic between the CassandraIncomingFile and CassandraOutgoingFile which I'm not a huge fan of (and again, seems like a leaky abstraction).

I could instead throw an IOException instead of running the assert; but, like you say, that feels strange as no IO is actually happening in onStreamSuccessful.

Copy link
Contributor

@smiklosovic smiklosovic Jan 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I get that but code-wise that method might throw AssertionError but the method that method is called in throws IOException and that is (technically) just wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that method might throw AssertionError but the method that method is called in throws IOException and that is (technically) just wrong.

I don't see what is wrong about this, as far as I understand assertions can be added anywhere in the code to validate assumptions, without requiring methods to throw AssertionError explicitly. Am I missing something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK Paulo.

May I ask, what happens when it actually throws that exception (AssertionError). What happens in runtime? Where it propagates and what eventually fails?

Just honestly asking as I have never checked out this code in IDE to dig deeper, sorry ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I ask, what happens when it actually throws that exception (AssertionError). What happens in runtime? Where it propagates and what eventually fails?

The way I see it is that assertions shouldn't happen in runtime unless the code is broken, as they're errors targeted for developers to detect broken code assumptions. Unfortunately in Cassandra we overload assertions a bit and use it for error handling (which IMO is a bad smell), but this is not what is being done here.

The assertion assert lastSeenBytes == totalSize on FileStreamMetricsListener.onStreamSuccessful is a runtime safety check to ensure that the behavior of the class FileStreamMetricsListener is working as expected and we don't report more streamed bytes than the expected amount. It's a future-proof safety check for programming errors, but should not be expected to fail during correct operation.  If this assertion is ever thrown, it means that there is a bug in the code that must be fixed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a sanity check, we can look into whether or not this can happen during valid operation before we commit this assertion. As far as I can tell it can't, but it's worth digging into further.

@isaacreath isaacreath force-pushed the feature/CASSANDRA-16325 branch 4 times, most recently from b1667c5 to 2d73e7d Compare January 19, 2023 22:56
@isaacreath isaacreath force-pushed the feature/CASSANDRA-16325 branch from c455e0e to e8e71b1 Compare February 14, 2023 23:17
- Add StreamMetricsHandler class which is a StreamEventHandler that processes ProgressEvent objects.
- Fix bug in CassandraCompressedStreamReader where we were over-reporting deltas between calls to StreamSession::progress.
- Add StreamMetricsHandler on StreamManager initialization.
- Add testing for incremental metrics:
	- Update cluster size to 2 and RF=2 to make it easier to reason about state
	- Create single 10MB sstable on node1 with compression disabled
	- Check that more than 3 incoming/outgoing bytes
          are reported during streaming from node1 to node2
	- Check that final transferred bytes matches expected sstable size (10MB)
…e the StreamingMetricsTestUtils not inherit from TestBaseImpl
@isaacreath isaacreath force-pushed the feature/CASSANDRA-16325 branch from 5675fca to a3c0123 Compare February 17, 2023 19:47
@belliottsmith belliottsmith force-pushed the trunk branch 2 times, most recently from df3eb40 to 54e39a9 Compare July 23, 2025 11:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants