Permalink
Browse files

Stabilized switch to the ProtoBuf serialization for event persistence

  • Loading branch information...
1 parent 13bd3cb commit 7b725ff081e78ad9fdb4bd885c3f8735ad205238 @abdullin committed Jul 20, 2011
@@ -10,7 +10,6 @@ public class Identity : IComparable
{
[DataMember(Order = 1)]
public Guid Id { get; protected set; }
-
[DataMember(Order = 2)]
public int Tag { get; protected set; }
@@ -81,7 +80,6 @@ public override string ToString()
}
}
-
[DataContract]
public sealed class NoteId : Identity
{
@@ -94,7 +92,6 @@ public NoteId(Guid id)
NoteId() {}
}
-
[DataContract]
public sealed class StoryId : Identity
{
@@ -107,7 +104,6 @@ public StoryId(Guid id)
StoryId() {}
}
-
[DataContract]
public sealed class ActivityId : Identity
{
@@ -118,9 +114,9 @@ public ActivityId(Guid id)
Id = id;
}
+
ActivityId() {}
}
-
[DataContract]
public sealed class TaskId : Identity
{
@@ -1,48 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Linq;
-using System.Text;
-using Lokad.Cqrs;
-using ServiceStack.Text;
-
-namespace FarleyFile
-{
- sealed class DevSerializer : IDataSerializer
- {
- readonly IDictionary<string, Type> _stringToType;
- readonly IDictionary<Type, string> _typeToString;
-
- public DevSerializer(Type[] types)
- {
- _stringToType = types.ToDictionary(t => t.Name, t => t);
- _typeToString = types.ToDictionary(t => t, t => t.Name);
- }
-
- public void Serialize(object instance, Stream destinationStream)
- {
- using (var writer = new StreamWriter(destinationStream, Encoding.UTF8))
- {
- JsonSerializer.SerializeToWriter(instance, instance.GetType(),writer);
- }
- }
-
- public object Deserialize(Stream sourceStream, Type type)
- {
- using (var reader = new StreamReader(sourceStream, Encoding.UTF8))
- {
- return JsonSerializer.DeserializeFromReader(reader, type);
- }
- }
-
- public bool TryGetContractNameByType(Type messageType, out string contractName)
- {
- return _typeToString.TryGetValue(messageType, out contractName);
- }
-
- public bool TryGetContractTypeByName(string contractName, out Type contractType)
- {
- return _stringToType.TryGetValue(contractName, out contractType);
- }
- }
-}
@@ -30,6 +30,9 @@
<Reference Include="Lokad.Cqrs.Portable">
<HintPath>..\Library\Lokad.Cqrs.Portable.dll</HintPath>
</Reference>
+ <Reference Include="protobuf-net">
+ <HintPath>..\Library\protobuf-net.dll</HintPath>
+ </Reference>
<Reference Include="ServiceStack.Text">
<HintPath>..\Library\ServiceStack.Text.dll</HintPath>
</Reference>
@@ -42,6 +45,7 @@
<HintPath>..\Library\System.Reactive.dll</HintPath>
</Reference>
<Reference Include="System.Windows.Forms" />
+ <Reference Include="System.XML" />
</ItemGroup>
<ItemGroup>
<Compile Include="Interactions\AbstractInteraction.cs" />
@@ -55,9 +59,6 @@
<Compile Include="Interactions\InteractionResultStatus.cs" />
<Compile Include="Interactions\Specific\DoAddActivity.cs" />
<Compile Include="Solarized.cs" />
- <Compile Include="Sys.cs" />
- <Compile Include="AggregateDispatcher.cs" />
- <Compile Include="DevSerializer.cs" />
<Compile Include="Form1.cs">
<SubType>Form</SubType>
</Compile>
@@ -66,6 +67,10 @@
</Compile>
<Compile Include="Program.cs" />
<Compile Include="LifelineViewport.cs" />
+ <Compile Include="Wires\AggregateDispatcher.cs" />
+ <Compile Include="Wires\DataSerializerWithProtoBuf.cs" />
+ <Compile Include="Wires\EnvelopeSerializerWithProtoBuf.cs" />
+ <Compile Include="Wires\Sys.cs" />
<EmbeddedResource Include="Form1.resx">
<DependentUpon>Form1.cs</DependentUpon>
</EmbeddedResource>
@@ -88,4 +93,4 @@
<Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
@@ -52,7 +52,7 @@ string AddReference(Identity i, params object[] names)
// collision
if (!reference.Equals(i))
{
- LookupRef[name] = Identity.Empty;
+ LookupRef[name] = null;
Log("Collision: '{0}' -> '{1}' by '{2}'", reference, i, name);
continue;
}
@@ -0,0 +1,109 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Runtime.Serialization;
+using Lokad.Cqrs;
+using Lokad.Cqrs.Core.Serialization;
+using ProtoBuf.Meta;
+using System.Linq;
+
+namespace FarleyFile
+{
+ /// <summary>
+ /// Copied from Lokad.CQRS
+ /// </summary>
+ public class DataSerializerWithProtoBuf : IDataSerializer
+ {
+ readonly IDictionary<string, Type> _contract2Type = new Dictionary<string, Type>();
+ readonly IDictionary<Type, string> _type2Contract = new Dictionary<Type, string>();
+ readonly IDictionary<Type, IFormatter> _type2Formatter = new Dictionary<Type, IFormatter>();
+
+ public DataSerializerWithProtoBuf(ICollection<Type> knownTypes)
+ {
+ if (knownTypes.Count == 0)
+ throw new InvalidOperationException(
+ "ProtoBuf requires some known types to serialize. Have you forgot to supply them?");
+
+
+ InitIdentityTree();
+
+ foreach (var type in knownTypes)
+ {
+ var reference = ContractEvil.GetContractReference(type);
+ var formatter = RuntimeTypeModel.Default.CreateFormatter(type);
+
+ try
+ {
+ _contract2Type.Add(reference, type);
+ }
+ catch (ArgumentException e)
+ {
+ var msg = string.Format("Duplicate contract '{0}' being added to ProtoBuf dictionary", reference);
+ throw new InvalidOperationException(msg, e);
+ }
+ try
+ {
+ _type2Contract.Add(type, reference);
+ _type2Formatter.Add(type, formatter);
+ }
+ catch (ArgumentException e)
+ {
+ var msg = string.Format("Duplicate type '{0}' being added to ProtoBuf dictionary", type);
+ throw new InvalidOperationException(msg, e);
+ }
+ }
+ }
+
+ public void Serialize(object instance, Stream destination)
+ {
+ IFormatter formatter;
+ if (!_type2Formatter.TryGetValue(instance.GetType(), out formatter))
+ {
+ var s =
+ string.Format(
+ "Can't find serializer for unknown object type '{0}'. Have you passed all known types to the constructor?",
+ instance.GetType());
+ throw new InvalidOperationException(s);
+ }
+ formatter.Serialize(destination, instance);
+ }
+
+ public object Deserialize(Stream source, Type type)
+ {
+ IFormatter value;
+ if (!_type2Formatter.TryGetValue(type, out value))
+ {
+ var s =
+ string.Format(
+ "Can't find serializer for unknown object type '{0}'. Have you passed all known types to the constructor?",
+ type);
+ throw new InvalidOperationException(s);
+ }
+ return value.Deserialize(source);
+ }
+ public bool TryGetContractNameByType(Type messageType, out string contractName)
+ {
+ return _type2Contract.TryGetValue(messageType, out contractName);
+ }
+
+ public bool TryGetContractTypeByName(string contractName, out Type contractType)
+ {
+ return _contract2Type.TryGetValue(contractName, out contractType);
+ }
+
+ static void InitIdentityTree()
+ {
+ RuntimeTypeModel.Default[typeof(DateTimeOffset)].Add("m_dateTime", "m_offsetMinutes");
+
+ var id = typeof(Identity);
+ var derived = id.Assembly.GetExportedTypes()
+ .Where(id.IsAssignableFrom)
+ .Where(t => t != id);
+ int i = 4;
+ foreach (var d in derived)
+ {
+ RuntimeTypeModel.Default[id].AddSubType(i++, d);
+ }
+ }
+ }
+}
@@ -0,0 +1,20 @@
+using System.IO;
+using Lokad.Cqrs;
+using Lokad.Cqrs.Core.Envelope;
+using ProtoBuf;
+
+namespace FarleyFile
+{
+ public sealed class EnvelopeSerializerWithProtoBuf : IEnvelopeSerializer
+ {
+ public void SerializeEnvelope(Stream stream, EnvelopeContract contract)
+ {
+ Serializer.Serialize(stream, contract);
+ }
+
+ public EnvelopeContract DeserializeEnvelope(Stream stream)
+ {
+ return Serializer.Deserialize<EnvelopeContract>(stream);
+ }
+ }
+}
@@ -17,17 +17,6 @@ namespace FarleyFile
{
class Sys
{
- static string SimpleDispatchRule(ImmutableEnvelope e)
- {
- if (e.GetAttribute("to-entity","")!="")
- return "files:aggregates";
- if (e.Items.All(i => typeof(IEvent).IsAssignableFrom(i.MappedType)))
- return "files:events";
- if (e.Items.All(i => typeof(ICommand).IsAssignableFrom(i.MappedType)))
- return "files:commands";
- throw new InvalidOperationException("Unsuported envelope");
- }
-
public static CqrsEngineBuilder Configure(FileStorageConfig cache)
{
@@ -54,8 +43,8 @@ public static CqrsEngineBuilder Configure(FileStorageConfig cache)
m.InAssemblyOf<AddNote>();
m.InAssemblyOf<StoryList>();
});
- builder.Advanced.CustomDataSerializer(t => new DevSerializer(t));
- //builder.Advanced.CustomEnvelopeSerializer(new DevEnvelopeSerializer());
+ builder.Advanced.CustomDataSerializer(t => new DataSerializerWithProtoBuf(t));
+ builder.Advanced.CustomEnvelopeSerializer(new EnvelopeSerializerWithProtoBuf());
builder.Storage(c =>
{
RegisterAtomicStorage(cache, c);
@@ -65,9 +54,8 @@ public static CqrsEngineBuilder Configure(FileStorageConfig cache)
builder.File(m =>
{
m.AddFileSender(cache, "router", cm => cm.IdGeneratorForTests());
- m.AddFileRouter(cache, "router", SimpleDispatchRule);
+ m.AddFileProcess(cache, "router", x => x.DispatcherIsLambda(SaveAndRoute));
m.AddFileProcess(cache, "events", p => p.DispatchAsEvents(md => md.WhereMessagesAre<IEvent>()));
- m.AddFileProcess(cache, "commands", p => p.DispatchAsCommandBatch(md => md.WhereMessagesAre<ICommand>()));
m.AddFileProcess(cache, "aggregates", p => p.DispatcherIs(context =>
{
var readers = context.Resolve<ITapeStorageFactory>();
@@ -79,6 +67,39 @@ public static CqrsEngineBuilder Configure(FileStorageConfig cache)
return builder;
}
+ static Action<ImmutableEnvelope> SaveAndRoute(IComponentContext ctx)
+ {
+ var registry = ctx.Resolve<QueueWriterRegistry>();
+ IQueueWriterFactory factory;
+ if (!registry.TryGet("files", out factory))
+ throw new InvalidOperationException("file queue not configured, yet.");
+
+ var agg = factory.GetWriteQueue("aggregates");
+ var events = factory.GetWriteQueue("events");
+ var tapeFactory = ctx.Resolve<ITapeStorageFactory>();
+ var log = tapeFactory.GetOrCreateStream("full-log");
+ var streamer = ctx.Resolve<IEnvelopeStreamer>();
+
+ return e =>
+ {
+ if (!log.TryAppend(streamer.SaveEnvelopeData(e)))
+ {
+ throw new InvalidOperationException("Failed to save envelope");
+ }
+ if (e.GetAttribute("to-entity", "") != "")
+ {
+ agg.PutMessage(e);
+ return;
+ }
+ if (e.Items.All(i => typeof (IEvent).IsAssignableFrom(i.MappedType)))
+ {
+ events.PutMessage(e);
+ return;
+ }
+ throw new InvalidOperationException("Unsuported envelope");
+ };
+ }
+
static void RegisterAtomicStorage(FileStorageConfig cache, StorageModule c)
{
c.AtomicIsInFiles(cache.Folder.FullName, cb =>
@@ -88,8 +109,8 @@ static void RegisterAtomicStorage(FileStorageConfig cache, StorageModule c)
JsonSerializer.DeserializeFromStream);
cb.WhereEntityIs<IEntityBase>();
- cb.FolderForEntity(t => "view-" + t.Name.ToLowerInvariant());
- cb.FolderForSingleton("view-single");
+ cb.FolderForEntity(t => "views/" + t.Name.ToLowerInvariant());
+ cb.FolderForSingleton("views");
cb.NameForSingleton(type => type.Name + ".txt");
cb.NameForEntity((type, o) =>
{
@@ -33,11 +33,22 @@ public void Json_should_work_with_identities()
}
[Test]
+ public void Protobuf_should_handle_DTO()
+ {
+ var expected = new DateTimeOffset(2011, 12, 2, 4, 5, 1, TimeSpan.FromHours(3));
+ RuntimeTypeModel.Default[typeof (DateTimeOffset)].Add("m_dateTime", "m_offsetMinutes");
+ var actual = Serializer.DeepClone(expected);
+ Assert.AreEqual(expected, actual);
+ }
+
+ [Test]
public void ProtoBuf_should_work_with_identities()
{
- var id = new StoryId(Guid.NewGuid());
- RuntimeTypeModel.Default[typeof(Identity)]
- .AddSubType(3, typeof(StoryId));
+ var id = new StoryId(Guid.Empty);
+ var metaType = RuntimeTypeModel.Default[typeof(StoryId)];
+ RuntimeTypeModel.Default.Add(typeof (Identity), true);
+ metaType.UseConstructor = false;
+ metaType.Add("Tag", "Id");
var deserialized = Serializer.DeepClone(id);
Assert.AreEqual(id, deserialized);
}
Binary file not shown.
Binary file not shown.
Oops, something went wrong.

0 comments on commit 7b725ff

Please sign in to comment.