Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Stroom resilient to bad processor filters. #4276

Open
gcdev373 opened this issue May 16, 2024 · 0 comments
Open

Make Stroom resilient to bad processor filters. #4276

gcdev373 opened this issue May 16, 2024 · 0 comments
Assignees
Labels
Milestone

Comments

@gcdev373
Copy link
Member

gcdev373 commented May 16, 2024

When a processor filter is enabled for a feed that doesn't exist - this can happen following import for example, Stroom can fail to perform any processing, only endlessly erroring because of the one bad filter.

 ERROR  [2024-05-14T11:19:19.593Z] [Create Processor Tasks #4] stroom.processor.impl.ProcessorTaskCreatorImpl - 
 Error creating tasks for filter: 
filter id=16, filter uuid=0cbd118e-ea16-40f1-a19a-8f7f7f4acd7a, 
pipeline uuid=5722027f-5546-44dc-9a8e-dfd3af9b62ef 
RuntimeException - Unable to find doc with reference 
'DocRef{type='Feed', uuid='a2604bae-5ded-425d-b488-78eaec2f368d', name='TEST'}' 
for term: Feed is TEST 
	at stroom.db.util.TermHandler.getDocValue(TermHandler.java:172)
	at stroom.db.util.TermHandler.apply(TermHandler.java:130)
	at stroom.db.util.TermHandler.apply(TermHandler.java:29)
	at stroom.db.util.CommonExpressionMapper.innerApply(CommonExpressionMapper.java:103)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
	at stroom.db.util.CommonExpressionMapper.innerApply(CommonExpressionMapper.java:121)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
	at stroom.db.util.CommonExpressionMapper.innerApply(CommonExpressionMapper.java:121)
	at stroom.db.util.CommonExpressionMapper.apply(CommonExpressionMapper.java:65)
	at stroom.db.util.ExpressionMapper.apply(ExpressionMapper.java:85)
	at stroom.meta.impl.db.MetaDaoImpl.createCondition(MetaDaoImpl.java:1660)
	at stroom.meta.impl.db.MetaDaoImpl.createCondition(MetaDaoImpl.java:1656)
	at stroom.meta.impl.db.MetaDaoImpl.find(MetaDaoImpl.java:1374)
	at stroom.meta.impl.MetaServiceImpl.secureFind(MetaServiceImpl.java:379)
	at stroom.meta.impl.MetaServiceImpl.lambda$find$6(MetaServiceImpl.java:324)
	at stroom.util.logging.LocationAwareLambdaLogger.logDurationIfTraceEnabled(LocationAwareLambdaLogger.java:275)
	at stroom.util.logging.LambdaLogger.logDurationIfTraceEnabled(LambdaLogger.java:80)
	at stroom.meta.impl.MetaServiceImpl.find(MetaServiceImpl.java:316)
	at stroom.processor.impl.ProcessorTaskCreatorImpl.runSelectMetaQuery(ProcessorTaskCreatorImpl.java:737)
	at stroom.processor.impl.ProcessorTaskCreatorImpl.createTasksFromCriteria(ProcessorTaskCreatorImpl.java:412)
	at stroom.processor.impl.ProcessorTaskCreatorImpl.doCreateTasksForFilter(ProcessorTaskCreatorImpl.java:352)
	at stroom.processor.impl.ProcessorTaskCreatorImpl.lambda$createTasksForFilter$8(ProcessorTaskCreatorImpl.java:285)
	at java.base/java.util.Optional.ifPresent(Optional.java:178)
	at stroom.processor.impl.ProcessorTaskCreatorImpl.createTasksForFilter(ProcessorTaskCreatorImpl.java:267)
	at stroom.processor.impl.ProcessorTaskCreatorImpl.lambda$createTasksForFilter$6(ProcessorTaskCreatorImpl.java:241)
	at stroom.task.impl.TaskContextFactoryImpl.lambda$createFromConsumer$0(TaskContextFactoryImpl.java:181)
	at stroom.task.impl.TaskContextFactoryImpl.lambda$wrap$2(TaskContextFactoryImpl.java:253)
	at stroom.util.logging.LocationAwareLambdaLogger.logDurationIfDebugEnabled(LocationAwareLambdaLogger.java:307)
	at stroom.task.impl.TaskContextFactoryImpl.lambda$wrap$4(TaskContextFactoryImpl.java:253)
	at stroom.util.pipeline.scope.PipelineScopeRunnable.scopeResult(PipelineScopeRunnable.java:39)
	at stroom.task.impl.TaskContextFactoryImpl.lambda$wrap$5(TaskContextFactoryImpl.java:250)
	at stroom.task.impl.TaskContextFactoryImpl.lambda$wrap$7(TaskContextFactoryImpl.java:264)
	at stroom.security.impl.SecurityContextImpl.asUserResult(SecurityContextImpl.java:322)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at com.google.inject.internal.DelegatingInvocationHandler.invoke(DelegatingInvocationHandler.java:50)
	at jdk.proxy2/jdk.proxy2.$Proxy122.asUserResult(Unknown Source)
	at stroom.task.impl.TaskContextFactoryImpl.lambda$wrap$11(TaskContextFactoryImpl.java:260)
	at stroom.processor.impl.ProcessorTaskCreatorImpl.lambda$createTasksForFilter$7(ProcessorTaskCreatorImpl.java:247)
	at stroom.security.impl.SecurityContextImpl.asUser(SecurityContextImpl.java:340)
	at stroom.processor.impl.ProcessorTaskCreatorImpl.createTasksForFilter(ProcessorTaskCreatorImpl.java:229)
	at stroom.processor.impl.ProcessorTaskCreatorImpl.lambda$createNewTasks$3(ProcessorTaskCreatorImpl.java:189)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Possible explanation:
Stroom creates tasks in blocks of a fixed size. First time round it makes that many threads. Each of these is allocated a task to process the pathological filter. Each of these runs for ~100ms or whatever and fails. That’s the whole block / opportunity for processor creation done. No other processors get a look in.
On the next run, there’s another block. Same fixed size. It also creates that many threads. Same logic is used and once again the block is filled with tasks that process same dodgy filter…
So Stroom never does any other processing and can’t get over this without manual intervention to identify the bad filter and disable it.

@gcdev373 gcdev373 added the enhancement A new feature or enhancement to an existing feature label May 16, 2024
@stroomdev66 stroomdev66 added this to the v7.4 milestone May 16, 2024
@stroomdev66 stroomdev66 removed the enhancement A new feature or enhancement to an existing feature label May 16, 2024
@stroomdev66 stroomdev66 self-assigned this May 21, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants