diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java index 483a177567..8a343a6126 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java @@ -8,7 +8,7 @@ import java.util.List; /** - * Created by sblackmon on 12/10/13. + * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration */ public class ElasticsearchConfigurator { @@ -51,9 +51,11 @@ public static ElasticsearchWriterConfiguration detectWriterConfiguration(Config String index = elasticsearch.getString("index"); String type = elasticsearch.getString("type"); + Long maxMsBeforeFlush = elasticsearch.hasPath("MaxTimeBetweenFlushMs") ? 
elasticsearch.getLong("MaxTimeBetweenFlushMs") : null; elasticsearchWriterConfiguration.setIndex(index); elasticsearchWriterConfiguration.setType(type); + elasticsearchWriterConfiguration.setMaxTimeBetweenFlushMs(maxMsBeforeFlush); return elasticsearchWriterConfiguration; } diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java index ff7e0f51d1..5756e1cfe1 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java @@ -36,10 +36,15 @@ import java.text.DecimalFormat; import java.text.NumberFormat; import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable { public static final String STREAMS_ID = "ElasticsearchPersistWriter"; - public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD; private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class); @@ -49,8 +54,11 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab private static final long WAITING_DOCS_LIMIT = 10000; private static final int BYTES_IN_MB = 1024 * 1024; private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB; + private static final long DEFAULT_MAX_WAIT = 10000; private final List affectedIndexes = new ArrayList(); + private final 
ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor(); + private final ReadWriteLock lock = new ReentrantReadWriteLock(); private ObjectMapper mapper = new StreamsJacksonMapper(); private ElasticsearchClientManager manager; @@ -61,6 +69,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab private OutputStreamWriter currentWriter = null; private int batchSize = 50; private int totalRecordsWritten = 0; + private long maxMsBeforeFlush; private boolean veryLargeBulk = false; // by default this setting is set to false protected Thread task; @@ -78,10 +87,11 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab private volatile int batchItemsSent = 0; private volatile int totalByteCount = 0; private volatile int byteCount = 0; + private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis()); public ElasticsearchPersistWriter() { Config config = StreamsConfigurator.config.getConfig("elasticsearch"); - this.config = (ElasticsearchWriterConfiguration) ElasticsearchConfigurator.detectConfiguration(config); + this.config = ElasticsearchConfigurator.detectWriterConfiguration(config); } public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) { @@ -173,6 +183,7 @@ public void cleanUp() { try { flush(); + backgroundFlushTask.shutdownNow(); } catch (IOException e) { e.printStackTrace(); } @@ -240,6 +251,7 @@ public void flush() throws IOException { @Override public void prepare(Object configurationObject) { + maxMsBeforeFlush = config.getMaxTimeBetweenFlushMs() == null ? 
DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs(); mapper = StreamsJacksonMapper.getInstance(); start(); } @@ -254,7 +266,17 @@ public DatumStatusCounter getDatumStatusCounter() { } public void start() { - + backgroundFlushTask.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + LOGGER.debug("Checking to see if data needs to be flushed"); + long time = System.currentTimeMillis() - lastWrite.get(); + if (time > maxMsBeforeFlush && batchItemsSent > 0) { + LOGGER.debug("Background Flush task determined {} are waiting to be flushed. It has been {} since the last write to ES", batchItemsSent, time); + flushInternal(); + } + } + }, 0, maxMsBeforeFlush * 2, TimeUnit.MILLISECONDS); manager = new ElasticsearchClientManager(config); client = manager.getClient(); @@ -262,62 +284,64 @@ public void start() { } public void flushInternal() { - synchronized (this) { - // we do not have a working bulk request, we can just exit here. - if (this.bulkRequest == null || batchItemsSent == 0) - return; + lock.writeLock().lock(); + // we do not have a working bulk request, we can just exit here. + if (this.bulkRequest == null || batchItemsSent == 0) + { lock.writeLock().unlock(); return; } - // call the flush command. - flush(this.bulkRequest, batchItemsSent, batchSizeInBytes); + // call the flush command. + flush(this.bulkRequest, batchItemsSent, batchSizeInBytes); - // null the flush request, this will be created in the 'add' function below - this.bulkRequest = null; + // null the flush request, this will be created in the 'add' function below + this.bulkRequest = null; - // record the proper statistics, and add it to our totals. - this.totalSizeInBytes += this.batchSizeInBytes; - this.totalSent += batchItemsSent; + // record the proper statistics, and add it to our totals. 
+ this.totalSizeInBytes += this.batchSizeInBytes; + this.totalSent += batchItemsSent; - // reset the current batch statistics - this.batchSizeInBytes = 0; - this.batchItemsSent = 0; + // reset the current batch statistics + this.batchSizeInBytes = 0; + this.batchItemsSent = 0; - try { - int count = 0; - if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) { - /**************************************************************************** - * Author: - * Smashew - * - * Date: - * 2013-10-20 - * - * Note: - * With the information that we have on hand. We need to develop a heuristic - * that will determine when the cluster is having a problem indexing records - * by telling it to pause and wait for it to catch back up. A - * - * There is an impact to us, the caller, whenever this happens as well. Items - * that are not yet fully indexed by the server sit in a queue, on the client - * that can cause the heap to overflow. This has been seen when re-indexing - * large amounts of data to a small cluster. The "deletes" + "indexes" can - * cause the server to have many 'outstandingItems" in queue. Running this - * software with large amounts of data, on a small cluster, can re-create - * this problem. - * - * DO NOT DELETE THESE LINES - ****************************************************************************/ - - // wait for the flush to catch up. 
We are going to cap this at - while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500) - Thread.sleep(10); - - if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) - LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding()); - } - } catch (Exception e) { - LOGGER.info("We were broken from our loop: {}", e.getMessage()); + try { + int count = 0; + if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) { + /**************************************************************************** + * Author: + * Smashew + * + * Date: + * 2013-10-20 + * + * Note: + * With the information that we have on hand. We need to develop a heuristic + * that will determine when the cluster is having a problem indexing records + * by telling it to pause and wait for it to catch back up. A + * + * There is an impact to us, the caller, whenever this happens as well. Items + * that are not yet fully indexed by the server sit in a queue, on the client + * that can cause the heap to overflow. This has been seen when re-indexing + * large amounts of data to a small cluster. The "deletes" + "indexes" can + * cause the server to have many 'outstandingItems" in queue. Running this + * software with large amounts of data, on a small cluster, can re-create + * this problem. + * + * DO NOT DELETE THESE LINES + ****************************************************************************/ + + // wait for the flush to catch up. 
We are going to cap this at + while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500) + Thread.sleep(10); + + if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) + LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding()); } + } catch (Exception e) { + LOGGER.info("We were broken from our loop: {}", e.getMessage()); + } finally { + lock.writeLock().unlock(); } + } public void add(String indexName, String type, String json) { @@ -353,32 +377,35 @@ public void add(String indexName, String type, String id, String json) { public void add(UpdateRequest updateRequest) { Preconditions.checkNotNull(updateRequest); - synchronized (this) { - checkAndCreateBulkRequest(); - checkIndexImplications(updateRequest.index()); - bulkRequest.add(updateRequest); - try { - Optional size = Objects.firstNonNull( - Optional.fromNullable(updateRequest.doc().source().length()), - Optional.fromNullable(updateRequest.script().length())); - trackItemAndBytesWritten(size.get().longValue()); - } catch (NullPointerException x) { - trackItemAndBytesWritten(1000); - } + lock.writeLock().lock(); + checkAndCreateBulkRequest(); + checkIndexImplications(updateRequest.index()); + bulkRequest.add(updateRequest); + try { + Optional size = Objects.firstNonNull( + Optional.fromNullable(updateRequest.doc().source().length()), + Optional.fromNullable(updateRequest.script().length())); + trackItemAndBytesWritten(size.get().longValue()); + } catch (NullPointerException x) { + trackItemAndBytesWritten(1000); + } finally { + lock.writeLock().unlock(); } } public void add(IndexRequest indexRequest) { - synchronized (this) { - checkAndCreateBulkRequest(); - checkIndexImplications(indexRequest.index()); - bulkRequest.add(indexRequest); - try { - trackItemAndBytesWritten(indexRequest.source().length()); - } catch (NullPointerException x) { - LOGGER.warn("NPE adding/sizing indexrequest"); - } + lock.writeLock().lock(); + checkAndCreateBulkRequest(); + 
checkIndexImplications(indexRequest.index()); + bulkRequest.add(indexRequest); + try { + trackItemAndBytesWritten(indexRequest.source().length()); + } catch (NullPointerException x) { + LOGGER.warn("NPE adding/sizing indexrequest"); + } finally { + lock.writeLock().unlock(); } + } public void createIndexIfMissing(String indexName) { @@ -446,10 +473,20 @@ private Set checkIds(Set input, String index, String type) { return toReturn; } + /** + * This method is to ONLY be called by flushInternal otherwise the counts will be off. + * @param bulkRequest + * @param thisSent + * @param thisSizeInBytes + */ private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) { + final Object messenger = new Object(); + LOGGER.debug("Attempting to write {} items to ES", thisSent); bulkRequest.execute().addListener(new ActionListener() { @Override public void onResponse(BulkResponse bulkItemResponses) { + lastWrite.set(System.currentTimeMillis()); + if (bulkItemResponses.hasFailures()) LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage()); @@ -484,8 +521,6 @@ public void onFailure(Throwable e) { e.printStackTrace(); } }); - - this.notify(); } private void trackItemAndBytesWritten(long sizeInBytes) { @@ -499,9 +534,12 @@ private void trackItemAndBytesWritten(long sizeInBytes) { private void checkAndCreateBulkRequest() { // Synchronize to ensure that we don't lose any records - synchronized (this) { + lock.writeLock().lock(); + try { if (bulkRequest == null) bulkRequest = this.manager.getClient().prepareBulk(); + } finally { + lock.writeLock().unlock(); } } diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json index 21aad5c8ed..c56aa82e27 100644 --- 
a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json +++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json @@ -13,6 +13,10 @@ "type": { "type": "string", "description": "Type to write as" - } + }, + "MaxTimeBetweenFlushMs": { + "type": "integer", + "format": "utc-millisec" + } } } \ No newline at end of file diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java index c5145fbab1..9cd5898cb4 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java @@ -72,29 +72,36 @@ protected void sleep() { } protected QueryResult executeAPIRequest() { - BeatApi.BeatResponse response = this.client.createRequestBuilder() - .setHeartBeatId(heartbeatId) - .setOffset(0) - .setReturnSetSize(maxApiBatch).execute(); - - LOGGER.debug("Received {} results from API query", response.getCount()); + BeatApi.BeatResponse response = null; + try { + response = this.client.createRequestBuilder() + .setHeartBeatId(heartbeatId) + .setOffset(0) + .setReturnSetSize(maxApiBatch).execute(); + + LOGGER.debug("Received {} results from API query", response.getCount()); + } catch (Exception e) { + LOGGER.warn("Error querying Sysomos API", e); + } String currentId = null; boolean matched = false; - for(BeatApi.BeatResponse.Beat beat : response.getBeat()) { - String docId = beat.getDocid(); - //We get documents in descending time order. 
This will set the id to the latest document - if(currentId == null) { - currentId = docId; - } - //We only want to process documents that we know we have not seen before - if(lastID != null && lastID.equals(docId)) { - matched = true; - break; + if(response != null) { + for (BeatApi.BeatResponse.Beat beat : response.getBeat()) { + String docId = beat.getDocid(); + //We get documents in descending time order. This will set the id to the latest document + if (currentId == null) { + currentId = docId; + } + //We only want to process documents that we know we have not seen before + if (lastID != null && lastID.equals(docId)) { + matched = true; + break; + } + StreamsDatum item = new StreamsDatum(beat, docId); + item.getMetadata().put("heartbeat", this.heartbeatId); + this.provider.enqueueItem(item); } - StreamsDatum item = new StreamsDatum(beat, docId); - item.getMetadata().put("heartbeat", this.heartbeatId); - this.provider.enqueueItem(item); } return new QueryResult(matched, currentId); } diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index 8e688ba771..d313b3f16b 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -1,5 +1,6 @@ package org.apache.streams.local.builders; +import org.slf4j.LoggerFactory; import org.apache.streams.core.*; import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread; import org.apache.streams.local.tasks.StatusCounterMonitorThread; @@ -7,6 +8,7 @@ import org.apache.streams.local.tasks.StreamsTask; import org.apache.streams.util.SerializationUtil; import org.joda.time.DateTime; +import org.slf4j.Logger; import java.math.BigInteger; import 
java.util.*; @@ -22,6 +24,8 @@ */ public class LocalStreamBuilder implements StreamBuilder { + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LocalStreamBuilder.class); + public static final String TIMEOUT_KEY = "TIMEOUT"; private Map providers; private Map components; @@ -32,6 +36,7 @@ public class LocalStreamBuilder implements StreamBuilder { private int totalTasks; private int monitorTasks; private LocalStreamProcessMonitorThread monitorThread; + private Map> tasks; /** * @@ -139,39 +144,18 @@ public StreamBuilder addStreamsPersistWriter(String id, StreamsPersistWriter wri */ @Override public void start() { + attachShutdownHandler(); boolean isRunning = true; this.executor = Executors.newFixedThreadPool(this.totalTasks); this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1); Map provTasks = new HashMap(); - Map> streamsTasks = new HashMap>(); + tasks = new HashMap>(); try { monitorThread = new LocalStreamProcessMonitorThread(executor, 10); this.monitor.submit(monitorThread); - for(StreamComponent comp : this.components.values()) { - int tasks = comp.getNumTasks(); - List compTasks = new LinkedList(); - for(int i=0; i < tasks; ++i) { - StreamsTask task = comp.createConnectedTask(getTimeout()); - task.setStreamConfig(this.streamConfig); - this.executor.submit(task); - compTasks.add(task); - if( comp.isOperationCountable() ) { - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10)); - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); - } - } - streamsTasks.put(comp.getId(), compTasks); - } - for(StreamComponent prov : this.providers.values()) { - StreamsTask task = prov.createConnectedTask(getTimeout()); - task.setStreamConfig(this.streamConfig); - this.executor.submit(task); - provTasks.put(prov.getId(), (StreamsProviderTask) task); - if( prov.isOperationCountable() ) { - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) 
prov.getOperation(), 10)); - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); - } - } + setupComponentTasks(tasks); + setupProviderTasks(provTasks); + LOGGER.info("Started stream with {} components", tasks.size()); while(isRunning) { isRunning = false; for(StreamsProviderTask task : provTasks.values()) { @@ -184,44 +168,98 @@ public void start() { Thread.sleep(3000); } } - monitorThread.shutdown(); - this.executor.shutdown(); - //complete stream shut down gracfully - for(StreamComponent prov : this.providers.values()) { - shutDownTask(prov, streamsTasks); + LOGGER.debug("Components are no longer running or timed out due to completion"); + shutdown(tasks); + } catch (InterruptedException e){ + forceShutdown(tasks); + } + + } + + private void attachShutdownHandler() { + final LocalStreamBuilder self = this; + LOGGER.debug("Attaching shutdown handler"); + Runtime.getRuntime().addShutdownHook(new Thread(){ + @Override + public void run() { + LOGGER.debug("Shutdown hook received. Beginning shutdown"); + self.stop(); + } + }); + } + + protected void forceShutdown(Map> streamsTasks) { + LOGGER.debug("Shutdown failed. Forcing shutdown"); + //give the stream 30secs to try to shutdown gracefully, then force shutdown otherwise + for(List tasks : streamsTasks.values()) { + for(StreamsTask task : tasks) { + task.stopTask(); } - //need to make this configurable - if(!this.executor.awaitTermination(10, TimeUnit.SECONDS)) { // all threads should have terminated already. + } + this.executor.shutdown(); + this.monitor.shutdown(); + try { + if(!this.executor.awaitTermination(3, TimeUnit.SECONDS)){ this.executor.shutdownNow(); - this.executor.awaitTermination(10, TimeUnit.SECONDS); } - if(!this.monitor.awaitTermination(5, TimeUnit.SECONDS)) { // all threads should have terminated already. 
+ if(!this.monitor.awaitTermination(3, TimeUnit.SECONDS)){ this.monitor.shutdownNow(); - this.monitor.awaitTermination(5, TimeUnit.SECONDS); } - } catch (InterruptedException e){ - //give the stream 30secs to try to shutdown gracefully, then force shutdown otherwise - for(List tasks : streamsTasks.values()) { - for(StreamsTask task : tasks) { - task.stopTask(); - } + }catch (InterruptedException ie) { + this.executor.shutdownNow(); + this.monitor.shutdownNow(); + throw new RuntimeException(ie); + } + } + + protected void shutdown(Map> streamsTasks) throws InterruptedException { + LOGGER.info("Attempting to shutdown tasks"); + monitorThread.shutdown(); + this.executor.shutdown(); + //complete stream shut down gracfully + for(StreamComponent prov : this.providers.values()) { + shutDownTask(prov, streamsTasks); + } + //need to make this configurable + if(!this.executor.awaitTermination(10, TimeUnit.SECONDS)) { // all threads should have terminated already. + this.executor.shutdownNow(); + this.executor.awaitTermination(10, TimeUnit.SECONDS); + } + if(!this.monitor.awaitTermination(5, TimeUnit.SECONDS)) { // all threads should have terminated already. 
+ this.monitor.shutdownNow(); + this.monitor.awaitTermination(5, TimeUnit.SECONDS); + } + } + + protected void setupProviderTasks(Map provTasks) { + for(StreamComponent prov : this.providers.values()) { + StreamsTask task = prov.createConnectedTask(getTimeout()); + task.setStreamConfig(this.streamConfig); + this.executor.submit(task); + provTasks.put(prov.getId(), (StreamsProviderTask) task); + if( prov.isOperationCountable() ) { + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10)); + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); } - this.executor.shutdown(); - this.monitor.shutdown(); - try { - if(!this.executor.awaitTermination(3, TimeUnit.SECONDS)){ - this.executor.shutdownNow(); - } - if(!this.monitor.awaitTermination(3, TimeUnit.SECONDS)){ - this.monitor.shutdownNow(); + } + } + + protected void setupComponentTasks(Map> streamsTasks) { + for(StreamComponent comp : this.components.values()) { + int tasks = comp.getNumTasks(); + List compTasks = new LinkedList(); + for(int i=0; i < tasks; ++i) { + StreamsTask task = comp.createConnectedTask(getTimeout()); + task.setStreamConfig(this.streamConfig); + this.executor.submit(task); + compTasks.add(task); + if( comp.isOperationCountable() ) { + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10)); + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); } - }catch (InterruptedException ie) { - this.executor.shutdownNow(); - this.monitor.shutdownNow(); - throw new RuntimeException(ie); } + streamsTasks.put(comp.getId(), compTasks); } - } /** @@ -249,8 +287,13 @@ private void shutDownTask(StreamComponent comp, Map> s task.stopTask(); } for(StreamsTask task : tasks) { - while(task.isRunning()) { + int count = 0; + while(count < 20 && task.isRunning()) { Thread.sleep(500); + count++; + } + if(task.isRunning()) { + LOGGER.warn("Task {} failed to terminate in 
allotted timeframe", task.toString()); } } } @@ -268,7 +311,11 @@ private void shutDownTask(StreamComponent comp, Map> s */ @Override public void stop() { - + try { + shutdown(tasks); + } catch (Exception e) { + forceShutdown(tasks); + } } private void connectToOtherComponents(String[] conntectToIds, StreamComponent toBeConnected) { diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java index 1eac1d9632..8146bdd5f1 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java @@ -72,12 +72,13 @@ public void run() { try { this.writer.prepare(this.streamConfig); StreamsDatum datum = this.inQueue.poll(); - while(datum != null || this.keepRunning.get()) { + while(this.keepRunning.get()) { if(datum != null) { try { this.writer.write(datum); statusCounter.incrementStatus(DatumStatus.SUCCESS); } catch (Exception e) { + LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e); this.keepRunning.set(false); statusCounter.incrementStatus(DatumStatus.FAIL); } @@ -86,12 +87,15 @@ public void run() { try { Thread.sleep(this.sleepTime); } catch (InterruptedException e) { + LOGGER.warn("Thread interrupted in Writer task for {}",this.writer.getClass().getSimpleName(), e); this.keepRunning.set(false); } } datum = this.inQueue.poll(); } + } catch(Exception e) { + LOGGER.error("Failed to execute Persist Writer {}",this.writer.getClass().getSimpleName(), e); } finally { this.writer.cleanUp(); this.isRunning.set(false); diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java index d1ac905b88..d4c7a16510 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java @@ -67,7 +67,7 @@ public void run() { try { this.processor.prepare(this.streamConfig); StreamsDatum datum = this.inQueue.poll(); - while(datum != null || this.keepRunning.get()) { + while(this.keepRunning.get()) { if(datum != null) { List output = this.processor.process(datum); if(output != null) {