
Conversation

@PatrickRen
Contributor

What is the purpose of the change

This pull request adds partial caching functionality to the sync and async lookup runners, as part of FLIP-221.

Brief change log

  • Support partial caching for the sync lookup table
  • Port TestValuesLookupFunction onto the new LookupFunction API and add tests for sync lookup
  • Support partial caching for the async lookup table
  • Port AsyncTestValuesLookupFunction onto the new AsyncLookupFunction API and add tests for async lookup

Verifying this change

This change modifies some existing tests, including LookupJoinHarnessTest and LookupJoinITCase.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

flinkbot commented Aug 3, 2022

CI report:

Bot commands: The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@wuchong
Member

Thanks for the contribution, @PatrickRen.

My biggest concern with the implementation is that the partial cache invades the LookupJoinRunners (4 classes). The cache implementation could instead be a simple wrapper around the LookupFunction, such as a CachedLookupFunction. This would cleanly separate the caching (partial or full), the fetcher, and the join runner logic, which is very important for long-term maintenance. I can imagine the lookup runners becoming much more complex, with many if/else branches, once we introduce full caching.

What do you think about refactoring this into a simple PartialCachedLookupFunction extends LookupFunction that wraps the user's LookupFunction and the LookupCache? In the future we could support the full cache in the same way, where adding a new class is all that is needed.
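
For illustration, a minimal sketch of that wrapper idea, assuming the FLIP-221 LookupFunction#lookup(RowData) and LookupCache#getIfPresent/put signatures; the class name and the delegation details are placeholders rather than the final implementation:

```java
// Hypothetical sketch only: names and details are assumptions, not the final code.
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;

import java.io.IOException;
import java.util.Collection;

public class CachedLookupFunction extends LookupFunction {

    private final LookupFunction delegate; // the user's fetcher
    private final LookupCache cache;       // the partial cache

    public CachedLookupFunction(LookupFunction delegate, LookupCache cache) {
        this.delegate = delegate;
        this.cache = cache;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        delegate.open(context);
        // The real implementation would also open the cache here
        // (LookupCache#open takes a cache metric group).
    }

    @Override
    public Collection<RowData> lookup(RowData keyRow) throws IOException {
        Collection<RowData> cached = cache.getIfPresent(keyRow);
        if (cached != null) {
            return cached; // cache hit: no call to the external system
        }
        Collection<RowData> fetched = delegate.lookup(keyRow);
        cache.put(keyRow, fetched);
        return fetched;
    }

    @Override
    public void close() throws Exception {
        delegate.close();
        cache.close();
    }
}
```

With such a wrapper, the join runners only see a LookupFunction and stay unaware of whether (and which kind of) caching is applied.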

* table for which it is serving.
*/
@Internal
public class LookupCacheManager {

It seems this is only used internally (in flink-table-runtime), so why put it in a common module?

Comment on lines +59 to +67
public synchronized LookupCache registerCache(String tableIdentifier, LookupCache cache) {
    checkNotNull(cache, "Could not register null cache in the manager");
    if (cachesByTableIdentifier.containsKey(tableIdentifier)) {
        return cachesByTableIdentifier.get(tableIdentifier);
    } else {
        cachesByTableIdentifier.put(tableIdentifier, cache);
        return cache;
    }
}

I have several concerns about this method:

  1. The same dimension table can be used multiple times in a SQL query with different table options (e.g. cache TTL). I think we shouldn't reuse the cache if the configuration differs. That means we may need to introduce a LookupCache#getIdentifier() method to obtain an identifier for a specific cache.
  2. tableIdentifier is not enough to identify a cache, because in session mode different jobs may use the same table identifier to refer to different external tables. Maybe we should add the JobID as part of the cache id.
  3. Therefore, the final registered key should be a composite of the JobID, the table identifier, and the cache identifier.
  4. From the method signature, the cache parameter looks like the one that gets registered, but it may not be. Maybe registerCacheIfAbsent would be a better name (similar to Map#putIfAbsent); see the sketch after this list.
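
Pulling points 1-4 together, a rough sketch of what the registration could look like; the composite-key format, the cacheIdentifier argument, and registerCacheIfAbsent are only illustrations of the suggestion, not existing API:

```java
// Hypothetical sketch; names are placeholders for the reviewer's suggestion.
import org.apache.flink.api.common.JobID;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class LookupCacheManagerSketch {

    private final Map<String, LookupCache> cachesByKey = new ConcurrentHashMap<>();

    /** Registers the cache if absent and returns whichever instance ends up registered. */
    public LookupCache registerCacheIfAbsent(
            JobID jobId, String tableIdentifier, String cacheIdentifier, LookupCache cache) {
        Objects.requireNonNull(cache, "Could not register null cache in the manager");
        // Composite key: JobID + table identifier + cache identifier, so that different
        // jobs in session mode and differently configured caches never share an instance.
        String key = jobId + "|" + tableIdentifier + "|" + cacheIdentifier;
        return cachesByKey.computeIfAbsent(key, k -> cache);
    }
}
```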

if (provider instanceof PartialCachingLookupProvider) {
    LookupCache cache = ((PartialCachingLookupProvider) provider).getCache();
    if (cache == null) {
        return Optional.empty();

Shall we allow a null cache for PartialCachingLookupProvider?

  1. Users should use LookupFunctionProvider if no cache is needed (see the sketch below).
  2. PartialCachingLookupProvider#getCache() is not annotated @Nullable.
  3. I think we also don't allow a null getScanRuntimeProvider or getCacheReloadTrigger for FullCachingLookupProvider.
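
As a sketch of the alternative, the connector would pick the provider up front instead of handing a nullable cache to the planner; the provider names assume the FLIP-221 interfaces, and chooseProvider is a hypothetical helper for illustration:

```java
// Hedged sketch: provider names assume the FLIP-221 interfaces; the helper is illustrative.
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.functions.LookupFunction;

import javax.annotation.Nullable;

public final class ProviderChoiceSketch {

    static LookupTableSource.LookupRuntimeProvider chooseProvider(
            LookupFunction lookupFunction, @Nullable LookupCache cache) {
        if (cache == null) {
            // No cache configured: return the plain provider rather than a
            // caching provider carrying a null cache.
            return LookupFunctionProvider.of(lookupFunction);
        }
        return PartialCachingLookupProvider.of(lookupFunction, cache);
    }
}
```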

Comment on lines +283 to +288
LookupJoinCodeGenerator.generateLeftTableKeyProjection(
tableConfig,
classLoader,
leftTableRowType,
keyRowType,
lookupKeys),

Please use org.apache.flink.table.planner.plan.utils.KeySelectorUtil#getRowDataSelector to create the key selector/projection, which guarantees it always generates BinaryRowData for consistent hashCode() and equals() behavior.

leftTableRowType,
keyRowType,
lookupKeys),
new RowDataSerializer(keyRowType),

cacheKeySerializer is not needed once we use the key selector above.

@Nullable private final RowDataSerializer cacheKeySerializer;
@Nullable private final RowDataSerializer cacheValueSerializer;

private LookupCache cache;

transient?

Comment on lines +105 to +111
if (cacheKeySerializer != null && cacheValueSerializer != null) {
    RowData copiesKey = cacheKeySerializer.copy(key);
    Collection<RowData> copiedValues =
            value.stream()
                    .map(cacheValueSerializer::copy)
                    .collect(Collectors.toCollection(ArrayList::new));
    getCache().put(copiesKey, copiedValues);

Why copy?

Comment on lines +125 to +129
* <p>- Input from left table (id, name): +I(1, Alice)
*
* <p>- Value return by user's fetcher (id, age, gender): +I(1, 18, female)
*
* <p>Then the entry stored in the cache would be: +I(1), +I(1, 18, female), even calculation

The change flag can be omitted when explaining the cache strategy because the cache doesn't store any changelogs and all records should be insert-only.

+I(1), +I(1, 18, female) ==> key=(1), value=(1, 18, female)

HEAP_BACKEND,
ENABLE_OBJECT_REUSE,
AsyncOutputMode.ALLOW_UNORDERED,
DISABLE_CACHE),

I don't think we need to duplicate the cases for dynamic table sources; mixing the cache cases into the existing 4 dynamic-table-source cases should be enough.

}
}
if (cacheHandler != null) {
cacheHandler.getCache().close();

We should unregister the cache here as well; otherwise it leaks memory. That means LookupCacheManager may need to track a reference count. See the sketch below.
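
One way this could look, extending the registration idea above with a reference count so the cache is only closed and removed when its last user unregisters; RefCountedCache and unregisterCache are hypothetical names for illustration:

```java
// Hypothetical sketch of reference-counted registration/unregistration.
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;

import java.util.HashMap;
import java.util.Map;

public class RefCountingCacheManagerSketch {

    private static final class RefCountedCache {
        final LookupCache cache;
        int refCount;

        RefCountedCache(LookupCache cache) {
            this.cache = cache;
        }
    }

    private final Map<String, RefCountedCache> cachesByKey = new HashMap<>();

    public synchronized LookupCache registerCacheIfAbsent(String key, LookupCache cache) {
        RefCountedCache entry = cachesByKey.computeIfAbsent(key, k -> new RefCountedCache(cache));
        entry.refCount++;
        return entry.cache;
    }

    /** Decrements the reference count; closes and removes the cache once it has no users. */
    public synchronized void unregisterCache(String key) throws Exception {
        RefCountedCache entry = cachesByKey.get(key);
        if (entry != null && --entry.refCount == 0) {
            entry.cache.close();
            cachesByKey.remove(key);
        }
    }
}
```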

@PatrickRen
Contributor Author

Thanks for the review, @wuchong! I made a refactoring based on your comments and opened a new PR, #20480. I will close this one for now.

@PatrickRen closed this on Aug 7, 2022