Chunk session recording events #3705

Merged: 4 commits, Mar 23, 2021
6 changes: 3 additions & 3 deletions .github/workflows/ci-backend.yml
@@ -107,7 +107,7 @@ jobs:
        run: |
          python -m pip install --upgrade pip
          python -m pip install -r requirements.txt
-         python -m pip install freezegun fakeredis pytest pytest-django
+         python -m pip install freezegun fakeredis pytest pytest-mock pytest-django
        if: steps.cache.outputs.cache-hit != 'true'

      - name: Check migrations
@@ -196,7 +196,7 @@ jobs:
          cd deploy
          python -m pip install --upgrade pip
          python -m pip install -r requirements.txt
-         python -m pip install freezegun fakeredis pytest pytest-django
+         python -m pip install freezegun fakeredis pytest pytest-mock pytest-django
        if: steps.cache.outputs.cache-hit != 'true'

        # The 2-step migration process (first master, then current branch) verifies that it'll always
@@ -300,7 +300,7 @@ jobs:
        run: |
          python -m pip install --upgrade pip
          python -m pip install -r requirements.txt
-         python -m pip install freezegun fakeredis pytest pytest-django
+         python -m pip install freezegun fakeredis pytest pytest-mock pytest-django
        if: steps.cache.outputs.cache-hit != 'true'

      - name: Remove ee
2 changes: 1 addition & 1 deletion ee/clickhouse/queries/clickhouse_session_recording.py
@@ -35,7 +35,7 @@
        distinct_id,
        MIN(timestamp) AS start_time,
        MAX(timestamp) AS end_time,
-       COUNT(JSONExtractInt(snapshot_data, 'type') = 2 ? 1 : NULL) as full_snapshots
+       COUNT((JSONExtractInt(snapshot_data, 'type') = 2 OR JSONExtractBool(snapshot_data, 'has_full_snapshot')) ? 1 : NULL) as full_snapshots
    FROM session_recording_events
    WHERE
        team_id = %(team_id)s
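ClickHouse's COUNT(cond ? 1 : NULL) idiom counts only the rows where the condition holds, so the updated expression counts a row as a full snapshot when either its rrweb type is 2 or the chunked payload's has_full_snapshot flag is set. A rough Python sketch of the same predicate (count_full_snapshots is illustrative, not part of the diff):

    FULL_SNAPSHOT = 2  # rrweb's event type for a full DOM snapshot

    def count_full_snapshots(snapshot_data_list) -> int:
        # Mirrors: COUNT((JSONExtractInt(snapshot_data, 'type') = 2
        #   OR JSONExtractBool(snapshot_data, 'has_full_snapshot')) ? 1 : NULL)
        return sum(
            1
            for data in snapshot_data_list
            if data.get("type") == FULL_SNAPSHOT or bool(data.get("has_full_snapshot"))
        )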
3 changes: 3 additions & 0 deletions posthog/api/capture.py
@@ -12,6 +12,7 @@
from posthog.celery import app as celery_app
from posthog.ee import is_ee_enabled
+from posthog.helpers.session_recording import preprocess_session_recording_events
from posthog.models import Team, User
from posthog.models.feature_flag import get_active_feature_flags
from posthog.models.utils import UUIDT
@@ -174,6 +175,8 @@ def get_event(request):
    else:
        events = [data]

+   events = preprocess_session_recording_events(events)
+
    for event in events:
        try:
            distinct_id = _get_distinct_id(event)
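To illustrate what this hook does to an incoming batch, here is a hypothetical before/after (event shapes follow the tests added below; the chunk payload itself is produced by the helper module):

    events = [
        {"event": "$pageview", "properties": {"distinct_id": "abc123"}},
        {"event": "$snapshot", "properties": {"$session_id": "1234", "$snapshot_data": {"type": 2}, "distinct_id": "abc123"}},
        {"event": "$snapshot", "properties": {"$session_id": "1234", "$snapshot_data": {"type": 3}, "distinct_id": "abc123"}},
    ]

    events = preprocess_session_recording_events(events)
    # The $pageview passes through untouched. The two $snapshot events for
    # session "1234" are gzipped together and re-emitted as one or more
    # $snapshot events whose $snapshot_data holds chunk_id, chunk_index,
    # chunk_count, data and has_full_snapshot instead of raw rrweb data.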
99 changes: 99 additions & 0 deletions posthog/helpers/session_recording.py
@@ -0,0 +1,99 @@
import base64
import gzip
import json
from collections import defaultdict
from typing import Dict, Generator, List

from sentry_sdk.api import capture_message

from posthog.models import utils

Event = Dict
SnapshotData = Dict

FULL_SNAPSHOT = 2  # rrweb event type 2 = full DOM snapshot


def preprocess_session_recording_events(events: List[Event]) -> List[Event]:
    result = []
    snapshots_by_session = defaultdict(list)
    for event in events:
        if is_unchunked_snapshot(event):
            session_recording_id = event["properties"]["$session_id"]
            snapshots_by_session[session_recording_id].append(event)
        else:
            result.append(event)

    for session_recording_id, snapshots in snapshots_by_session.items():
        result.extend(list(compress_and_chunk_snapshots(snapshots)))

    return result


def compress_and_chunk_snapshots(events: List[Event], chunk_size=512 * 1024) -> Generator[Event, None, None]:
    data_list = [event["properties"]["$snapshot_data"] for event in events]
    session_id = events[0]["properties"]["$session_id"]
    has_full_snapshot = any(snapshot_data["type"] == FULL_SNAPSHOT for snapshot_data in data_list)

    compressed_data = compress_to_string(json.dumps(data_list))

    id = str(utils.UUIDT())
    chunks = chunk_string(compressed_data, chunk_size)
    for index, chunk in enumerate(chunks):
        yield {
            **events[0],
            "properties": {
                **events[0]["properties"],
                "$session_id": session_id,
                "$snapshot_data": {
                    "chunk_id": id,
                    "chunk_index": index,
                    "chunk_count": len(chunks),
                    "data": chunk,
                    "compression": "gzip-base64",
                    "has_full_snapshot": has_full_snapshot,
                },
            },
        }


def decompress_chunked_snapshot_data(
    team_id: int, session_recording_id: str, snapshot_list: List[SnapshotData]
) -> Generator[SnapshotData, None, None]:
    chunks_collector = defaultdict(list)
    for snapshot_data in snapshot_list:
        if "chunk_id" not in snapshot_data:
            yield snapshot_data
        else:
            chunks_collector[snapshot_data["chunk_id"]].append(snapshot_data)

    for chunks in chunks_collector.values():
        if len(chunks) != chunks[0]["chunk_count"]:
            capture_message(
                "Did not find all session recording chunks! Team: {}, Session: {}".format(team_id, session_recording_id)
            )
            continue

        b64_compressed_data = "".join(chunk["data"] for chunk in sorted(chunks, key=lambda c: c["chunk_index"]))
        decompressed_data = json.loads(decompress(b64_compressed_data))

        yield from decompressed_data


def chunk_string(string: str, chunk_length: int) -> List[str]:
    """Split a string into chunk_length-sized elements. Reversal operation: `''.join()`."""
    return [string[0 + offset : chunk_length + offset] for offset in range(0, len(string), chunk_length)]


def is_unchunked_snapshot(event: Dict) -> bool:
    return event["event"] == "$snapshot" and "chunk_id" not in event["properties"]["$snapshot_data"]


def compress_to_string(json_string: str) -> str:
    # UTF-16 with surrogatepass keeps lone surrogates from JS strings intact.
    compressed_data = gzip.compress(json_string.encode("utf-16", "surrogatepass"))
    return base64.b64encode(compressed_data).decode("utf-8")


def decompress(base64data: str) -> str:
    compressed_bytes = base64.b64decode(base64data)
    return gzip.decompress(compressed_bytes).decode("utf-16", "surrogatepass")
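A minimal round-trip sketch of this module's two halves, assuming the event shapes used in the tests below (the tiny chunk_size just forces multiple chunks):

    from posthog.helpers.session_recording import (
        compress_and_chunk_snapshots,
        decompress_chunked_snapshot_data,
    )

    snapshots = [
        {"event": "$snapshot", "properties": {"$session_id": "1234", "$snapshot_data": {"type": 2, "foo": "bar"}, "distinct_id": "abc123"}},
        {"event": "$snapshot", "properties": {"$session_id": "1234", "$snapshot_data": {"type": 3, "foo": "zeta"}, "distinct_id": "abc123"}},
    ]

    # Write path: both snapshots are JSON-dumped, gzipped, base64-encoded
    # and split into chunks of at most 100 characters.
    chunked = list(compress_and_chunk_snapshots(snapshots, chunk_size=100))

    # Read path: collect the stored $snapshot_data payloads and reassemble.
    data = [event["properties"]["$snapshot_data"] for event in chunked]
    restored = list(decompress_chunked_snapshot_data(1, "1234", data))
    assert restored == [e["properties"]["$snapshot_data"] for e in snapshots]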
140 changes: 140 additions & 0 deletions posthog/helpers/tests/test_session_recording_helpers.py
@@ -0,0 +1,140 @@
import pytest
from pytest_mock import MockerFixture

from posthog.helpers.session_recording import (
    compress_and_chunk_snapshots,
    decompress_chunked_snapshot_data,
    preprocess_session_recording_events,
)


def test_preprocess_with_no_recordings():
    events = [{"event": "$pageview"}, {"event": "$pageleave"}]
    assert preprocess_session_recording_events(events) == events


def test_preprocess_recording_event_creates_chunks():
    events = [
        {
            "event": "$snapshot",
            "properties": {"$session_id": "1234", "$snapshot_data": {"type": 2, "foo": "bar"}, "distinct_id": "abc123"},
        },
        {
            "event": "$snapshot",
            "properties": {"$session_id": "1234", "$snapshot_data": {"type": 1, "foo": "bar"}, "distinct_id": "abc123"},
        },
        {
            "event": "$snapshot",
            "properties": {"$session_id": "5678", "$snapshot_data": {"type": 1, "foo": "bar"}, "distinct_id": "abc123"},
        },
    ]

    preprocessed = preprocess_session_recording_events(events)
    assert preprocessed != events
    assert len(preprocessed) == 2
    for result, expected_session_id in zip(preprocessed, ["1234", "5678"]):
        assert result["event"] == "$snapshot"
        assert result["properties"]["$session_id"] == expected_session_id
        assert result["properties"]["distinct_id"] == "abc123"
        assert "chunk_id" in result["properties"]["$snapshot_data"]

    # it does not rechunk already chunked events
    assert preprocess_session_recording_events(preprocessed) == preprocessed


def test_compression_and_chunking(snapshot_events, mocker: MockerFixture):
    mocker.patch("posthog.models.utils.UUIDT", return_value="0178495e-8521-0000-8e1c-2652fa57099b")
    mocker.patch("time.time", return_value=0)

    assert list(compress_and_chunk_snapshots(snapshot_events)) == [
        {
            "event": "$snapshot",
            "properties": {
                "$session_id": "1234",
                "$snapshot_data": {
                    "chunk_count": 1,
                    "chunk_id": "0178495e-8521-0000-8e1c-2652fa57099b",
                    "chunk_index": 0,
                    "compression": "gzip-base64",
                    "data": "H4sIAAAAAAAC//v/L5qhmkGJoYShkqGAIRXIsmJQYDBi0AGSSgxpDPlACBFTYkhiSGQoAtK1YFlMXcZYdVUB5UuAOkH6YhkAxKw6nnAAAAA=",
                    "has_full_snapshot": True,
                },
                "distinct_id": "abc123",
            },
        }
    ]


def test_decompression_results_in_same_data(snapshot_events):
    assert len(list(compress_and_chunk_snapshots(snapshot_events, 1000))) == 1
    assert compress_and_decompress(snapshot_events, 1000) == [
        snapshot_events[0]["properties"]["$snapshot_data"],
        snapshot_events[1]["properties"]["$snapshot_data"],
    ]
    assert len(list(compress_and_chunk_snapshots(snapshot_events, 100))) == 2
    assert compress_and_decompress(snapshot_events, 100) == [
        snapshot_events[0]["properties"]["$snapshot_data"],
        snapshot_events[1]["properties"]["$snapshot_data"],
    ]


def test_has_full_snapshot_property(snapshot_events):
    compressed = list(compress_and_chunk_snapshots(snapshot_events))
    assert len(compressed) == 1
    assert compressed[0]["properties"]["$snapshot_data"]["has_full_snapshot"]

    snapshot_events[0]["properties"]["$snapshot_data"]["type"] = 0
    compressed = list(compress_and_chunk_snapshots(snapshot_events))
    assert len(compressed) == 1
    assert not compressed[0]["properties"]["$snapshot_data"]["has_full_snapshot"]


def test_decompress_returns_unmodified_events(snapshot_events):
    snapshot_data = [event["properties"]["$snapshot_data"] for event in snapshot_events]
    assert list(decompress_chunked_snapshot_data(1, "someid", snapshot_data)) == snapshot_data


def test_decompress_ignores_if_not_enough_chunks(snapshot_events):
    snapshot_data = [event["properties"]["$snapshot_data"] for event in snapshot_events]
    complete_snapshots = snapshot_data.copy()
    # A lone chunk (index 1 of 2) whose sibling never arrives must be dropped.
    snapshot_data.append(
        {
            "chunk_id": "unique_id",
            "chunk_index": 1,
            "chunk_count": 2,
            "data": {},
            "compression": "gzip",
            "has_full_snapshot": False,
        }
    )

    assert list(decompress_chunked_snapshot_data(1, "someid", snapshot_data)) == complete_snapshots


@pytest.fixture
def snapshot_events():
    return [
        {
            "event": "$snapshot",
            "properties": {"$session_id": "1234", "$snapshot_data": {"type": 2, "foo": "bar"}, "distinct_id": "abc123"},
        },
        {
            "event": "$snapshot",
            "properties": {"$session_id": "1234", "$snapshot_data": {"type": 3, "foo": "zeta"}, "distinct_id": "abc123"},
        },
    ]


def compress_and_decompress(events, chunk_size):
    snapshot_data = [
        event["properties"]["$snapshot_data"] for event in compress_and_chunk_snapshots(events, chunk_size)
    ]
    return list(decompress_chunked_snapshot_data(1, "someid", snapshot_data))
5 changes: 4 additions & 1 deletion posthog/queries/sessions/session_recording.py
@@ -12,6 +12,7 @@
from django.db import connection

+from posthog.helpers.session_recording import decompress_chunked_snapshot_data
from posthog.models import Person, SessionRecordingEvent, Team
from posthog.models.filters.sessions_filter import SessionsFilter
from posthog.models.session_recording_event import SessionRecordingViewed
@@ -36,7 +37,7 @@
        MIN(timestamp) as start_time,
        MAX(timestamp) as end_time,
        MAX(timestamp) - MIN(timestamp) as duration,
-       COUNT(*) FILTER(where snapshot_data->>'type' = '2') as full_snapshots
+       COUNT(*) FILTER(where snapshot_data->>'type' = '2' OR (snapshot_data->>'has_full_snapshot')::boolean) as full_snapshots
    FROM posthog_sessionrecordingevent
    WHERE
        team_id = %(team_id)s
@@ -63,6 +64,8 @@ def run(self, team: Team, session_recording_id: str, *args, **kwargs) -> Dict[st
    from posthog.api.person import PersonSerializer

    distinct_id, start_time, snapshots = self.query_recording_snapshots(team, session_recording_id)
+   snapshots = list(decompress_chunked_snapshot_data(team.pk, session_recording_id, snapshots))
+
    person = (
        PersonSerializer(Person.objects.get(team=team, persondistinctid__distinct_id=distinct_id)).data
        if distinct_id
22 changes: 19 additions & 3 deletions posthog/queries/sessions/test/test_session_recording.py
@@ -47,11 +47,18 @@ def _test_filter_sessions(self, filter, expected):
        self.create_snapshot("user", "3", now() + relativedelta(seconds=15))
        self.create_snapshot("user", "3", now() + relativedelta(seconds=20))
        self.create_snapshot("user", "3", now() + relativedelta(seconds=60))
-       self.create_snapshot("user", "4", now() + relativedelta(seconds=999))
-       self.create_snapshot("user", "4", now() + relativedelta(seconds=1020))
+       self.create_chunked_snapshot(
+           "user", "4", now() + relativedelta(seconds=999), {"chunk_id": "afb", "has_full_snapshot": True}
+       )
+       self.create_snapshot("user", "4", now() + relativedelta(seconds=1020), type=1)

        self.create_snapshot("broken-user", "5", now() + relativedelta(seconds=10), type=3)
-       self.create_snapshot("broken-user", "5", now() + relativedelta(seconds=20), type=3)
+       self.create_chunked_snapshot(
+           "broken-user",
+           "5",
+           now() + relativedelta(seconds=20),
+           {"chunk_id": "afb", "has_full_snapshot": False},
+       )

        sessions = [
            {"distinct_id": "user", "start_time": now(), "end_time": now() + relativedelta(seconds=100)},
@@ -116,6 +123,15 @@ def create_snapshot(self, distinct_id, session_id, timestamp, type=2):
                snapshot_data={"timestamp": timestamp.timestamp(), "type": type},
            )

+       def create_chunked_snapshot(self, distinct_id, session_id, timestamp, snapshot_data):
+           event_factory(
+               team_id=self.team.pk,
+               distinct_id=distinct_id,
+               timestamp=timestamp,
+               session_id=session_id,
+               snapshot_data=snapshot_data,
+           )
+
    return TestSessionRecording


1 change: 1 addition & 0 deletions requirements-dev.in
@@ -21,3 +21,4 @@ black
isort
pytest
pytest-django
+pytest-mock
3 changes: 3 additions & 0 deletions requirements-dev.txt
@@ -97,10 +97,13 @@ pyparsing==2.4.7
    # via packaging
pytest-django==4.1.0
    # via -r requirements-dev.in
+pytest-mock==3.5.1
+    # via -r requirements-dev.in
pytest==6.2.2
    # via
    #   -r requirements-dev.in
    #   pytest-django
+    #   pytest-mock
python-dateutil==2.8.1
    # via freezegun
pytz==2021.1