KAFKA-3515: migrate json serde from connect to common #1191

Closed · wants to merge 1 commit
1 change: 1 addition & 0 deletions build.gradle
@@ -562,6 +562,7 @@ project(':clients') {
compile libs.lz4
compile libs.snappy
compile libs.slf4jApi
compile libs.jacksonDatabind
Contributor:
I forgot this dependency change would happen. You almost definitely should ping the mailing list with this -- dependency changes on the clients lib are pretty significant, especially a widely used one like Jackson that can easily conflict (although I think they are very good about compatibility).

Another option would be to use a separate jar -- still move out of connect, but put it in something like a kafka-serde-json jar. Streams code could easily depend on that without depending on all of connect, but it wouldn't require everyone using Kafka clients to pull in Jackson. (In general I think Kafka should be broken into finer-grained jars).

Contributor Author:
I agree that we should eventually break up the kafka-clients jar, but I'm just not sure whether we should create a separate kafka-serde-json jar, or whether we should just have a kafka-common.jar or a finer-grained kafka-common-serialization.jar. Since that requires further discussion and goes well beyond this JIRA, I would prefer to go with the first option here. cc @ewencp @ijuma

Contributor:
Sorry, I wasn't necessarily suggesting a specific grouping yet, just identifying the problem and a solution. I think we'd need to propose to the mailing list a way to break them down. Anything we add to kafka-clients is basically stuck there permanently (or moved to a new dependency of kafka-clients).

Contributor Author:
Cool, so for this JIRA shall I create a KIP for dragging the dependency into kafka-clients?

Contributor:
Wait, no, I feel like we're not on the same page. I don't think we want to add the dependency to clients. That would mean we'd eventually have to move it to a separate jar, which would make it source incompatible between Kafka releases. I was suggesting we should just figure out a reasonable organization of jars, or use some fine-grained jars, because at least those can be collapsed into a single jar while maintaining compatibility for a while using empty jars that just depend on the collapsed one. I don't know how you make compatibility and migration work if you dump everything into kafka-clients.

Contributor:
Yeah, it seems to me like introducing a kafka-serialization-json.jar is the simplest option at this stage. And yes, it seems like a simple KIP may be the way forward.


testCompile libs.bcpkix
testCompile libs.junit
2 changes: 1 addition & 1 deletion checkstyle/import-control.xml
@@ -91,6 +91,7 @@

<subpackage name="serialization">
<allow class="org.apache.kafka.common.errors.SerializationException" />
<allow pkg="com.fasterxml.jackson" />
</subpackage>
</subpackage>

@@ -208,7 +209,6 @@
</subpackage>

<subpackage name="json">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.errors" />
<allow pkg="org.apache.kafka.connect.storage" />
New file: org.apache.kafka.common.serialization.JsonDeserializer
@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package org.apache.kafka.common.serialization;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;

import java.util.Map;

/**
* JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily
* structured data without having associated Java classes. This deserializer also supports Connect schemas.
*/
public class JsonDeserializer implements Deserializer<JsonNode> {
private ObjectMapper objectMapper = new ObjectMapper();
Contributor:
This can be final.

Contributor (@stepio, Apr 18, 2016):
Also, it can be static, as it's thread-safe.
Alternatively, for flexibility, it may be worth moving the initialization into the configure() method. That way you'd be able to retrieve Jackson-specific options (if necessary) from the "props" Map, as sketched below.
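For illustration, a rough sketch of that configure()-based alternative. The class name ConfigurableJsonDeserializer and the "json.allow.comments" property are made up here purely to show a Jackson option being read from the props map; it also needs the com.fasterxml.jackson.core.JsonParser import on top of the ones already in this file:

public class ConfigurableJsonDeserializer implements Deserializer<JsonNode> {
    // Built in configure() rather than at field initialization, so Jackson
    // behaviour can be driven by serializer/deserializer properties.
    private ObjectMapper objectMapper;

    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        objectMapper = new ObjectMapper();
        // Hypothetical config key, shown only to illustrate the idea.
        Object allowComments = props.get("json.allow.comments");
        if (allowComments != null)
            objectMapper.configure(JsonParser.Feature.ALLOW_COMMENTS,
                    Boolean.parseBoolean(allowComments.toString()));
    }

    @Override
    public JsonNode deserialize(String topic, byte[] bytes) {
        if (bytes == null)
            return null;
        try {
            return objectMapper.readTree(bytes);
        } catch (Exception e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public void close() {
    }
}

Making the field static would also work, since a configured ObjectMapper is thread-safe, but that would rule out per-instance options passed through configure().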

Contributor Author:
Ack.


/**
* Default constructor needed by Kafka
*/
public JsonDeserializer() {
}

@Override
public void configure(Map<String, ?> props, boolean isKey) {
}

@Override
public JsonNode deserialize(String topic, byte[] bytes) {
if (bytes == null)
return null;

JsonNode data;
try {
data = objectMapper.readTree(bytes);
} catch (Exception e) {
Contributor (@stepio, Apr 18, 2016):
I recommend adding special handling for JsonParseException: just log it instead of rethrowing.

If such an exception isn't handled, consumption can get stuck on any non-JSON message, plain text for example. I hit this while playing with Kafka locally: a single "dummy" message from the console client filled my log with exceptions.
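For example, a sketch of that softer handling inside deserialize() (the slf4j logger and the choice to return null for malformed input are illustrative only, not part of this PR):

// Assumes an slf4j logger field in the class, e.g.
// private static final Logger log = LoggerFactory.getLogger(JsonDeserializer.class);
// plus imports for org.slf4j.Logger, org.slf4j.LoggerFactory and
// com.fasterxml.jackson.core.JsonParseException.
try {
    return objectMapper.readTree(bytes);
} catch (JsonParseException e) {
    // Malformed (non-JSON) payload: log and skip it rather than rethrowing, so a
    // single plain-text record cannot keep failing every poll.
    log.warn("Skipping record on topic {} that is not valid JSON", topic, e);
    return null;
} catch (Exception e) {
    throw new SerializationException(e);
}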

Contributor Author:
Ack.

throw new SerializationException(e);
}

return data;
}

@Override
public void close() {

}
}
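As an aside, once the deserializer lives in clients it can be configured on a plain consumer by class name. A minimal usage sketch; the bootstrap server, group id and topic are placeholders, and it assumes the usual org.apache.kafka.clients.consumer imports plus java.util.Properties and java.util.Collections:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "json-demo");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());

try (KafkaConsumer<String, JsonNode> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("json-topic"));
    for (ConsumerRecord<String, JsonNode> record : consumer.poll(1000))
        System.out.println(record.value());   // record.value() is a JsonNode tree
}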
New file: org.apache.kafka.common.serialization.JsonSerializer
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.common.serialization;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;

import java.util.Map;

/**
* Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree model allows handling arbitrarily
* structured data without corresponding Java classes. This serializer also supports Connect schemas.
*/
public class JsonSerializer implements Serializer<JsonNode> {
private final ObjectMapper objectMapper = new ObjectMapper();

/**
* Default constructor needed by Kafka
*/
public JsonSerializer() {

}

@Override
public void configure(Map<String, ?> config, boolean isKey) {
}

@Override
public byte[] serialize(String topic, JsonNode data) {
if (data == null)
return null;

try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON message", e);
}
}

@Override
public void close() {
}

}
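For reference, the relocated pair is typically wrapped into a single Serde via Serdes.serdeFrom, which lines up with the import switch in the Streams example further down; a short sketch:

// Uses Serde/Serdes/Serializer/Deserializer from org.apache.kafka.common.serialization
// and com.fasterxml.jackson.databind.JsonNode.
Serializer<JsonNode> jsonSerializer = new JsonSerializer();
Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);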
@@ -25,6 +25,8 @@
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.JsonSerializer;
import org.apache.kafka.common.serialization.JsonDeserializer;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
Modified: org.apache.kafka.connect.json.JsonDeserializer
@@ -27,6 +27,7 @@
* JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily
* structured data without having associated Java classes. This deserializer also supports Connect schemas.
*/
@Deprecated
public class JsonDeserializer implements Deserializer<JsonNode> {
private ObjectMapper objectMapper = new ObjectMapper();

Modified: org.apache.kafka.connect.json.JsonSerializer
@@ -27,6 +27,7 @@
* Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree model allows handling arbitrarily
* structured data without corresponding Java classes. This serializer also supports Connect schemas.
*/
@Deprecated
public class JsonSerializer implements Serializer<JsonNode> {
private final ObjectMapper objectMapper = new ObjectMapper();

@@ -25,8 +25,8 @@
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.common.serialization.JsonSerializer;
import org.apache.kafka.common.serialization.JsonDeserializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;