Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable;
import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
Expand Down Expand Up @@ -93,8 +94,6 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {

private final String _sortedColumn;
private final List<String> _invertedIndexColumns;
private final List<String> _noDictionaryColumns;
private final List<String> _varLengthDictionaryColumns;
private final Logger _segmentLogger;
private final SegmentVersion _segmentVersion;

Expand Down Expand Up @@ -150,18 +149,12 @@ public HLRealtimeSegmentDataManager(final SegmentZKMetadata segmentZKMetadata, f
invertedIndexColumns.add(_sortedColumn);
}
_invertedIndexColumns = new ArrayList<>(invertedIndexColumns);

// No DictionaryColumns
_noDictionaryColumns = new ArrayList<>(indexLoadingConfig.getNoDictionaryColumns());

_varLengthDictionaryColumns = new ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns());

_streamConfig = new StreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMap(tableConfig));

_segmentLogger = LoggerFactory.getLogger(
HLRealtimeSegmentDataManager.class.getName() + "_" + _segmentName + "_" + _streamConfig.getTopicName());
_segmentLogger.info("Created segment data manager with Sorted column:{}, invertedIndexColumns:{}", _sortedColumn,
_invertedIndexColumns);
invertedIndexColumns);

_segmentEndTimeThreshold = _start + _streamConfig.getFlushThresholdTimeMillis();
_resourceTmpDir = new File(resourceDataDir, "_tmp");
Expand Down Expand Up @@ -283,12 +276,15 @@ public void run() {
updateCurrentDocumentCountMetrics();
_segmentLogger.info("Indexed {} raw events", _realtimeSegment.getNumDocsIndexed());
File tempSegmentFolder = new File(_resourceTmpDir, "tmp-" + System.currentTimeMillis());
ColumnIndicesForRealtimeTable columnIndicesForRealtimeTable =
new ColumnIndicesForRealtimeTable(_sortedColumn, _invertedIndexColumns, Collections.emptyList(),
Collections.emptyList(), new ArrayList<>(indexLoadingConfig.getNoDictionaryColumns()),
new ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns()));
// lets convert the segment now
RealtimeSegmentConverter converter =
new RealtimeSegmentConverter(_realtimeSegment, null, tempSegmentFolder.getAbsolutePath(),
schema, _tableNameWithType, tableConfig, segmentZKMetadata.getSegmentName(), _sortedColumn,
_invertedIndexColumns, Collections.emptyList(), Collections.emptyList(), _noDictionaryColumns,
_varLengthDictionaryColumns, indexingConfig.isNullHandlingEnabled());
schema, _tableNameWithType, tableConfig, segmentZKMetadata.getSegmentName(),
columnIndicesForRealtimeTable, indexingConfig.isNullHandlingEnabled());

