
Implement generate_events_with_long_polling for enterprise event streams #734

Open · justineyster opened this issue May 24, 2022 · 7 comments

justineyster commented May 24, 2022

Is your feature request related to a problem? Please describe.

Yes. We're seeing far fewer events when streaming enterprise events via the Box Python SDK than via the Node SDK. We want to migrate off Node because Python has better Kafka support.

In order to switch from Node to Python, I believe that we need generate_events_with_long_polling() to be implemented for the enterprise event stream type.

You can see the warning that this is not implemented in both the source and the docs.

The reason I believe we need generate_events_with_long_polling() is that it lets you make one call and then continuously receive events from that stream position forward (as we do in Node). Doing this manually has a problem: you have to track your own position in the stream, and stream positions are constantly moving as data falls off the end of the retention window. Because the positions are not static, there is no reliable way to keep fetching data in order. Box developers have told us in the past that manually managing the stream position is unreliable for exactly this reason and will result in data loss and duplication.

Describe the solution you'd like

Implement generate_events_with_long_polling() for the enterprise event stream type.
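
To illustrate the request, the desired usage would mirror what the user event stream already supports. This is only a sketch of the call we'd like to make, not something the SDK supports today; client and process() are placeholders:

# Hypothetical: today the SDK only supports long polling for user event
# streams and warns when an enterprise stream type is requested.
for event in client.events().generate_events_with_long_polling(stream_type='admin_logs'):
    process(event)  # process() is a placeholder for our Kafka producer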

Describe alternatives you've considered

Staying on Node for the time being. This is the only alternative I can think of, as we can't seem to reliably capture all events in our enterprise instance using the Python SDK.

Additional context

I reached out to Box via a support ticket through my company, IBM, and they suggested also opening an issue here to communicate with the SDK team directly. Sakora said:

It looks like our Python SDK team would need to implement this functionality into our SDK. I have reached out to our Python SDK team for further clarification when this will be implemented. You can also reach out directly to our Python SDK team by opening a GitHub Issues ticket in the GitHub repo to communicate directly with them. I will provide an update once I receive a response back from our Python SDK team.

@justineyster changed the title from "Implement generate_events_with_long_polling for enterprise event stream type" to "Implement generate_events_with_long_polling for enterprise event streams" on May 24, 2022
@Jeff-Meadows (Contributor)

The admin events API doesn't actually support long polling (see https://box.dev/guides/events/enterprise-events/for-enterprise/#limitations), so it's probably not appropriate to truly implement generate_events_with_long_polling for admin events.

It looks like the Node SDK just repeatedly calls the admin events API until it reaches current events, and then starts polling the endpoint periodically (https://github.com/box/box-node-sdk/blob/main/src/enterprise-event-stream.ts).

Some Python code to do something similar might look like

from time import sleep
from typing import Iterator, Optional

from boxsdk.pagination.limit_offset_based_object_collection import LimitOffsetBasedObjectCollection


class EventsCollection(LimitOffsetBasedObjectCollection):
    def __init__(
            self,
            session: 'Session',
            url: str,
            limit: Optional[int] = None,
            fields: Iterator[str] = None,
            additional_params: Optional[dict] = None,
            next_stream_position: Optional[int] = None,
            polling_interval: Optional[int] = None,
    ):
        super().__init__(session, url, limit, fields, additional_params)
        self._next_stream_position = next_stream_position or 0
        self._polling_interval = polling_interval
        self._current = False

    def _has_more_pages(self, response_object: dict) -> bool:
        # Keep paging until we've caught up to the current end of the stream.
        return not self._current

    def _next_page_pointer_params(self) -> dict:
        return {'stream_position': self._next_stream_position}

    def next_pointer(self) -> int:
        return self._next_stream_position

    def _update_pointer_to_next_page(self, response_object: dict) -> None:
        self._next_stream_position = response_object['next_stream_position']
        if not response_object['entries']:
            # An empty page means we've reached the current end of the stream:
            # either wait and poll again, or stop if no polling interval was given.
            if self._polling_interval:
                sleep(self._polling_interval)
            else:
                self._current = True


def get_all_admin_events_then_poll(client, polling_interval=60):
    yield from EventsCollection(
        client.session,
        client.events().get_url(),
        additional_params={'stream_type': 'admin_logs'},
        next_stream_position=0,
        polling_interval=polling_interval,
    )
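
A minimal way to consume this generator might be (a sketch; assumes an authenticated client):

for event in get_all_admin_events_then_poll(client, polling_interval=60):
    print(event)  # placeholder for real processing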

@arjankowski (Contributor)

Hi @justineyster,

I will talk to the SDK team about adding @Jeff-Meadows's solution to our SDK. In the meantime, you can try his suggested approach.

jira-ticket: SDK-2214

@justineyster (Author)

@Jeff-Meadows @arjankowski Thank you for the quick and helpful response. I'll give it a try!

@justineyster (Author) commented May 25, 2022

@Jeff-Meadows @arjankowski The code works great! Thanks again. I also figured out how to implement event type filters and timestamp parameters by looking at your codebase.

def get_all_admin_events_then_poll(
    client,
    event_types=None,
    created_after=None,
    created_before=None,
    polling_interval=60,
):
    params = {
        "created_after": created_after,
        "created_before": created_before,
        "stream_type": "admin_logs",
    }
    if event_types is not None:
        params["event_type"] = ",".join(event_types)
    yield from EventsCollection(
        client.session,
        client.events().get_url(),
        additional_params=params,
        next_stream_position=0,
        polling_interval=polling_interval,
    )
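
For example, with hypothetical filter values:

events = get_all_admin_events_then_poll(
    client,
    event_types=["LOGIN", "UPLOAD"],            # hypothetical event types
    created_after="2022-05-01T00:00:00-00:00",  # hypothetical RFC 3339 timestamp
    polling_interval=60,
)
for event in events:
    print(event)  # placeholder for real processing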

I'll keep running it for a while to see if it fixes my event count issue (compared to Node). Thanks again for your help!

@justineyster (Author) commented May 26, 2022

I've been running this since yesterday. It seems to be doing fine, except it's falling behind in the stream pretty quickly, i.e. it isn't keeping up with the pace of incoming events. I'm now trying to pull full pages of events to see if performance is better; I changed EventsCollection's super().__init__ call to:

super().__init__(
    session, url, limit, fields, additional_params, return_full_pages=True
)

Will see if this does a better job keeping up with the incoming data stream.
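
If I understand the SDK's pagination correctly (so treat this as a sketch), return_full_pages=True makes the collection yield whole pages instead of individual events, so the consuming loop becomes something like:

for page in get_all_admin_events_then_poll(client):
    for event in page:
        print(event)  # placeholder for real processing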

EDIT: doesn't seem to have any effect. Still falling behind. Not sure where the bottleneck is yet. Going to try deploying this closer to our Kafka instance.

@justineyster (Author)

I'm having a hard time scaling this to our instance's volume, so I may stick with Node for the time being. I wonder if Node's event-driven architecture and async capabilities are an advantage in this space. I still believe this would be a handy SDK feature for your users, though. Thanks a bunch.

@Jeff-Meadows (Contributor)

@justineyster - it's hard to say if Node is a better option than Python without knowing more about your use case. By default, the Python SDK makes all of its requests on the calling thread, and those requests block the thread until complete. If event processing takes any appreciable amount of time in your system, and you do it on the same thread that's making the requests, that could cause performance problems.

Generally, when I want to fetch a bunch of stuff from the Box API and then do something with the data, I'll use multiple threads, e.g.:

from threading import Thread
from queue import SimpleQueue

queue = SimpleQueue()

def fetch_events():
    # Producer: stream admin events and hand them off to the queue.
    for event in get_all_admin_events_then_poll(client):
        queue.put(event)

def process_events():
    # Consumer: block until an event is available, then process it.
    while True:
        event = queue.get()
        # do something

Thread(target=fetch_events).start()
process_events()
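
Note that queue.get() blocks until an event is available, so the consumer runs only as fast as events arrive; if processing itself turns out to be the bottleneck, you could start several process_events threads reading from the same queue.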
