From eac0e7932bf3ab3fcdbb507dc52eda686a5735fa Mon Sep 17 00:00:00 2001 From: Deepak Jaiswal Date: Tue, 7 Dec 2021 16:56:44 -0800 Subject: [PATCH] These high priority flows, when run in containerized dispatch mode, will only use the ACTIVE version of the docker image of all the jobtypes required. This ensures that these flows always run on a stable version rather than any version which is being ramped up and is potentially unstable. This API can only be called by either Azkaban admins or the designated owners. Currently the ability to add owners is a manual process. An ADMIN-only API to add such owners will come in the future. An API to remove a flow or list of flows from the high priority flows will also come in the future. Added unit tests where needed. --- .../java/azkaban/AzkabanCommonModule.java | 11 ++ .../azkaban/imagemgmt/daos/HPFlowDao.java | 32 ++++ .../azkaban/imagemgmt/daos/HPFlowDaoImpl.java | 169 ++++++++++++++++++ .../java/azkaban/imagemgmt/dto/HPFlowDTO.java | 43 +++++ .../imagemgmt/models/HPFlowOwnership.java | 17 ++ .../permission/PermissionManager.java | 7 + .../permission/PermissionManagerImpl.java | 29 ++- .../rampup/ImageRampupManagerImpl.java | 19 +- .../rampup/ImageRampupManagerImplTest.java | 87 ++++++++- .../create.containerization-tables-all.sql | 17 ++ .../imagemgmt/services/HPFlowService.java | 21 +++ .../imagemgmt/services/HPFlowServiceImpl.java | 45 +++++ .../imagemgmt/servlets/HPFlowServlet.java | 129 +++++++++++++ .../java/azkaban/webapp/AzkabanWebServer.java | 7 + .../webapp/AzkabanWebServerModule.java | 10 ++ .../services/HpFlowServiceImplTest.java | 58 ++++++ .../resources/hp_flows/hp_flow_invalid.json | 3 + .../src/test/resources/hp_flows/hp_flows.json | 3 + 18 files changed, 699 insertions(+), 8 deletions(-) create mode 100644 azkaban-common/src/main/java/azkaban/imagemgmt/daos/HPFlowDao.java create mode 100644 azkaban-common/src/main/java/azkaban/imagemgmt/daos/HPFlowDaoImpl.java create mode 100644 
azkaban-common/src/main/java/azkaban/imagemgmt/dto/HPFlowDTO.java create mode 100644 azkaban-common/src/main/java/azkaban/imagemgmt/models/HPFlowOwnership.java create mode 100644 azkaban-web-server/src/main/java/azkaban/imagemgmt/services/HPFlowService.java create mode 100644 azkaban-web-server/src/main/java/azkaban/imagemgmt/services/HPFlowServiceImpl.java create mode 100644 azkaban-web-server/src/main/java/azkaban/imagemgmt/servlets/HPFlowServlet.java create mode 100644 azkaban-web-server/src/test/java/azkaban/imagemgmt/services/HpFlowServiceImplTest.java create mode 100644 azkaban-web-server/src/test/resources/hp_flows/hp_flow_invalid.json create mode 100644 azkaban-web-server/src/test/resources/hp_flows/hp_flows.json diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java index e1a146acde..58f05530c3 100644 --- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java +++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java @@ -32,6 +32,8 @@ import azkaban.imagemgmt.converters.ImageRampupPlanConverter; import azkaban.imagemgmt.converters.ImageTypeConverter; import azkaban.imagemgmt.converters.ImageVersionConverter; +import azkaban.imagemgmt.daos.HPFlowDao; +import azkaban.imagemgmt.daos.HPFlowDaoImpl; import azkaban.imagemgmt.daos.ImageMgmtCommonDao; import azkaban.imagemgmt.daos.ImageMgmtCommonDaoImpl; import azkaban.imagemgmt.daos.ImageRampupDao; @@ -104,7 +106,9 @@ protected void configure() { Constants.DEFAULT_AZKABAN_POLLING_INTERVAL_MS); return new OsCpuUtil(Math.max(1, (cpuLoadPeriodSec * 1000) / pollingIntervalMs)); }); + // Following bindings are needed for containerized flow execution bindImageManagementDependencies(); + bindHPFlowManagementDependencies(); } public Class resolveStorageClassType() { @@ -200,4 +204,11 @@ private boolean isContainerizedDispatchMethodEnabled() { .getString(Constants.ConfigurationKeys.AZKABAN_EXECUTION_DISPATCH_METHOD, 
DispatchMethod.PUSH.name())); } + + private void bindHPFlowManagementDependencies() { + if (!isContainerizedDispatchMethodEnabled()) { + return; + } + bind(HPFlowDao.class).to(HPFlowDaoImpl.class).in(Scopes.SINGLETON); + } } diff --git a/azkaban-common/src/main/java/azkaban/imagemgmt/daos/HPFlowDao.java b/azkaban-common/src/main/java/azkaban/imagemgmt/daos/HPFlowDao.java new file mode 100644 index 0000000000..57e3b763fc --- /dev/null +++ b/azkaban-common/src/main/java/azkaban/imagemgmt/daos/HPFlowDao.java @@ -0,0 +1,32 @@ +package azkaban.imagemgmt.daos; + +import azkaban.executor.ExecutableFlow; +import java.util.List; + + +/** + * Data access object (DAO) for accessing high priority flow metadata. + * This interface defines add/remove/get methods for high priority flows. + */ +public interface HPFlowDao { + /** + * isHPFlowOwner + * @param userId : owner of high priority flows. + * @return : True if user is owner, false otherwise. + */ + boolean isHPFlowOwner(final String userId); + + /** + * addHPFlows + * @param flowIds : list of flows. + * @param userId : userId of owner who is adding the HP flows. + */ + int addHPFlows(final List flowIds, final String userId); + + /** + * isHPFlow + * @param flow : Given a flow, finds if it is a high priority flow. 
+ * @return + */ + boolean isHPFlow(final ExecutableFlow flow); +} diff --git a/azkaban-common/src/main/java/azkaban/imagemgmt/daos/HPFlowDaoImpl.java b/azkaban-common/src/main/java/azkaban/imagemgmt/daos/HPFlowDaoImpl.java new file mode 100644 index 0000000000..703d3e20f6 --- /dev/null +++ b/azkaban-common/src/main/java/azkaban/imagemgmt/daos/HPFlowDaoImpl.java @@ -0,0 +1,169 @@ +package azkaban.imagemgmt.daos; + +import azkaban.db.DatabaseOperator; +import azkaban.db.SQLTransaction; +import azkaban.executor.ExecutableFlow; +import azkaban.imagemgmt.exception.ErrorCode; +import azkaban.imagemgmt.exception.ImageMgmtDaoException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.List; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.apache.commons.dbutils.ResultSetHandler; +import org.apache.log4j.Logger; + +import static azkaban.imagemgmt.utils.ErroCodeConstants.*; + + +/** + * Dao implementation for managing high priority flows + */ +@Singleton +public class HPFlowDaoImpl implements HPFlowDao { + private static final Logger log = Logger.getLogger(HPFlowDaoImpl.class); + + private final DatabaseOperator databaseOperator; + + // Flow ID is a fully qualified flow name with format, + // . + private static String INSERT_HP_FLOW_OWNER = + "INSERT into hp_flow_owners (name, created_on, created_by, " + + "modified_on, modified_by ) " + + "values (?, ?, ?, ?, ?)"; + + private static String INSERT_HP_FLOWS = + "INSERT into hp_flows (flow_id, created_on, created_by) " + + "values (?, ?, ?)"; + + @Inject + public HPFlowDaoImpl(final DatabaseOperator databaseOperator) { + this.databaseOperator = databaseOperator; + } + + /** + * Returns if the provided user has high priority flow management access. + * @param userId : owner of high priority flows. 
+ * @return + */ + @Override + public boolean isHPFlowOwner(final String userId) { + final FetchHPFlowOwnership fetchHPFlowOwnership = new FetchHPFlowOwnership(); + try { + final String returnedName = this.databaseOperator + .query(FetchHPFlowOwnership.FETCH_HP_OWNER_BY_NAME, + fetchHPFlowOwnership, userId.toLowerCase()); + log.info(String.format( + "HPFlowDao : Returned user id %s for given user id %s", returnedName, userId)); + return returnedName.equalsIgnoreCase(userId); + } catch (final SQLException e) { + log.error(FetchHPFlowOwnership.FETCH_HP_OWNER_BY_NAME + " failed.", e); + throw new ImageMgmtDaoException(ErrorCode.INTERNAL_SERVER_ERROR, String.format( + "Unable to fetch HP flow ownership for %s.", userId)); + } + } + + /** + * Adds a list of flows into high priority flows db. + * @param flowIds : list of flows. + * @param userId : userId of owner who is adding the HP flows. + * @return + */ + @Override + public int addHPFlows(final List flowIds, final String userId) { + // Create the transaction block + final SQLTransaction insertAllFlows = transOperator -> { + // Insert each flow in its own row with same userId and created time. 
+ final Timestamp currTimeStamp = Timestamp.valueOf(LocalDateTime.now()); + int count = 0; + for (final String flowId : flowIds) { + transOperator.update(INSERT_HP_FLOWS, flowId, currTimeStamp, userId); + count++; + } + transOperator.getConnection().commit(); + return count; + }; + + int count = 0; + try { + count = this.databaseOperator.transaction(insertAllFlows); + if (count < 1) { + log.error("Exception while adding HP flows"); + throw new ImageMgmtDaoException(ErrorCode.BAD_REQUEST, + "Exception while adding HP flows"); + } + log.info("Added the HP flows by user " + userId); + return count; + } catch (final SQLException e) { + log.error("Unable to add HP flows", e); + String errorMessage = ""; + if (e.getErrorCode() == SQL_ERROR_CODE_DUPLICATE_ENTRY) { + errorMessage = "Reason: One or more flows already exists."; + } else { + errorMessage = "Reason: "; + } + throw new ImageMgmtDaoException(ErrorCode.BAD_REQUEST, "Exception while " + + "adding HP flows. " + errorMessage); + } + } + + /** + * Checks if the provided flow is high priority. + * @param flow : Given a flow, finds if it is a high priority flow. + * @return + */ + @Override + public boolean isHPFlow(final ExecutableFlow flow) { + if (null == flow) { + return false; + } + final FetchHPFlow fetchHPFlow = new FetchHPFlow(); + final String FQFN = flow.getProjectName() + "." + flow.getFlowId(); + try { + final String returnedFlowId = this.databaseOperator + .query(FetchHPFlow.SELECT_HP_FLOW, fetchHPFlow, FQFN); + log.info("HPFlowDao : Found high priority flow " + + returnedFlowId == null ? 
"" : returnedFlowId); + return returnedFlowId.equals(FQFN); + } catch (final SQLException e) { + log.error(FetchHPFlow.SELECT_HP_FLOW + " failed.", e); + throw new ImageMgmtDaoException(ErrorCode.INTERNAL_SERVER_ERROR, String.format( + "Unable to fetch high priority flow info for flow %s.", FQFN)); + } + } + + /** + * ResultSetHandler implementation class for fetching high priority flow ownership + */ + public static class FetchHPFlowOwnership implements ResultSetHandler { + private static final String FETCH_HP_OWNER_BY_NAME = + "SELECT name FROM hp_flow_owners where lower(name) = ?"; + + @Override + public String handle(final ResultSet rs) throws SQLException { + if (!rs.next()) { + return null; + } + return rs.getString("name"); + } + } + + /** + * ResultSetHandler implementation class for fetching high priority flow + */ + public static class FetchHPFlow implements ResultSetHandler { + + private static String SELECT_HP_FLOW = + "SELECT count(*) from hp_flows where flow_id = ?"; + + @Override + public String handle(final ResultSet rs) throws SQLException { + if (!rs.next()) { + return null; + } + return rs.getString("flow_id"); + } + } +} diff --git a/azkaban-common/src/main/java/azkaban/imagemgmt/dto/HPFlowDTO.java b/azkaban-common/src/main/java/azkaban/imagemgmt/dto/HPFlowDTO.java new file mode 100644 index 0000000000..ed50cebdb0 --- /dev/null +++ b/azkaban-common/src/main/java/azkaban/imagemgmt/dto/HPFlowDTO.java @@ -0,0 +1,43 @@ +package azkaban.imagemgmt.dto; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import javax.validation.constraints.NotBlank; +import org.codehaus.jackson.annotate.JsonProperty; + + +/** + * This is Add HP Flow class for updating high priority flow metadata. + */ +public class HPFlowDTO extends BaseDTO { + // CSV of flow ids. 
+ @JsonProperty("flowIds") + @NotBlank(message = "flowList cannot be empty.") + private String flowIds; + + public void setFlowIds(final String flowIds) { + // Remove all whitespaces + this.flowIds = flowIds.replaceAll("\\s", ""); + } + + public String getFlowIds() { + return this.flowIds; + } + + /** + * Converts the CSV into a list of flow IDs. + * @return list of flow IDs. + */ + public List getFlowIdList() { + if (this.flowIds.isEmpty()) { + return new ArrayList<>(); + } + return Arrays.asList(flowIds.split(",")); + } + + @Override + public String toString() { + return "HPFlowDTO{" + "flowList='" + this.flowIds + '\'' + '}'; + } +} diff --git a/azkaban-common/src/main/java/azkaban/imagemgmt/models/HPFlowOwnership.java b/azkaban-common/src/main/java/azkaban/imagemgmt/models/HPFlowOwnership.java new file mode 100644 index 0000000000..e1f888e064 --- /dev/null +++ b/azkaban-common/src/main/java/azkaban/imagemgmt/models/HPFlowOwnership.java @@ -0,0 +1,17 @@ +package azkaban.imagemgmt.models; + +/** + * This class represents HP Flow managers' metadata + */ +public class HPFlowOwnership extends BaseModel { + // HP flow management owner name + private String name; + + public String getName() { + return this.name; + } + + public void setName(final String name) { + this.name = name; + } +} diff --git a/azkaban-common/src/main/java/azkaban/imagemgmt/permission/PermissionManager.java b/azkaban-common/src/main/java/azkaban/imagemgmt/permission/PermissionManager.java index a204ec5bf4..f9451a8c56 100644 --- a/azkaban-common/src/main/java/azkaban/imagemgmt/permission/PermissionManager.java +++ b/azkaban-common/src/main/java/azkaban/imagemgmt/permission/PermissionManager.java @@ -36,4 +36,11 @@ public interface PermissionManager { public boolean hasPermission(final String imageTypeName, final String userId, final Type type) throws ImageMgmtException; + /** + * Checks if the user has permission to manage high priority flows. 
+ * @param userId + * @return + * @throws ImageMgmtException + */ + public boolean hasPermission(final String userId) throws ImageMgmtException; } diff --git a/azkaban-common/src/main/java/azkaban/imagemgmt/permission/PermissionManagerImpl.java b/azkaban-common/src/main/java/azkaban/imagemgmt/permission/PermissionManagerImpl.java index 0617f962eb..9ac4bfa833 100644 --- a/azkaban-common/src/main/java/azkaban/imagemgmt/permission/PermissionManagerImpl.java +++ b/azkaban-common/src/main/java/azkaban/imagemgmt/permission/PermissionManagerImpl.java @@ -15,6 +15,7 @@ */ package azkaban.imagemgmt.permission; +import azkaban.imagemgmt.daos.HPFlowDao; import azkaban.imagemgmt.daos.ImageTypeDao; import azkaban.imagemgmt.exception.ErrorCode; import azkaban.imagemgmt.exception.ImageMgmtException; @@ -27,6 +28,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.util.Objects.*; + + /** * This class defines and manages the permission for accessing image management APIs. The * permissions are defined on image type. The permissions are created using user role and access @@ -38,10 +42,13 @@ public class PermissionManagerImpl implements PermissionManager { private static final Logger log = LoggerFactory.getLogger(PermissionManagerImpl.class); private final ImageTypeDao imageTypeDao; + private final HPFlowDao hpFlowDao; @Inject - public PermissionManagerImpl(final ImageTypeDao imageTypeDao) { + public PermissionManagerImpl(final ImageTypeDao imageTypeDao, + final HPFlowDao hpFlowDao) { this.imageTypeDao = imageTypeDao; + this.hpFlowDao = hpFlowDao; } /** @@ -76,4 +83,24 @@ public boolean hasPermission(final String imageTypeName, final String userId, fi return hasPermission; } + + /** + * Checks if the user has permission to manage high priority flows. + * @param userId : must not be null. 
+ * @return boolean + */ + @Override + public boolean hasPermission(final String userId) { + requireNonNull(userId, "userId must not be null"); + if (this.hpFlowDao.isHPFlowOwner(userId)) { + return true; + } + + // Throw exception with FORBIDDEN error. + log.error(String.format("API access permission check failed. The user %s " + + "does not have access to manage high priority flows", userId)); + throw new ImageMgmtException(ErrorCode.FORBIDDEN, String.format( + "API access permission check failed. The user %s does not have access " + + "to manage high priority flows", userId)); + } } diff --git a/azkaban-common/src/main/java/azkaban/imagemgmt/rampup/ImageRampupManagerImpl.java b/azkaban-common/src/main/java/azkaban/imagemgmt/rampup/ImageRampupManagerImpl.java index 2c6c169b5b..a9818a795c 100644 --- a/azkaban-common/src/main/java/azkaban/imagemgmt/rampup/ImageRampupManagerImpl.java +++ b/azkaban-common/src/main/java/azkaban/imagemgmt/rampup/ImageRampupManagerImpl.java @@ -18,6 +18,7 @@ import azkaban.Constants.ImageMgmtConstants; import azkaban.executor.ExecutableFlow; import azkaban.executor.container.ContainerImplUtils; +import azkaban.imagemgmt.daos.HPFlowDao; import azkaban.imagemgmt.daos.ImageRampupDao; import azkaban.imagemgmt.daos.ImageTypeDao; import azkaban.imagemgmt.daos.ImageVersionDao; @@ -30,8 +31,12 @@ import azkaban.imagemgmt.models.ImageVersionMetadata; import azkaban.imagemgmt.version.VersionInfo; import azkaban.imagemgmt.version.VersionSet; +import com.google.common.annotations.VisibleForTesting; +import io.kubernetes.client.Exec; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -69,6 +74,7 @@ public class ImageRampupManagerImpl implements ImageRampupManager { private final ImageTypeDao imageTypeDao; private final ImageVersionDao imageVersionDao; private final ImageRampupDao imageRampupDao; + private final 
HPFlowDao hpFlowDao; private static final String MSG_RANDOM_RAMPUP_VERSION_SELECTION = "The version selection is " + "based on deterministic rampup."; private static final String MSG_ACTIVE_VERSION_SELECTION = "The version selection is " @@ -81,11 +87,12 @@ public class ImageRampupManagerImpl implements ImageRampupManager { @Inject public ImageRampupManagerImpl(final ImageRampupDao imageRampupDao, - final ImageVersionDao imageVersionDao, - final ImageTypeDao imageTypeDao) { + final ImageVersionDao imageVersionDao, final ImageTypeDao imageTypeDao, + final HPFlowDao hpFlowDao) { this.imageRampupDao = imageRampupDao; this.imageVersionDao = imageVersionDao; this.imageTypeDao = imageTypeDao; + this.hpFlowDao = hpFlowDao; } @Override @@ -166,8 +173,12 @@ public Map validateAndGetUpdatedVersionMap( public Map getVersionByImageTypes(final ExecutableFlow flow, final Set imageTypes, Set overlayImageTypes) throws ImageMgmtException { - final Map> imageTypeRampups = this.imageRampupDao - .getRampupByImageTypes(imageTypes); + // If the flow is high priority flow, then skip fetching rampup plan and + // use the ACTIVE version of the image. + final boolean isHPFlow = this.hpFlowDao.isHPFlow(flow); + final Map> imageTypeRampups = isHPFlow ? 
+ new LinkedHashMap<>(1) : + this.imageRampupDao.getRampupByImageTypes(imageTypes); final Set remainingImageTypes = new TreeSet<>(); final Map imageTypeVersionMap = this.processAndGetVersionForImageTypes(flow, imageTypes, imageTypeRampups, diff --git a/azkaban-common/src/test/java/azkaban/imagemgmt/rampup/ImageRampupManagerImplTest.java b/azkaban-common/src/test/java/azkaban/imagemgmt/rampup/ImageRampupManagerImplTest.java index e82ffd50c4..28d1b15f69 100644 --- a/azkaban-common/src/test/java/azkaban/imagemgmt/rampup/ImageRampupManagerImplTest.java +++ b/azkaban-common/src/test/java/azkaban/imagemgmt/rampup/ImageRampupManagerImplTest.java @@ -24,6 +24,7 @@ import azkaban.imagemgmt.converters.Converter; import azkaban.imagemgmt.converters.ImageTypeConverter; import azkaban.imagemgmt.converters.ImageVersionConverter; +import azkaban.imagemgmt.daos.HPFlowDao; import azkaban.imagemgmt.daos.ImageRampupDao; import azkaban.imagemgmt.daos.ImageRampupDaoImpl; import azkaban.imagemgmt.daos.ImageTypeDao; @@ -42,6 +43,8 @@ import azkaban.utils.JSONUtils; import azkaban.utils.TestUtils; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -62,6 +65,7 @@ public class ImageRampupManagerImplTest { private ImageTypeDao imageTypeDao; private ImageVersionDao imageVersionDao; private ImageRampupDao imageRampupDao; + private HPFlowDao hpFlowDao; private ObjectMapper objectMapper; private ImageRampupManager imageRampupManger; private Converter imageTypeConverter; @@ -74,11 +78,12 @@ public void setup() { this.imageTypeDao = mock(ImageTypeDaoImpl.class); this.imageVersionDao = mock(ImageVersionDaoImpl.class); this.imageRampupDao = mock(ImageRampupDaoImpl.class); + this.hpFlowDao = mock(HPFlowDao.class); this.imageTypeConverter = new ImageTypeConverter(); this.imageVersionConverter = new ImageVersionConverter(); this.converterUtils = new ConverterUtils(this.objectMapper); 
this.imageRampupManger = new ImageRampupManagerImpl(this.imageRampupDao, this.imageVersionDao, - this.imageTypeDao); + this.imageTypeDao, this.hpFlowDao); } /** @@ -106,10 +111,12 @@ public void testFetchVersionByImageTypesCase1() throws Exception { imageTypes.add("azkaban_exec"); when(this.imageRampupDao.getRampupByImageTypes(any(Set.class))).thenReturn(imageTypeRampups); when(this.imageVersionDao.findImageVersions(any(ImageMetadataRequest.class))).thenReturn(newAndRampupImageVersions); + when(this.hpFlowDao.isHPFlow(any(ExecutableFlow.class))).thenReturn(false); // Assert the for a flow, versions are always deterministic and remain the same + final String projectName = "exectest1", flowName = "exec1"; final ExecutableFlow flow = TestUtils - .createTestExecutableFlow("exectest1", "exec1", DispatchMethod.CONTAINERIZED); + .createTestExecutableFlow(projectName, flowName, DispatchMethod.CONTAINERIZED); Map imageTypeVersionMap = this.imageRampupManger .getVersionByImageTypes(flow, imageTypes, new HashSet<>()); Assert.assertEquals("3.6.5", imageTypeVersionMap.get("azkaban_config").getVersion()); @@ -143,8 +150,21 @@ public void testFetchVersionByImageTypesCase2() throws Exception { + "/all_image_types_active_version.json"); final List activeImageVersionDTOs = converterUtils.convertToDTOs( jsonImageTypeActiveVersion, ImageVersionDTO.class); - final List activeImageVersions = + final List activeImageVersionsAll = this.imageVersionConverter.convertToDataModels(activeImageVersionDTOs); + // Create a map of these ImageVersions and remove those which are found in imageTypes + // to mock remainingImages + Map activeVersionMap = new HashMap<>(); + for (final ImageVersion imageVersion : activeImageVersionsAll) { + activeVersionMap.put(imageVersion.getName(), imageVersion); + } + for (final String name : imageTypeRampups.keySet()) { + if (activeVersionMap.containsKey(name)) { + activeVersionMap.remove(name); + } + } + final List activeImageVersions = + new 
ArrayList<>(activeVersionMap.values()); final String jsonImageTypeNewAndRampupVersion = JSONUtils.readJsonFileAsString( "image_management" + "/all_image_types_new_and_rampup_version.json"); @@ -162,10 +182,15 @@ public void testFetchVersionByImageTypesCase2() throws Exception { // Below image type versions are obtained from active ramp up. Version is selected randomly // based on rampup percentage. Assert.assertNotNull(imageTypeVersionMap.get("azkaban_config")); + Assert.assertEquals("3.6.5", imageTypeVersionMap.get("azkaban_config").getVersion()); Assert.assertNotNull(imageTypeVersionMap.get("azkaban_core")); + Assert.assertEquals("3.6.1", imageTypeVersionMap.get("azkaban_core").getVersion()); Assert.assertNotNull(imageTypeVersionMap.get("azkaban_exec")); + Assert.assertEquals("1.8.1", imageTypeVersionMap.get("azkaban_exec").getVersion()); Assert.assertNotNull(imageTypeVersionMap.get("hive_job")); + Assert.assertEquals("2.1.1", imageTypeVersionMap.get("hive_job").getVersion()); Assert.assertNotNull(imageTypeVersionMap.get("spark_job")); + Assert.assertEquals("1.1.1", imageTypeVersionMap.get("spark_job").getVersion()); // Below two image types are from based on active image version Assert.assertNotNull(imageTypeVersionMap.get("pig_job")); Assert.assertEquals("4.1.2", imageTypeVersionMap.get("pig_job").getVersion()); @@ -354,6 +379,62 @@ public void testFetchAllImageTypesVersionFailureCase() throws Exception { Assert.assertEquals("5.1.5", imageTypeVersionMap.get("hadoop_job")); } + /** + * The test is for HP flow. If a flow is High priority, then skip ramp up. 
+ */ + @Test + public void testFetchVersionByImageTypesHPFlow() throws Exception { + final String jsonImageTypeRampups = JSONUtils.readJsonFileAsString("image_management/" + + "image_type_rampups.json"); + final Map> imageTypeRampups = convertToRampupMap(jsonImageTypeRampups); + final Set imageTypes = new TreeSet<>(); + imageTypes.add("spark_job"); + imageTypes.add("hive_job"); + imageTypes.add("azkaban_core"); + imageTypes.add("azkaban_config"); + imageTypes.add("azkaban_exec"); + imageTypes.add("pig_job"); + imageTypes.add("hadoop_job"); + final String jsonImageTypeActiveVersion = JSONUtils.readJsonFileAsString("image_management" + + "/all_image_types_active_version.json"); + final List activeImageVersionDTOs = converterUtils.convertToDTOs( + jsonImageTypeActiveVersion, ImageVersionDTO.class); + final List activeImageVersionsAll = + this.imageVersionConverter.convertToDataModels(activeImageVersionDTOs); + final String jsonImageTypeNewAndRampupVersion = JSONUtils.readJsonFileAsString( + "image_management" + + "/all_image_types_new_and_rampup_version.json"); + final List newAndRampupImageVersionDTOs = converterUtils.convertToDTOs( + jsonImageTypeNewAndRampupVersion, ImageVersionDTO.class); + final List newAndRampupImageVersions = + this.imageVersionConverter.convertToDataModels(newAndRampupImageVersionDTOs); + when(this.imageRampupDao.getRampupByImageTypes(any(Set.class))).thenReturn(imageTypeRampups); + when(this.imageVersionDao.findImageVersions(any(ImageMetadataRequest.class))).thenReturn(newAndRampupImageVersions); + // Only active versions are used. 
+ when(this.imageVersionDao.getActiveVersionByImageTypes(any(Set.class))) + .thenReturn(activeImageVersionsAll); + when(this.hpFlowDao.isHPFlow(any(ExecutableFlow.class))).thenReturn(true); + final Map imageTypeVersionMap = this.imageRampupManger + .getVersionByImageTypes(null, imageTypes, new HashSet<>()); + Assert.assertNotNull(imageTypeVersionMap); + // Below image type versions are obtained from active versions despite there being active ramp up. + Assert.assertNotNull(imageTypeVersionMap.get("azkaban_config")); + Assert.assertEquals("3.6.7", imageTypeVersionMap.get("azkaban_config").getVersion()); + Assert.assertNotNull(imageTypeVersionMap.get("azkaban_core")); + Assert.assertEquals("3.6.3", imageTypeVersionMap.get("azkaban_core").getVersion()); + Assert.assertNotNull(imageTypeVersionMap.get("azkaban_exec")); + Assert.assertEquals("1.8.3", imageTypeVersionMap.get("azkaban_exec").getVersion()); + Assert.assertNotNull(imageTypeVersionMap.get("hive_job")); + Assert.assertEquals("2.1.4", imageTypeVersionMap.get("hive_job").getVersion()); + Assert.assertNotNull(imageTypeVersionMap.get("spark_job")); + Assert.assertEquals("1.1.3", imageTypeVersionMap.get("spark_job").getVersion()); + // Below two image types are from based on active image version + Assert.assertNotNull(imageTypeVersionMap.get("pig_job")); + Assert.assertEquals("4.1.2", imageTypeVersionMap.get("pig_job").getVersion()); + Assert.assertNotNull(imageTypeVersionMap.get("hadoop_job")); + Assert.assertEquals("5.1.5", imageTypeVersionMap.get("hadoop_job").getVersion()); + } + private Map> convertToRampupMap(final String input) { Map> imageTypeRampups = null; try { diff --git a/azkaban-db/src/main/sql/create.containerization-tables-all.sql b/azkaban-db/src/main/sql/create.containerization-tables-all.sql index 44f108f0c9..558574d828 100644 --- a/azkaban-db/src/main/sql/create.containerization-tables-all.sql +++ b/azkaban-db/src/main/sql/create.containerization-tables-all.sql @@ -120,3 +120,20 @@ CREATE UNIQUE 
INDEX version_set_md5 -- alter table execution_flows add column dispatch_method TINYINT default 1; -- CREATE INDEX ex_flows_dispatch_method ON execution_flows (dispatch_method); +-- Definition for hp_flow_owners table. It contains list of owners who have +-- access to add/remove high priority flows. +-- NOTE: column is named "name" to match HPFlowDaoImpl (INSERT/SELECT on "name") +-- and the HPFlowOwnership model. +CREATE TABLE IF NOT EXISTS hp_flow_owners ( + name VARCHAR(64) NOT NULL PRIMARY KEY, + created_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + created_by VARCHAR(64) NOT NULL, + modified_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + modified_by VARCHAR(64) NOT NULL +); + +-- Definition for hp_flows. It contains the flow_id in format "project_name.flow_name". + +CREATE TABLE IF NOT EXISTS hp_flows ( + flow_id VARCHAR(256) NOT NULL PRIMARY KEY, + created_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + created_by VARCHAR(64) NOT NULL +); diff --git a/azkaban-web-server/src/main/java/azkaban/imagemgmt/services/HPFlowService.java b/azkaban-web-server/src/main/java/azkaban/imagemgmt/services/HPFlowService.java new file mode 100644 index 0000000000..e7f2f84aa0 --- /dev/null +++ b/azkaban-web-server/src/main/java/azkaban/imagemgmt/services/HPFlowService.java @@ -0,0 +1,21 @@ +package azkaban.imagemgmt.services; + +import azkaban.imagemgmt.dto.HPFlowDTO; +import azkaban.imagemgmt.exception.ImageMgmtException; +import java.util.List; + + +/** + * This service layer interface exposes methods for delegation and processing + * logic for high priority flow APIs. The requests are routed to respective + * DAO layer for data access. + */ +public interface HPFlowService { + + /** + * Add one or more HP flows. 
+ * @param hpFlowDTO + * @throws ImageMgmtException + */ + List addHPFlows(final HPFlowDTO hpFlowDTO) throws ImageMgmtException; +} diff --git a/azkaban-web-server/src/main/java/azkaban/imagemgmt/services/HPFlowServiceImpl.java b/azkaban-web-server/src/main/java/azkaban/imagemgmt/services/HPFlowServiceImpl.java new file mode 100644 index 0000000000..42a43b6e56 --- /dev/null +++ b/azkaban-web-server/src/main/java/azkaban/imagemgmt/services/HPFlowServiceImpl.java @@ -0,0 +1,45 @@ +package azkaban.imagemgmt.services; + +import azkaban.imagemgmt.daos.HPFlowDao; +import azkaban.imagemgmt.dto.HPFlowDTO; +import azkaban.imagemgmt.exception.ImageMgmtException; +import com.google.common.annotations.VisibleForTesting; +import java.util.List; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.apache.log4j.Logger; + +/** + * This service layer implementation exposes methods for delegation and + * processing logic for high priority flow APIs. The requests are routed + * to respective DAO layer for data access. + */ +@Singleton +public class HPFlowServiceImpl implements HPFlowService { + public static final Logger log = Logger.getLogger(HPFlowServiceImpl.class); + + private final HPFlowDao hpFlowDao; + + @Inject + public HPFlowServiceImpl(final HPFlowDao hpFlowDao) { + this.hpFlowDao = hpFlowDao; + } + + /** + * Add one or more HP flows. 
+ * @param hpFlowDTO + * @throws ImageMgmtException + */ + @Override + public List addHPFlows(final HPFlowDTO hpFlowDTO) throws ImageMgmtException { + // Get list of flows from the DTO + final List flowIdList = hpFlowDTO.getFlowIdList(); + this.hpFlowDao.addHPFlows(flowIdList, hpFlowDTO.getCreatedBy()); + return flowIdList; + } + + @VisibleForTesting + List getFlowIdList(final HPFlowDTO hpFlowDTO) { + return hpFlowDTO.getFlowIdList(); + } +} diff --git a/azkaban-web-server/src/main/java/azkaban/imagemgmt/servlets/HPFlowServlet.java b/azkaban-web-server/src/main/java/azkaban/imagemgmt/servlets/HPFlowServlet.java new file mode 100644 index 0000000000..9021cd57be --- /dev/null +++ b/azkaban-web-server/src/main/java/azkaban/imagemgmt/servlets/HPFlowServlet.java @@ -0,0 +1,129 @@ +package azkaban.imagemgmt.servlets; + +import azkaban.imagemgmt.daos.HPFlowDao; +import azkaban.imagemgmt.dto.HPFlowDTO; +import azkaban.imagemgmt.exception.ErrorCode; +import azkaban.imagemgmt.exception.ImageMgmtException; +import azkaban.imagemgmt.exception.ImageMgmtInvalidInputException; +import azkaban.imagemgmt.exception.ImageMgmtInvalidPermissionException; +import azkaban.imagemgmt.exception.ImageMgmtValidationException; +import azkaban.imagemgmt.permission.PermissionManager; +import azkaban.imagemgmt.services.HPFlowService; +import azkaban.imagemgmt.utils.ConverterUtils; +import azkaban.server.HttpRequestUtils; +import azkaban.server.session.Session; +import azkaban.user.User; +import azkaban.webapp.AzkabanWebServer; +import azkaban.webapp.servlet.LoginAbstractAzkabanServlet; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import javax.servlet.ServletConfig; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.http.HttpStatus; +import org.apache.log4j.Logger; + + +/** + * This class handles all the REST API calls for API "/hpFlows" + * It currently 
package azkaban.imagemgmt.servlets;

import azkaban.imagemgmt.dto.HPFlowDTO;
import azkaban.imagemgmt.exception.ErrorCode;
import azkaban.imagemgmt.exception.ImageMgmtException;
import azkaban.imagemgmt.exception.ImageMgmtInvalidPermissionException;
import azkaban.imagemgmt.exception.ImageMgmtValidationException;
import azkaban.imagemgmt.permission.PermissionManager;
import azkaban.imagemgmt.services.HPFlowService;
import azkaban.imagemgmt.utils.ConverterUtils;
import azkaban.server.HttpRequestUtils;
import azkaban.server.session.Session;
import azkaban.user.User;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.servlet.LoginAbstractAzkabanServlet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.http.HttpStatus;
import org.apache.log4j.Logger;


/**
 * This class handles all the REST API calls for API "/hpFlows".
 * It currently implements addition of high priority flows.
 */
public class HPFlowServlet extends LoginAbstractAzkabanServlet {

  private static final long serialVersionUID = 1L;
  private static final Logger log = Logger.getLogger(HPFlowServlet.class);
  private static final String BASE_HP_FLOW_URI = "/hpFlows";
  private ConverterUtils converterUtils;
  private HPFlowService hpFlowService;

  public HPFlowServlet() {
    super(new ArrayList<>());
  }

  @Override
  public void init(final ServletConfig config) throws ServletException {
    super.init(config);
    final AzkabanWebServer server = getApplication();
    this.converterUtils = server.getConverterUtils();
    this.hpFlowService = server.getHPFlowService();
  }

  // Listing/removal of high priority flows is not implemented yet; GET is a no-op.
  @Override
  protected void handleGet(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session)
      throws ServletException, IOException {

  }

  @Override
  protected void handlePost(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session)
      throws ServletException, IOException {
    handleAddHPFlows(req, resp, session);
  }

  /**
   * This method takes in a CSV of high priority flows and adds them to metadata.
   * Responds 201 on success, 403 for unauthorized users, 400 for an empty flow
   * list, and 500 for unexpected failures.
   *
   * @param req the servlet request whose body is the JSON payload of flow ids
   * @param resp the servlet response
   * @param session the authenticated user session
   * @throws ServletException
   * @throws IOException
   */
  protected void handleAddHPFlows(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session)
      throws ServletException, IOException {
    try {
      // Authorize FIRST: unauthorized callers must not have their untrusted
      // payload parsed or validated at all.
      final User user = session.getUser();
      if (!hasHPFlowManagementPermission(user)) {
        // warn (not debug): failed authorization attempts should always be
        // visible in production logs.
        log.warn(String.format("Invalid permission to access High Priority "
            + "flows for user: %s", user.getUserId()));
        throw new ImageMgmtInvalidPermissionException(ErrorCode.FORBIDDEN,
            "Invalid permission to manage high priority flows");
      }
      final String jsonPayLoad = HttpRequestUtils.getBody(req);
      final HPFlowDTO hpFlowDTO = this.converterUtils.convertToDTO(jsonPayLoad, HPFlowDTO.class);
      // If there are no flow IDs in the list return with error
      if (hpFlowDTO.getFlowIds().isEmpty()) {
        log.error("There are no flow IDs provided");
        throw new ImageMgmtValidationException(ErrorCode.BAD_REQUEST,
            "Required field flow IDs is empty");
      }
      // Record the user who invoked the API as the creator of the metadata.
      hpFlowDTO.setCreatedBy(user.getUserId());
      // Add the HP flow metadata
      this.hpFlowService.addHPFlows(hpFlowDTO);
      // Prepare to send response
      resp.setStatus(HttpStatus.SC_CREATED);
      sendResponse(resp, HttpServletResponse.SC_CREATED, new HashMap<>());
    } catch (final ImageMgmtException e) {
      log.error("Exception while adding high priority flows.", e);
      sendErrorResponse(resp, e.getErrorCode().getCode(), e.getMessage());
    } catch (final Exception e) {
      log.error("Exception while adding high priority flows", e);
      sendErrorResponse(resp, HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
          "Exception while adding high priority flows. " + e.getMessage());
    }
  }

  /**
   * Checks if the provided user has access to manage high priority flows.
   *
   * @param user the user invoking the API
   * @return true if the user is an Azkaban admin or a designated HP flow owner
   */
  private boolean hasHPFlowManagementPermission(final User user) {
    /**
     * Azkaban ADMIN role must have full permission to access image management APIs. Hence, no
     * further permission check is required.
     */
    if (isAzkabanAdmin(user)) {
      return true;
    }

    // Check the HPFlow Management API's access permission for other users.
    final PermissionManager permissionManager = getApplication().getPermissionManager();
    return permissionManager.hasPermission(user.getUserId());
  }
}
SERVICE_PROVIDER.getInstance(ImageMgmtCommonService.class); } + + public HPFlowService getHPFlowService() { + return SERVICE_PROVIDER.getInstance(HPFlowService.class); + } } diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java index 33071bb831..d40925758f 100644 --- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java +++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java @@ -48,6 +48,8 @@ import azkaban.flowtrigger.database.JdbcFlowTriggerInstanceLoaderImpl; import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginException; import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager; +import azkaban.imagemgmt.services.HPFlowService; +import azkaban.imagemgmt.services.HPFlowServiceImpl; import azkaban.imagemgmt.services.ImageMgmtCommonService; import azkaban.imagemgmt.services.ImageMgmtCommonServiceImpl; import azkaban.imagemgmt.services.ImageRampupService; @@ -137,6 +139,7 @@ protected void configure() { bindContainerWatchDependencies(); bindContainerCleanupManager(); bindOnExecutionEventListener(); + bindHPFlowManagementDependencies(); } private Class resolveContainerMetricsClass() { @@ -235,6 +238,13 @@ private void bindOnExecutionEventListener() { bind(OnExecutionEventListener.class).to(OnContainerizedExecutionEventListener.class).in(Scopes.SINGLETON); } + private void bindHPFlowManagementDependencies() { + if (!isContainerizedDispatchMethodEnabled()) { + return; + } + bind(HPFlowService.class).to(HPFlowServiceImpl.class).in(Scopes.SINGLETON); + } + @Inject @Singleton @Provides diff --git a/azkaban-web-server/src/test/java/azkaban/imagemgmt/services/HpFlowServiceImplTest.java b/azkaban-web-server/src/test/java/azkaban/imagemgmt/services/HpFlowServiceImplTest.java new file mode 100644 index 0000000000..9d9b6c37fa --- /dev/null +++ 
package azkaban.imagemgmt.services;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import azkaban.imagemgmt.daos.HPFlowDao;
import azkaban.imagemgmt.dto.HPFlowDTO;
import azkaban.imagemgmt.utils.ConverterUtils;
import azkaban.utils.JSONUtils;
import java.util.List;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;


/**
 * Unit tests for {@link HPFlowServiceImpl}.
 */
public class HpFlowServiceImplTest {

  private HPFlowDao hpFlowDao;
  private HPFlowService hpFlowService;
  private ObjectMapper objectMapper;
  private ConverterUtils converterUtils;

  @Before
  public void setup() {
    // Mock the DAO interface, not the concrete implementation: the service
    // only depends on the HPFlowDao contract.
    this.hpFlowDao = mock(HPFlowDao.class);
    this.hpFlowService = new HPFlowServiceImpl(this.hpFlowDao);
    this.objectMapper = new ObjectMapper();
    this.converterUtils = new ConverterUtils(this.objectMapper);
  }

  /**
   * Test to add high priority flows: verifies the CSV payload is parsed into
   * the expected flow ids AND that the service delegates that exact list to
   * the DAO with the invoking user.
   */
  @Test
  public void testAddHPFlows() {
    final String jsonPayLoad = JSONUtils.readJsonFileAsString("hp_flows/hp_flows.json");
    final HPFlowDTO hpFlowDTO = this.converterUtils.convertToDTO(jsonPayLoad, HPFlowDTO.class);
    hpFlowDTO.setCreatedBy("azkaban");
    final List<String> hpFlowList =
        ((HPFlowServiceImpl) this.hpFlowService).getFlowIdList(hpFlowDTO);
    when(this.hpFlowDao.addHPFlows(hpFlowList, "azkaban")).thenReturn(3);
    final List<String> added = this.hpFlowService.addHPFlows(hpFlowDTO);
    // The service must return the parsed list and delegate it to the DAO.
    Assert.assertEquals(hpFlowList, added);
    verify(this.hpFlowDao).addHPFlows(hpFlowList, "azkaban");
    // Verify that the list has flow names as intended.
    Assert.assertEquals("project1.flow1", hpFlowList.get(0));
    Assert.assertEquals("project_2.flow_2", hpFlowList.get(1));
    Assert.assertEquals("project-3.flow-3", hpFlowList.get(2));
  }

  /**
   * Negative test with empty list of flows: an empty CSV must parse to an
   * empty flow id list.
   */
  @Test
  public void testAddHPFlowsEmpty() {
    final String jsonPayLoad = JSONUtils.readJsonFileAsString("hp_flows/hp_flow_invalid.json");
    final HPFlowDTO hpFlowDTO = this.converterUtils.convertToDTO(jsonPayLoad, HPFlowDTO.class);
    final List<String> hpFlowList =
        ((HPFlowServiceImpl) this.hpFlowService).getFlowIdList(hpFlowDTO);
    Assert.assertTrue(hpFlowList.isEmpty());
  }
}