Skip to content

Commit

Permalink
hardcode kinesis in TableConfigUtils to avoid pulling pinot-kinesis m…
Browse files Browse the repository at this point in the history
…odule as dependency
  • Loading branch information
mqliang committed Jan 31, 2022
1 parent ac8f7e7 commit 4fa66df
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 19 deletions.
Expand Up @@ -39,7 +39,6 @@
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.parsers.QueryCompiler;
import org.apache.pinot.plugin.stream.kinesis.KinesisConfig;
import org.apache.pinot.pql.parsers.Pql2Compiler;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
Expand Down Expand Up @@ -98,6 +97,10 @@ public class SegmentPrunerTest extends ControllerTest {
private static final String SDF_QUERY_5 =
"SELECT * FROM testTable where timeColumn in (20200101, 20200102) AND timeColumn >= 20200530";

// this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use KinesisConfig.STREAM_TYPE directly, we
// hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency.
private static final String KINESIS_STREAM_TYPE = "kinesis";

private ZkClient _zkClient;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;

Expand Down Expand Up @@ -225,16 +228,16 @@ public void testEnablingEmptySegmentPruner() {

// When indexingConfig is configured with Kinesis streaming, EmptySegmentPruner should be returned.
when(indexingConfig.getStreamConfigs()).thenReturn(
Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KinesisConfig.STREAM_TYPE));
Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE));
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
assertEquals(segmentPruners.size(), 1);
assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);

// When streamIngestionConfig is configured with Kinesis streaming, EmptySegmentPruner should be returned.
when(streamIngestionConfig.getStreamConfigMaps()).thenReturn(Collections.singletonList(
Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KinesisConfig.STREAM_TYPE)));
Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)));
when(indexingConfig.getStreamConfigs()).thenReturn(
Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KinesisConfig.STREAM_TYPE));
Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE));
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
assertEquals(segmentPruners.size(), 1);
assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
Expand Down
10 changes: 0 additions & 10 deletions pinot-segment-local/pom.xml
Expand Up @@ -76,10 +76,6 @@
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-segment-spi</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-kinesis</artifactId>
</dependency>
<dependency>
<groupId>org.xerial.larray</groupId>
<artifactId>larray-mmap</artifactId>
Expand Down Expand Up @@ -170,11 +166,5 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-kinesis</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
Expand Up @@ -36,7 +36,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.plugin.stream.kinesis.KinesisConfig;
import org.apache.pinot.segment.local.function.FunctionEvaluator;
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
Expand Down Expand Up @@ -86,6 +85,10 @@ private TableConfigUtils() {
// supported TableTaskTypes, must be identical to the one return in the impl of {@link PinotTaskGenerator}.
private static final String REALTIME_TO_OFFLINE_TASK_TYPE = "RealtimeToOfflineSegmentsTask";

// this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use KinesisConfig.STREAM_TYPE directly, we
// hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency.
private static final String KINESIS_STREAM_TYPE = "kinesis";

/**
* @see TableConfigUtils#validate(TableConfig, Schema, String)
*/
Expand All @@ -110,7 +113,7 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema, @N
}
// Sanitize the table config before validation
sanitize(tableConfig);
// skip all validation if skip type ALL is selected.
// skip all validation if skip type ALL is selected.
if (!skipTypes.contains(ValidationType.ALL)) {
validateValidationConfig(tableConfig, schema);
validateIngestionConfig(tableConfig, schema);
Expand Down Expand Up @@ -862,7 +865,7 @@ public enum ValidationType {
}

/**
* Helper method to check is EmptySegmentPruner for a TableConfig.
* needsEmptySegmentPruner checks if EmptySegmentPruner is needed for a TableConfig.
* @param tableConfig Input table config.
*/
public static boolean needsEmptySegmentPruner(TableConfig tableConfig) {
Expand All @@ -889,7 +892,7 @@ private static boolean isKinesisConfigured(TableConfig tableConfig) {
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
if (indexingConfig != null) {
Map<String, String> streamConfig = indexingConfig.getStreamConfigs();
if (streamConfig != null && KinesisConfig.STREAM_TYPE.equals(
if (streamConfig != null && KINESIS_STREAM_TYPE.equals(
streamConfig.get(StreamConfigProperties.STREAM_TYPE))) {
return true;
}
Expand All @@ -903,7 +906,7 @@ private static boolean isKinesisConfigured(TableConfig tableConfig) {
return false;
}
for (Map<String, String> config : streamIngestionConfig.getStreamConfigMaps()) {
if (config != null && KinesisConfig.STREAM_TYPE.equals(config.get(StreamConfigProperties.STREAM_TYPE))) {
if (config != null && KINESIS_STREAM_TYPE.equals(config.get(StreamConfigProperties.STREAM_TYPE))) {
return true;
}
}
Expand Down

0 comments on commit 4fa66df

Please sign in to comment.