Skip to content

Commit

Permalink
add astro airflow pause
Browse files Browse the repository at this point in the history
  • Loading branch information
andscoop committed Feb 21, 2018
1 parent f96dc12 commit ee95d9b
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 16 deletions.
66 changes: 53 additions & 13 deletions airflow/docker.go
Expand Up @@ -85,8 +85,8 @@ func generateConfig(projectName, airflowHome string) string {
return buff.String()
}

// createProjectFromContext creates project with yaml config as context
func createProjectFromContext(projectName, airflowHome string) (project.APIProject, error) {
// createProject creates project with yaml config as context
func createProject(projectName, airflowHome string) (project.APIProject, error) {
// Generate the docker-compose yaml
yaml := generateConfig(projectName, airflowHome)

Expand All @@ -107,30 +107,50 @@ func Start(airflowHome string) error {
projectName := config.GetString(config.CFGProjectName)

// Create a libcompose project
project, err := createProjectFromContext(projectName, airflowHome)
project, err := createProject(projectName, airflowHome)
if err != nil {
return errors.Wrap(err, "Error creating docker-compose project")
}

// Build this project image
imageBuild(airflowHome, imageName(projectName, "latest"))

// Start up our project
err = project.Up(context.Background(), options.Up{})
// Fetch project containers
psInfo, err := project.Ps(context.Background())
if err != nil {
return errors.Wrap(err, "Error building, (re)creating or starting project containers")
return errors.Wrap(err, "Error checking docker-compose status")
}

if len(psInfo) > 0 {
// Ensure project is not alreasdy running
for _, info := range psInfo {
if info["State"][0:2] == "Up" {
return errors.New("Project is already running, cannot start")
}
}

err = project.Start(context.Background())
if err != nil {
return errors.Wrap(err, "Error building, (re)creating or starting project containers")
}
} else {
// Build this project image
imageBuild(airflowHome, imageName(projectName, "latest"))

// Start up our project
err = project.Up(context.Background(), options.Up{})
if err != nil {
return errors.Wrap(err, "Error building, (re)creating or starting project containers")
}
}

return nil
}

// Stop stops a local airflow development cluster
func Stop(airflowHome string) error {
// Kill stops a local airflow development cluster
func Kill(airflowHome string) error {
// Get project name from config
projectName := config.GetString(config.CFGProjectName)

// Create a libcompose project
project, err := createProjectFromContext(projectName, airflowHome)
project, err := createProject(projectName, airflowHome)
if err != nil {
return errors.Wrap(err, "Error creating docker-compose project")
}
Expand All @@ -144,13 +164,33 @@ func Stop(airflowHome string) error {
return nil
}

// Stop a running docker project
func Stop(airflowHome string) error {
// Get project name from config
projectName := config.GetString(config.CFGProjectName)

// Create a libcompose project
project, err := createProject(projectName, airflowHome)
if err != nil {
return errors.Wrap(err, "Error creating docker-compose project")
}

// Pause our project
err = project.Stop(context.Background(), 30)
if err != nil {
return errors.Wrap(err, "Error pausing project containers")
}

return nil
}

// PS prints the running airflow containers
func PS(airflowHome string) error {
// Get project name from config
projectName := config.GetString(config.CFGProjectName)

// Create a libcompose project
project, err := createProjectFromContext(projectName, airflowHome)
project, err := createProject(projectName, airflowHome)
if err != nil {
return errors.Wrap(err, "Error creating docker-compose project")
}
Expand Down
21 changes: 18 additions & 3 deletions cmd/airflow.go
Expand Up @@ -53,6 +53,13 @@ var (
RunE: checkForProject(airflowStart),
}

airflowKillCmd = &cobra.Command{
Use: "kill",
Short: "Kill a development airflow cluster",
Long: "Kill a development airflow cluster",
RunE: checkForProject(airflowKill),
}

airflowStopCmd = &cobra.Command{
Use: "stop",
Short: "Stop a development airflow cluster",
Expand Down Expand Up @@ -88,6 +95,9 @@ func init() {
// Airflow start
airflowRootCmd.AddCommand(airflowStartCmd)

// Airflow kill
airflowRootCmd.AddCommand(airflowKillCmd)

// Airflow stop
airflowRootCmd.AddCommand(airflowStopCmd)

Expand Down Expand Up @@ -149,17 +159,22 @@ func airflowDeploy(cmd *cobra.Command, args []string) error {
return airflow.Deploy(projectRoot, args[0])
}

// Start airflow
// Start an airflow cluster
func airflowStart(cmd *cobra.Command, args []string) error {
return airflow.Start(projectRoot)
}

// Stop airflow
// Kill an airflow cluster
func airflowKill(cmd *cobra.Command, args []string) error {
return airflow.Kill(projectRoot)
}

// Stop an airflow cluster
func airflowStop(cmd *cobra.Command, args []string) error {
return airflow.Stop(projectRoot)
}

// Airflow PS
// List containers of an airflow cluster
func airflowPS(cmd *cobra.Command, args []string) error {
return airflow.PS(projectRoot)
}

0 comments on commit ee95d9b

Please sign in to comment.