Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented new REST API hpFlows to add high priority flows to Azkaban. #3014

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import azkaban.imagemgmt.converters.ImageRampupPlanConverter;
import azkaban.imagemgmt.converters.ImageTypeConverter;
import azkaban.imagemgmt.converters.ImageVersionConverter;
import azkaban.imagemgmt.daos.HPFlowDao;
import azkaban.imagemgmt.daos.HPFlowDaoImpl;
import azkaban.imagemgmt.daos.ImageMgmtCommonDao;
import azkaban.imagemgmt.daos.ImageMgmtCommonDaoImpl;
import azkaban.imagemgmt.daos.ImageRampupDao;
Expand Down Expand Up @@ -104,7 +106,9 @@ protected void configure() {
Constants.DEFAULT_AZKABAN_POLLING_INTERVAL_MS);
return new OsCpuUtil(Math.max(1, (cpuLoadPeriodSec * 1000) / pollingIntervalMs));
});
// Following bindings are needed for containerized flow execution
bindImageManagementDependencies();
bindHPFlowManagementDependencies();
}

public Class<? extends Storage> resolveStorageClassType() {
Expand Down Expand Up @@ -200,4 +204,11 @@ private boolean isContainerizedDispatchMethodEnabled() {
.getString(Constants.ConfigurationKeys.AZKABAN_EXECUTION_DISPATCH_METHOD,
DispatchMethod.PUSH.name()));
}

private void bindHPFlowManagementDependencies() {
if (!isContainerizedDispatchMethodEnabled()) {
return;
}
bind(HPFlowDao.class).to(HPFlowDaoImpl.class).in(Scopes.SINGLETON);
}
}
32 changes: 32 additions & 0 deletions azkaban-common/src/main/java/azkaban/imagemgmt/daos/HPFlowDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package azkaban.imagemgmt.daos;

import azkaban.executor.ExecutableFlow;
import java.util.List;


/**
* Data access object (DAO) for accessing high priority flow metadata.
* This interface defines add/remove/get methods for high priority flows.
*/
public interface HPFlowDao {
/**
* isHPFlowOwner
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please appropriate comment for each method? For example, why is hpFlowOwner only taking userId and no reference for flow? Shouldn't there be a mapping between user and flow in metadata?
Please add appropriate method doc for rest of the methods too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As stated in reply to main comment, the idea was to have only a handful of owners for a certain jobtype to use this functionality. The logic with mapping will come in next commit.

* @param userId : owner of high priority flows.
* @return : True if user is owner, false otherwise.
*/
boolean isHPFlowOwner(final String userId);

/**
* addHPFlows
* @param flowIds : list of flows.
* @param userId : userId of owner who is adding the HP flows.
*/
int addHPFlows(final List<String> flowIds, final String userId);

/**
* isHPFlow
* @param flow : Given a flow, finds if it is a high priority flow.
* @return
*/
boolean isHPFlow(final ExecutableFlow flow);
}
169 changes: 169 additions & 0 deletions azkaban-common/src/main/java/azkaban/imagemgmt/daos/HPFlowDaoImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package azkaban.imagemgmt.daos;

import azkaban.db.DatabaseOperator;
import azkaban.db.SQLTransaction;
import azkaban.executor.ExecutableFlow;
import azkaban.imagemgmt.exception.ErrorCode;
import azkaban.imagemgmt.exception.ImageMgmtDaoException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.log4j.Logger;

import static azkaban.imagemgmt.utils.ErroCodeConstants.*;


