Skip to content

Commit

Permalink
Backport "airflow_settings.yaml" and ".env" feature to 0.7 (#205)
Browse files Browse the repository at this point in the history
* Feature/Add connections

* Handle case with dash in project name

* Backport .env feature to 0.7
  • Loading branch information
Ben authored and schnie committed Apr 18, 2019
1 parent e025df2 commit 2571362
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 10 deletions.
2 changes: 2 additions & 0 deletions airflow/airflow.go
Expand Up @@ -75,6 +75,8 @@ func Init(path string) error {
version.GetTagFromVersion()),
"packages.txt": "",
"requirements.txt": "",
".env": "",
"airflow_settings.yaml": include.Settingsyml,
"dags/example-dag.py": include.Exampledag,
"plugins/example-plugin.py": include.ExamplePlugin,
}
Expand Down
58 changes: 49 additions & 9 deletions airflow/docker.go
Expand Up @@ -22,8 +22,10 @@ import (
"github.com/astronomer/astro-cli/docker"
"github.com/astronomer/astro-cli/houston"
"github.com/astronomer/astro-cli/messages"
"github.com/astronomer/astro-cli/pkg/fileutil"
"github.com/astronomer/astro-cli/pkg/input"
"github.com/astronomer/astro-cli/pkg/printutil"
"github.com/astronomer/astro-cli/settings"
)

