From a3540057b7f594e9ab226622711e81829411ebff Mon Sep 17 00:00:00 2001 From: Roland Praml Date: Wed, 18 Jan 2023 09:24:47 +0100 Subject: [PATCH 1/2] BUG: Memory-leak on streaming queries when LoadBuffers are not aligned --- .../basic/TestPersistenceContextMany.java | 169 ++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 ebean-test/src/test/java/org/tests/basic/TestPersistenceContextMany.java diff --git a/ebean-test/src/test/java/org/tests/basic/TestPersistenceContextMany.java b/ebean-test/src/test/java/org/tests/basic/TestPersistenceContextMany.java new file mode 100644 index 0000000000..2a5f9b6f88 --- /dev/null +++ b/ebean-test/src/test/java/org/tests/basic/TestPersistenceContextMany.java @@ -0,0 +1,169 @@ +package org.tests.basic; + +import io.ebean.DB; +import io.ebean.DatabaseFactory; +import io.ebean.QueryIterator; +import io.ebean.config.DatabaseConfig; +import io.ebean.xtest.BaseTestCase; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import javax.persistence.CascadeType; +import javax.persistence.DiscriminatorValue; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Inheritance; +import javax.persistence.ManyToOne; +import javax.persistence.OneToMany; +import javax.validation.constraints.Size; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class TestPersistenceContextMany extends BaseTestCase { + + + @Entity + @Inheritance + public abstract static class TestModel3 { + + @Id + private int id; + + @Size(max = 255) + private String someData; + + @OneToMany(cascade = CascadeType.REFRESH) + //@Where(clause = "1=1") + private List many1; + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public void setSomeData(String someData) { + this.someData = someData; + } + + public String getSomeData() { + return someData; + } + } + + @Entity + @Inheritance + 
@DiscriminatorValue("A") + public static class TestModel3A extends TestModel3 { + + } + + @Entity + @Inheritance + @DiscriminatorValue("B") + public static class TestModel3B extends TestModel3 { + @OneToMany(cascade = CascadeType.REFRESH) + private List many2; + + } + + @Entity + public static class TestModel3Many1 { + @Id + private int id; + + @ManyToOne + private TestModel3 base; + } + + @Entity + public static class TestModel3Many2 { + @Id + private int id; + + @ManyToOne + private TestModel3B base; + } + + @Test + @Disabled + void initDb() { + DatabaseConfig config = new DatabaseConfig(); + config.setName("h2-batch"); + config.loadFromProperties(); + config.setDdlExtra(false); + config.getDataSourceConfig().setUsername("sa"); + config.getDataSourceConfig().setPassword("sa"); + config.getDataSourceConfig().setUrl("jdbc:h2:file:./testsFileMany;DB_CLOSE_ON_EXIT=FALSE;NON_KEYWORDS=KEY,VALUE"); + config.addClass(TestModel3.class); + config.addClass(TestModel3A.class); + config.addClass(TestModel3B.class); + config.addClass(TestModel3Many1.class); + config.addClass(TestModel3Many2.class); + DatabaseFactory.create(config); + + String base = "x".repeat(240); + // 10 mio TestModel - each needs about 1/4 kbytes -> 2,5 GB in total + List batch = new ArrayList<>(); + for (int i = 0; i < 1_000_000; i++) { + + TestModel3 m = i == 5 ? 
new TestModel3A() : new TestModel3B(); // produces "off-by-one" + m.setSomeData(base + i); // ensure we have not duplicates + batch.add(m); + if (i % 1000 == 0) { + DB.saveAll(batch); + batch.clear(); + } + if (i % 100000 == 0) { + System.out.println(i); + } + } + DB.saveAll(batch); + } + + @Test + @Disabled + void testFindEachFindList() { + DatabaseConfig config = new DatabaseConfig(); + config.setName("h2-batch"); + config.loadFromProperties(); + config.setDdlRun(false); + config.getDataSourceConfig().setUsername("sa"); + config.getDataSourceConfig().setPassword("sa"); + config.getDataSourceConfig().setUrl("jdbc:h2:file:./testsFileMany;DB_CLOSE_ON_EXIT=FALSE;NON_KEYWORDS=KEY,VALUE"); + config.addClass(TestModel3.class); + config.addClass(TestModel3A.class); + config.addClass(TestModel3B.class); + config.addClass(TestModel3Many1.class); + config.addClass(TestModel3Many2.class); + DatabaseFactory.create(config); + + AtomicInteger i = new AtomicInteger(); + System.out.println("Doing findEach"); + DB.find(TestModel3.class).select("*").findEach(c -> { + i.incrementAndGet(); + }); + System.out.println("Read " + i + " entries"); + + i.set(0); + System.out.println("Doing findStream"); + DB.find(TestModel3.class).select("*").findStream().forEach(c -> i.incrementAndGet()); + System.out.println("Read " + i + " entries"); + + i.set(0); + System.out.println("Doing findIterate"); + QueryIterator iter = DB.find(TestModel3.class).select("*").findIterate(); + while (iter.hasNext()) { + iter.next(); + i.incrementAndGet(); + } + System.out.println("Read " + i + " entries"); + + System.out.println("Doing FindList, will hold all entries in memory. 
Expect OOM with -Xmx100m."); + List lst = DB.find(TestModel3.class).select("*").findList(); + System.out.println("Read " + lst.size() + " entries"); + } +} From 61d6cccbcb7f4995d1066f2d1666e6543659f666 Mon Sep 17 00:00:00 2001 From: Roland Praml Date: Fri, 20 Jan 2023 16:11:42 +0100 Subject: [PATCH 2/2] FIX: Using weak references in load buffer for streaming queries --- .../io/ebeaninternal/api/LoadContext.java | 5 + .../io/ebeaninternal/api/LoadManyBuffer.java | 24 ++- .../io/ebeaninternal/api/LoadManyRequest.java | 33 ++-- .../server/core/OrmQueryRequest.java | 1 + .../server/loadcontext/DLoadContext.java | 7 +- .../server/loadcontext/DLoadManyContext.java | 150 +++++++++++++++--- .../basic/TestPersistenceContextMany.java | 33 +++- 7 files changed, 205 insertions(+), 48 deletions(-) diff --git a/ebean-core/src/main/java/io/ebeaninternal/api/LoadContext.java b/ebean-core/src/main/java/io/ebeaninternal/api/LoadContext.java index b23a183bb3..9d70c448fa 100644 --- a/ebean-core/src/main/java/io/ebeaninternal/api/LoadContext.java +++ b/ebean-core/src/main/java/io/ebeaninternal/api/LoadContext.java @@ -50,4 +50,9 @@ public interface LoadContext { * Register a collection for lazy loading. */ void register(String path, BeanPropertyAssocMany many, BeanCollection bc); + + /** + * Use weak references for streaming queries, so unreachable entries can be garbage collected.
+ */ + void useReferences(boolean useReferences); } diff --git a/ebean-core/src/main/java/io/ebeaninternal/api/LoadManyBuffer.java b/ebean-core/src/main/java/io/ebeaninternal/api/LoadManyBuffer.java index 5fa30220e5..43bff710fc 100644 --- a/ebean-core/src/main/java/io/ebeaninternal/api/LoadManyBuffer.java +++ b/ebean-core/src/main/java/io/ebeaninternal/api/LoadManyBuffer.java @@ -6,16 +6,30 @@ import io.ebeaninternal.server.deploy.BeanDescriptor; import io.ebeaninternal.server.deploy.BeanPropertyAssocMany; -import java.util.List; - /** * A buffer of bean collections for batch lazy loading and secondary query loading. */ public interface LoadManyBuffer { - int getBatchSize(); - - List> getBatch(); + /** + * The batch size, i.e. the maximum number of bean collections the buffer can hold. + */ + int batchSize(); + + /** + * The number of slots used so far. Slots whose entry was removed still count. + */ + int size(); + + /** + * Get the ith element from buffer. This can be null. + */ + BeanCollection get(int i); + + /** + * Removes an element from the buffer. This will NOT affect size. + */ + boolean removeFromBuffer(BeanCollection collection); BeanPropertyAssocMany getBeanProperty(); diff --git a/ebean-core/src/main/java/io/ebeaninternal/api/LoadManyRequest.java b/ebean-core/src/main/java/io/ebeaninternal/api/LoadManyRequest.java index 392a16b6cd..e83e32aa4b 100644 --- a/ebean-core/src/main/java/io/ebeaninternal/api/LoadManyRequest.java +++ b/ebean-core/src/main/java/io/ebeaninternal/api/LoadManyRequest.java @@ -19,8 +19,6 @@ public final class LoadManyRequest extends LoadRequest { private static final System.Logger log = CoreLog.log; - - private final List> batch; private final LoadManyBuffer loadContext; private final boolean onlyIds; private final boolean loadCache; @@ -42,7 +40,6 @@ public LoadManyRequest(LoadManyBuffer loadContext, OrmQueryRequest parentRequ private LoadManyRequest(LoadManyBuffer loadContext, OrmQueryRequest parentRequest, boolean lazy, boolean onlyIds, boolean loadCache) { super(parentRequest, lazy); this.loadContext = loadContext; - this.batch = 
loadContext.getBatch(); this.onlyIds = onlyIds; this.loadCache = loadCache; } @@ -59,9 +56,12 @@ public String description() { private List parentIdList(SpiEbeanServer server) { List idList = new ArrayList<>(); BeanPropertyAssocMany many = many(); - for (BeanCollection bc : batch) { - idList.add(many.parentId(bc.getOwnerBean())); - bc.setLoader(server); // don't use the load buffer again + for (int i = 0; i < loadContext.size(); i++) { + BeanCollection bc = loadContext.get(i); + if (bc != null) { + idList.add(many.parentId(bc.getOwnerBean())); + bc.setLoader(server); // don't use the load buffer again + } } if (many.targetDescriptor().isPadInExpression()) { BindPadding.padIds(idList); @@ -91,7 +91,7 @@ public SpiQuery createQuery(SpiEbeanServer server) { query.setPersistenceContext(loadContext.getPersistenceContext()); query.setLoadDescription(lazy ? "+lazy" : "+query", description()); if (lazy) { - query.setLazyLoadBatchSize(loadContext.getBatchSize()); + query.setLazyLoadBatchSize(loadContext.batchSize()); } else { query.setBeanCacheMode(CacheMode.OFF); } @@ -112,15 +112,18 @@ public void postLoad() { BeanPropertyAssocMany many = many(); // check for BeanCollection's that where never processed // in the +query or +lazy load due to no rows (predicates) - for (BeanCollection bc : batch) { - if (bc.checkEmptyLazyLoad()) { - if (log.isLoggable(DEBUG)) { - EntityBean ownerBean = bc.getOwnerBean(); - Object parentId = desc.getId(ownerBean); - log.log(DEBUG, "BeanCollection after lazy load was empty. type:{0} id:{1} owner:{2}", ownerBean.getClass().getName(), parentId, ownerBean); + for (int i = 0; i < loadContext.size(); i++) { + BeanCollection bc = loadContext.get(i); + if (bc != null) { + if (bc.checkEmptyLazyLoad()) { + if (log.isLoggable(DEBUG)) { + EntityBean ownerBean = bc.getOwnerBean(); + Object parentId = desc.getId(ownerBean); + log.log(DEBUG, "BeanCollection after lazy load was empty. 
type:{0} id:{1} owner:{2}", ownerBean.getClass().getName(), parentId, ownerBean); + } + } else if (loadCache && many.isUseCache()) { + desc.cacheManyPropPut(many, bc, desc.cacheKeyForBean(bc.getOwnerBean())); } - } else if (loadCache && many.isUseCache()) { - desc.cacheManyPropPut(many, bc, desc.cacheKeyForBean(bc.getOwnerBean())); } } } diff --git a/ebean-core/src/main/java/io/ebeaninternal/server/core/OrmQueryRequest.java b/ebean-core/src/main/java/io/ebeaninternal/server/core/OrmQueryRequest.java index 473380c3be..abe1499425 100644 --- a/ebean-core/src/main/java/io/ebeaninternal/server/core/OrmQueryRequest.java +++ b/ebean-core/src/main/java/io/ebeaninternal/server/core/OrmQueryRequest.java @@ -229,6 +229,7 @@ public void initTransIfRequired() { persistenceContext.beginIterate(); } loadContext = new DLoadContext(this, secondaryQueries); + loadContext.useReferences(Type.ITERATE == query.getType()); } /** diff --git a/ebean-core/src/main/java/io/ebeaninternal/server/loadcontext/DLoadContext.java b/ebean-core/src/main/java/io/ebeaninternal/server/loadcontext/DLoadContext.java index 6b7f477810..dd0cbc9f87 100644 --- a/ebean-core/src/main/java/io/ebeaninternal/server/loadcontext/DLoadContext.java +++ b/ebean-core/src/main/java/io/ebeaninternal/server/loadcontext/DLoadContext.java @@ -58,6 +58,7 @@ public final class DLoadContext implements LoadContext { private final ProfilingListener profilingListener; private final Map nodePathMap = new HashMap<>(); private final PersistenceContext persistenceContext; + boolean useReferences; private List secQuery; private Object tenantId; @@ -118,7 +119,6 @@ public DLoadContext(OrmQueryRequest request, SpiQuerySecondary secondaryQueri this.origin = null; this.relativePath = null; } - // initialise rootBeanContext after origin and relativePath have been set this.rootBeanContext = new DLoadBeanContext(this, rootDescriptor, null, null); registerSecondaryQueries(secondaryQueries); @@ -156,6 +156,11 @@ private void 
registerSecondaryQueries(SpiQuerySecondary secondaryQueries) { } } + @Override + public void useReferences(boolean useReferences) { + this.useReferences = useReferences; + } + /** * Setup the load context at this path with OrmQueryProperties which is * used to build the appropriate query for +query or +lazy loading. diff --git a/ebean-core/src/main/java/io/ebeaninternal/server/loadcontext/DLoadManyContext.java b/ebean-core/src/main/java/io/ebeaninternal/server/loadcontext/DLoadManyContext.java index 15403f8b1c..56270820e5 100644 --- a/ebean-core/src/main/java/io/ebeaninternal/server/loadcontext/DLoadManyContext.java +++ b/ebean-core/src/main/java/io/ebeaninternal/server/loadcontext/DLoadManyContext.java @@ -14,7 +14,10 @@ import io.ebeaninternal.server.deploy.BeanPropertyAssocMany; import io.ebeaninternal.server.querydefn.OrmQueryProperties; +import java.lang.ref.Reference; +import java.lang.ref.WeakReference; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.locks.ReentrantLock; @@ -38,7 +41,7 @@ final class DLoadManyContext extends DLoadBaseContext implements LoadManyContext } private LoadBuffer createBuffer(int size) { - LoadBuffer buffer = new LoadBuffer(this, size); + LoadBuffer buffer = parent.useReferences ? new LoadBufferWeakRef(this, size) : new LoadBufferHardRef(this, size); if (bufferList != null) { bufferList.add(buffer); } @@ -94,7 +97,7 @@ public void loadSecondaryQuery(OrmQueryRequest parentRequest, boolean forEach try { if (bufferList != null) { for (LoadBuffer loadBuffer : bufferList) { - if (!loadBuffer.list.isEmpty()) { + if (loadBuffer.size() > 0) { LoadManyRequest req = new LoadManyRequest(loadBuffer, parentRequest); parent.getEbeanServer().loadMany(req); } @@ -115,13 +118,12 @@ public void loadSecondaryQuery(OrmQueryRequest parentRequest, boolean forEach * A buffer for batch loading bean collections on a given path. * Supports batch lazy loading and secondary query loading. 
*/ - static class LoadBuffer implements BeanCollectionLoader, LoadManyBuffer { + static abstract class LoadBuffer implements BeanCollectionLoader, LoadManyBuffer { private final ReentrantLock lock = new ReentrantLock(); private final PersistenceContext persistenceContext; private final DLoadManyContext context; - private final int batchSize; - private final List> list; + final int batchSize; LoadBuffer(DLoadManyContext context, int batchSize) { this.context = context; @@ -129,7 +131,6 @@ static class LoadBuffer implements BeanCollectionLoader, LoadManyBuffer { // case it changes as part of a findIterate etc this.persistenceContext = context.getPersistenceContext(); this.batchSize = batchSize; - this.list = new ArrayList<>(batchSize); } @Override @@ -138,7 +139,7 @@ public boolean isUseDocStore() { } @Override - public int getBatchSize() { + public int batchSize() { return batchSize; } @@ -146,20 +147,15 @@ public int getBatchSize() { * Return true if the buffer is full. */ public boolean isFull() { - return batchSize == list.size(); + return batchSize() == size(); } /** * Return true if the buffer is full. 
+ */ - public void add(BeanCollection bc) { - list.add(bc); - } + public abstract void add(BeanCollection bc); - @Override - public List> getBatch() { - return list; - } + abstract void clear(); @Override public BeanPropertyAssocMany getBeanProperty() { @@ -208,25 +204,133 @@ public void loadMany(BeanCollection bc, boolean onlyIds) { final String parentKey = parentDesc.cacheKey(parentId); if (parentDesc.cacheManyPropLoad(context.property, bc, parentKey, context.parent.isReadOnly())) { // we loaded the bean collection from cache so remove it from the buffer - for (int i = 0; i < list.size(); i++) { - // find it using instance equality - avoiding equals() and potential deadlock issue - if (list.get(i) == bc) { - list.remove(i); - bc.setLoader(context.parent.getEbeanServer()); - return; - } + if (removeFromBuffer(bc)) { + bc.setLoader(context.parent.getEbeanServer()); } + // find it using instance equality - avoiding equals() and potential deadlock issue return; } } context.parent.getEbeanServer().loadMany(new LoadManyRequest(this, onlyIds, useCache)); // clear the buffer as all entries have been loaded - list.clear(); + clear(); } finally { lock.unlock(); } } + } + + static class LoadBufferHardRef extends LoadBuffer { + private final BeanCollection[] list; + + private int size; + + LoadBufferHardRef(DLoadManyContext context, int batchSize) { + super(context, batchSize); + this.list = new BeanCollection[batchSize]; + } + + /** + * Add the bean collection to the next free slot of the buffer.
+ */ + @Override + public void add(BeanCollection bc) { + list[size++] = bc; + } + + @Override + void clear() { + Arrays.fill(list, null); + size = 0; + } + + @Override + public int size() { + return size; + } + + @Override + public BeanCollection get(int i) { + return list[i]; + } + + @Override + public boolean removeFromBuffer(BeanCollection collection) { + for (int i = 0; i < size; i++) { + // find it using instance equality - avoiding equals() and potential deadlock issue + if (list[i] == collection) { + list[i] = null; + return true; + } + } + return false; + } + } + /** + * This load buffer uses weak references, so unreachable beanCollections will drop out from the buffer. + */ + static class LoadBufferWeakRef extends LoadBuffer { + private final Reference>[] list; + + private int size; + + LoadBufferWeakRef(DLoadManyContext context, int batchSize) { + super(context, batchSize); + this.list = new Reference[batchSize]; + } + + /** + * Add the bean collection to the next free slot, held only via a weak reference. + */ + @Override + public void add(BeanCollection bc) { + list[size++] = new WeakReference<>(bc); + } + + @Override + void clear() { + Arrays.fill(list, null); + size = 0; + } + + @Override + public int size() { + return size; + } + + @Override + public BeanCollection get(int i) { + Reference> ref = list[i]; + if (ref == null) { + return null; + } + BeanCollection bc = ref.get(); + if (bc == null) { + // remove dead references + list[i] = null; + } + return bc; + } + + @Override + public boolean removeFromBuffer(BeanCollection collection) { + for (int i = 0; i < size; i++) { + if (list[i] != null) { + BeanCollection bc = list[i].get(); + if (bc == null) { + // remove dead references + list[i] = null; + } + // find it using instance equality - avoiding equals() and potential deadlock issue + if (bc == collection) { + list[i] = null; + return true; + } + } + } + return false; + } } } diff --git a/ebean-test/src/test/java/org/tests/basic/TestPersistenceContextMany.java 
b/ebean-test/src/test/java/org/tests/basic/TestPersistenceContextMany.java index 2a5f9b6f88..d1ef2bc7c1 100644 --- a/ebean-test/src/test/java/org/tests/basic/TestPersistenceContextMany.java +++ b/ebean-test/src/test/java/org/tests/basic/TestPersistenceContextMany.java @@ -33,8 +33,7 @@ public abstract static class TestModel3 { @Size(max = 255) private String someData; - @OneToMany(cascade = CascadeType.REFRESH) - //@Where(clause = "1=1") + @OneToMany(cascade = CascadeType.ALL) private List many1; public int getId() { @@ -52,6 +51,10 @@ public void setSomeData(String someData) { public String getSomeData() { return someData; } + + public List getMany1() { + return many1; + } } @Entity @@ -65,9 +68,12 @@ public static class TestModel3A extends TestModel3 { @Inheritance @DiscriminatorValue("B") public static class TestModel3B extends TestModel3 { - @OneToMany(cascade = CascadeType.REFRESH) + @OneToMany(cascade = CascadeType.ALL) private List many2; + public List getMany2() { + return many2; + } } @Entity @@ -109,8 +115,16 @@ void initDb() { // 10 mio TestModel - each needs about 1/4 kbytes -> 2,5 GB in total List batch = new ArrayList<>(); for (int i = 0; i < 1_000_000; i++) { + TestModel3 m; + if (i == 5) { + m = new TestModel3A(); + } else { + m = new TestModel3B(); + ((TestModel3B) m).getMany2().add(new TestModel3Many2()); + ((TestModel3B) m).getMany2().add(new TestModel3Many2()); + } + m.getMany1().add(new TestModel3Many1()); - TestModel3 m = i == 5 ? 
new TestModel3A() : new TestModel3B(); // produces "off-by-one" m.setSomeData(base + i); // ensure we have not duplicates batch.add(m); if (i % 1000 == 0) { @@ -148,6 +162,17 @@ void testFindEachFindList() { }); System.out.println("Read " + i + " entries"); + i.set(0); + System.out.println("Doing findEach with lazyLoad"); + DB.find(TestModel3.class).select("*").findEach(c -> { + i.incrementAndGet(); + i.addAndGet(c.getMany1().size()); + if (c instanceof TestModel3B) { + i.addAndGet(((TestModel3B) c).getMany2().size()); + } + }); + System.out.println("Read " + i + " entries"); // 3999998 is correct + i.set(0); System.out.println("Doing findStream"); DB.find(TestModel3.class).select("*").findStream().forEach(c -> i.incrementAndGet());