-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implemented new REST API hpFlows to add high priority flows to Azkaban. #3014
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package azkaban.imagemgmt.daos; | ||
|
||
import azkaban.executor.ExecutableFlow; | ||
import java.util.List; | ||
|
||
|
||
/** | ||
* Data access object (DAO) for accessing high priority flow metadata. | ||
* This interface defines add/remove/get methods for high priority flows. | ||
*/ | ||
public interface HPFlowDao { | ||
/** | ||
* isHPFlowOwner | ||
* @param userId : owner of high priority flows. | ||
* @return : True if user is owner, false otherwise. | ||
*/ | ||
boolean isHPFlowOwner(final String userId); | ||
|
||
/** | ||
* addHPFlows | ||
* @param flowIds : list of flows. | ||
* @param userId : userId of owner who is adding the HP flows. | ||
*/ | ||
int addHPFlows(final List<String> flowIds, final String userId); | ||
|
||
/** | ||
* isHPFlow | ||
* @param flow : Given a flow, finds if it is a high priority flow. | ||
* @return | ||
*/ | ||
boolean isHPFlow(final ExecutableFlow flow); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
package azkaban.imagemgmt.daos; | ||
|
||
import azkaban.db.DatabaseOperator; | ||
import azkaban.db.SQLTransaction; | ||
import azkaban.executor.ExecutableFlow; | ||
import azkaban.imagemgmt.exception.ErrorCode; | ||
import azkaban.imagemgmt.exception.ImageMgmtDaoException; | ||
import java.sql.ResultSet; | ||
import java.sql.SQLException; | ||
import java.sql.Timestamp; | ||
import java.time.LocalDateTime; | ||
import java.util.List; | ||
import javax.inject.Inject; | ||
import javax.inject.Singleton; | ||
import org.apache.commons.dbutils.ResultSetHandler; | ||
import org.apache.log4j.Logger; | ||
|
||
import static azkaban.imagemgmt.utils.ErroCodeConstants.*; | ||
|
||
|
||
/** | ||
* Dao implementation for managing high priority flows | ||
*/ | ||
@Singleton | ||
public class HPFlowDaoImpl implements HPFlowDao { | ||
private static final Logger log = Logger.getLogger(HPFlowDaoImpl.class); | ||
|
||
private final DatabaseOperator databaseOperator; | ||
|
||
// Flow ID is a fully qualified flow name with format, | ||
// <project_name>.<flow_ame> | ||
private static String INSERT_HP_FLOW_OWNER = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any mapping between hp_flow_owners and hp_flows tables? Can anyone be owner for any flows and insert anything? Can you please add more detail about who can be the owner? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replied in earlier comment. |
||
"INSERT into hp_flow_owners (name, created_on, created_by, " | ||
+ "modified_on, modified_by ) " | ||
+ "values (?, ?, ?, ?, ?)"; | ||
|
||
private static String INSERT_HP_FLOWS = | ||
"INSERT into hp_flows (flow_id, created_on, created_by) " | ||
+ "values (?, ?, ?)"; | ||
|
||
@Inject | ||
public HPFlowDaoImpl(final DatabaseOperator databaseOperator) { | ||
this.databaseOperator = databaseOperator; | ||
} | ||
|
||
/** | ||
* Returns if the provided user has high priority flow management access. | ||
* @param userId : owner of high priority flows. | ||
* @return | ||
*/ | ||
@Override | ||
public boolean isHPFlowOwner(final String userId) { | ||
final FetchHPFlowOwnership fetchHPFlowOwnership = new FetchHPFlowOwnership(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it necessary to create a new object every time? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure, I believe I followed the pattern used in the rest of the code. Do you see any harm in doing this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My understanding is that in general dependencies should be injected. I don't see a specific issue here, but I think it might be better memory management to only create the object as necessary, especially since it seems like we're using this in a mostly static context (only to access the encapsulated query). |
||
try { | ||
final String returnedName = this.databaseOperator | ||
.query(FetchHPFlowOwnership.FETCH_HP_OWNER_BY_NAME, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please rename FETCH_HP_OWNER_BY_NAME to FETCH_HP_FLOW_OWNER_BY_NAME? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any validation that same user can't be added in hp_flow_owners table? Same question for hp_flows table. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The user and flows are primary keys, so same user or flow cant be added if already exist. |
||
fetchHPFlowOwnership, userId.toLowerCase()); | ||
log.info(String.format( | ||
"HPFlowDao : Returned user id %s for given user id %s", returnedName, userId)); | ||
return returnedName.equalsIgnoreCase(userId); | ||
} catch (final SQLException e) { | ||
log.error(FetchHPFlowOwnership.FETCH_HP_OWNER_BY_NAME + " failed.", e); | ||
throw new ImageMgmtDaoException(ErrorCode.INTERNAL_SERVER_ERROR, String.format( | ||
"Unable to fetch HP flow ownership for %s.", userId)); | ||
} | ||
} | ||
|
||
/** | ||
* Adds a list of flows into high priority flows db. | ||
* @param flowIds : list of flows. | ||
* @param userId : userId of owner who is adding the HP flows. | ||
* @return | ||
*/ | ||
@Override | ||
public int addHPFlows(final List<String> flowIds, final String userId) { | ||
// Create the transaction block | ||
final SQLTransaction<Integer> insertAllFlows = transOperator -> { | ||
// Insert each flow in its own row with same userId and created time. | ||
final Timestamp currTimeStamp = Timestamp.valueOf(LocalDateTime.now()); | ||
int count = 0; | ||
for (final String flowId : flowIds) { | ||
transOperator.update(INSERT_HP_FLOWS, flowId, currTimeStamp, userId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see userId in INSERT_HP_FLOWS query. Am I missing something here? Can you please test this code once again? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Explained in earlier comment. |
||
count++; | ||
} | ||
transOperator.getConnection().commit(); | ||
return count; | ||
}; | ||
|
||
int count = 0; | ||
try { | ||
count = this.databaseOperator.transaction(insertAllFlows); | ||
if (count < 1) { | ||
log.error("Exception while adding HP flows"); | ||
throw new ImageMgmtDaoException(ErrorCode.BAD_REQUEST, | ||
"Exception while adding HP flows"); | ||
} | ||
log.info("Added the HP flows by user " + userId); | ||
return count; | ||
} catch (final SQLException e) { | ||
log.error("Unable to add HP flows", e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we interested in adding the user in this log message, if we're adding the user in the info message? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is a good catch. Will add. |
||
String errorMessage = ""; | ||
if (e.getErrorCode() == SQL_ERROR_CODE_DUPLICATE_ENTRY) { | ||
errorMessage = "Reason: One or more flows already exists."; | ||
} else { | ||
errorMessage = "Reason: "; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we say "Reason: Unknown" here, or something else? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do. |
||
} | ||
throw new ImageMgmtDaoException(ErrorCode.BAD_REQUEST, "Exception while " | ||
+ "adding HP flows. " + errorMessage); | ||
} | ||
} | ||
|
||
/** | ||
* Checks if the provided flow is high priority. | ||
* @param flow : Given a flow, finds if it is a high priority flow. | ||
* @return | ||
*/ | ||
@Override | ||
public boolean isHPFlow(final ExecutableFlow flow) { | ||
if (null == flow) { | ||
return false; | ||
} | ||
final FetchHPFlow fetchHPFlow = new FetchHPFlow(); | ||
final String FQFN = flow.getProjectName() + "." + flow.getFlowId(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please use getFlowName method of ExecutableFlow? It is giving projectName + delimiter + flowId. Please use it for other references in this PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for pointing this one. Will use it. |
||
try { | ||
final String returnedFlowId = this.databaseOperator | ||
.query(FetchHPFlow.SELECT_HP_FLOW, fetchHPFlow, FQFN); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are you passing fetchHPFlow? It's an empty object and not there in any clause in select query. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is the result set handler implementation. |
||
log.info("HPFlowDao : Found high priority flow " | ||
+ returnedFlowId == null ? "" : returnedFlowId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In case, flow is not available, you are logging "Found hp flow" for that as well. Can you please correct this logic? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will fix the log line. |
||
return returnedFlowId.equals(FQFN); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will there be only one entry of each flowName? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. |
||
} catch (final SQLException e) { | ||
log.error(FetchHPFlow.SELECT_HP_FLOW + " failed.", e); | ||
throw new ImageMgmtDaoException(ErrorCode.INTERNAL_SERVER_ERROR, String.format( | ||
"Unable to fetch high priority flow info for flow %s.", FQFN)); | ||
} | ||
} | ||
|
||
/** | ||
* ResultSetHandler implementation class for fetching high priority flow ownership | ||
*/ | ||
public static class FetchHPFlowOwnership implements ResultSetHandler<String> { | ||
private static final String FETCH_HP_OWNER_BY_NAME = | ||
"SELECT name FROM hp_flow_owners where lower(name) = ?"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please rename name column for this table to username or owner. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do. |
||
|
||
@Override | ||
public String handle(final ResultSet rs) throws SQLException { | ||
if (!rs.next()) { | ||
return null; | ||
} | ||
return rs.getString("name"); | ||
} | ||
} | ||
|
||
/** | ||
* ResultSetHandler implementation class for fetching high priority flow | ||
*/ | ||
public static class FetchHPFlow implements ResultSetHandler<String> { | ||
|
||
private static String SELECT_HP_FLOW = | ||
"SELECT count(*) from hp_flows where flow_id = ?"; | ||
|
||
@Override | ||
public String handle(final ResultSet rs) throws SQLException { | ||
if (!rs.next()) { | ||
return null; | ||
} | ||
return rs.getString("flow_id"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How will you get flow_id for this? SELECT_HP_FLOW is only getting count(*). Can you please test this part? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The flow name is already know, we are only testing if the flow is in the table. By design there can be exactly zero or one entry for a flow. Hence the use of count(*) can be used to find its presence. |
||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package azkaban.imagemgmt.dto; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import javax.validation.constraints.NotBlank; | ||
import org.codehaus.jackson.annotate.JsonProperty; | ||
|
||
|
||
/** | ||
* This is Add HP Flow class for updating high priority flow metadata. | ||
*/ | ||
public class HPFlowDTO extends BaseDTO { | ||
// CSV of flow ids. | ||
@JsonProperty("flowIds") | ||
@NotBlank(message = "flowList cannot be empty.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be flowIds? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do. Should be consistent. |
||
private String flowIds; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. flowId refers to only flow name. While flowName refers to projectName + delimiter + flowId at all the places in ExecutableFlow. Can you please use the similar name and keep consistency? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why can't we keep list of flowids instead of comma separate string? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We get the CSV as input and eventually convert it to list when needed. |
||
|
||
public void setFlowIds(final String flowIds) { | ||
// Remove all whitespaces | ||
this.flowIds = flowIds.replaceAll("\\s", ""); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we allow whitespaces in flow names? We should keep same validations as flow name for this. Can you please use same code rather than adding this replacement here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a CSV of flow names for which we do not have an existing use case. The user can put whitespaces around commas, this check makes it fool proof. |
||
} | ||
|
||
public String getFlowIds() { | ||
return this.flowIds; | ||
} | ||
|
||
/** | ||
* Converts the CSV into a list of flow IDs. | ||
* @return list of flow IDs. | ||
*/ | ||
public List<String> getFlowIdList() { | ||
if (this.flowIds.isEmpty()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How is this possible without an error from the Not Blank annotation? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Its a sanity check. If you think its useless, let me know. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be better not to validate twice, but that is just my personal preference. |
||
return new ArrayList<>(); | ||
} | ||
return Arrays.asList(flowIds.split(",")); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "HPFlowDTO{" + "flowList='" + this.flowIds + '\'' + '}'; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package azkaban.imagemgmt.models; | ||
|
||
/** | ||
* This class represents HP Flow managers' metadata | ||
*/ | ||
public class HPFlowOwnership extends BaseModel { | ||
// HP flow management owner name | ||
private String name; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please rename this variable to owner or username. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do. |
||
|
||
public String getName() { | ||
return this.name; | ||
} | ||
|
||
public void setName(final String name) { | ||
this.name = name; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,4 +36,11 @@ public interface PermissionManager { | |
public boolean hasPermission(final String imageTypeName, final String userId, final Type type) | ||
throws ImageMgmtException; | ||
|
||
/** | ||
* Checks if the user has permission to manage high priority flows. | ||
* @param userId | ||
* @return | ||
* @throws ImageMgmtException | ||
*/ | ||
public boolean hasPermission(final String userId) throws ImageMgmtException; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method is validating permission for hp flow. Can you please rename this method to convey the same? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Method name should be hasHPFlowPermission There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I kept it same as there is already a method with same name. |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ | |
*/ | ||
package azkaban.imagemgmt.permission; | ||
|
||
import azkaban.imagemgmt.daos.HPFlowDao; | ||
import azkaban.imagemgmt.daos.ImageTypeDao; | ||
import azkaban.imagemgmt.exception.ErrorCode; | ||
import azkaban.imagemgmt.exception.ImageMgmtException; | ||
|
@@ -27,6 +28,9 @@ | |
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import static java.util.Objects.*; | ||
|
||
|
||
/** | ||
* This class defines and manages the permission for accessing image management APIs. The | ||
* permissions are defined on image type. The permissions are created using user role and access | ||
|
@@ -38,10 +42,13 @@ public class PermissionManagerImpl implements PermissionManager { | |
private static final Logger log = LoggerFactory.getLogger(PermissionManagerImpl.class); | ||
|
||
private final ImageTypeDao imageTypeDao; | ||
private final HPFlowDao hpFlowDao; | ||
|
||
@Inject | ||
public PermissionManagerImpl(final ImageTypeDao imageTypeDao) { | ||
public PermissionManagerImpl(final ImageTypeDao imageTypeDao, | ||
final HPFlowDao hpFlowDao) { | ||
this.imageTypeDao = imageTypeDao; | ||
this.hpFlowDao = hpFlowDao; | ||
} | ||
|
||
/** | ||
|
@@ -76,4 +83,24 @@ public boolean hasPermission(final String imageTypeName, final String userId, fi | |
|
||
return hasPermission; | ||
} | ||
|
||
/** | ||
* Checks if the user has permission to manage high priority flows. | ||
* @param userId : must not be null. | ||
* @return boolean | ||
*/ | ||
@Override | ||
public boolean hasPermission(final String userId) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems all HP owners have permission to update the hp_flows table. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this PR, yes. Eventually no. |
||
requireNonNull(userId, "userId must not be null"); | ||
if (this.hpFlowDao.isHPFlowOwner(userId)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Who has permission to add username in hpflowowner? Please add that detail in comment for the method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just like ImageManagement, for this, the ability is with Azkaban admins, I will write a comment explaining that. |
||
return true; | ||
} | ||
|
||
// Throw exception with FORBIDDEN error. | ||
log.error(String.format("API access permission check failed. The user %s " | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please create object for error message and use it for both logging as well as exception? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, will do. |
||
+ "does not have access to manage high priority flows", userId)); | ||
throw new ImageMgmtException(ErrorCode.FORBIDDEN, String.format( | ||
"API access permission check failed. The user %s does not have access " | ||
+ "to manage high priority flows", userId)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please appropriate comment for each method? For example, why is hpFlowOwner only taking userId and no reference for flow? Shouldn't there be a mapping between user and flow in metadata?
Please add appropriate method doc for rest of the methods too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As stated in reply to main comment, the idea was to have only a handful of owners for a certain jobtype to use this functionality. The logic with mapping will come in next commit.