Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add pinot upsert features to pinot common #5175

Merged
merged 17 commits into from
Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,21 @@
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import javax.validation.constraints.Null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrong import

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue with merging, fixed. Thanks for pointing it out


import org.apache.helix.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitionsType;
import org.apache.pinot.common.config.instance.InstanceAssignmentConfig;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.common.utils.CommonConstants.UpdateSemantic;


@SuppressWarnings({"Duplicates", "unused"})
public class TableConfig extends BaseJsonConfig {
public static final String TABLE_NAME_KEY = "tableName";
public static final String TABLE_TYPE_KEY = "tableType";
private static final String UPDATE_SEMANTIC_CONFIG_KEY = "updateSemantic";
public static final String VALIDATION_CONFIG_KEY = "segmentsConfig";
public static final String TENANT_CONFIG_KEY = "tenants";
public static final String INDEXING_CONFIG_KEY = "tableIndexConfig";
Expand Down Expand Up @@ -75,6 +79,9 @@ public class TableConfig extends BaseJsonConfig {
private Map<InstancePartitionsType, InstanceAssignmentConfig> _instanceAssignmentConfigMap;
private List<FieldConfig> _fieldConfigList;

@JsonPropertyDescription(value = "The update semantic of the table, either append or upsert, default as append")
private UpdateSemantic _updateSemantic;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to IngestMode or just have a boolean enableUpsert?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this for real-time? We already have push type of either APPEND or REFRESH for offline; aggregateMetrics for real-time. What does UPSERT stand for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kishoreg I am thinking that using an enum here will be more flexible for future feature changes. A boolean limits us to only two options and might constrain what else we may want to do with Pinot. Please let me know what you think. ingestionMode definitely sounds more explicit than what I am using here, and I will definitely consider it.

@Jackie-Jiang yes. This is basically the earlier design for supporting updates in Pinot real-time ingestion by allowing overrides by primary key (e.g., a second Kafka message with the same primary key represents an update of the existing record, rather than two records with the same primary key). "Upsert" stands for this new ingestion semantic: insert if there is no duplicate primary key, update if there is. This feature applies to real-time tables only.


/**
* NOTE: DO NOT use this constructor, use builder instead. This constructor is for deserializer only.
*/
Expand All @@ -90,6 +97,16 @@ private TableConfig(String tableName, TableType tableType, SegmentsValidationAnd
@Nullable QueryConfig queryConfig,
@Nullable Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap,
@Nullable List<FieldConfig> fieldConfigList) {
this(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig, quotaConfig, taskConfig,
routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, UpdateSemantic.APPEND);
}

private TableConfig(String tableName, TableType tableType, SegmentsValidationAndRetentionConfig validationConfig,
TenantConfig tenantConfig, IndexingConfig indexingConfig, TableCustomConfig customConfig,
@Nullable QuotaConfig quotaConfig, @Nullable TableTaskConfig taskConfig, @Nullable RoutingConfig routingConfig,
@Null QueryConfig queryConfig,
@Nullable Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap,
@Nullable List<FieldConfig> fieldConfigList, @Nullable UpdateSemantic updateSemantic) {
_tableName = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
_tableType = tableType;
_validationConfig = validationConfig;
Expand All @@ -102,6 +119,7 @@ private TableConfig(String tableName, TableType tableType, SegmentsValidationAnd
_queryConfig = queryConfig;
_instanceAssignmentConfigMap = instanceAssignmentConfigMap;
_fieldConfigList = fieldConfigList;
_updateSemantic = updateSemantic;
}

public static TableConfig fromJsonString(String jsonString)
Expand Down Expand Up @@ -156,8 +174,15 @@ public static TableConfig fromJsonConfig(JsonNode jsonConfig)
extractChildConfig(jsonConfig, FIELD_CONFIG_LIST_KEY, new TypeReference<List<FieldConfig>>() {
});

// generate update schematic
UpdateSemantic updateSemantic = UpdateSemantic.DEFAULT_SEMANTIC;
if (jsonConfig.has(UPDATE_SEMANTIC_CONFIG_KEY)) {
updateSemantic = UpdateSemantic.getUpdateSemantic(jsonConfig.get(UPDATE_SEMANTIC_CONFIG_KEY).asText());
}

return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList);
quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList,
updateSemantic);
}

