Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add process logs & flush() returns future #27

Merged
merged 8 commits into from
Jun 22, 2022
7 changes: 7 additions & 0 deletions src/amplitude/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
Amplitude: the Amplitude client class
"""

from cmath import log
Mercy811 marked this conversation as resolved.
Show resolved Hide resolved
import logging
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused import

from typing import Optional, Union, List

from amplitude.config import Config
Expand Down Expand Up @@ -43,6 +45,11 @@ def __init__(self, api_key: str, configuration: Config = Config()):
configuration (amplitude.config.Config, optional): The configuration of client instance. A new instance
with default config value will be used by default.
"""
logging.basicConfig(filename="amplitude.log",
filemode='a+',
format='%(asctime)s - %(code)s - %(message)s - %(event)s',
level=logging.INFO
)
Mercy811 marked this conversation as resolved.
Show resolved Hide resolved
self.configuration: Config = configuration
self.configuration.api_key = api_key
self.__timeline = Timeline()
Expand Down
11 changes: 10 additions & 1 deletion src/amplitude/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@


class ResponseProcessor:

def __init__(self):
self.configuration = None
self.storage = None
Expand All @@ -15,11 +14,13 @@ def setup(self, configuration, storage):
def process_response(self, res, events):
if res.status == HttpStatus.SUCCESS:
self.callback(events, res.code, "Event sent successfully.")
self.log(events, res.code, "Event sent successfully.")
elif res.status == HttpStatus.TIMEOUT or res.status == HttpStatus.FAILED:
self.push_to_storage(events, 0, res)
elif res.status == HttpStatus.PAYLOAD_TOO_LARGE:
if len(events) == 1:
self.callback(events, res.code, res.error)
self.log(events, res.code, res.error)
else:
self.configuration._increase_flush_divider()
self.push_to_storage(events, 0, res)
Expand All @@ -28,6 +29,7 @@ def process_response(self, res, events):
raise InvalidAPIKeyError(res.error)
if res.missing_field:
self.callback(events, res.code, f"Request missing required field {res.missing_field}")
self.log(events, res.code, f"Request missing required field {res.missing_field}")
else:
invalid_index_set = res.invalid_or_silenced_index()
events_for_retry = []
Expand All @@ -38,6 +40,7 @@ def process_response(self, res, events):
else:
events_for_retry.append(event)
self.callback(events_for_callback, res.code, res.error)
self.log(events_for_callback, res.code, res.error)
self.push_to_storage(events_for_retry, 0, res)
elif res.status == HttpStatus.TOO_MANY_REQUESTS:
events_for_callback = []
Expand All @@ -56,13 +59,15 @@ def process_response(self, res, events):
self.push_to_storage(events_for_retry, 0, res)
else:
self.callback(events, res.code, res.error or "Unknown error")
self.log(events, res.code, res.error or "Unknown error")

def push_to_storage(self, events, delay, res):
for event in events:
event.retry += 1
success, message = self.storage.push(event, delay=delay)
if not success:
self.callback([event], res.code, message)
self.log([event], res.code, message)

def callback(self, events, code, message):
for event in events:
Expand All @@ -72,3 +77,7 @@ def callback(self, events, code, message):
event.callback(code, message)
except Exception:
self.configuration.logger.exception(f"Error callback for event {event}")

def log(self, events, code, message):
for event in events:
self.configuration.logger.info(message, extra={'code':code, 'event':event})
2 changes: 1 addition & 1 deletion src/amplitude/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def stop(self):
def flush(self):
events = self.storage.pull_all()
if events:
self.threads_pool.submit(self.send, events)
self.send(events)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bohan-amplitude are we ok with this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The difference is flush event using main thread or threadpool. I was changing this because I don't want to block main thread. Right now many unittests need to wait flush finish then verify result. I talked with Xinyi and change back to send won't cause problem for most of the time. I think here it's better to let flush() return future object returned by threads_pool.submit(). In test, use the future object to wait for result before verify. In other use cases returned future can be ignored. @Mercy811 Can you take a look at this and try?


def send(self, events):
url = self.configuration.server_url
Expand Down