Skip to content

Commit

Permalink
Feature/connections (#168)
Browse files Browse the repository at this point in the history
* Add settings.yml default

* Add settings.yaml to init

* Add SettingsConfig and Settings creation

* Add validation checks

* Fix syntax

* Adjust connection validation

* Factor out settings config to separate package

* Fix comment

* Factor out settings

* Add stdout parsing and delete/add connection when found

* Removed unnecessary returns

* Add beta comment

* Change double quotes in conn_extra to single quotes

* Add additional note for conn_extra formatting
  • Loading branch information
Ben authored and schnie committed Jan 11, 2019
1 parent 30665bb commit ddaee93
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/airflow.go
Expand Up @@ -76,6 +76,7 @@ func Init(path string, airflowVersion string) error {
"packages.txt": "",
"requirements.txt": "",
".env": "",
"settings.yaml": include.Settingsyml,
"dags/example-dag.py": include.Exampledag,
"plugins/example-plugin.py": include.ExamplePlugin,
}
Expand Down
14 changes: 14 additions & 0 deletions airflow/docker.go
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/astronomer/astro-cli/messages"
"github.com/astronomer/astro-cli/pkg/input"
"github.com/astronomer/astro-cli/pkg/printutil"
"github.com/astronomer/astro-cli/settings"
)