/**
Expand Down Expand Up @@ -232,6 +257,11 @@ public ObjectNode toJsonConfig() {
jsonConfig.put(FIELD_CONFIG_LIST_KEY, JsonUtils.objectToJsonNode(_fieldConfigList));
}

if (_updateSemantic != null) {
jsonConfig.put(UPDATE_SEMANTIC_CONFIG_KEY, _updateSemantic.toString());
} else {
jsonConfig.put(UPDATE_SEMANTIC_CONFIG_KEY, UpdateSemantic.APPEND.toString());
}
return jsonConfig;
}

Expand Down Expand Up @@ -315,8 +345,11 @@ public static TableConfig fromZnRecord(ZNRecord znRecord)
});
}

UpdateSemantic updateSemantic = UpdateSemantic.getUpdateSemantic(simpleFields.get(UPDATE_SEMANTIC_CONFIG_KEY));

return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList);
quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList,
updateSemantic);
}

public ZNRecord toZNRecord()
Expand Down Expand Up @@ -354,6 +387,7 @@ public ZNRecord toZNRecord()
}

ZNRecord znRecord = new ZNRecord(_tableName);
simpleFields.put(UPDATE_SEMANTIC_CONFIG_KEY, _updateSemantic.toString());
jamesyfshao marked this conversation as resolved.
Show resolved Hide resolved
znRecord.setSimpleFields(simpleFields);
return znRecord;
}
Expand Down Expand Up @@ -470,6 +504,18 @@ public List<FieldConfig> getFieldConfigList() {
return _fieldConfigList;
}

/**
 * Returns the update semantic (APPEND or UPSERT) configured for this table.
 * May be null when the config was deserialized without this field set
 * (callers such as toJsonConfig() null-check it and fall back to APPEND).
 */
public UpdateSemantic getUpdateSemantic() {
  return _updateSemantic;
}

/**
 * Sets the update semantic (APPEND or UPSERT) for this table.
 */
public void setUpdateSemantic(UpdateSemantic updateSemantic) {
  _updateSemantic = updateSemantic;
}

public boolean isTableForUpsert() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's avoid adding additional utility methods to config classes if possible

return _updateSemantic == UpdateSemantic.UPSERT;
}

