diff --git a/ee/clickhouse/process_event.py b/ee/clickhouse/process_event.py index 67d62b46a2c21..f1887fe0a0d24 100644 --- a/ee/clickhouse/process_event.py +++ b/ee/clickhouse/process_event.py @@ -3,7 +3,11 @@ from typing import Dict, Optional from uuid import UUID +import statsd from celery import shared_task +from dateutil import parser +from dateutil.relativedelta import relativedelta +from django.conf import settings from django.db.utils import IntegrityError from ee.clickhouse.models.event import create_event @@ -15,7 +19,9 @@ from posthog.models.person import Person from posthog.models.team import Team from posthog.models.utils import UUIDT -from posthog.tasks.process_event import handle_identify_or_alias, handle_timestamp, store_names_and_properties +from posthog.tasks.process_event import handle_identify_or_alias, store_names_and_properties + +statsd.Connection.set_defaults(host=settings.STATSD_HOST, port=settings.STATSD_PORT) def _capture_ee( @@ -76,12 +82,37 @@ def _capture_ee( ) +def handle_timestamp(data: dict, now: datetime.datetime, sent_at: Optional[datetime.datetime]) -> datetime.datetime: + if data.get("timestamp"): + if sent_at: + # sent_at - timestamp == now - x + # x = now + (timestamp - sent_at) + try: + # timestamp and sent_at must both be in the same format: either both with or both without timezones + # otherwise we can't get a diff to add to now + return now + (parser.isoparse(data["timestamp"]) - sent_at) + except TypeError as e: + pass + return parser.isoparse(data["timestamp"]) + now_datetime = now + if data.get("offset"): + return now_datetime - relativedelta(microseconds=data["offset"] * 1000) + return now_datetime + + if check_ee_enabled(): - @shared_task(name="ee.clickhouse.process_event.process_event_ee", ignore_result=True) def process_event_ee( - distinct_id: str, ip: str, site_url: str, data: dict, team_id: int, now: str, sent_at: Optional[str], + distinct_id: str, + ip: str, + site_url: str, + data: dict, + team_id: int, + now: datetime.datetime, + sent_at: Optional[datetime.datetime], ) -> None: + timer = statsd.Timer("%s_posthog_cloud" % (settings.STATSD_PREFIX,)) + timer.start() properties = data.get("properties", {}) if data.get("$set"): properties["$set"] = data["$set"] @@ -113,12 +144,20 @@ def process_event_ee( properties=properties, timestamp=ts, ) + timer.stop("process_event_ee") else: - @shared_task(name="ee.clickhouse.process_event.process_event_ee", ignore_result=True) - def process_event_ee(*args, **kwargs) -> None: + def process_event_ee( + distinct_id: str, + ip: str, + site_url: str, + data: dict, + team_id: int, + now: datetime.datetime, + sent_at: Optional[datetime.datetime], + ) -> None: # Noop if ee is not enabled return diff --git a/ee/clickhouse/test/test_process_event_ee.py b/ee/clickhouse/test/test_process_event_ee.py index 341fca232986c..a5b76e893b26b 100644 --- a/ee/clickhouse/test/test_process_event_ee.py +++ b/ee/clickhouse/test/test_process_event_ee.py @@ -1,8 +1,9 @@ import json from datetime import datetime, timedelta -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Optional, Union from uuid import UUID +from dateutil import parser from django.utils.timezone import now from freezegun import freeze_time @@ -47,8 +48,22 @@ def get_elements(event_id: Union[int, UUID]) -> List[Element]: ) +def _process_event_ee( + distinct_id: str, ip: str, site_url: str, data: dict, team_id: int, now: str, sent_at: Optional[str], +) -> None: + return process_event_ee( + distinct_id=distinct_id, + ip=ip, + site_url=site_url, + data=data, + team_id=team_id, + now=parser.isoparse(now), + sent_at=parser.isoparse(sent_at) if sent_at else None, + ) + + class ClickhouseProcessEvent( ClickhouseTestMixin, - test_process_event_factory(process_event_ee, _get_events, get_session_recording_events, get_elements), # type: ignore + test_process_event_factory(_process_event_ee, _get_events, get_session_recording_events, get_elements), # type: ignore ): pass diff --git a/posthog/api/capture.py b/posthog/api/capture.py index 0c19e71003ab4..b48e87ffa06bf 100644 --- a/posthog/api/capture.py +++ b/posthog/api/capture.py @@ -15,7 +15,7 @@ from posthog.utils import cors_response, get_ip_address, load_data_from_request if settings.EE_AVAILABLE: - from ee.clickhouse.process_event import log_event + from ee.clickhouse.process_event import log_event, process_event_ee def _datetime_from_seconds_or_millis(timestamp: str) -> datetime: @@ -163,28 +163,35 @@ def get_event(request): ) if check_ee_enabled(): - task_name = "ee.clickhouse.process_event.process_event_ee" + process_event_ee( + distinct_id=distinct_id, + ip=get_ip_address(request), + site_url=request.build_absolute_uri("/")[:-1], + data=event, + team_id=team.id, + now=now, + sent_at=sent_at, + ) else: task_name = "posthog.tasks.process_event.process_event" - celery_queue = settings.CELERY_DEFAULT_QUEUE - - if team.plugins_opt_in: - task_name += "_with_plugins" - celery_queue = settings.PLUGINS_CELERY_QUEUE - - celery_app.send_task( - name=task_name, - queue=celery_queue, - args=[ - distinct_id, - get_ip_address(request), - request.build_absolute_uri("/")[:-1], - event, - team.id, - now.isoformat(), - sent_at, - ], - ) + celery_queue = settings.CELERY_DEFAULT_QUEUE + if team.plugins_opt_in: + task_name += "_with_plugins" + celery_queue = settings.PLUGINS_CELERY_QUEUE + + celery_app.send_task( + name=task_name, + queue=celery_queue, + args=[ + distinct_id, + get_ip_address(request), + request.build_absolute_uri("/")[:-1], + event, + team.id, + now.isoformat(), + sent_at, + ], + ) if check_ee_enabled() and settings.LOG_TO_WAL: # log the event to kafka write ahead log for processing diff --git a/posthog/settings.py b/posthog/settings.py index 3d4812fee38ac..014346448c573 100644 --- a/posthog/settings.py +++ b/posthog/settings.py @@ -187,9 +187,9 @@ def print_warning(warning_lines: Sequence[str]): ALLOWED_HOSTS = get_list(os.environ.get("ALLOWED_HOSTS", "*")) # Metrics - StatsD -STATSD_HOST = os.environ.get("STATSD_HOST", None) +STATSD_HOST = os.environ.get("STATSD_HOST", "") STATSD_PORT = os.environ.get("STATSD_PORT", 8125) -STATSD_PREFIX = os.environ.get("STATSD_PREFIX", None) +STATSD_PREFIX = os.environ.get("STATSD_PREFIX", "") # Application definition