
SAMZA-2317: ProcessJob does not call CoordinatorStreamStore.close() #1287

Closed
PanTheMan wants to merge 1 commit into apache:master from PanTheMan:fixConsumerAllocatedBug

Conversation

Contributor

@PanTheMan PanTheMan commented Feb 21, 2020

Symptom: Users deploying a job in a dev environment will repeatedly see the following error message in their logs:

2019-09-11 14:39:37.193 [Finalizer] [] LifecycleAwareConsumer [ERROR] kafka consumer allocated and not closed (see attached exception for point of allocation)

java.lang.Throwable

        at com.linkedin.kafka.linkedinclients.decorators.LifecycleAwareConsumer.<init>(LifecycleAwareConsumer.java:54)

        at com.linkedin.kafka.clients.factory.RawKafkaConsumerFactoryFactory$RawKafkaConsumerBuilder.apply(RawKafkaConsumerFactoryFactory.java:72)

        at com.linkedin.kafka.clients.factory.RawKafkaConsumerFactoryFactory$RawKafkaConsumerBuilder.apply(RawKafkaConsumerFactoryFactory.java:44)

        at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemFactory.getKafkaConsumer(SamzaRawLiKafkaSystemFactory.java:135)

        at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemFactory.getAdmin(SamzaRawLiKafkaSystemFactory.java:104)

        at org.apache.samza.config.SystemConfig.lambda$getSystemAdmins$0(SystemConfig.java:97)

        at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)

        at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)

        at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699)

        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)

        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)

        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)

        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)

        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)

        at org.apache.samza.config.SystemConfig.getSystemAdmins(SystemConfig.java:95)

        at org.apache.samza.system.SystemAdmins.<init>(SystemAdmins.java:38)

        at org.apache.samza.job.JobRunner.run(JobRunner.scala:82)

        at org.apache.samza.runtime.RemoteApplicationRunner.lambda$run$0(RemoteApplicationRunner.java:76)

        at java.util.ArrayList.forEach(ArrayList.java:1257)

        at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:73)

        at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:54)

        at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)

2019-09-11 14:39:37.193 [kafka_admin_consumer-cfg_demo_yc_job-i001_yuhcheng_mn1_linkedin_biz-auditor] [] KafkaProducer [INFO] [Producer clientId=kafka_admin_consumer-cfg_demo_yc_job-i001_yuhcheng_mn1_linkedin_biz-auditor] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.

2019-09-11 14:39:37.194 [Finalizer] [] LiKafkaConsumerImpl [INFO] Shutting down in PT1S...

2019-09-11 14:39:37.194 [kafka_admin_consumer-cfg_demo_yc_job-i001_yuhcheng_mn1_linkedin_biz-auditor] [] AbstractAuditor [ERROR] Auditor encounter exception.

org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException

        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1168)

        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1126)

        at com.linkedin.kafka.linkedinclients.auditing.TmeAuditor.onClosed(TmeAuditor.java:366)

        at com.linkedin.kafka.clients.auditing.abstractimpl.AbstractAuditor.run(AbstractAuditor.java:157)

Caused by: java.lang.InterruptedException

        at java.lang.Object.wait(Native Method)

        at java.lang.Thread.join(Thread.java:1252)

        at java.lang.Thread.join(Thread.java:1326)

        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1166)

        ... 3 more

This error message appears multiple times per job run, yet it does not actually affect the job's performance or correctness. It misleads users into believing that unrelated job failures are caused by this error.

Cause:
All Kafka consumers implement a finalize method. When the garbage collector determines that there are no more references to a consumer, it runs the finalizer, which detects that the consumer was never properly closed and logs the error. This happens here because ProcessJobFactory initializes a CoordinatorStreamStore and then wraps it in a NamespaceAwareCoordinatorStreamStore. When the ProcessJob finishes, close() is called on the NamespaceAwareCoordinatorStreamStore, but this does not close the underlying CoordinatorStreamStore, since multiple namespace-aware stores can share a single CoordinatorStreamStore. The garbage collector eventually sees that there are no remaining references to the CoordinatorStreamStore, and the finalizer of its internal Kafka consumer reports the error above.
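The delegation gap described above can be sketched in a few lines. These are hypothetical stand-in classes (not the real Samza types) that mirror the relationship between CoordinatorStreamStore and NamespaceAwareCoordinatorStreamStore:

```java
// A minimal sketch of the leak, with hypothetical class names standing in for
// CoordinatorStreamStore and NamespaceAwareCoordinatorStreamStore.
class UnderlyingStore implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true; // in the real store, this also closes the internal Kafka consumer
    }
}

class NamespaceAwareView implements AutoCloseable {
    private final UnderlyingStore delegate;

    NamespaceAwareView(UnderlyingStore delegate) {
        this.delegate = delegate;
    }

    @Override
    public void close() {
        // Intentionally does NOT close the delegate: several namespace-aware
        // views may share the same underlying store.
    }
}

public class WrapperCloseDemo {
    public static void main(String[] args) {
        UnderlyingStore store = new UnderlyingStore();
        NamespaceAwareView view = new NamespaceAwareView(store);
        view.close();
        // The underlying store is still open; once the last reference to it is
        // dropped, the consumer's finalizer reports "allocated and not closed".
        System.out.println("underlying store closed? " + store.closed);
    }
}
```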

Changes: ProcessJob now takes the CoordinatorStreamStore as an additional parameter and calls close() on the store once the job finishes running. This also keeps a live reference to the store after the ProcessJob starts. TestProcessJob was updated to verify that the store is closed properly.
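The shape of the fix can be sketched with simplified hypothetical classes (the real ProcessJob wraps an external process; only the ownership and close() call are shown here):

```java
// A minimal sketch of the fix: the job holds the store reference itself and
// explicitly closes it when the run completes, so the finalizer never fires.
class CoordinatorStore implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

class ProcessJobSketch {
    private final CoordinatorStore coordinatorStreamStore;

    ProcessJobSketch(CoordinatorStore coordinatorStreamStore) {
        // Holding the reference keeps the store reachable for the job's lifetime.
        this.coordinatorStreamStore = coordinatorStreamStore;
    }

    void waitForFinish() {
        // ... wait for the wrapped process to exit ...
        coordinatorStreamStore.close(); // explicit close once the job is done
    }
}

public class ProcessJobFixDemo {
    public static void main(String[] args) {
        CoordinatorStore store = new CoordinatorStore();
        ProcessJobSketch job = new ProcessJobSketch(store);
        job.waitForFinish();
        System.out.println("store closed? " + store.closed);
    }
}
```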

Tests: To reproduce this error, deploy any job running the latest version of Samza and look in the .out log file. Then run the same job with this fix included; the error message should no longer appear.

@PanTheMan PanTheMan marked this pull request as ready for review February 21, 2020 23:40
@PanTheMan PanTheMan closed this Feb 23, 2020
