Skip to content

Commit

Permalink
Funnel with Strict Step Ordering (#4890)
Browse files Browse the repository at this point in the history
* wip: pagination for persons on clickhouse funnels

* wip: added offset support for getting a list of persons; added support for conversion window;

* fixed mypy exception

* helper function to insert data for local testing

* moved generate code into separate class for more functionality later

* corrected person_distinct_id to use the person id from postgres

* minor corrections to generate local class along with addition of data cleanup via destroy() method

* reduce the number of persons who make it to each step

* moved funnel queries to a new folder for better organization; separated funnel_persons and funnel_trends_persons into individual classes;

* funnel persons and tests

* initial implementation

* invoke the funnel or funnel trends class respectively

* add a test

* add breakdown handling and first test

* add test stubs

* remove repeats

* mypy corrections and PR feedback

* run funnel test suite on new query implementation

* remove imports

* corrected tests

* minor test updates

* correct func name

* fix types

* func name change

* move builder functions to funnel base

* add test class for new funnel

* Handle multiple same events in the funnel (#4863)

* dedup + tests

* deep equality. Tests to come

* write test for entity equality

* finish testing funnels

* clean up comments

* add strict option for funnels

* typing

* use new persons pattern

* persons for funnel strict ordering

Co-authored-by: Buddy Williams <buddy@posthog.com>
Co-authored-by: eric <eeoneric@gmail.com>
  • Loading branch information
3 people committed Jul 1, 2021
1 parent e60e3c7 commit a5b7b31
Show file tree
Hide file tree
Showing 8 changed files with 491 additions and 10 deletions.
12 changes: 9 additions & 3 deletions ee/clickhouse/queries/funnels/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,21 @@ def _get_sorting_condition(self, curr_index: int, max_steps: int):

return f"if({' AND '.join(conditions)}, {curr_index}, {self._get_sorting_condition(curr_index - 1, max_steps)})"

def _get_inner_event_query(self, entities=None, entity_name="events") -> str:
def _get_inner_event_query(
self, entities=None, entity_name="events", skip_entity_filter=False, skip_step_filter=False
) -> str:
entities_to_use = entities or self._filter.entities

event_query, params = FunnelEventQuery(filter=self._filter, team_id=self._team.pk).get_query(
entities_to_use, entity_name
entities_to_use, entity_name, skip_entity_filter=skip_entity_filter
)

self.params.update(params)
steps_conditions = self._get_steps_conditions(length=len(entities_to_use))

if skip_step_filter:
steps_conditions = "1=1"
else:
steps_conditions = self._get_steps_conditions(length=len(self._filter.entities))

all_step_cols: List[str] = []
for index, entity in enumerate(entities_to_use):
Expand Down
9 changes: 7 additions & 2 deletions ee/clickhouse/queries/funnels/funnel_event_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


class FunnelEventQuery(ClickhouseEventQuery):
def get_query(self, entities=None, entity_name="events") -> Tuple[str, Dict[str, Any]]:
def get_query(self, entities=None, entity_name="events", skip_entity_filter=False) -> Tuple[str, Dict[str, Any]]:
_fields = (
f"{self.EVENT_TABLE_ALIAS}.event as event, {self.EVENT_TABLE_ALIAS}.team_id as team_id, {self.EVENT_TABLE_ALIAS}.distinct_id as distinct_id, {self.EVENT_TABLE_ALIAS}.timestamp as timestamp, {self.EVENT_TABLE_ALIAS}.properties as properties, {self.EVENT_TABLE_ALIAS}.elements_chain as elements_chain"
+ (f", {self.DISTINCT_ID_TABLE_ALIAS}.person_id as person_id" if self._should_join_distinct_ids else "")
Expand All @@ -20,7 +20,12 @@ def get_query(self, entities=None, entity_name="events") -> Tuple[str, Dict[str,
prop_query, prop_params = self._get_props(prop_filters)
self.params.update(prop_params)

entity_query, entity_params = self._get_entity_query(entities, entity_name)
if skip_entity_filter:
entity_query = ""
entity_params: Dict[str, Any] = {}
else:
entity_query, entity_params = self._get_entity_query(entities, entity_name)

self.params.update(entity_params)

query = f"""
Expand Down
73 changes: 73 additions & 0 deletions ee/clickhouse/queries/funnels/funnel_strict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from typing import List

from ee.clickhouse.queries.funnels.base import ClickhouseFunnelBase


class ClickhouseFunnelStrict(ClickhouseFunnelBase):
    """Funnel query builder for the strict step ordering mode.

    The SQL is assembled in layers: the inner event query (run with the
    entity and step filters skipped), a windowed projection over it, a
    per-event ``steps`` computation restricted to rows where ``step_0 = 1``,
    and finally a per-person aggregation.
    """

    def get_query(self, format_properties):
        """Return the top-level SQL selecting count columns and step time averages.

        ``format_properties`` is not read by this implementation.
        """
        step_total = len(self._filter.entities)
        count_columns = self._get_count_columns(step_total)
        time_averages = self._get_step_time_avgs(step_total)
        per_person_sql = self.get_step_counts_query()
        return (
            "\n"
            f"SELECT {count_columns} {time_averages} FROM (\n"
            f"{per_person_sql}\n"
            ") SETTINGS allow_experimental_window_functions = 1\n"
        )

    def get_step_counts_query(self):
        """Return SQL yielding one row per person with ``max(steps)`` and step time averages."""
        step_total = len(self._filter.entities)

        window_columns = self.get_partition_cols(1, step_total)
        steps_expression = self._get_sorting_condition(step_total, step_total)

        # Both filters are skipped so that every event of a person is present
        # in the rows the window columns are computed over.
        events_sql = self._get_inner_event_query(skip_entity_filter=True, skip_step_filter=True)
        windowed_sql = (
            "\n"
            "SELECT\n"
            "person_id,\n"
            "timestamp,\n"
            f"{window_columns}\n"
            f"FROM ({events_sql})\n"
        )

        # Only rows that are themselves a first-step event are kept.
        step_times = self._get_step_times(step_total)
        per_event_sql = (
            "\n"
            f"SELECT *, {steps_expression} AS steps {step_times} FROM (\n"
            f"{windowed_sql}\n"
            ") WHERE step_0 = 1"
        )

        time_averages = self._get_step_time_avgs(step_total)
        return (
            "\n"
            f"SELECT person_id, max(steps) AS steps {time_averages} FROM (\n"
            f"{per_event_sql}\n"
            ") GROUP BY person_id\n"
        )

    def get_partition_cols(self, level_index: int, max_steps: int):
        """Return the comma-separated ``step_i`` / ``latest_i`` select columns.

        For steps below ``level_index`` the plain ``latest_i`` column is used.
        For the remaining steps ``latest_i`` is read through a window whose
        frame is the single row ``i`` positions before the current one in the
        per-person, timestamp-descending ordering.
        """
        columns: List[str] = []
        for step in range(max_steps):
            columns.append(f"step_{step}")
            if step >= level_index:
                columns.append(
                    f"min(latest_{step}) over (PARTITION by person_id ORDER BY timestamp DESC ROWS BETWEEN {step} PRECEDING AND {step} PRECEDING) latest_{step}"
                )
            else:
                columns.append(f"latest_{step}")
        return ", ".join(columns)

    # TODO: copied from funnel.py. Once the new funnel query replaces old one, the base format_results function can use this
    def _format_results(self, results):
        """Serialize the first result row into a list of per-step dicts, in funnel order.

        The row holds one count per step followed by the average conversion
        times. Counts are accumulated from the last step backwards, so each
        step's total includes the counts of every later step.
        """
        step_total = len(self._filter.entities)
        serialized_steps = []
        running_total = 0

        for entity in reversed(self._filter.entities):
            row = results[0]
            if row and len(row) > 0:
                running_total += row[entity.order]

            serialized = self._serialize_step(entity, running_total, [])
            if entity.order > 0:
                # Averages sit after the count columns; the first step has none.
                average_time = results[0][entity.order + step_total - 1]
            else:
                average_time = None
            serialized.update({"average_conversion_time": average_time})
            serialized_steps.append(serialized)

        # Steps were collected last-to-first; hand them back first-to-last.
        return list(reversed(serialized_steps))
19 changes: 19 additions & 0 deletions ee/clickhouse/queries/funnels/funnel_strict_persons.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from ee.clickhouse.queries.funnels.funnel_strict import ClickhouseFunnelStrict
from ee.clickhouse.sql.funnels.funnel import FUNNEL_PERSONS_BY_STEP_SQL
from posthog.models import Person


class ClickhouseFunnelStrictPersons(ClickhouseFunnelStrict):
    """Persons variant of the strict funnel: returns serialized persons instead of step counts."""

    def get_query(self, format_properties):
        """Fill ``FUNNEL_PERSONS_BY_STEP_SQL`` with the strict per-person step counts query."""
        per_person_sql = self.get_step_counts_query()
        step_condition = self._get_funnel_person_step_condition()
        return FUNNEL_PERSONS_BY_STEP_SQL.format(
            **format_properties,
            steps_per_person_query=per_person_sql,
            persons_steps=step_condition,
        )

    def _format_results(self, results):
        """Look up this team's persons by the uuid in each row's first column and serialize them."""
        person_uuids = [row[0] for row in results]
        matching_people = Person.objects.filter(team_id=self._team.pk, uuid__in=person_uuids)

        # Kept as a function-level import, as in the original.
        from posthog.api.person import PersonSerializer

        serializer = PersonSerializer(matching_people, many=True)
        return serializer.data
2 changes: 1 addition & 1 deletion ee/clickhouse/queries/funnels/test/test_funnel_persons.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def _create_event(**kwargs):
create_event(**kwargs)


class TestFunnel(ClickhouseTestMixin, APIBaseTest):
class TestFunnelPersons(ClickhouseTestMixin, APIBaseTest):
def _create_sample_data_multiple_dropoffs(self):
for i in range(5):
_create_person(distinct_ids=[f"user_{i}"], team=self.team)
Expand Down
Loading

0 comments on commit a5b7b31

Please sign in to comment.