Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,22 @@ public ScanWorkflowActivityImpl(IWorkflowService serviceClient) {

@Override
public ScanWorkflowActivityResult scan(ScanWorkflowActivityParams params) throws Throwable {
log.info("Starting scan with params: {} ", params);

ListWorkflowExecutionsRequest scanRequest =
new ListWorkflowExecutionsRequest()
.setDomain(params.getDomain())
.setNextPageToken(params.getNextPageToken())
.setPageSize(params.getPageSize())
.setQuery(params.getWorkflowQuery());

log.debug("Created ListWorkflowExecutionsRequest: {} ", scanRequest);
log.info("Scanning workflows with query: {}", params.getWorkflowQuery());

ListWorkflowExecutionsResponse resp = scanWorkflows(scanRequest);

log.info("Received response with {} executions", resp.getExecutions().size());

List<WorkflowExecution> executions =
samplingWorkflows(resp.getExecutions(), params.getSamplingRate());

Expand All @@ -60,18 +68,26 @@ public ScanWorkflowActivityResult scan(ScanWorkflowActivityParams params) throws
.map(com.uber.cadence.internal.shadowing.WorkflowExecution::new)
.collect(Collectors.toList()));
result.setNextPageToken(resp.getNextPageToken());
log.info("Scan completed with {} sampled executions", executions.size());
return result;
}

protected ListWorkflowExecutionsResponse scanWorkflows(ListWorkflowExecutionsRequest request)
throws Throwable {
log.info(
"Scanning workflows for domain: {} with query: {}",
request.getDomain(),
request.getQuery());
try {
return RpcRetryer.retryWithResult(
RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS,
() -> this.serviceClient.ScanWorkflowExecutions(request));
ListWorkflowExecutionsResponse response =
RpcRetryer.retryWithResult(
RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS,
() -> this.serviceClient.ScanWorkflowExecutions(request));
log.info("Successfully scanned workflows for domain: {}", request.getDomain());
return response;
} catch (BadRequestError | EntityNotExistsError | ClientVersionNotSupportedError e) {
log.error(
"failed to scan workflow records with non-retryable error. domain: "
"failed to scan workflow records with non-retryable error. Domain: "
+ request.getDomain()
+ "; query: "
+ request.getQuery(),
Expand All @@ -90,13 +106,17 @@ protected ListWorkflowExecutionsResponse scanWorkflows(ListWorkflowExecutionsReq

protected List<WorkflowExecution> samplingWorkflows(
List<WorkflowExecutionInfo> executionInfoList, double samplingRate) {
log.info("Sampling workflows with rate: {}", samplingRate);
int capacity = (int) (executionInfoList.size() * samplingRate);
capacity = Math.max(capacity, 1);
return executionInfoList
.stream()
.unordered()
.map((executionInfo -> executionInfo.getExecution()))
.limit((long) (capacity))
.collect(Collectors.toList());
List<WorkflowExecution> sampledExecutions =
executionInfoList
.stream()
.unordered()
.map((executionInfo -> executionInfo.getExecution()))
.limit((long) (capacity))
.collect(Collectors.toList());
log.info("Sampled {} workflows out of {}", sampledExecutions.size(), executionInfoList.size());
return sampledExecutions;
}
}