dialect/sql/schema: file based type store (#2644)
* dialect/sql/schema: file based type store

This PR adds support for a file based type store when using versioned migrations. A file called `.ent_types` is written to the migration directory alongside the migration files and is kept in sync on every migration file generation run.

In order not to break existing code, where the allocated type ranges might differ between deployments, global unique ID must be enabled by using a new option. Attempting to use versioned migrations together with the existing global unique ID option also raises an error pointing the user to the new option.

Documentation will be added to this PR once feedback on the code is gathered.
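For illustration, a minimal sketch of how the new option is meant to be wired together with versioned migrations. The connection string, the migration directory path, the database driver and the generated entmigrate package path are placeholders and not part of this PR:

package main

import (
	"context"
	"log"

	atlas "ariga.io/atlas/sql/migrate"
	"entgo.io/ent/dialect"
	entsql "entgo.io/ent/dialect/sql"
	"entgo.io/ent/dialect/sql/schema"
	_ "github.com/lib/pq" // postgres driver, assumed for this sketch

	entmigrate "example.com/project/ent/migrate" // generated ent package, hypothetical path
)

func main() {
	ctx := context.Background()
	// Connect to the database the migration files are diffed against.
	drv, err := entsql.Open(dialect.Postgres, "postgres://user:pass@localhost:5432/db?sslmode=disable")
	if err != nil {
		log.Fatalln(err)
	}
	// The migration directory; the .ent_types file is written next to the migration files.
	dir, err := atlas.NewLocalDir("ent/migrate/migrations")
	if err != nil {
		log.Fatalln(err)
	}
	m, err := schema.NewMigrate(drv,
		schema.WithDir(dir),      // versioned migration files
		schema.WithUniversalID(), // opt in to the file based pk range store
	)
	if err != nil {
		log.Fatalln(err)
	}
	// entmigrate.Tables is the table set generated by ent for the project schema.
	if err := m.NamedDiff(ctx, "add_users", entmigrate.Tables...); err != nil {
		log.Fatalln(err)
	}
}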

* apply CR

* fix tests

* change format of types file to exclude it from atlas.sum file

* docs and drift test

* apply CR
masseelch committed Jun 15, 2022
1 parent 195be2d commit 7017cbc
Showing 12 changed files with 411 additions and 37 deletions.
152 changes: 145 additions & 7 deletions dialect/sql/schema/atlas.go
@@ -9,6 +9,8 @@ import (
"database/sql"
"errors"
"fmt"
"io/fs"
"io/ioutil"
"sort"
"strings"

@@ -271,16 +273,32 @@ func WithSumFile() MigrateOption {
}
}

// WithUniversalID instructs atlas to use a file based type store when
// global unique ids are enabled. For more information see the setupAtlas method on Migrate.
//
// ATTENTION:
// The file based PK range store is not backward compatible, since the allocated ranges were computed
// dynamically when computing the diff between a deployed database and the current schema. In cases where there
// exist multiple deployments, the allocated ranges for the same type might be different from each other,
// depending on when the deployment took place.
func WithUniversalID() MigrateOption {
return func(m *Migrate) {
m.universalID = true
m.atlas.typeStoreConsent = true
}
}

type (
// atlasOptions describes the options for atlas.
atlasOptions struct {
enabled bool
diff []DiffHook
apply []ApplyHook
skip ChangeKind
dir migrate.Dir
fmt migrate.Formatter
genSum bool
typeStoreConsent bool
}

// atBuilder must be implemented by the different drivers in
@@ -293,9 +311,12 @@ type (
atIncrementC(*schema.Table, *schema.Column)
atIncrementT(*schema.Table, int64)
atIndex(*Index, *schema.Table, *schema.Index) error
atTypeRangeSQL(t ...string) string
}
)

var errConsent = errors.New("sql/schema: use WithUniversalID() instead of WithGlobalUniqueID(true) when using WithDir(): https://entgo.io/docs/migrate#universal-ids")

func (m *Migrate) setupAtlas() error {
// Using one of the Atlas options, opt-in to Atlas migration.
if !m.atlas.enabled && (m.atlas.skip != NoChange || len(m.atlas.diff) > 0 || len(m.atlas.apply) > 0) || m.atlas.dir != nil {
@@ -326,6 +347,16 @@ func (m *Migrate) setupAtlas() error {
if m.atlas.dir != nil && m.atlas.fmt == nil {
m.atlas.fmt = sqltool.GolangMigrateFormatter
}
if m.universalID && m.atlas.dir != nil {
// If global unique ids are enabled and a migration directory is given, enable the file based type store for pk ranges.
m.typeStore = &dirTypeStore{dir: m.atlas.dir}
// To guard the user against a possible bug due to backward incompatibility, the file based type store must
// be enabled by an option. For more information see the comment on the WithUniversalID function.
if !m.atlas.typeStoreConsent {
return errConsent
}
m.atlas.diff = append(m.atlas.diff, m.ensureTypeTable)
}
return nil
}

@@ -537,6 +568,54 @@ func (m *Migrate) aIndexes(b atBuilder, t1 *Table, t2 *schema.Table) error {
return nil
}

func (m *Migrate) ensureTypeTable(next Differ) Differ {
return DiffFunc(func(current, desired *schema.Schema) ([]schema.Change, error) {
// If there is a types table but no types file yet, the user most likely
// switched from online migration to migration files.
if len(m.dbTypeRanges) == 0 {
var (
at = schema.NewTable(TypeTable)
et = NewTable(TypeTable).
AddPrimary(&Column{Name: "id", Type: field.TypeUint, Increment: true}).
AddColumn(&Column{Name: "type", Type: field.TypeString, Unique: true})
)
m.atTable(et, at)
if err := m.aColumns(m, et, at); err != nil {
return nil, err
}
if err := m.aIndexes(m, et, at); err != nil {
return nil, err
}
desired.Tables = append(desired.Tables, at)
}
// If there is a drift between the types stored in the database and the ones stored in the file,
// stop diffing, as this is potentially destructive. This will most likely happen on the first diffing
// after moving from online-migration to versioned migrations if the "old" ent types are not in sync with
// the deterministic ones computed by the new engine.
if len(m.dbTypeRanges) > 0 && len(m.fileTypeRanges) > 0 && !equal(m.fileTypeRanges, m.dbTypeRanges) {
return nil, fmt.Errorf(
"type allocation range drift detected: %v <> %v: see %s for more information",
m.dbTypeRanges, m.fileTypeRanges,
"https://entgo.io/docs/versioned-migrations#moving-from-auto-migration-to-versioned-migrations",
)
}
changes, err := next.Diff(current, desired)
if err != nil {
return nil, err
}
if len(m.dbTypeRanges) > 0 && len(m.fileTypeRanges) == 0 {
// Override the types file created in the diff process with the "old" allocated types ranges.
if err := m.typeStore.(*dirTypeStore).save(m.dbTypeRanges); err != nil {
return nil, err
}
// Change the type range allocations since they will be added to the migration files when
// writing the migration plan.
m.typeRanges = m.dbTypeRanges
}
return changes, nil
})
}

func setAtChecks(t1 *Table, t2 *schema.Table) {
if check := t1.Annotation.Check; check != "" {
t2.AddChecks(&schema.Check{
@@ -574,3 +653,62 @@ func descIndexes(idx *Index) map[string]bool {
}
return descs
}

const entTypes = ".ent_types"

// dirTypeStore stores and reads pk information from a text file stored alongside generated versioned migrations.
// This behaviour is enabled automatically when using versioned migrations.
type dirTypeStore struct {
dir migrate.Dir
}

const atlasDirective = "atlas:sum ignore\n"

// load the types from the types file.
func (s *dirTypeStore) load(context.Context, dialect.ExecQuerier) ([]string, error) {
f, err := s.dir.Open(entTypes)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return nil, fmt.Errorf("reading types file: %w", err)
}
if errors.Is(err, fs.ErrNotExist) {
return nil, nil
}
defer f.Close()
c, err := ioutil.ReadAll(f)
if err != nil {
return nil, fmt.Errorf("reading types file: %w", err)
}
return strings.Split(strings.TrimPrefix(string(c), atlasDirective), ","), nil
}

// add a new type entry to the types file.
func (s *dirTypeStore) add(ctx context.Context, conn dialect.ExecQuerier, t string) error {
ts, err := s.load(ctx, conn)
if err != nil {
return fmt.Errorf("adding type %q: %w", t, err)
}
return s.save(append(ts, t))
}

// save takes the given allocation ranges and writes them to the types file.
// The types file will be overwritten.
func (s *dirTypeStore) save(ts []string) error {
if err := s.dir.WriteFile(entTypes, []byte(atlasDirective+strings.Join(ts, ","))); err != nil {
return fmt.Errorf("writing types file: %w", err)
}
return nil
}

var _ typeStore = (*dirTypeStore)(nil)

func equal(s1, s2 []string) bool {
if len(s1) != len(s2) {
return false
}
for i := range s1 {
if s1[i] != s2[i] {
return false
}
}
return true
}
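As an aside, the .ent_types file written by save above would look like the following (the type names users and groups are hypothetical): the atlas:sum directive on the first line excludes the file from the atlas.sum checksum, and the second line lists the allocated types in range order, separated by commas.

    atlas:sum ignore
    users,groups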
33 changes: 33 additions & 0 deletions dialect/sql/schema/atlas_test.go
@@ -0,0 +1,33 @@
// Copyright 2019-present Facebook Inc. All rights reserved.
// This source code is licensed under the Apache 2.0 license found
// in the LICENSE file in the root directory of this source tree.

package schema

import (
"context"
"os"
"path/filepath"
"testing"

"ariga.io/atlas/sql/migrate"
"github.com/stretchr/testify/require"
)

func TestDirTypeStore(t *testing.T) {
ex := []string{"a", "b", "c"}
p := t.TempDir()
d, err := migrate.NewLocalDir(p)
require.NoError(t, err)

s := &dirTypeStore{d}
require.NoError(t, s.save(ex))
require.FileExists(t, filepath.Join(p, entTypes))
c, err := os.ReadFile(filepath.Join(p, entTypes))
require.NoError(t, err)
require.Contains(t, string(c), atlasDirective)

ac, err := s.load(context.Background(), nil)
require.NoError(t, err)
require.Equal(t, ex, ac)
}
59 changes: 43 additions & 16 deletions dialect/sql/schema/migrate.go
@@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"math"
"strings"

"ariga.io/atlas/sql/migrate"
"entgo.io/ent/dialect"
@@ -114,7 +115,9 @@ type Migrate struct {
atlas *atlasOptions // migrate with atlas.
typeRanges []string // types ordered by their range.
hooks []Hook // hooks to apply before creation
typeStore typeStore // the typeStore to read and save type ranges
fileTypeRanges []string // used internally by ensureTypeTable hook
dbTypeRanges []string // used internally by ensureTypeTable hook
}

// NewMigrate creates a migration structure for the given SQL driver.
@@ -163,45 +166,69 @@ func (m *Migrate) Create(ctx context.Context, tables ...*Table) error {
return creator.Create(ctx, tables...)
}

// Diff compares the state read from the connected database with the state defined by Ent.
// Changes will be written to migration files by the configured Planner.
func (m *Migrate) Diff(ctx context.Context, tables ...*Table) error {
return m.NamedDiff(ctx, "changes", tables...)
}

// NamedDiff compares the state read from the connected database with the state defined by Ent.
// Changes will be written to migration files by the configured Planner.
func (m *Migrate) NamedDiff(ctx context.Context, name string, tables ...*Table) error {
if m.atlas.dir == nil {
return errors.New("no migration directory given")
}
opts := []migrate.PlannerOption{
migrate.WithFormatter(m.atlas.fmt),
}
if m.atlas.genSum {
// Validate the migration directory before proceeding.
if err := migrate.Validate(m.atlas.dir); err != nil {
return fmt.Errorf("validating migration directory: %w", err)
}
} else {
opts = append(opts, migrate.DisableChecksum())
}
if err := m.init(ctx, m); err != nil {
return err
}
if m.universalID {
if err := m.types(ctx, m); err != nil {
return err
}
m.fileTypeRanges = m.typeRanges
ex, err := m.tableExist(ctx, m, TypeTable)
if err != nil {
return err
}
if ex {
m.dbTypeRanges, err = (&dbTypeStore{m}).load(ctx, m)
if err != nil {
return err
}
}
defer func() {
m.fileTypeRanges = nil
m.dbTypeRanges = nil
}()
}
plan, err := m.atDiff(ctx, m, name, tables...)
if err != nil {
return err
}
if m.universalID {
newTypes := m.typeRanges[len(m.dbTypeRanges):]
if len(newTypes) > 0 {
plan.Changes = append(plan.Changes, &migrate.Change{
Cmd: m.atTypeRangeSQL(newTypes...),
Comment: fmt.Sprintf("add pk ranges for %s tables", strings.Join(newTypes, ",")),
})
}
}
// Skip if the plan has no changes.
if len(plan.Changes) == 0 {
return nil
}
return migrate.NewPlanner(nil, m.atlas.dir, opts...).WritePlan(plan)
}
