-
Notifications
You must be signed in to change notification settings - Fork 70
/
settings.go
175 lines (152 loc) · 5.22 KB
/
settings.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package settings
import (
"fmt"
"path/filepath"
"strings"
"github.com/astronomer/astro-cli/docker"
"github.com/astronomer/astro-cli/messages"
"github.com/astronomer/astro-cli/pkg/fileutil"
"github.com/pkg/errors"
"github.com/spf13/viper"
)
var (
// ConfigFileName is the name of the config files (home / project)
ConfigFileName = "airflow_settings"
// ConfigFileType is the config file extension
ConfigFileType = "yaml"
// ConfigFileNameWithExt is the config filename with extension
ConfigFileNameWithExt = fmt.Sprintf("%s.%s", ConfigFileName, ConfigFileType)
// HomePath is the path to a users home directory
HomePath, _ = fileutil.GetHomeDir()
// HomeConfigFile is the global config file
HomeConfigFile = filepath.Join(HomePath, ConfigFileNameWithExt)
// WorkingPath is the path to the working directory
WorkingPath, _ = fileutil.GetWorkingDir()
// viperSettings is the viper object in a project directory
viperSettings *viper.Viper
settings Config
)
// ConfigSettings is the main builder of the settings package
func ConfigSettings(id string) {
InitSettings()
AddConnections(id)
AddPools(id)
AddVariables(id)
}
// InitSettings initializes settings file
func InitSettings() {
// Set up viper object for project config
viperSettings = viper.New()
viperSettings.SetConfigName(ConfigFileName)
viperSettings.SetConfigType(ConfigFileType)
workingConfigFile := filepath.Join(WorkingPath, ConfigFileNameWithExt)
// Add the path we discovered
viperSettings.SetConfigFile(workingConfigFile)
// Read in project config
readErr := viperSettings.ReadInConfig()
if readErr != nil {
fmt.Printf(messages.CONFIG_READ_ERROR, readErr)
}
err := viperSettings.Unmarshal(&settings)
if err != nil {
errors.Wrap(err, "unable to decode into struct")
}
}
// AddVariables is a function to add Variables from settings.yaml
func AddVariables(id string) {
variables := settings.Airflow.Variables
for _, variable := range variables {
if !objectValidator(0, variable.VariableName) {
if objectValidator(0, variable.VariableValue) {
fmt.Print("Skipping Variable Creation: No Variable Name Specified.\n")
}
} else {
if objectValidator(0, variable.VariableValue) {
airflowCommand := fmt.Sprintf("airflow variables -s %s ", variable.VariableName)
airflowCommand += fmt.Sprintf("'%s'", variable.VariableValue)
docker.AirflowCommand(id, airflowCommand)
fmt.Printf("Added Variable: %s\n", variable.VariableName)
}
}
}
}
// AddConnections is a function to add Connections from settings.yaml
func AddConnections(id string) {
connections := settings.Airflow.Connections
airflowCommand := fmt.Sprintf("airflow connections -l")
out := docker.AirflowCommand(id, airflowCommand)
for _, conn := range connections {
if objectValidator(0, conn.ConnID) {
quotedConnID := "'" + conn.ConnID + "'"
if strings.Contains(out, quotedConnID) {
fmt.Printf("Found Connection: \"%s\"...replacing...\n", conn.ConnID)
airflowCommand = fmt.Sprintf("airflow connections -d --conn_id \"%s\"", conn.ConnID)
docker.AirflowCommand(id, airflowCommand)
}
if !objectValidator(1, conn.ConnType, conn.ConnURI) {
fmt.Printf("Skipping %s: conn_type or conn_uri must be specified.\n", conn.ConnID)
} else {
airflowCommand = fmt.Sprintf("airflow connections -a --conn_id \"%s\" ", conn.ConnID)
if objectValidator(0, conn.ConnType) {
airflowCommand += fmt.Sprintf("--conn_type \"%s\" ", conn.ConnType)
}
if objectValidator(0, conn.ConnURI) {
airflowCommand += fmt.Sprintf("--conn_uri '%s' ", conn.ConnURI)
}
if objectValidator(0, conn.ConnExtra) {
airflowCommand += fmt.Sprintf("--conn_extra '%s' ", conn.ConnExtra)
}
if objectValidator(0, conn.ConnHost) {
airflowCommand += fmt.Sprintf("--conn_host '%s' ", conn.ConnHost)
}
if objectValidator(0, conn.ConnLogin) {
airflowCommand += fmt.Sprintf("--conn_login '%s' ", conn.ConnLogin)
}
if objectValidator(0, conn.ConnPassword) {
airflowCommand += fmt.Sprintf("--conn_password '%s' ", conn.ConnPassword)
}
if objectValidator(0, conn.ConnSchema) {
airflowCommand += fmt.Sprintf("--conn_schema '%s' ", conn.ConnSchema)
}
if conn.ConnPort != 0 {
airflowCommand += fmt.Sprintf("--conn_port %v", conn.ConnPort)
}
docker.AirflowCommand(id, airflowCommand)
fmt.Printf("Added Connection: %s\n", conn.ConnID)
}
}
}
}
// AddPools is a function to add Pools from settings.yaml
func AddPools(id string) {
pools := settings.Airflow.Pools
for _, pool := range pools {
if objectValidator(0, pool.PoolName) {
airflowCommand := fmt.Sprintf("airflow pool -s %s ", pool.PoolName)
if pool.PoolSlot != 0 {
airflowCommand += fmt.Sprintf("%v ", pool.PoolSlot)
if objectValidator(0, pool.PoolDescription) {
airflowCommand += fmt.Sprintf("'%s' ", pool.PoolDescription)
} else {
airflowCommand += fmt.Sprint("\"\"")
}
docker.AirflowCommand(id, airflowCommand)
fmt.Printf("Added Pool: %s\n", pool.PoolName)
} else {
fmt.Printf("Skipping %s: Pool Slot must be set.\n", pool.PoolName)
}
}
}
}
func objectValidator(bound int, args ...string) bool {
count := 0
for _, arg := range args {
if len(arg) == 0 {
count++
}
}
if count > bound {
return false
}
return true
}