forked from goharbor/harbor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pgsql.go
137 lines (120 loc) · 3.89 KB
/
pgsql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dao
import (
"fmt"
"os"
"github.com/astaxie/beego/orm"
"github.com/golang-migrate/migrate"
_ "github.com/golang-migrate/migrate/database/postgres" // import pgsql driver for migrator
_ "github.com/golang-migrate/migrate/source/file" // import local file driver for migrator
"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/common/utils/log"
_ "github.com/lib/pq" // register pgsql driver
)
const defaultMigrationPath = "migrations/postgresql/"
type pgsql struct {
host string
port string
usr string
pwd string
database string
sslmode string
}
// Name returns the name of PostgreSQL
func (p *pgsql) Name() string {
return "PostgreSQL"
}
// String ...
func (p *pgsql) String() string {
return fmt.Sprintf("type-%s host-%s port-%s databse-%s sslmode-%q",
p.Name(), p.host, p.port, p.database, p.sslmode)
}
// NewPGSQL returns an instance of postgres
func NewPGSQL(host string, port string, usr string, pwd string, database string, sslmode string) Database {
if len(sslmode) == 0 {
sslmode = "disable"
}
return &pgsql{
host: host,
port: port,
usr: usr,
pwd: pwd,
database: database,
sslmode: sslmode,
}
}
// Register registers pgSQL to orm with the info wrapped by the instance.
func (p *pgsql) Register(alias ...string) error {
if err := utils.TestTCPConn(fmt.Sprintf("%s:%s", p.host, p.port), 60, 2); err != nil {
return err
}
if err := orm.RegisterDriver("postgres", orm.DRPostgres); err != nil {
return err
}
an := "default"
if len(alias) != 0 {
an = alias[0]
}
info := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=%s",
p.host, p.port, p.usr, p.pwd, p.database, p.sslmode)
return orm.RegisterDataBase(an, "postgres", info)
}
// UpgradeSchema calls migrate tool to upgrade schema to the latest based on the SQL scripts.
func (p *pgsql) UpgradeSchema() error {
dbURL := fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=%s", p.usr, p.pwd, p.host, p.port, p.database, p.sslmode)
// For UT
path := os.Getenv("POSTGRES_MIGRATION_SCRIPTS_PATH")
if len(path) == 0 {
path = defaultMigrationPath
}
srcURL := fmt.Sprintf("file://%s", path)
m, err := migrate.New(srcURL, dbURL)
if err != nil {
return err
}
defer func() {
srcErr, dbErr := m.Close()
if srcErr != nil || dbErr != nil {
log.Warningf("Failed to close migrator, source error: %v, db error: %v", srcErr, dbErr)
}
}()
log.Infof("Upgrading schema for pgsql ...")
err = m.Up()
if err == migrate.ErrNoChange {
log.Infof("No change in schema, skip.")
} else if err != nil { // migrate.ErrLockTimeout will be thrown when another process is doing migration and timeout.
log.Errorf("Failed to upgrade schema, error: %q", err)
return err
}
return upgradeSchema()
}
// This function is used to deleting the replication policies and
// jobs which are marked as "deleted" to fix #6698.
//
// The code only exists on branch 1.7 and the corresponding changes
// for master branch is put in the upgrade SQL file
func upgradeSchema() error {
sql := `DELETE FROM replication_job AS j
USING replication_policy AS p
WHERE j.policy_id = p.id AND p.deleted = TRUE`
_, err := GetOrmer().Raw(sql).Exec()
if err != nil {
return err
}
sql = `DELETE FROM replication_policy AS p
WHERE p.deleted = TRUE`
_, err = GetOrmer().Raw(sql).Exec()
return err
}