[SYSTEMDS-3185] Federated Response Serialization Caching#1611
Closed
ywcb00 wants to merge 13 commits into
Closed
Conversation
… coordinators all addressing the same federated workers (sameWorkers)
create alsCG algorithm tests (SameWorkers and SharedWorkers) add runAll scripts chore(FederatedLookupTable.java): add log trace message to log every call to getECM()
…ripts docs(FederatedLookupTable.java): add class and function headers
…testing feat(FederatedMultiTenantTest): add junit java class to test the multi tenancy within the test system add the respective dml script chore(AutomatedTestBase.java): change the access specifier of CONFIG_DIR from private to protected feat(SystemDS-MultiTenant-config.xml): add a systemds config with the federated initialization timeout set to 30 seconds
…ONFIG to the specified DMLConfig value when setting the global flags chore(.gitignore): remove the artifacts of the multi tenant test shell scripts from gitignore again (rebase fail) fix(Statistics.java): reset the FederatedStatistics with the Statistics fix(SparkStatistics.java): increment the Spark collect count with every Spark collect time accumulation chore(LocalInstruction.java): set the outputBufering flag to true in order to obtain the ByteArrayOutputStream from the runTest() call chore(FederatedMultiTenantTest.java): do not redirect the error stream of the coordinator to its stdout since we are obtaining them separate now
…ests and verify the results and the numbers of cache hits
…Test to FederatedReuseReadTest start the federated workers with lineage enabled feat(MultiTenantTestBase.java): overload the method for starting federated workers with a parameter for additional arguments feat(AutomatedTestBase.java): overload the method for starting a federated worker with a parameter for additional arguments refactor(FederatedStatistics.java): rename the ReadCache stats to ReuseRead
… an output matrix/frame with its CacheableData object after processing the instruction feat(Lineage.java): add methods to serialize and deserialize the lineage trace of a single lineage item feat(LineageItem.java): change the _visited boolean flag to a Map with a boolean flag for each different thread id (allows for multithreaded lineage trace checks) feat(FederatedRequest.java): add the lineage trace string as a member variable add the respective getter and setter methods create a constructor with a lineage item as parameter which gets and sets the serialized lineage trace string feat(FederatedWorkerHandler.java): deserialize the lineage trace for put matrices and frames and add it to the lineage of the respective matrix/frame and execution context add a literal lineage for put scalar objects as its lineage keep the fallback solution of the checksum for the case of lineage activated on the federated worker but not activated on the coordinator feat(FederationMap.java): create the PUT_VAR fed requests including the lineage trace whenever broadcasting a matrix or frame feat(LocalVariableMap.java): change localMap from HashMap to ConcurrentHashMap since we else have a problem with concurrent clear instructions on the federated worker feat(CPOperand.java): create a static method to create a literal lineage string for a scalar object feat(FederatedStatistics.java): add the LineageCacheStatsCollection to obtain and display the lineage cache statistics from the federated worker chore(LineageCacheStatistics.java): add some getters
…tProcessInstruction again to not set the lineage item into the cacheable data fix(FederatedWorkerHandler.java): move the closing bracket of the lineage cache reuse branch in readData back to its original place so that the lineage trace is set for a reused object as well (rebase issue) chore(FederatedLineageTraceReuseTest): remove the print of the coordinator output
…ide the encode method to check for lineage reuse before serializing the response add the CachedObjectEncoder to the ChannelPipeline instead of the basic ObjectEncoder of netty chore(FederatedWorkerHandler.java): create the federated response of a get request for a cacheblock with the respective lineage object feat(LineageCache.java): add reuse and put method for the serializations of federated responses feat(LineageCacheEntry.java): adopt the lineage cache entry to have serialized bytes as value and add the respective methods feat(LineageItemUtils.java): create static method to derive a serialized lineage item from a given lineage item feat(FederatedResponse.java): add member _linItem to pass the lineage item of a response to a get request to the encode method where serialization is taking place chore(Statistics.java): display the federated put lineage statistics with the federated statistics
…reuse of serialized federated responses fix(FederatedLineageTraceReuseTest): consider the writes to cache made by the serialization reuse
…g the lineage item of a serialized response rename the identifier opcode for the lineage item of serialized bytes to SERIALIZATION_OPCODE
d1d3323 to
e835e9a
Compare
Contributor
|
LGTM - thanks @ywcb00 for the patch. During the merge, I fixed a few unnecessary imports, formatting, and the stringbuilder handling in federated statistics. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Hi,
This PR introduces the reuse of serialized federated responses. In order to avoid redundant serialization, we put the serialized byte array of federated responses containing a CacheBlock into the lineage cache and mark it with a separate lineage item for serialization derived from the lineage item of the underlying data. The next time the same data is requested, the serialization process for the federated response is skipped and the cached bytes are loaded directly into the output buffer.
The implementation overrides the encode method of netty's ObjectEncoder to check whether we can reuse the serialization prior to the actual encoding, and to obtain and store the serialized bytes after the encoding.
Thanks for review :)