Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13761] consistent namings for expansion address in Debezium IO #16766

Merged
merged 2 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 10 additions & 10 deletions sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ type readFromDebeziumSchema struct {
}

type debeziumConfig struct {
expansionService string
readSchema *readFromDebeziumSchema
expansionAddr string
readSchema *readFromDebeziumSchema
}

// readOption facilitates additional parameters to debeziumio.Read() Ptransform.
Expand All @@ -99,7 +99,7 @@ type readOption func(*debeziumConfig)
// connectorClass := debeziumIO.POSTGRESQL
// maxrecords := 1
// debeziumio.Read(s.Scope("Read from debezium"), expansionAddr, username, password, host, port, connectorClass,
// reflectx.String, debeziumio.MaxRecord(maxrecords), debeziumio.ExpansionService("localhost:9000"))
// reflectx.String, debeziumio.MaxRecord(maxrecords), debeziumio.ExpansionAddr("localhost:9000"))
func Read(s beam.Scope, username, password, host, port string, connectorClass DriverClassName, t reflect.Type, opts ...readOption) beam.PCollection {
rfds := readFromDebeziumSchema{
ConnectorClass: string(connectorClass),
Expand All @@ -113,14 +113,14 @@ func Read(s beam.Scope, username, password, host, port string, connectorClass Dr
opt(&dc)
}

expansionService := dc.expansionService
if dc.expansionService == "" {
expansionService = autoStartupAddress
expansionAddr := dc.expansionAddr
if dc.expansionAddr == "" {
expansionAddr = autoStartupAddress
}

pl := beam.CrossLanguagePayload(rfds)
outT := beam.UnnamedOutput(typex.New(t))
out := beam.CrossLanguage(s, readURN, pl, expansionService, nil, outT)
out := beam.CrossLanguage(s, readURN, pl, expansionAddr, nil, outT)
return out[beam.UnnamedOutputTag()]
}

Expand All @@ -139,9 +139,9 @@ func ConnectionProperties(cp []string) readOption {
}
}

// ExpansionService sets the expansion service address to use for DebeziumIO cross-langauage transform.
func ExpansionService(expansionService string) readOption {
// ExpansionAddr sets the expansion service address to use for DebeziumIO cross-langauage transform.
func ExpansionAddr(expansionAddr string) readOption {
return func(cfg *debeziumConfig) {
cfg.expansionService = expansionService
cfg.expansionAddr = expansionAddr
}
}
11 changes: 1 addition & 10 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ var portableFilters = []string{
"TestPanes",
// TODO(BEAM-12797): Python portable runner times out on Kafka reads.
"TestKafkaIO.*",
// TODO(BEAM-13778) needs a schemaio expansion service address flag
"TestJDBCIO_BasicReadWrite",
}

var flinkFilters = []string{
Expand All @@ -100,8 +98,6 @@ var flinkFilters = []string{
"TestTestStream.*Sequence.*",
// Triggers are not yet supported
"TestTrigger.*",
// TODO(BEAM-13778) needs a schemaio expansion service address flag
"TestJDBCIO_BasicReadWrite",
}

var samzaFilters = []string{
Expand All @@ -115,8 +111,6 @@ var samzaFilters = []string{
"TestPanes",
// TODO(BEAM-13006): Samza doesn't yet support post job metrics, used by WordCount
"TestWordCount.*",
// TODO(BEAM-13778) needs a schemaio expansion service address flag
"TestJDBCIO_BasicReadWrite",
}

var sparkFilters = []string{
Expand All @@ -129,13 +123,12 @@ var sparkFilters = []string{
// The trigger and pane tests uses TestStream
"TestTrigger.*",
"TestPanes",
// TODO(BEAM-13778) needs a schemaio expansion service address flag
"TestJDBCIO_BasicReadWrite",
}

var dataflowFilters = []string{
// The Dataflow runner doesn't work with tests using testcontainers locally.
"TestJDBCIO_BasicReadWrite",
"TestDebeziumIO_BasicRead",
// TODO(BEAM-11576): TestFlattenDup failing on this runner.
"TestFlattenDup",
// The Dataflow runner does not support the TestStream primitive
Expand All @@ -145,8 +138,6 @@ var dataflowFilters = []string{
"TestPanes",
// There is no infrastructure for running KafkaIO tests with Dataflow.
"TestKafkaIO.*",
// TestContainers won't work against dataflow.
"TestDebeziumIO_BasicRead",
// Dataflow doesn't support any test that requires loopback.
// Eg. For FileIO examples.
".*Loopback.*",
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/test/integration/io/xlang/debezium/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func ReadPipeline(addr, username, password, dbname, host, port string, connector
p, s := beam.NewPipelineWithRoot()
result := debeziumio.Read(s.Scope("Read from debezium"), username, password, host, port,
connectorClass, reflectx.String, debeziumio.MaxRecord(maxrecords),
debeziumio.ConnectionProperties(connectionProperties), debeziumio.ExpansionService(addr))
debeziumio.ConnectionProperties(connectionProperties), debeziumio.ExpansionAddr(addr))
expectedJson := `{"metadata":{"connector":"postgresql","version":"1.3.1.Final","name":"dbserver1","database":"inventory","schema":"inventory","table":"customers"},"before":null,"after":{"fields":{"last_name":"Thomas","id":1001,"first_name":"Sally","email":"sally.thomas@acme.com"}}}`
expected := beam.Create(s, expectedJson)
passert.Equals(s, result, expected)
Expand Down