Schema evolution: Default values for newly added columns
Added functionality to DefaultColumnHandler for adding new columns with default values.
DefaultColumnHandler now takes a schema argument to control the newly added columns.
The schema is passed down from the Helix property store.
Added methods to SegmentMetadataImpl to modify and store the segment metadata.
Supported ADD, UPDATE and REMOVE operations on newly added columns.
Supported segment formats v1 and v3.
The DefaultColumnHandler is constructed inside SegmentPreProcessor.
Added unit tests and integration tests for all types of newly added columns.
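At a high level, the handler diffs the schema fetched from the property store against the columns physically present in the segment, and derives one action per mismatched column. A minimal sketch of that decision, with column specs reduced to plain default-value strings (the class below is illustrative, not the actual Pinot implementation, which works on FieldSpecs and segment metadata):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch of the ADD/UPDATE/REMOVE decision described above.
    public class DefaultColumnAction {
      enum Action { ADD, UPDATE, REMOVE }

      // schemaColumns:  column -> default value declared in the schema
      // segmentColumns: column -> default value recorded in the segment metadata
      //                 for auto-generated columns, or null for regular columns
      static Map<String, Action> computeActions(Map<String, String> schemaColumns,
          Map<String, String> segmentColumns) {
        Map<String, Action> actions = new HashMap<>();
        for (Map.Entry<String, String> entry : schemaColumns.entrySet()) {
          String column = entry.getKey();
          if (!segmentColumns.containsKey(column)) {
            actions.put(column, Action.ADD);      // missing column: create it with the default value
          } else {
            String storedDefault = segmentColumns.get(column);
            if (storedDefault != null && !storedDefault.equals(entry.getValue())) {
              actions.put(column, Action.UPDATE); // default value changed: rebuild the column
            }
          }
        }
        for (Map.Entry<String, String> entry : segmentColumns.entrySet()) {
          if (entry.getValue() != null && !schemaColumns.containsKey(entry.getKey())) {
            actions.put(entry.getKey(), Action.REMOVE); // auto-generated column dropped from schema
          }
        }
        return actions;
      }

      public static void main(String[] args) {
        Map<String, String> schema = new HashMap<>();
        schema.put("newMetric", "0");       // not in segment -> ADD
        schema.put("genDim", "unknown");    // stored default differs -> UPDATE
        schema.put("realDim", "n/a");       // regular column -> untouched
        Map<String, String> segment = new HashMap<>();
        segment.put("genDim", "n/a");       // auto-generated with old default
        segment.put("realDim", null);       // regular physical column
        segment.put("staleGen", "0");       // gone from schema -> REMOVE
        System.out.println(computeActions(schema, segment));
      }
    }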
Jackie-Jiang committed Aug 15, 2016
1 parent 5ef8277 commit a3514c3
Showing 38 changed files with 2,501 additions and 687 deletions.
@@ -26,7 +26,7 @@ public interface DataManager {
 
   void start();
 
-  void addSegment(SegmentMetadata segmentMetadata, AbstractTableConfig tableConfig) throws Exception;
+  void addSegment(SegmentMetadata segmentMetadata, AbstractTableConfig tableConfig, Schema schema) throws Exception;
 
   void removeSegment(String segmentName);
 
@@ -15,27 +15,26 @@
  */
 package com.linkedin.pinot.core.data.manager.offline;
 
+import com.linkedin.pinot.common.Utils;
+import com.linkedin.pinot.common.config.AbstractTableConfig;
+import com.linkedin.pinot.common.data.Schema;
+import com.linkedin.pinot.common.metadata.instance.InstanceZKMetadata;
+import com.linkedin.pinot.common.metadata.segment.SegmentZKMetadata;
+import com.linkedin.pinot.common.segment.SegmentMetadata;
+import com.linkedin.pinot.common.segment.SegmentMetadataLoader;
+import com.linkedin.pinot.core.data.manager.config.FileBasedInstanceDataManagerConfig;
+import com.linkedin.pinot.core.data.manager.config.TableDataManagerConfig;
 import java.io.File;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.linkedin.pinot.common.Utils;
-import com.linkedin.pinot.common.config.AbstractTableConfig;
-import com.linkedin.pinot.common.metadata.instance.InstanceZKMetadata;
-import com.linkedin.pinot.common.metadata.segment.SegmentZKMetadata;
-import com.linkedin.pinot.common.segment.SegmentMetadata;
-import com.linkedin.pinot.common.segment.SegmentMetadataLoader;
-import com.linkedin.pinot.core.data.manager.config.FileBasedInstanceDataManagerConfig;
-import com.linkedin.pinot.core.data.manager.config.TableDataManagerConfig;
 
-
 /**
  * InstanceDataManager is the top level DataManger, Singleton.
@@ -125,7 +124,7 @@ private void bootstrapSegmentsFromSegmentDir() throws Exception {
     File bootstrapSegmentDir = new File(_instanceDataManagerConfig.getInstanceBootstrapSegmentDir());
     if (bootstrapSegmentDir.exists()) {
       for (File segment : bootstrapSegmentDir.listFiles()) {
-        addSegment(_segmentMetadataLoader.load(segment), null);
+        addSegment(_segmentMetadataLoader.load(segment), null, null);
         LOGGER.info("Bootstrapped segment from directory : " + segment.getAbsolutePath());
       }
     } else {
@@ -171,11 +170,12 @@ public synchronized void shutDown() {
   }
 
   @Override
-  public synchronized void addSegment(SegmentMetadata segmentMetadata, AbstractTableConfig tableConfig) throws Exception {
+  public synchronized void addSegment(SegmentMetadata segmentMetadata, AbstractTableConfig tableConfig, Schema schema)
+      throws Exception {
     String tableName = segmentMetadata.getTableName();
     LOGGER.info("Trying to add segment : " + segmentMetadata.getName());
     if (_tableDataManagerMap.containsKey(tableName)) {
-      _tableDataManagerMap.get(tableName).addSegment(segmentMetadata);
+      _tableDataManagerMap.get(tableName).addSegment(segmentMetadata, schema);
       LOGGER.info("Added a segment : " + segmentMetadata.getName() + " to table : " + tableName);
     } else {
       LOGGER.error("InstanceDataManager doesn't contain the assigned table for segment : "
@@ -15,15 +15,17 @@
  */
 package com.linkedin.pinot.core.data.manager.offline;
 
-import org.apache.helix.ZNRecord;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.slf4j.LoggerFactory;
 import com.linkedin.pinot.common.config.AbstractTableConfig;
+import com.linkedin.pinot.common.data.Schema;
 import com.linkedin.pinot.common.metadata.instance.InstanceZKMetadata;
 import com.linkedin.pinot.common.metadata.segment.SegmentZKMetadata;
 import com.linkedin.pinot.common.segment.SegmentMetadata;
 import com.linkedin.pinot.core.indexsegment.IndexSegment;
 import com.linkedin.pinot.core.indexsegment.columnar.ColumnarSegmentLoader;
+import java.io.File;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -44,15 +46,17 @@ protected void doInit() {
   }
 
   @Override
-  public void addSegment(SegmentMetadata segmentMetadata) throws Exception {
-    IndexSegment indexSegment =
-        ColumnarSegmentLoader.loadSegment(segmentMetadata, _readMode, _indexLoadingConfigMetadata);
+  public void addSegment(SegmentMetadata segmentMetadata, Schema schema)
+      throws Exception {
+    IndexSegment indexSegment = ColumnarSegmentLoader.loadSegment(new File(segmentMetadata.getIndexDir()), _readMode,
+        _indexLoadingConfigMetadata, schema);
     addSegment(indexSegment);
   }
 
   @Override
   public void addSegment(ZkHelixPropertyStore<ZNRecord> propertyStore, AbstractTableConfig tableConfig,
-      InstanceZKMetadata instanceZKMetadata, SegmentZKMetadata segmentZKMetadata) throws Exception {
+      InstanceZKMetadata instanceZKMetadata, SegmentZKMetadata segmentZKMetadata)
+      throws Exception {
     throw new UnsupportedOperationException("Not supported for Offline segments");
   }
 }
@@ -17,6 +17,7 @@
 
 import com.google.common.collect.ImmutableList;
 import com.linkedin.pinot.common.config.AbstractTableConfig;
+import com.linkedin.pinot.common.data.Schema;
 import com.linkedin.pinot.common.metadata.instance.InstanceZKMetadata;
 import com.linkedin.pinot.common.metadata.segment.SegmentZKMetadata;
 import com.linkedin.pinot.common.metrics.ServerMetrics;
@@ -62,9 +63,10 @@ public interface TableDataManager {
    * Adding a Segment into the TableDataManager by given SegmentMetadata.
    *
    * @param segmentMetaToAdd
+   * @param schema
    * @throws Exception
    */
-  void addSegment(SegmentMetadata segmentMetaToAdd) throws Exception;
+  void addSegment(SegmentMetadata segmentMetaToAdd, Schema schema) throws Exception;
 
   /**
    * Adding a Segment into the TableDataManager by given DataTableZKMetadata, InstanceZKMetadata, SegmentZKMetadata.
@@ -15,14 +15,6 @@
  */
 package com.linkedin.pinot.core.data.manager.realtime;
 
-import com.linkedin.pinot.common.utils.SegmentName;
-import java.io.File;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.slf4j.LoggerFactory;
 import com.linkedin.pinot.common.config.AbstractTableConfig;
 import com.linkedin.pinot.common.config.IndexingConfig;
 import com.linkedin.pinot.common.data.FieldSpec;
@@ -34,12 +26,20 @@
 import com.linkedin.pinot.common.segment.SegmentMetadata;
 import com.linkedin.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
 import com.linkedin.pinot.common.utils.NamedThreadFactory;
+import com.linkedin.pinot.common.utils.SegmentName;
 import com.linkedin.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvider;
 import com.linkedin.pinot.core.data.manager.offline.AbstractTableDataManager;
 import com.linkedin.pinot.core.data.manager.offline.SegmentDataManager;
 import com.linkedin.pinot.core.indexsegment.IndexSegment;
 import com.linkedin.pinot.core.indexsegment.columnar.ColumnarSegmentLoader;
 import com.linkedin.pinot.core.realtime.impl.kafka.KafkaConsumerManager;
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.slf4j.LoggerFactory;
 
 
 // TODO Use the refcnt object inside SegmentDataManager
@@ -159,7 +159,7 @@ public void addSegment(ZkHelixPropertyStore<ZNRecord> propertyStore, AbstractTab
    * 2. Validate the schema itself
    *
    * We allow the user to specify multiple sorted columns, but only consider the first one for now.
-   * (secondary sort is not yet implemnented).
+   * (secondary sort is not yet implemented).
    *
    * If we add more validations, it may make sense to split this method into multiple validation methods.
    * But then, we are trying to figure out all the invalid cases before we return from this method...
@@ -192,9 +192,11 @@ private boolean isValid(Schema schema, IndexingConfig indexingConfig) {
   }
 
   @Override
-  public void addSegment(SegmentMetadata segmentMetaToAdd) throws Exception {
-    throw new UnsupportedOperationException("Not supported addSegment(SegmentMetadata) in RealtimeTableDataManager"
-        + segmentMetaToAdd.getName() + "," + segmentMetaToAdd.getTableName());
+  public void addSegment(SegmentMetadata segmentMetaToAdd, Schema schema)
+      throws Exception {
+    throw new UnsupportedOperationException(
+        "Unsupported addSegment(SegmentMetadata, Schema) in RealtimeTableDataManager for table: "
+            + segmentMetaToAdd.getTableName() + " segment: " + segmentMetaToAdd.getName());
   }
 
   private void markSegmentAsLoaded(String segmentId) {
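The "only the first sorted column" rule called out in the validation comment above amounts to the following (a self-contained sketch; the real check lives in this class's isValid()):

    import java.util.Arrays;
    import java.util.List;

    // Sketch of the "first sorted column wins" rule; secondary sort is not yet
    // implemented, so any additional configured sorted columns are ignored.
    public final class SortedColumnRule {
      static String effectiveSortedColumn(List<String> configuredSortedColumns) {
        if (configuredSortedColumns == null || configuredSortedColumns.isEmpty()) {
          return null;  // no sorted column configured
        }
        if (configuredSortedColumns.size() > 1) {
          System.out.println("Ignoring extra sorted columns: "
              + configuredSortedColumns.subList(1, configuredSortedColumns.size()));
        }
        return configuredSortedColumns.get(0);
      }

      public static void main(String[] args) {
        // Warns about "timestamp", then prints "memberId".
        System.out.println(effectiveSortedColumn(Arrays.asList("memberId", "timestamp")));
      }
    }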
@@ -37,7 +37,6 @@
 import com.linkedin.pinot.common.data.TimeGranularitySpec;
 import com.linkedin.pinot.common.segment.ReadMode;
 import com.linkedin.pinot.core.data.GenericRow;
-import com.linkedin.pinot.core.data.readers.BaseRecordReader;
 import com.linkedin.pinot.core.io.reader.SingleColumnMultiValueReader;
 import com.linkedin.pinot.core.io.reader.SingleColumnSingleValueReader;
 import com.linkedin.pinot.core.io.reader.impl.FixedByteSingleValueMultiColReader;
@@ -218,9 +217,9 @@ public Schema getSchema() {
           fieldSpec = new MetricFieldSpec(columnName, dataType);
           break;
         case TIME:
-          TimeUnit timeType = columnMetadata.getTimeunit();
-          TimeGranularitySpec incominGranularitySpec = new TimeGranularitySpec(dataType, timeType, columnName);
-          fieldSpec = new TimeFieldSpec(incominGranularitySpec);
+          TimeUnit timeType = columnMetadata.getTimeUnit();
+          TimeGranularitySpec incomingGranularitySpec = new TimeGranularitySpec(dataType, timeType, columnName);
+          fieldSpec = new TimeFieldSpec(incomingGranularitySpec);
           break;
         default:
           break;
@@ -15,34 +15,38 @@
  */
 package com.linkedin.pinot.core.indexsegment.columnar;
 
+import com.linkedin.pinot.common.data.Schema;
 import com.linkedin.pinot.common.metadata.segment.IndexLoadingConfigMetadata;
 import com.linkedin.pinot.common.segment.ReadMode;
-import com.linkedin.pinot.common.segment.SegmentMetadata;
 import com.linkedin.pinot.core.indexsegment.IndexSegment;
 import com.linkedin.pinot.core.segment.index.loader.Loaders;
 import java.io.File;
 
 
 public class ColumnarSegmentLoader {
-  public static IndexSegment load(File indexDir, ReadMode mode) throws Exception {
-    return load(indexDir, mode, null);
+  private ColumnarSegmentLoader() {
   }
 
-  public static IndexSegment load(File indexDir, ReadMode mode, IndexLoadingConfigMetadata indexLoadingConfigMetadata) throws Exception {
-    switch (mode) {
-      case heap:
-        return loadSegment(indexDir, ReadMode.heap, indexLoadingConfigMetadata);
-      case mmap:
-        return loadSegment(indexDir, ReadMode.mmap, indexLoadingConfigMetadata);
-    }
-    return null;
+  public static IndexSegment load(File indexDir, ReadMode readMode)
+      throws Exception {
+    return Loaders.IndexSegment.load(indexDir, readMode, null, null);
   }
 
-  public static IndexSegment loadSegment(SegmentMetadata segmentMetadata, ReadMode readMode, IndexLoadingConfigMetadata indexLoadingConfigMetadata) throws Exception {
-    return loadSegment(new File(segmentMetadata.getIndexDir()), readMode, indexLoadingConfigMetadata);
+  public static IndexSegment load(File indexDir, ReadMode readMode,
+      IndexLoadingConfigMetadata indexLoadingConfigMetadata)
+      throws Exception {
+    return Loaders.IndexSegment.load(indexDir, readMode, indexLoadingConfigMetadata, null);
  }
 
-  public static IndexSegment loadSegment(File indexDir, ReadMode readMode, IndexLoadingConfigMetadata indexLoadingConfigMetadata) throws Exception {
-    return Loaders.IndexSegment.load(indexDir, readMode, indexLoadingConfigMetadata);
+  public static IndexSegment loadSegment(File indexDir, ReadMode readMode,
+      IndexLoadingConfigMetadata indexLoadingConfigMetadata)
+      throws Exception {
+    return Loaders.IndexSegment.load(indexDir, readMode, indexLoadingConfigMetadata, null);
   }
+
+  public static IndexSegment loadSegment(File indexDir, ReadMode readMode,
+      IndexLoadingConfigMetadata indexLoadingConfigMetadata, Schema schema)
+      throws Exception {
+    return Loaders.IndexSegment.load(indexDir, readMode, indexLoadingConfigMetadata, schema);
+  }
 }
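Call sites that have the latest schema can now pass it through so the loader can fill in newly added columns; a null schema preserves the old behavior. A usage sketch (the directory path and the schema-fetching helper are placeholders, not real Pinot APIs):

    // Hypothetical usage of the new four-argument overload above.
    File indexDir = new File("/path/to/table/segment");       // placeholder path
    Schema schema = fetchSchemaFromPropertyStore("myTable");  // hypothetical helper
    IndexSegment segment =
        ColumnarSegmentLoader.loadSegment(indexDir, ReadMode.mmap, null, schema);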
@@ -46,7 +46,7 @@ public boolean isSorted() {
 
   @Override
   public boolean hasInvertedIndex() {
-    return columnMetadata.isHasInvertedIndex();
+    return columnMetadata.hasInvertedIndex();
   }
 
   @Override
@@ -24,7 +24,6 @@
 import com.linkedin.pinot.core.common.BlockValSet;
 import com.linkedin.pinot.core.common.Predicate;
 import com.linkedin.pinot.core.io.reader.SingleColumnMultiValueReader;
-import com.linkedin.pinot.core.io.reader.impl.v1.FixedBitMultiValueReader;
 import com.linkedin.pinot.core.operator.docvalsets.MultiValueSet;
 import com.linkedin.pinot.core.segment.index.ColumnMetadata;
 import com.linkedin.pinot.core.segment.index.readers.ImmutableDictionaryReader;
@@ -53,7 +52,7 @@ public boolean hasDictionary() {
   }
 
   public boolean hasInvertedIndex() {
-    return columnMetadata.isHasInvertedIndex();
+    return columnMetadata.hasInvertedIndex();
   }
 
   public boolean isSingleValued() {
@@ -18,9 +18,6 @@
 import org.apache.commons.lang3.ArrayUtils;
 
 
-/**
- */
-
 public class ColumnIndexCreationInfo {
   private final boolean createDictionary;
   private final Object min;
@@ -31,41 +28,25 @@ public class ColumnIndexCreationInfo {
   private boolean isSorted;
   private final boolean hasNulls;
   private final int totalNumberOfEntries;
-  private final int maxNumberOfMutiValueElements;
-
+  private final int maxNumberOfMultiValueElements;
+  private final boolean isAutoGenerated;
+  private final Object defaultNullValue;
 
-  public ColumnIndexCreationInfo(boolean createDictionary, Object min, Object max, Object sortedArray, ForwardIndexType forwardIndexType,
-      InvertedIndexType invertedIndexType, boolean isSortedColumn, boolean hasNulls) {
+  public ColumnIndexCreationInfo(boolean createDictionary, Object min, Object max, Object sortedUniqueElementsArray,
+      ForwardIndexType forwardIndexType, InvertedIndexType invertedIndexType, boolean isSorted, boolean hasNulls,
+      int totalNumberOfEntries, int maxNumberOfMultiValueElements, boolean isAutoGenerated, Object defaultNullValue) {
     this.createDictionary = createDictionary;
     this.min = min;
     this.max = max;
-    sortedUniqueElementsArray = sortedArray;
+    this.sortedUniqueElementsArray = sortedUniqueElementsArray;
     this.forwardIndexType = forwardIndexType;
     this.invertedIndexType = invertedIndexType;
-    isSorted = isSortedColumn;
-    this.hasNulls = hasNulls;
-    totalNumberOfEntries = 0;
-    maxNumberOfMutiValueElements = 0;
-  }
-
-  public ColumnIndexCreationInfo(boolean createDictionary, Object min, Object max, Object sortedArray, ForwardIndexType forwardIndexType,
-      InvertedIndexType invertedIndexType, boolean isSortedColumn, boolean hasNulls, int totalNumberOfEntries,
-      int maxNumberOfMultiValueElements) {
-    this.createDictionary = createDictionary;
-    this.min = min;
-    this.max = max;
-    sortedUniqueElementsArray = sortedArray;
-    this.forwardIndexType = forwardIndexType;
-    this.invertedIndexType = invertedIndexType;
-    isSorted = isSortedColumn;
+    this.isSorted = isSorted;
     this.hasNulls = hasNulls;
     this.totalNumberOfEntries = totalNumberOfEntries;
-    maxNumberOfMutiValueElements = maxNumberOfMultiValueElements;
-
-  }
-
-  public int getMaxNumberOfMutiValueElements() {
-    return maxNumberOfMutiValueElements;
+    this.maxNumberOfMultiValueElements = maxNumberOfMultiValueElements;
+    this.isAutoGenerated = isAutoGenerated;
+    this.defaultNullValue = defaultNullValue;
   }
 
   public boolean isCreateDictionary() {
@@ -84,6 +65,10 @@ public Object getSortedUniqueElementsArray() {
     return sortedUniqueElementsArray;
   }
 
+  public int getDistinctValueCount() {
+    return ArrayUtils.getLength(sortedUniqueElementsArray);
+  }
+
   public ForwardIndexType getForwardIndexType() {
     return forwardIndexType;
   }
@@ -108,7 +93,15 @@ public int getTotalNumberOfEntries() {
     return totalNumberOfEntries;
   }
 
-  public int getDistinctValueCount() {
-    return ArrayUtils.getLength(sortedUniqueElementsArray);
+  public int getMaxNumberOfMultiValueElements() {
+    return maxNumberOfMultiValueElements;
   }
+
+  public boolean isAutoGenerated() {
+    return isAutoGenerated;
+  }
+
+  public Object getDefaultNullValue() {
+    return defaultNullValue;
+  }
 }
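For a column auto-generated from a schema default, the creation info is degenerate: a one-entry dictionary, min == max == the default value, and a trivially sorted column. A hedged sketch of such a construction (argument values are illustrative, and null stands in for the forward-index type purely as a placeholder):

    // Sketch: creation info for an auto-generated single-value column whose
    // every row holds the schema default value.
    Integer defaultValue = 0;
    int totalDocs = 1000000;             // illustrative document count
    ColumnIndexCreationInfo creationInfo = new ColumnIndexCreationInfo(
        true,                            // createDictionary: one-entry dictionary
        defaultValue, defaultValue,      // min == max == default value
        new Object[]{defaultValue},      // sortedUniqueElementsArray: one element
        null,                            // forwardIndexType: placeholder only
        InvertedIndexType.SORTED_INDEX,  // the sorted-index flavor added here
        true,                            // a constant column is trivially sorted
        false,                           // hasNulls
        totalDocs,                       // totalNumberOfEntries: one per document
        0,                               // maxNumberOfMultiValueElements
        true,                            // isAutoGenerated
        defaultValue);                   // defaultNullValue
    // creationInfo.getDistinctValueCount() == 1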
@@ -20,5 +20,5 @@
  */
 
 public enum InvertedIndexType {
-  P4_DELTA, ROARING_BITMAPS
+  P4_DELTA, ROARING_BITMAPS, SORTED_INDEX
 }
