Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Adding stream listener to stream changes in child nodes #183

Merged
merged 22 commits into from Aug 8, 2018

Conversation

@the-c0d3r
Copy link
Contributor

commented Jul 21, 2018

Hi,

This PR's original intent is to add stream listener function to fire a callback function when changes in the database is detected. It is a revival of this #50 1-year-old PR and also based on the code contributed by @rizasif and @Aqsa-K in that PR. This PR is mainly to serve 3 purposes.

  1. Update the old PR with a rebase on master
  2. Clean up, refactor & fix lint errors
  3. Write tests

For now, I have completed the first and second of the above. Please advise me on how to properly write the test in PyTest. I tested the code manually by defining callback and manually modifiying data in my firebase demo project. And it seems to be fired everytime I made changes. I read through the existing test suite and I cannot think of a way to write into it yet.

Changes:

  • lint.sh : Ignore directive for "protected-access"
  • firebase_admin/sseclient.py : SSE client module
  • firebase_admin/db.py : Streaming functionality
  • New API call: database.reference('/xxx').stream(callbackFunc)

Regards

@hiranya911

This comment has been minimized.

Copy link
Member

commented Jul 23, 2018

@the-c0d3r thanks for putting in the time and effort into fixing this. Some comments and answers:

Please revert the changes to lint.sh. Instead add the directives to the lines in the source files where the lint errors should be ignored.

Rename sseclient.py to _sseclient.py so it will be considered an internal module.

For unit testing, you can mount an adapter on the requests.Session object and have it return some fake events (I think). See how we do something similar in the existing tests:

def instrument(self, ref, payload, status=200):
recorder = []
adapter = MockAdapter(payload, status, recorder)
ref._client.session.mount(self.test_url, adapter)
return recorder

The MockAdapter used in the above example responds to requests made by the SDK with a fake response. You will probably end up writing a similar adapter that responds with a fake stream of events.

the-c0d3r added some commits Jul 24, 2018

@@ -155,10 +155,10 @@ def parent(self):

def build_headers(self, token=None):
headers = {'content-type' : 'application/json; charset=UTF-8'}
if not token and self._client._session.credentials:
if not token and self._client._session.credentials: # pylint: disable=protected-access

This comment has been minimized.

Copy link
@hiranya911

hiranya911 Jul 24, 2018

Member

I think self._client.session should work here. session is exposed from the HttpClient:

@property
def session(self):
return self._session

That way you won't need the linter directives at all.

This comment has been minimized.

Copy link
@the-c0d3r

the-c0d3r Jul 24, 2018

Author Contributor

Oh, I didn't catch that earlier. I have changed it, thanks. I still have yet to write the tests. I'll finish the tests in a few days.

This comment has been minimized.

Copy link
@Aqsa-K

Aqsa-K Jul 25, 2018

great work @the-c0d3r . I am glad that this is finally progressing. Let me know if any help is required.

@the-c0d3r

This comment has been minimized.

Copy link
Contributor Author

commented Jul 26, 2018

Hi, I have added the test for SSEClient code. I'm not sure if I did it correctly or not. I added MockSSEClientAdapter() which is a subclass of MockAdapter. And I captured the actual event raw response and put it in payload. But somehow when I run the test, the MockSSEClient actually sends out Bytes object which is expected, but inside SSEClient code's response.iter_content returns bytes instead of string. I compared it with the my program to actually communicate with the server, and somehow iter_content returns string. Therefore for now, I added a check for the type in SSEClient, to call decode() if the nextchar from response.iter_content is a bytes object.

                nextchar = next(self.resp_iterator)
                if isinstance(nextchar, bytes):
                    nextchar = nextchar.decode()
                self.buf += nextchar

Python will raise TypeError: Can't convert 'bytes' object to str implicitly if executed the test withouth this check. Is there anyway to modify the MockSSEClientAdapter or the parent class, to make it's response.iter_content return string instead of bytes?
But I'm not sure about this check yet so I haven't committed this change. Please comment.

@hiranya911

This comment has been minimized.

Copy link
Member

commented Jul 26, 2018

Hi @the-c0d3r. I think this is due to the way iter_content() is implemented in requests:

http://docs.python-requests.org/en/master/_modules/requests/models/#Response.iter_content

