Skip to content

Commit

Permalink
Merge branch 'main' into fix/triggerer-logs (#489)
Browse files Browse the repository at this point in the history
  • Loading branch information
neel-astro committed Feb 7, 2022
1 parent d25d541 commit b5810c8
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 24 deletions.
2 changes: 1 addition & 1 deletion airflow/airflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
airflowVersionLabelName = "io.astronomer.docker.airflow.version"
triggererAllowedAirflowVersion = "2.2.0"

webserverHealthCheckInterval = 5 * time.Second
webserverHealthCheckInterval = 10 * time.Second
)

var repoNameSanitizeRegexp = regexp.MustCompile(`^[^a-z0-9]*`) // must not start with anything except lowercase letter or number
Expand Down
2 changes: 1 addition & 1 deletion airflow/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (
// Docker is the docker command.
Docker = "docker"

healthCheckBreakPoint = 25 // Maximum number of tries to wait for health check to pass
healthCheckBreakPoint = 100 // Maximum number of events to wait for health check to pass
healthyProjectStatus = "health_status: healthy"
execDieStatus = "exec_die"
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func newDeploymentRootCmd(client *houston.Client, out io.Writer) *cobra.Command
newDeploymentListCmd(client, out),
newDeploymentUpdateCmd(client, out),
newDeploymentDeleteCmd(client, out),
newLogsCmd(),
newLogsCmd(client),
newDeploymentSaRootCmd(client, out),
newDeploymentUserRootCmd(client, out),
newDeploymentAirflowRootCmd(client, out),
Expand Down
78 changes: 60 additions & 18 deletions cmd/logs.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
package cmd

import (
"fmt"
"time"

"github.com/astronomer/astro-cli/deployment"
"github.com/astronomer/astro-cli/houston"
"github.com/astronomer/astro-cli/logs"

"github.com/spf13/cobra"
)

const (
logWebserver = "webserver"
logScheduler = "scheduler"
logWorker = "worker"
logTriggerer = "triggerer"
)

var (
search string
follow bool
Expand All @@ -27,7 +37,7 @@ var (
`
)

func newLogsCmd() *cobra.Command {
func newLogsCmd(client *houston.Client) *cobra.Command {
cmd := &cobra.Command{
Use: "logs",
Aliases: []string{"log", "l"},
Expand All @@ -40,10 +50,18 @@ func newLogsCmd() *cobra.Command {
newSchedulerLogsCmd(),
newWorkersLogsCmd(),
)

appConfig, err := deployment.AppConfig(client)
if err != nil {
fmt.Println("Error checking feature flag", err)
} else if appConfig.Flags.TriggererEnabled {
cmd.AddCommand(newTriggererLogsCmd())
}

return cmd
}

func newLogsDeprecatedCmd() *cobra.Command {
func newLogsDeprecatedCmd(client *houston.Client) *cobra.Command {
cmd := &cobra.Command{
Use: "logs",
Aliases: []string{"log", "l"},
Expand All @@ -57,6 +75,14 @@ func newLogsDeprecatedCmd() *cobra.Command {
newSchedulerLogsCmd(),
newWorkersLogsCmd(),
)

appConfig, err := deployment.AppConfig(client)
if err != nil {
fmt.Println("Error checking feature flag", err)
} else if appConfig.Flags.TriggererEnabled {
cmd.AddCommand(newTriggererLogsCmd())
}

return cmd
}

Expand All @@ -71,7 +97,9 @@ func newWebserverLogsCmd() *cobra.Command {
astro deployment logs webserver YOU_DEPLOYMENT_ID -s string-to-find
`,
Args: cobra.ExactArgs(1),
RunE: webserverRemoteLogs,
RunE: func(cmd *cobra.Command, args []string) error {
return fetchRemoteLogs(logWebserver, args)
},
}
cmd.Flags().StringVarP(&search, "search", "s", "", "Search term inside logs")
cmd.Flags().BoolVarP(&follow, "follow", "f", false, "Subscribe to watch more logs")
Expand All @@ -91,7 +119,9 @@ func newSchedulerLogsCmd() *cobra.Command {
astro deployment logs scheduler YOU_DEPLOYMENT_ID -s string-to-find
`,
Args: cobra.ExactArgs(1),
RunE: schedulerRemoteLogs,
RunE: func(cmd *cobra.Command, args []string) error {
return fetchRemoteLogs(logScheduler, args)
},
}
cmd.Flags().StringVarP(&search, "search", "s", "", "Search term inside logs")
cmd.Flags().BoolVarP(&follow, "follow", "f", false, "Subscribe to watch more logs")
Expand All @@ -100,6 +130,7 @@ astro deployment logs scheduler YOU_DEPLOYMENT_ID -s string-to-find
return cmd
}

// nolint:dupl
func newWorkersLogsCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "workers",
Expand All @@ -110,7 +141,9 @@ func newWorkersLogsCmd() *cobra.Command {
astro deployment logs workers YOU_DEPLOYMENT_ID -s string-to-find
`,
Args: cobra.ExactArgs(1),
RunE: workersRemoteLogs,
RunE: func(cmd *cobra.Command, args []string) error {
return fetchRemoteLogs(logWorker, args)
},
}
cmd.Flags().StringVarP(&search, "search", "s", "", "Search term inside logs")
cmd.Flags().BoolVarP(&follow, "follow", "f", false, "Subscribe to watch more logs")
Expand All @@ -120,23 +153,32 @@ astro deployment logs workers YOU_DEPLOYMENT_ID -s string-to-find
return cmd
}

func webserverRemoteLogs(cmd *cobra.Command, args []string) error {
if follow {
return logs.SubscribeDeploymentLog(args[0], "webserver", search, since)
}
return logs.DeploymentLog(args[0], "webserver", search, since)
}
// nolint:dupl
func newTriggererLogsCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "triggerer",
Aliases: []string{"triggerers", "triggerer", "trg"},
Short: "Stream logs from Airflow triggerer",
Long: `Stream logs from Airflow triggerer. For example:
func schedulerRemoteLogs(cmd *cobra.Command, args []string) error {
if follow {
return logs.SubscribeDeploymentLog(args[0], "scheduler", search, since)
astro deployment logs triggerer YOU_DEPLOYMENT_ID -s string-to-find
`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
return fetchRemoteLogs(logTriggerer, args)
},
}
return logs.DeploymentLog(args[0], "scheduler", search, since)
cmd.Flags().StringVarP(&search, "search", "s", "", "Search term inside logs")
cmd.Flags().BoolVarP(&follow, "follow", "f", false, "Subscribe to watch more logs")
cmd.Flags().DurationVarP(&since, "since", "t", 0, "Only return logs newer than a relative duration like 5m, 1h, or 24h")
cmd.Flags().BoolP("help", "h", false, "Help for "+cmd.Name())
// get airflow workers logs
return cmd
}

