Skip to content

Commit

Permalink
postgres: Automatically create search_path schema when needed (#169)
Browse files Browse the repository at this point in the history
In #167 we added support for specifying a postgres `search_path`, which is used to store the `schema_migrations` table. However, if the schema does not already exist it will cause an error.

In this PR we automatically create the first schema in the `search_path` if it does not exist.
  • Loading branch information
amacneil committed Nov 1, 2020
1 parent 55a8065 commit ac718a2
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 67 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -140,7 +140,7 @@ DATABASE_URL="postgres://username:password@/database_name?socket=/var/run/postgr
```

A `search_path` parameter can be used to specify the [current schema](https://www.postgresql.org/docs/13/ddl-schemas.html#DDL-SCHEMAS-PATH) while applying migrations, as well as for dbmate's `schema_migrations` table.
If multiple comma-separated schemas are passed, the first will be used for the `schema_migrations` table.
If the schema does not exist, it will be created automatically. If multiple comma-separated schemas are passed, the first will be used for the `schema_migrations` table.

```sh
DATABASE_URL="postgres://username:password@127.0.0.1:5432/database_name?search_path=myschema"
Expand Down
2 changes: 1 addition & 1 deletion pkg/dbmate/clickhouse.go
Expand Up @@ -206,7 +206,7 @@ func (drv ClickHouseDriver) DatabaseExists(u *url.URL) (bool, error) {
}

// CreateMigrationsTable creates the schema_migrations table
func (drv ClickHouseDriver) CreateMigrationsTable(db *sql.DB) error {
func (drv ClickHouseDriver) CreateMigrationsTable(u *url.URL, db *sql.DB) error {
_, err := db.Exec(`
create table if not exists schema_migrations (
version String,
Expand Down
29 changes: 16 additions & 13 deletions pkg/dbmate/clickhouse_test.go
Expand Up @@ -15,9 +15,8 @@ func clickhouseTestURL(t *testing.T) *url.URL {
return u
}

func prepTestClickHouseDB(t *testing.T) *sql.DB {
func prepTestClickHouseDB(t *testing.T, u *url.URL) *sql.DB {
drv := ClickHouseDriver{}
u := clickhouseTestURL(t)

// drop any existing database
err := drv.DropDatabase(u)
Expand Down Expand Up @@ -92,9 +91,9 @@ func TestClickHouseDumpSchema(t *testing.T) {
u := clickhouseTestURL(t)

// prepare database
db := prepTestClickHouseDB(t)
db := prepTestClickHouseDB(t, u)
defer mustClose(db)
err := drv.CreateMigrationsTable(db)
err := drv.CreateMigrationsTable(u, db)
require.NoError(t, err)

// insert migration
Expand Down Expand Up @@ -171,7 +170,8 @@ func TestClickHouseDatabaseExists_Error(t *testing.T) {

func TestClickHouseCreateMigrationsTable(t *testing.T) {
drv := ClickHouseDriver{}
db := prepTestClickHouseDB(t)
u := clickhouseTestURL(t)
db := prepTestClickHouseDB(t, u)
defer mustClose(db)

// migrations table should not exist
Expand All @@ -180,24 +180,25 @@ func TestClickHouseCreateMigrationsTable(t *testing.T) {
require.EqualError(t, err, "code: 60, message: Table dbmate.schema_migrations doesn't exist.")

// create table
err = drv.CreateMigrationsTable(db)
err = drv.CreateMigrationsTable(u, db)
require.NoError(t, err)

// migrations table should exist
err = db.QueryRow("select count(*) from schema_migrations").Scan(&count)
require.NoError(t, err)

// create table should be idempotent
err = drv.CreateMigrationsTable(db)
err = drv.CreateMigrationsTable(u, db)
require.NoError(t, err)
}

func TestClickHouseSelectMigrations(t *testing.T) {
drv := ClickHouseDriver{}
db := prepTestClickHouseDB(t)
u := clickhouseTestURL(t)
db := prepTestClickHouseDB(t, u)
defer mustClose(db)

err := drv.CreateMigrationsTable(db)
err := drv.CreateMigrationsTable(u, db)
require.NoError(t, err)

tx, err := db.Begin()
Expand Down Expand Up @@ -229,10 +230,11 @@ func TestClickHouseSelectMigrations(t *testing.T) {

func TestClickHouseInsertMigration(t *testing.T) {
drv := ClickHouseDriver{}
db := prepTestClickHouseDB(t)
u := clickhouseTestURL(t)
db := prepTestClickHouseDB(t, u)
defer mustClose(db)

err := drv.CreateMigrationsTable(db)
err := drv.CreateMigrationsTable(u, db)
require.NoError(t, err)

count := 0
Expand All @@ -255,10 +257,11 @@ func TestClickHouseInsertMigration(t *testing.T) {

func TestClickHouseDeleteMigration(t *testing.T) {
drv := ClickHouseDriver{}
db := prepTestClickHouseDB(t)
u := clickhouseTestURL(t)
db := prepTestClickHouseDB(t, u)
defer mustClose(db)

err := drv.CreateMigrationsTable(db)
err := drv.CreateMigrationsTable(u, db)
require.NoError(t, err)

tx, err := db.Begin()
Expand Down
2 changes: 1 addition & 1 deletion pkg/dbmate/db.go
Expand Up @@ -252,7 +252,7 @@ func (db *DB) openDatabaseForMigration() (Driver, *sql.DB, error) {
return nil, nil, err
}

if err := drv.CreateMigrationsTable(sqlDB); err != nil {
if err := drv.CreateMigrationsTable(db.DatabaseURL, sqlDB); err != nil {
mustClose(sqlDB)
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/dbmate/driver.go
Expand Up @@ -13,7 +13,7 @@ type Driver interface {
CreateDatabase(*url.URL) error
DropDatabase(*url.URL) error
DumpSchema(*url.URL, *sql.DB) ([]byte, error)
CreateMigrationsTable(*sql.DB) error
CreateMigrationsTable(*url.URL, *sql.DB) error
SelectMigrations(*sql.DB, int) (map[string]bool, error)
InsertMigration(Transaction, string) error
DeleteMigration(Transaction, string) error
Expand Down
2 changes: 1 addition & 1 deletion pkg/dbmate/mysql.go
Expand Up @@ -192,7 +192,7 @@ func (drv MySQLDriver) DatabaseExists(u *url.URL) (bool, error) {
}

// CreateMigrationsTable creates the schema_migrations table
func (drv MySQLDriver) CreateMigrationsTable(db *sql.DB) error {
func (drv MySQLDriver) CreateMigrationsTable(u *url.URL, db *sql.DB) error {
_, err := db.Exec("create table if not exists schema_migrations " +
"(version varchar(255) primary key)")

Expand Down
29 changes: 16 additions & 13 deletions pkg/dbmate/mysql_test.go
Expand Up @@ -15,9 +15,8 @@ func mySQLTestURL(t *testing.T) *url.URL {
return u
}

func prepTestMySQLDB(t *testing.T) *sql.DB {
func prepTestMySQLDB(t *testing.T, u *url.URL) *sql.DB {
drv := MySQLDriver{}
u := mySQLTestURL(t)

// drop any existing database
err := drv.DropDatabase(u)
Expand Down Expand Up @@ -121,9 +120,9 @@ func TestMySQLDumpSchema(t *testing.T) {
u := mySQLTestURL(t)

// prepare database
db := prepTestMySQLDB(t)
db := prepTestMySQLDB(t, u)
defer mustClose(db)
err := drv.CreateMigrationsTable(db)
err := drv.CreateMigrationsTable(u, db)
require.NoError(t, err)

// insert migration
Expand Down Expand Up @@ -191,7 +190,8 @@ func TestMySQLDatabaseExists_Error(t *testing.T) {

func TestMySQLCreateMigrationsTable(t *testing.T) {
drv := MySQLDriver{}
db := prepTestMySQLDB(t)
u := mySQLTestURL(t)
db := prepTestMySQLDB(t, u)
defer mustClose(db)

// migrations table should not exist
Expand All @@ -200,24 +200,25 @@ func TestMySQLCreateMigrationsTable(t *testing.T) {
require.Regexp(t, "Table 'dbmate.schema_migrations' doesn't exist", err.Error())

// create table
err = drv.CreateMigrationsTable(db)
err = drv.CreateMigrationsTable(u, db)
require.NoError(t, err)

// migrations table should exist
err = db.QueryRow("select count(*) from schema_migrations").Scan(&count)
require.NoError(t, err)

// create table should be idempotent
err = drv.CreateMigrationsTable(db)
err = drv.CreateMigrationsTable(u, db)
require.NoError(t, err)
}

func TestMySQLSelectMigrations(t *testing.T) {
drv := MySQLDriver{}
db := prepTestMySQLDB(t)
u := mySQLTestURL(t)
db := prepTestMySQLDB(t, u)
defer mustClose(db)

err := drv.CreateMigrationsTable(db)
err := drv.CreateMigrationsTable(u, db)
require.NoError(t, err)

_, err = db.Exec(`insert into schema_migrations (version)
Expand All @@ -240,10 +241,11 @@ func TestMySQLSelectMigrations(t *testing.T) {

func TestMySQLInsertMigration(t *testing.T) {
drv := MySQLDriver{}
db := prepTestMySQLDB(t)
u := mySQLTestURL(t)
db := prepTestMySQLDB(t, u)
defer mustClose(db)

err := drv.CreateMigrationsTable(db)
err := drv.CreateMigrationsTable(u, db)
require.NoError(t, err)

count := 0
Expand All @@ -263,10 +265,11 @@ func TestMySQLInsertMigration(t *testing.T) {

func TestMySQLDeleteMigration(t *testing.T) {
drv := MySQLDriver{}
db := prepTestMySQLDB(t)
u := mySQLTestURL(t)
db := prepTestMySQLDB(t, u)
defer mustClose(db)

err := drv.CreateMigrationsTable(db)
err := drv.CreateMigrationsTable(u, db)
require.NoError(t, err)

_, err = db.Exec(`insert into schema_migrations (version)
Expand Down
28 changes: 27 additions & 1 deletion pkg/dbmate/postgres.go
Expand Up @@ -191,7 +191,33 @@ func (drv PostgresDriver) DatabaseExists(u *url.URL) (bool, error) {
}

// CreateMigrationsTable creates the schema_migrations table
func (drv PostgresDriver) CreateMigrationsTable(db *sql.DB) error {
func (drv PostgresDriver) CreateMigrationsTable(u *url.URL, db *sql.DB) error {
// get schema from URL search_path param
searchPath := strings.Split(u.Query().Get("search_path"), ",")
urlSchema := strings.TrimSpace(searchPath[0])
if urlSchema == "" {
urlSchema = "public"
}

// get *unquoted* current schema from database
dbSchema, err := queryRow(db, "select current_schema()")
if err != nil {
return err
}

// if urlSchema and dbSchema are not equal, the most likely explanation is that the schema
// has not yet been created
if urlSchema != dbSchema {
// in theory we could just execute this statement every time, but we do the comparison
// above in case the user doesn't have permissions to create schemas and the schema
// already exists
fmt.Printf("Creating schema: %s\n", urlSchema)
_, err = db.Exec("create schema if not exists " + pq.QuoteIdentifier(urlSchema))
if err != nil {
return err
}
}

migrationsTable, err := drv.migrationsTableName(db)
if err != nil {
return err
Expand Down
86 changes: 65 additions & 21 deletions pkg/dbmate/postgres_test.go
Expand Up @@ -119,7 +119,7 @@ func TestPostgresCreateDropDatabase(t *testing.T) {
defer mustClose(db)

err = db.Ping()
require.NotNil(t, err)
require.Error(t, err)
require.Equal(t, "pq: database \"dbmate\" does not exist", err.Error())
}()
}
Expand All @@ -131,7 +131,7 @@ func TestPostgresDumpSchema(t *testing.T) {
// prepare database
db := prepTestPostgresDB(t, u)
defer mustClose(db)
err := drv.CreateMigrationsTable(db)
err := drv.CreateMigrationsTable(u, db)
require.NoError(t, err)

// insert migration
Expand Down Expand Up @@ -191,32 +191,76 @@ func TestPostgresDatabaseExists_Error(t *testing.T) {
u.User = url.User("invalid")

exists, err := drv.DatabaseExists(u)
require.Error(t, err)
require.Equal(t, "pq: password authentication failed for user \"invalid\"", err.Error())
require.Equal(t, false, exists)
}

func TestPostgresCreateMigrationsTable(t *testing.T) {
drv := PostgresDriver{}
u := postgresTestURL(t)
db := prepTestPostgresDB(t, u)
defer mustClose(db)

// migrations table should not exist
count := 0
err := db.QueryRow("select count(*) from public.schema_migrations").Scan(&count)
require.Equal(t, "pq: relation \"public.schema_migrations\" does not exist", err.Error())
t.Run("default schema", func(t *testing.T) {
u := postgresTestURL(t)
db := prepTestPostgresDB(t, u)
defer mustClose(db)

// create table
err = drv.CreateMigrationsTable(db)
require.NoError(t, err)
// migrations table should not exist
count := 0
err := db.QueryRow("select count(*) from public.schema_migrations").Scan(&count)
require.Error(t, err)
require.Equal(t, "pq: relation \"public.schema_migrations\" does not exist", err.Error())

// migrations table should exist
err = db.QueryRow("select count(*) from public.schema_migrations").Scan(&count)
require.NoError(t, err)
// create table
err = drv.CreateMigrationsTable(u, db)
require.NoError(t, err)

// create table should be idempotent
err = drv.CreateMigrationsTable(db)
require.NoError(t, err)
// migrations table should exist
err = db.QueryRow("select count(*) from public.schema_migrations").Scan(&count)
require.NoError(t, err)

// create table should be idempotent
err = drv.CreateMigrationsTable(u, db)
require.NoError(t, err)
})

t.Run("custom schema", func(t *testing.T) {
u, err := url.Parse(postgresTestURL(t).String() + "&search_path=foo")
require.NoError(t, err)
db := prepTestPostgresDB(t, u)
defer mustClose(db)

// delete schema
_, err = db.Exec("drop schema if exists foo")
require.NoError(t, err)

// drop any schema_migrations table in public schema
_, err = db.Exec("drop table if exists public.schema_migrations")
require.NoError(t, err)

// migrations table should not exist in either schema
count := 0
err = db.QueryRow("select count(*) from foo.schema_migrations").Scan(&count)
require.Error(t, err)
require.Equal(t, "pq: relation \"foo.schema_migrations\" does not exist", err.Error())
err = db.QueryRow("select count(*) from public.schema_migrations").Scan(&count)
require.Error(t, err)
require.Equal(t, "pq: relation \"public.schema_migrations\" does not exist", err.Error())

// create table
err = drv.CreateMigrationsTable(u, db)
require.NoError(t, err)

// foo schema should be created, and migrations table should exist only in foo schema
err = db.QueryRow("select count(*) from foo.schema_migrations").Scan(&count)
require.NoError(t, err)
err = db.QueryRow("select count(*) from public.schema_migrations").Scan(&count)
require.Error(t, err)
require.Equal(t, "pq: relation \"public.schema_migrations\" does not exist", err.Error())

// create table should be idempotent
err = drv.CreateMigrationsTable(u, db)
require.NoError(t, err)
})
}

func TestPostgresSelectMigrations(t *testing.T) {
Expand All @@ -225,7 +269,7 @@ func TestPostgresSelectMigrations(t *testing.T) {
db := prepTestPostgresDB(t, u)
defer mustClose(db)

err := drv.CreateMigrationsTable(db)
err := drv.CreateMigrationsTable(u, db)
require.NoError(t, err)

_, err = db.Exec(`insert into public.schema_migrations (version)
Expand All @@ -252,7 +296,7 @@ func TestPostgresInsertMigration(t *testing.T) {
db := prepTestPostgresDB(t, u)
defer mustClose(db)

err := drv.CreateMigrationsTable(db)
err := drv.CreateMigrationsTable(u, db)
require.NoError(t, err)

count := 0
Expand All @@ -276,7 +320,7 @@ func TestPostgresDeleteMigration(t *testing.T) {
db := prepTestPostgresDB(t, u)
defer mustClose(db)

err := drv.CreateMigrationsTable(db)
err := drv.CreateMigrationsTable(u, db)
require.NoError(t, err)

_, err = db.Exec(`insert into public.schema_migrations (version)
Expand Down

0 comments on commit ac718a2

Please sign in to comment.