Skip to content

Commit

Permalink
Allow cluster start and stop from nested directory
Browse files Browse the repository at this point in the history
  • Loading branch information
schnie committed Feb 13, 2018
1 parent 50bbf79 commit dfe0d2b
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 91 deletions.
25 changes: 1 addition & 24 deletions airflow/airflow.go
Expand Up @@ -5,14 +5,9 @@ import (
"os"
"path/filepath"

"github.com/astronomerio/astro-cli/docker"
"github.com/astronomerio/astro-cli/utils"
)

func imageName(name, tag string) string {
return fmt.Sprintf("%s/%s:%s", name, "airflow", tag)
}

func initDirs(root string, dirs []string) bool {
// Any inputs exist
exists := false
Expand Down Expand Up @@ -64,7 +59,7 @@ func initFiles(root string, files map[string]string) bool {
// Init will scaffold out a new airflow project
func Init(path string) {
// List of directories to create
dirs := []string{"dags", "plugins"}
dirs := []string{"dags", "plugins", "include"}

// Map of files to create
files := map[string]string{
Expand All @@ -84,21 +79,3 @@ func Init(path string) {
// Create new airflow deployment
func Create() {
}

// Build builds the airflow project
func Build(name, tag string) {
image := imageName(name, tag)
fmt.Printf("Building %s...\n", image)
docker.Exec("build", "-t", image, ".")
}

// Deploy pushes a new docker image
// TODO: Check for uncommitted git changes
// TODO: Command to bump version or create version automatically
func Deploy(name, tag string) {
image := imageName(name, tag)
fmt.Printf("Pushing %s...\n", image)
remoteImage := fmt.Sprintf("%s/%s", docker.CloudRegistry, image)
docker.Exec("tag", image, remoteImage)
docker.Exec("push", remoteImage)
}
112 changes: 68 additions & 44 deletions airflow/docker.go
Expand Up @@ -6,10 +6,10 @@ import (
"fmt"
"html/template"
"os"
"path/filepath"

"github.com/astronomerio/astro-cli/config"
"github.com/docker/libcompose/docker"
docker "github.com/astronomerio/astro-cli/docker"
dockercompose "github.com/docker/libcompose/docker"
"github.com/docker/libcompose/docker/ctx"
"github.com/docker/libcompose/project"
"github.com/docker/libcompose/project/options"
Expand All @@ -27,19 +27,65 @@ type ComposeConfig struct {
AirflowWebserverPort string
}

// imageName creates an airflow image name
func imageName(name, tag string) string {
return fmt.Sprintf("%s/%s:%s", name, "ap-airflow", tag)
}

// imageBuild builds the airflow project
func imageBuild(path, imageName string) {
// Change to location of Dockerfile
os.Chdir(path)

fmt.Printf("Building %s...\n", imageName)
docker.Exec("build", "-t", imageName, ".")
}

// generateConfig generates the docker-compose config
func generateConfig(projectName, airflowHome string) string {
tmpl, err := template.New("yml").Parse(composeyml)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

config := ComposeConfig{
PostgresUser: config.GetString(config.CFGPostgresUser),
PostgresPassword: config.GetString(config.CFGPostgresPassword),
PostgresHost: config.GetString(config.CFGPostgresHost),
PostgresPort: config.GetString(config.CFGPostgresPort),
AirflowImage: imageName(projectName, "latest"),
AirflowHome: airflowHome,
AirflowUser: "astro",
AirflowWebserverPort: "8080",
}

buff := new(bytes.Buffer)
err = tmpl.Execute(buff, config)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

return buff.String()
}

// Start starts a local airflow development cluster
func Start(path string) error {
// Infer the project name using directory
proj := projectName(path)
// Get project name from config
projectName := config.GetString(config.CFGProjectName)

// Build this project image
imageBuild(path, imageName(projectName, "latest"))

// Generate the docker-compose yaml
yaml := generateConfig(proj)
yaml := generateConfig(projectName, path)

// Create the project
project, err := docker.NewProject(&ctx.Context{
project, err := dockercompose.NewProject(&ctx.Context{
Context: project.Context{
ComposeBytes: [][]byte{[]byte(yaml)},
ProjectName: proj,
ProjectName: projectName,
},
}, nil)

Expand All @@ -59,17 +105,17 @@ func Start(path string) error {

// Stop stops a local airflow development cluster
func Stop(path string) error {
// Infer the project name using directory
proj := projectName(path)
// Get project name from config
projectName := config.GetString(config.CFGProjectName)

// Generate the docker-compose yaml
yaml := generateConfig(proj)
yaml := generateConfig(projectName, path)

// Create the project
project, err := docker.NewProject(&ctx.Context{
project, err := dockercompose.NewProject(&ctx.Context{
Context: project.Context{
ComposeBytes: [][]byte{[]byte(yaml)},
ProjectName: proj,
ProjectName: projectName,
},
}, nil)

Expand All @@ -87,36 +133,14 @@ func Stop(path string) error {
return nil
}

// generateConfig generates the docker-compose config
func generateConfig(projectName string) string {
tmpl, err := template.New("yml").Parse(composeyml)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

config := ComposeConfig{
PostgresUser: config.GetString(config.CFGPostgresUser),
PostgresPassword: config.GetString(config.CFGPostgresPassword),
PostgresHost: config.GetString(config.CFGPostgresHost),
PostgresPort: config.GetString(config.CFGPostgresPort),
AirflowImage: fmt.Sprintf("%s/airflow", projectName),
AirflowHome: "/home/schnie/repos/open-example-dags",
AirflowUser: "astro",
AirflowWebserverPort: "8080",
}

buff := new(bytes.Buffer)
err = tmpl.Execute(buff, config)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

return buff.String()
}

// projectName converts a path to project name
func projectName(path string) string {
return filepath.Base(path)
// Deploy pushes a new docker image
// TODO: Check for uncommitted git changes
// TODO: Command to bump version or create version automatically
func Deploy(path, name, tag string) {
imageName := imageName(name, tag)
imageBuild(path, imageName)
fmt.Printf("Pushing %s...\n", imageName)
remoteImage := fmt.Sprintf("%s/%s", docker.CloudRegistry, imageName)
docker.Exec("tag", imageName, remoteImage)
docker.Exec("push", remoteImage)
}
27 changes: 16 additions & 11 deletions airflow/files.go
@@ -1,15 +1,20 @@
package airflow

import "strings"

// dockerfile is the Dockerfile template
const dockerfile = `FROM astronomerinc/ap-airflow:latest-onbuild`
var dockerfile = strings.TrimSpace(`
FROM astronomerinc/ap-airflow:latest-onbuild
`)

// dockerignore is the .dockerignore template
const dockerignore = `.astro
var dockerignore = strings.TrimSpace(`
.astro
.git
`
`)

// composeyml is the docker-compose template
const composeyml = `
var composeyml = strings.TrimSpace(`
version: '2'
volumes:
Expand All @@ -32,8 +37,6 @@ services:
scheduler:
image: {{ .AirflowImage }}
build:
context: .
command: ["airflow", "scheduler"]
restart: unless-stopped
user: {{ .AirflowUser }}
Expand All @@ -47,12 +50,12 @@ services:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql://{{ .PostgresUser }}:{{ .PostgresPassword }}@{{ .PostgresHost }}:{{ .PostgresPort }}
volumes:
- {{ .AirflowHome }}:/usr/local/airflow
- {{ .AirflowHome }}/dags:/usr/local/airflow/dags:ro
- {{ .AirflowHome }}/plugins:/usr/local/airflow/plugins:ro
- {{ .AirflowHome }}/include:/usr/local/airflow/include:ro
webserver:
image: {{ .AirflowImage }}
build:
context: .
command: ["airflow", "webserver"]
restart: unless-stopped
user: {{ .AirflowUser }}
Expand All @@ -69,5 +72,7 @@ services:
ports:
- {{ .AirflowWebserverPort }}:{{ .AirflowWebserverPort }}
volumes:
- {{ .AirflowHome }}:/usr/local/airflow
`
- {{ .AirflowHome }}/dags:/usr/local/airflow/dags:ro
- {{ .AirflowHome }}/plugins:/usr/local/airflow/plugins:ro
- {{ .AirflowHome }}/include:/usr/local/airflow/include:ro
`)
27 changes: 17 additions & 10 deletions cmd/airflow.go
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"fmt"
"os"
"path/filepath"
"regexp"
"strings"
Expand Down Expand Up @@ -90,6 +91,16 @@ func init() {
airflowRootCmd.AddCommand(airflowStopCmd)
}

// projectRoot returns the project root
func projectRoot() string {
path, err := config.ProjectRoot()
if err != nil {
fmt.Printf("Error searching for project dir: %v\n", err)
os.Exit(1)
}
return path
}

// TODO: allow specify directory and/or project name (store in .astro/config)
// Use project name for image name
func airflowInit(cmd *cobra.Command, args []string) {
Expand Down Expand Up @@ -134,28 +145,24 @@ func airflowDeploy(cmd *cobra.Command, args []string) {
deploymentName := args[0]
deploymentTag := args[1]

airflow.Build(deploymentName, deploymentTag)
airflow.Deploy(deploymentName, deploymentTag)
airflow.Deploy(projectRoot(), deploymentName, deploymentTag)
}

// Get airflow status
func airflowStatus(cmd *cobra.Command, args []string) {
}

// Start airflow
func airflowStart(cmd *cobra.Command, args []string) {
// Grab working directory
path := utils.GetWorkingDir()

err := airflow.Start(path)
err := airflow.Start(projectRoot())
if err != nil {
fmt.Println(err)
}
}

// Stop airflow
func airflowStop(cmd *cobra.Command, args []string) {
// Grab working directory
path := utils.GetWorkingDir()

err := airflow.Stop(path)
err := airflow.Stop(projectRoot())
if err != nil {
fmt.Println(err)
}
Expand Down
13 changes: 11 additions & 2 deletions config/config.go
Expand Up @@ -89,7 +89,7 @@ func initProject() {

configPath, searchErr := utils.FindDirInPath(ConfigDir)
if searchErr != nil {
fmt.Printf("Error initializing config: %v\n", searchErr)
fmt.Printf("Error searching for project dir: %v\n", searchErr)
return
}

Expand Down Expand Up @@ -170,9 +170,18 @@ func ProjectConfigExists() bool {
return configExists(viperProject)
}

// ProjectRoot returns the path to the nearest project root
func ProjectRoot() (string, error) {
configPath, searchErr := utils.FindDirInPath(ConfigDir)
if searchErr != nil {
return "", searchErr
}
return filepath.Dir(configPath), nil
}

// GetString will return the requested config, check working dir and fallback to home
func GetString(config string) string {
if configExists(viperProject) {
if configExists(viperProject) && viperProject.IsSet(config) {
return viperProject.GetString(config)
}
return viperHome.GetString(config)
Expand Down

0 comments on commit dfe0d2b

Please sign in to comment.