From 66bda1b1d9e1e073efcd1c9caa166e6817874a47 Mon Sep 17 00:00:00 2001 From: Praveen Devarao Date: Tue, 22 Dec 2015 20:58:10 +0530 Subject: [PATCH 01/12] Marking class org.apache.kafka.common.TopicPartition as serializable --- .../src/main/java/org/apache/kafka/common/TopicPartition.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java index 33486847a21b3..383c00d4a1cb6 100644 --- a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java +++ b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java @@ -16,10 +16,12 @@ */ package org.apache.kafka.common; +import java.io.Serializable; + /** * A topic name and partition number */ -public final class TopicPartition { +public final class TopicPartition implements Serializable { private int hash = 0; private final int partition; From e3bfdc90b0d6f28b46359919e562d195a87e5e0c Mon Sep 17 00:00:00 2001 From: Praveen Devarao Date: Wed, 23 Dec 2015 12:33:35 +0530 Subject: [PATCH 02/12] Making class OffsetAndMetadata too serializable as this can be used in the consumer and would need serialization --- .../org/apache/kafka/clients/consumer/OffsetAndMetadata.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java index 1a9304798bd2f..66b257dba3099 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java @@ -12,12 +12,14 @@ */ package org.apache.kafka.clients.consumer; +import java.io.Serializable; + /** * The Kafka offset commit API allows users to provide additional metadata (in the form of a string) * when an offset is committed. This can be useful (for example) to store information about which * node made the commit, what time the commit was made, etc. */ -public class OffsetAndMetadata { +public class OffsetAndMetadata implements Serializable { private final long offset; private final String metadata; From ea22af3dac2f633ca7a7fb411e8ef7eda7334e19 Mon Sep 17 00:00:00 2001 From: Praveen Devarao Date: Tue, 5 Jan 2016 22:55:24 +0530 Subject: [PATCH 03/12] Adding tests to ensure serialization compatibility of TopicPartition and OffsetAndMetadata classes --- ...rializeCompatibilityOffsetAndMetadata.java | 68 +++++++++++++++++ .../SerializeCompatibilityTopicPartition.java | 70 ++++++++++++++++++ clients/src/test/resources/oamserializedfile | Bin 0 -> 144 bytes clients/src/test/resources/tpserializedfile | Bin 0 -> 125 bytes 4 files changed, 138 insertions(+) create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java create mode 100644 clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java create mode 100644 clients/src/test/resources/oamserializedfile create mode 100644 clients/src/test/resources/tpserializedfile diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java new file mode 100644 index 0000000000000..0a150d27df8e8 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.clients.consumer; + +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectInputStream; +import java.io.IOException; +import java.io.File; +import java.io.FileInputStream; + + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +public class SerializeCompatibilityOffsetAndMetadata { + public static String metadata = "test commit metadata"; + public static long offset = 10; + public static String fileName = "oamserializedfile"; + + @Test + public void testOffsetMetadataSerialization() throws IOException, ClassNotFoundException { + //assert OffsetAndMetadata is serializable + OffsetAndMetadata origOAM = new OffsetAndMetadata(offset, metadata); + + ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream ooStream = new ObjectOutputStream(arrayOutputStream); + ooStream.writeObject(origOAM); + arrayOutputStream.close(); + ooStream.close(); + + + // assert serialized OffsetAndMetadata object in file (oamserializedfile under resources folder) is + // de-serializable into OffsetAndMetadata and compatible + ClassLoader classLoader = getClass().getClassLoader(); + File file = new File(classLoader.getResource(fileName).getFile()); + FileInputStream fis = new FileInputStream(file); + + ObjectInputStream inputStream = new ObjectInputStream(fis); + + Object deserializedObject = inputStream.readObject(); + + assertTrue(deserializedObject instanceof OffsetAndMetadata); + + if (deserializedObject instanceof OffsetAndMetadata) { + //assert metadata is of type String + assertTrue(((OffsetAndMetadata) deserializedObject).metadata() instanceof String); + + //assert de-serialized values are same as original + //not using assertEquals for offset to ensure the type casting will catch any change in datatype + assertTrue("Offset should be " + offset + " and of type long. Got " + ((OffsetAndMetadata) deserializedObject).offset(), offset == (long) ((OffsetAndMetadata) deserializedObject).offset()); + assertEquals("metadata should be " + metadata + " but got " + ((OffsetAndMetadata) deserializedObject).metadata(), metadata, ((OffsetAndMetadata) deserializedObject).metadata()); + } + } +} diff --git a/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java new file mode 100644 index 0000000000000..abb46f2e932e4 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common; + +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectInputStream; +import java.io.IOException; +import java.io.FileInputStream; +import java.io.File; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +public class SerializeCompatibilityTopicPartition { + + public static String topicName = "mytopic"; + public static int partNum = 5; + public static String fileName = "tpserializedfile"; + + @Test + public void testTopicPartitionSerialization() throws IOException, ClassNotFoundException { + //assert TopicPartition is serializable and de-serialization renders the clone of original properly + TopicPartition origTp = new TopicPartition(topicName, partNum); + + ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream ooStream = new ObjectOutputStream(arrayOutputStream); + ooStream.writeObject(origTp); + arrayOutputStream.close(); + ooStream.close(); + + // assert serialized TopicPartition object in file (tpserializedfile under resources folder) is + // de-serializable into TopicPartition and compatible + ClassLoader classLoader = getClass().getClassLoader(); + File file = new File(classLoader.getResource(fileName).getFile()); + FileInputStream fis = new FileInputStream(file); + + ObjectInputStream inputStream = new ObjectInputStream(fis); + + Object deserializedObject = inputStream.readObject(); + + fis.close(); + inputStream.close(); + + assertTrue(deserializedObject instanceof TopicPartition); + + if (deserializedObject instanceof TopicPartition) { + //assert topic is of type String + assertTrue("topic should be of type String", ((TopicPartition) deserializedObject).topic() instanceof String); + + //assert de-serialized values are same as original + //not using assertEquals for partition number to ensure the type casting will catch any change in datatype + assertTrue("partition number should be " + partNum + " and of type int. Got " + ((TopicPartition) deserializedObject).partition(), partNum == (int) ((TopicPartition) deserializedObject).partition()); + assertEquals("topic should be " + topicName + " but got " + ((TopicPartition) deserializedObject).topic(), topicName, ((TopicPartition) deserializedObject).topic()); + } + } +} diff --git a/clients/src/test/resources/oamserializedfile b/clients/src/test/resources/oamserializedfile new file mode 100644 index 0000000000000000000000000000000000000000..95319cb592d2dbd1081fd1bea640311c55baa93a GIT binary patch literal 144 zcmZ4UmVvdnh`~6&C|xhHATc>3RWCa+Ejv*!IVUqUucTNnIX|zsG&i+K&p$1#IJLwv zFU2>tBrzqiBvFR#*@vs)Hv*X$n7kO+^1;$R3>>*o=@JGZpRB~PME#t^ymbBGlA_GK dbnA)&1_3RWCa+Ejv*!IX^cyKTj_tzaTR?AhD< Date: Fri, 8 Jan 2016 15:30:36 +0530 Subject: [PATCH 04/12] Avoid typecasting multiple times --- .../consumer/SerializeCompatibilityOffsetAndMetadata.java | 7 ++++--- .../kafka/common/SerializeCompatibilityTopicPartition.java | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java index 0a150d27df8e8..aa1217b1a8824 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java @@ -56,13 +56,14 @@ public void testOffsetMetadataSerialization() throws IOException, ClassNotFoundE assertTrue(deserializedObject instanceof OffsetAndMetadata); if (deserializedObject instanceof OffsetAndMetadata) { + OffsetAndMetadata deSerOAM = (OffsetAndMetadata) deserializedObject; //assert metadata is of type String - assertTrue(((OffsetAndMetadata) deserializedObject).metadata() instanceof String); + assertTrue(deSerOAM.metadata() instanceof String); //assert de-serialized values are same as original //not using assertEquals for offset to ensure the type casting will catch any change in datatype - assertTrue("Offset should be " + offset + " and of type long. Got " + ((OffsetAndMetadata) deserializedObject).offset(), offset == (long) ((OffsetAndMetadata) deserializedObject).offset()); - assertEquals("metadata should be " + metadata + " but got " + ((OffsetAndMetadata) deserializedObject).metadata(), metadata, ((OffsetAndMetadata) deserializedObject).metadata()); + assertTrue("Offset should be " + offset + " and of type long. Got " + deSerOAM.offset(), offset == (long) deSerOAM.offset()); + assertEquals("metadata should be " + metadata + " but got " + deSerOAM.metadata(), metadata, deSerOAM.metadata()); } } } diff --git a/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java index abb46f2e932e4..d939e4a058caf 100644 --- a/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java +++ b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java @@ -58,13 +58,14 @@ public void testTopicPartitionSerialization() throws IOException, ClassNotFoundE assertTrue(deserializedObject instanceof TopicPartition); if (deserializedObject instanceof TopicPartition) { + TopicPartition deSerTP = (TopicPartition) deserializedObject; //assert topic is of type String - assertTrue("topic should be of type String", ((TopicPartition) deserializedObject).topic() instanceof String); + assertTrue("topic should be of type String", deSerTP.topic() instanceof String); //assert de-serialized values are same as original //not using assertEquals for partition number to ensure the type casting will catch any change in datatype - assertTrue("partition number should be " + partNum + " and of type int. Got " + ((TopicPartition) deserializedObject).partition(), partNum == (int) ((TopicPartition) deserializedObject).partition()); - assertEquals("topic should be " + topicName + " but got " + ((TopicPartition) deserializedObject).topic(), topicName, ((TopicPartition) deserializedObject).topic()); + assertTrue("partition number should be " + partNum + " and of type int. Got " + deSerTP.partition(), partNum == (int) deSerTP.partition()); + assertEquals("topic should be " + topicName + " but got " + deSerTP.topic(), topicName, deSerTP.topic()); } } } From 4e16d33772bda001265f02ce49f0eab28bc61fd1 Mon Sep 17 00:00:00 2001 From: Praveen Devarao Date: Thu, 14 Jan 2016 15:56:34 +0530 Subject: [PATCH 05/12] Moving common (de)serialize function into common base class --- ...rializeCompatibilityOffsetAndMetadata.java | 24 ++------- .../SerializeCompatibilityTopicPartition.java | 27 ++-------- .../apache/kafka/common/utils/Serializer.java | 50 ++++++++++++++++++ .../test/resources/serializedData/.DS_Store | Bin 0 -> 6148 bytes .../offsetAndMetadataSerializedfile} | Bin .../topicPartitionSerializedfile} | Bin 6 files changed, 60 insertions(+), 41 deletions(-) create mode 100644 clients/src/test/java/org/apache/kafka/common/utils/Serializer.java create mode 100644 clients/src/test/resources/serializedData/.DS_Store rename clients/src/test/resources/{oamserializedfile => serializedData/offsetAndMetadataSerializedfile} (100%) rename clients/src/test/resources/{tpserializedfile => serializedData/topicPartitionSerializedfile} (100%) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java index aa1217b1a8824..7cf05c2fcbe3b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java @@ -13,45 +13,31 @@ package org.apache.kafka.clients.consumer; +import org.apache.kafka.common.utils.Serializer; import org.junit.Test; -import java.io.ByteArrayOutputStream; -import java.io.ObjectOutputStream; -import java.io.ObjectInputStream; import java.io.IOException; -import java.io.File; -import java.io.FileInputStream; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; -public class SerializeCompatibilityOffsetAndMetadata { +public class SerializeCompatibilityOffsetAndMetadata extends Serializer { public static String metadata = "test commit metadata"; public static long offset = 10; - public static String fileName = "oamserializedfile"; + public static String fileName = "serializedData/offsetAndMetadataSerializedfile"; @Test public void testOffsetMetadataSerialization() throws IOException, ClassNotFoundException { //assert OffsetAndMetadata is serializable OffsetAndMetadata origOAM = new OffsetAndMetadata(offset, metadata); - ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); - ObjectOutputStream ooStream = new ObjectOutputStream(arrayOutputStream); - ooStream.writeObject(origOAM); - arrayOutputStream.close(); - ooStream.close(); + serialize(origOAM); // assert serialized OffsetAndMetadata object in file (oamserializedfile under resources folder) is // de-serializable into OffsetAndMetadata and compatible - ClassLoader classLoader = getClass().getClassLoader(); - File file = new File(classLoader.getResource(fileName).getFile()); - FileInputStream fis = new FileInputStream(file); - - ObjectInputStream inputStream = new ObjectInputStream(fis); - - Object deserializedObject = inputStream.readObject(); + Object deserializedObject = deSerialize(fileName); assertTrue(deserializedObject instanceof OffsetAndMetadata); diff --git a/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java index d939e4a058caf..d5c28a7310c27 100644 --- a/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java +++ b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java @@ -13,47 +13,30 @@ package org.apache.kafka.common; +import org.apache.kafka.common.utils.Serializer; import org.junit.Test; -import java.io.ByteArrayOutputStream; -import java.io.ObjectOutputStream; -import java.io.ObjectInputStream; import java.io.IOException; -import java.io.FileInputStream; -import java.io.File; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; -public class SerializeCompatibilityTopicPartition { +public class SerializeCompatibilityTopicPartition extends Serializer { public static String topicName = "mytopic"; public static int partNum = 5; - public static String fileName = "tpserializedfile"; + public static String fileName = "serializedData/topicPartitionSerializedfile"; @Test public void testTopicPartitionSerialization() throws IOException, ClassNotFoundException { //assert TopicPartition is serializable and de-serialization renders the clone of original properly TopicPartition origTp = new TopicPartition(topicName, partNum); - ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); - ObjectOutputStream ooStream = new ObjectOutputStream(arrayOutputStream); - ooStream.writeObject(origTp); - arrayOutputStream.close(); - ooStream.close(); + serialize(origTp); // assert serialized TopicPartition object in file (tpserializedfile under resources folder) is // de-serializable into TopicPartition and compatible - ClassLoader classLoader = getClass().getClassLoader(); - File file = new File(classLoader.getResource(fileName).getFile()); - FileInputStream fis = new FileInputStream(file); - - ObjectInputStream inputStream = new ObjectInputStream(fis); - - Object deserializedObject = inputStream.readObject(); - - fis.close(); - inputStream.close(); + Object deserializedObject = deSerialize(fileName); assertTrue(deserializedObject instanceof TopicPartition); diff --git a/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java b/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java new file mode 100644 index 0000000000000..a88afe5813851 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.utils; + +import java.io.IOException; +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.ObjectInputStream; + +/** + * Created by praveendevarao on 14/01/16. + */ +public class Serializer { + + public void serialize(Object toSerialize) throws IOException { + ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream ooStream = new ObjectOutputStream(arrayOutputStream); + ooStream.writeObject(toSerialize); + arrayOutputStream.close(); + ooStream.close(); + } + + public Object deSerialize(String fileName) throws IOException, ClassNotFoundException { + ClassLoader classLoader = getClass().getClassLoader(); + File file = new File(classLoader.getResource(fileName).getFile()); + FileInputStream fis = new FileInputStream(file); + + ObjectInputStream inputStream = new ObjectInputStream(fis); + + Object deserializedObject = inputStream.readObject(); + + fis.close(); + inputStream.close(); + + return deserializedObject; + } +} diff --git a/clients/src/test/resources/serializedData/.DS_Store b/clients/src/test/resources/serializedData/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..5008ddfcf53c02e82d7eee2e57c38e5672ef89f6 GIT binary patch literal 6148 zcmeH~Jr2S!425mzP>H1@V-^m;4Wg<&0T*E43hX&L&p$$qDprKhvt+--jT7}7np#A3 zem<@ulZcFPQ@L2!n>{z**++&mCkOWA81W14cNZlEfg7;MkzE(HCqgga^y>{tEnwC%0;vJ&^%eQ zLs35+`xjp>T0 Date: Thu, 14 Jan 2016 15:57:43 +0530 Subject: [PATCH 06/12] Removed .DS_Store --- .../src/test/resources/serializedData/.DS_Store | Bin 6148 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 clients/src/test/resources/serializedData/.DS_Store diff --git a/clients/src/test/resources/serializedData/.DS_Store b/clients/src/test/resources/serializedData/.DS_Store deleted file mode 100644 index 5008ddfcf53c02e82d7eee2e57c38e5672ef89f6..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeH~Jr2S!425mzP>H1@V-^m;4Wg<&0T*E43hX&L&p$$qDprKhvt+--jT7}7np#A3 zem<@ulZcFPQ@L2!n>{z**++&mCkOWA81W14cNZlEfg7;MkzE(HCqgga^y>{tEnwC%0;vJ&^%eQ zLs35+`xjp>T0 Date: Fri, 15 Jan 2016 17:53:30 +0530 Subject: [PATCH 07/12] breaking serialization test into 2 types. 1) for serialization and deserialization of current version. 2) for cpmpatibility --- ...rializeCompatibilityOffsetAndMetadata.java | 43 ++++++++++------- .../SerializeCompatibilityTopicPartition.java | 47 ++++++++++++------- .../apache/kafka/common/utils/Serializer.java | 27 ++++++++--- 3 files changed, 78 insertions(+), 39 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java index 7cf05c2fcbe3b..121e6dc31b92f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java @@ -22,34 +22,45 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; +/** + * This test case ensures OffsetAndMetadata class is serializable and is serialization compatible. + * Note: this ensures only serialization compatibility backwards but would not be able to ensure the reverse i.e. + * deserialize a object of new version of the class into a Old version class will not be ensured. + */ public class SerializeCompatibilityOffsetAndMetadata extends Serializer { - public static String metadata = "test commit metadata"; - public static long offset = 10; - public static String fileName = "serializedData/offsetAndMetadataSerializedfile"; + private String metadata = "test commit metadata"; + private long offset = 10; + private String fileName = "serializedData/offsetAndMetadataSerializedfile"; + + private void checkValues(OffsetAndMetadata deSerOAM) { + //assert deserialized values are same as original + //not using assertEquals for offset to ensure the type casting will catch any change in datatype + assertTrue("Offset should be " + offset + " and of type long. Got " + deSerOAM.offset(), offset == (long) deSerOAM.offset()); + assertEquals("metadata should be " + metadata + " but got " + deSerOAM.metadata(), metadata, deSerOAM.metadata()); + } @Test public void testOffsetMetadataSerialization() throws IOException, ClassNotFoundException { //assert OffsetAndMetadata is serializable OffsetAndMetadata origOAM = new OffsetAndMetadata(offset, metadata); + byte[] byteArray = serialize(origOAM); - serialize(origOAM); + //deserialize the byteArray and check if the values are same as original + Object deserializedObject = deserialize(byteArray); + assertTrue(deserializedObject instanceof OffsetAndMetadata); + checkValues((OffsetAndMetadata) deserializedObject); + } + + @Test + public void testOffsetMetadataSerializationCompatibility() throws IOException, ClassNotFoundException { // assert serialized OffsetAndMetadata object in file (oamserializedfile under resources folder) is - // de-serializable into OffsetAndMetadata and compatible - Object deserializedObject = deSerialize(fileName); + // deserializable into OffsetAndMetadata and is compatible + Object deserializedObject = deserialize(fileName); assertTrue(deserializedObject instanceof OffsetAndMetadata); - if (deserializedObject instanceof OffsetAndMetadata) { - OffsetAndMetadata deSerOAM = (OffsetAndMetadata) deserializedObject; - //assert metadata is of type String - assertTrue(deSerOAM.metadata() instanceof String); - - //assert de-serialized values are same as original - //not using assertEquals for offset to ensure the type casting will catch any change in datatype - assertTrue("Offset should be " + offset + " and of type long. Got " + deSerOAM.offset(), offset == (long) deSerOAM.offset()); - assertEquals("metadata should be " + metadata + " but got " + deSerOAM.metadata(), metadata, deSerOAM.metadata()); - } + checkValues((OffsetAndMetadata) deserializedObject); } } diff --git a/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java index d5c28a7310c27..95b6f171d83ae 100644 --- a/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java +++ b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java @@ -21,34 +21,47 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; +/** + * This test ensures TopicPartition class is serializable and is serialization compatible. + * Note: this ensures only serialization compatibility backwards but would not be able to ensure the reverse i.e. + * deserialize a object of new version of the class into a Old version class will not be ensured. + */ public class SerializeCompatibilityTopicPartition extends Serializer { - public static String topicName = "mytopic"; - public static int partNum = 5; - public static String fileName = "serializedData/topicPartitionSerializedfile"; + private String topicName = "mytopic"; + private int partNum = 5; + private String fileName = "serializedData/topicPartitionSerializedfile"; + + private void checkValues(TopicPartition deSerTP) { + //assert deserialized values are same as original + //not using assertEquals for partition number to ensure the type casting will catch any change in datatype + assertTrue("partition number should be " + partNum + " and of type int. Got " + deSerTP.partition(), partNum == (int) deSerTP.partition()); + assertEquals("topic should be " + topicName + " but got " + deSerTP.topic(), topicName, deSerTP.topic()); + } @Test public void testTopicPartitionSerialization() throws IOException, ClassNotFoundException { - //assert TopicPartition is serializable and de-serialization renders the clone of original properly + //assert TopicPartition is serializable and deserialization renders the clone of original properly TopicPartition origTp = new TopicPartition(topicName, partNum); + byte[] byteArray = serialize(origTp); - serialize(origTp); - // assert serialized TopicPartition object in file (tpserializedfile under resources folder) is - // de-serializable into TopicPartition and compatible - Object deserializedObject = deSerialize(fileName); + //deserialize the byteArray and check if the values are same as original + Object deserializedObject = deserialize(byteArray); assertTrue(deserializedObject instanceof TopicPartition); - if (deserializedObject instanceof TopicPartition) { - TopicPartition deSerTP = (TopicPartition) deserializedObject; - //assert topic is of type String - assertTrue("topic should be of type String", deSerTP.topic() instanceof String); + checkValues((TopicPartition) deserializedObject); + } + + @Test + public void testTopiPartitionSerializationCompatibility() throws IOException, ClassNotFoundException { + // assert serialized TopicPartition object in file (serializedData/topicPartitionSerializedfile) is + // deserializable into TopicPartition and is compatible + Object deserializedObject = deserialize(fileName); + + assertTrue(deserializedObject instanceof TopicPartition); - //assert de-serialized values are same as original - //not using assertEquals for partition number to ensure the type casting will catch any change in datatype - assertTrue("partition number should be " + partNum + " and of type int. Got " + deSerTP.partition(), partNum == (int) deSerTP.partition()); - assertEquals("topic should be " + topicName + " but got " + deSerTP.topic(), topicName, deSerTP.topic()); - } + checkValues((TopicPartition) deserializedObject); } } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java b/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java index a88afe5813851..24ebb63c85251 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java @@ -15,25 +15,40 @@ import java.io.IOException; import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; import java.io.ObjectOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.ObjectInputStream; -/** - * Created by praveendevarao on 14/01/16. - */ public class Serializer { - public void serialize(Object toSerialize) throws IOException { + public byte[] serialize(Object toSerialize) throws IOException { + byte[] byteArray = null; ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); ObjectOutputStream ooStream = new ObjectOutputStream(arrayOutputStream); + ooStream.writeObject(toSerialize); - arrayOutputStream.close(); + byteArray = arrayOutputStream.toByteArray(); + ooStream.close(); + arrayOutputStream.close(); + return byteArray; + } + + public Object deserialize(byte[] byteArray) throws IOException, ClassNotFoundException { + ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(byteArray); + ObjectInputStream inputStream = new ObjectInputStream(arrayInputStream); + + Object deserializedObject = inputStream.readObject(); + + inputStream.close(); + arrayInputStream.close(); + + return deserializedObject; } - public Object deSerialize(String fileName) throws IOException, ClassNotFoundException { + public Object deserialize(String fileName) throws IOException, ClassNotFoundException { ClassLoader classLoader = getClass().getClassLoader(); File file = new File(classLoader.getResource(fileName).getFile()); FileInputStream fis = new FileInputStream(file); From 42dc60c56fc75cde6ae9797de0c2c367d8c23565 Mon Sep 17 00:00:00 2001 From: Praveen Devarao Date: Thu, 28 Jan 2016 11:50:05 +0530 Subject: [PATCH 08/12] Changes for code clean-up and better organization of code --- ...rializeCompatibilityOffsetAndMetadata.java | 24 +++++------- .../SerializeCompatibilityTopicPartition.java | 25 +++++------- .../apache/kafka/common/utils/Serializer.java | 39 ++++++++----------- 3 files changed, 37 insertions(+), 51 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java index 121e6dc31b92f..b197ddc96d2ed 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java @@ -24,32 +24,30 @@ /** * This test case ensures OffsetAndMetadata class is serializable and is serialization compatible. - * Note: this ensures only serialization compatibility backwards but would not be able to ensure the reverse i.e. - * deserialize a object of new version of the class into a Old version class will not be ensured. + * Note: this ensures that the current code can deserialize data serialized with older versions of the code, but not the reverse. + * That is, older code won't necessarily be able to deserialize data serialized with newer code. */ -public class SerializeCompatibilityOffsetAndMetadata extends Serializer { +public class SerializeCompatibilityOffsetAndMetadataTest { private String metadata = "test commit metadata"; - private long offset = 10; - private String fileName = "serializedData/offsetAndMetadataSerializedfile"; + private String fileName = "serializedData/offsetAndMetadataSerializedfile"; + private long offset = 10; private void checkValues(OffsetAndMetadata deSerOAM) { //assert deserialized values are same as original //not using assertEquals for offset to ensure the type casting will catch any change in datatype - assertTrue("Offset should be " + offset + " and of type long. Got " + deSerOAM.offset(), offset == (long) deSerOAM.offset()); + assertTrue("Offset should be " + offset + " but got " + deSerOAM.offset(), offset == deSerOAM.offset()); assertEquals("metadata should be " + metadata + " but got " + deSerOAM.metadata(), metadata, deSerOAM.metadata()); } @Test - public void testOffsetMetadataSerialization() throws IOException, ClassNotFoundException { + public void testSerializationRoundtrip() throws IOException, ClassNotFoundException { //assert OffsetAndMetadata is serializable OffsetAndMetadata origOAM = new OffsetAndMetadata(offset, metadata); - byte[] byteArray = serialize(origOAM); + byte[] byteArray = Serializer.serialize(origOAM); //deserialize the byteArray and check if the values are same as original - Object deserializedObject = deserialize(byteArray); - + Object deserializedObject = Serializer.deserialize(byteArray); assertTrue(deserializedObject instanceof OffsetAndMetadata); - checkValues((OffsetAndMetadata) deserializedObject); } @@ -57,10 +55,8 @@ public void testOffsetMetadataSerialization() throws IOException, ClassNotFoundE public void testOffsetMetadataSerializationCompatibility() throws IOException, ClassNotFoundException { // assert serialized OffsetAndMetadata object in file (oamserializedfile under resources folder) is // deserializable into OffsetAndMetadata and is compatible - Object deserializedObject = deserialize(fileName); - + Object deserializedObject = Serializer.deserialize(fileName); assertTrue(deserializedObject instanceof OffsetAndMetadata); - checkValues((OffsetAndMetadata) deserializedObject); } } diff --git a/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java index 95b6f171d83ae..8314746c54463 100644 --- a/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java +++ b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java @@ -23,34 +23,31 @@ /** * This test ensures TopicPartition class is serializable and is serialization compatible. - * Note: this ensures only serialization compatibility backwards but would not be able to ensure the reverse i.e. - * deserialize a object of new version of the class into a Old version class will not be ensured. + * Note: this ensures that the current code can deserialize data serialized with older versions of the code, but not the reverse. + * That is, older code won't necessarily be able to deserialize data serialized with newer code. */ -public class SerializeCompatibilityTopicPartition extends Serializer { +public class SerializeCompatibilityTopicPartitionTest { private String topicName = "mytopic"; - private int partNum = 5; - private String fileName = "serializedData/topicPartitionSerializedfile"; + private String fileName = "serializedData/topicPartitionSerializedfile"; + private int partNum = 5; private void checkValues(TopicPartition deSerTP) { //assert deserialized values are same as original //not using assertEquals for partition number to ensure the type casting will catch any change in datatype - assertTrue("partition number should be " + partNum + " and of type int. Got " + deSerTP.partition(), partNum == (int) deSerTP.partition()); + assertTrue("partition number should be " + partNum + " but got " + deSerTP.partition(), partNum == deSerTP.partition()); assertEquals("topic should be " + topicName + " but got " + deSerTP.topic(), topicName, deSerTP.topic()); } @Test - public void testTopicPartitionSerialization() throws IOException, ClassNotFoundException { + public void testSerializationRoundtrip() throws IOException, ClassNotFoundException { //assert TopicPartition is serializable and deserialization renders the clone of original properly TopicPartition origTp = new TopicPartition(topicName, partNum); - byte[] byteArray = serialize(origTp); - + byte[] byteArray = Serializer.serialize(origTp); //deserialize the byteArray and check if the values are same as original - Object deserializedObject = deserialize(byteArray); - + Object deserializedObject = Serializer.deserialize(byteArray); assertTrue(deserializedObject instanceof TopicPartition); - checkValues((TopicPartition) deserializedObject); } @@ -58,10 +55,8 @@ public void testTopicPartitionSerialization() throws IOException, ClassNotFoundE public void testTopiPartitionSerializationCompatibility() throws IOException, ClassNotFoundException { // assert serialized TopicPartition object in file (serializedData/topicPartitionSerializedfile) is // deserializable into TopicPartition and is compatible - Object deserializedObject = deserialize(fileName); - + Object deserializedObject = Serializer.deserialize(fileName); assertTrue(deserializedObject instanceof TopicPartition); - checkValues((TopicPartition) deserializedObject); } } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java b/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java index 24ebb63c85251..2dde0912f5a04 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java @@ -13,17 +13,18 @@ package org.apache.kafka.common.utils; -import java.io.IOException; -import java.io.ByteArrayOutputStream; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.ObjectOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; import java.io.File; import java.io.FileInputStream; -import java.io.ObjectInputStream; public class Serializer { - public byte[] serialize(Object toSerialize) throws IOException { + public static byte[] serialize(Object toSerialize) throws IOException { byte[] byteArray = null; ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); ObjectOutputStream ooStream = new ObjectOutputStream(arrayOutputStream); @@ -36,30 +37,24 @@ public byte[] serialize(Object toSerialize) throws IOException { return byteArray; } - public Object deserialize(byte[] byteArray) throws IOException, ClassNotFoundException { - ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(byteArray); - ObjectInputStream inputStream = new ObjectInputStream(arrayInputStream); - - Object deserializedObject = inputStream.readObject(); - + public static Object deserialize(InputStream inputStream) throws IOException, ClassNotFoundException { + ObjectInputStream objectInputStream = new ObjectInputStream(inputStream); + Object deserializedObject = objectInputStream.readObject(); + objectInputStream.close(); inputStream.close(); - arrayInputStream.close(); - return deserializedObject; } - public Object deserialize(String fileName) throws IOException, ClassNotFoundException { - ClassLoader classLoader = getClass().getClassLoader(); + public static Object deserialize(byte[] byteArray) throws IOException, ClassNotFoundException { + ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(byteArray); + return deserialize(arrayInputStream); + } + + public static Object deserialize(String fileName) throws IOException, ClassNotFoundException { + ClassLoader classLoader = Serializer.class.getClassLoader(); File file = new File(classLoader.getResource(fileName).getFile()); FileInputStream fis = new FileInputStream(file); - ObjectInputStream inputStream = new ObjectInputStream(fis); - - Object deserializedObject = inputStream.readObject(); - - fis.close(); - inputStream.close(); - - return deserializedObject; + return deserialize(fis); } } From 5db55dde0ff2ec60bde25470217fdc191147fa09 Mon Sep 17 00:00:00 2001 From: Praveen Devarao Date: Thu, 28 Jan 2016 12:44:44 +0530 Subject: [PATCH 09/12] Changing file name in accordance to class name refactor --- ...data.java => SerializeCompatibilityOffsetAndMetadataTest.java} | 0 ...rtition.java => SerializeCompatibilityTopicPartitionTest.java} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename clients/src/test/java/org/apache/kafka/clients/consumer/{SerializeCompatibilityOffsetAndMetadata.java => SerializeCompatibilityOffsetAndMetadataTest.java} (100%) rename clients/src/test/java/org/apache/kafka/common/{SerializeCompatibilityTopicPartition.java => SerializeCompatibilityTopicPartitionTest.java} (100%) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java similarity index 100% rename from clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadata.java rename to clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java diff --git a/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java similarity index 100% rename from clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartition.java rename to clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java From 52831707b4012585088484722250bf50843c2749 Mon Sep 17 00:00:00 2001 From: Praveen Devarao Date: Fri, 29 Jan 2016 16:22:41 +0530 Subject: [PATCH 10/12] Changes to use try-with-resources --- .DS_Store | Bin 0 -> 8196 bytes clients/.DS_Store | Bin 0 -> 6148 bytes clients/src/.DS_Store | Bin 0 -> 6148 bytes clients/src/test/.DS_Store | Bin 0 -> 6148 bytes clients/src/test/java/.DS_Store | Bin 0 -> 6148 bytes clients/src/test/java/org/.DS_Store | Bin 0 -> 6148 bytes clients/src/test/java/org/apache/.DS_Store | Bin 0 -> 6148 bytes .../src/test/java/org/apache/kafka/.DS_Store | Bin 0 -> 6148 bytes .../java/org/apache/kafka/clients/.DS_Store | Bin 0 -> 6148 bytes .../apache/kafka/clients/consumer/.DS_Store | Bin 0 -> 6148 bytes ...izeCompatibilityOffsetAndMetadataTest.java | 3 +- ...ializeCompatibilityTopicPartitionTest.java | 3 +- .../apache/kafka/common/utils/Serializer.java | 38 +++++++----------- 13 files changed, 17 insertions(+), 27 deletions(-) create mode 100644 .DS_Store create mode 100644 clients/.DS_Store create mode 100644 clients/src/.DS_Store create mode 100644 clients/src/test/.DS_Store create mode 100644 clients/src/test/java/.DS_Store create mode 100644 clients/src/test/java/org/.DS_Store create mode 100644 clients/src/test/java/org/apache/.DS_Store create mode 100644 clients/src/test/java/org/apache/kafka/.DS_Store create mode 100644 clients/src/test/java/org/apache/kafka/clients/.DS_Store create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/.DS_Store diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..055ff3bfb579f02f6d7f1cef84490122d32d2dd6 GIT binary patch literal 8196 zcmeHM-A>d%6g~s%?g9qasL{lm^-dEos2KEaT|g!N5mr&723xkU)!J^lg_RJJz4i&b z_Wv=A(ZmZMK;OVe@D9H-Q^-s!T$xDX4C$Qd^!$G3bY`azk*KdU&k!9WqB1U~l_7M0 zX#AY7Tz}UPkU)MsAQ6sYM?8r_`Z@*77o9E+bO{cK}-+g^=i&7Q}bv z#w9j4(Gz93-EOCyeuwzwwgbPDjE(&eef@=f#Zq}-aESjZ2L~TDT2Z%=ILS(>nQg|j2obK@5`Myl%*6JsOQ@pBU!8wE*KJ&#st6g8R1s?1agMJ4JAWE z*NmAnVb~ioxna>oWSQ)9UJR)sU8QNdL{sSJObaq2KaGeu11b-rz>FhVMn5WJ;|SL> z8aOYyh9feETN>KzAL^$C4X7Yg2cy!JSY#x%a^esug{Ple3cJjDU2I z<7EGJoh4-3Nh=n6Mg~{pp#OsA?-0%F+;SEC<4$Hs4WpKL*HhV!cSDFHF=Vkw?zz^l zJUYhb|LL8-|L-}!twR<8i@@JQKoq7LQ+4Ru`Z<|Z&1dZ@uE)5zF<(znDS}QO$3f+B o9CZ61h8S1D$}+W-_7ueo+CTUZfKLQ=|3}L7UnO?`|IhaS-$Nt52LJ#7 literal 0 HcmV?d00001 diff --git a/clients/.DS_Store b/clients/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..6dc3226d9279e183a0b16fb51f2f0718e9f1224a GIT binary patch literal 6148 zcmeHK&1%~~5Z<+&MoumYZAhUP9b53h`NQo^b<#o)1%paLX^JaFqFxYMBgrwr7YR)f;p7=AGukB3|y&!uw#L$3YpEqoNz;@64TwlZfnISPo|P8?Dm|JB)TzGVZ$% zwscm8DogS|QulUM7VB|WkFvPf3+#BiDlP|oce$4*2awtevt&mVDpIK`51l4neRpLt zdAio});FF_Ti&G8X}7$M_0Dwa{&bv2Pd1;wemk5Vefs?6>$mSee*HG{6%xK_Blihs za0~{qaS-HLsG4A73{EZ*K%fHqZ9i6N<)_x!^p8qdyuK!ICH_QMta5ou{jqPB&3v;8h^~x;qto1-Y rfTCbrW$_{f409AiEFQ%VL5+alLj%w!tSo{DgntAK4cssTf0coA%d%)b literal 0 HcmV?d00001 diff --git a/clients/src/.DS_Store b/clients/src/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..baca01251a21f70aeb9ee4c9dc38ef0355b7e1a4 GIT binary patch literal 6148 zcmeHKUyIW~5TETuP48YJDhj^jQK1jjJE^|K-YNJX6e4$sCp9s_F2rnUwpWf)$g>~A z{~tp{-~0l86W{90?zY%FeJv_8VfHsWJG0CF5_U5HAiCr120#D+J}P0OiOp|>#z{{} z!+AVJL32DP)bq4ZDfVeXmcO3?dUqbYgajm%C*I$E9J@9*Vm9J|h8!x;C*r@pc$t+Ku-AnXf zVLwu;SlUUtFY`%1xVWoIoyamD{gQfkAj?cmx@ufz)nRPsJCKFEcTr_q+_@88G;hYR?M#*(+AcvKdue1X12{34E#8DIvOfps$A&LDqlos05X%m6d+ zPcT5|Lx@TkIjk+3qXQdL0wB_Fq!F~uuRmf)4#db|Z4o^v!lokHRN+bt;VK7V({Vg< z@wG*p4#KRA^SG6Tt5Ae1JVa>1K|~h0Wd@jmRR%U}H=z6f-k)sRgAH86<W75j`mUBVcIYh8b8Z G13v*$+IB4f literal 0 HcmV?d00001 diff --git a/clients/src/test/.DS_Store b/clients/src/test/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..dfb1d5fd41b2db32d9aa7ec617d76d743db8e264 GIT binary patch literal 6148 zcmeHK&2G~`5S~o~IPJv<5J+5nY^59mEehOBD3v%MMRL`uqA1w4MHX4!$aWH;D3Z@S z20w+zK#1P@0)3O-(wW_DL|eE<6`HYTzuDQDUG2Bl{s4gJ4C6-tE&w>FgtZ11e-UaY zy&^U1sUr&N<5j8-qEtm#rw&=JKLfOOb@%`wj3B(Uep}eKiYjI^J&}1_G%f~qolm1d zDvPq!IuW&{`i+~noW}ABeeUh$w?SWxgEB0KMJLSP=$ed^$lBeo?4R4)N~Lo6b2Ftr5Ny}|U;=PzHs9e)4u>$jdU2>h?L z+_88LM`+CX7zKHjs_Z@bD82zXBv62YAzBAyUt;zcqgH2H&|+Z*m;q*B(G1wV%~@IW zz&su^zzke_255iqPzimDl|j9AV4usP9loD6TR%PeDT;#TZLR@dH#X==aD#^et8f(SyPl N0ZjuJ%)mk!I0IYR)f;p7=AGukB3|y&!uw#L$3YpEqoNz;@64TwlZfnISPo|P8?Dm|JB)TzGVZ$% zwscm8DogS|QulUM7VB|WkFvPf3+#BiDlP|oce$4*2awtevt&mVDpIK`51l4neRpLt zdAio});FF_Ti&G8X}7$M_0Dwa{&bv2Pd1;wemk5Vefs?6>$mSee*HG{6%xK_Blihs za0~{qaS-HLs@|!UO%)mcl0QUzE6wxQFESjwY z8dCxQHo$EJZ1eLE=%EA9C#)=j2SnObK$|M5#E?`wNShA*KE+oSZMu-=$uJH*SyF`} zsq~PZ2^Z40$SpI#3|wXaYd;Vd&;J)U*Z(Gn8)kqRxSI^f#&)pXg}Kq$dS#Y))_R~H rKv6KRvUrgKhB=BM7LVeGphm#&p#kULzIRV#7XBS<-fKLu_k&`KPTA{l{DL8*KC>6@7@l2{)M8-JpiRwTjS;HS>+fGi&=>YuCdV?y#RrMH2on`fWK&=o8Nj)lWdZAeUa;}`!;j(}=n5#my#@OqH|%E}JJ&a1 zCy`lRtNrDrnevO-mriAF9uIe6?%Z#yzMltqFKYzd_xemmacKQ!khe$vky6RMAB4v; z?zh~fJ(cExOyll9rDrEHja0v(dTErM`DVNm8RhMk`>NTEFQK#~P2zSeKeIDFq) z7A=n84>)7|UHaWLQRzpBk7`D_gv0=sAc1Vqj_*!1Y0ZB6=1J zgL>%zqgw#LGK95&kH3vrBP@Cr3xjY4gqu=8Q_5|N!A&{Dh0XIU76wf@%~I*AN1Kny%j26%bj-*3RX*}Zk*Epe|E tpdUa{Ft0FpUjmNaiXj)b;y0jLATH1W^eh$zAp$}_0-6Rg#K3=L;0|u4Z3h4V literal 0 HcmV?d00001 diff --git a/clients/src/test/java/org/apache/.DS_Store b/clients/src/test/java/org/apache/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..785f366f60f52ecb613ebdcc98533398320ccf71 GIT binary patch literal 6148 zcmeHK&1%~~5Z-m1R!%M|O=%&QJtp9T+oU+Tscss`Az)BQC{1ysO0Aa)t+C{oVhlR> zF`D1U2qAfhyh(1!%LF~%Fc=q+O|W6S|XELFh#MesW6 zgp_P#29WD9f~`b-3lkMiic<+_p(ON{i4rg6o=Mt26=nr?%zqgw#LGK95&k3WuBBP@Cr3xjY4gqu=8Q_5|N!A&{Dh0XIU76wf@Kx5T|x tfF6LNU|wPHp#&Vg6+9eKn4CqDN&X89p;tBFj_fcN5(|_Ma$K z%>5+Y6ZyCoUb?5GPK3;dCrS_Yh0N5rqee0-4`MsszQ}aH7hdiT^CK+nCNkd>rAS2~ z^y{EOtruRIOm5$7^6i~F(YIJqtF^RPBpA1D=8N%mA$q9F;I~SXnen2O4t(K&)ff2>SHX zNHyGHpZQAA2BL{YG7s~Iq!$aWH;C{osZ z0c+l$fe`HY0QeJj;mo5bk5=7O2vu{fJLh`l-fR2J%R@x0)(Ni@xkThZB^HV(4iMQc zbw=h)jcjBpM?yJ0f^v#yylsVLz%uZ^F(BveGOc00a*Akt{${j^nhdk~8a*eUy5!%F z#cmLb05NjGxCgy5QUXgZ61yajBcl$zL~J_c|3msi*=Qu)XZ?d-);9Tn2kn?=dAa?E)}0PTB6s;{k)Ua{Pc;s<6#sSyYA<$eS1@g_^<5;TRiGD-Q_ir z;{x6Q-6)pXC-)2NHkx}QW_JmUe6dH%{-kXO^a*ze!E zQSw$+Zx2ddzfvieyw#P;VBmgnoXb~l-r0EE9=v}0?)`_4pFV&6roNmQ)=MSl4DQ2s z7&AO}8)*`YWDjjJZizO)ZjL?)p%RbiDfSRi<2`j283cOk#YE2^($DPYRP+UUpq}4D zj&v%=5pSZ#%$tcZ(?$+~FX`ZY4jwb({&*ED$HagcgU;eH<+Eul1D1i4!+=~LJgCH~ z!J$UAbf8d20AK;tN?^-h0~zBPtQs6@L<@xJP@oQF=7_;`IO;v)S2Z})sKbew!v`}@ zX6A&#)YFmQQ|`p78f|GAunbHxFt2Z2x&Pli{QN%|WbZ5kmVpz+04uCF)@%4Av$xKC xoZM?Y=rgDk={MA4ud8*(}f#w^LanUu|37$Qo(;UwjqKO2fTCz6V z9Dk7kIlBV-Jhw6Kv2cEAyOpFZj>=Etl6pK-VGAhXis#(LBkeg-MvpMTKr`IZj+pHP z4;+0#vz9F7dSA(PfI2g@nQO~T?q(%Z2{&AEh#q%P<{n0@tH*xIx;E^}eXQgV=+6Fp;kWE~k9;}zFrxXUW;*(>W#>e_B{ zPcpl?sVWH9Q>vTIXiba(W55{rVt{wHNPgd=S!2K$Fb3uf$oC-uY z_Gs1^Fb0+x*!A6_-2WFpzyFtmtY-`u1OJKvm#a3a6_yn4*0ti~u8pX7R1uk%d7Ou^ ip;s~I%2j+wRiQtU1~Cqpd8CJ8e*}~UGseKLGVlpj(}5-c literal 0 HcmV?d00001 diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java index b197ddc96d2ed..ce1d4cd955df4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java @@ -34,8 +34,7 @@ public class SerializeCompatibilityOffsetAndMetadataTest { private void checkValues(OffsetAndMetadata deSerOAM) { //assert deserialized values are same as original - //not using assertEquals for offset to ensure the type casting will catch any change in datatype - assertTrue("Offset should be " + offset + " but got " + deSerOAM.offset(), offset == deSerOAM.offset()); + assertEquals("Offset should be " + offset + " but got " + deSerOAM.offset(), offset, deSerOAM.offset()); assertEquals("metadata should be " + metadata + " but got " + deSerOAM.metadata(), metadata, deSerOAM.metadata()); } diff --git a/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java index 8314746c54463..7786a730a3e16 100644 --- a/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java +++ b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java @@ -34,8 +34,7 @@ public class SerializeCompatibilityTopicPartitionTest { private void checkValues(TopicPartition deSerTP) { //assert deserialized values are same as original - //not using assertEquals for partition number to ensure the type casting will catch any change in datatype - assertTrue("partition number should be " + partNum + " but got " + deSerTP.partition(), partNum == deSerTP.partition()); + assertEquals("partition number should be " + partNum + " but got " + deSerTP.partition(), partNum, deSerTP.partition()); assertEquals("topic should be " + topicName + " but got " + deSerTP.topic(), topicName, deSerTP.topic()); } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java b/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java index 2dde0912f5a04..a30bbe0eda449 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java @@ -19,42 +19,34 @@ import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; -import java.io.File; -import java.io.FileInputStream; public class Serializer { public static byte[] serialize(Object toSerialize) throws IOException { - byte[] byteArray = null; - ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); - ObjectOutputStream ooStream = new ObjectOutputStream(arrayOutputStream); - - ooStream.writeObject(toSerialize); - byteArray = arrayOutputStream.toByteArray(); - - ooStream.close(); - arrayOutputStream.close(); - return byteArray; + try (ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream()) { + try (ObjectOutputStream ooStream = new ObjectOutputStream(arrayOutputStream)) { + ooStream.writeObject(toSerialize); + return arrayOutputStream.toByteArray(); + } + } } public static Object deserialize(InputStream inputStream) throws IOException, ClassNotFoundException { - ObjectInputStream objectInputStream = new ObjectInputStream(inputStream); - Object deserializedObject = objectInputStream.readObject(); - objectInputStream.close(); - inputStream.close(); - return deserializedObject; + try (ObjectInputStream objectInputStream = new ObjectInputStream(inputStream)) { + return objectInputStream.readObject(); + } } public static Object deserialize(byte[] byteArray) throws IOException, ClassNotFoundException { - ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(byteArray); - return deserialize(arrayInputStream); + try (ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(byteArray)) { + return deserialize(arrayInputStream); + } } public static Object deserialize(String fileName) throws IOException, ClassNotFoundException { ClassLoader classLoader = Serializer.class.getClassLoader(); - File file = new File(classLoader.getResource(fileName).getFile()); - FileInputStream fis = new FileInputStream(file); - - return deserialize(fis); + try (InputStream fileStream = classLoader.getResourceAsStream(fileName)) { + return deserialize(fileStream); + } } } From 6203586681c8198a7376fbac62b64370edcbfbf7 Mon Sep 17 00:00:00 2001 From: Praveen Devarao Date: Fri, 29 Jan 2016 16:23:55 +0530 Subject: [PATCH 11/12] Removing .DS_Store --- .DS_Store | Bin 8196 -> 0 bytes clients/.DS_Store | Bin 6148 -> 0 bytes clients/src/.DS_Store | Bin 6148 -> 0 bytes clients/src/test/.DS_Store | Bin 6148 -> 0 bytes clients/src/test/java/.DS_Store | Bin 6148 -> 0 bytes clients/src/test/java/org/.DS_Store | Bin 6148 -> 0 bytes clients/src/test/java/org/apache/.DS_Store | Bin 6148 -> 0 bytes clients/src/test/java/org/apache/kafka/.DS_Store | Bin 6148 -> 0 bytes .../test/java/org/apache/kafka/clients/.DS_Store | Bin 6148 -> 0 bytes .../org/apache/kafka/clients/consumer/.DS_Store | Bin 6148 -> 0 bytes 10 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 .DS_Store delete mode 100644 clients/.DS_Store delete mode 100644 clients/src/.DS_Store delete mode 100644 clients/src/test/.DS_Store delete mode 100644 clients/src/test/java/.DS_Store delete mode 100644 clients/src/test/java/org/.DS_Store delete mode 100644 clients/src/test/java/org/apache/.DS_Store delete mode 100644 clients/src/test/java/org/apache/kafka/.DS_Store delete mode 100644 clients/src/test/java/org/apache/kafka/clients/.DS_Store delete mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/.DS_Store diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index 055ff3bfb579f02f6d7f1cef84490122d32d2dd6..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 8196 zcmeHM-A>d%6g~s%?g9qasL{lm^-dEos2KEaT|g!N5mr&723xkU)!J^lg_RJJz4i&b z_Wv=A(ZmZMK;OVe@D9H-Q^-s!T$xDX4C$Qd^!$G3bY`azk*KdU&k!9WqB1U~l_7M0 zX#AY7Tz}UPkU)MsAQ6sYM?8r_`Z@*77o9E+bO{cK}-+g^=i&7Q}bv z#w9j4(Gz93-EOCyeuwzwwgbPDjE(&eef@=f#Zq}-aESjZ2L~TDT2Z%=ILS(>nQg|j2obK@5`Myl%*6JsOQ@pBU!8wE*KJ&#st6g8R1s?1agMJ4JAWE z*NmAnVb~ioxna>oWSQ)9UJR)sU8QNdL{sSJObaq2KaGeu11b-rz>FhVMn5WJ;|SL> z8aOYyh9feETN>KzAL^$C4X7Yg2cy!JSY#x%a^esug{Ple3cJjDU2I z<7EGJoh4-3Nh=n6Mg~{pp#OsA?-0%F+;SEC<4$Hs4WpKL*HhV!cSDFHF=Vkw?zz^l zJUYhb|LL8-|L-}!twR<8i@@JQKoq7LQ+4Ru`Z<|Z&1dZ@uE)5zF<(znDS}QO$3f+B o9CZ61h8S1D$}+W-_7ueo+CTUZfKLQ=|3}L7UnO?`|IhaS-$Nt52LJ#7 diff --git a/clients/.DS_Store b/clients/.DS_Store deleted file mode 100644 index 6dc3226d9279e183a0b16fb51f2f0718e9f1224a..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHK&1%~~5Z<+&MoumYZAhUP9b53h`NQo^b<#o)1%paLX^JaFqFxYMBgrwr7YR)f;p7=AGukB3|y&!uw#L$3YpEqoNz;@64TwlZfnISPo|P8?Dm|JB)TzGVZ$% zwscm8DogS|QulUM7VB|WkFvPf3+#BiDlP|oce$4*2awtevt&mVDpIK`51l4neRpLt zdAio});FF_Ti&G8X}7$M_0Dwa{&bv2Pd1;wemk5Vefs?6>$mSee*HG{6%xK_Blihs za0~{qaS-HLsG4A73{EZ*K%fHqZ9i6N<)_x!^p8qdyuK!ICH_QMta5ou{jqPB&3v;8h^~x;qto1-Y rfTCbrW$_{f409AiEFQ%VL5+alLj%w!tSo{DgntAK4cssTf0coA%d%)b diff --git a/clients/src/.DS_Store b/clients/src/.DS_Store deleted file mode 100644 index baca01251a21f70aeb9ee4c9dc38ef0355b7e1a4..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHKUyIW~5TETuP48YJDhj^jQK1jjJE^|K-YNJX6e4$sCp9s_F2rnUwpWf)$g>~A z{~tp{-~0l86W{90?zY%FeJv_8VfHsWJG0CF5_U5HAiCr120#D+J}P0OiOp|>#z{{} z!+AVJL32DP)bq4ZDfVeXmcO3?dUqbYgajm%C*I$E9J@9*Vm9J|h8!x;C*r@pc$t+Ku-AnXf zVLwu;SlUUtFY`%1xVWoIoyamD{gQfkAj?cmx@ufz)nRPsJCKFEcTr_q+_@88G;hYR?M#*(+AcvKdue1X12{34E#8DIvOfps$A&LDqlos05X%m6d+ zPcT5|Lx@TkIjk+3qXQdL0wB_Fq!F~uuRmf)4#db|Z4o^v!lokHRN+bt;VK7V({Vg< z@wG*p4#KRA^SG6Tt5Ae1JVa>1K|~h0Wd@jmRR%U}H=z6f-k)sRgAH86<W75j`mUBVcIYh8b8Z G13v*$+IB4f diff --git a/clients/src/test/.DS_Store b/clients/src/test/.DS_Store deleted file mode 100644 index dfb1d5fd41b2db32d9aa7ec617d76d743db8e264..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHK&2G~`5S~o~IPJv<5J+5nY^59mEehOBD3v%MMRL`uqA1w4MHX4!$aWH;D3Z@S z20w+zK#1P@0)3O-(wW_DL|eE<6`HYTzuDQDUG2Bl{s4gJ4C6-tE&w>FgtZ11e-UaY zy&^U1sUr&N<5j8-qEtm#rw&=JKLfOOb@%`wj3B(Uep}eKiYjI^J&}1_G%f~qolm1d zDvPq!IuW&{`i+~noW}ABeeUh$w?SWxgEB0KMJLSP=$ed^$lBeo?4R4)N~Lo6b2Ftr5Ny}|U;=PzHs9e)4u>$jdU2>h?L z+_88LM`+CX7zKHjs_Z@bD82zXBv62YAzBAyUt;zcqgH2H&|+Z*m;q*B(G1wV%~@IW zz&su^zzke_255iqPzimDl|j9AV4usP9loD6TR%PeDT;#TZLR@dH#X==aD#^et8f(SyPl N0ZjuJ%)mk!I0IYR)f;p7=AGukB3|y&!uw#L$3YpEqoNz;@64TwlZfnISPo|P8?Dm|JB)TzGVZ$% zwscm8DogS|QulUM7VB|WkFvPf3+#BiDlP|oce$4*2awtevt&mVDpIK`51l4neRpLt zdAio});FF_Ti&G8X}7$M_0Dwa{&bv2Pd1;wemk5Vefs?6>$mSee*HG{6%xK_Blihs za0~{qaS-HLs@|!UO%)mcl0QUzE6wxQFESjwY z8dCxQHo$EJZ1eLE=%EA9C#)=j2SnObK$|M5#E?`wNShA*KE+oSZMu-=$uJH*SyF`} zsq~PZ2^Z40$SpI#3|wXaYd;Vd&;J)U*Z(Gn8)kqRxSI^f#&)pXg}Kq$dS#Y))_R~H rKv6KRvUrgKhB=BM7LVeGphm#&p#kULzIRV#7XBS<-fKLu_k&`KPTA{l{DL8*KC>6@7@l2{)M8-JpiRwTjS;HS>+fGi&=>YuCdV?y#RrMH2on`fWK&=o8Nj)lWdZAeUa;}`!;j(}=n5#my#@OqH|%E}JJ&a1 zCy`lRtNrDrnevO-mriAF9uIe6?%Z#yzMltqFKYzd_xemmacKQ!khe$vky6RMAB4v; z?zh~fJ(cExOyll9rDrEHja0v(dTErM`DVNm8RhMk`>NTEFQK#~P2zSeKeIDFq) z7A=n84>)7|UHaWLQRzpBk7`D_gv0=sAc1Vqj_*!1Y0ZB6=1J zgL>%zqgw#LGK95&kH3vrBP@Cr3xjY4gqu=8Q_5|N!A&{Dh0XIU76wf@%~I*AN1Kny%j26%bj-*3RX*}Zk*Epe|E tpdUa{Ft0FpUjmNaiXj)b;y0jLATH1W^eh$zAp$}_0-6Rg#K3=L;0|u4Z3h4V diff --git a/clients/src/test/java/org/apache/.DS_Store b/clients/src/test/java/org/apache/.DS_Store deleted file mode 100644 index 785f366f60f52ecb613ebdcc98533398320ccf71..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHK&1%~~5Z-m1R!%M|O=%&QJtp9T+oU+Tscss`Az)BQC{1ysO0Aa)t+C{oVhlR> zF`D1U2qAfhyh(1!%LF~%Fc=q+O|W6S|XELFh#MesW6 zgp_P#29WD9f~`b-3lkMiic<+_p(ON{i4rg6o=Mt26=nr?%zqgw#LGK95&k3WuBBP@Cr3xjY4gqu=8Q_5|N!A&{Dh0XIU76wf@Kx5T|x tfF6LNU|wPHp#&Vg6+9eKn4CqDN&X89p;tBFj_fcN5(|_Ma$K z%>5+Y6ZyCoUb?5GPK3;dCrS_Yh0N5rqee0-4`MsszQ}aH7hdiT^CK+nCNkd>rAS2~ z^y{EOtruRIOm5$7^6i~F(YIJqtF^RPBpA1D=8N%mA$q9F;I~SXnen2O4t(K&)ff2>SHX zNHyGHpZQAA2BL{YG7s~Iq!$aWH;C{osZ z0c+l$fe`HY0QeJj;mo5bk5=7O2vu{fJLh`l-fR2J%R@x0)(Ni@xkThZB^HV(4iMQc zbw=h)jcjBpM?yJ0f^v#yylsVLz%uZ^F(BveGOc00a*Akt{${j^nhdk~8a*eUy5!%F z#cmLb05NjGxCgy5QUXgZ61yajBcl$zL~J_c|3msi*=Qu)XZ?d-);9Tn2kn?=dAa?E)}0PTB6s;{k)Ua{Pc;s<6#sSyYA<$eS1@g_^<5;TRiGD-Q_ir z;{x6Q-6)pXC-)2NHkx}QW_JmUe6dH%{-kXO^a*ze!E zQSw$+Zx2ddzfvieyw#P;VBmgnoXb~l-r0EE9=v}0?)`_4pFV&6roNmQ)=MSl4DQ2s z7&AO}8)*`YWDjjJZizO)ZjL?)p%RbiDfSRi<2`j283cOk#YE2^($DPYRP+UUpq}4D zj&v%=5pSZ#%$tcZ(?$+~FX`ZY4jwb({&*ED$HagcgU;eH<+Eul1D1i4!+=~LJgCH~ z!J$UAbf8d20AK;tN?^-h0~zBPtQs6@L<@xJP@oQF=7_;`IO;v)S2Z})sKbew!v`}@ zX6A&#)YFmQQ|`p78f|GAunbHxFt2Z2x&Pli{QN%|WbZ5kmVpz+04uCF)@%4Av$xKC xoZM?Y=rgDk={MA4ud8*(}f#w^LanUu|37$Qo(;UwjqKO2fTCz6V z9Dk7kIlBV-Jhw6Kv2cEAyOpFZj>=Etl6pK-VGAhXis#(LBkeg-MvpMTKr`IZj+pHP z4;+0#vz9F7dSA(PfI2g@nQO~T?q(%Z2{&AEh#q%P<{n0@tH*xIx;E^}eXQgV=+6Fp;kWE~k9;}zFrxXUW;*(>W#>e_B{ zPcpl?sVWH9Q>vTIXiba(W55{rVt{wHNPgd=S!2K$Fb3uf$oC-uY z_Gs1^Fb0+x*!A6_-2WFpzyFtmtY-`u1OJKvm#a3a6_yn4*0ti~u8pX7R1uk%d7Ou^ ip;s~I%2j+wRiQtU1~Cqpd8CJ8e*}~UGseKLGVlpj(}5-c From 31ee6bde09889f3fe61d6c69d8463a30d3b693e8 Mon Sep 17 00:00:00 2001 From: Praveen Devarao Date: Fri, 29 Jan 2016 17:14:29 +0530 Subject: [PATCH 12/12] Removing extra code check on stream close --- .../apache/kafka/common/utils/Serializer.java | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java b/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java index a30bbe0eda449..f30c0e1aec5fe 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java @@ -23,11 +23,10 @@ public class Serializer { public static byte[] serialize(Object toSerialize) throws IOException { - try (ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream()) { - try (ObjectOutputStream ooStream = new ObjectOutputStream(arrayOutputStream)) { - ooStream.writeObject(toSerialize); - return arrayOutputStream.toByteArray(); - } + ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + try (ObjectOutputStream ooStream = new ObjectOutputStream(arrayOutputStream)) { + ooStream.writeObject(toSerialize); + return arrayOutputStream.toByteArray(); } } @@ -38,15 +37,13 @@ public static Object deserialize(InputStream inputStream) throws IOException, Cl } public static Object deserialize(byte[] byteArray) throws IOException, ClassNotFoundException { - try (ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(byteArray)) { - return deserialize(arrayInputStream); - } + ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(byteArray); + return deserialize(arrayInputStream); } public static Object deserialize(String fileName) throws IOException, ClassNotFoundException { ClassLoader classLoader = Serializer.class.getClassLoader(); - try (InputStream fileStream = classLoader.getResourceAsStream(fileName)) { - return deserialize(fileStream); - } + InputStream fileStream = classLoader.getResourceAsStream(fileName); + return deserialize(fileStream); } }