feat: add configurable validation strategy by topic #745

Draft · wants to merge 2 commits into base: main
8 changes: 4 additions & 4 deletions README.rst
@@ -459,12 +459,12 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
* - ``protobuf_runtime_directory``
- ``runtime``
- Runtime directory for the ``protoc`` protobuf schema parser and code generator
* - ``name_strategy``
- ``topic_name``
- Name strategy to use when storing schemas from the kafka rest proxy service
* - ``default_name_strategy``
Contributor:
This change would require a major version upgrade.

Contributor Author:
Yep, I've decided to get rid of this change. This PR needs to be rebased on top of #754. I will probably close this PR, since making the publish request through the REST endpoint stateful isn't a great design. It's probably best to mark this PR as a draft.

- ``topic_name``, ``record_name``, ``topic_record_name``, ``no_validation``
- Default name strategy to use when storing schemas from the kafka rest proxy service; can be overridden for each topic by calling the `/topic/{topic}/name_strategy/{strategy}` endpoint
* - ``name_strategy_validation``
- ``true``
- If enabled, validate that given schema is registered under used name strategy when producing messages from Kafka Rest
- If enabled, validate that the given schema is registered under the expected subjects required by the specified name strategy (default or overridden) when producing messages from Kafka Rest
* - ``master_election_strategy``
- ``lowest``
- Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)
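For illustration, a minimal sketch of overriding the strategy for one topic through the endpoint named above. The base URL, the HTTP verb, and the use of the requests library are assumptions rather than anything this PR specifies:

    import requests

    topic = "orders"  # hypothetical topic
    strategy = "record_name"  # topic_name | record_name | topic_record_name | no_validation

    # Assumption: the endpoint accepts POST; only the path appears in the README.
    response = requests.post(f"http://localhost:8082/topic/{topic}/name_strategy/{strategy}")
    response.raise_for_status()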
29 changes: 22 additions & 7 deletions karapace/config.py
@@ -74,7 +74,7 @@ class Config(TypedDict):
session_timeout_ms: int
karapace_rest: bool
karapace_registry: bool
name_strategy: str
default_name_strategy: str
name_strategy_validation: bool
master_election_strategy: str
protobuf_runtime_directory: str
@@ -146,7 +146,7 @@ class ConfigDefaults(Config, total=False):
"session_timeout_ms": 10000,
"karapace_rest": False,
"karapace_registry": False,
"name_strategy": "topic_name",
"default_name_strategy": "topic_name",
"name_strategy_validation": True,
"master_election_strategy": "lowest",
"protobuf_runtime_directory": "runtime",
@@ -158,17 +158,30 @@ class InvalidConfiguration(Exception):
pass


class StrEnum(str, Enum):
def __str__(self) -> str:
return str(self.value)


@unique
class ElectionStrategy(Enum):
highest = "highest"
lowest = "lowest"


@unique
class NameStrategy(Enum):
class NameStrategy(StrEnum):
topic_name = "topic_name"
record_name = "record_name"
topic_record_name = "topic_record_name"
no_validation = "no_validation_strategy"


@unique
class SubjectType(StrEnum):
key = "key"
value = "value"
partition = "partition"
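Because NameStrategy and SubjectType now subclass StrEnum, members interpolate as their raw values, which is what the f-strings later in this PR (for example f"{subject_type}_schema_id") rely on. A minimal sketch:

    from karapace.config import NameStrategy, SubjectType

    assert str(NameStrategy.topic_name) == "topic_name"
    # Note that the value differs from the member name here:
    assert str(NameStrategy.no_validation) == "no_validation_strategy"
    assert f"{SubjectType.key}_schema_id" == "key_schema_id"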


def parse_env_value(value: str) -> str | int | bool:
@@ -269,12 +282,14 @@ def validate_config(config: Config) -> None:
f"Invalid master election strategy: {master_election_strategy}, valid values are {valid_strategies}"
) from None

name_strategy = config["name_strategy"]
deafault_name_strategy = config["default_name_strategy"]
Contributor:
Typo

Suggested change:
-    deafault_name_strategy = config["default_name_strategy"]
+    default_name_strategy = config["default_name_strategy"]

Contributor Author:
Yeah, you're right. I've copy-pasted the same typo into the following PR.

try:
NameStrategy(name_strategy)
NameStrategy(deafault_name_strategy)
except ValueError:
valid_strategies = [strategy.value for strategy in NameStrategy]
raise InvalidConfiguration(f"Invalid name strategy: {name_strategy}, valid values are {valid_strategies}") from None
valid_strategies = list(NameStrategy)
raise InvalidConfiguration(
f"Invalid default name strategy: {deafault_name_strategy}, valid values are {valid_strategies}"
) from None

if config["rest_authorization"] and config["sasl_bootstrap_uri"] is None:
raise InvalidConfiguration(
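A sketch of the failure mode validated above. It assumes the defaults mapping is exported as DEFAULTS (an assumption; only fragments of config.py appear in this diff):

    from karapace.config import DEFAULTS, InvalidConfiguration, validate_config

    config = {**DEFAULTS, "default_name_strategy": "bogus"}
    try:
        validate_config(config)
    except InvalidConfiguration as exc:
        print(exc)  # Invalid default name strategy: bogus, valid values are [...]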
13 changes: 12 additions & 1 deletion karapace/in_memory_database.py
@@ -7,9 +7,10 @@
from __future__ import annotations

from dataclasses import dataclass, field
from karapace.config import NameStrategy
from karapace.schema_models import SchemaVersion, TypedSchema
from karapace.schema_references import Reference, Referents
from karapace.typing import ResolvedVersion, SchemaId, Subject
from karapace.typing import ResolvedVersion, SchemaId, Subject, TopicName
from threading import Lock, RLock
from typing import Iterable, Sequence

@@ -32,6 +33,7 @@ def __init__(self) -> None:
self.schemas: dict[SchemaId, TypedSchema] = {}
self.schema_lock_thread = RLock()
self.referenced_by: dict[tuple[Subject, ResolvedVersion], Referents] = {}
self.topic_validation_strategies: dict[TopicName, NameStrategy] = {}

# Content based deduplication of schemas. This is used to reduce memory
# usage when the same schema is produce multiple times to the same or
@@ -229,6 +231,15 @@ def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> di
if schema_version.deleted is False
}

def get_topic_strategy(self, *, topic_name: TopicName) -> NameStrategy | None:
if topic_name not in self.topic_validation_strategies:
return None

return self.topic_validation_strategies[topic_name]
Comment on lines +235 to +238
Contributor:
How about

    self.topic_validation_strategies.get(topic_name)

?

Contributor Author:
I would prefer an exception over None as the return type, since I'm guarding the access with the if just above. Am I wrong?


def override_topic_strategy(self, *, topic_name: TopicName, name_strategy: NameStrategy) -> None:
self.topic_validation_strategies[topic_name] = name_strategy

def delete_subject(self, *, subject: Subject, version: ResolvedVersion) -> None:
with self.schema_lock_thread:
for schema_version in self.subjects[subject].schemas.values():
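A usage sketch for the two methods above (hypothetical, not part of the diff). The getter returns None for a topic without an override, in which case the REST layer presumably falls back to the configured default strategy:

    from karapace.config import NameStrategy
    from karapace.in_memory_database import InMemoryDatabase
    from karapace.typing import TopicName

    db = InMemoryDatabase()
    topic = TopicName("orders")  # hypothetical topic

    assert db.get_topic_strategy(topic_name=topic) is None  # no override recorded yet
    db.override_topic_strategy(topic_name=topic, name_strategy=NameStrategy.record_name)
    assert db.get_topic_strategy(topic_name=topic) is NameStrategy.record_name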
61 changes: 38 additions & 23 deletions karapace/kafka_rest_apis/__init__.py
@@ -13,7 +13,7 @@
TopicAuthorizationFailedError,
UnknownTopicOrPartitionError,
)
from karapace.config import Config, create_client_ssl_context
from karapace.config import Config, create_client_ssl_context, NameStrategy, SubjectType
from karapace.errors import InvalidSchema
from karapace.kafka_rest_apis.admin import KafkaRestAdminClient
from karapace.kafka_rest_apis.authentication import (
Expand All @@ -28,8 +28,14 @@
from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE
from karapace.schema_models import TypedSchema, ValidatedTypedSchema
from karapace.schema_type import SchemaType
from karapace.serialization import InvalidMessageSchema, InvalidPayload, SchemaRegistrySerializer, SchemaRetrievalError
from karapace.typing import SchemaId, Subject
from karapace.serialization import (
get_subject_name,
InvalidMessageSchema,
InvalidPayload,
SchemaRegistrySerializer,
SchemaRetrievalError,
)
from karapace.typing import SchemaId, Subject, TopicName
from karapace.utils import convert_to_int, json_encode, KarapaceKafkaClient
from typing import Callable, Dict, List, Optional, Tuple, Union

@@ -39,7 +45,7 @@
import logging
import time

RECORD_KEYS = ["key", "value", "partition"]
SUBJECT_VALID_POSTFIX = [SubjectType.key, SubjectType.value]
Contributor:
Are we getting rid of partition here?

Contributor Author:
Yep. Previously there was a tricky side effect of using the zip function: zip truncates to its shortest input, so it yields pairs only up to the length of the shorter list, and we were in exactly that case 😄
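A quick illustration of that truncation, using the old constants from this file:

    RECORD_KEYS = ["key", "value", "partition"]  # the old constant
    RECORD_CODES = [42201, 42202]

    # zip stops at the shorter input, so "partition" never received a code and
    # was silently skipped by the old validation loop.
    assert list(zip(RECORD_KEYS, RECORD_CODES)) == [("key", 42201), ("value", 42202)]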

PUBLISH_KEYS = {"records", "value_schema", "value_schema_id", "key_schema", "key_schema_id"}
RECORD_CODES = [42201, 42202]
KNOWN_FORMATS = {"json", "avro", "protobuf", "binary"}
@@ -759,7 +765,7 @@ async def get_schema_id(
self,
data: dict,
topic: str,
prefix: str,
subject_type: SubjectType,
schema_type: SchemaType,
) -> SchemaId:
"""
@@ -770,29 +776,35 @@
"""
log.debug("[resolve schema id] Retrieving schema id for %r", data)
schema_id: Union[SchemaId, None] = (
SchemaId(int(data[f"{prefix}_schema_id"])) if f"{prefix}_schema_id" in data else None
SchemaId(int(data[f"{subject_type}_schema_id"])) if f"{subject_type}_schema_id" in data else None
)
schema_str = data.get(f"{prefix}_schema")
schema_str = data.get(f"{subject_type}_schema")
naming_strategy = await self.serializer.get_topic_strategy_name(topic_name=TopicName(topic))

if schema_id is None and schema_str is None:
raise InvalidSchema()

if schema_id is None:
parsed_schema = ValidatedTypedSchema.parse(schema_type, schema_str)
subject_name = self.serializer.get_subject_name(topic, parsed_schema, prefix, schema_type)

subject_name = get_subject_name(topic, parsed_schema, subject_type, naming_strategy)
schema_id = await self._query_schema_id_from_cache_or_registry(parsed_schema, schema_str, subject_name)
else:

def subject_not_included(schema: TypedSchema, subjects: List[Subject]) -> bool:
subject = self.serializer.get_subject_name(topic, schema, prefix, schema_type)
subject = get_subject_name(topic, schema, subject_type, naming_strategy)
return subject not in subjects

parsed_schema, valid_subjects = await self._query_schema_and_subjects(
schema_id,
need_new_call=subject_not_included,
)

if self.config["name_strategy_validation"] and subject_not_included(parsed_schema, valid_subjects):
if (
self.config["name_strategy_validation"]
and naming_strategy != NameStrategy.no_validation
and subject_not_included(parsed_schema, valid_subjects)
):
raise InvalidSchema()

return schema_id
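For orientation, the subject each strategy is expected to resolve to, following the usual Confluent naming conventions (a sketch; the concrete output of get_subject_name is not shown in this diff, so treat these values as assumptions):

    # Assumed examples for topic "orders", a record named "com.example.Order",
    # and subject_type "value":
    #   topic_name        -> "orders-value"
    #   record_name       -> "com.example.Order"
    #   topic_record_name -> "orders-com.example.Order"
    #   no_validation     -> the subject membership check above is skipped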
@@ -833,7 +845,9 @@ async def _query_schema_id_from_cache_or_registry(
)
return schema_id

async def validate_schema_info(self, data: dict, prefix: str, content_type: str, topic: str, schema_type: str):
async def validate_schema_info(
self, data: dict, subject_type: SubjectType, content_type: str, topic: str, schema_type: str
):
try:
schema_type = SCHEMA_MAPPINGS[schema_type]
except KeyError:
@@ -848,7 +862,7 @@

# will do in place updates of id keys, since calling these twice would be expensive
try:
data[f"{prefix}_schema_id"] = await self.get_schema_id(data, topic, prefix, schema_type)
data[f"{subject_type}_schema_id"] = await self.get_schema_id(data, topic, subject_type, schema_type)
except InvalidPayload:
log.exception("Unable to retrieve schema id")
KafkaRest.r(
@@ -863,16 +877,17 @@
KafkaRest.r(
body={
"error_code": RESTErrorCodes.SCHEMA_RETRIEVAL_ERROR.value,
"message": f"Error when registering schema. format = {schema_type.value}, subject = {topic}-{prefix}",
"message": f"Error when registering schema."
f"format = {schema_type.value}, subject = {topic}-{subject_type}",
},
content_type=content_type,
status=HTTPStatus.REQUEST_TIMEOUT,
)
except InvalidSchema:
if f"{prefix}_schema" in data:
err = f'schema = {data[f"{prefix}_schema"]}'
if f"{subject_type}_schema" in data:
err = f'schema = {data[f"{subject_type}_schema"]}'
else:
err = f'schema_id = {data[f"{prefix}_schema_id"]}'
err = f'schema_id = {data[f"{subject_type}_schema_id"]}'
KafkaRest.r(
body={
"error_code": RESTErrorCodes.INVALID_DATA.value,
@@ -1002,26 +1017,26 @@ async def validate_publish_request_format(self, data: dict, formats: dict, conte
status=HTTPStatus.BAD_REQUEST,
)
convert_to_int(r, "partition", content_type)
if set(r.keys()).difference(RECORD_KEYS):
if set(r.keys()).difference({subject_type.value for subject_type in SubjectType}):
KafkaRest.unprocessable_entity(
message="Invalid request format",
content_type=content_type,
sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value,
)
# disallow missing id and schema for any key/value list that has at least one populated element
if formats["embedded_format"] in {"avro", "jsonschema", "protobuf"}:
for prefix, code in zip(RECORD_KEYS, RECORD_CODES):
if self.all_empty(data, prefix):
for subject_type, code in zip(SUBJECT_VALID_POSTFIX, RECORD_CODES):
if self.all_empty(data, subject_type):
continue
if not self.is_valid_schema_request(data, prefix):
if not self.is_valid_schema_request(data, subject_type):
KafkaRest.unprocessable_entity(
message=f"Request includes {prefix}s and uses a format that requires schemas "
f"but does not include the {prefix}_schema or {prefix}_schema_id fields",
message=f"Request includes {subject_type}s and uses a format that requires schemas "
f"but does not include the {subject_type}_schema or {subject_type.value}_schema_id fields",
content_type=content_type,
sub_code=code,
)
try:
await self.validate_schema_info(data, prefix, content_type, topic, formats["embedded_format"])
await self.validate_schema_info(data, subject_type, content_type, topic, formats["embedded_format"])
except InvalidMessageSchema as e:
KafkaRest.unprocessable_entity(
message=str(e),
47 changes: 37 additions & 10 deletions karapace/schema_reader.py
@@ -8,6 +8,7 @@

from avro.schema import Schema as AvroSchema
from contextlib import closing, ExitStack
from enum import Enum
from jsonschema.validators import Draft7Validator
from kafka import KafkaConsumer, TopicPartition
from kafka.admin import KafkaAdminClient, NewTopic
@@ -20,7 +21,7 @@
TopicAlreadyExistsError,
)
from karapace import constants
from karapace.config import Config
from karapace.config import Config, NameStrategy
from karapace.dependency import Dependency
from karapace.errors import InvalidReferences, InvalidSchema
from karapace.in_memory_database import InMemoryDatabase
@@ -31,7 +32,7 @@
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, TopicName
from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient
from threading import Event, Thread
from typing import Final, Mapping, Sequence
@@ -58,6 +59,14 @@
METRIC_SUBJECT_DATA_SCHEMA_VERSIONS_GAUGE: Final = "karapace_schema_reader_subject_data_schema_versions"


class MessageType(Enum):
config = "CONFIG"
schema = "SCHEMA"
delete_subject = "DELETE_SUBJECT"
schema_strategy = "SCHEMA_STRATEGY"
no_operation = "NOOP"


def _create_consumer_from_config(config: Config) -> KafkaConsumer:
# Group not set on purpose, all consumers read the same data
session_timeout_ms = config["session_timeout_ms"]
@@ -429,6 +438,11 @@ def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None: #
LOG.info("Deleting subject: %r, value: %r", subject, value)
self.database.delete_subject(subject=subject, version=version)

def _handle_msg_schema_strategy(self, key: dict, value: dict | None) -> None: # pylint: disable=unused-argument
assert isinstance(value, dict)
topic, strategy = value["topic"], value["strategy"]
self.database.override_topic_strategy(topic_name=TopicName(topic), name_strategy=NameStrategy(strategy))

def _handle_msg_schema_hard_delete(self, key: dict) -> None:
subject, version = key["subject"], key["version"]

@@ -522,14 +536,27 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
self.database.insert_referenced_by(subject=ref.subject, version=ref.version, schema_id=schema_id)

def handle_msg(self, key: dict, value: dict | None) -> None:
if key["keytype"] == "CONFIG":
self._handle_msg_config(key, value)
elif key["keytype"] == "SCHEMA":
self._handle_msg_schema(key, value)
elif key["keytype"] == "DELETE_SUBJECT":
self._handle_msg_delete_subject(key, value)
elif key["keytype"] == "NOOP": # for spec completeness
pass
if "keytype" in key:
try:
message_type = MessageType(key["keytype"])

if message_type == MessageType.config:
self._handle_msg_config(key, value)
elif message_type == MessageType.schema:
self._handle_msg_schema(key, value)
elif message_type == MessageType.delete_subject:
self._handle_msg_delete_subject(key, value)
elif message_type == MessageType.schema_strategy:
self._handle_msg_schema_strategy(key, value)
elif message_type == MessageType.no_operation:
pass
except ValueError:
LOG.error("The message %s-%s has been discarded because the %s is not managed", key, value, key["keytype"])

else:
LOG.error(
"The message %s-%s has been discarded because doesn't contain the `keytype` key in the key", key, value
)

def remove_referenced_by(
self,
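Finally, a sketch of the schemas-topic message the new handler consumes. The key and value layout follow _handle_msg_schema_strategy above; the topic and strategy values are illustrative:

    # Consumed by handle_msg, which dispatches to _handle_msg_schema_strategy:
    key = {"keytype": "SCHEMA_STRATEGY"}
    value = {"topic": "orders", "strategy": "record_name"}

    # The handler then records the override:
    #   database.override_topic_strategy(topic_name=TopicName("orders"),
    #                                    name_strategy=NameStrategy.record_name)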