-
Notifications
You must be signed in to change notification settings - Fork 81
/
ProtobufEventFormatter.cs
403 lines (367 loc) · 20.9 KB
/
ProtobufEventFormatter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
// Copyright 2021 Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.
using CloudNative.CloudEvents.Core;
using CloudNative.CloudEvents.V1;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Mime;
using System.Text;
using System.Threading.Tasks;
using static CloudNative.CloudEvents.V1.CloudEvent;
using static CloudNative.CloudEvents.V1.CloudEvent.Types;
using static CloudNative.CloudEvents.V1.CloudEvent.Types.CloudEventAttributeValue;
namespace CloudNative.CloudEvents.Protobuf
{
// TODO: Derived type which expects to only receive protobuf message data with a particular message type,
// so is able to unpack it.
/// <summary>
/// Formatter that implements the Protobuf Event Format, using the Google.Protobuf library for serialization.
/// </summary>
/// <remarks>
/// <para>
/// When encoding CloudEvents in structured mode, three kinds of data are supported, as indicated in the
/// event format. Text is stored in the <see cref="V1.CloudEvent.TextData"/> field; binary data is stored
/// in the <see cref="V1.CloudEvent.BinaryData"/> field; protobuf messages are stored in the
/// <see cref="V1.CloudEvent.ProtoData"/> field. In the last case, the message is packed in an
/// <see cref="Any"/> message, to preserve information about which message is encoded, unless the message
/// is already an <see cref="Any"/> in which case it is stored directly. (This prevents "double-encoding"
/// when a CloudEvent is decoded and then re-encoded.) Attempts to serialize CloudEvents with any other data type
/// will fail. Derived classes can specialize all of this behavior by overriding
/// <see cref="EncodeStructuredModeData(CloudEvent, V1.CloudEvent)"/>.
/// </para>
/// <para>
/// When decoding CloudEvents in structured mode, text and binary data payloads are represented as strings and byte
/// arrays respectively. Protobuf message payloads are represented using the <see cref="Any"/> wrapper, without
/// attempting to "unpack" the message. This avoids any requirement for the underlying message type to be
/// known by the application consuming the CloudEvent. (The data may be stored for later processing by another
/// application with more awareness, for example.) Derived classes can specialize all of this behavior by
/// overriding <see cref="DecodeStructuredModeData(V1.CloudEvent, CloudEvent)"/>.
/// </para>
/// <para>
/// When encoding CloudEvent data in binary mode, this implementation only supports plain binary and text data.
/// (Even text data is only supported when the <see cref="CloudEvent.DataContentType"/> begins with "text/".)
/// While it might be expected that protobuf messages would be serialized into the binary mode data, there is
/// no clear standard as to whether they should be directly serialized, or packed into an <see cref="Any"/>
/// message first, and no standardized content type to use to distinguish these options. Users are encouraged
/// to either use structured mode where possible, or explicitly encode the data as a byte array first. Derived
/// classes can specialize this behavior by overriding <see cref="EncodeBinaryModeEventData(CloudEvent)"/>.
/// </para>
/// <para>
/// When decoding CloudEvent data in binary mode, if the data content type begins with "text/" it is decoded as
/// a string, otherwise it is left as a byte array. Derived classes can specialize this behavior by overriding
/// <see cref="DecodeBinaryModeEventData(ReadOnlyMemory{byte}, CloudEvent)"/>.
/// </para>
/// <para>
/// This event formatter does not infer any data content type.
/// </para>
/// </remarks>
public class ProtobufEventFormatter : CloudEventFormatter
{
/// <summary>
/// The default value for <see cref="TypeUrlPrefix"/>. This is the value used by Protobuf libraries
/// when no prefix is specifically provided.
/// </summary>
public const string DefaultTypeUrlPrefix = "type.googleapis.com";
// Suffix appended to the general CloudEvents media types to form the media types used by this formatter.
private const string MediaTypeSuffix = "+protobuf";
// Media type reported via the content type when encoding a single CloudEvent in structured mode.
private static readonly string StructuredMediaType = MimeUtilities.MediaType + MediaTypeSuffix;
// Media type reported via the content type when encoding a batch of CloudEvents.
private static readonly string BatchMediaType = MimeUtilities.BatchMediaType + MediaTypeSuffix;
/// <summary>
/// The type URL prefix this event formatter uses when packing messages into <see cref="Any"/>.
/// The value is never null. Note: the type URL prefix is not used when the data within a CloudEvent
/// is already an Any message, as the message is propagated directly.
/// </summary>
public string TypeUrlPrefix { get; }
// Lookup from the "oneof" case of a protobuf attribute value to the SDK attribute type
// it represents. Cases with no entry here are rejected when a CloudEvent is decoded.
private static readonly Dictionary<AttrOneofCase, CloudEventAttributeType> protoToCloudEventAttributeType =
    new Dictionary<AttrOneofCase, CloudEventAttributeType>
    {
        [AttrOneofCase.CeBoolean] = CloudEventAttributeType.Boolean,
        [AttrOneofCase.CeBytes] = CloudEventAttributeType.Binary,
        [AttrOneofCase.CeInteger] = CloudEventAttributeType.Integer,
        [AttrOneofCase.CeString] = CloudEventAttributeType.String,
        [AttrOneofCase.CeTimestamp] = CloudEventAttributeType.Timestamp,
        [AttrOneofCase.CeUri] = CloudEventAttributeType.Uri,
        [AttrOneofCase.CeUriRef] = CloudEventAttributeType.UriReference
    };
/// <summary>
/// Creates a formatter which packs messages using <see cref="DefaultTypeUrlPrefix"/>
/// ("type.googleapis.com"), the same prefix that <see cref="Any.Pack(IMessage)"/> uses by default.
/// </summary>
public ProtobufEventFormatter()
    : this(DefaultTypeUrlPrefix)
{
}

/// <summary>
/// Creates a formatter which packs messages using the given type URL prefix.
/// </summary>
/// <param name="typeUrlPrefix">The type URL prefix applied when a message is packed
/// into <see cref="Any"/>. Must not be null.</param>
public ProtobufEventFormatter(string typeUrlPrefix)
{
    // Validate first; the property is only assigned once the argument is known to be non-null.
    Validation.CheckNotNull(typeUrlPrefix, nameof(typeUrlPrefix));
    TypeUrlPrefix = typeUrlPrefix;
}
/// <inheritdoc />
public override IReadOnlyList<CloudEvent> DecodeBatchModeMessage(ReadOnlyMemory<byte> body, ContentType? contentType, IEnumerable<CloudEventAttribute>? extensionAttributes)
{
    // Expose the in-memory payload as a stream and hand off to the stream-based overload.
    var bodyStream = BinaryDataUtilities.AsStream(body);
    return DecodeBatchModeMessage(bodyStream, contentType, extensionAttributes);
}
/// <inheritdoc />
public override void DecodeBinaryModeEventData(ReadOnlyMemory<byte> body, CloudEvent cloudEvent)
{
    Validation.CheckNotNull(cloudEvent, nameof(cloudEvent));

    // A media type is a protocol token rather than linguistic text, so the prefix is
    // compared ordinally; the result must not depend on the current culture.
    if (cloudEvent.DataContentType is string dataContentType && dataContentType.StartsWith("text/", StringComparison.Ordinal))
    {
        // Text payloads are decoded to a string using the encoding derived from the content type.
        Encoding encoding = MimeUtilities.GetEncoding(new ContentType(dataContentType));
        cloudEvent.Data = BinaryDataUtilities.GetString(body, encoding);
    }
    else
    {
        // Any other payload (including a missing content type) is kept as bytes,
        // copied into a new array.
        cloudEvent.Data = body.ToArray();
    }
}
/// <inheritdoc />
public override CloudEvent DecodeStructuredModeMessage(ReadOnlyMemory<byte> body, ContentType? contentType, IEnumerable<CloudEventAttribute>? extensionAttributes)
{
    // Expose the in-memory payload as a stream and hand off to the stream-based overload.
    var bodyStream = BinaryDataUtilities.AsStream(body);
    return DecodeStructuredModeMessage(bodyStream, contentType, extensionAttributes);
}
/// <inheritdoc />
public override ReadOnlyMemory<byte> EncodeBatchModeMessage(IEnumerable<CloudEvent> cloudEvents, out ContentType contentType)
{
    Validation.CheckNotNull(cloudEvents, nameof(cloudEvents));

    // The batch media type is reported together with a UTF-8 charset parameter.
    var batchContentType = new ContentType(BatchMediaType);
    batchContentType.CharSet = Encoding.UTF8.WebName;
    contentType = batchContentType;

    // Each event is converted (and validated) individually; failures are reported
    // against the "cloudEvents" parameter.
    var batchProto = new CloudEventBatch();
    batchProto.Events.Add(cloudEvents.Select(evt => ConvertToProto(evt, nameof(cloudEvents))));
    return batchProto.ToByteArray();
}
// TODO: Put the boiler-plate code here into CloudEventFormatter
/// <inheritdoc />
public override ReadOnlyMemory<byte> EncodeBinaryModeEventData(CloudEvent cloudEvent)
{
    Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent));

