[Bug]: PulsarIO.read() is failing with java.lang.RuntimeException: Could not find a way to create AutoValue class class com.idfy.beam.pulsar.PulsarSourceDescriptor #30688

Open
1 of 16 tasks
rthneha opened this issue Mar 20, 2024 · 1 comment


rthneha commented Mar 20, 2024

What happened?

I have configured the PulsarIO connector in a Beam pipeline to read messages from Pulsar as follows:

PCollection<PulsarMessage> pCollectionAll = p.apply("ReadPulsarMessage", PulsarIO
                .read()
                .withAdminUrl(options.getPulsarAdminURL())
                .withClientUrl(options.getPulsarClientURL())
                .withTopic(options.getPulsarTopic())); 

PulsarSourceDescriptor appears to have 3 mandatory fields, so I set all of them. However, I am not able to read messages and get the error below:


Error message from worker: java.io.IOException: Failed to start reading from source: org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@53cad662
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:821)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
        org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: Could not find a way to create AutoValue class class com.idfy.beam.pulsar.PulsarSourceDescriptor
        org.apache.beam.sdk.schemas.AutoValueSchema.schemaTypeCreator(AutoValueSchema.java:133)
        org.apache.beam.sdk.schemas.CachingFactory.create(CachingFactory.java:56)
        org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
        org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:63)
        org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:43)
        org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:126)
        org.apache.beam.sdk.coders.Coder.decode(Coder.java:154)
        org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:142)
        org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:102)
        org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:96)
        org.apache.beam.sdk.transforms.Create$Values$BytesReader.advanceImpl(Create.java:560)
        org.apache.beam.sdk.transforms.Create$Values$BytesReader.startImpl(Create.java:542)
        org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:252)
        org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.advance(UnboundedReadFromBoundedSource.java:474)
        org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.access$300(UnboundedReadFromBoundedSource.java:452)
        org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.advance(UnboundedReadFromBoundedSource.java:304)
        org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.start(UnboundedReadFromBoundedSource.java:297)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:816)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
        org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        java.lang.Thread.run(Thread.java:750)
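
One possible cause of this kind of error (an assumption, not confirmed by the trace) is that the class generated by the AutoValue annotation processor is missing from the jar deployed to the workers. For a top-level class, AutoValue generates a subclass named AutoValue_<Name> in the same package. The sketch below only checks whether such a class can be loaded; the class name AutoValueCheck and its helper methods are hypothetical and not part of Beam.

```java
public class AutoValueCheck {

    /** Name AutoValue gives the generated subclass of a top-level class. */
    public static String generatedName(String fullyQualifiedName) {
        int dot = fullyQualifiedName.lastIndexOf('.');
        String packagePrefix = dot < 0 ? "" : fullyQualifiedName.substring(0, dot + 1);
        String simpleName = fullyQualifiedName.substring(dot + 1);
        return packagePrefix + "AutoValue_" + simpleName;
    }

    /** True if the class is on the classpath; the class is not initialized. */
    public static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, AutoValueCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Default target is the class named in the stack trace above.
        String target = args.length > 0
            ? args[0]
            : "com.idfy.beam.pulsar.PulsarSourceDescriptor";
        String generated = generatedName(target);
        System.out.println(generated + " loadable: " + isLoadable(generated));
    }
}
```

Running this with the same classpath as the pipeline jar would show whether the generated class was packaged. If it is present, the cause lies elsewhere and this check does not identify it.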

I have seen another way of consuming messages, using PulsarClient directly with a JWT token:


import java.net.URL;
import org.apache.pulsar.client.api.*;

public class SNConsumer {

    public static void main(String[] args) throws Exception
    {

        PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsarClientUrl")
            .authentication(
                AuthenticationFactory.token("<JWT Token>")
            )
            .build();

I have this JWT token, but I am not able to set it on the PulsarClient because withPulsarClient takes a SerializableFunction. Can someone help? This is what I tried:


import org.apache.pulsar.client.api.AuthenticationFactory;

 public Read withPulsarClient(SerializableFunction<String, PulsarClient> pulsarClientFn) {
      //return builder().setPulsarClient(pulsarClientFn).build();
      
      PulsarClient client = PulsarClient.builder()
      .serviceUrl("")
      .authentication(
          AuthenticationFactory.token("<JWT Token>")
      )
      .build();
      return client;
    }
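
A minimal sketch of the pattern a SerializableFunction parameter usually calls for: instead of building the client eagerly, pass a function that captures only the token string and builds the client when it is applied on the worker. The sketch uses stand-in types (a local SerializableFunction interface and a StubClient class, both hypothetical) so that it compiles without Beam or Pulsar on the classpath. It also assumes the String argument passed to the function is the client URL.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClientFactorySketch {

    /** Stand-in for Beam's SerializableFunction (hypothetical local copy). */
    public interface SerializableFunction<InputT, OutputT> extends Serializable {
        OutputT apply(InputT input);
    }

    /** Stand-in for PulsarClient; like the real client it is not Serializable. */
    public static final class StubClient {
        public final String serviceUrl;
        public final String token;

        StubClient(String serviceUrl, String token) {
            this.serviceUrl = serviceUrl;
            this.token = token;
        }
    }

    /** Captures only the token; the client is created when apply() is called. */
    public static SerializableFunction<String, StubClient> tokenClientFn(String token) {
        return serviceUrl -> new StubClient(serviceUrl, token);
    }

    /** Java-serializes and deserializes a value, as shipping it to a worker would. */
    @SuppressWarnings("unchecked")
    public static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("function is not serializable", e);
        }
    }

    public static void main(String[] args) {
        SerializableFunction<String, StubClient> fn = roundTrip(tokenClientFn("<JWT Token>"));
        StubClient client = fn.apply("pulsar://localhost:6650");
        System.out.println("built client for " + client.serviceUrl);
    }
}
```

With the real classes, the lambda body would presumably be the PulsarClient.builder() chain with AuthenticationFactory.token(...) shown earlier, with the checked PulsarClientException from build() wrapped in an unchecked exception, and the function would be passed to withPulsarClient rather than building the client inside that method.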

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

rthneha commented Mar 27, 2024

@MarcoRob Can you help? Does PulsarIO support streaming Beam pipelines? We were using RabbitMQ earlier and are moving to Pulsar for better scalability. I don't see an expand method in PulsarSourceDescriptor, and I also can't find anything related to UnboundedReadFromBoundedSource in PulsarIO.