There are couple of things you can try out.

  1. Try setting resp.raw.stream = True before returning from send().
  2. Try setting an encoding on the mock response as it is expected here.

One of these is likely to fix the issue.

Update:

On a second look, resp.raw.stream is expected to be a function, not a boolean. So option 2 is probably the only viable option.

@the-c0d3r

This comment has been minimized.

Copy link
Contributor Author

commented Jul 27, 2018

A quick test shows that the second fix you suggested works. I'll patch it in and commit that later today. And I also found the CI build failed for python3's syntax super() call in python2 environment. I'll change it back to support both python3 and python2.

Are there any additional tasks left to be completed for this PR to be merged?
PS: Does Stream class needs to have testing? I noticed that the SSE client's testing code more or less does the same thing as the Stream class.

@hiranya911

This comment has been minimized.

Copy link
Member

commented Jul 27, 2018

Thanks @the-c0d3r. This still needs to go through our internal API review process (which I have already initiated). We might get some feedback from that, which will need to be implemented -- but we can do that in steps, after merging this one. I will also take another closer look at all the code here, and post some feedback.

@hiranya911
Copy link
Member

left a comment

@the-c0d3r A few code review comments to address when you get a chance. But overall this looks pretty good. Thanks again for putting in the effort.

@@ -0,0 +1,198 @@
"""SSEClient module to handle streaming of realtime changes on the database

This comment has been minimized.

Copy link
@hiranya911

hiranya911 Jul 27, 2018

Member

Please add the Apache license header here.

# Optional support for passing in a requests.Session()
self.session = session
# function for building auth header when token expires
self.build_headers = build_headers

This comment has been minimized.

Copy link
@hiranya911

hiranya911 Jul 27, 2018

Member

Do we really need this? The session we get from the db module is an instance of google.auth.transport.requests.AuthorizedSession, which is guaranteed to add the required Authorization header. So this shouldn't be required.

This comment has been minimized.

Copy link
@the-c0d3r

the-c0d3r Jul 28, 2018

Author Contributor

build_header() and all subsequent function arguments removed.

self.requests_kwargs['headers']['Cache-Control'] = 'no-cache'

# The 'Accept' header is not required, but explicit > implicit
self.requests_kwargs['headers']['Accept'] = 'text/event-stream'

This comment has been minimized.

Copy link
@hiranya911

hiranya911 Jul 27, 2018

Member

The above can be written as:

headers = self.request_kwargs.get('headers', {})
# add the required values to headers
self.request_kwargs['headers'] = headers
self.should_connect = False
self.retry = 0
self.resp.close()
# self.resp.raw._fp.fp.raw._sock.shutdown(socket.SHUT_RDWR)

This comment has been minimized.

Copy link
@hiranya911

hiranya911 Jul 27, 2018

Member

Remove the commented out lines?

On a side note, I was testing this branch the other day, and I noticed that calling close() here does not immediately release the underlying socket. As a result the Stream remains active for a while even after calling stream.close(). So perhaps we do need the commented out lines?

This comment has been minimized.

Copy link
@the-c0d3r

the-c0d3r Jul 28, 2018

Author Contributor

I also noticed the delay in calling stream.close(). I was calling it in the python shell, and it takes a few seconds to close and return. I'll test it out a bit more after I followed your other suggestions first.


def _connect(self):
"""connects to the server using requests"""
if self.should_connect:

This comment has been minimized.

Copy link
@hiranya911

hiranya911 Jul 27, 2018

Member

Should also check for self.running.


import firebase_admin
from firebase_admin import _http_client
from firebase_admin import _utils
from firebase_admin._sseclient import SSEClient, KeepAuthSession

This comment has been minimized.

Copy link
@hiranya911

hiranya911 Jul 27, 2018

Member

As a matter of style, we don't import individual classes and members. Please import the module here as from firebase_admin import _sseclient, and change the code below accordingly.


def start_stream(self):
"""Streaming function for the spawned thread to run"""
self.sse = SSEClient(

This comment has been minimized.

Copy link
@hiranya911

hiranya911 Jul 27, 2018

Member

Can you move this to the start() method above, before the thread is started? Then I think you will be able to drop the sleep() call in close().

@@ -101,6 +153,23 @@ def parent(self):
return Reference(client=self._client, segments=self._segments[:-1])
return None

def build_headers(self, token=None):

This comment has been minimized.

Copy link
@hiranya911

hiranya911 Jul 27, 2018

Member

See if we can remove this altogether (see my comment on _sseclient). If we must keep this, please rename to _build_headers() so it's treated as an internal method.

This comment has been minimized.

Copy link
@the-c0d3r

the-c0d3r Jul 28, 2018

Author Contributor

This build_headers() is tested to be unnecessary just like your comment on _sseclient. So I will remove it from all subsequent code.

@@ -523,7 +523,7 @@ def test_range_query(self):
assert recorder[0].headers['User-Agent'] == db._USER_AGENT


class TestDatabseInitialization(object):
class TestDatabaseInitialization(object):

This comment has been minimized.

Copy link
@hiranya911

hiranya911 Jul 27, 2018

Member

Good catch 👍

sseclient = self.init_sse()
for msg in sseclient:
event = json.loads(msg.data)
break

This comment has been minimized.

Copy link
@hiranya911

hiranya911 Jul 27, 2018

Member

Is the break necessary? Shouldn't it exit on its own since the mock response only has one event?

@the-c0d3r

This comment has been minimized.

Copy link
Contributor Author

commented Jul 28, 2018

Hi @hiranya911, I believe I have addressed all of your comments with my latest commits, except the part about the closing of the stream. I wrote a simple python code to just setup stream and close it and record the timing. It returned within 100 ms. But somehow when executing the code line by line manually in python shell, it would take over 10 seconds to close the stream (aka time taken for python to return control to user). It will take me some time to debug this since I don't know much about requests module's internals. If you can shed some light on it, that would be great.

@hiranya911
Copy link
Member

left a comment

Looks pretty good. I just had one comment for you.

We can investigate the socket closing issue later. Once we get the internal approval for this, I can merge it, but I'll probably make a few more changes based on the feedback we get.

last_id: optional id
retry: the interval in ms
**kwargs: extra kwargs will be sent to requests.get
"""
self.should_connect = True
self.url = url
self.last_id = last_id
self.retry = retry
self.running = True

