diff --git a/airflow/docker.go b/airflow/docker.go index f1596bdaa..1154ecb26 100644 --- a/airflow/docker.go +++ b/airflow/docker.go @@ -445,7 +445,7 @@ func getWebServerContainerId(airflowHome string) (string, error) { // Run creates using docker exec // inspired from https://github.com/docker/cli/tree/master/cli/command/container -func Run(airflowHome string, args []string) error { +func Run(airflowHome string, args []string, user string) error { cli, err := client.NewClientWithOpts(client.FromEnv) if err != nil { @@ -454,9 +454,12 @@ func Run(airflowHome string, args []string) error { execConfig := &types.ExecConfig{ AttachStdout: true, - Tty: true, Cmd: args, } + if user != "" { + execConfig.User = user + } + fmt.Printf("Running: %s\n", strings.Join(args, " ")) containerID, err := getWebServerContainerId(airflowHome) if err != nil { @@ -476,18 +479,11 @@ func Run(airflowHome string, args []string) error { execStartCheck := types.ExecStartCheck{ Detach: execConfig.Detach, - Tty: execConfig.Tty, } resp, _ := cli.ContainerExecAttach(context.Background(), execID, execStartCheck) - // Read stdout response from container - buf := new(bytes.Buffer) - _, err = buf.ReadFrom(resp.Reader) - s := buf.String() - fmt.Println(s) - - return err + return docker.ExecPipe(resp, os.Stdin, os.Stdout, os.Stderr) } // Deploy pushes a new docker image diff --git a/cmd/airflow.go b/cmd/airflow.go index d59f837be..f986d16b1 100644 --- a/cmd/airflow.go +++ b/cmd/airflow.go @@ -76,6 +76,7 @@ func newDevRootCmd(client *houston.Client, out io.Writer) *cobra.Command { newAirflowStopCmd(client, out), newAirflowPSCmd(client, out), newAirflowRunCmd(client, out), + newAirflowUpgradeCheckCmd(client, out), ) return cmd } @@ -216,6 +217,23 @@ func newAirflowRunCmd(client *houston.Client, out io.Writer) *cobra.Command { return cmd } +func newAirflowUpgradeCheckCmd(client *houston.Client, out io.Writer) *cobra.Command { + cmd := &cobra.Command{ + Use: "upgrade-check", + Short: "List DAG and config-level changes required to upgrade to Airflow 2.0", + Long: "List DAG and config-level changes required to upgrade to Airflow 2.0", + // ignore PersistentPreRunE of root command + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + return nil + }, + PreRunE: ensureProjectDir, + RunE: airflowUpgradeCheck, + Example: RunExample, + DisableFlagParsing: true, + } + return cmd +} + func ensureProjectDir(cmd *cobra.Command, args []string) error { isProjectDir, err := config.IsProjectDir(config.WorkingPath) if err != nil { @@ -370,7 +388,18 @@ func airflowRun(cmd *cobra.Command, args []string) error { // Add airflow command, to simplify astro cli usage args = append([]string{"airflow"}, args...) - return airflow.Run(config.WorkingPath, args) + // ignore last user parameter + return airflow.Run(config.WorkingPath, args, "") +} + +// airflowUpgradeCheck +func airflowUpgradeCheck(cmd *cobra.Command, args []string) error { + // Silence Usage as we have now validated command input + cmd.SilenceUsage = true + + // Add airflow command, to simplify astro cli usage + args = append([]string{"bash", "-c", "pip install --no-deps 'apache-airflow-upgrade-check'; airflow upgrade_check"}) + return airflow.Run(config.WorkingPath, args, "root") } func acceptableVersion(a string, list []string) bool { diff --git a/cmd/airflow_test.go b/cmd/airflow_test.go index 152300fb4..1c02da001 100644 --- a/cmd/airflow_test.go +++ b/cmd/airflow_test.go @@ -76,3 +76,9 @@ func TestNewAirflowRunCmd(t *testing.T) { cmd := newAirflowRunCmd(client, os.Stdout) assert.Nil(t, cmd.PersistentPreRunE(new(cobra.Command), []string{})) } + +func TestNewAirflowUpgradeCheckCmd(t *testing.T) { + client := houston.NewHoustonClient(httputil.NewHTTPClient()) + cmd := newAirflowUpgradeCheckCmd(client, os.Stdout) + assert.Nil(t, cmd.PersistentPreRunE(new(cobra.Command), []string{})) +} diff --git a/docker/docker.go b/docker/docker.go index 54a79b26c..820ed8f14 100644 --- a/docker/docker.go +++ b/docker/docker.go @@ -4,9 +4,11 @@ import ( "context" "encoding/base64" "encoding/json" + "io" "os" "os/exec" + "github.com/docker/docker/pkg/stdcopy" "github.com/docker/docker/registry" "github.com/docker/docker/pkg/jsonmessage" @@ -139,3 +141,43 @@ func AirflowCommand(id string, airflowCommand string) string { stringOut := string(out) return stringOut } + +// ExecPipe does pipe stream into stdout/stdin and stderr +// so now we can pipe out during exec'ing any commands inside container +func ExecPipe(resp types.HijackedResponse, inStream io.Reader, outStream, errorStream io.Writer) error { + var err error + receiveStdout := make(chan error, 1) + if outStream != nil || errorStream != nil { + go func() { + // always do this because we are never tty + _, err = stdcopy.StdCopy(outStream, errorStream, resp.Reader) + receiveStdout <- err + }() + } + + stdinDone := make(chan struct{}) + go func() { + if inStream != nil { + io.Copy(resp.Conn, inStream) + } + + if err := resp.CloseWrite(); err != nil { + } + close(stdinDone) + }() + + select { + case err := <-receiveStdout: + if err != nil { + return err + } + case <-stdinDone: + if outStream != nil || errorStream != nil { + if err := <-receiveStdout; err != nil { + return err + } + } + } + + return nil +} diff --git a/docker/docker_test.go b/docker/docker_test.go index fe089f55c..241e9915a 100644 --- a/docker/docker_test.go +++ b/docker/docker_test.go @@ -1,6 +1,13 @@ package docker -import "testing" +import ( + "bufio" + "bytes" + "fmt" + "github.com/docker/docker/api/types" + "strings" + "testing" +) func TestExecVersion(t *testing.T) { err := Exec("version") @@ -8,3 +15,23 @@ func TestExecVersion(t *testing.T) { t.Error(err) } } + +func TestExecPipe(t *testing.T) { + var buf bytes.Buffer + data := "" + resp := &types.HijackedResponse{Reader: bufio.NewReader(strings.NewReader(data))} + err := ExecPipe(*resp, &buf, &buf, &buf) + fmt.Println(buf.String()) + if err != nil { + t.Error(err) + } +} + +func TestExecPipeNils(t *testing.T) { + data := "" + resp := &types.HijackedResponse{Reader: bufio.NewReader(strings.NewReader(data))} + err := ExecPipe(*resp, nil, nil, nil) + if err != nil { + t.Error(err) + } +}