
Add OffsetsForTimes API to Consumer #235

Merged
merged 16 commits on Jul 25, 2017
Changes from 5 commits
50 changes: 50 additions & 0 deletions src/Confluent.Kafka/Consumer.cs
@@ -611,6 +611,31 @@
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition)
=> consumer.QueryWatermarkOffsets(topicPartition);

/// <summary>
/// Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
/// earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
///
Contributor: Remove empty line for consistency.

Contributor: Also, as you repeat the doc, you may want to use include_docs.xml (Matt used it recently; it seems a good idea to use it when doc is duplicated). You would then just need:

/// <include file='include_docs.xml' path='API/Member[@name="Consumer_OffsetsForTimes"]/*' />

and add the proper doc in that file.

Author: Done

/// </summary>
/// <remarks>
/// This is a blocking call. The consumer does not have to be assigned the partitions.
/// If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps,
/// null will be returned for that partition.
///
/// Notice that this method may block indefinitely if the partition does not exist.
/// </remarks>
/// <param name="timestampsToSearch">
/// The mapping from partition to the timestamp to look up.
/// </param>
/// <param name="timeout">
/// The maximum period of time the call may block.
/// </param>
/// <returns>
/// A mapping from partition to the timestamp and offset of the first message with timestamp greater
/// than or equal to the target timestamp. null will be returned for the partition if there is no such message.
/// </returns>
public IEnumerable<TopicPartitionOffset> OffsetsForTimes(IEnumerable<TopicPartitionTimestamp> timestampsToSearch, TimeSpan timeout)
=> consumer.OffsetsForTimes(timestampsToSearch, timeout);
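A quick usage sketch of the new API (not part of the PR; the topic name, group id, broker address, and the use of Timestamp's (long, TimestampType) constructor are illustrative assumptions):

```csharp
// Sketch: query the offset of the first message at or after a given time.
// "my-topic", partition 0, the group id, and the broker address are assumed.
using System;
using System.Collections.Generic;
using Confluent.Kafka;

class OffsetsForTimesSketch
{
    static void Main()
    {
        var config = new Dictionary<string, object>
        {
            { "group.id", "example-group" },
            { "bootstrap.servers", "localhost:9092" }
        };

        using (var consumer = new Consumer(config))
        {
            // 2017-07-01T00:00:00Z expressed as a Unix timestamp in milliseconds.
            long unixMs = 1498867200000;
            var query = new[]
            {
                new TopicPartitionTimestamp(
                    new TopicPartition("my-topic", 0),
                    new Timestamp(unixMs, TimestampType.CreateTime))
            };

            // Returns, per partition, the earliest offset whose timestamp is
            // greater than or equal to the target timestamp.
            foreach (var tpo in consumer.OffsetsForTimes(query, TimeSpan.FromSeconds(10)))
            {
                Console.WriteLine($"{tpo.Topic} [{tpo.Partition}] -> {tpo.Offset}");
            }
        }
    }
}
```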

/// <summary>
/// Refer to <see cref="Confluent.Kafka.Producer.GetMetadata(bool,string,int)" /> for more information.
///
@@ -1240,6 +1265,31 @@ public GroupInfo ListGroup(string group)
public WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition)
=> kafkaHandle.GetWatermarkOffsets(topicPartition.Topic, topicPartition.Partition);

/// <summary>
/// Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
/// earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
///
/// </summary>
/// <remarks>
/// This is a blocking call. The consumer does not have to be assigned the partitions.
/// If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps,
/// null will be returned for that partition.
///
/// Notice that this method may block indefinitely if the partition does not exist.
/// </remarks>
/// <param name="timestampsToSearch">
/// The mapping from partition to the timestamp to look up.
/// </param>
/// <param name="timeout">
/// The maximum period of time the call may block.
/// </param>
/// <returns>
/// A mapping from partition to the timestamp and offset of the first message with timestamp greater
/// than or equal to the target timestamp. null will be returned for the partition if there is no such message.
/// </returns>
public IEnumerable<TopicPartitionOffset> OffsetsForTimes(IEnumerable<TopicPartitionTimestamp> timestampsToSearch, TimeSpan timeout)
=> kafkaHandle.OffsetsForTimes(timestampsToSearch, timeout.TotalMillisecondsAsInt());

/// <summary>
/// Query the Kafka cluster for low (oldest/beginning) and high (newest/end)
/// offsets for the specified topic/partition (blocking)
11 changes: 11 additions & 0 deletions src/Confluent.Kafka/Impl/LibRdKafka.cs
@@ -148,6 +148,7 @@ static LibRdKafka()
_poll_set_consumer = NativeMethods.rd_kafka_poll_set_consumer;
_query_watermark_offsets = NativeMethods.rd_kafka_query_watermark_offsets;
_get_watermark_offsets = NativeMethods.rd_kafka_get_watermark_offsets;
_offsets_for_times = NativeMethods.rd_kafka_offsets_for_times;
_mem_free = NativeMethods.rd_kafka_mem_free;
_subscribe = NativeMethods.rd_kafka_subscribe;
_unsubscribe = NativeMethods.rd_kafka_unsubscribe;
@@ -391,6 +392,11 @@ internal static SafeTopicHandle topic_new(IntPtr rk, string topic, IntPtr conf)
out long low, out long high)
=> _get_watermark_offsets(rk, topic, partition, out low, out high);

private delegate ErrorCode OffsetsForTimes(IntPtr rk, IntPtr topics, IntPtr timeout_ms);
Contributor: offsets, not topics (from reading rdkafka.h; I may change this comment later).

Author: Done
private static OffsetsForTimes _offsets_for_times;
internal static ErrorCode offsets_for_times(IntPtr rk, IntPtr topics, IntPtr timeout_ms)
=> _offsets_for_times(rk, topics, timeout_ms);

private static Action<IntPtr, IntPtr> _mem_free;
internal static void mem_free(IntPtr rk, IntPtr ptr)
=> _mem_free(rk, ptr);
@@ -712,6 +718,11 @@ private class NativeMethods
[MarshalAs(UnmanagedType.LPStr)] string topic,
int partition, out long low, out long high);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ErrorCode rd_kafka_offsets_for_times(IntPtr rk,
/* rd_kafka_topic_partition_list_t * */ IntPtr topics,
Contributor: offsets, not topics.

Author: Done
IntPtr timeout_ms);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern void rd_kafka_mem_free(IntPtr rk, IntPtr ptr);

16 changes: 16 additions & 0 deletions src/Confluent.Kafka/Impl/SafeKafkaHandle.cs
@@ -258,6 +258,22 @@ internal WatermarkOffsets GetWatermarkOffsets(string topic, int partition)
return new WatermarkOffsets(low, high);
}

internal IEnumerable<TopicPartitionOffset> OffsetsForTimes(IEnumerable<TopicPartitionTimestamp> timestampsToSearch, int millisecondsTimeout)
{
IEnumerable<TopicPartitionOffset> offsets = timestampsToSearch
.Select(t => new TopicPartitionOffset(t.TopicPartition, t.Timestamp.UnixTimestampMs))
Contributor: This seems weird, with TopicPartitionOffset taking a long timestamp (though that is required by librdkafka). Add a short comment explaining why this is done, or perhaps create a new GetCTopicPartitionList(IEnumerable<TopicPartitionTimestamp> offsets) overload; not sure which is best.

Author: Added a comment with an explanation.
.ToList();
IntPtr cOffsets = GetCTopicPartitionList(offsets);
ErrorCode err = LibRdKafka.offsets_for_times(handle, cOffsets, (IntPtr)millisecondsTimeout);
if (err != ErrorCode.NoError)
{
throw new KafkaException(err);
}

var list = GetTopicPartitionOffsetErrorList(cOffsets);
return list.Select(t => t.TopicPartitionOffset).ToList();
Contributor: You can have an error per partition. The error returned by offsets_for_times is a general error (e.g. broker not available), but each partition may also return its own error; in that case the offset will still be set to the timestamp. You can reproduce this easily by querying a nonexistent partition.

Two choices: either we return a list of TopicPartitionOffsetError (and the user will have to map it before assigning), or we throw an exception if any member has an error. I prefer the first solution, possibly with an implicit cast from TopicPartitionOffsetError to TopicPartitionOffset that would throw if an error is present.

Author: Not sure that I can reproduce the per-partition error when querying a nonexistent partition; in my case I got a general error. But I've added error checks for every partition, and in case of any error OffsetsForTimes throws a KafkaException for the first error found. Thoughts?

Contributor: "I prefer the first solution": yes, I agree (as noted elsewhere). "possibly with an implicit cast that would throw if an error is present": this is an interesting (good) idea. We could do implicit or explicit casting, or have a ToTopicPartitionOffset method. A potential problem is confusion with the TopicPartitionOffset property (which doesn't throw). We could make that throw, but then it would be inconsistent with other properties.

It's intuitive to me that a cast would throw if there was a problem, so I like the idea of using a cast for conversion (though it could be argued the semantics aren't perfectly clear). We want an explicit cast, though, as the operation can fail. See:

https://docs.microsoft.com/en-us/dotnet/csharp/programming-guide/types/casting-and-type-conversions

I think we should include that in this PR.

Author: Personally, I'm not a big fan of cast operations that can throw. The client can test the Error property before getting a TopicPartitionOffset, without a cast operation and a try/catch block. However, it was easy to add an explicit cast operation and a unit test for it, so I did. Thanks!

}
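The explicit cast discussed in the thread above might look roughly like the following sketch (not necessarily the code merged in this PR; the Error and TopicPartitionOffset member names on TopicPartitionOffsetError, and Error exposing its ErrorCode via a Code property, are assumptions):

```csharp
// Sketch of an explicit cast from the per-partition result type to
// TopicPartitionOffset that throws when the result carries an error.
public static explicit operator TopicPartitionOffset(TopicPartitionOffsetError result)
{
    if (result.Error.Code != ErrorCode.NoError)  // assumed: Error exposes its ErrorCode
    {
        throw new KafkaException(result.Error.Code);
    }
    return result.TopicPartitionOffset;
}
```

An explicit (rather than implicit) operator fits the guideline that conversions which can fail should require an explicit cast at the call site.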

internal void Subscribe(IEnumerable<string> topics)
{
IntPtr list = LibRdKafka.topic_partition_list_new((IntPtr) topics.Count());
8 changes: 6 additions & 2 deletions src/Confluent.Kafka/Producer.cs
@@ -1005,8 +1005,12 @@
ISerializer<TValue> valueSerializer,
bool manualPoll, bool disableDeliveryReports)
{
var configWithoutKeySerializerProperties = KeySerializer.Configure(config, true);
var configWithoutValueSerializerProperties = ValueSerializer.Configure(config, false);
var configWithoutKeySerializerProperties = keySerializer != null
Contributor: Yep, noticed it this week; it was introduced in #205, but I'm waiting for Matt to come back, as there are other things to fix on it. This will be done in a separate PR, I think.

Author: That's OK if there will be another PR for this, but I need it fixed here, as I'm going to use this CI build. I'll resolve conflicts if any.

? keySerializer.Configure(config, true)
: Enumerable.Empty<KeyValuePair<string, object>>();
var configWithoutValueSerializerProperties = valueSerializer != null
? valueSerializer.Configure(config, false)
: Enumerable.Empty<KeyValuePair<string, object>>();

var configWithoutSerializerProperties = config.Where(item =>
configWithoutKeySerializerProperties.Any(ci => ci.Key == item.Key) &&
149 changes: 149 additions & 0 deletions src/Confluent.Kafka/TopicPartitionTimestamp.cs
@@ -0,0 +1,149 @@
// Copyright 2016-2017 Confluent Inc., 2015-2016 Andreas Heider
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Derived from: rdkafka-dotnet, licensed under the 2-clause BSD License.
//
// Refer to LICENSE for more information.

namespace Confluent.Kafka
{
/// <summary>
/// Represents a Kafka (topic, partition, timestamp) tuple.
/// </summary>
public class TopicPartitionTimestamp
{
/// <summary>
/// Initializes a new TopicPartitionTimestamp instance.
/// </summary>
/// <param name="tp">
/// Kafka topic name and partition.
/// </param>
/// <param name="timestamp">
/// A Kafka timestamp value.
/// </param>
public TopicPartitionTimestamp(TopicPartition tp, Timestamp timestamp)
: this (tp.Topic, tp.Partition, timestamp)
{
}

/// <summary>
/// Initializes a new TopicPartitionTimestamp instance.
/// </summary>
/// <param name="topic">
/// A Kafka topic name.
/// </param>
/// <param name="partition">
/// A Kafka partition.
/// </param>
/// <param name="timestamp">
/// A Kafka timestamp value.
/// </param>
public TopicPartitionTimestamp(string topic, int partition, Timestamp timestamp)
Contributor: We may want to modify Timestamp back to the version that accepted a DateTime. If we want to retrieve the offset at a particular date, it's kind of heavy with the current API (new Timestamp(Timestamp.FromDateTime(...))). Alternatively, just add a Timestamp.FromDateTime static method, or an implicit/explicit conversion from DateTime to Timestamp.

Author: Added a Timestamp.FromDateTime method.

Contributor: I agree that we want to provide easier conversion from DateTime to Timestamp. Of the options @treziac notes, I'm definitely against implicit conversion, and TimestampType needs to be specified, so explicit conversion is out too.

Between the static method and a constructor, I'm currently favoring a constructor because: 1. it's fewer characters; 2. we only ever want to return a Timestamp struct, so we don't need the flexibility provided by a static method; 3. the static method is slightly incorrectly named, since DateTime is not the only parameter; 4. the constructor variant reads more easily to me.

I'm definitely open to being convinced back to a static method if someone presents a good argument, but currently think we should change the FromDateTime method to an additional Timestamp constructor.

Contributor: For reference, some previous conversation related to DateTime/Timestamps is in #120.

Author: Totally agreed that a constructor is much better than the static method. Done in a517357.

{
Topic = topic;
Partition = partition;
Timestamp = timestamp;
}
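With the constructor the reviewers converged on above, building a TopicPartitionTimestamp from a DateTime might look like this sketch (the (DateTime, TimestampType) Timestamp constructor is assumed from the review discussion, and the topic/partition values are illustrative):

```csharp
// Assumed: Timestamp gains a (DateTime, TimestampType) constructor per the
// review thread; "my-topic" and partition 0 are illustrative.
var pointInTime = new DateTime(2017, 7, 1, 0, 0, 0, DateTimeKind.Utc);
var tpt = new TopicPartitionTimestamp(
    "my-topic", 0,
    new Timestamp(pointInTime, TimestampType.CreateTime));
```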

/// <summary>
/// Gets the Kafka topic name.
/// </summary>
public string Topic { get; }

/// <summary>
/// Gets the Kafka partition.
/// </summary>
public int Partition { get; }

/// <summary>
/// Gets the Kafka timestamp.
/// </summary>
public Timestamp Timestamp { get; }

/// <summary>
/// Gets the TopicPartition component of this TopicPartitionTimestamp instance.
/// </summary>
public TopicPartition TopicPartition
=> new TopicPartition(Topic, Partition);

/// <summary>
/// Tests whether this TopicPartitionTimestamp instance is equal to the specified object.
/// </summary>
/// <param name="obj">
/// The object to test.
/// </param>
/// <returns>
/// true if obj is a TopicPartitionTimestamp and all properties are equal. false otherwise.
/// </returns>
public override bool Equals(object obj)
{
if (!(obj is TopicPartitionTimestamp))
{
return false;
}

var tp = (TopicPartitionTimestamp)obj;
return tp.Partition == Partition && tp.Topic == Topic && tp.Timestamp == Timestamp;
}

/// <summary>
/// Returns a hash code for this TopicPartitionTimestamp.
/// </summary>
/// <returns>
/// An integer that specifies a hash value for this TopicPartitionTimestamp.
/// </returns>
public override int GetHashCode()
// x by prime number is quick and gives decent distribution.
=> (Partition.GetHashCode() * 251 + Topic.GetHashCode()) * 251 + Timestamp.GetHashCode();

/// <summary>
/// Tests whether TopicPartitionTimestamp instance a is equal to TopicPartitionTimestamp instance b.
/// </summary>
/// <param name="a">
/// The first TopicPartitionTimestamp instance to compare.
/// </param>
/// <param name="b">
/// The second TopicPartitionTimestamp instance to compare.
/// </param>
/// <returns>
/// true if TopicPartitionTimestamp instances a and b are equal. false otherwise.
/// </returns>
public static bool operator ==(TopicPartitionTimestamp a, TopicPartitionTimestamp b)
=> a.Equals(b);

/// <summary>
/// Tests whether TopicPartitionTimestamp instance a is not equal to TopicPartitionTimestamp instance b.
/// </summary>
/// <param name="a">
/// The first TopicPartitionTimestamp instance to compare.
/// </param>
/// <param name="b">
/// The second TopicPartitionTimestamp instance to compare.
/// </param>
/// <returns>
/// true if TopicPartitionTimestamp instances a and b are not equal. false otherwise.
/// </returns>
public static bool operator !=(TopicPartitionTimestamp a, TopicPartitionTimestamp b)
=> !(a == b);

/// <summary>
/// Returns a string representation of the TopicPartitionTimestamp object.
/// </summary>
/// <returns>
/// A string that represents the TopicPartitionTimestamp object.
/// </returns>
public override string ToString()
=> $"{Topic} [{Partition}] @{Timestamp}";
}
}