/
connection_mysql.go
138 lines (117 loc) · 4.08 KB
/
connection_mysql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package materialize
import (
"fmt"
"strings"
"github.com/jmoiron/sqlx"
)
// ConnectionMySQLBuilder accumulates the options of a MySQL connection and
// renders them into a CREATE CONNECTION ... TO MYSQL statement via Create.
// Each option has a chainable setter method of the same (capitalised) name.
type ConnectionMySQLBuilder struct {
	// Connection is embedded; it is populated by NewConnectionMySQLBuilder
	// with the DDL builder and the object's name, schema and database.
	Connection

	// connectionType is stored by ConnectionType; Create does not read it.
	connectionType string

	// mysqlHost is rendered as the quoted HOST option (always emitted).
	mysqlHost string

	// mysqlPort is rendered as the PORT option (always emitted).
	mysqlPort int

	// mysqlUser is rendered as USER <text> and/or USER SECRET <name>,
	// depending on which of Text and Secret.Name are non-empty.
	mysqlUser ValueSecretStruct

	// mysqlPassword is rendered as PASSWORD SECRET when its Name is set.
	mysqlPassword IdentifierSchemaStruct

	// mysqlSSHTunnel is rendered as SSH TUNNEL when its Name is set.
	mysqlSSHTunnel IdentifierSchemaStruct

	// mysqlSSLMode is rendered as the quoted SSL MODE option when non-empty.
	mysqlSSLMode string

	// mysqlSSLCa holds the SSL certificate authority setting.
	mysqlSSLCa ValueSecretStruct

	// mysqlSSLCert is rendered as SSL CERTIFICATE <text> and/or
	// SSL CERTIFICATE SECRET <name>.
	mysqlSSLCert ValueSecretStruct

	// mysqlSSLKey is rendered as SSL KEY SECRET when its Name is set.
	mysqlSSLKey IdentifierSchemaStruct

	// mysqlAWSPrivateLink is rendered as AWS PRIVATELINK when its Name is set.
	mysqlAWSPrivateLink IdentifierSchemaStruct

	// validate controls the trailing WITH clause: when false (the zero
	// value), Create appends WITH (VALIDATE = false).
	validate bool
}
// NewConnectionMySQLBuilder returns a builder for the MySQL connection
// identified by obj. The embedded Connection is initialised with a Builder
// wrapping conn (tagged BaseConnection) and obj's name, schema name and
// database name; every MySQL-specific option starts at its zero value.
func NewConnectionMySQLBuilder(conn *sqlx.DB, obj MaterializeObject) *ConnectionMySQLBuilder {
	builder := new(ConnectionMySQLBuilder)
	builder.Connection = Connection{
		Builder{conn, BaseConnection},
		obj.Name,
		obj.SchemaName,
		obj.DatabaseName,
	}
	return builder
}
// ConnectionType stores the connection type on the builder and returns the
// builder so that calls can be chained.
func (b *ConnectionMySQLBuilder) ConnectionType(connectionType string) *ConnectionMySQLBuilder {
	b.connectionType = connectionType
	return b
}
// MySQLHost sets the host that Create renders in the HOST option and returns
// the builder so that calls can be chained.
func (b *ConnectionMySQLBuilder) MySQLHost(mysqlHost string) *ConnectionMySQLBuilder {
	b.mysqlHost = mysqlHost
	return b
}
// MySQLPort sets the port that Create renders in the PORT option and returns
// the builder so that calls can be chained.
func (b *ConnectionMySQLBuilder) MySQLPort(mysqlPort int) *ConnectionMySQLBuilder {
	b.mysqlPort = mysqlPort
	return b
}
// MySQLUser sets the user, given either as plain text or as a secret
// reference, that Create renders in the USER / USER SECRET option. It returns
// the builder so that calls can be chained.
func (b *ConnectionMySQLBuilder) MySQLUser(mysqlUser ValueSecretStruct) *ConnectionMySQLBuilder {
	b.mysqlUser = mysqlUser
	return b
}
// MySQLPassword sets the secret that Create renders in the PASSWORD SECRET
// option and returns the builder so that calls can be chained.
func (b *ConnectionMySQLBuilder) MySQLPassword(mysqlPassword IdentifierSchemaStruct) *ConnectionMySQLBuilder {
	b.mysqlPassword = mysqlPassword
	return b
}
// MySQLSSHTunnel sets the object that Create renders in the SSH TUNNEL option
// and returns the builder so that calls can be chained.
func (b *ConnectionMySQLBuilder) MySQLSSHTunnel(mysqlSSHTunnel IdentifierSchemaStruct) *ConnectionMySQLBuilder {
	b.mysqlSSHTunnel = mysqlSSHTunnel
	return b
}
// MySQLSSLMode sets the value that Create renders in the SSL MODE option and
// returns the builder so that calls can be chained.
func (b *ConnectionMySQLBuilder) MySQLSSLMode(mysqlSSLMode string) *ConnectionMySQLBuilder {
	b.mysqlSSLMode = mysqlSSLMode
	return b
}
// MySQLSSLCa stores the SSL certificate authority setting on the builder and
// returns the builder so that calls can be chained.
func (b *ConnectionMySQLBuilder) MySQLSSLCa(mysqlSSLCa ValueSecretStruct) *ConnectionMySQLBuilder {
	b.mysqlSSLCa = mysqlSSLCa
	return b
}
// MySQLSSLCert sets the client certificate, given either as plain text or as
// a secret reference, that Create renders in the SSL CERTIFICATE /
// SSL CERTIFICATE SECRET option. It returns the builder so that calls can be
// chained.
func (b *ConnectionMySQLBuilder) MySQLSSLCert(mysqlSSLCert ValueSecretStruct) *ConnectionMySQLBuilder {
	b.mysqlSSLCert = mysqlSSLCert
	return b
}
// MySQLSSLKey sets the secret that Create renders in the SSL KEY SECRET
// option and returns the builder so that calls can be chained.
func (b *ConnectionMySQLBuilder) MySQLSSLKey(mysqlSSLKey IdentifierSchemaStruct) *ConnectionMySQLBuilder {
	b.mysqlSSLKey = mysqlSSLKey
	return b
}
// MySQLAWSPrivateLink sets the object that Create renders in the
// AWS PRIVATELINK option and returns the builder so that calls can be chained.
func (b *ConnectionMySQLBuilder) MySQLAWSPrivateLink(mysqlAWSPrivateLink IdentifierSchemaStruct) *ConnectionMySQLBuilder {
	b.mysqlAWSPrivateLink = mysqlAWSPrivateLink
	return b
}
// Validate sets the validation flag and returns the builder so that calls can
// be chained. When the flag is false, Create appends WITH (VALIDATE = false)
// to the statement; when it is true, no WITH clause is added.
func (b *ConnectionMySQLBuilder) Validate(validate bool) *ConnectionMySQLBuilder {
	b.validate = validate
	return b
}
// Create assembles a CREATE CONNECTION ... TO MYSQL statement from the values
// configured on the builder and executes it through the builder's DDL
// executor, returning whatever error that execution reports.
//
// HOST and PORT are always emitted. Every other option is emitted only when
// the corresponding value is set (non-empty text, or a non-empty object
// name). Options backed by a ValueSecretStruct (user, SSL certificate
// authority, SSL certificate) accept either inline text, which is rendered as
// a quoted string, or a secret reference, which is rendered as
// "<OPTION> SECRET <qualified name>".
//
// When the validate flag is false, WITH (VALIDATE = false) is appended.
func (b *ConnectionMySQLBuilder) Create() error {
	var q strings.Builder

	fmt.Fprintf(&q, `CREATE CONNECTION %s TO MYSQL (`, b.QualifiedName())

	// Mandatory options.
	fmt.Fprintf(&q, `HOST %s`, QuoteString(b.mysqlHost))
	fmt.Fprintf(&q, `, PORT %d`, b.mysqlPort)

	// Credentials.
	if b.mysqlUser.Text != "" {
		fmt.Fprintf(&q, `, USER %s`, QuoteString(b.mysqlUser.Text))
	}
	if b.mysqlUser.Secret.Name != "" {
		fmt.Fprintf(&q, `, USER SECRET %s`, b.mysqlUser.Secret.QualifiedName())
	}
	if b.mysqlPassword.Name != "" {
		fmt.Fprintf(&q, `, PASSWORD SECRET %s`, b.mysqlPassword.QualifiedName())
	}

	if b.mysqlSSLMode != "" {
		fmt.Fprintf(&q, `, SSL MODE %s`, QuoteString(b.mysqlSSLMode))
	}
	if b.mysqlSSHTunnel.Name != "" {
		fmt.Fprintf(&q, `, SSH TUNNEL %s`, b.mysqlSSHTunnel.QualifiedName())
	}

	// The certificate authority may be supplied inline or as a secret, just
	// like the user and the client certificate. The inline form used to be
	// dropped silently; render it so a text-only CA is not lost.
	if b.mysqlSSLCa.Text != "" {
		fmt.Fprintf(&q, `, SSL CERTIFICATE AUTHORITY %s`, QuoteString(b.mysqlSSLCa.Text))
	}
	if b.mysqlSSLCa.Secret.Name != "" {
		fmt.Fprintf(&q, `, SSL CERTIFICATE AUTHORITY SECRET %s`, b.mysqlSSLCa.Secret.QualifiedName())
	}
	if b.mysqlSSLCert.Text != "" {
		fmt.Fprintf(&q, `, SSL CERTIFICATE %s`, QuoteString(b.mysqlSSLCert.Text))
	}
	if b.mysqlSSLCert.Secret.Name != "" {
		fmt.Fprintf(&q, `, SSL CERTIFICATE SECRET %s`, b.mysqlSSLCert.Secret.QualifiedName())
	}
	if b.mysqlSSLKey.Name != "" {
		fmt.Fprintf(&q, `, SSL KEY SECRET %s`, b.mysqlSSLKey.QualifiedName())
	}

	if b.mysqlAWSPrivateLink.Name != "" {
		fmt.Fprintf(&q, `, AWS PRIVATELINK %s`, b.mysqlAWSPrivateLink.QualifiedName())
	}

	q.WriteString(`)`)

	// Only the opt-out is spelled out; with validate set, no WITH clause is
	// written at all.
	if !b.validate {
		q.WriteString(` WITH (VALIDATE = false)`)
	}

	q.WriteString(`;`)
	return b.ddl.exec(q.String())
}