Fix broken object type serializer in QueryExecutor #6223

Closed
@@ -181,7 +181,8 @@ public class QueryConfiguration
/// <summary>
/// The default serializer used when not type override matching is found
/// </summary>
- public string DefaultSerializer { get; }
+ [Obsolete(message: "This property will always return null")]
+ public string DefaultSerializer => null;

/// <summary>
/// Uses the CommandBehavior.SequentialAccess when creating the command, providing a performance improvement for reading large BLOBS.
@@ -220,7 +221,7 @@ public class QueryConfiguration
string orderingColumnName,
string serializerIdColumnName,
TimeSpan timeout,
- string defaultSerializer,
+ string defaultSerializer, // This is being ignored now
bool useSequentialAccess)
{
SchemaName = schemaName;
@@ -235,7 +236,6 @@ public class QueryConfiguration
Timeout = timeout;
TagsColumnName = tagsColumnName;
OrderingColumnName = orderingColumnName;
- DefaultSerializer = defaultSerializer;
SerializerIdColumnName = serializerIdColumnName;
UseSequentialAccess = useSequentialAccess;
}
@@ -780,7 +780,7 @@ protected DbCommand GetCommand(DbConnection connection, string sql)
protected virtual void WriteEvent(DbCommand command, IPersistentRepresentation e, IImmutableSet<string> tags)
{

- var serializer = Serialization.FindSerializerForType(e.Payload.GetType(), Configuration.DefaultSerializer);
Member

LGTM

+ var serializer = Serialization.FindSerializerForType(e.Payload.GetType());

// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
var (binary,manifest) = Akka.Serialization.Serialization.WithTransport(Serialization.System,(e.Payload,serializer) ,(state) =>
@@ -846,7 +846,7 @@ protected virtual IPersistentRepresentation ReadEvent(DbDataReader reader)
{
// Support old writes that did not set the serializer id
var type = Type.GetType(manifest, true);
- var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
Member

So we'd need to keep this configuration setting here for the rare cases where the data is really old and never had a serializer ID in place. We should revert this change specifically.

Member

TL;DR: this is probably not the hot path for reads on 99.999999% of events in Akka.Persistence.Sql.

Member

Do we have a backward compat spec around this specific condition? My concern is that if someone had a different default serializer defined, will we still be using the correct one here?

Member

Exactly my question

Member

So we'll need to find which versions of Akka.Persistence didn't include a serializerId.

It looks like we first started adding serializerIds to Akka.Persistence.Sql.Common back in 1.3.1: https://github.com/akkadotnet/akka.net/releases/tag/v1.3.1

So, we should create a SQLite database with version 1.3.0 or earlier, write some data to it, commit the binary to this repo, and then replay it using the latest development version of Akka.Persistence.Sql.Common. If we can still recover that data with this change, we're in good shape.
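
To make the concern in this thread concrete, here is a minimal, self-contained sketch of the read-path decision being discussed. It is a simplified model under stated assumptions, not the actual `QueryExecutor` code: `ReadPathModel`, `ResolveSerializerId`, and the `findSerializerIdForType` delegate are hypothetical names introduced only for illustration, and serializers are represented by their integer ids.

```csharp
using System;

// Hypothetical model of the read path under discussion; not the actual QueryExecutor code.
public static class ReadPathModel
{
    // storedSerializerId is null for legacy rows written without a serializer id.
    // findSerializerIdForType stands in for a type-based lookup such as FindSerializerForType.
    public static int ResolveSerializerId(
        int? storedSerializerId,
        Type manifestType,
        Func<Type, int> findSerializerIdForType)
    {
        // Rows that carry a serializer id are read with exactly that serializer.
        if (storedSerializerId.HasValue)
            return storedSerializerId.Value;

        // Legacy rows depend entirely on what the type-based lookup returns today,
        // which may differ from the serializer that originally wrote the bytes.
        return findSerializerIdForType(manifestType);
    }
}
```

In this model, a row stored with id 1 resolves to 1 regardless of the current bindings, while a legacy row resolves to whatever the lookup returns at read time. That second case is the one the proposed replay test against a pre-1.3.1 database would exercise.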

+ var deserializer = Serialization.FindSerializerForType(type);
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
deserialized = Akka.Serialization.Serialization.WithTransport(
Serialization.System, (deserializer, (byte[])payload, type),
@@ -0,0 +1,163 @@
// -----------------------------------------------------------------------
// <copyright file="CustomObjectSerializerSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Serialization;
using FluentAssertions;
using Microsoft.Data.Sqlite;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Sqlite.Tests
{
public class CustomObjectSerializerSpec : Akka.TestKit.Xunit2.TestKit, IAsyncLifetime
{
private static readonly Config Config = ConfigurationFactory.ParseString($@"
akka.actor {{
serializers {{
mySerializer = ""{typeof(MySerializer).AssemblyQualifiedName}""
}}
serialization-bindings {{
""System.Object"" = mySerializer
}}
}}

akka.persistence {{
journal {{
plugin = ""akka.persistence.journal.sqlite""
sqlite {{
connection-string = ""DataSource=AkkaJournal.db""
auto-initialize = on
}}
}}
snapshot-store {{
plugin = ""akka.persistence.snapshot-store.sqlite""
sqlite {{
connection-string = ""DataSource=AkkaSnapshot.db""
auto-initialize = on
}}
}}
}}").WithFallback(SqlitePersistence.DefaultConfiguration());

public CustomObjectSerializerSpec(ITestOutputHelper helper)
: base(Config, nameof(CustomObjectSerializerSpec), helper)
{
}

[Fact(DisplayName = "Persistence.Sql should use custom serializer for object type")]
public async Task CustomSerializerTest()
{
var probe = CreateTestProbe();

// Sanity check to see that the system should serialize object type using MySerializer
var serializer = Sys.Serialization.FindSerializerForType(typeof(Persisted));
serializer.Should().BeOfType<MySerializer>();

var actor = Sys.ActorOf(Props.Create(() => new PersistedActor("a")));
actor.Tell(new Persisted("a"), probe);
probe.ExpectMsg(new Persisted("a"));

// Read the database directly, make sure that we're using the correct object type serializer
var conn = new SqliteConnection("DataSource=AkkaJournal.db");
conn.Open();
const string sql = "SELECT ej.serializer_id FROM event_journal ej WHERE ej.persistence_id = 'a'";
await using var cmd = new SqliteCommand(sql, conn);
var record = await cmd.ExecuteReaderAsync();
await record.ReadAsync();

// In the bug this fails, the serializer id is JSON id instead of MySerializer id
record[0].Should().Be(9999);
Contributor Author

This fails because the payload is being serialized using the JSON serializer (1) and not MySerializer (9999)
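
One plausible way to picture this failure: when a default serializer name is passed to the lookup, it can be consulted before a catch-all `System.Object` binding. The sketch below is a simplified, self-contained model of that precedence, not Akka.NET's implementation. `LookupModel`, `Find`, and both dictionaries are hypothetical names, and the step ordering is an assumption inferred from the behaviour described in this PR.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified model of a serializer lookup; not the Akka.NET API.
public static class LookupModel
{
    // Type bindings, analogous to akka.actor.serialization-bindings (values are serializer ids).
    private static readonly Dictionary<Type, int> Bindings = new Dictionary<Type, int>
    {
        [typeof(object)] = 9999 // catch-all binding to a custom serializer
    };

    // Named serializers, analogous to akka.actor.serializers.
    private static readonly Dictionary<string, int> ByName = new Dictionary<string, int>
    {
        ["json"] = 1
    };

    public static int Find(Type type, string defaultSerializerName = null)
    {
        // 1. A binding more specific than System.Object wins.
        foreach (var binding in Bindings)
            if (binding.Key != typeof(object) && binding.Key.IsAssignableFrom(type))
                return binding.Value;

        // 2. Assumed behaviour: a named default is consulted before the System.Object binding.
        if (defaultSerializerName != null && ByName.TryGetValue(defaultSerializerName, out var named))
            return named;

        // 3. Only then does the catch-all System.Object binding apply.
        return Bindings[typeof(object)];
    }
}
```

Under these assumptions, `LookupModel.Find(typeof(string), "json")` yields 1, while `LookupModel.Find(typeof(string))` yields 9999. Dropping the default-name argument, as this PR does on the write path, lets the `System.Object` binding take effect.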

Member

Could you add a comment explaining what this test actually tests - like, what use case is it here to assert? Just need a snippet at the top explaining this.

}

public Task InitializeAsync()
{
if(File.Exists("AkkaJournal.db"))
File.Delete("AkkaJournal.db");
return Task.CompletedTask;
}

public Task DisposeAsync()
{
return Task.CompletedTask;
}
}

internal sealed class Persisted: IEquatable<Persisted>
{
public Persisted(string payload)
{
Payload = payload;
}

public string Payload { get; }

public bool Equals(Persisted other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return Payload == other.Payload;
}

public override bool Equals(object obj)
{
return ReferenceEquals(this, obj) || obj is Persisted other && Equals(other);
}

public override int GetHashCode()
{
return (Payload != null ? Payload.GetHashCode() : 0);
}
}

internal class MySerializer : Serializer
{
public MySerializer(ExtendedActorSystem system) : base(system)
{
}

public override bool IncludeManifest { get { return true; } }
public override int Identifier { get { return 9999; } }

public override byte[] ToBinary(object obj)
{
return Encoding.UTF8.GetBytes(obj.ToString());
}

public override object FromBinary(byte[] bytes, Type type)
{
return Encoding.UTF8.GetString(bytes);
}
}

internal sealed class PersistedActor : UntypedPersistentActor
{
public PersistedActor(string persistenceId)
{
PersistenceId = persistenceId;
}

public override string PersistenceId { get; }

protected override void OnCommand(object message)
{
var sender = Sender;
Persist(message, _ =>
{
sender.Tell(message);
});
}

protected override void OnRecover(object message)
{
}
}
}
@@ -221,6 +221,7 @@ namespace Akka.Persistence.Sql.Common.Journal
public readonly System.TimeSpan Timeout;
public readonly string TimestampColumnName;
public QueryConfiguration(string schemaName, string journalEventsTableName, string metaTableName, string persistenceIdColumnName, string sequenceNrColumnName, string payloadColumnName, string manifestColumnName, string timestampColumnName, string isDeletedColumnName, string tagsColumnName, string orderingColumnName, string serializerIdColumnName, System.TimeSpan timeout, string defaultSerializer, bool useSequentialAccess) { }
+ [System.ObsoleteAttribute("This property will always return null")]
public string DefaultSerializer { get; }
public string FullJournalTableName { get; }
public string FullMetaTableName { get; }
6 changes: 0 additions & 6 deletions src/core/Akka.Persistence/persistence.conf
@@ -115,9 +115,6 @@ akka.persistence {
# Dispatcher for message replay.
replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"

- # Default serializer used as manifest serializer when applicable and payload serializer when no specific binding overrides are specified
- serializer = "json"
-
# Removed: used to be the Maximum size of a persistent message batch written to the journal.
# Now this setting is without function, PersistentActor will write as many messages
# as it has accumulated since the last write.
@@ -174,9 +171,6 @@ akka.persistence {
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"

- # Default serializer used as manifest serializer when applicable and payload serializer when no specific binding overrides are specified
- serializer = "json"
-
circuit-breaker {
max-failures = 5
call-timeout = 20s