Conversation

@sachingoel0101
Contributor

  1. This PR adds support for accessing distributed cache entries when running iterations.
  2. Several tests execute in both Cluster and Collection modes, so a test that passes in one mode should not fail in the other purely because of the execution mode. Distributed cache files create one such case. There is nothing actually wrong with accessing a distributed cache entry when running in a collection environment; it just rarely makes sense to do so.
    This takes care of that too.

@sachingoel0101 sachingoel0101 force-pushed the iteration_cache_files branch 2 times, most recently from 1a1ddb3 to 0675cb4 Compare August 6, 2015 20:18
@@ -79,7 +68,7 @@ public AbstractRuntimeUDFContext(String name,
  this.subtaskIndex = subtaskIndex;
  this.userCodeClassLoader = userCodeClassLoader;
  this.executionConfig = executionConfig;
- this.distributedCache = new DistributedCache(cpTasks);
+ this.distributedCache = Preconditions.checkNotNull(new DistributedCache(cpTasks));
Contributor

Don't you want to check cpTasks for being null?

Contributor Author

Ah yes. Sorry.
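
For readers following the review point above: `new` never yields null, so wrapping the freshly constructed object in a null check cannot fail, whereas a null `cpTasks` would slip through. A minimal sketch of checking the argument instead follows. `CacheSketch` and `ContextSketch` are hypothetical stand-ins, not Flink's classes, and `java.util.Objects.requireNonNull` is swapped in for the `Preconditions.checkNotNull` call in the diff so the example stays self-contained.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;

public class NullCheckSketch {

    /** Hypothetical stand-in for the cache wrapper; not Flink's DistributedCache. */
    static final class CacheSketch {
        private final Map<String, Future<String>> entries;

        CacheSketch(Map<String, Future<String>> entries) {
            this.entries = entries;
        }

        int size() {
            return entries.size();
        }
    }

    /** Hypothetical stand-in for the runtime context constructor. */
    static final class ContextSketch {
        final CacheSketch distributedCache;

        ContextSketch(Map<String, Future<String>> cpTasks) {
            // Validate the argument: the result of `new` can never be null,
            // so checking the constructed object would prove nothing.
            this.distributedCache =
                    new CacheSketch(Objects.requireNonNull(cpTasks, "cpTasks"));
        }
    }

    public static void main(String[] args) {
        ContextSketch ok =
                new ContextSketch(Collections.<String, Future<String>>emptyMap());
        System.out.println("entries: " + ok.distributedCache.size());

        try {
            new ContextSketch(null);
        } catch (NullPointerException e) {
            // The null map is rejected at construction time.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing in the constructor surfaces the mistake at the call site rather than later, when the cache is first read.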

@sachingoel0101 sachingoel0101 force-pushed the iteration_cache_files branch 2 times, most recently from c22cdec to 05c5326 Compare August 7, 2015 14:07
@sachingoel0101
Contributor Author

Addressed PR comments. There is one unrelated failure on the GroupReduceITCase. I've filed a JIRA for that.

@sachingoel0101 sachingoel0101 force-pushed the iteration_cache_files branch from 05c5326 to e571f3b Compare August 7, 2015 17:53
@StephanEwen
Contributor

Looks good, in general.

Can you add the test to one of the other iteration test files? This saves cluster startup and shutdown costs, making builds faster. Maybe to the iteration aggregators, or iteration accumulators.

@@ -501,4 +536,22 @@ public int getSuperstepNumber() {
  return (T) previousAggregates.get(name);
  }
  }
+
+ private static final class DoingNothing implements Callable<Path>{
Contributor

It actually does something ;-)

Contributor Author

Haha. Yes. In an earlier version of the code, it didn't. :')

@sachingoel0101 sachingoel0101 force-pushed the iteration_cache_files branch 3 times, most recently from fe9bb3a to 376425c Compare August 10, 2015 22:17
@sachingoel0101
Contributor Author

I've moved the test to an existing MultipleProgramTestBase. Should be good to merge now. :)

@sachingoel0101
Contributor Author

I'd like to get this merged soon. This removes multiple constructors for Runtime contexts and establishes a clean hierarchy, making any changes to the constructors easier. This will be useful for two Jiras on exposing task configuration and task attempt number to the Runtime context.

@sachingoel0101
Contributor Author

These changes have been reverted.
I decided to go ahead and implement things which touch the Runtime Context constructors with this PR. This now closes five Jiras, namely 2449, 2458, 2488, 2496 and 2524. Commit messages are descriptive of each Jira.
FLINK-2449: Allow access to distributed cache from Collection Environment
FLINK-2458: Allow access to distributed cache from Iterative Tasks
FLINK-2488: Expose Attempt number from Runtime Context
FLINK-2496: Expose Task Manager configuration in Runtime Context
FLINK-2524: Add getTaskNameWithSubtasks in Runtime Context

@@ -897,7 +897,7 @@ class TaskManager(
  config.timeout,
  libCache,
  fileCache,
- runtimeInfo)
+ new TaskRuntimeInfo(hostname, taskManagerConfig, tdd.getAttemptNumber))
Contributor

Why is this changed from before?

Contributor Author

This is to provide access to Task attempt number from Runtime Context. I should add a description of the other tickets this resolves.
Is this a good idea though? To fix five issues in one PR? Or should I open a separate one and keep this one for just distributed cache?

Contributor

Generally, we try to keep one PR per issue; exceptions should be made only for closely related issues.

Why did you decide to add these issues to this PR? I have a hard time understanding it, since the commits barely touch the same files.

Contributor Author

Yes. The addition of distributed cache removes the need for multiple constructors for RuntimeContexts. Since providing access to runtime information needed changing the constructors, I deemed it better to work with what would be the only needed constructors after merging this.
I can revert this commit and open a separate PR for the other three issues if necessary.

Contributor

I would prefer that you open a second PR once this is merged. The issues are not really related to each other; the second commit was simply built on top of the first. We would end up having two separate discussions in one PR, which I think is a bad idea.

Contributor Author

Ah. Yes. That makes sense. I will revert this and open a separate PR. Apologies.

@sachingoel0101
Contributor Author

Reverting to make this PR only about the distributed cache.

@StephanEwen
Contributor

We are indeed falling behind on merging pull requests right now. Many committers are on vacation this month, and for the others, the large number of pull requests is hard to keep up with, especially alongside the work on our own issues.

Hope this will get better in a week or two.

I'll try to get a look at this very soon...

@StephanEwen
Contributor

In the CollectionExecutor, can you skip creating the ExecutorService? You can eagerly resolve the path and then put an already finished future into the map.
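
A minimal sketch of that suggestion, under stated assumptions: `resolveEagerly` is a hypothetical helper, `java.nio.file.Path` stands in for Flink's own `Path` type, and `CompletableFuture.completedFuture` (Java 8) is one way to obtain an already finished future. The merged code may do this differently.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class CompletedFutureSketch {

    /**
     * Hypothetical helper: resolves every registered cache file on the calling
     * thread and wraps each result in a future that is already complete.
     * No ExecutorService is created, so there is nothing to shut down.
     */
    static Map<String, Future<Path>> resolveEagerly(Map<String, String> cacheFiles) {
        Map<String, Future<Path>> resolved = new HashMap<>();
        for (Map.Entry<String, String> entry : cacheFiles.entrySet()) {
            Path path = Paths.get(entry.getValue()).toAbsolutePath();
            resolved.put(entry.getKey(), CompletableFuture.completedFuture(path));
        }
        return resolved;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> files = new HashMap<>();
        files.put("dictionary", "data/words.txt"); // hypothetical entry

        Future<Path> future = resolveEagerly(files).get("dictionary");
        // The future is finished before anyone asks for it, so get() never blocks.
        System.out.println("done: " + future.isDone());
        System.out.println("path: " + future.get());
    }
}
```

Callers that expect a `Future` keep working unchanged, while local execution avoids the cost of starting and stopping a thread pool.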

@StephanEwen
Contributor

Aside from the comment above, this looks good. Would merge this, after the comment is addressed.

[FLINK-2449] Allow use of distributed cache from Collection Environments
@sachingoel0101
Contributor Author

Addressed comments. @StephanEwen

@StephanEwen
Contributor

Looks good, merging this!

@asfgit asfgit closed this in 358259d Aug 16, 2015
@sachingoel0101 sachingoel0101 deleted the iteration_cache_files branch August 23, 2015 14:56
nikste pushed a commit to nikste/flink that referenced this pull request Sep 29, 2015
…from Iteration contexts & use of distributed cache from Collection Environments

This closes apache#970