-
Notifications
You must be signed in to change notification settings - Fork 1k
/
ClusterShardingMessageSerializer.cs
679 lines (593 loc) · 33.9 KB
/
ClusterShardingMessageSerializer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
//-----------------------------------------------------------------------
// <copyright file="ClusterShardingMessageSerializer.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Cluster.Sharding.Internal;
using Akka.Cluster.Sharding.Serialization.Proto.Msg;
using Akka.Remote.Serialization.Proto.Msg;
using Akka.Serialization;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using ActorRefMessage = Akka.Remote.Serialization.Proto.Msg.ActorRefData;
namespace Akka.Cluster.Sharding.Serialization
{
/// <summary>
/// INTERNAL API: Protobuf serializer of Cluster.Sharding messages.
/// </summary>
public class ClusterShardingMessageSerializer : SerializerWithStringManifest
{
private static readonly byte[] Empty = new byte[0];
#region manifests
private const string CoordinatorStateManifest = "AA";
private const string ShardRegionRegisteredManifest = "AB";
private const string ShardRegionProxyRegisteredManifest = "AC";
private const string ShardRegionTerminatedManifest = "AD";
private const string ShardRegionProxyTerminatedManifest = "AE";
private const string ShardHomeAllocatedManifest = "AF";
private const string ShardHomeDeallocatedManifest = "AG";
private const string RegisterManifest = "BA";
private const string RegisterProxyManifest = "BB";
private const string RegisterAckManifest = "BC";
private const string GetShardHomeManifest = "BD";
private const string ShardHomeManifest = "BE";
private const string HostShardManifest = "BF";
private const string ShardStartedManifest = "BG";
private const string BeginHandOffManifest = "BH";
private const string BeginHandOffAckManifest = "BI";
private const string HandOffManifest = "BJ";
private const string ShardStoppedManifest = "BK";
private const string GracefulShutdownReqManifest = "BL";
private const string RegionStoppedManifest = "BM";
private const string EntityStateManifest = "CA";
private const string EntityStartedManifest = "CB";
private const string EntityStoppedManifest = "CD";
private const string EntitiesStartedManifest = "CE";
private const string EntitiesStoppedManifest = "CF";
private const string StartEntityManifest = "EA";
private const string StartEntityAckManifest = "EB";
private const string GetShardStatsManifest = "DA";
private const string ShardStatsManifest = "DB";
private const string GetShardRegionStatsManifest = "DC";
private const string ShardRegionStatsManifest = "DD";
private const string GetClusterShardingStatsManifest = "GS"; // This is "DE" in JVM
private const string ClusterShardingStatsManifest = "CS"; // This is "DF" in JVM
private const string GetCurrentRegionsManifest = "DG";
private const string CurrentRegionsManifest = "DH";
private const string GetCurrentShardStateManifest = "FA";
private const string CurrentShardStateManifest = "FB";
private const string GetShardRegionStateManifest = "FC";
private const string ShardStateManifest = "FD";
private const string CurrentShardRegionStateManifest = "FE";
private const string EventSourcedRememberShardsMigrationMarkerManifest = "SM";
private const string EventSourcedRememberShardsState = "SS";
#endregion
private readonly Dictionary<string, Func<byte[], object>> _fromBinaryMap;
/// <summary>
/// Initializes a new instance of the <see cref="ClusterShardingMessageSerializer"/> class.
/// </summary>
/// <param name="system">The actor system to associate with this serializer.</param>
public ClusterShardingMessageSerializer(ExtendedActorSystem system) : base(system)
{
_fromBinaryMap = new Dictionary<string, Func<byte[], object>>
{
{ EntityStateManifest, bytes => EntityStateFromBinary(bytes) },
{ EntityStartedManifest, bytes => EntityStartedFromBinary(bytes) },
{ EntitiesStartedManifest, bytes => EntitiesStartedFromBinary(bytes) },
{ EntityStoppedManifest, bytes => EntityStoppedFromBinary(bytes) },
{ EntitiesStoppedManifest, bytes => EntitiesStoppedFromBinary(bytes) },
{ CoordinatorStateManifest, CoordinatorStateFromBinary},
{ ShardRegionRegisteredManifest, bytes => new ShardCoordinator.ShardRegionRegistered(ActorRefMessageFromBinary(bytes)) },
{ ShardRegionProxyRegisteredManifest, bytes => new ShardCoordinator.ShardRegionProxyRegistered(ActorRefMessageFromBinary(bytes)) },
{ ShardRegionTerminatedManifest, bytes => new ShardCoordinator.ShardRegionTerminated(ActorRefMessageFromBinary(bytes)) },
{ ShardRegionProxyTerminatedManifest, bytes => new ShardCoordinator.ShardRegionProxyTerminated(ActorRefMessageFromBinary(bytes)) },
{ ShardHomeAllocatedManifest, ShardHomeAllocatedFromBinary},
{ ShardHomeDeallocatedManifest, bytes => new ShardCoordinator.ShardHomeDeallocated(ShardIdMessageFromBinary(bytes)) },
{ RegisterManifest, bytes => new ShardCoordinator.Register(ActorRefMessageFromBinary(bytes)) },
{ RegisterProxyManifest, bytes => new ShardCoordinator.RegisterProxy(ActorRefMessageFromBinary(bytes)) },
{ RegisterAckManifest, bytes => new ShardCoordinator.RegisterAck(ActorRefMessageFromBinary(bytes)) },
{ GetShardHomeManifest, bytes => new ShardCoordinator.GetShardHome(ShardIdMessageFromBinary(bytes)) },
{ ShardHomeManifest, bytes => ShardHomeFromBinary(bytes) },
{ HostShardManifest, bytes => new ShardCoordinator.HostShard(ShardIdMessageFromBinary(bytes)) },
{ ShardStartedManifest, bytes => new ShardCoordinator.ShardStarted(ShardIdMessageFromBinary(bytes)) },
{ BeginHandOffManifest, bytes => new ShardCoordinator.BeginHandOff(ShardIdMessageFromBinary(bytes)) },
{ BeginHandOffAckManifest, bytes => new ShardCoordinator.BeginHandOffAck(ShardIdMessageFromBinary(bytes)) },
{ HandOffManifest, bytes => new ShardCoordinator.HandOff(ShardIdMessageFromBinary(bytes)) },
{ ShardStoppedManifest, bytes => new ShardCoordinator.ShardStopped(ShardIdMessageFromBinary(bytes)) },
{ GracefulShutdownReqManifest, bytes => new ShardCoordinator.GracefulShutdownRequest(ActorRefMessageFromBinary(bytes)) },
{ RegionStoppedManifest, bytes => new ShardCoordinator.RegionStopped(ActorRefMessageFromBinary(bytes)) },
{ GetShardStatsManifest, bytes => Shard.GetShardStats.Instance },
{ ShardStatsManifest, bytes => ShardStatsFromBinary(bytes) },
{ GetShardRegionStatsManifest, bytes => GetShardRegionStats.Instance },
{ ShardRegionStatsManifest, bytes => ShardRegionStatsFromBinary(bytes) },
{ GetClusterShardingStatsManifest, bytes => GetClusterShardingStatsFromBinary(bytes) },
{ ClusterShardingStatsManifest, bytes => ClusterShardingStatsFromBinary(bytes) },
{ GetCurrentRegionsManifest, bytes => GetCurrentRegions.Instance },
{ CurrentRegionsManifest, bytes => CurrentRegionsFromBinary(bytes) },
{ StartEntityManifest, bytes => StartEntityFromBinary(bytes) },
{ StartEntityAckManifest, bytes => StartEntityAckFromBinary(bytes) },
{ GetCurrentShardStateManifest, bytes => Shard.GetCurrentShardState.Instance },
{ CurrentShardStateManifest, bytes => CurrentShardStateFromBinary(bytes) },
{ GetShardRegionStateManifest, bytes => GetShardRegionState.Instance },
{ ShardStateManifest, bytes => ShardStateFromBinary(bytes) },
{ CurrentShardRegionStateManifest, bytes => CurrentShardRegionStateFromBinary(bytes) },
{ EventSourcedRememberShardsMigrationMarkerManifest, bytes => EventSourcedRememberEntitiesCoordinatorStore.MigrationMarker.Instance},
{ EventSourcedRememberShardsState, bytes => RememberShardsStateFromBinary(bytes) }
};
}
/// <summary>
/// Serializes the given object into a byte array
/// </summary>
/// <param name="obj">The object to serialize</param>
/// <exception cref="ArgumentException">
/// This exception is thrown when the specified <paramref name="obj"/> is of an unknown type.
/// </exception>
/// <returns>A byte array containing the serialized object</returns>
public override byte[] ToBinary(object obj)
{
switch (obj)
{
case ShardCoordinator.CoordinatorState o: return CoordinatorStateToProto(o).ToByteArray();
case ShardCoordinator.ShardRegionRegistered o: return ActorRefMessageToProto(o.Region).ToByteArray();
case ShardCoordinator.ShardRegionProxyRegistered o: return ActorRefMessageToProto(o.RegionProxy).ToByteArray();
case ShardCoordinator.ShardRegionTerminated o: return ActorRefMessageToProto(o.Region).ToByteArray();
case ShardCoordinator.ShardRegionProxyTerminated o: return ActorRefMessageToProto(o.RegionProxy).ToByteArray();
case ShardCoordinator.ShardHomeAllocated o: return ShardHomeAllocatedToProto(o).ToByteArray();
case ShardCoordinator.ShardHomeDeallocated o: return ShardIdMessageToProto(o.Shard).ToByteArray();
case ShardCoordinator.Register o: return ActorRefMessageToProto(o.ShardRegion).ToByteArray();
case ShardCoordinator.RegisterProxy o: return ActorRefMessageToProto(o.ShardRegionProxy).ToByteArray();
case ShardCoordinator.RegisterAck o: return ActorRefMessageToProto(o.Coordinator).ToByteArray();
case ShardCoordinator.GetShardHome o: return ShardIdMessageToProto(o.Shard).ToByteArray();
case ShardCoordinator.ShardHome o: return ShardHomeToProto(o).ToByteArray();
case ShardCoordinator.HostShard o: return ShardIdMessageToProto(o.Shard).ToByteArray();
case ShardCoordinator.ShardStarted o: return ShardIdMessageToProto(o.Shard).ToByteArray();
case ShardCoordinator.BeginHandOff o: return ShardIdMessageToProto(o.Shard).ToByteArray();
case ShardCoordinator.BeginHandOffAck o: return ShardIdMessageToProto(o.Shard).ToByteArray();
case ShardCoordinator.HandOff o: return ShardIdMessageToProto(o.Shard).ToByteArray();
case ShardCoordinator.ShardStopped o: return ShardIdMessageToProto(o.Shard).ToByteArray();
case ShardCoordinator.GracefulShutdownRequest o: return ActorRefMessageToProto(o.ShardRegion).ToByteArray();
case ShardCoordinator.RegionStopped o: return ActorRefMessageToProto(o.ShardRegion).ToByteArray();
case EventSourcedRememberEntitiesShardStore.State o: return EntityStateToProto(o).ToByteArray();
case EventSourcedRememberEntitiesShardStore.EntitiesStarted o: return EntitiesStartedToProto(o).ToByteArray();
case EventSourcedRememberEntitiesShardStore.EntitiesStopped o: return EntitiesStoppedToProto(o).ToByteArray();
case ShardRegion.StartEntity o: return StartEntityToProto(o).ToByteArray();
case ShardRegion.StartEntityAck o: return StartEntityAckToProto(o).ToByteArray();
case Shard.GetShardStats _: return Empty;
case Shard.ShardStats o: return ShardStatsToProto(o).ToByteArray();
case GetShardRegionStats _: return Empty;
case ShardRegionStats o: return ShardRegionStatsToProto(o).ToByteArray();
case GetClusterShardingStats o: return GetClusterShardingStatsToProto(o).ToByteArray();
case ClusterShardingStats o: return ClusterShardingStatsToProto(o).ToByteArray();
case GetCurrentRegions _: return Empty;
case CurrentRegions o: return CurrentRegionsToProto(o).ToByteArray();
case Shard.GetCurrentShardState _: return Empty;
case Shard.CurrentShardState o: return CurrentShardStateToProto(o).ToByteArray();
case GetShardRegionState _: return Empty;
case ShardState o: return ShardStateToProto(o).ToByteArray();
case CurrentShardRegionState o: return CurrentShardRegionStateToProto(o).ToByteArray();
case EventSourcedRememberEntitiesCoordinatorStore.MigrationMarker _: return Empty;
case EventSourcedRememberEntitiesCoordinatorStore.State o: return RememberShardsStateToProto(o).ToByteArray();
}
throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}] in [{GetType()}]");
}
/// <summary>
/// Deserializes a byte array into an object using an optional <paramref name="manifest" /> (type hint).
/// </summary>
/// <param name="bytes">The array containing the serialized object</param>
/// <param name="manifest">The type hint used to deserialize the object contained in the array.</param>
/// <exception cref="ArgumentException">
/// This exception is thrown when the specified <paramref name="bytes"/>cannot be deserialized using the specified <paramref name="manifest"/>.
/// </exception>
/// <returns>The object contained in the array</returns>
public override object FromBinary(byte[] bytes, string manifest)
{
if (_fromBinaryMap.TryGetValue(manifest, out var factory))
return factory(bytes);
throw new SerializationException($"Unimplemented deserialization of message with manifest [{manifest}] in [{GetType()}]");
}
/// <summary>
/// Returns the manifest (type hint) that will be provided in the <see cref="FromBinary(byte[], string)" /> method.
/// <note>
/// This method returns <see cref="string.Empty" /> if a manifest is not needed.
/// </note>
/// </summary>
/// <param name="o">The object for which the manifest is needed.</param>
/// <exception cref="ArgumentException">
/// This exception is thrown when the specified <paramref name="o"/> does not have an associated manifest.
/// </exception>
/// <returns>The manifest needed for the deserialization of the specified <paramref name="o" />.</returns>
internal static string GetManifest(object o)
=> o switch
{
EventSourcedRememberEntitiesShardStore.State _ => EntityStateManifest,
EventSourcedRememberEntitiesShardStore.EntitiesStarted _ => EntitiesStartedManifest,
EventSourcedRememberEntitiesShardStore.EntitiesStopped _ => EntitiesStoppedManifest,
ShardCoordinator.CoordinatorState _ => CoordinatorStateManifest,
ShardCoordinator.ShardRegionRegistered _ => ShardRegionRegisteredManifest,
ShardCoordinator.ShardRegionProxyRegistered _ => ShardRegionProxyRegisteredManifest,
ShardCoordinator.ShardRegionTerminated _ => ShardRegionTerminatedManifest,
ShardCoordinator.ShardRegionProxyTerminated _ => ShardRegionProxyTerminatedManifest,
ShardCoordinator.ShardHomeAllocated _ => ShardHomeAllocatedManifest,
ShardCoordinator.ShardHomeDeallocated _ => ShardHomeDeallocatedManifest,
ShardCoordinator.Register _ => RegisterManifest,
ShardCoordinator.RegisterProxy _ => RegisterProxyManifest,
ShardCoordinator.RegisterAck _ => RegisterAckManifest,
ShardCoordinator.GetShardHome _ => GetShardHomeManifest,
ShardCoordinator.ShardHome _ => ShardHomeManifest,
ShardCoordinator.HostShard _ => HostShardManifest,
ShardCoordinator.ShardStarted _ => ShardStartedManifest,
ShardCoordinator.BeginHandOff _ => BeginHandOffManifest,
ShardCoordinator.BeginHandOffAck _ => BeginHandOffAckManifest,
ShardCoordinator.HandOff _ => HandOffManifest,
ShardCoordinator.ShardStopped _ => ShardStoppedManifest,
ShardCoordinator.GracefulShutdownRequest _ => GracefulShutdownReqManifest,
ShardCoordinator.RegionStopped _ => RegionStoppedManifest,
ShardRegion.StartEntity _ => StartEntityManifest,
ShardRegion.StartEntityAck _ => StartEntityAckManifest,
Shard.GetShardStats _ => GetShardStatsManifest,
Shard.ShardStats _ => ShardStatsManifest,
GetShardRegionStats _ => GetShardRegionStatsManifest,
ShardRegionStats _ => ShardRegionStatsManifest,
GetClusterShardingStats _ => GetClusterShardingStatsManifest,
ClusterShardingStats _ => ClusterShardingStatsManifest,
GetCurrentRegions _ => GetCurrentRegionsManifest,
CurrentRegions _ => CurrentRegionsManifest,
Shard.GetCurrentShardState _ => GetCurrentShardStateManifest,
Shard.CurrentShardState _ => CurrentShardStateManifest,
GetShardRegionState _ => GetShardRegionStateManifest,
ShardState _ => ShardStateManifest,
CurrentShardRegionState _ => CurrentShardRegionStateManifest,
EventSourcedRememberEntitiesCoordinatorStore.MigrationMarker _ => EventSourcedRememberShardsMigrationMarkerManifest,
EventSourcedRememberEntitiesCoordinatorStore.State _ => EventSourcedRememberShardsState,
_ => string.Empty
};
/// <summary>
/// Returns the manifest (type hint) that will be provided in the <see cref="FromBinary(byte[], string)" /> method.
/// </summary>
/// <param name="o">The object for which the manifest is needed.</param>
/// <exception cref="ArgumentException">
/// This exception is thrown when the specified <paramref name="o"/> does not have an associated manifest.
/// </exception>
/// <returns>The manifest needed for the deserialization of the specified <paramref name="o" />.</returns>
public override string Manifest(object o)
{
var man = GetManifest(o);
if(ReferenceEquals(man, string.Empty))
throw new ArgumentException($"Can't serialize object of type [{o.GetType()}] in [{GetType()}]");
return man;
}
//
// EventSourcedRememberEntitiesCoordinatorStore.State
//
private static Proto.Msg.RememberedShardState RememberShardsStateToProto(EventSourcedRememberEntitiesCoordinatorStore.State state)
{
var message = new Proto.Msg.RememberedShardState();
message.ShardId.AddRange(state.Shards);
message.Marker = state.WrittenMigrationMarker;
return message;
}
private static EventSourcedRememberEntitiesCoordinatorStore.State RememberShardsStateFromBinary(byte[] bytes)
{
var message = Proto.Msg.RememberedShardState.Parser.ParseFrom(bytes);
return new EventSourcedRememberEntitiesCoordinatorStore.State(message.ShardId.ToImmutableHashSet(), message.Marker);
}
//
// ShardStats
//
private static Proto.Msg.ShardStats ShardStatsToProto(Shard.ShardStats shardStats)
{
var message = new Proto.Msg.ShardStats();
message.Shard = shardStats.ShardId;
message.EntityCount = shardStats.EntityCount;
return message;
}
private static Shard.ShardStats ShardStatsFromBinary(byte[] bytes)
{
var message = Proto.Msg.ShardStats.Parser.ParseFrom(bytes);
return new Shard.ShardStats(message.Shard, message.EntityCount);
}
//
// ShardRegion.StartEntity
//
private static Proto.Msg.StartEntity StartEntityToProto(ShardRegion.StartEntity startEntity)
{
var message = new Proto.Msg.StartEntity();
message.EntityId = startEntity.EntityId;
return message;
}
private static ShardRegion.StartEntity StartEntityFromBinary(byte[] bytes)
{
var message = Proto.Msg.StartEntity.Parser.ParseFrom(bytes);
return new ShardRegion.StartEntity(message.EntityId);
}
//
// ShardRegion.StartEntityAck
//
private static Proto.Msg.StartEntityAck StartEntityAckToProto(ShardRegion.StartEntityAck startEntityAck)
{
var message = new Proto.Msg.StartEntityAck();
message.EntityId = startEntityAck.EntityId;
message.ShardId = startEntityAck.ShardId;
return message;
}
private static ShardRegion.StartEntityAck StartEntityAckFromBinary(byte[] bytes)
{
var message = Proto.Msg.StartEntityAck.Parser.ParseFrom(bytes);
return new ShardRegion.StartEntityAck(message.EntityId, message.ShardId);
}
//
// EntityStarted
//
private static EventSourcedRememberEntitiesShardStore.EntitiesStarted EntityStartedFromBinary(byte[] bytes)
{
var message = Proto.Msg.EntityStarted.Parser.ParseFrom(bytes);
return new EventSourcedRememberEntitiesShardStore.EntitiesStarted(ImmutableHashSet.Create(message.EntityId));
}
//
// EntitiesStarted
//
private static Proto.Msg.EntitiesStarted EntitiesStartedToProto(EventSourcedRememberEntitiesShardStore.EntitiesStarted entitiesStarted)
{
var message = new Proto.Msg.EntitiesStarted();
message.EntityId.AddRange(entitiesStarted.Entities);
return message;
}
private static EventSourcedRememberEntitiesShardStore.EntitiesStarted EntitiesStartedFromBinary(byte[] bytes)
{
var message = Proto.Msg.EntitiesStarted.Parser.ParseFrom(bytes);
return new EventSourcedRememberEntitiesShardStore.EntitiesStarted(message.EntityId.ToImmutableHashSet());
}
//
// EntityStopped
//
private static EventSourcedRememberEntitiesShardStore.EntitiesStopped EntityStoppedFromBinary(byte[] bytes)
{
var message = Proto.Msg.EntityStopped.Parser.ParseFrom(bytes);
return new EventSourcedRememberEntitiesShardStore.EntitiesStopped(ImmutableHashSet.Create(message.EntityId));
}
//
// EntityStopped
//
private static Proto.Msg.EntitiesStopped EntitiesStoppedToProto(EventSourcedRememberEntitiesShardStore.EntitiesStopped entitiesStopped)
{
var message = new Proto.Msg.EntitiesStopped();
message.EntityId.AddRange(entitiesStopped.Entities);
return message;
}
private static EventSourcedRememberEntitiesShardStore.EntitiesStopped EntitiesStoppedFromBinary(byte[] bytes)
{
var message = Proto.Msg.EntitiesStopped.Parser.ParseFrom(bytes);
return new EventSourcedRememberEntitiesShardStore.EntitiesStopped(message.EntityId.ToImmutableHashSet());
}
//
// ShardCoordinator.State
//
private static Proto.Msg.CoordinatorState CoordinatorStateToProto(ShardCoordinator.CoordinatorState state)
{
var message = new Proto.Msg.CoordinatorState();
message.Shards.AddRange(state.Shards.Select(entry =>
{
var coordinatorState = new Proto.Msg.CoordinatorState.Types.ShardEntry();
coordinatorState.ShardId = entry.Key;
coordinatorState.RegionRef = Akka.Serialization.Serialization.SerializedActorPath(entry.Value);
return coordinatorState;
}));
message.Regions.AddRange(state.Regions.Keys.Select(Akka.Serialization.Serialization.SerializedActorPath));
message.RegionProxies.AddRange(state.RegionProxies.Select(Akka.Serialization.Serialization.SerializedActorPath));
message.UnallocatedShards.AddRange(state.UnallocatedShards);
return message;
}
private ShardCoordinator.CoordinatorState CoordinatorStateFromBinary(byte[] bytes)
{
var state = Proto.Msg.CoordinatorState.Parser.ParseFrom(bytes);
var shards = ImmutableDictionary.CreateRange(state.Shards.Select(entry => new KeyValuePair<string, IActorRef>(entry.ShardId, ResolveActorRef(entry.RegionRef))));
var regionsZero = ImmutableDictionary.CreateRange(state.Regions.Select(region => new KeyValuePair<IActorRef, IImmutableList<string>>(ResolveActorRef(region), ImmutableList<string>.Empty)));
var regions = shards.Aggregate(regionsZero, (acc, entry) => acc.SetItem(entry.Value, acc[entry.Value].Add(entry.Key)));
var proxies = state.RegionProxies.Select(ResolveActorRef).ToImmutableHashSet();
var unallocatedShards = state.UnallocatedShards.ToImmutableHashSet();
return new ShardCoordinator.CoordinatorState(
shards: shards,
regions: regions,
regionProxies: proxies,
unallocatedShards: unallocatedShards);
}
//
// ShardCoordinator.ShardHomeAllocated
//
private static Proto.Msg.ShardHomeAllocated ShardHomeAllocatedToProto(ShardCoordinator.ShardHomeAllocated shardHomeAllocated)
{
var message = new Proto.Msg.ShardHomeAllocated();
message.Shard = shardHomeAllocated.Shard;
message.Region = Akka.Serialization.Serialization.SerializedActorPath(shardHomeAllocated.Region);
return message;
}
private ShardCoordinator.ShardHomeAllocated ShardHomeAllocatedFromBinary(byte[] bytes)
{
var msg = Proto.Msg.ShardHomeAllocated.Parser.ParseFrom(bytes);
return new ShardCoordinator.ShardHomeAllocated(msg.Shard, ResolveActorRef(msg.Region));
}
//
// ShardCoordinator.ShardHome
//
private static Proto.Msg.ShardHome ShardHomeToProto(ShardCoordinator.ShardHome shardHome)
{
var message = new Proto.Msg.ShardHome();
message.Shard = shardHome.Shard;
message.Region = Akka.Serialization.Serialization.SerializedActorPath(shardHome.Ref);
return message;
}
private ShardCoordinator.ShardHome ShardHomeFromBinary(byte[] bytes)
{
var msg = Proto.Msg.ShardHome.Parser.ParseFrom(bytes);
return new ShardCoordinator.ShardHome(msg.Shard, ResolveActorRef(msg.Region));
}
//
// ActorRefMessage
//
private ActorRefMessage ActorRefMessageToProto(IActorRef actorRef)
{
var message = new ActorRefMessage();
message.Path = Akka.Serialization.Serialization.SerializedActorPath(actorRef);
return message;
}
private IActorRef ActorRefMessageFromBinary(byte[] binary)
{
return ResolveActorRef(ActorRefMessage.Parser.ParseFrom(binary).Path);
}
//
// EventSourcedRememberEntitiesShardStore.State
//
private static Proto.Msg.EntityState EntityStateToProto(EventSourcedRememberEntitiesShardStore.State entityState)
{
var message = new Proto.Msg.EntityState();
message.Entities.AddRange(entityState.Entities);
return message;
}
private static EventSourcedRememberEntitiesShardStore.State EntityStateFromBinary(byte[] bytes)
{
var msg = Proto.Msg.EntityState.Parser.ParseFrom(bytes);
return new EventSourcedRememberEntitiesShardStore.State(msg.Entities.ToImmutableHashSet());
}
//
// ShardIdMessage
//
private Proto.Msg.ShardIdMessage ShardIdMessageToProto(string shard)
{
var message = new Proto.Msg.ShardIdMessage();
message.Shard = shard;
return message;
}
// ShardRegionStats
private static Proto.Msg.ShardRegionStats ShardRegionStatsToProto(ShardRegionStats s)
{
var message = new Proto.Msg.ShardRegionStats();
message.Stats.Add((IDictionary<string, int>)s.Stats);
message.Failed.Add(s.Failed);
return message;
}
private static ShardRegionStats ShardRegionStatsFromBinary(byte[] b)
{
var p = Proto.Msg.ShardRegionStats.Parser.ParseFrom(b);
return new ShardRegionStats(p.Stats.ToImmutableDictionary(), p.Failed.ToImmutableHashSet());
}
// GetClusterShardingStats
private static Proto.Msg.GetClusterShardingStats GetClusterShardingStatsToProto(GetClusterShardingStats stats)
{
var p = new Proto.Msg.GetClusterShardingStats();
p.Timeout = Duration.FromTimeSpan(stats.Timeout);
return p;
}
private static GetClusterShardingStats GetClusterShardingStatsFromBinary(byte[] b)
{
var p = Proto.Msg.GetClusterShardingStats.Parser.ParseFrom(b);
return new GetClusterShardingStats(p.Timeout.ToTimeSpan());
}
// ClusterShardingStats
private static Proto.Msg.ClusterShardingStats ClusterShardingStatsToProto(ClusterShardingStats stats)
{
var p = new Proto.Msg.ClusterShardingStats();
foreach (var s in stats.Regions)
{
p.Stats.Add(new ClusterShardingStatsEntry() { Address = AddressToProto(s.Key), Stats = ShardRegionStatsToProto(s.Value) });
}
return p;
}
private static ClusterShardingStats ClusterShardingStatsFromBinary(byte[] b)
{
var p = Proto.Msg.ClusterShardingStats.Parser.ParseFrom(b);
var dict = new Dictionary<Address, ShardRegionStats>();
foreach (var s in p.Stats)
{
dict[AddressFrom(s.Address)] = new ShardRegionStats(s.Stats.Stats.ToImmutableDictionary(), s.Stats.Failed.ToImmutableHashSet());
}
return new ClusterShardingStats(dict.ToImmutableDictionary());
}
//CurrentRegions
private static Proto.Msg.CurrentRegions CurrentRegionsToProto(CurrentRegions regions)
{
var p = new Proto.Msg.CurrentRegions();
p.Regions.AddRange(regions.Regions.Select(AddressToProto));
return p;
}
private static CurrentRegions CurrentRegionsFromBinary(byte[] b)
{
var p = Proto.Msg.CurrentRegions.Parser.ParseFrom(b);
return new CurrentRegions(p.Regions.Select(AddressFrom).ToImmutableHashSet());
}
//CurrentShardState
private static Proto.Msg.CurrentShardState CurrentShardStateToProto(Shard.CurrentShardState state)
{
var p = new Proto.Msg.CurrentShardState();
p.ShardId = state.ShardId;
p.EntityIds.AddRange(state.EntityIds);
return p;
}
private static Shard.CurrentShardState CurrentShardStateFromBinary(byte[] b)
{
var p = Proto.Msg.CurrentShardState.Parser.ParseFrom(b);
return new Shard.CurrentShardState(p.ShardId, p.EntityIds.ToImmutableHashSet());
}
//ShardState
private static Proto.Msg.ShardState ShardStateToProto(ShardState state)
{
var p = new Proto.Msg.ShardState();
p.ShardId = state.ShardId;
p.EntityIds.AddRange(state.EntityIds);
return p;
}
private static ShardState ShardStateFromProto(Proto.Msg.ShardState state)
{
return new ShardState(state.ShardId, state.EntityIds.ToImmutableHashSet());
}
private static ShardState ShardStateFromBinary(byte[] b)
{
var p = Proto.Msg.ShardState.Parser.ParseFrom(b);
return new ShardState(p.ShardId, p.EntityIds.ToImmutableHashSet());
}
//CurrentShardRegionState
private static Proto.Msg.CurrentShardRegionState CurrentShardRegionStateToProto(CurrentShardRegionState state)
{
var p = new Proto.Msg.CurrentShardRegionState();
p.Shards.AddRange(state.Shards.Select(ShardStateToProto));
p.Failed.AddRange(state.Failed);
return p;
}
private static CurrentShardRegionState CurrentShardRegionStateFromBinary(byte[] b)
{
var p = Proto.Msg.CurrentShardRegionState.Parser.ParseFrom(b);
return new CurrentShardRegionState(p.Shards.Select(ShardStateFromProto).ToImmutableHashSet(), p.Failed.ToImmutableHashSet());
}
private static AddressData AddressToProto(Address address)
{
var message = new AddressData();
message.System = address.System;
message.Hostname = address.Host;
message.Port = (uint)(address.Port ?? 0);
message.Protocol = address.Protocol;
return message;
}
private static Address AddressFrom(AddressData addressProto)
{
return new Address(
addressProto.Protocol,
addressProto.System,
addressProto.Hostname,
addressProto.Port == 0 ? null : (int?)addressProto.Port);
}
private static string ShardIdMessageFromBinary(byte[] bytes)
{
return Proto.Msg.ShardIdMessage.Parser.ParseFrom(bytes).Shard;
}
private IActorRef ResolveActorRef(string path)
{
return system.Provider.ResolveActorRef(path);
}
}
}