From dfe0d2b87cc71c544becba7dcf8a50348ecd3938 Mon Sep 17 00:00:00 2001 From: schnie Date: Tue, 13 Feb 2018 11:18:32 -0500 Subject: [PATCH] Allow cluster start and stop from nested directory --- airflow/airflow.go | 25 +--------- airflow/docker.go | 112 +++++++++++++++++++++++++++------------------ airflow/files.go | 27 ++++++----- cmd/airflow.go | 27 +++++++---- config/config.go | 13 +++++- 5 files changed, 113 insertions(+), 91 deletions(-) diff --git a/airflow/airflow.go b/airflow/airflow.go index 2fb068cc4..94034a638 100644 --- a/airflow/airflow.go +++ b/airflow/airflow.go @@ -5,14 +5,9 @@ import ( "os" "path/filepath" - "github.com/astronomerio/astro-cli/docker" "github.com/astronomerio/astro-cli/utils" ) -func imageName(name, tag string) string { - return fmt.Sprintf("%s/%s:%s", name, "airflow", tag) -} - func initDirs(root string, dirs []string) bool { // Any inputs exist exists := false @@ -64,7 +59,7 @@ func initFiles(root string, files map[string]string) bool { // Init will scaffold out a new airflow project func Init(path string) { // List of directories to create - dirs := []string{"dags", "plugins"} + dirs := []string{"dags", "plugins", "include"} // Map of files to create files := map[string]string{ @@ -84,21 +79,3 @@ func Init(path string) { // Create new airflow deployment func Create() { } - -// Build builds the airflow project -func Build(name, tag string) { - image := imageName(name, tag) - fmt.Printf("Building %s...\n", image) - docker.Exec("build", "-t", image, ".") -} - -// Deploy pushes a new docker image -// TODO: Check for uncommitted git changes -// TODO: Command to bump version or create version automatically -func Deploy(name, tag string) { - image := imageName(name, tag) - fmt.Printf("Pushing %s...\n", image) - remoteImage := fmt.Sprintf("%s/%s", docker.CloudRegistry, image) - docker.Exec("tag", image, remoteImage) - docker.Exec("push", remoteImage) -} diff --git a/airflow/docker.go b/airflow/docker.go index 64f174a44..c5ec738b4 100644 --- a/airflow/docker.go +++ b/airflow/docker.go @@ -6,10 +6,10 @@ import ( "fmt" "html/template" "os" - "path/filepath" "github.com/astronomerio/astro-cli/config" - "github.com/docker/libcompose/docker" + docker "github.com/astronomerio/astro-cli/docker" + dockercompose "github.com/docker/libcompose/docker" "github.com/docker/libcompose/docker/ctx" "github.com/docker/libcompose/project" "github.com/docker/libcompose/project/options" @@ -27,19 +27,65 @@ type ComposeConfig struct { AirflowWebserverPort string } +// imageName creates an airflow image name +func imageName(name, tag string) string { + return fmt.Sprintf("%s/%s:%s", name, "ap-airflow", tag) +} + +// imageBuild builds the airflow project +func imageBuild(path, imageName string) { + // Change to location of Dockerfile + os.Chdir(path) + + fmt.Printf("Building %s...\n", imageName) + docker.Exec("build", "-t", imageName, ".") +} + +// generateConfig generates the docker-compose config +func generateConfig(projectName, airflowHome string) string { + tmpl, err := template.New("yml").Parse(composeyml) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + config := ComposeConfig{ + PostgresUser: config.GetString(config.CFGPostgresUser), + PostgresPassword: config.GetString(config.CFGPostgresPassword), + PostgresHost: config.GetString(config.CFGPostgresHost), + PostgresPort: config.GetString(config.CFGPostgresPort), + AirflowImage: imageName(projectName, "latest"), + AirflowHome: airflowHome, + AirflowUser: "astro", + AirflowWebserverPort: "8080", + } + + buff := new(bytes.Buffer) + err = tmpl.Execute(buff, config) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + return buff.String() +} + // Start starts a local airflow development cluster func Start(path string) error { - // Infer the project name using directory - proj := projectName(path) + // Get project name from config + projectName := config.GetString(config.CFGProjectName) + + // Build this project image + imageBuild(path, imageName(projectName, "latest")) // Generate the docker-compose yaml - yaml := generateConfig(proj) + yaml := generateConfig(projectName, path) // Create the project - project, err := docker.NewProject(&ctx.Context{ + project, err := dockercompose.NewProject(&ctx.Context{ Context: project.Context{ ComposeBytes: [][]byte{[]byte(yaml)}, - ProjectName: proj, + ProjectName: projectName, }, }, nil) @@ -59,17 +105,17 @@ func Start(path string) error { // Stop stops a local airflow development cluster func Stop(path string) error { - // Infer the project name using directory - proj := projectName(path) + // Get project name from config + projectName := config.GetString(config.CFGProjectName) // Generate the docker-compose yaml - yaml := generateConfig(proj) + yaml := generateConfig(projectName, path) // Create the project - project, err := docker.NewProject(&ctx.Context{ + project, err := dockercompose.NewProject(&ctx.Context{ Context: project.Context{ ComposeBytes: [][]byte{[]byte(yaml)}, - ProjectName: proj, + ProjectName: projectName, }, }, nil) @@ -87,36 +133,14 @@ func Stop(path string) error { return nil } -// generateConfig generates the docker-compose config -func generateConfig(projectName string) string { - tmpl, err := template.New("yml").Parse(composeyml) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - config := ComposeConfig{ - PostgresUser: config.GetString(config.CFGPostgresUser), - PostgresPassword: config.GetString(config.CFGPostgresPassword), - PostgresHost: config.GetString(config.CFGPostgresHost), - PostgresPort: config.GetString(config.CFGPostgresPort), - AirflowImage: fmt.Sprintf("%s/airflow", projectName), - AirflowHome: "/home/schnie/repos/open-example-dags", - AirflowUser: "astro", - AirflowWebserverPort: "8080", - } - - buff := new(bytes.Buffer) - err = tmpl.Execute(buff, config) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - return buff.String() -} - -// projectName converts a path to project name -func projectName(path string) string { - return filepath.Base(path) +// Deploy pushes a new docker image +// TODO: Check for uncommitted git changes +// TODO: Command to bump version or create version automatically +func Deploy(path, name, tag string) { + imageName := imageName(name, tag) + imageBuild(path, imageName) + fmt.Printf("Pushing %s...\n", imageName) + remoteImage := fmt.Sprintf("%s/%s", docker.CloudRegistry, imageName) + docker.Exec("tag", imageName, remoteImage) + docker.Exec("push", remoteImage) } diff --git a/airflow/files.go b/airflow/files.go index 06e1b8a5f..c0eeac167 100644 --- a/airflow/files.go +++ b/airflow/files.go @@ -1,15 +1,20 @@ package airflow +import "strings" + // dockerfile is the Dockerfile template -const dockerfile = `FROM astronomerinc/ap-airflow:latest-onbuild` +var dockerfile = strings.TrimSpace(` +FROM astronomerinc/ap-airflow:latest-onbuild +`) // dockerignore is the .dockerignore template -const dockerignore = `.astro +var dockerignore = strings.TrimSpace(` +.astro .git -` +`) // composeyml is the docker-compose template -const composeyml = ` +var composeyml = strings.TrimSpace(` version: '2' volumes: @@ -32,8 +37,6 @@ services: scheduler: image: {{ .AirflowImage }} - build: - context: . command: ["airflow", "scheduler"] restart: unless-stopped user: {{ .AirflowUser }} @@ -47,12 +50,12 @@ services: AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql://{{ .PostgresUser }}:{{ .PostgresPassword }}@{{ .PostgresHost }}:{{ .PostgresPort }} volumes: - - {{ .AirflowHome }}:/usr/local/airflow + - {{ .AirflowHome }}/dags:/usr/local/airflow/dags:ro + - {{ .AirflowHome }}/plugins:/usr/local/airflow/plugins:ro + - {{ .AirflowHome }}/include:/usr/local/airflow/include:ro webserver: image: {{ .AirflowImage }} - build: - context: . command: ["airflow", "webserver"] restart: unless-stopped user: {{ .AirflowUser }} @@ -69,5 +72,7 @@ services: ports: - {{ .AirflowWebserverPort }}:{{ .AirflowWebserverPort }} volumes: - - {{ .AirflowHome }}:/usr/local/airflow -` + - {{ .AirflowHome }}/dags:/usr/local/airflow/dags:ro + - {{ .AirflowHome }}/plugins:/usr/local/airflow/plugins:ro + - {{ .AirflowHome }}/include:/usr/local/airflow/include:ro +`) diff --git a/cmd/airflow.go b/cmd/airflow.go index 8ae60fe7d..f3025a47e 100644 --- a/cmd/airflow.go +++ b/cmd/airflow.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + "os" "path/filepath" "regexp" "strings" @@ -90,6 +91,16 @@ func init() { airflowRootCmd.AddCommand(airflowStopCmd) } +// projectRoot returns the project root +func projectRoot() string { + path, err := config.ProjectRoot() + if err != nil { + fmt.Printf("Error searching for project dir: %v\n", err) + os.Exit(1) + } + return path +} + // TODO: allow specify directory and/or project name (store in .astro/config) // Use project name for image name func airflowInit(cmd *cobra.Command, args []string) { @@ -134,28 +145,24 @@ func airflowDeploy(cmd *cobra.Command, args []string) { deploymentName := args[0] deploymentTag := args[1] - airflow.Build(deploymentName, deploymentTag) - airflow.Deploy(deploymentName, deploymentTag) + airflow.Deploy(projectRoot(), deploymentName, deploymentTag) } +// Get airflow status func airflowStatus(cmd *cobra.Command, args []string) { } +// Start airflow func airflowStart(cmd *cobra.Command, args []string) { - // Grab working directory - path := utils.GetWorkingDir() - - err := airflow.Start(path) + err := airflow.Start(projectRoot()) if err != nil { fmt.Println(err) } } +// Stop airflow func airflowStop(cmd *cobra.Command, args []string) { - // Grab working directory - path := utils.GetWorkingDir() - - err := airflow.Stop(path) + err := airflow.Stop(projectRoot()) if err != nil { fmt.Println(err) } diff --git a/config/config.go b/config/config.go index 35243186e..47fc5bfc7 100644 --- a/config/config.go +++ b/config/config.go @@ -89,7 +89,7 @@ func initProject() { configPath, searchErr := utils.FindDirInPath(ConfigDir) if searchErr != nil { - fmt.Printf("Error initializing config: %v\n", searchErr) + fmt.Printf("Error searching for project dir: %v\n", searchErr) return } @@ -170,9 +170,18 @@ func ProjectConfigExists() bool { return configExists(viperProject) } +// ProjectRoot returns the path to the nearest project root +func ProjectRoot() (string, error) { + configPath, searchErr := utils.FindDirInPath(ConfigDir) + if searchErr != nil { + return "", searchErr + } + return filepath.Dir(configPath), nil +} + // GetString will return the requested config, check working dir and fallback to home func GetString(config string) string { - if configExists(viperProject) { + if configExists(viperProject) && viperProject.IsSet(config) { return viperProject.GetString(config) } return viperHome.GetString(config)