-
Notifications
You must be signed in to change notification settings - Fork 2k
/
EventHubDataAdapter.cs
139 lines (124 loc) · 6.42 KB
/
EventHubDataAdapter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
using System;
using System.Collections.Generic;
using System.Globalization;
using Azure.Messaging.EventHubs;
using Orleans.Providers.Streams.Common;
using Orleans.Serialization;
using Orleans.Streams;
namespace Orleans.ServiceBus.Providers
{
/// <summary>
/// Default event hub data adapter. Users may subclass to override event data to stream mapping.
/// </summary>
/// <remarks>
/// Every member is virtual so a derived class can replace individual steps of the mapping.
/// The default <c>GetOffset</c> depends on the segment layout written by the default
/// <c>EncodeMessageIntoSegment</c>; override them together.
/// </remarks>
public class EventHubDataAdapter : IEventHubDataAdapter
{
    private readonly SerializationManager serializationManager;

    /// <summary>
    /// Initializes a new instance of the <see cref="EventHubDataAdapter"/> class.
    /// </summary>
    /// <param name="serializationManager">
    /// Serialization manager handed to the message, batch container and property
    /// serialization helpers used by this adapter.
    /// </param>
    public EventHubDataAdapter(SerializationManager serializationManager)
    {
        this.serializationManager = serializationManager;
    }

    /// <summary>
    /// Converts a cached message to a batch container for delivery.
    /// </summary>
    /// <param name="cachedMessage">The cached message to convert.</param>
    /// <returns>The batch container produced by the <see cref="EventHubMessage"/> overload.</returns>
    public virtual IBatchContainer GetBatchContainer(ref CachedMessage cachedMessage)
    {
        var eventHubMessage = new EventHubMessage(cachedMessage, this.serializationManager);
        return GetBatchContainer(eventHubMessage);
    }

    /// <summary>
    /// Converts an <see cref="EventHubMessage"/> to a batch container.
    /// </summary>
    /// <param name="eventHubMessage">The message to wrap.</param>
    /// <returns>A new <see cref="EventHubBatchContainer"/> wrapping <paramref name="eventHubMessage"/>.</returns>
    protected virtual IBatchContainer GetBatchContainer(EventHubMessage eventHubMessage)
    {
        return new EventHubBatchContainer(eventHubMessage, this.serializationManager);
    }

    /// <summary>
    /// Gets the stream sequence token from a cached message.
    /// </summary>
    /// <param name="cachedMessage">The cached message whose sequence number is used.</param>
    /// <returns>
    /// An <see cref="EventHubSequenceTokenV2"/> built from an empty offset string, the cached
    /// message's sequence number, and 0 as the last constructor argument.
    /// </returns>
    public virtual StreamSequenceToken GetSequenceToken(ref CachedMessage cachedMessage)
    {
        return new EventHubSequenceTokenV2("", cachedMessage.SequenceNumber, 0);
    }

    /// <summary>
    /// Converts a batch of stream events into an <see cref="EventData"/> to be sent to the event hub.
    /// </summary>
    /// <typeparam name="T">The type of the events.</typeparam>
    /// <param name="streamGuid">The stream Guid.</param>
    /// <param name="streamNamespace">The stream Namespace.</param>
    /// <param name="events">The events to send.</param>
    /// <param name="token">Must be <c>null</c>; sequence tokens are not supported when producing.</param>
    /// <param name="requestContext">The request context to send along with the events.</param>
    /// <returns>The event data created by <c>EventHubBatchContainer.ToEventData</c>.</returns>
    /// <exception cref="ArgumentException"><paramref name="token"/> is not <c>null</c>.</exception>
    public virtual EventData ToQueueMessage<T>(Guid streamGuid, string streamNamespace, IEnumerable<T> events, StreamSequenceToken token, Dictionary<string, object> requestContext)
    {
        if (token != null)
        {
            throw new ArgumentException("EventHub streams currently does not support non-null StreamSequenceToken.", nameof(token));
        }

        return EventHubBatchContainer.ToEventData(this.serializationManager, streamGuid, streamNamespace, events, requestContext);
    }

    /// <summary>
    /// Builds the <see cref="CachedMessage"/> stored in the cache for an event hub message.
    /// </summary>
    /// <param name="streamPosition">Stream identity and sequence token previously computed for the message.</param>
    /// <param name="queueMessage">The event hub message.</param>
    /// <param name="dequeueTime">Time recorded as the message's dequeue time.</param>
    /// <param name="getSegment">Callback that returns a buffer segment of at least the requested size.</param>
    /// <returns>The cached message, including the encoded segment.</returns>
    public virtual CachedMessage FromQueueMessage(StreamPosition streamPosition, EventData queueMessage, DateTime dequeueTime, Func<int, ArraySegment<byte>> getSegment)
    {
        return new CachedMessage()
        {
            StreamGuid = streamPosition.StreamIdentity.Guid,
            // Namespaces are interned so cached messages of the same namespace share one string instance.
            StreamNamespace = streamPosition.StreamIdentity.Namespace != null ? string.Intern(streamPosition.StreamIdentity.Namespace) : null,
            SequenceNumber = queueMessage.SequenceNumber,
            EventIndex = streamPosition.SequenceToken.EventIndex,
            EnqueueTimeUtc = queueMessage.EnqueuedTime.UtcDateTime,
            DequeueTimeUtc = dequeueTime,
            Segment = EncodeMessageIntoSegment(queueMessage, getSegment)
        };
    }

