Account for data format and compression in MSQ auto taskAssignment #14307
Conversation
In LocalInputSource and CloudObjectInputSource:
@@ -198,7 +240,7 @@
        throw Throwables.propagate(e);
      }
    } else {
-     final File tmpFile = File.createTempFile("compressionUtilZipCache", ZIP_SUFFIX);
+     final File tmpFile = File.createTempFile("compressionUtilZipCache", Format.ZIP.getSuffix());
Check warning (Code scanning / CodeQL): Local information disclosure in a temporary directory
Other than the line comments, some comments about how this is handled in MSQ:
- Let's lower Limits#DEFAULT_MAX_INPUT_BYTES_PER_WORKER as well. The default value is a bit high even for uncompressed JSON.
- Documentation for taskAssignment in MSQ's reference.md will need an update.
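A minimal sketch of the lowered default in Limits, assuming it tracks the 10 GiB to 512 MiB change described in the PR description (the exact declaration in the PR may differ):

public class Limits
{
  // Lowered from 10 GiB: with weighted input sizes, the old default was a
  // bit high even for uncompressed JSON. Value assumed from the description.
  public static final long DEFAULT_MAX_INPUT_BYTES_PER_WORKER = 512L * 1024 * 1024;
}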
 *
 * @return The weighted size of the input object.
 */
@JsonIgnore
There should be no need for @JsonIgnore here. Typically our ObjectMapper is configured to ignore all methods that aren't explicitly annotated with @JsonProperty. Are you seeing something different?
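For context, a minimal sketch of the kind of ObjectMapper configuration the reviewer is describing (an assumption about the setup, not Druid's actual wiring), where only explicitly annotated members are (de)serialized:

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;

public class MapperConfig
{
  public static ObjectMapper strictMapper()
  {
    ObjectMapper mapper = new ObjectMapper();
    // Turn off auto-detection entirely: fields and getters are only picked up
    // when annotated with @JsonProperty, making @JsonIgnore on an unannotated
    // method redundant.
    mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE);
    return mapper;
  }
}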
 * @return The weighted size of the input object.
 */
@JsonIgnore
default long getWeightedSize(@Nullable CompressionUtils.Format compressionFormat, long size)
IIRC, decompression is under the control of the InputFormat itself. So the caller shouldn't be passing in the CompressionUtils.Format, as it doesn't really know. It should pass in the filename and let the InputFormat decide what it wants to do.
fixed
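Presumably the fixed signature passes the file name instead, along these lines (a sketch; the actual fixed code appears in later snippets in this review, which derive the format via CompressionUtils.Format.fromFileName):

/**
 * @return The weighted size of the input object, derived from the file name
 *         rather than a caller-supplied compression format.
 */
default long getWeightedSize(String path, long size)
{
  // Default: no weighting. Formats override this to scale compressed input.
  return size;
}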
public long getWeightedSize(@Nullable CompressionUtils.Format compressionFormat, long size)
{
  if (CompressionUtils.Format.GZ == compressionFormat) {
    return size * 4L;
Better to have this be a constant in CompressionUtils
fixed
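The constant presumably ended up in CompressionUtils looking something like the following (the name appears in the fixed snippet later in this review; the value 4 matches the gzip scale factor from the PR description):

// Weight applied per byte of gzip-compressed text input: each compressed
// byte is treated as roughly 4 bytes of work when splitting input.
public static final int COMPRESSED_TEXT_WEIGHT_FACTOR = 4;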
@@ -156,6 +158,16 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity
   }
 }

+  @JsonIgnore
+  @Override
+  public long getWeightedSize(@Nullable CompressionUtils.Format compressionFormat, long size)
I see this implemented for json, csv, and parquet. Please implement it for Avro, ORC, regex, and delimited as well. Delimited and regex can be the same as CSV. Avro and ORC can be the same as Parquet.
good call, added for those as well.
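For example, the delimited and regex overrides can presumably mirror the CSV weighting (a sketch under that assumption; the exact code in the PR may differ):

@Override
public long getWeightedSize(String path, long size)
{
  // Delimited text compresses like CSV/JSON: weight gzip input by the shared
  // compressed-text factor, otherwise count raw bytes 1:1.
  CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path);
  if (CompressionUtils.Format.GZ == compressionFormat) {
    return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
  }
  return size;
}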
item -> {
  if (null != inputFormat) {
    InputFileAttribute inputFileAttribute = inputAttributeExtractor.apply(item);
    return inputFormat.getWeightedSize(
It'd be nice for callers to not need the InputFormat here. How about having InputFileAttribute carry both getSize and getWeightedSize?
Good idea, fixed
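A sketch of the resulting InputFileAttribute shape, assuming the weighted size is computed once at construction so callers no longer need the InputFormat:

public class InputFileAttribute
{
  private final long size;
  private final long weightedSize;

  public InputFileAttribute(long size, long weightedSize)
  {
    this.size = size;
    this.weightedSize = weightedSize;
  }

  // Raw size reported by the file system.
  public long getSize()
  {
    return size;
  }

  // Format- and compression-aware size used when splitting input for tasks.
  public long getWeightedSize()
  {
    return weightedSize;
  }
}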
{
  CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path);
  if (CompressionUtils.Format.GZ == compressionFormat) {
    return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
Is COMPRESSED_TEXT_WEIGHT_FACTOR specific to GZIP? If so, maybe rename the constant to be specific to GZ. Also, do we want to consider other compression formats here and in similar places?
It's not; gzip is just the most common, and the one that we have data for. We can add others as the need arises.
This PR catches the console up to all the backend changes for Druid 27. Specifically: Add page information to SqlStatementResource API #14512; Allow empty tiered replicants map for load rules #14432; Adding Interactive API's for MSQ engine #14416; Add replication factor column to sys table #14403; Account for data format and compression in MSQ auto taskAssignment #14307; Errors take 3 #14004. Co-authored-by: Vadim Ogievetsky <vadim@ogievetsky.com>
Description

This change allows the input format and compression format to be taken into account when computing how to split input files among available tasks in MSQ ingestion, as governed by the maxInputBytesPerWorker query context parameter. This parameter lets users control the maximum number of bytes, at the granularity of an input file/object, that each ingestion task is assigned to ingest. With this change, the parameter denotes the estimated weighted size in bytes of the input to split on, with consideration for input format and compression format, rather than the actual file size reported by the file system. We assume uncompressed newline-delimited JSON as the baseline, with a scaling factor of 1: when computing the byte weight a file contributes toward input splitting, an uncompressed JSON file's size is counted 1:1. It was found during testing that gzip-compressed JSON and Parquet have scale factors of 4 and 8 respectively, meaning each byte of such data is weighted 4x and 8x when computing input splits. This weighted byte scaling is currently only applied to MSQ ingestion that uses either LocalInputSource or CloudObjectInputSource. The default value of the maxInputBytesPerWorker query context parameter has been updated from 10 GiB to 512 MiB.
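To make the weighting concrete, here is a small self-contained sketch of the computation described above (the scale factors and the 512 MiB default come from the description; the class and method names are illustrative, not the PR's actual code):

public class WeightedSizeExample
{
  // Scale factors from the PR description: uncompressed JSON is the 1x
  // baseline; gzip-compressed JSON weighs 4x; Parquet weighs 8x.
  static long weightedSize(String fileName, long sizeBytes)
  {
    if (fileName.endsWith(".gz")) {
      return sizeBytes * 4L;
    } else if (fileName.endsWith(".parquet")) {
      return sizeBytes * 8L;
    }
    return sizeBytes; // uncompressed JSON and other text counted 1:1
  }

  public static void main(String[] args)
  {
    long maxInputBytesPerWorker = 512L * 1024 * 1024; // new 512 MiB default

    // A 100 MiB gzip JSON file counts as 400 MiB toward the split limit,
    // so only one such file fits under the per-worker budget.
    long w = weightedSize("events.json.gz", 100L * 1024 * 1024);
    System.out.printf("weighted=%d MiB, fits=%d per worker%n",
        w / (1024 * 1024), maxInputBytesPerWorker / w);
  }
}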