Skip to content

Commit

Permalink
Add dependencies/references support (#560)
Browse files Browse the repository at this point in the history
Protobuf references support:
Add references to Protobuf schemas,
Support compare protobuf schemas with references,
Support serialization/deserialization of protobuf schemas with dependencies.
  • Loading branch information
libretto committed Apr 13, 2023
1 parent 23c5fee commit 7e1de2a
Show file tree
Hide file tree
Showing 31 changed files with 2,644 additions and 182 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ __pycache__/
/kafka_*/
venv
/karapace/version.py
.run
.python-version
57 changes: 57 additions & 0 deletions karapace/dependency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
karapace - dependency
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""

from karapace.schema_references import Reference
from karapace.typing import JsonData, Subject, Version
from typing import Any, Optional, TYPE_CHECKING

if TYPE_CHECKING:
from karapace.schema_models import ValidatedTypedSchema


class DependencyVerifierResult:
def __init__(self, result: bool, message: Optional[str] = "") -> None:
self.result = result
self.message = message


class Dependency:
def __init__(self, name: str, subject: Subject, version: Version, target_schema: "ValidatedTypedSchema") -> None:
self.name = name
self.subject = subject
self.version = version
self.schema = target_schema

def get_schema(self) -> "ValidatedTypedSchema":
return self.schema

@staticmethod
def of(reference: Reference, target_schema: "ValidatedTypedSchema") -> "Dependency":
return Dependency(reference.name, reference.subject, reference.version, target_schema)

def to_dict(self) -> JsonData:
return {
"name": self.name,
"subject": self.subject,
"version": self.version,
}

def identifier(self) -> str:
return self.name + "_" + self.subject + "_" + str(self.version)

def __hash__(self) -> int:
return hash((self.name, self.subject, self.version, self.schema))

def __eq__(self, other: Any) -> bool:
if other is None or not isinstance(other, Dependency):
return False
return (
self.name == other.name
and self.subject == other.subject
and self.version == other.version
and self.schema == other.schema
)
17 changes: 17 additions & 0 deletions karapace/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from karapace.schema_references import Referents
from karapace.typing import Version


class VersionNotFoundException(Exception):
Expand All @@ -20,10 +22,18 @@ class InvalidSchema(Exception):
pass


class InvalidTest(Exception):
pass


class InvalidSchemaType(Exception):
pass


class InvalidReferences(Exception):
pass


class SchemasNotFoundException(Exception):
pass

Expand All @@ -44,6 +54,13 @@ class SubjectNotSoftDeletedException(Exception):
pass


class ReferenceExistsException(Exception):
def __init__(self, referenced_by: Referents, version: Version):
super().__init__()
self.version = version
self.referenced_by = referenced_by


class SubjectSoftDeletedException(Exception):
pass

Expand Down
31 changes: 30 additions & 1 deletion karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""
from dataclasses import dataclass, field
from karapace.schema_models import SchemaVersion, TypedSchema
from karapace.schema_references import Reference, Referents
from karapace.typing import ResolvedVersion, SchemaId, Subject
from threading import Lock, RLock
from typing import Dict, List, Optional, Tuple
Expand All @@ -28,6 +29,7 @@ def __init__(self) -> None:
self.subjects: Dict[Subject, SubjectData] = {}
self.schemas: Dict[SchemaId, TypedSchema] = {}
self.schema_lock_thread = RLock()
self.referenced_by: Dict[Tuple[Subject, ResolvedVersion], Referents] = {}

# Content based deduplication of schemas. This is used to reduce memory
# usage when the same schema is produce multiple times to the same or
Expand Down Expand Up @@ -96,7 +98,14 @@ def get_next_version(self, *, subject: Subject) -> ResolvedVersion:
return max(self.subjects[subject].schemas) + 1

def insert_schema_version(
self, *, subject: Subject, schema_id: SchemaId, version: ResolvedVersion, deleted: bool, schema: TypedSchema
self,
*,
subject: Subject,
schema_id: SchemaId,
version: ResolvedVersion,
deleted: bool,
schema: TypedSchema,
references: List[Reference],
) -> None:
with self.schema_lock_thread:
self.global_schema_id = max(self.global_schema_id, schema_id)
Expand All @@ -119,6 +128,7 @@ def insert_schema_version(
deleted=deleted,
schema_id=schema_id,
schema=schema,
references=references,
)

if not deleted:
Expand Down Expand Up @@ -235,3 +245,22 @@ def num_schema_versions(self) -> Tuple[int, int]:
else:
soft_deleted_versions += 1
return (live_versions, soft_deleted_versions)

def insert_referenced_by(self, *, subject: Subject, version: ResolvedVersion, schema_id: SchemaId) -> None:
with self.schema_lock_thread:
referents = self.referenced_by.get((subject, version), None)
if referents:
referents.append(schema_id)
else:
self.referenced_by[(subject, version)] = [schema_id]

def get_referenced_by(self, subject: Subject, version: ResolvedVersion) -> Optional[Referents]:
with self.schema_lock_thread:
return self.referenced_by.get((subject, version), None)

def remove_referenced_by(self, schema_id: SchemaId, references: List[Reference]) -> None:
with self.schema_lock_thread:
for ref in references:
key = (ref.subject, ref.version)
if self.referenced_by.get(key, None) and schema_id in self.referenced_by[key]:
self.referenced_by[key].remove(schema_id)
73 changes: 73 additions & 0 deletions karapace/protobuf/compare_type_lists.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
karapace - compare_type_lists
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from itertools import chain
from karapace.protobuf.compare_result import CompareResult, Modification
from karapace.protobuf.compare_type_storage import CompareTypes
from karapace.protobuf.enum_element import EnumElement
from karapace.protobuf.exception import IllegalStateException
from karapace.protobuf.message_element import MessageElement
from karapace.protobuf.type_element import TypeElement
from typing import List


def compare_type_lists(
self_types_list: List[TypeElement],
other_types_list: List[TypeElement],
result: CompareResult,
compare_types: CompareTypes,
) -> CompareResult:
self_types = {}
other_types = {}
self_indexes = {}
other_indexes = {}

type_: TypeElement
for i, type_ in enumerate(self_types_list):
self_types[type_.name] = type_
self_indexes[type_.name] = i
compare_types.add_self_type(compare_types.self_package_name, type_)

for i, type_ in enumerate(other_types_list):
other_types[type_.name] = type_
other_indexes[type_.name] = i
compare_types.add_other_type(compare_types.other_package_name, type_)

for name in chain(self_types.keys(), other_types.keys() - self_types.keys()):
result.push_path(str(name), True)

if self_types.get(name) is None and other_types.get(name) is not None:
if isinstance(other_types[name], MessageElement):
result.add_modification(Modification.MESSAGE_ADD)
elif isinstance(other_types[name], EnumElement):
result.add_modification(Modification.ENUM_ADD)
else:
raise IllegalStateException("Instance of element is not applicable")
elif self_types.get(name) is not None and other_types.get(name) is None:
if isinstance(self_types[name], MessageElement):
result.add_modification(Modification.MESSAGE_DROP)
elif isinstance(self_types[name], EnumElement):
result.add_modification(Modification.ENUM_DROP)
else:
raise IllegalStateException("Instance of element is not applicable")
else:
if other_indexes[name] != self_indexes[name]:
if isinstance(self_types[name], MessageElement):
# incompatible type
result.add_modification(Modification.MESSAGE_MOVE)
else:
raise IllegalStateException("Instance of element is not applicable")
else:
if isinstance(self_types[name], MessageElement) and isinstance(other_types[name], MessageElement):
self_types[name].compare(other_types[name], result, compare_types)
elif isinstance(self_types[name], EnumElement) and isinstance(other_types[name], EnumElement):
self_types[name].compare(other_types[name], result, compare_types)
else:
# incompatible type
result.add_modification(Modification.TYPE_ALTER)
result.pop_path(True)

return result
19 changes: 13 additions & 6 deletions karapace/protobuf/compare_type_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ def compute_name(t: ProtoType, result_path: List[str], package_name: str, types:

class CompareTypes:
def __init__(self, self_package_name: str, other_package_name: str, result: CompareResult) -> None:
self.self_package_name = self_package_name
self.other_package_name = other_package_name
self.self_package_name = self_package_name or ""
self.other_package_name = other_package_name or ""

self.self_types: Dict[str, Union[TypeRecord, TypeRecordMap]] = {}
self.other_types: Dict[str, Union[TypeRecord, TypeRecordMap]] = {}
self.locked_messages: List["MessageElement"] = []
Expand Down Expand Up @@ -93,17 +94,23 @@ def self_type_short_name(self, t: ProtoType) -> Optional[str]:
if name is None:
raise IllegalArgumentException(f"Cannot determine message type {t}")
type_record: TypeRecord = self.self_types.get(name)
if name.startswith(type_record.package_name):
return name[(len(type_record.package_name) + 1) :]
package_name = type_record.package_name
if package_name is None:
return name
if name.startswith(package_name):
return name[(len(package_name) + 1) :]
return name

def other_type_short_name(self, t: ProtoType) -> Optional[str]:
name = compute_name(t, self.result.path, self.other_package_name, self.other_types)
if name is None:
raise IllegalArgumentException(f"Cannot determine message type {t}")
type_record: TypeRecord = self.other_types.get(name)
if name.startswith(type_record.package_name):
return name[(len(type_record.package_name) + 1) :]
package_name = type_record.package_name
if package_name is None:
return name
if name.startswith(package_name):
return name[(len(package_name) + 1) :]
return name

def lock_message(self, message: "MessageElement") -> bool:
Expand Down
63 changes: 63 additions & 0 deletions karapace/protobuf/dependency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""
karapace - dependency
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""

from karapace.dependency import DependencyVerifierResult
from karapace.protobuf.known_dependency import DependenciesHardcoded, KnownDependency
from karapace.protobuf.one_of_element import OneOfElement
from typing import List


class ProtobufDependencyVerifier:
def __init__(self) -> None:
self.declared_types: List[str] = []
self.used_types: List[str] = []
self.import_path: List[str] = []

def add_declared_type(self, full_name: str) -> None:
self.declared_types.append(full_name)

def add_used_type(self, parent: str, element_type: str) -> None:
if element_type.find("map<") == 0:
end = element_type.find(">")
virgule = element_type.find(",")
key = element_type[4:virgule]
value = element_type[virgule + 1 : end]
value = value.strip()
self.used_types.append(parent + ";" + key)
self.used_types.append(parent + ";" + value)
else:
self.used_types.append(parent + ";" + element_type)

def add_import(self, import_name: str) -> None:
self.import_path.append(import_name)

def verify(self) -> DependencyVerifierResult:
declared_index = set(self.declared_types)
for used_type in self.used_types:
delimiter = used_type.rfind(";")
used_type_with_scope = ""
if delimiter != -1:
used_type_with_scope = used_type[:delimiter] + "." + used_type[delimiter + 1 :]
used_type = used_type[delimiter + 1 :]

if not (
used_type in DependenciesHardcoded.index
or KnownDependency.index_simple.get(used_type) is not None
or KnownDependency.index.get(used_type) is not None
or used_type in declared_index
or (delimiter != -1 and used_type_with_scope in declared_index)
or "." + used_type in declared_index
):
return DependencyVerifierResult(False, f"type {used_type} is not defined")

return DependencyVerifierResult(True)


def process_one_of(verifier: ProtobufDependencyVerifier, package_name: str, parent_name: str, one_of: OneOfElement) -> None:
parent = package_name + "." + parent_name
for field in one_of.fields:
verifier.add_used_type(parent, field.element_type)
8 changes: 4 additions & 4 deletions karapace/protobuf/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
import json


class ProtobufParserRuntimeException(Exception):
pass


class IllegalStateException(Exception):
pass

Expand All @@ -29,6 +25,10 @@ class ProtobufTypeException(Error):
"""Generic Protobuf type error."""


class ProtobufUnresolvedDependencyException(ProtobufException):
"""a Protobuf schema has unresolved dependency"""


class SchemaParseException(ProtobufException):
"""Error while parsing a Protobuf schema descriptor."""

Expand Down
7 changes: 7 additions & 0 deletions karapace/protobuf/field_element.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,17 @@ def compare_message(

self_type_record = types.get_self_type(self_type)
other_type_record = types.get_other_type(other_type)

self_type_element: MessageElement = self_type_record.type_element
other_type_element: MessageElement = other_type_record.type_element

if types.self_type_short_name(self_type) != types.other_type_short_name(other_type):
result.add_modification(Modification.FIELD_NAME_ALTER)
else:
self_type_element.compare(other_type_element, result, types)

def __repr__(self):
return f"{self.element_type} {self.name} = {self.tag}"

def __str__(self):
return f"{self.element_type} {self.name} = {self.tag}"

0 comments on commit 7e1de2a

Please sign in to comment.