
Change dataflow application id to unique #4569

Merged: 1 commit merged into SeldonIO:v2 on Jan 11, 2023
Conversation

ukclivecox (Contributor) commented:

Ensures we can update new pipelines without issue in KStreams. If the application id is left as the name of the pipeline, you can get errors of the form:

18:33:30.910276095  INFO [b6e80-StreamThread-1] :     i.s.d.k.Pipeline : pipeline image-embedding-pl (v1) changing to state REBALANCING
18:33:30.924662754  INFO [b6e80-StreamThread-1] :     i.s.d.k.Pipeline : pipeline image-embedding-pl (v1) changing to state RUNNING
18:33:30.949271579  INFO [810dc-StreamThread-1] :     i.s.d.k.Pipeline : pipeline image-embedding-pl (v2) changing to state RUNNING
18:33:30.954164396  INFO [                main] : d.PipelineSubscriber : received request for image-embedding-pl:1 Id:ceuqurh7307s73fkibfg
18:33:30.956366511  INFO [                main] : d.PipelineSubscriber : Delete pipeline image-embedding-pl version: 1 id: ceuqurh7307s73fkibfg
18:33:30.959070087  INFO [                main] :     i.s.d.k.Pipeline : pipeline image-embedding-pl (v1) changing to state PENDING_SHUTDOWN
18:33:30.965224106  INFO [810dc-StreamThread-1] :     i.s.d.k.Pipeline : pipeline image-embedding-pl (v2) changing to state REBALANCING
18:33:30.971998129 ERROR [810dc-StreamThread-1] : o.a.k.s.KafkaStreams : stream-client [seldon-dataflow-2e5ed81bda93f796280a962ce448cd8e-f2be91c7-2676-49e9-bd81-b3694bc810dc] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. 
org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic is unknown to the topology. This may happen if different KafkaStreams instances of the same application execute different Topologies. Note that Topologies are only identical if all operators are added in the same order.
	at org.apache.kafka.streams.processor.internals.StreamTask$RecordQueueCreator.createQueue(StreamTask.java:1280)
	at org.apache.kafka.streams.processor.internals.StreamTask.createPartitionQueues(StreamTask.java:211)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:189)
	at org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createActiveTask(ActiveTaskCreator.java:271)
	at org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createTasks(ActiveTaskCreator.java:194)
	at org.apache.kafka.streams.processor.internals.Tasks.createTasks(Tasks.java:128)
	at org.apache.kafka.streams.processor.internals.Tasks.handleNewAssignmentAndCreateTasks(Tasks.java:93)
	at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:349)
	at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1341)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:304)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:458)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:461)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:372)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:549)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1215)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:969)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:917)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:736)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)

18:33:30.972201962  INFO [810dc-StreamThread-1] :     i.s.d.k.Pipeline : pipeline image-embedding-pl (v2) changing to state PENDING_ERROR
18:33:31.003134979  INFO [4bc810dc-CloseThread] :     i.s.d.k.Pipeline : pipeline image-embedding-pl (v2) changing to state ERROR
18:33:31.042989217  INFO [009b6e80-CloseThread] :     i.s.d.k.Pipeline : pipeline image-embedding-pl (v1) changing to state NOT_RUNNING

However, this means each pipeline update will start with new consumers, so to avoid re-running all messages on each update the consumer offset reset has also been changed to latest. This seems the best default for now; in future we could make it configurable per pipeline.
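A minimal sketch of the configuration change involved, in Kotlin. The config keys are the standard Kafka ones; `streamsProperties`, the exact id format, and the bootstrap address are illustrative assumptions, not the actual dataflow-engine code:

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.streams.StreamsConfig
import java.util.Properties
import java.util.UUID

// Illustrative helper: derive a unique application.id per dataflow instance
// instead of reusing the pipeline name. A fresh id gives each deployment its
// own consumer group, so KStreams never sees two different topologies under
// the same application.id (the cause of the TopologyException above).
fun streamsProperties(pipelineName: String): Properties = Properties().apply {
    put(StreamsConfig.APPLICATION_ID_CONFIG, "seldon-dataflow-$pipelineName-${UUID.randomUUID()}")
    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
    // A brand-new consumer group has no committed offsets; "latest" stops it
    // from replaying the whole topic history on every pipeline update.
    put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
}
```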

sakoush (Member) commented Jan 11, 2023

For future reference, and to confirm the intuition behind the above changes, here is the post about how to deal with pipeline changes in KStreams.

In particular:

In general, you should reset an application cleanly or use a new application.id if you upgrade your app to a newer version that changes the structure of the topology.

sakoush (Member) left a review comment:
LGTM. Given the circumstances, I think setting the offset reset to latest is the right approach. However, I suspect we might hit some loss of messages during a pipeline upgrade.

It is perhaps worth a test to confirm this, if not yet done: i.e. run an inference load in the background, keep upgrading the same pipeline, and check whether all messages have been processed. A rough sketch of such a check follows.
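One way this check might look in Kotlin, assuming each request carries a unique id readable from the record key on the pipeline's output topic (the topic name, group id, and bootstrap address are placeholders):

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Properties

// Consume the pipeline's output topic and compare observed request ids with
// the ids that were sent, reporting anything lost or seen more than once.
fun checkProcessed(sentIds: Set<String>, outputTopic: String) {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
        put(ConsumerConfig.GROUP_ID_CONFIG, "upgrade-check")
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    }
    val seen = mutableListOf<String>()
    KafkaConsumer<String, String>(props).use { consumer ->
        consumer.subscribe(listOf(outputTopic))
        // Poll a few times; a real test would wait until the load generator
        // has finished and the topic has gone quiet.
        repeat(10) {
            consumer.poll(Duration.ofSeconds(1)).forEach { rec -> seen.add(rec.key()) }
        }
    }
    val lost = sentIds - seen.toSet()
    val duplicated = seen.groupingBy { it }.eachCount().filterValues { it > 1 }.keys
    println("lost=${lost.size} duplicated=${duplicated.size}")
}
```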

agrski (Contributor) commented Jan 11, 2023

LGTM. Given the circumstances, I think setting the offset reset to latest is the right approach. However, I suspect we might hit some loss of messages during a pipeline upgrade.

I suspect there could be two potential issues here:

  • Loss of messages (old topology stops processing, new messages arrive before new topology starts processing).
  • Duplication of messages (messages are tagged with pipeline names; the old topology may have instances still processing and rebalancing load as the new topology starts, so both old and new process some of the same messages).

It's not an ideal scenario, but I understand the motivation, and that right now it may be impossible to upgrade the pipeline at all when this issue appears.

I'd like to investigate whether using named topologies avoids the issue by removing ambiguity over node identifiers in the topology, but have some work left to do on that.

ukclivecox merged commit 0bd289c into SeldonIO:v2 on Jan 11, 2023