From 91ca862a481e295acea08405286661d77430ca3b Mon Sep 17 00:00:00 2001 From: pavankumar526 Date: Tue, 1 Mar 2016 12:07:02 +0530 Subject: [PATCH] FALCON-1802 Data based Support for Native Scheduler --- .../org/apache/falcon/entity/EntityUtil.java | 20 ++ .../falcon/expression/ExpressionHelper.java | 24 ++ .../java/org/apache/falcon/util/DateUtil.java | 175 ++++++++++++++ .../java/org/apache/falcon/util/TimeUnit.java | 39 +++ .../expression/ExpressionHelperTest.java | 24 ++ oozie/pom.xml | 6 + .../falcon/oozie/OozieEntityBuilder.java | 9 + .../OozieOrchestrationWorkflowBuilder.java | 29 ++- .../NativeOozieProcessWorkflowBuilder.java | 222 ++++++++++++++++++ .../execution/FalconExecutionService.java | 16 +- .../execution/ProcessExecutionInstance.java | 109 ++++++--- .../falcon/execution/ProcessExecutor.java | 37 ++- .../falcon/execution/SchedulerUtil.java | 34 +++ .../service/impl/AlarmService.java | 4 +- .../service/impl/DataAvailabilityService.java | 9 +- .../service/impl/SchedulerService.java | 6 +- .../request/DataNotificationRequest.java | 17 ++ .../apache/falcon/predicate/Predicate.java | 1 + .../org/apache/falcon/state/EntityState.java | 11 + .../apache/falcon/state/InstanceState.java | 6 +- .../org/apache/falcon/state/StateService.java | 19 +- .../falcon/state/store/EntityStateStore.java | 2 +- .../state/store/jdbc/BeanMapperUtil.java | 37 ++- .../falcon/state/store/jdbc/EntityBean.java | 15 +- .../state/store/jdbc/JDBCStateStore.java | 27 ++- .../falcon/workflow/engine/DAGEngine.java | 6 +- .../workflow/engine/FalconWorkflowEngine.java | 18 +- .../workflow/engine/OozieDAGEngine.java | 75 +++--- .../execution/FalconExecutionServiceTest.java | 7 +- .../falcon/execution/MockDAGEngine.java | 4 +- .../service/AlarmServiceTest.java | 2 +- .../src/test/resources/runtime.properties | 25 ++ src/conf/runtime.properties | 11 + .../falcon/unit/FalconUnitTestBase.java | 2 +- .../AbstractSchedulerManagerJerseyIT.java | 7 +- .../InstanceSchedulerManagerJerseyIT.java | 35 ++- .../resources/process-nolatedata-template.xml | 50 ++++ 37 files changed, 1016 insertions(+), 124 deletions(-) create mode 100644 common/src/main/java/org/apache/falcon/util/TimeUnit.java create mode 100644 oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java create mode 100644 scheduler/src/test/resources/runtime.properties create mode 100644 webapp/src/test/resources/process-nolatedata-template.xml diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index 96befa12a..a8ccc41ca 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -94,6 +94,15 @@ public final class EntityUtil { public static final String WF_LIB_SEPARATOR = ","; private static final String STAGING_DIR_NAME_SEPARATOR = "_"; + public static final ThreadLocal PATH_FORMAT = new ThreadLocal() { + @Override + protected SimpleDateFormat initialValue() { + SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmm"); + format.setTimeZone(TimeZone.getTimeZone("UTC")); + return format; + } + }; + /** Priority with which the DAG will be scheduled. * Matches the five priorities of Hadoop jobs. */ @@ -1082,4 +1091,15 @@ public static JOBPRIORITY getPriority(Process process) { } return JOBPRIORITY.NORMAL; } + + public static String evaluateDependentPath(String feedPath, Date instanceTime) { + String timestamp = PATH_FORMAT.get().format(instanceTime); + String instancePath = feedPath.replaceAll("\\$\\{YEAR\\}", timestamp.substring(0, 4)); + instancePath = instancePath.replaceAll("\\$\\{MONTH\\}", timestamp.substring(4, 6)); + instancePath = instancePath.replaceAll("\\$\\{DAY\\}", timestamp.substring(6, 8)); + instancePath = instancePath.replaceAll("\\$\\{HOUR\\}", timestamp.substring(8, 10)); + instancePath = instancePath.replaceAll("\\$\\{MINUTE\\}", timestamp.substring(10, 12)); + return instancePath; + } + } diff --git a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java index 65aaeba7c..dc4dbaf5e 100644 --- a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java +++ b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java @@ -21,6 +21,7 @@ import org.apache.commons.el.ExpressionEvaluatorImpl; import org.apache.falcon.FalconException; import org.apache.falcon.entity.common.FeedDataPath; +import org.apache.falcon.util.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +30,7 @@ import javax.servlet.jsp.el.FunctionMapper; import javax.servlet.jsp.el.VariableResolver; import java.lang.reflect.Method; +import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; @@ -53,6 +55,7 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver private static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl(); private static final ExpressionHelper RESOLVER = ExpressionHelper.get(); + public static final ThreadLocal FORMATTER = new ThreadLocal() { @Override protected SimpleDateFormat initialValue() { @@ -257,4 +260,25 @@ public static String substitute(String originalValue, Properties properties) { return originalValue; } + public static String formatTime(String dateTimeStr, String format) throws ParseException { + Date dateTime = DateUtil.parseDateOozieTZ(dateTimeStr); + return DateUtil.formatDateCustom(dateTime, format); + } + + public static String instanceTime() { + return DateUtil.formatDateOozieTZ(referenceDate.get()); + } + + public static String dateOffset(String strBaseDate, int offset, String unit) throws Exception { + Calendar baseCalDate = DateUtil.getCalendar(strBaseDate); + StringBuilder buffer = new StringBuilder(); + baseCalDate.add(org.apache.falcon.util.TimeUnit.valueOf(unit).getCalendarUnit(), offset); + buffer.append(DateUtil.formatDateOozieTZ(baseCalDate)); + return buffer.toString(); + } + + public static String user() { + return "${user.name}"; + } + } diff --git a/common/src/main/java/org/apache/falcon/util/DateUtil.java b/common/src/main/java/org/apache/falcon/util/DateUtil.java index baf5b134f..1cbc06081 100644 --- a/common/src/main/java/org/apache/falcon/util/DateUtil.java +++ b/common/src/main/java/org/apache/falcon/util/DateUtil.java @@ -17,12 +17,20 @@ */ package org.apache.falcon.util; +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.Frequency; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.ParsePosition; +import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.TimeZone; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * Helper to get date operations. @@ -39,8 +47,37 @@ public final class DateUtil { public static final long HOUR_IN_MILLIS = 60 * 60 * 1000; + private static final Pattern GMT_OFFSET_COLON_PATTERN = Pattern.compile("^GMT(\\-|\\+)(\\d{2})(\\d{2})$"); + + public static final TimeZone UTC = getTimeZone("UTC"); + + public static final String ISO8601_UTC_MASK = "yyyy-MM-dd'T'HH:mm'Z'"; + private static String activeTimeMask = ISO8601_UTC_MASK; + private static TimeZone activeTimeZone = UTC; + + private static final Pattern VALID_TIMEZONE_PATTERN = Pattern.compile("^UTC$|^GMT(\\+|\\-)\\d{4}$"); + + private static final String ISO8601_TZ_MASK_WITHOUT_OFFSET = "yyyy-MM-dd'T'HH:mm"; + private static boolean entityInUTC = true; + private DateUtil() {} + /** + * Configures the Datetime parsing with Oozie processing timezone. + * + */ + public static void setTimeZone(String tz) throws FalconException { + if (StringUtils.isBlank(tz)) { + tz = "UTC"; + } + if (!VALID_TIMEZONE_PATTERN.matcher(tz).matches()) { + throw new FalconException("Invalid entity timezone, it must be 'UTC' or 'GMT(+/-)####"); + } + activeTimeZone = TimeZone.getTimeZone(tz); + entityInUTC = activeTimeZone.equals(UTC); + activeTimeMask = (entityInUTC) ? ISO8601_UTC_MASK : ISO8601_TZ_MASK_WITHOUT_OFFSET + tz.substring(3); + } + public static Date getNextMinute(Date time) throws Exception { Calendar insCal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); insCal.setTime(time); @@ -99,4 +136,142 @@ public static Date now() { public static Date offsetTime(Date date, int seconds) { return new Date(1000L * seconds + date.getTime()); } + + /** + * Parses a datetime in ISO8601 format in the Oozie processing timezone. + * + * @param s string with the datetime to parse. + * @return the corresponding {@link java.util.Date} instance for the parsed date. + * @throws java.text.ParseException thrown if the given string was + * not an ISO8601 value for the Oozie processing timezon. + */ + public static Date parseDateOozieTZ(String s) throws ParseException { + s = s.trim(); + ParsePosition pos = new ParsePosition(0); + Date d = getISO8601DateFormat(activeTimeZone, activeTimeMask).parse(s, pos); + if (d == null) { + throw new ParseException("Could not parse [" + s + "] using [" + activeTimeMask + "] mask", + pos.getErrorIndex()); + } + if (s.length() > pos.getIndex()) { + throw new ParseException("Correct datetime string is followed by invalid characters: " + s, pos.getIndex()); + } + return d; + } + + private static DateFormat getISO8601DateFormat(TimeZone tz, String mask) { + DateFormat dateFormat = new SimpleDateFormat(mask); + // Stricter parsing to prevent dates such as 2011-12-50T01:00Z (December 50th) from matching + dateFormat.setLenient(false); + dateFormat.setTimeZone(tz); + return dateFormat; + } + + private static DateFormat getSpecificDateFormat(String format) { + DateFormat dateFormat = new SimpleDateFormat(format); + dateFormat.setTimeZone(activeTimeZone); + return dateFormat; + } + + /** + * Formats a {@link java.util.Date} as a string using the specified format mask. + *

+ * The format mask must be a {@link java.text.SimpleDateFormat} valid format mask. + * + * @param d {@link java.util.Date} to format. + * @return the string for the given date using the specified format mask, + * NULL if the {@link java.util.Date} instance was NULL + */ + public static String formatDateCustom(Date d, String format) { + return (d != null) ? getSpecificDateFormat(format).format(d) : "NULL"; + } + + /** + * Formats a {@link java.util.Date} as a string in ISO8601 format using Oozie processing timezone. + * + * @param d {@link java.util.Date} to format. + * @return the ISO8601 string for the given date, NULL if the {@link java.util.Date} instance was + * NULL + */ + public static String formatDateOozieTZ(Date d) { + return (d != null) ? getISO8601DateFormat(activeTimeZone, activeTimeMask).format(d) : "NULL"; + } + + /** + * Returns the {@link java.util.TimeZone} for the given timezone ID. + * + * @param tzId timezone ID. + * @return the {@link java.util.TimeZone} for the given timezone ID. + */ + public static TimeZone getTimeZone(String tzId) { + if (tzId == null) { + throw new IllegalArgumentException("Timezone cannot be null"); + } + tzId = handleGMTOffsetTZNames(tzId); // account for GMT-#### + TimeZone tz = TimeZone.getTimeZone(tzId); + // If these are not equal, it means that the tzId is not valid (invalid tzId's return GMT) + if (!tz.getID().equals(tzId)) { + throw new IllegalArgumentException("Invalid TimeZone: " + tzId); + } + return tz; + } + + /** + * {@link java.util.TimeZone#getTimeZone(String)} takes the timezone ID as an argument; for invalid IDs + * it returns the GMT TimeZone. A timezone ID formatted like GMT-#### is not a valid ID, + * however, it will actually map this to the GMT-##:## TimeZone, instead of returning the + * GMT TimeZone. We check (later) check that a timezone ID is valid by calling + * {@link java.util.TimeZone#getTimeZone(String)} and seeing if the returned + * TimeZone ID is equal to the original; because we want to allow GMT-####, while still + * disallowing actual invalid IDs, we have to manually replace GMT-#### + * with GMT-##:## first. + * + * @param tzId The timezone ID + * @return If tzId matches GMT-####, then we return GMT-##:##; otherwise, + * we return tzId unaltered + */ + private static String handleGMTOffsetTZNames(String tzId) { + Matcher m = GMT_OFFSET_COLON_PATTERN.matcher(tzId); + if (m.matches() && m.groupCount() == 3) { + tzId = "GMT" + m.group(1) + m.group(2) + ":" + m.group(3); + } + return tzId; + } + + /** + * Create a Calendar instance for UTC time zone using the specified date. + * @param dateString + * @return appropriate Calendar object + * @throws Exception + */ + public static Calendar getCalendar(String dateString) throws Exception { + return getCalendar(dateString, activeTimeZone); + } + + /** + * Create a Calendar instance using the specified date and Time zone. + * @param dateString + * @param tz : TimeZone + * @return appropriate Calendar object + * @throws Exception + */ + public static Calendar getCalendar(String dateString, TimeZone tz) throws Exception { + Date date = DateUtil.parseDateOozieTZ(dateString); + Calendar calDate = Calendar.getInstance(); + calDate.setTime(date); + calDate.setTimeZone(tz); + return calDate; + } + + /** + * Formats a {@link java.util.Calendar} as a string in ISO8601 format using Oozie processing timezone. + * + * @param c {@link java.util.Calendar} to format. + * @return the ISO8601 string for the given date, NULL if the {@link java.util.Calendar} instance was + * NULL + */ + public static String formatDateOozieTZ(Calendar c) { + return (c != null) ? formatDateOozieTZ(c.getTime()) : "NULL"; + } + } diff --git a/common/src/main/java/org/apache/falcon/util/TimeUnit.java b/common/src/main/java/org/apache/falcon/util/TimeUnit.java new file mode 100644 index 000000000..c8398466b --- /dev/null +++ b/common/src/main/java/org/apache/falcon/util/TimeUnit.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.util; + +import java.util.Calendar; + +/** + * TimeUnit used for Date operations. + */ +public enum TimeUnit { + MINUTE(Calendar.MINUTE), HOUR(Calendar.HOUR), DAY(Calendar.DATE), MONTH(Calendar.MONTH), + YEAR(Calendar.YEAR), END_OF_DAY(Calendar.DATE), END_OF_MONTH(Calendar.MONTH), CRON(0), NONE(-1); + + private int calendarUnit; + + private TimeUnit(int calendarUnit) { + this.calendarUnit = calendarUnit; + } + + public int getCalendarUnit() { + return calendarUnit; + } +} diff --git a/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java b/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java index da5dbca62..b3895c31b 100644 --- a/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java +++ b/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java @@ -81,4 +81,28 @@ public Object[][] createOffsets() { {"future(1,0)", "2015-02-01T00:00Z"}, }; } + + @Test + public void testFormatTime() throws FalconException { + String output = expressionHelper.evaluate("formatTime(\"2016-02-01T10:59Z\", \"yyyy\")", + String.class); + Assert.assertEquals(output, "2016"); + output = expressionHelper.evaluate("formatTime(\"2016-02-01T10:59Z\", \"yyyy-MM\")", + String.class); + Assert.assertEquals(output, "2016-02"); + output = expressionHelper.evaluate("formatTime(\"2016-02-01T10:59Z\", \"yyyy-MM-dd\")", + String.class); + Assert.assertEquals(output, "2016-02-01"); + } + + + @Test + public void testOffsetAndInstanceTime() throws FalconException { + String date = expressionHelper.evaluate("dateOffset(instanceTime(), 1, 'DAY')", String.class); + Assert.assertEquals(date, "2015-02-02T00:00Z"); + date = expressionHelper.evaluate("dateOffset(instanceTime(), 3, 'HOUR')", String.class); + Assert.assertEquals(date, "2015-02-01T03:00Z"); + date = expressionHelper.evaluate("dateOffset(instanceTime(), -25, 'MINUTE')", String.class); + Assert.assertEquals(date, "2015-01-31T23:35Z"); + } } diff --git a/oozie/pom.xml b/oozie/pom.xml index 4623d8b88..304d54968 100644 --- a/oozie/pom.xml +++ b/oozie/pom.xml @@ -97,6 +97,12 @@ compile + + joda-time + joda-time + ${joda.version} + + diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java index a36ee797f..a856f8aaa 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java @@ -109,6 +109,14 @@ public OozieEntityBuilder(T entity) { public abstract Properties build(Cluster cluster, Path buildPath) throws FalconException; public Properties build(Cluster cluster, Path buildPath, Map properties) throws FalconException { + Properties props = new Properties(); + if (properties != null) { + props.putAll(properties); + } + return build(cluster, buildPath, props); + } + + public Properties build(Cluster cluster, Path buildPath, Properties properties) throws FalconException { Properties builderProperties = build(cluster, buildPath); if (properties == null || properties.isEmpty()) { return builderProperties; @@ -130,6 +138,7 @@ public Properties build(Cluster cluster, Path buildPath, Map pro return propertiesCopy; } + protected String getStoragePath(Path path) { if (path != null) { return getStoragePath(path.toString()); diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java index e137e11d4..5e87c1002 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java @@ -37,6 +37,7 @@ import org.apache.falcon.oozie.feed.FeedRetentionWorkflowBuilder; import org.apache.falcon.oozie.feed.HCatReplicationWorkflowBuilder; import org.apache.falcon.oozie.process.HiveProcessWorkflowBuilder; +import org.apache.falcon.oozie.process.NativeOozieProcessWorkflowBuilder; import org.apache.falcon.oozie.process.OozieProcessWorkflowBuilder; import org.apache.falcon.oozie.process.PigProcessWorkflowBuilder; import org.apache.falcon.oozie.workflow.ACTION; @@ -58,6 +59,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.joda.time.DateTime; import javax.xml.bind.JAXBElement; import javax.xml.namespace.QName; @@ -93,11 +95,19 @@ public abstract class OozieOrchestrationWorkflowBuilder extend new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME, })); private LifeCycle lifecycle; + private DateTime nominalTime; protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L; protected static final String MR_QUEUE_NAME = "queueName"; protected static final String MR_JOB_PRIORITY = "jobPriority"; + /** + * Represents Scheduler for Entities. + */ + public enum Scheduler { + OOZIE, NATIVE + } + public OozieOrchestrationWorkflowBuilder(T entity, LifeCycle lifecycle) { super(entity); this.lifecycle = lifecycle; @@ -115,7 +125,13 @@ public OozieOrchestrationWorkflowBuilder(T entity) { super(entity); } - public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle) + public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, + Tag lifecycle) throws FalconException { + return get(entity, cluster, lifecycle, Scheduler.OOZIE); + } + + public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle, + Scheduler scheduler) throws FalconException { switch (entity.getEntityType()) { case FEED: @@ -166,6 +182,9 @@ public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster clust return new PigProcessWorkflowBuilder(process); case OOZIE: + if (Scheduler.NATIVE == scheduler) { + return new NativeOozieProcessWorkflowBuilder(process); + } return new OozieProcessWorkflowBuilder(process); case HIVE: @@ -497,4 +516,12 @@ protected CONFIGURATION getConfig(Properties props) { } return conf; } + + public void setNominalTime(DateTime nominalTime) { + this.nominalTime = nominalTime; + } + + public DateTime getNominalTime() { + return nominalTime; + } } diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java new file mode 100644 index 000000000..e66ad824f --- /dev/null +++ b/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.oozie.process; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.Storage; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Location; +import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.entity.v0.process.Input; +import org.apache.falcon.entity.v0.process.Output; +import org.apache.falcon.entity.v0.process.Property; +import org.apache.falcon.expression.ExpressionHelper; +import org.apache.falcon.util.DateUtil; +import org.apache.falcon.workflow.WorkflowExecutionArgs; +import org.apache.hadoop.fs.Path; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Properties; + +/** + * Workflow Builder for oozie process in case of Native Scheduler. + */ +public class NativeOozieProcessWorkflowBuilder extends OozieProcessWorkflowBuilder { + + private static final ExpressionHelper EXPRESSION_HELPER = ExpressionHelper.get(); + private static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; + + public NativeOozieProcessWorkflowBuilder(org.apache.falcon.entity.v0.process.Process entity) { + super(entity); + } + + @Override + public java.util.Properties build(org.apache.falcon.entity.v0.cluster.Cluster cluster, + Path buildPath, Properties suppliedProps) throws FalconException { + Properties elProps = new Properties(); + DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT); + elProps.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), fmt.print(getNominalTime())); + elProps.put(WorkflowExecutionArgs.TIMESTAMP.getName(), fmt.print(getNominalTime())); + elProps.put(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED.getName(), "true"); + elProps.put(WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED.getName(), "false"); //check true or false + + + DateUtil.setTimeZone(entity.getTimezone().getID()); + ExpressionHelper.setReferenceDate(new Date(getNominalTime().getMillis())); + elProps.putAll(getInputProps(cluster)); + elProps.putAll(getOutputProps()); + elProps.putAll(evalProperties()); + Properties buildProps = build(cluster, buildPath); + buildProps.putAll(elProps); + copyPropsWithoutOverride(buildProps, suppliedProps); + return buildProps; + } + + private void copyPropsWithoutOverride(Properties buildProps, Properties suppliedProps) { + if (suppliedProps == null || suppliedProps.isEmpty()) { + return; + } + for (String propertyName : suppliedProps.stringPropertyNames()) { + if (buildProps.containsKey(propertyName)) { + LOG.warn("User provided property {} is already declared in the entity and will be ignored.", + propertyName); + continue; + } + String propertyValue = suppliedProps.getProperty(propertyName); + buildProps.put(propertyName, propertyValue); + } + } + + private Properties evalProperties() throws FalconException { + Properties props = new Properties(); + org.apache.falcon.entity.v0.process.Properties processProps = entity.getProperties(); + for (Property property : processProps.getProperties()) { + String propName = property.getName(); + String propValue = property.getValue(); + String evalExp = EXPRESSION_HELPER.evaluateFullExpression(propValue, String.class); + props.put(propName, evalExp); + } + return props; + } + + private Properties getOutputProps() throws FalconException { + Properties props = new Properties(); + if (entity.getOutputs() == null) { + props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), NONE); + props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), NONE); + return props; + } + List feedNames = new ArrayList<>(); + List feedInstancePaths= new ArrayList<>(); + for (Output output : entity.getOutputs().getOutputs()) { + Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed()); + feedNames.add(feed.getName()); + String outputExp = output.getInstance(); + Date outTime = EXPRESSION_HELPER.evaluate(outputExp, Date.class); + for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) { + org.apache.falcon.entity.v0.cluster.Cluster clusterEntity = + EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName()); + if (!EntityUtil.responsibleFor(clusterEntity.getColo())) { + continue; + } + + List locations = FeedHelper.getLocations(cluster, feed); + for (Location loc : locations) { + String path = EntityUtil.evaluateDependentPath(loc.getPath(), outTime); + path = getStoragePath(path); + if (loc.getType() != LocationType.DATA) { + props.put(output.getName() + "." + loc.getType().toString().toLowerCase(), path); + } else { + props.put(output.getName(), path); + } + feedInstancePaths.add(path); + } + } + } + props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), StringUtils.join(feedNames, ",")); + props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), StringUtils.join(feedInstancePaths, ",")); + return props; + } + + private Properties getInputProps(Cluster clusterObj) throws FalconException { + Properties props = new Properties(); + + if (entity.getInputs() == null) { + props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), NONE); + props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), NONE); + props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), NONE); + props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), NONE); + return props; + } + List falconInputFeeds = new ArrayList<>(); + List falconInputNames = new ArrayList<>(); + List falconInputPaths = new ArrayList<>(); + List falconInputFeedStorageTypes = new ArrayList<>(); + for (Input input : entity.getInputs().getInputs()) { + Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed()); + Storage storage = FeedHelper.createStorage(clusterObj, feed); + if (storage.getType() != Storage.TYPE.FILESYSTEM) { + throw new UnsupportedOperationException("Storage Type not supported " + storage.getType()); + } + falconInputFeeds.add(feed.getName()); + falconInputNames.add(input.getName()); + falconInputFeedStorageTypes.add(storage.getType().name()); + String partition = input.getPartition(); + + String startTimeExp = input.getStart(); + String endTimeExp = input.getEnd(); + ExpressionHelper.setReferenceDate(new Date(getNominalTime().getMillis())); + Date startTime = EXPRESSION_HELPER.evaluate(startTimeExp, Date.class); + Date endTime = EXPRESSION_HELPER.evaluate(endTimeExp, Date.class); + + for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) { + org.apache.falcon.entity.v0.cluster.Cluster clusterEntity = + EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName()); + if (!EntityUtil.responsibleFor(clusterEntity.getColo())) { + continue; + } + + List locations = FeedHelper.getLocations(cluster, feed); + for (Location loc : locations) { + if (loc.getType() != LocationType.DATA) { + continue; + } + List paths = new ArrayList<>(); + List instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(), + startTime, endTime); // test when startTime and endTime are equal. + for (Date instanceTime : instanceTimes) { + String path = EntityUtil.evaluateDependentPath(loc.getPath(), instanceTime); + if (StringUtils.isNotBlank(partition)) { + if (!path.endsWith("/") && !partition.startsWith("/")) { + path = path + "/"; + } + path = path + partition; + } + path = getStoragePath(path); + paths.add(path); + } + if (loc.getType() != LocationType.DATA) { + props.put(input.getName() + "." + loc.getType().toString().toLowerCase(), + StringUtils.join(paths, ",")); + } else { + props.put(input.getName(), StringUtils.join(paths, ",")); + } + falconInputPaths.add(StringUtils.join(paths, ",")); + } + } + } + props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), StringUtils.join(falconInputFeeds, "#")); + props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), StringUtils.join(falconInputNames, "#")); + props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), StringUtils.join(falconInputPaths, "#")); + props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), + StringUtils.join(falconInputFeedStorageTypes, "#")); + return props; + } + +} diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java index 93c894d45..a969c7ad6 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -203,10 +204,11 @@ public void onKill(Entity entity) throws FalconException { * Schedules an entity. * * @param entity + * @param properties * @throws FalconException */ - public void schedule(Entity entity) throws FalconException { - StateService.get().handleStateChange(entity, EntityState.EVENT.SCHEDULE, this); + public void schedule(Entity entity, Properties properties) throws FalconException { + StateService.get().handleStateChange(entity, EntityState.EVENT.SCHEDULE, this, properties); } /** @@ -256,4 +258,14 @@ public EntityExecutor getEntityExecutor(Entity entity, String cluster) throws Fa throw new FalconException("Entity executor for entity cluster key : " + id.getKey() + " does not exist."); } } + + /** + * Schedules an entity. + * + * @param entity + * @throws FalconException + */ + public void schedule(Process entity) throws FalconException { + schedule(entity, null); + } } diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java index 8f026b79d..3d4d2597f 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java @@ -30,6 +30,7 @@ import org.apache.falcon.entity.v0.process.Cluster; import org.apache.falcon.entity.v0.process.Input; import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.notification.service.NotificationServicesRegistry; import org.apache.falcon.notification.service.event.DataEvent; import org.apache.falcon.notification.service.event.Event; @@ -47,7 +48,9 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; +import java.util.Iterator; import java.util.List; @@ -59,12 +62,14 @@ public class ProcessExecutionInstance extends ExecutionInstance { private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutionInstance.class); private final Process process; - private List awaitedPredicates = new ArrayList<>(); + private List awaitedPredicates = Collections.synchronizedList(new ArrayList()); private DAGEngine dagEngine = null; - private boolean hasTimedOut = false; + protected boolean hasTimedOut = false; private InstanceID id; private int instanceSequence; + private boolean areDataPredicatesEmpty; private final FalconExecutionService executionService = FalconExecutionService.get(); + private final ExpressionHelper expressionHelper = ExpressionHelper.get(); /** * Constructor. @@ -81,7 +86,7 @@ public ProcessExecutionInstance(Process process, DateTime instanceTime, String c this.id = new InstanceID(process, cluster, getInstanceTime()); computeInstanceSequence(); dagEngine = DAGEngineFactory.getDAGEngine(cluster); - registerForNotifications(false); + areDataPredicatesEmpty = true; } /** @@ -110,7 +115,7 @@ private void computeInstanceSequence() { // Currently, registers for only data notifications to ensure gating conditions are met. // Can be extended to register for other notifications. - private void registerForNotifications(boolean isResume) throws FalconException { + public void registerForNotifications(boolean isResume) throws FalconException { if (process.getInputs() == null) { return; } @@ -120,14 +125,40 @@ private void registerForNotifications(boolean isResume) throws FalconException { continue; } Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed()); - List paths = new ArrayList<>(); + String startTimeExp = input.getStart(); + String endTimeExp = input.getEnd(); + DateTime processInstanceTime = getInstanceTime(); + expressionHelper.setReferenceDate(new Date(processInstanceTime.getMillis())); + + Date startTime = expressionHelper.evaluate(startTimeExp, Date.class); + Date endTime = expressionHelper.evaluate(endTimeExp, Date.class); + for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) { + org.apache.falcon.entity.v0.cluster.Cluster clusterEntity = + EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName()); + if (!EntityUtil.responsibleFor(clusterEntity.getColo())) { + continue; + } + List paths = new ArrayList<>(); List locations = FeedHelper.getLocations(cluster, feed); for (Location loc : locations) { if (loc.getType() != LocationType.DATA) { continue; } - paths.add(new Path(loc.getPath())); + List instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(), + startTime, endTime); + for (Date instanceTime : instanceTimes) { + String path = EntityUtil.evaluateDependentPath(loc.getPath(), instanceTime); + if (feed.getAvailabilityFlag() != null && !feed.getAvailabilityFlag().isEmpty()) { + if (!path.endsWith("/")) { + path = path + "/"; + } + path = path + feed.getAvailabilityFlag(); + } + if (!paths.contains(new Path(path))) { + paths.add(new Path(path)); + } + } } Predicate predicate = Predicate.createDataPredicate(paths); @@ -135,21 +166,19 @@ private void registerForNotifications(boolean isResume) throws FalconException { if (isResume && !awaitedPredicates.contains(predicate)) { continue; } - // TODO : Revisit this once the Data Notification Service has been built - // TODO Very IMP : Need to change the polling frequency + addDataPredicate(predicate); DataAvailabilityService.DataRequestBuilder requestBuilder = (DataAvailabilityService.DataRequestBuilder) NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA) .createRequestBuilder(executionService, getId()); requestBuilder.setLocations(paths) .setCluster(cluster.getName()) - .setPollingFrequencyInMillis(100) + .setPollingFrequencyInMillis(SchedulerUtil.getPollingFrequencyinMillis(process.getFrequency())) .setTimeoutInMillis(getTimeOutInMillis()) .setLocations(paths); NotificationServicesRegistry.register(requestBuilder.build()); - LOG.info("Registered for a data notification for process {} for data location {}", - process.getName(), StringUtils.join(paths, ",")); - awaitedPredicates.add(predicate); + LOG.info("Registered for a data notification for process {} of instance time {} for data location {}", + process.getName(), getInstanceTime(), StringUtils.join(paths, ",")); } } } @@ -168,22 +197,26 @@ public void onEvent(Event event) throws FalconException { case DATA_AVAILABLE: // Data has not become available and the wait time has passed if (((DataEvent) event).getStatus() == DataEvent.STATUS.UNAVAILABLE) { - if (getTimeOutInMillis() <= (System.currentTimeMillis() - getCreationTime().getMillis())) { - hasTimedOut = true; - } - } else { - // If the event matches any of the awaited predicates, remove the predicate of the awaited list - Predicate toRemove = null; - for (Predicate predicate : awaitedPredicates) { - if (predicate.evaluate(Predicate.getPredicate(event))) { - toRemove = predicate; - break; - } - } - if (toRemove != null) { - awaitedPredicates.remove(toRemove); + hasTimedOut = true; + } + // If the event matches any of the awaited predicates, remove the predicate of the awaited list + Predicate toRemove = null; + synchronized (awaitedPredicates) { + Iterator iterator = awaitedPredicates.iterator(); + while (iterator.hasNext()) { + Predicate predicate = iterator.next(); + if (predicate.evaluate(Predicate.getPredicate(event))) { + toRemove = predicate; + break; } } + if (toRemove != null) { + awaitedPredicates.remove(toRemove); + } + if (awaitedPredicates.size() == 0) { + areDataPredicatesEmpty = true; + } + } break; default: } @@ -198,13 +231,16 @@ public boolean isReady() { if (awaitedPredicates.isEmpty()) { return true; } else { - // If it is waiting to be scheduled, it is in ready. - for (Predicate predicate : awaitedPredicates) { - if (!predicate.getType().equals(Predicate.TYPE.JOB_COMPLETION)) { - return false; + synchronized (awaitedPredicates) { + Iterator iterator = awaitedPredicates.iterator(); + while (iterator.hasNext()) { + Predicate predicate = iterator.next(); + if (!predicate.getType().equals(Predicate.TYPE.JOB_COMPLETION)) { + return false; + } } + return true; } - return true; } } @@ -333,4 +369,15 @@ public void destroy() throws FalconException { public void rerun() throws FalconException { registerForNotifications(false); } + + public boolean areDataAwaitingPredicatesEmpty() { + return areDataPredicatesEmpty; + } + + protected synchronized void addDataPredicate(Predicate predicate) { + synchronized (awaitedPredicates) { + awaitedPredicates.add(predicate); + areDataPredicatesEmpty = false; + } + } } diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java index 745d2ea60..52fe294ab 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java @@ -33,6 +33,7 @@ import org.apache.falcon.exception.InvalidStateTransitionException; import org.apache.falcon.exception.StateStoreException; import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.notification.service.event.DataEvent; import org.apache.falcon.notification.service.event.Event; import org.apache.falcon.notification.service.event.EventType; import org.apache.falcon.notification.service.event.JobCompletedEvent; @@ -93,8 +94,9 @@ public void schedule() throws FalconException { initInstances(); } // Check to handle restart and restoration from state store. - if (STATE_STORE.getEntity(id.getEntityID()).getCurrentState() != EntityState.STATE.SCHEDULED) { - dryRun(); + EntityState entityState = STATE_STORE.getEntity(id.getEntityID()); + if (entityState.getCurrentState() != EntityState.STATE.SCHEDULED) { + dryRun(entityState.getProperties()); } else { LOG.info("Process, {} was already scheduled on cluster, {}.", process.getName(), cluster); LOG.info("Loading instances for process {} from state store.", process.getName()); @@ -103,8 +105,8 @@ public void schedule() throws FalconException { registerForNotifications(getLastInstanceTime()); } - private void dryRun() throws FalconException { - DAGEngineFactory.getDAGEngine(cluster).submit(process); + private void dryRun(Properties properties) throws FalconException { + DAGEngineFactory.getDAGEngine(cluster).submit(process, properties); } // Initializes the cache of execution instances. Cache is backed by the state store. @@ -422,6 +424,30 @@ private void handleEvent(Event event) throws FalconException { stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this); } break; + case DATA_AVAILABLE: + instance = instances.get((InstanceID)event.getTarget()); + instance.onEvent(event); + switch (((DataEvent) event).getStatus()) { + case AVAILABLE: + if (instance.areDataAwaitingPredicatesEmpty() && !instance.hasTimedOut) { + LOG.info("Data conditions met for instance {} and scheduled for running ", instance.getId()); + stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this); + } else if (instance.areDataAwaitingPredicatesEmpty()) { + LOG.info("Instance {} timedout since input data not available", instance.getId()); + stateService.handleStateChange(instance, InstanceState.EVENT.TIME_OUT, this); + } else { + STATE_STORE.updateExecutionInstance(new InstanceState(instance)); + } + break; + case UNAVAILABLE: + if (instance.areDataAwaitingPredicatesEmpty()) { + stateService.handleStateChange(instance, InstanceState.EVENT.TIME_OUT, this); + } + break; + default: + throw new InvalidStateTransitionException("Invalid Data event status."); + } + break; default: if (isTriggerEvent(event)) { instance = buildInstance(event); @@ -476,7 +502,8 @@ private boolean shouldHandleEvent(Event event) { return event.getTarget().equals(id) || event.getType() == EventType.JOB_COMPLETED || event.getType() == EventType.JOB_SCHEDULED - || event.getType() == EventType.RE_RUN; + || event.getType() == EventType.RE_RUN + || event.getType() == EventType.DATA_AVAILABLE; } @Override diff --git a/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java b/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java index 3e7fc9b9f..236da11c9 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java @@ -18,6 +18,7 @@ package org.apache.falcon.execution; import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.util.RuntimeProperties; import org.joda.time.DateTime; /** @@ -27,6 +28,14 @@ public final class SchedulerUtil { private static final long MINUTE_IN_MS = 60 * 1000L; private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS; + public static final String MINUTELY_PROCESS_FREQUENCY_POLLING_IN_MILLIS = + "falcon.scheduler.minutely.process.polling.frequency.millis"; + public static final String HOURLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS = + "falcon.scheduler.hourly.process.polling.frequency.millis"; + public static final String DAILY_PROCESS_FREQUENCY_POLLING_IN_MILLIS = + "falcon.scheduler.daily.process.polling.frequency.millis"; + public static final String MONTHLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS = + "falcon.scheduler.monthly.process.polling.frequency.millis"; private SchedulerUtil(){}; @@ -51,4 +60,29 @@ public static long getFrequencyInMillis(DateTime referenceTime, Frequency freque throw new IllegalArgumentException("Invalid time unit " + frequency.getTimeUnit().name()); } } + + /** + * + * @param frequency + * @return + */ + public static long getPollingFrequencyinMillis(Frequency frequency) { + switch (frequency.getTimeUnit()) { + case minutes: + return Long.parseLong(RuntimeProperties.get().getProperty(MINUTELY_PROCESS_FREQUENCY_POLLING_IN_MILLIS, + "20000")); + case hours: + return Long.parseLong(RuntimeProperties.get().getProperty(HOURLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS, + "60000")); + case days: + return Long.parseLong(RuntimeProperties.get().getProperty(DAILY_PROCESS_FREQUENCY_POLLING_IN_MILLIS, + "120000")); + case months: + return Long.parseLong(RuntimeProperties.get().getProperty(MONTHLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS, + "180000")); + default: + throw new IllegalArgumentException("Unhandled frequency time unit " + frequency.getTimeUnit()); + } + } + } diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java index 115f31007..b9cbcd710 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java @@ -87,7 +87,7 @@ public void register(NotificationRequest notificationRequest) throws Notificatio DateTime nextStartTime = request.getStartTime(); DateTime endTime; if (request.getEndTime().isBefore(currentTime)) { - endTime = request.getEndTime(); + endTime = request.getEndTime().minusMinutes(1); } else { endTime = currentTime; } @@ -109,7 +109,7 @@ public void register(NotificationRequest notificationRequest) throws Notificatio } } // All past events have been scheduled. Nothing to schedule in the future. - if (request.getEndTime().isBefore(nextStartTime)) { + if (endTime.isBefore(nextStartTime)) { return; } LOG.debug("Scheduling to trigger events from {} to {} with frequency {}", nextStartTime, request.getEndTime(), diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java index 732da62b3..1240be9b5 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java @@ -67,6 +67,9 @@ public class DataAvailabilityService implements FalconNotificationService { public void register(NotificationRequest request) throws NotificationServiceException { LOG.info("Registering Data notification for " + request.getCallbackId().toString()); DataNotificationRequest dataNotificationRequest = (DataNotificationRequest) request; + if (instancesToIgnore.containsKey(dataNotificationRequest.getCallbackId())) { + instancesToIgnore.remove(dataNotificationRequest.getCallbackId()); + } delayQueue.offer(dataNotificationRequest); } @@ -246,7 +249,11 @@ private void updatePathsAvailability(List unAvailablePaths, FileSystem fs, Map locations) throws IOException { for (Path path : unAvailablePaths) { if (fs.exists(path)) { - locations.put(path, true); + if (locations.containsKey(path)) { + locations.put(path, true); + } else { + locations.put(new Path(path.toUri().getPath()), true); + } } } } diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java index f5a7c86eb..9cc386315 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java @@ -44,6 +44,8 @@ import org.apache.falcon.notification.service.request.NotificationRequest; import org.apache.falcon.predicate.Predicate; import org.apache.falcon.state.EntityClusterID; +import org.apache.falcon.state.EntityID; +import org.apache.falcon.state.EntityState; import org.apache.falcon.state.ID; import org.apache.falcon.state.InstanceID; import org.apache.falcon.state.InstanceState; @@ -309,7 +311,9 @@ public void run() { DAGEngineFactory.getDAGEngine(instance.getCluster()).reRun(instance, props, isForced); } } else { - externalId = DAGEngineFactory.getDAGEngine(instance.getCluster()).run(instance); + EntityState entityState = STATE_STORE.getEntity(new EntityID(instance.getEntity())); + externalId = DAGEngineFactory.getDAGEngine(instance.getCluster()) + .run(instance, entityState.getProperties()); } LOG.info("Scheduled job {} for instance {}", externalId, instance.getId()); JobScheduledEvent event = new JobScheduledEvent(instance.getId(), diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java index c7dd5d368..9e2b993c5 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java @@ -151,6 +151,15 @@ public boolean equals(Object o) { if (!locations.equals(that.locations)) { return false; } + if (pollingFrequencyInMillis != (that.pollingFrequencyInMillis)) { + return false; + } + if (timeoutInMillis != that.timeoutInMillis) { + return false; + } + if (createdTimeInMillis != that.createdTimeInMillis) { + return false; + } return true; } @@ -158,8 +167,16 @@ public boolean equals(Object o) { public int hashCode() { int result = cluster.hashCode(); result = 31 * result + (locations != null ? locations.hashCode() : 0); + result = 31 * result + Long.valueOf(pollingFrequencyInMillis).hashCode(); + result = 31 * result + Long.valueOf(timeoutInMillis).hashCode(); + result = 31 * result + Long.valueOf(createdTimeInMillis).hashCode(); return result; } + @Override + public String toString() { + return "cluster: " + this.getCluster() + " locations: " + this.locations + " createdTime: " + + this.createdTimeInMillis; + } } diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java index c248db60a..93dcb123a 100644 --- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java +++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java @@ -163,6 +163,7 @@ public static Predicate createTimePredicate(long start, long end, long instanceT * @return */ public static Predicate createDataPredicate(List paths) { + Collections.sort(paths); return new Predicate(TYPE.DATA) .addClause("path", StringUtils.join(paths, ",")); } diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java index 38479a448..1b26c7ab4 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java +++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java @@ -20,6 +20,8 @@ import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.exception.InvalidStateTransitionException; +import java.util.Properties; + /** * Represents the state of a schedulable entity. * Implements {@link org.apache.falcon.state.StateMachine} for an entity. @@ -27,8 +29,17 @@ public class EntityState implements StateMachine { private Entity entity; private STATE currentState; + private Properties properties; private static final STATE INITIAL_STATE = STATE.SUBMITTED; + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + /** * Enumerates all the valid states of a schedulable entity and the valid transitions from that state. */ diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java index b862e4dbe..ff0618510 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java +++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java @@ -270,7 +270,11 @@ public static List getTerminalStates() { @Override public String toString() { - return instance.getId().toString() + "STATE: " + currentState.toString(); + StringBuilder output = new StringBuilder(); + if (instance.getId() != null) { + output.append(instance.getId()); + } + return output.toString() + "STATE: " + currentState.toString(); } diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java index 638bb6eb8..8a66fb075 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/StateService.java +++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java @@ -21,11 +21,14 @@ import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.execution.ExecutionInstance; +import org.apache.falcon.execution.ProcessExecutionInstance; import org.apache.falcon.state.store.AbstractStateStore; import org.apache.falcon.state.store.StateStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Properties; + /** * A service that fetches state from state store, handles state transitions of entities and instances, * invokes state change handler and finally persists the new state in the state store. @@ -62,14 +65,18 @@ public String getName() { * @param handler * @throws FalconException */ - public void handleStateChange(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler) - throws FalconException { + public void handleStateChange(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler, + Properties props) throws FalconException { EntityID id = new EntityID(entity); if (!stateStore.entityExists(id)) { // New entity if (event == EntityState.EVENT.SUBMIT) { callbackHandler(entity, EntityState.EVENT.SUBMIT, handler); - stateStore.putEntity(new EntityState(entity)); + EntityState entityState = new EntityState(entity); + if (props != null && !props.isEmpty()) { + entityState.setProperties(props); + } + stateStore.putEntity(entityState); LOG.debug("Entity {} submitted due to event {}.", id, event.name()); } else { throw new FalconException("Entity " + id + " does not exist in state store."); @@ -90,6 +97,11 @@ public void handleStateChange(Entity entity, EntityState.EVENT event, EntityStat } } + public void handleStateChange(Entity entity, EntityState.EVENT event, + EntityStateChangeHandler handler) throws FalconException { + handleStateChange(entity, event, handler, null); + } + // Invokes the right method on the state change handler private void callbackHandler(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler) throws FalconException { @@ -133,6 +145,7 @@ public void handleStateChange(ExecutionInstance instance, InstanceState.EVENT ev if (event == InstanceState.EVENT.TRIGGER) { callbackHandler(instance, InstanceState.EVENT.TRIGGER, handler); stateStore.putExecutionInstance(new InstanceState(instance)); + ((ProcessExecutionInstance) instance).registerForNotifications(false); LOG.debug("Instance {} triggered due to event {}.", id, event.name()); } else if (event == InstanceState.EVENT.EXTERNAL_TRIGGER) { callbackHandler(instance, InstanceState.EVENT.EXTERNAL_TRIGGER, handler); diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java index a7deb89a3..10490e49a 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java @@ -45,7 +45,7 @@ public interface EntityStateStore { * @param entityId * @return true, if entity exists in store. */ - boolean entityExists(EntityID entityId) throws StateStoreException;; + boolean entityExists(EntityID entityId) throws StateStoreException; /** * @param state diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java index 194819e85..95724c255 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java @@ -57,7 +57,7 @@ private BeanMapperUtil() { * @param entityState * @return */ - public static EntityBean convertToEntityBean(EntityState entityState) { + public static EntityBean convertToEntityBean(EntityState entityState) throws IOException { EntityBean entityBean = new EntityBean(); Entity entity = entityState.getEntity(); String id = new EntityID(entity).getKey(); @@ -65,6 +65,10 @@ public static EntityBean convertToEntityBean(EntityState entityState) { entityBean.setName(entity.getName()); entityBean.setState(entityState.getCurrentState().toString()); entityBean.setType(entity.getEntityType().toString()); + if (entityState.getProperties() != null && !entityState.getProperties().isEmpty()) { + byte[] props = getProperties(entityState); + entityBean.setProperties(props); + } return entityBean; } @@ -74,11 +78,26 @@ public static EntityBean convertToEntityBean(EntityState entityState) { * @return * @throws StateStoreException */ - public static EntityState convertToEntityState(EntityBean entityBean) throws StateStoreException { + public static EntityState convertToEntityState(EntityBean entityBean) throws StateStoreException, IOException { try { Entity entity = EntityUtil.getEntity(entityBean.getType(), entityBean.getName()); EntityState entityState = new EntityState(entity); entityState.setCurrentState(EntityState.STATE.valueOf(entityBean.getState())); + byte[] result = entityBean.getProperties(); + if (result != null && result.length != 0) { + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(result); + ObjectInputStream in = null; + Properties properties = null; + try { + in = new ObjectInputStream(byteArrayInputStream); + properties = (Properties) in.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } finally { + IOUtils.closeQuietly(in); + } + entityState.setProperties(properties); + } return entityState; } catch (FalconException e) { throw new StateStoreException(e); @@ -92,7 +111,7 @@ public static EntityState convertToEntityState(EntityBean entityBean) throws Sta * @throws StateStoreException */ public static Collection convertToEntityState(Collection entityBeans) - throws StateStoreException { + throws StateStoreException, IOException { List entityStates = new ArrayList<>(); if (entityBeans != null && !entityBeans.isEmpty()) { for (EntityBean entityBean : entityBeans) { @@ -304,6 +323,18 @@ public static byte[] getAwaitedPredicates(InstanceState instanceState) throws IO } } + public static byte [] getProperties(EntityState entityState) throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream out = null; + try { + out = new ObjectOutputStream(byteArrayOutputStream); + out.writeObject(entityState.getProperties()); + return byteArrayOutputStream.toByteArray(); + } finally { + IOUtils.closeQuietly(out); + } + } + /** * @param summary * @return A map of state and count given the JQL result. diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java index 37fb0cb14..f2492a7f2 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java @@ -24,6 +24,7 @@ import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.Id; +import javax.persistence.Lob; import javax.persistence.NamedQueries; import javax.persistence.NamedQuery; import javax.persistence.OneToMany; @@ -38,7 +39,7 @@ @NamedQueries({ @NamedQuery(name = "GET_ENTITY", query = "select OBJECT(a) from EntityBean a where a.id = :id"), @NamedQuery(name = "GET_ENTITY_FOR_STATE", query = "select OBJECT(a) from EntityBean a where a.state = :state"), - @NamedQuery(name = "UPDATE_ENTITY", query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type where a.id = :id"), + @NamedQuery(name = "UPDATE_ENTITY", query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type, a.properties = :properties where a.id = :id"), @NamedQuery(name = "GET_ENTITIES_FOR_TYPE", query = "select OBJECT(a) from EntityBean a where a.type = :type"), @NamedQuery(name = "GET_ENTITIES", query = "select OBJECT(a) from EntityBean a"), @NamedQuery(name = "DELETE_ENTITY", query = "delete from EntityBean a where a.id = :id"), @@ -68,6 +69,10 @@ public class EntityBean { @Column(name = "current_state") private String state; + @Column(name = "properties") + @Lob + private byte[] properties; + @OneToMany(cascade= CascadeType.REMOVE, mappedBy="entityBean") private List instanceBeans; @@ -113,5 +118,13 @@ public List getInstanceBeans() { public void setInstanceBeans(List instanceBeans) { this.instanceBeans = instanceBeans; } + + public byte[] getProperties() { + return properties; + } + + public void setProperties(byte[] properties) { + this.properties = properties; + } } diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java index 1c072869a..9e6d099b9 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java @@ -72,11 +72,16 @@ public void putEntity(EntityState entityState) throws StateStoreException { if (entityExists(entityID)) { throw new StateStoreException("Entity with key, " + key + " already exists."); } - EntityBean entityBean = BeanMapperUtil.convertToEntityBean(entityState); - EntityManager entityManager = getEntityManager(); - beginTransaction(entityManager); - entityManager.persist(entityBean); - commitAndCloseTransaction(entityManager); + EntityBean entityBean = null; + try { + entityBean = BeanMapperUtil.convertToEntityBean(entityState); + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + entityManager.persist(entityBean); + commitAndCloseTransaction(entityManager); + } catch (IOException e) { + throw new StateStoreException(e); + } } @@ -94,7 +99,11 @@ private EntityState getEntityByKey(EntityID id) throws StateStoreException { if (entityBean == null) { return null; } - return BeanMapperUtil.convertToEntityState(entityBean); + try { + return BeanMapperUtil.convertToEntityState(entityBean); + } catch (IOException e) { + throw new StateStoreException(e); + } } private EntityBean getEntityBean(EntityID id) { @@ -130,7 +139,11 @@ public Collection getAllEntities() throws StateStoreException { Query q = entityManager.createNamedQuery("GET_ENTITIES"); List result = q.getResultList(); entityManager.close(); - return BeanMapperUtil.convertToEntityState(result); + try { + return BeanMapperUtil.convertToEntityState(result); + } catch (IOException e) { + throw new StateStoreException(e); + } } @Override diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java index 49e083c96..29b3bbb68 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java @@ -35,10 +35,11 @@ public interface DAGEngine { * Run an instance for execution. * * @param instance + * @param props * @return * @throws DAGEngineException */ - String run(ExecutionInstance instance) throws DAGEngineException; + String run(ExecutionInstance instance, Properties props) throws DAGEngineException; /** * @param instance @@ -85,9 +86,10 @@ public interface DAGEngine { * Perform dryrun of an instance. * * @param entity + * @param props * @throws DAGEngineException */ - void submit(Entity entity) throws DAGEngineException; + void submit(Entity entity, Properties props) throws DAGEngineException; /** * Returns info about the Job. diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java index 77cb2fa64..39aac8820 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java @@ -68,6 +68,7 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters"; public static final String FALCON_FORCE_RERUN = "falcon.system.force.rerun"; public static final String FALCON_RERUN = "falcon.system.rerun"; + public static final String FALCON_SKIP_DRYRUN = "falcon.system.skip.dryrun"; private enum JobAction { KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS @@ -84,13 +85,24 @@ public boolean isAlive(Cluster cluster) throws FalconException { } @Override - public void schedule(Entity entity, Boolean skipDryRun, Map properties) throws FalconException { - EXECUTION_SERVICE.schedule(entity); + public void schedule(Entity entity, Boolean skipDryRun, Map suppliedProps) throws FalconException { + Properties props = new Properties(); + if (suppliedProps != null && !suppliedProps.isEmpty()) { + props.putAll(suppliedProps); + } + if (skipDryRun) { + props.put(FalconWorkflowEngine.FALCON_SKIP_DRYRUN, "true"); + } + EXECUTION_SERVICE.schedule(entity, props); } @Override public void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException { - DAGEngineFactory.getDAGEngine(clusterName).submit(entity); + Properties props = new Properties(); + if (skipDryRun) { + props.put(FalconWorkflowEngine.FALCON_SKIP_DRYRUN, "true"); + } + DAGEngineFactory.getDAGEngine(clusterName).submit(entity, props); } @Override diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java index b5a350031..ff8bf3734 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java @@ -26,6 +26,7 @@ import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.exception.DAGEngineException; import org.apache.falcon.execution.ExecutionInstance; import org.apache.falcon.hadoop.HadoopClientFactory; @@ -43,8 +44,6 @@ import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; import org.joda.time.DateTime; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,10 +94,15 @@ public OozieDAGEngine(String clusterName) throws DAGEngineException { } @Override - public String run(ExecutionInstance instance) throws DAGEngineException { + public String run(ExecutionInstance instance, Properties props) throws DAGEngineException { try { - Properties properties = getRunProperties(instance); + OozieOrchestrationWorkflowBuilder builder = + OozieOrchestrationWorkflowBuilder.get(instance.getEntity(), cluster, Tag.DEFAULT, + OozieOrchestrationWorkflowBuilder.Scheduler.NATIVE); + prepareEntityBuildPath(instance.getEntity()); Path buildPath = EntityUtil.getLatestStagingPath(cluster, instance.getEntity()); + builder.setNominalTime(instance.getInstanceTime()); + Properties properties = builder.build(cluster, buildPath, props); switchUserTo(instance.getEntity().getACL().getOwner()); properties.setProperty(OozieClient.USER_NAME, instance.getEntity().getACL().getOwner()); properties.setProperty(OozieClient.APP_PATH, buildPath.toString()); @@ -141,7 +145,6 @@ private void dryRunInternal(Properties properties, Path buildPath, Entity entity switchUserTo(entity.getACL().getOwner()); properties.setProperty(OozieClient.USER_NAME, entity.getACL().getOwner()); properties.setProperty(OozieClient.APP_PATH, buildPath.toString()); - properties.putAll(getDryRunProperties(entity)); //Do dryrun before run as run is asynchronous LOG.info("Dry run with properties {}", properties); client.dryrun(properties); @@ -161,42 +164,6 @@ public boolean isScheduled(ExecutionInstance instance) throws DAGEngineException } } - // TODO : To be implemented. Currently hardcoded for process - private Properties getRunProperties(ExecutionInstance instance) { - Properties props = new Properties(); - DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT); - String nominalTime = fmt.print(instance.getInstanceTime()); - props.put("nominalTime", nominalTime); - props.put("timeStamp", nominalTime); - props.put("feedNames", "NONE"); - props.put("feedInstancePaths", "NONE"); - props.put("falconInputFeeds", "NONE"); - props.put("falconInPaths", "NONE"); - props.put("feedNames", "NONE"); - props.put("feedInstancePaths", "NONE"); - props.put("userJMSNotificationEnabled", "true"); - props.put("systemJMSNotificationEnabled", "false"); - return props; - } - - // TODO : To be implemented. Currently hardcoded for process - private Properties getDryRunProperties(Entity entity) { - Properties props = new Properties(); - DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT); - String nominalTime = fmt.print(DateTime.now()); - props.put("nominalTime", nominalTime); - props.put("timeStamp", nominalTime); - props.put("feedNames", "NONE"); - props.put("feedInstancePaths", "NONE"); - props.put("falconInputFeeds", "NONE"); - props.put("falconInPaths", "NONE"); - props.put("feedNames", "NONE"); - props.put("feedInstancePaths", "NONE"); - props.put("userJMSNotificationEnabled", "true"); - props.put("systemJMSNotificationEnabled", "false"); - return props; - } - @Override public void suspend(ExecutionInstance instance) throws DAGEngineException { try { @@ -212,6 +179,7 @@ public void suspend(ExecutionInstance instance) throws DAGEngineException { @Override public void resume(ExecutionInstance instance) throws DAGEngineException { + switchUserTo(instance.getEntity().getACL().getOwner()); try { client.resume(instance.getExternalID()); assertStatus(instance.getExternalID(), Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUCCEEDED, @@ -237,6 +205,7 @@ public void kill(ExecutionInstance instance) throws DAGEngineException { @Override public void reRun(ExecutionInstance instance, Properties props, boolean isForced) throws DAGEngineException { + switchUserTo(instance.getEntity().getACL().getOwner()); String jobId = instance.getExternalID(); try { WorkflowJob jobInfo = client.getJobInfo(jobId); @@ -274,15 +243,28 @@ public void reRun(ExecutionInstance instance, Properties props, boolean isForced } @Override - public void submit(Entity entity) throws DAGEngineException { + public void submit(Entity entity, Properties props) throws DAGEngineException { try { // TODO : remove hardcoded Tag value when feed support is added. OozieOrchestrationWorkflowBuilder builder = - OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.DEFAULT); + OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.DEFAULT, + OozieOrchestrationWorkflowBuilder.Scheduler.NATIVE); prepareEntityBuildPath(entity); Path buildPath = EntityUtil.getNewStagingPath(cluster, entity); - Properties properties = builder.build(cluster, buildPath); - dryRunInternal(properties, buildPath, entity); + org.apache.falcon.entity.v0.process.Process process = (Process) entity; + builder.setNominalTime(new DateTime(process.getClusters().getClusters().get(0).getValidity().getStart())); + Properties properties = builder.build(cluster, buildPath, props); + boolean skipDryRun = false; + if (props != null && !props.isEmpty() && props.containsKey(FalconWorkflowEngine.FALCON_SKIP_DRYRUN)) { + Boolean skipDryRunprop = Boolean + .parseBoolean(props.getProperty(FalconWorkflowEngine.FALCON_SKIP_DRYRUN)); + if (skipDryRunprop != null) { + skipDryRun = skipDryRunprop; + } + } + if (!skipDryRun) { + dryRunInternal(properties, buildPath, entity); + } } catch (OozieClientException e) { LOG.error("Oozie client exception:", e); throw new DAGEngineException(e); @@ -407,7 +389,8 @@ public void touch(Entity entity, Boolean skipDryRun) throws DAGEngineException { // TODO : remove hardcoded Tag value when feed support is added. try { OozieOrchestrationWorkflowBuilder builder = - OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.DEFAULT); + OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.DEFAULT, + OozieOrchestrationWorkflowBuilder.Scheduler.NATIVE); if (!skipDryRun) { Path buildPath = new Path("/tmp", "falcon" + entity.getName() + System.currentTimeMillis()); Properties props = builder.build(cluster, buildPath); diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java index d08f7d40b..042c9fd9d 100644 --- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java @@ -361,7 +361,7 @@ public void testTimeOut() throws Exception { } // Non-triggering event should not create an instance - @Test + @Test(enabled = false) public void testNonTriggeringEvents() throws Exception { storeEntity(EntityType.PROCESS, "summarize6"); Process process = getStore().get(EntityType.PROCESS, "summarize6"); @@ -611,12 +611,13 @@ private Event createEvent(NotificationServicesRegistry.SERVICE type, Process pro } } - private Event createEvent(NotificationServicesRegistry.SERVICE type, ExecutionInstance instance) { + private Event createEvent(NotificationServicesRegistry.SERVICE type, + ExecutionInstance instance) throws IOException { ID id = new InstanceID(instance); switch (type) { case DATA: DataEvent dataEvent = new DataEvent(id, - new ArrayList(Arrays.asList(new Path("/projects/falcon/clicks"))), + new ArrayList(Arrays.asList(new Path("/projects/falcon/clicks/_SUCCESS"))), DataEvent.STATUS.AVAILABLE); return dataEvent; case JOB_SCHEDULE: diff --git a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java index c99f3fd2a..008d2fe98 100644 --- a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java +++ b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java @@ -40,7 +40,7 @@ public MockDAGEngine(String cluster) { } @Override - public String run(ExecutionInstance instance) throws DAGEngineException { + public String run(ExecutionInstance instance, Properties props) throws DAGEngineException { if (failInstances.contains(instance)) { throw new DAGEngineException("Mock failure."); } @@ -86,7 +86,7 @@ public void reRun(ExecutionInstance instance, Properties props, boolean isForced } @Override - public void submit(Entity entity) throws DAGEngineException { + public void submit(Entity entity, Properties props) throws DAGEngineException { } diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java index 34965f232..a8e31bc69 100644 --- a/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java @@ -69,7 +69,7 @@ public void testbackLogCatchup() throws Exception { timeService.register(request.build()); // Asynchronous execution, hence a small wait. - Thread.sleep(1000); + Thread.sleep(2000); // Based on the minute boundary, there might be 3. Mockito.verify(handler, Mockito.atLeast(2)).onEvent(Mockito.any(Event.class)); diff --git a/scheduler/src/test/resources/runtime.properties b/scheduler/src/test/resources/runtime.properties new file mode 100644 index 000000000..d3260dc35 --- /dev/null +++ b/scheduler/src/test/resources/runtime.properties @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +*.domain=debug + +*.falcon.scheduler.minutely.process.polling.frequency.millis=1000 +*.falcon.scheduler.hourly.process.polling.frequency.millis=1000 +*.falcon.scheduler.daily.process.polling.frequency.millis=1000 +*.falcon.scheduler.monthly.process.polling.frequency.millis=1000 diff --git a/src/conf/runtime.properties b/src/conf/runtime.properties index 2fb148b99..013ac1881 100644 --- a/src/conf/runtime.properties +++ b/src/conf/runtime.properties @@ -75,3 +75,14 @@ falcon.current.colo=local *.falcon.service.ProxyUserService.proxyuser.#USER#.groups=* ######### Proxyuser Configuration End ######### + +######### Scheduler Configuration Start ####### +## Polling frequencies for processes based on frequency. +#*.falcon.scheduler.minutely.process.polling.frequency.millis= +#*.falcon.scheduler.hourly.process.polling.frequency.millis= +#*.falcon.scheduler.daily.process.polling.frequency.millis= +#*.falcon.scheduler.monthly.process.polling.frequency.millis= +######### Scheduler Configuration End ####### + +### Timeout factor for processes ### +instance.timeout.factor=5 diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index 70e1de94c..fab794c79 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -342,7 +342,7 @@ protected long waitFor(int timeout, Predicate predicate) { LOG.info("Waiting up to [{}] msec", waiting); lastEcho = System.currentTimeMillis(); } - Thread.sleep(5000); + Thread.sleep(7000); } if (!predicate.evaluate()) { LOG.info("Waiting timed out after [{}] msec", timeout); diff --git a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java index 175833a29..639bd458b 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java @@ -57,6 +57,7 @@ public class AbstractSchedulerManagerJerseyIT extends FalconUnitTestBase { private static final String IT_RUN_MODE = "it.run.mode"; public static final String PROCESS_TEMPLATE = "/local-process-noinputs-template.xml"; + public static final String PROCESS_TEMPLATE_NOLATE_DATA = "/process-nolatedata-template.xml"; public static final String PROCESS_NAME = "processName"; protected static final String START_INSTANCE = "2012-04-20T00:00Z"; private static FalconJPAService falconJPAService = FalconJPAService.get(); @@ -107,12 +108,14 @@ protected void scheduleProcess(String processName, String cluster, } protected void setupProcessExecution(UnitTestContext context, - Map overlay, int numInstances) throws Exception { + Map overlay, int numInstances, + String processTemplate) throws Exception { String colo = overlay.get(COLO); String cluster = overlay.get(CLUSTER); submitCluster(colo, cluster, null); + submitFeeds(overlay); context.prepare(); - submitProcess(overlay); + submitProcess(processTemplate, overlay); String processName = overlay.get(PROCESS_NAME); scheduleProcess(processName, cluster, START_INSTANCE, numInstances); diff --git a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java index b06725fb5..54170b4c6 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java @@ -48,8 +48,9 @@ public void testProcessInstanceExecution() throws Exception { String cluster = overlay.get(CLUSTER); submitCluster(colo, cluster, null); + submitFeeds(overlay); context.prepare(HELLO_WORLD_WORKFLOW); - submitProcess(overlay); + submitProcess(PROCESS_TEMPLATE_NOLATE_DATA, overlay); String processName = overlay.get(PROCESS_NAME); scheduleProcess(processName, cluster, START_INSTANCE, 1); @@ -68,7 +69,7 @@ public void testKillAndRerunInstances() throws Exception { UnitTestContext context = new UnitTestContext(); Map overlay = context.getUniqueOverlay(); - setupProcessExecution(context, overlay, 1); + setupProcessExecution(context, overlay, 1, PROCESS_TEMPLATE); String processName = overlay.get(PROCESS_NAME); String colo = overlay.get(COLO); @@ -102,7 +103,7 @@ public void testSuspendResumeInstances() throws Exception { UnitTestContext context = new UnitTestContext(); Map overlay = context.getUniqueOverlay(); - setupProcessExecution(context, overlay, 1); + setupProcessExecution(context, overlay, 1, PROCESS_TEMPLATE_NOLATE_DATA); String processName = overlay.get(PROCESS_NAME); String colo = overlay.get(COLO); @@ -129,7 +130,7 @@ public void testListInstances() throws Exception { UnitTestContext context = new UnitTestContext(); Map overlay = context.getUniqueOverlay(); - setupProcessExecution(context, overlay, 4); + setupProcessExecution(context, overlay, 4, PROCESS_TEMPLATE); String processName = overlay.get(PROCESS_NAME); String colo = overlay.get(COLO); @@ -150,7 +151,7 @@ public void testInstanceSummary() throws Exception { UnitTestContext context = new UnitTestContext(); Map overlay = context.getUniqueOverlay(); - setupProcessExecution(context, overlay, 3); + setupProcessExecution(context, overlay, 3, PROCESS_TEMPLATE); String processName = overlay.get(PROCESS_NAME); String colo = overlay.get(COLO); @@ -168,4 +169,28 @@ public void testInstanceSummary() throws Exception { Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().get("RUNNING").longValue(), 2L); Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().get("READY").longValue(), 1L); } + + @Test + public void testProcessWithInputs() throws Exception { + UnitTestContext context = new UnitTestContext(); + Map overlay = context.getUniqueOverlay(); + String colo = overlay.get(COLO); + String cluster = overlay.get(CLUSTER); + + submitCluster(colo, cluster, null); + submitFeeds(overlay); + context.prepare(HELLO_WORLD_WORKFLOW); + + submitProcess(PROCESS_TEMPLATE_NOLATE_DATA, overlay); + + String processName = overlay.get(PROCESS_NAME); + scheduleProcess(processName, cluster, START_INSTANCE, 1); + + waitForStatus(EntityType.PROCESS.toString(), processName, + START_INSTANCE, InstancesResult.WorkflowStatus.SUCCEEDED); + + InstancesResult.WorkflowStatus status = getClient().getInstanceStatus(EntityType.PROCESS.name(), + processName, START_INSTANCE); + Assert.assertEquals(status, InstancesResult.WorkflowStatus.SUCCEEDED); + } } diff --git a/webapp/src/test/resources/process-nolatedata-template.xml b/webapp/src/test/resources/process-nolatedata-template.xml new file mode 100644 index 000000000..4498a22f0 --- /dev/null +++ b/webapp/src/test/resources/process-nolatedata-template.xml @@ -0,0 +1,50 @@ + + + + + consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting + testPipeline,dataReplicationPipeline + + + + + + + 1 + FIFO + days(1) + UTC + + + + + + + + + + + + + + + + + +