Refactor analytics queries (#1280)
* Refactor analytics queries

* Fix timedelta issue

* Fix tests

* Improve test

* Fix stickiness action

* Add default shape

* Refactor tests

* Add types to filter func

* Add comments to functions

* Fix tests

* remove unused import

* remove more unused imports

* Fix retention issues
timgl committed Jul 29, 2020
1 parent 7dc4e2e commit bfb258c
Showing 16 changed files with 1,452 additions and 1,116 deletions.
431 changes: 16 additions & 415 deletions posthog/api/action.py

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions posthog/api/team.py
@@ -1,7 +1,8 @@
from django.db.models import QuerySet
from rest_framework import viewsets, mixins, exceptions, response, status
from posthog.models import User, Team
from rest_framework import exceptions, mixins, response, status, viewsets

from posthog.api.user import UserSerializer
from posthog.models import Team, User


class TeamUserViewSet(mixins.DestroyModelMixin, mixins.ListModelMixin, viewsets.GenericViewSet):
667 changes: 3 additions & 664 deletions posthog/api/test/test_action.py

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion posthog/api/test/test_dashboard.py
@@ -4,7 +4,6 @@
from django.utils.timezone import now
from freezegun import freeze_time

from posthog.api.action import calculate_trends
from posthog.decorators import TRENDS_ENDPOINT
from posthog.models import Dashboard, DashboardItem, Filter

9 changes: 6 additions & 3 deletions posthog/api/test/test_team.py
@@ -1,9 +1,12 @@
from typing import List, Dict
import random
from typing import Dict, List

from django.db.models import Q
from rest_framework import status

from posthog.models import Team, User

from .base import BaseTest
from posthog.models import User, Team
import random


class TestTeamUser(BaseTest):
Empty file added posthog/queries/__init__.py
112 changes: 112 additions & 0 deletions posthog/queries/base.py
@@ -0,0 +1,112 @@
import copy
from typing import Any, Callable, Dict, List, Optional

from dateutil.relativedelta import relativedelta
from django.db.models import Q, QuerySet

from posthog.constants import TREND_FILTER_TYPE_ACTIONS, TREND_FILTER_TYPE_EVENTS, TRENDS_CUMULATIVE, TRENDS_STICKINESS
from posthog.models import Entity, Event, Filter, Team
from posthog.utils import get_compare_period_dates

"""
process_entity_for_events takes in an Entity and team_id, and returns an Event QuerySet that's correctly filtered
"""


def process_entity_for_events(entity: Entity, team_id: int, order_by="-id") -> QuerySet:
    if entity.type == TREND_FILTER_TYPE_ACTIONS:
        events = Event.objects.filter(action__pk=entity.id).add_person_id(team_id)
        if order_by:
            events = events.order_by(order_by)
        return events
    elif entity.type == TREND_FILTER_TYPE_EVENTS:
        return Event.objects.filter_by_event_with_people(event=entity.id, team_id=team_id, order_by=order_by)
    return QuerySet()


def _determine_compared_filter(filter: Filter) -> Filter:
    if not filter.date_to or not filter.date_from:
        raise ValueError("You need date_from and date_to to compare")
    date_from, date_to = get_compare_period_dates(filter.date_from, filter.date_to)
    compared_filter = copy.deepcopy(filter)
    compared_filter._date_from = date_from.date().isoformat()
    compared_filter._date_to = date_to.date().isoformat()
    return compared_filter


def _convert_to_comparison(trend_entity: List[Dict[str, Any]], filter: Filter, label: str) -> List[Dict[str, Any]]:
    for entity in trend_entity:
        days = [i for i in range(len(entity["days"]))]
        labels = [
            "{} {}".format(filter.interval if filter.interval is not None else "day", i)
            for i in range(len(entity["labels"]))
        ]
        entity.update(
            {"labels": labels, "days": days, "label": label, "dates": entity["days"], "compare": True,}
        )
    return trend_entity


"""
handle_compare takes an Entity, Filter and a callable.
It'll automatically create a new entity with the 'current' and 'previous' labels and automatically pick the right date_from and date_to filters .
It will then call func(entity, filter, team_id).
"""


def handle_compare(entity: Entity, filter: Filter, func: Callable, team_id: int) -> List:
    entities_list = []
    trend_entity = func(entity=entity, filter=filter, team_id=team_id)
    if filter.compare:
        trend_entity = _convert_to_comparison(trend_entity, filter, "{} - {}".format(entity.name, "current"))
        entities_list.extend(trend_entity)

        compared_filter = _determine_compared_filter(filter)
        compared_trend_entity = func(entity=entity, filter=compared_filter, team_id=team_id)

        compared_trend_entity = _convert_to_comparison(
            compared_trend_entity, compared_filter, "{} - {}".format(entity.name, "previous"),
        )
        entities_list.extend(compared_trend_entity)
    else:
        entities_list.extend(trend_entity)
    return entities_list


"""
filter_events takes team_id, filter, entity and generates a Q objects that you can use to filter a QuerySet
"""


def filter_events(team_id: int, filter: Filter, entity: Optional[Entity] = None) -> Q:
    filters = Q()
    if filter.date_from:
        filters &= Q(timestamp__gte=filter.date_from)
    if filter.date_to:
        relativity = relativedelta(days=1)
        if filter.interval == "hour":
            relativity = relativedelta(hours=1)
        elif filter.interval == "minute":
            relativity = relativedelta(minutes=1)
        elif filter.interval == "week":
            relativity = relativedelta(weeks=1)
        elif filter.interval == "month":
            relativity = relativedelta(months=1) - relativity  # go to last day of month instead of first of next
        filters &= Q(timestamp__lte=filter.date_to + relativity)
    if filter.properties:
        filters &= filter.properties_to_Q(team_id=team_id)
    if entity and entity.properties:
        filters &= entity.properties_to_Q(team_id=team_id)
    return filters


class BaseQuery:
    """
    run needs to be implemented in each individual query class. It takes a Filter, a Team
    and optionally other arguments via kwargs (though use sparingly!).
    The output is a List of Dicts; what those dicts look like depends on the needs of the frontend.
    """

    def run(self, filter: Filter, team: Team, *args, **kwargs) -> List[Dict[str, Any]]:
        raise NotImplementedError("You need to implement run")
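
For orientation (this sketch is not part of the commit), a query class built on these helpers would follow roughly the pattern below; the class name EventCount and its output shape are hypothetical and simply mirror the structure the Stickiness query uses further down.

from typing import Any, Dict, List

from posthog.models import Entity, Filter, Team
from posthog.queries.base import BaseQuery, filter_events, handle_compare, process_entity_for_events


class EventCount(BaseQuery):
    # Hypothetical example: count the events matching each entity, honouring filter.compare.
    def _serialize_entity(self, entity: Entity, filter: Filter, team_id: int) -> List[Dict[str, Any]]:
        events = process_entity_for_events(entity=entity, team_id=team_id, order_by=None)
        events = events.filter(filter_events(team_id, filter, entity))
        return [
            {"action": entity.to_dict(), "label": entity.name, "count": events.count(), "data": [], "labels": [], "days": []}
        ]

    def run(self, filter: Filter, team: Team, *args, **kwargs) -> List[Dict[str, Any]]:
        response: List[Dict[str, Any]] = []
        for entity in filter.entities:
            response.extend(handle_compare(entity=entity, filter=filter, func=self._serialize_entity, team_id=team.pk))
        return response
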
35 changes: 35 additions & 0 deletions posthog/queries/retention.py
@@ -0,0 +1,35 @@
import datetime
from datetime import timedelta
from typing import Any, Dict, List, Optional

from posthog.models import Entity, Event, Filter, Team
from posthog.queries.base import BaseQuery


class Retention(BaseQuery):
    def calculate_retention(self, filter: Filter, team: Team, start_entity: Optional[Entity] = None, total_days=11):
        date_from: datetime.datetime = filter.date_from  # type: ignore
        filter._date_to = (date_from + timedelta(days=total_days)).isoformat()
        labels_format = "%a. %-d %B"
        resultset = Event.objects.query_retention(filter, team, start_entity=start_entity)

        result = [
            {
                "values": [
                    resultset.get((first_day, day), {"count": 0, "people": []}) for day in range(total_days - first_day)
                ],
                "label": "Day {}".format(first_day),
                "date": (date_from + timedelta(days=first_day)).strftime(labels_format),
            }
            for first_day in range(total_days)
        ]

        return result

    def run(self, filter: Filter, team: Team, *args, **kwargs) -> List[Dict[str, Any]]:
        return self.calculate_retention(
            filter=filter,
            team=team,
            start_entity=filter.entities[0] if len(filter.entities) > 0 else None,
            total_days=kwargs.get("total_days", 11),
        )
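
As a usage sketch (not part of the diff), the retention query would be invoked roughly as follows; the team lookup and filter payload are illustrative assumptions, with Filter taken to accept a data dict as elsewhere in the codebase.

from posthog.models import Filter, Team
from posthog.queries.retention import Retention

team = Team.objects.get(pk=1)  # hypothetical team
filter = Filter(data={"date_from": "2020-07-01"})  # assumes Filter parses date_from into a datetime
result = Retention().run(filter, team, total_days=11)
# result is a list of 11 rows, e.g. result[0] == {"label": "Day 0", "date": "Wed. 1 July", "values": [...]}
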
94 changes: 94 additions & 0 deletions posthog/queries/stickiness.py
@@ -0,0 +1,94 @@
from typing import List, Dict, Any
from .base import filter_events, handle_compare, process_entity_for_events, BaseQuery
from posthog.models import Entity, Filter, Team, Event, Action
from posthog.constants import TREND_FILTER_TYPE_ACTIONS
from django.db.models import QuerySet, Count, functions
from django.utils.timezone import now
from django.db import connection
import copy


def execute_custom_sql(query, params):
    cursor = connection.cursor()
    cursor.execute(query, params)
    return cursor.fetchall()


class Stickiness(BaseQuery):
    def _serialize_entity(self, entity: Entity, filter: Filter, team_id: int) -> List[Dict[str, Any]]:
        if filter.interval is None:
            filter.interval = "day"

        serialized: Dict[str, Any] = {
            "action": entity.to_dict(),
            "label": entity.name,
            "count": 0,
            "data": [],
            "labels": [],
            "days": [],
        }
        response = []
        events = process_entity_for_events(entity=entity, team_id=team_id, order_by=None,)
        events = events.filter(filter_events(team_id, filter, entity))
        new_dict = copy.deepcopy(serialized)
        new_dict.update(self.stickiness(filtered_events=events, entity=entity, filter=filter, team_id=team_id))
        response.append(new_dict)
        return response

    def stickiness(self, filtered_events: QuerySet, entity: Entity, filter: Filter, team_id: int) -> Dict[str, Any]:
        if not filter.date_to or not filter.date_from:
            raise ValueError("stickiness needs date_to and date_from set")
        range_days = (filter.date_to - filter.date_from).days + 2

        events = (
            filtered_events.filter(filter_events(team_id, filter, entity))
            .values("person_id")
            .annotate(day_count=Count(functions.TruncDay("timestamp"), distinct=True))
            .filter(day_count__lte=range_days)
        )

        events_sql, events_sql_params = events.query.sql_with_params()
        aggregated_query = "select count(v.person_id), v.day_count from ({}) as v group by v.day_count".format(
            events_sql
        )
        aggregated_counts = execute_custom_sql(aggregated_query, events_sql_params)

        response: Dict[int, int] = {}
        for result in aggregated_counts:
            response[result[1]] = result[0]

        labels = []
        data = []

        for day in range(1, range_days):
            label = "{} day{}".format(day, "s" if day > 1 else "")
            labels.append(label)
            data.append(response[day] if day in response else 0)

        return {
            "labels": labels,
            "days": [day for day in range(1, range_days)],
            "data": data,
            "count": sum(data),
        }

    def run(self, filter: Filter, team: Team, *args, **kwargs) -> List[Dict[str, Any]]:
        response = []

        if not filter.date_from:
            filter._date_from = (
                Event.objects.filter(team_id=team.pk)
                .order_by("timestamp")[0]
                .timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
                .isoformat()
            )
        if not filter.date_to:
            filter._date_to = now().isoformat()

        for entity in filter.entities:
            if entity.type == TREND_FILTER_TYPE_ACTIONS:
                entity.name = Action.objects.only("name").get(team=team, pk=entity.id).name

            entity_resp = handle_compare(entity=entity, filter=filter, func=self._serialize_entity, team_id=team.pk)
            response.extend(entity_resp)
        return response
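
Likewise, a hypothetical invocation of the stickiness query (not part of the commit); the team, date range and event name are placeholder values, and Filter is again assumed to accept a data dict.

from posthog.models import Filter, Team
from posthog.queries.stickiness import Stickiness

team = Team.objects.get(pk=1)  # hypothetical team
filter = Filter(data={
    "date_from": "2020-07-01",
    "date_to": "2020-07-14",
    "events": [{"id": "$pageview", "type": "events"}],
    "compare": True,
})
series = Stickiness().run(filter, team)
# One series per entity; with compare set, a "... - current" and a "... - previous" series per entity,
# each carrying "labels", "data" and a total "count".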