From d9c3e4008ddeaf34a581c0c84534941611346823 Mon Sep 17 00:00:00 2001 From: Praveen Adlakha Date: Sat, 12 Mar 2016 17:06:29 +0530 Subject: [PATCH 1/8] falcon_scheduler WIP --- common/pom.xml | 41 ++++++ .../falcon/persistence}/EntityBean.java | 2 +- .../falcon/persistence}/InstanceBean.java | 2 +- .../persistence/MonitoredFeedsBean.java | 57 ++++++++ .../persistence/PendingInstanceBean.java | 71 +++++++++ .../falcon}/service/FalconJPAService.java | 7 +- .../falcon/tools/FalconStateStoreDBCLI.java | 8 +- .../main/resources/META-INF/persistence.xml | 28 ++-- common/src/main/resources/startup.properties | 5 +- .../src/main/resources/statestore.credentials | 4 +- .../src/main/resources/statestore.properties | 20 ++- .../site/twiki/FalconNativeScheduler.twiki | 2 +- .../falcon/jdbc/MonitoringJdbcStateStore.java | 136 ++++++++++++++++++ .../service/FeedSLAMonitoringService.java | 83 ++++++----- .../jdbc/MonitoringJdbcStateStoreTest.java | 99 +++++++++++++ .../falcon/service/FeedSLAMonitoringTest.java | 66 ++++----- scheduler/pom.xml | 42 +----- .../state/store/jdbc/BeanMapperUtil.java | 2 + .../state/store/jdbc/JDBCStateStore.java | 4 +- .../execution/FalconExecutionServiceTest.java | 2 +- .../state/AbstractSchedulerTestBase.java | 2 +- .../state/service/TestFalconJPAService.java | 2 +- .../service/store/TestJDBCStateStore.java | 2 +- .../src/test/resources/startup.properties | 2 +- src/build/findbugs-exclude.xml | 4 +- src/conf/startup.properties | 2 +- .../AbstractSchedulerManagerJerseyIT.java | 2 +- webapp/src/test/resources/startup.properties | 2 +- 28 files changed, 556 insertions(+), 143 deletions(-) rename {scheduler/src/main/java/org/apache/falcon/state/store/jdbc => common/src/main/java/org/apache/falcon/persistence}/EntityBean.java (98%) rename {scheduler/src/main/java/org/apache/falcon/state/store/jdbc => common/src/main/java/org/apache/falcon/persistence}/InstanceBean.java (99%) create mode 100644 common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java create mode 100644 common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java rename {scheduler/src/main/java/org/apache/falcon/state/store => common/src/main/java/org/apache/falcon}/service/FalconJPAService.java (97%) rename {scheduler => common}/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java (98%) rename {scheduler => common}/src/main/resources/META-INF/persistence.xml (77%) create mode 100644 prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java create mode 100644 prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java diff --git a/common/pom.xml b/common/pom.xml index 2e01282f6..725c9a8e3 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -187,6 +187,26 @@ com.thinkaurelius.titan titan-berkeleyje-jre6 + + + org.apache.openjpa + openjpa-jdbc + ${openjpa.version} + compile + + + + org.apache.openjpa + openjpa-persistence-jdbc + ${openjpa.version} + compile + + + + javax.validation + validation-api + ${javax-validation.version} + @@ -216,6 +236,27 @@ + + org.apache.maven.plugins + maven-antrun-plugin + 1.8 + + + process-classes + + + + + + + + + + run + + + + diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java similarity index 98% rename from scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java rename to common/src/main/java/org/apache/falcon/persistence/EntityBean.java index 37fb0cb14..5c94fa46a 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.falcon.state.store.jdbc; +package org.apache.falcon.persistence; import org.apache.openjpa.persistence.jdbc.Index; diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java similarity index 99% rename from scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java rename to common/src/main/java/org/apache/falcon/persistence/InstanceBean.java index dffb116d9..b7e10f119 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.falcon.state.store.jdbc; +package org.apache.falcon.persistence; import org.apache.openjpa.persistence.jdbc.ForeignKey; import org.apache.openjpa.persistence.jdbc.ForeignKeyAction; diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java new file mode 100644 index 000000000..8d972c8e7 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.persistence; + +import javax.persistence.*; +import javax.validation.constraints.NotNull; + + +@Entity +@NamedQueries({ + @NamedQuery(name = "GET_MONITERED_INSTANCE", query = "select OBJECT(a) from MonitoredFeedsBean a where a.feedName = :feedName"), + @NamedQuery(name = "DELETE_MONITORED_INSTANCES", query = "delete from MonitoredFeedsBean a where a.feedName = :feedName"), + @NamedQuery(name = "GET_ALL_MONITORING_FEEDS", query = "select OBJECT(a) from MonitoredFeedsBean a") +}) +@Table(name="MONITORED_FEEDS") +public class MonitoredFeedsBean { + @NotNull + @GeneratedValue(strategy = GenerationType.AUTO) + @Id + private String id; + + @Basic + @NotNull + @Column(name = "feed_name") + private String feedName; + + public String getFeedName() { + return feedName; + } + + public void setFeedName(String feedName) { + this.feedName = feedName; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java new file mode 100644 index 000000000..544858118 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java @@ -0,0 +1,71 @@ +package org.apache.falcon.persistence; + +import javax.persistence.*; +import javax.validation.constraints.NotNull; +import java.util.Date; + +/** + * Created by praveen on 8/3/16. + */ +@Entity +@NamedQueries({ + @NamedQuery(name = "GET_PENDING_INSTANCES", query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"), + @NamedQuery(name = "DELETE_PENDING_NOMINAL_INSTANCES", query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"), + @NamedQuery(name = "DELETE_ALL_PENDING_INSTANCES", query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), + @NamedQuery(name = "GET_DATE_FOR_PENDING_INSTANCES", query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), + @NamedQuery(name="GET_ALL_PENDING_INSTANCES" ,query = "select OBJECT(a) from PendingInstanceBean a ") +}) +@Table(name = "PENDING_INSTANCES") +public class PendingInstanceBean { + @NotNull + @GeneratedValue(strategy = GenerationType.AUTO) + @Id + private String id; + + @Basic + @NotNull + @Column(name = "feed_name") + private String feedName; + + @Basic + @NotNull + @Column(name = "cluster_name") + private String clusterName; + + @Basic + @NotNull + @Column(name = "nominal_time") + private Date nominalTime; + + public Date getNominalTime() { + return nominalTime; + } + + public void setNominalTime(Date nominalTime) { + this.nominalTime = nominalTime; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getFeedName() { + return feedName; + } + + public void setFeedName(String feedName) { + this.feedName = feedName; + } +} diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java b/common/src/main/java/org/apache/falcon/service/FalconJPAService.java similarity index 97% rename from scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java rename to common/src/main/java/org/apache/falcon/service/FalconJPAService.java index 027a8efe6..73fde33ec 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java +++ b/common/src/main/java/org/apache/falcon/service/FalconJPAService.java @@ -15,13 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.falcon.state.store.service; +package org.apache.falcon.service; import org.apache.commons.lang.StringUtils; import org.apache.falcon.FalconException; -import org.apache.falcon.service.FalconService; -import org.apache.falcon.state.store.jdbc.EntityBean; -import org.apache.falcon.state.store.jdbc.InstanceBean; +import org.apache.falcon.persistence.EntityBean; +import org.apache.falcon.persistence.InstanceBean; import org.apache.falcon.util.StateStoreProperties; import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI; import org.slf4j.Logger; diff --git a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java similarity index 98% rename from scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java rename to common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java index 6de9f7d7a..df8194c47 100644 --- a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java +++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java @@ -22,7 +22,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.falcon.cli.CLIParser; -import org.apache.falcon.state.store.service.FalconJPAService; +import org.apache.falcon.service.FalconJPAService; import org.apache.falcon.util.BuildProperties; import org.apache.falcon.util.StateStoreProperties; @@ -241,8 +241,10 @@ private String[] createMappingToolArguments(String sqlFile) throws Exception { } args.add("-indexes"); args.add("true"); - args.add("org.apache.falcon.state.store.jdbc.EntityBean"); - args.add("org.apache.falcon.state.store.jdbc.InstanceBean"); + args.add("org.apache.falcon.persistence.EntityBean"); + args.add("org.apache.falcon.persistence.InstanceBean"); + args.add("org.apache.falcon.persistence.PendingInstanceBean"); + args.add("org.apache.falcon.persistence.MonitoredFeedsBean"); return args.toArray(new String[args.size()]); } diff --git a/scheduler/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml similarity index 77% rename from scheduler/src/main/resources/META-INF/persistence.xml rename to common/src/main/resources/META-INF/persistence.xml index c2ef794da..206bb0508 100644 --- a/scheduler/src/main/resources/META-INF/persistence.xml +++ b/common/src/main/resources/META-INF/persistence.xml @@ -23,8 +23,10 @@ org.apache.openjpa.persistence.PersistenceProviderImpl - org.apache.falcon.state.store.jdbc.EntityBean - org.apache.falcon.state.store.jdbc.InstanceBean + org.apache.falcon.persistence.EntityBean + org.apache.falcon.persistence.InstanceBean + org.apache.falcon.persistence.PendingInstanceBean + org.apache.falcon.persistence.MonitoredFeedsBean @@ -32,8 +34,9 @@ + value="jpa(Types=org.apache.falcon.persistence.EntityBean; + org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; + org.apache.falcon.persistence.MonitoredFeedsBean)"> @@ -50,8 +53,10 @@ org.apache.openjpa.persistence.PersistenceProviderImpl - org.apache.falcon.state.store.jdbc.EntityBean - org.apache.falcon.state.store.jdbc.InstanceBean + org.apache.falcon.persistence.EntityBean + org.apache.falcon.persistence.InstanceBean + org.apache.falcon.persistence.PendingInstanceBean + org.apache.falcon.persistence.MonitoredFeedsBean @@ -60,7 +65,8 @@ + org.apache.falcon.state.store.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; + org.apache.falcon.persistence.MonitoredFeedsBean)"> @@ -77,8 +83,9 @@ org.apache.openjpa.persistence.PersistenceProviderImpl - org.apache.falcon.state.store.jdbc.EntityBean - org.apache.falcon.state.store.jdbc.InstanceBean + org.apache.falcon.persistence.EntityBean + org.apache.falcon.persistence.InstanceBean + org.apache.falcon.persistence.MonitoredFeedsBean @@ -87,7 +94,8 @@ + org.apache.falcon.state.store.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; + org.apache.falcon.persistence.MonitoredFeedsBean)"> diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 123d63c6d..c7b026864 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -43,14 +43,15 @@ org.apache.falcon.service.LogCleanupService,\ org.apache.falcon.service.GroupsService,\ org.apache.falcon.service.ProxyUserService,\ - org.apache.falcon.adfservice.ADFProviderService + org.apache.falcon.adfservice.ADFProviderService,\ + org.apache.falcon.service.FalconJPAService ## If you wish to use Falcon native scheduler add the commented out services below to application.services ## # org.apache.falcon.notification.service.impl.JobCompletionService,\ # org.apache.falcon.notification.service.impl.SchedulerService,\ # org.apache.falcon.notification.service.impl.AlarmService,\ # org.apache.falcon.notification.service.impl.DataAvailabilityService,\ # org.apache.falcon.execution.FalconExecutionService,\ -# org.apache.falcon.state.store.service.FalconJPAService + # List of Lifecycle policies configured. diff --git a/common/src/main/resources/statestore.credentials b/common/src/main/resources/statestore.credentials index 86c32a19c..b0e419686 100644 --- a/common/src/main/resources/statestore.credentials +++ b/common/src/main/resources/statestore.credentials @@ -18,5 +18,5 @@ ######### StateStore Credentials ##### -#*.falcon.statestore.jdbc.username=sa -#*.falcon.statestore.jdbc.password= \ No newline at end of file +*.falcon.statestore.jdbc.username=sa +*.falcon.statestore.jdbc.password= \ No newline at end of file diff --git a/common/src/main/resources/statestore.properties b/common/src/main/resources/statestore.properties index 44e79b353..768642636 100644 --- a/common/src/main/resources/statestore.properties +++ b/common/src/main/resources/statestore.properties @@ -42,4 +42,22 @@ ## Creates Falcon DB. ## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP. ## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up. -#*.falcon.statestore.create.db.schema=true \ No newline at end of file +#*.falcon.statestore.create.db.schema=true + + +######## StateStore Properties ##### +*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore +*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver +*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true +*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource +# Maximum number of active connections that can be allocated from this pool at the same time. +*.falcon.statestore.pool.max.active.conn=10 +*.falcon.statestore.connection.properties= +# Indicates the interval (in milliseconds) between eviction runs. +*.falcon.statestore.validate.db.connection.eviction.interval=300000 +# The number of objects to examine during each run of the idle object evictor thread. +*.falcon.statestore.validate.db.connection.eviction.num=10 +# Creates Falcon DB. +# If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP. +# If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up. +*.falcon.statestore.create.db.schema=true \ No newline at end of file diff --git a/docs/src/site/twiki/FalconNativeScheduler.twiki b/docs/src/site/twiki/FalconNativeScheduler.twiki index 9ffc5e9fe..1f5173941 100644 --- a/docs/src/site/twiki/FalconNativeScheduler.twiki +++ b/docs/src/site/twiki/FalconNativeScheduler.twiki @@ -29,7 +29,7 @@ You can enable native scheduler by making changes to __$FALCON_HOME/conf/startup org.apache.falcon.service.ProcessSubscriberService,\ org.apache.falcon.service.FeedSLAMonitoringService,\ org.apache.falcon.service.LifecyclePolicyMap,\ - org.apache.falcon.state.store.service.FalconJPAService,\ + org.apache.falcon.service.FalconJPAService,\ org.apache.falcon.entity.store.ConfigurationStore,\ org.apache.falcon.rerun.service.RetryService,\ org.apache.falcon.rerun.service.LateRunService,\ diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java new file mode 100644 index 000000000..aab89ce71 --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java @@ -0,0 +1,136 @@ +package org.apache.falcon.jdbc; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.falcon.persistence.EntityBean; +import org.apache.falcon.persistence.MonitoredFeedsBean; +import org.apache.falcon.persistence.PendingInstanceBean; +import org.apache.falcon.service.FalconJPAService; +import org.apache.openjpa.persistence.OpenJPAEntityManager; +import org.apache.openjpa.persistence.OpenJPAPersistence; + +import javax.persistence.*; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +/** + * Created by praveen on 8/3/16. + */ +public class MonitoringJdbcStateStore { + + private EntityManager getEntityManager() { + return FalconJPAService.get().getEntityManager(); + } + + + public void putMonitoredFeed (String feedName){ + MonitoredFeedsBean monitoredFeedsBean = new MonitoredFeedsBean(); + monitoredFeedsBean.setFeedName(feedName); + + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + entityManager.persist(monitoredFeedsBean); + commitAndCloseTransaction(entityManager); + } + + public MonitoredFeedsBean getMonitoredFeed(String feedName){ + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery("GET_MONITERED_INSTANCE"); + q.setParameter("feedName", feedName); + List result = q.getResultList(); + if (result.isEmpty()) { + return null; + } + entityManager.close(); + return ((MonitoredFeedsBean)result.get(0)); + } + + public void deleteMonitoringFeed (String feedName) { + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery("DELETE_MONITORED_INSTANCES"); + q.setParameter("feedName", feedName); + q.executeUpdate(); + commitAndCloseTransaction(entityManager); + } + + public List getAllMonitoredFeed(){ + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery("GET_ALL_MONITORING_FEEDS"); + List result = q.getResultList(); + if (result.isEmpty()) { + return null; + } + entityManager.close(); + return result; + } + + public void deletePendingNominalInstances (String feedName, String clusterName ,Date nominalTime){ + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery("DELETE_PENDING_NOMINAL_INSTANCES"); + q.setParameter("feedName", feedName); + q.setParameter("clusterName", clusterName); + q.setParameter("nominalTime",nominalTime); + q.executeUpdate(); + commitAndCloseTransaction(entityManager); + } + + public void deletePendingInstances (String feedName, String clusterName ){ + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery("DELETE_ALL_PENDING_INSTANCES"); + q.setParameter("feedName", feedName); + q.setParameter("clusterName", clusterName); + q.executeUpdate(); + commitAndCloseTransaction(entityManager); + } + + public void putPendingInstances (String feed,String clusterName ,Date nominalTime){ + PendingInstanceBean pendingInstanceBean = new PendingInstanceBean(); + pendingInstanceBean.setFeedName(feed); + pendingInstanceBean.setClusterName(clusterName); + pendingInstanceBean.setNominalTime(nominalTime); + + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + entityManager.persist(pendingInstanceBean); + commitAndCloseTransaction(entityManager); + } + + public List getNominalInstances(String feedName,String clusterName){ + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery("GET_DATE_FOR_PENDING_INSTANCES"); + q.setParameter("feedName", feedName); + q.setParameter("clusterName", clusterName); + List result = q.getResultList(); + if (CollectionUtils.isEmpty(result)) { + return null; + } + entityManager.close(); + return result; + } + public List getAllInstances(){ + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery("GET_ALL_PENDING_INSTANCES"); + List result = q.getResultList(); + if (CollectionUtils.isEmpty(result)) { + return null; + } + entityManager.close(); + return result; + + + } + + private void commitAndCloseTransaction(EntityManager entityManager) { + entityManager.getTransaction().commit(); + entityManager.close(); + } + + private void beginTransaction(EntityManager entityManager) { + entityManager.getTransaction().begin(); + } + +} diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java index 29bd7ba6c..c5edb5bb1 100644 --- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.service; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.IOUtils; import org.apache.falcon.FalconException; import org.apache.falcon.Pair; @@ -31,6 +32,9 @@ import org.apache.falcon.entity.v0.feed.Sla; import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.jdbc.MonitoringJdbcStateStore; +import org.apache.falcon.persistence.MonitoredFeedsBean; +import org.apache.falcon.persistence.PendingInstanceBean; import org.apache.falcon.resource.SchedulableEntityInstance; import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.util.StartupProperties; @@ -47,13 +51,7 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; @@ -66,6 +64,8 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService { private static final Logger LOG = LoggerFactory.getLogger("FeedSLA"); + private static final MonitoringJdbcStateStore monitoringJdbc = new MonitoringJdbcStateStore(); + private static final String ONE_HOUR = String.valueOf(60 * 60 * 1000); private static final int ONE_MS = 1; @@ -97,7 +97,7 @@ public static FeedSLAMonitoringService get() { * Map, Set to store * each missing instance of a feed. */ - protected Map, BlockingQueue> pendingInstances; + protected Map, BlockingQueue> pendingInstances; /** @@ -156,7 +156,7 @@ public void onAdd(Entity entity) throws FalconException { if (currentClusters.contains(cluster.getName())) { if (FeedHelper.getSLA(cluster, feed) != null) { LOG.debug("Adding feed:{} for monitoring", feed.getName()); - monitoredFeeds.add(feed.getName()); + monitoringJdbc.putMonitoredFeed(feed.getName()); } } } @@ -173,8 +173,8 @@ public void onRemove(Entity entity) throws FalconException { Set currentClusters = DeploymentUtil.getCurrentClusters(); for (Cluster cluster : feed.getClusters().getClusters()) { if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) { - monitoredFeeds.remove(feed.getName()); - pendingInstances.remove(new Pair<>(feed.getName(), cluster.getName())); + monitoringJdbc.deleteMonitoringFeed(feed.getName()); + monitoringJdbc.deletePendingInstances(feed.getName(), cluster.getName()); } } } @@ -212,7 +212,7 @@ public void onChange(Entity oldEntity, Entity newEntity) throws FalconException } for (String clusterName : slaRemovedClusters) { - pendingInstances.remove(new Pair<>(newFeed.getName(), clusterName)); + monitoringJdbc.deletePendingInstances(newFeed.getName(), clusterName); } } } @@ -269,10 +269,9 @@ public void destroy() throws FalconException { public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime) { LOG.info("Removing {} feed's instance {} in cluster {} from pendingSLA", feedName, clusterName, nominalTime); - Pair feedCluster = new Pair<>(feedName, clusterName); // Slas for feeds not having sla tag are not stored. - if (pendingInstances.get(feedCluster) != null) { - pendingInstances.get(feedCluster).remove(nominalTime); + if(CollectionUtils.isEmpty(monitoringJdbc.getNominalInstances(feedName,clusterName))){ + monitoringJdbc.deletePendingNominalInstances(feedName,clusterName,nominalTime); } } @@ -296,7 +295,7 @@ private class Monitor implements Runnable { @Override public void run() { try { - if (!monitoredFeeds.isEmpty()) { + if (monitoringJdbc.getAllMonitoredFeed().size() > 0) { checkPendingInstanceAvailability(); // add Instances from last checked time to 10 minutes from now(some buffer for status check) @@ -320,14 +319,17 @@ public void run() { void addNewPendingFeedInstances(Date from, Date to) throws FalconException { Set currentClusters = DeploymentUtil.getCurrentClusters(); - for (String feedName : monitoredFeeds) { + List feedsBeanList = monitoringJdbc.getAllMonitoredFeed(); + for(MonitoredFeedsBean monitoredFeedsBean : feedsBeanList) { + String feedName = monitoredFeedsBean.getFeedName(); Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName); for (Cluster feedCluster : feed.getClusters().getClusters()) { if (currentClusters.contains(feedCluster.getName())) { Date nextInstanceTime = from; Pair key = new Pair<>(feed.getName(), feedCluster.getName()); - BlockingQueue instances = pendingInstances.get(key); - if (instances == null) { + BlockingQueue instances = new LinkedBlockingQueue<>( + monitoringJdbc.getNominalInstances(feedName,feedCluster.getName())); + if (CollectionUtils.isEmpty(monitoringJdbc.getNominalInstances(feedName,feedCluster.getName()))) { instances = new LinkedBlockingQueue<>(queueSize); Date feedStartTime = feedCluster.getValidity().getStart(); Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed, feedCluster); @@ -357,7 +359,9 @@ void addNewPendingFeedInstances(Date from, Date to) throws FalconException { nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS); nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime); } - pendingInstances.put(key, instances); + for(Date date:instances){ + monitoringJdbc.putPendingInstances(feed.getName(), feedCluster.getName(),date); + } } } } @@ -368,11 +372,14 @@ void addNewPendingFeedInstances(Date from, Date to) throws FalconException { * Checks the availability of all the pendingInstances and removes the ones which have become available. */ private void checkPendingInstanceAvailability() throws FalconException { - for (Map.Entry, BlockingQueue> entry: pendingInstances.entrySet()) { - for (Date date : entry.getValue()) { - boolean status = checkFeedInstanceAvailability(entry.getKey().first, entry.getKey().second, date); + for(PendingInstanceBean pendingInstanceBean : monitoringJdbc.getAllInstances()){ + for (Date date : monitoringJdbc.getNominalInstances(pendingInstanceBean.getFeedName(), + pendingInstanceBean.getClusterName())) { + boolean status = checkFeedInstanceAvailability(pendingInstanceBean.getFeedName(), + pendingInstanceBean.getClusterName(),date); if (status) { - pendingInstances.get(entry.getKey()).remove(date); + monitoringJdbc.deletePendingNominalInstances(pendingInstanceBean.getFeedName(), + pendingInstanceBean.getClusterName(),date); } } } @@ -425,10 +432,11 @@ private void serializeState() throws FalconException{ } @SuppressWarnings("unchecked") + //Need to check this method private void deserialize(Path path) throws FalconException { try { Map state = deserializeInternal(path); - pendingInstances = new ConcurrentHashMap<>(); +// pendingInstances = new ConcurrentHashMap<>(); Map, BlockingQueue> pendingInstancesCopy = (Map, BlockingQueue>) state.get("pendingInstances"); // queue size can change during restarts, hence copy @@ -446,11 +454,13 @@ private void deserialize(Path path) throws FalconException { LOG.debug("Deserialization Adding: key={} to ={}", entry.getKey(), instance); value.add(instance); } - pendingInstances.put(entry.getKey(), value); + for(Date date:value){ + monitoringJdbc.putPendingInstances(entry.getKey().first, entry.getKey().second,date); + } } lastCheckedAt = new Date((Long) state.get("lastCheckedAt")); lastSerializedAt = new Date((Long) state.get("lastSerializedAt")); - monitoredFeeds = new ConcurrentHashSet<>(); // will be populated on the onLoad of entities. + //monitoredFeeds = new ConcurrentHashSet<>(); // will be populated on the onLoad of entities. LOG.debug("Restored the service from old state."); } catch (IOException | ClassNotFoundException e) { throw new FalconException("Couldn't deserialize the old state", e); @@ -458,10 +468,10 @@ private void deserialize(Path path) throws FalconException { } protected void initializeService() { - pendingInstances = new ConcurrentHashMap<>(); + //pendingInstances = new ConcurrentHashMap<>(); lastCheckedAt = new Date(); lastSerializedAt = new Date(); - monitoredFeeds = new ConcurrentHashSet<>(); + //monitoredFeeds = new ConcurrentHashSet<>(); } @SuppressWarnings("unchecked") @@ -492,13 +502,18 @@ private Map deserializeInternal(Path path) throws IOException, C public Set getFeedSLAMissPendingAlerts(Date start, Date end) throws FalconException { Set result = new HashSet<>(); - for (Map.Entry, BlockingQueue> feedInstances : pendingInstances.entrySet()) { - Pair feedClusterPair = feedInstances.getKey(); + for(PendingInstanceBean pendingInstanceBean : monitoringJdbc.getAllInstances()){ + //for (Map.Entry, BlockingQueue> feedInstances : pendingInstances.entrySet()) { + Pair feedClusterPair = new Pair<>(pendingInstanceBean.getFeedName(), + pendingInstanceBean.getClusterName()); Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first); Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second); Sla sla = FeedHelper.getSLA(cluster, feed); if (sla != null) { - Set> slaStatus = getSLAStatus(sla, start, end, feedInstances.getValue()); + Set> slaStatus = getSLAStatus(sla, start, end, + new LinkedBlockingQueue(monitoringJdbc.getNominalInstances + (pendingInstanceBean.getFeedName(), + pendingInstanceBean.getClusterName()))); for (Pair status : slaStatus){ SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first, feedClusterPair.second, status.first, EntityType.FEED); @@ -506,6 +521,7 @@ public Set getFeedSLAMissPendingAlerts(Date start, Da result.add(instance); } } + // } } return result; } @@ -525,7 +541,8 @@ public Set getFeedSLAMissPendingAlerts(String feedNam Set result = new HashSet<>(); Pair feedClusterPair = new Pair<>(feedName, clusterName); - BlockingQueue missingInstances = pendingInstances.get(feedClusterPair); + BlockingQueue missingInstances = new LinkedBlockingQueue<>(monitoringJdbc. + getNominalInstances(feedName,clusterName)); Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName); Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second); Sla sla = FeedHelper.getSLA(cluster, feed); diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java new file mode 100644 index 000000000..7068793ff --- /dev/null +++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java @@ -0,0 +1,99 @@ +package org.apache.falcon.jdbc; + +import org.apache.falcon.cluster.util.EmbeddedCluster; +import org.apache.falcon.entity.AbstractTestBase; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.persistence.PendingInstanceBean; +import org.apache.falcon.service.FalconJPAService; +import org.apache.falcon.tools.FalconStateStoreDBCLI; +import org.apache.falcon.util.StateStoreProperties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.openjpa.persistence.OpenJPAEntityManager; +import org.apache.openjpa.persistence.OpenJPAPersistence; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import javax.persistence.EntityManager; +import javax.persistence.EntityTransaction; +import java.io.File; +import java.util.Date; +import java.util.Random; + +/** + * Created by praveen on 8/3/16. + */ +public class MonitoringJdbcStateStoreTest extends AbstractTestBase { + private static final String DB_BASE_DIR = "target/test-data/falcondb"; + protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db"; + protected static String url = "jdbc:derby:"+ dbLocation +";create=true"; + protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql"; + protected static final String DB_UPDATE_SQL_FILE = DB_BASE_DIR + File.separator + "update.sql"; + protected LocalFileSystem fs = new LocalFileSystem(); + + private static Random randomValGenerator = new Random(); + private static FalconJPAService falconJPAService = FalconJPAService.get(); + + protected int execDBCLICommands(String[] args) { + return new FalconStateStoreDBCLI().run(args); + } + + public void createDB(String file) { + File sqlFile = new File(file); + String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" }; + int result = execDBCLICommands(argsCreate); + Assert.assertEquals(0, result); + Assert.assertTrue(sqlFile.exists()); + + } + + @BeforeClass + public void setup() throws Exception { + System.out.println("Executing setup"); + StateStoreProperties.get().setProperty(FalconJPAService.URL, url); + Configuration localConf = new Configuration(); + fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf); + fs.mkdirs(new Path(DB_BASE_DIR)); + createDB(DB_SQL_FILE); + falconJPAService.init(); + this.dfsCluster = EmbeddedCluster.newCluster("testCluster"); + this.conf = dfsCluster.getConf(); + // registerServices(); + } + + @Test + public void testInsertRetrieveAndUpdate() throws Exception { + + MonitoringJdbcStateStore monitoringJdbcStateStore = new MonitoringJdbcStateStore(); + monitoringJdbcStateStore.putMonitoredFeed("test_feed1"); + monitoringJdbcStateStore.putMonitoredFeed("test_feed2"); + Assert.assertEquals("test_feed1",monitoringJdbcStateStore.getMonitoredFeed("test_feed1").getFeedName()); + Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredFeed().size(),2); + + monitoringJdbcStateStore.deleteMonitoringFeed("test_feed1"); + monitoringJdbcStateStore.deleteMonitoringFeed("test_feed2"); + Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); + Date dateTwo = SchemaHelper.parseDateUTC("2015-11-20T01:00Z"); + monitoringJdbcStateStore.putPendingInstances("test_feed1","test_cluster",dateOne); + monitoringJdbcStateStore.putPendingInstances("test_feed1","test_cluster",dateTwo); + + System.out.println(monitoringJdbcStateStore.getNominalInstances("test_feed1","test_cluster").size() + " abc1"); + monitoringJdbcStateStore.deletePendingNominalInstances("test_feed1","test_cluster",dateOne); + System.out.println(monitoringJdbcStateStore.getNominalInstances("test_feed1","test_cluster").size() + " abc2"); + monitoringJdbcStateStore.deletePendingInstances("test_feed1","test_cluster"); + //System.out.println(monitoringJdbcStateStore.getNominalInstances("test_feed1","test_cluster").size() + " abc3"); + //TODO + //check for unique contraint in the table + //monitoringJdbcStateStore.putMonitoredFeed("test_feed"); + +// System.out.println(monitoringJdbcStateStore.getMonitoredFeed("test_feed").getFeedName()); + + } + + @Test + public void testgetMonitoredFeed() throws Exception { + + } +} diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java index 90eec4d78..613e21808 100644 --- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java +++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java @@ -121,39 +121,39 @@ public void testOptionalEnd() throws FalconException { AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", null, "*"); } - @Test - public void testMakeFeedInstanceAvailable() { - Date instanceDate = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); - Date nextInstanceDate = SchemaHelper.parseDateUTC("2015-11-20T01:00Z"); - Pair feedCluster = new Pair<>("testFeed", "testCluster"); - - BlockingQueue missingInstances = new LinkedBlockingQueue<>(); - missingInstances.add(instanceDate); - missingInstances.add(nextInstanceDate); - - FeedSLAMonitoringService.get().initializeService(); - FeedSLAMonitoringService.get().pendingInstances.put(feedCluster, missingInstances); - FeedSLAMonitoringService.get().makeFeedInstanceAvailable("testFeed", "testCluster", instanceDate); - - Assert.assertEquals(FeedSLAMonitoringService.get().pendingInstances.get(feedCluster).size(), 1); - } - - @Test - public void testEndDateCheck() throws Exception { - Cluster cluster = publishCluster(); - publishFeed(cluster, "hours(1)", "2015-11-20 00:00 UTC", "2015-11-20 05:00 UTC"); - Pair feedCluster = new Pair<>(FEED_NAME, CLUSTER_NAME); - - FeedSLAMonitoringService service = FeedSLAMonitoringService.get(); - service.initializeService(); - service.queueSize = 100; - service.monitoredFeeds.add(FEED_NAME); - Date from = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); - Date to = SchemaHelper.parseDateUTC("2015-11-25T00:00Z"); - service.addNewPendingFeedInstances(from, to); - // check that instances after feed's end date are not added. - Assert.assertEquals(service.pendingInstances.get(feedCluster).size(), 5); - } +// @Test +// public void testMakeFeedInstanceAvailable() { +// Date instanceDate = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); +// Date nextInstanceDate = SchemaHelper.parseDateUTC("2015-11-20T01:00Z"); +// Pair feedCluster = new Pair<>("testFeed", "testCluster"); +// +// BlockingQueue missingInstances = new LinkedBlockingQueue<>(); +// missingInstances.add(instanceDate); +// missingInstances.add(nextInstanceDate); +// +// FeedSLAMonitoringService.get().initializeService(); +// FeedSLAMonitoringService.get().pendingInstances.put(feedCluster, missingInstances); +// FeedSLAMonitoringService.get().makeFeedInstanceAvailable("testFeed", "testCluster", instanceDate); +// +// Assert.assertEquals(FeedSLAMonitoringService.get().pendingInstances.get(feedCluster).size(), 1); +// } +// +// @Test +// public void testEndDateCheck() throws Exception { +// Cluster cluster = publishCluster(); +// publishFeed(cluster, "hours(1)", "2015-11-20 00:00 UTC", "2015-11-20 05:00 UTC"); +// Pair feedCluster = new Pair<>(FEED_NAME, CLUSTER_NAME); +// +// FeedSLAMonitoringService service = FeedSLAMonitoringService.get(); +// service.initializeService(); +// service.queueSize = 100; +// service.monitoredFeeds.add(FEED_NAME); +// Date from = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); +// Date to = SchemaHelper.parseDateUTC("2015-11-25T00:00Z"); +// service.addNewPendingFeedInstances(from, to); +// // check that instances after feed's end date are not added. +// Assert.assertEquals(service.pendingInstances.get(feedCluster).size(), 5); +// } private Cluster publishCluster() throws FalconException { Cluster cluster = new Cluster(); diff --git a/scheduler/pom.xml b/scheduler/pom.xml index dc006a177..6cb1c0d35 100644 --- a/scheduler/pom.xml +++ b/scheduler/pom.xml @@ -94,26 +94,6 @@ compile - - org.apache.openjpa - openjpa-jdbc - ${openjpa.version} - compile - - - - org.apache.openjpa - openjpa-persistence-jdbc - ${openjpa.version} - compile - - - - javax.validation - validation-api - ${javax-validation.version} - - org.testng testng @@ -169,27 +149,7 @@ - - org.apache.maven.plugins - maven-antrun-plugin - 1.8 - - - process-classes - - - - - - - - - - run - - - - + org.apache.maven.plugins maven-dependency-plugin diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java index 194819e85..338418689 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java @@ -27,6 +27,8 @@ import org.apache.falcon.exception.StateStoreException; import org.apache.falcon.execution.ExecutionInstance; import org.apache.falcon.execution.ProcessExecutionInstance; +import org.apache.falcon.persistence.EntityBean; +import org.apache.falcon.persistence.InstanceBean; import org.apache.falcon.predicate.Predicate; import org.apache.falcon.state.EntityID; import org.apache.falcon.state.EntityState; diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java index 1c072869a..d2bb8c851 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java @@ -22,6 +22,8 @@ import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.exception.StateStoreException; import org.apache.falcon.execution.ExecutionInstance; +import org.apache.falcon.persistence.EntityBean; +import org.apache.falcon.persistence.InstanceBean; import org.apache.falcon.state.EntityClusterID; import org.apache.falcon.state.EntityID; import org.apache.falcon.state.EntityState; @@ -30,7 +32,7 @@ import org.apache.falcon.state.InstanceState; import org.apache.falcon.state.store.AbstractStateStore; import org.apache.falcon.state.store.StateStore; -import org.apache.falcon.state.store.service.FalconJPAService; +import org.apache.falcon.service.FalconJPAService; import org.apache.falcon.util.StateStoreProperties; import org.joda.time.DateTime; diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java index d08f7d40b..96cef3dc1 100644 --- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java @@ -45,7 +45,7 @@ import org.apache.falcon.state.InstanceState; import org.apache.falcon.state.store.AbstractStateStore; import org.apache.falcon.state.store.StateStore; -import org.apache.falcon.state.store.service.FalconJPAService; +import org.apache.falcon.service.FalconJPAService; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.engine.DAGEngine; import org.apache.falcon.workflow.engine.DAGEngineFactory; diff --git a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java index 155be6990..cd9904925 100644 --- a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java +++ b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java @@ -18,7 +18,7 @@ package org.apache.falcon.state; import org.apache.falcon.entity.AbstractTestBase; -import org.apache.falcon.state.store.service.FalconJPAService; +import org.apache.falcon.service.FalconJPAService; import org.apache.falcon.tools.FalconStateStoreDBCLI; import org.apache.falcon.util.StateStoreProperties; import org.apache.hadoop.conf.Configuration; diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java b/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java index ecd52935d..3e186dd47 100644 --- a/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java +++ b/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java @@ -19,7 +19,7 @@ import org.apache.falcon.FalconException; import org.apache.falcon.state.AbstractSchedulerTestBase; -import org.apache.falcon.state.store.service.FalconJPAService; +import org.apache.falcon.service.FalconJPAService; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java index d597e2778..bf5c14282 100644 --- a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java +++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java @@ -44,7 +44,7 @@ import org.apache.falcon.state.store.jdbc.BeanMapperUtil; import org.apache.falcon.state.store.jdbc.JDBCStateStore; import org.apache.falcon.state.store.StateStore; -import org.apache.falcon.state.store.service.FalconJPAService; +import org.apache.falcon.service.FalconJPAService; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.engine.DAGEngine; import org.apache.falcon.workflow.engine.DAGEngineFactory; diff --git a/scheduler/src/test/resources/startup.properties b/scheduler/src/test/resources/startup.properties index 7160bb223..af9a01b57 100644 --- a/scheduler/src/test/resources/startup.properties +++ b/scheduler/src/test/resources/startup.properties @@ -41,7 +41,7 @@ org.apache.falcon.notification.service.impl.AlarmService,\ org.apache.falcon.notification.service.impl.DataAvailabilityService,\ org.apache.falcon.execution.FalconExecutionService,\ - org.apache.falcon.state.store.service.FalconJPAService + org.apache.falcon.service.FalconJPAService ##### Falcon Configuration Store Change listeners ##### *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml index 78f2fd009..94d0b6be6 100644 --- a/src/build/findbugs-exclude.xml +++ b/src/build/findbugs-exclude.xml @@ -38,12 +38,12 @@ - + - + diff --git a/src/conf/startup.properties b/src/conf/startup.properties index 51a791eaa..7af2c7253 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -58,7 +58,7 @@ # org.apache.falcon.service.ProcessSubscriberService,\ # org.apache.falcon.service.FeedSLAMonitoringService,\ # org.apache.falcon.service.LifecyclePolicyMap,\ -# org.apache.falcon.state.store.service.FalconJPAService,\ +# org.apache.falcon.service.FalconJPAService,\ # org.apache.falcon.entity.store.ConfigurationStore,\ # org.apache.falcon.rerun.service.RetryService,\ # org.apache.falcon.rerun.service.LateRunService,\ diff --git a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java index 175833a29..1bd4f45b8 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java @@ -24,7 +24,7 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.state.AbstractSchedulerTestBase; -import org.apache.falcon.state.store.service.FalconJPAService; +import org.apache.falcon.service.FalconJPAService; import org.apache.falcon.unit.FalconUnitTestBase; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.util.StateStoreProperties; diff --git a/webapp/src/test/resources/startup.properties b/webapp/src/test/resources/startup.properties index bc8853433..3544f0a52 100644 --- a/webapp/src/test/resources/startup.properties +++ b/webapp/src/test/resources/startup.properties @@ -32,7 +32,7 @@ *.application.services=org.apache.falcon.security.AuthenticationInitializationService,\ org.apache.falcon.workflow.WorkflowJobEndNotificationService, \ org.apache.falcon.service.ProcessSubscriberService,\ - org.apache.falcon.state.store.service.FalconJPAService,\ + org.apache.falcon.service.FalconJPAService,\ org.apache.falcon.entity.store.ConfigurationStore,\ org.apache.falcon.rerun.service.RetryService,\ org.apache.falcon.rerun.service.LateRunService,\ From 3ea535d185bde11e3a27ddbb358a8aa267e9cdf5 Mon Sep 17 00:00:00 2001 From: Praveen Adlakha Date: Wed, 16 Mar 2016 18:47:16 +0530 Subject: [PATCH 2/8] WIP --- .../falcon/persistence/MonitoredFeedsBean.java | 6 +++--- .../falcon/persistence/PendingInstanceBean.java | 10 +++++----- .../persistence/PersistenceConstants.java | 17 +++++++++++++++++ .../falcon/jdbc/MonitoringJdbcStateStore.java | 15 ++++++++------- .../service/FeedSLAMonitoringService.java | 17 ----------------- .../jdbc/MonitoringJdbcStateStoreTest.java | 16 ++-------------- 6 files changed, 35 insertions(+), 46 deletions(-) create mode 100644 common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java index 8d972c8e7..1d921cae7 100644 --- a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java @@ -23,9 +23,9 @@ @Entity @NamedQueries({ - @NamedQuery(name = "GET_MONITERED_INSTANCE", query = "select OBJECT(a) from MonitoredFeedsBean a where a.feedName = :feedName"), - @NamedQuery(name = "DELETE_MONITORED_INSTANCES", query = "delete from MonitoredFeedsBean a where a.feedName = :feedName"), - @NamedQuery(name = "GET_ALL_MONITORING_FEEDS", query = "select OBJECT(a) from MonitoredFeedsBean a") + @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from MonitoredFeedsBean a where a.feedName = :feedName"), + @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredFeedsBean a where a.feedName = :feedName"), + @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) from MonitoredFeedsBean a") }) @Table(name="MONITORED_FEEDS") public class MonitoredFeedsBean { diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java index 544858118..c237be48d 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java @@ -9,11 +9,11 @@ */ @Entity @NamedQueries({ - @NamedQuery(name = "GET_PENDING_INSTANCES", query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"), - @NamedQuery(name = "DELETE_PENDING_NOMINAL_INSTANCES", query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"), - @NamedQuery(name = "DELETE_ALL_PENDING_INSTANCES", query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), - @NamedQuery(name = "GET_DATE_FOR_PENDING_INSTANCES", query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), - @NamedQuery(name="GET_ALL_PENDING_INSTANCES" ,query = "select OBJECT(a) from PendingInstanceBean a ") + @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"), + @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"), + @NamedQuery(name = PersistenceConstants.DELETE_ALL_PENDING_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), + @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES, query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), + @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES ,query = "select OBJECT(a) from PendingInstanceBean a ") }) @Table(name = "PENDING_INSTANCES") public class PendingInstanceBean { diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java new file mode 100644 index 000000000..7e537a73c --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java @@ -0,0 +1,17 @@ +package org.apache.falcon.persistence; + +/** + * Created by praveen on 16/3/16. + */ +public class PersistenceConstants { + + public static final String GET_MONITERED_INSTANCE = "GET_MONITERED_INSTANCE"; + public static final String DELETE_MONITORED_INSTANCES = "DELETE_MONITORED_INSTANCES"; + public static final String GET_ALL_MONITORING_FEEDS = "GET_ALL_MONITORING_FEEDS"; + public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES"; + public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES"; + public static final String DELETE_ALL_PENDING_INSTANCES = "DELETE_ALL_PENDING_INSTANCES"; + public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES"; + public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES"; + public static String feedName ; +} diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java index aab89ce71..dd68b469c 100644 --- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java +++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java @@ -4,6 +4,7 @@ import org.apache.falcon.persistence.EntityBean; import org.apache.falcon.persistence.MonitoredFeedsBean; import org.apache.falcon.persistence.PendingInstanceBean; +import org.apache.falcon.persistence.PersistenceConstants; import org.apache.falcon.service.FalconJPAService; import org.apache.openjpa.persistence.OpenJPAEntityManager; import org.apache.openjpa.persistence.OpenJPAPersistence; @@ -36,7 +37,7 @@ public void putMonitoredFeed (String feedName){ public MonitoredFeedsBean getMonitoredFeed(String feedName){ EntityManager entityManager = getEntityManager(); - Query q = entityManager.createNamedQuery("GET_MONITERED_INSTANCE"); + Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITERED_INSTANCE); q.setParameter("feedName", feedName); List result = q.getResultList(); if (result.isEmpty()) { @@ -49,7 +50,7 @@ public MonitoredFeedsBean getMonitoredFeed(String feedName){ public void deleteMonitoringFeed (String feedName) { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); - Query q = entityManager.createNamedQuery("DELETE_MONITORED_INSTANCES"); + Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_INSTANCES); q.setParameter("feedName", feedName); q.executeUpdate(); commitAndCloseTransaction(entityManager); @@ -57,7 +58,7 @@ public void deleteMonitoringFeed (String feedName) { public List getAllMonitoredFeed(){ EntityManager entityManager = getEntityManager(); - Query q = entityManager.createNamedQuery("GET_ALL_MONITORING_FEEDS"); + Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_FEEDS); List result = q.getResultList(); if (result.isEmpty()) { return null; @@ -69,7 +70,7 @@ public List getAllMonitoredFeed(){ public void deletePendingNominalInstances (String feedName, String clusterName ,Date nominalTime){ EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); - Query q = entityManager.createNamedQuery("DELETE_PENDING_NOMINAL_INSTANCES"); + Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES); q.setParameter("feedName", feedName); q.setParameter("clusterName", clusterName); q.setParameter("nominalTime",nominalTime); @@ -80,7 +81,7 @@ public void deletePendingNominalInstances (String feedName, String clusterName , public void deletePendingInstances (String feedName, String clusterName ){ EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); - Query q = entityManager.createNamedQuery("DELETE_ALL_PENDING_INSTANCES"); + Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_PENDING_INSTANCES); q.setParameter("feedName", feedName); q.setParameter("clusterName", clusterName); q.executeUpdate(); @@ -101,7 +102,7 @@ public void putPendingInstances (String feed,String clusterName ,Date nominalTim public List getNominalInstances(String feedName,String clusterName){ EntityManager entityManager = getEntityManager(); - Query q = entityManager.createNamedQuery("GET_DATE_FOR_PENDING_INSTANCES"); + Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES); q.setParameter("feedName", feedName); q.setParameter("clusterName", clusterName); List result = q.getResultList(); @@ -113,7 +114,7 @@ public List getNominalInstances(String feedName,String clusterName){ } public List getAllInstances(){ EntityManager entityManager = getEntityManager(); - Query q = entityManager.createNamedQuery("GET_ALL_PENDING_INSTANCES"); + Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES); List result = q.getResultList(); if (CollectionUtils.isEmpty(result)) { return null; diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java index c5edb5bb1..13d93745f 100644 --- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java @@ -87,19 +87,6 @@ public static FeedSLAMonitoringService get() { */ private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); - /** - * Feeds to be monitored. - */ - protected Set monitoredFeeds; - - - /** - * Map, Set to store - * each missing instance of a feed. - */ - protected Map, BlockingQueue> pendingInstances; - - /** * Used to store the last time when pending instances were checked for SLA. */ @@ -421,7 +408,6 @@ private void serializeState() throws FalconException{ Map state = new HashMap<>(); state.put("lastSerializedAt", lastSerializedAt.getTime()); state.put("lastCheckedAt", lastCheckedAt.getTime()); - state.put("pendingInstances", pendingInstances); oos.writeObject(state); fileSystem.rename(tmp, filePath); } catch (IOException e) { @@ -436,7 +422,6 @@ private void serializeState() throws FalconException{ private void deserialize(Path path) throws FalconException { try { Map state = deserializeInternal(path); -// pendingInstances = new ConcurrentHashMap<>(); Map, BlockingQueue> pendingInstancesCopy = (Map, BlockingQueue>) state.get("pendingInstances"); // queue size can change during restarts, hence copy @@ -468,10 +453,8 @@ private void deserialize(Path path) throws FalconException { } protected void initializeService() { - //pendingInstances = new ConcurrentHashMap<>(); lastCheckedAt = new Date(); lastSerializedAt = new Date(); - //monitoredFeeds = new ConcurrentHashSet<>(); } @SuppressWarnings("unchecked") diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java index 7068793ff..3f9de649d 100644 --- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java +++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java @@ -79,21 +79,9 @@ public void testInsertRetrieveAndUpdate() throws Exception { monitoringJdbcStateStore.putPendingInstances("test_feed1","test_cluster",dateOne); monitoringJdbcStateStore.putPendingInstances("test_feed1","test_cluster",dateTwo); - System.out.println(monitoringJdbcStateStore.getNominalInstances("test_feed1","test_cluster").size() + " abc1"); + Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1","test_cluster").size() ,2); monitoringJdbcStateStore.deletePendingNominalInstances("test_feed1","test_cluster",dateOne); - System.out.println(monitoringJdbcStateStore.getNominalInstances("test_feed1","test_cluster").size() + " abc2"); + Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1","test_cluster").size() ,1); monitoringJdbcStateStore.deletePendingInstances("test_feed1","test_cluster"); - //System.out.println(monitoringJdbcStateStore.getNominalInstances("test_feed1","test_cluster").size() + " abc3"); - //TODO - //check for unique contraint in the table - //monitoringJdbcStateStore.putMonitoredFeed("test_feed"); - -// System.out.println(monitoringJdbcStateStore.getMonitoredFeed("test_feed").getFeedName()); - - } - - @Test - public void testgetMonitoredFeed() throws Exception { - } } From 90634f50cdde2a840dc6837650f51c40c774e3df Mon Sep 17 00:00:00 2001 From: Praveen Adlakha Date: Thu, 17 Mar 2016 17:01:04 +0530 Subject: [PATCH 3/8] checkstyle issues fixed --- .../persistence/MonitoredFeedsBean.java | 26 ++++++-- .../persistence/PendingInstanceBean.java | 37 ++++++++++-- .../persistence/PersistenceConstants.java | 26 ++++++-- common/src/main/resources/startup.properties | 4 +- .../falcon/jdbc/MonitoringJdbcStateStore.java | 47 +++++++++------ .../service/FeedSLAMonitoringService.java | 60 ++++++++++--------- .../jdbc/MonitoringJdbcStateStoreTest.java | 43 ++++++++----- .../falcon/service/FeedSLAMonitoringTest.java | 2 +- src/build/findbugs-exclude.xml | 21 +++++++ 9 files changed, 186 insertions(+), 80 deletions(-) diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java index 1d921cae7..2b485692f 100644 --- a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java @@ -17,17 +17,33 @@ */ package org.apache.falcon.persistence; -import javax.persistence.*; +import javax.persistence.Entity; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Column; +import javax.persistence.Basic; import javax.validation.constraints.NotNull; +//SUSPEND CHECKSTYLE CHECK LineLengthCheck +/** +* The Feeds that are to be monitered will be stored in the db. +* */ @Entity @NamedQueries({ - @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from MonitoredFeedsBean a where a.feedName = :feedName"), - @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredFeedsBean a where a.feedName = :feedName"), - @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) from MonitoredFeedsBean a") + @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from " + + "MonitoredFeedsBean a where a.feedName = :feedName"), + @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredFeedsBean " + + "a where a.feedName = :feedName"), + @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) " + + "from MonitoredFeedsBean a") }) @Table(name="MONITORED_FEEDS") +//RESUME CHECKSTYLE CHECK LineLengthCheck public class MonitoredFeedsBean { @NotNull @GeneratedValue(strategy = GenerationType.AUTO) @@ -54,4 +70,4 @@ public String getId() { public void setId(String id) { this.id = id; } -} \ No newline at end of file +} diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java index c237be48d..0072f5fa5 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java @@ -1,21 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.falcon.persistence; -import javax.persistence.*; +import javax.persistence.Entity; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; +import javax.persistence.GenerationType; +import javax.persistence.GeneratedValue; +import javax.persistence.Id; +import javax.persistence.Basic; +import javax.persistence.Column; import javax.validation.constraints.NotNull; import java.util.Date; +//SUSPEND CHECKSTYLE CHECK LineLengthCheck /** - * Created by praveen on 8/3/16. - */ +* The instances of feed to be monitored will be stored in db. +* */ @Entity @NamedQueries({ @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"), @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"), @NamedQuery(name = PersistenceConstants.DELETE_ALL_PENDING_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), - @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES, query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), - @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES ,query = "select OBJECT(a) from PendingInstanceBean a ") + @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), + @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a ") }) @Table(name = "PENDING_INSTANCES") +//RESUME CHECKSTYLE CHECK LineLengthCheck public class PendingInstanceBean { @NotNull @GeneratedValue(strategy = GenerationType.AUTO) diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java index 7e537a73c..15b8a178f 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java +++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java @@ -1,10 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.falcon.persistence; - /** - * Created by praveen on 16/3/16. + * The name of queries to be used as constants accross the packages. */ -public class PersistenceConstants { +public final class PersistenceConstants { + private PersistenceConstants(){ + + } public static final String GET_MONITERED_INSTANCE = "GET_MONITERED_INSTANCE"; public static final String DELETE_MONITORED_INSTANCES = "DELETE_MONITORED_INSTANCES"; public static final String GET_ALL_MONITORING_FEEDS = "GET_ALL_MONITORING_FEEDS"; @@ -13,5 +32,4 @@ public class PersistenceConstants { public static final String DELETE_ALL_PENDING_INSTANCES = "DELETE_ALL_PENDING_INSTANCES"; public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES"; public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES"; - public static String feedName ; } diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index c7b026864..0befc8072 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -42,9 +42,7 @@ org.apache.falcon.metadata.MetadataMappingService,\ org.apache.falcon.service.LogCleanupService,\ org.apache.falcon.service.GroupsService,\ - org.apache.falcon.service.ProxyUserService,\ - org.apache.falcon.adfservice.ADFProviderService,\ - org.apache.falcon.service.FalconJPAService + org.apache.falcon.service.ProxyUserService ## If you wish to use Falcon native scheduler add the commented out services below to application.services ## # org.apache.falcon.notification.service.impl.JobCompletionService,\ # org.apache.falcon.notification.service.impl.SchedulerService,\ diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java index dd68b469c..eeed78443 100644 --- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java +++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java @@ -1,31 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.falcon.jdbc; import org.apache.commons.collections.CollectionUtils; -import org.apache.falcon.persistence.EntityBean; import org.apache.falcon.persistence.MonitoredFeedsBean; import org.apache.falcon.persistence.PendingInstanceBean; import org.apache.falcon.persistence.PersistenceConstants; import org.apache.falcon.service.FalconJPAService; -import org.apache.openjpa.persistence.OpenJPAEntityManager; -import org.apache.openjpa.persistence.OpenJPAPersistence; -import javax.persistence.*; -import java.util.Collection; -import java.util.Collections; +import javax.persistence.EntityManager; +import javax.persistence.Query; import java.util.Date; import java.util.List; /** - * Created by praveen on 8/3/16. - */ +* StateStore for MonitoringFeeds and PendingFeedInstances. +*/ + public class MonitoringJdbcStateStore { private EntityManager getEntityManager() { return FalconJPAService.get().getEntityManager(); } - - public void putMonitoredFeed (String feedName){ + public void putMonitoredFeed(String feedName){ MonitoredFeedsBean monitoredFeedsBean = new MonitoredFeedsBean(); monitoredFeedsBean.setFeedName(feedName); @@ -47,7 +60,7 @@ public MonitoredFeedsBean getMonitoredFeed(String feedName){ return ((MonitoredFeedsBean)result.get(0)); } - public void deleteMonitoringFeed (String feedName) { + public void deleteMonitoringFeed(String feedName) { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_INSTANCES); @@ -67,18 +80,18 @@ public List getAllMonitoredFeed(){ return result; } - public void deletePendingNominalInstances (String feedName, String clusterName ,Date nominalTime){ + public void deletePendingNominalInstances(String feedName, String clusterName , Date nominalTime){ EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES); q.setParameter("feedName", feedName); q.setParameter("clusterName", clusterName); - q.setParameter("nominalTime",nominalTime); + q.setParameter("nominalTime", nominalTime); q.executeUpdate(); commitAndCloseTransaction(entityManager); } - public void deletePendingInstances (String feedName, String clusterName ){ + public void deletePendingInstances(String feedName, String clusterName){ EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_PENDING_INSTANCES); @@ -88,7 +101,7 @@ public void deletePendingInstances (String feedName, String clusterName ){ commitAndCloseTransaction(entityManager); } - public void putPendingInstances (String feed,String clusterName ,Date nominalTime){ + public void putPendingInstances(String feed, String clusterName, Date nominalTime){ PendingInstanceBean pendingInstanceBean = new PendingInstanceBean(); pendingInstanceBean.setFeedName(feed); pendingInstanceBean.setClusterName(clusterName); @@ -100,7 +113,7 @@ public void putPendingInstances (String feed,String clusterName ,Date nominalTim commitAndCloseTransaction(entityManager); } - public List getNominalInstances(String feedName,String clusterName){ + public List getNominalInstances(String feedName, String clusterName){ EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES); q.setParameter("feedName", feedName); @@ -121,8 +134,6 @@ public List getAllInstances(){ } entityManager.close(); return result; - - } private void commitAndCloseTransaction(EntityManager entityManager) { diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java index 13d93745f..58d36dde7 100644 --- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java @@ -42,7 +42,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; -import org.eclipse.jetty.util.ConcurrentHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,9 +50,14 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; -import java.util.*; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.ArrayList; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -64,7 +68,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService { private static final Logger LOG = LoggerFactory.getLogger("FeedSLA"); - private static final MonitoringJdbcStateStore monitoringJdbc = new MonitoringJdbcStateStore(); + private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore(); private static final String ONE_HOUR = String.valueOf(60 * 60 * 1000); @@ -143,7 +147,7 @@ public void onAdd(Entity entity) throws FalconException { if (currentClusters.contains(cluster.getName())) { if (FeedHelper.getSLA(cluster, feed) != null) { LOG.debug("Adding feed:{} for monitoring", feed.getName()); - monitoringJdbc.putMonitoredFeed(feed.getName()); + MONITORING_JDBC_STATE_STORE.putMonitoredFeed(feed.getName()); } } } @@ -160,8 +164,8 @@ public void onRemove(Entity entity) throws FalconException { Set currentClusters = DeploymentUtil.getCurrentClusters(); for (Cluster cluster : feed.getClusters().getClusters()) { if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) { - monitoringJdbc.deleteMonitoringFeed(feed.getName()); - monitoringJdbc.deletePendingInstances(feed.getName(), cluster.getName()); + MONITORING_JDBC_STATE_STORE.deleteMonitoringFeed(feed.getName()); + MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName()); } } } @@ -199,7 +203,7 @@ public void onChange(Entity oldEntity, Entity newEntity) throws FalconException } for (String clusterName : slaRemovedClusters) { - monitoringJdbc.deletePendingInstances(newFeed.getName(), clusterName); + MONITORING_JDBC_STATE_STORE.deletePendingInstances(newFeed.getName(), clusterName); } } } @@ -257,8 +261,8 @@ public void makeFeedInstanceAvailable(String feedName, String clusterName, Date LOG.info("Removing {} feed's instance {} in cluster {} from pendingSLA", feedName, clusterName, nominalTime); // Slas for feeds not having sla tag are not stored. - if(CollectionUtils.isEmpty(monitoringJdbc.getNominalInstances(feedName,clusterName))){ - monitoringJdbc.deletePendingNominalInstances(feedName,clusterName,nominalTime); + if (CollectionUtils.isEmpty(MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName))){ + MONITORING_JDBC_STATE_STORE.deletePendingNominalInstances(feedName, clusterName, nominalTime); } } @@ -282,7 +286,7 @@ private class Monitor implements Runnable { @Override public void run() { try { - if (monitoringJdbc.getAllMonitoredFeed().size() > 0) { + if (MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed().size() > 0) { checkPendingInstanceAvailability(); // add Instances from last checked time to 10 minutes from now(some buffer for status check) @@ -306,7 +310,7 @@ public void run() { void addNewPendingFeedInstances(Date from, Date to) throws FalconException { Set currentClusters = DeploymentUtil.getCurrentClusters(); - List feedsBeanList = monitoringJdbc.getAllMonitoredFeed(); + List feedsBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed(); for(MonitoredFeedsBean monitoredFeedsBean : feedsBeanList) { String feedName = monitoredFeedsBean.getFeedName(); Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName); @@ -315,8 +319,9 @@ void addNewPendingFeedInstances(Date from, Date to) throws FalconException { Date nextInstanceTime = from; Pair key = new Pair<>(feed.getName(), feedCluster.getName()); BlockingQueue instances = new LinkedBlockingQueue<>( - monitoringJdbc.getNominalInstances(feedName,feedCluster.getName())); - if (CollectionUtils.isEmpty(monitoringJdbc.getNominalInstances(feedName,feedCluster.getName()))) { + MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, feedCluster.getName())); + if (CollectionUtils.isEmpty(MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, + feedCluster.getName()))) { instances = new LinkedBlockingQueue<>(queueSize); Date feedStartTime = feedCluster.getValidity().getStart(); Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed, feedCluster); @@ -347,7 +352,7 @@ void addNewPendingFeedInstances(Date from, Date to) throws FalconException { nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime); } for(Date date:instances){ - monitoringJdbc.putPendingInstances(feed.getName(), feedCluster.getName(),date); + MONITORING_JDBC_STATE_STORE.putPendingInstances(feed.getName(), feedCluster.getName(), date); } } } @@ -359,14 +364,14 @@ void addNewPendingFeedInstances(Date from, Date to) throws FalconException { * Checks the availability of all the pendingInstances and removes the ones which have become available. */ private void checkPendingInstanceAvailability() throws FalconException { - for(PendingInstanceBean pendingInstanceBean : monitoringJdbc.getAllInstances()){ - for (Date date : monitoringJdbc.getNominalInstances(pendingInstanceBean.getFeedName(), + for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){ + for (Date date : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getFeedName(), pendingInstanceBean.getClusterName())) { boolean status = checkFeedInstanceAvailability(pendingInstanceBean.getFeedName(), - pendingInstanceBean.getClusterName(),date); + pendingInstanceBean.getClusterName(), date); if (status) { - monitoringJdbc.deletePendingNominalInstances(pendingInstanceBean.getFeedName(), - pendingInstanceBean.getClusterName(),date); + MONITORING_JDBC_STATE_STORE.deletePendingNominalInstances(pendingInstanceBean.getFeedName(), + pendingInstanceBean.getClusterName(), date); } } } @@ -440,7 +445,7 @@ private void deserialize(Path path) throws FalconException { value.add(instance); } for(Date date:value){ - monitoringJdbc.putPendingInstances(entry.getKey().first, entry.getKey().second,date); + MONITORING_JDBC_STATE_STORE.putPendingInstances(entry.getKey().first, entry.getKey().second, date); } } lastCheckedAt = new Date((Long) state.get("lastCheckedAt")); @@ -485,8 +490,7 @@ private Map deserializeInternal(Path path) throws IOException, C public Set getFeedSLAMissPendingAlerts(Date start, Date end) throws FalconException { Set result = new HashSet<>(); - for(PendingInstanceBean pendingInstanceBean : monitoringJdbc.getAllInstances()){ - //for (Map.Entry, BlockingQueue> feedInstances : pendingInstances.entrySet()) { + for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){ Pair feedClusterPair = new Pair<>(pendingInstanceBean.getFeedName(), pendingInstanceBean.getClusterName()); Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first); @@ -494,9 +498,8 @@ public Set getFeedSLAMissPendingAlerts(Date start, Da Sla sla = FeedHelper.getSLA(cluster, feed); if (sla != null) { Set> slaStatus = getSLAStatus(sla, start, end, - new LinkedBlockingQueue(monitoringJdbc.getNominalInstances - (pendingInstanceBean.getFeedName(), - pendingInstanceBean.getClusterName()))); + new LinkedBlockingQueue(MONITORING_JDBC_STATE_STORE.getNominalInstances( + pendingInstanceBean.getFeedName(), pendingInstanceBean.getClusterName()))); for (Pair status : slaStatus){ SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first, feedClusterPair.second, status.first, EntityType.FEED); @@ -504,7 +507,6 @@ public Set getFeedSLAMissPendingAlerts(Date start, Da result.add(instance); } } - // } } return result; } @@ -524,8 +526,8 @@ public Set getFeedSLAMissPendingAlerts(String feedNam Set result = new HashSet<>(); Pair feedClusterPair = new Pair<>(feedName, clusterName); - BlockingQueue missingInstances = new LinkedBlockingQueue<>(monitoringJdbc. - getNominalInstances(feedName,clusterName)); + BlockingQueue missingInstances = new LinkedBlockingQueue<>(MONITORING_JDBC_STATE_STORE. + getNominalInstances(feedName, clusterName)); Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName); Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second); Sla sla = FeedHelper.getSLA(cluster, feed); diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java index 3f9de649d..94dc2c72f 100644 --- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java +++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java @@ -1,30 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.falcon.jdbc; import org.apache.falcon.cluster.util.EmbeddedCluster; import org.apache.falcon.entity.AbstractTestBase; import org.apache.falcon.entity.v0.SchemaHelper; -import org.apache.falcon.persistence.PendingInstanceBean; import org.apache.falcon.service.FalconJPAService; import org.apache.falcon.tools.FalconStateStoreDBCLI; import org.apache.falcon.util.StateStoreProperties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import org.apache.openjpa.persistence.OpenJPAEntityManager; -import org.apache.openjpa.persistence.OpenJPAPersistence; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import javax.persistence.EntityManager; -import javax.persistence.EntityTransaction; import java.io.File; import java.util.Date; import java.util.Random; /** - * Created by praveen on 8/3/16. - */ +*Unit test for MonitoringJdbcStateStore. + * */ + public class MonitoringJdbcStateStoreTest extends AbstractTestBase { private static final String DB_BASE_DIR = "target/test-data/falcondb"; protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db"; @@ -69,19 +82,19 @@ public void testInsertRetrieveAndUpdate() throws Exception { MonitoringJdbcStateStore monitoringJdbcStateStore = new MonitoringJdbcStateStore(); monitoringJdbcStateStore.putMonitoredFeed("test_feed1"); monitoringJdbcStateStore.putMonitoredFeed("test_feed2"); - Assert.assertEquals("test_feed1",monitoringJdbcStateStore.getMonitoredFeed("test_feed1").getFeedName()); - Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredFeed().size(),2); + Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredFeed("test_feed1").getFeedName()); + Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredFeed().size(), 2); monitoringJdbcStateStore.deleteMonitoringFeed("test_feed1"); monitoringJdbcStateStore.deleteMonitoringFeed("test_feed2"); Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); Date dateTwo = SchemaHelper.parseDateUTC("2015-11-20T01:00Z"); - monitoringJdbcStateStore.putPendingInstances("test_feed1","test_cluster",dateOne); - monitoringJdbcStateStore.putPendingInstances("test_feed1","test_cluster",dateTwo); + monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateOne); + monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateTwo); - Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1","test_cluster").size() ,2); - monitoringJdbcStateStore.deletePendingNominalInstances("test_feed1","test_cluster",dateOne); - Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1","test_cluster").size() ,1); - monitoringJdbcStateStore.deletePendingInstances("test_feed1","test_cluster"); + Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 2); + monitoringJdbcStateStore.deletePendingNominalInstances("test_feed1", "test_cluster", dateOne); + Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 1); + monitoringJdbcStateStore.deletePendingInstances("test_feed1", "test_cluster"); } } diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java index 613e21808..099e9d333 100644 --- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java +++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java @@ -137,7 +137,7 @@ public void testOptionalEnd() throws FalconException { // // Assert.assertEquals(FeedSLAMonitoringService.get().pendingInstances.get(feedCluster).size(), 1); // } -// + // @Test // public void testEndDateCheck() throws Exception { // Cluster cluster = publishCluster(); diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml index 94d0b6be6..395a56db3 100644 --- a/src/build/findbugs-exclude.xml +++ b/src/build/findbugs-exclude.xml @@ -47,6 +47,27 @@ + + + + + + + + + + + + + + + + + + + + + From 74672e9b3cee19a59bec49b1bdcdfa201770f47a Mon Sep 17 00:00:00 2001 From: Praveen Adlakha Date: Sat, 19 Mar 2016 01:44:08 +0530 Subject: [PATCH 4/8] partial commit --- .../main/resources/META-INF/persistence.xml | 9 ++--- common/src/main/resources/startup.properties | 3 +- .../src/test/resources/statestore.properties | 34 +++++++++---------- 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml index 206bb0508..4c9388c85 100644 --- a/common/src/main/resources/META-INF/persistence.xml +++ b/common/src/main/resources/META-INF/persistence.xml @@ -64,8 +64,8 @@ @@ -86,6 +86,7 @@ org.apache.falcon.persistence.EntityBean org.apache.falcon.persistence.InstanceBean org.apache.falcon.persistence.MonitoredFeedsBean + org.apache.falcon.persistence.PendingInstanceBean @@ -93,8 +94,8 @@ diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 0befc8072..a4c1857ee 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -42,7 +42,8 @@ org.apache.falcon.metadata.MetadataMappingService,\ org.apache.falcon.service.LogCleanupService,\ org.apache.falcon.service.GroupsService,\ - org.apache.falcon.service.ProxyUserService + org.apache.falcon.service.ProxyUserService,\ + org.apache.falcon.service.FalconJPAService ## If you wish to use Falcon native scheduler add the commented out services below to application.services ## # org.apache.falcon.notification.service.impl.JobCompletionService,\ # org.apache.falcon.notification.service.impl.SchedulerService,\ diff --git a/scheduler/src/test/resources/statestore.properties b/scheduler/src/test/resources/statestore.properties index 2ae642ff1..5cb75da0b 100644 --- a/scheduler/src/test/resources/statestore.properties +++ b/scheduler/src/test/resources/statestore.properties @@ -17,20 +17,20 @@ # -*.domain=debug -######## StateStore Properties ##### -*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore -*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver -*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true -*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource -# Maximum number of active connections that can be allocated from this pool at the same time. -*.falcon.statestore.pool.max.active.conn=10 -*.falcon.statestore.connection.properties= -# Indicates the interval (in milliseconds) between eviction runs. -*.falcon.statestore.validate.db.connection.eviction.interval=300000 -# The number of objects to examine during each run of the idle object evictor thread. -*.falcon.statestore.validate.db.connection.eviction.num=10 -# Creates Falcon DB. -# If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP. -# If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up. -*.falcon.statestore.create.db.schema=true \ No newline at end of file +#*.domain=debug +######### StateStore Properties ##### +#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore +#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver +#*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true +#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource +## Maximum number of active connections that can be allocated from this pool at the same time. +#*.falcon.statestore.pool.max.active.conn=10 +#*.falcon.statestore.connection.properties= +## Indicates the interval (in milliseconds) between eviction runs. +#*.falcon.statestore.validate.db.connection.eviction.interval=300000 +## The number of objects to examine during each run of the idle object evictor thread. +#*.falcon.statestore.validate.db.connection.eviction.num=10 +## Creates Falcon DB. +## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP. +## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up. +#*.falcon.statestore.create.db.schema=true \ No newline at end of file From 391a47c2a98f16fce6ed72ada6f1027d67eb05cb Mon Sep 17 00:00:00 2001 From: Praveen Adlakha Date: Mon, 21 Mar 2016 16:06:25 +0530 Subject: [PATCH 5/8] test case issue resolved --- unit/pom.xml | 10 ++++++++++ unit/src/main/resources/startup.properties | 1 + .../java/org/apache/falcon/unit/TestFalconUnit.java | 1 + 3 files changed, 12 insertions(+) diff --git a/unit/pom.xml b/unit/pom.xml index 7e5b073d4..f1ef4639e 100644 --- a/unit/pom.xml +++ b/unit/pom.xml @@ -44,6 +44,16 @@ org.apache.oozie oozie-core + + + org.apache.openjpa + openjpa-jdbc + + + org.apache.openjpa + openjpa-persistence + + diff --git a/unit/src/main/resources/startup.properties b/unit/src/main/resources/startup.properties index 4576e0bd0..4dfea31b5 100644 --- a/unit/src/main/resources/startup.properties +++ b/unit/src/main/resources/startup.properties @@ -33,6 +33,7 @@ *.application.services=org.apache.falcon.security.AuthenticationInitializationService,\ org.apache.falcon.workflow.WorkflowJobEndNotificationService, \ org.apache.falcon.service.ProcessSubscriberService,\ + org.apache.falcon.service.FalconJPAService,\ org.apache.falcon.entity.store.ConfigurationStore,\ org.apache.falcon.rerun.service.RetryService,\ org.apache.falcon.rerun.service.LateRunService,\ diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index aaf2b3733..9b1ff2abf 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -133,6 +133,7 @@ public void testDelete() throws IOException, FalconCLIException, FalconException ParseException, InterruptedException { // submit cluster and feeds submitClusterAndFeeds(); + APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); assertStatus(result); createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); From 76889040be430aa0f2e0a8baa423f3d906f71401 Mon Sep 17 00:00:00 2001 From: Praveen Adlakha Date: Mon, 21 Mar 2016 18:18:47 +0530 Subject: [PATCH 6/8] FALCON-1865 Persist Feed sla data to database --- .../main/resources/META-INF/persistence.xml | 8 ++--- .../falcon/jdbc/MonitoringJdbcStateStore.java | 18 ++++++---- .../service/FeedSLAMonitoringService.java | 1 - .../falcon/service/FeedSLAMonitoringTest.java | 34 ------------------- 4 files changed, 16 insertions(+), 45 deletions(-) diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml index 206bb0508..076efce26 100644 --- a/common/src/main/resources/META-INF/persistence.xml +++ b/common/src/main/resources/META-INF/persistence.xml @@ -64,8 +64,8 @@ @@ -93,8 +93,8 @@ diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java index eeed78443..abc5e1093 100644 --- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java +++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java @@ -53,10 +53,13 @@ public MonitoredFeedsBean getMonitoredFeed(String feedName){ Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITERED_INSTANCE); q.setParameter("feedName", feedName); List result = q.getResultList(); - if (result.isEmpty()) { - return null; + try{ + if (result.isEmpty()) { + return null; + } + } finally { + entityManager.close(); } - entityManager.close(); return ((MonitoredFeedsBean)result.get(0)); } @@ -73,10 +76,13 @@ public List getAllMonitoredFeed(){ EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_FEEDS); List result = q.getResultList(); - if (result.isEmpty()) { - return null; + try{ + if (result.isEmpty()) { + return null; + } + } finally { + entityManager.close(); } - entityManager.close(); return result; } diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java index 58d36dde7..def793be8 100644 --- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java @@ -450,7 +450,6 @@ private void deserialize(Path path) throws FalconException { } lastCheckedAt = new Date((Long) state.get("lastCheckedAt")); lastSerializedAt = new Date((Long) state.get("lastSerializedAt")); - //monitoredFeeds = new ConcurrentHashSet<>(); // will be populated on the onLoad of entities. LOG.debug("Restored the service from old state."); } catch (IOException | ClassNotFoundException e) { throw new FalconException("Couldn't deserialize the old state", e); diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java index 099e9d333..b739037a4 100644 --- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java +++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java @@ -121,40 +121,6 @@ public void testOptionalEnd() throws FalconException { AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", null, "*"); } -// @Test -// public void testMakeFeedInstanceAvailable() { -// Date instanceDate = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); -// Date nextInstanceDate = SchemaHelper.parseDateUTC("2015-11-20T01:00Z"); -// Pair feedCluster = new Pair<>("testFeed", "testCluster"); -// -// BlockingQueue missingInstances = new LinkedBlockingQueue<>(); -// missingInstances.add(instanceDate); -// missingInstances.add(nextInstanceDate); -// -// FeedSLAMonitoringService.get().initializeService(); -// FeedSLAMonitoringService.get().pendingInstances.put(feedCluster, missingInstances); -// FeedSLAMonitoringService.get().makeFeedInstanceAvailable("testFeed", "testCluster", instanceDate); -// -// Assert.assertEquals(FeedSLAMonitoringService.get().pendingInstances.get(feedCluster).size(), 1); -// } - -// @Test -// public void testEndDateCheck() throws Exception { -// Cluster cluster = publishCluster(); -// publishFeed(cluster, "hours(1)", "2015-11-20 00:00 UTC", "2015-11-20 05:00 UTC"); -// Pair feedCluster = new Pair<>(FEED_NAME, CLUSTER_NAME); -// -// FeedSLAMonitoringService service = FeedSLAMonitoringService.get(); -// service.initializeService(); -// service.queueSize = 100; -// service.monitoredFeeds.add(FEED_NAME); -// Date from = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); -// Date to = SchemaHelper.parseDateUTC("2015-11-25T00:00Z"); -// service.addNewPendingFeedInstances(from, to); -// // check that instances after feed's end date are not added. -// Assert.assertEquals(service.pendingInstances.get(feedCluster).size(), 5); -// } - private Cluster publishCluster() throws FalconException { Cluster cluster = new Cluster(); cluster.setName(CLUSTER_NAME); From 283e1878837bfd1f2805df00524ab6f3f2f3c149 Mon Sep 17 00:00:00 2001 From: Praveen Adlakha Date: Thu, 24 Mar 2016 23:36:25 +0530 Subject: [PATCH 7/8] review comments addressed --- .../persistence/PendingInstanceBean.java | 2 +- .../persistence/PersistenceConstants.java | 2 +- .../persistence/ResultNotFoundException.java | 31 +++++ common/src/main/resources/startup.properties | 1 - .../falcon/jdbc/MonitoringJdbcStateStore.java | 67 ++++++---- .../service/FeedSLAMonitoringService.java | 120 ++---------------- .../jdbc/MonitoringJdbcStateStoreTest.java | 3 +- .../src/test/resources/startup.properties | 3 +- .../src/test/resources/statestore.properties | 32 ++--- src/build/findbugs-exclude.xml | 20 +-- 10 files changed, 119 insertions(+), 162 deletions(-) create mode 100644 common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java index 0072f5fa5..038244a3e 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java @@ -37,7 +37,7 @@ @NamedQueries({ @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"), @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"), - @NamedQuery(name = PersistenceConstants.DELETE_ALL_PENDING_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), + @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a ") }) diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java index 15b8a178f..511270e96 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java +++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java @@ -29,7 +29,7 @@ private PersistenceConstants(){ public static final String GET_ALL_MONITORING_FEEDS = "GET_ALL_MONITORING_FEEDS"; public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES"; public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES"; - public static final String DELETE_ALL_PENDING_INSTANCES = "DELETE_ALL_PENDING_INSTANCES"; + public static final String DELETE_ALL_INSTANCES_FOR_FEED = "DELETE_ALL_INSTANCES_FOR_FEED"; public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES"; public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES"; } diff --git a/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java b/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java new file mode 100644 index 000000000..c368d2cd5 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.persistence; + +import org.apache.falcon.FalconException; + +/** + * Exception to be through by the bean classes. + */ +public class ResultNotFoundException extends FalconException { + + public ResultNotFoundException(String message) { + super(message); + } +} diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index df646704c..87a74bfc1 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -43,7 +43,6 @@ org.apache.falcon.service.LogCleanupService,\ org.apache.falcon.service.GroupsService,\ org.apache.falcon.service.ProxyUserService,\ - org.apache.falcon.service.ProxyUserService,\ org.apache.falcon.service.FalconJPAService ## Add if you want to use Falcon Azure integration ## # org.apache.falcon.adfservice.ADFProviderService diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java index abc5e1093..39e256294 100644 --- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java +++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java @@ -21,6 +21,7 @@ import org.apache.falcon.persistence.MonitoredFeedsBean; import org.apache.falcon.persistence.PendingInstanceBean; import org.apache.falcon.persistence.PersistenceConstants; +import org.apache.falcon.persistence.ResultNotFoundException; import org.apache.falcon.service.FalconJPAService; import javax.persistence.EntityManager; @@ -38,14 +39,18 @@ private EntityManager getEntityManager() { return FalconJPAService.get().getEntityManager(); } + public void putMonitoredFeed(String feedName){ + MonitoredFeedsBean monitoredFeedsBean = new MonitoredFeedsBean(); monitoredFeedsBean.setFeedName(feedName); - EntityManager entityManager = getEntityManager(); - beginTransaction(entityManager); - entityManager.persist(monitoredFeedsBean); - commitAndCloseTransaction(entityManager); + try { + beginTransaction(entityManager); + entityManager.persist(monitoredFeedsBean); + } finally { + commitAndCloseTransaction(entityManager); + } } public MonitoredFeedsBean getMonitoredFeed(String feedName){ @@ -53,7 +58,7 @@ public MonitoredFeedsBean getMonitoredFeed(String feedName){ Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITERED_INSTANCE); q.setParameter("feedName", feedName); List result = q.getResultList(); - try{ + try { if (result.isEmpty()) { return null; } @@ -68,17 +73,20 @@ public void deleteMonitoringFeed(String feedName) { beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_INSTANCES); q.setParameter("feedName", feedName); - q.executeUpdate(); - commitAndCloseTransaction(entityManager); + try{ + q.executeUpdate(); + } finally { + commitAndCloseTransaction(entityManager); + } } - public List getAllMonitoredFeed(){ + public List getAllMonitoredFeed() throws ResultNotFoundException{ EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_FEEDS); List result = q.getResultList(); try{ if (result.isEmpty()) { - return null; + throw new ResultNotFoundException("No Feed has been scheduled for monitoring."); } } finally { entityManager.close(); @@ -86,59 +94,72 @@ public List getAllMonitoredFeed(){ return result; } - public void deletePendingNominalInstances(String feedName, String clusterName , Date nominalTime){ + public void deletePendingInstance(String feedName, String clusterName , Date nominalTime){ EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES); q.setParameter("feedName", feedName); q.setParameter("clusterName", clusterName); q.setParameter("nominalTime", nominalTime); - q.executeUpdate(); - commitAndCloseTransaction(entityManager); + try{ + q.executeUpdate(); + } finally { + commitAndCloseTransaction(entityManager); + } } public void deletePendingInstances(String feedName, String clusterName){ EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); - Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_PENDING_INSTANCES); + Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED); q.setParameter("feedName", feedName); q.setParameter("clusterName", clusterName); - q.executeUpdate(); - commitAndCloseTransaction(entityManager); + try{ + q.executeUpdate(); + } finally { + commitAndCloseTransaction(entityManager); + } } public void putPendingInstances(String feed, String clusterName, Date nominalTime){ + EntityManager entityManager = getEntityManager(); PendingInstanceBean pendingInstanceBean = new PendingInstanceBean(); pendingInstanceBean.setFeedName(feed); pendingInstanceBean.setClusterName(clusterName); pendingInstanceBean.setNominalTime(nominalTime); - EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); entityManager.persist(pendingInstanceBean); commitAndCloseTransaction(entityManager); } - public List getNominalInstances(String feedName, String clusterName){ + public List getNominalInstances(String feedName, String clusterName) throws ResultNotFoundException{ EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES); q.setParameter("feedName", feedName); q.setParameter("clusterName", clusterName); List result = q.getResultList(); - if (CollectionUtils.isEmpty(result)) { - return null; + try{ + if (CollectionUtils.isEmpty(result)) { + throw new ResultNotFoundException(feedName + " with " + clusterName + "Not Found"); + } + } finally { + entityManager.close(); } - entityManager.close(); return result; } public List getAllInstances(){ EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES); List result = q.getResultList(); - if (CollectionUtils.isEmpty(result)) { - return null; + + try { + if (CollectionUtils.isEmpty(result)) { + return null; + } + } finally{ + entityManager.close(); } - entityManager.close(); return result; } diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java index def793be8..b5a2569ca 100644 --- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java @@ -18,7 +18,6 @@ package org.apache.falcon.service; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.io.IOUtils; import org.apache.falcon.FalconException; import org.apache.falcon.Pair; import org.apache.falcon.entity.EntityUtil; @@ -45,16 +44,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.OutputStream; import java.util.Date; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.ArrayList; import java.util.concurrent.BlockingQueue; @@ -96,12 +88,6 @@ public static FeedSLAMonitoringService get() { */ private Date lastCheckedAt; - - /** - * Used to store last time when the state was serialized to the store. - */ - private Date lastSerializedAt; - /** * Frequency in seconds of "status check" for pending feed instances. */ @@ -238,31 +224,21 @@ public void init() throws FalconException { String size = StartupProperties.get().getProperty("feed.sla.queue.size", "288"); queueSize = Integer.parseInt(size); - try { - if (fileSystem.exists(filePath)) { - deserialize(filePath); - } else { - LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString()); - initializeService(); - } - } catch (IOException e) { - throw new FalconException("Couldn't check the existence of " + filePath, e); - } + LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString()); + initializeService(); + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS); } - @Override - public void destroy() throws FalconException { - serializeState(); // store the state of monitoring service to the disk. - } - - public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime) { + public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime) + throws FalconException { LOG.info("Removing {} feed's instance {} in cluster {} from pendingSLA", feedName, clusterName, nominalTime); + List instances = (MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName)); // Slas for feeds not having sla tag are not stored. - if (CollectionUtils.isEmpty(MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName))){ - MONITORING_JDBC_STATE_STORE.deletePendingNominalInstances(feedName, clusterName, nominalTime); + if (CollectionUtils.isEmpty(instances)){ + MONITORING_JDBC_STATE_STORE.deletePendingInstance(feedName, clusterName, nominalTime); } } @@ -280,6 +256,10 @@ private FileSystem initializeFileSystem() { } } + @Override + public void destroy() throws FalconException { + } + //Periodically update status of pending instances, add new instances and take backup. private class Monitor implements Runnable { @@ -294,12 +274,6 @@ public void run() { Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis); addNewPendingFeedInstances(lastCheckedAt, newCheckPoint); lastCheckedAt = newCheckPoint; - - //take backup - if (now.getTime() - lastSerializedAt.getTime() > serializationFrequencyMillis) { - serializeState(); - lastSerializedAt = new Date(); - } } } catch (Throwable e) { LOG.error("Feed SLA monitoring failed: ", e); @@ -370,7 +344,7 @@ private void checkPendingInstanceAvailability() throws FalconException { boolean status = checkFeedInstanceAvailability(pendingInstanceBean.getFeedName(), pendingInstanceBean.getClusterName(), date); if (status) { - MONITORING_JDBC_STATE_STORE.deletePendingNominalInstances(pendingInstanceBean.getFeedName(), + MONITORING_JDBC_STATE_STORE.deletePendingInstance(pendingInstanceBean.getFeedName(), pendingInstanceBean.getClusterName(), date); } } @@ -401,77 +375,9 @@ private boolean checkFeedInstanceAvailability(String feedName, String clusterNam return false; } - private void serializeState() throws FalconException{ - LOG.info("Saving context to: [{}]", storePath); - - //create a temporary file and rename it. - Path tmp = new Path(storePath , "tmp"); - ObjectOutputStream oos = null; - try { - OutputStream out = fileSystem.create(tmp); - oos = new ObjectOutputStream(out); - Map state = new HashMap<>(); - state.put("lastSerializedAt", lastSerializedAt.getTime()); - state.put("lastCheckedAt", lastCheckedAt.getTime()); - oos.writeObject(state); - fileSystem.rename(tmp, filePath); - } catch (IOException e) { - throw new FalconException("Error serializing context to : " + storePath.toUri(), e); - } finally { - IOUtils.closeQuietly(oos); - } - } - - @SuppressWarnings("unchecked") - //Need to check this method - private void deserialize(Path path) throws FalconException { - try { - Map state = deserializeInternal(path); - Map, BlockingQueue> pendingInstancesCopy = - (Map, BlockingQueue>) state.get("pendingInstances"); - // queue size can change during restarts, hence copy - for (Map.Entry, BlockingQueue> entry : pendingInstancesCopy.entrySet()) { - BlockingQueue value = new LinkedBlockingQueue<>(queueSize); - BlockingQueue oldValue = entry.getValue(); - LOG.debug("Number of old instances:{}, new queue size:{}", oldValue.size(), queueSize); - while (!oldValue.isEmpty()) { - Date instance = oldValue.remove(); - if (value.size() == queueSize) { // if full - LOG.debug("Deserialization: Removing value={} for ={}", value.peek(), - entry.getKey()); - value.remove(); - } - LOG.debug("Deserialization Adding: key={} to ={}", entry.getKey(), instance); - value.add(instance); - } - for(Date date:value){ - MONITORING_JDBC_STATE_STORE.putPendingInstances(entry.getKey().first, entry.getKey().second, date); - } - } - lastCheckedAt = new Date((Long) state.get("lastCheckedAt")); - lastSerializedAt = new Date((Long) state.get("lastSerializedAt")); - LOG.debug("Restored the service from old state."); - } catch (IOException | ClassNotFoundException e) { - throw new FalconException("Couldn't deserialize the old state", e); - } - } protected void initializeService() { lastCheckedAt = new Date(); - lastSerializedAt = new Date(); - } - - @SuppressWarnings("unchecked") - private Map deserializeInternal(Path path) throws IOException, ClassNotFoundException { - Map state; - InputStream in = fileSystem.open(path); - ObjectInputStream ois = new ObjectInputStream(in); - try { - state = (Map) ois.readObject(); - } finally { - IOUtils.closeQuietly(ois); - } - return state; } /** diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java index 94dc2c72f..af5aad3cb 100644 --- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java +++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java @@ -73,7 +73,6 @@ public void setup() throws Exception { falconJPAService.init(); this.dfsCluster = EmbeddedCluster.newCluster("testCluster"); this.conf = dfsCluster.getConf(); - // registerServices(); } @Test @@ -93,7 +92,7 @@ public void testInsertRetrieveAndUpdate() throws Exception { monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateTwo); Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 2); - monitoringJdbcStateStore.deletePendingNominalInstances("test_feed1", "test_cluster", dateOne); + monitoringJdbcStateStore.deletePendingInstance("test_feed1", "test_cluster", dateOne); Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 1); monitoringJdbcStateStore.deletePendingInstances("test_feed1", "test_cluster"); } diff --git a/scheduler/src/test/resources/startup.properties b/scheduler/src/test/resources/startup.properties index af9a01b57..6216b7004 100644 --- a/scheduler/src/test/resources/startup.properties +++ b/scheduler/src/test/resources/startup.properties @@ -121,7 +121,8 @@ debug.libext.process.paths=${falcon.libext} *.falcon.http.authentication.simple.anonymous.allowed=false # Indicates the Kerberos principal to be used for HTTP endpoint. -# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification. +# The principal MUST +#rt with 'HTTP/' as per Kerberos HTTP SPNEGO specification. *.falcon.http.authentication.kerberos.principal= # Location of the keytab file with the credentials for the HTTP principal. diff --git a/scheduler/src/test/resources/statestore.properties b/scheduler/src/test/resources/statestore.properties index 5cb75da0b..e7a08fc5e 100644 --- a/scheduler/src/test/resources/statestore.properties +++ b/scheduler/src/test/resources/statestore.properties @@ -17,20 +17,20 @@ # -#*.domain=debug +*.domain=debug ######### StateStore Properties ##### -#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore -#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver -#*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true -#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource -## Maximum number of active connections that can be allocated from this pool at the same time. -#*.falcon.statestore.pool.max.active.conn=10 -#*.falcon.statestore.connection.properties= -## Indicates the interval (in milliseconds) between eviction runs. -#*.falcon.statestore.validate.db.connection.eviction.interval=300000 -## The number of objects to examine during each run of the idle object evictor thread. -#*.falcon.statestore.validate.db.connection.eviction.num=10 -## Creates Falcon DB. -## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP. -## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up. -#*.falcon.statestore.create.db.schema=true \ No newline at end of file +*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore +*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver +*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true +*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource +# Maximum number of active connections that can be allocated from this pool at the same time. +*.falcon.statestore.pool.max.active.conn=10 +*.falcon.statestore.connection.properties= +# Indicates the interval (in milliseconds) between eviction runs. +*.falcon.statestore.validate.db.connection.eviction.interval=300000 +# The number of objects to examine during each run of the idle object evictor thread. +*.falcon.statestore.validate.db.connection.eviction.num=10 +# Creates Falcon DB. +# If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP. +# If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up. +*.falcon.statestore.create.db.schema=true \ No newline at end of file diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml index 395a56db3..a6766df12 100644 --- a/src/build/findbugs-exclude.xml +++ b/src/build/findbugs-exclude.xml @@ -49,24 +49,24 @@ - + - - - - + + + + - + - - - - + + + + From 695150b88ee97d1f942d114449b23372ca6fd2d2 Mon Sep 17 00:00:00 2001 From: Praveen Adlakha Date: Fri, 25 Mar 2016 14:18:27 +0530 Subject: [PATCH 8/8] comments addressed --- .../apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java index af5aad3cb..aa3216713 100644 --- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java +++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java @@ -39,11 +39,10 @@ * */ public class MonitoringJdbcStateStoreTest extends AbstractTestBase { - private static final String DB_BASE_DIR = "target/test-data/falcondb"; + private static final String DB_BASE_DIR = "target/test-data/persistancedb"; protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db"; protected static String url = "jdbc:derby:"+ dbLocation +";create=true"; protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql"; - protected static final String DB_UPDATE_SQL_FILE = DB_BASE_DIR + File.separator + "update.sql"; protected LocalFileSystem fs = new LocalFileSystem(); private static Random randomValGenerator = new Random(); @@ -63,8 +62,7 @@ public void createDB(String file) { } @BeforeClass - public void setup() throws Exception { - System.out.println("Executing setup"); + public void setup() throws Exception{ StateStoreProperties.get().setProperty(FalconJPAService.URL, url); Configuration localConf = new Configuration(); fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);