Fix the issue with "pinot-pulsar" module (potentially library conflicts) #7270

Closed
snleee opened this issue Aug 10, 2021 · 25 comments · Fixed by #8017

Comments

@snleee
Contributor

snleee commented Aug 10, 2021

The Apache Pulsar connector was added in #7026.

However, it currently fails at runtime (potentially because of dependency conflicts). We need to fix the conflicts so that the connector works correctly.

2021/08/06 10:53:35.451 ERROR [SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel] [HelixTaskExecutor-message_handle_thread] Caught exception in state transition from OFFLINE -> ONLINE for resource: myTable_REALTIME, partition: myTable__0__0__20210806T1753Z
java.lang.RuntimeException: org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException: Protocol message tag had invalid wire type.
        at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:43) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pulsar.client.internal.DefaultImplementation.newMessageIdFromByteArray(DefaultImplementation.java:103) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pulsar.client.api.MessageId.fromByteArray(MessageId.java:58) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pinot.plugin.stream.pulsar.MessageIdStreamOffset.<init>(MessageIdStreamOffset.java:47) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pinot.plugin.stream.pulsar.MessageIdStreamOffsetFactory.create(MessageIdStreamOffsetFactory.java:39) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.<init>(LLRealtimeSegmentDataManager.java:1183) ~[pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addSegment(RealtimeTableDataManager.java:349) ~[pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addRealtimeSegment(HelixInstanceDataManager.java:162) ~[pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline(SegmentOnlineOfflineStateModelFactory.java:168) [pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:89) [pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
        at org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:404) [pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:331) [pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97) [pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49) [pinot-all-0.8.0-jar-with-dependencies.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException: Protocol message tag had invalid wire type.
        at org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.skipField(ByteBufCodedInputStream.java:192) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pulsar.common.api.proto.PulsarApi$MessageIdData$Builder.mergeFrom(PulsarApi.java:1602) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pulsar.client.impl.MessageIdImpl.fromByteArray(MessageIdImpl.java:106) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
        at org.apache.pulsar.client.internal.DefaultImplementation.lambda$newMessageIdFromByteArray$3(DefaultImplementation.java:103) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:35) ~[pinot-pulsar-0.8.0-shaded.jar:0.8.0-a206db39710e2495f1d72adb90387617b03566d4]
        ... 21 more
pinot-server_1      | 2021/08/05 14:25:59.009 ERROR [SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel] [HelixTaskExecutor-message_handle_thread] Caught exception in state transition from OFFLINE -> ONLINE for resource: datasource_610bf4bf19200003007e55b3_REALTIME, partition: datasource_610bf4bf19200003007e55b3__0__0__20210805T1425Z
pinot-server_1      | java.lang.IndexOutOfBoundsException: readerIndex(1) + length(8) exceeds writerIndex(6): UnpooledHeapByteBuf(ridx: 1, widx: 6, cap: 6/6)
pinot-server_1      | 	at shaded.io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1478) ~[pinot-azure-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
pinot-server_1      | 	at shaded.io.netty.buffer.AbstractByteBuf.readLongLE(AbstractByteBuf.java:845) ~[pinot-azure-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
pinot-server_1      | 	at org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.readRawLittleEndian64(ByteBufCodedInputStream.java:309) ~[pinot-pulsar-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
pinot-server_1      | 	at org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.skipField(ByteBufCodedInputStream.java:177) ~[pinot-pulsar-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
pinot-server_1      | 	at org.apache.pulsar.common.api.proto.PulsarApi$MessageIdData$Builder.mergeFrom(PulsarApi.java:1602) ~[pinot-pulsar-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
pinot-server_1      | 	at org.apache.pulsar.client.impl.MessageIdImpl.fromByteArray(MessageIdImpl.java:106) ~[pinot-pulsar-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
pinot-server_1      | 	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
pinot-server_1      | 	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
pinot-server_1      | 	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
pinot-server_1      | 	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
pinot-server_1      | 	at org.apache.pulsar.client.internal.DefaultImplementation.lambda$newMessageIdFromByteArray$3(DefaultImplementation.java:103) ~[pinot-pulsar-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
pinot-server_1      | 	at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:35) ~[pinot-pulsar-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
pinot-server_1      | 	at org.apache.pulsar.client.internal.DefaultImplementation.newMessageIdFromByteArray(DefaultImplementation.java:103) ~[pinot-pulsar-0.8.0-SNAPSHOT-shaded.jar:0.8.0-SNAPSHOT-4da1dae06aef50f0a7c96b5a22019e541310fdd9]
@snleee changed the title from Cleaning up "pinot-pulsar" module to Fix the issue with "pinot-pulsar" module (potentially library conflicts) on Aug 10, 2021
snleee pushed a commit to snleee/pinot that referenced this issue Aug 10, 2021
Currently, pinot-pulsar module is reported to have some
issues on runtime. This PR removes the pinot-pulsar module
from the binary release.

apache#7270
snleee pushed a commit that referenced this issue Aug 10, 2021
Currently, pinot-pulsar module is reported to have some
issues on runtime. This PR removes the pinot-pulsar module
from the binary release.

#7270
@Eywek

Eywek commented Sep 27, 2021

Hello,
Any news on this?

@kishoreg
Member

We should probably shade some of the conflicting libraries

@aleksdikanski

Hi, I had a look at this issue, as I'm currently investigating the Pulsar and Pinot integration for my company.
I first encountered the InvalidProtocolBufferException (Kubernetes deployment) and later, with a different setup (local Docker), the IndexOutOfBoundsException.

So far, it does not look like an issue with incorrectly shaded libraries. As one can see from the stack traces, the errors occur in libraries that are already shaded by the Pulsar client library.

I looked into the second error, the IndexOutOfBoundsException, first and think I found the issue. When constructing a MessageIdStreamOffset from a string, the current implementation tries to parse a Pulsar MessageId from the bytes of that string.

  /**
   * returns the class object from string message id in the format ledgerId:entryId:partitionId
   * throws {@link IOException} if message if format is invalid.
   * @param messageId
   */
  public MessageIdStreamOffset(String messageId) {
    try {
      _messageId = MessageId.fromByteArray(messageId.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      LOGGER.warn("Cannot parse message id " + messageId, e);
    }
  }

As you can see from the comment, an assumption is made about the structure of a MessageId. Unfortunately, the passed String is only the MessageId.toString() representation and not the actual wire representation as a byte array. Hence the parsing fails.
I have currently fixed this by splitting the incoming String and using the constructors of MessageIdImpl and BatchMessageIdImpl to create a MessageId from the string. With that I had no exceptions any more, not even the second one, i.e., I could run the airline stats example.
I can create an MR for this, but I don't know if splitting and constructing the MessageId using impl classes is such a nice solution. I would rather have the wire format of the MessageId passed as input, but I have not found where this is happening. It would be nice to get some pointers here. Thanks
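
For illustration, the splitting approach described above could look roughly like the following. This is a minimal sketch with no Pulsar dependency, not the code from the draft PR. `ParsedMessageId` and `parse` are hypothetical names, and the three- or four-field layout (`ledgerId:entryId:partitionIndex`, optionally followed by `:batchIndex`) is an assumption about what `MessageId.toString()` produces. In the plugin, the parsed fields would then be handed to the MessageIdImpl or BatchMessageIdImpl constructors.

```java
/** Hypothetical holder for the fields of a message id string; not a Pulsar class. */
final class ParsedMessageId {
  final long ledgerId;
  final long entryId;
  final int partitionIndex;
  final int batchIndex; // -1 when the string has no batch component

  ParsedMessageId(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
    this.ledgerId = ledgerId;
    this.entryId = entryId;
    this.partitionIndex = partitionIndex;
    this.batchIndex = batchIndex;
  }

  /**
   * Parses "ledgerId:entryId:partitionIndex" or
   * "ledgerId:entryId:partitionIndex:batchIndex" (assumed layout).
   * Throws IllegalArgumentException for any other shape, including
   * non-numeric fields (NumberFormatException is a subclass).
   */
  static ParsedMessageId parse(String text) {
    String[] parts = text.split(":");
    if (parts.length != 3 && parts.length != 4) {
      throw new IllegalArgumentException("Unexpected message id format: " + text);
    }
    long ledgerId = Long.parseLong(parts[0]);
    long entryId = Long.parseLong(parts[1]);
    int partitionIndex = Integer.parseInt(parts[2]);
    int batchIndex = parts.length == 4 ? Integer.parseInt(parts[3]) : -1;
    return new ParsedMessageId(ledgerId, entryId, partitionIndex, batchIndex);
  }
}
```

The drawback is visible in the sketch: it hard-codes the string layout, so it breaks if that layout ever changes.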

@kishoreg
Member

@KKcorps can you help here?

@KKcorps
Contributor

KKcorps commented Dec 16, 2021

Taking it up.

@aleksdikanski

thanks, I linked the changes that I made for it to work for me as a draft PR, in case it helps you.

@KKcorps
Contributor

KKcorps commented Dec 19, 2021

@aleksdikanski I remember implementing it this way because I didn't want to make any assumptions regarding the message-id format in the Pinot code. The Pulsar team can change the message-id format, but our code should still work as long as the Pulsar library handles the new format correctly.

I remember testing this out as well, but your concern is valid. I am looking for another solution; if that doesn't work, I will go with the one from your PR.

@KKcorps
Contributor

KKcorps commented Dec 19, 2021

@snleee @aleksdikanski Can you tell me how to reproduce this issue? I feel like this could also be related to the offset at the startup of the client. It might be throwing an error in the case of an empty string.

@aleksdikanski

aleksdikanski commented Dec 20, 2021

Sure @KKcorps , I basically followed the guides on the pulsar and pinot websites:

  • set up a pulsar standalone cluster using https://pulsar.apache.org/docs/en/standalone-docker/#start-pulsar-in-docker
    Please note that I ran the pulsar container in the same network as the pinot cluster for this demo setup and that I gave the pulsar container a name (basically add a --network <pinot-network> --name pulsar to the docker run command)
  • set up a pinot cluster using the manual setup https://docs.pinot.apache.org/basics/getting-started/running-pinot-in-docker#manual-cluster
  • add a pulsar topic (it didn't matter if it was partitioned or not):
    $ docker run -it \
       --rm \
       --network <pinot-network> \
       -p 127.0.0.1:6650:6650 \
       -p 127.0.0.1:8080:8080 \
       apachepulsar/pulsar:2.8.1 \
       bin/pulsar-admin topics create persistent://public/default/pinot
    
  • add a table and schema to pinot using the following table and schema declaration
    table.json
    {
      "tableName": "airlineStats",
      "tableType": "REALTIME",
      "tenants": {
        "broker": "DefaultTenant",
        "server": "DefaultTenant"
      },
      "segmentsConfig": {
        "schemaName": "airlineStats",
        "timeColumnName": "DaysSinceEpoch",
        "replication": "1",
        "replicasPerPartition": "1",
        "timeType": "DAYS",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "365",
        "segmentPushType": "APPEND",
        "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy"
      },
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "pulsar",
          "stream.pulsar.bootstrap.servers": "pulsar://pulsar:6650",
          "stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
          "stream.pulsar.consumer.type": "lowlevel",
          "stream.pulsar.topic.name": "pinot",
          "stream.pulsar.fetch.timeout.millis": "10000",
          "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
          "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
          "realtime.segment.flush.threshold.size": "10000",
          "realtime.segment.flush.threshold.time": "1h"
        }
      },
      "metadata": {}
    }
    
    schema.json
    {
        "metricFieldSpecs": [
        ],
        "dimensionFieldSpecs": [
          {
            "dataType": "INT",
            "name": "ActualElapsedTime"
          },
          {
            "dataType": "INT",
            "name": "AirTime"
          },
          {
            "dataType": "INT",
            "name": "AirlineID"
          },
          {
            "dataType": "INT",
            "name": "ArrDel15"
          },
          {
            "dataType": "INT",
            "name": "ArrDelay"
          },
          {
            "dataType": "INT",
            "name": "ArrDelayMinutes"
          },
          {
            "dataType": "INT",
            "name": "ArrTime"
          },
          {
            "dataType": "STRING",
            "name": "ArrTimeBlk"
          },
          {
            "dataType": "INT",
            "name": "ArrivalDelayGroups"
          },
          {
            "dataType": "INT",
            "name": "CRSArrTime"
          },
          {
            "dataType": "INT",
            "name": "CRSDepTime"
          },
          {
            "dataType": "INT",
            "name": "CRSElapsedTime"
          },
          {
            "dataType": "STRING",
            "name": "CancellationCode"
          },
          {
            "dataType": "INT",
            "name": "Cancelled"
          },
          {
            "dataType": "STRING",
            "name": "Carrier"
          },
          {
            "dataType": "INT",
            "name": "CarrierDelay"
          },
          {
            "dataType": "INT",
            "name": "DayOfWeek"
          },
          {
            "dataType": "INT",
            "name": "DayofMonth"
          },
          {
            "dataType": "INT",
            "name": "DepDel15"
          },
          {
            "dataType": "INT",
            "name": "DepDelay"
          },
          {
            "dataType": "INT",
            "name": "DepDelayMinutes"
          },
          {
            "dataType": "INT",
            "name": "DepTime"
          },
          {
            "dataType": "STRING",
            "name": "DepTimeBlk"
          },
          {
            "dataType": "INT",
            "name": "DepartureDelayGroups"
          },
          {
            "dataType": "STRING",
            "name": "Dest"
          },
          {
            "dataType": "INT",
            "name": "DestAirportID"
          },
          {
            "dataType": "INT",
            "name": "DestAirportSeqID"
          },
          {
            "dataType": "INT",
            "name": "DestCityMarketID"
          },
          {
            "dataType": "STRING",
            "name": "DestCityName"
          },
          {
            "dataType": "STRING",
            "name": "DestState"
          },
          {
            "dataType": "INT",
            "name": "DestStateFips"
          },
          {
            "dataType": "STRING",
            "name": "DestStateName"
          },
          {
            "dataType": "INT",
            "name": "DestWac"
          },
          {
            "dataType": "INT",
            "name": "Distance"
          },
          {
            "dataType": "INT",
            "name": "DistanceGroup"
          },
          {
            "dataType": "INT",
            "name": "DivActualElapsedTime"
          },
          {
            "dataType": "INT",
            "name": "DivAirportIDs",
            "singleValueField": false
          },
          {
            "dataType": "INT",
            "name": "DivAirportLandings"
          },
          {
            "dataType": "INT",
            "name": "DivAirportSeqIDs",
            "singleValueField": false
          },
          {
            "dataType": "STRING",
            "name": "DivAirports",
            "singleValueField": false
          },
          {
            "dataType": "INT",
            "name": "DivArrDelay"
          },
          {
            "dataType": "INT",
            "name": "DivDistance"
          },
          {
            "dataType": "INT",
            "name": "DivLongestGTimes",
            "singleValueField": false
          },
          {
            "dataType": "INT",
            "name": "DivReachedDest"
          },
          {
            "dataType": "STRING",
            "name": "DivTailNums",
            "singleValueField": false
          },
          {
            "dataType": "INT",
            "name": "DivTotalGTimes",
            "singleValueField": false
          },
          {
            "dataType": "INT",
            "name": "DivWheelsOffs",
            "singleValueField": false
          },
          {
            "dataType": "INT",
            "name": "DivWheelsOns",
            "singleValueField": false
          },
          {
            "dataType": "INT",
            "name": "Diverted"
          },
          {
            "dataType": "INT",
            "name": "FirstDepTime"
          },
          {
            "dataType": "STRING",
            "name": "FlightDate"
          },
          {
            "dataType": "INT",
            "name": "FlightNum"
          },
          {
            "dataType": "INT",
            "name": "Flights"
          },
          {
            "dataType": "INT",
            "name": "LateAircraftDelay"
          },
          {
            "dataType": "INT",
            "name": "LongestAddGTime"
          },
          {
            "dataType": "INT",
            "name": "Month"
          },
          {
            "dataType": "INT",
            "name": "NASDelay"
          },
          {
            "dataType": "STRING",
            "name": "Origin"
          },
          {
            "dataType": "INT",
            "name": "OriginAirportID"
          },
          {
            "dataType": "INT",
            "name": "OriginAirportSeqID"
          },
          {
            "dataType": "INT",
            "name": "OriginCityMarketID"
          },
          {
            "dataType": "STRING",
            "name": "OriginCityName"
          },
          {
            "dataType": "STRING",
            "name": "OriginState"
          },
          {
            "dataType": "INT",
            "name": "OriginStateFips"
          },
          {
            "dataType": "STRING",
            "name": "OriginStateName"
          },
          {
            "dataType": "INT",
            "name": "OriginWac"
          },
          {
            "dataType": "INT",
            "name": "Quarter"
          },
          {
            "dataType": "STRING",
            "name": "RandomAirports",
            "singleValueField": false
          },
          {
            "dataType": "INT",
            "name": "SecurityDelay"
          },
          {
            "dataType": "STRING",
            "name": "TailNum"
          },
          {
            "dataType": "INT",
            "name": "TaxiIn"
          },
          {
            "dataType": "INT",
            "name": "TaxiOut"
          },
          {
            "dataType": "INT",
            "name": "Year"
          },
          {
            "dataType": "INT",
            "name": "WheelsOn"
          },
          {
            "dataType": "INT",
            "name": "WheelsOff"
          },
          {
            "dataType": "INT",
            "name": "WeatherDelay"
          },
          {
            "dataType": "STRING",
            "name": "UniqueCarrier"
          },
          {
            "dataType": "INT",
            "name": "TotalAddGTime"
          }
        ],
        "dateTimeFieldSpecs": [
          {
            "name": "DaysSinceEpoch",
            "dataType": "INT",
            "format": "1:DAYS:EPOCH",
            "granularity": "1:DAYS"
          }
        ],
        "schemaName": "airlineStats"
    }
    
  • send a json message from the airline stats example using the pulsar-client
    airlinestats00.json
    {"Quarter":1,"FlightNum":1,"Origin":"JFK","LateAircraftDelay":null,"DivActualElapsedTime":null,"DivWheelsOns":null,
      "DivWheelsOffs":null,"ArrDel15":0,"AirTime":359,"DivTotalGTimes":null,
      "DepTimeBlk":"0900-0959","DestCityMarketID":32575,"DaysSinceEpoch":16071,"DivAirportSeqIDs":null,
      "DepTime":914,"Month":1,"DestStateName":"California","CRSElapsedTime":385,"Carrier":"AA",
      "DestAirportID":12892,"Distance":2475,"ArrTimeBlk":"1200-1259","SecurityDelay":null,"DivArrDelay":null,
     "LongestAddGTime":null,"OriginWac":22,"WheelsOff":934,"UniqueCarrier":"AA","DestAirportSeqID":1289203,
     "DivReachedDest":null,"Diverted":0,"ActualElapsedTime":384,"AirlineID":19805,"OriginStateName":"New York",
     "FlightDate":"2014-01-01","DepartureDelayGroups":0,"DivAirportLandings":0,"OriginCityName":"New York, NY",
     "OriginStateFips":36,"OriginState":"NY","DistanceGroup":10,"WeatherDelay":null,"DestWac":91,"WheelsOn":1233,
    "OriginAirportID":12478,"OriginCityMarketID":31703,"NASDelay":null,"DestState":"CA","ArrTime":1238, 
    "ArrivalDelayGroups":0,"Flights":1,"DayofMonth":1,"RandomAirports":["SEA","PSC","PHX","MSY","ATL","TYS",
    "DEN","CHS","PDX","LAX","EWR","SFO","PIT","RDU","RAP","LSE","SAN","SBN","IAH","OAK","BRO","JFK","SAT","ORD",
    "ACY","DFW","BWI","TPA","BFL","BOS","SNA","ISN"],"TotalAddGTime":null,"CRSDepTime":900,"DayOfWeek":3,
    "Dest":"LAX","CancellationCode":null,"FirstDepTime":null,"DivTailNums":null,"DepDelayMinutes":14,"DepDelay":14,"
    TaxiIn":5,"OriginAirportSeqID":1247802,"DestStateFips":6,"ArrDelay":13,"Cancelled":0,"DivAirportIDs":null,
    "TaxiOut":20,"DepDel15":0,"CarrierDelay":null,"DivLongestGTimes":null,"DivAirports":null,"DivDistance":null,
    "Year":2014,"CRSArrTime":1225,"ArrDelayMinutes":13,"TailNum":"N338AA","DestCityName":"Los Angeles, CA"}
    
    send this message using the producer client
     $ docker run -it \
        --rm \
        --network <pinot-network> \
        -v <path/to/dir/containing_airlinestats00.json>:/pulsar/airline \
        apachepulsar/pulsar:2.8.1 \
        bin/pulsar-client produce -f airline/airlinestats00.json -k 0 -n 1 pinot
    

Funnily enough, I tested this today and actually got another error that is different from the two mentioned above:

2021/12/20 16:26:21.807 ERROR [SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel] [HelixTaskExecutor-message_handle_thread] Caught exception in state transition from OFFLINE -> ONLINE for resource: airlineStats_REALTIME, partition: airlineStats__0__0__20211220T1626Z
java.lang.RuntimeException: org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either than the input has been truncated or that an embedded message misreported its own length.
at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:43) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pulsar.client.internal.DefaultImplementation.newMessageIdFromByteArray(DefaultImplementation.java:103) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pulsar.client.api.MessageId.fromByteArray(MessageId.java:58) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906] 
at org.apache.pinot.plugin.stream.pulsar.MessageIdStreamOffset.<init>(MessageIdStreamOffset.java:47) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.plugin.stream.pulsar.MessageIdStreamOffsetFactory.create(MessageIdStreamOffsetFactory.java:39) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.<init>(LLRealtimeSegmentDataManager.java:1209) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addSegment(RealtimeTableDataManager.java:344) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addRealtimeSegment(HelixInstanceDataManager.java:162) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline(SegmentOnlineOfflineStateModelFactory.java:164) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:86) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:404) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:331) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]

It is also related to the MessageId parsing and was also fixed by my implementation.

@KKcorps
Contributor

KKcorps commented Dec 21, 2021

Can you also mention the Pulsar version you are testing this on? The Pinot version seems to be 0.9.1 from the exception logs.

@aleksdikanski

I did mention the Pulsar Docker image above; it uses the latest version, 2.8.1.

@aleksdikanski

I also dug a bit yesterday and saw that the start offset for a segment is fetched from ZooKeeper. So I looked at the entry (PinotCluster / PROPERTYSTORE / SEGMENTS / / ) and saw that the offset is set to the toString() value of the MessageId as well, e.g., something like "segment.realtime.startOffset": "3432:0:1:0". IMHO it is a bit dangerous to use the toString() representation, as it is more likely to change. I guess it should just be the bytes returned by MessageId.toByteArray(). I'm afraid I don't know where the ZK value is written.
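
To see why the UTF-8 bytes of such a string cannot be parsed as a protobuf message, it helps to decode the first byte the way a protobuf parser would: a one-byte tag holds the field number in its upper bits and the wire type in its low three bits. The sketch below is only an illustration using the example offset above. `TagPeek` is a hypothetical name, and this is not the Pulsar parsing code.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that reads a byte as a single-byte protobuf tag. */
final class TagPeek {
  /** Field number stored in the upper bits of a single-byte tag. */
  static int fieldNumber(int tagByte) {
    return tagByte >>> 3;
  }

  /** Wire type stored in the low three bits of a tag (0 to 5 are defined). */
  static int wireType(int tagByte) {
    return tagByte & 0x7;
  }

  public static void main(String[] args) {
    byte[] bytes = "3432:0:1:0".getBytes(StandardCharsets.UTF_8);
    int first = bytes[0] & 0xFF; // the character '3' is 0x33
    // prints "field=6 wireType=3"
    System.out.println("field=" + fieldNumber(first) + " wireType=" + wireType(first));
  }
}
```

The character '3' decodes to field 6 with wire type 3 (the deprecated "start group" type), so a parser has to treat it as an unknown field and skip it. Other leading digits map to other wire types. That may be why different offsets produced different exceptions in the traces above, although this has not been verified against the Pulsar code.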

@KKcorps
Contributor

KKcorps commented Dec 21, 2021

Yeah, I think the string offsets are borrowed from the Kafka implementation. We might need to make this more generic. That's also another reason I don't want to split the message id on : and create it from there, as the format can change on the Pulsar side.
IMO, the solution here should be to store the offset as a hex string and then parse hex string -> byte array -> message id.
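
A minimal sketch of the hex round trip proposed above, using only the JDK. `HexOffsets`, `toHex`, and `fromHex` are hypothetical names, and the merged fix may look different. In the plugin, the input bytes would presumably come from MessageId.toByteArray(), and the decoded bytes would go back into MessageId.fromByteArray(...).

```java
/** Hypothetical helper for storing offset bytes as a hex string. */
final class HexOffsets {
  private static final char[] DIGITS = "0123456789abcdef".toCharArray();

  /** Encodes each byte as two lowercase hex digits. */
  static String toHex(byte[] bytes) {
    StringBuilder sb = new StringBuilder(bytes.length * 2);
    for (byte b : bytes) {
      sb.append(DIGITS[(b >> 4) & 0xF]).append(DIGITS[b & 0xF]);
    }
    return sb.toString();
  }

  /** Decodes a hex string back into bytes; rejects odd lengths and non-hex characters. */
  static byte[] fromHex(String hex) {
    if (hex.length() % 2 != 0) {
      throw new IllegalArgumentException("Odd-length hex string: " + hex);
    }
    byte[] out = new byte[hex.length() / 2];
    for (int i = 0; i < out.length; i++) {
      int hi = Character.digit(hex.charAt(2 * i), 16);
      int lo = Character.digit(hex.charAt(2 * i + 1), 16);
      if (hi < 0 || lo < 0) {
        throw new IllegalArgumentException("Not a hex string: " + hex);
      }
      out[i] = (byte) ((hi << 4) | lo);
    }
    return out;
  }
}
```

Because the stored string only encodes the bytes, the Pinot side makes no assumption about the message-id layout. Interpreting the bytes stays with the Pulsar client library.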

@KKcorps
Contributor

KKcorps commented Dec 21, 2021

@aleksdikanski In which file does this exception occur? Is it in pinot-all.log or pinot-quickstart.log or during creation of the table itself when running the pinot-admin command?

@aleksdikanski

aleksdikanski commented Dec 21, 2021

Hi, I used the log output of the Docker containers, so no log files were actually created. The error occurred in the logs of the pinot-server container at the time the first message was read from the topic, so I guess it is fair to say that it occurred when the first segment was created.
The AddTable command of the pinot-admin CLI did not result in the exception.

@KKcorps
Contributor

KKcorps commented Dec 22, 2021

@aleksdikanski Can you try #7947 and see if it works on your end? It is working on my end. There was also a corrupt \n in your airline data json. Please remove that before testing.

I am also noticing some issue with using the auto.offset.reset to largest. It works fine when using smallest
Working on the fix for that as well.

@aleksdikanski

aleksdikanski commented Dec 22, 2021

@KKcorps Thanks, I tested your changes and I no longer get any errors regarding the MessageId.
The messages are now consumed correctly by Pinot.

> There was also a corrupt \n in your airline data json. Please remove that before testing.

That must have been a copy-paste error; I did not have a problem with the original file.

@aleksdikanski

aleksdikanski commented Jan 12, 2022

Hi @KKcorps are there any updates on this? Can I support in any way?

@KKcorps
Contributor

KKcorps commented Jan 12, 2022

Hi, the PR for the fix is already merged. If there is any other issue with the code, I will be happy to help.

@aleksdikanski

@KKcorps You mentioned something not working with the largest offset. Does this persist?

> I am also noticing some issue with using the auto.offset.reset to largest. It works fine when using smallest
> Working on the fix for that as well.

I'm not using the largest offset for my use case, and I have demoed the Pulsar / Pinot integration in my company. So I would be interested in making the Pulsar plugin part of the release again by reverting PR #7272. Maybe that is also a question for @snleee.

@KKcorps
Contributor

KKcorps commented Jan 13, 2022

My bad. I need to get on this issue again. I remember testing it out thoroughly, and it seemed to be a problem with my test case rather than with the plugin itself. I will update you in the next 24 hrs.
Update:
I tested it out and the issue is definitely there. IMO, the source of the issue is that the Pulsar Reader always resets to the latest message when starting up. I am trying out a fix that uses the Consumer interface instead of the Reader.

@KKcorps
Contributor

KKcorps commented Jan 13, 2022

https://github.com/apache/pinot/pull/8017/files
Raised a draft PR. Still testing out more scenarios with this fix.

@polyzos

polyzos commented Jan 21, 2022

@KKcorps any updates on this? is there anything to help here?

@KKcorps
Contributor

KKcorps commented Jan 21, 2022

Hi, the PR has been moved from draft status to ready for review. It is still waiting to be merged.

@aleksdikanski

Hi @KKcorps,

Could you also revert the change in https://github.com/apache/pinot/pull/7272/files in your PR? Otherwise the issue will be fixed, but the Pulsar plugin will not be included in the next Pinot release.
Also, there are some tests failing in your PR.
Thx
