Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
FALCON-1980 Change input and output argument order for Spark process …
…workflow

Details to perform this change has been mentioned in the FALCON-1980.

Author: peeyush b <pbishnoi@hortonworks.com>

Reviewers: Pavan <pavan.kolamuri@gmail.com>

Closes #160 from peeyushb/FALCON-1980 and squashes the following commits:

23d4e24 [peeyush b] Added comments for input and output argument
3d86f58 [peeyush b] FALCON-1980 : Change input and output argument order for Spark process workflow

(cherry picked from commit 400ef32)
Signed-off-by: peeyush b <pbishnoi@hortonworks.com>
  • Loading branch information
peeyushb committed May 30, 2016
1 parent 16a07cb commit 33a37a6380f16830265d23c47d444cd3b1d7ddbe
Showing 1 changed file with 9 additions and 3 deletions.
@@ -89,8 +89,9 @@ protected ACTION getUserAction(Cluster cluster, Path buildPath) throws FalconExc
argList.addAll(sparkArgs);
}

addInputFeedsAsArgument(argList, cluster);
//Adding output first so that final order must have input and then output followed by user's arguments.
addOutputFeedsAsArgument(argList, cluster);
addInputFeedsAsArgument(argList, cluster);

sparkAction.setJar(addUri(sparkFilePath, cluster));

@@ -145,6 +146,7 @@ private void addInputFeedsAsArgument(List<String> argList, Cluster cluster) thro
return;
}

//Adding to the 0th index and getting the args shifted as arguments are added to get the desired effect.
int numInputFeed = entity.getInputs().getInputs().size();
while (numInputFeed > 0) {
Input input = entity.getInputs().getInputs().get(numInputFeed-1);
@@ -163,13 +165,17 @@ private void addOutputFeedsAsArgument(List<String> argList, Cluster cluster) thr
return;
}

for(Output output : entity.getOutputs().getOutputs()) {
//Adding to the 0th index and getting the args shifted as arguments are added to get the desired effect.
int numOutputFeed = entity.getOutputs().getOutputs().size();
while (numOutputFeed > 0) {
Output output = entity.getOutputs().getOutputs().get(numOutputFeed-1);
Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
Storage storage = FeedHelper.createStorage(cluster, feed);
final String outputName = output.getName();
if (storage.getType() == Storage.TYPE.FILESYSTEM) {
argList.add(argList.size(), "${" + outputName + "}");
argList.add(0, "${" + outputName + "}");
}
numOutputFeed--;
}
}

0 comments on commit 33a37a6

Please sign in to comment.