Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit state when no partitions are generated for ccdk #34605

Merged

Conversation

maxi297
Copy link
Contributor

@maxi297 maxi297 commented Jan 29, 2024

What

CCDK does not emit the state message when no partitions are generated. This is something the platform expect and therefore CCDK should support that.

Note that until we have clarity on the platform expectations regarding the state, we will always emit the state at the end of a sync even if it's duplication

How

I was struggling to make this fit in our domain as I don't get why the platform needs this. I've asked for more information on https://airbytehq-team.slack.com/archives/C03GD9SV36E/p1706540517839939.

Given the information I have, this is the best way I have of formalizing the domain. I don't like this because:

  • it adds a dependency between the cursor and the stream + read processor (while the dependency was only with the partition before)
  • the interface is kind of weird with emit_state_given_no_partitions_generated. If we decide to go with something more generic like emit_state, we are not explicit as to why this is needed (which might be fine but I don't like that when I check the Cursor class, I don't see everything related to state management). The other name I can think of is ensure_state_emitted_at_least_once which is less generic but still relates to the platform expectations. Once we understand that platform expectation, we can simply emit a state even if one as already been submitted or track if Cursor._emit_state_message has been called. Whatever the solution, I would like to think about this in the lens of Sync data accuracy project

I thought of other solutions which where also not optimal

Keep the logic in DefaultStream

The benefit is that there is no interface change and we can do something like:

    def generate_partitions(self) -> Iterable[Partition]:
        for partition in self._stream_partition_generator.generate():
            yield partition
        else:
            # something like `cursor.emit_state()`

The big drawback is that the solution only applies for sources using DefaultStream and this logic would need to be implemented in the FCDK as well. Hence, it feels like a bad way for formalize our domain and that interfaces should change.

Having ConcurrentReadProcessor do all the work

We could have passed the ConnectorStateManager to the ConcurrentReadProcessor and expose AbstractStream.namespace so that when no partitions generated (ConcurrentReadProcessor or PartitionEnqueuer can track that), we return self._connector_state_manager.create_state_message(...)

For this solution, the interfaces still do not change a lot and this would be supported by all the sources. What I don't like about this is:

  • if I do a change in terms of state management, there is a use case that is outside of Cursor and hence as a dev, I could miss some crucial information
  • once we will work on the Sync data accuracy project, if the record count in the state message is by stream, cursor.observe would be able to keep track of the number of records and the solution would be very easy for us if the cursor emits that state. In the Having ConcurrentReadProcessor do all the work solution, it is not the cursor that emits the state so the observe can't be used. Not that if the record count is expected to be across all streams, our domain does not capture that very well today and it might be challenging...

@maxi297 maxi297 requested a review from a team as a code owner January 29, 2024 15:26
Copy link

vercel bot commented Jan 29, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Ignored Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview Jan 29, 2024 10:47pm

@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label Jan 29, 2024
Copy link
Contributor

@clnoll clnoll left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the detailed explanation @maxi297.

I think it would be ideal not to have state being stored on the sentinel but it feels pretty safe overall (i.e. it doesn't look like anything will be able to or be tempted to mutate that state anywhere).

Regarding your "Keep the logic in DefaultStream" option - is it worth revisiting now that the file-based approach doesn't have its own DefaultStream?

Comment on lines 74 to 75
sentinel.stream.cursor.emit_state_given_no_partitions_generated()
yield from self._message_repository.consume_queue()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm probably being slow, but what is the reasoning behind emitting the state before consuming from the queue? If the previous lines have generated messages that go into the repository it feels like they should be emitted first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Emitting the state only push it to the message repository queue. The owner of actually ordering/printing the records decide when to emit these.

I don't really like this logic because as a user, I would assume that emit state would actually send it. I don't remember why we did it like that. That being said, it has advantages: we can send messages to the message repository in a multi threaded way and emit them only on the main thread which allows us to avoid issues regarding messages order

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, that makes sense!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can send messages to the message repository in a multi threaded way and emit them only on the main thread which allows us to avoid issues regarding messages order

+1. super important to avoid emitting state messages before the records

@@ -50,6 +51,7 @@ def generate_partitions(self, stream: AbstractStream) -> None:
while self._thread_pool_manager.prune_to_validate_has_reached_futures_limit():
time.sleep(self._sleep_time_in_seconds)
self._queue.put(partition)
self._queue.put(PartitionGenerationCompletedSentinel(stream))
has_generated_partitions = True
self._queue.put(PartitionGenerationCompletedSentinel(stream, has_generated_partitions))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be more useful to have the partition count instead of a bool, down the line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked myself the same question and didn't see benefits today for that. There is a (very minor) drawback and it's using a int take more space and this space can grow as the int gets bigger

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay yeah not much of a drawback. Given that this feels like this is something we'll want when we add record counts and debug logging, I feel like it makes sense to go ahead and implement it as an int.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #34605 (comment) which seems somewhat relevant to this comment

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right - I read the drawbacks that you listed for that approach and agree that the sentinel approach seems preferable.

@maxi297
Copy link
Contributor Author

maxi297 commented Jan 29, 2024

Regarding your "Keep the logic in DefaultStream" option - is it worth revisiting now that the file-based approach doesn't have its own DefaultStream?

Any source that would re-implement AbstractStream is at risk though and would have to re-implement the logic or emitting at least one state message

@maxi297
Copy link
Contributor Author

maxi297 commented Jan 29, 2024

I think it would be ideal not to have state being stored on the sentinel

We don't need to. The PartitionEnqueuer could call stream.cursor.emit_state_given_no_partitions_generated if no partitions are generated. We would just have ConcurrentReadProcessor consume the queue and no state would be needed

Copy link
Contributor

@girarda girarda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:shipit:

Comment on lines 74 to 75
sentinel.stream.cursor.emit_state_given_no_partitions_generated()
yield from self._message_repository.consume_queue()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can send messages to the message repository in a multi threaded way and emit them only on the main thread which allows us to avoid issues regarding messages order

+1. super important to avoid emitting state messages before the records

@maxi297 maxi297 merged commit 2c8b47b into master Jan 30, 2024
20 checks passed
@maxi297 maxi297 deleted the maxi297/emit-state-when-no-partition-generated-solution1 branch January 30, 2024 13:45
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 21, 2024
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 26, 2024
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 26, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CDK Connector Development Kit
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants