From 16935628d865a8187d022b62e062dd8e7d692ffa Mon Sep 17 00:00:00 2001 From: Vadim Markovtsev Date: Fri, 23 Sep 2022 15:33:54 +0200 Subject: [PATCH] [DEV-5042] Optimize /filter/jira cache performance --- .../api/controllers/jira_controller.py | 10 +- server/athenian/api/models/web_model_io.pyx | 451 ++++++++++++++++++ server/setup.py | 1 + server/tests/models/test_web_model_io.py | 96 ++++ 4 files changed, 553 insertions(+), 5 deletions(-) create mode 100644 server/athenian/api/models/web_model_io.pyx create mode 100644 server/tests/models/test_web_model_io.py diff --git a/server/athenian/api/controllers/jira_controller.py b/server/athenian/api/controllers/jira_controller.py index 14ac8041b1..25ff69aefc 100644 --- a/server/athenian/api/controllers/jira_controller.py +++ b/server/athenian/api/controllers/jira_controller.py @@ -3,7 +3,6 @@ from datetime import datetime, timedelta, timezone from itertools import chain import logging -import pickle from typing import Any, Collection, Mapping, Optional, Type, Union from aiohttp import web @@ -91,6 +90,7 @@ JIRAUser, PullRequest as WebPullRequest, ) +from athenian.api.models.web_model_io import deserialize_models, serialize_models from athenian.api.request import AthenianWebRequest from athenian.api.response import ResponseError, model_response from athenian.api.tracing import sentry_span @@ -224,8 +224,8 @@ async def filter_jira_stuff(request: AthenianWebRequest, body: dict) -> web.Resp @sentry_span @cached( exptime=short_term_exptime, - serialize=pickle.dumps, - deserialize=pickle.loads, + serialize=serialize_models, + deserialize=deserialize_models, key=lambda return_, time_from, time_to, exclude_inactive, label_filter, priorities, reporters, assignees, commenters, default_branches, release_settings, logical_settings, **_: ( # noqa JIRAFilterReturn.EPICS in return_, JIRAFilterReturn.PRIORITIES in return_, @@ -548,8 +548,8 @@ async def _epic_flow( @sentry_span @cached( exptime=short_term_exptime, - serialize=pickle.dumps, - deserialize=pickle.loads, + serialize=serialize_models, + deserialize=deserialize_models, key=lambda return_, time_from, time_to, exclude_inactive, label_filter, priorities, reporters, assignees, commenters, default_branches, release_settings, logical_settings, **_: ( # noqa JIRAFilterReturn.ISSUES in return_, JIRAFilterReturn.ISSUE_BODIES in return_, diff --git a/server/athenian/api/models/web_model_io.pyx b/server/athenian/api/models/web_model_io.pyx new file mode 100644 index 0000000000..a399c7328b --- /dev/null +++ b/server/athenian/api/models/web_model_io.pyx @@ -0,0 +1,451 @@ +# cython: language_level=3, boundscheck=False, nonecheck=False, optimize.unpack_method_calls=True +# cython: warn.maybe_uninitialized=True +# distutils: language = c++ +# distutils: extra_compile_args = -mavx2 -ftree-vectorize + +from cpython cimport ( + Py_DECREF, + Py_INCREF, + PyBytes_FromStringAndSize, + PyFloat_FromDouble, + PyList_New, + PyList_SET_ITEM, + PyLong_FromLong, + PyObject, + PyTuple_New, + PyTuple_SET_ITEM, +) +from cpython.datetime cimport PyDateTimeAPI, import_datetime +from cpython.dict cimport PyDict_GetItemString +from libc.stdint cimport uint16_t, uint32_t +from libc.stdio cimport FILE, SEEK_CUR, fclose, fread, fseek, ftell, fwrite +from libc.stdlib cimport free +from libcpp.vector cimport vector + +import pickle + +from athenian.api.typing_utils import is_generic, is_optional + + +cdef extern from "stdio.h" nogil: + FILE *open_memstream(char **ptr, size_t *sizeloc) + FILE *fmemopen(void *buf, size_t size, const char *mode) + + +cdef extern from "structmember.h": + ctypedef struct PyMemberDef: + const char *name + int type + Py_ssize_t offset + int flags + const char *doc + + +cdef extern from "Python.h": + ctypedef struct PyTypeObject: + PyMemberDef *tp_members + + char *PyBytes_AS_STRING(PyObject *) nogil + Py_ssize_t PyBytes_GET_SIZE(PyObject *) nogil + PyObject *PyList_GET_ITEM(PyObject *, Py_ssize_t) nogil + Py_ssize_t PyList_GET_SIZE(PyObject *) nogil + PyObject *PyTuple_GET_ITEM(PyObject *, Py_ssize_t) nogil + bint PyUnicode_Check(PyObject *) nogil + bint PyBytes_Check(PyObject *) nogil + bint PyList_CheckExact(PyObject *) nogil + Py_ssize_t PyUnicode_GET_LENGTH(PyObject *) nogil + void *PyUnicode_DATA(PyObject *) nogil + unsigned int PyUnicode_KIND(PyObject *) nogil + bint PyLong_CheckExact(PyObject *) nogil + long PyLong_AsLong(PyObject *) nogil + double PyFloat_AS_DOUBLE(PyObject *) nogil + bint PyFloat_CheckExact(PyObject *) nogil + + unsigned int PyUnicode_1BYTE_KIND + unsigned int PyUnicode_2BYTE_KIND + unsigned int PyUnicode_4BYTE_KIND + + PyObject *Py_None + PyTypeObject PyLong_Type + PyTypeObject PyFloat_Type + PyTypeObject PyUnicode_Type + + str PyUnicode_FromKindAndData(unsigned int kind, void *buffer, Py_ssize_t size) + PyObject *PyObject_GetAttr(PyObject *o, object attr_name) + + +cdef extern from "datetime.h" nogil: + bint PyDateTime_CheckExact(PyObject *) + bint PyDelta_CheckExact(PyObject *) + + int PyDateTime_GET_YEAR(PyObject *) + int PyDateTime_GET_MONTH(PyObject *) + int PyDateTime_GET_DAY(PyObject *) + + int PyDateTime_DATE_GET_HOUR(PyObject *) + int PyDateTime_DATE_GET_MINUTE(PyObject *) + int PyDateTime_DATE_GET_SECOND(PyObject *) + + int PyDateTime_DELTA_GET_DAYS(PyObject *) + int PyDateTime_DELTA_GET_SECONDS(PyObject *) + + ctypedef struct PyDateTime_CAPI: + PyObject *TimeZone_UTC + + +import_datetime() + + +cdef enum DataType: + DT_INVALID = 0 + DT_MODEL = 1 + DT_LIST = 2 + DT_LONG = 3 + DT_FLOAT = 4 + DT_STRING = 5 + DT_DT = 6 + DT_TD = 7 + + +ctypedef struct ModelFields: + DataType type + Py_ssize_t offset + PyTypeObject *model + vector[ModelFields] nested + + +cdef inline DataType _discover_data_type(PyObject *obj, PyTypeObject **deref) except DT_INVALID: + cdef: + PyTypeObject *as_type = obj + + if is_optional( obj): + args = ( obj).__args__ + as_type = PyTuple_GET_ITEM( args, 0) + if as_type == &PyLong_Type: + return DT_LONG + elif as_type == &PyFloat_Type: + return DT_FLOAT + elif as_type == &PyUnicode_Type: + return DT_STRING + elif as_type == PyDateTimeAPI.DateTimeType: + return DT_DT + elif as_type == PyDateTimeAPI.DeltaType: + return DT_TD + elif is_generic( obj): + args = ( obj).__args__ + as_type = PyTuple_GET_ITEM( args, 0) + if is_optional( as_type): + args = ( as_type).__args__ + as_type = PyTuple_GET_ITEM( args, 0) + deref[0] = as_type + return DT_LIST + elif hasattr( obj, "attribute_types"): + deref[0] = as_type + return DT_MODEL + else: + raise AssertionError(f"Field type is not supported: { obj}") + + +cdef inline void _apply_data_type( + Py_ssize_t offset, + PyObject *member_type, + ModelFields *fields, +) except *: + cdef: + PyTypeObject *deref = NULL + DataType dtype = _discover_data_type(member_type, &deref) + if deref != NULL: + fields.nested.push_back(_discover_fields(deref, dtype, offset)) + else: + fields.nested.push_back(ModelFields(dtype, offset, NULL)) + + +cdef ModelFields _discover_fields(PyTypeObject *model, DataType dtype, Py_ssize_t offset) except *: + cdef: + PyObject *attribute_types_attr + object attribute_types + PyObject *member_type + PyMemberDef *members + ModelFields fields = ModelFields(dtype, offset, NULL) + + attribute_types_attr = PyObject_GetAttr( model, "attribute_types") + if attribute_types_attr != NULL: + attribute_types = attribute_types_attr + Py_DECREF( attribute_types_attr) + fields.model = model + members = model.tp_members + for i in range(len(( model).__slots__)): + member_type = PyDict_GetItemString(attribute_types, members[i].name + 1) + _apply_data_type(members[i].offset, member_type, &fields) + else: + _apply_data_type(0, model, &fields) + return fields + + +cdef PyObject *_write_object(PyObject *obj, ModelFields *spec, FILE *stream) nogil: + cdef: + char dtype = spec.type + long val_long + double val_double + uint32_t str_length, val32, i + uint16_t val16[3] + PyObject *exc + bint is_unicode, is_float + if obj == Py_None: + dtype = 0 + fwrite(&dtype, 1, 1, stream) + return NULL + fwrite(&dtype, 1, 1, stream) + if dtype == DT_LONG: + if not PyLong_CheckExact(obj): + return obj + val_long = PyLong_AsLong(obj) + fwrite(&val_long, sizeof(long), 1, stream) + elif dtype == DT_FLOAT: + is_float = PyFloat_CheckExact(obj) + if not is_float and not PyLong_CheckExact(obj): + return obj + if is_float: + val_double = PyFloat_AS_DOUBLE(obj) + else: + val_double = PyLong_AsLong(obj) + fwrite(&val_double, sizeof(double), 1, stream) + elif dtype == DT_STRING: + is_unicode = PyUnicode_Check(obj) + if not is_unicode and not PyBytes_Check(obj): + return obj + if is_unicode: + str_length = PyUnicode_GET_LENGTH(obj) + val32 = str_length | ((PyUnicode_KIND(obj) - 1) << 30) + fwrite(&val32, 4, 1, stream) + fwrite(PyUnicode_DATA(obj), 1, str_length, stream) + else: + val32 = PyBytes_GET_SIZE(obj) + fwrite(&val32, 4, 1, stream) + fwrite(PyBytes_AS_STRING(obj), 1, val32, stream) + elif dtype == DT_DT: + if not PyDateTime_CheckExact(obj): + return obj + val16[0] = PyDateTime_GET_YEAR(obj) << 4 + val16[0] |= PyDateTime_GET_MONTH(obj) + val16[1] = PyDateTime_GET_DAY(obj) << 7 + val16[1] |= PyDateTime_DATE_GET_HOUR(obj) + val16[2] = PyDateTime_DATE_GET_MINUTE(obj) << 8 + val16[2] |= PyDateTime_DATE_GET_SECOND(obj) + fwrite(val16, 2, 3, stream) + elif dtype == DT_TD: + if not PyDelta_CheckExact(obj): + return obj + val32 = PyDateTime_DELTA_GET_DAYS(obj) << 1 + var_long = PyDateTime_DELTA_GET_SECONDS(obj) + if var_long >= 1 << 16: + val32 |= 1 + val16[0] = var_long & 0xFFFF + fwrite(&val32, 4, 1, stream) + fwrite(val16, 2, 1, stream) + elif dtype == DT_LIST: + if not PyList_CheckExact(obj): + return obj + val32 = PyList_GET_SIZE(obj) + fwrite(&val32, 4, 1, stream) + if spec.model != NULL: + spec.type = DT_MODEL + for i in range(val32): + exc = _write_object(PyList_GET_ITEM(obj, i), spec, stream) + if exc != NULL: + return exc + spec.type = DT_LIST + else: + for i in range(val32): + exc = _write_object(PyList_GET_ITEM(obj, i), &spec.nested[0], stream) + if exc != NULL: + return exc + elif dtype == DT_MODEL: + val32 = spec.nested.size() + fwrite(&val32, 4, 1, stream) + for field in spec.nested: + exc = _write_object(((( obj) + field.offset))[0], &field, stream) + if exc != NULL: + return exc + else: + return obj + return NULL + + +cdef void _serialize_list_of_models(list models, FILE *stream) except *: + cdef: + uint32_t size + ModelFields spec + type item_type + PyObject *exc + + if len(models) == 0: + size = 0 + fwrite(&size, 4, 1, stream) + return + item_type = type(models[0]) + result = pickle.dumps(item_type) + spec = _discover_fields( item_type, DT_LIST, 0) + with nogil: + size = PyBytes_GET_SIZE( result) + fwrite(&size, 4, 1, stream) + fwrite(PyBytes_AS_STRING( result), 1, size, stream) + exc = _write_object( models, &spec, stream) + if exc != NULL: + raise ValueError(f"Could not serialize { exc}") + + +cdef void _serialize_generic(model, FILE *stream) except *: + cdef: + bytes buf = pickle.dumps(model) + uint32_t size = len(buf) + fwrite(&size, 4, 1, stream) + fwrite(PyBytes_AS_STRING( buf), size, 1, stream) + + +def serialize_models(tuple models not None) -> bytes: + cdef: + char *output = NULL + size_t output_size = 0 + FILE *stream + bytes result + char count + assert len(models) < 255 + stream = open_memstream(&output, &output_size) + count = len(models) + fwrite(&count, 1, 1, stream) + try: + for model in models: + if PyList_CheckExact( model): + _serialize_list_of_models(model, stream) + else: + _serialize_generic(model, stream) + finally: + fclose(stream) + result = PyBytes_FromStringAndSize(output, output_size) + free(output) + return result + + +def deserialize_models(bytes buffer not None) -> tuple[list[object], ...]: + cdef: + char *input = PyBytes_AS_STRING( buffer) + uint32_t aux = 0, tuple_pos + str corrupted_msg = "Corrupted buffer ar position %d: %s" + FILE *stream + tuple result + long pos + bytes type_buf + object model_type + ModelFields spec + + stream = fmemopen(input, PyBytes_GET_SIZE( buffer), b"r") + if fread(&aux, 1, 1, stream) != 1: + raise ValueError(corrupted_msg % (ftell(stream), "tuple")) + result = PyTuple_New(aux) + for tuple_pos in range(aux): + if fread(&aux, 4, 1, stream) != 1: + raise ValueError(corrupted_msg % (ftell(stream), "pickle/header")) + if aux == 0: + model = [] + else: + pos = ftell(stream) + if fseek(stream, aux, SEEK_CUR): + raise ValueError(corrupted_msg % (ftell(stream), "pickle/body")) + type_buf = PyBytes_FromStringAndSize(input + pos, aux) + model_type = pickle.loads(type_buf) + if not isinstance(model_type, type): + model = model_type + else: + spec = _discover_fields( model_type, DT_LIST, 0) + model = _read_model(&spec, stream, input, corrupted_msg) + Py_INCREF(model) + PyTuple_SET_ITEM(result, tuple_pos, model) + fclose(stream) + return result + + +cdef object _read_model(ModelFields *spec, FILE *stream, const char *raw, str corrupted_msg): + cdef: + char dtype = 0 + long long_val = 0 + double double_val = 0 + uint32_t aux32 = 0, i + uint16_t aux16[3] + unsigned int kind + int year, month, day, hour, minute, second + PyObject *utctz = ( PyDateTimeAPI).TimeZone_UTC + PyObject *obj_val + ModelFields field + + if fread(&dtype, 1, 1, stream) != 1: + raise ValueError(corrupted_msg % (ftell(stream), "dtype")) + if dtype == DT_INVALID: + return None + if dtype != spec.type: + raise ValueError(corrupted_msg % (ftell(stream), f"dtype {dtype} != {spec.type}")) + if dtype == DT_LONG: + if fread(&long_val, sizeof(long), 1, stream) != 1: + raise ValueError(corrupted_msg % (ftell(stream), "long")) + return PyLong_FromLong(long_val) + elif dtype == DT_FLOAT: + if fread(&double_val, sizeof(double), 1, stream) != 1: + raise ValueError(corrupted_msg % (ftell(stream), "float")) + return PyFloat_FromDouble(double_val) + elif dtype == DT_STRING: + if fread(&aux32, 4, 1, stream) != 1: + raise ValueError(corrupted_msg % (ftell(stream), "str/header")) + kind = (aux32 >> 30) + 1 + aux32 &= 0x3FFFFFFF + long_val = ftell(stream) + if fseek(stream, aux32, SEEK_CUR): + raise ValueError(corrupted_msg % (ftell(stream), "str/body")) + return PyUnicode_FromKindAndData(kind, raw + long_val, aux32) + elif dtype == DT_DT: + if fread(aux16, 2, 3, stream) != 3: + raise ValueError(corrupted_msg % (ftell(stream), "dt")) + year = aux16[0] >> 4 + month = aux16[0] & 0xF + day = aux16[1] >> 7 + hour = aux16[1] & 0x7F + minute = aux16[2] >> 8 + second = aux16[2] & 0xFF + return PyDateTimeAPI.DateTime_FromDateAndTime( + year, month, day, hour, minute, second, 0, utctz, PyDateTimeAPI.DateTimeType, + ) + elif dtype == DT_TD: + if fread(&aux32, 4, 1, stream) != 1: + raise ValueError(corrupted_msg % (ftell(stream), "td")) + if fread(aux16, 2, 1, stream) != 1: + raise ValueError(corrupted_msg % (ftell(stream), "td")) + day = aux32 >> 1 + second = aux16[0] + ((aux32 & 1) << 16) + return PyDateTimeAPI.Delta_FromDelta(day, second, 0, 1, PyDateTimeAPI.DeltaType) + elif dtype == DT_MODEL: + obj = spec.model + if fread(&aux32, 4, 1, stream) != 1: + raise ValueError(corrupted_msg % (ftell(stream), "model")) + if aux32 != spec.nested.size(): + raise ValueError(corrupted_msg % (ftell(stream), f"{obj} has changed")) + obj = obj.__new__(obj) + for field in spec.nested: + val = _read_model(&field, stream, raw, corrupted_msg) + Py_INCREF(val) + ((( obj) + field.offset))[0] = val + return obj + elif dtype == DT_LIST: + if fread(&aux32, 4, 1, stream) != 1: + raise ValueError(corrupted_msg % (ftell(stream), "list")) + obj = PyList_New(aux32) + for i in range(aux32): + if spec.model != NULL: + spec.type = DT_MODEL + val = _read_model(spec, stream, raw, corrupted_msg) + spec.type = DT_LIST + else: + val = _read_model(&spec.nested[0], stream, raw, corrupted_msg) + Py_INCREF(val) + PyList_SET_ITEM(obj, i, val) + return obj + else: + raise AssertionError(f"Unsupported dtype: {dtype}") diff --git a/server/setup.py b/server/setup.py index 95fac0d096..8b8ad8f423 100644 --- a/server/setup.py +++ b/server/setup.py @@ -49,6 +49,7 @@ code_root / "pandas_io.pyx", code_root / "sentry_native.pyx", code_root / "models" / "sql_builders.pyx", + code_root / "models" / "web_model_io.pyx", ) # fmt: on ], diff --git a/server/tests/models/test_web_model_io.py b/server/tests/models/test_web_model_io.py new file mode 100644 index 0000000000..371c31b97e --- /dev/null +++ b/server/tests/models/test_web_model_io.py @@ -0,0 +1,96 @@ +from datetime import datetime, timedelta, timezone + +from athenian.api.models.web import JIRAEpic, JIRAEpicChild, MappedJIRAIdentity, PullRequestNumbers +from athenian.api.models.web_model_io import deserialize_models, serialize_models + + +def test_serialize_models_smoke(): + now = datetime.now(timezone.utc).replace(microsecond=0) + models = ( + [ + JIRAEpic( + project="project", + children=[], + prs=10, + id="id", + title="title", + created=now.replace(tzinfo=None), + updated=now, + work_began=now, + resolved=None, + lead_time=timedelta(seconds=10), + life_time=timedelta(seconds=20), + reporter="reporter", + assignee=None, + comments=7, + priority="priority", + status="status", + type="type", + url="url", + ), + JIRAEpic( + project="other_project", + children=[ + JIRAEpicChild( + id="child_id", + title=b"child_title", + created=now, + updated=now, + work_began=None, + resolved=now, + lead_time=None, + life_time=timedelta(days=12, seconds=24 * 3600 - 39), + reporter="child_reporter", + assignee="child_assignee", + comments=176, + priority=None, + status="child_status", + type="child_type", + url="child_url", + subtasks=100, + prs=123, + ), + ], + prs=10, + id="id", + title="title", + created=now, + updated=now, + work_began=now, + resolved=None, + lead_time=timedelta(seconds=10), + life_time=timedelta(seconds=20), + reporter="reporter", + assignee=None, + comments=7, + priority="priority", + status="status", + type="type", + url="url", + ), + ], + [], + [ + MappedJIRAIdentity( + developer_id="dev_id", + developer_name=None, + jira_name="jira_name_val", + confidence=0.14159, + ), + MappedJIRAIdentity( + developer_id="222dev_id", + developer_name="Vadim", + jira_name="2222jira_name_val", + confidence=0, + ), + ], + {"a": 111}, + [ + PullRequestNumbers(repository="athenian", numbers=[1, 2, 7, 4]), + ], + ) + new_models = deserialize_models(serialize_models(models)) + models[0][0].created = models[0][0].created.replace(tzinfo=timezone.utc) + models[0][1].children[0].title = models[0][1].children[0].title.decode() + models[2][1].confidence = 0.0 + assert models == new_models