Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add process logs & flush() returns future #27

Merged
merged 8 commits into from
Jun 22, 2022
Merged

fix: add process logs & flush() returns future #27

merged 8 commits into from
Jun 22, 2022

Conversation

Mercy811
Copy link
Contributor

Summary

  • send events without submitting to threads pool
  • add logs: amplitude.log

Checklist

  • Does your PR title have the correct title format?
  • Does your PR have a breaking change?:

@Mercy811
Copy link
Contributor Author

Since we already have a callback function where users can define logs they want to see. If I add a default logger to output every piece of logs in the terminal, it's gonna be a lot. So I store logs to a file amplitude.log.

For flush() function in worker.py, send events directly instead of submitting them to threads pool. If events are sent to another thread, the following situation may happen: in one thread, following code is executed by order; in the other thread, events are being processed.

In this case, I got errors in test_worker.py everywhere workers.flush() exits, but all unit tests are passed in github where python -m unittest discover -s ./src -p 'test_*.py' is executed. The reason behind this is probably because code is executed faster in laptop than in github. And when to test re-try logic, we verify the number of events, so to get the right/expected number of events, we have to wait the previous event process done.

src/amplitude/client.py Outdated Show resolved Hide resolved
src/amplitude/client.py Outdated Show resolved Hide resolved
@@ -39,7 +39,7 @@ def stop(self):
def flush(self):
events = self.storage.pull_all()
if events:
self.threads_pool.submit(self.send, events)
self.send(events)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bohan-amplitude are we ok with this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The difference is flush event using main thread or threadpool. I was changing this because I don't want to block main thread. Right now many unittests need to wait flush finish then verify result. I talked with Xinyi and change back to send won't cause problem for most of the time. I think here it's better to let flush() return future object returned by threads_pool.submit(). In test, use the future object to wait for result before verify. In other use cases returned future can be ignored. @Mercy811 Can you take a look at this and try?

@@ -43,7 +43,7 @@ def remove(self, plugin):
def flush(self):
for destination in self.plugins[PluginType.DESTINATION]:
try:
destination.flush()
return destination.flush()
Copy link
Contributor

@bohan-amplitude bohan-amplitude Jun 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since timeline could have multiple destination plugins, return here may end the function earlier and only return future of first flush call. Should return a list of future for all destination plugins

@@ -4,6 +4,7 @@
Amplitude: the Amplitude client class
"""

import logging
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused import

@Mercy811 Mercy811 changed the title fix: send events without submitting to threads pool & add logging fix: add process logs & client.flush() returns a list of futures objects from plugins Jun 22, 2022
@Mercy811 Mercy811 changed the title fix: add process logs & client.flush() returns a list of futures objects from plugins fix: add process logs & flush() returns future Jun 22, 2022
Comment on lines 135 to 136
if flush_future:
flush_future.result()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra indent here. Otherwise look good to me.

@Mercy811 Mercy811 merged commit 33d4c5c into main Jun 22, 2022
@Mercy811 Mercy811 deleted the mercy-dev branch June 22, 2022 22:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants