Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,15 +281,27 @@ public class TopicConfig {
"Supported: 'raw', 'string', 'by_schema_id', 'by_latest_schema'. " +
"Schema Registry URL required for 'by_schema_id' and 'by_latest_schema'.";

public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_BY_LATEST_SCHEMA_PREFIX = "automq.table.topic.convert.value.by_latest_schema.";
public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_BY_LATEST_SCHEMA_PREFIX = "automq.table.topic.convert.key.by_latest_schema.";

public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_SUBJECT_CONFIG = "automq.table.topic.convert.key.subject";
public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_SUBJECT_DOC = "The Schema Registry subject name for key schemas. " +
"Defaults to '{topic-name}-key' if not specified.";
public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_MESSAGE_FULL_NAME_CONFIG = "automq.table.topic.convert.key.message.full.name";
public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_MESSAGE_FULL_NAME_DOC = "The fully qualified message name for Protobuf key schemas. " +
"Used when schema contains multiple message types. Defaults to first message type.";
public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_BY_LATEST_SCHEMA_SUBJECT_CONFIG =
"automq.table.topic.convert.value.by_latest_schema.subject";
public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_BY_LATEST_SCHEMA_SUBJECT_DOC =
"Subject name to resolve the latest value schema from Schema Registry when using convert.value.type=by_latest_schema. " +
"If not set, defaults to '<topic>-value'.";
public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_BY_LATEST_SCHEMA_MESSAGE_FULL_NAME_CONFIG =
"automq.table.topic.convert.value.by_latest_schema.message.full.name";
public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_BY_LATEST_SCHEMA_MESSAGE_FULL_NAME_DOC =
"Fully-qualified message name for the latest value schema (if using Protobuf) when convert.value.type=by_latest_schema." +
"If not set, uses the first message.";

public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_BY_LATEST_SCHEMA_SUBJECT_CONFIG =
"automq.table.topic.convert.key.by_latest_schema.subject";
public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_BY_LATEST_SCHEMA_SUBJECT_DOC =
"Subject name to resolve the latest key schema from Schema Registry when using convert.key.type=by_latest_schema. " +
"If not set, defaults to '<topic>-key'.";
public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_BY_LATEST_SCHEMA_MESSAGE_FULL_NAME_CONFIG =
"automq.table.topic.convert.key.by_latest_schema.message.full.name";
public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_BY_LATEST_SCHEMA_MESSAGE_FULL_NAME_DOC =
"Fully-qualified message name for the latest key schema (if using Protobuf) when convert.key.type=by_latest_schema. " +
"If not set, uses the first message.";

public static final String AUTOMQ_TABLE_TOPIC_TRANSFORM_VALUE_TYPE_CONFIG = "automq.table.topic.transform.value.type";
public static final String AUTOMQ_TABLE_TOPIC_TRANSFORM_VALUE_TYPE_DOC = "Transformation to apply to the record value after conversion. " +
Expand Down
13 changes: 5 additions & 8 deletions core/src/main/java/kafka/automq/table/worker/WorkerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import kafka.cluster.Partition;
import kafka.log.UnifiedLog;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.server.record.ErrorsTolerance;
import org.apache.kafka.server.record.TableTopicConvertType;
import org.apache.kafka.server.record.TableTopicSchemaType;
Expand Down Expand Up @@ -64,23 +65,19 @@ public TableTopicConvertType keyConvertType() {
}

public String valueSubject() {
Object subject = config.valueSchemaLatestConfig.get("subject");
return subject != null ? subject.toString() : null;
return config.getString(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_BY_LATEST_SCHEMA_SUBJECT_CONFIG);
}

public String valueMessageFullName() {
Object name = config.valueSchemaLatestConfig.get("message.full.name");
return name != null ? name.toString() : null;
return config.getString(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_BY_LATEST_SCHEMA_MESSAGE_FULL_NAME_CONFIG);
}

public String keySubject() {
Object subject = config.keySchemaLatestConfig.get("subject");
return subject != null ? subject.toString() : null;
return config.getString(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_KEY_BY_LATEST_SCHEMA_SUBJECT_CONFIG);
}

public String keyMessageFullName() {
Object name = config.keySchemaLatestConfig.get("message.full.name");
return name != null ? name.toString() : null;
return config.getString(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_KEY_BY_LATEST_SCHEMA_MESSAGE_FULL_NAME_CONFIG);
}

public TableTopicTransformType transformType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,11 @@ public Optional<String> serverConfigName(String configName) {
.define(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, STRING, TableTopicSchemaType.NONE.name, in(TableTopicSchemaType.names().toArray(new String[0])), MEDIUM, TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_DOC)
.define(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_TYPE_CONFIG, STRING, RAW.name, in(TableTopicConvertType.names().toArray(new String[0])), MEDIUM, TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_TYPE_DOC)
.define(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_KEY_TYPE_CONFIG, STRING, TableTopicConvertType.STRING.name, in(TableTopicConvertType.names().toArray(new String[0])), MEDIUM, TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_KEY_TYPE_DOC)
.define(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_KEY_SUBJECT_CONFIG, STRING, null, null, LOW, TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_KEY_SUBJECT_DOC)
.define(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_KEY_MESSAGE_FULL_NAME_CONFIG, STRING, null, null, LOW, TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_KEY_MESSAGE_FULL_NAME_DOC)
.define(TopicConfig.AUTOMQ_TABLE_TOPIC_TRANSFORM_VALUE_TYPE_CONFIG, STRING, NONE.name, in(TableTopicTransformType.names().toArray(new String[0])), MEDIUM, TopicConfig.AUTOMQ_TABLE_TOPIC_TRANSFORM_VALUE_TYPE_DOC)
.define(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_BY_LATEST_SCHEMA_SUBJECT_CONFIG, STRING, null, null, MEDIUM, TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_BY_LATEST_SCHEMA_SUBJECT_DOC)
.define(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_BY_LATEST_SCHEMA_MESSAGE_FULL_NAME_CONFIG, STRING, null, null, MEDIUM, TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_BY_LATEST_SCHEMA_MESSAGE_FULL_NAME_DOC)
.define(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_KEY_BY_LATEST_SCHEMA_SUBJECT_CONFIG, STRING, null, null, MEDIUM, TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_KEY_BY_LATEST_SCHEMA_SUBJECT_DOC)
.define(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_KEY_BY_LATEST_SCHEMA_MESSAGE_FULL_NAME_CONFIG, STRING, null, null, MEDIUM, TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_KEY_BY_LATEST_SCHEMA_MESSAGE_FULL_NAME_DOC)
.define(TopicConfig.TABLE_TOPIC_ID_COLUMNS_CONFIG, STRING, null, TableTopicConfigValidator.IdColumnsValidator.INSTANCE, MEDIUM, TopicConfig.TABLE_TOPIC_ID_COLUMNS_DOC)
.define(TopicConfig.TABLE_TOPIC_PARTITION_BY_CONFIG, STRING, null, TableTopicConfigValidator.PartitionValidator.INSTANCE, MEDIUM, TopicConfig.TABLE_TOPIC_PARTITION_BY_DOC)
.define(TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_CONFIG, BOOLEAN, false, null, MEDIUM, TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_DOC)
Expand Down Expand Up @@ -419,8 +421,7 @@ public Optional<String> serverConfigName(String configName) {
public final TableTopicSchemaType tableTopicSchemaType;
public final TableTopicConvertType valueConvertType;
public final TableTopicConvertType keyConvertType;
public final Map<String, Object> valueSchemaLatestConfig;
public final Map<String, Object> keySchemaLatestConfig;

public final TableTopicTransformType transformType;
public final String tableTopicIdColumns;
public final String tableTopicPartitionBy;
Expand Down Expand Up @@ -490,8 +491,6 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
this.tableTopicSchemaType = TableTopicSchemaType.forName(getString(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG));
this.valueConvertType = TableTopicConvertType.forName(getString(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_TYPE_CONFIG));
this.keyConvertType = TableTopicConvertType.forName(getString(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_KEY_TYPE_CONFIG));
this.valueSchemaLatestConfig = originalsWithPrefix(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_BY_LATEST_SCHEMA_PREFIX);
this.keySchemaLatestConfig = originalsWithPrefix(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_KEY_BY_LATEST_SCHEMA_PREFIX);
this.transformType = TableTopicTransformType.forName(getString(TopicConfig.AUTOMQ_TABLE_TOPIC_TRANSFORM_VALUE_TYPE_CONFIG));
this.tableTopicIdColumns = getString(TopicConfig.TABLE_TOPIC_ID_COLUMNS_CONFIG);
this.tableTopicPartitionBy = getString(TopicConfig.TABLE_TOPIC_PARTITION_BY_CONFIG);
Expand Down
Loading