-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Data loader (sampler component) #7531
Conversation
@dclim this PR would be super useful. Thanks! Please check the CI failure. It looks legit. |
@jihoonson yes, I've been fixing the tests to handle this (happens when |
fix checkstyle issues add sampler fix to process CSV files from cache properly change to composition and rename some classes add tests and report num rows read and indexed remove excludedByFilter flag and don't send filtered out data fix tests to handle both settings for druid.generic.useDefaultValueForNull
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dclim nice work! Left some comments. Also tagged Design Review
because this PR changes extension points.
.gitignore
Outdated
@@ -13,5 +13,7 @@ target | |||
*.log | |||
*.DS_Store | |||
_site | |||
indexing-service/foo/* | |||
indexing-service/testDataSource/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm it looks that these directories are used in some tests, but I think any test directory should be under $PACKAGE_ROOT/target/
. How about fixing those tests rather than adding these directories to .gitignore
? I would like to not add them because they would remind me this issue whenever I see them. I don't think finding those tests and fixing them have to be done in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, will remove!
private final InputRow inputRow; | ||
|
||
@Nullable | ||
private final byte[] raw; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you please add a comment about when raw
can be null? I guess inputRow
would be null if parsing fails, but not sure when raw
would be null.
@JsonProperty("timeoutMs") Integer timeoutMs | ||
) | ||
{ | ||
this.numRows = numRows != null ? numRows : DEFAULT_NUM_ROWS; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to limit the max numRows
to avoid to use too much memory for sampling?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, that sounds good
private final int numRows; | ||
private final String cacheKey; | ||
private final boolean skipCache; | ||
private final int timeoutMs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you please add a comment what this is for? I thought it's a timeout for cache, but it looks a timeout for TimedShutoffFirehose
.
|
||
public class SamplerException extends RuntimeException | ||
{ | ||
public SamplerException(String formatText, Object... arguments) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove unused method.
private static final int DEFAULT_TIMEOUT_MS = 10000; | ||
|
||
private final int numRows; | ||
private final String cacheKey; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you please add a comment how cacheKey
works? I was confused at first.
private static final boolean DEFAULT_SKIP_CACHE = false; | ||
private static final int DEFAULT_TIMEOUT_MS = 10000; | ||
|
||
private final int numRows; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you please add a comment about that the actual number of sampled rows would be min(number of cached rows, numRows)
if cacheKey
is valid and skipCache
is false?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dclim thanks for the update! +1 after CI
The new interface methods for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Sampler area LGTM as well.
Implementation of the sampler component of #7502.
Runs on the overlord and exposes an endpoint on
POST /druid/indexer/v1/sampler
that returns sampled data for use by the data loader GUI. This is currently intended as an internal-only endpoint and is intentionally not documented.Changes are as minimally-invasive as possible, and most code is confined to the
org.apache.druid.indexing.overlord.sampler
package. Additional methods were added toFirehose
(returning the raw rows) andFirehoseFactory
(adding aconnectForSampler
method that signals to the implementation that we only care about a few rows and to skip things like prefetching and caching) to improve the sampling experience; default implementations do 'the right thing' if not implemented.There are a few 'hacks' added to make the API a bit nicer - i.e. allowing the sampler to work if no
dataSchema
is provided, in which case it just returns the raw rows if possible and marks everything as unparseable (since no parser was provided).