_segmentLogger.info("Trying to build segment");
final long buildStartTime = System.nanoTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
Expand All @@ -48,6 +49,7 @@
import org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.ConsumptionRateLimiter;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable;
import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
Expand Down Expand Up @@ -260,15 +262,10 @@ public void deleteSegmentFile() {
private final LLCSegmentName _llcSegmentName;
private final TransformPipeline _transformPipeline;
private PartitionGroupConsumer _partitionGroupConsumer = null;
private StreamMetadataProvider _streamMetadataProvider = null;
private StreamMetadataProvider _partitionMetadataProvider = null;
private final File _resourceTmpDir;
private final String _tableNameWithType;
private final List<String> _invertedIndexColumns;
private final List<String> _textIndexColumns;
private final List<String> _fstIndexColumns;
private final List<String> _noDictionaryColumns;
private final List<String> _varLengthDictionaryColumns;
private final String _sortedColumn;
private final ColumnIndicesForRealtimeTable _columnIndicesForRealtimeTable;
private final Logger _segmentLogger;
private final String _tableStreamName;
private final PinotDataBufferMemoryManager _memoryManager;
Expand All @@ -288,7 +285,7 @@ public void deleteSegmentFile() {
private final SegmentCommitterFactory _segmentCommitterFactory;
private final ConsumptionRateLimiter _rateLimiter;

private volatile StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime = null;
private final StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime;

// TODO each time this method is called, we print reason for stop. Good to print only once.
private boolean endCriteriaReached() {
Expand Down Expand Up @@ -870,9 +867,8 @@ protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
// lets convert the segment now
RealtimeSegmentConverter converter =
new RealtimeSegmentConverter(_realtimeSegment, segmentZKPropsConfig, tempSegmentFolder.getAbsolutePath(),
_schema, _tableNameWithType, _tableConfig, _segmentZKMetadata.getSegmentName(), _sortedColumn,
_invertedIndexColumns, _textIndexColumns, _fstIndexColumns, _noDictionaryColumns,
_varLengthDictionaryColumns, _nullHandlingEnabled);
_schema, _tableNameWithType, _tableConfig, _segmentZKMetadata.getSegmentName(),
_columnIndicesForRealtimeTable, _nullHandlingEnabled);
_segmentLogger.info("Trying to build segment");
try {
converter.build(_segmentVersion, _serverMetrics);
Expand Down Expand Up @@ -1019,7 +1015,7 @@ protected boolean buildSegmentAndReplace() {

private void closeStreamConsumers() {
closePartitionGroupConsumer();
closeStreamMetadataProvider();
closePartitionMetadataProvider();
if (_acquiredConsumerSemaphore.compareAndSet(true, false)) {
_partitionGroupConsumerSemaphore.release();
}
Expand All @@ -1033,11 +1029,13 @@ private void closePartitionGroupConsumer() {
}
}

private void closeStreamMetadataProvider() {
try {
_streamMetadataProvider.close();
} catch (Exception e) {
_segmentLogger.warn("Could not close stream metadata provider", e);
/**
 * Closes the partition metadata provider if one has been created.
 *
 * <p>Close failures are logged and swallowed so that consumer shutdown can proceed.
 */
private void closePartitionMetadataProvider() {
  // Nothing to release when no provider was ever created.
  if (_partitionMetadataProvider == null) {
    return;
  }
  try {
    _partitionMetadataProvider.close();
  } catch (Exception e) {
    _segmentLogger.warn("Could not close stream metadata provider", e);
  }
}

Expand Down Expand Up @@ -1271,8 +1269,7 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
_partitionLevelStreamConfig =
new PartitionLevelStreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMap(_tableConfig));
_streamConsumerFactory = StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig);
_streamPartitionMsgOffsetFactory =
StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig).createStreamMsgOffsetFactory();
_streamPartitionMsgOffsetFactory = _streamConsumerFactory.createStreamMsgOffsetFactory();
String streamTopic = _partitionLevelStreamConfig.getTopicName();
_segmentNameStr = _segmentZKMetadata.getSegmentName();
_llcSegmentName = llcSegmentName;
Expand All @@ -1296,38 +1293,32 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
.createRateLimiter(_partitionLevelStreamConfig, _tableNameWithType, _serverMetrics, _metricKeyName);

List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
String sortedColumn;
if (sortedColumns.isEmpty()) {
_segmentLogger.info("RealtimeDataResourceZKMetadata contains no information about sorted column for segment {}",
_llcSegmentName);
_sortedColumn = null;
sortedColumn = null;
} else {
String firstSortedColumn = sortedColumns.get(0);
if (_schema.hasColumn(firstSortedColumn)) {
_segmentLogger.info("Setting sorted column name: {} from RealtimeDataResourceZKMetadata for segment {}",
firstSortedColumn, _llcSegmentName);
_sortedColumn = firstSortedColumn;
sortedColumn = firstSortedColumn;
} else {
_segmentLogger
.warn("Sorted column name: {} from RealtimeDataResourceZKMetadata is not existed in schema for segment {}.",
firstSortedColumn, _llcSegmentName);
_sortedColumn = null;
sortedColumn = null;
}
}

// Inverted index columns
Set<String> invertedIndexColumns = indexLoadingConfig.getInvertedIndexColumns();
// We need to add sorted column into inverted index columns because when we convert realtime in memory segment into
// offline segment, we use sorted column's inverted index to maintain the order of the records so that the records
// are sorted on the sorted column.
if (_sortedColumn != null) {
invertedIndexColumns.add(_sortedColumn);
if (sortedColumn != null) {
invertedIndexColumns.add(sortedColumn);
}
_invertedIndexColumns = new ArrayList<>(invertedIndexColumns);

// No dictionary Columns
_noDictionaryColumns = new ArrayList<>(indexLoadingConfig.getNoDictionaryColumns());

_varLengthDictionaryColumns = new ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns());

// Read the max number of rows
int segmentMaxRowCount = _partitionLevelStreamConfig.getFlushThresholdRows();
Expand All @@ -1341,11 +1332,12 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo

_nullHandlingEnabled = indexingConfig.isNullHandlingEnabled();

Set<String> textIndexColumns = indexLoadingConfig.getTextIndexColumns();
_textIndexColumns = new ArrayList<>(textIndexColumns);

Set<String> fstIndexColumns = indexLoadingConfig.getFSTIndexColumns();
_fstIndexColumns = new ArrayList<>(fstIndexColumns);
_columnIndicesForRealtimeTable = new ColumnIndicesForRealtimeTable(sortedColumn,
new ArrayList<>(invertedIndexColumns),
new ArrayList<>(indexLoadingConfig.getTextIndexColumns()),
new ArrayList<>(indexLoadingConfig.getFSTIndexColumns()),
new ArrayList<>(indexLoadingConfig.getNoDictionaryColumns()),
new ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns()));

// Start new realtime segment
String consumerDir = realtimeTableDataManager.getConsumerDir();
Expand All @@ -1355,8 +1347,9 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
.setCapacity(_segmentMaxRowCount).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
.setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
.setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
.setInvertedIndexColumns(invertedIndexColumns).setTextIndexColumns(textIndexColumns)
.setFSTIndexColumns(fstIndexColumns).setJsonIndexColumns(indexLoadingConfig.getJsonIndexColumns())
.setInvertedIndexColumns(invertedIndexColumns).setTextIndexColumns(indexLoadingConfig.getTextIndexColumns())
.setFSTIndexColumns(indexLoadingConfig.getFSTIndexColumns())
.setJsonIndexColumns(indexLoadingConfig.getJsonIndexColumns())
.setH3IndexConfigs(indexLoadingConfig.getH3IndexConfigs()).setSegmentZKMetadata(segmentZKMetadata)
.setOffHeap(_isOffHeap).setMemoryManager(_memoryManager)
.setStatsHistory(realtimeTableDataManager.getStatsHistory())
Expand Down Expand Up @@ -1389,7 +1382,7 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
_startOffset = _partitionGroupConsumptionStatus.getStartOffset();
_currentOffset = _streamPartitionMsgOffsetFactory.create(_startOffset);
makeStreamConsumer("Starting");
makeStreamMetadataProvider("Starting");
createPartitionMetadataProvider("Starting");
setPartitionParameters(realtimeSegmentConfigBuilder, indexingConfig.getSegmentPartitionConfig());
_realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), serverMetrics);
_resourceTmpDir = new File(resourceDataDir, "_tmp");
Expand Down Expand Up @@ -1435,10 +1428,13 @@ private void setConsumeEndTime(SegmentZKMetadata segmentZKMetadata, long now) {
}