    // No data at all is encoded as an empty payload.
    if (cloudEvent.Data is null)
    {
        return Array.Empty<byte>();
    }

    // String data is only accepted alongside a "text/..." content type. The media type is a
    // protocol token, so the prefix check is ordinal rather than culture-sensitive.
    if (cloudEvent.DataContentType is string dataContentType && dataContentType.StartsWith("text/", StringComparison.Ordinal) && cloudEvent.Data is string text)
    {
        ContentType contentType = new ContentType(dataContentType);
        return MimeUtilities.GetEncoding(contentType).GetBytes(text);
    }

    // Byte arrays are passed through as-is, whatever the content type.
    if (cloudEvent.Data is byte[] bytes)
    {
        return bytes;
    }

    // Anything else (including protobuf messages) is not supported in binary mode.
    // The failure is attributed to the cloudEvent argument.
    throw new ArgumentException(
        $"{nameof(ProtobufEventFormatter)} cannot serialize data of type {cloudEvent.Data.GetType()} with content type '{cloudEvent.DataContentType}'",
        nameof(cloudEvent));
}
/// <inheritdoc />
public override ReadOnlyMemory<byte> EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType)
{
    // Conversion comes first: it validates the CloudEvent before anything else is produced.
    V1.CloudEvent converted = ConvertToProto(cloudEvent, nameof(cloudEvent));

    // The structured media type is reported together with a UTF-8 charset parameter.
    var structuredContentType = new ContentType(StructuredMediaType);
    structuredContentType.CharSet = Encoding.UTF8.WebName;
    contentType = structuredContentType;

    return converted.ToByteArray();
}
/// <inheritdoc />
public override IReadOnlyList<CloudEvent> DecodeBatchModeMessage(Stream body, ContentType? contentType, IEnumerable<CloudEventAttribute>? extensionAttributes)
{
    Validation.CheckNotNull(body, nameof(body));

    CloudEventBatch parsedBatch = CloudEventBatch.Parser.ParseFrom(body);

    // Convert each event in order; conversion failures are reported against "body".
    var decodedEvents = new List<CloudEvent>();
    foreach (var eventProto in parsedBatch.Events)
    {
        decodedEvents.Add(ConvertFromProto(eventProto, extensionAttributes, nameof(body)));
    }
    return decodedEvents;
}
/// <inheritdoc />
public override CloudEvent DecodeStructuredModeMessage(Stream messageBody, ContentType? contentType, IEnumerable<CloudEventAttribute>? extensionAttributes)
{
    Validation.CheckNotNull(messageBody, nameof(messageBody));

    // Parse the protobuf message, then convert it; conversion failures are reported against "messageBody".
    V1.CloudEvent parsedEvent = V1.CloudEvent.Parser.ParseFrom(messageBody);
    return ConvertFromProto(parsedEvent, extensionAttributes, nameof(messageBody));
}
/// <summary>
/// Builds the SDK representation of a CloudEvent from its protobuf representation.
/// </summary>
/// <param name="proto">The protobuf representation of a CloudEvent. Must not be null.</param>
/// <param name="extensionAttributes">The extension attributes to use when populating the CloudEvent. May be null.</param>
/// <returns>The SDK representation of the CloudEvent.</returns>
public CloudEvent ConvertFromProto(V1.CloudEvent proto, IEnumerable<CloudEventAttribute>? extensionAttributes)
{
    // Reject null up front, then share the implementation used by the decoding methods.
    Validation.CheckNotNull(proto, nameof(proto));
    return ConvertFromProto(proto, extensionAttributes, nameof(proto));
}
/// <summary>
/// Converts a protobuf CloudEvent into its SDK representation, attributing any failure
/// to the parameter name supplied by the calling method.
/// </summary>
/// <param name="proto">The protobuf representation of the CloudEvent. Expected to be non-null.</param>
/// <param name="extensionAttributes">The extension attributes to use when populating the CloudEvent. May be null.</param>
/// <param name="paramName">The parameter name reported in any <see cref="ArgumentException"/> thrown here.</param>
/// <returns>The SDK representation of the CloudEvent, after checking it with
/// <c>Validation.CheckCloudEventArgument</c>.</returns>
/// <exception cref="ArgumentException">The spec version is not supported, an attribute value uses an
/// unhandled case, a required attribute appears in the attribute map, or an attribute's type differs
/// from the type of the matching attribute already known to the event.</exception>
private CloudEvent ConvertFromProto(V1.CloudEvent proto, IEnumerable<CloudEventAttribute>? extensionAttributes, string paramName)
{
    var specVersion = CloudEventsSpecVersion.FromVersionId(proto.SpecVersion)
        ?? throw new ArgumentException($"Unsupported CloudEvents spec version '{proto.SpecVersion}'", paramName);

    // The required attributes travel as dedicated proto fields rather than in the attribute map.
    var cloudEvent = new CloudEvent(specVersion, extensionAttributes)
    {
        Id = proto.Id,
        Source = (Uri) specVersion.SourceAttribute.Parse(proto.Source),
        Type = proto.Type
    };

    foreach (var pair in proto.Attributes)
    {
        if (!protoToCloudEventAttributeType.TryGetValue(pair.Value.AttrCase, out var attrTypeFromProto))
        {
            // Note: impossible to cover in tests
            throw new ArgumentException($"Unhandled protobuf attribute case: {pair.Value.AttrCase}", paramName);
        }

        // If we've already got an extension attribute specified for this name,
        // we validate against it and require the value in the proto to have the right
        // type. Otherwise, we create a new extension attribute of the correct type.
        var attr = cloudEvent.GetAttribute(pair.Key);
        if (attr is null)
        {
            attr = CloudEventAttribute.CreateExtension(pair.Key, attrTypeFromProto);
        }
        // Note: if CloudEvents spec version 2.0 contains different required attributes, we may want to
        // change exactly how this is specified. For the moment, this is the simplest way of implementing the requirement.
        else if (attr.IsRequired)
        {
            // The required attributes are all specified as proto fields.
            // They can't appear in the Attributes repeated field as well.
            throw new ArgumentException(
                $"Attribute '{attr.Name}' is a required attribute, and must only be specified via the top-level proto field.",
                paramName);
        }
        else if (attr.Type != attrTypeFromProto)
        {
            // This prevents any type changes, even those which might validate correctly
            // otherwise (e.g. between Uri and UriRef).
            throw new ArgumentException(
                $"Attribute '{attr.Name}' was specified with type '{attr.Type}', but has type '{attrTypeFromProto}' in the protobuf representation.",
                paramName);
        }

        // Note: the indexer performs validation.
        cloudEvent[attr] = pair.Value.AttrCase switch
        {
            AttrOneofCase.CeBoolean => pair.Value.CeBoolean,
            AttrOneofCase.CeBytes => pair.Value.CeBytes.ToByteArray(),
            AttrOneofCase.CeInteger => pair.Value.CeInteger,
            AttrOneofCase.CeString => pair.Value.CeString,
            AttrOneofCase.CeTimestamp => pair.Value.CeTimestamp.ToDateTimeOffset(),
            AttrOneofCase.CeUri => CloudEventAttributeType.Uri.Parse(pair.Value.CeUri),
            AttrOneofCase.CeUriRef => CloudEventAttributeType.UriReference.Parse(pair.Value.CeUriRef),
            // Not expected to be reached: unknown cases are rejected by the lookup above.
            _ => throw new ArgumentException($"Unhandled protobuf attribute case: {pair.Value.AttrCase}", paramName)
        };
    }

    // Data decoding is virtual so that derived formatters can specialize it.
    DecodeStructuredModeData(proto, cloudEvent);
    return Validation.CheckCloudEventArgument(cloudEvent, paramName);
}
/// <summary>
/// Decodes the "data" property provided within a structured-mode message,
/// populating the <see cref="CloudEvents.CloudEvent.Data"/> property accordingly.
/// </summary>
/// <remarks>
/// <para>
/// This implementation simply converts binary data to a byte array, leaves proto data
/// as an <see cref="Google.Protobuf.WellKnownTypes.Any"/>, and converts text data to a string.
/// When the protobuf representation carries no data, the property is set to null.
/// </para>
/// <para>
/// Override this method to provide more specialized conversions, such as to use <see cref="ByteString"/>
/// instead of a byte array, or to "unwrap" the proto data to generated code.
/// </para>
/// </remarks>
/// <param name="proto">The protobuf representation of the CloudEvent. Will not be null.</param>
/// <param name="cloudEvent">The event being decoded. This should not be modified except to
/// populate the <see cref="CloudEvents.CloudEvent.Data"/> property, but may be used to provide extra
/// information such as the data content type. Will not be null.</param>
/// <exception cref="ArgumentException">The data case of <paramref name="proto"/> is not one of the
/// cases handled by this implementation.</exception>
protected virtual void DecodeStructuredModeData(V1.CloudEvent proto, CloudEvent cloudEvent) =>
    cloudEvent.Data = proto.DataCase switch
    {
        DataOneofCase.BinaryData => proto.BinaryData.ToByteArray(),
        DataOneofCase.ProtoData => proto.ProtoData,
        DataOneofCase.TextData => proto.TextData,
        DataOneofCase.None => null,
        // Note: impossible to cover in tests
        _ => throw new ArgumentException($"Unhandled protobuf data case: {proto.DataCase}", nameof(proto))
    };
