Skip to content
Permalink
Browse files
[NO ISSUE][*DB] Enable large parsing resources to be freed on memory …
…pressure

Change-Id: Ieaca566f765a5d3531baff8128d6ff03c95956e2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11365
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
  • Loading branch information
mblow committed May 8, 2021
1 parent 91f91d2 commit 5d02b03854911d7fb889b078f0ada37b282261f9
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 35 deletions.
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.external.parser.jackson;

import java.util.ArrayDeque;
import java.util.Queue;

import org.apache.asterix.om.util.container.IObjectFactory;

/**
* Object pool for DFS traversal mode, which allows to recycle objects
* as soon as it is not needed.
*/
public abstract class AbstractObjectPool<E, T, Q> implements IObjectPool<E> {
private final IObjectFactory<E, T> objectFactory;
private final Queue<Q> recycledObjects;
private final T param;

protected AbstractObjectPool(IObjectFactory<E, T> objectFactory, T param) {
this.objectFactory = objectFactory;
recycledObjects = new ArrayDeque<>();
this.param = param;
}

public E getInstance() {
E instance = unwrap(recycledObjects.poll());
if (objectFactory != null && instance == null) {
instance = objectFactory.create(param);
}
return instance;
}

public void recycle(E object) {
if (object != null) {
recycledObjects.add(wrap(object));
}
}

protected abstract E unwrap(Q element);

protected abstract Q wrap(E element);

@Override
public String toString() {
return recycledObjects.toString();
}
}
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.external.parser.jackson;

/**
* Object pool for DFS traversal mode, which allows to recycle objects
* as soon as it is not needed.
*/
public interface IObjectPool<E> {
E getInstance();

void recycle(E object);
}
@@ -18,20 +18,13 @@
*/
package org.apache.asterix.external.parser.jackson;

import java.util.ArrayDeque;
import java.util.Queue;

import org.apache.asterix.om.util.container.IObjectFactory;

/**
* Object pool for DFS traversal mode, which allows to recycle objects
* as soon as it is not needed.
*/
public class ObjectPool<E, T> {
private final IObjectFactory<E, T> objectFactory;
private final Queue<E> recycledObjects;
private final T element;

public class ObjectPool<E, T> extends AbstractObjectPool<E, T, E> {
public ObjectPool() {
this(null, null);
}
@@ -40,28 +33,17 @@ public ObjectPool(IObjectFactory<E, T> objectFactory) {
this(objectFactory, null);
}

public ObjectPool(IObjectFactory<E, T> objectFactory, T element) {
this.objectFactory = objectFactory;
recycledObjects = new ArrayDeque<>();
this.element = element;
public ObjectPool(IObjectFactory<E, T> objectFactory, T param) {
super(objectFactory, param);
}

public E getInstance() {
E instance = recycledObjects.poll();
if (objectFactory != null && instance == null) {
instance = objectFactory.create(element);
}
return instance;
}

public void recycle(E object) {
if (object != null) {
recycledObjects.add(object);
}
@Override
protected E unwrap(E wrapped) {
return wrapped;
}

@Override
public String toString() {
return recycledObjects.toString();
protected E wrap(E element) {
return element;
}
}
@@ -51,7 +51,7 @@
public class ParserContext {
private static final int SERIALIZED_FIELDNAME_MAP_MAX_SIZE = 128;

private final ObjectPool<IARecordBuilder, ATypeTag> objectBuilderPool;
private final IObjectPool<IARecordBuilder> objectBuilderPool;
private final ObjectPool<IAsterixListBuilder, ATypeTag> arrayBuilderPool;

/**
@@ -61,7 +61,7 @@ public class ParserContext {
* <p>
* Scalar value 5 is written 4 times in tempBuffer("d") then tempBuffer("c") ... tempBuffer("a")
*/
private final ObjectPool<IMutableValueStorage, ATypeTag> tempBufferPool;
private final IObjectPool<IMutableValueStorage> tempBufferPool;
private final ObjectPool<BitSet, Void> nullBitmapPool;
private final Map<String, IMutableValueStorage> serializedFieldNames;
private final ISerializerDeserializer<AString> stringSerDe;
@@ -76,9 +76,9 @@ public ParserContext() {

@SuppressWarnings("unchecked")
public ParserContext(boolean allocateModfiedUTF8Writer) {
objectBuilderPool = new ObjectPool<>(new RecordBuilderFactory());
objectBuilderPool = new SoftObjectPool<>(new RecordBuilderFactory());
arrayBuilderPool = new ObjectPool<>(new ListBuilderFactory(), ATypeTag.ARRAY);
tempBufferPool = new ObjectPool<>(new AbvsBuilderFactory());
tempBufferPool = new SoftObjectPool<>(new AbvsBuilderFactory());
nullBitmapPool = new ObjectPool<>();
serializedFieldNames = new LRUMap<>(SERIALIZED_FIELDNAME_MAP_MAX_SIZE);
stringSerDe = SerializerDeserializerProvider.INSTANCE.getAStringSerializerDeserializer();
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.external.parser.jackson;

import java.lang.ref.SoftReference;

import org.apache.asterix.om.util.container.IObjectFactory;

/**
* Object pool for DFS traversal mode, which allows to recycle objects
* as soon as it is not needed.
*/
public class SoftObjectPool<E, T> extends AbstractObjectPool<E, T, SoftReference<E>> {
public SoftObjectPool() {
this(null, null);
}

public SoftObjectPool(IObjectFactory<E, T> objectFactory) {
this(objectFactory, null);
}

public SoftObjectPool(IObjectFactory<E, T> objectFactory, T element) {
super(objectFactory, element);
}

@Override
protected E unwrap(SoftReference<E> wrapped) {
return wrapped == null ? null : wrapped.get();
}

@Override
protected SoftReference<E> wrap(E element) {
return new SoftReference<>(element);
}
}
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.UTFDataFormatException;
import java.lang.ref.SoftReference;

import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;

@@ -690,10 +691,12 @@ private static byte[] getTempBytes(UTF8StringWriter writer, int utflen) {
if (writer == null) {
tempBytes = new byte[utflen + 5];
} else {
if (writer.tempBytes == null || writer.tempBytes.length < utflen + 5) {
writer.tempBytes = new byte[utflen + 5];
byte[] writerTempBytes = writer.tempBytesRef != null ? writer.tempBytesRef.get() : null;
if (writerTempBytes == null || writerTempBytes.length < utflen + 5) {
writerTempBytes = new byte[utflen + 5];
writer.tempBytesRef = new SoftReference<>(writerTempBytes);
}
tempBytes = writer.tempBytes;
tempBytes = writerTempBytes;
}
return tempBytes;
}
@@ -21,11 +21,12 @@
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.lang.ref.SoftReference;

public class UTF8StringWriter implements Serializable {

private static final long serialVersionUID = 1L;
transient byte[] tempBytes;
transient SoftReference<byte[]> tempBytesRef;

public final void writeUTF8(CharSequence str, DataOutput out) throws IOException {
UTF8StringUtil.writeUTF8(str, out, this);

0 comments on commit 5d02b03

Please sign in to comment.