Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new person materialized #1944

Merged
merged 17 commits into from Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 19 additions & 0 deletions ee/clickhouse/migrations/0005_person_materialized.py
@@ -0,0 +1,19 @@
from infi.clickhouse_orm import migrations # type: ignore

from ee.clickhouse.sql.person import (
MAT_PERSONS_PROP_TABLE_SQL,
MAT_PERSONS_WITH_PROPS_TABLE_SQL,
PERSONS_PROP_UP_TO_DATE_VIEW,
PERSONS_UP_TO_DATE_MATERIALIZED_VIEW,
PERSONS_UP_TO_DATE_VIEW,
PERSONS_WITH_PROPS_TABLE_SQL,
)

# ClickHouse migration: introduce the materialized person pipeline.
# Order matters: the aggregating MV and its merge view are created first,
# then the property-extraction table/views that read from them.
operations = [
    migrations.RunSQL(PERSONS_UP_TO_DATE_MATERIALIZED_VIEW),  # aggregating MV over the person table
    migrations.RunSQL(PERSONS_UP_TO_DATE_VIEW),  # read view that -Merge's the aggregate states
    migrations.RunSQL(PERSONS_WITH_PROPS_TABLE_SQL),  # target table for the property-extraction MV
    migrations.RunSQL(MAT_PERSONS_WITH_PROPS_TABLE_SQL),  # MV extracting property key/value arrays
    migrations.RunSQL(MAT_PERSONS_PROP_TABLE_SQL),  # MV flattening arrays into (key, value) rows
    migrations.RunSQL(PERSONS_PROP_UP_TO_DATE_VIEW),  # view restricting properties to latest person rows
]
2 changes: 2 additions & 0 deletions ee/clickhouse/models/person.py
Expand Up @@ -12,6 +12,7 @@
from ee.clickhouse.sql.person import (
DELETE_PERSON_BY_ID,
DELETE_PERSON_DISTINCT_ID_BY_PERSON_ID,
DELETE_PERSON_MATERIALIZED_BY_ID,
GET_DISTINCT_IDS_SQL,
GET_DISTINCT_IDS_SQL_BY_ID,
GET_PERSON_BY_DISTINCT_ID,
Expand Down Expand Up @@ -142,6 +143,7 @@ def merge_people(team_id: int, target: Dict, old_id: int, old_props: Dict) -> No

def delete_person(person_id) -> None:
    """Delete a person by id from the `person`, `persons_up_to_date`, and
    `person_distinct_id` ClickHouse tables.

    NOTE: each statement is an `ALTER TABLE ... DELETE` mutation, which
    ClickHouse applies asynchronously — rows may remain visible briefly
    after this function returns.
    """
    sync_execute(DELETE_PERSON_BY_ID, {"id": person_id,})
    # Also clear the row from the aggregating materialized view's backing table.
    sync_execute(DELETE_PERSON_MATERIALIZED_BY_ID, {"id": person_id})
    sync_execute(DELETE_PERSON_DISTINCT_ID_BY_PERSON_ID, {"id": person_id,})


Expand Down
7 changes: 5 additions & 2 deletions ee/clickhouse/models/property.py
Expand Up @@ -28,8 +28,11 @@ def parse_prop_clauses(key: str, filters: List[Property], team: Team, prepend: s
arg = "v{}_{}".format(prepend, idx)
operator_clause, value = get_operator(prop, arg)

filter = "(ep.key = %(k{prepend}_{idx})s) AND {operator_clause}".format(
idx=idx, operator_clause=operator_clause, prepend=prepend
filter = "(ep.key = %(k{prepend}_{idx})s) {and_statement} {operator_clause}".format(
idx=idx,
and_statement="AND" if operator_clause else "",
operator_clause=operator_clause,
prepend=prepend,
)
clause = GET_DISTINCT_IDS_BY_PROPERTY_SQL.format(
filters=filter, negation="NOT " if prop.operator and "not" in prop.operator else ""
Expand Down
176 changes: 138 additions & 38 deletions ee/clickhouse/sql/person.py
Expand Up @@ -10,6 +10,30 @@
DROP TABLE person_distinct_id
"""

# Teardown statements for the materialized person pipeline (used by tests /
# reverse migrations). Each mirrors one CREATE statement below.

# Drops the aggregating materialized view's backing table.
DROP_PERSON_MATERIALIZED_SQL = """
DROP TABLE persons_up_to_date
"""

# Drops the read view that merges the aggregate states.
DROP_PERSON_VIEW_SQL = """
DROP VIEW persons_up_to_date_view
"""

# Drops the target table holding persons with extracted property arrays.
DROP_PERSONS_WITH_ARRAY_PROPS_TABLE_SQL = """
DROP TABLE persons_with_array_props_view
"""

# Drops the materialized view that feeds persons_with_array_props_view.
DROP_MAT_PERSONS_WITH_ARRAY_PROPS_TABLE_SQL = """
DROP TABLE persons_with_array_props_mv
"""

# Drops the materialized view of flattened (key, value) property rows.
# NOTE(review): despite the _TABLE_ name this targets a view-named object;
# ClickHouse accepts DROP TABLE for materialized views.
DROP_MAT_PERSONS_PROP_TABLE_SQL = """
DROP TABLE persons_properties_view
"""

# Drops the view restricting properties to each person's latest row.
DROP_PERSONS_PROP_UP_TO_DATE_VIEW_SQL = """
DROP VIEW persons_properties_up_to_date_view
"""

PERSONS_TABLE = "person"

PERSONS_TABLE_BASE_SQL = """
Expand Down Expand Up @@ -37,7 +61,7 @@
)

KAFKA_PERSONS_TABLE_SQL = PERSONS_TABLE_BASE_SQL.format(
table_name="kafka_" + PERSONS_TABLE, engine=kafka_engine(KAFKA_PERSON), extra_fields=""
table_name="kafka_" + PERSONS_TABLE, engine=kafka_engine(KAFKA_PERSON), extra_fields="",
)

PERSONS_TABLE_MV_SQL = """
Expand All @@ -56,9 +80,101 @@
table_name=PERSONS_TABLE
)

# Aggregating materialized view over the `person` table: keeps one aggregate
# row per person id, where each field carries the value from the row with the
# greatest `created_at` (via argMaxState). `POPULATE` backfills existing rows
# at creation time. Readers must use the -Merge combinators (see
# PERSONS_UP_TO_DATE_VIEW below) to finalize the aggregate states.
PERSONS_UP_TO_DATE_MATERIALIZED_VIEW = """
CREATE MATERIALIZED VIEW persons_up_to_date
ENGINE = AggregatingMergeTree() ORDER BY (
team_id,
updated_at,
id
)
POPULATE
AS SELECT
id,
argMaxState(team_id, created_at) team_id,
argMaxState(is_identified, created_at) is_identified,
argMaxState(properties, created_at) properties,
minState(created_at) created_at_,
maxState(created_at) updated_at
FROM {table_name}
GROUP BY id
""".format(
    table_name=PERSONS_TABLE
)

# Read view over persons_up_to_date: finalizes the aggregate states with the
# matching -Merge combinators, yielding the latest known row per person id
# (earliest created_at as created_at, latest created_at as updated_at).
PERSONS_UP_TO_DATE_VIEW = """
CREATE VIEW persons_up_to_date_view
AS
SELECT
id,
minMerge(created_at_) as created_at,
argMaxMerge(team_id) as team_id,
argMaxMerge(properties) as properties,
argMaxMerge(is_identified) as is_identified,
maxMerge(updated_at) as updated_at
FROM persons_up_to_date
GROUP BY id
"""

PERSONS_WITH_PROPS_TABLE_SQL = """
CREATE TABLE persons_with_array_props_view
(
id UUID,
created_at DateTime64,
team_id Int64,
properties VARCHAR,
is_identified Boolean,
array_property_keys Array(VARCHAR),
array_property_values Array(VARCHAR),
_timestamp UInt64,
_offset UInt64
) ENGINE = {engine}
PARTITION BY toYYYYMM(created_at)
ORDER BY (team_id, toDate(created_at), id)
SAMPLE BY id
{storage_policy}
""".format(
engine=table_engine("persons_with_array_props_view", "_timestamp"), storage_policy=STORAGE_POLICY
)

MAT_PERSONS_WITH_PROPS_TABLE_SQL = """
CREATE MATERIALIZED VIEW persons_with_array_props_mv
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to manually backfill the persons table into this materialized view when we deploy to prod, because it is a TO-table materialized view and therefore cannot auto-populate via POPULATE.

TO persons_with_array_props_view
AS SELECT
id,
created_at,
team_id,
properties,
is_identified,
arrayMap(k -> toString(k.1), JSONExtractKeysAndValuesRaw(properties)) array_property_keys,
arrayMap(k -> toString(k.2), JSONExtractKeysAndValuesRaw(properties)) array_property_values,
_timestamp,
_offset
FROM person
"""

# Materialized view flattening the parallel property arrays into one
# (team_id, key, value, id) row per property via ARRAY JOIN, ordered for
# fast property-filter lookups.
MAT_PERSONS_PROP_TABLE_SQL = """
CREATE MATERIALIZED VIEW persons_properties_view
ENGINE = MergeTree()
ORDER BY (team_id, key, value, id)
AS SELECT
id,
team_id,
array_property_keys as key,
array_property_values as value,
created_at
from persons_with_array_props_view
ARRAY JOIN array_property_keys, array_property_values
"""

# View restricting persons_properties_view to each person's most recent row.
# NOTE(review): the IN predicate matches `created_at` against
# maxMerge(updated_at) — this assumes the latest row's created_at equals the
# max created_at aggregated into updated_at (see
# PERSONS_UP_TO_DATE_MATERIALIZED_VIEW, where updated_at = maxState(created_at)).
PERSONS_PROP_UP_TO_DATE_VIEW = """
CREATE VIEW persons_properties_up_to_date_view
AS
SELECT * FROM persons_properties_view WHERE (id, created_at) IN (SELECT id, maxMerge(updated_at) as latest FROM persons_up_to_date GROUP BY id)
"""


GET_PERSON_SQL = """
SELECT * FROM person WHERE team_id = %(team_id)s
SELECT * FROM persons_up_to_date_view WHERE team_id = %(team_id)s
"""

PERSONS_DISTINCT_ID_TABLE = "person_distinct_id"
Expand Down Expand Up @@ -87,7 +203,7 @@
)

KAFKA_PERSONS_DISTINCT_ID_TABLE_SQL = PERSONS_DISTINCT_ID_TABLE_BASE_SQL.format(
table_name="kafka_" + PERSONS_DISTINCT_ID_TABLE, engine=kafka_engine(KAFKA_PERSON_UNIQUE_ID), extra_fields=""
table_name="kafka_" + PERSONS_DISTINCT_ID_TABLE, engine=kafka_engine(KAFKA_PERSON_UNIQUE_ID), extra_fields="",
)

PERSONS_DISTINCT_ID_TABLE_MV_SQL = """
Expand All @@ -114,7 +230,7 @@
"""

GET_PERSON_BY_DISTINCT_ID = """
SELECT p.* FROM person as p inner join person_distinct_id as pid on p.id = pid.person_id where team_id = %(team_id)s AND distinct_id = %(distinct_id)s
SELECT p.* FROM persons_up_to_date_view as p inner join person_distinct_id as pid on p.id = pid.person_id where team_id = %(team_id)s AND distinct_id = %(distinct_id)s
"""

GET_PERSONS_BY_DISTINCT_IDS = """
Expand Down Expand Up @@ -168,6 +284,10 @@
ALTER TABLE person DELETE where id = %(id)s
"""

# Removes a person's aggregate row from the materialized-view backing table.
# NOTE: ALTER TABLE ... DELETE is an asynchronous ClickHouse mutation.
DELETE_PERSON_MATERIALIZED_BY_ID = """
ALTER TABLE persons_up_to_date DELETE where id = %(id)s
"""

# Removes all distinct-id mappings that point at the given person id.
DELETE_PERSON_DISTINCT_ID_BY_PERSON_ID = """
ALTER TABLE person_distinct_id DELETE where person_id = %(id)s
"""
Expand All @@ -181,61 +301,41 @@
"""

PEOPLE_THROUGH_DISTINCT_SQL = """
SELECT id, created_at, team_id, properties, is_identified, groupArray(distinct_id) FROM person INNER JOIN (
SELECT id, created_at, team_id, properties, is_identified, groupArray(distinct_id) FROM persons_up_to_date_view INNER JOIN (
SELECT DISTINCT person_id, distinct_id FROM person_distinct_id WHERE distinct_id IN ({content_sql})
) as pdi ON person.id = pdi.person_id GROUP BY id, created_at, team_id, properties, is_identified
) as pdi ON persons_up_to_date_view.id = pdi.person_id GROUP BY id, created_at, team_id, properties, is_identified
LIMIT 200 OFFSET %(offset)s
"""

PEOPLE_SQL = """
SELECT id, created_at, team_id, properties, is_identified, groupArray(distinct_id) FROM person INNER JOIN (
SELECT id, created_at, team_id, properties, is_identified, groupArray(distinct_id) FROM persons_up_to_date_view INNER JOIN (
SELECT DISTINCT person_id, distinct_id FROM person_distinct_id WHERE person_id IN ({content_sql})
) as pdi ON person.id = pdi.person_id GROUP BY id, created_at, team_id, properties, is_identified
) as pdi ON persons_up_to_date_view.id = pdi.person_id GROUP BY id, created_at, team_id, properties, is_identified
LIMIT 200 OFFSET %(offset)s
"""

PEOPLE_BY_TEAM_SQL = """
SELECT id, created_at, team_id, properties, is_identified, groupArray(distinct_id) FROM person INNER JOIN (
SELECT id, created_at, team_id, properties, is_identified, groupArray(distinct_id) FROM persons_up_to_date_view INNER JOIN (
SELECT DISTINCT person_id, distinct_id FROM person_distinct_id WHERE team_id = %(team_id)s
) as pdi ON person.id = pdi.person_id
) as pdi ON persons_up_to_date_view.id = pdi.person_id
WHERE team_id = %(team_id)s {filters}
GROUP BY id, created_at, team_id, properties, is_identified
LIMIT 100 OFFSET %(offset)s
"""

GET_PERSON_TOP_PROPERTIES = """
SELECT key, count(1) as count FROM (
SELECT
array_property_keys as key,
array_property_values as value
from (
SELECT
arrayMap(k -> toString(k.1), JSONExtractKeysAndValuesRaw(properties)) AS array_property_keys,
arrayMap(k -> toString(k.2), JSONExtractKeysAndValuesRaw(properties)) AS array_property_values
FROM person WHERE team_id = %(team_id)s
)
ARRAY JOIN array_property_keys, array_property_values
) GROUP BY key ORDER BY count DESC LIMIT %(limit)s
SELECT key, count(1) as count FROM
persons_properties_up_to_date_view
WHERE team_id = %(team_id)s
GROUP BY key ORDER BY count DESC LIMIT %(limit)s
"""


GET_DISTINCT_IDS_BY_PROPERTY_SQL = """
SELECT distinct_id FROM person_distinct_id WHERE person_id {negation}IN
(
SELECT id FROM (
SELECT
id,
array_property_keys as key,
array_property_values as value
from (
SELECT
id,
arrayMap(k -> toString(k.1), JSONExtractKeysAndValuesRaw(properties)) AS array_property_keys,
arrayMap(k -> toString(k.2), JSONExtractKeysAndValuesRaw(properties)) AS array_property_values
FROM person WHERE team_id = %(team_id)s
)
ARRAY JOIN array_property_keys, array_property_values
) ep
WHERE {filters}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Goodbye monster query

) AND team_id = %(team_id)s
SELECT id
FROM persons_properties_up_to_date_view AS ep
WHERE {filters} AND team_id = %(team_id)s
)
"""
2 changes: 1 addition & 1 deletion ee/clickhouse/sql/trends/breakdown.py
Expand Up @@ -58,7 +58,7 @@
id,
arrayMap(k -> toString(k.1), JSONExtractKeysAndValuesRaw(properties)) AS array_property_keys,
arrayMap(k -> toString(k.2), JSONExtractKeysAndValuesRaw(properties)) AS array_property_values
FROM person WHERE team_id = %(team_id)s
FROM persons_up_to_date_view WHERE team_id = %(team_id)s
)
ARRAY JOIN array_property_keys, array_property_values
) ep
Expand Down
2 changes: 1 addition & 1 deletion ee/clickhouse/sql/trends/top_person_props.py
Expand Up @@ -16,7 +16,7 @@
id,
arrayMap(k -> toString(k.1), JSONExtractKeysAndValuesRaw(properties)) AS array_property_keys,
arrayMap(k -> toString(k.2), JSONExtractKeysAndValuesRaw(properties)) AS array_property_values
FROM person WHERE team_id = %(team_id)s
FROM persons_up_to_date_view WHERE team_id = %(team_id)s
)
ARRAY JOIN array_property_keys, array_property_values
) ep
Expand Down
9 changes: 0 additions & 9 deletions ee/clickhouse/test/test_process_event_ee.py
Expand Up @@ -667,9 +667,6 @@ def test_alias_merge_properties(self) -> None:
distinct_ids = [item["distinct_id"] for item in get_person_distinct_ids(team_id=self.team.pk)]
self.assertEqual(sorted(distinct_ids), sorted(["old_distinct_id", "new_distinct_id"]))

# Assume that clickhouse has done replacement
ch_client.execute("OPTIMIZE TABLE person")

persons = get_persons(team_id=self.team.pk)
self.assertEqual(
persons[0]["properties"],
Expand Down Expand Up @@ -991,9 +988,6 @@ def test_distinct_team_leakage(self) -> None:
self.assertEqual(sorted(ids[self.team.pk]), sorted(["1", "2"]))
self.assertEqual(ids[team2.pk], ["2"])

# Assume that clickhouse has done replacement
ch_client.execute("OPTIMIZE TABLE person")

people1 = get_persons(team_id=self.team.pk)
people2 = get_persons(team_id=team2.pk)

Expand Down Expand Up @@ -1029,8 +1023,5 @@ def test_set_is_identified(self) -> None:
now().isoformat(),
)

# Assume that clickhouse has done replacement
ch_client.execute("OPTIMIZE TABLE person")

person_after_event = get_person_by_distinct_id(team_id=self.team.pk, distinct_id=distinct_id)
self.assertTrue(person_after_event["is_identified"])