public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) {
try (StreamMetadataProvider metadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId,
_partitionGroupId)) {
return metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, maxWaitTimeMs);
} catch (Exception e) {
if (_partitionMetadataProvider == null) {
createPartitionMetadataProvider("Fetch latest stream offset");
}
try {
return _partitionMetadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA,
    maxWaitTimeMs);
} catch (TimeoutException e) {
_segmentLogger.warn(
"Cannot fetch latest stream offset for clientId {} and partitionGroupId {} with maxWaitTime {}", _clientId,
_partitionGroupId, maxWaitTimeMs);
Expand Down Expand Up @@ -1474,7 +1470,7 @@ private void setPartitionParameters(RealtimeSegmentConfig.Builder realtimeSegmen
// However this is not an issue for Kafka, since partitionGroups never expire and every partitionGroup has
// a single partition
// Fix this before opening support for partitioning in Kinesis
int numPartitionGroups = _streamMetadataProvider
int numPartitionGroups = _partitionMetadataProvider
.computePartitionGroupMetadata(_clientId, _partitionLevelStreamConfig,
Collections.emptyList(), /*maxWaitTimeMs=*/5000).size();

Expand All @@ -1488,7 +1484,7 @@ private void setPartitionParameters(RealtimeSegmentConfig.Builder realtimeSegmen
} catch (Exception e) {
_segmentLogger.warn("Failed to get number of stream partitions in 5s, "
+ "using number of partitions in the partition config: {}", numPartitions, e);
makeStreamMetadataProvider("Timeout getting number of stream partitions");
createPartitionMetadataProvider("Timeout getting number of stream partitions");
}

realtimeSegmentConfigBuilder.setPartitionColumn(partitionColumn);
Expand Down Expand Up @@ -1530,12 +1526,10 @@ private void recreateStreamConsumer(String reason) {
/**
* Creates a new stream metadata provider
*/
private void makeStreamMetadataProvider(String reason) {
if (_streamMetadataProvider != null) {
closeStreamMetadataProvider();
}
_segmentLogger.info("Creating new stream metadata provider, reason: {}", reason);
_streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(_clientId);
/**
 * (Re)creates the partition-level stream metadata provider, closing any previously
 * created provider first.
 *
 * @param reason human-readable reason for the (re)creation, emitted to the segment log
 */
private void createPartitionMetadataProvider(String reason) {
  // Release the current provider (no-op when none exists) before replacing it.
  closePartitionMetadataProvider();
  _segmentLogger.info("Creating new partition metadata provider, reason: {}", reason);
  StreamMetadataProvider provider =
      _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId);
  _partitionMetadataProvider = provider;
}

// This should be done during commit? We may not always commit when we build a segment....
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable;
import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
Expand Down Expand Up @@ -122,11 +123,12 @@ public void testNoRecordsIndexed()
SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
segmentZKPropsConfig.setStartOffset("1");
segmentZKPropsConfig.setEndOffset("100");
ColumnIndicesForRealtimeTable cdc = new ColumnIndicesForRealtimeTable(indexingConfig.getSortedColumn().get(0),
indexingConfig.getInvertedIndexColumns(), null, null,
indexingConfig.getNoDictionaryColumns(), indexingConfig.getVarLengthDictionaryColumns());
RealtimeSegmentConverter converter =
new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig, outputDir.getAbsolutePath(), schema,
tableNameWithType, tableConfig, segmentName, indexingConfig.getSortedColumn().get(0),
indexingConfig.getInvertedIndexColumns(), null, null, indexingConfig.getNoDictionaryColumns(),
indexingConfig.getVarLengthDictionaryColumns(), false);
tableNameWithType, tableConfig, segmentName, cdc, false);

converter.build(SegmentVersion.v3, null);
SegmentMetadataImpl metadata = new SegmentMetadataImpl(new File(outputDir, segmentName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tab

@Override
public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
throw new UnsupportedOperationException();
return new KinesisStreamMetadataProvider(clientId, _streamConfig);
}

@Override
Expand Down
Loading