Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix broken object type serializer in QueryExecutor #6223

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ public abstract class BatchingSqlJournalSetup
/// <summary>
/// The default serializer used when not type override matching is found
/// </summary>
[Obsolete(message: "This property should never be used for writes, use the default `System.Object` serializer instead")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

public string DefaultSerializer { get; }

/// <summary>
Expand Down Expand Up @@ -1325,7 +1326,7 @@ private async Task<WriteMessagesResult> HandleWriteMessages(WriteMessages req, T
protected virtual void WriteEvent(TCommand command, IPersistentRepresentation persistent, string tags = "")
{
var payloadType = persistent.Payload.GetType();
var serializer = _serialization.FindSerializerForType(payloadType, Setup.DefaultSerializer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - this is really the key fix here.

var serializer = _serialization.FindSerializerForType(payloadType);

// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
Akka.Serialization.Serialization.WithTransport(_serialization.System, () =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public class QueryConfiguration
/// <summary>
/// The default serializer used when not type override matching is found
/// </summary>
[Obsolete(message: "This property should never be used for writes, use the default `System.Object` serializer instead")]
public string DefaultSerializer { get; }

/// <summary>
Expand Down Expand Up @@ -780,7 +781,7 @@ protected DbCommand GetCommand(DbConnection connection, string sql)
protected virtual void WriteEvent(DbCommand command, IPersistentRepresentation e, IImmutableSet<string> tags)
{

var serializer = Serialization.FindSerializerForType(e.Payload.GetType(), Configuration.DefaultSerializer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

var serializer = Serialization.FindSerializerForType(e.Payload.GetType());

// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
var (binary,manifest) = Akka.Serialization.Serialization.WithTransport(Serialization.System,(e.Payload,serializer) ,(state) =>
Expand Down Expand Up @@ -846,7 +847,10 @@ protected virtual IPersistentRepresentation ReadEvent(DbDataReader reader)
{
// Support old writes that did not set the serializer id
var type = Type.GetType(manifest, true);
#pragma warning disable CS0618
// Backward compatibility code, we still need to use the old default serializer on read to support legacy data
var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we'd need to keep this configuration setting here for the rare cases where the data is really old and never had a serialzier ID in-place - we should revert this change specifically.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TL;DR; this is probably not the hotpath for reads on 99.999999% of events in Akka.Persistence.Sql

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Do we have a backward compat spec around this specific condition? My concern is that if someone had a different default serializer defined, will we be using the correct one here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly my question

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we'll need to find which versions of Akka.Persistence didn't include a serializerId.

It looks like we first started adding serializerIds to Akka.Persistence.Sql.Common back in 1.3.1: https://github.com/akkadotnet/akka.net/releases/tag/v1.3.1

So, we should create a Sqlite database that runs on version 1.3.0 or earlier, writes some data, commit the binary to this repo, and then replay it using the latest development version of Akka.Persistence.Sql.Common - if we can still recover that data with this change, we're in good shape.

#pragma warning restore CS0618
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
deserialized = Akka.Serialization.Serialization.WithTransport(
Serialization.System, (deserializer, (byte[])payload, type),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public class SnapshotStoreSettings
/// <summary>
/// The default serializer being used if no type match override is specified
/// </summary>
[Obsolete(message: "This property should never be used, use the default `System.Object` serializer instead")]
public string DefaultSerializer { get; private set; }

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public class QueryConfiguration
/// <summary>
/// The default serializer used when not type override matching is found
/// </summary>
[Obsolete(message: "This property should never be used for writes, use the default `System.Object` serializer instead")]
public readonly string DefaultSerializer;

/// <summary>
Expand Down Expand Up @@ -336,7 +337,7 @@ protected AbstractQueryExecutor(QueryConfiguration configuration, Akka.Serializa
protected virtual void SetPayloadParameter(object snapshot, DbCommand command)
{
var snapshotType = snapshot.GetType();
var serializer = Serialization.FindSerializerForType(snapshotType, Configuration.DefaultSerializer);
var serializer = Serialization.FindSerializerForType(snapshotType);
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
var binary = Akka.Serialization.Serialization.WithTransport(Serialization.System, () => serializer.ToBinary(snapshot));
AddParameter(command, "@Payload", DbType.Binary, binary);
Expand All @@ -350,7 +351,7 @@ protected virtual void SetPayloadParameter(object snapshot, DbCommand command)
protected virtual void SetManifestParameters(object snapshot, DbCommand command)
{
var snapshotType = snapshot.GetType();
var serializer = Serialization.FindSerializerForType(snapshotType, Configuration.DefaultSerializer);
var serializer = Serialization.FindSerializerForType(snapshotType);

string manifest = "";
if (serializer is SerializerWithStringManifest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,22 +224,5 @@ protected override async Task DeleteAsync(string persistenceId, SnapshotSelectio
await QueryExecutor.DeleteBatchAsync(connection, nestedCancellationTokenSource.Token, persistenceId, criteria.MaxSequenceNr, criteria.MaxTimeStamp);
}
}

private SnapshotEntry ToSnapshotEntry(SnapshotMetadata metadata, object snapshot)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this removed?

{
var snapshotType = snapshot.GetType();
var serializer = Context.System.Serialization.FindSerializerForType(snapshotType, _settings.DefaultSerializer);

var binary = Akka.Serialization.Serialization.WithTransport(_actorSystem,
() => serializer.ToBinary(snapshot));


return new SnapshotEntry(
persistenceId: metadata.PersistenceId,
sequenceNr: metadata.SequenceNr,
timestamp: metadata.Timestamp,
manifest: snapshotType.TypeQualifiedName(),
payload: binary);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
</ItemGroup>

<ItemGroup>
<None Update="data\Sqlite.CustomObject.db">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<DefineConstants>$(DefineConstants);RELEASE</DefineConstants>
</PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// -----------------------------------------------------------------------
// <copyright file="CustomObjectSerializerSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Serialization;
using FluentAssertions;
using Microsoft.Data.Sqlite;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Sqlite.Tests
{
public class CustomObjectSerializerSpec : Akka.TestKit.Xunit2.TestKit, IAsyncLifetime
{
private static readonly string ConnectionString;
private static readonly Config Config;
static CustomObjectSerializerSpec()
{
var filename = $"AkkaSqlite-{Guid.NewGuid()}.db";
File.Copy("./data/Sqlite.CustomObject.db", $"{filename}.db");

ConnectionString = $"DataSource={filename}.db";
Config = ConfigurationFactory.ParseString($@"
akka.actor {{
serializers {{
mySerializer = ""{typeof(MySerializer).AssemblyQualifiedName}""
}}
serialization-bindings {{
""System.Object"" = mySerializer
}}
}}

akka.persistence {{
journal {{
plugin = ""akka.persistence.journal.sqlite""
sqlite {{
connection-string = ""{ConnectionString}""
auto-initialize = on
}}
}}
snapshot-store {{
plugin = ""akka.persistence.snapshot-store.sqlite""
sqlite {{
connection-string = ""{ConnectionString}""
auto-initialize = on
}}
}}
}}").WithFallback(SqlitePersistence.DefaultConfiguration());
}

public CustomObjectSerializerSpec(ITestOutputHelper helper)
: base(Config, nameof(CustomObjectSerializerSpec), helper)
{
}

[Fact(DisplayName = "Persistence.Sql should use custom serializer for object type")]
public async Task CustomSerializerTest()
{
var probe = CreateTestProbe();

// Sanity check to see that the system should serialize object type using MySerializer
var serializer = Sys.Serialization.FindSerializerForType(typeof(Persisted));
serializer.Should().BeOfType<MySerializer>();

var actor = Sys.ActorOf(Props.Create(() => new PersistedActor("a", probe)));
probe.ExpectMsg("recovered");
actor.Tell(new Persisted("a"), probe);
probe.ExpectMsg(new Persisted("a"));

// Read the database directly, make sure that we're using the correct object type serializer
var conn = new SqliteConnection(ConnectionString);
conn.Open();
const string sql = "SELECT ej.serializer_id FROM event_journal ej WHERE ej.persistence_id = 'a'";
await using var cmd = new SqliteCommand(sql, conn);
var record = await cmd.ExecuteReaderAsync();
await record.ReadAsync();

// In the bug this fails, the serializer id is JSON id instead of MySerializer id
record[0].Should().Be(9999);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fails because the payload is being serialized using the JSON serializer (1) and not MySerializer (9999)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment explaining what this test actually tests - like, what use case is it here to assert? Just need a snippet at the top explaining this.

}

[Fact(DisplayName = "Persistence.Sql should be able to read legacy data")]
public void LegacyDataTest()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So to make this cover the issue referenced here: https://github.com/akkadotnet/akka.net/pull/6223/files#r1012961321 - we just need even older data, correct?

{
var probe = CreateTestProbe();
var actor = Sys.ActorOf(Props.Create(() => new PersistedActor("old", probe)));
probe.ExpectMsg(new Persisted("old"));
probe.ExpectMsg("recovered");
}

public Task InitializeAsync()
{
if(File.Exists("AkkaSqlite.db"))
File.Delete("AkkaSqlite.db");
return Task.CompletedTask;
}

public Task DisposeAsync()
{
return Task.CompletedTask;
}
}

internal sealed class Persisted: IEquatable<Persisted>
{
public Persisted(string payload)
{
Payload = payload;
}

public string Payload { get; }

public bool Equals(Persisted other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return Payload == other.Payload;
}

public override bool Equals(object obj)
{
return ReferenceEquals(this, obj) || obj is Persisted other && Equals(other);
}

public override int GetHashCode()
{
return (Payload != null ? Payload.GetHashCode() : 0);
}
}

internal class MySerializer : Serializer
{
public MySerializer(ExtendedActorSystem system) : base(system)
{
}

public override bool IncludeManifest { get { return true; } }
public override int Identifier { get { return 9999; } }

public override byte[] ToBinary(object obj)
{
return Encoding.UTF8.GetBytes(obj.ToString());
}

public override object FromBinary(byte[] bytes, Type type)
{
return Encoding.UTF8.GetString(bytes);
}
}

internal sealed class PersistedActor : UntypedPersistentActor
{
private readonly IActorRef _probe;

public PersistedActor(string persistenceId, IActorRef probe)
{
PersistenceId = persistenceId;
_probe = probe;
}

public override string PersistenceId { get; }

protected override void OnCommand(object message)
{
var sender = Sender;
Persist(message, _ =>
{
sender.Tell(message);
});
}

protected override void OnRecover(object message)
{
switch (message)
{
case Persisted msg:
_probe.Tell(msg);
break;
case RecoveryCompleted _:
_probe.Tell("recovered");
break;
}
}
}
}
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ namespace Akka.Persistence.Sql.Common.Journal
public Akka.Persistence.Sql.Common.Journal.CircuitBreakerSettings CircuitBreakerSettings { get; }
public string ConnectionString { get; }
public System.TimeSpan ConnectionTimeout { get; }
[System.ObsoleteAttribute("This property should never be used for writes, use the default `System.Object` se" +
"rializer instead")]
public string DefaultSerializer { get; }
public System.Data.IsolationLevel IsolationLevel { get; }
public int MaxBatchSize { get; }
Expand Down Expand Up @@ -221,6 +223,8 @@ namespace Akka.Persistence.Sql.Common.Journal
public readonly System.TimeSpan Timeout;
public readonly string TimestampColumnName;
public QueryConfiguration(string schemaName, string journalEventsTableName, string metaTableName, string persistenceIdColumnName, string sequenceNrColumnName, string payloadColumnName, string manifestColumnName, string timestampColumnName, string isDeletedColumnName, string tagsColumnName, string orderingColumnName, string serializerIdColumnName, System.TimeSpan timeout, string defaultSerializer, bool useSequentialAccess) { }
[System.ObsoleteAttribute("This property should never be used for writes, use the default `System.Object` se" +
"rializer instead")]
public string DefaultSerializer { get; }
public string FullJournalTableName { get; }
public string FullMetaTableName { get; }
Expand Down Expand Up @@ -355,6 +359,8 @@ namespace Akka.Persistence.Sql.Common
public string ConnectionString { get; }
public string ConnectionStringName { get; }
public System.TimeSpan ConnectionTimeout { get; }
[System.ObsoleteAttribute("This property should never be used, use the default `System.Object` serializer in" +
"stead")]
public string DefaultSerializer { get; }
public string FullTableName { get; }
public string SchemaName { get; }
Expand Down Expand Up @@ -401,6 +407,8 @@ namespace Akka.Persistence.Sql.Common.Snapshot
}
public class QueryConfiguration
{
[System.ObsoleteAttribute("This property should never be used for writes, use the default `System.Object` se" +
"rializer instead")]
public readonly string DefaultSerializer;
public readonly string ManifestColumnName;
public readonly string PayloadColumnName;
Expand Down
3 changes: 1 addition & 2 deletions src/core/Akka.Persistence/persistence.conf
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ akka.persistence {

# Default serializer used as manifest serializer when applicable and payload serializer when no specific binding overrides are specified
serializer = "json"

# Removed: used to be the Maximum size of a persistent message batch written to the journal.
# Now this setting is without function, PersistentActor will write as many messages
# as it has accumulated since the last write.
Expand Down Expand Up @@ -176,7 +176,6 @@ akka.persistence {

# Default serializer used as manifest serializer when applicable and payload serializer when no specific binding overrides are specified
serializer = "json"

circuit-breaker {
max-failures = 5
call-timeout = 20s
Expand Down
9 changes: 2 additions & 7 deletions src/examples/Akka.Persistence.Custom/Journal/SqliteJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ public class SqliteJournal: AsyncWriteJournal, IWithUnboundedStash
private readonly string _connectionString;
private readonly TimeSpan _timeout;
private readonly Akka.Serialization.Serialization _serialization;
private readonly string _defaultSerializer;
private readonly ILoggingAdapter _log;
private readonly CancellationTokenSource _pendingRequestsCancellation;

Expand All @@ -119,7 +118,6 @@ public SqliteJournal()

_connectionString = _settings.ConnectionString;
_timeout = _settings.ConnectionTimeout;
_defaultSerializer = _settings.DefaultSerializer;

_serialization = Context.System.Serialization;
_log = Context.GetLogger();
Expand Down Expand Up @@ -328,11 +326,8 @@ private async Task<object> Initialize()
var persistentMessages = payload.ToArray();
foreach (var @event in persistentMessages)
{
// Get the serializer associated with the payload type,
// else use a default serializer
var serializer = _serialization.FindSerializerForType(
@event.Payload.GetType(),
_defaultSerializer);
// Get the serializer associated with the payload type
var serializer = _serialization.FindSerializerForType(@event.Payload.GetType());

// This WithTransport method call is important, it allows for proper
// local IActorRef serialization by switching the serialization information
Expand Down