    /// <summary>
    /// Gets the stream position (stream identity plus sequence token) of an event hub message.
    /// </summary>
    /// <param name="partition">The partition the message was read from. Not used by this default implementation.</param>
    /// <param name="queueMessage">The event hub message.</param>
    /// <returns>The stream position of the message.</returns>
    public virtual StreamPosition GetStreamPosition(string partition, EventData queueMessage)
    {
        IStreamIdentity streamIdentity = this.GetStreamIdentity(queueMessage);

        // The offset is machine-readable data, so it is formatted culture-invariantly.
        string offset = queueMessage.Offset.ToString(CultureInfo.InvariantCulture);
        StreamSequenceToken token = new EventHubSequenceTokenV2(offset, queueMessage.SequenceNumber, 0);
        return new StreamPosition(streamIdentity, token);
    }

    /// <summary>
    /// Gets the event hub offset from a cached message.
    /// </summary>
    /// <remarks>
    /// The default implementation reads the first string stored in the message segment, which is
    /// where <c>EncodeMessageIntoSegment</c> writes the offset. A derived class that changes the
    /// segment layout must override this method as well.
    /// </remarks>
    /// <param name="lastItemPurged">The cached message to read the offset from.</param>
    /// <returns>The offset string read from the start of the segment.</returns>
    public virtual string GetOffset(CachedMessage lastItemPurged)
    {
        // TODO figure out how to get this from the adapter
        int readOffset = 0;
        return SegmentBuilder.ReadNextString(lastItemPurged.Segment, ref readOffset); // read offset
    }

    /// <summary>
    /// Get the Event Hub partition key to use for a stream.
    /// </summary>
    /// <param name="streamGuid">The stream Guid.</param>
    /// <param name="streamNamespace">The stream Namespace. Not used by this default implementation.</param>
    /// <returns>The partition key to use for the stream: the string form of <paramref name="streamGuid"/>.</returns>
    public virtual string GetPartitionKey(Guid streamGuid, string streamNamespace) => streamGuid.ToString();

    /// <summary>
    /// Get the <see cref="IStreamIdentity"/> for an event message.
    /// </summary>
    /// <param name="queueMessage">The event message.</param>
    /// <returns>The stream identity.</returns>
    /// <exception cref="ArgumentNullException">The message's partition key is <c>null</c>.</exception>
    /// <exception cref="FormatException">The message's partition key is not a valid Guid.</exception>
    public virtual IStreamIdentity GetStreamIdentity(EventData queueMessage)
    {
        // Inverse of GetPartitionKey: the partition key carries the stream Guid.
        Guid streamGuid = Guid.Parse(queueMessage.PartitionKey);
        string streamNamespace = queueMessage.GetStreamNamespaceProperty();
        return new StreamIdentity(streamGuid, streamNamespace);
    }

    /// <summary>
    /// Encodes an event hub message into a buffer segment obtained from <paramref name="getSegment"/>.
    /// </summary>
    /// <remarks>
    /// The segment holds, in order: offset, partition key, serialized properties, and payload.
    /// NOTE(review): code that decodes the segment (e.g. <see cref="EventHubMessage"/>) presumably
    /// relies on this order; confirm before changing it.
    /// </remarks>
    /// <param name="queueMessage">The event hub message to encode.</param>
    /// <param name="getSegment">Callback that returns a buffer segment of at least the requested size.</param>
    /// <returns>The segment containing the encoded message.</returns>
    protected virtual ArraySegment<byte> EncodeMessageIntoSegment(EventData queueMessage, Func<int, ArraySegment<byte>> getSegment)
    {
        byte[] propertiesBytes = queueMessage.SerializeProperties(this.serializationManager);
        var payload = queueMessage.Body;

        // Format the offset once (culture-invariantly) so that the size calculation and the
        // write below are guaranteed to use the same string.
        string offset = queueMessage.Offset.ToString(CultureInfo.InvariantCulture);

        // get size of offset, partitionkey, properties, and payload
        int size = SegmentBuilder.CalculateAppendSize(offset) +
            SegmentBuilder.CalculateAppendSize(queueMessage.PartitionKey) +
            SegmentBuilder.CalculateAppendSize(propertiesBytes) +
            SegmentBuilder.CalculateAppendSize(payload);

        // get segment
        ArraySegment<byte> segment = getSegment(size);

        // encode offset, partitionkey, properties and payload into segment
        int writeOffset = 0;
        SegmentBuilder.Append(segment, ref writeOffset, offset);
        SegmentBuilder.Append(segment, ref writeOffset, queueMessage.PartitionKey);
        SegmentBuilder.Append(segment, ref writeOffset, propertiesBytes);
        SegmentBuilder.Append(segment, ref writeOffset, payload);
        return segment;
    }
}
}