Support ZSTD compression codec for raw index #6876
Conversation
*/
private void testSelectQueryHelper(String query, int expectedResultSize, List<Serializable[]> expectedResults)
    throws Exception {
  SelectionOnlyOperator operator = getOperatorForPqlQuery(query);
Use getOperatorForSqlQuery
done!
@@ -573,6 +573,9 @@ private static void validateFieldConfigList(@Nullable List<FieldConfig> fieldCon
      Preconditions.checkArgument(!noDictionaryColumns.contains(columnName),
          "FieldConfig encoding type is different from indexingConfig for column: " + columnName);
    }
    Preconditions.checkArgument(fieldConfig.getNoDictionaryColumnCompressorCodec() == null,
        "FieldConfig column compression codec is only supported for single value raw encoding type");
(nit) Also add "Set compression codec to null for dictionary encoding type"
done!
@@ -69,6 +72,10 @@ public FieldConfig(@JsonProperty(value = "name", required = true) String name,
    INVERTED, SORTED, TEXT, FST, H3
  }

  public enum NoDictionaryColumnCompressorCodec {
(nit) Suggest naming NoDictionaryColumnCompressionCodec
done!
int decompressedSize = Zstd.decompress(decompressedOutput, compressedInput);

// Make the output ByteBuffer ready for read.
decompressedOutput.flip();
You might want to add one or two more lines on why flip() is necessary. I remember during debugging, it was probably not obvious unless you know how Zstd works internally. So adding some more context will be helpful for anyone else who will read/change this code in the future.
updated!
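To illustrate the reviewer's point, here is a minimal sketch of the ByteBuffer write/flip/read cycle. The plain put() calls stand in for the decompressor's output (whether Zstd.decompress itself advances the position depends on the zstd-jni API variant used, so this is an assumption for illustration):

```java
import java.nio.ByteBuffer;

public class FlipDemo {
  // Simulates a decompressor writing into the output buffer: after the
  // writes, position() sits at the end of the written data. flip() sets
  // limit = position and position = 0, so a subsequent reader sees exactly
  // the bytes that were written and nothing beyond them.
  static ByteBuffer fillAndFlip() {
    ByteBuffer out = ByteBuffer.allocate(16);
    out.put((byte) 1).put((byte) 2).put((byte) 3); // stand-in for decompressed output
    out.flip(); // without this, reads would start past the written data
    return out;
  }

  public static void main(String[] args) {
    ByteBuffer out = fillAndFlip();
    System.out.println(out.position());  // 0
    System.out.println(out.limit());     // 3
    System.out.println(out.remaining()); // 3
  }
}
```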
Nit: You may want to note that if someone upgrades and then enables ZSTD for new segments, and for some reason has to roll back their deployment, the segments will not be readable.
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;

/**
 * Implementation of {@link ChunkDecompressor} using Zstandard(Zstd).
(nit) using Zstandard compression algorithm
done!
/**
 * Implementation of {@link ChunkCompressor} using Zstandard(Zstd).
 */
public class ZstandardCompressor implements ChunkCompressor {
(nit) using Zstandard compression algorithm
done!
@Measurement(iterations = 5)
@State(Scope.Benchmark)
// Test to get memory statistics for snappy and zstandard integer compression techniques
public class BenchmarkIntegerCompressionSpeed {
(nit) suggest naming the class as BenchmarkNoDictionaryIntegerCompression
Same for other benchmark classes
updated!
"SELECT SNAPPY_STRING, ZSTANDARD_STRING, PASS_THROUGH_STRING, SNAPPY_INTEGER, ZSTANDARD_INTEGER, PASS_THROUGH_INTEGER, "
    + "SNAPPY_LONG, ZSTANDARD_LONG, PASS_THROUGH_LONG FROM MyTable LIMIT 1000";
ArrayList<Serializable[]> expected = new ArrayList<>();
Can we please add another test query with a filter on one or more of these raw columns?
added a few test scenarios!
 * (2) integer
 * (3) long
 */
public class CompressionCodecQueriesTest extends BaseQueriesTest {
(nit) Suggest renaming to NoDictionaryCompressionQueriesTest
updated!
tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail since dictionary encoding does not support compression codec zstandard ");
} catch (Exception e) {
The same failure is expected for SNAPPY and PASS_THROUGH, right?
yes, added both test cases
    }
  }
}
setRawIndexColumnCompressionType(tableConfig.getIndexingConfig(), rawIndexColumns);
This doesn't seem right. At line 197 it will check if the map is null or not. If null, then we won't be able to set ZSTD/SNAPPY coming via FieldConfig (from this function). Right?
So, if the noDictionaryColumnMap is empty/null, this indicates that no column name to compressionType mapping is set, right? So you are right, the field config map will not be set here.
I think we should not use the old method here. Let it be there. Just remove the call to it at line 280. Instead, we can probably do the following. Wdyt?

At line 276:
_rawIndexCreationColumns.add(fieldConfig.getName())
_rawIndexCompressionType.put(fieldConfig.getName(), ChunkCompressionType.valueOf(fieldConfig.getNoDictionaryColumnCompressorCodec()))
In any case, please make sure to run the end-to-end query execution test in debug mode and ensure that segment generation code is correctly picking up the compression codec config
updated!
Table level config is there. I mean it was already there. Just reused the same in FieldConfig (the new model). @GSharayu let's also mention that, just like other table level changes (column renaming, type changing, column dropping, index dropping) which are currently not allowed, changing the compression codec on an existing noDictionary column from snappy to zstd or vice versa will not happen, since we currently don't have a mechanism for doing this in-place in the segment file. Newly pushed segments will pick up the new codec, and since the codec type is written into the index buffer header, we will be able to read both old and new segments.
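The compatibility argument here (the codec type is written into the index buffer header, so readers can handle both old and new segments) can be sketched as below. The codec ids and the header offset are hypothetical stand-ins, not Pinot's actual layout:

```java
import java.nio.ByteBuffer;

public class CodecHeaderDemo {
  // Hypothetical codec ids; Pinot's real header layout and id values differ.
  enum CompressionType {
    PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2);

    final int _id;

    CompressionType(int id) {
      _id = id;
    }

    static CompressionType fromId(int id) {
      for (CompressionType type : values()) {
        if (type._id == id) {
          return type;
        }
      }
      throw new IllegalArgumentException("Unknown codec id: " + id);
    }
  }

  // Readers dispatch on the codec id stored in the header, so segments
  // written with either codec stay readable after an upgrade.
  static CompressionType readCodec(ByteBuffer indexBuffer) {
    return CompressionType.fromId(indexBuffer.getInt(0)); // codec id at offset 0 (assumed)
  }

  public static void main(String[] args) {
    ByteBuffer oldSegment = ByteBuffer.allocate(4).putInt(0, 1);
    ByteBuffer newSegment = ByteBuffer.allocate(4).putInt(0, 2);
    System.out.println(readCodec(oldSegment)); // SNAPPY
    System.out.println(readCodec(newSegment)); // ZSTANDARD
  }
}
```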
Force-pushed from b1cc143 to 9f23736
Codecov Report
@@ Coverage Diff @@
## master #6876 +/- ##
============================================
+ Coverage 65.48% 65.51% +0.03%
Complexity 12 12
============================================
Files 1421 1423 +2
Lines 69980 70005 +25
Branches 10112 10116 +4
============================================
+ Hits 45825 45865 +40
+ Misses 20874 20858 -16
- Partials 3281 3282 +1
    }
  }
}
setRawIndexColumnCompressionType(rawIndexColumns, rawIndexColumnsToCompressionTypeMap);
I don't think this still fixes the problem. At line 200, the following method will be called:

public void setRawIndexCompressionType(Map<String, ChunkCompressionType> rawIndexCompressionType) {
  _rawIndexCompressionType.clear();
  _rawIndexCompressionType.putAll(rawIndexCompressionType);
}

When it is called again at line 281, clear() will be called and the old compression config coming from noDictionaryConfig will be wiped out from its first invocation.
I think implementing the suggestion in #6876 (comment) is a simple fix. No need to call setRawIndexColumnCompressionType() from extractNoDictionaryColumnCompressionCodecConfigsFromTableConfig.
The current code won't be able to handle the compression config set through the old way.
Let's say column1 and column2 are in an existing table T.
For column1, someone specified the compression config through the old way (noDictionaryConfig map).
For column2, someone specified the compression config (SNAPPY, ZSTD) through the new way (FieldConfig).
The current code won't be able to preserve the config of column1, which we need to handle until everything is migrated from the existing way to FieldConfig.
I haven't seen the old way of config being used so far at Li in production. But we don't know if someone in open source is using it or not. If they are, it will break things for them.
updated!
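The clear()/putAll() problem the reviewer describes can be sketched generically. The method signatures and column/codec names below are illustrative stand-ins, not Pinot's actual code:

```java
import java.util.HashMap;
import java.util.Map;

public class CompressionConfigMergeDemo {
  // Replace semantics: what setRawIndexCompressionType does today — a second
  // call wipes out entries set by the first invocation.
  static Map<String, String> replaceSemantics(Map<String, String> target, Map<String, String> incoming) {
    target.clear(); // drops config from the earlier invocation
    target.putAll(incoming);
    return target;
  }

  // Merge semantics: entries from both the legacy noDictionaryConfig map and
  // the new FieldConfig path survive.
  static Map<String, String> mergeSemantics(Map<String, String> target, Map<String, String> incoming) {
    target.putAll(incoming); // preserves earlier entries
    return target;
  }

  public static void main(String[] args) {
    Map<String, String> legacy = Map.of("column1", "SNAPPY");          // old way
    Map<String, String> fieldConfig = Map.of("column2", "ZSTANDARD");  // new way

    Map<String, String> replaced = replaceSemantics(new HashMap<>(legacy), fieldConfig);
    System.out.println(replaced.containsKey("column1")); // false: column1's codec was lost

    Map<String, String> merged = mergeSemantics(new HashMap<>(legacy), fieldConfig);
    System.out.println(merged.containsKey("column1")); // true: both configs preserved
  }
}
```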
Looks like this library is using JNI. Pinot is going to be dependent on the OS architecture with this feature PR, and it's a backward-incompatible change. We should definitely discuss this and consider alternative implementations.
@kishoreg A lot of the compression algorithms are not natively available in Java since they are written in C/C++. Well-tested pure-Java implementations are unlikely to be available, especially for algorithms not yet as popular as Snappy. The Apache Commons library has an implementation for ZSTD, but the API is byte-array based, not direct byte-buffer based, and it also relies on JNI bindings underneath. Pretty much all low-level stuff is available in Java via the JNI bridge. I don't think there is any platform-specific issue. This library is also used in other Java-based projects (e.g. Arrow).

There is nothing backward incompatible about this change. I think you mean forward compatibility: if someone upgrades to a newer version of Pinot, enables ZSTD, and then downgrades, the old release can't read the new segments. We have labeled this with release notes and kept the default behavior intact. Backward compatibility is still there since we can still read old segments with SNAPPY, and SNAPPY continues to be the default.
@kishoreg, even the current Snappy library that is being used in Pinot uses JNI underneath for the actual compress/decompress.
LGTM. Thanks for addressing comments.
Mostly good. Can you please open read access to the perf doc for everyone, or put the perf results into the PR description?
@@ -51,11 +52,13 @@
public FieldConfig(@JsonProperty(value = "name", required = true) String name,
    @JsonProperty(value = "encodingType") @Nullable EncodingType encodingType,
    @JsonProperty(value = "indexType") @Nullable IndexType indexType,
    @JsonProperty(value = "properties") @Nullable Map<String, String> properties) {
    @JsonProperty(value = "properties") @Nullable Map<String, String> properties,
    @JsonProperty(value = "noDictionaryColumnCompressionCodec") @Nullable NoDictionaryColumnCompressionCodec noDictionaryColumnCompressionCodec) {
Shall we simplify the field name, e.g. compressionCodec? This long name is slightly hard to configure, and we don't need to separate the codec of raw vs dictionary encoded columns.
compressionCodec works, will update!
done!
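With the rename applied, a FieldConfig entry in the table config might look like the sketch below. The column name is illustrative, assuming the new compressionCodec field on a RAW-encoded column as discussed in this thread:

```json
{
  "fieldConfigList": [
    {
      "name": "myRawColumn",
      "encodingType": "RAW",
      "compressionCodec": "ZSTANDARD"
    }
  ]
}
```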
@@ -51,11 +52,13 @@
public FieldConfig(@JsonProperty(value = "name", required = true) String name,
    @JsonProperty(value = "encodingType") @Nullable EncodingType encodingType,
    @JsonProperty(value = "indexType") @Nullable IndexType indexType,
    @JsonProperty(value = "properties") @Nullable Map<String, String> properties) {
    @JsonProperty(value = "properties") @Nullable Map<String, String> properties,
    @JsonProperty(value = "noDictionaryColumnCompressionCodec") @Nullable NoDictionaryColumnCompressionCodec noDictionaryColumnCompressionCodec) {
Move it in front of properties to match the declaration order.
done!
@@ -102,7 +102,7 @@ protected String getSortedColumn() {
@Override
protected List<FieldConfig> getFieldConfigs() {
  return Collections.singletonList(
      new FieldConfig(TEXT_COLUMN_NAME, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null));
      new FieldConfig(TEXT_COLUMN_NAME, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null,null));
Reformat
done!
new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, null, null, FieldConfig.NoDictionaryColumnCompressionCodec.SNAPPY);
tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail since dictionary encoding does not support compression codec snappy ");
(nit) extra space in the end
done!
LGTM. @kishoreg Do you still have concerns about this PR? This library seems commonly used in the Maven repository (115 usages).
@@ -262,6 +263,19 @@ private void extractH3IndexConfigsFromTableConfig(TableConfig tableConfig) {
  }
}

private void extractNoDictionaryColumnCompressionCodecConfigsFromTableConfig(TableConfig tableConfig) {
(nit) Rename to extractCompressionCodecConfigsFromTableConfig
done!
Yes, the native library comes embedded in the Java library, and it automatically picks up the right one. Same thing for snappy-java, which we have been using for quite some time, and also for this one that uses zstd-jni.
When the forward index is not dictionary encoded, we have 2 choices:
store the data as is (RAW)
store the data snappy compressed, using the snappy compression codec library
This PR adds support for ZSTD compression using the library https://github.com/luben/zstd-jni
We get a good compression ratio, so based on user requirements, the user can configure this via table config on a per-column basis. The default behavior remains the same: snappy for dimension columns and no compression for metric columns. The benchmark tests are kept as part of the PR. We will also be adding a recommendation rule to the config recommendation rule engine to account for Snappy vs. ZSTD; we will do that in a follow-up PR.
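As a rough illustration of the chunk compress/decompress round trip described here, the sketch below uses the JDK's built-in Deflater/Inflater as a stand-in, since Snappy and zstd-jni are third-party dependencies; Pinot's actual ChunkCompressor implementations use those libraries, not Deflater:

```java
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class ChunkCompressDemo {
  // Compresses one chunk; stand-in for a ChunkCompressor implementation.
  static byte[] compress(byte[] input) {
    Deflater deflater = new Deflater();
    deflater.setInput(input);
    deflater.finish();
    byte[] buf = new byte[input.length * 2 + 64]; // generous upper bound
    int n = deflater.deflate(buf);
    deflater.end();
    return Arrays.copyOf(buf, n);
  }

  // Decompresses back to the original size; stand-in for a ChunkDecompressor.
  static byte[] decompress(byte[] input, int originalSize) throws Exception {
    Inflater inflater = new Inflater();
    inflater.setInput(input);
    byte[] out = new byte[originalSize];
    int n = inflater.inflate(out);
    inflater.end();
    return Arrays.copyOf(out, n);
  }

  public static void main(String[] args) throws Exception {
    byte[] chunk = new byte[4096]; // highly compressible: all zeros
    byte[] compressed = compress(chunk);
    byte[] restored = decompress(compressed, chunk.length);
    System.out.println(compressed.length < chunk.length); // true
    System.out.println(Arrays.equals(chunk, restored));   // true
  }
}
```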
Just like other table-level changes (column renaming, type changing, column dropping, index dropping) which are currently not allowed, changing the compression codec on an existing noDictionary column from snappy to zstd or vice versa will not happen, since we currently don't have a mechanism for doing this in-place in the segment file. Newly pushed segments will pick up the new codec, and since the codec type is written into the index buffer header, we will be able to read both old and new segments.
Corresponding performance doc with randomly generated data:
https://docs.google.com/document/d/1JKLhDm0-gnrRhyBUDge5u4MeGjotRSgjiexJxI_abfk/edit
Issue (#6804)