Even if you cache your heavy handlers in a NoSQL database, you can further reduce the number of database queries with ppao.
Often, multiple single-threaded workers are used to handle queues of pipelines.
The algorithm increases the amount of data a handler processes at a time by grouping operations across pipelines in a way that preserves the order of operations within each pipeline.
- You can significantly reduce the number of calls to data handlers or requests to the database to retrieve a cached handler.
- You can group requests to a third-party API to send more data in a single transaction, if that's more advantageous in your case.
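As a rough illustration of the first point (this is a simplification, not ppao's actual solver, which can also shift operations between steps), pipelines that run the same operation at the same step can share a single handler call:

```python
import numpy as np

# Two pipelines of four operations each; each number is an operation id.
pipelines = np.array(
    [
        [1, 3, 1, 2],
        [1, 1, 1, 2],
    ]
)

# Naive execution: one handler call per (pipeline, step) cell.
naive_calls = pipelines.size

# Grouped by step: pipelines with the same operation at the same step
# are served by a single handler call.
grouped_calls = sum(
    len(np.unique(pipelines[:, step])) for step in range(pipelines.shape[1])
)

print(naive_calls, grouped_calls)  # 8 5
```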
- The algorithm is only suitable for the architecture described in the pictures above.
- The maximum number of operations in a pipeline must be limited, and all pipelines must have the same length. If a pipeline has fewer operations, the remaining slots are filled with zeros.
- You should be ready to add the numpy dependency to your project.
- Not all pipelines are suitable for this algorithm; the Grouper is responsible for checking this. If a pipeline cannot be grouped with others, it has to wait for new pipelines it can form a group with. If you don't want to wait, execute that pipeline without ppao. The decision is up to you.
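The equal-length requirement can be met by right-padding shorter pipelines with zeros. This `pad_pipeline` helper is an illustration of that rule, not part of the ppao API:

```python
import numpy as np

PIPELINE_SIZE_LIMIT = 4  # must match pipeline_size_limit in your ppao settings

def pad_pipeline(ops, size_limit=PIPELINE_SIZE_LIMIT):
    """Right-pad a pipeline with zeros so all pipelines share one length."""
    if len(ops) > size_limit:
        raise ValueError("pipeline exceeds pipeline_size_limit")
    return np.pad(np.asarray(ops, dtype=np.uint16), (0, size_limit - len(ops)))

print(pad_pipeline([1, 3]))  # [1 3 0 0]
```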
- numpy
- python >= 3.10
```shell
pip install ppao
```

For development, clone the repository and install the dependencies:

```shell
git clone https://github.com/borontov/ppao.git
poetry install
```

Alternatively, with micromamba:

```shell
conda-lock install --micromamba -n ppao_dev_env conda-lock.yml
```
```python
from ppao import (
    Grouper,
    PipelineMatrixSolver,
    settings as ppao_settings,
)
import numpy as np

settings_ = ppao_settings.Settings(
    group_size_limit=4,
    pipeline_size_limit=4,
    common_ops_percent_bound=0.85,
)

pipelines = np.array(
    [
        [1, 3, 1, 2],
        [1, 1, 1, 2],
        [3, 2, 1, 1],
        [1, 2, 2, 1],
    ],
)

grouper = Grouper(settings_=settings_)
grouper.add(pipelines=pipelines)
source_matrix = grouper.pop()

solver = PipelineMatrixSolver(
    source_matrix=source_matrix,
    settings_=settings_,
)
solution = solver.solve()
print(solution)
# output:
# [
#     ExecutionUnit(operation=1, pipelines=array([0, 2, 0, 1], dtype=uint16)),
#     ExecutionUnit(operation=3, pipelines=array([2, 3], dtype=uint16)),
#     ExecutionUnit(operation=1, pipelines=array([0, 2], dtype=uint16)),
#     ExecutionUnit(operation=2, pipelines=array([1, 3, 0, 1, 2], dtype=uint16)),
#     ExecutionUnit(operation=1, pipelines=array([3, 1, 3], dtype=uint16))
# ]

for execution_unit in solution:
    handler = get_handler(execution_unit.operation)
    for pipeline_id in execution_unit.pipelines:
        pipeline_data = get_pipeline_data(pipeline_id)
        handler(pipeline_data)
```
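If your handlers can accept batches, the inner loop can be collapsed to one call per `ExecutionUnit`. Everything below except the `ExecutionUnit` shape is a stub: `get_handler`, `get_pipeline_data`, and the batch-capable handler signature are assumptions about your application, not part of ppao:

```python
from collections import namedtuple

ExecutionUnit = namedtuple("ExecutionUnit", ["operation", "pipelines"])

calls = []  # records (operation, batch) pairs so we can count handler calls

def get_handler(operation):
    # Stub registry: returns a handler that processes a whole batch at once.
    def handler(batch):
        calls.append((operation, list(batch)))
    return handler

def get_pipeline_data(pipeline_id):
    # Stub data store standing in for your cache or database.
    return {"pipeline_id": pipeline_id}

# A hand-written solution with two execution units, for demonstration.
solution = [
    ExecutionUnit(operation=1, pipelines=[0, 2]),
    ExecutionUnit(operation=2, pipelines=[1, 3]),
]

for execution_unit in solution:
    handler = get_handler(execution_unit.operation)
    batch = [get_pipeline_data(pid) for pid in execution_unit.pipelines]
    handler(batch)  # one handler call per ExecutionUnit, not per pipeline

print(len(calls))  # 2 calls instead of 4
```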
- Add debug logging
- Deduplicate equal shift combinations like (-1, -1, 0, 0) & (0, 0, 1, 1)
- PyPI-friendly README
See the open issues for a full list of proposed features (and known issues).
Contributions are welcome.
- Fork the Project
- Create your Feature Branch (`git checkout -b feature/AmazingFeature`)
- Prepare your feature with shell commands:
  - `make format`
  - `make check`
  - `make test`
- Commit your Changes (`git commit -m 'Add some AmazingFeature'`)
- Push to the Branch (`git push origin feature/AmazingFeature`)
- Open a Pull Request
Distributed under the MIT License. See `LICENSE.txt` for more information.