diff --git a/ee/clickhouse/models/cohort.py b/ee/clickhouse/models/cohort.py index a0aab823b5926..3f8acbe5cecd7 100644 --- a/ee/clickhouse/models/cohort.py +++ b/ee/clickhouse/models/cohort.py @@ -1,6 +1,6 @@ import uuid from datetime import datetime, timedelta -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Set, Tuple, Union import structlog from dateutil import parser @@ -38,38 +38,57 @@ def format_person_query( - cohort: Cohort, index: int, *, custom_match_field: str = "person_id" + cohort: Cohort, + index: int, + *, + custom_match_field: str = "person_id", + cohorts_seen: Optional[Set[int]] = None, + using_new_query: bool = False, ) -> Tuple[str, Dict[str, Any]]: - filters = [] - params: Dict[str, Any] = {} - if cohort.is_static: return format_static_cohort_query(cohort.pk, index, prepend="", custom_match_field=custom_match_field) - or_queries = [] - groups = cohort.groups + if using_new_query: + if not cohort.properties.values: + # No person can match an empty cohort + return "0 = 19", {} - if not groups: - # No person can match a cohort that has no match groups - return "0 = 19", {} + from ee.clickhouse.queries.cohort_query import CohortQuery - for group_idx, group in enumerate(groups): - if group.get("action_id") or group.get("event_id"): - entity_query, entity_params = get_entity_cohort_subquery(cohort, group, group_idx) - params = {**params, **entity_params} - filters.append(entity_query) + query, params = CohortQuery( + Filter(data={"properties": cohort.properties}), cohort.team, cohort_pk=cohort.pk, cohorts_seen=cohorts_seen + ).get_query() - elif group.get("properties"): - prop_query, prop_params = get_properties_cohort_subquery(cohort, group, group_idx) - or_queries.append(prop_query) - params = {**params, **prop_params} + return f"{custom_match_field} IN ({query})", params + + else: + filters = [] + params = {} + + or_queries = [] + groups = cohort.groups + + if not groups: + # No person can match a cohort that has no match groups + return "0 = 19", {} + + for group_idx, group in enumerate(groups): + if group.get("action_id") or group.get("event_id"): + entity_query, entity_params = get_entity_cohort_subquery(cohort, group, group_idx) + params = {**params, **entity_params} + filters.append(entity_query) + + elif group.get("properties"): + prop_query, prop_params = get_properties_cohort_subquery(cohort, group, group_idx) + or_queries.append(prop_query) + params = {**params, **prop_params} - if len(or_queries) > 0: - query = "AND ({})".format(" OR ".join(or_queries)) - filters.append("{} IN {}".format(custom_match_field, GET_LATEST_PERSON_ID_SQL.format(query=query))) + if len(or_queries) > 0: + query = "AND ({})".format(" OR ".join(or_queries)) + filters.append("{} IN {}".format(custom_match_field, GET_LATEST_PERSON_ID_SQL.format(query=query))) - joined_filter = " OR ".join(filters) - return joined_filter, params + joined_filter = " OR ".join(filters) + return joined_filter, params def format_static_cohort_query( @@ -239,8 +258,16 @@ def is_precalculated_query(cohort: Cohort) -> bool: return False -def format_filter_query(cohort: Cohort, index: int = 0, id_column: str = "distinct_id") -> Tuple[str, Dict[str, Any]]: - person_query, params = format_cohort_subquery(cohort, index) +def format_filter_query( + cohort: Cohort, + index: int = 0, + id_column: str = "distinct_id", + cohorts_seen: Optional[Set[int]] = None, + using_new_query: bool = False, +) -> Tuple[str, Dict[str, Any]]: + person_query, params = format_cohort_subquery( + cohort, index, cohorts_seen=cohorts_seen, using_new_query=using_new_query + ) person_id_query = CALCULATE_COHORT_PEOPLE_SQL.format( query=person_query, @@ -250,12 +277,24 @@ def format_filter_query(cohort: Cohort, index: int = 0, id_column: str = "distin return person_id_query, params -def format_cohort_subquery(cohort: Cohort, index: int, custom_match_field="person_id") -> Tuple[str, Dict[str, Any]]: +def format_cohort_subquery( + cohort: Cohort, + index: int, + custom_match_field="person_id", + cohorts_seen: Optional[Set[int]] = None, + using_new_query: bool = False, +) -> Tuple[str, Dict[str, Any]]: is_precalculated = is_precalculated_query(cohort) person_query, params = ( format_precalculated_cohort_query(cohort.pk, index, custom_match_field=custom_match_field) if is_precalculated - else format_person_query(cohort, index, custom_match_field=custom_match_field) + else format_person_query( + cohort, + index, + custom_match_field=custom_match_field, + cohorts_seen=cohorts_seen, + using_new_query=using_new_query, + ) ) return person_query, params @@ -296,6 +335,21 @@ def insert_static_cohort(person_uuids: List[Optional[uuid.UUID]], cohort_id: int sync_execute(INSERT_PERSON_STATIC_COHORT, persons) +def recalculate_cohortpeople_with_new_query(cohort: Cohort) -> Optional[int]: + cohort_filter, cohort_params = format_person_query(cohort, 0, custom_match_field="id", using_new_query=True) + + count = sync_execute( + f""" + SELECT COUNT(1) + FROM person + WHERE {cohort_filter} + """, + {**cohort_params, "team_id": cohort.team_id, "cohort_id": cohort.pk}, + )[0][0] + + return count + + def recalculate_cohortpeople(cohort: Cohort) -> Optional[int]: cohort_filter, cohort_params = format_person_query(cohort, 0, custom_match_field="id") diff --git a/ee/clickhouse/models/test/__snapshots__/test_cohort.ambr b/ee/clickhouse/models/test/__snapshots__/test_cohort.ambr index 0904c210a2781..6c1d6a3a11b35 100644 --- a/ee/clickhouse/models/test/__snapshots__/test_cohort.ambr +++ b/ee/clickhouse/models/test/__snapshots__/test_cohort.ambr @@ -31,16 +31,30 @@ # name: TestCohort.test_cohortpeople_action_count.10 ' - SELECT count(*) - FROM - (SELECT 1 - FROM cohortpeople - WHERE team_id = 2 - AND cohort_id = 2 - GROUP BY person_id, - cohort_id, - team_id - HAVING sum(sign) > 0) + SELECT COUNT(1) + FROM person + WHERE id IN + (SELECT behavior_query.person_id AS id + FROM + (SELECT pdi.person_id AS person_id, + countIf(timestamp > now() - INTERVAL 3 day + AND timestamp < now() + AND ((event = '$pageview'))) = 1 AS performed_event_multiple_condition_level_level_level_0 + FROM events e + INNER JOIN + (SELECT distinct_id, + argMax(person_id, version) as person_id + FROM person_distinct_id2 + WHERE team_id = 2 + GROUP BY distinct_id + HAVING argMax(is_deleted, version) = 0) AS pdi ON e.distinct_id = pdi.distinct_id + WHERE team_id = 2 + AND event IN ['$pageview'] + AND timestamp <= now() + AND timestamp >= now() - INTERVAL 3 day + GROUP BY person_id) behavior_query + WHERE 1 = 1 + AND (((performed_event_multiple_condition_level_level_level_0))) ) ' --- # name: TestCohort.test_cohortpeople_action_count.11 @@ -50,14 +64,7 @@ where cohort_id = 2 ' --- -# name: TestCohort.test_cohortpeople_action_count.2 - ' - SELECT person_id - FROM cohortpeople - where cohort_id = 2 - ' ---- -# name: TestCohort.test_cohortpeople_action_count.3 +# name: TestCohort.test_cohortpeople_action_count.12 ' SELECT count(*) @@ -72,7 +79,7 @@ HAVING sum(sign) > 0) ' --- -# name: TestCohort.test_cohortpeople_action_count.4 +# name: TestCohort.test_cohortpeople_action_count.13 ' SELECT count(*) @@ -87,14 +94,79 @@ HAVING sum(sign) > 0) ' --- -# name: TestCohort.test_cohortpeople_action_count.5 +# name: TestCohort.test_cohortpeople_action_count.14 + ' + + SELECT COUNT(1) + FROM person + WHERE id IN + (SELECT behavior_query.person_id AS id + FROM + (SELECT pdi.person_id AS person_id, + countIf(timestamp > now() - INTERVAL 3 day + AND timestamp < now() + AND ((event = '$pageview'))) > 0 AS performed_event_condition_level_level_level_0 + FROM events e + INNER JOIN + (SELECT distinct_id, + argMax(person_id, version) as person_id + FROM person_distinct_id2 + WHERE team_id = 2 + GROUP BY distinct_id + HAVING argMax(is_deleted, version) = 0) AS pdi ON e.distinct_id = pdi.distinct_id + WHERE team_id = 2 + AND event IN ['$pageview'] + AND timestamp <= now() + AND timestamp >= now() - INTERVAL 3 day + GROUP BY person_id) behavior_query + WHERE 1 = 1 + AND (((performed_event_condition_level_level_level_0))) ) + ' +--- +# name: TestCohort.test_cohortpeople_action_count.15 ' SELECT person_id FROM cohortpeople where cohort_id = 2 ' --- -# name: TestCohort.test_cohortpeople_action_count.6 +# name: TestCohort.test_cohortpeople_action_count.2 + ' + + SELECT COUNT(1) + FROM person + WHERE id IN + (SELECT behavior_query.person_id AS id + FROM + (SELECT pdi.person_id AS person_id, + countIf(timestamp > now() - INTERVAL 3 day + AND timestamp < now() + AND ((event = '$pageview'))) >= 2 AS performed_event_multiple_condition_level_level_level_0 + FROM events e + INNER JOIN + (SELECT distinct_id, + argMax(person_id, version) as person_id + FROM person_distinct_id2 + WHERE team_id = 2 + GROUP BY distinct_id + HAVING argMax(is_deleted, version) = 0) AS pdi ON e.distinct_id = pdi.distinct_id + WHERE team_id = 2 + AND event IN ['$pageview'] + AND timestamp <= now() + AND timestamp >= now() - INTERVAL 3 day + GROUP BY person_id) behavior_query + WHERE 1 = 1 + AND (((performed_event_multiple_condition_level_level_level_0))) ) + ' +--- +# name: TestCohort.test_cohortpeople_action_count.3 + ' + SELECT person_id + FROM cohortpeople + where cohort_id = 2 + ' +--- +# name: TestCohort.test_cohortpeople_action_count.4 ' SELECT count(*) @@ -109,7 +181,7 @@ HAVING sum(sign) > 0) ' --- -# name: TestCohort.test_cohortpeople_action_count.7 +# name: TestCohort.test_cohortpeople_action_count.5 ' SELECT count(*) @@ -124,13 +196,57 @@ HAVING sum(sign) > 0) ' --- -# name: TestCohort.test_cohortpeople_action_count.8 +# name: TestCohort.test_cohortpeople_action_count.6 + ' + + SELECT COUNT(1) + FROM person + WHERE id IN + (SELECT behavior_query.person_id AS id + FROM + (SELECT pdi.person_id AS person_id, + countIf(timestamp > now() - INTERVAL 3 day + AND timestamp < now() + AND ((event = '$pageview'))) <= 1 AS performed_event_multiple_condition_level_level_level_0 + FROM events e + INNER JOIN + (SELECT distinct_id, + argMax(person_id, version) as person_id + FROM person_distinct_id2 + WHERE team_id = 2 + GROUP BY distinct_id + HAVING argMax(is_deleted, version) = 0) AS pdi ON e.distinct_id = pdi.distinct_id + WHERE team_id = 2 + AND event IN ['$pageview'] + AND timestamp <= now() + AND timestamp >= now() - INTERVAL 3 day + GROUP BY person_id) behavior_query + WHERE 1 = 1 + AND (((performed_event_multiple_condition_level_level_level_0))) ) + ' +--- +# name: TestCohort.test_cohortpeople_action_count.7 ' SELECT person_id FROM cohortpeople where cohort_id = 2 ' --- +# name: TestCohort.test_cohortpeople_action_count.8 + ' + + SELECT count(*) + FROM + (SELECT 1 + FROM cohortpeople + WHERE team_id = 2 + AND cohort_id = 2 + GROUP BY person_id, + cohort_id, + team_id + HAVING sum(sign) > 0) + ' +--- # name: TestCohort.test_cohortpeople_action_count.9 ' @@ -163,4 +279,4 @@ WHERE cohort_id = %(_cohort_id_0)s AND team_id = %(team_id)s) ' ---- \ No newline at end of file +--- diff --git a/ee/clickhouse/models/test/test_cohort.py b/ee/clickhouse/models/test/test_cohort.py index 758333983dfea..9a4d4b2d767b3 100644 --- a/ee/clickhouse/models/test/test_cohort.py +++ b/ee/clickhouse/models/test/test_cohort.py @@ -1,6 +1,7 @@ -from datetime import datetime +from datetime import datetime, timedelta from unittest.mock import patch +import pytest from django.utils import timezone from freezegun import freeze_time @@ -62,7 +63,14 @@ def test_prop_cohort_basic(self): cohort1 = Cohort.objects.create( team=self.team, - groups=[{"properties": {"$some_prop": "something", "$another_prop": "something"}}], + groups=[ + { + "properties": [ + {"key": "$some_prop", "value": "something", "type": "person"}, + {"key": "$another_prop", "value": "something", "type": "person"}, + ] + } + ], name="cohort1", ) @@ -85,14 +93,22 @@ def test_prop_cohort_basic_action(self): action = _create_action(team=self.team, name="$pageview") _create_event( - event="$pageview", team=self.team, distinct_id="some_id", properties={"attr": "some_val"}, + event="$pageview", + team=self.team, + distinct_id="some_id", + properties={"attr": "some_val"}, + timestamp=datetime.now() - timedelta(days=1), ) _create_event( - event="$not_pageview", team=self.team, distinct_id="some_other_id", properties={"attr": "some_val"}, + event="$not_pageview", + team=self.team, + distinct_id="some_other_id", + properties={"attr": "some_val"}, + timestamp=datetime.now() - timedelta(days=2), ) - cohort1 = Cohort.objects.create(team=self.team, groups=[{"action_id": action.pk}], name="cohort1",) + cohort1 = Cohort.objects.create(team=self.team, groups=[{"action_id": action.pk, "days": 3}], name="cohort1",) filter = Filter(data={"properties": [{"key": "id", "value": cohort1.pk, "type": "cohort"}],}, team=self.team) query, params = parse_prop_grouped_clauses(team_id=self.team.pk, property_group=filter.property_groups) @@ -115,7 +131,7 @@ def test_prop_cohort_basic_event_days(self): team=self.team, distinct_id="some_id", properties={"attr": "some_val"}, - timestamp=datetime(2020, 1, 9, 12, 0, 1), + timestamp=datetime.now() - timedelta(days=0, hours=12), ) _create_event( @@ -123,33 +139,24 @@ def test_prop_cohort_basic_event_days(self): team=self.team, distinct_id="some_other_id", properties={"attr": "some_val"}, - timestamp=datetime(2020, 1, 5, 12, 0, 1), + timestamp=datetime.now() - timedelta(days=4, hours=12), ) - with freeze_time("2020-01-10"): - cohort1 = Cohort.objects.create( - team=self.team, groups=[{"event_id": "$pageview", "days": 1}], name="cohort1", - ) + cohort1 = Cohort.objects.create(team=self.team, groups=[{"event_id": "$pageview", "days": 1}], name="cohort1",) - filter = Filter( - data={"properties": [{"key": "id", "value": cohort1.pk, "type": "cohort"}],}, team=self.team - ) - query, params = parse_prop_grouped_clauses(team_id=self.team.pk, property_group=filter.property_groups) - final_query = "SELECT uuid FROM events WHERE team_id = %(team_id)s {}".format(query) - result = sync_execute(final_query, {**params, "team_id": self.team.pk}) - self.assertEqual(len(result), 1) + filter = Filter(data={"properties": [{"key": "id", "value": cohort1.pk, "type": "cohort"}],}, team=self.team) + query, params = parse_prop_grouped_clauses(team_id=self.team.pk, property_group=filter.property_groups) + final_query = "SELECT uuid FROM events WHERE team_id = %(team_id)s {}".format(query) + result = sync_execute(final_query, {**params, "team_id": self.team.pk}) + self.assertEqual(len(result), 1) - cohort2 = Cohort.objects.create( - team=self.team, groups=[{"event_id": "$pageview", "days": 7}], name="cohort2", - ) + cohort2 = Cohort.objects.create(team=self.team, groups=[{"event_id": "$pageview", "days": 7}], name="cohort2",) - filter = Filter( - data={"properties": [{"key": "id", "value": cohort2.pk, "type": "cohort"}],}, team=self.team - ) - query, params = parse_prop_grouped_clauses(team_id=self.team.pk, property_group=filter.property_groups) - final_query = "SELECT uuid FROM events WHERE team_id = %(team_id)s {}".format(query) - result = sync_execute(final_query, {**params, "team_id": self.team.pk}) - self.assertEqual(len(result), 2) + filter = Filter(data={"properties": [{"key": "id", "value": cohort2.pk, "type": "cohort"}],}, team=self.team) + query, params = parse_prop_grouped_clauses(team_id=self.team.pk, property_group=filter.property_groups) + final_query = "SELECT uuid FROM events WHERE team_id = %(team_id)s {}".format(query) + result = sync_execute(final_query, {**params, "team_id": self.team.pk}) + self.assertEqual(len(result), 2) def test_prop_cohort_basic_action_days(self): @@ -167,7 +174,7 @@ def test_prop_cohort_basic_action_days(self): team=self.team, distinct_id="some_id", properties={"attr": "some_val"}, - timestamp=datetime(2020, 1, 9, 12, 0, 1), + timestamp=datetime.now() - timedelta(hours=22), ) _create_event( @@ -175,33 +182,24 @@ def test_prop_cohort_basic_action_days(self): team=self.team, distinct_id="some_other_id", properties={"attr": "some_val"}, - timestamp=datetime(2020, 1, 5, 12, 0, 1), + timestamp=datetime.now() - timedelta(days=5), ) - with freeze_time("2020-01-10"): - cohort1 = Cohort.objects.create( - team=self.team, groups=[{"action_id": action.pk, "days": 1}], name="cohort1", - ) + cohort1 = Cohort.objects.create(team=self.team, groups=[{"action_id": action.pk, "days": 1}], name="cohort1",) - filter = Filter( - data={"properties": [{"key": "id", "value": cohort1.pk, "type": "cohort"}],}, team=self.team - ) - query, params = parse_prop_grouped_clauses(team_id=self.team.pk, property_group=filter.property_groups) - final_query = "SELECT uuid FROM events WHERE team_id = %(team_id)s {}".format(query) - result = sync_execute(final_query, {**params, "team_id": self.team.pk}) - self.assertEqual(len(result), 1) + filter = Filter(data={"properties": [{"key": "id", "value": cohort1.pk, "type": "cohort"}],}, team=self.team) + query, params = parse_prop_grouped_clauses(team_id=self.team.pk, property_group=filter.property_groups) + final_query = "SELECT uuid FROM events WHERE team_id = %(team_id)s {}".format(query) + result = sync_execute(final_query, {**params, "team_id": self.team.pk}) + self.assertEqual(len(result), 1) - cohort2 = Cohort.objects.create( - team=self.team, groups=[{"action_id": action.pk, "days": 7}], name="cohort2", - ) + cohort2 = Cohort.objects.create(team=self.team, groups=[{"action_id": action.pk, "days": 7}], name="cohort2",) - filter = Filter( - data={"properties": [{"key": "id", "value": cohort2.pk, "type": "cohort"}],}, team=self.team - ) - query, params = parse_prop_grouped_clauses(team_id=self.team.pk, property_group=filter.property_groups) - final_query = "SELECT uuid FROM events WHERE team_id = %(team_id)s {}".format(query) - result = sync_execute(final_query, {**params, "team_id": self.team.pk}) - self.assertEqual(len(result), 2) + filter = Filter(data={"properties": [{"key": "id", "value": cohort2.pk, "type": "cohort"}],}, team=self.team) + query, params = parse_prop_grouped_clauses(team_id=self.team.pk, property_group=filter.property_groups) + final_query = "SELECT uuid FROM events WHERE team_id = %(team_id)s {}".format(query) + result = sync_execute(final_query, {**params, "team_id": self.team.pk}) + self.assertEqual(len(result), 2) def test_prop_cohort_multiple_groups(self): @@ -218,7 +216,10 @@ def test_prop_cohort_multiple_groups(self): cohort1 = Cohort.objects.create( team=self.team, - groups=[{"properties": {"$some_prop": "something"}}, {"properties": {"$another_prop": "something"}}], + groups=[ + {"properties": [{"key": "$some_prop", "value": "something", "type": "person"}]}, + {"properties": [{"key": "$another_prop", "value": "something", "type": "person"}]}, + ], name="cohort1", ) @@ -263,7 +264,9 @@ def test_cohort_get_person_ids_by_cohort_id(self): user2 = _create_person(distinct_ids=["user2"], team_id=self.team.pk, properties={"$some_prop": "another"}) user3 = _create_person(distinct_ids=["user3"], team_id=self.team.pk, properties={"$some_prop": "something"}) cohort = Cohort.objects.create( - team=self.team, groups=[{"properties": {"$some_prop": "something"}}], name="cohort1", + team=self.team, + groups=[{"properties": [{"key": "$some_prop", "value": "something", "type": "person"}]}], + name="cohort1", ) results = get_person_ids_by_cohort_id(self.team, cohort.id) @@ -318,7 +321,14 @@ def test_cohortpeople_basic(self): cohort1 = Cohort.objects.create( team=self.team, - groups=[{"properties": {"$some_prop": "something", "$another_prop": "something"}}], + groups=[ + { + "properties": [ + {"key": "$some_prop", "value": "something", "type": "person"}, + {"key": "$another_prop", "value": "something", "type": "person"}, + ] + } + ], name="cohort1", ) @@ -340,7 +350,14 @@ def test_cohortpeople_basic_paginating(self, mock_sleep): cohort1: Cohort = Cohort.objects.create( team=self.team, - groups=[{"properties": {"$some_prop": "something", "$another_prop": "something"}}], + groups=[ + { + "properties": [ + {"key": "$some_prop", "value": "something", "type": "person"}, + {"key": "$another_prop", "value": "something", "type": "person"}, + ] + } + ], name="cohort1", ) @@ -361,7 +378,7 @@ def test_cohortpeople_action_basic(self): team=self.team, distinct_id="1", properties={"attr": "some_val"}, - timestamp=datetime(2020, 1, 9, 12, 0, 1), + timestamp=datetime.now() - timedelta(hours=12), ) p2 = Person.objects.create( @@ -375,12 +392,11 @@ def test_cohortpeople_action_basic(self): team=self.team, distinct_id="2", properties={"attr": "some_val"}, - timestamp=datetime(2020, 1, 9, 12, 0, 1), + timestamp=datetime.now() - timedelta(hours=12), ) cohort1 = Cohort.objects.create(team=self.team, groups=[{"action_id": action.pk, "days": 1}], name="cohort1",) - with freeze_time("2020-01-10"): - cohort1.calculate_people_ch(pending_version=0) + cohort1.calculate_people_ch(pending_version=0) results = sync_execute( "SELECT person_id FROM cohortpeople WHERE cohort_id = %(cohort_id)s", {"cohort_id": cohort1.pk} @@ -388,57 +404,13 @@ def test_cohortpeople_action_basic(self): self.assertEqual(len(results), 2) cohort2 = Cohort.objects.create(team=self.team, groups=[{"action_id": action.pk, "days": 1}], name="cohort2",) - with freeze_time("2020-01-10"): - cohort2.calculate_people_ch(pending_version=0) + cohort2.calculate_people_ch(pending_version=0) results = sync_execute( "SELECT person_id FROM cohortpeople WHERE cohort_id = %(cohort_id)s", {"cohort_id": cohort2.pk} ) self.assertEqual(len(results), 2) - def test_cohortpeople_timestamp(self): - action = _create_action(team=self.team, name="$pageview") - p1 = Person.objects.create( - team_id=self.team.pk, - distinct_ids=["1"], - properties={"$some_prop": "something", "$another_prop": "something"}, - ) - - _create_event( - event="$pageview", - team=self.team, - distinct_id="1", - properties={"attr": "some_val"}, - timestamp=datetime(2020, 1, 9, 12, 0, 1), - ) - - p2 = Person.objects.create( - team_id=self.team.pk, - distinct_ids=["2"], - properties={"$some_prop": "something", "$another_prop": "something"}, - ) - - _create_event( - event="$pageview", - team=self.team, - distinct_id="2", - properties={"attr": "some_val"}, - timestamp=datetime(2020, 1, 7, 12, 0, 1), - ) - - cohort1 = Cohort.objects.create( - team=self.team, - groups=[{"action_id": action.pk, "start_date": datetime(2020, 1, 8, 12, 0, 1)}], - name="cohort1", - ) - with freeze_time("2020-01-10"): - cohort1.calculate_people_ch(pending_version=0) - - results = sync_execute( - "SELECT person_id FROM cohortpeople where team_id = %(team_id)s", {"team_id": self.team.pk} - ) - self.assertEqual(len(results), 1) - def _setup_actions_with_different_counts(self): action = _create_action(team=self.team, name="$pageview") p1 = Person.objects.create( @@ -452,14 +424,14 @@ def _setup_actions_with_different_counts(self): team=self.team, distinct_id="1", properties={"attr": "some_val"}, - timestamp=datetime(2020, 1, 8, 12, 0, 1), + timestamp=datetime.now() - timedelta(days=1, hours=12), ) _create_event( event="$pageview", team=self.team, distinct_id="1", properties={"attr": "some_val"}, - timestamp=datetime(2020, 1, 9, 12, 0, 1), + timestamp=datetime.now() - timedelta(days=0, hours=12), ) p2 = Person.objects.create( @@ -473,7 +445,7 @@ def _setup_actions_with_different_counts(self): team=self.team, distinct_id="2", properties={"attr": "some_val"}, - timestamp=datetime(2020, 1, 8, 12, 0, 1), + timestamp=datetime.now() - timedelta(days=1, hours=12), ) _create_event( @@ -481,7 +453,7 @@ def _setup_actions_with_different_counts(self): team=self.team, distinct_id="2", properties={"attr": "some_val"}, - timestamp=datetime(2020, 1, 9, 12, 0, 1), + timestamp=datetime.now() - timedelta(days=0, hours=12), ) p3 = Person.objects.create( @@ -495,7 +467,7 @@ def _setup_actions_with_different_counts(self): team=self.team, distinct_id="3", properties={"attr": "some_val"}, - timestamp=datetime(2020, 1, 9, 12, 0, 1), + timestamp=datetime.now() - timedelta(days=0, hours=12), ) p4 = Person.objects.create( @@ -522,8 +494,7 @@ def test_cohortpeople_action_count(self): groups=[{"action_id": action.pk, "days": 3, "count": 2, "count_operator": "gte"}], name="cohort1", ) - with freeze_time("2020-01-10"): - cohort1.calculate_people_ch(pending_version=0) + cohort1.calculate_people_ch(pending_version=0) results = sync_execute( "SELECT person_id FROM cohortpeople where cohort_id = %(cohort_id)s", {"cohort_id": cohort1.pk} @@ -535,8 +506,7 @@ def test_cohortpeople_action_count(self): groups=[{"action_id": action.pk, "days": 3, "count": 1, "count_operator": "lte"}], name="cohort2", ) - with freeze_time("2020-01-10"): - cohort2.calculate_people_ch(pending_version=0) + cohort2.calculate_people_ch(pending_version=0) results = sync_execute( "SELECT person_id FROM cohortpeople where cohort_id = %(cohort_id)s", {"cohort_id": cohort2.pk} @@ -548,8 +518,7 @@ def test_cohortpeople_action_count(self): groups=[{"action_id": action.pk, "days": 3, "count": 1, "count_operator": "eq"}], name="cohort3", ) - with freeze_time("2020-01-10"): - cohort3.calculate_people_ch(pending_version=0) + cohort3.calculate_people_ch(pending_version=0) results = sync_execute( "SELECT person_id FROM cohortpeople where cohort_id = %(cohort_id)s", {"cohort_id": cohort3.pk} @@ -561,13 +530,12 @@ def test_cohortpeople_action_count(self): groups=[{"action_id": action.pk, "days": 3, "count": 0, "count_operator": "eq"}], name="cohort4", ) - with freeze_time("2020-01-10"): - cohort4.calculate_people_ch(pending_version=0) + cohort4.calculate_people_ch(pending_version=0) results = sync_execute( - "SELECT person_id FROM cohortpeople where cohort_id = %(cohort_id)s", {"cohort_id": cohort4.pk} + "SELECT person_id FROM cohortpeople where cohort_id = %(cohort_id)s", {"cohort_id": cohort3.pk} ) - self.assertEqual(len(results), 2) + self.assertEqual(len(results), 1) def test_cohortpeople_deleted_person(self): p1 = Person.objects.create( @@ -583,7 +551,14 @@ def test_cohortpeople_deleted_person(self): cohort1 = Cohort.objects.create( team=self.team, - groups=[{"properties": {"$some_prop": "something", "$another_prop": "something"}}], + groups=[ + { + "properties": [ + {"key": "$some_prop", "value": "something", "type": "person"}, + {"key": "$another_prop", "value": "something", "type": "person"}, + ] + } + ], name="cohort1", ) @@ -592,7 +567,7 @@ def test_cohortpeople_deleted_person(self): cohort1.calculate_people_ch(pending_version=0) def test_cohortpeople_prop_changed(self): - with freeze_time("2020-01-10"): + with freeze_time((datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d")): p1 = Person.objects.create( team_id=self.team.pk, distinct_ids=["1"], @@ -606,13 +581,20 @@ def test_cohortpeople_prop_changed(self): cohort1 = Cohort.objects.create( team=self.team, - groups=[{"properties": {"$some_prop": "something", "$another_prop": "something"}}], + groups=[ + { + "properties": [ + {"key": "$some_prop", "value": "something", "type": "person"}, + {"key": "$another_prop", "value": "something", "type": "person"}, + ] + } + ], name="cohort1", ) - cohort1.calculate_people_ch(pending_version=0) + cohort1.calculate_people_ch(pending_version=0) - with freeze_time("2020-01-11"): + with freeze_time((datetime.now() - timedelta(days=2)).strftime("%Y-%m-%d")): p2.properties = {"$some_prop": "another", "$another_prop": "another"} p2.save() @@ -627,24 +609,28 @@ def test_cohortpeople_prop_changed(self): self.assertEqual(results[0][0], p1.uuid) def test_cohort_change(self): - with freeze_time("2020-01-10"): - p1 = Person.objects.create( - team_id=self.team.pk, - distinct_ids=["1"], - properties={"$some_prop": "something", "$another_prop": "something"}, - ) - p2 = Person.objects.create( - team_id=self.team.pk, - distinct_ids=["2"], - properties={"$some_prop": "another", "$another_prop": "another"}, - ) + p1 = Person.objects.create( + team_id=self.team.pk, + distinct_ids=["1"], + properties={"$some_prop": "something", "$another_prop": "something"}, + ) + p2 = Person.objects.create( + team_id=self.team.pk, distinct_ids=["2"], properties={"$some_prop": "another", "$another_prop": "another"}, + ) - cohort1 = Cohort.objects.create( - team=self.team, - groups=[{"properties": {"$some_prop": "something", "$another_prop": "something"}}], - name="cohort1", - ) - cohort1.calculate_people_ch(pending_version=0) + cohort1 = Cohort.objects.create( + team=self.team, + groups=[ + { + "properties": [ + {"key": "$some_prop", "value": "something", "type": "person"}, + {"key": "$another_prop", "value": "something", "type": "person"}, + ] + } + ], + name="cohort1", + ) + cohort1.calculate_people_ch(pending_version=0) results = sync_execute( "SELECT person_id FROM cohortpeople WHERE team_id = %(team_id)s GROUP BY person_id, team_id, cohort_id HAVING sum(sign) > 0", @@ -654,10 +640,17 @@ def test_cohort_change(self): self.assertEqual(len(results), 1) self.assertEqual(results[0][0], p1.uuid) - with freeze_time("2020-01-11"): - cohort1.groups = [{"properties": {"$some_prop": "another", "$another_prop": "another"}}] - cohort1.save() - cohort1.calculate_people_ch(pending_version=0) + cohort1.groups = [ + { + "properties": [ + {"key": "$some_prop", "value": "another", "type": "person"}, + {"key": "$another_prop", "value": "another", "type": "person"}, + ] + } + ] + cohort1.save() + + cohort1.calculate_people_ch(pending_version=0) results = sync_execute( "SELECT person_id FROM cohortpeople WHERE team_id = %(team_id)s GROUP BY person_id, team_id, cohort_id HAVING sum(sign) > 0", @@ -678,8 +671,7 @@ def test_static_cohort_precalculated(self): cohort = Cohort.objects.create(team=self.team, groups=[], is_static=True, last_calculation=timezone.now(),) cohort.insert_users_by_list(["1", "123"]) - with freeze_time("2020-01-10"): - cohort.calculate_people_ch(pending_version=0) + cohort.calculate_people_ch(pending_version=0) with self.settings(USE_PRECALCULATED_CH_COHORT_PEOPLE=True): sql, _ = format_filter_query(cohort) @@ -690,7 +682,7 @@ def test_cohortpeople_with_valid_other_cohort_filter(self): p2 = Person.objects.create(team_id=self.team.pk, distinct_ids=["2"], properties={"foo": "non"},) cohort0: Cohort = Cohort.objects.create( - team=self.team, groups=[{"properties": {"foo": "bar"}}], name="cohort0", + team=self.team, groups=[{"properties": [{"key": "foo", "value": "bar", "type": "person"}]}], name="cohort0", ) cohort0.calculate_people_ch(pending_version=0) @@ -739,9 +731,54 @@ def test_cohortpeople_with_cyclic_cohort_filter(self): )[0][0] self.assertEqual(count_result, 2) + @pytest.mark.skip("Old cohorts don't handle this case") + def test_cohortpeople_with_misdirecting_cyclic_cohort_filter(self): + p1 = Person.objects.create(team_id=self.team.pk, distinct_ids=["1"], properties={"foo": "bar"},) + p2 = Person.objects.create(team_id=self.team.pk, distinct_ids=["2"], properties={"foo": "non"},) + + cohort1: Cohort = Cohort.objects.create( + team=self.team, groups=[], name="cohort1", + ) + cohort2: Cohort = Cohort.objects.create( + team=self.team, groups=[], name="cohort2", + ) + cohort3: Cohort = Cohort.objects.create( + team=self.team, groups=[], name="cohort3", + ) + cohort4: Cohort = Cohort.objects.create( + team=self.team, groups=[], name="cohort4", + ) + cohort5: Cohort = Cohort.objects.create( + team=self.team, groups=[], name="cohort5", + ) + + cohort1.groups = [{"properties": [{"key": "id", "type": "cohort", "value": cohort2.id}]}] + cohort1.save() + cohort2.groups = [{"properties": [{"key": "id", "type": "cohort", "value": cohort3.id}]}] + cohort2.save() + cohort3.groups = [{"properties": [{"key": "id", "type": "cohort", "value": cohort4.id}]}] + cohort3.save() + cohort4.groups = [{"properties": [{"key": "id", "type": "cohort", "value": cohort2.id}]}] + cohort4.save() + cohort5.groups = [{"properties": [{"key": "id", "type": "cohort", "value": cohort1.id}]}] + cohort5.save() + + # cohort1 depends on cohort2 which depends on cohort3 which depends on cohort4 which depends on cohort2 + # and cohort5 depends on cohort1 + + with self.assertRaises(ValueError): + cohort5.calculate_people_ch(pending_version=0) + + count_result = sync_execute( + "SELECT count(person_id) FROM cohortpeople where cohort_id = %(cohort_id)s", {"cohort_id": cohort1.pk} + )[0][0] + self.assertEqual(count_result, 0) + def test_clickhouse_empty_query(self): cohort2 = Cohort.objects.create( - team=self.team, groups=[{"properties": {"$some_prop": "nomatchihope"}}], name="cohort1", + team=self.team, + groups=[{"properties": [{"key": "$some_prop", "value": "nomatchihope", "type": "person"}]}], + name="cohort1", ) cohort2.calculate_people_ch(pending_version=0) diff --git a/ee/clickhouse/queries/cohort_query.py b/ee/clickhouse/queries/cohort_query.py index c91500702db6d..60922d83b6dae 100644 --- a/ee/clickhouse/queries/cohort_query.py +++ b/ee/clickhouse/queries/cohort_query.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Set, Tuple, Union from ee.clickhouse.materialized_columns.columns import ColumnName from ee.clickhouse.models.cohort import format_filter_query, get_count_operator, get_entity_query @@ -123,6 +123,9 @@ def __init__( self, filter: Filter, team: Team, + *, + cohort_pk: Optional[int] = None, + cohorts_seen: Optional[Set[int]] = None, round_interval=False, should_join_distinct_ids=False, should_join_persons=False, @@ -137,6 +140,8 @@ def __init__( self._events = [] self._earliest_time_for_event_query = None self._restrict_event_query_by_time = True + self._cohort_pk = cohort_pk + self._cohorts_seen = cohorts_seen super().__init__( filter=filter, team=team, @@ -161,7 +166,7 @@ def get_query(self) -> Tuple[str, Dict[str, Any]]: if not self._outer_property_groups: # everything is pushed down, no behavioural stuff to do # thus, use personQuery directly - return self._person_query.get_query() + return self._person_query.get_query(prepend=self._cohort_pk) # TODO: clean up this kludge. Right now, get_conditions has to run first so that _fields is populated for _get_behavioral_subquery() conditions, condition_params = self._get_conditions() @@ -173,7 +178,7 @@ def get_query(self) -> Tuple[str, Dict[str, Any]]: subq.append((behavior_subquery, behavior_query_alias)) self.params.update(behavior_subquery_params) - person_query, person_params, person_query_alias = self._get_persons_query() + person_query, person_params, person_query_alias = self._get_persons_query(prepend=str(self._cohort_pk)) subq.append((person_query, person_query_alias)) self.params.update(person_params) @@ -218,6 +223,7 @@ def _get_behavior_subquery(self) -> Tuple[str, Dict[str, Any], str]: # # Get the subquery for the cohort query. # + event_param_name = f"{self._cohort_pk}_event_ids" query, params = "", {} if self._should_join_behavioral_query: @@ -231,19 +237,19 @@ def _get_behavior_subquery(self) -> Tuple[str, Dict[str, Any], str]: SELECT {", ".join(_fields)} FROM events {self.EVENT_TABLE_ALIAS} {self._get_distinct_id_query()} WHERE team_id = %(team_id)s - AND event IN %(events)s + AND event IN %({event_param_name})s {date_condition} GROUP BY person_id """ - query, params = (query, {"team_id": self._team_id, "events": self._events, **date_params,}) + query, params = (query, {"team_id": self._team_id, event_param_name: self._events, **date_params,}) return query, params, self.BEHAVIOR_QUERY_ALIAS - def _get_persons_query(self) -> Tuple[str, Dict[str, Any], str]: + def _get_persons_query(self, prepend: str = "") -> Tuple[str, Dict[str, Any], str]: query, params = "", {} if self._should_join_persons: - person_query, person_params = self._person_query.get_query() + person_query, person_params = self._person_query.get_query(prepend=prepend) person_query = f"SELECT *, id AS person_id FROM ({person_query})" query, params = person_query, person_params @@ -330,28 +336,23 @@ def get_person_condition(self, prop: Property, prepend: str, idx: int) -> Tuple[ def get_cohort_condition(self, prop: Property, prepend: str, idx: int) -> Tuple[str, Dict[str, Any]]: - q, params = "", {} try: prop_cohort: Cohort = Cohort.objects.get(pk=prop.value, team_id=self._team_id) except Cohort.DoesNotExist: - q = "0 = 14" + return "0 = 14", {} - # TODO: renable this check when this class accepts a cohort not filter - - # if prop_cohort.pk == cohort.pk: - # # If we've encountered a cyclic dependency (meaning this cohort depends on this cohort), - # # we treat it as satisfied for all persons - # pass - # else: - - # TODO: format_filter_query uses the deprecated way of building cohorts - # Update format_filter_query to use this class or use this class directly when backwards compatibility is achieved - # This function will only work for old cohorts right now - person_id_query, cohort_filter_params = format_filter_query(prop_cohort, idx, "person_id") - q = f"id IN ({person_id_query})" - params = cohort_filter_params - - return q, params + if prop_cohort.pk == self._cohort_pk or (self._cohorts_seen and prop_cohort.pk in self._cohorts_seen): + # If we've encountered a cyclic dependency (meaning this cohort depends on this cohort eventually), + # we treat it as an invalid cohort condition + raise ValueError(f"Cyclic dependency detected when computing cohort with id: {prop_cohort.pk}") + else: + cohorts_seen = list(self._cohorts_seen) if self._cohorts_seen is not None else [] + if self._cohort_pk is not None: + cohorts_seen.append(self._cohort_pk) + person_id_query, cohort_filter_params = format_filter_query( + prop_cohort, idx, "person_id", cohorts_seen=set(cohorts_seen), using_new_query=True + ) + return f"id IN ({person_id_query})", cohort_filter_params def get_performed_event_condition(self, prop: Property, prepend: str, idx: int) -> Tuple[str, Dict[str, Any]]: event = (prop.event_type, prop.key) diff --git a/ee/clickhouse/queries/funnels/test/__snapshots__/test_funnel.ambr b/ee/clickhouse/queries/funnels/test/__snapshots__/test_funnel.ambr index 2076ecfd4da93..d2089fb533222 100644 --- a/ee/clickhouse/queries/funnels/test/__snapshots__/test_funnel.ambr +++ b/ee/clickhouse/queries/funnels/test/__snapshots__/test_funnel.ambr @@ -115,6 +115,20 @@ # name: TestClickhouseFunnel.test_funnel_with_precalculated_cohort_step_filter.2 ' + SELECT COUNT(1) + FROM person + WHERE id IN + (SELECT id + FROM person + WHERE team_id = 2 + GROUP BY id + HAVING max(is_deleted) = 0 + AND ((replaceRegexpAll(JSONExtractRaw(argMax(person.properties, _timestamp), 'email'), '^"|"$', '') ILIKE '%.com%'))) + ' +--- +# name: TestClickhouseFunnel.test_funnel_with_precalculated_cohort_step_filter.3 + ' + SELECT countIf(steps = 1) step_1, countIf(steps = 2) step_2, avg(step_1_average_conversion_time_inner) step_1_average_conversion_time, diff --git a/ee/clickhouse/queries/funnels/test/test_funnel_trends.py b/ee/clickhouse/queries/funnels/test/test_funnel_trends.py index cb4e518f55867..f768bbc6883d1 100644 --- a/ee/clickhouse/queries/funnels/test/test_funnel_trends.py +++ b/ee/clickhouse/queries/funnels/test/test_funnel_trends.py @@ -1187,7 +1187,11 @@ def test_funnel_trend_cohort_breakdown(self): self.team, ) - cohort = Cohort.objects.create(team=self.team, name="test_cohort", groups=[{"properties": {"key": "value"}}]) + cohort = Cohort.objects.create( + team=self.team, + name="test_cohort", + groups=[{"properties": [{"key": "key", "value": "value", "type": "person"}]}], + ) filter = Filter( data={ "insight": INSIGHT_FUNNELS, diff --git a/ee/clickhouse/queries/session_recordings/test/__snapshots__/test_clickhouse_session_recording_list.ambr b/ee/clickhouse/queries/session_recordings/test/__snapshots__/test_clickhouse_session_recording_list.ambr index c4479ddec72bc..53d7335435e19 100644 --- a/ee/clickhouse/queries/session_recordings/test/__snapshots__/test_clickhouse_session_recording_list.ambr +++ b/ee/clickhouse/queries/session_recordings/test/__snapshots__/test_clickhouse_session_recording_list.ambr @@ -143,6 +143,20 @@ # name: TestClickhouseSessionRecordingsList.test_event_filter_with_cohort_properties.2 ' + SELECT COUNT(1) + FROM person + WHERE id IN + (SELECT id + FROM person + WHERE team_id = 2 + GROUP BY id + HAVING max(is_deleted) = 0 + AND ((has(['some_val'], replaceRegexpAll(JSONExtractRaw(argMax(person.properties, _timestamp), '$some_prop'), '^"|"$', ''))))) + ' +--- +# name: TestClickhouseSessionRecordingsList.test_event_filter_with_cohort_properties.3 + ' + SELECT session_recordings.session_id, any(session_recordings.start_time) as start_time, any(session_recordings.end_time) as end_time, diff --git a/ee/clickhouse/queries/session_recordings/test/test_clickhouse_session_recording_list.py b/ee/clickhouse/queries/session_recordings/test/test_clickhouse_session_recording_list.py index 3fd1d51a5ceb3..90ae6589f6d96 100644 --- a/ee/clickhouse/queries/session_recordings/test/test_clickhouse_session_recording_list.py +++ b/ee/clickhouse/queries/session_recordings/test/test_clickhouse_session_recording_list.py @@ -52,7 +52,9 @@ def test_event_filter_with_cohort_properties(self): team=self.team, distinct_ids=["user2"], properties={"email": "bla2", "$some_prop": "some_val"} ) cohort = Cohort.objects.create( - team=self.team, name="cohort1", groups=[{"properties": {"$some_prop": "some_val"}}] + team=self.team, + name="cohort1", + groups=[{"properties": [{"key": "$some_prop", "value": "some_val", "type": "person"}]}], ) cohort.calculate_people_ch(pending_version=0) diff --git a/ee/clickhouse/queries/test/__snapshots__/test_event_query.ambr b/ee/clickhouse/queries/test/__snapshots__/test_event_query.ambr index 3c19dd36c0d13..f5a0e1327306b 100644 --- a/ee/clickhouse/queries/test/__snapshots__/test_event_query.ambr +++ b/ee/clickhouse/queries/test/__snapshots__/test_event_query.ambr @@ -31,14 +31,41 @@ # name: TestEventQuery.test_account_filters.2 ' + SELECT COUNT(1) + FROM person + WHERE id IN + (SELECT id + FROM person + WHERE team_id = 2 + GROUP BY id + HAVING max(is_deleted) = 0 + AND ((has(['Jane'], replaceRegexpAll(JSONExtractRaw(argMax(person.properties, _timestamp), 'name'), '^"|"$', ''))))) + ' +--- +# name: TestEventQuery.test_account_filters.3 + ' + SELECT e.timestamp as timestamp, - e.properties as properties + pdi.person_id as person_id FROM events e + INNER JOIN + (SELECT distinct_id, + argMax(person_id, version) as person_id + FROM person_distinct_id2 + WHERE team_id = 2 + GROUP BY distinct_id + HAVING argMax(is_deleted, version) = 0) AS pdi ON e.distinct_id = pdi.distinct_id + INNER JOIN + (SELECT id + FROM person + WHERE team_id = 2 + GROUP BY id + HAVING max(is_deleted) = 0 + AND (has(['Jane'], replaceRegexpAll(JSONExtractRaw(argMax(person.properties, _timestamp), 'name'), '^"|"$', '')))) person ON person.id = pdi.person_id WHERE team_id = 2 AND event = 'event_name' AND timestamp >= toStartOfDay(toDateTime('2021-01-14 00:00:00'), 'UTC') AND timestamp <= toDateTime('2021-01-21 23:59:59', 'UTC') - AND (has(['Jane'], replaceRegexpAll(JSONExtractRaw(e.properties, 'name'), '^"|"$', ''))) ' --- # name: TestEventQuery.test_basic_event_filter diff --git a/ee/clickhouse/queries/test/__snapshots__/test_person_query.ambr b/ee/clickhouse/queries/test/__snapshots__/test_person_query.ambr index fa4636cfded05..cb53807d42eed 100644 --- a/ee/clickhouse/queries/test/__snapshots__/test_person_query.ambr +++ b/ee/clickhouse/queries/test/__snapshots__/test_person_query.ambr @@ -19,7 +19,7 @@ WHERE team_id = %(team_id)s GROUP BY id - HAVING max(is_deleted) = 0 AND ( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery_global_0)s) + HAVING max(is_deleted) = 0 AND ( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery__0)s) ' @@ -32,7 +32,7 @@ WHERE team_id = %(team_id)s GROUP BY id - HAVING max(is_deleted) = 0 AND (( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery_global_0_0)s OR replaceRegexpAll(JSONExtractRaw(argMax(person.properties, _timestamp), %(kpersonquery_global_0_1)s), '^"|"$', '') ILIKE %(vpersonquery_global_0_1)s)) + HAVING max(is_deleted) = 0 AND (( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery__0_0)s OR replaceRegexpAll(JSONExtractRaw(argMax(person.properties, _timestamp), %(kpersonquery__0_1)s), '^"|"$', '') ILIKE %(vpersonquery__0_1)s)) ' @@ -45,7 +45,7 @@ WHERE team_id = %(team_id)s GROUP BY id - HAVING max(is_deleted) = 0 AND ( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery_global_0)s AND has(%(vpersonquery_global_1)s, replaceRegexpAll(JSONExtractRaw(argMax(person.properties, _timestamp), %(kpersonquery_global_1)s), '^"|"$', '')) AND has(%(vpersonquery_global_2)s, replaceRegexpAll(JSONExtractRaw(argMax(person.properties, _timestamp), %(kpersonquery_global_2)s), '^"|"$', ''))) + HAVING max(is_deleted) = 0 AND ( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery__0)s AND has(%(vpersonquery__1)s, replaceRegexpAll(JSONExtractRaw(argMax(person.properties, _timestamp), %(kpersonquery__1)s), '^"|"$', '')) AND has(%(vpersonquery__2)s, replaceRegexpAll(JSONExtractRaw(argMax(person.properties, _timestamp), %(kpersonquery__2)s), '^"|"$', ''))) ' @@ -71,7 +71,7 @@ WHERE team_id = %(team_id)s GROUP BY id - HAVING max(is_deleted) = 0 AND ( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery_global_0)s) + HAVING max(is_deleted) = 0 AND ( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery__0)s) ' @@ -84,7 +84,7 @@ WHERE team_id = %(team_id)s GROUP BY id - HAVING max(is_deleted) = 0 AND (( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery_global_0_0)s OR replaceRegexpAll(JSONExtractRaw(argMax(person.properties, _timestamp), %(kpersonquery_global_0_1)s), '^"|"$', '') ILIKE %(vpersonquery_global_0_1)s)) + HAVING max(is_deleted) = 0 AND (( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery__0_0)s OR replaceRegexpAll(JSONExtractRaw(argMax(person.properties, _timestamp), %(kpersonquery__0_1)s), '^"|"$', '') ILIKE %(vpersonquery__0_1)s)) ' @@ -97,7 +97,7 @@ WHERE team_id = %(team_id)s GROUP BY id - HAVING max(is_deleted) = 0 AND ((( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery_global_0_0_0)s OR replaceRegexpAll(JSONExtractRaw(argMax(person.properties, _timestamp), %(kpersonquery_global_0_0_1)s), '^"|"$', '') ILIKE %(vpersonquery_global_0_0_1)s))AND ( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery_global_1_0)s OR replaceRegexpAll(JSONExtractRaw(argMax(person.properties, _timestamp), %(kpersonquery_global_1_1)s), '^"|"$', '') ILIKE %(vpersonquery_global_1_1)s)) + HAVING max(is_deleted) = 0 AND ((( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery__0_0_0)s OR replaceRegexpAll(JSONExtractRaw(argMax(person.properties, _timestamp), %(kpersonquery__0_0_1)s), '^"|"$', '') ILIKE %(vpersonquery__0_0_1)s))AND ( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery__1_0)s OR replaceRegexpAll(JSONExtractRaw(argMax(person.properties, _timestamp), %(kpersonquery__1_1)s), '^"|"$', '') ILIKE %(vpersonquery__1_1)s)) ' @@ -110,7 +110,7 @@ WHERE team_id = %(team_id)s GROUP BY id - HAVING max(is_deleted) = 0 AND ( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery_global_0)s) + HAVING max(is_deleted) = 0 AND ( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery__0)s) ' @@ -123,7 +123,7 @@ WHERE team_id = %(team_id)s GROUP BY id - HAVING max(is_deleted) = 0 AND ( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery_global_0)s) + HAVING max(is_deleted) = 0 AND ( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery__0)s) ' @@ -136,7 +136,7 @@ WHERE team_id = %(team_id)s GROUP BY id - HAVING max(is_deleted) = 0 AND ( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery_global_0)s) + HAVING max(is_deleted) = 0 AND ( argMax(person."pmat_email", _timestamp) ILIKE %(vpersonquery__0)s) ' diff --git a/ee/clickhouse/queries/test/test_breakdown_props.py b/ee/clickhouse/queries/test/test_breakdown_props.py index ee6ef130fa4b6..71e24598a7936 100644 --- a/ee/clickhouse/queries/test/test_breakdown_props.py +++ b/ee/clickhouse/queries/test/test_breakdown_props.py @@ -75,7 +75,9 @@ def test_breakdown_person_props_with_entity_filter(self): properties={"key": "val"}, ) - cohort = Cohort.objects.create(team=self.team, name="a", groups=[{"properties": {"$browser": "test"}}]) + cohort = Cohort.objects.create( + team=self.team, name="a", groups=[{"properties": [{"key": "$browser", "value": "test", "type": "person"}]}] + ) cohort.calculate_people_ch(pending_version=0) entity_params = [ diff --git a/ee/clickhouse/queries/test/test_cohort_query.py b/ee/clickhouse/queries/test/test_cohort_query.py index 21aa144574006..5eb43a304d675 100644 --- a/ee/clickhouse/queries/test/test_cohort_query.py +++ b/ee/clickhouse/queries/test/test_cohort_query.py @@ -869,6 +869,7 @@ def test_earliest_date_clause(self): "time_value": 2, "time_interval": "week", "value": "performed_event_multiple", + "operator_value": 1, "type": "behavioural", }, { @@ -1057,7 +1058,11 @@ def test_cohort_filter(self): p1 = Person.objects.create( team_id=self.team.pk, distinct_ids=["p1"], properties={"name": "test", "name": "test"} ) - cohort = _create_cohort(team=self.team, name="cohort1", groups=[{"properties": {"name": "test"}}]) + cohort = _create_cohort( + team=self.team, + name="cohort1", + groups=[{"properties": [{"key": "name", "value": "test", "type": "person"}]}], + ) filter = Filter( data={"properties": {"type": "AND", "values": [{"key": "id", "value": cohort.pk, "type": "cohort"}],},} @@ -1072,7 +1077,11 @@ def test_cohort_filter_with_extra(self): p1 = Person.objects.create( team_id=self.team.pk, distinct_ids=["p1"], properties={"name": "test", "name": "test"} ) - cohort = _create_cohort(team=self.team, name="cohort1", groups=[{"properties": {"name": "test"}}]) + cohort = _create_cohort( + team=self.team, + name="cohort1", + groups=[{"properties": [{"key": "name", "value": "test", "type": "person"}]}], + ) p2 = Person.objects.create( team_id=self.team.pk, distinct_ids=["p2"], properties={"name": "test", "email": "test@posthog.com"} diff --git a/ee/clickhouse/queries/test/test_event_query.py b/ee/clickhouse/queries/test/test_event_query.py index 886c5b6683a5a..0b459748b9829 100644 --- a/ee/clickhouse/queries/test/test_event_query.py +++ b/ee/clickhouse/queries/test/test_event_query.py @@ -126,7 +126,11 @@ def test_event_properties_filter(self): # just smoke test making sure query runs because no new functions are used here @snapshot_clickhouse_queries def test_cohort_filter(self): - cohort = _create_cohort(team=self.team, name="cohort1", groups=[{"properties": {"name": "test"}}]) + cohort = _create_cohort( + team=self.team, + name="cohort1", + groups=[{"properties": [{"key": "name", "value": "test", "type": "person"}]}], + ) filter = Filter( data={ @@ -142,7 +146,11 @@ def test_cohort_filter(self): # just smoke test making sure query runs because no new functions are used here @snapshot_clickhouse_queries def test_entity_filtered_by_cohort(self): - cohort = _create_cohort(team=self.team, name="cohort1", groups=[{"properties": {"name": "test"}}]) + cohort = _create_cohort( + team=self.team, + name="cohort1", + groups=[{"properties": [{"key": "name", "value": "test", "type": "person"}]}], + ) filter = Filter( data={ @@ -193,7 +201,11 @@ def test_account_filters(self): _create_event(event="event_name", team=self.team, distinct_id="person_2") _create_event(event="event_name", team=self.team, distinct_id="person_2") - cohort = Cohort.objects.create(team=self.team, name="cohort1", groups=[{"properties": {"name": "Jane"}}]) + cohort = Cohort.objects.create( + team=self.team, + name="cohort1", + groups=[{"properties": [{"key": "name", "value": "Jane", "type": "person"}]}], + ) cohort.calculate_people_ch(pending_version=0) self.team.test_account_filters = [{"key": "id", "value": cohort.pk, "type": "cohort"}] diff --git a/ee/clickhouse/queries/test/test_trends.py b/ee/clickhouse/queries/test/test_trends.py index 420e0ee76cea4..b4db5a91713f8 100644 --- a/ee/clickhouse/queries/test/test_trends.py +++ b/ee/clickhouse/queries/test/test_trends.py @@ -619,7 +619,9 @@ def test_action_with_prop(self): def test_combine_all_cohort_and_icontains(self): # This caused some issues with SQL parsing sign_up_action, _ = self._create_events() - cohort = Cohort.objects.create(team=self.team, name="a", groups=[{"properties": {"key": "value"}}]) + cohort = Cohort.objects.create( + team=self.team, name="a", groups=[{"properties": [{"key": "key", "value": "value", "type": "person"}]}] + ) action_response = ClickhouseTrends().run( Filter( data={ diff --git a/ee/clickhouse/queries/test/test_util.py b/ee/clickhouse/queries/test/test_util.py index 70bb4b4df85c8..4124dde01014d 100644 --- a/ee/clickhouse/queries/test/test_util.py +++ b/ee/clickhouse/queries/test/test_util.py @@ -41,9 +41,7 @@ def test_get_earliest_timestamp_with_no_events(db, team): def test_parse_breakdown_cohort_query(db, team): action = Action.objects.create(team=team, name="$pageview") ActionStep.objects.create(action=action, event="$pageview") - cohort1 = Cohort.objects.create( - team=team, groups=[{"action_id": action.pk, "start_date": datetime(2020, 1, 8, 12, 0, 1)}], name="cohort1", - ) + cohort1 = Cohort.objects.create(team=team, groups=[{"action_id": action.pk, "days": 3}], name="cohort1",) queries, params = _parse_breakdown_cohorts([cohort1]) assert len(queries) == 1 sync_execute(queries[0], params) diff --git a/ee/clickhouse/queries/trends/test/test_formula.py b/ee/clickhouse/queries/trends/test/test_formula.py index d223be54b86d2..49e080b326f75 100644 --- a/ee/clickhouse/queries/trends/test/test_formula.py +++ b/ee/clickhouse/queries/trends/test/test_formula.py @@ -226,7 +226,9 @@ def test_breakdown_counts_of_different_events_one_without_events(self): def test_breakdown_cohort(self): cohort = Cohort.objects.create( - team=self.team, name="cohort1", groups=[{"properties": {"$some_prop": "some_val"}}] + team=self.team, + name="cohort1", + groups=[{"properties": [{"key": "$some_prop", "value": "some_val", "type": "person"}]}], ) response = self._run({"breakdown": ["all", cohort.pk], "breakdown_type": "cohort"}) self.assertEqual(response[0]["data"], [0.0, 0.0, 0.0, 0.0, 0.0, 1200.0, 1350.0, 0.0]) diff --git a/ee/clickhouse/views/test/test_clickhouse_trends.py b/ee/clickhouse/views/test/test_clickhouse_trends.py index e46db4d48dd1a..e43ea896cb9f8 100644 --- a/ee/clickhouse/views/test/test_clickhouse_trends.py +++ b/ee/clickhouse/views/test/test_clickhouse_trends.py @@ -51,7 +51,10 @@ def test_includes_only_intervals_within_range(client: Client): distinct_id = "abc" update_or_create_person(distinct_ids=[distinct_id], team_id=team.id, properties={"cohort_identifier": 1}) cohort = create_cohort_ok( - client=client, team_id=team.id, name="test cohort", groups=[{"properties": {"cohort_identifier": 1}}] + client=client, + team_id=team.id, + name="test cohort", + groups=[{"properties": [{"key": "cohort_identifier", "value": 1, "type": "person"}]}], ) journeys_for( diff --git a/posthog/api/test/test_action_people.py b/posthog/api/test/test_action_people.py index 322441b062918..5e771c99647e3 100644 --- a/posthog/api/test/test_action_people.py +++ b/posthog/api/test/test_action_people.py @@ -541,7 +541,11 @@ def test_people_csv(self): def test_breakdown_by_cohort_people_endpoint(self): person1, _, _, _ = self._create_multiple_people() - cohort = _create_cohort(name="cohort1", team=self.team, groups=[{"properties": {"name": "person1"}}]) + cohort = _create_cohort( + name="cohort1", + team=self.team, + groups=[{"properties": [{"key": "name", "value": "person1", "type": "person"}]}], + ) _create_cohort(name="cohort2", team=self.team, groups=[{"properties": {"name": "person2"}}]) _create_cohort( name="cohort3", diff --git a/posthog/api/test/test_cohort.py b/posthog/api/test/test_cohort.py index ced3eafb920a5..104c390cec8c1 100644 --- a/posthog/api/test/test_cohort.py +++ b/posthog/api/test/test_cohort.py @@ -202,7 +202,9 @@ def test_csv_export(self): distinct_ids=["person3"], team_id=self.team.pk, properties={"$some_prop": "something"} ) cohort = Cohort.objects.create( - team=self.team, groups=[{"properties": {"$some_prop": "something"}}], name="cohort1", + team=self.team, + groups=[{"properties": [{"key": "$some_prop", "value": "something", "type": "person"}]}], + name="cohort1", ) cohort.calculate_people_ch(pending_version=0) diff --git a/posthog/api/test/test_insight.py b/posthog/api/test/test_insight.py index 0515c1c4f261c..2c2e72cddfe80 100644 --- a/posthog/api/test/test_insight.py +++ b/posthog/api/test/test_insight.py @@ -801,8 +801,8 @@ def test_serializer(self, patch_capture_exception): self.assertEqual(patch_capture_exception.call_count, 0, patch_capture_exception.call_args_list) # Breakdown with ints in funnels - cohort_one_id = self._create_one_person_cohort({"prop": 5}) - cohort_two_id = self._create_one_person_cohort({"prop": 6}) + cohort_one_id = self._create_one_person_cohort([{"key": "prop", "value": 5, "type": "person"}]) + cohort_two_id = self._create_one_person_cohort([{"key": "prop", "value": 6, "type": "person"}]) events = [ {"id": "$pageview", "properties": [{"key": "something", "value": ["something"]}]}, @@ -815,7 +815,7 @@ def test_serializer(self, patch_capture_exception): self.assertEqual(response.status_code, 200) self.assertEqual(patch_capture_exception.call_count, 0, patch_capture_exception.call_args_list) - def _create_one_person_cohort(self, properties: Dict[str, Any]) -> int: + def _create_one_person_cohort(self, properties: List[Dict[str, Any]]) -> int: Person.objects.create(team=self.team, properties=properties) cohort_one_id = self.client.post( f"/api/projects/{self.team.id}/cohorts", data={"name": "whatever", "groups": [{"properties": properties}]}, diff --git a/posthog/api/test/test_person.py b/posthog/api/test/test_person.py index d3f5d385fdba2..1a81dc5f27fd6 100644 --- a/posthog/api/test/test_person.py +++ b/posthog/api/test/test_person.py @@ -120,7 +120,9 @@ def test_filter_by_cohort(self): ) flush_persons_and_events() - cohort = Cohort.objects.create(team=self.team, groups=[{"properties": {"$os": "Chrome"}}]) + cohort = Cohort.objects.create( + team=self.team, groups=[{"properties": [{"key": "$os", "value": "Chrome", "type": "person"}]}] + ) cohort.calculate_people_ch(pending_version=0) response = self.client.get(f"/api/cohort/{cohort.pk}/persons") @@ -139,7 +141,9 @@ def test_filter_by_cohort_prop(self): team=self.team, distinct_ids=[f"target"], properties={"$os": "Chrome", "$browser": "Safari"}, ) - cohort = Cohort.objects.create(team=self.team, groups=[{"properties": {"$os": "Chrome"}}]) + cohort = Cohort.objects.create( + team=self.team, groups=[{"properties": [{"key": "$os", "value": "Chrome", "type": "person"}]}] + ) cohort.calculate_people_ch(pending_version=0) response = self.client.get( @@ -159,7 +163,9 @@ def test_filter_by_cohort_search(self): ) flush_persons_and_events() - cohort = Cohort.objects.create(team=self.team, groups=[{"properties": {"$os": "Chrome"}}]) + cohort = Cohort.objects.create( + team=self.team, groups=[{"properties": [{"key": "$os", "value": "Chrome", "type": "person"}]}] + ) cohort.calculate_people_ch(pending_version=0) response = self.client.get(f"/api/cohort/{cohort.pk}/persons?search=target") @@ -491,10 +497,16 @@ def test_person_cohorts(self) -> None: team=self.team, distinct_ids=["2"], properties={"$some_prop": "something", "number": 2}, immediate=True ) cohort1 = Cohort.objects.create( - team=self.team, groups=[{"properties": {"$some_prop": "something"}}], name="cohort1" + team=self.team, + groups=[{"properties": [{"key": "$some_prop", "value": "something", "type": "person"}]}], + name="cohort1", + ) + cohort2 = Cohort.objects.create( + team=self.team, groups=[{"properties": [{"key": "number", "value": 1, "type": "person"}]}], name="cohort2" + ) + cohort3 = Cohort.objects.create( + team=self.team, groups=[{"properties": [{"key": "number", "value": 2, "type": "person"}]}], name="cohort3" ) - cohort2 = Cohort.objects.create(team=self.team, groups=[{"properties": {"number": 1}}], name="cohort2") - cohort3 = Cohort.objects.create(team=self.team, groups=[{"properties": {"number": 2}}], name="cohort3") cohort1.calculate_people_ch(pending_version=0) cohort2.calculate_people_ch(pending_version=0) cohort3.calculate_people_ch(pending_version=0) diff --git a/posthog/models/cohort.py b/posthog/models/cohort.py index 9cb4286165d69..b600f3a8a2ef0 100644 --- a/posthog/models/cohort.py +++ b/posthog/models/cohort.py @@ -1,6 +1,6 @@ import time from datetime import datetime -from typing import Any, Dict, List, Literal, Optional +from typing import Any, Dict, List, Literal, Optional, cast import structlog from django.conf import settings @@ -10,6 +10,9 @@ from django.utils import timezone from sentry_sdk import capture_exception +from posthog.constants import PropertyOperatorType +from posthog.models.filters.filter import Filter +from posthog.models.property import Property, PropertyGroup from posthog.models.utils import sane_repr from posthog.settings.base_variables import TEST @@ -73,7 +76,6 @@ class Cohort(models.Model): description: models.CharField = models.CharField(max_length=1000, blank=True) team: models.ForeignKey = models.ForeignKey("Team", on_delete=models.CASCADE) deleted: models.BooleanField = models.BooleanField(default=False) - groups: models.JSONField = models.JSONField(default=list) people: models.ManyToManyField = models.ManyToManyField("Person", through="CohortPeople") version: models.IntegerField = models.IntegerField(blank=True, null=True) pending_version: models.IntegerField = models.IntegerField(blank=True, null=True) @@ -90,7 +92,55 @@ class Cohort(models.Model): objects = CohortManager() + # deprecated + groups: models.JSONField = models.JSONField(default=list) + + @property + def properties(self) -> PropertyGroup: + # convert deprecated groups to properties + if self.groups: + property_groups = [] + for group in self.groups: + if group.get("properties"): + # Do not try simplifying properties at this stage. We'll let this happen at query time. + property_groups.append( + Filter(data={**group, "is_simplified": True}, team=self.team).property_groups + ) + elif group.get("action_id") or group.get("event_id"): + key = group.get("action_id") or group.get("event_id") + event_type: Literal["actions", "events"] = "actions" if group.get("action_id") else "events" + try: + count = int(group.get("count") or 0) + except ValueError: + count = 0 + + property_groups.append( + PropertyGroup( + PropertyOperatorType.AND, + [ + Property( + key=key, + type="behavioural", + value="performed_event_multiple" if count else "performed_event", + event_type=event_type, + time_interval="day", + time_value=group.get("days"), + operator=group.get("count_operator"), + operator_value=count, + ), + ], + ) + ) + else: + # invalid state + return PropertyGroup(PropertyOperatorType.OR, cast(List[Property], [])) + + return PropertyGroup(PropertyOperatorType.OR, property_groups) + + return PropertyGroup(PropertyOperatorType.OR, cast(List[Property], [])) + def get_analytics_metadata(self): + # TODO: add analytics for new cohort prop types action_groups_count: int = 0 properties_groups_count: int = 0 for group in self.groups: @@ -137,7 +187,7 @@ def calculate_people(self, new_version: int, batch_size=10000, pg_batch_size=100 raise err def calculate_people_ch(self, pending_version): - from ee.clickhouse.models.cohort import recalculate_cohortpeople + from ee.clickhouse.models.cohort import recalculate_cohortpeople, recalculate_cohortpeople_with_new_query from posthog.tasks.cohorts_in_feature_flag import get_cohort_ids_in_feature_flags logger.info("cohort_calculation_started", id=self.pk, current_version=self.version, new_version=pending_version) @@ -182,6 +232,15 @@ def calculate_people_ch(self, pending_version): duration=(time.monotonic() - start_time), ) + try: + new_query_count = recalculate_cohortpeople_with_new_query(self) + if new_query_count != count: + raise ValueError("Count mismatch between new query and old query", new_query_count, count) + except Exception as exception: + capture_exception( + exception, {"cohort_id": self.pk, "properties": self.properties.to_dict()}, + ) + def insert_users_by_list(self, items: List[str]) -> None: """ Items can be distinct_id or email diff --git a/posthog/models/property.py b/posthog/models/property.py index 27ae658e452e7..544a31ede35a7 100644 --- a/posthog/models/property.py +++ b/posthog/models/property.py @@ -42,7 +42,6 @@ class BehaviouralPropertyType(str, Enum): "behavioural", ] - PropertyName = str TableWithProperties = Literal["events", "person", "groups"] OperatorType = Literal[ @@ -88,7 +87,6 @@ class BehaviouralPropertyType(str, Enum): "event_type", "time_value", "time_interval", - "operator", "operator_value", ], BehaviouralPropertyType.PERFORMED_EVENT_FIRST_TIME: ["key", "value", "event_type", "time_value", "time_interval",], @@ -110,7 +108,6 @@ class BehaviouralPropertyType(str, Enum): "time_value", "time_interval", "operator_value", - "operator", "min_periods", "total_periods", ], @@ -211,6 +208,9 @@ def __init__( self.seq_time_interval = seq_time_interval self.negation = None if negation is None else str_to_bool(negation) + if self.type not in VALIDATE_PROP_TYPES.keys(): + raise ValueError(f"Invalid property type: {self.type}") + for key in VALIDATE_PROP_TYPES[self.type]: if getattr(self, key, None) is None: raise ValueError(f"Missing required key {key} for property type {self.type}") diff --git a/posthog/queries/event_query.py b/posthog/queries/event_query.py index 5c211e9e8d267..5c218833d3128 100644 --- a/posthog/queries/event_query.py +++ b/posthog/queries/event_query.py @@ -125,8 +125,8 @@ def _does_cohort_need_persons(self, prop: Property) -> bool: return True if cohort.is_static: return True - for group in cohort.groups: - if group.get("properties"): + for property in cohort.properties.flat: + if property.type == "person": return True return False diff --git a/posthog/queries/person_query.py b/posthog/queries/person_query.py index 527a43d63094b..23f4a956ee867 100644 --- a/posthog/queries/person_query.py +++ b/posthog/queries/person_query.py @@ -64,15 +64,15 @@ def __init__( properties ).inner - def get_query(self) -> Tuple[str, Dict]: + def get_query(self, prepend: str = "") -> Tuple[str, Dict]: fields = "id" + " ".join( f", argMax({column_name}, _timestamp) as {alias}" for column_name, alias in self._get_fields() ) - person_filters, params = self._get_person_filters() + person_filters, params = self._get_person_filters(prepend=prepend) cohort_query, cohort_params = self._get_cohort_query() limit_offset, limit_params = self._get_limit_offset() - search_clause, search_params = self._get_search_clause() + search_clause, search_params = self._get_search_clause(prepend=prepend) return ( f""" @@ -118,13 +118,14 @@ def _get_fields(self) -> List[Tuple[str, str]]: return [(column_name, self.ALIASES.get(column_name, column_name)) for column_name in sorted(columns)] - def _get_person_filters(self) -> Tuple[str, Dict]: + def _get_person_filters(self, prepend: str = "") -> Tuple[str, Dict]: return parse_prop_grouped_clauses( self._team_id, self._inner_person_properties, has_person_id_joined=False, group_properties_joined=False, person_properties_mode=PersonPropertiesMode.DIRECT, + prepend=prepend, ) def _get_cohort_query(self) -> Tuple[str, Dict]: @@ -169,7 +170,7 @@ def _get_limit_offset(self) -> Tuple[str, Dict]: return clause, params - def _get_search_clause(self) -> Tuple[str, Dict]: + def _get_search_clause(self, prepend: str = "") -> Tuple[str, Dict]: if not isinstance(self._filter, Filter): return "", {} @@ -182,22 +183,21 @@ def _get_search_clause(self) -> Tuple[str, Dict]: search_clause, params = parse_prop_grouped_clauses( self._team_id, prop_group, - prepend="search", + prepend=f"search_{prepend}", has_person_id_joined=False, group_properties_joined=False, person_properties_mode=PersonPropertiesMode.DIRECT, _top_level=False, ) - distinct_id_clause = """ + distinct_id_param = f"distinct_id_{prepend}" + distinct_id_clause = f""" id IN ( - SELECT person_id FROM ({query}) where distinct_id = %(distinct_id)s - ) - """.format( - query=get_team_distinct_ids_query(self._team_id) + SELECT person_id FROM ({get_team_distinct_ids_query(self._team_id)}) where distinct_id = %({distinct_id_param})s ) + """ - params.update({"distinct_id": self._filter.search}) + params.update({distinct_id_param: self._filter.search}) return f"AND (({search_clause}) OR ({distinct_id_clause}))", params diff --git a/posthog/queries/test/test_trends.py b/posthog/queries/test/test_trends.py index f8e7464fb33ad..ef25e4401ad0a 100644 --- a/posthog/queries/test/test_trends.py +++ b/posthog/queries/test/test_trends.py @@ -1865,12 +1865,23 @@ def test_breakdown_by_empty_cohort(self): @test_with_materialized_columns(person_properties=["name"], verify_no_jsonextract=False) def test_breakdown_by_cohort(self): person1, person2, person3, person4 = self._create_multiple_people() - cohort = cohort_factory(name="cohort1", team=self.team, groups=[{"properties": {"name": "person1"}}]) - cohort2 = cohort_factory(name="cohort2", team=self.team, groups=[{"properties": {"name": "person2"}}]) + cohort = cohort_factory( + name="cohort1", + team=self.team, + groups=[{"properties": [{"key": "name", "value": "person1", "type": "person",}]}], + ) + cohort2 = cohort_factory( + name="cohort2", + team=self.team, + groups=[{"properties": [{"key": "name", "value": "person2", "type": "person",}]}], + ) cohort3 = cohort_factory( name="cohort3", team=self.team, - groups=[{"properties": {"name": "person1"}}, {"properties": {"name": "person2"}},], + groups=[ + {"properties": [{"key": "name", "value": "person1", "type": "person"}]}, + {"properties": [{"key": "name", "value": "person2", "type": "person"}]}, + ], ) action = action_factory(name="watched movie", team=self.team) diff --git a/posthog/test/test_cohort_model.py b/posthog/test/test_cohort_model.py index dc4abcd9d0e81..ec70bd19afd7f 100644 --- a/posthog/test/test_cohort_model.py +++ b/posthog/test/test_cohort_model.py @@ -43,7 +43,9 @@ def test_calculating_cohort_clickhouse(self): distinct_ids=["person3"], team_id=self.team.pk, properties={"$some_prop": "something"} ) cohort = Cohort.objects.create( - team=self.team, groups=[{"properties": {"$some_prop": "something"}}], name="cohort1", + team=self.team, + groups=[{"properties": [{"key": "$some_prop", "value": "something", "type": "person"}]}], + name="cohort1", ) cohort.calculate_people_ch(pending_version=0) @@ -51,14 +53,17 @@ def test_calculating_cohort_clickhouse(self): uuids = [ row[0] for row in sync_execute( - "SELECT person_id FROM cohortpeople WHERE cohort_id = %(cohort_id)s", {"cohort_id": cohort.pk} + "SELECT person_id FROM cohortpeople WHERE cohort_id = %(cohort_id)s AND team_id = %(team_id)s GROUP BY person_id, cohort_id, team_id HAVING sum(sign) > 0", + {"cohort_id": cohort.pk, "team_id": self.team.pk}, ) ] self.assertCountEqual(uuids, [person1.uuid, person3.uuid]) def test_empty_query(self): cohort2 = Cohort.objects.create( - team=self.team, groups=[{"properties": {"$some_prop": "nomatchihope"}}], name="cohort1", + team=self.team, + groups=[{"properties": [{"key": "$some_prop", "value": "nomatchihope", "type": "person"}]}], + name="cohort1", ) cohort2.calculate_people_ch(pending_version=0) @@ -74,7 +79,9 @@ def test_batch_delete_cohort_people(self, patch_sleep): distinct_ids=["person3"], team_id=self.team.pk, properties={"$some_prop": "something"} ) cohort = Cohort.objects.create( - team=self.team, groups=[{"properties": {"$some_prop": "something"}}], name="cohort1", + team=self.team, + groups=[{"properties": [{"key": "$some_prop", "value": "something", "type": "person"}]}], + name="cohort1", ) cohort.calculate_people_ch(pending_version=0) @@ -94,3 +101,128 @@ def test_batch_delete_cohort_people(self, patch_sleep): self.assertEqual(CohortPeople.objects.count(), 2) batch_delete_cohort_people(cohort_id=cohort.pk, version=1, batch_size=1) self.assertEqual(CohortPeople.objects.count(), 0) + + def test_group_to_property_conversion(self): + cohort = Cohort.objects.create( + team=self.team, + groups=[ + { + "properties": [ + {"key": "$some_prop", "value": "something", "type": "person", "operator": "contains"}, + {"key": "other_prop", "value": "other_value", "type": "person"}, + ] + }, + {"days": "4", "count": "3", "label": "$pageview", "action_id": 1, "count_operator": "eq"}, + ], + name="cohort1", + ) + + self.assertEqual( + cohort.properties.to_dict(), + { + "type": "OR", + "values": [ + { + "type": "AND", + "values": [ + {"key": "$some_prop", "type": "person", "value": "something", "operator": "contains"}, + {"key": "other_prop", "type": "person", "value": "other_value"}, + ], + }, + { + "type": "AND", + "values": [ + { + "key": 1, + "type": "behavioural", + "value": "performed_event_multiple", + "event_type": "actions", + "operator": "eq", + "operator_value": 3, + "time_interval": "day", + "time_value": "4", + } + ], + }, + ], + }, + ) + + def test_group_to_property_conversion_with_valid_zero_count(self): + cohort = Cohort.objects.create( + team=self.team, + groups=[ + { + "properties": [ + {"key": "$some_prop", "value": "something", "type": "person", "operator": "contains"}, + {"key": "other_prop", "value": "other_value", "type": "person"}, + ] + }, + {"days": "4", "count": "0", "label": "$pageview", "event_id": "$pageview", "count_operator": "gte"}, + ], + name="cohort1", + ) + + self.assertEqual( + cohort.properties.to_dict(), + { + "type": "OR", + "values": [ + { + "type": "AND", + "values": [ + {"key": "$some_prop", "type": "person", "value": "something", "operator": "contains"}, + {"key": "other_prop", "type": "person", "value": "other_value"}, + ], + }, + { + "type": "AND", + "values": [ + { + "key": "$pageview", + "type": "behavioural", + "value": "performed_event", + "event_type": "events", + "operator": "gte", + "operator_value": 0, + "time_interval": "day", + "time_value": "4", + } + ], + }, + ], + }, + ) + + def test_group_to_property_conversion_with_valid_zero_count_different_operator(self): + cohort = Cohort.objects.create( + team=self.team, + groups=[ + {"days": "4", "count": "0", "label": "$pageview", "event_id": "$pageview", "count_operator": "lte"}, + ], + name="cohort1", + ) + + self.assertEqual( + cohort.properties.to_dict(), + { + "type": "OR", + "values": [ + { + "type": "AND", + "values": [ + { + "key": "$pageview", + "type": "behavioural", + "value": "performed_event", + "event_type": "events", + "operator": "lte", + "operator_value": 0, + "time_interval": "day", + "time_value": "4", + } + ], + } + ], + }, + ) diff --git a/posthog/test/test_feature_flag.py b/posthog/test/test_feature_flag.py index 96cfdc83e91ae..c24bcd257ed51 100644 --- a/posthog/test/test_feature_flag.py +++ b/posthog/test/test_feature_flag.py @@ -81,7 +81,9 @@ def test_multi_property_filters(self): def test_user_in_cohort(self): Person.objects.create(team=self.team, distinct_ids=["example_id_1"], properties={"$some_prop_1": "something_1"}) cohort = Cohort.objects.create( - team=self.team, groups=[{"properties": {"$some_prop_1": "something_1"}}], name="cohort1" + team=self.team, + groups=[{"properties": [{"key": "$some_prop_1", "value": "something_1", "type": "person"}]}], + name="cohort1", ) cohort.calculate_people_ch(pending_version=0) @@ -132,7 +134,9 @@ def test_legacy_rollout_and_property_filter(self): def test_legacy_user_in_cohort(self): Person.objects.create(team=self.team, distinct_ids=["example_id_2"], properties={"$some_prop_2": "something_2"}) cohort = Cohort.objects.create( - team=self.team, groups=[{"properties": {"$some_prop_2": "something_2"}}], name="cohort2" + team=self.team, + groups=[{"properties": [{"key": "$some_prop_2", "value": "something_2", "type": "person"}]}], + name="cohort2", ) cohort.calculate_people_ch(pending_version=0)