This comment has been minimized.

Copy link
@hiranya911

hiranya911 Jul 30, 2018

Member

Remove this as it gets set to False below.

@wamburu

This comment has been minimized.

Copy link

commented Jul 31, 2018

+1

@hiranya911

This comment has been minimized.

Copy link
Member

commented Aug 2, 2018

@the-c0d3r the internal review process for this is chugging along. I will have an update for you next week.

@hiranya911
Copy link
Member

left a comment

LGTM.

Please sync this PR with the latest master when you get a chance.

@the-c0d3r

This comment has been minimized.

Copy link
Contributor Author

commented Aug 3, 2018

Hi @hiranya911 thanks for the update. I have synced the PR with the latest master. If there are any further tasks, please let me know.

@hiranya911

This comment has been minimized.

Copy link
Member

commented Aug 7, 2018

@the-c0d3r I have some feedback. Once they are implemented, I can merge this PR:

  1. Rename the stream() method to listen().
  2. Rename the Stream type to ListenerRegistration.
  3. Remove the stream_id parameter from the listen() method.

Let me know if you have any questions.

Renamed Stream class to ListenerRegistration, stream() to listen(), r…
…emoved 'stream_id' and added more documentation
@the-c0d3r

This comment has been minimized.

Copy link
Contributor Author

commented Aug 8, 2018

Hi, I think I have addressed those feedbacks. Please take a look.

@hiranya911

This comment has been minimized.

Copy link
Member

commented Aug 8, 2018

Thanks @the-c0d3r. This looks pretty solid. I'll merge it shortly. I'll intend to do a bit of work on top it over the next few days, before release.

@hiranya911 hiranya911 merged commit 3f1190d into firebase:master Aug 8, 2018

2 checks passed

cla/google All necessary CLAs are signed
continuous-integration/travis-ci/pr The Travis CI build passed
Details
@hiranya911

This comment has been minimized.

Copy link
Member

commented Aug 8, 2018

Upon taking another look, this might have a problem. I don't see how the credentials are passed into the underlying session.

@hiranya911

This comment has been minimized.

Copy link
Member

commented Aug 8, 2018

Ok, managed to fix the issue. I'll send a separate PR with the fix.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants
You can’t perform that action at this time.