diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml index 6d47cc65821..a90cc050db2 100644 --- a/pinot-common/pom.xml +++ b/pinot-common/pom.xml @@ -139,6 +139,10 @@ org.xerial.snappy snappy-java + + com.github.luben + zstd-jni + org.apache.logging.log4j log4j-slf4j-impl diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java index 435c182d012..01afcd3f8a5 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java @@ -228,8 +228,8 @@ public void testSerDe() properties.put("foo", "bar"); properties.put("foobar", "potato"); List fieldConfigList = Arrays.asList( - new FieldConfig("column1", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.INVERTED, properties), - new FieldConfig("column2", null, null, null)); + new FieldConfig("column1", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.INVERTED, null, properties), + new FieldConfig("column2", null, null, null, null)); TableConfig tableConfig = tableConfigBuilder.setFieldConfigList(fieldConfigList).build(); checkFieldConfig(tableConfig); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java index 995cb9f9705..675c180777b 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java @@ -641,7 +641,7 @@ public void testValidateFieldConfig() { try { FieldConfig fieldConfig = - new FieldConfig("myCol1", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.FST, null); + new FieldConfig("myCol1", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.FST, null, null); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail for with conflicting encoding type of myCol1"); @@ -654,7 +654,7 @@ public void testValidateFieldConfig() { .setNoDictionaryColumns(Arrays.asList("myCol1")).build(); try { FieldConfig fieldConfig = - new FieldConfig("myCol1", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.FST, null); + new FieldConfig("myCol1", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.FST, null, null); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail since FST index is enabled on RAW encoding type"); @@ -665,7 +665,7 @@ public void testValidateFieldConfig() { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); try { FieldConfig fieldConfig = - new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.FST, null); + new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.FST, null, null); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail since FST index is enabled on multi value column"); @@ -676,7 +676,7 @@ public void testValidateFieldConfig() { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); try { FieldConfig fieldConfig = - new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.FST, null); + new 
FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.FST, null, null); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail since FST index is enabled on non String column"); @@ -688,7 +688,7 @@ public void testValidateFieldConfig() { .setNoDictionaryColumns(Arrays.asList("myCol2", "intCol")).build(); try { FieldConfig fieldConfig = - new FieldConfig("myCol2", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null); + new FieldConfig("myCol2", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null, null); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail since TEXT index is enabled on multi value column"); @@ -700,7 +700,7 @@ public void testValidateFieldConfig() { .setNoDictionaryColumns(Arrays.asList("myCol2", "intCol")).build(); try { FieldConfig fieldConfig = - new FieldConfig("intCol", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null); + new FieldConfig("intCol", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null, null); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail since TEXT index is enabled on non String column"); @@ -712,7 +712,7 @@ public void testValidateFieldConfig() { .setNoDictionaryColumns(Arrays.asList("myCol1")).build(); try { FieldConfig fieldConfig = - new FieldConfig("myCol21", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.FST, null); + new FieldConfig("myCol21", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.FST, null, null); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail since field name is not present in schema"); @@ -720,6 +720,28 @@ public void testValidateFieldConfig() { Assert.assertEquals(e.getMessage(), "Column Name myCol21 defined in field config list must be a valid column defined in the schema"); } + + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); + try { + FieldConfig fieldConfig = + new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, null, FieldConfig.CompressionCodec.SNAPPY, null); + tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); + TableConfigUtils.validate(tableConfig, schema); + Assert.fail("Should fail since dictionary encoding does not support compression codec snappy"); + } catch (Exception e) { + Assert.assertEquals(e.getMessage(), "Set compression codec to null for dictionary encoding type"); + } + + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); + try { + FieldConfig fieldConfig = + new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, null, FieldConfig.CompressionCodec.ZSTANDARD, null); + tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); + TableConfigUtils.validate(tableConfig, schema); + Assert.fail("Should fail since dictionary encoding does not support compression codec zstandard"); + } catch (Exception e) { + Assert.assertEquals(e.getMessage(), "Set compression codec to null for dictionary encoding type"); + } } @Test @@ -888,7 +910,7 @@ public void testValidateIndexingConfig() { // expected } - FieldConfig fieldConfig = new FieldConfig("myCol2", null, null, null); + FieldConfig fieldConfig = new FieldConfig("myCol2", null, null, null, null); tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) .setFieldConfigList(Arrays.asList(fieldConfig)).build(); try { diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/FSTBasedRegexpLikeQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/FSTBasedRegexpLikeQueriesTest.java index 012059cba54..7155844f6bb 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/FSTBasedRegexpLikeQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/FSTBasedRegexpLikeQueriesTest.java @@ -156,8 +156,8 @@ private void buildSegment() List rows = createTestData(NUM_ROWS); List fieldConfigs = new ArrayList<>(); fieldConfigs - .add(new FieldConfig(DOMAIN_NAMES_COL, FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.FST, null)); - fieldConfigs.add(new FieldConfig(URL_COL, FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.FST, null)); + .add(new FieldConfig(DOMAIN_NAMES_COL, FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.FST, null, null)); + fieldConfigs.add(new FieldConfig(URL_COL, FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.FST, null, null)); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) .setInvertedIndexColumns(Arrays.asList(DOMAIN_NAMES_COL)).setFieldConfigList(fieldConfigs).build(); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/H3IndexQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/H3IndexQueriesTest.java index 075606cabb7..741c0e36bf1 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/H3IndexQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/H3IndexQueriesTest.java @@ -72,7 +72,7 @@ public class H3IndexQueriesTest extends BaseQueriesTest { private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setFieldConfigList(Collections.singletonList( new FieldConfig(H3_INDEX_COLUMN, FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.H3, - H3_INDEX_PROPERTIES))).build(); + null, H3_INDEX_PROPERTIES))).build(); private IndexSegment _indexSegment; diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java new file mode 100644 index 00000000000..0b76bf7191d --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java @@ -0,0 +1,375 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.queries; + +import java.io.File; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.query.SelectionOnlyOperator; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.ImmutableSegment; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +/** + * Functional tests for compression type feature. + * The tests use three kinds of input data + * (1) string + * (2) integer + * (3) long + */ +public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest { + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "CompressionCodecQueriesTest"); + private static final String TABLE_NAME = "MyTable"; + private static final String SEGMENT_NAME = "testSegment"; + + private static final String SNAPPY_STRING = "SNAPPY_STRING"; + private static final String PASS_THROUGH_STRING = "PASS_THROUGH_STRING"; + private static final String ZSTANDARD_STRING = "ZSTANDARD_STRING"; + + private static final String SNAPPY_LONG = "SNAPPY_LONG"; + private static final String PASS_THROUGH_LONG = "PASS_THROUGH_LONG"; + private static final String ZSTANDARD_LONG = "ZSTANDARD_LONG"; + + private static final String SNAPPY_INTEGER = "SNAPPY_INTEGER"; + private static final String PASS_THROUGH_INTEGER = "PASS_THROUGH_INTEGER"; + private static final String ZSTANDARD_INTEGER = "ZSTANDARD_INTEGER"; + + private static final List RAW_SNAPPY_INDEX_COLUMNS = Arrays + .asList(SNAPPY_STRING, SNAPPY_LONG, SNAPPY_INTEGER); + + private static final List RAW_ZSTANDARD_INDEX_COLUMNS = Arrays + .asList(ZSTANDARD_STRING, ZSTANDARD_LONG, ZSTANDARD_INTEGER); + + private static final List RAW_PASS_THROUGH_INDEX_COLUMNS = Arrays + .asList(PASS_THROUGH_STRING, PASS_THROUGH_LONG, PASS_THROUGH_INTEGER); + + private final List _rows = new ArrayList<>(); + + private IndexSegment _indexSegment; + private List _indexSegments; + private List rows; + + @Override + protected String getFilter() { + return ""; + } + + @Override + protected IndexSegment getIndexSegment() { + return _indexSegment; + } + + @Override + protected List getIndexSegments() { + return _indexSegments; + } + + @BeforeClass + public void setUp() + throws Exception { + FileUtils.deleteQuietly(INDEX_DIR); + 
+ buildSegment(); + + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(); + Set indexColumns = new HashSet<>(); + indexColumns.addAll(RAW_SNAPPY_INDEX_COLUMNS); + indexColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS); + indexColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS); + + indexLoadingConfig.getNoDictionaryColumns().addAll(indexColumns); + ImmutableSegment immutableSegment = + ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), indexLoadingConfig); + _indexSegment = immutableSegment; + _indexSegments = Arrays.asList(immutableSegment, immutableSegment); + } + + @AfterClass + public void tearDown() { + _indexSegment.destroy(); + FileUtils.deleteQuietly(INDEX_DIR); + } + + private void buildSegment() + throws Exception { + rows = createTestData(); + + List fieldConfigs = new ArrayList<>(RAW_SNAPPY_INDEX_COLUMNS.size() + RAW_ZSTANDARD_INDEX_COLUMNS.size() + RAW_PASS_THROUGH_INDEX_COLUMNS.size()); + for (String indexColumn : RAW_SNAPPY_INDEX_COLUMNS) { + fieldConfigs + .add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, null, FieldConfig.CompressionCodec.SNAPPY, null)); + } + + for (String indexColumn : RAW_ZSTANDARD_INDEX_COLUMNS) { + fieldConfigs + .add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, null, FieldConfig.CompressionCodec.ZSTANDARD, null)); + } + + for (String indexColumn : RAW_PASS_THROUGH_INDEX_COLUMNS) { + fieldConfigs + .add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, null, FieldConfig.CompressionCodec.PASS_THROUGH, null)); + } + + List _noDictionaryColumns = new ArrayList<>(); + _noDictionaryColumns.addAll(RAW_SNAPPY_INDEX_COLUMNS); + _noDictionaryColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS); + _noDictionaryColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS); + + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) + .setNoDictionaryColumns(_noDictionaryColumns) + .setFieldConfigList(fieldConfigs).build(); + Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addSingleValueDimension(SNAPPY_STRING, FieldSpec.DataType.STRING) + .addSingleValueDimension(PASS_THROUGH_STRING, FieldSpec.DataType.STRING) + .addSingleValueDimension(ZSTANDARD_STRING, FieldSpec.DataType.STRING) + .addSingleValueDimension(SNAPPY_INTEGER, FieldSpec.DataType.INT) + .addSingleValueDimension(ZSTANDARD_INTEGER, FieldSpec.DataType.INT) + .addSingleValueDimension(PASS_THROUGH_INTEGER, FieldSpec.DataType.INT) + .addSingleValueDimension(SNAPPY_LONG, FieldSpec.DataType.LONG) + .addSingleValueDimension(ZSTANDARD_LONG, FieldSpec.DataType.LONG) + .addSingleValueDimension(PASS_THROUGH_LONG, FieldSpec.DataType.LONG) + .build(); + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); + config.setOutDir(INDEX_DIR.getPath()); + config.setTableName(TABLE_NAME); + config.setSegmentName(SEGMENT_NAME); + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + try (RecordReader recordReader = new GenericRowRecordReader(rows)) { + driver.init(config, recordReader); + driver.build(); + } + } + + private List createTestData() + throws Exception { + List rows = new ArrayList<>(); + + //Generate random data + int rowLength = 1000; + Random random = new Random(); + String[] tempStringRows = new String[rowLength]; + Integer[] tempIntRows = new Integer[rowLength]; + Long[] tempLongRows = new Long[rowLength]; + + for (int i = 0; i < rowLength; i++) { + //Adding a fixed value to check for filter queries + if(i % 10 == 0) { + tempStringRows[i] = "hello_world_123"; + 
tempIntRows[i] = 1001; + tempLongRows[i] = 1001L; + } + else { + tempStringRows[i] = RandomStringUtils.random(random.nextInt(100), true, true); + tempIntRows[i] = RandomUtils.nextInt(0, rowLength); + tempLongRows[i] = RandomUtils.nextLong(0, rowLength); + } + + } + + for (int i = 0; i < rowLength; i++) { + GenericRow row = new GenericRow(); + row.putValue(SNAPPY_STRING, tempStringRows[i]); + row.putValue(ZSTANDARD_STRING, tempStringRows[i]); + row.putValue(PASS_THROUGH_STRING, tempStringRows[i]); + row.putValue(SNAPPY_INTEGER, tempIntRows[i]); + row.putValue(ZSTANDARD_INTEGER, tempIntRows[i]); + row.putValue(PASS_THROUGH_INTEGER, tempIntRows[i]); + row.putValue(SNAPPY_LONG, tempLongRows[i]); + row.putValue(ZSTANDARD_LONG, tempLongRows[i]); + row.putValue(PASS_THROUGH_LONG, tempLongRows[i]); + rows.add(row); + } + return rows; + } + + /** + * Tests for basic compression codec queries. + */ + @Test + public void testQueriesWithCompressionCodec() + throws Exception { + + String query = + "SELECT SNAPPY_STRING, ZSTANDARD_STRING, PASS_THROUGH_STRING, SNAPPY_INTEGER, ZSTANDARD_INTEGER, PASS_THROUGH_INTEGER, " + + "SNAPPY_LONG, ZSTANDARD_LONG, PASS_THROUGH_LONG FROM MyTable LIMIT 1000"; + ArrayList expected = new ArrayList<>(); + + for(GenericRow row: rows) { + expected.add(new Serializable[]{ + String.valueOf(row.getValue(SNAPPY_STRING)), String.valueOf(row.getValue(ZSTANDARD_STRING)), String.valueOf(row.getValue(PASS_THROUGH_STRING)), + (Integer) row.getValue(SNAPPY_INTEGER), (Integer) row.getValue(ZSTANDARD_INTEGER), (Integer) row.getValue(PASS_THROUGH_INTEGER), + (Long) row.getValue(SNAPPY_LONG), (Long)row.getValue(ZSTANDARD_LONG), (Long) row.getValue(PASS_THROUGH_LONG), + }); + } + testSelectQueryHelper(query, expected.size(), expected); + } + + /** + * Tests for filter over integer values compression codec queries. + */ + @Test + public void testZstandardIntegerFilterQueriesWithCompressionCodec() + throws Exception { + + String query = + "SELECT ZSTANDARD_INTEGER FROM MyTable " + + "WHERE ZSTANDARD_INTEGER > 1000 LIMIT 1000"; + ArrayList expected = new ArrayList<>(); + + for(GenericRow row: rows) { + int value = (Integer) row.getValue(ZSTANDARD_INTEGER); + if(value > 1000) { + expected.add(new Serializable[]{value}); + } + } + testSelectQueryHelper(query, expected.size(), expected); + } + + /** + * Tests for filter over integer values compression codec queries. + */ + @Test + public void testSnappyIntegerFilterQueriesWithCompressionCodec() + throws Exception { + + String query = + "SELECT SNAPPY_INTEGER FROM MyTable " + + "WHERE SNAPPY_INTEGER > 100 LIMIT 1000"; + ArrayList expected = new ArrayList<>(); + + for(GenericRow row: rows) { + int value = (Integer) row.getValue(SNAPPY_INTEGER); + if(value > 100) { + expected.add(new Serializable[]{value}); + } + } + testSelectQueryHelper(query, expected.size(), expected); + } + + /** + * Tests for filter over integer values compression codec queries. + */ + @Test + public void testPassThroughIntegerFilterQueriesWithCompressionCodec() + throws Exception { + + String query = + "SELECT PASS_THROUGH_INTEGER FROM MyTable " + + "WHERE PASS_THROUGH_INTEGER > 100 LIMIT 1000"; + ArrayList expected = new ArrayList<>(); + + for(GenericRow row: rows) { + int value = (Integer) row.getValue(PASS_THROUGH_INTEGER); + if(value > 100) { + expected.add(new Serializable[]{value}); + } + } + testSelectQueryHelper(query, expected.size(), expected); + } + + /** + * Tests for filter over string values zstandard compression codec queries. 
+ */ + @Test + public void testZstandardStringFilterQueriesWithCompressionCodec() + throws Exception { + String query = + "SELECT ZSTANDARD_STRING FROM MyTable WHERE ZSTANDARD_STRING = 'hello_world_123' LIMIT 1000"; + ArrayList expected = new ArrayList<>(); + + for(GenericRow row: rows) { + String value = String.valueOf(row.getValue(ZSTANDARD_STRING)); + if(value.equals("hello_world_123")) { + expected.add(new Serializable[]{value}); + } + } + testSelectQueryHelper(query, expected.size(), expected); + } + + /** + * Tests for filter over string values snappy compression codec queries. + */ + @Test + public void testSnappyStringFilterQueriesWithCompressionCodec() + throws Exception { + String query = + "SELECT SNAPPY_STRING FROM MyTable WHERE SNAPPY_STRING = 'hello_world_123' LIMIT 1000"; + ArrayList expected = new ArrayList<>(); + + for(GenericRow row: rows) { + String value = String.valueOf(row.getValue(SNAPPY_STRING)); + if(value.equals("hello_world_123")) { + expected.add(new Serializable[]{value}); + } + } + testSelectQueryHelper(query, expected.size(), expected); + } + + /* + * Helper methods for tests + */ + private void testSelectQueryHelper(String query, int expectedResultSize, List expectedResults) + throws Exception { + SelectionOnlyOperator operator = getOperatorForSqlQuery(query); + IntermediateResultsBlock operatorResult = operator.nextBlock(); + List resultset = (List) operatorResult.getSelectionResult(); + Assert.assertNotNull(resultset); + Assert.assertEquals(resultset.size(), expectedResultSize); + if (expectedResults != null) { + // compare with expected result table + for (int i = 0; i < expectedResultSize; i++) { + Object[] actualRow = resultset.get(i); + Object[] expectedRow = expectedResults.get(i); + Assert.assertEquals(actualRow.length, expectedRow.length); + for (int j = 0; j < actualRow.length; j++) { + Object actualColValue = actualRow[j]; + Object expectedColValue = expectedRow[j]; + Assert.assertEquals(actualColValue, expectedColValue); + } + } + } + } + +} diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java index a08c8bca690..3fe75efb8d7 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java @@ -158,11 +158,11 @@ private void buildSegment() List fieldConfigs = new ArrayList<>(RAW_TEXT_INDEX_COLUMNS.size() + DICT_TEXT_INDEX_COLUMNS.size()); for (String textIndexColumn : RAW_TEXT_INDEX_COLUMNS) { fieldConfigs - .add(new FieldConfig(textIndexColumn, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null)); + .add(new FieldConfig(textIndexColumn, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null, null)); } for (String textIndexColumn : DICT_TEXT_INDEX_COLUMNS) { fieldConfigs - .add(new FieldConfig(textIndexColumn, FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.TEXT, null)); + .add(new FieldConfig(textIndexColumn, FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.TEXT, null, null)); } TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) .setNoDictionaryColumns(RAW_TEXT_INDEX_COLUMNS).setInvertedIndexColumns(DICT_TEXT_INDEX_COLUMNS) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java index f90548e8e76..75654dd0a50 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java @@ -102,7 +102,7 @@ protected List getBloomFilterColumns() { @Override protected List getFieldConfigs() { return Collections.singletonList( - new FieldConfig(TEXT_COLUMN_NAME, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null)); + new FieldConfig(TEXT_COLUMN_NAME, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null, null)); } @BeforeClass diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java new file mode 100644 index 00000000000..cca7343abc5 --- /dev/null +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.perf; +import com.github.luben.zstd.Zstd; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.RandomUtils; +import org.apache.pinot.segment.local.io.compression.SnappyCompressor; +import org.apache.pinot.segment.local.io.compression.SnappyDecompressor; +import org.apache.pinot.segment.local.io.compression.ZstandardCompressor; +import org.apache.pinot.segment.local.io.compression.ZstandardDecompressor; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.OptionsBuilder; + + +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(1) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@State(Scope.Benchmark) +// Test to get memory statistics for snappy and zstandard integer compression techniques +public class BenchmarkNoDictionaryIntegerCompression { + + @Param({"500000", "1000000", "2000000", "3000000", "4000000", "5000000"}) + public static int _rowLength; + + @State(Scope.Thread) + public static class BenchmarkNoDictionaryIntegerCompressionState { + + private static ByteBuffer _uncompressedInt; + private static ByteBuffer _snappyIntegerIntegerInput; + private static ByteBuffer _zstandardCompressedIntegerInput; + private static ByteBuffer _snappyCompressedIntegerOutput; + private static ByteBuffer _zstdCompressedIntegerOutput; + private static ByteBuffer _snappyIntegerDecompressed; + private static ByteBuffer _zstdIntegerDecompressed; + private static SnappyCompressor snappyCompressor; + private static SnappyDecompressor snappyDecompressor; + private static ZstandardCompressor zstandardCompressor; + private static ZstandardDecompressor zstandardDecompressor; + + @Setup(Level.Invocation) + public void setUp() + throws Exception { + + initializeCompressors(); + generateRandomIntegerBuffer(); + allocateBufferMemory(); + + snappyCompressor.compress(_uncompressedInt,_snappyIntegerIntegerInput); + Zstd.compress(_zstandardCompressedIntegerInput, _uncompressedInt); + + _zstdIntegerDecompressed.flip();_zstandardCompressedIntegerInput.flip();_uncompressedInt.flip();_snappyIntegerDecompressed.flip(); + } + + private void generateRandomIntegerBuffer() { + //Generate Random Int + _uncompressedInt = ByteBuffer.allocateDirect(_rowLength * Integer.BYTES); + for (int i = 0; i < _rowLength; i++) { + _uncompressedInt.putInt(RandomUtils.nextInt()); + } + _uncompressedInt.flip(); + + _snappyCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2); + _zstdCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2); + } + + private void initializeCompressors() { + //Initialize compressors and decompressors for snappy + snappyCompressor = new SnappyCompressor(); + snappyDecompressor = new SnappyDecompressor(); + + //Initialize compressors and decompressors for zstandard + zstandardCompressor = new ZstandardCompressor(); + zstandardDecompressor = new 
ZstandardDecompressor(); + } + + private void allocateBufferMemory() { + _snappyIntegerDecompressed = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2); + _zstdIntegerDecompressed = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2); + _snappyIntegerIntegerInput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2); + _zstandardCompressedIntegerInput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2); + } + + @TearDown(Level.Invocation) + public void tearDown() + throws Exception { + _snappyCompressedIntegerOutput.clear(); + _snappyIntegerDecompressed.clear(); + _zstdCompressedIntegerOutput.clear(); + _zstdIntegerDecompressed.clear(); + + _uncompressedInt.rewind(); + _zstandardCompressedIntegerInput.rewind(); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public int benchmarkSnappyIntegerCompression(BenchmarkNoDictionaryIntegerCompressionState state) + throws IOException { + int size = state.snappyCompressor.compress(state._uncompressedInt, state._snappyCompressedIntegerOutput); + return size; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public int benchmarkSnappyIntegerDecompression(BenchmarkNoDictionaryIntegerCompressionState state) + throws IOException { + int size = state.snappyDecompressor.decompress(state._snappyIntegerIntegerInput, state._snappyIntegerDecompressed); + return size; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public int benchmarkZstandardIntegerCompression(BenchmarkNoDictionaryIntegerCompressionState state) + throws IOException { + int size = state.zstandardCompressor.compress(state._uncompressedInt, state._zstdCompressedIntegerOutput); + return size; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public int benchmarkZstandardIntegerDecompression(BenchmarkNoDictionaryIntegerCompressionState state) + throws IOException { + int size = state.zstandardDecompressor.decompress(state._zstandardCompressedIntegerInput, state._zstdIntegerDecompressed); + return size; + } + + public static void main(String[] args) + throws Exception { + new Runner(new OptionsBuilder().include(BenchmarkNoDictionaryIntegerCompression.class.getSimpleName()).build()).run(); + } +} diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java new file mode 100644 index 00000000000..86abf563c5b --- /dev/null +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.perf; +import com.github.luben.zstd.Zstd; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.RandomUtils; +import org.apache.pinot.segment.local.io.compression.SnappyCompressor; +import org.apache.pinot.segment.local.io.compression.SnappyDecompressor; +import org.apache.pinot.segment.local.io.compression.ZstandardCompressor; +import org.apache.pinot.segment.local.io.compression.ZstandardDecompressor; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.OptionsBuilder; + + +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(1) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@State(Scope.Benchmark) +// Test to get memory statistics for snappy and zstandard long compression techniques +public class BenchmarkNoDictionaryLongCompression { + + @Param({"500000", "1000000", "2000000", "3000000", "4000000", "5000000"}) + public static int _rowLength; + + @State(Scope.Thread) + public static class BenchmarkNoDictionaryLongCompressionState { + + private static ByteBuffer _uncompressedLong; + private static ByteBuffer _snappyCompressedLongInput; + private static ByteBuffer _zstandardCompressedLongInput; + private static ByteBuffer _snappyCompressedLongOutput; + private static ByteBuffer _zstandardCompressedLongOutput; + private static ByteBuffer _snappyLongDecompressedOutput; + private static ByteBuffer _zstandardLongDecompressedOutput; + SnappyCompressor snappyCompressor; + SnappyDecompressor snappyDecompressor; + ZstandardCompressor zstandardCompressor; + ZstandardDecompressor zstandardDecompressor; + + @Setup(Level.Invocation) + public void setUp() + throws Exception { + + initializeCompressors(); + + generateRandomLongBuffer(); + + allocateBufferMemory(); + + snappyCompressor.compress(_uncompressedLong,_snappyCompressedLongInput); + Zstd.compress(_zstandardCompressedLongInput, _uncompressedLong); + + _zstandardCompressedLongInput.flip();_uncompressedLong.flip();_snappyLongDecompressedOutput.flip(); + } + + private void generateRandomLongBuffer() { + //Generate Random Long + _uncompressedLong = ByteBuffer.allocateDirect(_rowLength * Long.BYTES); + for (int i = 0; i < _rowLength; i++) { + _uncompressedLong.putLong(RandomUtils.nextLong()); + } + _uncompressedLong.flip(); + } + + private void initializeCompressors() { + //Initialize compressors and decompressors + snappyCompressor = new SnappyCompressor(); + snappyDecompressor = new SnappyDecompressor(); + + //Initialize compressors and decompressors for zstandard + zstandardCompressor = new ZstandardCompressor(); + zstandardDecompressor = new ZstandardDecompressor(); + } + + private void allocateBufferMemory() { + _snappyCompressedLongOutput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2); + _zstandardCompressedLongOutput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2); + _snappyLongDecompressedOutput = 
ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2); + _zstandardLongDecompressedOutput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2); + _snappyCompressedLongInput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2); + _zstandardCompressedLongInput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2); + } + + @TearDown(Level.Invocation) + public void tearDown() + throws Exception { + _snappyCompressedLongOutput.clear(); + _snappyLongDecompressedOutput.clear(); + _zstandardCompressedLongOutput.clear(); + _zstandardLongDecompressedOutput.clear(); + + _uncompressedLong.rewind(); + _zstandardCompressedLongInput.rewind(); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public int benchmarkSnappyLongCompression(BenchmarkNoDictionaryLongCompressionState state) + throws IOException { + int size = state.snappyCompressor.compress(state._uncompressedLong, state._snappyCompressedLongOutput); + return size; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public int benchmarkSnappyLongDecompression(BenchmarkNoDictionaryLongCompressionState state) + throws IOException { + int size = state.snappyDecompressor.decompress(state._snappyCompressedLongInput, state._snappyLongDecompressedOutput); + return size; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public int benchmarkZstandardLongCompression(BenchmarkNoDictionaryLongCompressionState state) + throws IOException { + int size = Zstd.compress(state._zstandardCompressedLongOutput, state._uncompressedLong); + return size; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public int benchmarkZstandardLongDecompression(BenchmarkNoDictionaryLongCompressionState state) + throws IOException { + int size = Zstd.decompress(state._zstandardLongDecompressedOutput, state._zstandardCompressedLongInput); + return size; + } + + public static void main(String[] args) + throws Exception { + new Runner(new OptionsBuilder().include(BenchmarkNoDictionaryLongCompression.class.getSimpleName()).build()).run(); + } +} diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java new file mode 100644 index 00000000000..c0442f53dab --- /dev/null +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.perf; + +import com.github.luben.zstd.Zstd; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.pinot.common.utils.StringUtil; +import org.apache.pinot.segment.local.io.compression.SnappyCompressor; +import org.apache.pinot.segment.local.io.compression.SnappyDecompressor; +import org.apache.pinot.segment.local.io.compression.ZstandardCompressor; +import org.apache.pinot.segment.local.io.compression.ZstandardDecompressor; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.OptionsBuilder; + + +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(1) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@State(Scope.Benchmark) +// Test to get memory statistics for snappy and zstandard string compression techniques +public class BenchmarkNoDictionaryStringCompression { + + @Param({"500000", "1000000", "2000000", "3000000", "4000000", "5000000"}) + public static int _rowLength; + + public static Random _random = new Random(); + + @State(Scope.Thread) + public static class BenchmarkNoDictionaryStringCompressionState { + private static ByteBuffer _uncompressedString; + private static ByteBuffer _snappyCompressedStringInput; + private static ByteBuffer _zstandardCompressedStringInput; + private static ByteBuffer _snappyCompressedStringOutput; + private static ByteBuffer _zstandardCompressedStringOutput; + private static ByteBuffer _snappyStringDecompressed; + private static ByteBuffer _zstandardStringDecompressed; + SnappyCompressor snappyCompressor; + SnappyDecompressor snappyDecompressor; + ZstandardCompressor zstandardCompressor; + ZstandardDecompressor zstandardDecompressor; + + @Setup(Level.Invocation) + public void setUp() + throws Exception { + + initializeCompressors(); + generateRandomStringBuffer(); + allocateMemory(); + + _snappyCompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2); + _zstandardCompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2); + + snappyCompressor.compress(_uncompressedString,_snappyCompressedStringInput); + Zstd.compress(_zstandardCompressedStringInput, _uncompressedString); + + _zstandardStringDecompressed.flip();_zstandardCompressedStringInput.flip();_uncompressedString.flip();_snappyStringDecompressed.flip(); + } + + private void initializeCompressors() { + //Initialize compressors and decompressors for snappy + snappyCompressor = new SnappyCompressor(); + snappyDecompressor = new SnappyDecompressor(); + + //Initialize compressors and decompressors for zstandard + zstandardCompressor = new ZstandardCompressor(); + zstandardDecompressor = new ZstandardDecompressor(); + } + + private void generateRandomStringBuffer() { + String[] tempRows = new String[_rowLength]; + int maxStringLengthInBytes = 0; + int numChars = 100; + + for (int i = 
0; i < _rowLength; i++) { + String value = RandomStringUtils.random(_random.nextInt(numChars), true, true); + maxStringLengthInBytes = Math.max(maxStringLengthInBytes, StringUtil.encodeUtf8(value).length); + tempRows[i] = value; + } + + _uncompressedString = ByteBuffer.allocateDirect(_rowLength * maxStringLengthInBytes); + for (int i = 0; i < _rowLength; i++) { + _uncompressedString.put(StringUtil.encodeUtf8(tempRows[i])); + } + _uncompressedString.flip(); + } + + private void allocateMemory() { + _snappyStringDecompressed = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2); + _zstandardStringDecompressed = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2); + _snappyCompressedStringInput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2); + _zstandardCompressedStringInput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2); + } + + @TearDown(Level.Invocation) + public void tearDown() + throws Exception { + _snappyCompressedStringOutput.clear(); + _snappyStringDecompressed.clear(); + _zstandardCompressedStringOutput.clear(); + _zstandardStringDecompressed.clear(); + + _uncompressedString.rewind(); + _zstandardCompressedStringInput.rewind(); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public int benchmarkSnappyStringCompression(BenchmarkNoDictionaryStringCompressionState state) + throws IOException { + int size = state.snappyCompressor.compress(state._uncompressedString, state._snappyCompressedStringOutput); + return size; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public int benchmarkSnappyStringDecompression(BenchmarkNoDictionaryStringCompressionState state) + throws IOException { + int size = state.snappyDecompressor.decompress(state._snappyCompressedStringInput, state._snappyStringDecompressed); + return size; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public int benchmarkZstandardStringCompression(BenchmarkNoDictionaryStringCompressionState state) + throws IOException { + int size = state.zstandardCompressor.compress(state._uncompressedString, state._zstandardCompressedStringOutput); + return size; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public int benchmarkZstandardStringDecompression(BenchmarkNoDictionaryStringCompressionState state) + throws IOException { + int size = state.zstandardDecompressor.decompress(state._zstandardCompressedStringInput, state._zstandardStringDecompressed); + return size; + } + + public static void main(String[] args) + throws Exception { + new Runner(new OptionsBuilder().include(BenchmarkNoDictionaryStringCompression.class.getSimpleName()).build()).run(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java index 93a5fd19cc4..3714f663c2f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java @@ -48,6 +48,9 @@ public static ChunkCompressor getCompressor(ChunkCompressionType compressionType case SNAPPY: return new SnappyCompressor(); + case ZSTANDARD: + return new ZstandardCompressor(); + default: throw new IllegalArgumentException("Illegal 
compressor name " + compressionType); } @@ -67,6 +70,9 @@ public static ChunkDecompressor getDecompressor(ChunkCompressionType compression case SNAPPY: return new SnappyDecompressor(); + case ZSTANDARD: + return new ZstandardDecompressor(); + default: throw new IllegalArgumentException("Illegal compressor name " + compressionType); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java new file mode 100644 index 00000000000..033f9a86ef0 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.io.compression; + +import com.github.luben.zstd.Zstd; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.pinot.segment.spi.compression.ChunkCompressor; + +/** + * Implementation of {@link ChunkCompressor} using the Zstandard (Zstd) compression algorithm, + * backed by Zstd.compress(destinationBuffer, sourceBuffer). + * Compresses the data in buffer 'inUncompressed' using the default compression level. + */ +public class ZstandardCompressor implements ChunkCompressor { + @Override + public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed) + throws IOException { + int compressedSize = Zstd.compress(outCompressed, inUncompressed); + // When the compress method returns successfully, + // outCompressed's position() will be set to its current position() plus the compressed size of the data, + // and inUncompressed's position() will be set to its limit(). + // The flip makes the destination ByteBuffer (outCompressed) ready for reads by setting its position to 0. + outCompressed.flip(); + return compressedSize; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java new file mode 100644 index 00000000000..7099b24b315 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.io.compression; + +import com.github.luben.zstd.Zstd; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.pinot.segment.spi.compression.ChunkDecompressor; + +/** + * Implementation of {@link ChunkDecompressor} using the Zstandard (Zstd) decompression algorithm, + * backed by Zstd.decompress(destinationBuffer, sourceBuffer). + * Decompresses the data in buffer 'compressedInput' into 'decompressedOutput'. + */ +public class ZstandardDecompressor implements ChunkDecompressor { + @Override + public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput) + throws IOException { + int decompressedSize = Zstd.decompress(decompressedOutput, compressedInput); + // When the decompress method returns successfully, + // decompressedOutput's position() will be set to its current position() plus the decompressed size of the data, + // and compressedInput's position() will be set to its limit(). + // The flip makes the destination ByteBuffer (decompressedOutput) ready for reads by setting its position to 0. + decompressedOutput.flip(); + return decompressedSize; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 1365cfe10c4..65cf5f93a18 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -573,6 +573,9 @@ private static void validateFieldConfigList(@Nullable List fieldCon Preconditions.checkArgument(!noDictionaryColumns.contains(columnName), "FieldConfig encoding type is different from indexingConfig for column: " + columnName); } + Preconditions.checkArgument(fieldConfig.getCompressionCodec() == null, + "Set compression codec to null for dictionary encoding type"); + + break; } switch (fieldConfig.getIndexType()) { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java index 18ae930a64a..a40eab5b453 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java @@ -69,6 +69,16 @@ public void testWithoutCompression() testDouble(compressionType); } + @Test + public void testWithZstandardCompression() + throws Exception { + ChunkCompressionType compressionType = ChunkCompressionType.ZSTANDARD; + testInt(compressionType); + testLong(compressionType); + testFloat(compressionType); + testDouble(compressionType); + } + public void testInt(ChunkCompressionType compressionType) throws Exception { int[] expected = new int[NUM_VALUES]; diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java index 3e8e1e16edf..98b9bb77715 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java @@ -60,6 +60,13 @@ public void testWithoutCompression() test(ChunkCompressionType.PASS_THROUGH); } + @Test + public void testWithZstandardCompression() + throws Exception { + test(ChunkCompressionType.ZSTANDARD); + } + + /** * This test writes {@link #NUM_ENTRIES} using {@link VarByteChunkSVForwardIndexWriter}. It then reads * the strings & bytes using {@link VarByteChunkSVForwardIndexReader}, and asserts that what was written is the same as @@ -161,24 +168,31 @@ public void testVarCharWithDifferentSizes() throws Exception { testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 10, 1000); testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 10, 1000); + testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 10, 1000); testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 100, 1000); testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 100, 1000); + testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 100, 1000); testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 1000, 1000); testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 1000, 1000); + testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 1000, 1000); testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 10000, 100); testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 10000, 100); + testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 10000, 100); testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 100000, 10); testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 100000, 10); + testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 100000, 10); testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 1000000, 10); testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 1000000, 10); + testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 1000000, 10); testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 2000000, 10); testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 2000000, 10); + testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 2000000, 10); } private void testLargeVarcharHelper(ChunkCompressionType compressionType, int numChars, int numDocs) diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java index bf9c62d3466..00298a5ea08 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java @@ -19,7 +19,7 @@ package org.apache.pinot.segment.spi.compression; public enum ChunkCompressionType { - PASS_THROUGH(0), SNAPPY(1); + PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2); private final int _value; diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java index 1ffe1416df8..5c34634128b 100644 --- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java @@ -193,6 +193,7 @@ public SegmentGeneratorConfig(TableConfig tableConfig, Schema schema) { extractTextIndexColumnsFromTableConfig(tableConfig); extractFSTIndexColumnsFromTableConfig(tableConfig); extractH3IndexConfigsFromTableConfig(tableConfig); + extractCompressionCodecConfigsFromTableConfig(tableConfig); _nullHandlingEnabled = indexingConfig.isNullHandlingEnabled(); } @@ -262,6 +263,19 @@ private void extractH3IndexConfigsFromTableConfig(TableConfig tableConfig) { } } + private void extractCompressionCodecConfigsFromTableConfig(TableConfig tableConfig) { + List fieldConfigList = tableConfig.getFieldConfigList(); + if (fieldConfigList != null) { + for (FieldConfig fieldConfig : fieldConfigList) { + if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW && fieldConfig.getCompressionCodec() != null) { + _rawIndexCreationColumns.add(fieldConfig.getName()); + _rawIndexCompressionType.put(fieldConfig.getName(), + ChunkCompressionType.valueOf(fieldConfig.getCompressionCodec().name())); + } + } + } + } + public Map getCustomProperties() { return _customProperties; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java index 2f67dc285df..3804e50065f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java @@ -30,6 +30,7 @@ public class FieldConfig extends BaseJsonConfig { private final String _name; private final EncodingType _encodingType; private final IndexType _indexType; + private final CompressionCodec _compressionCodec; private final Map _properties; public static String BLOOM_FILTER_COLUMN_KEY = "createBloomFilter"; @@ -51,11 +52,13 @@ public class FieldConfig extends BaseJsonConfig { public FieldConfig(@JsonProperty(value = "name", required = true) String name, @JsonProperty(value = "encodingType") @Nullable EncodingType encodingType, @JsonProperty(value = "indexType") @Nullable IndexType indexType, + @JsonProperty(value = "compressionCodec") @Nullable CompressionCodec compressionCodec, @JsonProperty(value = "properties") @Nullable Map properties) { Preconditions.checkArgument(name != null, "'name' must be configured"); _name = name; _encodingType = encodingType; _indexType = indexType; + _compressionCodec = compressionCodec; _properties = properties; } @@ -69,6 +72,10 @@ public enum IndexType { INVERTED, SORTED, TEXT, FST, H3 } + public enum CompressionCodec { + PASS_THROUGH, SNAPPY, ZSTANDARD + } + public String getName() { return _name; } @@ -83,6 +90,11 @@ public IndexType getIndexType() { return _indexType; } + @Nullable + public CompressionCodec getCompressionCodec() { + return _compressionCodec; + } + @Nullable public Map getProperties() { return _properties; diff --git a/pom.xml b/pom.xml index 37cb9a8aa81..f53dcc4680d 100644 --- a/pom.xml +++ b/pom.xml @@ -144,6 +144,7 @@ 4.1.2 1.1.1.7 + 1.4.9-5 2.11.2 4.1.54.Final 1.16.1 @@ -520,6 +521,11 @@ snappy-java ${snappy-java.version} + + com.github.luben + zstd-jni + ${zstd-jni.version} + org.apache.commons commons-compress
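Taken together, the FieldConfig and SegmentGeneratorConfig changes wire a per-column codec from the table config down to the forward-index writer. A minimal sketch of that wiring (class, table, and column names here are illustrative, not part of the patch): the column is declared RAW, given a ZSTANDARD codec, and listed as no-dictionary, mirroring NoDictionaryCompressionQueriesTest above.

import java.util.Collections;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

public class ZstdFieldConfigExample {
  public static void main(String[] args) {
    // RAW encoding + ZSTANDARD codec; the column is also declared no-dictionary so the
    // segment generator stores its forward index raw and picks up the codec mapping
    FieldConfig fieldConfig = new FieldConfig("myRawCol", FieldConfig.EncodingType.RAW,
        null /* indexType */, FieldConfig.CompressionCodec.ZSTANDARD, null /* properties */);
    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable")
        .setNoDictionaryColumns(Collections.singletonList("myRawCol"))
        .setFieldConfigList(Collections.singletonList(fieldConfig)).build();
    System.out.println(tableConfig.toJsonString());
  }
}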
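At the buffer level, a hedged round-trip sketch of the new codec pair obtained through ChunkCompressorFactory. The 2x output allocation copies the heuristic the benchmarks above use rather than a tight bound; since both compress() and decompress() flip their destination buffer before returning, the result is readable immediately.

import java.nio.ByteBuffer;
import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;

public class ZstdRoundTripExample {
  public static void main(String[] args) throws Exception {
    ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(ChunkCompressionType.ZSTANDARD);
    ChunkDecompressor decompressor = ChunkCompressorFactory.getDecompressor(ChunkCompressionType.ZSTANDARD);

    // zstd-jni's ByteBuffer overloads work on direct buffers, as in the benchmarks above
    ByteBuffer input = ByteBuffer.allocateDirect(1024);
    for (int i = 0; i < 256; i++) {
      input.putInt(i % 16); // repetitive data, so it should compress well
    }
    input.flip();

    ByteBuffer compressed = ByteBuffer.allocateDirect(input.capacity() * 2);
    int compressedSize = compressor.compress(input, compressed);

    ByteBuffer decompressed = ByteBuffer.allocateDirect(input.capacity());
    int decompressedSize = decompressor.decompress(compressed, decompressed);
    System.out.println(compressedSize + " bytes compressed, " + decompressedSize + " bytes restored");
  }
}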
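Finally, the new guard in TableConfigUtils.validateFieldConfigList, exercised directly. A sketch under the same assumptions as the TableConfigUtilsTest cases above (the schema and all names are illustrative); the Preconditions check surfaces as an IllegalArgumentException.

import java.util.Collections;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

public class DictionaryCodecValidationExample {
  public static void main(String[] args) {
    Schema schema = new Schema.SchemaBuilder().setSchemaName("myTable")
        .addSingleValueDimension("myDictCol", FieldSpec.DataType.INT).build();
    // A compression codec only makes sense for RAW encoding; pairing it with
    // DICTIONARY encoding must be rejected at validation time
    FieldConfig fieldConfig = new FieldConfig("myDictCol", FieldConfig.EncodingType.DICTIONARY, null,
        FieldConfig.CompressionCodec.SNAPPY, null);
    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable")
        .setFieldConfigList(Collections.singletonList(fieldConfig)).build();
    try {
      TableConfigUtils.validate(tableConfig, schema);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // "Set compression codec to null for dictionary encoding type"
    }
  }
}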