[NEMO-139, 6] Logic in the scheduler for appending jobs, Support RDD caching #111
I did my first pass. I'll focus on the caching logic on my next pass.
final PhysicalPlan appendedPlan =
    PlanAppender.appendPlan(planStateManager.getPhysicalPlan(), submittedPhysicalPlan);
updatePlan(appendedPlan, maxScheduleAttempt);
planStateManager.storeJSON("appended");
We can have multiple `appended` plans. Can we have a unique ID for each of them?
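One way to get a unique ID per appended plan is a simple counter. This is only a minimal sketch: `AppendedPlanNamer` and `nextName()` are hypothetical names that do not exist in the PR, and the `appended-N` format is an assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper for illustration; not part of the PR under review. */
final class AppendedPlanNamer {
  // Counts how many plans have been appended so far.
  private final AtomicInteger appendCount = new AtomicInteger(0);

  /** Returns a distinct name on every call ("appended-1", "appended-2", ...). */
  String nextName() {
    return "appended-" + appendCount.incrementAndGet();
  }
}
```

The call in the diff could then pass `namer.nextName()` instead of the fixed `"appended"` string, assuming `storeJSON` accepts any suffix.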
vertexHarness, isToSideInput)); // Parent-task read
});
vertexHarness, isToSideInput)) // Parent-task read
);
Let's push this line up.
 * A {@link Transform} does not emit any output.
 * @param <T> input/output type.
 */
public final class DummyTransform<T> implements Transform<T, T> {
We already have an `EmptyTransform` in the `EmptyComponents` class.
It seems that `EmptyComponents` is under the `test` package. Should it be moved to a non-test package?
That would be better than having duplicate code.
 * This kind of vertices is needed when some data have to be written before it's usage is not determined yet
 * (e.g., for caching).
 */
public final class GhostProperty extends VertexExecutionProperty<Boolean> {
How about keeping all these components in the `EmptyComponents` class?
In my opinion, having multiple empty classes in a single class file is not a good design.
Also, it would be good to keep the `ExecutionProperty`s together in a single package.
I've done my pass and left a minor comment. Thank you!
 * This kind of vertices is needed when some data have to be written before it's usage is not determined yet
 * (e.g., for caching).
 */
public final class GhostProperty extends VertexExecutionProperty<Boolean> {
How about `MarkerProperty`, which has enum values such as `CachedDataMarker`?
As the class comment says, a vertex with this property is a kind of dummy vertex (in that it doesn't process data) used as a 'marker'; in this case it marks the existence of cached data. I think this type of vertex can also be used in contexts other than caching.
Why should the value be an enum? Isn't just having `MARKER_PROPERTY`, as it is now, enough?
I see. `MARKER_PROPERTY` would be enough for now. Thank you!
I left comments asking about the relationships between the new properties and the existing properties/vertices.
/**
 * Cache ID ExecutionProperty.
 */
public final class CacheIDProperty extends EdgeExecutionProperty<UUID> {
This is more of a question than a suggestion:
Would it be a good idea to somehow merge this into the existing `PersistenceProperty`?
In my opinion, it would be good to keep them separate, because `PersistenceProperty` can be used to change the persistence itself for reasons unrelated to caching. For example, our large shuffle optimization changes the persistence to conduct the shuffle in memory. If we merged the cache ID into the persistence property, the `Pass` developer would have to decide whether to maintain or discard the cache ID whenever modifying the `PersistenceProperty`.
Thanks! That makes sense. Can you add a class-level comment about this?
 * This kind of vertices is needed when some data have to be written before it's usage is not determined yet
 * (e.g., for caching).
 */
public final class MarkerProperty extends VertexExecutionProperty<Boolean> {
`DoNotScheduleProperty`?
How is this related to the 'barrier' aspects in the existing `MetricCollectionBarrierVertex`? Would it be a good idea to reuse this property for that as well?
If a vertex is annotated with `MarkerProperty` (or `DoNotScheduleProperty`), it should never be scheduled to any executor. It acts as a simple marker used to construct an edge (and the data in the edge).
In contrast, the barrier vertex is scheduled to an executor, collects metrics of the data, sends the metric data to the master, enters the `ON_HOLD` state, and completes after a dynamic optimization using the metrics.
It seems there is no reason to merge the two properties.
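The difference between the two behaviors can be sketched as follows. This is a simplified, hypothetical model, not the actual `BatchScheduler` code: `SchedulingSketch`, `Vertex`, and the boolean flags are invented for illustration, and only the `ON_HOLD` state name comes from the discussion above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model; not the actual Nemo scheduler. */
final class SchedulingSketch {
  enum State { READY, ON_HOLD, COMPLETE }

  static final class Vertex {
    final String id;
    final boolean ignoreScheduling; // stands in for the marker / do-not-schedule property
    final boolean isBarrier;        // stands in for a metric-collection barrier vertex
    State state = State.READY;

    Vertex(final String id, final boolean ignoreScheduling, final boolean isBarrier) {
      this.id = id;
      this.ignoreScheduling = ignoreScheduling;
      this.isBarrier = isBarrier;
    }
  }

  /** Returns the IDs of the vertices that were actually dispatched to an executor. */
  static List<String> schedule(final List<Vertex> vertices) {
    final List<String> dispatched = new ArrayList<>();
    for (final Vertex v : vertices) {
      if (v.ignoreScheduling) {
        // Never dispatched; simply regarded as completed.
        v.state = State.COMPLETE;
        continue;
      }
      dispatched.add(v.id);
      // A barrier runs, then waits ON_HOLD until a dynamic optimization finishes;
      // in this sketch any other vertex completes immediately after running.
      v.state = v.isBarrier ? State.ON_HOLD : State.COMPLETE;
    }
    return dispatched;
  }
}
```

In this model the marker vertex never reaches an executor, while the barrier vertex does and then holds, which is why one flag cannot express both.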
I see. So vertices with this property are simply 'ignored' when scheduling, whereas the barrier vertex is scheduled but 'blocks' the scheduling of downstream vertices.
How about renaming it to something like `IgnoreSchedulingProperty` (consistent with the existing `ClonedSchedulingProperty`)? The term `Marker` seems a bit generic.
@johnyangk I understand your point, but in addition to being skipped in scheduling, this property acts as a marker used in `PlanAppender` and `NemoOptimizer` when scanning the plan to find cached data. Maybe acting as a marker (indicator?) is the primary feature, and being skipped in scheduling is just a side effect (it is skipped because it's a dummy vertex).
I agree that the name is generic. Every annotation acts as a kind of marker. If @jeongyooneo agrees, I will rename it to `IgnoreSchedulingProperty`.
I think the explanation is now in the class-level comment, so going for `IgnoreSchedulingProperty` looks fine! 😄
Thanks @jeongyooneo @sanha for your explanation. I understood that this property is actually being interpreted in two different ways:
(1) Ignore (skip) scheduling
(2) Some kind of an indicator when scanning for cached data (as in https://github.com/apache/incubator-nemo/pull/111/files/cfcbf0f6c16347d377e2e671a88739d65bbc36b1#diff-b29364b5afa203a9c5f3f3af1717b866R68)
IMHO (1) and (2) are orthogonal, and I feel we should have a separate property for each.
I feel that (1) is straightforward, whereas (2) assumes certain things (e.g., Cached edge toward a ghost is a representative edge).
If we go with a single property that does both (1) and (2), then I'd suggest using a name that clearly describes either (1) or (2) and maybe leave a TODO or comment to indicate that it is being used for the other purpose as well. (with certain assumptions)
@johnyangk I chose to change its name and update the comments for now. Thanks!
final Map<UUID, StageEdge> cachedEdges = new HashMap<>();
originalPlan.getStageDAG().getVertices().forEach(
    stage -> originalPlan.getStageDAG().getIncomingEdgesOf(stage).stream()
        // Cached edge toward a ghost is a representative edge.
This seems like an important assumption.
Can you add a pointer to comments in {@link MarkerProperty}, or a TODO to emphasize/explain this assumption?
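To make the assumption concrete, here is a minimal sketch of collecting one representative edge per cache ID. The `Edge` type and its fields are hypothetical stand-ins for `StageEdge` and its execution properties, not the actual Nemo classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical, simplified model of the cached-edge scan. */
final class CachedEdgeSketch {
  static final class Edge {
    final String id;
    final UUID cacheId;       // null when the edge carries no cache ID
    final boolean dstIsGhost; // true when the destination is a ghost vertex

    Edge(final String id, final UUID cacheId, final boolean dstIsGhost) {
      this.id = id;
      this.cacheId = cacheId;
      this.dstIsGhost = dstIsGhost;
    }
  }

  /**
   * Keeps only edges that carry a cache ID AND point to a ghost vertex.
   * Assumption being illustrated: such an edge is the representative of its cache ID.
   */
  static Map<UUID, Edge> collectRepresentativeEdges(final List<Edge> edges) {
    final Map<UUID, Edge> cachedEdges = new HashMap<>();
    for (final Edge e : edges) {
      if (e.cacheId != null && e.dstIsGhost) {
        cachedEdges.put(e.cacheId, e);
      }
    }
    return cachedEdges;
  }
}
```

Edges that share the cache ID but point to a regular vertex are skipped on purpose, which is exactly the behavior a comment or TODO should call out.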
@johnyangk @jeongyooneo Thanks for the review! Please take a look.
LGTM 😄
JIRA: NEMO-139: Logic in the scheduler for appending jobs
JIRA: NEMO-6: Support RDD caching
Major changes:
- `PlanAppender` that appends a `submittedPhysicalPlan` to an `originalPhysicalPlan`
- `PlanStateManager`, `BatchScheduler`, `BlockManagerMaster`, and `TaskDispatcher` updated to reflect that all plans from a single job are appended to a single `PhysicalPlan` through `PlanAppender`
- `CacheIdProperty` property and `GhostProperty`
  - When `cache()` or `persist()` is called for an `RDD`, the RDD creates a ghost vertex and connects the vertex having the `RDD` to the ghost vertex. This edge to the ghost vertex is annotated with a cache ID (`cacheIdProperty`). When a plan with this edge is executed in our runtime, the data to cache is stored in the edge in the required `StorageLevel` format. (No extra feature is required in our runtime to produce or sustain this data.)
  - When `BatchScheduler` encounters a task annotated with the `GhostProperty`, the vertex is not scheduled but simply regarded as a completed task.
- `Optimizer` that conducts optimization by using `OptimizationPass`es, separated from our `UserApplicationRunner` to separate the roles
  - When an IR DAG containing an edge with a `cacheIdProperty` is submitted and an already executed IR DAG contains an edge with the identical `cacheIdProperty`, the `Optimizer` crops the IR DAG before the cache edge and adds a `CachedSourceVertex` before the edge.
- `PlanAppender` properly handles caching
  - `PlanAppender` appends the `PhysicalPlan` constructed from the cropped IR DAG with the caching edge to the original `PhysicalPlan`, and adds a new edge between the vertex that has the actual edge to a ghost vertex and the new `CachedSourceVertex`. At runtime, when the `CachedSourceVertex` requires the data, the cached data that was produced and stored in the edge to the ghost vertex is read through our `DuplicateEdgeGroupProperty` logic.
Minor changes to note:
Tests for the changes:
- `SparkCachingWordCount` application
  - `SparkCachingWordCount` caches shuffle data and uses the cached data to calculate which keys have identical counts.
Other comments:
Closes #111
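The cropping step described under the major changes can be illustrated with a deliberately simplified sketch. It models the IR DAG as a linear pipeline of vertex names, which is an assumption made only for brevity. `CropSketch`, `crop`, and the `CachedSource(...)` label are hypothetical, and scanning from the last edge backward is an illustrative choice rather than the documented behavior of the `Optimizer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;

/** Hypothetical, simplified model of cropping a plan at an already-cached edge. */
final class CropSketch {
  /**
   * @param vertices      a linear pipeline of vertex names.
   * @param edgeCacheIds  edgeCacheIds.get(i) is the cache ID on the edge from
   *                      vertices.get(i) to vertices.get(i + 1), or null if none.
   * @param alreadyCached cache IDs whose data was produced by an earlier plan.
   * @return the pipeline with everything upstream of the cached edge replaced
   *         by a single cached-source placeholder, or a copy if nothing matches.
   */
  static List<String> crop(final List<String> vertices,
                           final List<UUID> edgeCacheIds,
                           final Set<UUID> alreadyCached) {
    // Scan from the last edge so the crop point is as far downstream as possible.
    for (int i = edgeCacheIds.size() - 1; i >= 0; i--) {
      final UUID id = edgeCacheIds.get(i);
      if (id != null && alreadyCached.contains(id)) {
        final List<String> cropped = new ArrayList<>();
        cropped.add("CachedSource(" + id + ")");
        cropped.addAll(vertices.subList(i + 1, vertices.size()));
        return cropped;
      }
    }
    return new ArrayList<>(vertices);
  }
}
```

For a pipeline `read, map, reduce, count` whose `map`-to-`reduce` edge carries an already-cached ID, the sketch returns the cached-source placeholder followed by `reduce` and `count`.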