EOFException when using PagingPredicate with Kryo in v3.8.7 #13147

Closed
krasnyt opened this issue May 18, 2018 · 4 comments

@krasnyt commented May 18, 2018

Hello, I am not sure whether this report belongs here or on Kryo's GitHub, but I'm sure you can help. I get the following exception when using PagingPredicate on a map whose items are serialized by Kryo. Regular Serializable works correctly.

Dependencies:
Hazelcast 3.8.7
Kryo 4.0.2

Description:
I have 10 items in the map and try to retrieve them in pages. The exception is always thrown on the third page request: the first two calls are OK, but every third call fails. So for pageSize <= 5 it always fails, but for pageSize >= 6 it works (because only two calls are needed to retrieve everything). Unfortunately, I can't explain this behavior.

Thank you for any explanation.

Stacktrace from the client:

Exception in thread "main" com.hazelcast.nio.serialization.HazelcastSerializationException: java.io.EOFException: Cannot read 4 bytes!
	at com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:61)
	at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:190)
	at com.hazelcast.client.impl.protocol.task.map.MapValuesWithPagingPredicateMessageTask.getPredicate(MapValuesWithPagingPredicateMessageTask.java:51)
	at com.hazelcast.client.impl.protocol.task.map.AbstractMapQueryMessageTask.call(AbstractMapQueryMessageTask.java:87)
	at com.hazelcast.client.impl.protocol.task.AbstractCallableMessageTask.processMessage(AbstractCallableMessageTask.java:35)
	at com.hazelcast.client.impl.protocol.task.AbstractMessageTask.initializeAndProcessMessage(AbstractMessageTask.java:123)
	at com.hazelcast.client.impl.protocol.task.AbstractMessageTask.run(AbstractMessageTask.java:103)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
	at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:64)
	at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:80)
	at ------ submitted from ------.(Unknown Source)
	at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveAndThrowIfException(ClientInvocationFuture.java:95)
	at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveAndThrowIfException(ClientInvocationFuture.java:32)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:155)
	at com.hazelcast.client.spi.ClientProxy.invoke(ClientProxy.java:198)
	at com.hazelcast.client.proxy.ClientMapProxy.valuesForPagingPredicate(ClientMapProxy.java:1211)
	at com.hazelcast.client.proxy.ClientMapProxy.values(ClientMapProxy.java:1197)
	at hztest.HzPagingTestClientKryo.main(HzPagingTestClientKryo.java:38)
Caused by: java.io.EOFException: Cannot read 4 bytes!
	at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.checkAvailable(ByteArrayObjectDataInput.java:667)
	at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readInt(ByteArrayObjectDataInput.java:286)
	at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readInt(ByteArrayObjectDataInput.java:280)
	at com.hazelcast.query.PagingPredicate.readData(PagingPredicate.java:363)
	at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:155)
	at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:103)
	at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:49)
	at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:48)
	at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:184)
	at com.hazelcast.client.impl.protocol.task.map.MapValuesWithPagingPredicateMessageTask.getPredicate(MapValuesWithPagingPredicateMessageTask.java:51)
	at com.hazelcast.client.impl.protocol.task.map.AbstractMapQueryMessageTask.call(AbstractMapQueryMessageTask.java:87)
	at com.hazelcast.client.impl.protocol.task.AbstractCallableMessageTask.processMessage(AbstractCallableMessageTask.java:35)
	at com.hazelcast.client.impl.protocol.task.AbstractMessageTask.initializeAndProcessMessage(AbstractMessageTask.java:123)
	at com.hazelcast.client.impl.protocol.task.AbstractMessageTask.run(AbstractMessageTask.java:103)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
	at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:64)
	at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:80)

The cluster is simply defined as follows:

public class HzPagingTestServer {

    public static void main(String[] args) {
        SerializerConfig kryoStudentSerializer = new SerializerConfig();
        kryoStudentSerializer.setClass(StudentKryoSerializer.class);
        kryoStudentSerializer.setTypeClass(HzPagingTestClientKryo.Student.class);

        Config config = new Config();
        config.setGroupConfig(new GroupConfig("user", "pass"));

        config.getSerializationConfig().addSerializerConfig(kryoStudentSerializer);

        Hazelcast.newHazelcastInstance(config);
    }

}

The client including the Student class:

public class HzPagingTestClientKryo {

    public static void main(String[] args) {
        SerializerConfig kryoStudentSerializer = new SerializerConfig();
        kryoStudentSerializer.setClass(StudentKryoSerializer.class);
        kryoStudentSerializer.setTypeClass(Student.class);

        ClientConfig clientConfig = new ClientConfig();
        clientConfig.getNetworkConfig().addAddress("127.0.0.1");
        clientConfig.setGroupConfig(new GroupConfig("user", "pass"));
        clientConfig.getSerializationConfig().addSerializerConfig(kryoStudentSerializer);

        HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);

        IMap<Integer, Student> map = client.getMap("map");

        for (int i = 0; i < 10; i++) {
            Student s = new Student(i, "michael" + i);
            map.put(i, s);
        }

        int pageSize = 3;
        PagingPredicate<Integer, Student> predicate = new PagingPredicate<>(pageSize);

        while (true) {
            Collection<Student> values = map.values(predicate);
            if (values.size() == 0) {
                break;
            } else {
                System.out.println("got students=" + values.size());
                predicate.nextPage();
            }
        }
    }

    public static class Student implements Comparable<Student> {

        private int id;
        private String name;

        public Student() {
        }

        public Student(int id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public int compareTo(Student o) {
            return Integer.compare(o.getId(), id);
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

    }

}

And last is the serializer:

public class StudentKryoSerializer implements StreamSerializer<HzPagingTestClientKryo.Student> {

    public static final ThreadLocal<Kryo> KRYO_THREAD_LOCAL = ThreadLocal.withInitial(() -> {
        final Kryo kryo = new Kryo();
        kryo.register(HzPagingTestClientKryo.Student.class);
        return kryo;
    });

    @Override
    public int getTypeId() { return 123; }

    @Override
    public void destroy() {}

    @Override
    public void write(ObjectDataOutput objectDataOutput, HzPagingTestClientKryo.Student object) throws IOException {
        final Kryo kryo = KRYO_THREAD_LOCAL.get();

        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(16384);
        final DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(byteArrayOutputStream);
        final Output output = new Output(deflaterOutputStream);
        kryo.writeObject(output, object);
        output.close();

        final byte[] bytes = byteArrayOutputStream.toByteArray();
        objectDataOutput.write(bytes);
    }

    @Override
    public HzPagingTestClientKryo.Student read(ObjectDataInput objectDataInput) {
        InputStream in = (InputStream) objectDataInput;
        in = new InflaterInputStream(in);
        final Input input = new Input(in);
        final Kryo kryo = KRYO_THREAD_LOCAL.get();
        return kryo.readObject(input, HzPagingTestClientKryo.Student.class);
    }
}

@jerrinot (Contributor) commented May 18, 2018

hi @krasnyt,

I believe the issue is caused by Kryo being too eager when consuming the input stream during deserialization: it reads past the end of its own data, which corrupts the stream for whatever is read next. See this issue for details: #6781

A possible solution is chunked streams, which force Kryo to read only what belongs to it. See this as an example: https://github.com/jerrinot/subzero/blob/master/subzero-core/src/main/java/info/jerrinot/subzero/internal/strategy/KryoStrategy.java#L64-L81
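
The eager-read effect can be reproduced with JDK classes alone. The following is a minimal sketch, not the subzero code and not Kryo's chunked encoding: it swaps in a plain length prefix, which serves the same purpose of telling the reader where the object's bytes end. `FramingSketch`, its methods, and the integer "trailer" (standing in for whatever fields follow the object in the stream) are all hypothetical names chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical JDK-only illustration; no name here comes from Hazelcast, Kryo, or subzero.
final class FramingSketch {

    // Compresses the text with the same stream wrapper the reported serializer uses.
    static byte[] deflate(String text) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DeflaterOutputStream out = new DeflaterOutputStream(buffer)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return buffer.toByteArray();
    }

    // Inflates until the compressed data ends and returns the decoded text.
    static String inflate(InputStream in) throws IOException {
        InflaterInputStream inflater = new InflaterInputStream(in);
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        byte[] chunk = new byte[256];
        for (int n; (n = inflater.read(chunk)) != -1; ) {
            result.write(chunk, 0, n);
        }
        return new String(result.toByteArray(), StandardCharsets.UTF_8);
    }

    // Writes the compressed payload followed by a trailer that belongs to someone else.
    static byte[] write(String payload, int trailer, boolean framed) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            byte[] compressed = deflate(payload);
            if (framed) {
                out.writeInt(compressed.length); // length prefix marks where the payload ends
            }
            out.write(compressed);
            out.writeInt(trailer);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the payload, then tries to read the trailer from the shared stream.
    static int readTrailer(byte[] message, boolean framed) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(message));
            if (framed) {
                byte[] compressed = new byte[in.readInt()];
                in.readFully(compressed); // consume exactly the payload bytes
                inflate(new ByteArrayInputStream(compressed));
            } else {
                inflate(in); // the inflater buffers ahead and swallows the trailer as well
            }
            return in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readTrailer(write("michael0", 42, true), true));
        try {
            readTrailer(write("michael0", 42, false), false);
        } catch (UncheckedIOException e) {
            System.out.println("unframed read failed: " + e.getCause());
        }
    }
}
```

In a Hazelcast `StreamSerializer`, the analogous change would presumably be to call `ObjectDataOutput.writeByteArray(bytes)` in `write` and `ObjectDataInput.readByteArray()` in `read`, then inflate and deserialize from that array only. That variant is untested here.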

@mmedenjak added this to the 3.11 milestone May 18, 2018

@jerrinot (Contributor) commented May 21, 2018

hi @krasnyt, did you have a chance to have a look at ☝️ ?

@krasnyt (Author) commented May 21, 2018

Yes, I just finished testing it (also with compression) and it works nicely. Thanks for the feedback, feel free to close it.

@jerrinot (Contributor) commented May 21, 2018

that's great to hear, thanks for reporting back.
Happy Hazelcasting!
