@@ -19,11 +19,17 @@

package org.apache.iceberg.hadoop;

import com.github.benmanes.caffeine.cache.Cache;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.CatalogProperties;
@@ -40,6 +46,7 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

public class TestCachingCatalog extends HadoopTableTestBase {
@@ -276,6 +283,44 @@ public void testCacheExpirationIsDisabledByANegativeValue() throws IOException {
catalog.isCacheExpirationEnabled());
}

@Test
@Ignore("reproduces https://github.com/apache/iceberg/issues/3791")
Contributor Author:
This test passes with the changes in #3801

Contributor:
Eventually, we might want to put this test into the source (and leave it with the @Ignore) so it can be rerun. At the least, let's reference the other PR here so people have a better chance of seeing the link: #3801

public void testDeadlock() throws IOException, InterruptedException {
HadoopCatalog underlyingCatalog = hadoopCatalog();
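// Wrap the catalog with a 1-second expiration interval and a controllable ticker so expiry can be triggered deterministically.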
TestableCachingCatalog catalog = TestableCachingCatalog.wrap(underlyingCatalog, Duration.ofSeconds(1), ticker);
Namespace namespace = Namespace.of("db", "ns1", "ns2");
int numThreads = 20;
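// Create one table per worker thread so there are cached entries to race over.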
for (int i = 0; i < numThreads; i++) {
TableIdentifier tableIdent = TableIdentifier.of(namespace, "tbl" + i);
catalog.createTable(tableIdent, SCHEMA, SPEC, ImmutableMap.of("key", "value"));
}

Cache<TableIdentifier, Table> cache = catalog.cache();
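// These counters record how many get() and cleanUp() calls complete; if the deadlock occurs, some workers hang and the counts fall short.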
AtomicInteger cacheGetCount = new AtomicInteger(0);
AtomicInteger cacheCleanupCount = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
Random rand = new Random();
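// Even-numbered workers read a random table through the cache; odd-numbered workers force a cleanup pass. Every task first advances the ticker past the 1-second expiry, so reads and cleanups race with entry expiration.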
for (int i = 0; i < numThreads; i++) {
if (i % 2 == 0) {
executor.submit(() -> {
ticker.advance(Duration.ofSeconds(2));
cache.get(TableIdentifier.of(namespace, "tbl" + rand.nextInt(numThreads)), underlyingCatalog::loadTable);
cacheGetCount.incrementAndGet();
});
} else {
executor.submit(() -> {
ticker.advance(Duration.ofSeconds(2));
cache.cleanUp();
cacheCleanupCount.incrementAndGet();
});
}
}
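// Shut down and wait for the workers; if the deadlock occurs, awaitTermination times out and the completion counts never reach numThreads / 2.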
executor.shutdown();
Assertions.assertThat(executor.awaitTermination(2, TimeUnit.SECONDS)).isTrue();
Assertions.assertThat(cacheGetCount).hasValue(numThreads / 2);
Assertions.assertThat(cacheCleanupCount).hasValue(numThreads / 2);
}

public static TableIdentifier[] metadataTables(TableIdentifier tableIdent) {
return Arrays.stream(MetadataTableType.values())
.map(type -> TableIdentifier.parse(tableIdent + "." + type.name().toLowerCase(Locale.ROOT)))