Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[postgres] Change config option slot.name to be required #1996

Merged
merged 1 commit into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions docs/content/connectors/postgres-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ Connector Options
<td>Integer</td>
<td>Integer port number of the PostgreSQL database server.</td>
</tr>
<tr>
<td>slot.name</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in
for a particular database/schema. The server uses this slot to stream events to the connector that you are configuring.
<br/>Slot names must conform to <a href="https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION">PostgreSQL replication slot naming rules</a>, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."</td>
</tr>
<tr>
<td>decoding.plugin.name</td>
<td>optional</td>
Expand All @@ -133,15 +142,6 @@ Connector Options
<td>The name of the Postgres logical decoding plug-in installed on the server.
Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput.</td>
</tr>
<tr>
<td>slot.name</td>
<td>optional</td>
<td style="word-wrap: break-word;">flink</td>
<td>String</td>
<td>The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in
for a particular database/schema. The server uses this slot to stream events to the connector that you are configuring.
<br/>Slot names must conform to <a href="https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION">PostgreSQL replication slot naming rules</a>, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."</td>
</tr>
<tr>
<td>changelog-mode</td>
<td>optional</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void testPostgresCDC() throws Exception {
" 'database-name' = '" + POSTGRES.getDatabaseName() + "',",
" 'schema-name' = 'inventory',",
" 'table-name' = 'products',",
" 'slot.name' = 'flink',",
// dropping the slot allows WAL segments to be discarded by the database
" 'debezium.slot.drop_on_stop' = 'true'",
");",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public Builder<T> password(String password) {
/**
* The name of the PostgreSQL logical decoding slot that was created for streaming changes
* from a particular plug-in for a particular database/schema. The server uses this slot to
* stream events to the connector that you are configuring. Default is "flink".
* stream events to the connector that you are configuring.
*
* <p>Slot names must conform to <a
* href="https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION">PostgreSQL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
private static final ConfigOption<String> SLOT_NAME =
ConfigOptions.key("slot.name")
.stringType()
.defaultValue("flink")
.noDefaultValue()
.withDescription(
"The name of the PostgreSQL logical decoding slot that was created for streaming changes "
+ "from a particular plug-in for a particular database/schema. The server uses this slot "
+ "to stream events to the connector that you are configuring. Default is \"flink\".");
+ "to stream events to the connector that you are configuring.");

private static final ConfigOption<DebeziumChangelogMode> CHANGELOG_MODE =
ConfigOptions.key("changelog-mode")
Expand Down Expand Up @@ -165,6 +165,7 @@ public Set<ConfigOption<?>> requiredOptions() {
options.add(DATABASE_NAME);
options.add(SCHEMA_NAME);
options.add(TABLE_NAME);
options.add(SLOT_NAME);
return options;
}

Expand All @@ -173,7 +174,6 @@ public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(PORT);
options.add(DECODING_PLUGIN_NAME);
options.add(SLOT_NAME);
options.add(CHANGELOG_MODE);
return options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ private DebeziumSourceFunction<SourceRecord> createPostgreSqlSource(int heartbea
.schemaList("inventory")
.tableList("inventory.products")
.deserializer(new ForwardDeserializeSchema())
.slotName(SLOT_NAME)
.debeziumProperties(properties)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

/** Integration tests for PostgreSQL Table source. */
public class PostgreSQLConnectorITCase extends PostgresTestBase {
private static final String SLOT_NAME = "flinktest";

private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down Expand Up @@ -79,15 +80,17 @@ public void testConsumingAllEvents()
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s'"
+ " 'table-name' = '%s',"
+ " 'slot.name' = '%s'"
+ ")",
POSTGERS_CONTAINER.getHost(),
POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
POSTGERS_CONTAINER.getUsername(),
POSTGERS_CONTAINER.getPassword(),
POSTGERS_CONTAINER.getDatabaseName(),
"inventory",
"products");
"products",
SLOT_NAME);
String sinkDDL =
"CREATE TABLE sink ("
+ " name STRING,"
Expand Down Expand Up @@ -184,7 +187,7 @@ public void testExceptionForReplicaIdentity() throws Exception {
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'debezium.slot.name' = '%s'"
+ " 'slot.name' = '%s'"
+ ")",
POSTGERS_CONTAINER.getHost(),
POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
Expand Down Expand Up @@ -277,15 +280,17 @@ public void testAllTypes() throws Throwable {
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s'"
+ " 'table-name' = '%s',"
+ " 'slot.name' = '%s'"
+ ")",
POSTGERS_CONTAINER.getHost(),
POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
POSTGERS_CONTAINER.getUsername(),
POSTGERS_CONTAINER.getPassword(),
POSTGERS_CONTAINER.getDatabaseName(),
"inventory",
"full_types");
"full_types",
SLOT_NAME);
String sinkDDL =
"CREATE TABLE sink ("
+ " id INTEGER NOT NULL,"
Expand Down Expand Up @@ -362,7 +367,7 @@ public void testMetadataColumns() throws Throwable {
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'debezium.slot.name' = '%s'"
+ " 'slot.name' = '%s'"
+ ")",
POSTGERS_CONTAINER.getHost(),
POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
Expand Down Expand Up @@ -459,7 +464,7 @@ public void testUpsertMode() throws Exception {
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'debezium.slot.name' = '%s',"
+ " 'slot.name' = '%s',"
+ " 'changelog-mode' = '%s'"
+ ")",
POSTGERS_CONTAINER.getHost(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class PostgreSQLTableFactoryTest {
private static final String MY_DATABASE = "myDB";
private static final String MY_TABLE = "myTable";
private static final String MY_SCHEMA = "public";
private static final String MY_SLOT_NAME = "flinktest";
private static final Properties PROPERTIES = new Properties();

@Test
Expand All @@ -116,7 +117,7 @@ public void testCommonProperties() {
MY_USERNAME,
MY_PASSWORD,
"decoderbufs",
"flink",
MY_SLOT_NAME,
DebeziumChangelogMode.ALL,
PROPERTIES);
assertEquals(expectedSource, actualSource);
Expand All @@ -128,7 +129,6 @@ public void testOptionalProperties() {
options.put("port", "5444");
options.put("decoding.plugin.name", "wal2json");
options.put("debezium.snapshot.mode", "never");
options.put("slot.name", "flink");
options.put("changelog-mode", "upsert");

DynamicTableSource actualSource = createTableSource(options);
Expand All @@ -145,7 +145,7 @@ public void testOptionalProperties() {
MY_USERNAME,
MY_PASSWORD,
"wal2json",
"flink",
MY_SLOT_NAME,
DebeziumChangelogMode.UPSERT,
dbzProperties);
assertEquals(expectedSource, actualSource);
Expand Down Expand Up @@ -173,7 +173,7 @@ public void testMetadataColumns() {
MY_USERNAME,
MY_PASSWORD,
"decoderbufs",
"flink",
MY_SLOT_NAME,
DebeziumChangelogMode.ALL,
new Properties());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
Expand Down Expand Up @@ -263,6 +263,7 @@ private Map<String, String> getAllOptions() {
options.put("table-name", MY_TABLE);
options.put("username", MY_USERNAME);
options.put("password", MY_PASSWORD);
options.put("slot.name", MY_SLOT_NAME);
return options;
}

Expand Down