IllegalStateException when calling free() on a full, synchronized pool that uses soft references #804

Closed
wyhasany opened this issue Jan 7, 2021 · 4 comments · Fixed by #805 or apache/incubator-heron#3683

@wyhasany
Contributor

wyhasany commented Jan 7, 2021

Using com.esotericsoftware.kryo.util.Pool with threadSafe = true, softReferences = true and a limit, I get the following exception:

java.lang.IllegalStateException: Queue full
	at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
	at com.esotericsoftware.kryo.util.Pool$1.add(Pool.java:52)
	at com.esotericsoftware.kryo.util.Pool$SoftReferenceQueue.offer(Pool.java:168)
	at com.esotericsoftware.kryo.util.Pool.free(Pool.java:95)
	at serializers.KryoSnappyRedisSerializer.serialize(KryoSnappyRedisSerializer.java:79)

I'm running the following code under JMH with 8 concurrent threads. Is this the expected behavior?

package tech.viacomcbs.serializers;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.SerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
import com.esotericsoftware.kryo.util.Pool;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class KryoRedisSerializer implements RedisSerializer<Object> {

    public static final Logger log = LoggerFactory.getLogger(KryoRedisSerializer.class);

    static final byte[] EMPTY_ARRAY = new byte[0];

    private static final Pool<Kryo> kryoPool = new Pool<Kryo>(true, true, 16) {
        @Override
        protected Kryo create() {
            Kryo kryo = new Kryo();
            try {
                kryo.setRegistrationRequired(false);
                CompatibleFieldSerializer.CompatibleFieldSerializerConfig config = new CompatibleFieldSerializer.CompatibleFieldSerializerConfig();
                config.setExtendedFieldNames(false);
                config.setReadUnknownFieldData(true);
                kryo.setDefaultSerializer(new SerializerFactory.CompatibleFieldSerializerFactory(config));
                kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
            return kryo;
        }
    };

    Pool<Output> outputPool = new Pool<Output>(true, false, 16) {
        protected Output create () {
            return new Output(1024, -1);
        }
    };

    Pool<Input> inputPool = new Pool<Input>(true, false, 16) {
        protected Input create () {
            return new Input(4096);
        }
    };


    @Override
    public byte[] serialize(Object t) throws SerializationException {
        if (t == null) {
            return EMPTY_ARRAY;
        } else {
            Kryo kryo = null;
            Output output = null;
            try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
                kryo = kryoPool.obtain();
                output = outputPool.obtain();
                output.setOutputStream(stream);
                kryo.writeClassAndObject(output, t);
                output.flush();
                return stream.toByteArray();
            } catch (IOException e) {
                throw new SerializationException("Unable to close stream", e);
            } finally {
                if (kryo != null) {
                    kryoPool.free(kryo);
                }
                if (output != null) {
                    outputPool.free(output);
                }
            }
        }
    }

    @Override
    public Object deserialize(byte[] bytes) throws SerializationException {
        if (bytes == null || bytes.length == 0) {
            return null;
        } else {
            Kryo kryo = null;
            Input input = null;
            try {
                kryo = kryoPool.obtain();
                input = inputPool.obtain();
                input.setBuffer(bytes);
                return kryo.readClassAndObject(input);
            } finally {
                if (kryo != null) {
                    kryoPool.free(kryo);
                }
                if (input != null) {
                    inputPool.free(input);
                }
            }
        }
    }

}
@wyhasany
Contributor Author

wyhasany commented Jan 7, 2021

It seems this happened because I shared the Pool<Kryo> between many instances of KryoRedisSerializer, as the pool was static.

@wyhasany wyhasany closed this as completed Jan 7, 2021
@wyhasany
Contributor Author

wyhasany commented Jan 7, 2021

Oh, it seems I was mistaken. It looks like we can't use Pool with threadSafe=true and softReferences=true in a multi-threaded environment, because under contention the limit of the queue can be reached.

Then freeObjects.offer(object) at com.esotericsoftware.kryo.util.Pool.free(Pool.java:95)
calls SoftReferenceQueue.offer, which delegates to delegate.add() on this anonymously extended LinkedBlockingQueue:

queue = new LinkedBlockingQueue<T>(maximumCapacity) {
	@Override
	public boolean add (T o) {
		if (size() >= maximumCapacity) return false;
		super.add(o);
		return true;
	}
};

There is a size check, but no locking around it, so another thread can change the size between the check and the call to super.add(o). In that case super.add(o) throws an exception that is not handled anywhere. To prevent the "IllegalStateException: Queue full", you can make the following change:

-	if (size() >= maximumCapacity) return false;
-	super.add(o);
-	return true;
+	return super.offer(o);
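
To make the difference between the two calls concrete, here is a minimal single-threaded sketch that uses only java.util.concurrent. The class name AddVersusOffer and its helper methods are made up for illustration and are not part of Kryo. It fills a bounded LinkedBlockingQueue and shows that add() throws once the queue is full, while offer() just returns false:

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of Kryo.
public class AddVersusOffer {

    /** Returns true if add() on a full bounded queue throws IllegalStateException. */
    static boolean addThrowsWhenFull() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.add("first"); // the queue is now at capacity
        try {
            queue.add("second"); // AbstractQueue.add throws when offer() fails
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Returns true if offer() on a full bounded queue returns false instead of throwing. */
    static boolean offerReturnsFalseWhenFull() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("first"); // the queue is now at capacity
        return !queue.offer("second"); // rejected without an exception
    }

    public static void main(String[] args) {
        System.out.println("add throws when full: " + addThrowsWhenFull());
        System.out.println("offer returns false when full: " + offerReturnsFalseWhenFull());
    }
}
```

With offer(), a full queue just means the freed object is not put back, which appears to be what the size check was meant to achieve in the first place.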

@wyhasany wyhasany reopened this Jan 7, 2021
wyhasany added a commit to wyhasany/kryo that referenced this issue Jan 7, 2021
Prevent an exception in multi-threaded environments, where
concurrent threads can change the queue state between these lines:
```
if (size() >= maximumCapacity) return false;
super.add(o);
```
which causes the exception mentioned above.
@theigl theigl added the bug label Jan 8, 2021
theigl added a commit that referenced this issue Jan 8, 2021
@theigl theigl linked a pull request Jan 8, 2021 that will close this issue
@theigl
Collaborator

theigl commented Jan 8, 2021

PR is merged and will be part of the next release. Thanks!

@theigl theigl closed this as completed Jan 8, 2021
@wyhasany
Contributor Author

wyhasany commented Jan 8, 2021

This applies until the release of Kryo 5.0.4 (probably). If you hit a similar bug, the basic workaround is to switch off soft references:

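// fails under contention: softReferences = true
Pool<Kryo> kryoPool = new Pool<Kryo>(true, true, 16) { /* create() override omitted */ };
// passes: softReferences = false
Pool<Kryo> kryoPool = new Pool<Kryo>(true, false, 16) { /* create() override omitted */ };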

From the next release on, the first version should work smoothly in a multi-threaded environment.
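
If you want to convince yourself before upgrading, a rough stress sketch along these lines may help. It does not use Kryo at all: patchedQueue is a hypothetical stand-in that mirrors the changed add() from the diff earlier in this thread, and the thread count, capacity and iteration count are arbitrary assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stress harness, not part of Kryo.
public class BoundedQueueStress {

    /** Stand-in for the patched queue: add() delegates to the non-throwing offer(). */
    static <T> LinkedBlockingQueue<T> patchedQueue(int maximumCapacity) {
        return new LinkedBlockingQueue<T>(maximumCapacity) {
            @Override
            public boolean add(T o) {
                return super.offer(o);
            }
        };
    }

    /** Hammers the queue from several threads and returns how many add() calls threw. */
    static int countFailures(LinkedBlockingQueue<Object> queue, int threads, int iterations) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        AtomicInteger failures = new AtomicInteger();
        for (int t = 0; t < threads; t++) {
            executor.execute(() -> {
                try {
                    start.await(); // release all workers at the same time
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int i = 0; i < iterations; i++) {
                    try {
                        queue.add(new Object());
                        // poll now and then so the queue hovers near its capacity
                        if ((i & 3) == 0) queue.poll();
                    } catch (IllegalStateException e) {
                        failures.incrementAndGet();
                    }
                }
            });
        }
        start.countDown();
        executor.shutdown();
        try {
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("stress run timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return failures.get();
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<Object> queue = patchedQueue(16);
        System.out.println("failures with patched add(): " + countFailures(queue, 8, 100_000));
    }
}
```

Swapping patchedQueue for a queue with the original check-then-add logic should let you observe the "Queue full" exception from time to time, although how often it shows up depends on timing and hardware.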

@theigl thanks for cooperation 👍
