
[SUPPORT] Multi-Writer DeltaStreamer (W1 and W2) Writing into Partitions IN and US: One of Them Failing #11417

Closed
soumilshah1995 opened this issue Jun 8, 2024 · 0 comments


When running the Hudi DeltaStreamer with two writers simultaneously, one for the US partition and one for the IN partition, one of the writers fails with a NullPointerException while fetching offsets from Kafka.

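Both jobs point at the same --target-base-path, so the two streamers appear to share one Hudi table and therefore one Kafka checkpoint: the log below shows the failing writer resuming from orders_in,0:8 right before the NullPointerException in KafkaOffsetGen.fetchValidOffsets. The snippet below is only an illustrative model of that kind of topic mismatch (an assumption about the failure mode, not Hudi's actual code); the topic names and offsets are taken from this setup.

# Illustrative model only (assumption, not Hudi source code): a checkpoint
# recorded by the orders_in writer being validated against the orders_us topic.
checkpoint_offsets = {("orders_in", 0): 8}   # checkpoint stored in the shared table
earliest_offsets = {("orders_us", 0): 0}     # earliest offsets for the topic this writer polls

def checkpoint_is_stale(checkpoint, earliest):
    # anyMatch-style validity check; looking up a partition the current topic
    # does not have yields nothing, which is the analogue of the NPE in the log.
    return any(earliest.get(tp) is None or earliest[tp] > off
               for tp, off in checkpoint.items())

print(checkpoint_is_stale(checkpoint_offsets, earliest_offsets))  # True: ('orders_in', 0) not present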

Steps

Spin up the stack

version: "3"

services:
  trino-coordinator:
    image: 'trinodb/trino:400'
    hostname: trino-coordinator
    ports:
      - '8080:8080'
    volumes:
      - ./trino/etc:/etc/trino

  metastore_db:
    image: postgres:11
    hostname: metastore_db
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: hive
      POSTGRES_PASSWORD: hive
      POSTGRES_DB: metastore

  hive-metastore:
    hostname: hive-metastore
    image: 'starburstdata/hive:3.1.2-e.18'
    ports:
      - '9083:9083' # Metastore Thrift
    environment:
      HIVE_METASTORE_DRIVER: org.postgresql.Driver
      HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore
      HIVE_METASTORE_USER: hive
      HIVE_METASTORE_PASSWORD: hive
      HIVE_METASTORE_WAREHOUSE_DIR: s3://datalake/
      S3_ENDPOINT: http://minio:9000
      S3_ACCESS_KEY: admin
      S3_SECRET_KEY: password
      S3_PATH_STYLE_ACCESS: "true"
      REGION: ""
      GOOGLE_CLOUD_KEY_FILE_PATH: ""
      AZURE_ADL_CLIENT_ID: ""
      AZURE_ADL_CREDENTIAL: ""
      AZURE_ADL_REFRESH_URL: ""
      AZURE_ABFS_STORAGE_ACCOUNT: ""
      AZURE_ABFS_ACCESS_KEY: ""
      AZURE_WASB_STORAGE_ACCOUNT: ""
      AZURE_ABFS_OAUTH: ""
      AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
      AZURE_ABFS_OAUTH_CLIENT_ID: ""
      AZURE_ABFS_OAUTH_SECRET: ""
      AZURE_ABFS_OAUTH_ENDPOINT: ""
      AZURE_WASB_ACCESS_KEY: ""
      HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
    depends_on:
      - metastore_db
    healthcheck:
      test: bash -c "exec 6<> /dev/tcp/localhost/9083"


  fast-data-dev:
    image: dougdonohoe/fast-data-dev
    ports:
      - "3181:3181"
      - "3040:3040"
      - "7081:7081"
      - "7082:7082"
      - "7083:7083"
      - "7092:7092"
      - "8081:8081"
    environment:
      - ZK_PORT=3181
      - WEB_PORT=3040
      - REGISTRY_PORT=8081
      - REST_PORT=7082
      - CONNECT_PORT=7083
      - BROKER_PORT=7092
      - ADV_HOST=127.0.0.1

volumes:
  hive-metastore-postgresql:

networks:
  default:
    name: hudi

Publish some data

from faker import Faker
from time import sleep
import random
import uuid
from datetime import datetime
from kafka_schema_registry import prepare_producer

# Configuration
KAFKA_BOOTSTRAP_SERVERS = ['localhost:7092']
SCHEMA_REGISTRY_URL = 'http://localhost:8081'
NUM_MESSAGES = 20
SLEEP_INTERVAL = 1
TOPIC_NAME = 'orders'
NUM_PARTITIONS = 1
REPLICATION_FACTOR = 1

# Avro Schema
SAMPLE_SCHEMA = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "name", "type": "string"},
        {"name": "order_value", "type": "string"},
        {"name": "priority", "type": "string"},
        {"name": "order_date", "type": "string"},
        {"name": "customer_id", "type": "string"},
        {"name": "ts", "type": "string"},
        {"name": "country", "type": "string"}
    ]
}

# Kafka Producer
producer = prepare_producer(
    KAFKA_BOOTSTRAP_SERVERS,
    SCHEMA_REGISTRY_URL,
    TOPIC_NAME,
    NUM_PARTITIONS,
    REPLICATION_FACTOR,
    value_schema=SAMPLE_SCHEMA
)

# Faker instance
faker = Faker()


class DataGenerator:
    @staticmethod
    def get_orders_data():
        """
        Generate and return a dictionary with mock order data.
        """
        country = random.choice(['US', 'IN'])  # Define country variable

        return {
            "order_id": str(uuid.uuid4()),
            "name": faker.text(max_nb_chars=20),
            "order_value": str(random.randint(10, 1000)),
            "priority": random.choice(["LOW", "MEDIUM", "HIGH"]),
            "order_date": faker.date_between(start_date='-30d', end_date='today').strftime('%Y-%m-%d'),
            "customer_id": str(uuid.uuid4()),
            "ts": str(datetime.now().timestamp()),
            "country": country
        }

    @staticmethod
    def produce_avro_message(producer, data):
        """
        Produce an Avro message and send it to the appropriate Kafka topic based on the country.
        """
        topic = 'orders_in' if data['country'] == 'IN' else 'orders_us'
        producer.send(topic, data)


# Generate and send order data
for _ in range(NUM_MESSAGES):
    order_data = DataGenerator.get_orders_data()
    print(order_data, type(order_data))
    DataGenerator.produce_avro_message(producer, order_data)
    print("Order Payload:", order_data)
    sleep(SLEEP_INTERVAL)
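
Before starting the two streamer jobs, it can help to confirm that both topics exist and hold data, since the offset ranges are what the streamer's checkpoint is validated against. A minimal check, assuming the kafka-python package is available (the kafka_schema_registry producer above builds on it):

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['localhost:7092'])
for topic in ('orders_in', 'orders_us'):
    partitions = consumer.partitions_for_topic(topic) or set()
    tps = [TopicPartition(topic, p) for p in partitions]
    # beginning/end offsets show the range any resumed checkpoint must fall into
    print(topic, consumer.beginning_offsets(tps), consumer.end_offsets(tps))
consumer.close()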

Job 1


spark-submit \
    --class org.apache.hudi.utilities.streamer.HoodieStreamer \
    --packages 'org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0,org.apache.hadoop:hadoop-aws:3.3.2' \
    --properties-file spark-config.properties \
    --master 'local[*]' \
    --executor-memory 1g \
    /Users/soumilshah/IdeaProjects/SparkProject/apache-hudi-delta-streamer-labs/E11/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
    --table-type COPY_ON_WRITE \
    --op UPSERT \
    --min-sync-interval-seconds 60 \
    --continuous \
    --source-ordering-field ts \
    --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
    --target-base-path 'file:////Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders'  \
    --target-table orders \
    --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
    --transformer-class 'org.apache.hudi.utilities.transform.SqlFileBasedTransformer' \
    --props tbl_hudi_us.props

tbl_hudi_us.props

hoodie.datasource.write.recordkey.field=order_id
hoodie.datasource.write.partitionpath.field=country
hoodie.datasource.write.precombine.field=ts

# Kafka Prop
bootstrap.servers=localhost:7092
auto.offset.reset=earliest
hoodie.deltastreamer.source.kafka.topic=orders_us
hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
schema.registry.url=http://localhost:8081/
hoodie.deltastreamer.schemaprovider.registry.schemaconverter=
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/orders-value/versions/latest

# Hive Sync
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.metastore.uris=thrift://localhost:9083
hoodie.datasource.hive_sync.mode=hms
hoodie.datasource.hive_sync.enable=true
hoodie.datasource.hive_sync.database=default
hoodie.datasource.hive_sync.table=orders
hoodie.datasource.write.hive_style_partitioning=true

# Locks Providers
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider

hoodie.streamer.transformer.sql.file=/Users/soumilshah/IdeaProjects/SparkProject/deltastreamerBroadcastJoins/us.sql

us.sql

SELECT * FROM <SRC> a

Job 2

spark-submit \
    --class org.apache.hudi.utilities.streamer.HoodieStreamer \
    --packages 'org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0,org.apache.hadoop:hadoop-aws:3.3.2' \
    --properties-file spark-config.properties \
    --master 'local[*]' \
    --executor-memory 1g \
    /Users/soumilshah/IdeaProjects/SparkProject/apache-hudi-delta-streamer-labs/E11/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
    --table-type COPY_ON_WRITE \
    --op UPSERT \
    --min-sync-interval-seconds 60 \
    --continuous \
    --source-ordering-field ts \
    --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
    --target-base-path 'file:////Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders'  \
    --target-table orders \
    --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
    --transformer-class 'org.apache.hudi.utilities.transform.SqlFileBasedTransformer' \
    --props tbl_hudi_in.props

tbl_hudi_in.props


hoodie.datasource.write.recordkey.field=order_id
hoodie.datasource.write.partitionpath.field=country
hoodie.datasource.write.precombine.field=ts

# Kafka Prop
bootstrap.servers=localhost:7092
auto.offset.reset=earliest
hoodie.deltastreamer.source.kafka.topic=orders_in
hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
schema.registry.url=http://localhost:8081/
hoodie.deltastreamer.schemaprovider.registry.schemaconverter=
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/orders-value/versions/latest

# Hive Sync
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.metastore.uris=thrift://localhost:9083
hoodie.datasource.hive_sync.mode=hms
hoodie.datasource.hive_sync.enable=true
hoodie.datasource.hive_sync.database=default
hoodie.datasource.hive_sync.table=orders
hoodie.datasource.write.hive_style_partitioning=true

# Locks Providers
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider

hoodie.streamer.transformer.sql.file=/Users/soumilshah/IdeaProjects/SparkProject/deltastreamerBroadcastJoins/in.sql

in.sql

SELECT * FROM <SRC> a

spark-config.properties

spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.hive.convertMetastoreParquet=false

One of the writers fails:

name=orders
24/06/08 09:58:02 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
24/06/08 09:58:02 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from file:////Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders
24/06/08 09:58:02 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[20240608095731003__commit__COMPLETED__20240608095736885]}
24/06/08 09:58:02 INFO StreamSync: Checkpoint to resume from : Option{val=orders_in,0:8}
24/06/08 09:58:02 INFO ConsumerConfig: ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [localhost:7092]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-null-1
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = null
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer

24/06/08 09:58:02 INFO KafkaAvroDeserializerConfig: KafkaAvroDeserializerConfig values: 
	bearer.auth.token = [hidden]
	schema.registry.url = [http://localhost:8081/]
	basic.auth.user.info = [hidden]
	auto.register.schemas = true
	max.schemas.per.subject = 1000
	basic.auth.credentials.source = URL
	schema.registry.basic.auth.user.info = [hidden]
	bearer.auth.credentials.source = STATIC_TOKEN
	specific.avro.reader = false
	value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
	key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

24/06/08 09:58:02 WARN ConsumerConfig: The configuration 'hoodie.deltastreamer.source.kafka.value.deserializer.class' was supplied but isn't a known config.
24/06/08 09:58:02 WARN ConsumerConfig: The configuration 'hoodie.streamer.source.kafka.value.deserializer.schema' was supplied but isn't a known config.
24/06/08 09:58:02 INFO AppInfoParser: Kafka version: 2.8.0
24/06/08 09:58:02 INFO AppInfoParser: Kafka commitId: ebb1d6e21cc92130
24/06/08 09:58:02 INFO AppInfoParser: Kafka startTimeMs: 1717855082275
24/06/08 09:58:02 INFO Metadata: [Consumer clientId=consumer-null-1, groupId=null] Cluster ID: Vd3KoCf_Qbu1gx33_kRL3g
24/06/08 09:58:02 INFO Metrics: Metrics scheduler closed
24/06/08 09:58:02 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
24/06/08 09:58:02 INFO Metrics: Metrics reporters closed
24/06/08 09:58:02 INFO AppInfoParser: App info kafka.consumer for consumer-null-1 unregistered
24/06/08 09:58:02 ERROR HoodieStreamer: Shutting down delta-sync due to exception
java.lang.NullPointerException
	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.lambda$fetchValidOffsets$1(KafkaOffsetGen.java:340)
	at java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
	at java.base/java.util.HashMap$EntrySpliterator.tryAdvance(HashMap.java:1785)
	at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
	at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
	at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:528)
	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.fetchValidOffsets(KafkaOffsetGen.java:340)
	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:255)
	at org.apache.hudi.utilities.sources.KafkaSource.fetchNewData(KafkaSource.java:63)
	at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
	at org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:228)
	at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:527)
	at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:495)
	at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:405)
	at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:757)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
24/06/08 09:58:02 INFO HoodieStreamer: Delta Sync shutdown. Error ?true
24/06/08 09:58:02 INFO HoodieStreamer: Ingestion completed. Has error: true
24/06/08 09:58:02 INFO StreamSync: Shutting down embedded timeline server
24/06/08 09:58:02 ERROR HoodieAsyncService: Service shutdown with error
java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
	at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
	at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:65)
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
	at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:205)
	at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:584)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieException
	at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:796)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.lambda$fetchValidOffsets$1(KafkaOffsetGen.java:340)
	at java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
	at java.base/java.util.HashMap$EntrySpliterator.tryAdvance(HashMap.java:1785)
	at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
	at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
	at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:528)
	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.fetchValidOffsets(KafkaOffsetGen.java:340)
	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:255)
	at org.apache.hudi.utilities.sources.KafkaSource.fetchNewData(KafkaSource.java:63)
	at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
	at org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:228)
	at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:527)
	at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:495)
	at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:405)
	at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:757)
	... 4 more
24/06/08 09:58:02 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/06/08 09:58:02 INFO SparkUI: Stopped Spark web UI at http://soumils-mbp:8091
24/06/08 09:58:02 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/06/08 09:58:02 INFO MemoryStore: MemoryStore cleared
24/06/08 09:58:02 INFO BlockManager: BlockManager stopped
24/06/08 09:58:02 INFO BlockManagerMaster: BlockManagerMaster stopped
24/06/08 09:58:02 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/06/08 09:58:02 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.hudi.utilities.ingestion.HoodieIngestionException: Ingestion service was shut down with exception.
	at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:67)
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
	at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:205)
	at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:584)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
	at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
	at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:65)
	... 15 more
Caused by: org.apache.hudi.exception.HoodieException
	at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:796)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.lambda$fetchValidOffsets$1(KafkaOffsetGen.java:340)
	at java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
	at java.base/java.util.HashMap$EntrySpliterator.tryAdvance(HashMap.java:1785)
	at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
	at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
	at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:528)
	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.fetchValidOffsets(KafkaOffsetGen.java:340)
	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:255)
	at org.apache.hudi.utilities.sources.KafkaSource.fetchNewData(KafkaSource.java:63)
	at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
	at org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:228)
	at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:527)
	at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:495)
	at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:405)
	at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:757)
	... 4 more
24/06/08 09:58:02 INFO ShutdownHookManager: Shutdown hook called
24/06/08 09:58:02 INFO ShutdownHookManager: Deleting directory /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-c4a23229-86f5-4787-94bc-4c32ab8066c4
24/06/08 09:58:02 INFO ShutdownHookManager: Deleting directory /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-0c931763-81aa-4831-ab7e-9b1b7e2a3103
(base) soumilshah@Soumils-MBP deltastreamerBroadcastJoins % 
