-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add pinot upsert features to pinot common #5175
Changes from 4 commits
723f08a
5c31201
a0f4fba
553598e
5306aa5
17801a2
e7c8047
d788680
c969577
729f377
0f7ce4b
f4f4f46
a5fd4b9
67c4530
97370a4
f56905e
6c0bd60
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,17 +30,20 @@ | |
import java.util.List; | ||
import java.util.Map; | ||
import javax.annotation.Nullable; | ||
|
||
import org.apache.helix.ZNRecord; | ||
import org.apache.pinot.common.assignment.InstancePartitionsType; | ||
import org.apache.pinot.common.config.instance.InstanceAssignmentConfig; | ||
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; | ||
import org.apache.pinot.spi.utils.JsonUtils; | ||
import org.apache.pinot.common.utils.CommonConstants.UpdateSemantic; | ||
|
||
|
||
@SuppressWarnings({"Duplicates", "unused"}) | ||
public class TableConfig extends BaseJsonConfig { | ||
public static final String TABLE_NAME_KEY = "tableName"; | ||
public static final String TABLE_TYPE_KEY = "tableType"; | ||
private static final String UPDATE_SEMANTIC_CONFIG_KEY = "updateSemantic"; | ||
public static final String VALIDATION_CONFIG_KEY = "segmentsConfig"; | ||
public static final String TENANT_CONFIG_KEY = "tenants"; | ||
public static final String INDEXING_CONFIG_KEY = "tableIndexConfig"; | ||
|
@@ -75,6 +78,9 @@ public class TableConfig extends BaseJsonConfig { | |
private Map<InstancePartitionsType, InstanceAssignmentConfig> _instanceAssignmentConfigMap; | ||
private List<FieldConfig> _fieldConfigList; | ||
|
||
@JsonPropertyDescription(value = "The update semantic of the table, either append or upsert, default as append") | ||
private UpdateSemantic _updateSemantic; | ||
|
||
/** | ||
* NOTE: DO NOT use this constructor, use builder instead. This constructor is for deserializer only. | ||
*/ | ||
|
@@ -90,6 +96,16 @@ private TableConfig(String tableName, TableType tableType, SegmentsValidationAnd | |
@Nullable QueryConfig queryConfig, | ||
@Nullable Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap, | ||
@Nullable List<FieldConfig> fieldConfigList) { | ||
this(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig, quotaConfig, taskConfig, | ||
routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, UpdateSemantic.APPEND); | ||
} | ||
|
||
private TableConfig(String tableName, TableType tableType, SegmentsValidationAndRetentionConfig validationConfig, | ||
TenantConfig tenantConfig, IndexingConfig indexingConfig, TableCustomConfig customConfig, | ||
@Nullable QuotaConfig quotaConfig, @Nullable TableTaskConfig taskConfig, @Nullable RoutingConfig routingConfig, | ||
@Nullable QueryConfig queryConfig, | ||
@Nullable Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap, | ||
@Nullable List<FieldConfig> fieldConfigList, @Nullable UpdateSemantic updateSemantic) { | ||
_tableName = TableNameBuilder.forType(tableType).tableNameWithType(tableName); | ||
_tableType = tableType; | ||
_validationConfig = validationConfig; | ||
|
@@ -102,6 +118,7 @@ private TableConfig(String tableName, TableType tableType, SegmentsValidationAnd | |
_queryConfig = queryConfig; | ||
_instanceAssignmentConfigMap = instanceAssignmentConfigMap; | ||
_fieldConfigList = fieldConfigList; | ||
_updateSemantic = updateSemantic; | ||
} | ||
|
||
public static TableConfig fromJsonString(String jsonString) | ||
|
@@ -156,8 +173,15 @@ public static TableConfig fromJsonConfig(JsonNode jsonConfig) | |
extractChildConfig(jsonConfig, FIELD_CONFIG_LIST_KEY, new TypeReference<List<FieldConfig>>() { | ||
}); | ||
|
||
// determine the update semantic (defaults to UpdateSemantic.DEFAULT_SEMANTIC when the key is absent) | ||
UpdateSemantic updateSemantic = UpdateSemantic.DEFAULT_SEMANTIC; | ||
if (jsonConfig.has(UPDATE_SEMANTIC_CONFIG_KEY)) { | ||
updateSemantic = UpdateSemantic.getUpdateSemantic(jsonConfig.get(UPDATE_SEMANTIC_CONFIG_KEY).asText()); | ||
} | ||
|
||
return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig, | ||
quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList); | ||
quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, | ||
updateSemantic); | ||
} | ||
|
||
/** | ||
|
@@ -232,6 +256,11 @@ public ObjectNode toJsonConfig() { | |
jsonConfig.put(FIELD_CONFIG_LIST_KEY, JsonUtils.objectToJsonNode(_fieldConfigList)); | ||
} | ||
|
||
if (_updateSemantic != null) { | ||
jsonConfig.put(UPDATE_SEMANTIC_CONFIG_KEY, _updateSemantic.toString()); | ||
} else { | ||
jsonConfig.put(UPDATE_SEMANTIC_CONFIG_KEY, UpdateSemantic.APPEND.toString()); | ||
} | ||
return jsonConfig; | ||
} | ||
|
||
|
@@ -315,8 +344,11 @@ public static TableConfig fromZnRecord(ZNRecord znRecord) | |
}); | ||
} | ||
|
||
UpdateSemantic updateSemantic = UpdateSemantic.getUpdateSemantic(simpleFields.get(UPDATE_SEMANTIC_CONFIG_KEY)); | ||
|
||
return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig, | ||
quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList); | ||
quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, | ||
updateSemantic); | ||
} | ||
|
||
public ZNRecord toZNRecord() | ||
|
@@ -352,6 +384,9 @@ public ZNRecord toZNRecord() | |
if (_fieldConfigList != null) { | ||
simpleFields.put(FIELD_CONFIG_LIST_KEY, JsonUtils.objectToString(_fieldConfigList)); | ||
} | ||
if (_updateSemantic != null) { | ||
simpleFields.put(UPDATE_SEMANTIC_CONFIG_KEY, _updateSemantic.toString()); | ||
} | ||
|
||
ZNRecord znRecord = new ZNRecord(_tableName); | ||
znRecord.setSimpleFields(simpleFields); | ||
|
@@ -470,6 +505,18 @@ public List<FieldConfig> getFieldConfigList() { | |
return _fieldConfigList; | ||
} | ||
|
||
public UpdateSemantic getUpdateSemantic() { | ||
return _updateSemantic; | ||
} | ||
|
||
public void setUpdateSemantic(UpdateSemantic updateSemantic) { | ||
_updateSemantic = updateSemantic; | ||
} | ||
|
||
public boolean isTableForUpsert() { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's avoid adding additional utility methods to config classes if possible. |
||
return _updateSemantic == UpdateSemantic.UPSERT; | ||
} | ||
|
||
public static class Builder { | ||
private static final String DEFAULT_SEGMENT_PUSH_TYPE = "APPEND"; | ||
private static final String REFRESH_SEGMENT_PUSH_TYPE = "REFRESH"; | ||
|
@@ -517,6 +564,8 @@ public static class Builder { | |
private Map<InstancePartitionsType, InstanceAssignmentConfig> _instanceAssignmentConfigMap; | ||
private List<FieldConfig> _fieldConfigList; | ||
|
||
private UpdateSemantic _updateSemantic; | ||
|
||
public Builder(TableType tableType) { | ||
_tableType = tableType; | ||
} | ||
|
@@ -683,6 +732,11 @@ public Builder setFieldConfigList(List<FieldConfig> fieldConfigList) { | |
return this; | ||
} | ||
|
||
public Builder setUpdateSemantic(UpdateSemantic updateSemantic) { | ||
_updateSemantic = updateSemantic; | ||
return this; | ||
} | ||
|
||
public TableConfig build() { | ||
// Validation config | ||
SegmentsValidationAndRetentionConfig validationConfig = new SegmentsValidationAndRetentionConfig(); | ||
|
@@ -720,14 +774,20 @@ public TableConfig build() { | |
_customConfig = new TableCustomConfig(null); | ||
} | ||
|
||
if (_updateSemantic == null) { | ||
_updateSemantic = UpdateSemantic.DEFAULT_SEMANTIC; | ||
} | ||
|
||
// eventually this validation will be generic but since we are initially | ||
// using FieldConfig only for text columns (and migrate to expand its usage | ||
// soon after), just validate the field config list from text index creation | ||
// perspective. | ||
TextIndexConfigValidator.validate(_fieldConfigList, _noDictionaryColumns); | ||
|
||
return new TableConfig(_tableName, _tableType, validationConfig, tenantConfig, indexingConfig, _customConfig, | ||
_quotaConfig, _taskConfig, _routingConfig, _queryConfig, _instanceAssignmentConfigMap, _fieldConfigList); | ||
_quotaConfig, _taskConfig, _routingConfig, _queryConfig, _instanceAssignmentConfigMap, _fieldConfigList, | ||
_updateSemantic); | ||
|
||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,8 @@ | |
* | ||
*/ | ||
public enum BrokerGauge implements AbstractMetrics.Gauge { | ||
QUERY_QUOTA_CAPACITY_UTILIZATION_RATE("tables", false), NETTY_CONNECTION_CONNECT_TIME_MS("nettyConnection", true); | ||
QUERY_QUOTA_CAPACITY_UTILIZATION_RATE("tables", false), NETTY_CONNECTION_CONNECT_TIME_MS("nettyConnection", true), | ||
TABLE_MIN_LOW_WATER_MARK("tables", false); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What does this represent? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It represents how many tables the Pinot broker is collecting the metrics for. The exact usage of the metric will show up in later diffs. |
||
|
||
private final String brokerGaugeName; | ||
private final String unit; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pinot.common.restlet.resources; | ||
|
||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import org.codehaus.jackson.annotate.JsonIgnoreProperties; | ||
|
||
import java.util.Collections; | ||
import java.util.Map; | ||
|
||
@JsonIgnoreProperties(ignoreUnknown = true) | ||
public class TableLowWaterMarksInfo { | ||
// A mapping from table name to the table's partiton -> low_water_mark mappings. | ||
private Map<String, Map<Integer, Long>> tableLowWaterMarks; | ||
|
||
public TableLowWaterMarksInfo(@JsonProperty("lowWaterMarks") Map<String, Map<Integer, Long>> tableLowWaterMarks) { | ||
this.tableLowWaterMarks = tableLowWaterMarks; | ||
} | ||
|
||
public void setTableLowWaterMarks(Map<String, Map<Integer, Long>> tableLowWaterMarks) { | ||
this.tableLowWaterMarks = tableLowWaterMarks; | ||
} | ||
|
||
public Map<String, Map<Integer, Long>> getTableLowWaterMarks() { | ||
return Collections.unmodifiableMap(tableLowWaterMarks); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to IngestMode or just have a boolean enableUpsert?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this for real-time? We already have push type of either APPEND or REFRESH for offline; aggregateMetrics for real-time. What does UPSERT stand for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kishoreg I think using an enum here will be more flexible in case of future feature changes. A boolean allows only two options, which might limit other changes we may want to make to Pinot. Please let me know what you think. ingestionMode definitely sounds more explicit than what I am using here, and I will definitely consider it.
@Jackie-Jiang Yes. This follows the earlier design for supporting updates in Pinot realtime ingestion by overriding records by primary key (e.g., a second message in Kafka with the same primary key represents an update of the existing key instead of two records with the same primary key). Upsert stands for this new ingestion semantic (insert if there is no duplicate primary key, update if there is a duplicate primary key). This feature will apply to realtime tables only.