Protobufize events to protect from malformed JSON (#2085)
* Protobuf all the things

* oops

* Protobufize events to protect from malformed JSON

* format the generated files (will need to remember this for future)

* format

* clean up kafka produce serializer

* fixes
fuziontech committed Oct 28, 2020
1 parent d95c8c8 commit 83b5273
Showing 18 changed files with 445 additions and 162 deletions.
36 changes: 23 additions & 13 deletions ee/clickhouse/models/event.py
@@ -2,14 +2,15 @@
import uuid
from typing import Dict, List, Optional, Tuple, Union

import pytz
from dateutil.parser import isoparse
from django.utils import timezone
from rest_framework import serializers

from ee.clickhouse.client import sync_execute
from ee.clickhouse.models.element import chain_to_elements, elements_to_string
from ee.clickhouse.models.util import cast_timestamp_or_now
from ee.clickhouse.sql.events import GET_EVENTS_BY_TEAM_SQL, GET_EVENTS_SQL, INSERT_EVENT_SQL
from ee.idl.gen import events_pb2 # type: ignore
from ee.kafka_client.client import ClickhouseProducer
from ee.kafka_client.topics import KAFKA_EVENTS
from posthog.models.element import Element
@@ -26,24 +27,33 @@ def create_event(
properties: Optional[Dict] = {},
elements: Optional[List[Element]] = None,
) -> str:
timestamp = cast_timestamp_or_now(timestamp)

if not timestamp:
timestamp = timezone.now()
assert timestamp is not None

# clickhouse specific formatting
if isinstance(timestamp, str):
timestamp = isoparse(timestamp)
else:
timestamp = timestamp.astimezone(pytz.utc)

elements_chain = ""
if elements and len(elements) > 0:
elements_chain = elements_to_string(elements=elements)

data = {
"uuid": str(event_uuid),
"event": event,
"properties": json.dumps(properties),
"timestamp": timestamp,
"team_id": team.pk,
"distinct_id": distinct_id,
"created_at": timestamp,
"elements_chain": elements_chain,
}
pb_event = events_pb2.Event()
pb_event.uuid = str(event_uuid)
pb_event.event = event
pb_event.properties = json.dumps(properties)
pb_event.timestamp = timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")
pb_event.team_id = team.pk
pb_event.distinct_id = str(distinct_id)
pb_event.elements_chain = elements_chain

p = ClickhouseProducer()
p.produce(sql=INSERT_EVENT_SQL, topic=KAFKA_EVENTS, data=data)

p.produce_proto(sql=INSERT_EVENT_SQL, topic=KAFKA_EVENTS, data=pb_event)
return str(event_uuid)


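The matching producer-side change ("clean up kafka produce serializer") lives in ee/kafka_client/client.py and is not shown in this excerpt. A minimal sketch of the shape produce_proto takes, assuming a kafka-python KafkaProducer and a placeholder bootstrap server; the repository's actual wrapper and its ClickHouse fallback may differ:

# Illustrative sketch only; not the repository's actual implementation.
from kafka import KafkaProducer  # assumption: kafka-python client


class ClickhouseProducer:
    def __init__(self, bootstrap_servers: str = "kafka:9092") -> None:  # placeholder host
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

    def produce_proto(self, sql: str, topic: str, data) -> None:
        # SerializeToString() emits the compact protobuf wire format, so the
        # Kafka payload is opaque bytes rather than a JSON document that a
        # stray quote or control character in `properties` could corrupt.
        self.producer.send(topic, value=data.SerializeToString())
        # If Kafka is disabled, the `sql` argument (e.g. INSERT_EVENT_SQL) would
        # instead be executed against ClickHouse directly; that path is elided here.

The effect is visible at the call site above: create_event no longer hands the producer a plain dict that gets serialized to JSON, it hands over a typed message that serializes to bytes.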
Empty file added ee/idl/__init__.py
Empty file.
53 changes: 0 additions & 53 deletions ee/idl/element.avro

This file was deleted.

19 changes: 19 additions & 0 deletions ee/idl/element.proto
@@ -0,0 +1,19 @@
syntax = "proto3";
import "google/protobuf/timestamp.proto";

message Element {
string uuid = 1;
string event_uuid = 2;
string text = 3;
string tag_name = 4;
string href = 5;
string attr_id = 6;
repeated string attr_class = 7;
uint64 nth_child = 8;
uint64 nth_of_type = 9;
string attributes = 10;
uint64 order = 11;
uint64 team_id = 12;
google.protobuf.Timestamp created_at = 13;
string elements_hash = 14;
}
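The generated Python bindings for this schema are used like any other protobuf message. A short usage sketch follows; the element_pb2 module name under ee/idl/gen is an assumption by analogy with events_pb2, and all field values are placeholders:

# Usage sketch for the generated Element bindings (module name assumed).
from datetime import datetime, timezone

from ee.idl.gen import element_pb2  # type: ignore  # assumed module name

element = element_pb2.Element()
element.uuid = "00000000-0000-0000-0000-000000000000"  # placeholder
element.tag_name = "button"
element.href = "https://posthog.com"
element.attr_class.extend(["btn", "btn-primary"])  # repeated string field
element.nth_child = 2
element.order = 0
element.team_id = 1
# google.protobuf.Timestamp fields are set through their helper methods
# rather than by plain assignment:
element.created_at.FromDatetime(datetime.now(timezone.utc))

payload = element.SerializeToString()  # compact bytes, ready for Kafka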
19 changes: 0 additions & 19 deletions ee/idl/element_hash.avro

This file was deleted.

32 changes: 0 additions & 32 deletions ee/idl/event.avro

This file was deleted.

15 changes: 15 additions & 0 deletions ee/idl/events.proto
@@ -0,0 +1,15 @@
syntax = "proto3";
import "google/protobuf/timestamp.proto";

message Event {
string uuid = 1;
string event = 2;
string properties = 3;
string timestamp = 4;
uint64 team_id = 5;
string distinct_id = 6;
string created_at = 7;
string elements_chain = 8;
google.protobuf.Timestamp proto_created_at = 9;
google.protobuf.Timestamp proto_timestamp = 10;
}
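Note that properties, timestamp and created_at remain plain strings in this schema, with the proto_timestamp and proto_created_at Timestamp fields sitting alongside them, so any JSON only ever appears as the value of a single string field inside a binary envelope. A minimal round trip with the generated events_pb2 module, using placeholder values:

# Round-trip sketch with the bindings generated from events.proto
# (ee/idl/gen/events_pb2.py); values are illustrative only.
import json

from ee.idl.gen import events_pb2  # type: ignore

event = events_pb2.Event()
event.uuid = "00000000-0000-0000-0000-000000000000"
event.event = "$pageview"
event.properties = json.dumps({"$current_url": "https://posthog.com"})
event.timestamp = "2020-10-28 00:00:00.000000"
event.team_id = 1
event.distinct_id = "some-distinct-id"

wire_bytes = event.SerializeToString()  # what the producer sends to KAFKA_EVENTS

decoded = events_pb2.Event()
decoded.ParseFromString(wire_bytes)  # raises DecodeError if the bytes are truncated or malformed
properties = json.loads(decoded.properties)  # JSON stays confined to one string field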
