Skip to content

Commit

Permalink
Add new person materialized (#1944)
Browse files Browse the repository at this point in the history
* add new table migrations and change table names

* include necessary config for new tables in tests

* fix tests and table

* fix table name param

* add populate clause

* added table for key value person props

* adjust person filtering to use new table

* .

* add ordering on updated_at

* add back all the condition handling on persons filtering endpoint

* fix typing

* remove print

* re-order sort key for persons_up_to_date

Co-authored-by: James Greenhill <fuziontech@gmail.com>
  • Loading branch information
EDsCODE and fuziontech committed Oct 22, 2020
1 parent 8132dea commit 51105ac
Show file tree
Hide file tree
Showing 10 changed files with 274 additions and 82 deletions.
19 changes: 19 additions & 0 deletions ee/clickhouse/migrations/0005_person_materialized.py
@@ -0,0 +1,19 @@
from infi.clickhouse_orm import migrations  # type: ignore

from ee.clickhouse.sql.person import (
    MAT_PERSONS_PROP_TABLE_SQL,
    MAT_PERSONS_WITH_PROPS_TABLE_SQL,
    PERSONS_PROP_UP_TO_DATE_VIEW,
    PERSONS_UP_TO_DATE_MATERIALIZED_VIEW,
    PERSONS_UP_TO_DATE_VIEW,
    PERSONS_WITH_PROPS_TABLE_SQL,
)

# Statements are executed in order: first the aggregating materialized view of
# latest person state and its read-side view, then the array-props table, the
# materialized views that feed and flatten it, and finally the up-to-date
# properties view built on top of them.
_CREATE_STATEMENTS = (
    PERSONS_UP_TO_DATE_MATERIALIZED_VIEW,
    PERSONS_UP_TO_DATE_VIEW,
    PERSONS_WITH_PROPS_TABLE_SQL,
    MAT_PERSONS_WITH_PROPS_TABLE_SQL,
    MAT_PERSONS_PROP_TABLE_SQL,
    PERSONS_PROP_UP_TO_DATE_VIEW,
)

operations = [migrations.RunSQL(statement) for statement in _CREATE_STATEMENTS]
2 changes: 2 additions & 0 deletions ee/clickhouse/models/person.py
Expand Up @@ -12,6 +12,7 @@
from ee.clickhouse.sql.person import (
DELETE_PERSON_BY_ID,
DELETE_PERSON_DISTINCT_ID_BY_PERSON_ID,
DELETE_PERSON_MATERIALIZED_BY_ID,
GET_DISTINCT_IDS_SQL,
GET_DISTINCT_IDS_SQL_BY_ID,
GET_PERSON_BY_DISTINCT_ID,
Expand Down Expand Up @@ -142,6 +143,7 @@ def merge_people(team_id: int, target: Dict, old_id: int, old_props: Dict) -> No

def delete_person(person_id):
    """Delete a person (by primary id) from all ClickHouse person tables.

    Issues one delete mutation against each of: the base ``person`` table,
    the ``persons_up_to_date`` materialized table, and the
    ``person_distinct_id`` mapping rows pointing at this person.
    """
    delete_queries = (
        DELETE_PERSON_BY_ID,
        DELETE_PERSON_MATERIALIZED_BY_ID,
        DELETE_PERSON_DISTINCT_ID_BY_PERSON_ID,
    )
    for query in delete_queries:
        sync_execute(query, {"id": person_id})


Expand Down
7 changes: 5 additions & 2 deletions ee/clickhouse/models/property.py
Expand Up @@ -28,8 +28,11 @@ def parse_prop_clauses(key: str, filters: List[Property], team: Team, prepend: s
arg = "v{}_{}".format(prepend, idx)
operator_clause, value = get_operator(prop, arg)

filter = "(ep.key = %(k{prepend}_{idx})s) AND {operator_clause}".format(
idx=idx, operator_clause=operator_clause, prepend=prepend
filter = "(ep.key = %(k{prepend}_{idx})s) {and_statement} {operator_clause}".format(
idx=idx,
and_statement="AND" if operator_clause else "",
operator_clause=operator_clause,
prepend=prepend,
)
clause = GET_DISTINCT_IDS_BY_PROPERTY_SQL.format(
filters=filter, negation="NOT " if prop.operator and "not" in prop.operator else ""
Expand Down
176 changes: 138 additions & 38 deletions ee/clickhouse/sql/person.py
Expand Up @@ -10,6 +10,30 @@
DROP TABLE person_distinct_id
"""

# Teardown counterparts to the CREATE statements below — presumably used by
# tests / reverse migrations (TODO: confirm call sites).
# NOTE(review): persons_up_to_date is created as a MATERIALIZED VIEW but is
# dropped here with DROP TABLE, while the plain CREATE VIEW objects use
# DROP VIEW.

# Drops the AggregatingMergeTree materialized view of latest person state.
DROP_PERSON_MATERIALIZED_SQL = """
DROP TABLE persons_up_to_date
"""

# Drops the read-side view that merges persons_up_to_date aggregate states.
DROP_PERSON_VIEW_SQL = """
DROP VIEW persons_up_to_date_view
"""

# Drops the table of persons with properties exploded into key/value arrays.
DROP_PERSONS_WITH_ARRAY_PROPS_TABLE_SQL = """
DROP TABLE persons_with_array_props_view
"""

# Drops the materialized view that feeds persons_with_array_props_view.
DROP_MAT_PERSONS_WITH_ARRAY_PROPS_TABLE_SQL = """
DROP TABLE persons_with_array_props_mv
"""

# Drops the flattened per-property lookup view.
DROP_MAT_PERSONS_PROP_TABLE_SQL = """
DROP TABLE persons_properties_view
"""

# Drops the latest-version filter over persons_properties_view.
DROP_PERSONS_PROP_UP_TO_DATE_VIEW_SQL = """
DROP VIEW persons_properties_up_to_date_view
"""

# Name of the base ClickHouse table storing raw person rows.
PERSONS_TABLE = "person"
PERSONS_TABLE_BASE_SQL = """
Expand Down Expand Up @@ -37,7 +61,7 @@
)

KAFKA_PERSONS_TABLE_SQL = PERSONS_TABLE_BASE_SQL.format(
table_name="kafka_" + PERSONS_TABLE, engine=kafka_engine(KAFKA_PERSON), extra_fields=""
table_name="kafka_" + PERSONS_TABLE, engine=kafka_engine(KAFKA_PERSON), extra_fields="",
)

PERSONS_TABLE_MV_SQL = """
Expand All @@ -56,9 +80,101 @@
table_name=PERSONS_TABLE
)

# Materialized view that folds every insert into the base `person` table into
# one set of aggregate states per person id (AggregatingMergeTree).
# argMaxState(col, created_at) keeps the value from the row with the greatest
# created_at, so reading these states back with the matching -Merge
# combinators (see PERSONS_UP_TO_DATE_VIEW below) yields the latest version of
# each person.  minState/maxState over created_at give first-seen and
# last-updated timestamps.  POPULATE backfills states from rows already in
# `person` when the view is created.
PERSONS_UP_TO_DATE_MATERIALIZED_VIEW = """
CREATE MATERIALIZED VIEW persons_up_to_date
ENGINE = AggregatingMergeTree() ORDER BY (
team_id,
updated_at,
id
)
POPULATE
AS SELECT
id,
argMaxState(team_id, created_at) team_id,
argMaxState(is_identified, created_at) is_identified,
argMaxState(properties, created_at) properties,
minState(created_at) created_at_,
maxState(created_at) updated_at
FROM {table_name}
GROUP BY id
""".format(
table_name=PERSONS_TABLE
)

# Query-side view over persons_up_to_date: collapses the aggregate states with
# the -Merge combinators so each id resolves to its most recent team_id,
# properties and is_identified (the values from the row with the greatest
# created_at), plus first created_at and the latest created_at as updated_at.
PERSONS_UP_TO_DATE_VIEW = """
CREATE VIEW persons_up_to_date_view
AS
SELECT
id,
minMerge(created_at_) as created_at,
argMaxMerge(team_id) as team_id,
argMaxMerge(properties) as properties,
argMaxMerge(is_identified) as is_identified,
maxMerge(updated_at) as updated_at
FROM persons_up_to_date
GROUP BY id
"""

# Backing table for person rows with their JSON `properties` pre-exploded into
# two parallel arrays (raw keys / raw values).  Rows are written by the
# persons_with_array_props_mv materialized view below; _timestamp and _offset
# are ingestion bookkeeping columns copied through from `person` by that view.
PERSONS_WITH_PROPS_TABLE_SQL = """
CREATE TABLE persons_with_array_props_view
(
id UUID,
created_at DateTime64,
team_id Int64,
properties VARCHAR,
is_identified Boolean,
array_property_keys Array(VARCHAR),
array_property_values Array(VARCHAR),
_timestamp UInt64,
_offset UInt64
) ENGINE = {engine}
PARTITION BY toYYYYMM(created_at)
ORDER BY (team_id, toDate(created_at), id)
SAMPLE BY id
{storage_policy}
""".format(
engine=table_engine("persons_with_array_props_view", "_timestamp"), storage_policy=STORAGE_POLICY
)

# Materialized view feeding persons_with_array_props_view: on each insert into
# `person` it extracts the JSON properties into the parallel key/value arrays
# via JSONExtractKeysAndValuesRaw and writes the row TO the table above.
MAT_PERSONS_WITH_PROPS_TABLE_SQL = """
CREATE MATERIALIZED VIEW persons_with_array_props_mv
TO persons_with_array_props_view
AS SELECT
id,
created_at,
team_id,
properties,
is_identified,
arrayMap(k -> toString(k.1), JSONExtractKeysAndValuesRaw(properties)) array_property_keys,
arrayMap(k -> toString(k.2), JSONExtractKeysAndValuesRaw(properties)) array_property_values,
_timestamp,
_offset
FROM person
"""

# Per-property lookup view: ARRAY JOIN flattens the parallel property arrays so
# there is one row per (person, property key, property value).  The sort key
# (team_id, key, value, id) serves property-filter queries on key/value pairs.
MAT_PERSONS_PROP_TABLE_SQL = """
CREATE MATERIALIZED VIEW persons_properties_view
ENGINE = MergeTree()
ORDER BY (team_id, key, value, id)
AS SELECT
id,
team_id,
array_property_keys as key,
array_property_values as value,
created_at
from persons_with_array_props_view
ARRAY JOIN array_property_keys, array_property_values
"""

# Restricts persons_properties_view to each person's latest version: keeps
# only rows whose (id, created_at) matches the max updated_at state per id in
# persons_up_to_date.
PERSONS_PROP_UP_TO_DATE_VIEW = """
CREATE VIEW persons_properties_up_to_date_view
AS
SELECT * FROM persons_properties_view WHERE (id, created_at) IN (SELECT id, maxMerge(updated_at) as latest FROM persons_up_to_date GROUP BY id)
"""


GET_PERSON_SQL = """
SELECT * FROM person WHERE team_id = %(team_id)s
SELECT * FROM persons_up_to_date_view WHERE team_id = %(team_id)s
"""

PERSONS_DISTINCT_ID_TABLE = "person_distinct_id"
Expand Down Expand Up @@ -87,7 +203,7 @@
)

KAFKA_PERSONS_DISTINCT_ID_TABLE_SQL = PERSONS_DISTINCT_ID_TABLE_BASE_SQL.format(
table_name="kafka_" + PERSONS_DISTINCT_ID_TABLE, engine=kafka_engine(KAFKA_PERSON_UNIQUE_ID), extra_fields=""
table_name="kafka_" + PERSONS_DISTINCT_ID_TABLE, engine=kafka_engine(KAFKA_PERSON_UNIQUE_ID), extra_fields="",
)

PERSONS_DISTINCT_ID_TABLE_MV_SQL = """
Expand All @@ -114,7 +230,7 @@
"""

GET_PERSON_BY_DISTINCT_ID = """
SELECT p.* FROM person as p inner join person_distinct_id as pid on p.id = pid.person_id where team_id = %(team_id)s AND distinct_id = %(distinct_id)s
SELECT p.* FROM persons_up_to_date_view as p inner join person_distinct_id as pid on p.id = pid.person_id where team_id = %(team_id)s AND distinct_id = %(distinct_id)s
"""

GET_PERSONS_BY_DISTINCT_IDS = """
Expand Down Expand Up @@ -168,6 +284,10 @@
ALTER TABLE person DELETE where id = %(id)s
"""

# ALTER ... DELETE mutation removing a person's rows from the
# persons_up_to_date materialized table; issued alongside DELETE_PERSON_BY_ID
# so the base table and the materialization stay in sync.
DELETE_PERSON_MATERIALIZED_BY_ID = """
ALTER TABLE persons_up_to_date DELETE where id = %(id)s
"""

# Removes all distinct-id mapping rows that point at the given person id.
DELETE_PERSON_DISTINCT_ID_BY_PERSON_ID = """
ALTER TABLE person_distinct_id DELETE where person_id = %(id)s
"""
Expand All @@ -181,61 +301,41 @@
"""

PEOPLE_THROUGH_DISTINCT_SQL = """
SELECT id, created_at, team_id, properties, is_identified, groupArray(distinct_id) FROM person INNER JOIN (
SELECT id, created_at, team_id, properties, is_identified, groupArray(distinct_id) FROM persons_up_to_date_view INNER JOIN (
SELECT DISTINCT person_id, distinct_id FROM person_distinct_id WHERE distinct_id IN ({content_sql})
) as pdi ON person.id = pdi.person_id GROUP BY id, created_at, team_id, properties, is_identified
) as pdi ON persons_up_to_date_view.id = pdi.person_id GROUP BY id, created_at, team_id, properties, is_identified
LIMIT 200 OFFSET %(offset)s
"""

PEOPLE_SQL = """
SELECT id, created_at, team_id, properties, is_identified, groupArray(distinct_id) FROM person INNER JOIN (
SELECT id, created_at, team_id, properties, is_identified, groupArray(distinct_id) FROM persons_up_to_date_view INNER JOIN (
SELECT DISTINCT person_id, distinct_id FROM person_distinct_id WHERE person_id IN ({content_sql})
) as pdi ON person.id = pdi.person_id GROUP BY id, created_at, team_id, properties, is_identified
) as pdi ON persons_up_to_date_view.id = pdi.person_id GROUP BY id, created_at, team_id, properties, is_identified
LIMIT 200 OFFSET %(offset)s
"""

PEOPLE_BY_TEAM_SQL = """
SELECT id, created_at, team_id, properties, is_identified, groupArray(distinct_id) FROM person INNER JOIN (
SELECT id, created_at, team_id, properties, is_identified, groupArray(distinct_id) FROM persons_up_to_date_view INNER JOIN (
SELECT DISTINCT person_id, distinct_id FROM person_distinct_id WHERE team_id = %(team_id)s
) as pdi ON person.id = pdi.person_id
) as pdi ON persons_up_to_date_view.id = pdi.person_id
WHERE team_id = %(team_id)s {filters}
GROUP BY id, created_at, team_id, properties, is_identified
LIMIT 100 OFFSET %(offset)s
"""

GET_PERSON_TOP_PROPERTIES = """
SELECT key, count(1) as count FROM (
SELECT
array_property_keys as key,
array_property_values as value
from (
SELECT
arrayMap(k -> toString(k.1), JSONExtractKeysAndValuesRaw(properties)) AS array_property_keys,
arrayMap(k -> toString(k.2), JSONExtractKeysAndValuesRaw(properties)) AS array_property_values
FROM person WHERE team_id = %(team_id)s
)
ARRAY JOIN array_property_keys, array_property_values
) GROUP BY key ORDER BY count DESC LIMIT %(limit)s
SELECT key, count(1) as count FROM
persons_properties_up_to_date_view
WHERE team_id = %(team_id)s
GROUP BY key ORDER BY count DESC LIMIT %(limit)s
"""


GET_DISTINCT_IDS_BY_PROPERTY_SQL = """
SELECT distinct_id FROM person_distinct_id WHERE person_id {negation}IN
(
SELECT id FROM (
SELECT
id,
array_property_keys as key,
array_property_values as value
from (
SELECT
id,
arrayMap(k -> toString(k.1), JSONExtractKeysAndValuesRaw(properties)) AS array_property_keys,
arrayMap(k -> toString(k.2), JSONExtractKeysAndValuesRaw(properties)) AS array_property_values
FROM person WHERE team_id = %(team_id)s
)
ARRAY JOIN array_property_keys, array_property_values
) ep
WHERE {filters}
) AND team_id = %(team_id)s
SELECT id
FROM persons_properties_up_to_date_view AS ep
WHERE {filters} AND team_id = %(team_id)s
)
"""
2 changes: 1 addition & 1 deletion ee/clickhouse/sql/trends/breakdown.py
Expand Up @@ -58,7 +58,7 @@
id,
arrayMap(k -> toString(k.1), JSONExtractKeysAndValuesRaw(properties)) AS array_property_keys,
arrayMap(k -> toString(k.2), JSONExtractKeysAndValuesRaw(properties)) AS array_property_values
FROM person WHERE team_id = %(team_id)s
FROM persons_up_to_date_view WHERE team_id = %(team_id)s
)
ARRAY JOIN array_property_keys, array_property_values
) ep
Expand Down
2 changes: 1 addition & 1 deletion ee/clickhouse/sql/trends/top_person_props.py
Expand Up @@ -16,7 +16,7 @@
id,
arrayMap(k -> toString(k.1), JSONExtractKeysAndValuesRaw(properties)) AS array_property_keys,
arrayMap(k -> toString(k.2), JSONExtractKeysAndValuesRaw(properties)) AS array_property_values
FROM person WHERE team_id = %(team_id)s
FROM persons_up_to_date_view WHERE team_id = %(team_id)s
)
ARRAY JOIN array_property_keys, array_property_values
) ep
Expand Down
9 changes: 0 additions & 9 deletions ee/clickhouse/test/test_process_event_ee.py
Expand Up @@ -667,9 +667,6 @@ def test_alias_merge_properties(self) -> None:
distinct_ids = [item["distinct_id"] for item in get_person_distinct_ids(team_id=self.team.pk)]
self.assertEqual(sorted(distinct_ids), sorted(["old_distinct_id", "new_distinct_id"]))

# Assume that clickhouse has done replacement
ch_client.execute("OPTIMIZE TABLE person")

persons = get_persons(team_id=self.team.pk)
self.assertEqual(
persons[0]["properties"],
Expand Down Expand Up @@ -991,9 +988,6 @@ def test_distinct_team_leakage(self) -> None:
self.assertEqual(sorted(ids[self.team.pk]), sorted(["1", "2"]))
self.assertEqual(ids[team2.pk], ["2"])

# Assume that clickhouse has done replacement
ch_client.execute("OPTIMIZE TABLE person")

people1 = get_persons(team_id=self.team.pk)
people2 = get_persons(team_id=team2.pk)

Expand Down Expand Up @@ -1029,8 +1023,5 @@ def test_set_is_identified(self) -> None:
now().isoformat(),
)

# Assume that clickhouse has done replacement
ch_client.execute("OPTIMIZE TABLE person")

person_after_event = get_person_by_distinct_id(team_id=self.team.pk, distinct_id=distinct_id)
self.assertTrue(person_after_event["is_identified"])

0 comments on commit 51105ac

Please sign in to comment.