Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
benjamingregory committed Feb 11, 2019
2 parents 42dd17c + 769879a commit 0d32b65
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 6 deletions.
48 changes: 43 additions & 5 deletions airflow/docker.go
Expand Up @@ -3,6 +3,7 @@ package airflow
import (
"bytes"
"context"
"crypto/md5"
"fmt"
"html/template"
"os"
Expand Down Expand Up @@ -56,6 +57,24 @@ type ComposeConfig struct {
AirflowWebserverPort string
}

// projectNameUnique creates a reasonably unique project name based on the hashed
// path of the project. This prevents collisions of projects with identical dir names
// in different paths. ie (~/dev/project1 vs ~/prod/project1)
func projectNameUnique() (string, error) {
projectName := config.CFG.ProjectName.GetString()

pwd, err := fileutil.GetWorkingDir()

if err != nil {
return "", errors.Wrap(err, "error retrieving working directory")
}

b := md5.Sum([]byte(pwd))
s := fmt.Sprintf("%x", b[:])

return projectName + "_" + s[0:6], nil
}

// repositoryName creates an airflow repository name
func repositoryName(name string) string {
return fmt.Sprintf("%s/%s", name, componentName)
Expand Down Expand Up @@ -157,7 +176,12 @@ func createProject(projectName, airflowHome string, envFile string) (project.API
// Start starts a local airflow development cluster
func Start(airflowHome string, envFile string) error {
// Get project name from config
projectName := config.CFG.ProjectName.GetString()
projectName, err := projectNameUnique()
strippedProjectName := strings.Replace(projectName, "_", "", -1)

if err != nil {
return errors.Wrap(err, "error retrieving working directory")
}

// Create a libcompose project
project, err := createProject(projectName, airflowHome, envFile)
Expand Down Expand Up @@ -216,7 +240,8 @@ func Start(airflowHome string, envFile string) error {

if fileState {
for _, info := range psInfo {
if strings.Contains(info["Name"], fmt.Sprintf("%s_scheduler", projectName)) {
if strings.Contains(info["Name"], strippedProjectName) &&
strings.Contains(info["Name"], "webserver") {
settings.ConfigSettings(info["Id"])
}
}
Expand All @@ -233,7 +258,11 @@ func Start(airflowHome string, envFile string) error {
// Kill stops a local airflow development cluster
func Kill(airflowHome string) error {
// Get project name from config
projectName := config.CFG.ProjectName.GetString()
projectName, err := projectNameUnique()

if err != nil {
return errors.Wrap(err, "error retrieving working directory")
}

// Create a libcompose project
project, err := createProject(projectName, airflowHome, "")
Expand Down Expand Up @@ -290,7 +319,12 @@ func Logs(airflowHome string, webserver, scheduler, follow bool) error {
// Stop a running docker project
func Stop(airflowHome string) error {
// Get project name from config
projectName := config.CFG.ProjectName.GetString()
projectName, err := projectNameUnique()

if err != nil {
return errors.Wrap(err, "error retrieving working directory")
}

// Create a libcompose project
project, err := createProject(projectName, airflowHome, "")
if err != nil {
Expand All @@ -309,7 +343,11 @@ func Stop(airflowHome string) error {
// PS prints the running airflow containers
func PS(airflowHome string) error {
// Get project name from config
projectName := config.CFG.ProjectName.GetString()
projectName, err := projectNameUnique()

if err != nil {
return errors.Wrap(err, "error retrieving working directory")
}

// Create a libcompose project
project, err := createProject(projectName, airflowHome, "")
Expand Down
1 change: 0 additions & 1 deletion docker/docker.go
Expand Up @@ -64,6 +64,5 @@ func AirflowCommand(id string, airflowCommand string) string {
}

stringOut := string(out)

return stringOut
}

0 comments on commit 0d32b65

Please sign in to comment.