/**
* Dao implementation for managing high priority flows
*/
@Singleton
public class HPFlowDaoImpl implements HPFlowDao {
private static final Logger log = Logger.getLogger(HPFlowDaoImpl.class);

private final DatabaseOperator databaseOperator;

// Flow ID is a fully qualified flow name with format,
// <project_name>.<flow_ame>
private static String INSERT_HP_FLOW_OWNER =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any mapping between hp_flow_owners and hp_flows tables? Can anyone be owner for any flows and insert anything? Can you please add more detail about who can be the owner?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replied in earlier comment.

"INSERT into hp_flow_owners (name, created_on, created_by, "
+ "modified_on, modified_by ) "
+ "values (?, ?, ?, ?, ?)";

private static String INSERT_HP_FLOWS =
"INSERT into hp_flows (flow_id, created_on, created_by) "
+ "values (?, ?, ?)";

@Inject
public HPFlowDaoImpl(final DatabaseOperator databaseOperator) {
this.databaseOperator = databaseOperator;
}

/**
* Returns if the provided user has high priority flow management access.
* @param userId : owner of high priority flows.
* @return
*/
@Override
public boolean isHPFlowOwner(final String userId) {
final FetchHPFlowOwnership fetchHPFlowOwnership = new FetchHPFlowOwnership();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to create a new object every time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure, I believe I followed the pattern used in the rest of the code. Do you see any harm in doing this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that in general dependencies should be injected. I don't see a specific issue here, but I think it might be better memory management to only create the object as necessary, especially since it seems like we're using this in a mostly static context (only to access the encapsulated query).

try {
final String returnedName = this.databaseOperator
.query(FetchHPFlowOwnership.FETCH_HP_OWNER_BY_NAME,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename FETCH_HP_OWNER_BY_NAME to FETCH_HP_FLOW_OWNER_BY_NAME?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any validation that same user can't be added in hp_flow_owners table? Same question for hp_flows table.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The user and flows are primary keys, so same user or flow cant be added if already exist.

fetchHPFlowOwnership, userId.toLowerCase());
log.info(String.format(
"HPFlowDao : Returned user id %s for given user id %s", returnedName, userId));
return returnedName.equalsIgnoreCase(userId);
} catch (final SQLException e) {
log.error(FetchHPFlowOwnership.FETCH_HP_OWNER_BY_NAME + " failed.", e);
throw new ImageMgmtDaoException(ErrorCode.INTERNAL_SERVER_ERROR, String.format(
"Unable to fetch HP flow ownership for %s.", userId));
}
}

/**
* Adds a list of flows into high priority flows db.
* @param flowIds : list of flows.
* @param userId : userId of owner who is adding the HP flows.
* @return
*/
@Override
public int addHPFlows(final List<String> flowIds, final String userId) {
// Create the transaction block
final SQLTransaction<Integer> insertAllFlows = transOperator -> {
// Insert each flow in its own row with same userId and created time.
final Timestamp currTimeStamp = Timestamp.valueOf(LocalDateTime.now());
int count = 0;
for (final String flowId : flowIds) {
transOperator.update(INSERT_HP_FLOWS, flowId, currTimeStamp, userId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see userId in INSERT_HP_FLOWS query. Am I missing something here? Can you please test this code once again?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explained in earlier comment.

count++;
}
transOperator.getConnection().commit();
return count;
};

int count = 0;
try {
count = this.databaseOperator.transaction(insertAllFlows);
if (count < 1) {
log.error("Exception while adding HP flows");
throw new ImageMgmtDaoException(ErrorCode.BAD_REQUEST,
"Exception while adding HP flows");
}
log.info("Added the HP flows by user " + userId);
return count;
} catch (final SQLException e) {
log.error("Unable to add HP flows", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we interested in adding the user in this log message, if we're adding the user in the info message?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a good catch. Will add.

String errorMessage = "";
if (e.getErrorCode() == SQL_ERROR_CODE_DUPLICATE_ENTRY) {
errorMessage = "Reason: One or more flows already exists.";
} else {
errorMessage = "Reason: ";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we say "Reason: Unknown" here, or something else?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.

}
throw new ImageMgmtDaoException(ErrorCode.BAD_REQUEST, "Exception while "
+ "adding HP flows. " + errorMessage);
}
}

/**
* Checks if the provided flow is high priority.
* @param flow : Given a flow, finds if it is a high priority flow.
* @return
*/
@Override
public boolean isHPFlow(final ExecutableFlow flow) {
if (null == flow) {
return false;
}
final FetchHPFlow fetchHPFlow = new FetchHPFlow();
final String FQFN = flow.getProjectName() + "." + flow.getFlowId();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please use getFlowName method of ExecutableFlow? It is giving projectName + delimiter + flowId. Please use it for other references in this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this one. Will use it.

try {
final String returnedFlowId = this.databaseOperator
.query(FetchHPFlow.SELECT_HP_FLOW, fetchHPFlow, FQFN);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you passing fetchHPFlow? It's an empty object and not there in any clause in select query.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the result set handler implementation.

log.info("HPFlowDao : Found high priority flow "
+ returnedFlowId == null ? "" : returnedFlowId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case, flow is not available, you are logging "Found hp flow" for that as well. Can you please correct this logic?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix the log line.

return returnedFlowId.equals(FQFN);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will there be only one entry of each flowName?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

} catch (final SQLException e) {
log.error(FetchHPFlow.SELECT_HP_FLOW + " failed.", e);
throw new ImageMgmtDaoException(ErrorCode.INTERNAL_SERVER_ERROR, String.format(
"Unable to fetch high priority flow info for flow %s.", FQFN));
}
}

/**
* ResultSetHandler implementation class for fetching high priority flow ownership
*/
public static class FetchHPFlowOwnership implements ResultSetHandler<String> {
private static final String FETCH_HP_OWNER_BY_NAME =
"SELECT name FROM hp_flow_owners where lower(name) = ?";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename name column for this table to username or owner.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.


@Override
public String handle(final ResultSet rs) throws SQLException {
if (!rs.next()) {
return null;
}
return rs.getString("name");
}
}

/**
* ResultSetHandler implementation class for fetching high priority flow
*/
public static class FetchHPFlow implements ResultSetHandler<String> {

private static String SELECT_HP_FLOW =
"SELECT count(*) from hp_flows where flow_id = ?";

@Override
public String handle(final ResultSet rs) throws SQLException {
if (!rs.next()) {
return null;
}
return rs.getString("flow_id");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will you get flow_id for this? SELECT_HP_FLOW is only getting count(*). Can you please test this part?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flow name is already know, we are only testing if the flow is in the table. By design there can be exactly zero or one entry for a flow. Hence the use of count(*) can be used to find its presence.

}
}
}
43 changes: 43 additions & 0 deletions azkaban-common/src/main/java/azkaban/imagemgmt/dto/HPFlowDTO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package azkaban.imagemgmt.dto;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.validation.constraints.NotBlank;
import org.codehaus.jackson.annotate.JsonProperty;


/**
* This is Add HP Flow class for updating high priority flow metadata.
*/
public class HPFlowDTO extends BaseDTO {
// CSV of flow ids.
@JsonProperty("flowIds")
@NotBlank(message = "flowList cannot be empty.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be flowIds?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do. Should be consistent.

private String flowIds;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flowId refers to only flow name. While flowName refers to projectName + delimiter + flowId at all the places in ExecutableFlow. Can you please use the similar name and keep consistency?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we keep list of flowids instead of comma separate string?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We get the CSV as input and eventually convert it to list when needed.


public void setFlowIds(final String flowIds) {
// Remove all whitespaces
this.flowIds = flowIds.replaceAll("\\s", "");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we allow whitespaces in flow names? We should keep same validations as flow name for this. Can you please use same code rather than adding this replacement here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a CSV of flow names for which we do not have an existing use case. The user can put whitespaces around commas, this check makes it fool proof.

}

public String getFlowIds() {
return this.flowIds;
}

/**
* Converts the CSV into a list of flow IDs.
* @return list of flow IDs.
*/
public List<String> getFlowIdList() {
if (this.flowIds.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this possible without an error from the Not Blank annotation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its a sanity check. If you think its useless, let me know.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better not to validate twice, but that is just my personal preference.

return new ArrayList<>();
}
return Arrays.asList(flowIds.split(","));
}

@Override
public String toString() {
return "HPFlowDTO{" + "flowList='" + this.flowIds + '\'' + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package azkaban.imagemgmt.models;

/**
* This class represents HP Flow managers' metadata
*/
public class HPFlowOwnership extends BaseModel {
// HP flow management owner name
private String name;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename this variable to owner or username.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.


public String getName() {
return this.name;
}

public void setName(final String name) {
this.name = name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,11 @@ public interface PermissionManager {
public boolean hasPermission(final String imageTypeName, final String userId, final Type type)
throws ImageMgmtException;

/**
* Checks if the user has permission to manage high priority flows.
* @param userId
* @return
* @throws ImageMgmtException
*/
public boolean hasPermission(final String userId) throws ImageMgmtException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is validating permission for hp flow. Can you please rename this method to convey the same?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method name should be hasHPFlowPermission

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept it same as there is already a method with same name.

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package azkaban.imagemgmt.permission;

import azkaban.imagemgmt.daos.HPFlowDao;
import azkaban.imagemgmt.daos.ImageTypeDao;
import azkaban.imagemgmt.exception.ErrorCode;
import azkaban.imagemgmt.exception.ImageMgmtException;
Expand All @@ -27,6 +28,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Objects.*;


/**
* This class defines and manages the permission for accessing image management APIs. The
* permissions are defined on image type. The permissions are created using user role and access
Expand All @@ -38,10 +42,13 @@ public class PermissionManagerImpl implements PermissionManager {
private static final Logger log = LoggerFactory.getLogger(PermissionManagerImpl.class);

private final ImageTypeDao imageTypeDao;
private final HPFlowDao hpFlowDao;

@Inject
public PermissionManagerImpl(final ImageTypeDao imageTypeDao) {
public PermissionManagerImpl(final ImageTypeDao imageTypeDao,
final HPFlowDao hpFlowDao) {
this.imageTypeDao = imageTypeDao;
this.hpFlowDao = hpFlowDao;
}

/**
Expand Down Expand Up @@ -76,4 +83,24 @@ public boolean hasPermission(final String imageTypeName, final String userId, fi

return hasPermission;
}

/**
* Checks if the user has permission to manage high priority flows.
* @param userId : must not be null.
* @return boolean
*/
@Override
public boolean hasPermission(final String userId) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems all HP owners have permission to update the hp_flows table.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this PR, yes. Eventually no.

requireNonNull(userId, "userId must not be null");
if (this.hpFlowDao.isHPFlowOwner(userId)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who has permission to add username in hpflowowner? Please add that detail in comment for the method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just like ImageManagement, for this, the ability is with Azkaban admins, I will write a comment explaining that.

return true;
}

// Throw exception with FORBIDDEN error.
log.error(String.format("API access permission check failed. The user %s "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please create object for error message and use it for both logging as well as exception?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, will do.

+ "does not have access to manage high priority flows", userId));
throw new ImageMgmtException(ErrorCode.FORBIDDEN, String.format(
"API access permission check failed. The user %s does not have access "
+ "to manage high priority flows", userId));
}
}
Loading