Skip to content

Commit

Permalink
[KFS-1863] add local mode to flink shell (#2713)
Browse files Browse the repository at this point in the history
  • Loading branch information
YannickPferr committed Apr 17, 2024
1 parent 0d20598 commit c243317
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 21 deletions.
51 changes: 39 additions & 12 deletions internal/flink/command_shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@ import (
pcmd "github.com/confluentinc/cli/v3/pkg/cmd"
"github.com/confluentinc/cli/v3/pkg/config"
"github.com/confluentinc/cli/v3/pkg/errors"
"github.com/confluentinc/cli/v3/pkg/featureflags"
client "github.com/confluentinc/cli/v3/pkg/flink/app"
"github.com/confluentinc/cli/v3/pkg/flink/test/mock"
"github.com/confluentinc/cli/v3/pkg/flink/types"
"github.com/confluentinc/cli/v3/pkg/log"
ppanic "github.com/confluentinc/cli/v3/pkg/panic-recovery"
)

// If we set this const useFakeGateway to true, we start the client with a simulated gateway client that returns fake data. This is used for debugging.
const useFakeGateway = false

func (c *command) newShellCommand(prerunner pcmd.PreRunner) *cobra.Command {
cmd := &cobra.Command{
Use: "shell",
Expand All @@ -36,6 +33,11 @@ func (c *command) newShellCommand(prerunner pcmd.PreRunner) *cobra.Command {
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)

if featureflags.Manager.BoolVariation("cli.flink.internal", c.Context, config.CliLaunchDarklyClient, true, false) {
cmd.Flags().StringSlice("config-key", []string{}, "App option keys for local mode.")
cmd.Flags().StringSlice("config-value", []string{}, "App option values for local mode.")
}

return cmd
}

Expand Down Expand Up @@ -70,14 +72,21 @@ func (c *command) authenticated(authenticated func(*cobra.Command, []string) err
}

func (c *command) startFlinkSqlClient(prerunner pcmd.PreRunner, cmd *cobra.Command) error {
if useFakeGateway {
return client.StartApp(
mock.NewFakeFlinkGatewayClient(),
func() error { return nil },
types.ApplicationOptions{
Context: c.Context,
UserAgent: c.Version.UserAgent,
}, func() {})
if featureflags.Manager.BoolVariation("cli.flink.internal", c.Context, config.CliLaunchDarklyClient, true, false) {
// get config keys and values from flags
configKeys, err := cmd.Flags().GetStringSlice("config-key")
if err != nil {
return err
}
configValues, err := cmd.Flags().GetStringSlice("config-value")
if err != nil {
return err
}

// if configs were passed, we should enter local mode
if len(configKeys) > 0 && len(configValues) > 0 {
return c.startWithLocalMode(configKeys, configValues)
}
}

environmentId, err := cmd.Flags().GetString("environment")
Expand Down Expand Up @@ -168,6 +177,24 @@ func (c *command) startFlinkSqlClient(prerunner pcmd.PreRunner, cmd *cobra.Comma
return client.StartApp(flinkGatewayClient, c.authenticated(prerunner.Authenticated(c.AuthenticatedCLICommand), cmd, jwtValidator), opts, reportUsage(cmd, c.Config, unsafeTrace))
}

func (c *command) startWithLocalMode(configKeys, configValues []string) error {
// parse app options from given flags
appOptions, err := types.ParseApplicationOptionsFromSlices(configKeys, configValues)
if err != nil {
return err
}

// validate app options
if err := appOptions.Validate(); err != nil {
return err
}

gatewayClient := ccloudv2.NewFlinkGatewayClient(appOptions.GetGatewayUrl(), c.Version.UserAgent, appOptions.GetUnsafeTrace(), "authToken")

appOptions.Context = c.Context
return client.StartApp(gatewayClient, func() error { return nil }, *appOptions, func() {})
}

func (c *command) getFlinkLanguageServiceUrl(gatewayClient *ccloudv2.FlinkGatewayClient) (string, error) {
if cfg := gatewayClient.GetConfig(); cfg != nil && len(cfg.Servers) > 0 {
gatewayUrl := cfg.Servers[0].URL
Expand Down
4 changes: 2 additions & 2 deletions pkg/flink/app/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func StartApp(gatewayClient ccloudv2.GatewayClientInterface, tokenRefreshFunc fu
dataStore := store.NewStore(gatewayClient, appController.ExitApplication, &appOptions, synchronizedTokenRefreshFunc)
resultFetcher := results.NewResultFetcher(dataStore)

// Instantiate lsp
lspClient := lsp.NewLSPClientWS(getAuthToken, appOptions.GetLSPBaseUrl(), appOptions.GetOrganizationId(), appOptions.GetEnvironmentId())
// Instantiate LSP
lspClient := lsp.NewWebsocketClient(getAuthToken, appOptions.GetLSPBaseUrl(), appOptions.GetOrganizationId(), appOptions.GetEnvironmentId())

stdinBefore := utils.GetStdin()
consoleParser, err := utils.GetConsoleParser()
Expand Down
2 changes: 1 addition & 1 deletion pkg/flink/lsp/lsp_completer_ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (w *WebsocketLSPClient) refreshWebsocketConnection() {
}
}

func NewLSPClientWS(getAuthToken func() string, baseUrl, organizationId, environmentId string) LspInterface {
func NewWebsocketClient(getAuthToken func() string, baseUrl, organizationId, environmentId string) LspInterface {
lspClient, conn, err := newLSPConnection(baseUrl, getAuthToken(), organizationId, environmentId)
if err != nil {
return nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/flink/types/application_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type ApplicationOptions struct {
ServiceAccountId string
Verbose bool
LSPBaseUrl string
GatewayURL string
GatewayUrl string
Context *config.Context
}

Expand Down Expand Up @@ -104,7 +104,7 @@ func (a *ApplicationOptions) Validate() error {
if a.GetComputePoolId() == "" {
missingOptions = append(missingOptions, "ComputePoolId")
}
if a.GetGatewayURL() == "" {
if a.GetGatewayUrl() == "" {
missingOptions = append(missingOptions, "GatewayURL")
}
if len(missingOptions) > 0 {
Expand Down Expand Up @@ -190,9 +190,9 @@ func (a *ApplicationOptions) GetLSPBaseUrl() string {
return ""
}

func (a *ApplicationOptions) GetGatewayURL() string {
func (a *ApplicationOptions) GetGatewayUrl() string {
if a != nil {
return a.GatewayURL
return a.GatewayUrl
}
return ""
}
4 changes: 2 additions & 2 deletions pkg/flink/types/application_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestParseApplicationOptionsFromSlices(t *testing.T) {
},
{
name: "TestParseAppOptionsSuccessfully",
configKeys: []string{"UnsafeTrace", "UserAgent", "EnvironmentId", "EnvironmentName", "OrganizationId", "Database", "ComputePoolId", "ServiceAccountId", "Verbose", "LSPBaseUrl", "GatewayURL"},
configKeys: []string{"UnsafeTrace", "UserAgent", "EnvironmentId", "EnvironmentName", "OrganizationId", "Database", "ComputePoolId", "ServiceAccountId", "Verbose", "LSPBaseUrl", "GatewayUrl"},
configValues: []string{"true", "test", "env-123", "test-env", "org-123", "test-database", "lfcp-123", "sa-123", "true", "localhost:8080", "localhost:8000"},
expectedAppOptions: &ApplicationOptions{
UnsafeTrace: true,
Expand All @@ -53,7 +53,7 @@ func TestParseApplicationOptionsFromSlices(t *testing.T) {
ServiceAccountId: "sa-123",
Verbose: true,
LSPBaseUrl: "localhost:8080",
GatewayURL: "localhost:8000",
GatewayUrl: "localhost:8000",
},
expectedError: false,
},
Expand Down

0 comments on commit c243317

Please sign in to comment.