public static class Builder {
private static final String DEFAULT_SEGMENT_PUSH_TYPE = "APPEND";
private static final String REFRESH_SEGMENT_PUSH_TYPE = "REFRESH";
Expand Down Expand Up @@ -517,6 +563,8 @@ public static class Builder {
private Map<InstancePartitionsType, InstanceAssignmentConfig> _instanceAssignmentConfigMap;
private List<FieldConfig> _fieldConfigList;

private UpdateSemantic _updateSemantic;

/**
 * Creates a builder for a table config of the given table type.
 */
public Builder(TableType tableType) {
  _tableType = tableType;
}
Expand Down Expand Up @@ -683,6 +731,11 @@ public Builder setFieldConfigList(List<FieldConfig> fieldConfigList) {
return this;
}

/**
 * Sets the update semantic (APPEND or UPSERT) for the table being built.
 * When left unset, build() falls back to UpdateSemantic.DEFAULT_SEMANTIC (APPEND).
 */
public Builder setUpdateSemantic(UpdateSemantic updateSemantic) {
  _updateSemantic = updateSemantic;
  return this;
}

public TableConfig build() {
// Validation config
SegmentsValidationAndRetentionConfig validationConfig = new SegmentsValidationAndRetentionConfig();
Expand Down Expand Up @@ -720,14 +773,20 @@ public TableConfig build() {
_customConfig = new TableCustomConfig(null);
}

if (_updateSemantic == null) {
_updateSemantic = UpdateSemantic.DEFAULT_SEMANTIC;
}

// eventually this validation will be generic but since we are initially
// using FieldConfig only for text columns (and migrate to expand its usage
// soon after), just validate the field config list from text index creation
// perspective.
TextIndexConfigValidator.validate(_fieldConfigList, _noDictionaryColumns);

return new TableConfig(_tableName, _tableType, validationConfig, tenantConfig, indexingConfig, _customConfig,
_quotaConfig, _taskConfig, _routingConfig, _queryConfig, _instanceAssignmentConfigMap, _fieldConfigList);
_quotaConfig, _taskConfig, _routingConfig, _queryConfig, _instanceAssignmentConfigMap, _fieldConfigList,
_updateSemantic);

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,19 @@ public static boolean isOfflineTableResource(String resourceName) {
/**
 * Returns whether the given resource name carries the realtime table type suffix.
 */
public static boolean isRealtimeTableResource(String resourceName) {
  return REALTIME.tableHasTypeSuffix(resourceName);
}

/**
* ensure that table name ends with type info, if no, create one with the given type
* @param tableName the name of the table
* @param type the type of the table for it to fill in if the type info is missing
* @return the table type name with the type info
*/
public static String ensureTableNameWithType(String tableName, TableType type) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this used?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use TableNameBuilder.forType(type).tableNameWithType(tableName)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this method, @kishoreg this method is used in other components that I will submit PR later

for (TableType tableType: TableType.values()) {
if (tableName.endsWith(TYPE_SUFFIX_SEPARATOR + tableType.toString())) {
return tableName;
}
}
return tableName + TYPE_SUFFIX_SEPARATOR + type.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@
import com.google.common.annotations.VisibleForTesting;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import org.apache.pinot.common.Utils;
jamesyfshao marked this conversation as resolved.
Show resolved Hide resolved
import org.apache.pinot.common.request.BrokerRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.request.BrokerRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
*
*/
public enum BrokerGauge implements AbstractMetrics.Gauge {
QUERY_QUOTA_CAPACITY_UTILIZATION_RATE("tables", false), NETTY_CONNECTION_CONNECT_TIME_MS("nettyConnection", true);
QUERY_QUOTA_CAPACITY_UTILIZATION_RATE("tables", false), NETTY_CONNECTION_CONNECT_TIME_MS("nettyConnection", true),
TABLE_MIN_LOW_WATER_MARK("tables", false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this represent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It represents how many tables the Pinot broker is collecting metrics for. The exact usage of this metric will show up in later diffs.


private final String brokerGaugeName;
private final String unit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
NETTY_CONNECTION_BYTES_SENT("nettyConnection", true),
NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),

PROACTIVE_CLUSTER_CHANGE_CHECK("proactiveClusterChangeCheck", true);
PROACTIVE_CLUSTER_CHANGE_CHECK("proactiveClusterChangeCheck", true),
LOW_WATER_MARK_QUERY_FAILURES("failures", true);

private final String brokerMeterName;
private final String unit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ public enum ServerMeter implements AbstractMetrics.Meter {
// Netty connection metrics
NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
NETTY_CONNECTION_RESPONSES_SENT("nettyConnection", true),
NETTY_CONNECTION_BYTES_SENT("nettyConnection", true);
NETTY_CONNECTION_BYTES_SENT("nettyConnection", true),

// upsert related metrics
MESSAGE_PRODUCE_FAILED_COUNT("failedMessage", true);

private final String meterName;
private final String unit;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.restlet.resources;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;

import java.util.Collections;
import java.util.Map;

@JsonIgnoreProperties(ignoreUnknown = true)
public class TableLowWaterMarksInfo {
// A mapping from table name to the table's partiton -> low_water_mark mappings.
private Map<String, Map<Integer, Long>> tableLowWaterMarks;

public TableLowWaterMarksInfo(@JsonProperty("lowWaterMarks") Map<String, Map<Integer, Long>> tableLowWaterMarks) {
this.tableLowWaterMarks = tableLowWaterMarks;
}

public void setTableLowWaterMarks(Map<String, Map<Integer, Long>> tableLowWaterMarks) {
this.tableLowWaterMarks = tableLowWaterMarks;
}

public Map<String, Map<Integer, Long>> getTableLowWaterMarks() {
return Collections.unmodifiableMap(tableLowWaterMarks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.io.File;

import org.apache.commons.lang.StringUtils;


public class CommonConstants {

Expand All @@ -35,9 +37,13 @@ public static class Helix {
public static final String PREFIX_OF_BROKER_INSTANCE = "Broker_";
public static final String PREFIX_OF_SERVER_INSTANCE = "Server_";
public static final String PREFIX_OF_MINION_INSTANCE = "Minion_";
public static final String PREFIX_OF_KEY_COORDINATOR_INSTANCE = "KeyCoordinator_";

public static final String BROKER_RESOURCE_INSTANCE = "brokerResource";
public static final String LEAD_CONTROLLER_RESOURCE_NAME = "leadControllerResource";
public static final String KEY_COORDINATOR_MESSAGE_RESOURCE_NAME = "keyCoordinatorMessageResource";

public static final int KEY_COORDINATOR_MESSAGE_RESOURCE_REPLICA_COUNT = 1;

public static final String LEAD_CONTROLLER_RESOURCE_ENABLED_KEY = "RESOURCE_ENABLED";
public static final String ENABLE_CASE_INSENSITIVE_PQL_KEY = "enable.case.insensitive.pql";
Expand Down Expand Up @@ -159,12 +165,17 @@ public static class Broker {
public static final double DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START = 100.0;
public static final String CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE = "pinot.broker.enable.query.limit.override";

public static final String CONFIG_OF_BROKER_POLLING_SERVER_LWMS_INTERVAL_MS = "pinot.broker.query.polling.server.lwms.interval.ms";
public static final String CONFIG_OF_BROKER_POLLING_SERVER_LWMS_SERVER_PORT = "pinot.broker.query.polling.server.lwms.port";
public static final String CONFIG_OF_BROKER_LWM_REWRITE_ENABLE = "pinot.broker.query.lwm.rewrite";
public static final boolean CONFIG_OF_BROKER_LWM_REWRITE_ENABLE_DEFAULT = true;
public static class Request {
public static final String PQL = "pql";
public static final String SQL = "sql";
public static final String TRACE = "trace";
public static final String DEBUG_OPTIONS = "debugOptions";
public static final String QUERY_OPTIONS = "queryOptions";
public static final String DISABLE_REWRITE = "disableRewrite";

public static class QueryOptionKey {
public static final String PRESERVE_TYPE = "preserveType";
Expand Down Expand Up @@ -290,6 +301,7 @@ public static class Controller {
}

public static class Minion {

public static final String CONFIG_OF_METRICS_PREFIX = "pinot.minion.";
public static final String METADATA_EVENT_OBSERVER_PREFIX = "metadata.event.notifier";

Expand All @@ -308,6 +320,23 @@ public static class Minion {
public static final String PREFIX_OF_CONFIG_OF_PINOT_CRYPTER = "crypter";
}

/**
 * Configuration keys for the Grigio (upsert key-coordinator) components.
 */
public static class Grigio {
  // config for distributed grigio components
  public static final String CONFIG_OF_METRICS_PREFIX_KEY = "metrics.prefix";
  public static final String DEFAULT_METRICS_PREFIX = "pinot.grigio.";

  // config for server related components
  public static final String PINOT_UPSERT_SERVER_COMPONENT_PREFIX = "upsert.";
}

/**
 * Names of metrics emitted by Pinot components.
 */
public static class Metric {
  /** Server-side segment and document count metric names. */
  public static class Server {
    public static final String CURRENT_NUMBER_OF_SEGMENTS = "currentNumberOfSegments";
    public static final String CURRENT_NUMBER_OF_DOCUMENTS = "currentNumberOfDocuments";
    public static final String NUMBER_OF_DELETED_SEGMENTS = "numberOfDeletedSegments";
  }
}

public static class Segment {
public static class Realtime {
public enum Status {
Expand Down Expand Up @@ -378,4 +407,23 @@ public static class BuiltInVirtualColumn {
@Deprecated
public static final String TABLE_NAME = "segment.table.name";
}

/**
 * Update semantic of a table's ingestion: APPEND (regular ingestion, the default) or
 * UPSERT (a later record with the same primary key overrides the earlier one).
 */
public enum UpdateSemantic {
  APPEND,
  UPSERT;

  // final so that the shared default cannot be reassigned by callers
  public static final UpdateSemantic DEFAULT_SEMANTIC = APPEND;

  /**
   * Parses an update semantic from config text, case-insensitively.
   * Falls back to {@link #DEFAULT_SEMANTIC} for null, empty, or unrecognized values.
   */
  public static UpdateSemantic getUpdateSemantic(String updateSemanticStr) {
    // Equivalent to the previous StringUtils.isEmpty check (null or zero length).
    if (updateSemanticStr == null || updateSemanticStr.isEmpty()) {
      return DEFAULT_SEMANTIC;
    }
    try {
      // Locale.ROOT keeps the uppercasing locale-independent (e.g. Turkish dotless i
      // would otherwise break "upsert" -> "UPSERT").
      return valueOf(updateSemanticStr.toUpperCase(java.util.Locale.ROOT));
    } catch (IllegalArgumentException ex) {
      // Unknown value: treat as the default rather than failing config parsing.
      return DEFAULT_SEMANTIC;
    }
  }
}

}