/// <summary>
/// Encodes structured (or batch) mode data within a CloudEvent, storing it in the specified <see cref="V1.CloudEvent"/>.
/// </summary>
/// <remarks>
/// This implementation stores protobuf messages in the proto data field (packing them into an
/// <see cref="Any"/> using <see cref="TypeUrlPrefix"/> unless the message already is an <see cref="Any"/>),
/// strings in the text data field, and byte arrays in the binary data field.
/// </remarks>
/// <param name="cloudEvent">The CloudEvent being encoded, which will have a non-null value for
/// its <see cref="CloudEvents.CloudEvent.Data"/> property.</param>
/// <param name="proto">The protobuf representation of the CloudEvent, which will be non-null.</param>
/// <exception cref="ArgumentException">The data of <paramref name="cloudEvent"/> is not a protobuf message,
/// a string or a byte array.</exception>
protected virtual void EncodeStructuredModeData(CloudEvent cloudEvent, V1.CloudEvent proto)
{
    switch (cloudEvent.Data)
    {
        case IMessage message:
            // An existing Any is stored directly, which avoids wrapping it a second time.
            proto.ProtoData = message is Any any ? any : Any.Pack(message, TypeUrlPrefix);
            break;
        case string text:
            proto.TextData = text;
            break;
        case byte[] binary:
            proto.BinaryData = ByteString.CopyFrom(binary);
            break;
        default:
            // The failure is attributed to the cloudEvent argument.
            throw new ArgumentException(
                $"{nameof(ProtobufEventFormatter)} cannot serialize data of type {cloudEvent.Data!.GetType()}",
                nameof(cloudEvent));
    }
}
/// <summary>
/// Builds the protobuf representation of a CloudEvent from its SDK representation.
/// </summary>
/// <param name="cloudEvent">The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.</param>
/// <returns>The protobuf representation of the CloudEvent.</returns>
public V1.CloudEvent ConvertToProto(CloudEvent cloudEvent)
{
    // Share the implementation used by the encoding methods, reporting failures against "cloudEvent".
    return ConvertToProto(cloudEvent, nameof(cloudEvent));
}
/// <summary>
/// Converts an SDK CloudEvent into its protobuf representation, attributing any failure
/// to the parameter name supplied by the calling method.
/// </summary>
/// <param name="cloudEvent">The CloudEvent to convert. It is checked with
/// <c>Validation.CheckCloudEventArgument</c> before conversion.</param>
/// <param name="paramName">The parameter name reported when the CloudEvent is checked or
/// an <see cref="ArgumentException"/> is thrown here.</param>
/// <returns>The protobuf representation of the CloudEvent.</returns>
/// <exception cref="ArgumentException">A populated attribute has a type with no protobuf equivalent.</exception>
private V1.CloudEvent ConvertToProto(CloudEvent cloudEvent, string paramName)
{
    Validation.CheckCloudEventArgument(cloudEvent, paramName);
    var specVersion = cloudEvent.SpecVersion;
    var proto = new V1.CloudEvent
    {
        Id = cloudEvent.Id,
        // Source is a required attribute, and we've validated the CloudEvent,
        // so it really should be non-null.
        Source = specVersion.SourceAttribute.Format(cloudEvent.Source!),
        Type = cloudEvent.Type,
        SpecVersion = specVersion.VersionId
    };

    foreach (var pair in cloudEvent.GetPopulatedAttributes())
    {
        var attr = pair.Key;
        // Skip attributes already handled above.
        if (attr == specVersion.IdAttribute ||
            attr == specVersion.SourceAttribute ||
            attr == specVersion.TypeAttribute)
        {
            continue;
        }

        // Each remaining attribute is stored in the oneof case matching its SDK attribute type.
        var value = new CloudEventAttributeValue();
        switch (CloudEventAttributeTypes.GetOrdinal(attr.Type))
        {
            case CloudEventAttributeTypeOrdinal.Binary:
                value.CeBytes = ByteString.CopyFrom((byte[]) pair.Value);
                break;
            case CloudEventAttributeTypeOrdinal.Boolean:
                value.CeBoolean = (bool) pair.Value;
                break;
            case CloudEventAttributeTypeOrdinal.Integer:
                value.CeInteger = (int) pair.Value;
                break;
            case CloudEventAttributeTypeOrdinal.String:
                value.CeString = (string) pair.Value;
                break;
            case CloudEventAttributeTypeOrdinal.Timestamp:
                value.CeTimestamp = Timestamp.FromDateTimeOffset((DateTimeOffset) pair.Value);
                break;
            case CloudEventAttributeTypeOrdinal.Uri:
                // URI values are stored in their formatted string form.
                value.CeUri = attr.Format(pair.Value);
                break;
            case CloudEventAttributeTypeOrdinal.UriReference:
                value.CeUriRef = attr.Format(pair.Value);
                break;
            default:
                // Note: impossible to cover in tests
                throw new ArgumentException($"Unhandled attribute type: {attr.Type}", paramName);
        }
        proto.Attributes.Add(attr.Name, value);
    }

    // Data encoding is virtual so that derived formatters can specialize it; it is only
    // invoked when there is data to encode.
    if (cloudEvent.Data is object)
    {
        EncodeStructuredModeData(cloudEvent, proto);
    }
    return proto;
}
}
}