
Add event broadcasting capability #672

Open · wants to merge 1 commit into base: mli-feature
Conversation

@ankona (Contributor) commented Aug 21, 2024

No description provided.

@ankona changed the title from "Add event broadcasting via XxxCommChannel" to "Add event broadcasting capability" on Aug 21, 2024
@ankona requested a review from AlyssaCote on August 21, 2024 20:20

codecov bot commented Aug 21, 2024

Codecov Report

Attention: Patch coverage is 0% with 183 lines in your changes missing coverage. Please review.

Please upload report for BASE (mli-feature@5d85995). Learn more about missing BASE report.

Files                                                    Patch %   Lines
...mli/infrastructure/storage/backbonefeaturestore.py     0.00%    162 Missing ⚠️
...m/_core/mli/infrastructure/storage/featurestore.py     0.00%     18 Missing ⚠️
...e/mli/infrastructure/storage/dragonfeaturestore.py     0.00%      3 Missing ⚠️
Additional details and impacted files


@@              Coverage Diff               @@
##             mli-feature     #672   +/-   ##
==============================================
  Coverage               ?   69.84%           
==============================================
  Files                  ?      103           
  Lines                  ?     8708           
  Branches               ?        0           
==============================================
  Hits                   ?     6082           
  Misses                 ?     2626           
  Partials               ?        0           
Files                                                    Coverage Δ
...e/mli/infrastructure/storage/dragonfeaturestore.py     0.00% <0.00%> (ø)
...m/_core/mli/infrastructure/storage/featurestore.py     0.00% <0.00%> (ø)
...mli/infrastructure/storage/backbonefeaturestore.py     0.00% <0.00%> (ø)

@ankona marked this pull request as ready for review August 21, 2024 20:43
Comment on lines +77 to +73
# todo: consider that this could (under load) never exit. do we need
# to configure a maximum number to pull at once?
Contributor

Hmmmm didn't think about this. We'd want all of the messages within the channel though, right?

@ankona (Contributor, author) Aug 21, 2024

I think it's possible, with a while loop, to never run out of new messages to receive. It doesn't even need to be a ton of buffered messages, just a constant stream.

I think a for i in range(max_message_retrievals) may ensure that it doesn't get stuck, and any loop triggering retrieval would end up getting called again anyway...
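A minimal sketch of that bounded-retrieval idea, assuming a hypothetical channel whose recv returns None when empty and an arbitrary max_message_retrievals cap (neither is the actual DragonCommChannel API):

```python
def drain_channel(channel, max_message_retrievals: int = 100) -> list:
    """Pull at most `max_message_retrievals` messages so a steady stream
    of incoming messages cannot keep the loop alive forever."""
    messages = []
    for _ in range(max_message_retrievals):
        message = channel.recv()  # assumed to return None when the channel is empty
        if message is None:
            break
        messages.append(message)
    return messages
```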

Contributor

I wonder if this should be the responsibility of the caller and not DragonCommChannel. It seems like recv might be less prone to edge-case problems like the one described if it simply returned the next message on the channel, or None if the channel is empty. It is then up to the caller to take as many messages as it should based on the context of what it is doing. Is this a good idea / bad idea / too big of a change / nonsense? Another approach might be that the timeout is not reset but is decremented each iteration, so it is applied across all messages. Something strikes me as too many responsibilities or unintuitive about the way this is designed now.
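For illustration, a rough sketch of the "decrement the timeout across iterations" alternative; the channel API and names here are assumptions, not the PR's implementation:

```python
import time

def drain_with_budget(channel, recv_timeout: float) -> list:
    """Spend a single timeout budget across the whole drain instead of
    resetting it for every message."""
    messages = []
    deadline = time.monotonic() + recv_timeout
    while (remaining := deadline - time.monotonic()) > 0:
        message = channel.recv(timeout=remaining)  # assumed per-call timeout
        if message is None:  # assumed to return None when nothing arrives in time
            break
        messages.append(message)
    return messages
```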

Comment on lines +53 to +54
# The tests in this file belong to the group_a group
pytestmark = pytest.mark.group_a
Contributor

I think these tests are getting skipped in group a

Contributor Author

It's OK for them to get skipped if dragon isn't installed; that will be done on purpose with dragon = pytest.importorskip("dragon").

However, if the dragon library is there, we can run all of these tests without needing a "real dragon" environment (we only need to be able to import the dragon components/modules successfully). For example, these will run on hotlum with a normal pytest invocation like pytest ./tests/test_featurestore.py.
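For reference, a minimal sketch of the skip-on-missing-import pattern described above; only the importorskip and pytestmark lines mirror the discussion, the test body is a placeholder:

```python
import pytest

# Skip this whole module at collection time when the dragon package cannot
# be imported; no "real dragon" environment is required otherwise.
dragon = pytest.importorskip("dragon")

# The tests in this file belong to the group_a group
pytestmark = pytest.mark.group_a


def test_dragon_is_importable() -> None:
    # Illustrative placeholder; the real tests exercise the feature store.
    assert dragon is not None
```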

Contributor

Okay so we're cool with these tests being skipped in github actions then. Just wanted to double check.

Contributor Author

I think you've still got a valid point... I didn't achieve what I thought I did, after reviewing the gh action. Investigating...

tests/dragon/utils/channel.py (outdated, resolved)
tests/mli/channel.py (outdated, resolved)
tests/test_featurestore.py (outdated, resolved)
@mellis13 (Contributor) left a comment

Just some comments before seeing the integration additions.

:param channel: a channel to use for communications
:param recv_timeout: a default timeout to apply to receive calls"""
serialized_ch = channel.serialize()
safe_descriptor = base64.b64encode(serialized_ch).decode("utf-8")
Contributor

What do you mean by a "safe" descriptor? Have we run into a bug/issue?
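For context on the encoding itself, the base64 step only makes the raw serialized bytes printable and safe to pass around as a plain string; a minimal stdlib round-trip (the descriptor bytes below are made up for illustration):

```python
import base64

# Stand-in for channel.serialize() from the snippet above.
serialized_ch = b"\x00\x01raw channel descriptor bytes\xff"

descriptor = base64.b64encode(serialized_ch).decode("utf-8")  # printable ASCII string
assert base64.b64decode(descriptor) == serialized_ch          # lossless round-trip
```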

return f"{self.uid}|{self.category}"


class OnCreateConsumer(EventBase):
Contributor

Consumer is used throughout this class (and the EventCategory). Would Subscriber be more accurate?


CONSUMER_CREATED: str = "consumer-created"
FEATURE_STORE_WRITTEN: str = "feature-store-written"
UNKNOWN: str = "unknown"
Contributor

Is UNKNOWN used by other packages? I was wondering if DEFAULT would be better.

:raises SmartSimError: if any unexpected error occurs during send"""
try:
self._save_to_buffer(event)

Contributor

Extra space?

try:
comm_channel.send(next_event)
num_sent += 1
except Exception as ex:
Contributor

If one channel broadcast fails, then an error is raised. This means that other channels that may be OK will never get the message, because it was already popped and not placed back on the deque. I wonder if we should store failed sends and still go through the other channels? We would still need to think about whether the failed broadcasts should be retried or just thrown away...
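A rough sketch of the "store failed sends and keep going" idea; the function shape and the deferred retry policy are assumptions, not what the PR currently does:

```python
from collections import deque
from typing import Tuple

def broadcast(event, comm_channels) -> Tuple[int, deque]:
    """Try every channel, collecting failed sends so one bad channel
    cannot prevent the healthy ones from receiving the event."""
    num_sent = 0
    failed: deque = deque()
    for comm_channel in comm_channels:
        try:
            comm_channel.send(event)
            num_sent += 1
        except Exception:
            # Defer the policy decision: the caller can retry these
            # failed sends later or drop them, per the discussion above.
            failed.append(comm_channel)
    return num_sent, failed
```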

:raises SmartSimError: if any unexpected error occurs during send"""
try:
self._save_to_buffer(event)

Contributor

extra space?
