Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding stream listeners to sniff change in child #50

Closed
wants to merge 4 commits into from

Conversation

rizasif
Copy link

@rizasif rizasif commented Jul 18, 2017

Hello,

I noticed that there (according to my best knowledge) hasn't been a stream listener implemented in the project. Which has a sole responsibility to fire up a callback function (https://firebase.google.com/docs/reference/rest/database/#callback) when any change to a firebase node is occurred. This is essential for developers who want to perform an action when a change to database is observed.

The changes I proposed are inspired by PyRebase (https://github.com/thisbejim/Pyrebase). I have implemented a SSEClient which uses python request library to listen for changes. These changes have been tested using a simulation of 100 users, performing read and write operations. The results, so far, are promising with 0% failure rate.

I have written a test code for you as well. The following adds a new child node to the already available parent "users". Now, if you visit the firebase console and delete this new child node, then the "call_function" will fire up. Similar, case can be observed by updating, adding or deleting in the child of "users" node through the console.

import firebase_admin
from firebase_admin import credentials
from firebase_admin import db

# Fetch the service account key JSON file contents
cred = credentials.Certificate("PATH_TO_JSON_FILE")

# Initialize the app with a service account, granting admin privileges
firebase_admin.initialize_app(cred, {"databaseURL": "DATABASE_URL"})

root = db.reference()
print(root.get())


new_user = root.child("users").push({"name": "lila Annen", "age": 19})
ref = root.child("users")
print(ref.get())

def call_function(message):
  print( message["event"])
  print(message["path"])

my_stream_question_requested = db.reference("users").stream(call_function, stream_id="new stream")

Hope to hear from you soon, thanks!

@hiranya911 hiranya911 self-assigned this Jul 18, 2017
@hiranya911
Copy link
Contributor

Hi @rizasif,

This is awesome! Thanks for doing it. I will look it over in the next couple of days. Also since this is essentially a new API, we need to get it through some of our internal review processes before we can merge it. It's going to take some time, but please hang tight.

In the meantime, if you can address the lint errors reported at https://travis-ci.org/firebase/firebase-admin-python/jobs/254848028 that would be great.

@hiranya911
Copy link
Contributor

Also please do provide some unit tests to start with. This will help us review this change, and also make changes to it in safe manner.

@rizasif
Copy link
Author

rizasif commented Jul 19, 2017 via email

@rizasif
Copy link
Author

rizasif commented Jul 20, 2017

@hiranya911 I not getting how this error is appearing, can you guide me how can I reproduce it on my system?

@hiranya911
Copy link
Contributor

@rizasif This is a lint error. Try running lint.sh all command. You need to have pylint 1.6.4 already installed.

@alexanderwhatley
Copy link
Contributor

Is there a way to get the ETag when streaming the changes @hiranya911? If so, this seems quite relevant to the pull request that I made.

@hiranya911
Copy link
Contributor

Hi @alexanderwhatley. I believe it's not possible to retrieve ETags when using the streaming API (@caseycrogers or @rockwotj please confirm). Usually we treat transactions and event listeners as two separate features -- although one may use them together to implement certain complex use cases. Did you have any particular use case in mind?

@rockwotj
Copy link
Contributor

rockwotj commented Aug 4, 2017

You are correct, the streaming API a single direction - the server sending events to the client. ETags are not sent from this endpoint, you'll have to use the normal REST requests for this. (The clients get around this by computing the hashes locally on the devices).

@alexanderwhatley
Copy link
Contributor

@hiranya911, yeah, I was thinking of a situation where the user is storing data in a cache, and would stream data to update the cache, and would use the updated ETag to check whether or not the data in the cache was stale.

On a more general note, it would seem like a useful addition to the REST API would be the inclusion of methods that allow the user to retrieve the ETag itself without the corresponding data at some location. @rockwotj, you mention that the ETags are computed as hashes locally if I am understanding you correctly, is the hash function exposed anywhere?

@rockwotj
Copy link
Contributor

rockwotj commented Aug 4, 2017

The open source iOS client is the only place that has it, but I think it was tweaked slightly for REST. The hash function is pretty complicated however, I don't recommend trying to port it to other platforms.

If you just want to ETag and not the data, you can always do ?print=silent

@tamakisquare
Copy link

@hiranya911 @rizasif
Hi guys. I have been following this PR as it's the last thing that's stopping me from switching over from PyRebase, which, unfortunately, has not been maintained since early this year. Plus, PyRebase's streaming feature is quite buggy. I had to tinker with it quite a bit, so that it's usable in my project.

May I know what's left to do for this PR to get merged and released? I want to see how I can help here.

One quick question about streaming from Firebase RESTful API. Just trying my luck to see if you guys know anything about it. With my modified version of PyRebase, the streaming works mostly fine except for getting "502 Server Error: Bad Gateway for url..." from Firebase occasionally. On average, it occurs 2-3 days after I restart the stream listener process on my server. But sometimes it occurs in the same day. Note that the Firebase query that I set up for streaming uses limitToLast=1 and orderBy="$value"

@hiranya911
Copy link
Contributor

@tamakisquare I believe this PR has all the key elements necessary to support event listeners. The public API and the implementation could use a bit of tinkering. It also need to be rebased to the latest head. The biggest thing that is holding this PR back right now is the lack of any tests and insufficient documentation. If you or somebody can spend some time to provide those we can start the code review process for it.

I'm not sure why you're getting 502 errors. Perhaps @rockwotj can shed some light on that matter.

@rockwotj
Copy link
Contributor

@tamakisquare the 502 are most likely when we deploy to your database host. We have measure in place to prevent this from happening, but the progress is slow. Clients should try and reconnect with backoff, but expect this to become less of an issue as time goes on.

@rizasif
Copy link
Author

rizasif commented Sep 14, 2017

Hello people,
I am sorry to be away for some while.
I am having some issues with the patch that I have added here. The listener doesn't work immediately, unless, at least one GET request from firebase is made. Probably some issue with the Access Tokens. @tamakisquare can you please look at this?
I am finding hard to manage my time for this, so if anyone volunteers to work on this, I am happy to assist.

@tamakisquare
Copy link

@hiranya911 @rizasif
Thanks for the prompt response on the current status of this PR. I'll give this PR a look this weekend and will keep everyone posted here.

@rockwotj. Thanks for the information and your quick response. Good to learn that the issue is under your team's radar and that it will eventually ease off. Is this issue tracked somewhere that is accessible to others like me?

@rockwotj
Copy link
Contributor

Sorry no public status tracker or promises on timeline.

@hiranya911
Copy link
Contributor

@tamakisquare I think the issue mentioned by @rizasif is due to extending the requests.Session type. Instead the PR should extend from google.auth.transport.requests.AuthorizedSession.

@tamakisquare
Copy link

@hiranya911 Thanks for the tip. I'll keep that in mind when I look into the PR.

@rockwotj No problem. All is good. Thanks for the quick responses. Cheers.

@morganchen12 morganchen12 changed the title Adding stream listerners to sniff change in child Adding stream listeners to sniff change in child Sep 15, 2017
@tamakisquare
Copy link

The sseclient used in this PR is a clone of the sseclient used in PyRebase. The trouble with that is it does not come with any tests. To save the trouble from writing tests for some module that I am not familiar with, I have decided to work with another SSE client that comes with its own set of tests.

The work is in progress. I will submit another PR once the work is done.

@hiranya911
Copy link
Contributor

@tamakisquare +1 for using a third-party library for SSE support. It looks like the one you've chosen works readily with the requests library, which we use in the Admin SDK.

@rizasif
Copy link
Author

rizasif commented Oct 24, 2017

@tamakisquare what is the progress? If you need anything done then I am available.

@tamakisquare
Copy link

@rizasif - The development work is almost done. I got carried away by other things, so I haven't worked on it in the past couple weeks. I'll get back to it soon. I could use help with writing tests. Would that be something you could help me with?

@hiranya911
Copy link
Contributor

@tamakisquare I recommend you share the details of your implementation (at least the public API), as early as possible. All public APIs require explicit approval from my team, and it would be good to get that process started before you put in a ton of effort into writing tests.

@hiranya911
Copy link
Contributor

Also once we have some designs and code, I can also probably help out with writing tests.

@rustydb
Copy link

rustydb commented Jan 31, 2018

Still interested in this!

@hiranya911
Copy link
Contributor

We don't have cycles to put into this right now. But if somebody wants to pick it up and carry on I can help get it reviewed and approved.

@sphonala
Copy link

waiting for this to merge soon as it's very needy for ARM-Linux based IoT apllications, it's working nice in Pyrebase with some chnages.

@rizasif
Copy link
Author

rizasif commented Feb 16, 2018

@sphonala you can use my repository in the meanwhile it's working fine for us https://github.com/rizasif/firebase-admin-python

@hiranya911
Copy link
Contributor

This PR needs a bit of cleaning up, and at least some unit tests. I don't have the time to work on it myself right now, but if @rizasif or somebody else can help take this PR to maturity, I'll gladly help to get it through the code and API review process.

@Aqsa-K
Copy link

Aqsa-K commented Mar 9, 2018

Hello everyone,

I have been working on resolving the access token issue being faced. And it has been resolved; the token is being refreshed now. I have updated the code in the recent version of firebase_admin_python. Here is the link: https://github.com/Aqsa-K/firebase-admin-python

The updates in this version:

  • Can add streaming listeners to sniff change in child
  • Once authorized, the access token is refreshed automatically

I am currently trying to run the tests and I am facing some issues. @hiranya911 I would appreciate some help in this regard. Can you please guide me on how to run the current tests? This will help me write unittests for the 'streaming' changes.
Also Can you please give the url of the test database or will I have to define my own?

@hiranya911
Copy link
Contributor

@Aqsa-K thanks for the update. Instructions for running tests are mostly available in the CONTRIBUTING.md file. Usually you can just run pytest in the root directory to invoke the unit tests. For integration tests you need to specify a service account credential and an API key. You can use one of your own FIrebase projects for this.

self.stream_handler(msg_data)

def close(self):
while not self.sse and not hasattr(self.sse, 'resp'):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be or instead of and?

return self

def start_stream(self):
self.sse = ClosableSSEClient(self.url, session=self.make_session(), build_headers=self.build_headers)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This causes an error if pyrebase is not installed.

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.5/site-packages/firebase_admin/db.py", line 129, in start_stream
    self.sse = ClosableSSEClient(self.url, session=self.make_session(), build_headers=self.build_headers)
  File "/usr/local/lib/python3.5/site-packages/firebase_admin/db.py", line 88, in __init__
    super(ClosableSSEClient, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python3.5/site-packages/sseclient.py", line 39, in __init__
    self._connect()
  File "/usr/local/lib/python3.5/site-packages/firebase_admin/db.py", line 94, in _connect
    super(ClosableSSEClient, self)._connect()
  File "/usr/local/lib/python3.5/site-packages/sseclient.py", line 47, in _connect
    self.resp = requester.get(self.url, stream=True, **self.requests_kwargs)
  File "/usr/local/lib/python3.5/site-packages/requests/sessions.py", line 521, in get
    return self.request('GET', url, **kwargs)
TypeError: request() got an unexpected keyword argument 'build_headers'

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me check. I'll get back to you on this.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi - Any luck with this one, I am getting exactly the same error. What is the fix?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have resolved this issue. I'll share the changes by tonight or tomorrow.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome. Thanks. Waiting for the patch. By the way. This is an awesome library.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vijjuk
Yes firebase-admin is much better. I shifted from pyrebase to firebase-admin.
firebase-admin is using sse-client library to listen for changes in firebase database.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, can I ask what is the status with this pull request? Is it going to be merged? I am developing an app, that requires the server to listen to changes in the firebase database. @Aqsa-K I tried your fork repo, somehow it fires off an event as soon as I set the stream() and the callback function is called immediately. But the intended listening event is triggered every time when the data is changed in the database side.

>>> ref.stream(callbackfunc)
<firebase_admin.db.Stream object at 0x10c9b7ef0>
>>> {'event': 'put', 'data': True, 'path': '/'}
>>> # The event above is fired off automatically on stream without modification on db

>>> # these 3 events below are fired off by the changes in the db. 
>>> {'event': 'put', 'data': 'true', 'path': '/'}
>>> {'event': 'put', 'data': 'false', 'path': '/'}
>>> {'event': 'put', 'data': 'true', 'path': '/'}

>>> self.db.reference("/users/dev1/successful").stream(callbackfunc)
<firebase_admin.db.Stream object at 0x10ef897f0>
>>> {'event': 'put', 'data': True, 'path': '/'}
>>> # The event above is fired off automatically on stream without modification on db

Is it possible to fix this unexpected behavior? I would like to help fix it, but I have no idea where to start looking and I have never worked with SSE before. If anyone can point me in correct direction, please do.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first event is fired initially to retrieve all of the current data in the database.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@the-c0d3r you can separate out the first event which is fired initially by filtering the type 'event' or the type 'path' in the message received by callback function.

If you intend to look for changes in your data, then that should have the type 'event' as 'put'.

Let me know if this is still unclear and you need more help with this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @Aqsa-K Thanks for your comment. I have implemented functionality to filter the events and to only fire them if the value changes. I also created a PR on your fork. I noticed a few things on the SSE client. When I close the stream, it takes about 10 to 20 seconds for it to return. The following close function triggers some attribute error as well. So I catch them with an exception.

    def close(self):
        self.should_connect = False
        self.retry = 0
        try:
            self.resp.close()
            self.resp.raw._fp.fp.raw._sock.shutdown(socket.SHUT_RDWR)
            self.resp.raw._fp.fp.raw._sock.close()
        except AttributeError:
            pass
    def close(self):
        while not self.sse and not hasattr(self.sse, 'resp'):
            time.sleep(0.001)
        self.sse.running = False
        self.sse.close()
        self.thread.join()
        #  return self

But the problem still remains, the closing of the stream takes too long for a single threaded operation for my app. The close() function of the stream waits for the sse client thread to join. I think that takes up the most time. Is there any way to force the closing of the stream? I need to watch the changes based on the users' query (which differs everytime), then after getting results, I have to dispose of the stream. One workaround I can think of is to do that in a threadpool, so the thread can wait on the sse client closing process, but that does not seem to be the optimal way to me. Or am I doing anything wrong by trying to close?

The following is the way I dispose of the streams

streamRef = db.reference("/path/to/obj").stream(lambda x: print(x))
# attempt some changes on the path
streamRef.close()
# takes from 10 to 20 seconds here to return

Thanks for reading.

@abvijaykumar
Copy link

abvijaykumar commented Apr 13, 2018 via email

@abvijaykumar
Copy link

abvijaykumar commented Apr 14, 2018 via email

@abvijaykumar
Copy link

abvijaykumar commented Apr 15, 2018 via email

@Grimthorr
Copy link

Grimthorr commented Jul 18, 2018

There's some great work here. What is the progress with the PR though? Are we any closer to merging it, if at all?

This is a gentle reminder that this PR is celebrating its 1 year birthday today 🎂.

@hiranya911
Copy link
Contributor

Can't accept a PR with no tests and breaks the build due to lint errors. If somebody's willing to put in the effort to get it up to standard, then we can give it a try.

@the-c0d3r
Copy link
Contributor

Hi @hiranya911 I have fixed up most of the lint errors (unused, too long, no docstrings, wrong import sequence, etc) from db.py and sseclient.py . I couldn't manage to fix the following lint errors. I do not know what other alternatives can be done for those protected variables.

W:101, 8: Access to a protected member _sock of a client class (protected-access)
W:101, 8: Access to a protected member _fp of a client class (protected-access)
W:102, 8: Access to a protected member _sock of a client class (protected-access)
W:102, 8: Access to a protected member _fp of a client class (protected-access)
W:184,25: Access to a protected member _session of a client class (protected-access)
W:186,12: Access to a protected member _session of a client class (protected-access)
W:187,27: Access to a protected member _session of a client class (protected-access)

How do I go about writing unit testing for this kind of client-server module? What other things do I need to do, in order to complete this PR? And do I create a new PR with the latest master's commits? I'm quite new to contributing to open source projects, please bear with me.

@hiranya911
Copy link
Contributor

hiranya911 commented Jul 19, 2018

Hi @the-c0d3r. Thanks for offering to contribute. Here are few pointers:

  1. I think you might have to create a new PR. This PR originally came from @rizasif. So unless you have a way of contributing your changes to his master branch first, you'll most likely need a new PR.
  2. This PR is currently way out of sync with our latest master branch. Perhaps start by merging/syncing with the latest master.
  3. As for the remaining lint errors, I think you can add an ignore directive to _fp and _sock. _session should be available as an unprotected member (session) in the latest master.
  4. Not entirely sure how to unit test this. But I think trying to mount a mock request adapter into the HTTP session could be a good start. This should enable you to inject mock SSE messages into a test connection. See how the existing tests does it.
  5. You can also try writing an integration test (which runs against a Firebase database in production). It's often easier to write one of these.
  6. There are other style-related comments I can give. But I'll defer them until we have a PR that is up-to-date and builds without issues.

@hiranya911
Copy link
Contributor

Closing since #183 was merged

@hiranya911 hiranya911 closed this Aug 9, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet