-
Notifications
You must be signed in to change notification settings - Fork 117
FALCON-2226 Submit ,Schedule and submitAndSchedule API for extension #327
Conversation
@@ -52,6 +52,7 @@ | |||
|
|||
##Add if you want to use Trusted or User Extensions | |||
## In case of distributed Mode enable ExtensionService only on Prism via prism.application.services | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit : Unnecessary change.
@@ -58,6 +46,7 @@ | |||
private static final String CONFIG = "config"; | |||
private static final String CREATION_TIME = "creationTime"; | |||
private static final String LAST_UPDATE_TIME = "lastUpdatedTime"; | |||
public static final String FALCON_TAG = "falcon"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't think this is used in this class. May be move it proxy?
@@ -317,8 +330,8 @@ public APIResult submit( | |||
|
|||
try { | |||
entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config); | |||
submitEntities(extensionName, doAsUser, jobName, entityMap, config); | |||
} catch (FalconException | IOException e) { | |||
submitEntities(extensionName, jobName, entityMap, config, request); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why has doAsUser been knocked off? I know we don't use it now. But, since there is a plan to enable it later, we shouldn't be removing it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
submit API in SchedulableEntityManagerProxy doesnot take doAs as a param we should not do it too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair enough.
|
||
for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ | ||
for(final Entity entity : entry.getValue()){ | ||
final HttpServletRequest httpServletRequest = getEntityStream(entity, entity.getEntityType(), request); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the eventual goal is to get the bufferedRequest I think getEntityStream and getBufferedRequest can be merged into a single method.
submitInternal(process, doAsUser); | ||
processNames.add(process.getName()); | ||
|
||
for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The feeds have to be submitted before processes are. Else, the process submission will fail due to non-existence of a feed.
@@ -691,4 +778,61 @@ private static void checkIfExtensionServiceIsEnabled() { | |||
ExtensionService.SERVICE_NAME + " is not enabled.", Response.Status.NOT_FOUND); | |||
} | |||
} | |||
|
|||
private abstract class EntitiesProxy<T extends APIResult> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this class here? Where is it used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not needed will remove.
|
||
for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ | ||
for(Entity entity : entry.getValue()){ | ||
submitInternal(entity, "falconUser"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feeds before processes, here too. May be create a method and call from both submit and submitAndSchedule.
List<String> processNames = new ArrayList<>(); | ||
for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ | ||
for(Entity entity : entry.getValue()){ | ||
submitInternal(entity, "falconUser"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
feeds before processes.
import java.util.List; | ||
import java.util.Map; | ||
|
||
/** | ||
* A proxy implementation of the extension operations in local mode. | ||
*/ | ||
public class LocalExtensionManager extends AbstractExtensionManager { | ||
public class LocalExtensionManager extends ExtensionManagerProxy { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why extend Proxy? Falcon Unit shouldn't worry about Proxy.
@@ -326,7 +325,7 @@ public APIResult submit( | |||
@FormDataParam("feeds") List<FormDataBodyPart> feedForms, | |||
@FormDataParam("config") InputStream config) { | |||
checkIfExtensionServiceIsEnabled(); | |||
Map<EntityType, List<Entity>> entityMap; | |||
TreeMap<EntityType, List<Entity>> entityMap; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Declaration should only have Map... Only instantiation should be TreeMap.
@@ -338,15 +337,15 @@ public APIResult submit( | |||
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully:" + jobName); | |||
} | |||
|
|||
private Map<EntityType, List<Entity>> getEntityList(String extensionName, String jobName, | |||
private TreeMap<EntityType, List<Entity>> getEntityList(String extensionName, String jobName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Declaration should just be Map
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want the contract to be tightly coupled so have changed it to SortedMap.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair enough, as long as it is an Interface as you have used (SortedMap) and not the impl., it is fine.
List<FormDataBodyPart> feedForms, | ||
List<FormDataBodyPart> processForms, InputStream config) | ||
throws FalconException, IOException{ | ||
List<Entity> processes = getProcesses(processForms); | ||
List<Entity> feeds = getFeeds(feedForms); | ||
ExtensionType extensionType = getExtensionType(extensionName); | ||
List<Entity> entities; | ||
Map<EntityType, List<Entity>> entityMap = new HashMap<>(); | ||
TreeMap<EntityType, List<Entity>> entityMap = new TreeMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Believe you made this a tree map, so that during submission, feeds go first? But, while putting elements into this map, processes are going in first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Internally tree will store in the natural ordering and Enum maintains it in the ordinal value in which feed it the first one so we should be good here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh.. right.. My bad. In my head, I mixed it up with LinkedHashMap.
@@ -442,7 +441,7 @@ private BufferedRequest getBufferedRequest(HttpServletRequest request) { | |||
} | |||
|
|||
protected void submitEntities(String extensionName, String jobName, | |||
Map<EntityType, List<Entity>> entityMap, InputStream configStream, | |||
TreeMap<EntityType, List<Entity>> entityMap, InputStream configStream, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Method signature should just have the interface name, Map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed it to SortedMap.
@@ -285,14 +285,14 @@ public APIResult submitExtensionJob(String extensionName, String jobName, String | |||
|
|||
InputStream configStream = getServletInputStream(configPath); | |||
try { | |||
Map<EntityType, List<Entity>> entityMap = getEntityTypeListMap(extensionName, jobName, configStream); | |||
TreeMap<EntityType, List<Entity>> entityMap = getEntityTypeListMap(extensionName, jobName, configStream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as above for the declarations in this class too. Method signatures and declaration should just have Map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please change the TreeMap to SortedMap both in FalconUnitClient and LocalExtensionManager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Unrelated build failure.. merging. |
Author: Praveen Adlakha <adlakha.praveen@gmail.com> Reviewers: @pallavi-rao Closes apache#327 from PraveenAdlakha/2226 and squashes the following commits: 59a43ef [Praveen Adlakha] minor changes in falcon client and localextensionmanager 473f04a [Praveen Adlakha] comments addressed 6a346aa [Praveen Adlakha] comments addressed 8733f53 [Praveen Adlakha] checkstyle issue resolved 9ba005e [Praveen Adlakha] FALCON-2226 Submit ,Schedule and submitAndSchedule API for extension in distributed mode 699b06f [Praveen Adlakha] WIP 29da911 [Praveen Adlakha] WIP 0b7d02a [Praveen Adlakha] merge conflicts resolved 6b77cc1 [Praveen Adlakha] FALCON-2223 Distributed mode support for User Extension
No description provided.