Tag table with backwards compatibility #68

Merged
Aaronontheweb merged 27 commits into akkadotnet:dev on Feb 3, 2023

@to11mtm (Member) commented Mar 19, 2022

Fixes #67

(Alternative solution to akkadotnet/akka.net#5296)

Changes

This is an implementation of Tag tables with a form of backwards compatibility.

  • Added TagWriteMode enum, with 3 modes of operation.

    • Applied logic to ByteArrayJournalSerializer so that tags may be written based on the setting
    • Added logic to BaseByteArrayJournalDao to switch behavior based on mode of operation
  • Added logic to BaseByteArrayJournalDao to interleave batch-untagged and per-row-tagged writes when writing to tag table

  • Added TagReadMode enum, with 3 modes of operation.

    • Added logic to BaseByteReadArrayJournalDao to respect enumeration
      • Queries running off tags will check tag table and/or main table based on setting
      • Filling of tags for stream pipeline will respect setting
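
A minimal sketch of how a flags-style write mode could gate the two write paths described above. The members `CommaSeparatedArray` and `TagTable` appear in this PR's diff; the name `Both`, the numeric values, and the `TagWritePlan` helper are assumptions for illustration and are not taken from the merged code.

```csharp
using System;

// Hypothetical shape: only CommaSeparatedArray and TagTable are named in the diff.
[Flags]
public enum TagWriteMode
{
    CommaSeparatedArray = 1,
    TagTable = 2,
    // Assumed name and value for a migration mode that writes to both places.
    Both = CommaSeparatedArray | TagTable
}

public static class TagWritePlan
{
    // True when tags should be written inline on the journal row.
    public static bool WritesInlineTags(TagWriteMode mode) =>
        (mode & TagWriteMode.CommaSeparatedArray) != 0;

    // True when tags should be written as rows in the separate tag table.
    public static bool WritesTagTable(TagWriteMode mode) =>
        (mode & TagWriteMode.TagTable) != 0;
}
```

A read mode could be gated the same way, with one flag per source (main table, tag table) that a tag query should consult.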

Todo:

  • Some cleanup in the Read Journal and elsewhere
  • Config bits (defaults, reference, table/column names)
  • Create table bits
  • Tests around new functionality
    • Ideally, we should have tests around migration scenarios.
  • Update deletion logic to delete tags in tag table
    • Note: a combination of 'delete-compatibility-mode' and tag-tables may make deletions very painful, since you'd be locking across 3 tables.
  • Update 'update' functionality so that it will update tag table if needed.
    • The update functionality is a non-standard addition carried over from persistence-jdbc.

Not in scope of this PR:

  • Migration routines/scripts. I think a helper application similar to what has been proposed for indexes would be a good idea, but this is likely a big enough PR as-is.


@Aaronontheweb Aaronontheweb self-requested a review June 14, 2022 21:23
@Aaronontheweb (Member):

Going to review this, as I am very interested in having this functionality for a project.

@Aaronontheweb (Member) left a comment

Have a partial review for you @to11mtm - have about half the files to look at still.

@@ -3,6 +3,19 @@

namespace Akka.Persistence.Sql.Linq2Db.Config
{
[Flags]
public enum TagWriteMode
Member

Got it - this is how we can support data written in previous journal implementations.

public TagWriteMode TagWriteMode { get; }
public TagTableMode TagTableMode { get; }
public string? TagTableName { get; }
public bool UseEventManifestColumn { get; }
Member

These are all of the aforementioned compat-flags?

}
else
{
res = TagWriteMode.CommaSeparatedArray;
Member

Got it, this is probably how most popular platforms have handled this.

@@ -135,7 +135,13 @@ public AkkaPersistenceDataConnectionFactory(IProviderConfig<SnapshotTableConfigu
.HasColumnName(tableConfig.ColumnNames.SequenceNumber)
.Member(r => r.Timestamp)
.HasColumnName(tableConfig.ColumnNames.Created);

//We can skip writing tags the old way by ignoring the column in mapping.
journalRowBuilder.Member(r => r.tagArr).IsNotColumn();
Member

LGTM

@@ -23,6 +24,37 @@

namespace Akka.Persistence.Sql.Linq2Db.Journal.DAO
{
public class SequentialUUIDGenerator
Member

Does akkadotnet/akka.net#5995 help with this?

Member Author

probably!

var dbid = await dc.InsertWithInt64IdentityAsync(journalRow);
foreach (var s1 in journalRow.tagArr)
{
tagsToInsert.Add(new JournalTagRow(){JournalOrderingId = dbid, TagValue = s1});
Member

LGTM

Member Author

@Aaronontheweb TBH we could really use more tests around things, but I have never used tags so I have a hard time coming up with scenarios.

@ismaelhamed (Member):

Is this a different approach than #467?

@to11mtm (Member Author) commented Jun 21, 2022

> Is this a different approach than #467?

Yes and not quite:

Yes:

  • If you are running based off Ordering, it is close to the same approach as 467, with the added kindness of having migration modes. (The migration modes add a good bit of complexity here and are where much of the deviation occurs.)

Not Quite:

  • We aren't subclassing the way JDBC does to handle the different behavior modes. Given we are trying to provide migration modes, it would have meant 3 classes and a more complex migration process.
  • We are adding the option of using some form of SequentialUUID for joining to the tag table rather than orderingID.
    • This is a mode intended for scenarios where high write throughput with tags is desired, at the expense of additional storage and possibly (likely) slower reads.
    • This may not be a good idea, or may require additional optimization
  • We may or may not be doing the inserts the same way SlickDB does. Based on the initial PRs for tag tables in persistence-jdbc, I'm guessing their approach was similar to ours (where you have to insert each row one at a time to get IDs). I assume this is still true, since they have split logic similar to this PR: if there are no tags, they try to do a simpler write.
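
The split between batch writes for untagged rows and one-at-a-time writes for tagged rows can be sketched as a planning step over the incoming batch. This is an illustrative sketch only: `PendingRow` and `WritePlanner` are hypothetical names, and the real DAO works against linq2db rather than in-memory lists.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a journal row; only the tag list matters here.
public sealed class PendingRow
{
    public PendingRow(long sequenceNumber, params string[] tags)
    {
        SequenceNumber = sequenceNumber;
        Tags = tags;
    }

    public long SequenceNumber { get; }
    public IReadOnlyList<string> Tags { get; }
}

public static class WritePlanner
{
    // Splits a batch into consecutive runs while preserving the original order.
    // Untagged neighbours are grouped so they can be bulk-inserted; each tagged
    // row becomes its own single-row run, because its insert must return the
    // generated ordering id before the tag rows can be written.
    public static List<(bool Tagged, List<PendingRow> Rows)> Plan(IEnumerable<PendingRow> batch)
    {
        var plan = new List<(bool Tagged, List<PendingRow> Rows)>();
        foreach (var row in batch)
        {
            var tagged = row.Tags.Count > 0;
            var last = plan.Count - 1;
            if (!tagged && last >= 0 && !plan[last].Tagged)
                plan[last].Rows.Add(row); // extend the current untagged run
            else
                plan.Add((tagged, new List<PendingRow> { row }));
        }
        return plan;
    }
}
```

With rows 1 and 2 untagged, rows 3 and 4 tagged, and row 5 untagged, the plan has four runs: one bulk run of two rows, two single-row tagged runs, and one bulk run of one row.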

Comment on lines 181 to 190
if (config.TableConfig.UseEventManifestColumn)
{
journalRowBuilder.Member(r => r.eventManifest)
.IsColumn().HasLength(64);
}
else
{
journalRowBuilder.Member(r => r.eventManifest)
.IsNotColumn();
}
Member Author

Basically, this switch should control whether the 'fixed' behavior for eventManifest is used.

If it's not mapped, the property comes back null on reads and is ignored on inserts, so the other logic works as it should.

Member

LGTM

@@ -23,6 +24,37 @@

namespace Akka.Persistence.Sql.Linq2Db.Journal.DAO
{
public class SequentialUUIDGenerator
Member Author

This algo is borrowed from EF Core, and should be attributed as such and re-commented for our use.

The expectation is that we should be able to generate Guid-esque structures with some 'predictability' in a range of output.

This function's use may require fine tuning to produce optimal results.
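
For readers unfamiliar with the technique, here is a minimal sketch in the style of EF Core's `SequentialGuidValueGenerator`: the leading bytes stay random and the trailing 8 bytes are taken from an incrementing counter. The class name `SequentialGuidSketch` and the `ExtractCounter` helper are hypothetical, and the byte layout is reproduced from memory, so treat it as an assumption and not as the code in this PR.

```csharp
using System;
using System.Threading;

public sealed class SequentialGuidSketch
{
    // Seeded from the clock so a later process start tends to produce later values.
    private long _counter = DateTime.UtcNow.Ticks;

    public Guid Next()
    {
        var guidBytes = Guid.NewGuid().ToByteArray();
        var counterBytes = BitConverter.GetBytes(Interlocked.Increment(ref _counter));
        if (!BitConverter.IsLittleEndian)
            Array.Reverse(counterBytes); // normalise to little-endian order

        // Overwrite the trailing 8 bytes with the counter; bytes 0-7 stay random.
        guidBytes[8] = counterBytes[1];
        guidBytes[9] = counterBytes[0];
        guidBytes[10] = counterBytes[7];
        guidBytes[11] = counterBytes[6];
        guidBytes[12] = counterBytes[5];
        guidBytes[13] = counterBytes[4];
        guidBytes[14] = counterBytes[3];
        guidBytes[15] = counterBytes[2];
        return new Guid(guidBytes);
    }

    // Inverse of the byte shuffle in Next; recovers the embedded counter value.
    public static long ExtractCounter(Guid value)
    {
        var g = value.ToByteArray();
        var c = new byte[8];
        c[1] = g[8];
        c[0] = g[9];
        c[7] = g[10];
        c[6] = g[11];
        c[5] = g[12];
        c[4] = g[13];
        c[3] = g[14];
        c[2] = g[15];
        if (!BitConverter.IsLittleEndian)
            Array.Reverse(c);
        return BitConverter.ToInt64(c, 0);
    }
}
```

Consecutive values from one generator differ by exactly one in the embedded counter, which is the 'predictability' the comment above refers to; how well that translates into index locality depends on how the target database orders its GUID type.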

Member

Can we use akkadotnet/akka.net#5995 for this instead? I'd prefer to keep 1 implementation floating around, but if that approach doesn't make sense then no worries.

.Via(deserializeFlow);
}

private Expression<Func<JournalRow, bool>> eventsByTagMigrationPredicate(DataConnection conn, string tagVal)
Member Author

This had to be expanded into its own method because of the compiler version used in the build.


private Expression<Func<JournalRow, JournalTagRow, bool>> EventsByTagOnlyJoinPredicate
Member Author

This had to be expanded as such because the build server's compiler could not infer the type of expressions returned from a ternary.
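
A small sketch of the limitation, under the assumption that the build used a compiler older than C# 9 (where, as far as I know, target-typed conditional expressions were introduced). `PredicateChooser` is a hypothetical name and the predicates are placeholders, not the ones in this PR.

```csharp
using System;
using System.Linq.Expressions;

public static class PredicateChooser
{
    // Older compilers reject the ternary form
    //   Expression<Func<int, bool>> p = positive ? x => x > 0 : x => x < 0;
    // because neither lambda branch has a type of its own to unify on.
    // Returning from separate branches gives each lambda the method's
    // return type as its explicit target.
    public static Expression<Func<int, bool>> Choose(bool positive)
    {
        if (positive)
            return x => x > 0;
        return x => x < 0;
    }
}
```

The same effect can be had by casting one branch of the ternary to the expression type, at the cost of readability.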

@Aaronontheweb (Member):

@to11mtm I'll review this again this week. Had it on my daily to-do list for a while but would really like to see it merged and Akka.Persistence.Linq2Db become the new standard ;)

@Aaronontheweb (Member) left a comment

LGTM - however, for testing this prior to going into production I'd like to propose the following:

  1. Create 3 databases: Akka.Persistence.SqlServer, Postgres, and MySQL with pre-populated data using tags - all created from the same application that uses the same persistent queries. Save those DB files and commit them into a repo somewhere.
  2. Build a docker image that mounts and restores those databases.
  3. Run an application that uses Akka.Persistence.Linq2Db's implementation of the journal and see if we can re-run the existing queries and produce the same results against all 3.
  4. Re-run a version of this using custom object serialization, just to ensure that is also supported in the migration.

That would pass the sniff test for me to a degree where I'd feel comfortable deprecating Akka.Persistence.Sql.Common and its subsequent implementations. Our team at Petabridge can help with this. What do you think @to11mtm @ismaelhamed ?

@Aaronontheweb (Member):

Build is failing due to NuGet.org errors at the moment.

@Aaronontheweb (Member):

@to11mtm what do you think of my review comment above? Should we mark this as "ready" and merge it in, with a plan to complete those above steps with help from our team?

@to11mtm (Member Author) commented Jul 19, 2022

> LGTM - however, for testing this prior to going into production I'd like to propose the following:

It's not done. That TODO list at the top for starters.

> 1. Create 3 databases: Akka.Persistence.SqlServer, Postgres, and MySQL with pre-populated data using tags - all created from the same application that uses the same persistent queries. Save those DB files and commit them into a repo somewhere.

You can't do MySQL until #3 is completed.

> 2. Build a docker image that mounts and restores those databases.
>
> 3. Run an application that uses Akka.Persistence.Linq2Db's implementation of the journal and see if we can re-run the existing queries and produce the same results against all 3.
>
> 4. Re-run a version of this using custom object serialization, just to ensure that is also supported in the migration.
>
> That would pass the sniff test for me to a degree where I'd feel comfortable deprecating Akka.Persistence.Sql.Common and its subsequent implementations.

The other filed issues should be considered before deprecating Sql.Common; the timestamp generator is one that comes to mind.

> Our team at Petabridge can help with this. What do you think @to11mtm @ismaelhamed ?

As I've mentioned in other conversations, I have not had the bandwidth to take this over the finish line, both for the TODOs and for fuller testing (I'm not familiar enough with queries to come up with good tests). I can answer questions and advise, but as it stands the bus factor on this is too low.

As another side note, we originally hoped to deprecate Sql.Common earlier. As it stands, we risk a large influx of support issues, plus more noise from compatibility problems when switching from Sql.Common, depending on whichever set of compatibility flags users choose (e.g. eventmanifest but no tag table, compatibility modes on different DBs, delete surprises). Something to consider.

tl;dr: the TODOs need to be done before this gets merged; until then, the tests you describe give a false sense that this is done.

# Conflicts:
#	src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs
#	src/Akka.Persistence.Sql.Linq2Db/Config/JournalConfig.cs
#	src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs
#	src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs
#	src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs
#	src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs
#	src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs
#	src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs
#	src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs
#	src/Akka.Persistence.Sql.Linq2Db/Query/JournalSequenceActor.cs
#	src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs
@Arkatufus Arkatufus mentioned this pull request Nov 23, 2022
# Conflicts:
#	src/Akka.Persistence.Sql.Linq2Db.Tests/Settings/ReadJournalConfigSpec.cs
#	src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalConfig.cs
#	src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs
#	src/Akka.Persistence.Sql.Linq2Db/persistence.conf
@Aaronontheweb (Member):

@Arkatufus you should move your commits onto their own PR and try to leave @to11mtm's work here intact, to make it easy to compare the two separately.

# Conflicts:
#	src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs
#	src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonSnapshotCompatibilitySpec.cs
@Arkatufus Arkatufus marked this pull request as ready for review January 31, 2023 20:59
# Conflicts:
#	src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerCompatibilitySpecConfig.cs
#	src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs
#	src/Akka.Persistence.Sql.Linq2Db/persistence.conf
@Aaronontheweb (Member) left a comment

Approving and merging this because it's a blocker and I don't want to review another 90+ file PR again, but there are some issues that need to be addressed in the comments. Let's deal with those in separate PRs.

<ItemGroup>
<PackageReference Include="Akka.Streams" Version="1.4.21" />
<PackageReference Include="linq2db" Version="3.4.1" />
<PackageReference Include="Reactive.Streams" Version="1.0.2" />
Member

No need for an explicit Reactive.Streams reference here

@@ -9,6 +9,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka.TestKit" Version="$(AkkaVersion)" />
<PackageReference Include="Akka" Version="$(AkkaVersion)" />
Member

Can remove explicit Akka reference if we're referencing the TestKit

@@ -9,6 +9,7 @@ public BaseByteArrayJournalDaoConfig(Configuration.Config config)
BufferSize = config.GetInt("buffer-size", 5000);
BatchSize = config.GetInt("batch-size", 100);
DbRoundTripBatchSize = config.GetInt("db-round-trip-max-batch-size", 1000);
DbRoundTripTagBatchSize = config.GetInt("db-round-trip-max-tag-batch-size", 1000);
Member

LGTM

@@ -0,0 +1,41 @@
// -----------------------------------------------------------------------
// <copyright file="TagTableConfig.cs" company="Akka.NET Project">
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
Member

Can get rid of Lightbend attribution here.

@@ -0,0 +1,43 @@
// -----------------------------------------------------------------------
// <copyright file="TagTableColumnNames.cs" company="Akka.NET Project">
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
Member

Same here

@@ -140,6 +140,13 @@ public AkkaPersistenceDataConnectionFactory(IProviderConfig<SnapshotTableConfigu
.Member(r => r.Timestamp)
.HasColumnName(columnNames.Created);

//We can skip writing tags the old way by ignoring the column in mapping.
journalRowBuilder.Member(r => r.TagArr).IsNotColumn();
if (tableConfig.TagWriteMode == TagWriteMode.TagTable)
Member

Based on @to11mtm 's suggestion, we're going to need to be able to support multiple tag write modes. Can we address this in a subsequent PR?

using LinqToDB.Data;
using static LanguageExt.Prelude;

namespace Akka.Persistence.Sql.Linq2Db.Journal.Dao
{
public class SequentialUuidGenerator
Member

Can make this static

@@ -18,13 +19,15 @@ public sealed class ByteArrayJournalSerializer : FlowPersistentReprSerializer<Jo
private readonly string _separator;
private readonly IProviderConfig<JournalTableConfig> _journalConfig;
private readonly string[] _separatorArray;
private readonly TagWriteMode _tagWriteMode;
Member

Consideration for multiple write modes here as well


public string TagValue { get; set; }

public Guid WriteUuid { get; set; }
Member

No need to include WriteUuid on the column - OrderingId should be good enough if it's unique.

columns {
ordering-id = ordering_id
tag-value = tag
writer-uuid = writer_uuid
Member

Can remove this

@Aaronontheweb Aaronontheweb merged commit a26036e into akkadotnet:dev Feb 3, 2023
Successfully merging this pull request may close these issues.

Feature: Store Tags in separate table rather than inline
4 participants