Open
Description
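This is likely an omission: the multi-output branch in TransformTranslator persists the RDD itself instead of going through EvaluationContext. All caching logic should be consolidated in EvaluationContext to ensure consistency.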
beam/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
Lines 413 to 428 in d634674
```java
if (outputs.size() > 1) {
  StorageLevel level = StorageLevel.fromString(context.storageLevel());
  if (canAvoidRddSerialization(level)) {
    // if it is memory only reduce the overhead of moving to bytes
    all = all.persist(level);
  } else {
    // Caching can cause Serialization, we need to code to bytes
    // more details in https://issues.apache.org/jira/browse/BEAM-2669
    Map<TupleTag<?>, Coder<WindowedValue<?>>> coderMap =
        TranslationUtils.getTupleTagCoders(outputs);
    all =
        all.mapToPair(TranslationUtils.getTupleTagEncodeFunction(coderMap))
            .persist(level)
            .mapToPair(TranslationUtils.getTupleTagDecodeFunction(coderMap));
  }
}
```
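As a rough illustration of what consolidating this could look like, here is a minimal sketch of a single helper that owns the "persist directly, or encode, persist, decode" decision. It is not Beam or Spark code: `CacheHelperSketch`, `persist`, and all of its parameters are hypothetical names, and plain `java.util.function` types stand in for the RDD operations so the sketch runs on its own. Where such a helper would actually live in EvaluationContext is left open.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import java.util.function.UnaryOperator;

/** Hypothetical sketch only; none of these names exist in Beam or Spark. */
public final class CacheHelperSketch {
  private CacheHelperSketch() {}

  /**
   * Single entry point for the caching decision.
   *
   * @param canAvoidSerialization stand-in for the canAvoidRddSerialization(level) check
   * @param persistRaw stand-in for persisting the data as-is
   * @param encode stand-in for the coder-based encode step
   * @param persistEncoded stand-in for persisting the encoded form
   * @param decode stand-in for the coder-based decode step
   */
  public static <T, E> T persist(
      T data,
      boolean canAvoidSerialization,
      UnaryOperator<T> persistRaw,
      Function<T, E> encode,
      UnaryOperator<E> persistEncoded,
      Function<E, T> decode) {
    if (canAvoidSerialization) {
      // Memory-only case: skip the overhead of moving to bytes.
      return persistRaw.apply(data);
    }
    // Otherwise encode first, persist the encoded form, then decode again.
    E encoded = encode.apply(data);
    return decode.apply(persistEncoded.apply(encoded));
  }

  public static void main(String[] args) {
    StringBuilder trace = new StringBuilder();
    String result =
        CacheHelperSketch.<String, byte[]>persist(
            "value",
            false,
            s -> { trace.append("persist-raw;"); return s; },
            s -> { trace.append("encode;"); return s.getBytes(StandardCharsets.UTF_8); },
            b -> { trace.append("persist-bytes;"); return b; },
            b -> { trace.append("decode;"); return new String(b, StandardCharsets.UTF_8); });
    System.out.println(trace + " -> " + result);
  }
}
```

With one helper like this, every call site that caches would take the same branch for the same storage level, which is the consistency the issue asks for.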
Imported from Jira BEAM-9387. Original Jira may contain additional context.
Reported by: ibzib.