Skip to content

Commit

Permalink
Merge pull request #243 from Backbase/hotfix/fix-memory-leak-2.88.x
Browse files Browse the repository at this point in the history
Hotfix/fix memory leak 2.88.x
  • Loading branch information
sushiljoshi22 committed Oct 28, 2022
2 parents 01da075 + 5dcc4e4 commit 21e92e5
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog
All notable changes to this project will be documented in this file.
## [2.88.2](https://github.com/Backbase/stream-services/compare/2.88.1...2.88.2)
- Fix memory leak with UnitOfWorkExecutor

## [2.88.1](https://github.com/Backbase/stream-services/compare/2.88.0...2.88.1)
### Fixed
- Added InterestDetails to BaseProduct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
import com.backbase.stream.worker.model.TaskHistory;
import com.backbase.stream.worker.model.UnitOfWork;
import com.backbase.stream.worker.repository.UnitOfWorkRepository;
import java.time.OffsetDateTime;
import java.util.stream.Collectors;
import com.backbase.stream.worker.repository.impl.InMemoryReactiveUnitOfWorkRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.annotation.ContinueSpan;
import org.springframework.cloud.sleuth.annotation.NewSpan;
Expand All @@ -17,6 +16,9 @@
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.time.OffsetDateTime;
import java.util.stream.Collectors;

@Slf4j
public abstract class UnitOfWorkExecutor<T extends StreamTask> {

Expand Down Expand Up @@ -60,6 +62,16 @@ public Mono<UnitOfWork<T>> retrieve(String unitOfWorkId) {
return repository.findById(unitOfWorkId);
}

private Mono<UnitOfWork<T>> cleanup(UnitOfWork<T> unitOfWork) {
// Clean up completed/errored out unit of works from the InMemoryRepository
if (repository instanceof InMemoryReactiveUnitOfWorkRepository) {
log.info("Cleaning up Unit Of Work: {}", unitOfWork.getUnitOfOWorkId());
return repository.delete(unitOfWork)
.thenReturn(unitOfWork);
}
return Mono.just(unitOfWork);
}

private Mono<UnitOfWork<T>> complete(UnitOfWork<T> unitOfWork) {
log.info("Completing Unit Of Work: {}", unitOfWork.getUnitOfOWorkId());
unitOfWork.setLockedAt(null);
Expand Down Expand Up @@ -99,7 +111,8 @@ public Mono<UnitOfWork<T>> executeUnitOfWork(UnitOfWork<T> unitOfWork) {
return Mono.just(unitOfWork)
.flatMap(this::setLocked)
.flatMap(this::executeTasks)
.flatMap(this::complete);
.flatMap(this::complete)
.doFinally(r -> cleanup(unitOfWork));
}

@ContinueSpan(log = "Locking Unit Of Work")
Expand Down Expand Up @@ -136,14 +149,15 @@ private Mono<T> executeTask(UnitOfWork<T> unitOfWork, T streamTask, @SpanTag("st
return actual;
})
.onErrorResume(Throwable.class, throwable -> {
streamTask.setState(StreamTask.State.FAILED);
streamTask.setError(throwable.getMessage());
log.error("Stream Task: {} from Unit Of Work: {} failed: \n{}",
streamTaskId,
streamTask.getId(),
streamTask.getHistory().stream().map(TaskHistory::toString)
.collect(Collectors.joining("\n")));
streamTask.setState(StreamTask.State.FAILED);
return Mono.error(throwable);
// return Mono.just(streamTask);

return Mono.just(streamTask);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
import com.backbase.dbs.transaction.api.service.v2.model.TransactionsPostResponseBody;
import com.backbase.stream.worker.StreamTaskExecutor;
import com.backbase.stream.worker.exception.StreamTaskException;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.stream.Collectors;

@Slf4j
public class TransactionTaskExecutor implements StreamTaskExecutor<TransactionTask> {

Expand All @@ -35,7 +36,7 @@ public Mono<TransactionTask> executeTask(TransactionTask streamTask) {
})
.collectList()
.map(transactionIds -> {
streamTask.error("transactions", "post", "success", externalIds, transactionIds.stream().map(
streamTask.info("transactions", "post", "success", externalIds, transactionIds.stream().map(
TransactionsPostResponseBody::getId).collect(Collectors.joining(",")), "Ingested Transactions");
streamTask.setResponse(transactionIds);
return streamTask;
Expand Down

0 comments on commit 21e92e5

Please sign in to comment.