Add OffsetsForTimes API to Consumer #235
Conversation
It looks like @denisivan0v hasn't signed our Contributor License Agreement, yet.
You can read and sign our full Contributor License Agreement here. Once you've signed, reply with Appreciation of efforts, clabot

@confluentinc It looks like @denisivan0v just signed our Contributor License Agreement. 👍 Always at your service, clabot
@mhowlett Is there any feed where I can get these artifacts: https://ci.appveyor.com/project/mhowlett/confluent-kafka-dotnet/build/1.0.53/artifacts ?
Thanks a lot for the PR! Not online (only release packages are pushed to NuGet), but you can download them and add a repository as a feed source to test them.
that was quick! thanks! General comments:

API choice: I wouldn't use

All of the following is up for discussion/change, just thinking out loud here:

- Currently, no other method on
- I'd invent a new type
- For the return value I'd use
- I see in the returned collection you're including the

Note to self: read this closely, and review the entire API:
Also, I'm a fan of using
Thanks for your comments! I've made the required changes to unify the API and added API docs on the public methods and types.

As for

Also, I think that it is better to use

I'm going to add integration test(s) later today as well.
excellent! looking good at a quick glance - look forward to the integration test :-) I don't think librdkafka will produce an error for duplicate input (or at least, if I had written it, it wouldn't...). I think you can feel confident using this branch in your own code (most conveniently via the artifact produced on AppVeyor). I'm open to changing my mind on our use of collections, but I think for now consistency should win, and I'll open a general issue soliciting feedback across the entire API. FYI: the timeline for getting this merged is as follows:
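For readers following along, the API shape converged on in this thread can be sketched as below. This is a hypothetical usage sketch, not the merged code: the `Timestamp(DateTime, TimestampType)` constructor is only settled later in this conversation, and the broker address and topic name are placeholders.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

class OffsetsForTimesDemo
{
    static void Main()
    {
        // Placeholder broker/topic names - adjust for your environment.
        var config = new Dictionary<string, object>
        {
            { "group.id", "offsets-for-times-demo" },
            { "bootstrap.servers", "localhost:9092" }
        };

        using (var consumer = new Consumer(config))
        {
            // Assumed constructor shape: Timestamp(DateTime, TimestampType),
            // as discussed further down this thread.
            var oneHourAgo = new Timestamp(
                DateTime.UtcNow.AddHours(-1), TimestampType.CreateTime);

            // For each requested partition, returns the earliest offset whose
            // message timestamp is >= the given timestamp.
            var offsets = consumer.OffsetsForTimes(
                new[] { new TopicPartitionTimestamp("my-topic", 0, oneHourAgo) },
                TimeSpan.FromSeconds(10));

            foreach (var tpo in offsets)
            {
                Console.WriteLine($"{tpo.Topic} [{tpo.Partition}] -> {tpo.Offset}");
            }
        }
    }
}
```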
src/Confluent.Kafka/Producer.cs
Outdated
@@ -1005,8 +1005,12 @@ public class Producer<TKey, TValue> : ISerializingProducer<TKey, TValue>, IDisposable
            ISerializer<TValue> valueSerializer,
            bool manualPoll, bool disableDeliveryReports)
        {
            var configWithoutKeySerializerProperties = KeySerializer.Configure(config, true);
            var configWithoutValueSerializerProperties = ValueSerializer.Configure(config, false);
            var configWithoutKeySerializerProperties = keySerializer != null
Yep, noticed it this weekend; it was introduced in #205, but I'm waiting for Matt to come back, as there are other things to fix on it. This will be done in a separate PR, I think.
It's OK if there will be another PR for this. But I need it fixed here, as I'm going to use this CI build. I'll resolve conflicts if there are any.
Still haven't had proper time to review/try it, but I should definitely have some this week ;)
@mhowlett I've added an integration test and basic unit tests. Also, I've found that most of the existing integration tests don't work (at least in my environment). Moreover,
@treziac Looking forward to your review as well. Thanks!
Thanks again!
The major problem for me is the wrong values returned without an exception, but that's more of a librdkafka issue, I think.
You should add an integration test on what happens when using a wrong timestamp (in the future, or too far in the past) and check that we properly throw.
I still have to review and test properly for possible issues.
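A sketch of the kind of integration test being requested here, in the xUnit style the existing tests use. The fixture parameters, the `Timestamp(DateTime, TimestampType)` constructor, and the expectation that a far-future timestamp surfaces a `KafkaException` are all assumptions for illustration - the desired behavior is exactly what is under discussion.

```csharp
[Theory, MemberData(nameof(KafkaParameters))]
public static void Consumer_OffsetsForTimes_FutureTimestamp(
    string bootstrapServers, string topic, string partitionedTopic)
{
    var config = new Dictionary<string, object>
    {
        { "group.id", Guid.NewGuid().ToString() },
        { "bootstrap.servers", bootstrapServers }
    };

    using (var consumer = new Consumer(config))
    {
        // A timestamp far in the future: no message can have a timestamp >= it.
        var future = new Timestamp(
            DateTime.UtcNow.AddYears(10), TimestampType.CreateTime);

        // Expectation under discussion: this should throw (or report a
        // per-partition error) rather than echo the timestamp back as an offset.
        Assert.ThrowsAny<KafkaException>(() =>
            consumer.OffsetsForTimes(
                new[] { new TopicPartitionTimestamp(topic, 0, future) },
                TimeSpan.FromSeconds(10)));
    }
}
```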
@@ -713,6 +719,11 @@ private class NativeMethods
                int partition, out long low, out long high);

            [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
            internal static extern ErrorCode rd_kafka_offsets_for_times(IntPtr rk,
                /* rd_kafka_topic_partition_list_t * */ IntPtr topics,
offsets, not topics
Done
src/Confluent.Kafka/Consumer.cs
Outdated
@@ -612,6 +612,31 @@ public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition)
            => consumer.QueryWatermarkOffsets(topicPartition);

        /// <summary>
        /// Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
        /// earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
        ///
Remove empty line for consistency
Also, as you repeat the doc, you may want to use include_docs.xml (Matt used it recently; it seems a good idea to use it when doc is duplicated).
So you just need:
/// <include file='include_docs.xml' path='API/Member[@name="Consumer_OffsetsForTimes"]/*' />
and then add the proper doc in that file.
Done
@@ -391,6 +392,11 @@ internal static SafeTopicHandle topic_new(IntPtr rk, string topic, IntPtr conf)
                out long low, out long high)
            => _get_watermark_offsets(rk, topic, partition, out low, out high);

        private delegate ErrorCode OffsetsForTimes(IntPtr rk, IntPtr topics, IntPtr timeout_ms);
offsets, not topics (going by rdkafka.h - I may change this comment later)
Done
internal IEnumerable<TopicPartitionOffset> OffsetsForTimes(IEnumerable<TopicPartitionTimestamp> timestampsToSearch, int millisecondsTimeout)
{
    IEnumerable<TopicPartitionOffset> offsets = timestampsToSearch
        .Select(t => new TopicPartitionOffset(t.TopicPartition, t.Timestamp.UnixTimestampMs))
This seems weird, with the TopicPartitionOffset taking a long timestamp (though it's required by librdkafka).
Add some commentary to explain quickly why this is done, or perhaps create a new GetCTopicPartitionList(IEnumerable<TopicPartitionTimestamp> offsets) - not sure which is best.
Added a comment with explanation
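The mapping being commented on comes down to this: librdkafka's `offsets_for_times()` reads the timestamp to search for from each input entry's *offset* field, then overwrites that field with the found offset on return. A minimal sketch, reusing the names from this PR (the surrounding marshalling is elided and illustrative):

```csharp
// Input: reuse the offset field to carry the Unix-millisecond timestamp,
// because that is where librdkafka's offsets_for_times() reads it from.
IEnumerable<TopicPartitionOffset> query = timestampsToSearch
    .Select(t => new TopicPartitionOffset(t.TopicPartition, t.Timestamp.UnixTimestampMs));

// ... marshal `query` to a native rd_kafka_topic_partition_list_t, call
// rd_kafka_offsets_for_times(), then marshal the list back ...

// Output: each entry's offset field now holds the earliest offset whose
// message timestamp is >= the requested timestamp, per partition.
```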
@@ -30,13 +30,19 @@ public static IEnumerable<object[]> KafkaParameters()
        {
            if (kafkaParameters == null)
            {
                var codeBase = typeof(Tests).GetTypeInfo().Assembly.CodeBase;
Perhaps in another PR - I never took the time to do it, as I wanted to set up AppVeyor properly for the integration tests.
Assembly.Location provides the full path of the tests dll, and that's exactly what is needed here. From the MSDN article for the CodeBase property: "To get the absolute path to the loaded manifest-containing file, use the Assembly.Location property instead."
I can roll this back and move it to another PR. Should I do that?
                var assemblyDirectory = Path.GetDirectoryName(assemblyPath);
                var jsonPath = Path.Combine(assemblyDirectory, "kafka.parameters.json");
                dynamic json = JsonConvert.DeserializeObject(File.ReadAllText(jsonPath));
                kafkaParameters = new List<object[]>() { new object[] { json.bootstrapServers.ToString(), json.topic.ToString(), json.partitionedTopic.ToString() } };
                var json = JObject.Parse(File.ReadAllText(jsonPath));
Any reason to go from JsonConvert.DeserializeObject to JObject.Parse?
Same thing - this would be better in another PR, along with the previous change.
With JsonConvert.DeserializeObject we need to have a strongly typed object (or a dynamic object, as in this case). With JObject.Parse we get the parsed JSON tree as the result and can access the object's properties without creating any other objects. In my opinion, it's better to use JObject here instead of a dynamic object.
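To make the trade-off concrete, here's a small illustrative comparison using Newtonsoft.Json; the JSON document is a made-up stand-in for kafka.parameters.json.

```csharp
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

class JsonAccessDemo
{
    static void Main()
    {
        var text = "{ \"bootstrapServers\": \"localhost:9092\", \"topic\": \"t\" }";

        // dynamic: member access is resolved at run time via the DLR binder.
        dynamic d = JsonConvert.DeserializeObject(text);
        string viaDynamic = d.bootstrapServers.ToString();

        // JObject: a statically typed JSON tree; properties are read through
        // indexers - no dynamic dispatch involved.
        var json = JObject.Parse(text);
        string viaJObject = (string)json["bootstrapServers"];

        // Both approaches read the same value from the same document.
        System.Console.WriteLine(viaDynamic == viaJObject);
    }
}
```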
                // See librdkafka implementation for details https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.c#L2433
                var result = consumer.OffsetsForTimes(
                    new[] {new TopicPartitionTimestamp(messages[0].TopicPartition, messages[0].Timestamp)},
                    TimeSpan.FromSeconds(10))
I have some issues here; I think they're librdkafka-related.
When passing TimeSpan.FromSeconds(1), I don't receive any error, but the offset is not updated - I retrieve the timestamp as the offset. Will try to test more this weekend - won't have more time for reviewing today.
That's correct. The current librdkafka implementation just returns from the wait loop when the timeout ends, which unfortunately leads to the offset not being updated.
See https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.c#L2475 for details
            {
                for (int index = 0; index < N; index++)
                {
                    var message = await producer.ProduceAsync(topic, $"test key {index}", $"test val {index}");
To avoid overly long tests, prefer using an IDeliveryHandler, which lets you produce all your messages asynchronously - this is not a no-go; feel free to change it or not.
I tried using IDeliveryHandler to make the code shorter, but this approach gives me about the same. I'd like to leave it as is, if you don't mind.
        /// <param name="timestamp">
        ///     A Kafka timestamp value.
        /// </param>
        public TopicPartitionTimestamp(string topic, int partition, Timestamp timestamp)
We may modify Timestamp back to the version that accepted a DateTime - if we want to retrieve the offset at a particular date, it's kind of heavy with the current API (new Timestamp(Timestamp.FromDateTime(...))). Or perhaps just add a Timestamp.FromDateTime static method, or an implicit/explicit conversion from DateTime to Timestamp.
Added a Timestamp.FromDateTime method.
I agree that we want to provide easier conversion from DateTime to Timestamp.
Of the options @treziac notes, I'm definitely against implicit conversion, and TimestampType needs to be specified, so explicit conversion is out too.
Between the static method and a constructor, I'm currently favoring a constructor because: 1. it's fewer characters; 2. we only ever want to return a Timestamp struct - we don't need the flexibility provided by a static method; 3. the static method is slightly incorrectly named - DateTime is not the only parameter; 4. the constructor variant reads easier to me.
I'm definitely open to being convinced back to using a static method if someone wants to present a good argument, but currently I think we should change the FromDateTime method to an additional Timestamp constructor.
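A minimal sketch of the constructor variant being argued for, assuming a TimestampType enum and a UnixTimestampMs member like those in the client; the actual Timestamp struct has more members, and the exact epoch-conversion logic in the merged code may differ.

```csharp
using System;

public enum TimestampType { NotAvailable, CreateTime, LogAppendTime }

public struct Timestamp
{
    private static readonly DateTime UnixEpoch =
        new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    // The constructor favored above: DateTime is not the only parameter,
    // since TimestampType must always be specified alongside it.
    public Timestamp(DateTime dateTime, TimestampType type)
    {
        UnixTimestampMs = (long)(dateTime.ToUniversalTime() - UnixEpoch).TotalMilliseconds;
        Type = type;
    }

    public long UnixTimestampMs { get; }
    public TimestampType Type { get; }
}
```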
For reference, some previous conversation related to DateTime/Timestamps is in #120
Totally agreed that a constructor is much better than the static method. Done in a517357
Edit: found the error by properly checking the librdkafka code. Need to adapt your code and add tests for this scenario.
            var list = GetTopicPartitionOffsetErrorList(cOffsets);
            LibRdKafka.topic_partition_list_destroy(cOffsets);
            return list.Select(t => t.TopicPartitionOffset).ToList();
You can have an error per partition - the error returned by offsets_for_times is a general error (broker not available), but each partition may return its own error, in which case the offset will still be set to the timestamp. You can reproduce this easily by querying a nonexistent partition.
Two choices: either we return a list of TopicPartitionOffsetError (and the user will have to map it before assign), or we throw an exception if any member has an error. I prefer the first solution, eventually with an implicit cast from TopicPartitionOffsetError to TopicPartitionOffset which would throw if an error is present.
Not sure that I can repro the partition error when querying a nonexistent partition. In my case I got a general error.
But I've added checks for errors for every partition, and in case of any error OffsetsForTimes throws a KafkaException for the first error found.
Thoughts?
“I prefer the first solution” - yes, I agree (as noted elsewhere).
“eventually with an implicit cast from TopicPartitionOffsetError to TopicPartitionOffset which would throw if an error is present” - this is an interesting (good) idea. We could do implicit or explicit casting, or have a ToTopicPartitionOffset method. A potential problem is confusion with the TopicPartitionOffset property (which doesn't throw). We could make that property throw, but then it would be inconsistent with other properties.
It's intuitive to me that a cast would throw if there was a problem, so I like the idea of using a cast for conversion (though it could be argued the semantics aren't perfectly clear). We want an explicit cast though, as the operation can fail. See:
https://docs.microsoft.com/en-us/dotnet/csharp/programming-guide/types/casting-and-type-conversions
I think we should include that in this PR.
Personally, I'm not a big fan of cast operations that can throw. The client can test the Error property before getting a TopicPartitionOffset, without a cast operation and a try/catch block.
However, it was easy to add an explicit cast operation and a unit test for it, so I did. Thanks!
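For reference, the explicit cast discussed above can be sketched roughly as follows. Member names follow the types in this PR; the exact property shapes (and the `Error.HasError` check) are assumptions for illustration, not the merged code.

```csharp
public class TopicPartitionOffsetError
{
    public string Topic { get; set; }
    public int Partition { get; set; }
    public Offset Offset { get; set; }
    public Error Error { get; set; }

    // Explicit, not implicit: the conversion can fail, so the caller must
    // opt in with a cast. Throws if this entry carries a partition-level error.
    public static explicit operator TopicPartitionOffset(TopicPartitionOffsetError tpoe)
    {
        if (tpoe.Error.HasError)
        {
            throw new KafkaException(tpoe.Error);
        }
        return new TopicPartitionOffset(tpoe.Topic, tpoe.Partition, tpoe.Offset);
    }
}
```

Callers who prefer not to catch exceptions can still check `Error` themselves before reading `TopicPartitionOffset`, as noted above.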
@treziac many thanks for your very detailed review. Please check the changes I've made. I've started to use the CI build in my project and will report other issues if any come up.
        }

            var list = GetTopicPartitionOffsetErrorList(cOffsets);
            var error = list.Where(x => x.Error.Code != ErrorCode.NoError).Select(x => x.Error).FirstOrDefault();
When I suggested making a TopicPartitionTimestamp type, I wasn't realizing that offsets_for_times could return per-partition errors. Sorry! We should not have the TopicPartitionTimestamp type; replace it with a TopicPartitionTimestampError type and don't throw an exception when a partition-specific error occurs.
Oh sorry, we return TopicPartitionOffset, not TopicPartitionTimestamp. So keep the TopicPartitionTimestamp type but return TopicPartitionOffsetError instead of TopicPartitionOffset (and get rid of the check for per-partition errors here).
Done in a517357, thanks!
@@ -16,4 +16,8 @@
    <PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
  </ItemGroup>

  <ItemGroup>
don't think we want this ItemGroup
Visual Studio keeps adding it; I think we could include it. I didn't check whether they changed this in 2.0.
I think we can do this https://stackoverflow.com/a/33414270
Done in a517357
thanks for the latest commits @denisivan0v - this is looking good to me now. Will review again tomorrow and probably merge :-)
Many thanks for your changes and the upstream merge @mhowlett. Looking forward to any updates here.
Closes #228.