Skip to content

Commit

Permalink
add astro airflow pause
Browse files Browse the repository at this point in the history
  • Loading branch information
andscoop committed Feb 21, 2018
1 parent f96dc12 commit 69bdca6
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 18 deletions.
82 changes: 67 additions & 15 deletions airflow/docker.go
Expand Up @@ -21,8 +21,10 @@ import (
)

const (
componentName = "airflow"
deployTagPrefix = "cli-"
componentName = "airflow"
deployTagPrefix = "cli-"
dockerStateUp = "Up"
dockerStateExited = "Exited"
)

// ComposeConfig is input data to docker compose yaml template
Expand Down Expand Up @@ -85,8 +87,18 @@ func generateConfig(projectName, airflowHome string) string {
return buff.String()
}

// createProjectFromContext creates project with yaml config as context
func createProjectFromContext(projectName, airflowHome string) (project.APIProject, error) {
func checkServiceState(serviceState, expectedState string) bool {
scrubbedState := strings.Split(serviceState, " ")[0]

if scrubbedState == expectedState {
return true
}

return false
}

// createProject creates project with yaml config as context
func createProject(projectName, airflowHome string) (project.APIProject, error) {
// Generate the docker-compose yaml
yaml := generateConfig(projectName, airflowHome)

Expand All @@ -107,30 +119,50 @@ func Start(airflowHome string) error {
projectName := config.GetString(config.CFGProjectName)

// Create a libcompose project
project, err := createProjectFromContext(projectName, airflowHome)
project, err := createProject(projectName, airflowHome)
if err != nil {
return errors.Wrap(err, "Error creating docker-compose project")
}

// Build this project image
imageBuild(airflowHome, imageName(projectName, "latest"))

// Start up our project
err = project.Up(context.Background(), options.Up{})
// Fetch project containers
psInfo, err := project.Ps(context.Background())
if err != nil {
return errors.Wrap(err, "Error building, (re)creating or starting project containers")
return errors.Wrap(err, "Error checking docker-compose status")
}

if len(psInfo) > 0 {
// Ensure project is not already running
for _, info := range psInfo {
if checkServiceState(info["State"], dockerStateUp) {
return errors.New("Project is already running, cannot start")
}
}

err = project.Start(context.Background())
if err != nil {
return errors.Wrap(err, "Error building, (re)creating or starting project containers")
}
} else {
// Build this project image
imageBuild(airflowHome, imageName(projectName, "latest"))

// Start up our project
err = project.Up(context.Background(), options.Up{})
if err != nil {
return errors.Wrap(err, "Error building, (re)creating or starting project containers")
}
}

return nil
}

// Stop stops a local airflow development cluster
func Stop(airflowHome string) error {
// Kill stops a local airflow development cluster
func Kill(airflowHome string) error {
// Get project name from config
projectName := config.GetString(config.CFGProjectName)

// Create a libcompose project
project, err := createProjectFromContext(projectName, airflowHome)
project, err := createProject(projectName, airflowHome)
if err != nil {
return errors.Wrap(err, "Error creating docker-compose project")
}
Expand All @@ -144,13 +176,33 @@ func Stop(airflowHome string) error {
return nil
}

// Stop a running docker project
func Stop(airflowHome string) error {
// Get project name from config
projectName := config.GetString(config.CFGProjectName)

// Create a libcompose project
project, err := createProject(projectName, airflowHome)
if err != nil {
return errors.Wrap(err, "Error creating docker-compose project")
}

// Pause our project
err = project.Stop(context.Background(), 30)
if err != nil {
return errors.Wrap(err, "Error pausing project containers")
}

return nil
}

// PS prints the running airflow containers
func PS(airflowHome string) error {
// Get project name from config
projectName := config.GetString(config.CFGProjectName)

// Create a libcompose project
project, err := createProjectFromContext(projectName, airflowHome)
project, err := createProject(projectName, airflowHome)
if err != nil {
return errors.Wrap(err, "Error creating docker-compose project")
}
Expand Down
21 changes: 18 additions & 3 deletions cmd/airflow.go
Expand Up @@ -53,6 +53,13 @@ var (
RunE: checkForProject(airflowStart),
}

airflowKillCmd = &cobra.Command{
Use: "kill",
Short: "Kill a development airflow cluster",
Long: "Kill a development airflow cluster",
RunE: checkForProject(airflowKill),
}

airflowStopCmd = &cobra.Command{
Use: "stop",
Short: "Stop a development airflow cluster",
Expand Down Expand Up @@ -88,6 +95,9 @@ func init() {
// Airflow start
airflowRootCmd.AddCommand(airflowStartCmd)

// Airflow kill
airflowRootCmd.AddCommand(airflowKillCmd)

// Airflow stop
airflowRootCmd.AddCommand(airflowStopCmd)

Expand Down Expand Up @@ -149,17 +159,22 @@ func airflowDeploy(cmd *cobra.Command, args []string) error {
return airflow.Deploy(projectRoot, args[0])
}

// Start airflow
// Start an airflow cluster
func airflowStart(cmd *cobra.Command, args []string) error {
return airflow.Start(projectRoot)
}

// Stop airflow
// Kill an airflow cluster
func airflowKill(cmd *cobra.Command, args []string) error {
return airflow.Kill(projectRoot)
}

// Stop an airflow cluster
func airflowStop(cmd *cobra.Command, args []string) error {
return airflow.Stop(projectRoot)
}

// Airflow PS
// List containers of an airflow cluster
func airflowPS(cmd *cobra.Command, args []string) error {
return airflow.PS(projectRoot)
}

0 comments on commit 69bdca6

Please sign in to comment.