From 1ad3ee0805dea0b8a2ebbea7ae2f2b287c57503b Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Tue, 3 Nov 2020 15:35:36 -0800 Subject: [PATCH 1/6] skip celery for ee path --- ee/clickhouse/process_event.py | 2 -- posthog/api/capture.py | 49 +++++++++++++++++++--------------- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/ee/clickhouse/process_event.py b/ee/clickhouse/process_event.py index 67d62b46a2c21..6f897a0ac637c 100644 --- a/ee/clickhouse/process_event.py +++ b/ee/clickhouse/process_event.py @@ -78,7 +78,6 @@ def _capture_ee( if check_ee_enabled(): - @shared_task(name="ee.clickhouse.process_event.process_event_ee", ignore_result=True) def process_event_ee( distinct_id: str, ip: str, site_url: str, data: dict, team_id: int, now: str, sent_at: Optional[str], ) -> None: @@ -117,7 +116,6 @@ def process_event_ee( else: - @shared_task(name="ee.clickhouse.process_event.process_event_ee", ignore_result=True) def process_event_ee(*args, **kwargs) -> None: # Noop if ee is not enabled return diff --git a/posthog/api/capture.py b/posthog/api/capture.py index 0c19e71003ab4..068d658597143 100644 --- a/posthog/api/capture.py +++ b/posthog/api/capture.py @@ -15,7 +15,7 @@ from posthog.utils import cors_response, get_ip_address, load_data_from_request if settings.EE_AVAILABLE: - from ee.clickhouse.process_event import log_event + from ee.clickhouse.process_event import log_event, process_event_ee def _datetime_from_seconds_or_millis(timestamp: str) -> datetime: @@ -163,28 +163,35 @@ def get_event(request): ) if check_ee_enabled(): - task_name = "ee.clickhouse.process_event.process_event_ee" + process_event_ee( + distinct_id=distinct_id, + ip=get_ip_address(request), + site_url=request.build_absolute_uri("/")[:-1], + event=event, + team_id=team.id, + now=now.isoformat(), + sent_at=sent_at, + ) else: task_name = "posthog.tasks.process_event.process_event" - celery_queue = settings.CELERY_DEFAULT_QUEUE - - if team.plugins_opt_in: - task_name += "_with_plugins" - celery_queue = settings.PLUGINS_CELERY_QUEUE - - celery_app.send_task( - name=task_name, - queue=celery_queue, - args=[ - distinct_id, - get_ip_address(request), - request.build_absolute_uri("/")[:-1], - event, - team.id, - now.isoformat(), - sent_at, - ], - ) + celery_queue = settings.CELERY_DEFAULT_QUEUE + if team.plugins_opt_in: + task_name += "_with_plugins" + celery_queue = settings.PLUGINS_CELERY_QUEUE + + celery_app.send_task( + name=task_name, + queue=celery_queue, + args=[ + distinct_id, + get_ip_address(request), + request.build_absolute_uri("/")[:-1], + event, + team.id, + now.isoformat(), + sent_at, + ], + ) if check_ee_enabled() and settings.LOG_TO_WAL: # log the event to kafka write ahead log for processing From 34c9526ea0d233473c2301c2dba8adf9f37db889 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Tue, 3 Nov 2020 15:52:03 -0800 Subject: [PATCH 2/6] mypy fixes --- ee/clickhouse/process_event.py | 40 +++++++++++++++++++++++++++++++--- posthog/api/capture.py | 2 +- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/ee/clickhouse/process_event.py b/ee/clickhouse/process_event.py index 6f897a0ac637c..a9aee5b38aef0 100644 --- a/ee/clickhouse/process_event.py +++ b/ee/clickhouse/process_event.py @@ -4,6 +4,8 @@ from uuid import UUID from celery import shared_task +from dateutil import parser +from dateutil.relativedelta import relativedelta from django.db.utils import IntegrityError from ee.clickhouse.models.event import create_event @@ -15,7 +17,7 @@ from posthog.models.person import Person from posthog.models.team import Team from posthog.models.utils import UUIDT -from posthog.tasks.process_event import handle_identify_or_alias, handle_timestamp, store_names_and_properties +from posthog.tasks.process_event import handle_identify_or_alias, store_names_and_properties def _capture_ee( @@ -76,10 +78,34 @@ def _capture_ee( ) +def handle_timestamp(data: dict, now: str, sent_at: Optional[datetime.datetime]) -> datetime.datetime: + if data.get("timestamp"): + if sent_at: + # sent_at - timestamp == now - x + # x = now + (timestamp - sent_at) + try: + # timestamp and sent_at must both be in the same format: either both with or both without timezones + # otherwise we can't get a diff to add to now + return parser.isoparse(now) + (parser.isoparse(data["timestamp"]) - sent_at) + except TypeError as e: + pass + return parser.isoparse(data["timestamp"]) + now_datetime = parser.parse(now) + if data.get("offset"): + return now_datetime - relativedelta(microseconds=data["offset"] * 1000) + return now_datetime + + if check_ee_enabled(): def process_event_ee( - distinct_id: str, ip: str, site_url: str, data: dict, team_id: int, now: str, sent_at: Optional[str], + distinct_id: str, + ip: str, + site_url: str, + data: dict, + team_id: int, + now: str, + sent_at: Optional[datetime.datetime], ) -> None: properties = data.get("properties", {}) if data.get("$set"): @@ -116,7 +142,15 @@ def process_event_ee( else: - def process_event_ee(*args, **kwargs) -> None: + def process_event_ee( + distinct_id: str, + ip: str, + site_url: str, + data: dict, + team_id: int, + now: str, + sent_at: Optional[datetime.datetime], + ) -> None: # Noop if ee is not enabled return diff --git a/posthog/api/capture.py b/posthog/api/capture.py index 068d658597143..62d074a6ec294 100644 --- a/posthog/api/capture.py +++ b/posthog/api/capture.py @@ -167,7 +167,7 @@ def get_event(request): distinct_id=distinct_id, ip=get_ip_address(request), site_url=request.build_absolute_uri("/")[:-1], - event=event, + data=event, team_id=team.id, now=now.isoformat(), sent_at=sent_at, From b04effa8d6aa431aa300be8acc6b8885374ca920 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Tue, 3 Nov 2020 16:24:27 -0800 Subject: [PATCH 3/6] take celery out and fix types as cleanly and performant as possible --- ee/clickhouse/process_event.py | 10 +++++----- ee/clickhouse/test/test_process_event_ee.py | 19 +++++++++++++++++-- posthog/api/capture.py | 2 +- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/ee/clickhouse/process_event.py b/ee/clickhouse/process_event.py index a9aee5b38aef0..835a1955b9527 100644 --- a/ee/clickhouse/process_event.py +++ b/ee/clickhouse/process_event.py @@ -78,7 +78,7 @@ def _capture_ee( ) -def handle_timestamp(data: dict, now: str, sent_at: Optional[datetime.datetime]) -> datetime.datetime: +def handle_timestamp(data: dict, now: datetime.datetime, sent_at: Optional[datetime.datetime]) -> datetime.datetime: if data.get("timestamp"): if sent_at: # sent_at - timestamp == now - x @@ -86,11 +86,11 @@ def handle_timestamp(data: dict, now: str, sent_at: Optional[datetime.datetime]) try: # timestamp and sent_at must both be in the same format: either both with or both without timezones # otherwise we can't get a diff to add to now - return parser.isoparse(now) + (parser.isoparse(data["timestamp"]) - sent_at) + return now + (parser.isoparse(data["timestamp"]) - sent_at) except TypeError as e: pass return parser.isoparse(data["timestamp"]) - now_datetime = parser.parse(now) + now_datetime = now if data.get("offset"): return now_datetime - relativedelta(microseconds=data["offset"] * 1000) return now_datetime @@ -104,7 +104,7 @@ def process_event_ee( site_url: str, data: dict, team_id: int, - now: str, + now: datetime.datetime, sent_at: Optional[datetime.datetime], ) -> None: properties = data.get("properties", {}) @@ -148,7 +148,7 @@ def process_event_ee( site_url: str, data: dict, team_id: int, - now: str, + now: datetime.datetime, sent_at: Optional[datetime.datetime], ) -> None: # Noop if ee is not enabled diff --git a/ee/clickhouse/test/test_process_event_ee.py b/ee/clickhouse/test/test_process_event_ee.py index 341fca232986c..a5b76e893b26b 100644 --- a/ee/clickhouse/test/test_process_event_ee.py +++ b/ee/clickhouse/test/test_process_event_ee.py @@ -1,8 +1,9 @@ import json from datetime import datetime, timedelta -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Optional, Union from uuid import UUID +from dateutil import parser from django.utils.timezone import now from freezegun import freeze_time @@ -47,8 +48,22 @@ def get_elements(event_id: Union[int, UUID]) -> List[Element]: ) +def _process_event_ee( + distinct_id: str, ip: str, site_url: str, data: dict, team_id: int, now: str, sent_at: Optional[str], +) -> None: + return process_event_ee( + distinct_id=distinct_id, + ip=ip, + site_url=site_url, + data=data, + team_id=team_id, + now=parser.isoparse(now), + sent_at=parser.isoparse(sent_at) if sent_at else None, + ) + + class ClickhouseProcessEvent( ClickhouseTestMixin, - test_process_event_factory(process_event_ee, _get_events, get_session_recording_events, get_elements), # type: ignore + test_process_event_factory(_process_event_ee, _get_events, get_session_recording_events, get_elements), # type: ignore ): pass diff --git a/posthog/api/capture.py b/posthog/api/capture.py index 62d074a6ec294..b48e87ffa06bf 100644 --- a/posthog/api/capture.py +++ b/posthog/api/capture.py @@ -169,7 +169,7 @@ def get_event(request): site_url=request.build_absolute_uri("/")[:-1], data=event, team_id=team.id, - now=now.isoformat(), + now=now, sent_at=sent_at, ) else: From 97c1a664dab7a469b0e827b6d5320ce8698a0bc3 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Tue, 3 Nov 2020 16:37:26 -0800 Subject: [PATCH 4/6] add timing --- ee/clickhouse/process_event.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ee/clickhouse/process_event.py b/ee/clickhouse/process_event.py index 835a1955b9527..851dd6d567b47 100644 --- a/ee/clickhouse/process_event.py +++ b/ee/clickhouse/process_event.py @@ -3,9 +3,11 @@ from typing import Dict, Optional from uuid import UUID +import statsd from celery import shared_task from dateutil import parser from dateutil.relativedelta import relativedelta +from django.conf import settings from django.db.utils import IntegrityError from ee.clickhouse.models.event import create_event @@ -107,6 +109,8 @@ def process_event_ee( now: datetime.datetime, sent_at: Optional[datetime.datetime], ) -> None: + timer = statsd.Timer("%s_posthog_cloud" % (settings.STATSD_PREFIX,)) + timer.start() properties = data.get("properties", {}) if data.get("$set"): properties["$set"] = data["$set"] @@ -138,6 +142,7 @@ def process_event_ee( properties=properties, timestamp=ts, ) + timer.stop("process_event_ee") else: From f05c09faefaddeae8ba9c28d64030321081afca4 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Tue, 3 Nov 2020 16:47:28 -0800 Subject: [PATCH 5/6] setup statsd, need to clean this up --- ee/clickhouse/process_event.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ee/clickhouse/process_event.py b/ee/clickhouse/process_event.py index 851dd6d567b47..f1887fe0a0d24 100644 --- a/ee/clickhouse/process_event.py +++ b/ee/clickhouse/process_event.py @@ -21,6 +21,8 @@ from posthog.models.utils import UUIDT from posthog.tasks.process_event import handle_identify_or_alias, store_names_and_properties +statsd.Connection.set_defaults(host=settings.STATSD_HOST, port=settings.STATSD_PORT) + def _capture_ee( event_uuid: UUID, From 465fdfb4b89ef3a95cadae2e611315eaf6db2cb1 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Tue, 3 Nov 2020 16:55:10 -0800 Subject: [PATCH 6/6] use sane defaults for statsd --- posthog/settings.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/posthog/settings.py b/posthog/settings.py index 3d4812fee38ac..014346448c573 100644 --- a/posthog/settings.py +++ b/posthog/settings.py @@ -187,9 +187,9 @@ def print_warning(warning_lines: Sequence[str]): ALLOWED_HOSTS = get_list(os.environ.get("ALLOWED_HOSTS", "*")) # Metrics - StatsD -STATSD_HOST = os.environ.get("STATSD_HOST", None) +STATSD_HOST = os.environ.get("STATSD_HOST", "") STATSD_PORT = os.environ.get("STATSD_PORT", 8125) -STATSD_PREFIX = os.environ.get("STATSD_PREFIX", None) +STATSD_PREFIX = os.environ.get("STATSD_PREFIX", "") # Application definition