-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[QTL] Support multiple lookup maps within one namespace #2524
Conversation
@sirpkt awesome! |
@sirpkt i am not sure how this can work with the actual LookupDimensionSpec i am wondering how you will call it a query time. |
As I replied in the issue page,
I added unit test code for explicit Json usage of NamepacedExtraction. |
@drcrallen @b-slim @cheddar can u guys review this and coordinate over development? |
Map<K, V> map = maps.get(entry.getKey()); | ||
if (map == null) | ||
{ | ||
map = new ConcurrentHashMap<K, V>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This violates offheap caching
@sirpkt I think this one needs more discussion among the community to make sure it fits overall expectations. As such I'm proposing punting it out of 0.9.1. 0.9.1 is slated for a major overhaul of Lookups to essentially be the first (hopefully) production-ready version for lookups. This is a (important) feature add for lookups, but is outside the scope of "required for MVP" |
@drcrallen @b-slim what's going on with this PR? |
Same opinion as @drcrallen the feature needs more discussion, plus major changes to be compatible with new lookups impls. In addition we have a pretty busy roadmap and i guess this feature is not a top priority IMHO. The author can always start working on make it working with new lookup module. |
I'll try to make this working with new lookup module. |
@sirpkt @b-slim @drcrallen can we submit an issue or a proposal for the list of changes described in this PR and discuss changes there? |
7258abb
to
254fe3d
Compare
I added |
254fe3d
to
79ef586
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had some comments, but I'm generally on board with the goal and approach taken by this PR.
*/ | ||
public abstract ConcurrentMap<String, String> getCacheMap(String namespaceOrCacheKey); | ||
public Map<String, String> getCacheMap(String id, String mapName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's rename this to something like getCacheInnerMap() to differentiate the two functions, and note in the javadocs that this retrieves an inner map
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
/** | ||
* NamespaceKey is used to distinguish the same mapNames of different namespace IDs | ||
*/ | ||
public class NamespaceKey |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For thought, would it be easier/better to use a MultiKey for the composite namespace:ID key?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replaced by MultiKey
|
||
One of either `uri` xor `uriPrefix` must be specified. | ||
|
||
The `pollPeriod` value specifies the period in ISO 8601 format between checks for replacement data for the lookup. If the source of the lookup is capable of providing a timestamp, the lookup will only be updated if it has changed since the prior tick of `pollPeriod`. A value of 0, an absent parameter, or `null` all mean populate once and do not attempt to look for new data later. Whenever an poll occurs, the updating system will look for a file with the most recent timestamp and assume that one with the most recent data set, replacing the local cache of the lookup data. | ||
|
||
The `namespaceParseSpec` can be one of a number of values. Each of the examples below would rename foo to bar, baz to bat, and buck to truck. All parseSpec types assumes each input is delimited by a new line. See below for the types of parseSpec supported. | ||
|
||
The `keyValueMaps` specifies lookup maps under one lookup name. One lookup can have multiple maps that share the source of lookup. It is a list of maps called `keyValueMap` that has three entries, `mapName`, `keyName`, and `valueName`. `mapName` is the name of map inside the lookup and `keyName` is key column or field name and `valueName` is value column or field name. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest adding a bit more documentation detail along the lines of:
- Key/Value column refer to columns within the lookup source; "columns" field refers to Druid columns whose values will be used as filtering criteria for retrieving the mapping row from the lookup source
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|`keyColumn`|The name of the column containing the key|no|The first column| | ||
|`valueColumn`|The name of the column containing the value|no|The second column| | ||
|
||
`columns` should contain all the columns specified in `kayValueMaps` of uri lookup spec. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: "kayValueMaps" -> "keyValueMaps"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed as "maps"
|`delimiter`|The delimiter in the file|no|tab (`\t`)| | ||
|`listDelimiter`|The list delimiter in the file|no| (`\u0001`)| | ||
|
||
`columns` should contain all the columns specified in `kayValueMaps` of uri lookup spec. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: "kayValueMaps"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed as maps
@@ -180,7 +181,7 @@ public boolean start() | |||
kafkaProperties.setProperty("group.id", factoryId); | |||
final String topic = getKafkaTopic(); | |||
LOG.debug("About to listen to topic [%s] with group.id [%s]", topic, factoryId); | |||
final Map<String, String> map = cacheManager.getCacheMap(factoryId); | |||
final Map<String, String> map = cacheManager.getCacheMap(factoryId, KeyValueMap.DEFAULT_MAPNAME); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add a note in the docs about how KafkaLookupExtractor only uses the default mapname
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
return getCachePopulator(id, extractionNamespace, lastVersion, cache); | ||
} | ||
|
||
public Callable<String> getCachePopulator(String id, T extractionNamespace, String lastVersion, Map<String, String> swap) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add some javadocs explaining how this function differs from getMapCachePopulator()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -412,23 +431,79 @@ public void run() | |||
} | |||
|
|||
/** | |||
* This method is expected to swap the cacheKey into the active namespace, and leave future requests for new cacheKey available. getCacheMap(cacheKey) should return empty data after this call. | |||
* This method swap the cacheKey into the active namespace, and leave future requests for new cacheKey available. getCacheMap(cacheKey) should return empty data after this call. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spelling: "swap" -> "swaps", "leave" -> "leaves"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
protected abstract boolean deleteInsideMap(final ConcurrentMap<String, Map<String, String>> map); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add javadocs for these two methods
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
finally { | ||
lock.unlock(); | ||
} | ||
// do nothing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add a note on why the delete can be a no-op here (GC?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like a useful feature to reduce memory use and simplify management of lookups - although I have some concerns about the API. Specifically, we need to try to retain backwards compatibility.
] | ||
}, | ||
"keyValueMaps":[ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree just maps
is clearer.
"keyValueMaps":[ | ||
{ | ||
"mapName":"default", | ||
"keyName":"key", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest:
keyName
->keyColumn
valueName
->valueColumn
(like the old configs)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -55,10 +55,8 @@ public JDBCExtractionNamespace( | |||
final MetadataStorageConnectorConfig connectorConfig, | |||
@NotNull @JsonProperty(value = "table", required = true) | |||
final String table, | |||
@NotNull @JsonProperty(value = "keyColumn", required = true) | |||
final String keyColumn, | |||
@NotNull @JsonProperty(value = "valueColumn", required = true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to retain backwards compatibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we should have a "default map" name that the keyColumn/valueColumn go into if you don't specify a maps
list. And then that one also gets used at query time if you don't specify a mapName.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see we already have this in DEFAULT_MAPNAME
. Let's use that for this purpose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@NotNull @JsonProperty(value = "valueColumn", required = true) | ||
final String valueColumn, | ||
@JsonProperty(value = "keyValueMaps", required = true) | ||
KeyValueMap[] keyValueMaps, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would prefer List<KeyValueMap>
here, it's generally easier to work with.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@Override | ||
public String toString() | ||
{ | ||
return String.format("{map[%s] : key field = %s, value field = %s}", mapName, keyName, valueName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason not to use the auto generated IntelliJ style?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed to use auto generated one
private String makeQueryString(List<String> requiredFields) | ||
{ | ||
String query = "SELECT "; | ||
query += StringUtils.join(requiredFields, ','); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should be escaped; field names can have funny characters in them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
private final String keyName; | ||
private final String valueName; | ||
|
||
public static String DEFAULT_MAPNAME = "default"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
__default
is more consistent with defaults in other Druid areas.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
79ef586
to
739290a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't totally review the cache manager code or jdbc namespace yet. But I looked at the main apis, http stuff, parser stuff, and uri namespace code so far. Will look at the rest soon but I just wanted to get at least this part of the review out.
The biggest question for me at this point is, does it make sense to move "maps" into the parse spec? I think it does but would appreciate a second opinion.
@@ -159,8 +202,20 @@ The remapping values for each globally cached lookup can be specified by a json | |||
"uri": "s3://bucket/some/key/prefix/renames-0003.gz", | |||
"namespaceParseSpec":{ | |||
"format":"csv", | |||
"columns":["key","value"] | |||
"columns":["key","val1","val2] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing "
|
||
One of either `uri` xor `uriPrefix` must be specified. | ||
|
||
The `pollPeriod` value specifies the period in ISO 8601 format between checks for replacement data for the lookup. If the source of the lookup is capable of providing a timestamp, the lookup will only be updated if it has changed since the prior tick of `pollPeriod`. A value of 0, an absent parameter, or `null` all mean populate once and do not attempt to look for new data later. Whenever an poll occurs, the updating system will look for a file with the most recent timestamp and assume that one with the most recent data set, replacing the local cache of the lookup data. | ||
|
||
The `namespaceParseSpec` can be one of a number of values. Each of the examples below would rename foo to bar, baz to bat, and buck to truck. All parseSpec types assumes each input is delimited by a new line. See below for the types of parseSpec supported. | ||
|
||
The `maps` specifies lookup maps under one lookup name. One lookup can have multiple maps that share the source of lookup. It is a list of map specs called `keyValueMap` that has three entries, `mapName`, `keyColumn`, and `valueColumn`. `mapName` is the name of map inside the lookup and `keyColumn` is key column or field name within the lookup source and `valueColumn` is value column or field name within the lookup source. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keyValueMap
is maps
now
] | ||
}, | ||
"maps":[ | ||
{ | ||
"mapName":"default", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest using __default
in these examples as it is the actual default map name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also document this as the default map name.
"tsColumn":"timestamp_column", | ||
"pollPeriod":600000 | ||
} | ||
``` | ||
|
||
# Introspection | ||
|
||
Globally cached lookups have introspection points at `/keys` and `/values` which return a complete set of the keys and values (respectively) in the lookup. Introspection to `/` returns the entire map. Introspection to `/version` returns the version indicator for the lookup. | ||
Globally cached lookups have introspection points at `/keys` and `/values` which return a complete set of the map names and key/value maps(respectively) in the lookup. Introspection to `/` returns the entire map. Introspection to `/version` returns the version indicator for the lookup. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I wonder about backwards compatibility here. Will take a closer look at the actual http code.
|
||
/** | ||
* Some LookupExtractorFactories have multiple maps in one namespace. | ||
* For those factories, mapNmae should be additionally given to get LookupExtractor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mapName (spelling)
public static String DEFAULT_KEYNAME = "key"; | ||
public static String DEFAULT_VALUENAME = "value"; | ||
|
||
public static List<KeyValueMap> DEFAULT_MAPS = ImmutableList.of( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this necessary? I understand having a default mapName, but it seems strange to have a default key/value name (especially undocumented).
{ | ||
Parser<String, String> getParser(); | ||
Parser<String, Object> getParser(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changing String to Object means this is no longer a "flat" data parser! Maybe that's okay, but if it is okay, the name should definitely change.
{ | ||
"mapName":"default", | ||
"keyColumn":"key", | ||
"valueColumn":"val1" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When reading through the URIExtractionNamespace changes I now wonder if having the "maps"
with their keyColumn / valueColumn out here is causing the dizziness and weirdness with simpleJson… because it has no keyColumn!
I wonder if it makes more sense to move "maps"
into the namespaceParseSpec. That way the parser is in charge of what the map names and k/vs it returns are, and that should remove some of the weirdness in URIExtractionNamespace.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that seems like a reasonable change, it would express more directly that simpleJson parser doesn't use the "maps" field unlike the other parser types
I suppose the logic for map building from a set of KeyValueMaps in the URIExtractionNamespace's delegate parser could be moved to something shared by the CSV/TSV/customJson "FlatDataParsers" in URIExtractionNamespace
return ImmutableMap.<String, String>of(k, val); | ||
ImmutableMap.Builder<MultiKey, Map<String, String>> builder = new ImmutableMap.Builder<>(); | ||
|
||
// somewhat dizzy to support simple json case |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comment above… I wonder if it'd be less dizzy to move "maps"
into the namespaceParseSpec.
private String makeQueryString(List<String> requiredFields) | ||
{ | ||
String query = "SELECT \""; | ||
query += StringUtils.join(requiredFields, "\", \""); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is enough escaping. There could be backslashes and quotes and stuff in the field names. Maybe. Does JDBC/JDBI have a utility function to help with escaping?
EIther that, or let's check the requiredFields against a whitelist of characters.
@drcrallen @b-slim any thoughts on the general idea & API here? Broadly: looks like the changes are all centered around having more than one lookup map per thing-we-load. So we may load a single json file that has many logical lookups in it. IMO the nice thing about doing it this way is we only have to poll and parse the file one time. It's also easier to configure loading multiple lookups from one file. I'm on board with the general idea and attempting to work out whether the API needs adjustments or not. |
@@ -70,6 +75,7 @@ public LookupDimensionSpec( | |||
@JsonProperty("retainMissingValue") boolean retainMissingValue, | |||
@JsonProperty("replaceMissingValueWith") String replaceMissingValueWith, | |||
@JsonProperty("name") String name, | |||
@JsonProperty("mapName") String mapName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this changing the API ? not all the lookups will have a map name ?
I like the idea on minimizing the amount of fetch that a lookup had to make but the current API change make it backward incompatible plus it is unclear what |
739290a
to
dceb5f8
Compare
Sorry for late response. @b-slim I don't understand your point about backward compatibility because @gianm For escaping column and table names at SQL query creation, I use escape and quote methods of SQLTemplate in Querydsl. As I'm not familiar with SQL querying in Java, I'm not sure that this make sense. Other updates:
|
Moving to 0.10.1 as review is not complete. @sirpkt please let us know if you're still interested and we will endeavor to take another look. |
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@druid.apache.org list. Thank you for your contributions. |
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
* Refactoring Appendertor Driver (apache#4292) * Rename FiniteAppenderatorDriver to AppenderatorDriver (apache#4356) * Add totalRowCount to appenderator * add localhost as advertised hostname (apache#4689) * kafkaIndexTask unannounce service in final block (apache#4736) * warn if topic not found (apache#4834) * Kafka: Fixes needlessly low interpretation of maxRowsInMemory. (apache#5034)
This PR is related with #2523
implements ExtractionNamespaceFunctionFactory
toextends ExtractionNamespaceFunctionFactory
.mapName
, which indicates lookup map name in the given namespace, however, it works as before without that parameter for backward-compatibility.