
Added support for serializing using Akka serialization #180

Merged
merged 4 commits into akka:akka-serialization from jroper:akka-serialization on Sep 1, 2018

Conversation

@jroper (Contributor) commented Apr 10, 2018

Currently, this plugin serializes events by using Java serialization of akka.persistence.PersistentImpl. Given that akka.persistence.PersistentImpl is a private class, not meant to be exposed to the outside world, this is really bad. Akka could change the structure of this class in any future release, which would render every previously serialized event impossible to deserialize.

It also means that if someone has registered a custom serializer using Akka serialization for their events, that custom serializer won't be used, in contrast to most if not all other Akka persistence plugins.

This modifies akka-persistence-jdbc to use Akka serialization properly, looking up a serializer for the user supplied event/snapshot, and persisting the id and manifest of that serializer along with the serialized bytes. This means users can register custom serializers for their events using the standard Akka serialization mechanism, and using no custom akka-persistence-jdbc code.
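
For illustration, the serializer lookup described above goes through Akka's SerializationExtension. A minimal sketch (not the PR's actual code; the method name is illustrative):

```scala
import akka.actor.ActorSystem
import akka.serialization.{ SerializationExtension, Serializers }

// Resolve the serializer Akka has registered for this event, and capture the
// serializer id and manifest so they can be stored alongside the bytes.
def serializeEvent(system: ActorSystem, event: AnyRef): (Int, String, Array[Byte]) = {
  val serialization = SerializationExtension(system)
  val serializer    = serialization.findSerializerFor(event)
  val manifest      = Serializers.manifestFor(serializer, event)
  (serializer.identifier, manifest, serializer.toBinary(event))
}
```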

The change is both backwards and forwards compatible. It achieves this by introducing a new column for the serialized data, rather than reusing the existing one. Forwards compatibility is achieved by writing both to the old and the new column, but this has a significant overhead, so it's disabled by default. Backwards compatibility is achieved by reading from the new column if it's not null, otherwise, reading from the old column.
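
The read-side fallback could look roughly like this (a sketch, assuming the column values arrive as Options; the names and helpers are illustrative, not the PR's code):

```scala
import akka.serialization.Serialization

// Prefer the new event column; fall back to the legacy column, which holds a
// Java-serialized PersistentRepr.
def readEvent(
    serialization: Serialization,
    event: Option[Array[Byte]],
    serId: Option[Int],
    serManifest: Option[String],
    message: Option[Array[Byte]]): AnyRef =
  event match {
    case Some(bytes) =>
      serialization.deserialize(bytes, serId.get, serManifest.get).get
    case None =>
      serialization.deserialize(message.get, classOf[akka.persistence.PersistentRepr]).get
  }
```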

The schemas have all been updated such that the old column is now nullable and all the new columns are not null. This is suitable for a new project. For an existing project, schema migrations have been provided that make both the old column and the new columns nullable, for backwards compatibility.

Finally, an option has been provided that will allow the old column to be deleted altogether, but it is disabled by default.

Also fixed a MySQL schema typo.
@jroper jroper force-pushed the jroper:akka-serialization branch from 6c26e0d to c954dd6 Apr 10, 2018
@jroper (Contributor, Author) commented Apr 10, 2018

PR is passing. Please review carefully, especially the schema migrations.

Is defaulting forwards compatibility to false a good idea? Also, the schemas still include the message column even though it will never be used in a newly created application. Is that OK? It could be removed if we default hasMessageColumn to false in reference.conf, but then when upgrading, even if not doing rolling upgrades, you would need to set it to true to support reading old events.

@WellingR (Member) commented Apr 10, 2018

Thanks! I had noticed before that the PersistentRepr was serialized as a whole, but I never considered actually changing this because of the extra work needed to keep everything backwards compatible.

I took a look and the implementation looks good, but I need some time to dive into all the details.

About the default settings in the reference.conf:
The general rule is that we want the project to work as well as possible out of the box; this should hold for both existing and new applications. However, we should assume that existing users read a migration guide when they upgrade across a major version (or at least that they start reading the guide as soon as they run into trouble).

Following my reasoning above, this means that we can set hasMessageColumn to false, and the same goes for the compatibility flag.

README.md Outdated
To migrate from v3.x to v4.x, the database schema needs to be updated. To migrate, see the `<database-name>-migration-v4.sql` files in the corresponding database's [schema directory](https://github.com/dnvriend/akka-persistence-jdbc/tree/master/src/test/resources/schema/). The schema update is both backwards and forwards compatible. If you wish to perform rolling upgrades or canary upgrades, or want to be able to roll back, you will need to enable forwards compatibility, which can be done by setting the following configuration:

```
akka-persistence-jdbc.migration.v4.forwards-compatibility = true
```

@WellingR (Member) commented Apr 10, 2018

I am always confused about this. Shouldn't this be backwards compatibility, since this means the new data is still backwards compatible with akka-persistence-jdbc v3?

@jroper (Author, Contributor) commented Apr 11, 2018

It depends on both the context and the direction you start from when describing the compatibility. In this case, the context is the compatibility between the code and the data, and I'm starting from the direction of the code (since this is a flag that controls the code; it's not a migration being applied to the data). If the new code is compatible with the old data, that's backwards compatible. If the old code is compatible with the new data, that's forwards compatible. So this flag ensures that the new code writes new data that the old code will be forwards compatible with.

@jroper (Contributor, Author) commented Apr 11, 2018

I've made the following changes:

  • Introduced a backwards-compatibility option (in addition to forwards-compatibility), which defaults to false and is used as the default for the hasMessageColumn/hasSnapshotColumn config (see the config sketch below).
  • Removed the message/snapshot columns from the schemas, so new applications that create the schema from scratch won't have or try to use these columns.
  • Introduced a migration task that can be run in the background (it shouldn't require an outage) to migrate old events to the new format, thus eliminating the danger of Akka persistence changing the PersistentImpl class.
  • Added tests that load an H2 database created with v3 and the old schema, verify that things work in backwards compatibility mode, and verify that, after running the new migration task, things work with backwards compatibility mode off.
  • Updated the README to document the backwards compatibility config, as well as the migration task.
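
Assuming the new flag sits next to the forwards-compatibility setting shown in the README excerpt above (the exact option paths are an assumption, not confirmed in this thread), the resulting configuration might look like this:

```
akka-persistence-jdbc.migration.v4 {
  # Write the legacy column as well, so that old (v3) code can still read new rows.
  forwards-compatibility = false
  # Read legacy rows; also used as the default for hasMessageColumn/hasSnapshotColumn.
  backwards-compatibility = false
}
```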
jroper added a commit to jroper/lagom that referenced this pull request Apr 13, 2018
This shows how a complementary event log feature could be offered. It
writes events directly to the akka-persistence-jdbc event log, but in a
user supplied transaction, allowing them to be consumed by Lagom's
existing read side processors and topic publishers.

So far, only the minimum for supporting this in JPA has been
implemented.

Furthermore, these changes depend on akka/akka-persistence-jdbc#180. The primary reason is that without that change, akka-persistence-jdbc reads the sequence number from a serialized form of an Akka class rather than from the table row, whereas this change tries to generate it in a single database call by doing a subselect.
private def migrateJournalBatch(db: Database): Future[Int] = {
  val batchUpdate = JournalTable
    .filter(_.event.isEmpty)
    .take(RowsPerTransaction)

@WellingR (Member) commented Apr 15, 2018

For more efficient querying it is probably better to order by the ordering column. That way you can keep track of how far along the migration is (by remembering the highest ordering value handled so far), and the query can include an extra filter _.ordering > maxHandledOrdering.
This mechanism is similar to what is used in the eventsByTag queries.

The same can be done for the snapshot table
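
A sketch of that suggestion, reusing the names from the snippet above (maxHandledOrdering would be carried over between batches):

```scala
// Select the next batch of unmigrated rows in ordering order, starting just
// after the highest ordering value that has already been handled.
private def nextBatchQuery(maxHandledOrdering: Long) =
  JournalTable
    .filter(_.event.isEmpty)
    .filter(_.ordering > maxHandledOrdering)
    .sortBy(_.ordering.asc)
    .take(RowsPerTransaction)
```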

@jroper (Author, Contributor) commented Apr 15, 2018

Cool, I'll update.

@deprecated("This was used to store the PersistentRepr serialized as is, but no longer.", "4.0.0")
private val message: Rep[Option[Array[Byte]]] = column[Option[Array[Byte]]](journalTableCfg.columnNames.message)

val event: Rep[Option[Array[Byte]]] = column[Option[Array[Byte]]](journalTableCfg.columnNames.event)

@WellingR (Member) commented Apr 15, 2018

As far as I know, some people use Slick to generate the DDL for their tables.
Therefore I think we might need two variants of the journal table definition:

  • One Journal table class, with a corresponding JournalRow; in this case message is not included and all columns are required. This table class should represent the new v4 schemas.
  • One LegacyJournal table class with a corresponding LegacyJournalRow; this is the backwards/forwards compatible variant which has all columns, but they are all optional.

The Legacy variant needs to be used whenever backwards/forwards compatibility is still required, since this is the only way to have access to both the old and new formats.

We probably need to introduce a parent trait for the two types of journal rows (see the sketch below).
I understand that this will have further implications, since the queries we have defined probably need to be separated into regular/legacy variants too.
Also, the serializer needs to be able to work with both types of journal rows.

Let me know what you think about this.
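
A rough sketch of what that split could look like (all names here are illustrative, not taken from the PR):

```scala
// Common shape shared by both row variants.
sealed trait BaseJournalRow {
  def ordering: Long
  def persistenceId: String
  def sequenceNumber: Long
}

// New v4 schema: event bytes and serializer info are mandatory, and there is
// no legacy message column at all.
final case class JournalRow(
    ordering: Long,
    persistenceId: String,
    sequenceNumber: Long,
    event: Array[Byte],
    serId: Int,
    serManifest: String) extends BaseJournalRow

// Backwards/forwards compatible schema: both the old and the new payload
// columns are present, and all of them are optional.
final case class LegacyJournalRow(
    ordering: Long,
    persistenceId: String,
    sequenceNumber: Long,
    message: Option[Array[Byte]],
    event: Option[Array[Byte]],
    serId: Option[Int],
    serManifest: Option[String]) extends BaseJournalRow
```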

@jroper (Author, Contributor) commented Apr 15, 2018

As I understand it, the message column isn't included, because it's the composition of * that determines the schema, and that is controlled by whether hasMessageColumn is true or not. But the other columns are optional.

Note that in the case of Oracle, most of them need to be optional, since Oracle does not distinguish between null and empty.

@WellingR (Member) commented Aug 9, 2018

As far as I know, some people use these table definitions to generate the DDL (as opposed to the pre-defined DDLs that we use in the tests).

It is a fair point that Oracle does not distinguish between null and empty; however, to keep the behaviour the same for all databases we could explicitly make the columns eventManifest and serManifest optional, and insert None/null in case the manifests are empty (see the sketch below).

This allows the non-legacy journal (and the row case class) to omit the old message column, while event, serId and writerUuid can be made mandatory.

It might require a bit of fiddling to get this right (since this gives us two table definitions), but I think it is worth it.
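
A one-line sketch of that normalisation (the helper name is illustrative):

```scala
// Store empty manifests as NULL so that every database behaves like Oracle,
// which cannot distinguish NULL from the empty string.
def manifestColumnValue(manifest: String): Option[String] =
  Option(manifest).filter(_.nonEmpty)
```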


before {
  tmpDirectory = Files.createTempDirectory("akka-persistence-jdbc-migration-").toFile
  IOUtils.copyAndClose(fromClasspathAsStream("v3-h2-data.mv.db"), new FileOutputStream(new File(tmpDirectory, "database.mv.db")))

@WellingR (Member) commented Apr 15, 2018

It is probably a good idea to include this test for each supported database.

@jroper (Author, Contributor) commented Apr 15, 2018

This would require having a backup/export of each supported database that was created using the old code; it was quite an effort to create this just for H2. I'm not sure that effort is justified for every database.

@WellingR (Member) commented Aug 9, 2018

Wouldn't it be possible to insert the rows in legacy format using Slick? That would make it database-independent.
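
For example, a migration test could seed legacy-format rows through Slick instead of loading a database file. A sketch, assuming the legacy table/row definitions discussed earlier (the H2 profile import stands in for whatever profile the test is configured with):

```scala
import scala.concurrent.Future
import slick.jdbc.H2Profile.api._

// Insert pre-built legacy rows so the migration task has something to migrate,
// regardless of which database backend the test runs against.
def seedLegacyRows(db: Database, rows: Seq[LegacyJournalRow]): Future[Option[Int]] =
  db.run(LegacyJournalTable ++= rows)
```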

println(s"Migrating journal events, each . indicates $RowsPerTransaction rows migrated.")
val migration = migrateNextBatch(db, 0)
val migrated = Await.result(migration, Duration.Inf)
println(s"Journal migration complete! $migrated events were migrated.")

@WellingR (Member) commented Apr 15, 2018

Instead of using println it is probably better to use a logger.
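
For example, with SLF4J (a sketch; the surrounding class is omitted, and the names reuse the snippet above):

```scala
import org.slf4j.LoggerFactory

private val log = LoggerFactory.getLogger(getClass)

// Replaces the println-based progress reporting shown above.
log.info("Migrating journal events, each batch is {} rows.", RowsPerTransaction)
log.info("Journal migration complete! {} events were migrated.", migrated)
```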

@WellingR WellingR requested a review from renatocaval Apr 15, 2018
@WellingR (Member) commented Apr 15, 2018

One question I still have:

Currently, this plugin serializes events by using Java serialization of akka.persistence.PersistentImpl.

Are you sure? In one of my projects I have configured akka.actor.allow-java-serialization = off to make sure that misconfigured serializers are detected. This does not cause any errors. Can you explain why this works?

It also means that if someone has registered a custom serializer using Akka serialization for their events, that custom serializer won't be used, in contrast to most if not all other Akka persistence plugins.

I always thought this actually worked. However, I too was annoyed that the message was wrapped in the PersistentRepr, because I could not actually extract and inspect the serialized data in the database.

@jroper (Contributor, Author) commented Apr 15, 2018

You're right - I just checked: Akka persistence actually has a serializer defined for PersistentRepr, and it does use Akka serialization to serialize the payload with the serializer registered for that payload. That makes this whole PR far less important than I had thought.

I think it's still of value to have it done this way. For example, if you want to manually read or manipulate the event log in any way, right now you need to know the protobuf format of PersistentRepr to do it. Also, I'm not sure Akka persistence intends for people to use this serializer for storage; they might just intend it for use in remoting, in which case the only backwards compatibility they'll maintain would be short term, to allow for rolling upgrades. So from that perspective it's probably still pretty important, but not quite as bad as I had thought.

@WellingR (Member) commented Apr 15, 2018

I think it's still of value to have it done this way

I completely agree. I have created a 4.0.0 milestone, since a proper fix for #168 might also require an additional change to the schema.

We can take the time to make sure that both of these issues are resolved in the next major version.

@patriknw (Member) commented Apr 16, 2018

I have only read the description and some comments. I agree that it's an important change even though there is a serializer in Akka for PersistentRepr. It's better to store the user's payload and the other metadata in separate columns, e.g. for easier inspection of the events (e.g. JSON format).

@skisel commented Aug 9, 2018

Hi there,

I'm wondering, is there any plan to finish this and finally merge it? May I help somehow?

@@ -18,7 +22,9 @@ CREATE TABLE IF NOT EXISTS public.snapshot (
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
created BIGINT NOT NULL,
snapshot BYTEA NOT NULL,
snapshot_DATA BYTEA NOT NULL,

@WellingR (Member) commented Aug 9, 2018

Is there any reason why DATA has been uppercased?

@WellingR (Member) commented Aug 9, 2018

I see that the biggest open issue is that the migration task still needs some improvement.
I would also prefer separate table definitions for the new/legacy journals.

@jroper are you planning to continue working on this soon?
If not, then I propose that I merge this PR to the 4.x.x branch and create new issues for the work that still needs to be done to make this completely ready.

@skisel commented Aug 21, 2018

As there is no response from @jroper, I would merge it to the branch and then somebody else could address these issues (I could try to contribute).

@WellingR WellingR changed the base branch from 4.x.x to akka-serialization Sep 1, 2018
@WellingR (Member) commented Sep 1, 2018

Merging this to the akka-serialization branch in this repository for now. This makes it easier to resolve conflicts, and others can fix the remaining issues.

@WellingR WellingR merged commit b003ce4 into akka:akka-serialization Sep 1, 2018
1 of 2 checks passed
Codacy/PR Quality Review: Not up to standards. This pull request quality could be better.
continuous-integration/travis-ci/pr: The Travis CI build passed
WellingR added a commit that referenced this pull request Sep 1, 2018
Added support for serializing using Akka serialization including migration
WellingR added a commit that referenced this pull request Sep 2, 2018
Added support for serializing using Akka serialization including migration
@WellingR (Member) commented Sep 3, 2018

@skisel If you want to address some of the open issues, have a look at the new version of this PR, #199. Let me know if you want to have a go at one of the open issues.
