Skip to content

Conversation

@johnny-schmidt
Copy link
Contributor

What

How

Review guide

User Impact

Can this PR be safely reverted and rolled back?

  • YES πŸ’š
  • NO ❌

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So both 3 and 13 in your example of 10 concurrent would still write into the same socket just through a channel so even if they're running concurrently it won't mess us objects tied to the socket such as mapper or writer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, they will both post to the same channel, and per socket the write coroutine will tear down the channel 1-at-a-time

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when:

  1. Two threads writing to the same channel?
  2. Channel at capacity

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kotlin will handle the contention; i'm not sure of the exact way it's handled, but it's supposed to be more efficient than java locks

at capacity the writing coroutine will suspend until the channel clears

@vercel
Copy link

vercel bot commented Apr 10, 2025

The latest updates on your projects. Learn more about Vercel for Git β†—οΈŽ

Name Status Preview Comments Updated (UTC)
airbyte-docs βœ… Ready (Inspect) Visit Preview πŸ’¬ Add feedback Apr 11, 2025 10:40pm

lateinit var socketOutputConsumer: UnixDomainSocketOutputConsumer
override suspend fun acceptAsync(recordData: ObjectNode, changes: Map<Field, FieldValueChange>?, totalNum: Int?, num: Long?) {
outputConsumer.getSocketConsumer(num!!.toInt()).acceptAsync(recordData, stream.namespace ?: "", stream.name)
if (::socketOutputConsumer.isInitialized.not()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is threadsafe b/c this is a suspend fun; the init state of socketOutputConsumer might not be synced across threads if the coroutine changes threads?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants