Skip to content

Commit

Permalink
Support for airflow upgrade_check in Astro CLI #2274 (#392)
Browse files Browse the repository at this point in the history
* Support for `airflow upgrade_check` in Astro CLI #2274

* replace upgrade_check to upgrade-check

* add install package when user run astro dev upgrade-check

* enable printing logs during command is running

* add docs

* change wording

* add small test

* add more tests

* Update docker_test.go

* Update cmd/airflow.go

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
  • Loading branch information
2 people authored and Adam Vandover committed Dec 9, 2020
1 parent a775e4a commit db885e9
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 12 deletions.
16 changes: 6 additions & 10 deletions airflow/docker.go
Expand Up @@ -445,7 +445,7 @@ func getWebServerContainerId(airflowHome string) (string, error) {

// Run creates using docker exec
// inspired from https://github.com/docker/cli/tree/master/cli/command/container
func Run(airflowHome string, args []string) error {
func Run(airflowHome string, args []string, user string) error {
cli, err := client.NewClientWithOpts(client.FromEnv)

if err != nil {
Expand All @@ -454,9 +454,12 @@ func Run(airflowHome string, args []string) error {

execConfig := &types.ExecConfig{
AttachStdout: true,
Tty: true,
Cmd: args,
}
if user != "" {
execConfig.User = user
}

fmt.Printf("Running: %s\n", strings.Join(args, " "))
containerID, err := getWebServerContainerId(airflowHome)
if err != nil {
Expand All @@ -476,18 +479,11 @@ func Run(airflowHome string, args []string) error {

execStartCheck := types.ExecStartCheck{
Detach: execConfig.Detach,
Tty: execConfig.Tty,
}

resp, _ := cli.ContainerExecAttach(context.Background(), execID, execStartCheck)

// Read stdout response from container
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(resp.Reader)
s := buf.String()
fmt.Println(s)

return err
return docker.ExecPipe(resp, os.Stdin, os.Stdout, os.Stderr)
}

// Deploy pushes a new docker image
Expand Down
31 changes: 30 additions & 1 deletion cmd/airflow.go
Expand Up @@ -76,6 +76,7 @@ func newDevRootCmd(client *houston.Client, out io.Writer) *cobra.Command {
newAirflowStopCmd(client, out),
newAirflowPSCmd(client, out),
newAirflowRunCmd(client, out),
newAirflowUpgradeCheckCmd(client, out),
)
return cmd
}
Expand Down Expand Up @@ -216,6 +217,23 @@ func newAirflowRunCmd(client *houston.Client, out io.Writer) *cobra.Command {
return cmd
}

func newAirflowUpgradeCheckCmd(client *houston.Client, out io.Writer) *cobra.Command {
cmd := &cobra.Command{
Use: "upgrade-check",
Short: "List DAG and config-level changes required to upgrade to Airflow 2.0",
Long: "List DAG and config-level changes required to upgrade to Airflow 2.0",
// ignore PersistentPreRunE of root command
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
return nil
},
PreRunE: ensureProjectDir,
RunE: airflowUpgradeCheck,
Example: RunExample,
DisableFlagParsing: true,
}
return cmd
}

func ensureProjectDir(cmd *cobra.Command, args []string) error {
isProjectDir, err := config.IsProjectDir(config.WorkingPath)
if err != nil {
Expand Down Expand Up @@ -370,7 +388,18 @@ func airflowRun(cmd *cobra.Command, args []string) error {

// Add airflow command, to simplify astro cli usage
args = append([]string{"airflow"}, args...)
return airflow.Run(config.WorkingPath, args)
// ignore last user parameter
return airflow.Run(config.WorkingPath, args, "")
}

// airflowUpgradeCheck
func airflowUpgradeCheck(cmd *cobra.Command, args []string) error {
// Silence Usage as we have now validated command input
cmd.SilenceUsage = true

// Add airflow command, to simplify astro cli usage
args = append([]string{"bash", "-c", "pip install --no-deps 'apache-airflow-upgrade-check'; airflow upgrade_check"})
return airflow.Run(config.WorkingPath, args, "root")
}

func acceptableVersion(a string, list []string) bool {
Expand Down
6 changes: 6 additions & 0 deletions cmd/airflow_test.go
Expand Up @@ -76,3 +76,9 @@ func TestNewAirflowRunCmd(t *testing.T) {
cmd := newAirflowRunCmd(client, os.Stdout)
assert.Nil(t, cmd.PersistentPreRunE(new(cobra.Command), []string{}))
}

func TestNewAirflowUpgradeCheckCmd(t *testing.T) {
client := houston.NewHoustonClient(httputil.NewHTTPClient())
cmd := newAirflowUpgradeCheckCmd(client, os.Stdout)
assert.Nil(t, cmd.PersistentPreRunE(new(cobra.Command), []string{}))
}
42 changes: 42 additions & 0 deletions docker/docker.go
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"encoding/base64"
"encoding/json"
"io"
"os"
"os/exec"

"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/docker/registry"

"github.com/docker/docker/pkg/jsonmessage"
Expand Down Expand Up @@ -139,3 +141,43 @@ func AirflowCommand(id string, airflowCommand string) string {
stringOut := string(out)
return stringOut
}

// ExecPipe does pipe stream into stdout/stdin and stderr
// so now we can pipe out during exec'ing any commands inside container
func ExecPipe(resp types.HijackedResponse, inStream io.Reader, outStream, errorStream io.Writer) error {
var err error
receiveStdout := make(chan error, 1)
if outStream != nil || errorStream != nil {
go func() {
// always do this because we are never tty
_, err = stdcopy.StdCopy(outStream, errorStream, resp.Reader)
receiveStdout <- err
}()
}

stdinDone := make(chan struct{})
go func() {
if inStream != nil {
io.Copy(resp.Conn, inStream)
}

if err := resp.CloseWrite(); err != nil {
}
close(stdinDone)
}()

select {
case err := <-receiveStdout:
if err != nil {
return err
}
case <-stdinDone:
if outStream != nil || errorStream != nil {
if err := <-receiveStdout; err != nil {
return err
}
}
}

return nil
}
29 changes: 28 additions & 1 deletion docker/docker_test.go
@@ -1,10 +1,37 @@
package docker

import "testing"
import (
"bufio"
"bytes"
"fmt"
"github.com/docker/docker/api/types"
"strings"
"testing"
)

func TestExecVersion(t *testing.T) {
err := Exec("version")
if err != nil {
t.Error(err)
}
}

func TestExecPipe(t *testing.T) {
var buf bytes.Buffer
data := ""
resp := &types.HijackedResponse{Reader: bufio.NewReader(strings.NewReader(data))}
err := ExecPipe(*resp, &buf, &buf, &buf)
fmt.Println(buf.String())
if err != nil {
t.Error(err)
}
}

func TestExecPipeNils(t *testing.T) {
data := ""
resp := &types.HijackedResponse{Reader: bufio.NewReader(strings.NewReader(data))}
err := ExecPipe(*resp, nil, nil, nil)
if err != nil {
t.Error(err)
}
}

0 comments on commit db885e9

Please sign in to comment.