From f01a876a3f82a611b2982cfd00521391c9451b79 Mon Sep 17 00:00:00 2001
From: Karl-Aksel Puulmann
Date: Thu, 18 Mar 2021 15:28:41 +0200
Subject: [PATCH 1/4] Chunk session recording events

Closes https://github.com/PostHog/posthog/issues/3632 and replaces
https://github.com/PostHog/posthog/pull/3566/files

This should make it possible to ingest large full snapshot events.

The snapshot data is gzip-compressed and then base64-encoded for
serialization purposes.

pytest-mock is used for cleanly patching methods.
---
 .github/workflows/ci-backend.yml              |   6 +-
 .../queries/clickhouse_session_recording.py   |   2 +-
 posthog/api/capture.py                        |   3 +
 posthog/helpers/session_recording.py          |  97 ++++++++++++++
 posthog/helpers/tests/__init__.py             |   0
 .../tests/test_session_recording_helpers.py   | 122 ++++++++++++++++++
 posthog/queries/sessions/session_recording.py |   5 +-
 .../sessions/test/test_session_recording.py   |  22 ++-
 requirements-dev.in                           |   1 +
 requirements-dev.txt                          |   3 +
 10 files changed, 253 insertions(+), 8 deletions(-)
 create mode 100644 posthog/helpers/session_recording.py
 create mode 100644 posthog/helpers/tests/__init__.py
 create mode 100644 posthog/helpers/tests/test_session_recording_helpers.py

diff --git a/.github/workflows/ci-backend.yml b/.github/workflows/ci-backend.yml
index 3e58027c28b9f..36834959895c7 100644
--- a/.github/workflows/ci-backend.yml
+++ b/.github/workflows/ci-backend.yml
@@ -107,7 +107,7 @@ jobs:
         run: |
           python -m pip install --upgrade pip
           python -m pip install -r requirements.txt
-          python -m pip install freezegun fakeredis pytest pytest-django
+          python -m pip install freezegun fakeredis pytest pytest-mock pytest-django
         if: steps.cache.outputs.cache-hit != 'true'
 
       - name: Check migrations
@@ -196,7 +196,7 @@ jobs:
           cd deploy
           python -m pip install --upgrade pip
           python -m pip install -r requirements.txt
-          python -m pip install freezegun fakeredis pytest pytest-django
+          python -m pip install freezegun fakeredis pytest pytest-mock pytest-django
         if: steps.cache.outputs.cache-hit != 'true'
 
       # The 2-step migration process (first master, then current branch) verifies that it'll always
@@ -300,7 +300,7 @@ jobs:
         run: |
           python -m pip install --upgrade pip
           python -m pip install -r requirements.txt
-          python -m pip install freezegun fakeredis pytest pytest-django
+          python -m pip install freezegun fakeredis pytest pytest-mock pytest-django
         if: steps.cache.outputs.cache-hit != 'true'
 
       - name: Remove ee
diff --git a/ee/clickhouse/queries/clickhouse_session_recording.py b/ee/clickhouse/queries/clickhouse_session_recording.py
index 806f1fe5f85d8..cf3ef75b39fd9 100644
--- a/ee/clickhouse/queries/clickhouse_session_recording.py
+++ b/ee/clickhouse/queries/clickhouse_session_recording.py
@@ -35,7 +35,7 @@
         distinct_id,
         MIN(timestamp) AS start_time,
         MAX(timestamp) AS end_time,
-        COUNT(JSONExtractInt(snapshot_data, 'type') = 2 ? 1 : NULL) as full_snapshots
+        COUNT((JSONExtractInt(snapshot_data, 'type') = 2 OR JSONExtractBool(snapshot_data, 'has_full_snapshot')) ? 1 : NULL) as full_snapshots
     FROM session_recording_events
     WHERE
         team_id = %(team_id)s
diff --git a/posthog/api/capture.py b/posthog/api/capture.py
index 96a3a1a7b251c..0b707a85795de 100644
--- a/posthog/api/capture.py
+++ b/posthog/api/capture.py
@@ -12,6 +12,7 @@
 from posthog.celery import app as celery_app
 from posthog.ee import is_ee_enabled
+from posthog.helpers.session_recording import preprocess_session_recording_events
 from posthog.models import Team, User
 from posthog.models.feature_flag import get_active_feature_flags
 from posthog.models.utils import UUIDT
 
@@ -174,6 +175,8 @@ def get_event(request):
     else:
         events = [data]
 
+    events = preprocess_session_recording_events(events)
+
     for event in events:
         try:
             distinct_id = _get_distinct_id(event)
diff --git a/posthog/helpers/session_recording.py b/posthog/helpers/session_recording.py
new file mode 100644
index 0000000000000..7df039322dc6c
--- /dev/null
+++ b/posthog/helpers/session_recording.py
@@ -0,0 +1,97 @@
+import base64
+import gzip
+import json
+from collections import defaultdict
+from typing import Dict, Generator, List
+
+from sentry_sdk.api import capture_message
+
+from posthog.models import utils
+
+Event = Dict
+SnapshotData = Dict
+
+FULL_SNAPSHOT = 2
+
+
+def preprocess_session_recording_events(events: List[Event]) -> List[Event]:
+    result, snapshots = [], []
+    for event in events:
+        if is_snapshot(event):
+            snapshots.append(event)
+        else:
+            result.append(event)
+
+    if len(snapshots) > 0:
+        result.extend(list(compress_and_chunk_snapshots(snapshots)))
+
+    return result
+
+
+def compress_and_chunk_snapshots(events: List[Event], chunk_size=512 * 1024) -> Generator[Event, None, None]:
+    data_list = [event["properties"]["$snapshot_data"] for event in events]
+    session_id = events[0]["properties"]["$session_id"]  # assumption: all events within a request have same session_id
+    has_full_snapshot = any(snapshot_data["type"] == FULL_SNAPSHOT for snapshot_data in data_list)
+
+    compressed_data = compress_to_string(json.dumps(data_list))
+
+    id = str(utils.UUIDT())
+    chunks = chunk_string(compressed_data, chunk_size)
+    for index, chunk in enumerate(chunks):
+        yield {
+            **events[0],
+            "properties": {
+                **events[0]["properties"],
+                "$session_id": session_id,
+                "$snapshot_data": {
+                    "chunk_id": id,
+                    "chunk_index": index,
+                    "chunk_count": len(chunks),
+                    "data": chunk,
+                    "compression": "gzip-base64",
+                    "has_full_snapshot": has_full_snapshot,
+                },
+            },
+        }
+
+
+def decompress_chunked_snapshot_data(
+    team_id: int, session_recording_id: str, snapshot_list: List[SnapshotData]
+) -> Generator[SnapshotData, None, None]:
+    chunks_collector = defaultdict(list)
+    for snapshot_data in snapshot_list:
+        if "chunk_id" not in snapshot_data:
+            yield snapshot_data
+        else:
+            chunks_collector[snapshot_data["chunk_id"]].append(snapshot_data)
+
+    for chunks in chunks_collector.values():
+        if len(chunks) != chunks[0]["chunk_count"]:
+            capture_message(
+                "Did not find all session recording chunks! Team: {}, Session: {}".format(team_id, session_recording_id)
+            )
+            continue
+
+        b64_compressed_data = "".join(chunk["data"] for chunk in sorted(chunks, key=lambda c: c["chunk_index"]))
+        decompressed_data = json.loads(decompress(b64_compressed_data))
+
+        yield from decompressed_data
+
+
+def chunk_string(string: str, chunk_length: int) -> List[str]:
+    """Split a string into chunk_length-sized elements. Reversal operation: `''.join()`."""
+    return [string[0 + offset : chunk_length + offset] for offset in range(0, len(string), chunk_length)]
+
+
+def is_snapshot(event: Dict) -> bool:
+    return event["event"] == "$snapshot"
+
+
+def compress_to_string(json_string: str) -> str:
+    compressed_data = gzip.compress(json_string.encode("utf-16", "surrogatepass"), mtime=0)
+    return base64.b64encode(compressed_data).decode("utf-8")
+
+
+def decompress(base64data: str) -> str:
+    compressed_bytes = base64.b64decode(base64data)
+    return gzip.decompress(compressed_bytes).decode("utf-16", "surrogatepass")
diff --git a/posthog/helpers/tests/__init__.py b/posthog/helpers/tests/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/posthog/helpers/tests/test_session_recording_helpers.py b/posthog/helpers/tests/test_session_recording_helpers.py
new file mode 100644
index 0000000000000..5d11af9621d9b
--- /dev/null
+++ b/posthog/helpers/tests/test_session_recording_helpers.py
@@ -0,0 +1,122 @@
+import pytest
+from pytest_mock import MockerFixture
+
+from posthog.helpers.session_recording import (
+    compress_and_chunk_snapshots,
+    decompress_chunked_snapshot_data,
+    preprocess_session_recording_events,
+)
+
+
+def test_preprocess_with_no_recordings():
+    events = [{"event": "$pageview"}, {"event": "$pageleave"}]
+    assert preprocess_session_recording_events(events) == events
+
+
+def test_preprocess_recording_event_creates_chunks():
+    events = [
+        {
+            "event": "$snapshot",
+            "properties": {"$session_id": "1234", "$snapshot_data": {"type": 2, "foo": "bar"}, "distinct_id": "abc123"},
+        }
+    ]
+
+    preprocessed = preprocess_session_recording_events(events)
+    assert preprocessed != events
+    assert len(preprocessed) == 1
+    assert preprocessed[0]["event"] == "$snapshot"
+    assert preprocessed[0]["properties"]["$session_id"] == "1234"
+    assert preprocessed[0]["properties"]["distinct_id"] == "abc123"
+    assert "chunk_id" in preprocessed[0]["properties"]["$snapshot_data"]
+
+
+def test_compression_and_chunking(snapshot_events, mocker: MockerFixture):
+    mocker.patch("posthog.models.utils.UUIDT", return_value="0178495e-8521-0000-8e1c-2652fa57099b")
+
+    assert list(compress_and_chunk_snapshots(snapshot_events)) == [
+        {
+            "event": "$snapshot",
+            "properties": {
+                "$session_id": "1234",
+                "$snapshot_data": {
+                    "chunk_count": 1,
+                    "chunk_id": "0178495e-8521-0000-8e1c-2652fa57099b",
+                    "chunk_index": 0,
+                    "compression": "gzip-base64",
+                    "data": "H4sIAAAAAAAC//v/L5qhmkGJoYShkqGAIRXIsmJQYDBi0AGSSgxpDPlACBFTYkhiSGQoAtK1YFlMXcZYdVUB5UuAOkH6YhkAxKw6nnAAAAA=",
+                    "has_full_snapshot": True,
+                },
+                "distinct_id": "abc123",
+            },
+        }
+    ]
+
+
+def test_decompression_results_in_same_data(snapshot_events):
+    assert len(list(compress_and_chunk_snapshots(snapshot_events, 1000))) == 1
+    assert compress_and_decompress(snapshot_events, 1000) == [
+        snapshot_events[0]["properties"]["$snapshot_data"],
+        snapshot_events[1]["properties"]["$snapshot_data"],
+    ]
+    assert len(list(compress_and_chunk_snapshots(snapshot_events, 100))) == 2
+    assert compress_and_decompress(snapshot_events, 100) == [
+        snapshot_events[0]["properties"]["$snapshot_data"],
+        snapshot_events[1]["properties"]["$snapshot_data"],
+    ]
+
+
+def test_has_full_snapshot_property(snapshot_events):
+    compressed = list(compress_and_chunk_snapshots(snapshot_events))
+    assert len(compressed) == 1
+    assert compressed[0]["properties"]["$snapshot_data"]["has_full_snapshot"]
+
+    snapshot_events[0]["properties"]["$snapshot_data"]["type"] = 0
+    compressed = list(compress_and_chunk_snapshots(snapshot_events))
+    assert len(compressed) == 1
+    assert not compressed[0]["properties"]["$snapshot_data"]["has_full_snapshot"]
+
+
+def test_decompress_returns_unmodified_events(snapshot_events):
+    snapshot_data = [event["properties"]["$snapshot_data"] for event in snapshot_events]
+    assert list(decompress_chunked_snapshot_data(1, "someid", snapshot_data)) == snapshot_data
+
+
+def test_decompress_ignores_if_not_enough_chunks(snapshot_events):
+    complete_snapshots = [event["properties"]["$snapshot_data"] for event in snapshot_events]
+    snapshot_data = complete_snapshots + [
+        {
+            "chunk_id": "unique_id",
+            "chunk_index": 1,
+            "chunk_count": 2,
+            "data": {},
+            "compression": "gzip",
+            "has_full_snapshot": False,
+        }
+    ]
+
+    assert list(decompress_chunked_snapshot_data(1, "someid", snapshot_data)) == complete_snapshots
+
+
+@pytest.fixture
+def snapshot_events():
+    return [
+        {
+            "event": "$snapshot",
+            "properties": {"$session_id": "1234", "$snapshot_data": {"type": 2, "foo": "bar"}, "distinct_id": "abc123"},
+        },
+        {
+            "event": "$snapshot",
+            "properties": {
+                "$session_id": "1234",
+                "$snapshot_data": {"type": 3, "foo": "zeta"},
+                "distinct_id": "abc123",
+            },
+        },
+    ]
+
+
+def compress_and_decompress(events, chunk_size):
+    snapshot_data = [
+        event["properties"]["$snapshot_data"] for event in compress_and_chunk_snapshots(events, chunk_size)
+    ]
+    return list(decompress_chunked_snapshot_data(1, "someid", snapshot_data))
diff --git a/posthog/queries/sessions/session_recording.py b/posthog/queries/sessions/session_recording.py
index 804e93f39194a..cecc441a47e3a 100644
--- a/posthog/queries/sessions/session_recording.py
+++ b/posthog/queries/sessions/session_recording.py
@@ -12,6 +12,7 @@
 
 from django.db import connection
 
+from posthog.helpers.session_recording import decompress_chunked_snapshot_data
 from posthog.models import Person, SessionRecordingEvent, Team
 from posthog.models.filters.sessions_filter import SessionsFilter
 from posthog.models.session_recording_event import SessionRecordingViewed
@@ -36,7 +37,7 @@
         MIN(timestamp) as start_time,
         MAX(timestamp) as end_time,
         MAX(timestamp) - MIN(timestamp) as duration,
-        COUNT(*) FILTER(where snapshot_data->>'type' = '2') as full_snapshots
+        COUNT(*) FILTER(where snapshot_data->>'type' = '2' OR (snapshot_data->>'has_full_snapshot')::boolean) as full_snapshots
     FROM posthog_sessionrecordingevent
     WHERE
         team_id = %(team_id)s
@@ -63,6 +64,8 @@ def run(self, team: Team, session_recording_id: str, *args, **kwargs) -> Dict[st
         from posthog.api.person import PersonSerializer
 
         distinct_id, start_time, snapshots = self.query_recording_snapshots(team, session_recording_id)
+        snapshots = list(decompress_chunked_snapshot_data(team.pk, session_recording_id, snapshots))
+
         person = (
             PersonSerializer(Person.objects.get(team=team, persondistinctid__distinct_id=distinct_id)).data
             if distinct_id
diff --git a/posthog/queries/sessions/test/test_session_recording.py b/posthog/queries/sessions/test/test_session_recording.py
index ab53d650656ed..78429c22799a5 100644
--- a/posthog/queries/sessions/test/test_session_recording.py
+++ b/posthog/queries/sessions/test/test_session_recording.py
@@ -47,11 +47,18 @@ def _test_filter_sessions(self, filter, expected):
         self.create_snapshot("user", "3", now() + relativedelta(seconds=15))
         self.create_snapshot("user", "3", now() + relativedelta(seconds=20))
         self.create_snapshot("user", "3", now() + relativedelta(seconds=60))
-        self.create_snapshot("user", "4", now() + relativedelta(seconds=999))
-        self.create_snapshot("user", "4", now() + relativedelta(seconds=1020))
+        self.create_chunked_snapshot(
+            "user", "4", now() + relativedelta(seconds=999), {"chunk_id": "afb", "has_full_snapshot": True}
+        )
+        self.create_snapshot("user", "4", now() + relativedelta(seconds=1020), type=1)
 
         self.create_snapshot("broken-user", "5", now() + relativedelta(seconds=10), type=3)
-        self.create_snapshot("broken-user", "5", now() + relativedelta(seconds=20), type=3)
+        self.create_chunked_snapshot(
+            "broken-user",
+            "5",
+            now() + relativedelta(seconds=20),
+            {"chunk_id": "afb", "has_full_snapshot": False},
+        )
 
         sessions = [
             {"distinct_id": "user", "start_time": now(), "end_time": now() + relativedelta(seconds=100)},
@@ -116,6 +123,15 @@ def create_snapshot(self, distinct_id, session_id, timestamp, type=2):
             snapshot_data={"timestamp": timestamp.timestamp(), "type": type},
         )
 
+    def create_chunked_snapshot(self, distinct_id, session_id, timestamp, snapshot_data):
+        event_factory(
+            team_id=self.team.pk,
+            distinct_id=distinct_id,
+            timestamp=timestamp,
+            session_id=session_id,
+            snapshot_data=snapshot_data,
+        )
+
 
     return TestSessionRecording
diff --git a/requirements-dev.in b/requirements-dev.in
index 5846a5e3202c4..a682514179e6b 100644
--- a/requirements-dev.in
+++ b/requirements-dev.in
@@ -21,3 +21,4 @@ black
 isort
 pytest
 pytest-django
+pytest-mock
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 2bb82fe53b8d7..3d5b02bcc8ccf 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -97,10 +97,13 @@ pyparsing==2.4.7
     # via packaging
 pytest-django==4.1.0
     # via -r requirements-dev.in
+pytest-mock==3.5.1
+    # via -r requirements-dev.in
 pytest==6.2.2
     # via
     #   -r requirements-dev.in
     #   pytest-django
+    #   pytest-mock
 python-dateutil==2.8.1
     # via freezegun
 pytz==2021.1
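
A note on the wire format patch 1 introduces, before the follow-up patches:
the round trip is easiest to see in isolation. The sketch below mirrors what
compress_to_string, chunk_string and decompress do: JSON-encode the snapshot
list, gzip it, base64-encode it, slice the text into fixed-size chunks, then
join and undo each step. It is a minimal illustration using only the standard
library; the payload and the 20-character chunk size are made up for the
example (the patch defaults to 512 * 1024).

    import base64
    import gzip
    import json

    snapshots = [{"type": 2, "foo": "bar"}, {"type": 3, "foo": "zeta"}]

    # compress_to_string: JSON -> UTF-16 bytes -> gzip -> base64 text
    compressed = base64.b64encode(
        gzip.compress(json.dumps(snapshots).encode("utf-16", "surrogatepass"))
    ).decode("utf-8")

    # chunk_string: fixed-size slices of the base64 text
    chunk_length = 20  # illustrative only
    chunks = [compressed[offset : offset + chunk_length] for offset in range(0, len(compressed), chunk_length)]

    # reader side: join in chunk_index order, then undo base64, gzip and JSON
    restored = json.loads(gzip.decompress(base64.b64decode("".join(chunks))).decode("utf-16", "surrogatepass"))
    assert restored == snapshots

Each chunk then travels as its own $snapshot event carrying chunk_id,
chunk_index and chunk_count, which is what lets the reader detect missing
pieces before joining.
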
From b3474024b60d4f33a89970ab207b8f4aa112dd3b Mon Sep 17 00:00:00 2001
From: Karl-Aksel Puulmann
Date: Fri, 19 Mar 2021 15:27:43 +0200
Subject: [PATCH 2/4] Mock time.time for py3.7 compatibility

---
 posthog/helpers/session_recording.py                    | 2 +-
 posthog/helpers/tests/test_session_recording_helpers.py | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/posthog/helpers/session_recording.py b/posthog/helpers/session_recording.py
index 7df039322dc6c..93c1c1e87b796 100644
--- a/posthog/helpers/session_recording.py
+++ b/posthog/helpers/session_recording.py
@@ -88,7 +88,7 @@ def is_snapshot(event: Dict) -> bool:
 
 
 def compress_to_string(json_string: str) -> str:
-    compressed_data = gzip.compress(json_string.encode("utf-16", "surrogatepass"), mtime=0)
+    compressed_data = gzip.compress(json_string.encode("utf-16", "surrogatepass"))
     return base64.b64encode(compressed_data).decode("utf-8")
 
 
diff --git a/posthog/helpers/tests/test_session_recording_helpers.py b/posthog/helpers/tests/test_session_recording_helpers.py
index 5d11af9621d9b..a81f5f9db113a 100644
--- a/posthog/helpers/tests/test_session_recording_helpers.py
+++ b/posthog/helpers/tests/test_session_recording_helpers.py
@@ -32,6 +32,7 @@ def test_preprocess_recording_event_creates_chunks():
 
 def test_compression_and_chunking(snapshot_events, mocker: MockerFixture):
     mocker.patch("posthog.models.utils.UUIDT", return_value="0178495e-8521-0000-8e1c-2652fa57099b")
+    mocker.patch("time.time", return_value=0)
 
     assert list(compress_and_chunk_snapshots(snapshot_events)) == [
         {
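
For context on patch 2: gzip.compress only accepts an mtime argument from
Python 3.8 onwards, so on 3.7 the helper can no longer pin the gzip header
timestamp to zero, and the test pins time.time instead. A minimal sketch of
why that restores byte-for-byte determinism, using unittest.mock here rather
than the mocker fixture the suite uses:

    import gzip
    from unittest import mock

    # gzip writes the current time.time() into bytes 4-8 of its header (the
    # MTIME field), so the same payload compressed in different seconds
    # yields different bytes.
    with mock.patch("time.time", return_value=0):
        first = gzip.compress(b"snapshot payload")
        second = gzip.compress(b"snapshot payload")

    assert first == second                    # deterministic once the clock is pinned
    assert first[4:8] == b"\x00\x00\x00\x00"  # the MTIME field is now all zeros
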
From ec9474c74bde97ca4e997e26040c9a84da3c854d Mon Sep 17 00:00:00 2001
From: Karl-Aksel Puulmann
Date: Tue, 23 Mar 2021 09:00:11 +0200
Subject: [PATCH 3/4] Group captured $snapshot events by $session_id

---
 posthog/helpers/session_recording.py          | 10 +++++----
 .../tests/test_session_recording_helpers.py   | 22 ++++++++++++++-----
 2 files changed, 22 insertions(+), 10 deletions(-)

diff --git a/posthog/helpers/session_recording.py b/posthog/helpers/session_recording.py
index 93c1c1e87b796..473948a6201c5 100644
--- a/posthog/helpers/session_recording.py
+++ b/posthog/helpers/session_recording.py
@@ -15,14 +15,16 @@
 
 
 def preprocess_session_recording_events(events: List[Event]) -> List[Event]:
-    result, snapshots = [], []
+    result = []
+    snapshots_by_session = defaultdict(list)
     for event in events:
         if is_snapshot(event):
-            snapshots.append(event)
+            session_recording_id = event["properties"]["$session_id"]
+            snapshots_by_session[session_recording_id].append(event)
         else:
             result.append(event)
 
-    if len(snapshots) > 0:
+    for session_recording_id, snapshots in snapshots_by_session.items():
         result.extend(list(compress_and_chunk_snapshots(snapshots)))
 
     return result
@@ -30,7 +32,7 @@ def preprocess_session_recording_events(events: List[Event]) -> List[Event]:
 
 def compress_and_chunk_snapshots(events: List[Event], chunk_size=512 * 1024) -> Generator[Event, None, None]:
     data_list = [event["properties"]["$snapshot_data"] for event in events]
-    session_id = events[0]["properties"]["$session_id"]  # assumption: all events within a request have same session_id
+    session_id = events[0]["properties"]["$session_id"]
    has_full_snapshot = any(snapshot_data["type"] == FULL_SNAPSHOT for snapshot_data in data_list)
 
     compressed_data = compress_to_string(json.dumps(data_list))
diff --git a/posthog/helpers/tests/test_session_recording_helpers.py b/posthog/helpers/tests/test_session_recording_helpers.py
index a81f5f9db113a..7bfde427f17ac 100644
--- a/posthog/helpers/tests/test_session_recording_helpers.py
+++ b/posthog/helpers/tests/test_session_recording_helpers.py
@@ -18,16 +18,26 @@ def test_preprocess_recording_event_creates_chunks():
         {
             "event": "$snapshot",
             "properties": {"$session_id": "1234", "$snapshot_data": {"type": 2, "foo": "bar"}, "distinct_id": "abc123"},
-        }
+        },
+        {
+            "event": "$snapshot",
+            "properties": {"$session_id": "1234", "$snapshot_data": {"type": 1, "foo": "bar"}, "distinct_id": "abc123"},
+        },
+        {
+            "event": "$snapshot",
+            "properties": {"$session_id": "5678", "$snapshot_data": {"type": 1, "foo": "bar"}, "distinct_id": "abc123"},
+        },
     ]
 
     preprocessed = preprocess_session_recording_events(events)
     assert preprocessed != events
-    assert len(preprocessed) == 1
-    assert preprocessed[0]["event"] == "$snapshot"
-    assert preprocessed[0]["properties"]["$session_id"] == "1234"
-    assert preprocessed[0]["properties"]["distinct_id"] == "abc123"
-    assert "chunk_id" in preprocessed[0]["properties"]["$snapshot_data"]
+    assert len(preprocessed) == 2
+    for result, expected_session_id in zip(preprocessed, ["1234", "5678"]):
+        assert result["event"] == "$snapshot"
+        assert result["properties"]["$session_id"] == expected_session_id
+        assert result["properties"]["distinct_id"] == "abc123"
+        assert "chunk_id" in result["properties"]["$snapshot_data"]
+        assert result["event"] == "$snapshot"
 
 
 def test_compression_and_chunking(snapshot_events, mocker: MockerFixture):
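
The effect of patch 3 is that a single capture request mixing several
sessions produces one chunk stream per session, so the single-session
assumption inside compress_and_chunk_snapshots now holds for each group it
receives. A self-contained sketch of just the grouping step (the event shapes
follow the tests above; compress_and_chunk_snapshots itself is left out):

    from collections import defaultdict

    events = [
        {"event": "$snapshot", "properties": {"$session_id": "1234", "$snapshot_data": {"type": 2}}},
        {"event": "$snapshot", "properties": {"$session_id": "5678", "$snapshot_data": {"type": 3}}},
        {"event": "$pageview", "properties": {}},
    ]

    # Non-snapshot events pass straight through; snapshots are bucketed per session.
    result = []
    snapshots_by_session = defaultdict(list)
    for event in events:
        if event["event"] == "$snapshot":
            snapshots_by_session[event["properties"]["$session_id"]].append(event)
        else:
            result.append(event)

    # Each bucket would be handed to compress_and_chunk_snapshots on its own,
    # yielding one chunk_id per session instead of one for the whole request.
    assert sorted(snapshots_by_session) == ["1234", "5678"]
    assert result == [{"event": "$pageview", "properties": {}}]
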
From 9972212acabbc0f998607fade4c204a224d59b67 Mon Sep 17 00:00:00 2001
From: Karl-Aksel Puulmann
Date: Tue, 23 Mar 2021 11:51:57 +0200
Subject: [PATCH 4/4] Don't chunk already chunked payloads

---
 posthog/helpers/session_recording.py                    | 6 +++---
 posthog/helpers/tests/test_session_recording_helpers.py | 3 +++
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/posthog/helpers/session_recording.py b/posthog/helpers/session_recording.py
index 473948a6201c5..41ddd6520a473 100644
--- a/posthog/helpers/session_recording.py
+++ b/posthog/helpers/session_recording.py
@@ -18,7 +18,7 @@ def preprocess_session_recording_events(events: List[Event]) -> List[Event]:
     result = []
     snapshots_by_session = defaultdict(list)
     for event in events:
-        if is_snapshot(event):
+        if is_unchunked_snapshot(event):
             session_recording_id = event["properties"]["$session_id"]
             snapshots_by_session[session_recording_id].append(event)
         else:
@@ -85,8 +85,8 @@ def chunk_string(string: str, chunk_length: int) -> List[str]:
     return [string[0 + offset : chunk_length + offset] for offset in range(0, len(string), chunk_length)]
 
 
-def is_snapshot(event: Dict) -> bool:
-    return event["event"] == "$snapshot"
+def is_unchunked_snapshot(event: Dict) -> bool:
+    return event["event"] == "$snapshot" and "chunk_id" not in event["properties"]["$snapshot_data"]
 
 
 def compress_to_string(json_string: str) -> str:
diff --git a/posthog/helpers/tests/test_session_recording_helpers.py b/posthog/helpers/tests/test_session_recording_helpers.py
index 7bfde427f17ac..b748f94acb40b 100644
--- a/posthog/helpers/tests/test_session_recording_helpers.py
+++ b/posthog/helpers/tests/test_session_recording_helpers.py
@@ -39,6 +39,9 @@ def test_preprocess_recording_event_creates_chunks():
         assert "chunk_id" in result["properties"]["$snapshot_data"]
         assert result["event"] == "$snapshot"
 
+    # it does not rechunk already chunked events
+    assert preprocess_session_recording_events(preprocessed) == preprocessed
+
 
 def test_compression_and_chunking(snapshot_events, mocker: MockerFixture):
     mocker.patch("posthog.models.utils.UUIDT", return_value="0178495e-8521-0000-8e1c-2652fa57099b")
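
Patch 4 makes the preprocessing idempotent: an event whose $snapshot_data
already carries a chunk_id is passed through untouched instead of being
compressed and wrapped a second time, so replaying a batch through capture is
safe. A small sketch of the guard and the property it buys (the event dicts
below are illustrative, not fixtures from the patch):

    def is_unchunked_snapshot(event: dict) -> bool:
        # Mirrors the guard added above: only raw $snapshot events get chunked.
        return event["event"] == "$snapshot" and "chunk_id" not in event["properties"]["$snapshot_data"]

    raw = {"event": "$snapshot", "properties": {"$session_id": "1234", "$snapshot_data": {"type": 2}}}
    chunked = {
        "event": "$snapshot",
        "properties": {
            "$session_id": "1234",
            "$snapshot_data": {"chunk_id": "abc", "chunk_index": 0, "chunk_count": 1, "data": ""},
        },
    }

    assert is_unchunked_snapshot(raw)          # gets compressed and chunked
    assert not is_unchunked_snapshot(chunked)  # passes through, so running the
                                               # preprocessing twice changes nothing
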