Skip to content

Commit

Permalink
Remove some unused TimeFieldSpec related methods from Schema (#5347)
Browse files Browse the repository at this point in the history
Another step towards Issue #2756. As I was making changes to Schema to treat TIME as DATE_TIME, I found a subset of changes that can go in prior to the bigger change.
Removing 3 TimeFieldSpec-related methods from Schema:
1. getIncomingTimeUnit - was unused
2. getOutgoingTimeUnit - was unused
3. getTimeColumnName - was able to remove its 4 usages
  • Loading branch information
npawar committed May 7, 2020
1 parent e46a92a commit bfd263a
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.apache.pinot.core.util.IdMap;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.MetricFieldSpec;
Expand Down Expand Up @@ -130,7 +129,7 @@ public class MutableSegmentImpl implements MutableSegment {
private final Collection<FieldSpec> _physicalFieldSpecs;
private final Collection<DimensionFieldSpec> _physicalDimensionFieldSpecs;
private final Collection<MetricFieldSpec> _physicalMetricFieldSpecs;
private final Collection<DateTimeFieldSpec> _physicalDateTimeFieldSpecs;
private final Collection<String> _physicalTimeColumnNames;

// default message metadata
private volatile long _lastIndexedTimeMs = Long.MIN_VALUE;
Expand Down Expand Up @@ -174,7 +173,7 @@ public long getLatestIngestionTimestamp() {
List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
List<DimensionFieldSpec> physicalDimensionFieldSpecs = new ArrayList<>(_schema.getDimensionNames().size());
List<MetricFieldSpec> physicalMetricFieldSpecs = new ArrayList<>(_schema.getMetricNames().size());
List<DateTimeFieldSpec> physicalDateTimeFieldSpecs = new ArrayList<>(_schema.getDateTimeFieldSpecs().size());
List<String> physicalTimeColumnNames = new ArrayList<>();

for (FieldSpec fieldSpec : allFieldSpecs) {
if (!fieldSpec.isVirtualColumn()) {
Expand All @@ -185,16 +184,17 @@ public long getLatestIngestionTimestamp() {
physicalDimensionFieldSpecs.add((DimensionFieldSpec) fieldSpec);
} else if (fieldType == FieldSpec.FieldType.METRIC) {
physicalMetricFieldSpecs.add((MetricFieldSpec) fieldSpec);
} else if (fieldType == FieldSpec.FieldType.DATE_TIME) {
physicalDateTimeFieldSpecs.add((DateTimeFieldSpec) fieldSpec);
} else if (fieldType == FieldSpec.FieldType.DATE_TIME || fieldType == FieldSpec.FieldType.TIME) {
physicalTimeColumnNames.add(fieldSpec.getName());
}
}
}
_physicalFieldSpecs = Collections.unmodifiableCollection(physicalFieldSpecs);
_physicalDimensionFieldSpecs = Collections.unmodifiableCollection(physicalDimensionFieldSpecs);
_physicalMetricFieldSpecs = Collections.unmodifiableCollection(physicalMetricFieldSpecs);
_physicalDateTimeFieldSpecs = Collections.unmodifiableCollection(physicalDateTimeFieldSpecs);
_numKeyColumns = _physicalDimensionFieldSpecs.size() + _physicalDateTimeFieldSpecs.size() + 1; // Add 1 for time column
_physicalTimeColumnNames = Collections.unmodifiableCollection(physicalTimeColumnNames);

_numKeyColumns = _physicalDimensionFieldSpecs.size() + _physicalTimeColumnNames.size();

_logger =
LoggerFactory.getLogger(MutableSegmentImpl.class.getName() + "_" + _segmentName + "_" + config.getStreamName());
Expand Down Expand Up @@ -802,13 +802,8 @@ private int getOrCreateDocId(Map<String, Object> dictIdMap) {
for (FieldSpec fieldSpec : _physicalDimensionFieldSpecs) {
dictIds[i++] = (Integer) dictIdMap.get(fieldSpec.getName());
}
for (FieldSpec fieldSpec : _physicalDateTimeFieldSpecs) {
dictIds[i++] = (Integer) dictIdMap.get(fieldSpec.getName());
}

String timeColumnName = _schema.getTimeColumnName();
if (timeColumnName != null) {
dictIds[i] = (Integer) dictIdMap.get(timeColumnName);
for (String timeColumnName : _physicalTimeColumnNames) {
dictIds[i++] = (Integer) dictIdMap.get(timeColumnName);
}
return _recordIdMap.put(new FixedIntArray(dictIds));
}
Expand Down Expand Up @@ -876,25 +871,16 @@ private IdMap<FixedIntArray> enableMetricsAggregationIfPossible(RealtimeSegmentC
}
}

// Date time columns should be dictionary encoded.
for (FieldSpec fieldSpec : _physicalDateTimeFieldSpecs) {
String dateTimeColumn = fieldSpec.getName();
if (noDictionaryColumns.contains(dateTimeColumn)) {
_logger.warn("Metrics aggregation cannot be turned ON in presence of no-dictionary datetime columns, eg: {}",
dateTimeColumn);
// Time columns should be dictionary encoded.
for (String timeColumnName : _physicalTimeColumnNames) {
if (noDictionaryColumns.contains(timeColumnName)) {
_logger.warn("Metrics aggregation cannot be turned ON in presence of no-dictionary datetime/time columns, eg: {}",
timeColumnName);
_aggregateMetrics = false;
break;
}
}

// Time column should be dictionary encoded.
String timeColumn = _schema.getTimeColumnName();
if (noDictionaryColumns.contains(timeColumn)) {
_logger
.warn("Metrics aggregation cannot be turned ON in presence of no-dictionary time column, eg: {}", timeColumn);
_aggregateMetrics = false;
}

if (!_aggregateMetrics) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;


Expand Down Expand Up @@ -111,9 +112,10 @@ private List<File> rollupSegments(Schema schema)
for (DateTimeFieldSpec dateTimeFieldSpec : schema.getDateTimeFieldSpecs()) {
groupByColumns.add(dateTimeFieldSpec.getName());
}
String timeColumn = schema.getTimeColumnName();
if (timeColumn != null) {
groupByColumns.add(timeColumn);
// TODO: once the time column starts showing up as a dateTimeFieldSpec (https://github.com/apache/incubator-pinot/issues/2756), the lines below become redundant
String timeColumnName = _tableConfig.getValidationConfig().getTimeColumnName();
if (timeColumnName != null && !groupByColumns.contains(timeColumnName)) {
groupByColumns.add(timeColumnName);
}

// Initialize roll-up record transformer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pinot.core.segment.index.loader.columnminmaxvalue;

import com.clearspring.analytics.util.Preconditions;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.pinot.core.segment.creator.impl.SegmentColumnarIndexCreator;
import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
Expand Down Expand Up @@ -56,32 +58,20 @@ public void addColumnMinMaxValue()
Preconditions.checkState(_columnMinMaxValueGeneratorMode != ColumnMinMaxValueGeneratorMode.NONE);

Schema schema = _segmentMetadata.getSchema();
Set<String> columnsToAddMinMaxValue = new HashSet<>(schema.getPhysicalColumnNames());

// Process time column
String timeColumnName = schema.getTimeColumnName();
if (timeColumnName != null) {
addColumnMinMaxValueForColumn(timeColumnName);
// mode ALL - use all columns
// mode NON_METRIC - use all dimensions and time columns
// mode TIME - use only time columns
switch (_columnMinMaxValueGeneratorMode) {
case TIME:
columnsToAddMinMaxValue.removeAll(schema.getDimensionNames());
// Intentionally falling through to next case
case NON_METRIC:
columnsToAddMinMaxValue.removeAll(schema.getMetricNames());
}
for (String dateTimeColumn : schema.getDateTimeNames()) {
addColumnMinMaxValueForColumn(dateTimeColumn);
}
if (_columnMinMaxValueGeneratorMode == ColumnMinMaxValueGeneratorMode.TIME) {
saveMetadata();
return;
}

// Process dimension columns
for (String dimensionColumnName : schema.getDimensionNames()) {
addColumnMinMaxValueForColumn(dimensionColumnName);
}
if (_columnMinMaxValueGeneratorMode == ColumnMinMaxValueGeneratorMode.NON_METRIC) {
saveMetadata();
return;
}

// Process metric columns
for (String metricColumnName : schema.getMetricNames()) {
addColumnMinMaxValueForColumn(metricColumnName);
for (String column : columnsToAddMinMaxValue) {
addColumnMinMaxValueForColumn(column);
}
saveMetadata();
}
Expand Down
22 changes: 5 additions & 17 deletions pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -313,21 +313,6 @@ public List<String> getDateTimeNames() {
return _dateTimeNames;
}

@JsonIgnore
public String getTimeColumnName() {
return (_timeFieldSpec != null) ? _timeFieldSpec.getName() : null;
}

@JsonIgnore
public TimeUnit getIncomingTimeUnit() {
return (_timeFieldSpec != null) ? _timeFieldSpec.getIncomingGranularitySpec().getTimeType() : null;
}

@JsonIgnore
public TimeUnit getOutgoingTimeUnit() {
return (_timeFieldSpec != null) ? _timeFieldSpec.getOutgoingGranularitySpec().getTimeType() : null;
}

/**
* Returns a json representation of the schema.
*/
Expand Down Expand Up @@ -673,7 +658,7 @@ static DateTimeFieldSpec convertToDateTimeFieldSpec(TimeFieldSpec timeFieldSpec)
.equals(DateTimeFieldSpec.TimeFormat.EPOCH.toString()),
"Conversion from incoming to outgoing is not supported for SIMPLE_DATE_FORMAT");
String transformFunction =
getTransformFunction(incomingName, incomingTimeSize, incomingTimeUnit, outgoingTimeSize, outgoingTimeUnit);
constructTransformFunctionString(incomingName, incomingTimeSize, incomingTimeUnit, outgoingTimeSize, outgoingTimeUnit);
dateTimeFieldSpec.setTransformFunction(transformFunction);
}

Expand All @@ -683,7 +668,10 @@ static DateTimeFieldSpec convertToDateTimeFieldSpec(TimeFieldSpec timeFieldSpec)
return dateTimeFieldSpec;
}

private static String getTransformFunction(String incomingName, int incomingTimeSize, TimeUnit incomingTimeUnit,
/**
* Constructs a transformFunction string for the time column, based on incoming and outgoing timeGranularitySpec
*/
private static String constructTransformFunctionString(String incomingName, int incomingTimeSize, TimeUnit incomingTimeUnit,
int outgoingTimeSize, TimeUnit outgoingTimeUnit) {

String innerFunction = incomingName;
Expand Down

0 comments on commit bfd263a

Please sign in to comment.