From 83b5273113515654774379a4c9455293cddb3553 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Wed, 28 Oct 2020 15:18:52 -0700 Subject: [PATCH] Protobufize events to protect from malformed JSON (#2085) * Protobuf all the things * oops * Protobufize events to protect from malformed JSON * format the generated files (will need to remember this for future) * format * clean up kafka produce serializer * fixes --- ee/clickhouse/models/event.py | 36 +++-- ee/idl/__init__.py | 0 ee/idl/element.avro | 53 ------- ee/idl/element.proto | 19 +++ ee/idl/element_hash.avro | 19 --- ee/idl/event.avro | 32 ----- ee/idl/events.proto | 15 ++ ee/idl/gen/events_pb2.py | 256 ++++++++++++++++++++++++++++++++++ ee/idl/gen/events_pb2.pyi | 86 ++++++++++++ ee/idl/omni_person.proto | 12 ++ ee/idl/person.avro | 20 --- ee/idl/person.proto | 9 ++ ee/idl/person_distinct.avro | 19 --- ee/kafka_client/client.py | 23 ++- ee/kafka_client/topics.py | 2 +- posthog/api/__init__.py | 3 +- requirements.txt | 2 + requirements/dev.txt | 1 + 18 files changed, 445 insertions(+), 162 deletions(-) create mode 100644 ee/idl/__init__.py delete mode 100644 ee/idl/element.avro create mode 100644 ee/idl/element.proto delete mode 100644 ee/idl/element_hash.avro delete mode 100644 ee/idl/event.avro create mode 100644 ee/idl/events.proto create mode 100644 ee/idl/gen/events_pb2.py create mode 100644 ee/idl/gen/events_pb2.pyi create mode 100644 ee/idl/omni_person.proto delete mode 100644 ee/idl/person.avro create mode 100644 ee/idl/person.proto delete mode 100644 ee/idl/person_distinct.avro diff --git a/ee/clickhouse/models/event.py b/ee/clickhouse/models/event.py index 86c20f306e366..6a87cd81f69a8 100644 --- a/ee/clickhouse/models/event.py +++ b/ee/clickhouse/models/event.py @@ -2,14 +2,15 @@ import uuid from typing import Dict, List, Optional, Tuple, Union +import pytz from dateutil.parser import isoparse from django.utils import timezone from rest_framework import serializers from ee.clickhouse.client import sync_execute from ee.clickhouse.models.element import chain_to_elements, elements_to_string -from ee.clickhouse.models.util import cast_timestamp_or_now from ee.clickhouse.sql.events import GET_EVENTS_BY_TEAM_SQL, GET_EVENTS_SQL, INSERT_EVENT_SQL +from ee.idl.gen import events_pb2 # type: ignore from ee.kafka_client.client import ClickhouseProducer from ee.kafka_client.topics import KAFKA_EVENTS from posthog.models.element import Element @@ -26,24 +27,33 @@ def create_event( properties: Optional[Dict] = {}, elements: Optional[List[Element]] = None, ) -> str: - timestamp = cast_timestamp_or_now(timestamp) + + if not timestamp: + timestamp = timezone.now() + assert timestamp is not None + + # clickhouse specific formatting + if isinstance(timestamp, str): + timestamp = isoparse(timestamp) + else: + timestamp = timestamp.astimezone(pytz.utc) elements_chain = "" if elements and len(elements) > 0: elements_chain = elements_to_string(elements=elements) - data = { - "uuid": str(event_uuid), - "event": event, - "properties": json.dumps(properties), - "timestamp": timestamp, - "team_id": team.pk, - "distinct_id": distinct_id, - "created_at": timestamp, - "elements_chain": elements_chain, - } + pb_event = events_pb2.Event() + pb_event.uuid = str(event_uuid) + pb_event.event = event + pb_event.properties = json.dumps(properties) + pb_event.timestamp = timestamp.strftime("%Y-%m-%d %H:%M:%S.%f") + pb_event.team_id = team.pk + pb_event.distinct_id = str(distinct_id) + pb_event.elements_chain = elements_chain + p = ClickhouseProducer() - p.produce(sql=INSERT_EVENT_SQL, topic=KAFKA_EVENTS, data=data) + + p.produce_proto(sql=INSERT_EVENT_SQL, topic=KAFKA_EVENTS, data=pb_event) return str(event_uuid) diff --git a/ee/idl/__init__.py b/ee/idl/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/ee/idl/element.avro b/ee/idl/element.avro deleted file mode 100644 index 022f09e6678d3..0000000000000 --- a/ee/idl/element.avro +++ /dev/null @@ -1,53 +0,0 @@ -{ - "namespace": "com.posthog.ee", - "name": "element", - "type": "record", - "fields" : [ - { - "name" : "text", - "type" : "string" - }, - { - "name" : "tag_name", - "type" : "string" - }, - { - "name" : "href", - "type" : "string" - }, - { - "name" : "attr_id", - "type" : "string" - }, - { - "name" : "attr_class", - "type" : "string" - }, - { - "name" : "nth_child", - "type" : "string" - }, - { - "name" : "nth_of_type", - "type" : "string" - }, - { - "name" : "attributes", - "type" : "map", - "values" : "string" - }, - { - "name" : "order", - "type" : "long" - }, - { - "name" : "team_id", - "type" : "long" - }, - { - "name" : "elements_hash", - "type" : "string" - } - ] -} - diff --git a/ee/idl/element.proto b/ee/idl/element.proto new file mode 100644 index 0000000000000..a96a0c6b5a5f0 --- /dev/null +++ b/ee/idl/element.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; +import "google/protobuf/timestamp.proto"; + +message Element { + string uuid = 1; + string event_uuid = 2; + string text = 3; + string tag_name = 4; + string href = 5; + string attr_id = 6; + repeated string attr_class = 7; + uint64 nth_child = 8; + uint64 nth_of_type = 9; + string attributes = 10; + uint64 order = 11; + uint64 team_id = 12; + google.protobuf.Timestamp created_at = 13; + string elements_hash = 14; +} \ No newline at end of file diff --git a/ee/idl/element_hash.avro b/ee/idl/element_hash.avro deleted file mode 100644 index 7ddd89d7e8d68..0000000000000 --- a/ee/idl/element_hash.avro +++ /dev/null @@ -1,19 +0,0 @@ -{ - "namespace": "com.posthog.ee", - "name": "elements_hash", - "type": "record", - "fields" : [ - { - "name" : "id", - "type" : "string" - }, - { - "name" : "elements_hash", - "type" : "string" - }, - { - "name" : "team_id", - "type" : "long" - } - ] -} diff --git a/ee/idl/event.avro b/ee/idl/event.avro deleted file mode 100644 index 9d1455a3eb2fb..0000000000000 --- a/ee/idl/event.avro +++ /dev/null @@ -1,32 +0,0 @@ -{ - "namespace": "com.posthog.ee", - "name": "event", - "type": "record", - "fields" : [ - { - "name" : "event", - "type" : "string" - }, - { - "name" : "team_id", - "type" : "long" - }, - { - "name" : "properties", - "type" : "map", - "values" : "string" - }, - { - "name": "timestamp", - "type": "string" - }, - { - "name": "distinct_id", - "type": "string" - }, - { - "name": "elements_hash", - "type": "string" - } - ] -} \ No newline at end of file diff --git a/ee/idl/events.proto b/ee/idl/events.proto new file mode 100644 index 0000000000000..afb1ce07641d9 --- /dev/null +++ b/ee/idl/events.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; +import "google/protobuf/timestamp.proto"; + +message Event { + string uuid = 1; + string event = 2; + string properties = 3; + string timestamp = 4; + uint64 team_id = 5; + string distinct_id = 6; + string created_at = 7; + string elements_chain = 8; + google.protobuf.Timestamp proto_created_at = 9; + google.protobuf.Timestamp proto_timestamp = 10; +} \ No newline at end of file diff --git a/ee/idl/gen/events_pb2.py b/ee/idl/gen/events_pb2.py new file mode 100644 index 0000000000000..0df9af5722039 --- /dev/null +++ b/ee/idl/gen/events_pb2.py @@ -0,0 +1,256 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: events.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 + +DESCRIPTOR = _descriptor.FileDescriptor( + name="events.proto", + package="", + syntax="proto3", + serialized_options=None, + create_key=_descriptor._internal_create_key, + serialized_pb=b'\n\x0c\x65vents.proto\x1a\x1fgoogle/protobuf/timestamp.proto"\x88\x02\n\x05\x45vent\x12\x0c\n\x04uuid\x18\x01 \x01(\t\x12\r\n\x05\x65vent\x18\x02 \x01(\t\x12\x12\n\nproperties\x18\x03 \x01(\t\x12\x11\n\ttimestamp\x18\x04 \x01(\t\x12\x0f\n\x07team_id\x18\x05 \x01(\x04\x12\x13\n\x0b\x64istinct_id\x18\x06 \x01(\t\x12\x12\n\ncreated_at\x18\x07 \x01(\t\x12\x16\n\x0e\x65lements_chain\x18\x08 \x01(\t\x12\x34\n\x10proto_created_at\x18\t \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x33\n\x0fproto_timestamp\x18\n \x01(\x0b\x32\x1a.google.protobuf.Timestampb\x06proto3', + dependencies=[google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,], +) + + +_EVENT = _descriptor.Descriptor( + name="Event", + full_name="Event", + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name="uuid", + full_name="Event.uuid", + index=0, + number=1, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="event", + full_name="Event.event", + index=1, + number=2, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="properties", + full_name="Event.properties", + index=2, + number=3, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="timestamp", + full_name="Event.timestamp", + index=3, + number=4, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="team_id", + full_name="Event.team_id", + index=4, + number=5, + type=4, + cpp_type=4, + label=1, + has_default_value=False, + default_value=0, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="distinct_id", + full_name="Event.distinct_id", + index=5, + number=6, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="created_at", + full_name="Event.created_at", + index=6, + number=7, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="elements_chain", + full_name="Event.elements_chain", + index=7, + number=8, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="proto_created_at", + full_name="Event.proto_created_at", + index=8, + number=9, + type=11, + cpp_type=10, + label=1, + has_default_value=False, + default_value=None, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="proto_timestamp", + full_name="Event.proto_timestamp", + index=9, + number=10, + type=11, + cpp_type=10, + label=1, + has_default_value=False, + default_value=None, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + ], + extensions=[], + nested_types=[], + enum_types=[], + serialized_options=None, + is_extendable=False, + syntax="proto3", + extension_ranges=[], + oneofs=[], + serialized_start=50, + serialized_end=314, +) + +_EVENT.fields_by_name["proto_created_at"].message_type = google_dot_protobuf_dot_timestamp__pb2._TIMESTAMP +_EVENT.fields_by_name["proto_timestamp"].message_type = google_dot_protobuf_dot_timestamp__pb2._TIMESTAMP +DESCRIPTOR.message_types_by_name["Event"] = _EVENT +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +Event = _reflection.GeneratedProtocolMessageType( + "Event", + (_message.Message,), + { + "DESCRIPTOR": _EVENT, + "__module__": "events_pb2" + # @@protoc_insertion_point(class_scope:Event) + }, +) +_sym_db.RegisterMessage(Event) + + +# @@protoc_insertion_point(module_scope) diff --git a/ee/idl/gen/events_pb2.pyi b/ee/idl/gen/events_pb2.pyi new file mode 100644 index 0000000000000..97bd67f817b43 --- /dev/null +++ b/ee/idl/gen/events_pb2.pyi @@ -0,0 +1,86 @@ +# @generated by generate_proto_mypy_stubs.py. Do not edit! +import sys +from google.protobuf.descriptor import ( + Descriptor as google___protobuf___descriptor___Descriptor, + FileDescriptor as google___protobuf___descriptor___FileDescriptor, +) + +from google.protobuf.message import Message as google___protobuf___message___Message + +from google.protobuf.timestamp_pb2 import Timestamp as google___protobuf___timestamp_pb2___Timestamp + +from typing import ( + Optional as typing___Optional, + Text as typing___Text, +) + +from typing_extensions import Literal as typing_extensions___Literal + +builtin___bool = bool +builtin___bytes = bytes +builtin___float = float +builtin___int = int + +DESCRIPTOR: google___protobuf___descriptor___FileDescriptor = ... + +class Event(google___protobuf___message___Message): + DESCRIPTOR: google___protobuf___descriptor___Descriptor = ... + uuid: typing___Text = ... + event: typing___Text = ... + properties: typing___Text = ... + timestamp: typing___Text = ... + team_id: builtin___int = ... + distinct_id: typing___Text = ... + created_at: typing___Text = ... + elements_chain: typing___Text = ... + @property + def proto_created_at(self) -> google___protobuf___timestamp_pb2___Timestamp: ... + @property + def proto_timestamp(self) -> google___protobuf___timestamp_pb2___Timestamp: ... + def __init__( + self, + *, + uuid: typing___Optional[typing___Text] = None, + event: typing___Optional[typing___Text] = None, + properties: typing___Optional[typing___Text] = None, + timestamp: typing___Optional[typing___Text] = None, + team_id: typing___Optional[builtin___int] = None, + distinct_id: typing___Optional[typing___Text] = None, + created_at: typing___Optional[typing___Text] = None, + elements_chain: typing___Optional[typing___Text] = None, + proto_created_at: typing___Optional[google___protobuf___timestamp_pb2___Timestamp] = None, + proto_timestamp: typing___Optional[google___protobuf___timestamp_pb2___Timestamp] = None, + ) -> None: ... + def HasField( + self, + field_name: typing_extensions___Literal[ + "proto_created_at", b"proto_created_at", "proto_timestamp", b"proto_timestamp", + ], + ) -> builtin___bool: ... + def ClearField( + self, + field_name: typing_extensions___Literal[ + "created_at", + b"created_at", + "distinct_id", + b"distinct_id", + "elements_chain", + b"elements_chain", + "event", + b"event", + "properties", + b"properties", + "proto_created_at", + b"proto_created_at", + "proto_timestamp", + b"proto_timestamp", + "team_id", + b"team_id", + "timestamp", + b"timestamp", + "uuid", + b"uuid", + ], + ) -> None: ... + +type___Event = Event diff --git a/ee/idl/omni_person.proto b/ee/idl/omni_person.proto new file mode 100644 index 0000000000000..d327e1ac5f1b6 --- /dev/null +++ b/ee/idl/omni_person.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; +import "google/protobuf/timestamp.proto"; + +message Person { + string uuid = 1; + string event_uuid = 2; + uint64 team_id = 3; + string distinct_id = 4; + string properties = 5; + bool is_identified = 6; + google.protobuf.Timestamp ts = 7; +} diff --git a/ee/idl/person.avro b/ee/idl/person.avro deleted file mode 100644 index a23930e53c6bc..0000000000000 --- a/ee/idl/person.avro +++ /dev/null @@ -1,20 +0,0 @@ -{ - "namespace": "com.posthog.ee", - "name": "person", - "type": "record", - "fields" : [ - { - "name" : "id", - "type" : "string" - }, - { - "name" : "team_id", - "type" : "long" - }, - { - "name" : "properties", - "type" : "map", - "values" : "string" - } - ] -} \ No newline at end of file diff --git a/ee/idl/person.proto b/ee/idl/person.proto new file mode 100644 index 0000000000000..221f526464b5c --- /dev/null +++ b/ee/idl/person.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; +import "google/protobuf/timestamp.proto"; + +message Person { + string uuid = 1; + google.protobuf.Timestamp created_at = 2; + string properties = 3; + bool is_identified = 4; +} diff --git a/ee/idl/person_distinct.avro b/ee/idl/person_distinct.avro deleted file mode 100644 index 3308414175d27..0000000000000 --- a/ee/idl/person_distinct.avro +++ /dev/null @@ -1,19 +0,0 @@ -{ - "namespace": "com.posthog.ee", - "name": "person_distinct", - "type": "record", - "fields" : [ - { - "name" : "distinct_id", - "type" : "string" - }, - { - "name" : "team_id", - "type" : "long" - }, - { - "name" : "person_id", - "type" : "long" - } - ] -} diff --git a/ee/kafka_client/client.py b/ee/kafka_client/client.py index 8440f93cfc99a..d1b076ca5c3ec 100644 --- a/ee/kafka_client/client.py +++ b/ee/kafka_client/client.py @@ -1,7 +1,8 @@ import json -from typing import Any, Dict +from typing import Any, Callable, Dict, Optional import kafka_helper # type: ignore +from google.protobuf.json_format import MessageToJson from kafka import KafkaProducer as KP # type: ignore from ee.clickhouse.client import async_execute, sync_execute @@ -14,7 +15,7 @@ class TestKafkaProducer: def __init__(self): pass - def send(self, topic: str, data: Dict[str, Any]): + def send(self, topic: str, data: Any): return def flush(self): @@ -35,8 +36,10 @@ def json_serializer(d): b = json.dumps(d).encode("utf-8") return b - def produce(self, topic: str, data: Dict[str, Any]): - b = self.json_serializer(data) + def produce(self, topic: str, data: Any, value_serializer: Optional[Callable[[Any], Any]] = None): + if not value_serializer: + value_serializer = self.json_serializer + b = value_serializer(data) self.producer.send(topic, b) def close(self): @@ -54,6 +57,18 @@ def __init__(self): else: self.send_to_kafka = False + def produce_proto(self, sql: str, topic: str, data: Any, sync: bool = True): + if self.send_to_kafka: + self.producer.produce(topic=topic, data=data, value_serializer=lambda x: x.SerializeToString()) + else: + dict_data = json.loads( + MessageToJson(data, including_default_value_fields=True, preserving_proto_field_name=True) + ) + if sync: + sync_execute(sql, dict_data) + else: + async_execute(sql, dict_data) + def produce(self, sql: str, topic: str, data: Dict[str, Any], sync: bool = True): if self.send_to_kafka: self.producer.produce(topic=topic, data=data) diff --git a/ee/kafka_client/topics.py b/ee/kafka_client/topics.py index b038176dc8dfe..5c3d4443fca30 100644 --- a/ee/kafka_client/topics.py +++ b/ee/kafka_client/topics.py @@ -1,4 +1,4 @@ -KAFKA_EVENTS = "clickhouse_events" +KAFKA_EVENTS = "clickhouse_events_proto" KAFKA_ELEMENTS = "clickhouse_elements" KAFKA_PERSON = "clickhouse_person" KAFKA_PERSON_UNIQUE_ID = "clickhouse_person_unique_id" diff --git a/posthog/api/__init__.py b/posthog/api/__init__.py index 479806c010aae..94e02a3c49618 100644 --- a/posthog/api/__init__.py +++ b/posthog/api/__init__.py @@ -69,8 +69,9 @@ def api_not_found(request): from ee.clickhouse.views.insights import ClickhouseInsights from ee.clickhouse.views.paths import ClickhousePathsViewSet from ee.clickhouse.views.person import ClickhousePerson - except ImportError: + except ImportError as e: print("Clickhouse enabled but missing enterprise capabilities. Defaulting to postgres.") + print(e) router.register(r"action", ClickhouseActions, basename="action") router.register(r"event", ClickhouseEvents, basename="event") diff --git a/requirements.txt b/requirements.txt index a957f3c740a8f..61b7d70139265 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,6 +32,7 @@ drf-exceptions-hog==0.0.4 fakeredis==1.4.3 future==0.18.2 gunicorn==20.0.4 +grpcio==1.33.1 idna==2.8 importlib-metadata==1.6.0 infi.clickhouse-orm==2.1.0 @@ -53,6 +54,7 @@ pexpect==4.7.0 pickleshare==0.7.5 posthoganalytics==1.1.2 prompt-toolkit==2.0.10 +protobuf==3.13.0 psycopg2-binary==2.8.4 ptyprocess==0.6.0 pycparser==2.20 diff --git a/requirements/dev.txt b/requirements/dev.txt index 83a29ebc78616..4c98d378e73bd 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -36,6 +36,7 @@ jedi==0.17.0 # via ipython mccabe==0.6.1 # via flake8 mypy-extensions==0.4.3 # via -r requirements/dev.in, mypy mypy==0.770 # via -r requirements/dev.in, django-stubs, djangorestframework-stubs +mypy-protobuf==1.23 packaging==20.4 # via -r requirements/dev.in parso==0.7.0 # via jedi pathspec==0.8.0 # via black