
KAFKA-14368: Connect offset write REST API #13465

Merged
merged 12 commits into apache:trunk from yashmayya:KAFKA-14368-offset-write-api on May 26, 2023

Conversation

yashmayya
Contributor

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@yashmayya
Contributor Author

Hi @C0urante, this will need to be rebased once #13424 and #13434 are merged, but I raised this draft PR to solicit some early feedback if possible - particularly around this statement from the KIP:

Offsets will be reset transactionally for each topic that they exist in: a single transaction will be used to emit all tombstone records for the connector's dedicated offsets topic (if one is used) and another transaction will be used to emit all tombstone records for the worker's global offsets topic.

We don't currently use a transactional producer for writing offsets to the worker's global offset backing store. Wouldn't doing so be a breaking change, due to the requirement of additional ACLs for the worker's producer principal? For workers where exactly-once source support is enabled, the only way we could do so would be if the connector's configured offsets topic is the same as the worker's global offsets topic (in which case, a connector-specific offset store is used with a transactional producer). If the connector's configured offsets topic is a different one, or if exactly-once source support is not enabled, I don't think we'll be able to write all the provided offsets in a single transaction for the worker's global offset backing store, right?
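For concreteness, here is a minimal sketch of what "a single transaction per offsets topic" implies at the producer level; the class name, topic handling, and transactional ID below are illustrative, not the worker's actual implementation:

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class TombstoneResetSketch {
    // Hypothetical helper: emit a tombstone (null value) for every source
    // partition key in one transaction, per the KIP statement quoted above.
    // Error handling (abortTransaction on failure) is elided for brevity.
    static void resetOffsets(String offsetsTopic, List<byte[]> partitionKeys) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // This setting is what requires the extra (TransactionalId WRITE) ACLs
        // for the producer principal - the compatibility concern raised above.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "connect-offsets-reset");
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            for (byte[] key : partitionKeys)
                producer.send(new ProducerRecord<>(offsetsTopic, key, null)); // null value = tombstone
            producer.commitTransaction();
        }
    }
}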

Also one minor comment on the KIP section - the "tombstone records" bit that was copied from the resetting offsets section needs to be updated.

@C0urante C0urante added the connect and kip (Requires or implements a KIP) labels on Mar 28, 2023
@C0urante
Contributor

Hi Yash! Thanks for the draft. I'll try to get to it sometime this week, but that may not happen.

With regards to your points:

We don't currently use a transactional producer for writing offsets to the worker's global offset backing store. Wouldn't doing so be a breaking change due to the requirement of additional ACLs for the worker's producer principal?

Since we're introducing a new opt-in API, it's not a breaking change. That said, it's probably worth calling out in the KIP and on the discussion thread. And now that I think about it, maybe a transaction on the global offsets topic isn't really necessary if we've already done a round of zombie fencing. Thoughts? We should notify the discussion thread for the KIP no matter what, just wanted to bounce the idea off you first.

If the connector's configured offsets topic is a different one or if exactly-once source support is not enabled, I don't think we'll be able to write all the provided offsets in a single transaction for the worker's global offset backing store right?

The use of transactions is only necessary if exactly-once source support is enabled for source connectors (both paragraphs that mention the use of transactions begin with "If exactly-once source support is enabled").

Also one minor comment on the KIP section - the "tombstone records" bit that was copied from the resetting offsets section needs to be updated.

🤦 (done)

@yashmayya
Contributor Author

Thanks for the swift reply Chris, and no rush on the review - I mainly wanted to get clarification regarding transactions on the offset topic(s).

Since we're introducing a new opt-in API, it's not a breaking change. That said, it's probably worth calling out in the KIP and on the discussion thread. And now that I think about it, maybe a transaction on the global offsets topic isn't really necessary if we've already done a round of zombie fencing. Thoughts? We should notify the discussion thread for the KIP no matter what, just wanted to bounce the idea off you first.

Ah yeah, that's a good point about the opt-in API - we wouldn't require the use of a transactional producer for regular, connector-task-initiated offset writes to the worker's global offset backing store. However, I agree that it doesn't seem necessary to use transactions on the global offsets topic if exactly-once source support is enabled and the connector is using a custom offsets topic (if it isn't, we can write to the global offsets topic transactionally using the transactional producer corresponding to the connector). This would be in line with how regular offset writes are handled: not only do we not write offsets transactionally to a connector's secondary store (the worker's global offset backing store, assuming the connector has a custom offsets topic configured as its primary store), but we also essentially ignore any errors arising from writes to the secondary store.
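As a rough sketch of that regular write path (a hypothetical wrapper class; the real logic lives in the framework's connector offset store), the primary store's result decides success while secondary-store failures are only logged:

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PrimarySecondaryWriteSketch {
    private static final Logger log = LoggerFactory.getLogger(PrimarySecondaryWriteSketch.class);
    private final OffsetBackingStore primaryStore;   // connector's custom offsets topic
    private final OffsetBackingStore secondaryStore; // worker's global offsets topic

    PrimarySecondaryWriteSketch(OffsetBackingStore primary, OffsetBackingStore secondary) {
        this.primaryStore = primary;
        this.secondaryStore = secondary;
    }

    Future<Void> set(Map<ByteBuffer, ByteBuffer> offsets, Callback<Void> callback) {
        // Errors from the secondary (global) store are logged and swallowed
        secondaryStore.set(offsets, (error, ignored) -> {
            if (error != null)
                log.warn("Failed to write offsets to secondary store; ignoring", error);
        });
        // Only the primary store's write determines the overall result
        return primaryStore.set(offsets, callback);
    }
}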

The use of transactions is only necessary if exactly-once source support is enabled for source connectors (both paragraphs that mention the use of transactions begin with "If exactly-once source support is enabled").

Ah okay, I misunderstood then. I thought the "If exactly-once source support is enabled" bit was only applicable to the zombie fencing.

@yashmayya yashmayya force-pushed the KAFKA-14368-offset-write-api branch 3 times, most recently from 84ca2f3 to 3bfe686 on April 6, 2023 11:45
@yashmayya yashmayya force-pushed the KAFKA-14368-offset-write-api branch 2 times, most recently from 2a41100 to 4dd4686 on April 9, 2023 11:20
@yashmayya yashmayya force-pushed the KAFKA-14368-offset-write-api branch from d3f2e09 to d237d6c on April 11, 2023 14:33
@yashmayya yashmayya marked this pull request as ready for review April 11, 2023 14:33
@yashmayya yashmayya changed the title KAFKA-14368: WIP: Connect offset write REST API KAFKA-14368: Connect offset write REST API Apr 11, 2023
Contributor

@gharris1727 gharris1727 left a comment

Change looks very good, I did a pass over the main implementation and only found nits.
Looking forward to getting this in before feature freeze!

@gharris1727
Contributor

gharris1727 commented Apr 11, 2023

So I was manually testing this feature and ran across a serialization problem. Here's the most concise repro case I can think of:

$ curl -sSX PATCH -H "Content-Type: application/json" localhost:8083/connectors/test/offsets -d '{
  "offsets": [
    {
      "partition": {
        "float": 1.0
      },
      "offset": {
        "key": "value"
      }
    }
  ]
}' | jq .
{
  "message": "The Connect framework managed offsets for this connector have been altered successfully. However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."
}
$ curl -sSX GET localhost:8083/connectors/test/offsets | jq .
{
  "offsets": [
    {
      "partition": {
        "float": 1
      },
      "offset": {
        "key": "value"
      }
    }
  ]
}
$ curl -sSX PATCH -H "Content-Type: application/json" localhost:8083/connectors/test/offsets -d '{
  "offsets": [
    {
      "partition": { 
        "float": 1
      },  
      "offset": {
        "key": "value"
      }
    }
  ]
}' | jq .
{
  "message": "The Connect framework managed offsets for this connector have been altered successfully. However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."
}
$ curl -sSX GET localhost:8083/connectors/test/offsets | jq .
{
  "offsets": [
    {
      "partition": {
        "float": 1
      },
      "offset": {
        "key": "value"
      }
    },
    {
      "partition": {
        "float": 1
      },
      "offset": {
        "key": "value"
      }
    }
  ]
}

The GET portion of the API is mapping decimals like 1.0 to the integer-looking 1, which is distinct when serialized by the JsonConverter in the offsets topic. When you copy-paste the result of a GET into a subsequent PATCH, it actually edits a completely different partition, since the equality check (and Kafka keying/compaction) is done on the serialized form.

I think the ConnectorOffset serialization needs to be tweaked to force showing the decimals, to be consistent with the JsonConverter.
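To illustrate the serialization concern, here is a minimal standalone sketch, assuming a schemaless JsonConverter similar to the one used for the offsets topic (the class name and topic string are illustrative):

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.json.JsonConverter;

public class FloatKeyDemo {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();
        // Schemaless mode, mirroring how Connect serializes internal topic data
        converter.configure(Collections.singletonMap("schemas.enable", "false"), true);
        byte[] asDouble = converter.fromConnectData("offsets", null, Map.of("float", 1.0d));
        byte[] asLong = converter.fromConnectData("offsets", null, Map.of("float", 1L));
        // Prints {"float":1.0} and {"float":1}: distinct byte sequences, so they
        // act as distinct record keys for Kafka compaction and offset lookups.
        System.out.println(new String(asDouble, StandardCharsets.UTF_8));
        System.out.println(new String(asLong, StandardCharsets.UTF_8));
    }
}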

Other manual testing seems to indicate this works great. LGTM once the above is addressed.

@yashmayya
Contributor Author

yashmayya commented Apr 12, 2023

I tried really hard to reproduce this via integration tests and I wasn't able to, so I tried doing a repro manually just as you outlined above. Turns out that this isn't actually a bug in either the GET or PATCH offsets API - jq is automatically converting floats like x.0 into x (you can try the exact same repro above without using jq to confirm).

Thanks for the manual testing (I've done some myself too, but the more the merrier of course) - it's really appreciated, since there are only so many cases that ITs can cover!

@gharris1727
Contributor

jq is automatically converting floats like x.0 into x

Thanks for catching that, you're completely correct. I should have cut jq out of my tests to verify that, but I hadn't even considered that jq would change the output.

It sounds like there's nothing to be done on the framework side then. Hopefully offsets containing floating point numbers are rare enough that not too many people end up finding the same footgun.

@@ -114,6 +127,7 @@ public class Worker {

public static final long CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);
public static final long EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);
public static final long ALTER_OFFSETS_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);
Contributor

Hmm... I think this may be too short. With sink connectors it's fairly straightforward to alter consumer group offsets, but for source connectors we have to start and complete a read-to-end of the offsets topic, then write the new offsets to it. And in both cases, we have the alterOffsets connector method to worry about as well.

Can we make the Worker API for altering offsets asynchronous, similar to what we do for reading offsets?

I know that there's concern about tasks being brought up for the connector while the request is being handled, but I think this might be alright.

If the connector is a sink connector, the requests to alter its consumer group's offsets will be rejected by the broker if any tasks are active.

If the connector is a source connector and exactly-once support is enabled, zombie fencing will take place and we won't be able to complete our write to the offsets topic.

Unless I'm mistaken, the only case that's left is non-exactly-once source connectors, which IMO is acceptable for us to ignore since we can't guarantee that there aren't zombie tasks running around writing their own offsets anyway.
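For illustration, a hedged sketch of what an asynchronous Worker method could look like; the method name, signature, and executor usage here are hypothetical, not the PR's actual code:

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.connect.util.Callback;

// Hypothetical shape of an async alter-offsets API on the Worker.
class AsyncAlterOffsetsSketch {
    private final ExecutorService executor = Executors.newCachedThreadPool();

    void alterConnectorOffsets(String connName,
                               Map<Map<String, ?>, Map<String, ?>> offsets,
                               Callback<Void> cb) {
        // Run the slow parts (read-to-end of the offsets topic, the connector's
        // alterOffsets() hook, consumer-group/admin calls for sink connectors)
        // off the herder thread and report the outcome via the callback.
        executor.submit(() -> {
            try {
                doAlter(connName, offsets);
                cb.onCompletion(null, null);
            } catch (Throwable t) {
                cb.onCompletion(t, null);
            }
        });
    }

    private void doAlter(String connName, Map<Map<String, ?>, Map<String, ?>> offsets) {
        // elided: the actual offset-altering logic
    }
}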

Contributor Author

Hm, but for non-exactly-once source connectors (which is the default mode), this would leave the door open to confusing behavior where users could get successful responses from the alter offsets API but the connector could completely ignore the overwritten offsets (if the user resumes the connector in the interim). I agree that the zombie task case is unhandled for non-EoS source connectors, but at least that would typically only occur for misbehaving connector plugins, whereas making the alter offsets API async would allow users to shoot themselves in the foot. I don't disagree that the 5 second timeout is quite non-ideal, and even more concerning is the fact that if a connector's alterOffsets method hangs, it can disable a worker (something that was a big problem with other connector methods until your elegant fix in #8069). I'm just trying to weigh the pros and cons here, but it does seem like doing alter offset operations asynchronously in the worker has more benefits than drawbacks.

Copy link
Contributor

@C0urante C0urante Apr 13, 2023

users could get successful responses from the alter offsets API but the connector could completely ignore the overwritten offsets (if the user resumes the connector in the interim).

That's a fair point. I was thinking that with the STOPPED state and a possible additional guard to prevent cancelled source tasks from committing offsets, we would have reasonable protection against zombie tasks overwriting recently-altered offsets. However, I wasn't thinking of the other scenario, where the offset alter request is initiated and left ongoing while the connector is resumed.

I think the risk of blocking the herder thread that you've brought up is perhaps the most convincing argument still in favor of making this operation asynchronous. There are risks not just with calls to the alterOffsets method, but also with reading to the end of offsets topics and contacting the transaction coordinator (if altering offsets for an exactly-once source connector), to name a few.

If we really want to get fancy, one way to decrease the risks of an asynchronous API for non-exactly-once source connectors would be to refuse to assign tasks for connectors with ongoing offset alterations during rebalance, even if the connector is resumed. The task configs for that connector could continue to live in the config topic; we'd just hold off on assigning them until the operation succeeds. Of course, this doesn't work if the leader of the cluster changes while an offset alter request is being serviced, but then (correct me if I'm wrong?) the same risks apply even with a synchronous API (although they're probably less likely). We could also try to add interruption logic that cancels any in-progress offset alter/reset requests when a rebalance starts. Either of these would be fine as a follow-up ticket, if they sound reasonable at all.

Contributor Author

Thanks for the detailed response and it sounds like we're on the same page now. I've refactored the alter offsets worker API to be asynchronous.

possible additional guard to prevent cancelled source tasks from committing offsets

I guess we don't really need to worry too much about cancelled source tasks since during regular task stop, we also remove the periodic offset commit task in the SourceTaskOffsetCommitter?

one way we could try to decrease the risks of an asynchronous API for non-exactly-once source connectors could be to refuse to assign tasks for connectors with ongoing offset alterations during rebalance, even if the connector is resumed

That's an interesting idea, but it does seem to be a pretty invasive change w.r.t. the current rebalancing logic, which is agnostic to all ongoing operations in the workers. The limitation is also a valid one, and yeah, the same risks apply even with the sync API, although I'm not sure I follow why you think it's less likely? Isn't it more likely that a synchronous alter offsets request hangs and causes the leader to fall out of the group, leading to a new leader being elected?

We could also try to add interruption logic that cancels any in-progress offset alter/reset requests when a rebalance starts

We would need to be careful about the exact points where we allow interruptions. For instance, we wouldn't want to abandon a request midway through writing offsets (in the non-EoS source connector case where it isn't an atomic operation, or for consumer groups when we're altering offsets for some partitions + resetting offsets for some others). Although this does seem like a more appealing option overall, and I've filed this Jira as a potential follow-up item - https://issues.apache.org/jira/browse/KAFKA-14910

Contributor

@C0urante C0urante left a comment

Whew, finally made it all the way through the non-testing changes!

Thanks for your patience Yash, and apologies for the delays. If it helps, we can get together and discuss this further in person at Kafka Summit to help speed things along; let me know.

@yashmayya yashmayya force-pushed the KAFKA-14368-offset-write-api branch 2 times, most recently from f99e610 to 8704674 Compare May 23, 2023 12:08
…s concurrently instead of sequentially; improve checks in DistributedHerder::alterConnectorOffsetsChecks; various other renames, rewordings, refactors and simplifications.
Comment on lines 1458 to 1433
offsetStore.configure(config);
// This reads to the end of the offsets topic and can be a potentially time-consuming operation
offsetStore.start();
Contributor

Believe this still needs to be addressed?

Contributor

@C0urante C0urante left a comment

Getting really close!

… getFenceZombieSourceTasksCallable method; refactor composite future construction in Worker::alterSinkConnectorOffsets
Contributor

@C0urante C0urante left a comment

Thanks Yash, I've finished a full pass over the test code. Everything's looking great, this is really close to being merged!

…eword exception message on attempting to alter offsets for connector not in stopped state; introduce additional null checks for Kafka topic names and partitions in SinkUtils::parseSinkConnectorOffsets; various minor test improvements
Contributor

@C0urante C0urante left a comment

LGTM, thanks Yash! Looking forward to the offset reset PR 😄

@yashmayya
Contributor Author

I just noticed that testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted is failing in the CI run. There's also https://issues.apache.org/jira/browse/KAFKA-14956 where testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted has been failing on CI for a while. Interestingly, neither of them has failed for me locally in over 100 runs under various loads. The point at which both of them are failing is also interesting:

  1. We create an embedded Connect cluster with its own embedded backing Kafka cluster
  2. We create a second embedded Kafka cluster
  3. We configure a sink connector in the embedded Connect cluster which consumes from a topic on the second embedded Kafka cluster
  4. We produce 10 messages each to 5 different partitions of a topic on the second Kafka cluster (which the connector is configured to consume from)
  5. We use the offsets read REST API to get the consumer group offsets for the sink connector and wait until it "catches up" to the expected offsets. This operation is retried for up to 15 seconds; if the consumer group offsets (obtained via an admin client in the worker) don't match the expected offsets by then, the test fails (see the sketch below).

Both tests are failing at this point. Since they consistently pass locally, it doesn't seem to be a correctness issue with connectors that target different Kafka clusters. I'm wondering if we need to up the timeout, although 15 seconds should be enough to consume just 50 messages 😕
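For reference, the shape of that wait-and-retry step (illustrative only; the supplier, constant, and class name are stand-ins for the test's real helpers):

import java.util.function.Supplier;
import org.apache.kafka.test.TestUtils;

public class OffsetCatchUpWait {
    // Sketch of step 5: poll the offsets read endpoint until it matches the
    // expected offsets, failing once the timeout elapses.
    static void waitForExpectedOffsets(Supplier<Object> currentOffsets,
                                       Object expectedOffsets) throws InterruptedException {
        final long timeoutMs = 15_000L; // the 15-second retry window mentioned above
        TestUtils.waitForCondition(
            () -> expectedOffsets.equals(currentOffsets.get()),
            timeoutMs,
            "Consumer group offsets for the sink connector did not catch up in time");
    }
}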

@C0urante
Contributor

Ah, spoke too soon!

I'd be open to bumping timeouts. If this does turn out to be a correctness issue (which is still possible since the timing on CI may be different and therefore more likely to unearth certain kinds of concurrency bugs), we can investigate further.

Also worth noting that WorkerTest::testAlterOffsetsSourceConnectorError is failing right now because offsetStore::stop hasn't been invoked by the time we check for it. I think you handle this kind of issue elsewhere by adding a second timeout(1000) argument when making calls to Mockito::verify; hopefully that's sufficient for this test as well?
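That Mockito pattern would look roughly like this (the store type and test name are assumptions based on the discussion, not the exact test code):

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.junit.Test;

public class AsyncVerifySketch {
    @Test
    public void testOffsetStoreEventuallyStopped() {
        ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class);
        // ... trigger the asynchronous alter-offsets path that should stop the store ...
        // timeout(1000) polls for up to a second before asserting, instead of
        // checking immediately and racing the async cleanup.
        verify(offsetStore, timeout(1000)).stop();
    }
}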

…ector offset alter tests in WorkerTest; increase offset read timeouts in OffsetsApiIntegrationTest
@yashmayya yashmayya force-pushed the KAFKA-14368-offset-write-api branch from edf20dc to 08c20a9 on May 25, 2023 14:47
@yashmayya yashmayya requested a review from C0urante May 26, 2023 02:11
Contributor

@C0urante C0urante left a comment

🎉

@C0urante C0urante merged commit 7ff2dbb into apache:trunk May 26, 2023
1 check failed
@yashmayya
Contributor Author

Thanks Chris!
