
Commit

merge with master and fixup conflict
libretto committed Feb 13, 2024
2 parents 03f1189 + 201428d commit 2c06480
Showing 15 changed files with 150 additions and 44 deletions.
2 changes: 1 addition & 1 deletion SECURITY.md
@@ -12,7 +12,7 @@ receiving such patches depend on the CVSS v3.0 Rating:
## Reporting a Vulnerability

Please report (suspected) security vulnerabilities to our **[bug bounty
program](https://hackerone.com/aiven_ltd)**. You will receive a response from
program](https://bugcrowd.com/aiven-mbb-og)**. You will receive a response from
us within 2 working days. If the issue is confirmed, we will release a patch as
soon as possible depending on impact and complexity.

46 changes: 35 additions & 11 deletions karapace/kafka_rest_apis/__init__.py
@@ -307,7 +307,7 @@ async def get_user_proxy(self, request: HTTPRequest) -> "UserRestProxy":
if self.proxies.get(key) is None:
self.proxies[key] = UserRestProxy(self.config, self.kafka_timeout, self.serializer)
except (NoBrokersAvailable, AuthenticationFailedError):
log.exception("Failed to connect to Kafka with the credentials")
log.warning("Failed to connect to Kafka with the credentials")
self.r(body={"message": "Forbidden"}, content_type=JSON_CONTENT_TYPE, status=HTTPStatus.FORBIDDEN)
proxy = self.proxies[key]
proxy.mark_used()
@@ -462,6 +462,10 @@ def num_consumers(self) -> int:
return len(self.consumer_manager.consumers)

async def _maybe_create_async_producer(self) -> AsyncKafkaProducer:
"""
:raises NoBrokersAvailable:
:raises AuthenticationFailedError:
"""
if self._async_producer is not None:
return self._async_producer

@@ -497,9 +501,9 @@ async def _maybe_create_async_producer(self) -> AsyncKafkaProducer:
except (NoBrokersAvailable, AuthenticationFailedError):
await producer.stop()
if retry:
log.exception("Unable to connect to the bootstrap servers, retrying")
log.warning("Unable to connect to the bootstrap servers, retrying")
else:
log.exception("Giving up after trying to connect to the bootstrap servers")
log.warning("Giving up after trying to connect to the bootstrap servers")
raise
await asyncio.sleep(1)
except Exception:
@@ -626,7 +630,7 @@ async def cluster_metadata(self, topics: Optional[List[str]] = None) -> dict:
self._cluster_metadata = metadata
self._cluster_metadata_complete = topics is None
except KafkaException:
log.exception("Could not refresh cluster metadata")
log.warning("Could not refresh cluster metadata")
KafkaRest.r(
body={
"message": "Kafka node not ready",
@@ -653,9 +657,9 @@ def init_admin_client(self):
break
except: # pylint: disable=bare-except
if retry:
log.exception("Unable to start admin client, retrying")
log.warning("Unable to start admin client, retrying")
else:
log.exception("Giving up after failing to start admin client")
log.warning("Giving up after failing to start admin client")
raise
time.sleep(1)

@@ -672,6 +676,10 @@ async def aclose(self) -> None:
self.consumer_manager = None

async def publish(self, topic: str, partition_id: Optional[str], content_type: str, request: HTTPRequest) -> None:
"""
:raises NoBrokersAvailable:
:raises AuthenticationFailedError:
"""
formats: dict = request.content_type
data: dict = request.json
_ = await self.get_topic_info(topic, content_type)
@@ -683,7 +691,6 @@ async def publish(self, topic: str, partition_id: Optional[str], content_type: s
await self.validate_publish_request_format(data, formats, content_type, topic)
status = HTTPStatus.OK
ser_format = formats["embedded_format"]
prepared_records = []
try:
prepared_records = await self._prepare_records(
content_type=content_type,
@@ -726,11 +733,25 @@ async def publish(self, topic: str, partition_id: Optional[str], content_type: s

async def partition_publish(self, topic: str, partition_id: str, content_type: str, *, request: HTTPRequest) -> None:
log.debug("Executing partition publish on topic %s and partition %s", topic, partition_id)
await self.publish(topic, partition_id, content_type, request)
try:
await self.publish(topic, partition_id, content_type, request)
except (NoBrokersAvailable, AuthenticationFailedError):
KafkaRest.service_unavailable(
message="Service unavailable",
content_type=content_type,
sub_code=RESTErrorCodes.HTTP_SERVICE_UNAVAILABLE.value,
)

async def topic_publish(self, topic: str, content_type: str, *, request: HTTPRequest) -> None:
log.debug("Executing topic publish on topic %s", topic)
await self.publish(topic, None, content_type, request)
try:
await self.publish(topic, None, content_type, request)
except (NoBrokersAvailable, AuthenticationFailedError):
KafkaRest.service_unavailable(
message="Service unavailable",
content_type=content_type,
sub_code=RESTErrorCodes.HTTP_SERVICE_UNAVAILABLE.value,
)

@staticmethod
def validate_partition_id(partition_id: str, content_type: str) -> int:
@@ -858,7 +879,7 @@ async def validate_schema_info(
try:
data[f"{subject_type}_schema_id"] = await self.get_schema_id(data, topic, subject_type, schema_type)
except InvalidPayload:
log.exception("Unable to retrieve schema id")
log.warning("Unable to retrieve schema id")
KafkaRest.r(
body={
"error_code": RESTErrorCodes.HTTP_BAD_REQUEST.value,
@@ -1039,6 +1060,10 @@ async def validate_publish_request_format(self, data: dict, formats: dict, conte
)

async def produce_messages(self, *, topic: str, prepared_records: List) -> List:
"""
:raises NoBrokersAvailable:
:raises AuthenticationFailedError:
"""
producer = await self._maybe_create_async_producer()

produce_futures = []
@@ -1097,7 +1122,6 @@ async def produce_messages(self, *, topic: str, prepared_records: List) -> List:
# cancel is retriable
produce_results.append({"error_code": 1, "error": "Publish message cancelled"})
elif isinstance(result, BrokerResponseError):
log.error("Broker error", exc_info=result)
resp = {"error_code": 1, "error": result.description}
if hasattr(result, "retriable") and result.retriable:
resp["error_code"] = 2
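Net effect of the two new try/except blocks above: a produce request that fails with NoBrokersAvailable or AuthenticationFailedError now gets a clean HTTP 503 instead of an exception escaping the handler. A rough client-side sketch of the new behaviour — the proxy address, topic and payload are assumptions, and the failure can also surface earlier as a "Kafka node not ready" error from metadata refresh:

import requests

resp = requests.post(
    "http://localhost:8082/topics/test-topic",  # assumed local REST proxy and topic
    json={"records": [{"value": {"name": "example"}}]},
    headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
)
# With the brokers unreachable, the handlers above now answer:
assert resp.status_code == 503
assert resp.json() == {"message": "Service unavailable", "error_code": 503}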
4 changes: 2 additions & 2 deletions karapace/kafka_rest_apis/consumer_manager.py
@@ -228,9 +228,9 @@ async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name
return c
except: # pylint: disable=bare-except
if retry:
LOG.exception("Unable to create consumer, retrying")
LOG.warning("Unable to create consumer, retrying")
else:
LOG.exception("Giving up after failing to create consumer")
LOG.warning("Giving up after failing to create consumer")
raise
await asyncio.sleep(1)

1 change: 1 addition & 0 deletions karapace/kafka_rest_apis/error_codes.py
@@ -11,6 +11,7 @@ class RESTErrorCodes(Enum):
HTTP_NOT_FOUND = HTTPStatus.NOT_FOUND.value
HTTP_INTERNAL_SERVER_ERROR = HTTPStatus.INTERNAL_SERVER_ERROR.value
HTTP_UNPROCESSABLE_ENTITY = HTTPStatus.UNPROCESSABLE_ENTITY.value
HTTP_SERVICE_UNAVAILABLE = HTTPStatus.SERVICE_UNAVAILABLE.value
TOPIC_NOT_FOUND = 40401
PARTITION_NOT_FOUND = 40402
CONSUMER_NOT_FOUND = 40403
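The new member simply mirrors the numeric HTTP status, so the error_code field in 503 bodies matches the status line. A quick sanity check (import path taken from this file):

from http import HTTPStatus
from karapace.kafka_rest_apis.error_codes import RESTErrorCodes

assert RESTErrorCodes.HTTP_SERVICE_UNAVAILABLE.value == HTTPStatus.SERVICE_UNAVAILABLE.value == 503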
11 changes: 11 additions & 0 deletions karapace/karapace.py
@@ -76,6 +76,17 @@ def not_found(message: str, sub_code: int, content_type: str) -> NoReturn:
content_type=content_type, status=HTTPStatus.NOT_FOUND, body={"message": message, "error_code": sub_code}
)

@staticmethod
def service_unavailable(message: str, sub_code: int, content_type: str) -> NoReturn:
KarapaceBase.r(
content_type=content_type,
status=HTTPStatus.SERVICE_UNAVAILABLE,
body={
"message": message,
"error_code": sub_code,
},
)

async def root_get(self) -> NoReturn:
self.r({}, "application/json")

9 changes: 5 additions & 4 deletions karapace/protobuf/serialization.py
@@ -4,6 +4,7 @@
"""
from __future__ import annotations

from karapace.errors import InvalidSchema
from karapace.protobuf.enum_constant_element import EnumConstantElement
from karapace.protobuf.enum_element import EnumElement
from karapace.protobuf.field import Field
@@ -47,16 +48,16 @@


def _deserialize_field(field: Any) -> FieldElement:
if field.type not in _TYPE_MAP:
raise NotImplementedError(f"Unsupported field type {field.type}")

label = None
if (field.HasField("proto3_optional") and field.proto3_optional) or Field.Label(field.label) != Field.Label.OPTIONAL:
label = Field.Label(field.label)
if field.HasField("type_name"):
element_type = field.type_name
else:
assert field.HasField("type")
if not field.HasField("type"):
raise InvalidSchema(f"field {field.name} has no type_name nor type")
if field.type not in _TYPE_MAP:
raise NotImplementedError(f"Unsupported field type {field.type}")
element_type = _TYPE_MAP[field.type]
return FieldElement(DEFAULT_LOCATION, label=label, element_type=element_type, name=field.name, tag=field.number)
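A hedged illustration of the new failure mode: a field descriptor that carries neither type_name nor type now raises InvalidSchema with a descriptive message, where the old code died on a bare assert. Calling the private helper directly is for illustration only:

from google.protobuf.descriptor_pb2 import FieldDescriptorProto

from karapace.errors import InvalidSchema
from karapace.protobuf.serialization import _deserialize_field

broken = FieldDescriptorProto(name="broken", number=1)  # neither type nor type_name set
try:
    _deserialize_field(broken)
except InvalidSchema as exc:
    print(exc)  # field broken has no type_name nor type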

8 changes: 8 additions & 0 deletions karapace/rapu.py
@@ -307,6 +307,14 @@ async def _handle_request(
raise HTTPResponse( # pylint: disable=raise-missing-from
body="Invalid request JSON body", status=HTTPStatus.BAD_REQUEST
)

# Prevent string, int etc. going further from here
if not isinstance(rapu_request.json, dict):
http_error(
message="Malformed request",
content_type=JSON_CONTENT_TYPE,
code=HTTPStatus.BAD_REQUEST,
)
else:
if body not in {b"", b"{}"}:
raise HTTPResponse(body="No request body allowed for this operation", status=HTTPStatus.BAD_REQUEST)
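A hedged illustration of what the new isinstance guard rejects: a body that parses as JSON but is not an object (a bare number or string, for example) is now answered with 400 "Malformed request" before it reaches any handler. The endpoint and content type below are assumptions for the sketch:

import requests

resp = requests.post(
    "http://localhost:8081/subjects/example-subject/versions",  # assumed registry endpoint
    data="42",  # valid JSON, but not a JSON object
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
)
assert resp.status_code == 400  # body carries the "Malformed request" message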
4 changes: 2 additions & 2 deletions karapace/schema_models.py
@@ -129,7 +129,7 @@ def normalize_schema_str(
try:
schema_str = json_encode(json_decode(schema_str), compact=True, sort_keys=True)
except JSONDecodeError as e:
LOG.error("Schema is not valid JSON")
LOG.info("Schema is not valid JSON")
raise e
elif schema_type == SchemaType.PROTOBUF:
if schema:
@@ -138,7 +138,7 @@
try:
schema_str = str(parse_protobuf_schema_definition(schema_str, None, None, False))
except InvalidSchema as e:
LOG.exception("Schema is not valid ProtoBuf definition")
LOG.info("Schema is not valid ProtoBuf definition")
raise e

else:
32 changes: 19 additions & 13 deletions karapace/schema_reader.py
@@ -18,6 +18,7 @@
NoBrokersAvailable,
NodeNotReadyError,
TopicAlreadyExistsError,
UnknownTopicOrPartitionError,
)
from karapace import constants
from karapace.config import Config
@@ -166,7 +167,7 @@ def run(self) -> None:
LOG.warning("[Admin Client] No Brokers available yet. Retrying")
self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
except KafkaConfigurationError:
LOG.exception("[Admin Client] Invalid configuration. Bailing")
LOG.info("[Admin Client] Invalid configuration. Bailing")
raise
except Exception as e: # pylint: disable=broad-except
LOG.exception("[Admin Client] Unexpected exception. Retrying")
@@ -183,7 +184,7 @@ def run(self) -> None:
LOG.warning("[Consumer] No Brokers available yet. Retrying")
self._stop_schema_reader.wait(timeout=2.0)
except KafkaConfigurationError:
LOG.exception("[Consumer] Invalid configuration. Bailing")
LOG.info("[Consumer] Invalid configuration. Bailing")
raise
except Exception as e: # pylint: disable=broad-except
LOG.exception("[Consumer] Unexpected exception. Retrying")
@@ -240,7 +241,9 @@ def _get_beginning_offset(self) -> int:
# * See `OFFSET_EMPTY` and `OFFSET_UNINITIALIZED`
return beginning_offset - 1
except KafkaTimeoutError:
LOG.exception("Reading begin offsets timed out.")
LOG.warning("Reading begin offsets timed out.")
except UnknownTopicOrPartitionError:
LOG.warning("Topic does not yet exist.")
except Exception as e: # pylint: disable=broad-except
self.stats.unexpected_exception(ex=e, where="_get_beginning_offset")
LOG.exception("Unexpected exception when reading begin offsets.")
@@ -255,7 +258,10 @@ def _is_ready(self) -> bool:
try:
_, end_offset = self.consumer.get_watermark_offsets(TopicPartition(self.config["topic_name"], 0))
except KafkaTimeoutError:
LOG.exception("Reading end offsets timed out.")
LOG.warning("Reading end offsets timed out.")
return False
except UnknownTopicOrPartitionError:
LOG.warning("Topic does not yet exist.")
return False
except Exception as e: # pylint: disable=broad-except
self.stats.unexpected_exception(ex=e, where="_is_ready")
@@ -421,13 +427,13 @@ def _handle_msg_config(self, key: dict, value: dict | None) -> None:

def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None: # pylint: disable=unused-argument
if value is None:
LOG.error("DELETE_SUBJECT record doesnt have a value, should have")
LOG.warning("DELETE_SUBJECT record doesnt have a value, should have")
return

subject = value["subject"]
version = value["version"]
if self.database.find_subject(subject=subject) is None:
LOG.error("Subject: %r did not exist, should have", subject)
LOG.warning("Subject: %r did not exist, should have", subject)
else:
LOG.info("Deleting subject: %r, value: %r", subject, value)
self.database.delete_subject(subject=subject, version=version)
@@ -436,9 +442,9 @@ def _handle_msg_schema_hard_delete(self, key: dict) -> None:
subject, version = key["subject"], key["version"]

if self.database.find_subject(subject=subject) is None:
LOG.error("Hard delete: Subject %s did not exist, should have", subject)
LOG.warning("Hard delete: Subject %s did not exist, should have", subject)
elif version not in self.database.find_subject_schemas(subject=subject, include_deleted=True):
LOG.error("Hard delete: Version %d for subject %s did not exist, should have", version, subject)
LOG.warning("Hard delete: Version %d for subject %s did not exist, should have", version, subject)
else:
LOG.info("Hard delete: subject: %r version: %r", subject, version)
self.database.delete_subject_schema(subject=subject, version=version)
@@ -463,7 +469,7 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
try:
schema_type_parsed = SchemaType(schema_type)
except ValueError:
LOG.error("Invalid schema type: %s", schema_type)
LOG.warning("Invalid schema type: %s", schema_type)
return

# This does two jobs:
@@ -479,7 +485,7 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
try:
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
except json.JSONDecodeError:
LOG.error("Schema is not valid JSON")
LOG.warning("Schema is not valid JSON")
return
elif schema_type_parsed == SchemaType.PROTOBUF:
try:
@@ -537,11 +543,11 @@ def handle_msg(self, key: dict, value: dict | None) -> None:
self._handle_msg_delete_subject(key, value)
elif message_type == MessageType.no_operation:
pass
except ValueError:
LOG.error("The message %s-%s has been discarded because the %s is not managed", key, value, key["keytype"])
except (KeyError, ValueError):
LOG.warning("The message %r-%r has been discarded because the %s is not managed", key, value, key["keytype"])

else:
LOG.error(
LOG.warning(
"The message %s-%s has been discarded because doesn't contain the `keytype` key in the key", key, value
)

8 changes: 4 additions & 4 deletions requirements/requirements-dev.txt
@@ -6,7 +6,7 @@
#
accept-types==0.4.1
# via -r requirements.txt
aiohttp==3.9.1
aiohttp==3.9.2
# via -r requirements.txt
aiokafka==0.8.1
# via -r requirements.txt
@@ -37,7 +37,7 @@ blinker==1.7.0
# via flask
brotli==1.1.0
# via geventhttpclient
cachetools==5.3.1
cachetools==5.3.2
# via -r requirements.txt
certifi==2023.11.17
# via
@@ -109,7 +109,7 @@ itsdangerous==2.1.2
# via flask
jinja2==3.1.3
# via flask
jsonschema==4.20.0
jsonschema==4.21.1
# via -r requirements.txt
jsonschema-specifications==2023.11.2
# via
@@ -197,7 +197,7 @@ requests==2.31.0
# via
# -r requirements-dev.in
# locust
rich==13.6.0
rich==13.7.0
# via -r requirements.txt
roundrobin==0.0.4
# via locust
2 changes: 1 addition & 1 deletion requirements/requirements-typing.txt
@@ -37,7 +37,7 @@ tomli==2.0.1
# mypy
types-cachetools==5.3.0.7
# via -r requirements-typing.in
types-jsonschema==4.20.0.0
types-jsonschema==4.21.0.20240118
# via -r requirements-typing.in
types-protobuf==3.20.4.6
# via -r requirements-typing.in