Introduce DimensionTableDataManager (#6346)
* Add DimensionTableDataManager

* Address review comments.

* Close reader after use.

* Revisit javadocs.

* Release segment after use.

* Touch up instance instantiation.

* Cleanup segment in test.

* Release segments in "finally" block.

* Update logs.

* Add TableConfig validations for Dim Tables.

* Separate IngestionConfigTests for dim tables.

* Remove defensive null checks.

* Fix github action profile name.

* Fix ingestionTest dependencies.

* Undo the github-actions mvn profile name fix.
cbalci committed Dec 22, 2020
1 parent 7580ae4 commit 33de6dc
Showing 11 changed files with 649 additions and 3 deletions.
SegmentDataManager.java
@@ -29,7 +29,7 @@ public abstract class SegmentDataManager {
private int _referenceCount = 1;

@VisibleForTesting
- synchronized int getReferenceCount() {
+ public synchronized int getReferenceCount() {
return _referenceCount;
}

DimensionTableDataManager.java (new file)
@@ -0,0 +1,164 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.data.manager.offline;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;


/**
* A dimension table is a special type of OFFLINE table which is assigned to all servers
* in a tenant and is used to execute the LOOKUP transform function. DimensionTableDataManager
* loads the table contents into a HashMap for fast access, so the table should be small
* enough to fit comfortably in memory.
*
* DimensionTableDataManager uses the Registry of Singletons pattern to store one instance
* per table, which can be accessed via the {@link #getInstanceByTableName} static method.
*/
@ThreadSafe
public class DimensionTableDataManager extends OfflineTableDataManager {
// One singleton per table, stored in a concurrent map keyed by table name with type
private static final Map<String, DimensionTableDataManager> _instances = new ConcurrentHashMap<>();

private DimensionTableDataManager() {
}

/**
* `createInstanceByTableName` should only be used by the {@link TableDataManagerProvider}, and the returned
* instance must be properly initialized via the {@link #init} method before use.
*/
public static DimensionTableDataManager createInstanceByTableName(String tableNameWithType) {
return _instances.computeIfAbsent(tableNameWithType, k -> new DimensionTableDataManager());
}

public static DimensionTableDataManager getInstanceByTableName(String tableNameWithType) {
return _instances.get(tableNameWithType);
}

/**
* Instance properties/methods
*/
private final ReadWriteLock _rwl = new ReentrantReadWriteLock();
private final Lock _lookupTableReadLock = _rwl.readLock();
private final Lock _lookupTableWriteLock = _rwl.writeLock();

// _lookupTable is a HashMap used for storing/serving records for a table keyed by table PK
@GuardedBy("_rwl")
private final Map<PrimaryKey, GenericRow> _lookupTable = new HashMap<>();
private Schema _tableSchema;
private List<String> _primaryKeyColumns;

@Override
protected void doInit() {
super.doInit();

// dimension tables should always have schemas with primary keys
_tableSchema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
_primaryKeyColumns = _tableSchema.getPrimaryKeyColumns();
}

@Override
public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
throws Exception {
super.addSegment(indexDir, indexLoadingConfig);
try {
loadLookupTable();
_logger.info("Successfully added segment {} and loaded lookup table: {}", indexDir.getName(), getTableName());
} catch (Exception e) {
throw new RuntimeException(String.format("Error loading lookup table: %s", getTableName()), e);
}
}

@Override
public void removeSegment(String segmentName) {
super.removeSegment(segmentName);
try {
loadLookupTable();
_logger.info("Successfully removed segment {} and reloaded lookup table: {}", segmentName, getTableName());
} catch (Exception e) {
throw new RuntimeException(String
.format("Error reloading lookup table after segment remove ({}) for table: {}", segmentName, getTableName()),
e);
}
}

/**
* `loadLookupTable()` reads the contents of the dimension table into the _lookupTable map for fast lookups.
*/
private void loadLookupTable()
throws Exception {
_lookupTableWriteLock.lock();
try {
_lookupTable.clear();
List<SegmentDataManager> segmentManagers = acquireAllSegments();
if (segmentManagers.isEmpty()) {
return;
}

try {
for (SegmentDataManager segmentManager : segmentManagers) {
IndexSegment indexSegment = segmentManager.getSegment();
try (PinotSegmentRecordReader reader = new PinotSegmentRecordReader(
indexSegment.getSegmentMetadata().getIndexDir())) {
while (reader.hasNext()) {
GenericRow row = reader.next();
_lookupTable.put(row.getPrimaryKey(_primaryKeyColumns), row);
}
}
}
} finally {
for (SegmentDataManager segmentManager : segmentManagers) {
releaseSegment(segmentManager);
}
}
} finally {
_lookupTableWriteLock.unlock();
}
}

public GenericRow lookupRowByPrimaryKey(PrimaryKey pk) {

[Review thread]

kishoreg (Member) commented on Dec 23, 2020:

Why is this method part of OfflineTableDataManager? Shouldn't this be modeled as part of the segment interface?

cbalci (Author) replied on Dec 29, 2020:

Hi Kishore! This method is exposed to be used by the LookupTransformFunction as proposed in #6383. Please take a look at the usage here.

In this implementation, the DataManager loads all of the segments into the heap and makes them available for lookup through this method. So in this sense, this is not a lookup operation on a single segment but on the whole table. Can you elaborate on why you think the segment interface makes more sense?
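For illustration, a minimal sketch of the call pattern described above, from the caller's side (the table name, key value, and column name are hypothetical):

DimensionTableDataManager mgr =
    DimensionTableDataManager.getInstanceByTableName("dimBaseballTeams_OFFLINE");
if (mgr != null) {
  // Key values must follow the order of the schema's primary key columns.
  PrimaryKey pk = new PrimaryKey(new Object[]{"SF"});
  GenericRow row = mgr.lookupRowByPrimaryKey(pk); // returns null when no row matches
  Object teamName = (row != null) ? row.getValue("teamName") : null;
}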

_lookupTableReadLock.lock();
try {
return _lookupTable.get(pk);
} finally {
_lookupTableReadLock.unlock();
}
}

public FieldSpec getColumnFieldSpec(String columnName) {
return _tableSchema.getFieldSpecFor(columnName);
}
}
TableDataManagerProvider.java
@@ -53,7 +53,12 @@ public static TableDataManager getTableDataManager(@Nonnull TableDataManagerConf
TableDataManager tableDataManager;
switch (TableType.valueOf(tableDataManagerConfig.getTableDataManagerType())) {
case OFFLINE:
- tableDataManager = new OfflineTableDataManager();
+ if (tableDataManagerConfig.isDimTable()) {
+ tableDataManager =
+ DimensionTableDataManager.createInstanceByTableName(tableDataManagerConfig.getTableName());
+ } else {
+ tableDataManager = new OfflineTableDataManager();
+ }
break;
case REALTIME:
tableDataManager = new RealtimeTableDataManager(_segmentBuildSemaphore);
TableConfigUtils.java
@@ -45,6 +45,7 @@
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
@@ -145,6 +146,7 @@ public static void validateRetentionConfig(TableConfig tableConfig) {
*
* 2. For OFFLINE table
* - checks for valid field spec for timeColumnName in schema, if timeColumnName and schema are non-null
* - for Dimension tables checks the primary key requirement
*
* 3. Checks peerDownloadSchema
*/
@@ -161,6 +163,12 @@ private static void validateValidationConfig(TableConfig tableConfig, @Nullable
"Cannot find valid fieldSpec for timeColumn: %s from the table config: %s, in the schema: %s", timeColumnName,
tableConfig.getTableName(), schema.getSchemaName());
}
if (tableConfig.isDimTable()) {
Preconditions.checkState(tableConfig.getTableType() == TableType.OFFLINE,
"Dimension table must be of OFFLINE table type.");
Preconditions.checkState(schema != null, "Dimension table must have an associated schema");
Preconditions.checkState(schema.getPrimaryKeyColumns().size() > 0, "Dimension table must have primary key[s]");
}

String peerSegmentDownloadScheme = validationConfig.getPeerSegmentDownloadScheme();
if (peerSegmentDownloadScheme != null) {
@@ -181,6 +189,7 @@ private static void validateValidationConfig(TableConfig tableConfig, @Nullable
* 3. checks for null column name or transform function in transform config
* 4. validity of transform function string
* 5. checks for source fields used in destination columns
* 6. ingestion type for dimension tables
*/
public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Schema schema) {
IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
@@ -190,7 +199,8 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc

// Batch
if (ingestionConfig.getBatchIngestionConfig() != null) {
List<Map<String, String>> batchConfigMaps = ingestionConfig.getBatchIngestionConfig().getBatchConfigMaps();
BatchIngestionConfig cfg = ingestionConfig.getBatchIngestionConfig();
List<Map<String, String>> batchConfigMaps = cfg.getBatchConfigMaps();
try {
if (CollectionUtils.isNotEmpty(batchConfigMaps)) {
// Validate that BatchConfig can be created
@@ -199,6 +209,14 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
} catch (Exception e) {
throw new IllegalStateException("Could not create BatchConfig using the batchConfig map", e);
}
if (tableConfig.isDimTable()) {
Preconditions.checkState(cfg.getSegmentIngestionType().equalsIgnoreCase("REFRESH"),
"Dimension tables must have segment ingestion type REFRESH");
}
}
if (tableConfig.isDimTable()) {
Preconditions.checkState(ingestionConfig.getBatchIngestionConfig() != null,
"Dimension tables must have batch ingestion configuration");
}

// Stream
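Taken together, these validations imply that a dimension table's config must look roughly like the following sketch (the table name is hypothetical, and unrelated config sections are omitted):

{
  "tableName": "dimBaseballTeams",
  "tableType": "OFFLINE",
  "isDimTable": true,
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "REFRESH",
      "segmentIngestionFrequency": "DAILY"
    }
  }
}

The associated schema must also declare at least one primary key column, e.g. "primaryKeyColumns": ["teamID"].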
