Clickhouse window funnel (#2086)
* ClickHouse: use windowFunnel

* IPDB

* Fix more issues

* Sub-second precision

* Use dict instead of Person
timgl committed Oct 29, 2020
1 parent 37680e3 commit 0d8faa4
Showing 6 changed files with 49 additions and 89 deletions.
77 changes: 28 additions & 49 deletions ee/clickhouse/queries/clickhouse_funnel.py
@@ -1,4 +1,6 @@
from typing import Any, Dict, List, Tuple
import uuid
from collections import defaultdict
from typing import Any, Dict, List, Match, Tuple

from django.utils import timezone

@@ -7,8 +9,6 @@
from ee.clickhouse.models.property import parse_prop_clauses
from ee.clickhouse.queries.util import parse_timestamps
from ee.clickhouse.sql.funnels.funnel import FUNNEL_SQL
from ee.clickhouse.sql.funnels.step_action import STEP_ACTION_SQL
from ee.clickhouse.sql.funnels.step_event import STEP_EVENT_SQL
from posthog.constants import TREND_FILTER_TYPE_ACTIONS
from posthog.models.action import Action
from posthog.models.entity import Entity
@@ -22,18 +22,11 @@
class ClickhouseFunnel(Funnel):
_filter: Filter
_team: Team
_should_join_person: bool

def __init__(self, filter: Filter, team: Team) -> None:
self._filter = filter
self._team = team

self._should_join_person = False
for entity in self._filter.entities:
for entity_prop in entity.properties:
if entity_prop.type == "person":
self._should_join_person = True

def _build_filters(self, entity: Entity, index: int) -> str:
prop_filters, prop_filter_params = parse_prop_clauses(
"uuid", entity.properties, self._team, prepend=str(index), json_extract=True
@@ -44,12 +37,6 @@ def _build_filters(self, entity: Entity, index: int) -> str:
return ""

def _build_steps_query(self, entity: Entity, index: int) -> str:
parsed_date_from, parsed_date_to = parse_timestamps(filter=self._filter)
is_first_step = (
"timestamp <> toDateTime(0)"
if index == 0
else "step_{prev_step} <> toDateTime(0) AND timestamp >= step_{prev_step}".format(prev_step=index - 1)
)
filters = self._build_filters(entity, index)
if entity.type == TREND_FILTER_TYPE_ACTIONS:
action = Action.objects.get(pk=entity.id)
@@ -58,29 +45,9 @@ def _build_steps_query(self, entity: Entity, index: int) -> str:
return ""

self.params.update(action_params)
content_sql = STEP_ACTION_SQL.format(
team_id=self._team.pk,
actions_query=action_query,
parsed_date_from=(parsed_date_from or ""),
parsed_date_to=(parsed_date_to or ""),
filters=filters,
step=index,
is_first_step=is_first_step,
person_prop_param=", person_properties" if self._should_join_person else "",
person_prop_arg=", person_props" if self._should_join_person else "",
)
content_sql = "uuid IN {actions_query} {filters}".format(actions_query=action_query, filters=filters,)
else:
content_sql = STEP_EVENT_SQL.format(
team_id=self._team.pk,
event=entity.id,
parsed_date_from=(parsed_date_from or ""),
parsed_date_to=(parsed_date_to or ""),
filters=filters,
step=index,
is_first_step=is_first_step,
person_prop_param=", person_properties" if self._should_join_person else "",
person_prop_arg=", person_props" if self._should_join_person else "",
)
content_sql = "event = '{event}' {filters}".format(event=entity.id, filters=filters)
return content_sql

def _exec_query(self) -> List[Tuple]:
@@ -98,29 +65,41 @@ def _exec_query(self) -> List[Tuple]:
self.params: Dict = {"team_id": self._team.pk, **prop_filter_params}
steps = [self._build_steps_query(entity, index) for index, entity in enumerate(self._filter.entities)]
query = FUNNEL_SQL.format(
select_steps=",".join(["step_{}".format(index) for index, _ in enumerate(self._filter.entities)]),
team_id=self._team.id,
steps=", ".join(steps),
filters=prop_filters.replace("uuid IN", "events.uuid IN", 1),
parsed_date_from=parsed_date_from,
parsed_date_to=parsed_date_to,
person_prop_join="JOIN (SELECT id, properties FROM person WHERE team_id = %(team_id)s) as person ON person_distinct_id.person_id = person.id"
if self._should_join_person
else "",
person_prop_alias="groupArray(person.properties) as person_props," if self._should_join_person else "",
)
return sync_execute(query, self.params)

def _data_to_return(self, results: List[Dict]) -> List[Dict[str, Any]]:
steps = []
person_score: Dict = defaultdict(int)
for index, funnel_step in enumerate(self._filter.entities):
relevant_people = []
for person in results:
if index < person["max_step"]:
person_score[person["uuid"]] += 1
relevant_people.append(person["uuid"])

steps.append(self._serialize_step(funnel_step, relevant_people))

if len(steps) > 0:
for index, _ in enumerate(steps):
steps[index]["people"] = sorted(steps[index]["people"], key=lambda p: person_score[p], reverse=True)[
0:100
]

return steps

def run(self, *args, **kwargs) -> List[Dict[str, Any]]:
results = self._exec_query()
if len(results) == 0:
return self.data_to_return([])
return self._data_to_return([])
width = len(results[0]) # the three
res = []
for result_tuple in results:
result = list(result_tuple)
person = Person(pk=result[0], uuid=result[0])
for step in range(0, width - 1):
setattr(person, "step_{}".format(step), result[step + 1] if result[step + 1].year != 1970 else None)
res.append(person)
return self.data_to_return(res)
res.append({"uuid": result[0], "max_step": result[1]})
return self._data_to_return(res)
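Note: a minimal sketch of the new result handling in run() and _data_to_return, assuming each query row now maps to a dict holding the person's uuid and the max_step reached. The example rows, step names, and the serialized step shape below are made up for illustration and are not the exact output of _serialize_step.

from collections import defaultdict
from typing import Any, Dict, List

# Hypothetical rows in the shape run() now builds: one dict per person,
# where max_step is the number of consecutive funnel steps completed.
results = [
    {"uuid": "person-a", "max_step": 3},
    {"uuid": "person-b", "max_step": 1},
    {"uuid": "person-c", "max_step": 2},
]
step_names = ["user signed up", "watched movie", "paid"]  # stand-ins for filter entities

steps: List[Dict[str, Any]] = []
person_score: Dict = defaultdict(int)
for index, name in enumerate(step_names):
    relevant_people = []
    for person in results:
        if index < person["max_step"]:          # this person reached at least this step
            person_score[person["uuid"]] += 1   # one point per step reached
            relevant_people.append(person["uuid"])
    steps.append({"name": name, "count": len(relevant_people), "people": relevant_people})

# As in _data_to_return above: within each step, people who got deepest
# into the funnel are listed first, capped at 100.
for step in steps:
    step["people"] = sorted(step["people"], key=lambda p: person_score[p], reverse=True)[:100]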
30 changes: 14 additions & 16 deletions ee/clickhouse/sql/funnels/funnel.py
@@ -1,19 +1,17 @@
FUNNEL_SQL = """
SELECT id, {select_steps} FROM (
SELECT
person_distinct_id.person_id as id,
groupArray(events.timestamp) as timestamps,
groupArray(events.event) as eventsArr,
groupArray(events.uuid) as event_ids,
groupArray(events.properties) as event_props,
groupArray(events.distinct_id) as distinct_ids,
{person_prop_alias}
{steps}
FROM events
JOIN (SELECT person_id, distinct_id FROM person_distinct_id WHERE team_id = %(team_id)s) as person_distinct_id ON person_distinct_id.distinct_id = events.distinct_id
{person_prop_join}
WHERE team_id = %(team_id)s {filters} {parsed_date_from} {parsed_date_to}
GROUP BY person_distinct_id.person_id, team_id
ORDER BY timestamps
) WHERE step_0 <> toDateTime(0)
person_id as id,
windowFunnel(6048000000000000)(toUInt64(toUnixTimestamp64Micro(timestamp)),
{steps}
)
FROM (
SELECT
events.*, pid.person_id
FROM
events
JOIN (SELECT person_id, distinct_id FROM person_distinct_id WHERE team_id = %(team_id)s) as pid
ON pid.distinct_id = events.distinct_id
WHERE team_id = %(team_id)s {filters} {parsed_date_from} {parsed_date_to}
)
GROUP BY person_id
"""
9 changes: 0 additions & 9 deletions ee/clickhouse/sql/funnels/step_action.py

This file was deleted.

9 changes: 0 additions & 9 deletions ee/clickhouse/sql/funnels/step_event.py

This file was deleted.

5 changes: 3 additions & 2 deletions ee/clickhouse/util.py
@@ -50,7 +50,8 @@ def tearDown(self):
self._create_event_tables()
self._create_person_tables()
self._create_session_recording_tables()
except ServerException:
except ServerException as e:
print(e)
pass

def _destroy_person_tables(self):
@@ -113,4 +114,4 @@ def assertNumQueries(self, num, func=None, *args, using=DEFAULT_DB_ALIAS, **kwargs):


def endpoint_enabled(endpoint_flag: str, distinct_id: str):
return posthoganalytics.feature_enabled(endpoint_flag, distinct_id) or settings.DEBUG or settings.TEST
return settings.DEBUG or settings.TEST or posthoganalytics.feature_enabled(endpoint_flag, distinct_id)
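The reordering in endpoint_enabled relies on short-circuit evaluation of `or`: with DEBUG or TEST set, the function returns True before posthoganalytics.feature_enabled is called, presumably so local and test runs never hit the analytics client. A minimal sketch of the call; the flag name and distinct id are made up for the example.

import posthoganalytics
from django.conf import settings

# With settings.DEBUG or settings.TEST set, `or` short-circuits and the
# posthoganalytics.feature_enabled call is skipped entirely.
enabled = settings.DEBUG or settings.TEST or posthoganalytics.feature_enabled(
    "clickhouse-funnel-endpoint",  # hypothetical flag name
    "some-distinct-id",            # hypothetical distinct id
)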
8 changes: 4 additions & 4 deletions posthog/queries/test/test_funnel.py
@@ -97,12 +97,12 @@ def test_funnel_with_single_step(self):
person2_stopped_after_signup = person_factory(distinct_ids=["stopped_after_signup2"], team_id=self.team.pk)
self._signup_event(distinct_id="stopped_after_signup2")

# with self.assertNumQueries(1):
result = funnel.run()
with self.assertNumQueries(1):
result = funnel.run()
self.assertEqual(result[0]["name"], "user signed up")
self.assertEqual(result[0]["count"], 2)
# check ordering of people in first step
self.assertEqual(
self.assertCountEqual(
result[0]["people"], [person1_stopped_after_signup.uuid, person2_stopped_after_signup.uuid],
)

@@ -138,7 +138,7 @@ def test_funnel_events(self):
self.assertEqual(result[0]["name"], "user signed up")
self.assertEqual(result[0]["count"], 4)
# check ordering of people in first step
self.assertEqual(
self.assertCountEqual(
result[0]["people"],
[
person_stopped_after_movie.uuid,
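A note on the assertion change: assertCountEqual passes when both sequences contain the same elements regardless of order, presumably because the test people complete the same number of steps and their relative order after the person_score ranking is therefore not guaranteed. A tiny illustration:

import unittest

class AssertCountEqualExample(unittest.TestCase):
    def test_ignores_order(self):
        # Same elements, different order: assertCountEqual passes,
        # while a plain assertEqual on the two lists would fail.
        self.assertCountEqual(["uuid-2", "uuid-1"], ["uuid-1", "uuid-2"])

if __name__ == "__main__":
    unittest.main()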
