feat: add serialization exceptions to processing logger #6084

Merged · 5 commits · Aug 25, 2020
70 changes: 57 additions & 13 deletions docs/developer-guide/test-and-debug/processing-log.md
@@ -18,7 +18,7 @@ writing the processing log to {{ site.ak }} and consuming it as a ksqlDB stream.

!!! important
The processing log is not for server logging, but rather for per-record
-   logging on ksqlDB applications. If you want to configure a Kafka appender
+   logging on ksqlDB applications. If you want to configure an {{ site.ak }} appender
for the server logs, assign the `log4j.appender.kafka_appender.Topic`
and `log4j.logger.io.confluent.ksql` configuration settings in the ksqlDB
Server config file. For more information, see
@@ -114,12 +114,12 @@ message.type (INT)

: An int that describes the type of the log message. Currently, the
following types are defined: 0 (DESERIALIZATION_ERROR), 1
-   (RECORD_PROCESSING_ERROR), 2 (PRODUCTION_ERROR).
+   (RECORD_PROCESSING_ERROR), 2 (PRODUCTION_ERROR), 3 (SERIALIZATION_ERROR).

message.deserializationError (STRUCT)

: The contents of a message with type 0 (DESERIALIZATION_ERROR).
-   Logged when a deserializer fails to deserialize a Kafka record.
+   Logged when a deserializer fails to deserialize an {{ site.ak }} record.

message.deserializationError.errorMessage (STRING)

@@ -128,7 +128,16 @@ message.deserializationError.errorMessage (STRING)

message.deserializationError.recordB64 (STRING)

- : The Kafka record, encoded in Base64.
+ : The {{ site.ak }} record, encoded in Base64.

+ message.deserializationError.cause (LIST<STRING>)
+
+ : A list of strings containing human-readable error messages
+   for the chain of exceptions that caused the main error.
+
+ message.deserializationError.topic (STRING)
+
+ : The {{ site.ak }} topic of the record for which deserialization failed.

message.recordProcessingError (STRUCT)

@@ -146,21 +155,50 @@ message.recordProcessingError.record (STRING)

: The SQL record, serialized as a JSON string.

+ message.recordProcessingError.cause (LIST<STRING>)
+
+ : A list of strings containing human-readable error messages
+   for the chain of exceptions that caused the main error.

message.productionError (STRUCT)

: The contents of a message with type 2 (PRODUCTION_ERROR). Logged
-   when a producer fails to publish a Kafka record.
+   when a producer fails to publish an {{ site.ak }} record.

message.productionError.errorMessage (STRING)

: A string containing a human-readable error message detailing the
error encountered.

+ message.serializationError (STRUCT)
+
+ : The contents of a message with type 3 (SERIALIZATION_ERROR).
+   Logged when a serializer fails to serialize a ksqlDB row.
+
+ message.serializationError.errorMessage (STRING)
+
+ : A string containing a human-readable error message detailing the
+   error encountered.
+
+ message.serializationError.record (STRING)
+
+ : The ksqlDB row, as a human-readable string.
+
+ message.serializationError.cause (LIST<STRING>)
+
+ : A list of strings containing human-readable error messages
+   for the chain of exceptions that caused the main error.
+
+ message.serializationError.topic (STRING)
+
+ : The {{ site.ak }} topic to which the ksqlDB row that failed to serialize
+   would have been produced.
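
As a concrete illustration of the new fields (not output from a real run; the
logger name and values here are made up), a type 3 entry written to the log
topic as JSON might look like:

```json
{
  "level": "ERROR",
  "logger": "processing.CSAS_EXAMPLE_0.serializer",
  "time": 1598308000000,
  "message": {
    "type": 3,
    "serializationError": {
      "errorMessage": "Error serializing row to topic OUTPUT",
      "record": "Struct{NAME=alice,AGE=30}",
      "cause": ["Failed to convert field AGE"],
      "topic": "OUTPUT"
    }
  }
}
```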

Log Stream
----------

We recommend configuring the query processing log to write entries back
- to Kafka. This way, you can configure ksqlDB to set up a stream over the
+ to {{ site.ak }}. This way, you can configure ksqlDB to set up a stream over the
topic automatically.

To log to Kafka, set up a Kafka appender and a special layout for
@@ -223,12 +261,12 @@ ksql> describe PROCESSING_LOG;

Name : PROCESSING_LOG
Field | Type
- ---------------------------------------------------------------------------------------------------------------------------
+ -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 LOGGER | VARCHAR(STRING)
 LEVEL | VARCHAR(STRING)
 TIME | BIGINT
- MESSAGE | STRUCT<type INTEGER, deserializationError STRUCT<errorMessage VARCHAR(STRING), recordB64 VARCHAR(STRING)>, ...>
- ---------------------------------------------------------------------------------------------------------------------------
+ MESSAGE | STRUCT<type INTEGER, deserializationError STRUCT<errorMessage VARCHAR(STRING), recordB64 VARCHAR(STRING), cause ARRAY<VARCHAR(STRING)>, topic VARCHAR(STRING)>, ...>
+ -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
```

You can query the stream just like you would any other ksqlDB stream.
@@ -244,14 +282,20 @@ ksql> CREATE STREAM PROCESSING_LOG_STREAM (
  `TYPE` INTEGER,
  deserializationError STRUCT<
      errorMessage STRING,
-     recordB64 STRING>,
+     recordB64 STRING,
+     cause ARRAY<STRING>,
+     `topic` STRING>,
  recordProcessingError STRUCT<
      errorMessage STRING,
-     cause ARRAY<STRING>,
-     record STRING>,
+     record STRING,
+     cause ARRAY<STRING>>,
  productionError STRUCT<
-     errorMessage STRING>>)
+     errorMessage STRING>,
+ serializationError STRUCT<
+     errorMessage STRING,
+     record STRING,
+     cause ARRAY<STRING>,
+     `topic` STRING>>)
WITH (KAFKA_TOPIC='processing_log_topic', VALUE_FORMAT='JSON');
```
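
With the stream registered, serialization errors can be pulled out by
filtering on the message type. A sketch, assuming the stream definition above
(type 3 corresponds to SERIALIZATION_ERROR, and `->` dereferences struct
fields):

```sql
SELECT
  message->serializationError->errorMessage AS error,
  message->serializationError->`topic` AS target_topic,
  message->serializationError->cause AS cause
FROM PROCESSING_LOG_STREAM
WHERE message->`TYPE` = 3
EMIT CHANGES;
```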

@@ -58,10 +58,25 @@ public final class ProcessingLogMessageSchema {
.optional()
.build();

+ public static final String SERIALIZATION_ERROR_FIELD_MESSAGE = "errorMessage";
+ public static final String SERIALIZATION_ERROR_FIELD_RECORD = "record";
+ public static final String SERIALIZATION_ERROR_FIELD_CAUSE = "cause";
+ public static final String SERIALIZATION_ERROR_FIELD_TOPIC = "topic";
+
+ private static final Schema SERIALIZATION_ERROR_SCHEMA = SchemaBuilder.struct()
+     .name(NAMESPACE + "SerializationError")
+     .field(SERIALIZATION_ERROR_FIELD_MESSAGE, Schema.OPTIONAL_STRING_SCHEMA)
+     .field(SERIALIZATION_ERROR_FIELD_RECORD, Schema.OPTIONAL_STRING_SCHEMA)
+     .field(SERIALIZATION_ERROR_FIELD_CAUSE, CAUSE_SCHEMA)
+     .field(SERIALIZATION_ERROR_FIELD_TOPIC, Schema.OPTIONAL_STRING_SCHEMA)
+     .optional()
+     .build();

public enum MessageType {
DESERIALIZATION_ERROR(0, DESERIALIZATION_ERROR_SCHEMA),
RECORD_PROCESSING_ERROR(1, RECORD_PROCESSING_ERROR_SCHEMA),
-   PRODUCTION_ERROR(2, PRODUCTION_ERROR_SCHEMA);
+   PRODUCTION_ERROR(2, PRODUCTION_ERROR_SCHEMA),
+   SERIALIZATION_ERROR(3, SERIALIZATION_ERROR_SCHEMA);

private final int typeId;
private final Schema schema;
@@ -84,13 +99,15 @@ public Schema getSchema() {
public static final String DESERIALIZATION_ERROR = "deserializationError";
public static final String RECORD_PROCESSING_ERROR = "recordProcessingError";
public static final String PRODUCTION_ERROR = "productionError";
+ public static final String SERIALIZATION_ERROR = "serializationError";

public static final Schema PROCESSING_LOG_SCHEMA = SchemaBuilder.struct()
.name(NAMESPACE + "ProcessingLogRecord")
.field(TYPE, Schema.OPTIONAL_INT32_SCHEMA)
.field(DESERIALIZATION_ERROR, DESERIALIZATION_ERROR_SCHEMA)
.field(RECORD_PROCESSING_ERROR, RECORD_PROCESSING_ERROR_SCHEMA)
.field(PRODUCTION_ERROR, PRODUCTION_ERROR_SCHEMA)
+     .field(SERIALIZATION_ERROR, SERIALIZATION_ERROR_SCHEMA)
.optional()
.build();

@@ -188,7 +188,8 @@ public void shouldBuildCorrectStreamCreateDDL() {
+ "type INT, "
+ "deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, "
+ "recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, "
-     + "productionError STRUCT<errorMessage VARCHAR>"
+     + "productionError STRUCT<errorMessage VARCHAR>, "
+     + "serializationError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>"
+ ">"
+ ") WITH(KAFKA_TOPIC='processing_log_topic', VALUE_FORMAT='JSON');"));
}
@@ -0,0 +1,57 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.logging.processing;

import static java.util.Objects.requireNonNull;

import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.serialization.Serializer;

public final class LoggingSerializer<T> implements Serializer<T> {

private final Serializer<T> delegate;
private final ProcessingLogger processingLogger;

public LoggingSerializer(
final Serializer<T> delegate,
final ProcessingLogger processingLogger
) {
this.delegate = requireNonNull(delegate, "delegate");
this.processingLogger = requireNonNull(processingLogger, "processingLogger");
}

@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
delegate.configure(configs, isKey);
}

@Override
public byte[] serialize(final String topic, final T data) {
try {
return delegate.serialize(topic, data);
} catch (final RuntimeException e) {
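  // Report the failure to the processing log, then rethrow so the caller's
  // normal error handling still applies.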
processingLogger.error(new SerializationError<>(e, Optional.of(data), topic));
throw e;
}
}

@Override
public void close() {
delegate.close();
}

}
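
To make the wrapper's role concrete, here is a minimal sketch of using it
directly. This is illustrative only: the serializer type, topic, and logger
name are assumptions, and a `ProcessingLogContext` is presumed to be in scope;
in the PR itself the wiring happens inside `GenericKeySerDe`, shown further
down.

```java
import io.confluent.ksql.logging.processing.LoggingSerializer;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Wrap a plain serializer so that failures are reported to the processing
// log before the original exception is rethrown.
final Serializer<String> inner = new StringSerializer();
final ProcessingLogger logger = processingLogContext.getLoggerFactory()
    .getLogger("query.example.serializer"); // hypothetical logger name

final Serializer<String> logging = new LoggingSerializer<>(inner, logger);

// On success this behaves exactly like the delegate; on failure it emits a
// SERIALIZATION_ERROR (type 3) message and rethrows.
final byte[] bytes = logging.serialize("output-topic", "some value");
```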
@@ -0,0 +1,101 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.logging.processing;

import static java.util.Objects.requireNonNull;

import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.MessageType;
import io.confluent.ksql.util.ErrorMessageUtil;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;

public class SerializationError<T> implements ProcessingLogger.ErrorMessage {

private final Throwable exception;
private final Optional<T> record;
private final String topic;

public SerializationError(
final Throwable exception,
final Optional<T> record,
final String topic
) {
this.exception = requireNonNull(exception, "exception");
this.record = requireNonNull(record, "record");
this.topic = requireNonNull(topic, "topic");
}

@Override
public SchemaAndValue get(final ProcessingLogConfig config) {
final Struct struct = new Struct(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA)
.put(ProcessingLogMessageSchema.TYPE, MessageType.SERIALIZATION_ERROR.getTypeId())
.put(ProcessingLogMessageSchema.SERIALIZATION_ERROR, serializationError(config));

return new SchemaAndValue(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA, struct);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final SerializationError<?> that = (SerializationError<?>) o;
return Objects.equals(exception, that.exception)
&& Objects.equals(record, that.record)
&& Objects.equals(topic, that.topic);
}

@Override
public int hashCode() {
return Objects.hash(exception, record, topic);
}

private Struct serializationError(final ProcessingLogConfig config) {
final Struct serializationError = new Struct(MessageType.SERIALIZATION_ERROR.getSchema())
.put(
ProcessingLogMessageSchema.SERIALIZATION_ERROR_FIELD_MESSAGE,
exception.getMessage())
.put(
ProcessingLogMessageSchema.SERIALIZATION_ERROR_FIELD_CAUSE,
getCause()
)
.put(
ProcessingLogMessageSchema.SERIALIZATION_ERROR_FIELD_TOPIC,
topic
);

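  // Attach the row contents only when the processing log is configured to
  // include rows.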
if (config.getBoolean(ProcessingLogConfig.INCLUDE_ROWS)) {
serializationError.put(
ProcessingLogMessageSchema.SERIALIZATION_ERROR_FIELD_RECORD,
record.map(Object::toString).orElse(null)
);
}

return serializationError;
}

private List<String> getCause() {
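  // getErrorMessages returns the message chain starting with this exception's
  // own message; drop that first entry, since it is already captured in the
  // errorMessage field.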
final List<String> cause = ErrorMessageUtil.getErrorMessages(exception);
cause.remove(0);
return cause;
}
}
@@ -22,6 +22,7 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.SchemaNotSupportedException;
import io.confluent.ksql.logging.processing.LoggingDeserializer;
+ import io.confluent.ksql.logging.processing.LoggingSerializer;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
@@ -45,6 +46,7 @@

public final class GenericKeySerDe implements KeySerdeFactory {

+ static final String SERIALIZER_LOGGER_NAME = "serializer";
static final String DESERIALIZER_LOGGER_NAME = "deserializer";

private final SerdeFactories serdeFactories;
@@ -130,16 +132,18 @@ private <T> Serde<Struct> createInner(
final Serde<T> serde = serdeFactories
.create(format, schema, ksqlConfig, schemaRegistryClientFactory, targetType);

- final ProcessingLogger processingLogger = processingLogContext.getLoggerFactory()
+ final ProcessingLogger serializerProcessingLogger = processingLogContext.getLoggerFactory()
+     .getLogger(join(loggerNamePrefix, SERIALIZER_LOGGER_NAME));
+ final ProcessingLogger deserializerProcessingLogger = processingLogContext.getLoggerFactory()
      .getLogger(join(loggerNamePrefix, DESERIALIZER_LOGGER_NAME));

final Serde<Struct> inner = schema.isUnwrapped()
? unwrapped(serde, schema)
: wrapped(serde, targetType);

final Serde<Struct> result = Serdes.serdeFrom(
-     inner.serializer(),
-     new LoggingDeserializer<>(inner.deserializer(), processingLogger)
+     new LoggingSerializer<>(inner.serializer(), serializerProcessingLogger),
+     new LoggingDeserializer<>(inner.deserializer(), deserializerProcessingLogger)
);

result.configure(Collections.emptyMap(), true);
@@ -172,7 +176,7 @@ private static <T> Serde<Struct> wrapped(
final Class<T> type
) {
if (type != Struct.class) {
-       throw new IllegalArgumentException("Unwrapped must be of type Struct");
+       throw new IllegalArgumentException("Wrapped must be of type Struct");
}

return (Serde) innerSerde;