const (
Expand All @@ -46,6 +48,7 @@ type ComposeConfig struct {
PostgresPassword string
PostgresHost string
PostgresPort string
AirflowEnvFile string
AirflowImage string
AirflowHome string
AirflowUser string
Expand Down Expand Up @@ -77,12 +80,28 @@ func imageBuild(path, imageName string) error {
}

// generateConfig generates the docker-compose config
func generateConfig(projectName, airflowHome string) (string, error) {
func generateConfig(projectName, airflowHome string, envFile string) (string, error) {
tmpl, err := template.New("yml").Parse(include.Composeyml)
if err != nil {
return "", errors.Wrap(err, "failed to generate config")
}

envExists, err := fileutil.Exists(envFile)

if err != nil {
return "", errors.Wrap(err, fmt.Sprintf(messages.ENV_PATH, envFile))
}

if envFile != "" {
if !envExists {
fmt.Printf(messages.ENV_NOT_FOUND, envFile)
envFile = ""
} else {
fmt.Printf(messages.ENV_FOUND, envFile)
envFile = fmt.Sprintf("env_file: %s", envFile)
}
}

config := ComposeConfig{
PostgresUser: config.CFG.PostgresUser.GetString(),
PostgresPassword: config.CFG.PostgresPassword.GetString(),
Expand All @@ -92,6 +111,7 @@ func generateConfig(projectName, airflowHome string) (string, error) {
AirflowHome: airflowHome,
AirflowUser: "astro",
AirflowWebserverPort: config.CFG.WebserverPort.GetString(),
AirflowEnvFile: envFile,
}

buff := new(bytes.Buffer)
Expand All @@ -114,9 +134,9 @@ func checkServiceState(serviceState, expectedState string) bool {
}

// createProject creates project with yaml config as context
func createProject(projectName, airflowHome string) (project.APIProject, error) {
func createProject(projectName, airflowHome string, envFile string) (project.APIProject, error) {
// Generate the docker-compose yaml
yaml, err := generateConfig(projectName, airflowHome)
yaml, err := generateConfig(projectName, airflowHome, envFile)
if err != nil {
return nil, errors.Wrap(err, "failed to create project")
}
Expand All @@ -134,12 +154,14 @@ func createProject(projectName, airflowHome string) (project.APIProject, error)
}

// Start starts a local airflow development cluster
func Start(airflowHome string) error {
func Start(airflowHome string, envFile string) error {
// Get project name from config
projectName := config.CFG.ProjectName.GetString()
replacer := strings.NewReplacer("_", "", "-", "")
strippedProjectName := replacer.Replace(projectName)

// Create a libcompose project
project, err := createProject(projectName, airflowHome)
project, err := createProject(projectName, airflowHome, envFile)
if err != nil {
return errors.Wrap(err, messages.COMPOSE_CREATE_ERROR)
}
Expand Down Expand Up @@ -180,6 +202,24 @@ func Start(airflowHome string) error {
return errors.Wrap(err, messages.COMPOSE_RECREATE_ERROR)
}
}

psInfo, err = project.Ps(context.Background())

fileState, err := fileutil.Exists("airflow_settings.yaml")

if err != nil {
return errors.Wrap(err, messages.SETTINGS_PATH)
}

if fileState {
for _, info := range psInfo {
if strings.Contains(info["Name"], strippedProjectName) &&
strings.Contains(info["Name"], "webserver") {
settings.ConfigSettings(info["Id"])
}
}
}

fmt.Printf(messages.COMPOSE_LINK_WEBSERVER+"\n", config.CFG.WebserverPort.GetString())
fmt.Printf(messages.COMPOSE_LINK_POSTGRES+"\n", config.CFG.PostgresPort.GetString())
return nil
Expand All @@ -191,7 +231,7 @@ func Kill(airflowHome string) error {
projectName := config.CFG.ProjectName.GetString()

// Create a libcompose project
project, err := createProject(projectName, airflowHome)
project, err := createProject(projectName, airflowHome, "")
if err != nil {
return errors.Wrap(err, messages.COMPOSE_CREATE_ERROR)
}
Expand All @@ -213,7 +253,7 @@ func Logs(airflowHome string, webserver, scheduler, follow bool) error {
projectName := config.CFG.ProjectName.GetString()

// Create libcompose project
project, err := createProject(projectName, airflowHome)
project, err := createProject(projectName, airflowHome, "")
if err != nil {
return errors.Wrap(err, messages.COMPOSE_CREATE_ERROR)
}
Expand Down Expand Up @@ -248,7 +288,7 @@ func Stop(airflowHome string) error {
projectName := config.CFG.ProjectName.GetString()

// Create a libcompose project
project, err := createProject(projectName, airflowHome)
project, err := createProject(projectName, airflowHome, "")
if err != nil {
return errors.Wrap(err, messages.COMPOSE_CREATE_ERROR)
}
Expand All @@ -268,7 +308,7 @@ func PS(airflowHome string) error {
projectName := config.CFG.ProjectName.GetString()

// Create a libcompose project
project, err := createProject(projectName, airflowHome)
project, err := createProject(projectName, airflowHome, "")
if err != nil {
return errors.Wrap(err, messages.COMPOSE_CREATE_ERROR)
}
Expand Down
2 changes: 2 additions & 0 deletions airflow/include/composeyml.go
Expand Up @@ -56,6 +56,7 @@ services:
- {{ .AirflowHome }}/plugins:/usr/local/airflow/plugins:ro
- {{ .AirflowHome }}/include:/usr/local/airflow/include:ro
- airflow_logs:/usr/local/airflow/logs
{{ .AirflowEnvFile }}
webserver:
image: {{ .AirflowImage }}
Expand Down Expand Up @@ -83,4 +84,5 @@ services:
- {{ .AirflowHome }}/plugins:/usr/local/airflow/plugins:ro
- {{ .AirflowHome }}/include:/usr/local/airflow/include:ro
- airflow_logs:/usr/local/airflow/logs
{{ .AirflowEnvFile }}
`)
26 changes: 26 additions & 0 deletions airflow/include/settingsyml.go
@@ -0,0 +1,26 @@
package include

import "strings"

// Settingsyml is the settings template
var Settingsyml = strings.TrimSpace(`
# This feature is in Beta.
# Please report any bugs to https://github.com/astronomer/astro-cli/issues
# NOTE: If putting a dict for any field, please wrap in single quotes.
# More details you can find https://github.com/astronomer/docs/blob/master/docs/cli-airflow-configuration.md
airflow:
connections:
- conn_id:
conn_type:
conn_host:
conn_login:
conn_password:
conn_port:
conn_extra:
pools:
- pool_name:
pool_slot:
pool_description:
variables:
- variable_name:
variable_value:`)
10 changes: 9 additions & 1 deletion cmd/airflow.go
Expand Up @@ -23,6 +23,7 @@ import (

var (
projectName string
envFile string
followLogs bool
forceDeploy bool
forcePrompt bool
Expand Down Expand Up @@ -56,6 +57,7 @@ var (
Use: "start",
Short: "Start a development airflow cluster",
Long: "Start a development airflow cluster",
Args: cobra.MaximumNArgs(1),
PreRunE: ensureProjectDir,
RunE: airflowStart,
}
Expand Down Expand Up @@ -110,6 +112,7 @@ func init() {

// Airflow start
airflowRootCmd.AddCommand(airflowStartCmd)
airflowStartCmd.Flags().StringVarP(&envFile, "env", "e", ".env", "Location of file containing environment variables")

// Airflow kill
airflowRootCmd.AddCommand(airflowKillCmd)
Expand Down Expand Up @@ -233,7 +236,12 @@ func airflowStart(cmd *cobra.Command, args []string) error {
// Silence Usage as we have now validated command input
cmd.SilenceUsage = true

return airflow.Start(config.WorkingPath)
// Get release name from args, if passed
if len(args) > 0 {
envFile = args[0]
}

return airflow.Start(config.WorkingPath, envFile)
}

// Kill an airflow cluster
Expand Down
17 changes: 17 additions & 0 deletions docker/docker.go
Expand Up @@ -49,3 +49,20 @@ func ExecLogin(registry, username, password string) error {

return cmd.Run()
}

// AirflowCommand is the main method of interaction with Airflow
func AirflowCommand(id string, airflowCommand string) string {

cmd := exec.Command("docker", "exec", "-it", id, "bash", "-c", airflowCommand)
cmd.Stdin = os.Stdin
cmd.Stderr = os.Stderr

out, err := cmd.Output()

if err != nil {
errors.Wrapf(err, "error encountered")
}

stringOut := string(out)
return stringOut
}
6 changes: 6 additions & 0 deletions messages/messages.go
Expand Up @@ -41,6 +41,10 @@ var (
COMPOSE_LINK_WEBSERVER = "Airflow Webserver: http://localhost:%s/admin/"
COMPOSE_LINK_POSTGRES = "Postgres Database: localhost:%s/postgres"

ENV_PATH = "Error looking for \"%s\""
ENV_FOUND = "Env file \"%s\" found. Loading...\n"
ENV_NOT_FOUND = "Env file \"%s\" not found. Skipping...\n"

HOUSTON_BASIC_AUTH_DISABLED = "Basic authentication is disabled, conact administrator or defer back to oAuth"
HOUSTON_DEPLOYMENT_HEADER = "Authenticated to %s \n\n"
HOUSTON_DEPLOYING_PROMPT = "Deploying: %s\n"
Expand All @@ -57,5 +61,7 @@ var (
REGISTRY_AUTH_FAIL = "Failed to authenticate to the registry, this can occur when registry is offline. Until authenticated you will not be able to push new images to your Airflow clusters\n"
REGISTRY_UNCOMMITTED_CHANGES = "Project directory has uncommmited changes, use `astro airflow deploy [releaseName] -f` to force deploy."

SETTINGS_PATH = "Error looking for airflow_settings.yaml"

NA = "N/A"
)

0 comments on commit 2571362

Please sign in to comment.