Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
FALCON-2034 Make numThreads and timeOut configurable In Configuration…
…Store init

Author: sandeep <sandysmdl@gmail.com>

Reviewers: @pallavi-rao, @peeyushb

Closes #192 from sandeepSamudrala/FALCON-2034 and squashes the following commits:

78b98d5 [sandeep] FALCON-2034. Make numThreads and timeOut configurable In ConfigurationStore init
9d00722 [sandeep] FALCON-2034. Make numThreads and timeOut configurable In ConfigurationStore init

(cherry picked from commit d3ebf0b)
Signed-off-by: Pallavi Rao <pallavi.rao@inmobi.com>
  • Loading branch information
sandeepSamudrala authored and Pallavi Rao committed Jun 21, 2016
1 parent bdda78c commit b67dd535ea6c57c4008fd3165a339a6bc1581839
Showing 2 changed files with 31 additions and 4 deletions.
@@ -69,6 +69,10 @@ public final class ConfigurationStore implements FalconService {
private static final Logger LOG = LoggerFactory.getLogger(ConfigurationStore.class);
private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT");
private static final String UTF_8 = CharEncoding.UTF_8;
private static final String LOAD_ENTITIES_THREADS = "config.store.num.threads.load.entities";
private static final String TIMEOUT_MINS_LOAD_ENTITIES = "config.store.start.timeout.minutes";
private int numThreads;
private int restoreTimeOutInMins;
private final boolean shouldPersist;

private static final FsPermission STORE_PERMISSION =
@@ -149,6 +153,21 @@ private FileSystem initializeFileSystem() {

@Override
public void init() throws FalconException {
try {
numThreads = Integer.parseInt(StartupProperties.get().getProperty(LOAD_ENTITIES_THREADS, "100"));
LOG.info("Number of threads used to restore entities: {}", restoreTimeOutInMins);
} catch (NumberFormatException nfe) {
throw new FalconException("Invalid value specified for start up property \""
+ LOAD_ENTITIES_THREADS + "\".Please provide an integer value");
}
try {
restoreTimeOutInMins = Integer.parseInt(StartupProperties.get().
getProperty(TIMEOUT_MINS_LOAD_ENTITIES, "30"));
LOG.info("TimeOut to load Entities is taken as {} mins", restoreTimeOutInMins);
} catch (NumberFormatException nfe) {
throw new FalconException("Invalid value specified for start up property \""
+ TIMEOUT_MINS_LOAD_ENTITIES + "\".Please provide an integer value");
}
String listenerClassNames = StartupProperties.get().
getProperty("configstore.listeners", "org.apache.falcon.entity.v0.EntityGraph");
for (String listenerClassName : listenerClassNames.split(",")) {
@@ -172,7 +191,8 @@ private void loadEntity(final EntityType type) throws FalconException {
final ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*"));
if (files != null) {
final ExecutorService service = Executors.newFixedThreadPool(100);

final ExecutorService service = Executors.newFixedThreadPool(numThreads);
for (final FileStatus file : files) {
service.execute(new Runnable() {
@Override
@@ -183,6 +203,7 @@ public void run() {
// ".xml"
String entityName = URLDecoder.decode(encodedEntityName, UTF_8);
Entity entity = restore(type, entityName);
LOG.info("Restored configuration {}/{}", type, entityName);
entityMap.put(entityName, entity);
} catch (IOException | FalconException e) {
LOG.error("Unable to restore entity of", file);
@@ -191,10 +212,10 @@ public void run() {
});
}
service.shutdown();
if (service.awaitTermination(10, TimeUnit.MINUTES)) {
if (service.awaitTermination(restoreTimeOutInMins, TimeUnit.MINUTES)) {
LOG.info("Restored Configurations for entity type: {} ", type.name());
} else {
LOG.warn("Time out happened while waiting for all threads to finish while restoring entities "
LOG.warn("Timed out while waiting for all threads to finish while restoring entities "
+ "for type: {}", type.name());
}
// Checking if all entities were loaded
@@ -336,6 +357,7 @@ public <T extends Entity> T get(EntityType type, String name) throws FalconExcep
} catch (IOException e) {
throw new StoreAccessException(e);
}
LOG.info("Restored configuration {}/{}", type, name);
entityMap.put(name, entity);
return entity;
} else {
@@ -445,7 +467,6 @@ private synchronized <T extends Entity> T restore(EntityType type, String name)
throw new StoreAccessException("Unable to un-marshall xml definition for " + type + "/" + name, e);
} finally {
in.close();
LOG.info("Restored configuration {}/{}", type, name);
}
}

@@ -136,6 +136,12 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\

*.falcon.cleanup.service.frequency=days(1)

# Default number of threads to be used to restore entities.
*.config.store.num.threads.load.entities=100

# Default timeout in minutes to load entities
*.config.store.start.timeout.minutes=30

######### Properties for Feed SLA Monitoring #########
# frequency of serialization for the state of FeedSLAMonitoringService - 1 hour
*.feed.sla.serialization.frequency.millis=3600000

0 comments on commit b67dd53

Please sign in to comment.