
CiscoSE/commons-processing


Commons processing

Utilities

ResourcesUtil - Utility methods for statistics on resources such as CPU and memory.
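As a rough illustration of the kind of figures involved, the following sketch reads CPU and memory statistics with the standard JDK Runtime and ManagementFactory APIs. The class ResourceStats and its methods are hypothetical and are not the ResourcesUtil API.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Hypothetical helper, NOT the ResourcesUtil API: it only shows which standard JDK
// calls expose basic CPU and memory figures.
public final class ResourceStats {

    private ResourceStats() {
    }

    /** Number of processors available to the JVM. */
    public static int availableProcessors() {
        return Runtime.getRuntime().availableProcessors();
    }

    /** Heap memory currently in use, in bytes (allocated heap minus its free part). */
    public static long usedHeapBytes() {
        Runtime rt = Runtime.getRuntime();
        return rt.totalMemory() - rt.freeMemory();
    }

    /** Maximum heap the JVM will attempt to use, in bytes. */
    public static long maxHeapBytes() {
        return Runtime.getRuntime().maxMemory();
    }

    /** System load average over the last minute, or a negative value if unavailable. */
    public static double systemLoadAverage() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        return os.getSystemLoadAverage();
    }

    public static void main(String[] args) {
        System.out.printf("cpus=%d usedHeap=%d maxHeap=%d load=%.2f%n",
                availableProcessors(), usedHeapBytes(), maxHeapBytes(), systemLoadAverage());
    }
}
```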

Data Processing

Processes data objects/messages with multiple parallel consumers, with the ability to override pending object tasks to avoid redundant data and tasks, based on a queue map.
Uses an independent asynchronous retry executor.

Note: This is not an official Cisco product.

Problem & Solution

[Figure: queue (queue.drawio)]

Flow Diagram

[Figure: Commons processing flow (commons-processing.drawio)]

Highlight features

  • Ability to override pending data object tasks, avoiding redundant tasks.
  • Asynchronous retry mechanism for data object processing tasks.
  • Potential memory savings by holding data objects instead of task class instances.
  • No redundant live threads when there are no pending tasks.
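The last point (no live threads while idle) can be achieved with plain JDK classes by letting a pool's core threads time out. The sketch below assumes this is roughly the technique involved; IdleFriendlyPool and its parameters are hypothetical names, not part of this library.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, NOT the DataProcessor internals: a fixed-size pool whose
// threads all exit after a period of idleness, so an idle pool holds no live threads.
public final class IdleFriendlyPool {

    private IdleFriendlyPool() {
    }

    /** Creates a pool of numOfThreads threads that exit after keepAliveMillis without work. */
    public static ThreadPoolExecutor create(int numOfThreads, long keepAliveMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                numOfThreads, numOfThreads,
                keepAliveMillis, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        // By default core threads never time out; this lets them exit when idle.
        // Requires keepAliveMillis > 0, otherwise an IllegalArgumentException is thrown.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = create(4, 100);
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> { });
        }
        Thread.sleep(1000); // wait well past the keep-alive time
        System.out.println("live threads after idle period: " + pool.getPoolSize());
        pool.shutdown();
    }
}
```

Threads are also started lazily, so a freshly created pool has no threads until the first task arrives.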

Overriding pending tasks is useful, for example, when multiple notifications are received for the same data object IDs within a time window in which the previous data objects are still pending processing, because the internal thread pool is busy running other tasks up to its core pool size limit. Suppose the processing logic fetches the object from the DB and parses the result. In this case, the new notifications override the existing entries for the same data objects, so each data object is fetched and processed, ideally, by a single task instead of multiple times.

DataProcessor.aggregate() vs. threadPool.execute(), using the example above:

threadPool.execute:

  • 10 notifications arrive on data object with key 'x'.
  • 10 similar tasks are created and executed via the thread pool to fetch and process the same object; 9 of them are redundant.

DataProcessor.aggregate():

  • 10 notifications arrive on data object with key 'x'.
  • 10 notifications are mapped to the same single queue map entry.
  • 1 task is created and executed via the thread pool.
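The aggregate() behavior described above can be sketched as a map from key to the latest pending object, where a task is scheduled only when the key has no pending entry yet. This is a minimal sketch under that assumption; CoalescingAggregator and its methods are hypothetical, and the real DataProcessor also handles retries and result handlers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical sketch of key-based coalescing ("queue map" of key -> latest pending
// object), NOT the DataProcessor API.
public final class CoalescingAggregator<K, V> {

    private final Map<K, V> pending = new ConcurrentHashMap<>();
    private final Executor executor;
    private final Consumer<V> processor;

    public CoalescingAggregator(Executor executor, Consumer<V> processor) {
        this.executor = executor;
        this.processor = processor;
    }

    /** Stores the latest object for the key; schedules a task only if none is pending. */
    public void aggregate(K key, V value) {
        // put returns the previous value; null means no task is queued for this key yet.
        if (pending.put(key, value) == null) {
            executor.execute(() -> {
                // Take the newest value at execution time and clear the entry.
                V latest = pending.remove(key);
                if (latest != null) {
                    processor.accept(latest);
                }
            });
        }
    }

    public static void main(String[] args) {
        List<Runnable> queued = new ArrayList<>();
        Executor deferred = queued::add; // holds tasks so they can be counted before running
        List<String> processed = new ArrayList<>();
        CoalescingAggregator<String, String> agg =
                new CoalescingAggregator<>(deferred, processed::add);

        for (int i = 1; i <= 10; i++) {
            agg.aggregate("x", "notification-" + i); // 10 notifications on key "x"
        }
        System.out.println("tasks created: " + queued.size()); // tasks created: 1
        queued.forEach(Runnable::run);
        System.out.println("processed: " + processed); // processed: [notification-10]
    }
}
```

Because the task reads the map entry when it runs rather than when it is scheduled, later notifications for the same key are picked up by the task already in the queue. ConcurrentHashMap.put and remove are atomic, so an update arriving after the task has removed the entry simply schedules a new task.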

The solution is in-process, non-persistent and non-distributed.
For a persistent, distributed solution, see commons-processing-etcd.

Example usage

DataProcessor

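// Processes a single data object; the boolean result presumably indicates success.
// log is assumed to be an SLF4J-style logger defined by the caller.
DataObjectProcessor dataObjectProcessor = (dataObject) -> {
	log.info("processing dataObject: {}", dataObject.getData());
	return true;
};

// resultHandler, failureHandler, numOfThreads, retries, retryDelay and retryDelayTimeUnit
// are defined by the caller (see DataProcessorTest).
DataProcessor dataProcessor = DataProcessor.builder().dataObjectProcessor(dataObjectProcessor)
		.dataObjectProcessResultHandler(resultHandler).failureHandler(failureHandler).numOfThreads(numOfThreads)
		.retries(retries).retryDelay(retryDelay).retryDelayTimeUnit(retryDelayTimeUnit).build();

// Aggregates dataObject under key 1; a pending entry with the same key is overridden.
dataProcessor.aggregate(1, dataObject);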

RetryExecutor

// supplier, pool, retryDelaySeconds, retries and resultHandler are defined by the caller.
RetryExecutor retryExecutor = RetryExecutor.builder().build();
retryExecutor.executeAsync(supplier, pool, retryDelaySeconds, TimeUnit.SECONDS, retries, resultHandler, null);
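For intuition about what an asynchronous retry executor does, here is a minimal fixed-delay sketch built on ScheduledExecutorService and CompletableFuture. SimpleAsyncRetry is hypothetical and not the RetryExecutor API; it assumes the supplier returns true on success and treats retries as the number of extra attempts after the first one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical fixed-delay async retry, NOT the RetryExecutor API.
// Assumption: the supplier signals success by returning true.
public final class SimpleAsyncRetry {

    private SimpleAsyncRetry() {
    }

    /** Runs supplier asynchronously, retrying up to `retries` extra times with a fixed delay. */
    public static CompletableFuture<Boolean> executeAsync(Supplier<Boolean> supplier,
            ScheduledExecutorService scheduler, long delay, TimeUnit unit, int retries) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        scheduler.execute(() -> attempt(supplier, scheduler, delay, unit, retries, result));
        return result;
    }

    private static void attempt(Supplier<Boolean> supplier, ScheduledExecutorService scheduler,
            long delay, TimeUnit unit, int retriesLeft, CompletableFuture<Boolean> result) {
        boolean ok;
        try {
            ok = Boolean.TRUE.equals(supplier.get());
        } catch (RuntimeException e) {
            ok = false; // an exception counts as a failed attempt
        }
        if (ok) {
            result.complete(true);
        } else if (retriesLeft > 0) {
            // Schedule the next attempt instead of sleeping, so no thread blocks during the delay.
            scheduler.schedule(
                    () -> attempt(supplier, scheduler, delay, unit, retriesLeft - 1, result),
                    delay, unit);
        } else {
            result.complete(false); // retries exhausted
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        int[] calls = {0};
        Supplier<Boolean> flaky = () -> ++calls[0] >= 3; // fails twice, then succeeds
        boolean ok = executeAsync(flaky, scheduler, 50, TimeUnit.MILLISECONDS, 5).join();
        System.out.println("success=" + ok + " attempts=" + calls[0]); // success=true attempts=3
        scheduler.shutdown();
    }
}
```

The caller gets a future immediately and is never blocked by the retry delays; only join() in the demo waits for the final outcome.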

RetryExecutor can also be used with a back-off policy with random jitter:

BackOff backOff = new ExponentialBackOff.Builder()
	.setInitialIntervalMillis(500)
	.setMultiplier(1.5)
	.setMaxElapsedTimeMillis(Integer.MAX_VALUE)
	.build();
retryExecutor.executeAsync(supplier, pool, backOff, retries, resultHandler, null);
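Assuming ExponentialBackOff here is the Google HTTP Client class (its builder methods match), each interval is the previous one multiplied by the multiplier, capped by a maximum interval, and then randomized around that value. The sketch below computes such a schedule; BackOffSchedule is hypothetical, and the randomization factor of 0.5 and the 60-second cap are assumed defaults worth checking against the library documentation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of an exponential back-off schedule with jitter, NOT the
// ExponentialBackOff implementation. Randomization factor and cap are assumptions.
public final class BackOffSchedule {

    private BackOffSchedule() {
    }

    /** Nominal (un-jittered) intervals: initial * multiplier^n, capped at maxIntervalMillis. */
    public static List<Long> nominalIntervals(long initialMillis, double multiplier,
            long maxIntervalMillis, int attempts) {
        List<Long> intervals = new ArrayList<>();
        double current = initialMillis;
        for (int i = 0; i < attempts; i++) {
            intervals.add(Math.round(Math.min(current, maxIntervalMillis)));
            current *= multiplier;
        }
        return intervals;
    }

    /** Random jitter: a value drawn uniformly from [interval*(1-f), interval*(1+f)]. */
    public static long jitter(long intervalMillis, double factor, Random random) {
        double delta = factor * intervalMillis;
        double min = intervalMillis - delta;
        return Math.round(min + random.nextDouble() * 2 * delta);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (long nominal : nominalIntervals(500, 1.5, 60_000, 5)) {
            System.out.printf("nominal=%d ms, jittered=%d ms%n",
                    nominal, jitter(nominal, 0.5, random));
        }
    }
}
```

With an initial interval of 500 ms and a multiplier of 1.5, the nominal intervals are 500, 750, 1125, 1688 (rounded from 1687.5) and 2531 ms before jitter. Setting setMaxElapsedTimeMillis(Integer.MAX_VALUE), as in the example above, presumably leaves the retries argument as the effective bound on attempts.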

See DataProcessorTest for further details.

commons-processing-kafka

Kafka processing utilities: see commons-processing-kafka.

Quality Assurance

Code analysis

Code analysis was done with Sonar.

Security

Scanned with the OWASP dependency-check-maven plugin; see dependency-check-report.

Testing

Flows are covered by unit tests.

Build

Run mvn install on the parent project.

Contributions

Licensing


Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Author

Liran Mendelovich

Cisco

About

Commons data processing utilities
