Permalink
Browse files

Wrapped up kafka server errors as a KafkaException for the Consumer.

  • Loading branch information...
spmallette committed Mar 26, 2011
1 parent cb4b08e commit a8883b063fe1a6b3dc49cb48b88e7c215b1d628a
@@ -80,7 +80,13 @@ public List<Message> Consume(FetchRequest request)
{
byte[] data = connection.Read(dataLength);
- // TODO: need to check in on kafka error codes...assume all's good for now
+ int errorCode = BitConverter.ToInt16(BitWorks.ReverseBytes(data.Take(2).ToArray<byte>()), 0);
+ if (errorCode != KafkaException.NoError)
+ {
+ throw new KafkaException(errorCode);
+ }
+
+ // skip the error code and process the rest
byte[] unbufferedData = data.Skip(2).ToArray();
int processed = 0;
@@ -121,7 +127,13 @@ public List<List<Message>> Consume(MultiFetchRequest request)
int position = 0;
- // gonna skip the first two as error code???
+ int errorCode = BitConverter.ToInt16(BitWorks.ReverseBytes(data.Take(2).ToArray<byte>()), 0);
+ if (errorCode != KafkaException.NoError)
+ {
+ throw new KafkaException(errorCode);
+ }
+
+ // skip the error code and process the rest
position = position + 2;
for (int ix = 0; ix < fetchRequests; ix++)
@@ -131,7 +143,13 @@ public List<List<Message>> Consume(MultiFetchRequest request)
int messageSetSize = BitConverter.ToInt32(BitWorks.ReverseBytes(data.Skip(position).Take(4).ToArray<byte>()), 0);
position = position + 4;
- // TODO: need to check in on kafka error codes...assume all's good for now
+ errorCode = BitConverter.ToInt16(BitWorks.ReverseBytes(data.Skip(position).Take(2).ToArray<byte>()), 0);
+ if (errorCode != KafkaException.NoError)
+ {
+ throw new KafkaException(errorCode);
+ }
+
+ // skip the error code and process the rest
position = position + 2;
byte[] messageSetBytes = data.Skip(position).ToArray<byte>().Take(messageSetSize).ToArray<byte>();
@@ -187,7 +205,13 @@ public IList<long> GetOffsetsBefore(OffsetRequest request)
{
byte[] data = connection.Read(dataLength);
- // TODO: need to check in on kafka error codes...assume all's good for now
+ int errorCode = BitConverter.ToInt16(BitWorks.ReverseBytes(data.Take(2).ToArray<byte>()), 0);
+ if (errorCode != KafkaException.NoError)
+ {
+ throw new KafkaException(errorCode);
+ }
+
+ // skip the error code and process the rest
byte[] unbufferedData = data.Skip(2).ToArray();
// first four bytes are the number of offsets
@@ -42,6 +42,7 @@
<ItemGroup>
<Compile Include="AbstractRequest.cs" />
<Compile Include="Consumer.cs" />
+ <Compile Include="KafkaException.cs" />
<Compile Include="RequestContext.cs" />
<Compile Include="Request\FetchRequest.cs" />
<Compile Include="Request\MultiFetchRequest.cs" />
@@ -0,0 +1,81 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Kafka.Client
+{
+ /// <summary>
+ /// A wrapping of an error code returned from Kafka.
+ /// </summary>
+ public class KafkaException : Exception
+ {
+ /// <summary>
+ /// No error occurred.
+ /// </summary>
+ public const int NoError = 0;
+
+ /// <summary>
+ /// The offset requested was out of range.
+ /// </summary>
+ public const int OffsetOutOfRangeCode = 1;
+
+ /// <summary>
+ /// The message was invalid.
+ /// </summary>
+ public const int InvalidMessageCode = 2;
+
+ /// <summary>
+ /// The wrong partition.
+ /// </summary>
+ public const int WrongPartitionCode = 3;
+
+ /// <summary>
+ /// Invalid message size.
+ /// </summary>
+ public const int InvalidRetchSizeCode = 4;
+
+ /// <summary>
+ /// Initializes a new instance of the KafkaException class.
+ /// </summary>
+ /// <param name="errorCode">The error code generated by a request to Kafka.</param>
+ public KafkaException(int errorCode) : base(GetMessage(errorCode))
+ {
+ ErrorCode = errorCode;
+ }
+
+ /// <summary>
+ /// Gets the error code that was sent from Kafka.
+ /// </summary>
+ public int ErrorCode { get; private set; }
+
+ /// <summary>
+ /// Gets the message for the exception based on the Kafka error code.
+ /// </summary>
+ /// <param name="errorCode">The error code from Kafka.</param>
+ /// <returns>A string message representation </returns>
+ private static string GetMessage(int errorCode)
+ {
+ if (errorCode == OffsetOutOfRangeCode)
+ {
+ return "Offset out of range";
+ }
+ else if (errorCode == InvalidMessageCode)
+ {
+ return "Invalid message";
+ }
+ else if (errorCode == WrongPartitionCode)
+ {
+ return "Wrong partition";
+ }
+ else if (errorCode == InvalidRetchSizeCode)
+ {
+ return "Invalid message size";
+ }
+ else
+ {
+ return "Unknown error";
+ }
+ }
+ }
+}
@@ -11,7 +11,7 @@ namespace Kafka.Client.Tests
/// Contains tests that go all the way to Kafka and back.
/// </summary>
[TestFixture]
- [Ignore("Requires a Kafka server running to execute")]
+ //[Ignore("Requires a Kafka server running to execute")]
public class KafkaIntegrationTest
{
/// <summary>

0 comments on commit a8883b0

Please sign in to comment.