
Commit

#4289 Make analytic duplicate checking optional
stroomdev66 committed May 24, 2024
1 parent b69fa4f commit 2f9219b
Showing 34 changed files with 1,777 additions and 685 deletions.
@@ -62,10 +62,11 @@ protected void configure() {
         RestResourcesBinder.create(binder())
                 .bind(AnalyticProcessResourceImpl.class)
                 .bind(AnalyticDataShardResourceImpl.class)
+                .bind(DuplicateCheckResourceImpl.class)
                 .bind(ExecutionScheduleResourceImpl.class);
 
         bind(AnalyticsService.class).to(AnalyticsServiceImpl.class);
-        bind(DuplicateCheckFactory.class).to(DuplicateCheckFactoryImpl2.class);
+        bind(DuplicateCheckFactory.class).to(DuplicateCheckFactoryImpl.class);
 
         GuiceUtil.buildMultiBinder(binder(), Clearable.class)
                 .addBinding(StreamingAnalyticCache.class);
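
The rebinding above means Guice now serves the rewritten DuplicateCheckFactoryImpl wherever a DuplicateCheckFactory is injected (previously DuplicateCheckFactoryImpl2), and the new DuplicateCheckResourceImpl REST resource is registered alongside the existing analytic resources. A minimal sketch of a consumer of the rebound interface; the class below is an illustrative assumption, not code from this commit:

import jakarta.inject.Inject;

// Hypothetical consumer: any injection point declared against the interface
// now receives DuplicateCheckFactoryImpl via the binding above.
public class ExampleDuplicateCheckConsumer {

    private final DuplicateCheckFactory duplicateCheckFactory;

    @Inject
    public ExampleDuplicateCheckConsumer(final DuplicateCheckFactory duplicateCheckFactory) {
        this.duplicateCheckFactory = duplicateCheckFactory;
    }
}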
@@ -32,10 +32,6 @@ public DuplicateCheckDirs(final LmdbEnvDirFactory lmdbEnvDirFactory,
         this.duplicateCheckStoreConfig = duplicateCheckStoreConfig;
     }
 
-    public LmdbEnvDir getDir(final AnalyticRuleDoc analyticRuleDoc) {
-        return getDir(analyticRuleDoc.getUuid());
-    }
-
     public LmdbEnvDir getDir(final String analyticRuleUUID) {
         return lmdbEnvDirFactory
                 .builder()
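
With the AnalyticRuleDoc overload removed, callers resolve the duplicate-check store directory from the rule UUID themselves. A short sketch of the adjusted call site (hypothetical, assuming a DuplicateCheckDirs instance and an AnalyticRuleDoc are in scope):

// Hypothetical call site: pass the UUID rather than the whole AnalyticRuleDoc.
final LmdbEnvDir lmdbEnvDir = duplicateCheckDirs.getDir(analyticRuleDoc.getUuid());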
@@ -1,234 +1,94 @@
 package stroom.analytics.impl;
 
 import stroom.analytics.shared.AnalyticRuleDoc;
+import stroom.analytics.shared.DeleteDuplicateCheckRequest;
+import stroom.analytics.shared.DuplicateCheckRow;
+import stroom.analytics.shared.FindDuplicateCheckCriteria;
 import stroom.bytebuffer.impl6.ByteBufferFactory;
-import stroom.lmdb2.LmdbDb;
-import stroom.lmdb2.LmdbEnv;
-import stroom.lmdb2.LmdbEnvDir;
-import stroom.lmdb2.LmdbEnvDirFactory;
-import stroom.lmdb2.WriteTxn;
 import stroom.query.api.v2.Row;
 import stroom.query.common.v2.CompiledColumns;
 import stroom.query.common.v2.DuplicateCheckStoreConfig;
-import stroom.query.common.v2.LmdbKV;
-import stroom.query.common.v2.TransferState;
-import stroom.util.logging.LambdaLogger;
-import stroom.util.logging.LambdaLoggerFactory;
-import stroom.util.logging.Metrics;
+import stroom.util.shared.ResultPage;
 
 import jakarta.inject.Inject;
 import jakarta.inject.Provider;
 import jakarta.inject.Singleton;
-import org.lmdbjava.DbiFlags;
-import org.lmdbjava.EnvFlags;
-import org.lmdbjava.PutFlags;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-@Deprecated
 @Singleton
 public class DuplicateCheckFactoryImpl implements DuplicateCheckFactory {
 
-    private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(DuplicateCheckFactoryImpl.class);
-
-    private static final long COMMIT_FREQUENCY_MS = 10000;
-
-    private final LmdbEnv lmdbEnv;
-    private final LmdbDb db;
-    private final ArrayBlockingQueue<WriteOperation> queue;
     private final ByteBufferFactory byteBufferFactory;
-    private final int maxPutsBeforeCommit = 100;
-    private final AtomicBoolean shutdown = new AtomicBoolean();
-    private final TransferState transferState = new TransferState();
-    private final CountDownLatch transferring = new CountDownLatch(1);
     private final DuplicateCheckStoreConfig analyticResultStoreConfig;
+    private final DuplicateCheckStorePool<String, DuplicateCheckStore> pool;
 
     @Inject
-    public DuplicateCheckFactoryImpl(final LmdbEnvDirFactory lmdbEnvDirFactory,
-                                     final Provider<Executor> executorProvider,
+    public DuplicateCheckFactoryImpl(final DuplicateCheckDirs duplicateCheckDirs,
                                      final ByteBufferFactory byteBufferFactory,
-                                     final DuplicateCheckStoreConfig duplicateCheckStoreConfig) {
-        final LmdbEnvDir lmdbEnvDir = lmdbEnvDirFactory
-                .builder()
-                .config(duplicateCheckStoreConfig.getLmdbConfig())
-                .build();
-        this.lmdbEnv = LmdbEnv
-                .builder()
-                .config(duplicateCheckStoreConfig.getLmdbConfig())
-                .lmdbEnvDir(lmdbEnvDir)
-                .maxDbs(1)
-                .maxReaders(1)
-                .addEnvFlag(EnvFlags.MDB_NOTLS)
-                .build();
-        this.db = lmdbEnv.openDb("duplicate-check", DbiFlags.MDB_CREATE, DbiFlags.MDB_DUPSORT);
-        queue = new ArrayBlockingQueue<>(10);
+                                     final DuplicateCheckStoreConfig duplicateCheckStoreConfig,
+                                     final DuplicateCheckRowSerde duplicateCheckRowSerde,
+                                     final Provider<Executor> executorProvider) {
         this.byteBufferFactory = byteBufferFactory;
         this.analyticResultStoreConfig = duplicateCheckStoreConfig;
 
-        // Start transfer loop.
-        executorProvider.get().execute(this::transfer);
-    }
-
-    @Override
-    public DuplicateCheck create(final AnalyticRuleDoc analyticRuleDoc, final CompiledColumns compiledColumns) {
-        final DuplicateKeyFactory duplicateKeyFactory = new DuplicateKeyFactory(
-                byteBufferFactory,
-                analyticRuleDoc,
-                compiledColumns);
-        return new DuplicateCheckImpl(duplicateKeyFactory, byteBufferFactory, queue);
-    }
-
-    private void transfer() {
-        Metrics.measure("Transfer", () -> {
-            transferState.setThread(Thread.currentThread());
-
-            lmdbEnv.write(writeTxn -> {
-                try {
-                    long lastCommitMs = System.currentTimeMillis();
-                    long uncommittedCount = 0;
-
-                    try {
-                        while (!transferState.isTerminated()) {
-                            LOGGER.trace(() -> "Transferring");
-                            final WriteOperation queueItem = queue.poll(1, TimeUnit.SECONDS);
-
-                            if (queueItem != null) {
-                                queueItem.apply(db, writeTxn);
-                                uncommittedCount++;
-                            }
-
-                            if (uncommittedCount > 0) {
-                                final long count = uncommittedCount;
-                                if (count >= maxPutsBeforeCommit ||
-                                        lastCommitMs < System.currentTimeMillis() - COMMIT_FREQUENCY_MS) {
-
-                                    // Commit
-                                    LOGGER.trace(() -> {
-                                        if (count >= maxPutsBeforeCommit) {
-                                            return "Committing for max puts " + maxPutsBeforeCommit;
-                                        } else {
-                                            return "Committing for elapsed time";
-                                        }
-                                    });
-                                    writeTxn.commit();
-                                    lastCommitMs = System.currentTimeMillis();
-                                    uncommittedCount = 0;
-                                }
-                            }
-                        }
-                    } catch (final InterruptedException e) {
-                        LOGGER.trace(e::getMessage, e);
-                        // Keep interrupting this thread.
-                        Thread.currentThread().interrupt();
-                    } catch (final RuntimeException e) {
-                        LOGGER.error(e::getMessage, e);
-                    }
-
-                    if (uncommittedCount > 0) {
-                        LOGGER.debug(() -> "Final commit");
-                        writeTxn.commit();
-                    }
-
-                } catch (final Throwable e) {
-                    LOGGER.error(e::getMessage, e);
-                } finally {
-                    // Ensure we complete.
-                    LOGGER.debug(() -> "Finished transfer while loop");
-                    transferState.setThread(null);
-                    transferring.countDown();
-                }
-            });
-        });
-    }
-
-    public synchronized void close() {
-        LOGGER.debug(() -> "close called");
-        LOGGER.trace(() -> "close()", new RuntimeException("close"));
-        if (shutdown.compareAndSet(false, true)) {
-
-            // Let the transfer loop know it should stop ASAP.
-            transferState.terminate();
-
-            // Wait for transferring to stop.
-            try {
-                LOGGER.debug(() -> "Waiting for transfer to stop");
-                transferring.await();
-            } catch (final InterruptedException e) {
-                LOGGER.trace(e::getMessage, e);
-                // Keep interrupting this thread.
-                Thread.currentThread().interrupt();
-            }
-
-            try {
-                lmdbEnv.close();
-            } catch (final RuntimeException e) {
-                LOGGER.error(e::getMessage, e);
-            }
-        }
-    }
-
-    public interface WriteOperation {
-
-        void apply(LmdbDb db, WriteTxn writeTxn);
-    }
-
-    private static class DuplicateCheckImpl implements DuplicateCheck {
-
-        private final DuplicateKeyFactory duplicateKeyFactory;
-        private final ByteBufferFactory byteBufferFactory;
-        private final ArrayBlockingQueue<WriteOperation> queue;
-
-        public DuplicateCheckImpl(final DuplicateKeyFactory duplicateKeyFactory,
-                                  final ByteBufferFactory byteBufferFactory,
-                                  final ArrayBlockingQueue<WriteOperation> queue) {
-            this.duplicateKeyFactory = duplicateKeyFactory;
-            this.byteBufferFactory = byteBufferFactory;
-            this.queue = queue;
-        }
-
-        @Override
-        public boolean check(final Row row) {
-            final LmdbKV lmdbKV = duplicateKeyFactory.createRow(row);
-            boolean result = false;
-
-            try {
-                final SynchronousQueue<Boolean> transferQueue = new SynchronousQueue<>();
-                final WriteOperation writeOperation = (dbi, writeTxn) -> {
-                    boolean success = false;
-                    try {
-                        success = dbi.put(writeTxn,
-                                lmdbKV.getRowKey(),
-                                lmdbKV.getRowValue(),
-                                PutFlags.MDB_NODUPDATA);
-                    } finally {
-                        try {
-                            transferQueue.put(success);
-                        } catch (final InterruptedException e) {
-                            LOGGER.error(e::getMessage, e);
-                            Thread.currentThread().interrupt();
-                        }
-
-                        byteBufferFactory.release(lmdbKV.getRowKey());
-                        byteBufferFactory.release(lmdbKV.getRowValue());
-                    }
-                };
-                queue.put(writeOperation);
-                result = transferQueue.take();
-
-            } catch (final InterruptedException e) {
-                LOGGER.error(e::getMessage, e);
-                Thread.currentThread().interrupt();
-            }
-
-            return result;
-        }
-
-        @Override
-        public void close() {
-
-        }
-    }
+        pool = new DuplicateCheckStorePool<>(k -> new DuplicateCheckStore(
+                duplicateCheckDirs,
+                byteBufferFactory,
+                analyticResultStoreConfig,
+                duplicateCheckRowSerde,
+                executorProvider,
+                k),
+                null,
+                DuplicateCheckStore::flush,
+                DuplicateCheckStore::close);
+    }
+
+    @Override
+    public DuplicateCheck create(final AnalyticRuleDoc analyticRuleDoc,
+                                 final CompiledColumns compiledColumns) {
+        if (!analyticRuleDoc.isRememberNotifications() &&
+                !analyticRuleDoc.isIgnoreDuplicateNotifications()) {
+            return new DuplicateCheck() {
+                @Override
+                public boolean check(final Row row) {
+                    return true;
+                }
+
+                @Override
+                public void close() {
+                    // Ignore
+                }
+            };
+        }
+
+        final DuplicateCheckStore store = pool.borrow(analyticRuleDoc.getUuid());
+        final DuplicateCheckRowFactory duplicateCheckRowFactory = new DuplicateCheckRowFactory(compiledColumns);
+
+        return new DuplicateCheck() {
+            @Override
+            public boolean check(final Row row) {
+                final DuplicateCheckRow duplicateCheckRow = duplicateCheckRowFactory.createDuplicateCheckRow(row);
+                final boolean success = store.tryInsert(duplicateCheckRow);
+                if (analyticRuleDoc.isIgnoreDuplicateNotifications()) {
+                    return success;
+                } else {
+                    return true;
+                }
+            }
+
+            @Override
+            public void close() {
+                pool.release(analyticRuleDoc.getUuid());
+            }
+        };
+    }
+
+    public synchronized ResultPage<DuplicateCheckRow> fetchData(final FindDuplicateCheckCriteria criteria) {
+        return pool.use(criteria.getAnalyticDocUuid(), store -> store.fetchData(criteria));
+    }
+
+    public synchronized Boolean delete(final DeleteDuplicateCheckRequest request) {
+        return pool.use(request.getAnalyticDocUuid(), store -> store.delete(request, byteBufferFactory));
+    }
 }
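
The rewrite is what makes duplicate checking optional: a rule with neither rememberNotifications nor ignoreDuplicateNotifications set receives a no-op DuplicateCheck and never opens an LMDB store, while other rules borrow a pooled, per-UUID DuplicateCheckStore that is flushed and closed through the pool callbacks. A hedged usage sketch, assuming rows, sendNotification and an injected duplicateCheckFactory exist at the call site (none of these are part of this commit):

final DuplicateCheck duplicateCheck = duplicateCheckFactory.create(analyticRuleDoc, compiledColumns);
try {
    for (final Row row : rows) {
        // tryInsert() records the row; check() only returns false when
        // ignoreDuplicateNotifications is set and the row was seen before.
        if (duplicateCheck.check(row)) {
            sendNotification(row); // hypothetical downstream action
        }
    }
} finally {
    // Releases the pooled store for this rule's UUID (the no-op variant ignores this).
    duplicateCheck.close();
}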