func workersRemoteLogs(cmd *cobra.Command, args []string) error {
func fetchRemoteLogs(component string, args []string) error {
if follow {
return logs.SubscribeDeploymentLog(args[0], "worker", search, since)
return logs.SubscribeDeploymentLog(args[0], component, search, since)
}
return logs.DeploymentLog(args[0], "worker", search, since)
return logs.DeploymentLog(args[0], component, search, since)
}
43 changes: 41 additions & 2 deletions cmd/logs_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,55 @@
package cmd

import (
"bytes"
"io/ioutil"
"net/http"
"testing"

"github.com/astronomer/astro-cli/houston"
testUtil "github.com/astronomer/astro-cli/pkg/testing"

"github.com/stretchr/testify/assert"
)

func TestDeploymentLogsRootCommand(t *testing.T) {
func TestDeploymentLogsRootCommandTriggererEnabled(t *testing.T) {
testUtil.InitTestConfig()
output, err := executeCommand("deployment", "logs")
okResponse := `{
"data": {
"appConfig": {"triggererEnabled": true, "featureFlags": { "triggererEnabled": true}}
}
}`
client := testUtil.NewTestClient(func(req *http.Request) *http.Response {
return &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewBufferString(okResponse)),
Header: make(http.Header),
}
})
api := houston.NewHoustonClient(client)
output, err := executeCommandC(api, "deployment", "logs")
assert.NoError(t, err)
assert.Contains(t, output, "astro deployment logs")
assert.Contains(t, output, "triggerer")
}

func TestDeploymentLogsRootCommandTriggererDisabled(t *testing.T) {
testUtil.InitTestConfig()
okResponse := `{
"data": {
"appConfig": {"triggererEnabled": false, "featureFlags": { "triggererEnabled": false}}
}
}`
client := testUtil.NewTestClient(func(req *http.Request) *http.Response {
return &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewBufferString(okResponse)),
Header: make(http.Header),
}
})
api := houston.NewHoustonClient(client)
output, err := executeCommandC(api, "deployment", "logs")
assert.NoError(t, err)
assert.Contains(t, output, "astro deployment logs")
assert.NotContains(t, output, "triggerer")
}
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewRootCmd(client *houston.Client, out io.Writer) *cobra.Command {
newSaRootCmd(client, out),
// TODO: remove newAirflowRootCmd, after 1.0 we have only devRootCmd
newAirflowRootCmd(client, out),
newLogsDeprecatedCmd(),
newLogsDeprecatedCmd(client),
)
return rootCmd
}
Expand Down

0 comments on commit b5810c8

Please sign in to comment.