Skip to content

Commit

Permalink
Add a maximum page size and use the count instead of the list (#10443)
Browse files Browse the repository at this point in the history
* Add a maximum page size and use the count instead of the list

* Fix typo
  • Loading branch information
benmoriceau committed Feb 17, 2022
1 parent 33cd51f commit 9d546d3
Showing 1 changed file with 7 additions and 5 deletions.
Expand Up @@ -31,7 +31,6 @@
import io.airbyte.workers.temporal.scheduling.state.WorkflowState;
import io.airbyte.workers.temporal.spec.SpecWorkflow;
import io.airbyte.workers.temporal.sync.SyncWorkflow;
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsRequest;
import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsResponse;
import io.temporal.client.BatchRequest;
Expand All @@ -40,7 +39,6 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
Expand All @@ -65,6 +63,8 @@ public class TemporalClient {
*/
private static final int DELAY_BETWEEN_QUERY_MS = 10;

private static final int MAXIMUM_SEARCH_PAGE_SIZE = 50;

public static TemporalClient production(final String temporalHost, final Path workspaceRoot, final Configs configs) {
final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost);
return new TemporalClient(WorkflowClient.newInstance(temporalService), workspaceRoot, temporalService, configs);
Expand Down Expand Up @@ -454,14 +454,15 @@ public boolean isWorkflowRunning(final String workflowName) {
ListOpenWorkflowExecutionsRequest openWorkflowExecutionsRequest =
ListOpenWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.setMaximumPageSize(MAXIMUM_SEARCH_PAGE_SIZE)
.build();
do {
final ListOpenWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest =
service.blockingStub().listOpenWorkflowExecutions(openWorkflowExecutionsRequest);
final List<WorkflowExecutionInfo> workflowExecutionInfos = listOpenWorkflowExecutionsRequest.getExecutionsList().stream()
final long matchingWorkflowCount = listOpenWorkflowExecutionsRequest.getExecutionsList().stream()
.filter((workflowExecutionInfo -> workflowExecutionInfo.getExecution().getWorkflowId().equals(workflowName)))
.collect(Collectors.toList());
if (!workflowExecutionInfos.isEmpty()) {
.count();
if (matchingWorkflowCount != 0) {
return true;
}
token = listOpenWorkflowExecutionsRequest.getNextPageToken();
Expand All @@ -470,6 +471,7 @@ public boolean isWorkflowRunning(final String workflowName) {
ListOpenWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.setNextPageToken(token)
.setMaximumPageSize(MAXIMUM_SEARCH_PAGE_SIZE)
.build();

} while (token != null && token.size() > 0);
Expand Down

0 comments on commit 9d546d3

Please sign in to comment.