const (
Expand Down Expand Up @@ -185,7 +186,20 @@ func Start(airflowHome string, envFile string) error {
if err != nil {
return errors.Wrap(err, messages.COMPOSE_RECREATE_ERROR)
}

}

psInfo, err = project.Ps(context.Background())
if err != nil {
return errors.Wrap(err, messages.COMPOSE_STATUS_CHECK_ERROR)
}

for _, info := range psInfo {
if strings.Contains(info["Name"], "scheduler") {
settings.ConfigSettings(info["Id"])
}
}

fmt.Printf(messages.COMPOSE_LINK_WEBSERVER+"\n", config.CFG.WebserverPort.GetString())
fmt.Printf(messages.COMPOSE_LINK_POSTGRES+"\n", config.CFG.PostgresPort.GetString())
return nil
Expand Down
28 changes: 28 additions & 0 deletions airflow/include/settingsyml.go
@@ -0,0 +1,28 @@
package include

import "strings"

// Settingsyml is the settings template
var Settingsyml = strings.TrimSpace(`
# This feature is in Beta.
# Please report any bugs to https://github.com/astronomer/astro-cli/issues
# NOTE: If putting a dict in conn_extra, please wrap in single quotes.
airflow:
connections:
- conn_id:
conn_type:
conn_host:
conn_login:
conn_password:
conn_port:
conn_extra:
pools:
- pool_name:
pool_slot:
pool_description:
variables:
- variable_name:
variable_value:`)
137 changes: 137 additions & 0 deletions settings/settings.go
@@ -0,0 +1,137 @@
package settings

import (
"fmt"
"log"
"os"
"os/exec"
"path/filepath"
"strings"

"github.com/astronomer/astro-cli/messages"
"github.com/astronomer/astro-cli/pkg/fileutil"
"github.com/pkg/errors"
"github.com/spf13/viper"
)

var (
// ConfigFileName is the name of the config files (home / project)
ConfigFileName = "settings"
// ConfigFileType is the config file extension
ConfigFileType = "yaml"
// ConfigFileNameWithExt is the config filename with extension
ConfigFileNameWithExt = fmt.Sprintf("%s.%s", ConfigFileName, ConfigFileType)
// HomePath is the path to a users home directory
HomePath, _ = fileutil.GetHomeDir()
// HomeConfigFile is the global config file
HomeConfigFile = filepath.Join(HomePath, ConfigFileNameWithExt)
// WorkingPath is the path to the working directory
WorkingPath, _ = fileutil.GetWorkingDir()

// viperSettings is the viper object in a project directory
viperSettings *viper.Viper

settings Config
)

// ConfigSettings is the main builder of the settings package
func ConfigSettings(id string) {
InitSettings()
AddConnections(id)
AddPools(id)
AddVariables(id)
}

// InitSettings initializes settings file
func InitSettings() {
// Set up viper object for project config
viperSettings = viper.New()
viperSettings.SetConfigName(ConfigFileName)
viperSettings.SetConfigType(ConfigFileType)
workingConfigFile := filepath.Join(WorkingPath, ConfigFileNameWithExt)
// Add the path we discovered
viperSettings.SetConfigFile(workingConfigFile)

// Read in project config
readErr := viperSettings.ReadInConfig()
// fmt.Println(viperSettings.Get("airflow"))
if readErr != nil {
fmt.Printf(messages.CONFIG_READ_ERROR, readErr)
}

err := viperSettings.Unmarshal(&settings)

if err != nil {
errors.Wrap(err, "unable to decode into struct")
}
}

// AddVariables is a function to add Variables from settings.yaml
func AddVariables(id string) {
variables := settings.Airflow.Variables

for _, variable := range variables {
if len(variable.VariableName) == 0 && len(variable.VariableValue) > 0 {
fmt.Print("Skipping Variable Creation: No Variable Name Specified.")

} else {
airflowCommand := fmt.Sprintf("airflow variables -s \"%s\" \"%s\"", variable.VariableName, variable.VariableValue)
AirflowCommand(id, airflowCommand)
fmt.Printf("Added Variable: %s\n", variable.VariableName)
}
}
}

// AddConnections is a function to add Connections from settings.yaml
func AddConnections(id string) {
connections := settings.Airflow.Connections
airflowCommand := fmt.Sprintf("airflow connections -l")
out := AirflowCommand(id, airflowCommand)

for _, conn := range connections {
if len(conn.ConnID) > 0 && len(conn.ConnType) == 0 && len(conn.ConnURI) == 0 {
fmt.Printf("Skipping %s: ConnType or ConnUri must be specified.", conn.ConnID)
} else {
quotedConnID := "'" + conn.ConnID + "'"
if strings.Contains(out, quotedConnID) {
fmt.Printf("Found Connection: \"%s\"...replacing...\n", conn.ConnID)
airflowCommand = fmt.Sprintf("airflow connections -d --conn_id \"%s\"", conn.ConnID)
AirflowCommand(id, airflowCommand)
}
airflowCommand = fmt.Sprintf("airflow connections -a --conn_id \"%s\" --conn_type \"%s\" --conn_uri \"%s\" --conn_extra '%s' --conn_host \"%s\" --conn_login \"%s\" --conn_password \"%s\" --conn_schema \"%s\" --conn_port \"%v\"", conn.ConnID, conn.ConnType, conn.ConnURI, conn.ConnExtra, conn.ConnHost, conn.ConnLogin, conn.ConnPassword, conn.ConnSchema, conn.ConnPort)
fmt.Println(airflowCommand)
AirflowCommand(id, airflowCommand)
fmt.Printf("Added Connection: %s\n", conn.ConnID)
}
}
}

// AddPools is a function to add Pools from settings.yaml
func AddPools(id string) {
pools := settings.Airflow.Pools
for _, pool := range pools {
if len(pool.PoolName) == 0 && pool.PoolSlot > 0 {
fmt.Print("Skipping Pool Creation: No Pool Name Specified.")
} else {
airflowCommand := fmt.Sprintf("airflow pool -s \"%s\" \"%v\" \"%s\"", pool.PoolName, pool.PoolSlot, pool.PoolDescription)
AirflowCommand(id, airflowCommand)
fmt.Printf("Added Pool: %s\n", pool.PoolName)
}
}
}

// AirflowCommand is the main method of interaction with Airflow
func AirflowCommand(id string, airflowCommand string) string {
cmd := exec.Command("docker", "exec", "-it", id, "bash", "-c", airflowCommand)
cmd.Stdin = os.Stdin
cmd.Stderr = os.Stderr

out, err := cmd.Output()
if err != nil {
log.Fatal(err)
}

stringOut := string(out)

return stringOut
}
39 changes: 39 additions & 0 deletions settings/types.go
@@ -0,0 +1,39 @@
package settings

// Connections contains structure of airflow connections
type Connections []struct {
ConnID string `mapstructure:"conn_id"`
ConnType string `mapstructure:"conn_type"`
ConnHost string `mapstructure:"conn_host"`
ConnSchema string `mapstructure:"conn_schema"`
ConnLogin string `mapstructure:"conn_login"`
ConnPassword string `mapstructure:"conn_password"`
ConnPort int `mapstructure:"conn_port"`
ConnURI string `mapstructure:"conn_uri"`
ConnExtra string `mapstructure:"conn_extra"`
}

// Pools contains structure of airflow pools
type Pools []struct {
PoolName string `mapstructure:"pool_name"`
PoolSlot int `mapstructure:"pool_slot"`
PoolDescription string `mapstructure:"pool_description"`
}

// Variables contains structure of airflow variables
type Variables []struct {
VariableName string `mapstructure:"variable_name"`
VariableValue string `mapstructure:"variable_value"`
}

// Airflow contains structure of airflow settings
type Airflow struct {
Connections `mapstructure:"connections"`
Pools `mapstructure:"pools"`
Variables `mapstructure:"variables"`
}

// Config is input data to generate connections, pools, and variables
type Config struct {
Airflow `mapstructure:"airflow"`
}

0 comments on commit ddaee93

Please sign in to comment.