[SYSTEMDS-3185] Federated Lineage Transfer for PUT Data #1544

Contributor

@ywcb00 ywcb00 commented Feb 14, 2022

Hi,
This PR introduces the mechanics for transferring the lineage trace of a data object to the federated worker.
For now, we include only the lineage trace of matrices that come from a data gen operation (e.g. rand()), since only those have the respective lineage item set in their CacheableData objects.
To include the lineage trace for all matrices, we need to pass it through the broadcast and broadcastSliced methods to the creation of the actual PUT_VAR FederatedRequest. I decided to introduce the mechanics first and change the broadcast methods in the next PR. This will allow for a better discussion about where to include the lineage trace, since we can then decide per instruction whether to send the lineage trace to the federated site or not.
I first tried setting the LineageItem in the CacheableData object at every instruction, but this increases the size of the CacheableData object. Therefore, I removed it again and changed the tests so that, for now, the lineage trace of the PUT object contains only one LineageItem (from DataGen).
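
To make the idea concrete, here is a minimal sketch of how a worker could pick a reuse key for PUT data: the transferred lineage trace when the coordinator sent one, and a checksum of the data otherwise. All names (`PutRequestSketch`, `reuseKey`, the trace string format) are hypothetical and only illustrate the mechanism; this is not the actual FederatedRequest or FederatedWorkerHandler code.

```java
import java.util.zip.CRC32;

// Hypothetical stand-in for a PUT_VAR request; NOT the SystemDS FederatedRequest class.
final class PutRequestSketch {
    final long varId;
    final double[] data;
    final String lineageTrace; // serialized trace, or null if the coordinator has none

    PutRequestSketch(long varId, double[] data, String lineageTrace) {
        this.varId = varId;
        this.data = data;
        this.lineageTrace = lineageTrace;
    }

    // Worker side: prefer the transferred lineage trace as the reuse key,
    // and fall back to a checksum over the data when no trace was sent.
    String reuseKey() {
        if (lineageTrace != null && !lineageTrace.isEmpty())
            return "lineage:" + lineageTrace;
        CRC32 crc = new CRC32();
        for (double d : data) {
            long bits = Double.doubleToLongBits(d);
            for (int i = 0; i < 8; i++)
                crc.update((int) (bits >>> (8 * i)) & 0xFF); // feed one byte at a time
        }
        return "checksum:" + Long.toHexString(crc.getValue());
    }
}
```

In this sketch, two PUT requests carrying the same trace map to the same key regardless of the variable id, while requests without a trace are only matched if their data hashes to the same checksum.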

Thanks for the review :)

… coordinators all addressing the same federated workers (sameWorkers)
	create alsCG algorithm tests (SameWorkers and SharedWorkers)
	add runAll scripts

chore(FederatedLookupTable.java): add log trace message to log every call to getECM()
…ripts

docs(FederatedLookupTable.java): add class and function headers
…testing

feat(FederatedMultiTenantTest): add junit java class to test the multi tenancy within the test system
	add the respective dml script

chore(AutomatedTestBase.java): change the access specifier of CONFIG_DIR from private to protected

feat(SystemDS-MultiTenant-config.xml): add a systemds config with the federated initialization timeout set to 30 seconds
…ONFIG to the specified DMLConfig value when setting the global flags

chore(.gitignore): remove the artifacts of the multi tenant test shell scripts from gitignore again (rebase fail)

fix(Statistics.java): reset the FederatedStatistics with the Statistics

fix(SparkStatistics.java): increment the Spark collect count with every Spark collect time accumulation

chore(LocalInstruction.java): set the outputBuffering flag to true in order to obtain the ByteArrayOutputStream from the runTest() call

chore(FederatedMultiTenantTest.java): do not redirect the error stream of the coordinator to its stdout since we are obtaining them separate now
… will be handled by the lineage from now on

feat(FederatedWorker.java): set the lineage config to the specified flags when creating a worker

feat(FederatedWorkerHandler.java): check if the file which is requested to read is available in cache before performing an actual read
	put the matrix object to cache after the read of a file

feat(LineageCache.java): create methods reuseRead() and putReadObject() to manage the lineage cache for read matrix object

chore(FederatedRequest.java): initialize the checksums member of the federated request inside the getChecksum call if it is null

chore(FederatedStatistics.java): add the overloaded method incrementFedReadCacheBytesCount with the parameter of a cacheblock
…ted lookup table entries and get requests

	add different counts for the different data types in put requests
	add a bytes count for put requests of matrices and frames

refactor(Statistics.java): divide related stats and their methods into own static classes

feat(utils/stats/**.java): create static classes for related statistics

refactor(**.java): change all calls to statistics according to the outsourced stats into new static classes
…ests and verify the results and the numbers of cache hits
…Test to FederatedReuseReadTest

	start the federated workers with lineage enabled

feat(MultiTenantTestBase.java): overload the method for starting federated workers with a parameter for additional arguments

feat(AutomatedTestBase.java): overload the method for starting a federated worker with a parameter for additional arguments

refactor(FederatedStatistics.java): rename the ReadCache stats to ReuseRead
… an output matrix/frame with its CacheableData object after processing the instruction

feat(Lineage.java): add methods to serialize and deserialize the lineage trace of a single lineage item

feat(LineageItem.java): change the _visited boolean flag to a Map with a boolean flag for each different thread id (allows for multithreaded lineage trace checks)

feat(FederatedRequest.java): add the lineage trace string as a member variable
	add the respective getter and setter methods
	create a constructor with a lineage item as parameter which gets and sets the serialized lineage trace string

feat(FederatedWorkerHandler.java): deserialize the lineage trace for put matrices and frames and add it to the lineage of the respective matrix/frame and execution context
	add a literal lineage for put scalar objects as its lineage
	keep the fallback solution of the checksum for the case of lineage activated on the federated worker but not activated on the coordinator

feat(FederationMap.java): create the PUT_VAR fed requests including the lineage trace whenever broadcasting a matrix or frame

feat(LocalVariableMap.java): change localMap from HashMap to ConcurrentHashMap since we else have a problem with concurrent clear instructions on the federated worker

feat(CPOperand.java): create a static method to create a literal lineage string for a scalar object

feat(FederatedStatistics.java): add the LineageCacheStatsCollection to obtain and display the lineage cache statistics from the federated worker

chore(LineageCacheStatistics.java): add some getters
…instructions with a broadcast object and by its transferred lineage trace
…ashMap since we could run into some concurrency problems there (even though each thread has its own entry)
…t object inside the broadcast methods to avoid computing it separately for each federated worker in broadcast sliced

chore(FederatedRequest.java): change the constructor parameter from LineageItem to String for a serialized lineage trace
…tProcessInstruction again to not set the lineage item into the cacheable data

fix(FederatedWorkerHandler.java): move the closing bracket of the lineage cache reuse branch in readData back to its original place so that the lineage trace is set for a reused object as well (rebase issue)

chore(FederatedLineageTraceReuseTest): remove the print of the coordinator output
…ace (count, items) and the respective methods

chore(FederatedWorkerHandler.java): aggregate the statistics for put lineage whenever getting the serialized lineage with the PUT request
…he response for read frames again (rebase fail)
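
The LineageItem commit in the list above replaces a single _visited flag with one flag per thread id. A minimal sketch of that idea, assuming a simplified node class (`LineageNodeSketch` and its methods are hypothetical, not the SystemDS LineageItem API), could look like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplified lineage node; NOT the SystemDS LineageItem class.
final class LineageNodeSketch {
    final String opcode;
    final List<LineageNodeSketch> inputs = new ArrayList<>();
    // One visited flag per thread id, so concurrent traversals do not interfere.
    private final Map<Long, Boolean> visited = new ConcurrentHashMap<>();

    LineageNodeSketch(String opcode, LineageNodeSketch... in) {
        this.opcode = opcode;
        for (LineageNodeSketch n : in)
            inputs.add(n);
    }

    boolean isVisited() {
        return visited.getOrDefault(Thread.currentThread().getId(), false);
    }

    void setVisited(boolean flag) {
        visited.put(Thread.currentThread().getId(), flag);
    }

    // Counts the distinct nodes reachable from root, then resets this thread's flags.
    static int countNodes(LineageNodeSketch root) {
        int n = count(root);
        reset(root);
        return n;
    }

    private static int count(LineageNodeSketch node) {
        if (node.isVisited())
            return 0;
        node.setVisited(true);
        int n = 1;
        for (LineageNodeSketch in : node.inputs)
            n += count(in);
        return n;
    }

    private static void reset(LineageNodeSketch node) {
        if (!node.isVisited())
            return;
        node.setVisited(false);
        for (LineageNodeSketch in : node.inputs)
            reset(in);
    }
}
```

Because every thread reads and writes only its own entry, two threads can traverse the same shared lineage DAG at the same time without clobbering each other's traversal state. The map itself is still a shared structure, which is why this sketch uses a ConcurrentHashMap.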
@ywcb00 ywcb00 force-pushed the feat/fed/multitenancy/optimize/lineagebased/synclineagetransfer branch from cfa8aef to b622804 Compare February 19, 2022 11:01
@phaniarnab phaniarnab closed this in 36e84d5 Mar 4, 2022
@phaniarnab
Contributor

Thanks for the initial lineage trace transfer logic @ywcb00.
The code changes look good to me. I am only a bit skeptical about replacing the HashMap of our live variable map with a ConcurrentHashMap. However, we can discuss that later. I merged the changes with minor fixes.
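
For reference in that later discussion, a minimal sketch of a variable map backed by a ConcurrentHashMap follows. `VariableMapSketch` and its methods are hypothetical and much simpler than LocalVariableMap; the sketch only shows that concurrent puts and a clear are safe without external locking.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplified variable map; NOT the SystemDS LocalVariableMap class.
final class VariableMapSketch {
    private final Map<String, Object> localMap = new ConcurrentHashMap<>();

    void put(String name, Object val) { localMap.put(name, val); }
    Object get(String name) { return localMap.get(name); }
    void removeAll() { localMap.clear(); }
    int size() { return localMap.size(); }

    // Starts several threads that each insert perThread distinct variables,
    // waits for all of them, and returns the resulting map size.
    static int fillConcurrently(VariableMapSketch vars, int threads, int perThread) {
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            ts[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++)
                    vars.put("t" + id + "_v" + i, i);
            });
            ts[t].start();
        }
        try {
            for (Thread th : ts)
                th.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return vars.size();
    }
}
```

One trade-off worth keeping in mind: unlike HashMap, ConcurrentHashMap rejects null keys and null values, so any code path that stores null in the variable map would need to change.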
