APIE-744 - Flink Materialized Tables#3254
APIE-744 - Flink Materialized Tables#3254Cynthia Qin (cqin-confluent) merged 19 commits intomainfrom
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
There was a problem hiding this comment.
Pull request overview
This PR introduces Flink Materialized Table support to the CLI by adding a new confluent flink materialized-table command group, wiring it to a new Flink Gateway internal SDK client, and extending the test server + integration fixtures to cover create/describe/list/update/delete/stop/resume flows.
Changes:
- Add
flink materialized-tablecommand group with subcommands for CRUD + stop/resume. - Add an internal Flink Gateway client (
FlinkGatewayClientInternal) backed byccloud-sdk-go-v2-internal/flink-gateway. - Extend integration tests, test server routes, and golden fixtures for the new commands.
Reviewed changes
Copilot reviewed 30 out of 31 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
| test/test-server/flink_gateway_router.go | Adds materialized-table endpoints to the test Flink Gateway router. |
| test/flink_test.go | Adds integration tests for materialized-table commands. |
| test/fixtures/output/flink/materialized-table/create/create.golden | Golden output for create. |
| test/fixtures/output/flink/materialized-table/create/create-column.golden | Golden output for create with columns. |
| test/fixtures/output/flink/materialized-table/create/create-filled.golden | Golden output for create with optional fields. |
| test/fixtures/output/flink/materialized-table/describe/describe.golden | Golden output for describe. |
| test/fixtures/output/flink/materialized-table/describe/describe-noKafka.golden | Golden output for missing required flag. |
| test/fixtures/output/flink/materialized-table/list/list.golden | Golden output for list. |
| test/fixtures/output/flink/materialized-table/delete/delete.golden | Golden output for delete. |
| test/fixtures/output/flink/materialized-table/update/update.golden | Golden output for update case 1. |
| test/fixtures/output/flink/materialized-table/update/update-2.golden | Golden output for update case 2. |
| test/fixtures/output/flink/materialized-table/update/no-cp.golden | Golden output for update without compute pool. |
| test/fixtures/output/flink/materialized-table/update/update-3.golden | Golden output for update with extra fields. |
| test/fixtures/output/flink/materialized-table/stop.golden | Golden output for stop. |
| test/fixtures/output/flink/materialized-table/resume.golden | Golden output for resume. |
| test/fixtures/output/flink/help.golden | Adds materialized-table to Flink help output. |
| pkg/resource/resource.go | Adds MaterializedTable resource label. |
| pkg/cmd/authenticated_cli_command.go | Adds GetFlinkGatewayClientInternal and internal client caching. |
| pkg/ccloudv2/flink_gateway.go | Implements internal Flink Gateway client + materialized-table API methods. |
| internal/flink/command.go | Registers the new materialized-table command group. |
| internal/flink/command_materialized_table.go | Defines the command group, output type, and shared flags/arg completion. |
| internal/flink/command_materialized_table_create.go | Implements materialized-table create and parsing helpers. |
| internal/flink/command_materialized_table_describe.go | Implements materialized-table describe. |
| internal/flink/command_materialized_table_list.go | Implements materialized-table list. |
| internal/flink/command_materialized_table_update.go | Implements materialized-table update. |
| internal/flink/command_materialized_table_delete.go | Implements materialized-table delete. |
| internal/flink/command_materialized_table_stop.go | Implements materialized-table stop (via update stopped=true). |
| internal/flink/command_materialized_table_resume.go | Implements materialized-table resume (via update stopped=false). |
| go.mod | Adds internal Flink Gateway SDK dependency. |
| go.sum | Adds checksums for the new dependency. |
| cmd/lint/main.go | Updates flag lint exclusions for new long flag names. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| cmd.Flags().String("database", "", "The ID of Kafka cluster hosting the Materialized Table's topic.") | ||
| pcmd.AddCloudFlag(cmd) | ||
| pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddContextFlag(cmd, c.CLICommand) | ||
| pcmd.AddOutputFlag(cmd) | ||
|
|
||
| cobra.CheckErr(cmd.MarkFlagRequired("database")) | ||
|
|
There was a problem hiding this comment.
This command requires --database, but the value is never read or used to filter the results. Either remove the required flag, or read it and filter tables (or call a database-scoped list endpoint if available) so the output matches user intent.
| func handleSqlMaterializedTablesTable(t *testing.T) http.HandlerFunc { | ||
| return func(w http.ResponseWriter, r *http.Request) { | ||
| switch r.Method { | ||
| case http.MethodGet: | ||
| connectionName := mux.Vars(r)["materialized-table"] | ||
| if strings.Contains(connectionName, "nonexist") { | ||
| err := writeResourceNotFoundError(w) | ||
| require.NoError(t, err) | ||
| return |
There was a problem hiding this comment.
Route variable mismatch: the path uses {table_name}, but the handler reads mux.Vars(r)["materialized-table"]. This will always return an empty string and break the nonexist simulation. Read the table_name variable (and consider renaming connectionName accordingly).
Steven Gagniere (sgagniere)
left a comment
There was a problem hiding this comment.
Some comments about alignment with the api spec:
Steven Gagniere (sgagniere)
left a comment
There was a problem hiding this comment.
Thanks, just a few more comments:
| cmd.Flags().String("distributed-by-column-names", "", "The names of the columns the table is distributed by.") | ||
| cmd.Flags().Int("distributed-by-buckets", 0, "The number of buckets.") |
There was a problem hiding this comment.
Let's also update the flags to match the updated output field names.
|
|
||
| cmd.Flags().String("database", "", "The ID of Kafka cluster hosting the Materialized Table's topic.") | ||
| cmd.Flags().String("compute-pool", "", "The ID associated with the compute pool in context.") | ||
| cmd.Flags().String("service-account", "", "The ID of a principal this Materialized Table query runs as.") |
There was a problem hiding this comment.
Here as well.
| cmd.Flags().String("column-physical", "", "Path to the columns data for type physical.") | ||
| cmd.Flags().String("column-metadata", "", "Path to the columns data for type metadata.") | ||
| cmd.Flags().String("column-computed", "", "Path to the columns data for type computed.") | ||
| cmd.Flags().String("watermark-column-name", "", "The name of the watermark columns.") |
There was a problem hiding this comment.
| cmd.Flags().String("watermark-column-name", "", "The name of the watermark columns.") | |
| cmd.Flags().String("watermark-column", "", "The name of the watermark columns.") |
|
|
||
| cmd.Flags().String("database", "", "The ID of Kafka cluster hosting the Materialized Table's topic.") | ||
| cmd.Flags().String("compute-pool", "", "The ID associated with the compute pool in context.") | ||
| cmd.Flags().String("service-account", "", "The ID of a principal this Materialized Table query runs as.") |
|




Release Notes
New Features
confluent flink materialized-table [ create | list | describe | update ]commands for managing Materialized Tables in Confluent Cloud.Bug Fixes
Examples
Checklist
Test & Reviewsection below.Blast Radiussection below.What
The existing
confluent flink statementcommand is imperative; it's designed to trigger ephemeral actions and leads to an unnecessary "destroy-and-recreate" workflow. This model is fundamentally incompatible with Materialized Tables, which are durable, persistent objects. Using the old imperative resource for them is a design mismatch that breaks CI/CD and negates the server-side CREATE OR ALTER capability.This PR implements a new
confluent flink materialized-tablecommands to solve this.Blast Radius
Low to none blast radius:
confluent flink materialized-tablecommand group is purely additive and should not break any existing workflows.References
APIE-744 API/CLI/TF for Flink Materialized tables
Test & Review
APIE-776 Flink Materialized Tables - CLI Testing