diff --git a/README.md b/README.md
index a14303ae..03818b72 100644
--- a/README.md
+++ b/README.md
@@ -5,13 +5,22 @@
+
+| Documentation | Roadmap |
+
+
---
*Latest News* 🔥
-- [2025/07/30] We are excited to announce the alpha release of Unified Cache Manager.
+- [2025/08/01] We are excited to announce the alpha release of Unified Cache Manager.
---
+## Performance
+The NFS connector achieves about a 4x TTFT (time to first token) speedup.
+
+
+
## Overview
### Motivation
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 06edc006..fe2c2b84 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -14,11 +14,10 @@ ENV VLLM_USE_PRECOMPILED=1
RUN VLLM_TARGET_DEVICE=cuda pip install -v -e /vllm-workspace/vllm --extra-index=https://download.pytorch.org/whl/nightly/cu128
# Install unified-cache-management
-ARG UCM_REPO=https://github.com/ModelEngine-Group/unified-cache-management.git
-ARG UCM_BRANCH=develop
-RUN git clone --depth 1 $UCM_REPO --branch $UCM_BRANCH /vllm-workspace/unified-cache-management
+COPY . /vllm-workspace/unified-cache-management
-RUN pip install -v -e /vllm-workspace/unified-cache-management
+RUN export PLATFORM="cuda" && \
+ pip install -v -e /vllm-workspace/unified-cache-management
# Apply patch for vLLM
RUN cd /vllm-workspace/vllm \
diff --git a/docker/Dockerfile-NPU b/docker/Dockerfile-NPU
index 4216e292..519d4253 100644
--- a/docker/Dockerfile-NPU
+++ b/docker/Dockerfile-NPU
@@ -4,11 +4,11 @@ FROM quay.io/ascend/vllm-ascend:v0.9.2rc1
WORKDIR /workspace
# Install unified-cache-management
-ARG UCM_REPO=https://github.com/ModelEngine-Group/unified-cache-management.git
-ARG UCM_BRANCH=develop
-RUN git clone --depth 1 $UCM_REPO --branch $UCM_BRANCH /vllm-workspace/unified-cache-management
+COPY . /vllm-workspace/unified-cache-management
-RUN pip install -v -e /vllm-workspace/unified-cache-management
+RUN export PLATFORM="ascend" && \
+ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/Ascend/ascend-toolkit/latest/`uname -i`-linux/devlib && \
+ pip install -v -e /vllm-workspace/unified-cache-management
# Apply patch for vLLM
RUN cd /vllm-workspace/vllm \
diff --git a/docs/source/developer/add_connector.md b/docs/source/developer/add_connector.md
index 37552e9f..8d456ede 100644
--- a/docs/source/developer/add_connector.md
+++ b/docs/source/developer/add_connector.md
@@ -1 +1 @@
-# Add New Connector
+# How To Add New Connector
diff --git a/docs/source/developer/block_layout.md b/docs/source/developer/block_layout.md
deleted file mode 100644
index a7365ff1..00000000
--- a/docs/source/developer/block_layout.md
+++ /dev/null
@@ -1 +0,0 @@
-# Block Layout
diff --git a/docs/source/developer/index.md b/docs/source/developer/index.md
index d6069244..488d86cf 100644
--- a/docs/source/developer/index.md
+++ b/docs/source/developer/index.md
@@ -3,7 +3,8 @@
:::{toctree}
:maxdepth: 2
architecture.md
-block_layout.md
add_connector.md
+nfs_connector.md
+performance_benchmark.md
:::
diff --git a/docs/source/developer/nfs_connector.md b/docs/source/developer/nfs_connector.md
new file mode 100644
index 00000000..629c2daa
--- /dev/null
+++ b/docs/source/developer/nfs_connector.md
@@ -0,0 +1 @@
+# NFS Connector
\ No newline at end of file
diff --git a/docs/source/developer/performance_benchmark.md b/docs/source/developer/performance_benchmark.md
new file mode 100644
index 00000000..927cc276
--- /dev/null
+++ b/docs/source/developer/performance_benchmark.md
@@ -0,0 +1 @@
+# Performance Benchmark
\ No newline at end of file
diff --git a/docs/source/getting-started/example/dram_conn.md b/docs/source/getting-started/example/dram_conn.md
index 6739b637..5ffcf276 100644
--- a/docs/source/getting-started/example/dram_conn.md
+++ b/docs/source/getting-started/example/dram_conn.md
@@ -2,6 +2,14 @@
This document provides a usage example and configuration guide for the **DRAM Connector**. This connector enables offloading of KV cache from GPU HBM to CPU DRAM, helping reduce memory pressure and support larger models or batch sizes.
+## Performance
+
+Combining UCM with vLLM delivers 3–10× improvements in latency and GPU efficiency, especially for long-context LLM tasks.
+
+
+
+
+
## Features
The DRAM connector supports the following functionalities:
@@ -21,25 +29,28 @@ To use the DRAM connector, you need to configure the `connector_config` dictiona
- `max_cache_size` *(optional)*:
Specifies the maximum allowed DRAM memory usage (in **byte**) for caching in `kv_connector_extra_config["ucm_connector_config"]`.
If not provided, it defaults to **5 GB**.
+- `kv_block_size` *(optional)*:
+ Specifies the memory size (in bytes) of a single key or value cache block used in vLLM’s paged attention mechanism, calculated as `block_size * head_size * total_num_kv_heads * element_size`.
### Example:
```python
-kv_connector_extra_config={"ucm_connector_name": "UcmDram", "ucm_connector_config":{"max_cache_size": 5368709120}}
-# Allocate up to 8GB DRAM for KV cache
+# Allocate up to 5GB (5368709120 bytes) DRAM for KV cache
+# KV Block size (in byte) is 262144
+kv_connector_extra_config={"ucm_connector_name": "UcmDram", "ucm_connector_config":{"max_cache_size": 5368709120, "kv_block_size": 262144}}
```
## Launching Inference
### Offline Inference
-To start **offline inference** with the DRAM connector,modify the script `examples/vllm_kv_offload.py` to include the `kv_connector_extra_config` for DRAM connector usage:
+To start **offline inference** with the DRAM connector, modify the script `examples/offline_inference.py` to include the `kv_connector_extra_config` for DRAM connector usage:
```python
-# In examples/vllm_kv_offload.py
+# In examples/offline_inference.py
ktc = KVTransferConfig(
...
- kv_connector_extra_config={"ucm_connector_name": "UcmDram", "ucm_connector_config":{"max_cache_size": 5368709120}}
+ kv_connector_extra_config={"ucm_connector_name": "UcmDram", "ucm_connector_config":{"max_cache_size": 5368709120, "kv_block_size": 262144}}
)
```
@@ -47,12 +58,19 @@ Then run the script as follows:
```bash
cd examples/
-python vllm_kv_offload.py
+python offline_inference.py
```
### Online Inference
-For **online inference** , vLLM with our connector can also be deployed as a server that implements the OpenAI API protocol. Run the following command to start the vLLM server with the Qwen/Qwen2.5-14B-Instruct model:
+For **online inference**, vLLM with our connector can also be deployed as a server that implements the OpenAI API protocol.
+
+First, specify the python hash seed by:
+```bash
+export PYTHONHASHSEED=123456
+```
+
+Run the following command to start the vLLM server with the Qwen/Qwen2.5-14B-Instruct model:
```bash
vllm serve /home/models/Qwen2.5-14B-Instruct \
@@ -69,7 +87,8 @@ vllm serve /home/models/Qwen2.5-14B-Instruct \
"kv_connector_extra_config": {
"ucm_connector_name": "UcmDram",
"ucm_connector_config": {
- "max_cache_size": 5368709120
+ "max_cache_size": 5368709120,
+ "kv_block_size": 262144
}
}
}'
diff --git a/docs/source/getting-started/example/nfs_conn.md b/docs/source/getting-started/example/nfs_conn.md
index d43aae88..95da8f69 100644
--- a/docs/source/getting-started/example/nfs_conn.md
+++ b/docs/source/getting-started/example/nfs_conn.md
@@ -1,2 +1,128 @@
# NFS Connector
+This document provides a usage example and configuration guide for the **NFS Connector**. This connector enables offloading of KV cache from GPU HBM to SSD or Local Disk, helping reduce memory pressure and support larger models or batch sizes.
+
+## Performance: DRAM Connector vs NFS Connector
+
+### Overview
+When the total size of `kvcache` does not exceed the `max_cache_size` configured for the DRAM Connector, the DRAM Connector demonstrates superior performance. However, when the `kvcache` size exceeds `max_cache_size`, the DRAM Connector experiences significant performance degradation, at which point the NFS Connector becomes the better-performing option.
+
+
+
+
+
+## Features
+
+The NFS connector supports the following functionalities:
+
+- `dump`: Offload KV cache blocks from HBM to SSD or Local Disk.
+- `load`: Load KV cache blocks from SSD or Local Disk back to HBM.
+- `lookup`: Look up KV blocks stored in SSD or Local Disk by block hash.
+- `wait`: Ensure that all dump or load operations have completed.
+- `commit`: Mark cache operations as complete and ready for reuse.
+
+## Configuration
+
+To use the NFS connector, you need to configure the `connector_config` dictionary in your model's launch configuration.
+
+### Required Parameters
+
+- `storage_backends` *(required)*:
+ The `storage_backends` directory can either be a local folder or an NFS-mounted directory backed by an SSD drive.
+- `kv_block_size` *(required)*:
+ `kv_block_size` represents `block_size * head_size * total_num_kv_heads * element_size * num_layers * 2`
+
+### Example:
+
+```python
+kv_connector_extra_config={"ucm_connector_name": "UcmNfsStore", "ucm_connector_config":{"storage_backends": "/mnt/test1", "kv_block_size": 33554432}}
+```
+
+## Launching Inference
+
+### Offline Inference
+
+To start **offline inference** with the NFS connector, modify the script `examples/offline_inference.py` to include the `kv_connector_extra_config` for NFS connector usage:
+
+```python
+# In examples/offline_inference.py
+ktc = KVTransferConfig(
+ ...
+ kv_connector_extra_config={"ucm_connector_name": "UcmNfsStore", "ucm_connector_config":{"storage_backends": "/mnt/test1", "kv_block_size": 33554432}}
+)
+```
+
+Then run the script as follows:
+
+```bash
+cd examples/
+export PYTHONHASHSEED=123456
+python offline_inference.py
+```
+
+### Online Inference
+
+For **online inference**, vLLM with our connector can also be deployed as a server that implements the OpenAI API protocol. Run the following command to start the vLLM server with the Qwen/Qwen2.5-14B-Instruct model:
+
+```bash
+export PYTHONHASHSEED=123456
+vllm serve /home/models/Qwen2.5-14B-Instruct \
+--max-model-len 20000 \
+--tensor-parallel-size 2 \
+--gpu_memory_utilization 0.87 \
+--trust-remote-code \
+--port 7800 \
+--kv-transfer-config \
+'{
+ "kv_connector": "UnifiedCacheConnectorV1",
+ "kv_connector_module_path": "unifiedcache.integration.vllm.uc_connector",
+ "kv_role": "kv_both",
+ "kv_connector_extra_config": {
+ "ucm_connector_name": "UcmNfsStore",
+ "ucm_connector_config": {
+ "storage_backends": "/mnt/test",
+ "kv_block_size": 33554432
+ }
+ }
+}'
+```
+
+If you see logs like the following:
+
+```bash
+INFO: Started server process [1049932]
+INFO: Waiting for application startup.
+INFO: Application startup complete.
+```
+
+Congratulations, you have successfully started the vLLM server with NFS Connector!
+
+After successfully starting the vLLM server, you can interact with the API as follows:
+
+```bash
+curl http://localhost:7800/v1/completions \
+ -H "Content-Type: application/json" \
+ -d '{
+ "model": "/home/models/Qwen2.5-14B-Instruct",
+ "prompt": "Shanghai is a",
+ "max_tokens": 7,
+ "temperature": 0
+ }'
+```
+To quickly experience the NFS Connector's effect:
+
+1. Start the service with:
+ `--no-enable-prefix-caching`
+2. Send the same request (longer than 128 tokens) twice consecutively
+3. Remember to enable prefix caching (do not add `--no-enable-prefix-caching`) in production environments.
+### Log Message Structure
+```plaintext
+[UCMNFSSTORE] [I] Task({task_id},{direction},{task_count},{size}) finished, elapsed {time}s
+```
+| Component | Description |
+|--------------|-----------------------------------------------------------------------------|
+| `task_id` | Unique identifier for the task |
+| `direction` | `D2S`: Dump to Storage (Device → SSD) `S2D`: Load from Storage (SSD → Device) |
+| `task_count` | Number of tasks executed in this operation |
+| `size` | Total size of data transferred in bytes (across all tasks) |
+| `time` | Time taken for the complete operation in seconds |
\ No newline at end of file
diff --git a/docs/source/getting-started/index.md b/docs/source/getting-started/index.md
index f2f03c4c..e3f8e3e3 100644
--- a/docs/source/getting-started/index.md
+++ b/docs/source/getting-started/index.md
@@ -4,7 +4,6 @@
:maxdepth: 2
installation.md
installation_npu.md
-quick_start.md
example/index.md
:::
diff --git a/docs/source/getting-started/installation.md b/docs/source/getting-started/installation.md
index 4adfc2e0..e151f35c 100644
--- a/docs/source/getting-started/installation.md
+++ b/docs/source/getting-started/installation.md
@@ -35,8 +35,10 @@ Refer to [Set up using docker](https://docs.vllm.ai/en/latest/getting_started/in
### Build from source code
Follow commands below to install unified-cache-management:
```bash
-git clone --depth 1 --branch develop https://github.com/ModelEngine-Group/unified-cache-management.git
+# Replace BRANCH_OR_TAG with the branch or tag name needed
+git clone --depth 1 --branch BRANCH_OR_TAG https://github.com/ModelEngine-Group/unified-cache-management.git
cd unified-cache-management
+export PLATFORM=cuda
pip install -v -e .
cd ..
```
@@ -44,10 +46,10 @@ cd ..
## Setup from docker
Download the pre-built docker image provided or build unified-cache-management docker image by commands below:
```bash
- # Build docker image using source code
- git clone --depth 1 --branch develop https://github.com/ModelEngine-Group/unified-cache-management.git
- cd unified-cache-management/docker
- docker build -t ucm-vllm:latest -f ./Dockerfile ./
+ # Build docker image using source code, replace BRANCH_OR_TAG with the branch or tag name needed
+ git clone --depth 1 --branch BRANCH_OR_TAG https://github.com/ModelEngine-Group/unified-cache-management.git
+ cd unified-cache-management
+ docker build -t ucm-vllm:latest -f ./docker/Dockerfile ./
```
Then run your container using following command. You can add or remove Docker parameters as needed.
```bash
diff --git a/docs/source/getting-started/installation_npu.md b/docs/source/getting-started/installation_npu.md
index 6c4322c2..f252c64b 100644
--- a/docs/source/getting-started/installation_npu.md
+++ b/docs/source/getting-started/installation_npu.md
@@ -44,8 +44,10 @@ Codes of vLLM and vLLM Ascend are placed in /vllm-workspace, you can refer to [v
### Build from source code
Follow commands below to install unified-cache-management:
```bash
-git clone --depth 1 --branch develop https://github.com/ModelEngine-Group/unified-cache-management.git
+# Replace BRANCH_OR_TAG with the branch or tag name needed
+git clone --depth 1 --branch BRANCH_OR_TAG https://github.com/ModelEngine-Group/unified-cache-management.git
cd unified-cache-management
+export PLATFORM=ascend
pip install -v -e .
cd ..
```
@@ -53,15 +55,18 @@ cd ..
## Setup from docker
Download the pre-built docker image provided or build unified-cache-management docker image by commands below:
```bash
- # Build docker image using source code
- git clone --depth 1 --branch develop https://github.com/ModelEngine-Group/unified-cache-management.git
- cd unified-cache-management/docker
- docker build -t ucm-vllm:latest -f ./Dockerfile-NPU ./
+ # Build docker image using source code, replace BRANCH_OR_TAG with the branch or tag name needed
+ git clone --depth 1 --branch BRANCH_OR_TAG https://github.com/ModelEngine-Group/unified-cache-management.git
+ cd unified-cache-management
+ docker build -t ucm-vllm:latest -f ./docker/Dockerfile-NPU ./
```
Then run your container using following command. You can add or remove Docker parameters as needed.
```bash
-# Use `--ipc=host` to make sure the shared memory is large enough.
+# Update DEVICE according to your device (/dev/davinci[0-7])
+export DEVICE=/dev/davinci7
+# Update the vllm-ascend image
docker run --rm \
+ --network=host \
--device $DEVICE \
--device /dev/davinci_manager \
--device /dev/devmm_svm \
diff --git a/docs/source/getting-started/quick_start.md b/docs/source/getting-started/quick_start.md
deleted file mode 100644
index 05cf8c1f..00000000
--- a/docs/source/getting-started/quick_start.md
+++ /dev/null
@@ -1 +0,0 @@
-# Quick Start
diff --git a/docs/source/images/dram_perform.png b/docs/source/images/dram_perform.png
new file mode 100644
index 00000000..89a14f96
Binary files /dev/null and b/docs/source/images/dram_perform.png differ
diff --git a/docs/source/images/nfs_performance.png b/docs/source/images/nfs_performance.png
new file mode 100644
index 00000000..823df443
Binary files /dev/null and b/docs/source/images/nfs_performance.png differ
diff --git a/examples/vllm_kv_offload.py b/examples/offline_inference.py
similarity index 88%
rename from examples/vllm_kv_offload.py
rename to examples/offline_inference.py
index c9f50248..5c63195e 100644
--- a/examples/vllm_kv_offload.py
+++ b/examples/offline_inference.py
@@ -2,7 +2,6 @@
import os
import time
from dataclasses import asdict
-
# Third Party
from vllm import LLM, SamplingParams
from vllm.config import KVTransferConfig
@@ -15,7 +14,7 @@
def setup_environment_variables():
os.environ["VLLM_USE_V1"] = "1"
-
+ os.environ["PYTHONHASHSEED"] = "123456"
@contextlib.contextmanager
def build_llm_with_uc(module_path: str, name: str, model: str):
@@ -23,7 +22,10 @@ def build_llm_with_uc(module_path: str, name: str, model: str):
kv_connector=name,
kv_connector_module_path=module_path,
kv_role="kv_both",
- kv_connector_extra_config={"ucm_connector_name": "UcmOceanStore", "ucm_connector_config": {"block_size": 128}}
+ kv_connector_extra_config={"ucm_connector_name": "UcmDram",
+ "ucm_connector_config": {"max_cache_size": 5368709120,
+ "kv_block_size": 262144}
+ }
)
llm_args = EngineArgs(
@@ -73,12 +75,13 @@ def main():
"century, the root sauses behind it, and a set of scientifically grounded, morally sound, and globally "
"cooperative solutions that transcend culturak and national boundaries. Include both immediate actions "
"and long-term strategies."
- ]
+ ]
sampling_params = SamplingParams(temperature=0, top_p=0.95, max_tokens=100)
print_output(llm, prompts, sampling_params, "first")
print_output(llm, prompts, sampling_params, "second")
+
if __name__ == "__main__":
- main()
\ No newline at end of file
+ main()
diff --git a/setup.py b/setup.py
index a0ec1bfc..eab497cc 100644
--- a/setup.py
+++ b/setup.py
@@ -1,24 +1,113 @@
-import os
+#
+# MIT License
+#
+# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+import os
+import subprocess
from distutils.core import setup
+from pathlib import Path
from setuptools import find_packages
+from setuptools.command.build_ext import build_ext
ROOT_DIR = os.path.dirname(__file__)
-
+PLATFORM = os.getenv("PLATFORM")
def get_path(*filepath) -> str:
return os.path.join(ROOT_DIR, *filepath)
-print("FOUND PACKAGES:", find_packages())
+def _is_cuda() -> bool:
+ """Return True when the PLATFORM environment variable is "cuda"."""
+ return PLATFORM == "cuda"
+
+
+def _is_npu() -> bool:
+ """Return True when the PLATFORM environment variable is "ascend"."""
+ return PLATFORM == "ascend"
+
+
+class BuildUCMExtension(build_ext):
+ """Build UCM Extensions Using Cmake"""
+
+ def run(self):
+ package_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'unifiedcache'))
+ ucm_nfs_path = os.path.join(package_path, 'csrc', 'ucmnfsstore')
+ if not os.path.exists(ucm_nfs_path):
+ raise RuntimeError(f"Expected directory {ucm_nfs_path} does not exist")
+ build_path = os.path.join(ucm_nfs_path, 'build')
+ if not os.path.exists(build_path):
+ os.makedirs(build_path)
+
+ os.chdir(build_path)
+ if _is_npu():
+ cmake_command = [
+ 'cmake',
+ '-DDOWNLOAD_DEPENDENCE=ON',
+ '-DRUNTIME_ENVIRONMENT=ascend',
+ '..',
+ ucm_nfs_path
+ ]
+ elif _is_cuda():
+ cmake_command = [
+ 'cmake',
+ '-DDOWNLOAD_DEPENDENCE=ON',
+ '-DRUNTIME_ENVIRONMENT=cuda',
+ '..',
+ ucm_nfs_path
+ ]
+ else:
+ raise RuntimeError(
+ "No supported accelerator found. "
+ "Please ensure either CUDA or NPU is available."
+ )
+ subprocess.check_call(cmake_command)
+
+ make_command = ['make', '-j', '8']
+ subprocess.check_call(make_command)
+
+ output_lib_path = os.path.join(ucm_nfs_path, 'output', 'lib')
+ so_files = [f for f in os.listdir(output_lib_path) if f.endswith('.so')]
+ for so_file in so_files:
+ src = os.path.join(output_lib_path, so_file)
+ dest = os.path.join(package_path, 'ucm_connector', so_file)
+ os.rename(src, dest)
+
+ os.chdir(os.path.dirname(__file__))
+ super().run()
+
+
+cmdclass = {
+ 'build_ext': BuildUCMExtension,
+}
+
+print("FOUND PACKAGES:", find_packages())
setup(
- name="unified_cache",
- version="0.0.1",
+ name="unifiedcache",
+ version='0.0.1',
author="Unified Cache Team",
description="Unified Cache Management",
packages=find_packages(),
ext_modules=[],
+ cmdclass=cmdclass,
package_data={},
include_package_data=True,
install_requires=[],
diff --git a/test/dump_and_load_on_dram.py b/test/dump_and_load_on_dram.py
index 12ae52da..55af3c2b 100644
--- a/test/dump_and_load_on_dram.py
+++ b/test/dump_and_load_on_dram.py
@@ -1,3 +1,26 @@
+#
+# MIT License
+#
+# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
# -*- coding: utf-8 -*-
from unifiedcache.csrc.ucmnfsstore.output.lib import ucmnfsstore as ucmstore
import secrets
diff --git a/test/dump_and_load_on_hbm.py b/test/dump_and_load_on_hbm.py
index d59197d5..a002afa8 100644
--- a/test/dump_and_load_on_hbm.py
+++ b/test/dump_and_load_on_hbm.py
@@ -1,3 +1,26 @@
+#
+# MIT License
+#
+# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
# -*- coding: utf-8 -*-
from unifiedcache.csrc.ucmnfsstore.output.lib import ucmnfsstore as ucmstore
import secrets
diff --git a/test/test_uc_connector.py b/test/test_uc_connector.py
index bff47297..1fb81877 100644
--- a/test/test_uc_connector.py
+++ b/test/test_uc_connector.py
@@ -1,3 +1,27 @@
+#
+# MIT License
+#
+# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+
import random
import secrets
import unittest
diff --git a/test/test_ucm_dram.py b/test/test_ucm_dram.py
index 1454fc2e..bbb66848 100644
--- a/test/test_ucm_dram.py
+++ b/test/test_ucm_dram.py
@@ -1,3 +1,27 @@
+#
+# MIT License
+#
+# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+
import random
import torch
import unittest
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/api/ucmnfsstore/ucmnfsstore.cc b/unifiedcache/csrc/ucmnfsstore/cc/api/ucmnfsstore/ucmnfsstore.cc
index 59238adb..624078f9 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/api/ucmnfsstore/ucmnfsstore.cc
+++ b/unifiedcache/csrc/ucmnfsstore/cc/api/ucmnfsstore/ucmnfsstore.cc
@@ -38,6 +38,7 @@ void ShowSetupParam(const SetupParam& param)
UC_INFO("Set UC::TransferEnable to {}.", param.transferEnable);
UC_INFO("Set UC::DeviceId to {}.", param.transferDeviceId);
UC_INFO("Set UC::StreamNumber to {}.", param.transferStreamNumber);
+ UC_INFO("Set UC::IOSize to {}.", param.transferIoSize);
}
int32_t Setup(const SetupParam& param)
@@ -48,7 +49,8 @@ int32_t Setup(const SetupParam& param)
return status.Underlying();
}
if (param.transferEnable) {
- status = Singleton::Instance()->Setup(param.transferDeviceId, param.transferStreamNumber);
+ status = Singleton::Instance()->Setup(param.transferDeviceId, param.transferStreamNumber,
+ param.transferIoSize);
if (status.Failure()) {
UC_ERROR("Failed({}) to setup TsfTaskManager.", status);
return status.Underlying();
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/api/ucmnfsstore/ucmnfsstore.h b/unifiedcache/csrc/ucmnfsstore/cc/api/ucmnfsstore/ucmnfsstore.h
index 3ead1eb0..ad722dd1 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/api/ucmnfsstore/ucmnfsstore.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/api/ucmnfsstore/ucmnfsstore.h
@@ -37,11 +37,12 @@ struct SetupParam {
bool transferEnable;
int32_t transferDeviceId;
size_t transferStreamNumber;
+ size_t transferIoSize;
SetupParam(const std::vector& storageBackends, const size_t kvcacheBlockSize,
const bool transferEnable)
: storageBackends{storageBackends}, kvcacheBlockSize{kvcacheBlockSize}, transferEnable{transferEnable},
- transferDeviceId{-1}, transferStreamNumber{256}
+ transferDeviceId{-1}, transferStreamNumber{256}, transferIoSize{262144}
{
}
};
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/aclrt_device.cc b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/aclrt_device.cc
index 536ee5a6..bedcf721 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/aclrt_device.cc
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/aclrt_device.cc
@@ -29,27 +29,21 @@ namespace UC {
AclrtDevice::~AclrtDevice()
{
- if (!this->_stream) {
- return;
- }
+ if (!this->_stream) { return; }
auto ret = aclrtDestroyStream(this->_stream);
- if (ret != ACL_SUCCESS) {
- UC_WARN("Failed({}) to run aclrtDestroyStream", ret);
- }
+ if (ret != ACL_SUCCESS) { UC_WARN("Failed({}) to run aclrtDestroyStream", ret); }
this->_stream = nullptr;
ret = aclrtResetDevice(this->_deviceId);
- if (ret != ACL_SUCCESS) {
- UC_WARN("Failed({}) to run aclrtResetDevice", ret);
- }
+ if (ret != ACL_SUCCESS) { UC_WARN("Failed({}) to run aclrtResetDevice", ret); }
}
Status AclrtDevice::Setup()
{
auto status = IBufferedDevice::Setup();
- if (status.Failure()){return status;}
+ if (status.Failure()){ return status; }
auto ret = aclrtSetDevice(this->_deviceId);
if (ret != ACL_SUCCESS) {
- UC_ERROR("Failed({}) to run aclrtSetDevice with device({}).", ret , this->_deviceId);
+ UC_ERROR("Failed({}) to run aclrtSetDevice with device({}).", ret, this->_deviceId);
return Status::OsApiError();
}
ret = aclrtCreateStream(&this->_stream);
@@ -88,6 +82,7 @@ Status AclrtDevice::WaitFinish()
UC_ERROR("Failed({}) to synchronize device stream.", ret);
return Status::OsApiError();
}
+ this->ResetHostBufferIndex();
return Status::OK();
}
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/aclrt_device.h b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/aclrt_device.h
index 91590f51..e57da8a8 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/aclrt_device.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/aclrt_device.h
@@ -32,13 +32,13 @@ class AclrtDevice : public IBufferedDevice {
public:
public:
AclrtDevice(const int32_t deviceId, const size_t bufferSize, const size_t bufferNumber)
- : IBufferedDevice{bufferSize, bufferNumber}, _deviceId(deviceId), _stream(nullptr)
+ : IBufferedDevice{bufferSize, bufferNumber}, _deviceId(deviceId), _stream(nullptr)
{
}
~AclrtDevice();
Status Setup() override;
- Status H2DAsync (void* dst, size_t dstMax, const void* src, const size_t count) override;
- Status D2HAsync (void* dst, size_t dstMax, const void* src, const size_t count) override;
+ Status H2DAsync(void* dst, size_t dstMax, const void* src, const size_t count) override;
+ Status D2HAsync(void* dst, size_t dstMax, const void* src, const size_t count) override;
Status WaitFinish() override;
private:
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/cuda_device.cc b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/cuda_device.cc
index a395e06b..58b80b38 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/cuda_device.cc
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/cuda_device.cc
@@ -22,40 +22,36 @@
* SOFTWARE.
* */
#include "cuda_device.h"
-#include
#include "logger/logger.h"
+template <>
+struct fmt::formatter<cudaError_t> : formatter<int> { // format cudaError_t as its integer code in log messages
+    auto format(cudaError_t err, format_context& ctx) const -> format_context::iterator
+    {
+        return formatter<int>::format(static_cast<int>(err), ctx);
+    }
+};
+
namespace UC {
CudaDevice::~CudaDevice()
{
- if (!this->_stream) {
- return;
- }
- auto ret = cudaDestroyStream(this->_stream);
- if (ret != cudaSUCCESS) {
- UC_WARN("Failed({}) to run cudaDestroyStream", ret);
- }
+    if (this->_stream) { (void)cudaStreamDestroy(this->_stream); } // best-effort release of the stream made in Setup()
this->_stream = nullptr;
- ret = cudaResetDevice(this->_deviceId);
- if (ret != cudaSUCCESS) {
- UC_WARN("Failed({}) to run cudaResetDevice", ret);
- }
}
Status CudaDevice::Setup()
{
auto status = IBufferedDevice::Setup();
- if (status.Failure()){return status;}
+ if (status.Failure()){ return status; }
auto ret = cudaSetDevice(this->_deviceId);
- if (ret != cudaSUCCESS) {
- UC_ERROR("Failed({}) to run cudaSetDevice with device({}).", ret , this->_deviceId);
+ if (ret != cudaSuccess) {
+ UC_ERROR("Failed({}) to run cudaSetDevice with device({}).", ret, this->_deviceId);
return Status::OsApiError();
}
- ret = cudaCreateStream(&this->_stream);
- if (ret != cudaSUCCESS) {
- (void)cudaResetDevice(this->_deviceId);
- UC_ERROR("Failed({}) to run cudaCreateStream.", ret);
+ ret = cudaStreamCreate(&this->_stream);
+ if (ret != cudaSuccess) {
+ UC_ERROR("Failed({}) to run cudaStreamCreate.", ret);
return Status::OsApiError();
}
return Status::OK();
@@ -63,8 +59,8 @@ Status CudaDevice::Setup()
Status CudaDevice::H2DAsync(void* dst, size_t dstMax, const void* src, const size_t count)
{
- auto ret = cudaMemcpyAsync(dst, dstMax, src, count, cuda_MEMCPY_HOST_TO_DEVICE, this->_stream);
- if (ret != cudaSUCCESS) {
+ auto ret = cudaMemcpyAsync(dst, src, count, cudaMemcpyHostToDevice, this->_stream);
+ if (ret != cudaSuccess) {
UC_ERROR("Failed({}) to copy data from H({}) to D({}).", ret, count, dstMax);
return Status::OsApiError();
}
@@ -73,8 +69,8 @@ Status CudaDevice::H2DAsync(void* dst, size_t dstMax, const void* src, const siz
Status CudaDevice::D2HAsync(void* dst, size_t dstMax, const void* src, const size_t count)
{
- auto ret = cudaMemcpyAsync(dst, dstMax, src, count, cuda_MEMCPY_DEVICE_TO_HOST, this->_stream);
- if (ret != cudaSUCCESS) {
+ auto ret = cudaMemcpyAsync(dst, src, count, cudaMemcpyDeviceToHost, this->_stream);
+ if (ret != cudaSuccess) {
UC_ERROR("Failed({}) to copy data from D({}) to H({}).", ret, count, dstMax);
return Status::OsApiError();
}
@@ -83,11 +79,12 @@ Status CudaDevice::D2HAsync(void* dst, size_t dstMax, const void* src, const siz
Status CudaDevice::WaitFinish()
{
- auto ret = cudaSynchronizeStream(this->_stream);
- if (ret != cudaSUCCESS) {
+ auto ret = cudaStreamSynchronize(this->_stream);
+ if (ret != cudaSuccess) {
UC_ERROR("Failed({}) to synchronize device stream.", ret);
return Status::OsApiError();
}
+ this->ResetHostBufferIndex();
return Status::OK();
}
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/cuda_device.h b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/cuda_device.h
index a85144d9..39f37b0c 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/cuda_device.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/cuda_device.h
@@ -24,6 +24,7 @@
#ifndef UNIFIEDCACHE_CUDA_DEVICE_H
#define UNIFIEDCACHE_CUDA_DEVICE_H
+#include
#include "ibuffered_device.h"
namespace UC {
@@ -31,18 +32,18 @@ namespace UC {
class CudaDevice : public IBufferedDevice {
public:
CudaDevice(const int32_t deviceId, const size_t bufferSize, const size_t bufferNumber)
- : IBufferedDevice{bufferSize, bufferNumber}, _deviceId(deviceId), _stream(nullptr)
+ : IBufferedDevice{bufferSize, bufferNumber}, _deviceId(deviceId), _stream(nullptr)
{
}
~CudaDevice();
Status Setup() override;
- Status H2DAsync (void* dst, size_t dstMax, const void* src, const size_t count) override;
- Status D2HAsync (void* dst, size_t dstMax, const void* src, const size_t count) override;
+ Status H2DAsync(void* dst, size_t dstMax, const void* src, const size_t count) override;
+ Status D2HAsync(void* dst, size_t dstMax, const void* src, const size_t count) override;
Status WaitFinish() override;
private:
int32_t _deviceId;
- void* _stream;
+ cudaStream_t _stream;
};
} // namespace UC
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/device.cc b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/device.cc
index 92e4a167..8c15f232 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/device.cc
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/device.cc
@@ -22,14 +22,18 @@
* SOFTWARE.
* */
#include "device.h"
+#ifdef ASCEND_AVAILABLE
#include "aclrt_device.h"
+#endif
+#ifdef CUDA_AVAILABLE
#include "cuda_device.h"
+#endif
#include "logger/logger.h"
namespace UC {
std::unique_ptr Device::Make(const int32_t deviceId, const size_t hostBufferSize,
- const size_t hostBufferNumber)
+ const size_t hostBufferNumber)
{
try {
std::unique_ptr device = nullptr;
@@ -41,10 +45,10 @@ std::unique_ptr Device::Make(const int32_t deviceId, const size_t hostB
#endif
return device;
} catch (const std::exception& e) {
- UC_ERROR("Failed({}) to instantiate the device({},{},{}) object", e.what(), deviceId, hostBufferSize,
+ UC_ERROR("Failed({}) to instantiate the device({},{},{}) object.", e.what(), deviceId, hostBufferSize,
hostBufferNumber);
- return nullptr;
}
+ return nullptr;
}
} // namespace UC
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/device.h b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/device.h
index 02188f1b..8780c222 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/device.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/device.h
@@ -21,12 +21,10 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* */
-#ifndef UNIFIEDCACHE_IDEVICE
-#define UNIFIEDCACHE_IDEVICE
+#ifndef UNIFIEDCACHE_DEVICE_H
+#define UNIFIEDCACHE_DEVICE_H
-#include
#include "idevice.h"
-#include "status/status.h"
namespace UC {
@@ -38,4 +36,4 @@ class Device {
} // namespace UC
-#endif // UNIFIEDCACHE_IDEVICE
\ No newline at end of file
+#endif // UNIFIEDCACHE_DEVICE_H
\ No newline at end of file
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/ibuffered_device.cc b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/ibuffered_device.cc
index 105a5365..447eee89 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/ibuffered_device.cc
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/ibuffered_device.cc
@@ -27,7 +27,7 @@
namespace UC {
-IBufferedDevice::IBufferedDevice(size_t bufferSize, size_t bufferNumber)
+IBufferedDevice::IBufferedDevice(const size_t bufferSize, const size_t bufferNumber)
{
this->_bufferSize = Memory::Align(bufferSize);
this->_bufferNumber = bufferNumber;
@@ -38,7 +38,7 @@ Status IBufferedDevice::Setup()
{
auto buffer = Memory::AllocAlign(this->_bufferSize * this->_bufferNumber);
if (!buffer) {
- UC_ERROR("Failed to make buffer({},{}).", this->_bufferSize * this->_bufferNumber);
+ UC_ERROR("Failed to make buffer({},{}).", this->_bufferSize, this->_bufferNumber);
return Status::OutOfMemory();
}
this->_buffer.Setup(buffer, this->_bufferSize, this->_bufferNumber);
@@ -47,14 +47,16 @@ Status IBufferedDevice::Setup()
std::shared_ptr IBufferedDevice::GetHostBuffer(size_t size)
{
- if (this->_buffer.Full()){
+ if (this->_buffer.Full()) {
auto status = this->WaitFinish();
- if (status.Failure()) { return nullptr;}
- this->_buffer.Reset();
+ if (status.Failure()) { return nullptr; }
+ this->ResetHostBufferIndex();
}
if (this->_buffer.Available(size)) { return this->_buffer.GetBuffer(); }
this->_tmpBuffer = Memory::AllocAlign(Memory::Align(size));
return this->_tmpBuffer;
}
+void IBufferedDevice::ResetHostBufferIndex() { this->_buffer.Reset(); }
+
} // namespace UC
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/ibuffered_device.h b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/ibuffered_device.h
index 9bb0b832..67a83530 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/ibuffered_device.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/ibuffered_device.h
@@ -25,12 +25,12 @@
#ifndef UNIFIEDCACHE_IBUFFERED_DEVICE_H
#define UNIFIEDCACHE_IBUFFERED_DEVICE_H
-#include "device/idevice.h"
+#include "idevice.h"
namespace UC {
-class IBufferedDevice : public IDevice {
- class Buffer{
+class IBufferedDevice : public IDevice {
+ class Buffer {
public:
void Setup(std::shared_ptr buffer, size_t size, size_t number)
{
@@ -39,10 +39,10 @@ class IBufferedDevice : public IDevice {
this->_number = number;
this->_index = 0;
}
- bool Empty() const {return this->_index == 0;}
- bool Full() const {return this->_index == this->_number;}
- bool Available(size_t size) const {return this->_index <= this->_number;}
- void Reset() {this->_index = 0;}
+ bool Empty() const { return this->_index == 0; }
+ bool Full() const { return this->_index == this->_number; }
+ bool Available(size_t size) const { return size <= this->_size; }
+ void Reset() { this->_index = 0; }
std::shared_ptr GetBuffer()
{
auto ptr = static_cast(this->_buffer.get());
@@ -59,10 +59,11 @@ class IBufferedDevice : public IDevice {
};
public:
- IBufferedDevice(size_t bufferSize, size_t bufferNumber);
+ IBufferedDevice(const size_t bufferSize, const size_t bufferNumber);
virtual ~IBufferedDevice() = default;
Status Setup() override;
std::shared_ptr GetHostBuffer(size_t size) override;
+ void ResetHostBufferIndex() override;
private:
size_t _bufferSize;
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/idevice.h b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/idevice.h
index 1aae092a..7a4514d6 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/device/idevice.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/device/idevice.h
@@ -24,7 +24,6 @@
#ifndef UNIFIEDCACHE_IDEVICE_H
#define UNIFIEDCACHE_IDEVICE_H
-#include
#include
#include "status/status.h"
@@ -35,8 +34,9 @@ class IDevice {
virtual ~IDevice() = default;
virtual Status Setup() = 0;
virtual std::shared_ptr GetHostBuffer(size_t size) = 0;
- virtual Status H2DAsync(void* dst, size_t dstMax, const void* src, const size_t count)=0;
- virtual Status D2HAsync(void* dst, size_t dstMax, const void* src, const size_t count)=0;
+ virtual void ResetHostBufferIndex() = 0;
+ virtual Status H2DAsync(void* dst, size_t dstMax, const void* src, const size_t count)=0;
+ virtual Status D2HAsync(void* dst, size_t dstMax, const void* src, const size_t count)=0;
virtual Status WaitFinish() = 0;
};
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/space/space_manager.cc b/unifiedcache/csrc/ucmnfsstore/cc/domain/space/space_manager.cc
index 92255326..9ea7a94b 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/space/space_manager.cc
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/space/space_manager.cc
@@ -24,7 +24,6 @@
#include "space_manager.h"
#include "file/file.h"
#include "logger/logger.h"
-#include
namespace UC {
@@ -34,14 +33,14 @@ Status SpaceManager::Setup(const std::vector& storageBackends, cons
UC_ERROR("Empty backend list.");
return Status::InvalidParam();
}
- for (auto& path : storageBackends) {
- Status status = this->AddStorageBackend(path);
- if (status.Failure()) { return status; }
- }
if (blockSize == 0) {
UC_ERROR("Invalid block size({}).", blockSize);
return Status::InvalidParam();
}
+ for (auto& path : storageBackends) {
+ Status status = this->AddStorageBackend(path);
+ if (status.Failure()) { return status; }
+ }
this->_blockSize = blockSize;
return Status::OK();
}
@@ -103,9 +102,9 @@ bool SpaceManager::LookupBlock(const std::string& blockId)
UC_ERROR("Failed to make file smart pointer, path: {}.", path);
return false;
}
- auto s = file->Access(IFile::AccessMode::EXIST);
+ auto s = file->Access(IFile::AccessMode::EXIST | IFile::AccessMode::READ | IFile::AccessMode::WRITE);
if (s.Failure()) {
- UC_ERROR("Failed to access file, path: {}, errcode: {}.", path, s.Underlying());
+ if (s != Status::NotFound()) { UC_ERROR("Failed to access file, path: {}, errcode: {}.", path, s); }
return false;
}
return true;
@@ -124,13 +123,9 @@ Status SpaceManager::AddStorageBackend(const std::string& path)
if (this->_storageBackends.empty()) {
status = this->AddFirstStorageBackend(normalizedPath);
} else {
- status = this->AddSecondaryStorageBackend(path);
- }
- if (status.Success()) {
- UC_INFO("Add UC::StorageBackend: {}.", normalizedPath);
- } else {
- UC_ERROR("Failed({}) to add storage backend({}).", status, normalizedPath);
+ status = this->AddSecondaryStorageBackend(normalizedPath);
}
+ if (status.Failure()) { UC_ERROR("Failed({}) to add storage backend({}).", status, normalizedPath); }
return status;
}
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/configurator.h b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/configurator.h
deleted file mode 100644
index 028bd3ac..00000000
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/configurator.h
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
-* MIT License
-*
-* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
-*
-* Permission is hereby granted, free of charge, to any person obtaining a copy
-* of this software and associated documentation files (the "Software"), to deal
-* in the Software without restriction, including without limitation the rights
-* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-* copies of the Software, and to permit persons to whom the Software is
-* furnished to do so, subject to the following conditions:
-*
-* The above copyright notice and this permission notice shall be included in all
-* copies or substantial portions of the Software.
-*
-* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-* SOFTWARE.
- * */
-#ifndef UNIFIEDCACHE_CONFIGURATOR_H
-#define UNIFIEDCACHE_CONFIGURATOR_H
-
-#include
-#include "logger/logger.h"
-
-namespace UC {
-
-class Configurator {
-public:
- void DeviceId(const int32_t deviceId) {
- this->_deviceId = deviceId;
- UC_INFO("Set UC::DeviceId: {}", this->_deviceId);
- }
- int32_t DeviceId() const {
- return this->_deviceId;
- }
- void StreamNumber(const size_t streamNumber) {
- this->_streamNumber = streamNumber;
- UC_INFO("Set UC::StreamNumber: {}", this->_streamNumber);
- }
- size_t StreamNumber() const {
- return this->_streamNumber;
- }
- void Timeout(const size_t timeout) {
- this->_timeout = timeout;
- UC_INFO("Set UC::Timeout: {}", this->_timeout);
- }
- size_t Timeout() const {
- return this->_timeout;
- }
- void QueueDepth(const size_t queueDepth) {
- this->_queueDepth = queueDepth;
- UC_INFO("Set UC::QueueDepth: {}", this->_queueDepth);
- }
- size_t QueueDepth() const {
- return this->_queueDepth;
- }
- void BufferSize(const size_t bufferSize) {
- this->_bufferSize = bufferSize;
- UC_INFO("Set UC::BufferSize: {}", this->_bufferSize);
- }
- size_t BufferSize() const {
- return this->_bufferSize;
- }
-
-private:
- int32_t _deviceId{-1};
- size_t _streamNumber{0};
- size_t _timeout{0};
- size_t _queueDepth{0};
- size_t _bufferSize{0}; // todo: buffer size
-
-};
-} // namespace UC {
-
-#endif // UNIFIEDCACHE_CONFIGURATOR_H
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task.h b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task.h
index 0d05a279..5acbd29a 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task.h
@@ -31,20 +31,20 @@
namespace UC {
struct TsfTask {
- enum class Type { DUMP, LOAD };
- enum class Location { HOST, DEVICE };
+ enum class Type { DUMP, LOAD };
+ enum class Location { HOST, DEVICE };
Type type;
- Location location;
+ Location location;
std::string blockId;
- size_t offset;
- uintptr_t address;
- size_t length;
- size_t owner;
+ size_t offset;
+ uintptr_t address;
+ size_t length;
+ size_t owner;
std::shared_ptr waiter;
TsfTask(const Type type, const Location location, const std::string& blockId, const size_t offset,
const uintptr_t address, const size_t length)
- : type{type}, location{location}, blockId{blockId}, offset{offset}, address{address}, length{length}, owner{0},
+ : type{type}, location{location}, blockId{blockId}, offset{offset}, address{address}, length{length}, owner{0},
waiter{nullptr}
{
}
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_manager.cc b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_manager.cc
index 8e89ddd8..1bf64ca6 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_manager.cc
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_manager.cc
@@ -22,28 +22,22 @@
* SOFTWARE.
* */
#include "tsf_task_manager.h"
-#include
-#include
-#include
-#include
-#include "template/singleton.h"
-#include "configurator.h"
namespace UC {
-Status TsfTaskManager::Setup(const int32_t deviceId, const size_t streamNumber)
+Status TsfTaskManager::Setup(const int32_t deviceId, const size_t streamNumber, const size_t ioSize)
{
this->_queues.reserve(streamNumber);
for (size_t i = 0; i < streamNumber; ++i) {
auto& queue = this->_queues.emplace_back(std::make_unique());
- auto status = queue->Setup(deviceId, &this->_failureSet);
+ auto status = queue->Setup(deviceId, ioSize, &this->_failureSet);
if (status.Failure()) { return status; }
}
return Status::OK();
}
-Status TsfTaskManager::Submit(std::list tasks, const size_t size, const size_t number,
- const std::string& brief, size_t& taskId)
+Status TsfTaskManager::Submit(std::list& tasks, const size_t size, const size_t number,
+ const std::string& brief, size_t& taskId)
{
std::unique_lock lk(this->_mutex);
taskId = ++this->_taskIdSeed;
@@ -51,8 +45,8 @@ Status TsfTaskManager::Submit(std::list tasks, const size_t size, const
if (!success) { return Status::OutOfMemory(); }
std::vector> lists;
this->Dispatch(tasks, lists, taskId, iter->second);
- for (size_t i = 0; i < lists.size(); i++){
- if (lists[i].empty()){continue;}
+ for (size_t i = 0; i < lists.size(); i++) {
+ if (lists[i].empty()) { continue; }
this->_queues[this->_qIdx]->Push(lists[i]);
this->_qIdx = (this->_qIdx + 1) % this->_queues.size();
}
@@ -70,12 +64,13 @@ Status TsfTaskManager::Wait(const size_t taskId)
this->_waiters.erase(iter);
}
waiter->Wait();
- bool failed = this->_failureSet.Exist(taskId);
+ bool failure = this->_failureSet.Exist(taskId);
this->_failureSet.Remove(taskId);
- return failed ? Status::Error() : Status::OK();
+ if (failure) { UC_ERROR("Transfer task({}) failed.", taskId); }
+ return failure ? Status::Error() : Status::OK();
}
-void TsfTaskManager::Dispatch(std::list& tasks, std::vector>& targets, size_t& taskId,
+void TsfTaskManager::Dispatch(std::list& tasks, std::vector>& targets, const size_t taskId,
std::shared_ptr waiter) const
{
auto qNumber = this->_queues.size();
@@ -86,7 +81,6 @@ void TsfTaskManager::Dispatch(std::list& tasks, std::vectorowner = taskId;
it->waiter = waiter;
- waiter->Up();
auto& target = targets[index % qNumber];
target.splice(target.end(), tasks, it);
index++;
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_manager.h b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_manager.h
index a6740ba9..807b83bf 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_manager.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_manager.h
@@ -24,28 +24,28 @@
#ifndef UNIFIEDCACHE_TSF_TASK_MANAGER
#define UNIFIEDCACHE_TSF_TASK_MANAGER
-#include "tsf_task_queue.h"
-#include "tsf_task_waiter.h"
+#include
#include
#include
+#include "tsf_task_queue.h"
namespace UC {
class TsfTaskManager{
public:
- Status Setup(const int32_t deviceId, const size_t streamNumber);
- Status Submit(std::listtasks, const size_t size, const size_t number, const std::string& brief,
+ Status Setup(const int32_t deviceId, const size_t streamNumber, const size_t ioSize);
+ Status Submit(std::list& tasks, const size_t size, const size_t number, const std::string& brief,
size_t& taskId);
Status Wait(const size_t taskId);
private:
- void Dispatch(std::list& tasks, std::vector>& targets ,size_t& taskId,
+ void Dispatch(std::list& tasks, std::vector>& targets , const size_t taskId,
std::shared_ptr waiter) const;
private:
std::mutex _mutex;
TsfTaskSet _failureSet;
- std::unordered_map> _waiters;
+ std::unordered_map> _waiters;
std::vector> _queues;
size_t _qIdx{0};
size_t _taskIdSeed{0};
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_queue.cc b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_queue.cc
index 2d4236a7..91294c65 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_queue.cc
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_queue.cc
@@ -22,43 +22,45 @@
* SOFTWARE.
* */
-#include
+#include "tsf_task_queue.h"
#include "device/device.h"
+#include "tsf_task_runner.h"
namespace UC{
TsfTaskQueue::~TsfTaskQueue()
{
{
- std::unique_lock lock(this->_mutex);
- if(!this->_running){ return; }
+ std::unique_lock lk(this->_mutex);
+ if (!this->_running) { return; }
this->_running = false;
}
- if(this->_worker.joinable()){
+ if (this->_worker.joinable()){
this->_cv.notify_all();
this->_worker.join();
}
}
-Status TsfTaskQueue::Setup(const int32_t deviceId, TsfTaskSet* failureSet)
+Status TsfTaskQueue::Setup(const int32_t deviceId, const size_t ioSize, TsfTaskSet* failureSet)
{
this->_deviceId = deviceId;
+ this->_ioSize = ioSize;
this->_failureSet = failureSet;
{
- std::unique_lock lock(this->_mutex);
+ std::unique_lock lk(this->_mutex);
this->_running = true;
}
std::promise started;
auto fut = started.get_future();
- this->_worker = std::thread([&]{this->Worker(started);});
+ this->_worker = std::thread([&]{ this->Worker(started); });
return fut.get();
}
void TsfTaskQueue::Push(std::list& tasks)
{
{
- std::unique_lock lock(this->_mutex);
- this->_taskQ.splice(this->_taskQ.end(),tasks);
+ std::unique_lock lk(this->_mutex);
+ this->_taskQ.splice(this->_taskQ.end(), tasks);
}
this->_cv.notify_all();
}
@@ -67,8 +69,8 @@ void TsfTaskQueue::Worker(std::promise& started)
{
auto status = Status::OK();
std::unique_ptr device = nullptr;
- if (this -> _deviceId >=0 ){
- if (!(device = Device::Make(this->_deviceId)) ){
+ if (this -> _deviceId >= 0){
+ if (!(device = Device::Make(this->_deviceId, this->_ioSize))){
started.set_value(Status::OutOfMemory());
return;
}
@@ -81,15 +83,18 @@ void TsfTaskQueue::Worker(std::promise& started)
started.set_value(status);
for(;;){
std::unique_lock lk(this->_mutex);
- this->_cv.wait(lk, [this]{ return !this->_taskQ.empty() || !this->_running; });
- if (!this->_running) { break; }
+ this->_cv.wait(lk, [this] { return !this->_taskQ.empty() || !this->_running; });
+ if (!this->_running) { return; }
if (this->_taskQ.empty()) { continue; }
auto task = std::move(this->_taskQ.front());
this->_taskQ.pop_front();
bool lastTask = this->_taskQ.empty() || this->_taskQ.front().owner != task.owner;
lk.unlock();
if (!this->_failureSet->Exist(task.owner)) {
- if ((status = runner.Run(task)).Failure()) { this->_failureSet->Insert(task.owner); }
+ if ((status = runner.Run(task)).Failure()) {
+ UC_ERROR("Failed({}) to run transfer task({}).", status, task.owner);
+ this->_failureSet->Insert(task.owner);
+ }
}
if (device && lastTask) {
if ((status = device->WaitFinish()).Failure()) { this->_failureSet->Insert(task.owner); }
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_queue.h b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_queue.h
index 4caf78d1..dc73a662 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_queue.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_queue.h
@@ -21,15 +21,16 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* */
-#ifndef UNIFIEDCACHE_TSF_TASK_QUEUE
-#define UNIFIEDCACHE_TSF_TASK_QUEUE
+#ifndef UNIFIEDCACHE_TSF_TASK_QUEUE_H
+#define UNIFIEDCACHE_TSF_TASK_QUEUE_H
#include
#include
#include
#include
#include
-#include "tsf_task_runner.h"
+#include "status/status.h"
+#include "tsf_task.h"
#include "tsf_task_set.h"
namespace UC{
@@ -37,7 +38,7 @@ namespace UC{
class TsfTaskQueue{
public:
~TsfTaskQueue();
- Status Setup(const int32_t deviceId, TsfTaskSet* failureSet);
+    Status Setup(const int32_t deviceId, const size_t ioSize, TsfTaskSet* failureSet);
void Push(std::list& tasks);
private:
@@ -50,6 +51,7 @@ class TsfTaskQueue{
std::thread _worker;
bool _running{false};
int32_t _deviceId;
+ size_t _ioSize;
TsfTaskSet* _failureSet;
};
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_runner.cc b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_runner.cc
index 4fa8fba2..481075e7 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_runner.cc
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_runner.cc
@@ -22,17 +22,16 @@
* SOFTWARE.
* */
-#include
+#include "tsf_task_runner.h"
+#include
+#include
#include "file/file.h"
#include "logger/logger.h"
-#include "memory/memory.h"
-#include "template/singleton.h"
-#include
-#include
+#include "memory/memory.h"
#include "space/space_manager.h"
+#include "template/singleton.h"
-
-namespace UC{
+namespace UC {
Status TsfTaskRunner::Run(const TsfTask& task)
{
@@ -42,85 +41,81 @@ Status TsfTaskRunner::Run(const TsfTask& task)
using Value = std::function;
std::map runners = {
- {{Location::HOST, Type::DUMP}, [this](const TsfTask& task) { return this->Host2SSD(task); }},
- {{Location::DEVICE, Type::DUMP}, [this](const TsfTask& task) { return this->Device2SSD(task); }},
- {{Location::HOST, Type::LOAD}, [this](const TsfTask& task) { return this->SSD2Host(task); }},
- {{Location::DEVICE, Type::LOAD}, [this](const TsfTask& task) { return this->SSD2Device(task); }},
+ {{Location::HOST, Type::LOAD}, [this](const TsfTask& task) { return this->Ssd2Host(task); }},
+ {{Location::HOST, Type::DUMP}, [this](const TsfTask& task) { return this->Host2Ssd(task); }},
+ {{Location::DEVICE, Type::LOAD}, [this](const TsfTask& task) { return this->Ssd2Device(task); }},
+ {{Location::DEVICE, Type::DUMP}, [this](const TsfTask& task) { return this->Device2Ssd(task); }},
};
auto runner = runners.find({task.location, task.type});
if (runner == runners.end()) {
- UC_ERROR("Unsupported task({},{})", fmt::underlying(task.type), fmt::underlying(task.location));
+ UC_ERROR("Unsupported task({},{})", fmt::underlying(task.location), fmt::underlying(task.type));
return Status::Unsupported();
}
return runner->second(task);
}
-Status TsfTaskRunner::Host2SSD(const TsfTask& task)
+Status TsfTaskRunner::Ssd2Host(const TsfTask& task)
{
auto path = Singleton::Instance()->BlockPath(task.blockId);
auto file = File::Make(path);
if (!file) { return Status::OutOfMemory(); }
- auto aligned = Memory::Aligned(task.length) && Memory::Aligned(task.address) && Memory::Aligned(task.offset);
- auto openFlags = aligned ? IFile::OpenFlag::WRITE_ONLY | IFile::OpenFlag::DIRECT
- : IFile::OpenFlag::WRITE_ONLY;
+ auto aligned = Memory::Aligned(task.address) && Memory::Aligned(task.offset) && Memory::Aligned(task.length);
+ auto openFlags = aligned ? IFile::OpenFlag::READ_ONLY | IFile::OpenFlag::DIRECT : IFile::OpenFlag::READ_ONLY;
auto status = file->Open(openFlags);
if (status.Failure()) { return status; }
- return file->Write((void*)task.address, task.length, task.offset);
+ return file->Read((void*)task.address, task.length, task.offset);
}
-Status TsfTaskRunner::SSD2Host(const TsfTask& task)
+Status TsfTaskRunner::Host2Ssd(const TsfTask& task)
{
- auto path = Singleton::Instance()->BlockPath(task.blockId);
+ auto path = Singleton::Instance()->BlockPath(task.blockId, true);
auto file = File::Make(path);
if (!file) { return Status::OutOfMemory(); }
- auto aligned = Memory::Aligned(task.length) && Memory::Aligned(task.address) && Memory::Aligned(task.offset);
- auto openFlags = aligned ? IFile::OpenFlag::READ_ONLY | IFile::OpenFlag::DIRECT
- : IFile::OpenFlag::READ_ONLY;
+ auto aligned = Memory::Aligned(task.address) && Memory::Aligned(task.offset) && Memory::Aligned(task.length);
+ auto openFlags = aligned ? IFile::OpenFlag::WRITE_ONLY | IFile::OpenFlag::DIRECT : IFile::OpenFlag::WRITE_ONLY;
auto status = file->Open(openFlags);
if (status.Failure()) { return status; }
- return file->Read((void*)task.address, task.length, task.offset);
+ return file->Write((void*)task.address, task.length, task.offset);
}
-Status TsfTaskRunner::Device2SSD(const TsfTask& task)
+Status TsfTaskRunner::Ssd2Device(const TsfTask& task)
{
auto path = Singleton::Instance()->BlockPath(task.blockId);
auto file = File::Make(path);
if (!file) { return Status::OutOfMemory(); }
auto buffer = this->_device->GetHostBuffer(task.length);
if (!buffer) {
- UC_ERROR("Failed to get host buffer({}) on device", task.length);
+ UC_ERROR("Failed to get host buffer({}) on device", task.length);
return Status::OutOfMemory();
}
- auto status = this->_device->D2HAsync(buffer.get(), task.length, (void*)task.address, task.length);
- if (status.Failure()) { return status; }
- status = this->_device->WaitFinish();
+ auto aligned = Memory::Aligned(task.offset) && Memory::Aligned(task.length);
+ auto openFlags = aligned ? IFile::OpenFlag::READ_ONLY | IFile::OpenFlag::DIRECT : IFile::OpenFlag::READ_ONLY;
+ auto status = file->Open(openFlags);
if (status.Failure()) { return status; }
- auto alligned = Memory::Aligned(task.length) && Memory::Aligned(task.offset);
- auto openFlags = alligned ? IFile::OpenFlag::WRITE_ONLY | IFile::OpenFlag::DIRECT
- : IFile::OpenFlag::WRITE_ONLY;
- status = file->Open(openFlags);
+ status = file->Read(buffer.get(), task.length, task.offset);
if (status.Failure()) { return status; }
- return file->Write(buffer.get(), task.length, task.offset);
+ return this->_device->H2DAsync((void*)task.address, task.length, buffer.get(), task.length);
}
-Status TsfTaskRunner::SSD2Device(const TsfTask& task)
+Status TsfTaskRunner::Device2Ssd(const TsfTask& task)
{
- auto path = Singleton::Instance()->BlockPath(task.blockId);
+ auto path = Singleton::Instance()->BlockPath(task.blockId, true);
auto file = File::Make(path);
if (!file) { return Status::OutOfMemory(); }
auto buffer = this->_device->GetHostBuffer(task.length);
- if (!buffer) {
- UC_ERROR("Failed to get host buffer({}) on device", task.length);
+ if (!buffer) {
+ UC_ERROR("Failed to get host buffer({}) on device", task.length);
return Status::OutOfMemory();
}
- auto aligned = Memory::Aligned(task.length) && Memory::Aligned(task.offset);
- auto openFlags = aligned ? IFile::OpenFlag::READ_ONLY | IFile::OpenFlag::DIRECT
- : IFile::OpenFlag::READ_ONLY;
- auto status = file->Open(openFlags);
+ auto status = this->_device->D2HAsync(buffer.get(), task.length, (void*)task.address, task.length);
if (status.Failure()) { return status; }
- status = file->Read(buffer.get(), task.length, task.offset);
+ status = this->_device->WaitFinish();
if (status.Failure()) { return status; }
- return this->_device->H2DAsync((void*)task.address, task.length, buffer.get(), task.length);
+ auto aligned = Memory::Aligned(task.offset) && Memory::Aligned(task.length);
+ auto openFlags = aligned ? IFile::OpenFlag::WRITE_ONLY | IFile::OpenFlag::DIRECT : IFile::OpenFlag::WRITE_ONLY;
+ status = file->Open(openFlags);
+ if (status.Failure()) { return status; }
+ return file->Write(buffer.get(), task.length, task.offset);
}
} // namespace UC
\ No newline at end of file
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_runner.h b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_runner.h
index f3eae2fb..d0b9693f 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_runner.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_runner.h
@@ -21,8 +21,8 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* */
-#ifndef UNIFIEDCACHE_TSF_TASK_RUNNER
-#define UNIFIEDCACHE_TSF_TASK_RUNNER
+#ifndef UNIFIEDCACHE_TSF_TASK_RUNNER_H
+#define UNIFIEDCACHE_TSF_TASK_RUNNER_H
#include "device/idevice.h"
#include "status/status.h"
@@ -32,14 +32,14 @@ namespace UC {
class TsfTaskRunner {
public:
- TsfTaskRunner(IDevice* device) : _device(device) {};
+ TsfTaskRunner(IDevice* device) : _device{device} {}
Status Run(const TsfTask& task);
private:
- Status Host2SSD(const TsfTask& task);
- Status SSD2Host(const TsfTask& task);
- Status Device2SSD(const TsfTask& task);
- Status SSD2Device(const TsfTask& task);
+ Status Ssd2Host(const TsfTask& task);
+ Status Host2Ssd(const TsfTask& task);
+ Status Ssd2Device(const TsfTask& task);
+ Status Device2Ssd(const TsfTask& task);
private:
IDevice* _device;
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_set.cc b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_set.cc
deleted file mode 100644
index e158b25f..00000000
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_set.cc
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
-* MIT License
-*
-* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
-*
-* Permission is hereby granted, free of charge, to any person obtaining a copy
-* of this software and associated documentation files (the "Software"), to deal
-* in the Software without restriction, including without limitation the rights
-* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-* copies of the Software, and to permit persons to whom the Software is
-* furnished to do so, subject to the following conditions:
-*
-* The above copyright notice and this permission notice shall be included in all
-* copies or substantial portions of the Software.
-*
-* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-* SOFTWARE.
- * */
-#include "tsf_task_set.h"
-#include
-#include
-#include
-
-namespace UC{
-
-size_t TsfTaskSet::Hash(const size_t id)
-{
- return id % nBucket;
-}
-
-void TsfTaskSet::Insert(const size_t id)
-{
- size_t bucketId = Hash(id);
- std::unique_lock lock(this->_mutexs[bucketId]);
- this->_sets[bucketId].push_back(id);
-}
-
-bool TsfTaskSet::Exist(const size_t id)
-{
- size_t bucketId = Hash(id);
- std::shared_lock lock(this->_mutexs[bucketId]);
- return std::find(this->_sets[bucketId].begin(), this->_sets[bucketId].end(), id) != this->_sets[bucketId].end();
-}
-
-void TsfTaskSet::Remove(const size_t id)
-{
- size_t bucketId = Hash(id);
- std::unique_lock lock(this->_mutexs[bucketId]);
- this->_sets[bucketId].remove(id);
-}
-
-} // namespace UC
\ No newline at end of file
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_set.h b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_set.h
index 15cde716..d94d993a 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_set.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_set.h
@@ -21,29 +21,45 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* */
-#ifndef UNIFIEDCACHE_TSF_TASK_SET
-#define UNIFIEDCACHE_TSF_TASK_SET
+#ifndef UNIFIEDCACHE_TSF_TASK_SET_H
+#define UNIFIEDCACHE_TSF_TASK_SET_H
-#include
-#include
-#include
+#include
#include
+#include
+#include
namespace UC{
class TsfTaskSet{
static constexpr size_t nBucket = 8192;
public:
- void Insert(const size_t id);
- bool Exist(const size_t id);
- void Remove(const size_t id);
+ // Append id to its bucket while holding that bucket's exclusive lock.
+ // No duplicate check is made: inserting the same id twice stores it twice.
+ void Insert(const size_t id)
+ {
+ auto idx = this->Hash(id);
+ std::unique_lock lk(this->_mutexes[idx]);
+ this->_buckets[idx].push_back(id);
+ }
+ // Return true when id is present in its bucket.
+ // Only a shared lock is taken because the bucket is read, not modified.
+ bool Exist(const size_t id)
+ {
+ auto idx = this->Hash(id);
+ std::shared_lock lk(this->_mutexes[idx]);
+ auto bucket = this->_buckets + idx;
+ return std::find(bucket->begin(), bucket->end(), id) != bucket->end();
+ }
+ // Remove id from its bucket while holding that bucket's exclusive lock.
+ // Removing an id that is not stored is a no-op.
+ void Remove(const size_t id)
+ {
+ auto idx = this->Hash(id);
+ std::unique_lock lk(this->_mutexes[idx]);
+ this->_buckets[idx].remove(id);
+ }
private:
- size_t Hash(const size_t id);
+ // Map an id to a bucket index in [0, nBucket).
+ size_t Hash(const size_t id) { return id % nBucket; }
private:
- std::shared_mutex _mutexs[nBucket];
- std::list _sets[nBucket];
+ // _mutexes[i] guards _buckets[i]; ids that hash to different buckets use different locks.
+ std::shared_mutex _mutexes[nBucket];
+ std::list _buckets[nBucket];
};
} // namespace UC
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_waiter.h b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_waiter.h
index f32e4e2a..621fcdba 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_waiter.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/domain/tsf_task/tsf_task_waiter.h
@@ -33,22 +33,27 @@ namespace UC {
class TsfTaskWaiter : public Latch {
public:
- TsfTaskWaiter(const size_t id, const size_t size, const size_t number, const std::string& brief) : Latch{0}
+ // The underlying latch counter starts at `number`; id, size, number and brief are kept
+ // only so that Done() can report them when the counter reaches zero.
+ TsfTaskWaiter(const size_t id, const size_t size, const size_t number, const std::string& brief)
+ : Latch{number}, _id{id}, _size{size}, _number{number}, _brief{brief}
{
- this->_ino = fmt::format("{}-{}-{}-{}", id, brief, size, number);
}
void Done()
{
if (Latch::Done() == 0) {
- UC_INFO("Task({}) finished in {:.006f}s", _ino, _sw.elapsed().count());
+ // The call that brings the counter to zero logs the summary (id, brief, number, size,
+ // elapsed seconds) and then wakes the threads blocked in Latch::Wait().
+ UC_INFO("Task({}, {}, {}, {}) finished, elapsed {:.06f}s", this->_id, this->_brief, this->_number,
+ this->_size, this->_sw.elapsed().count());
+ this->Notify();
}
}
private:
+ // Values captured at construction; they are read only by the log line in Done().
+ size_t _id;
+ size_t _size;
+ size_t _number;
+ std::string _brief;
spdlog::stopwatch _sw;
- std::string _ino;
};
} // namespace UC
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/infra/file/file.cc b/unifiedcache/csrc/ucmnfsstore/cc/infra/file/file.cc
index 5806e00d..75818a7e 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/infra/file/file.cc
+++ b/unifiedcache/csrc/ucmnfsstore/cc/infra/file/file.cc
@@ -22,7 +22,6 @@
* SOFTWARE.
* */
#include "file.h"
-#include
#include "logger/logger.h"
#include "posix_file.h"
@@ -33,7 +32,7 @@ std::unique_ptr File::Make(const std::string& path)
try {
return std::make_unique(path);
} catch (const std::exception& e) {
- UC_ERROR("Failed to create file object, path: {}, error: {}.", path, e.what());
+ UC_ERROR("Failed({}) to make file({}) pointer.", e.what(), path);
return nullptr;
}
}
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/infra/file/ifile.h b/unifiedcache/csrc/ucmnfsstore/cc/infra/file/ifile.h
index a775e86f..3fd6fd7c 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/infra/file/ifile.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/infra/file/ifile.h
@@ -30,16 +30,6 @@
namespace UC {
-class OpenMode{
-public:
- static const int32_t RD = O_RDONLY;
- static const int32_t WR = O_WRONLY;
- static const int32_t RDWR = O_RDWR;
- static const int32_t APP = O_APPEND;
- static const int32_t DIRECT = O_DIRECT;
- static const int32_t CREATE = O_CREAT;
-};
-
class IFile {
public:
class AccessMode {
@@ -64,20 +54,14 @@ class IFile {
virtual ~IFile() = default;
const std::string& Path() const { return this->_path; }
virtual Status MkDir() = 0;
- virtual void RmDir() = 0;
+ virtual Status RmDir() = 0;
virtual Status Rename(const std::string& newName) = 0;
virtual Status Access(const int32_t mode) = 0;
virtual Status Open(const uint32_t flags) = 0;
virtual void Close() = 0;
virtual void Remove() = 0;
- virtual Status Seek2End() = 0;
virtual Status Read(void* buffer, size_t size, off64_t offset = -1) = 0;
virtual Status Write(const void* buffer, size_t size, off64_t offset = -1) = 0;
- virtual Status Lock() = 0;
- virtual Status Lock(uint32_t retryCnt, uint32_t intervalUs) = 0;
- virtual void Unlock() = 0;
- virtual Status MMap(off64_t offset, size_t length, void*& addr, bool wr) = 0;
- virtual Status Stat(struct stat* buffer) = 0;
virtual Status Truncate(size_t length) = 0;
private:
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/infra/file/posix_file.cc b/unifiedcache/csrc/ucmnfsstore/cc/infra/file/posix_file.cc
index b0cbf13b..06bb3df3 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/infra/file/posix_file.cc
+++ b/unifiedcache/csrc/ucmnfsstore/cc/infra/file/posix_file.cc
@@ -30,12 +30,7 @@
namespace UC {
-PosixFile::~PosixFile()
-{
- if (this->_handle != -1) {
- this->Close();
- }
-}
+PosixFile::~PosixFile() { this->Close(); }
Status PosixFile::MkDir()
{
@@ -53,13 +48,17 @@ Status PosixFile::MkDir()
return Status::OK();
}
-void PosixFile::RmDir()
+// Remove the directory at Path().
+// Returns Status::OK() on success and Status::OsApiError() when rmdir() fails.
+// A non-empty directory (ENOTEMPTY) is reported through the status only and is not logged;
+// every other failure is logged together with its errno so the cause can be diagnosed.
+Status PosixFile::RmDir()
{
auto ret = rmdir(this->Path().c_str());
auto eno = errno;
if (ret != 0) {
- UC_ERROR("Failed to remove directory, path: {}, errcode: {}, errno: {}.", this->Path(), ret, eno);
+ if (eno != ENOTEMPTY) {
+ UC_WARN("Failed to remove directory, path: {}, errno: {}.", this->Path(), eno);
+ }
+ return Status::OsApiError();
}
+ return Status::OK();
}
Status PosixFile::Rename(const std::string& newName)
@@ -96,14 +95,12 @@ Status PosixFile::Access(const int32_t mode)
Status PosixFile::Open(const uint32_t flags)
{
- auto eno = 0;
- this->_openMode = flags;
constexpr auto permission = (S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
- this->_handle = open(this->Path().c_str(), this->_openMode, permission);
- eno = errno;
- if (!this->_handle) {
+ this->_handle = open(this->Path().c_str(), flags, permission);
+ auto eno = errno;
+ // open() signals failure with -1, never with 0: testing `!_handle` missed every real
+ // failure and rejected the valid descriptor 0. -1 is also the "not open" sentinel that
+ // the constructor and Close() use.
+ if (this->_handle == -1) {
UC_ERROR("Failed to open file, path: {}, flags: {}, errno: {}.",
- this->Path(), this->_openMode, eno);
+ this->Path(), flags, eno);
return Status::OsApiError();
}
return Status::OK();
@@ -111,7 +108,7 @@ Status PosixFile::Open(const uint32_t flags)
void PosixFile::Close()
{
- close(this->_handle);
+ if (this->_handle != -1) { close(this->_handle); }
this->_handle = -1;
}
@@ -126,17 +123,6 @@ void PosixFile::Remove()
}
}
-Status PosixFile::Seek2End()
-{
- auto ret = lseek64(this->_handle, 0, SEEK_END);
- auto eno = errno;
- if (ret < 0) {
- UC_ERROR("Failed to seek to end of file, path: {}, errno: {}.", this->Path(), ret, eno);
- return Status::OsApiError();
- }
- return Status::OK();
-}
-
Status PosixFile::Read(void* buffer, size_t size, off64_t offset)
{
ssize_t nBytes = -1;
@@ -171,84 +157,6 @@ Status PosixFile::Write(const void* buffer, size_t size, off64_t offset)
return Status::OK();
}
-Status PosixFile::Lock()
-{
- struct flock lock = {0};
- lock.l_type = F_WRLCK;
- lock.l_whence = SEEK_SET;
- lock.l_start = 0;
- lock.l_len = 0; // Lock the whole file
- auto ret = fcntl(this->_handle, F_SETLK, &lock);
- auto eno = errno;
- if (ret != 0) {
- if (eno == EACCES || eno == EAGAIN) {
- return Status::Retry();
- } else {
- UC_ERROR("Failed to lock file, path: {}, errno: {}.", this->Path(), eno);
- return Status::OsApiError();
- }
- }
- return Status::OK();
-}
-
-Status PosixFile::Lock(uint32_t retryCnt, uint32_t interval)
-{
- uint32_t retry = 0;
- auto ret = this->Lock();
- while (ret == Status::Retry() && retry < retryCnt) {
- usleep(interval);
- ret = this->Lock();
- ++retry;
- }
- if (ret.Failure()) {
- UC_ERROR("Failed to lock file after {} retries, path: {}, status: {}.", retry, this->Path(), ret);
- }
- return ret;
-}
-
-void PosixFile::Unlock()
-{
- struct flock lock = {0};
- lock.l_type = F_UNLCK;
- lock.l_whence = SEEK_SET;
- lock.l_start = 0;
- lock.l_len = 0; // Unlock the whole file
- auto ret = fcntl(this->_handle, F_SETLK, &lock);
- auto eno = errno;
- if (ret != 0) {
- UC_ERROR("Failed to unlock file, path: {}, errno: {}.", this->Path(), eno);
- }
-}
-
-Status PosixFile::MMap(off64_t offset, size_t length, void*& addr, bool wr)
-{
- auto prot = PROT_READ;
- if (wr) {
- prot |= PROT_WRITE;
- }
- auto flags = MAP_SHARED | MAP_POPULATE;
- auto ptr = mmap(nullptr, length, prot, flags, this->_handle, offset);
- auto eno = errno;
- if (ptr == MAP_FAILED || ptr == nullptr) {
- UC_ERROR("Failed to mmap file, path: {}, offset: {}, length: {}, errno: {}.",
- this->Path(), offset, length, eno);
- return Status::OsApiError();
- }
- addr = ptr;
- return Status::OK();
-}
-
-Status PosixFile::Stat(struct stat* buffer)
-{
- auto ret = stat(this->Path().c_str(), buffer);
- auto eno = errno;
- if (ret != 0) {
- UC_ERROR("Failed to stat file, path: {}, errno: {}.", this->Path(), eno);
- return Status::OsApiError();
- }
- return Status::OK();
-}
-
Status PosixFile::Truncate(size_t length)
{
auto ret = ftruncate(this->_handle, length);
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/infra/file/posix_file.h b/unifiedcache/csrc/ucmnfsstore/cc/infra/file/posix_file.h
index d3fc65da..9f306c23 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/infra/file/posix_file.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/infra/file/posix_file.h
@@ -31,27 +31,20 @@ namespace UC {
class PosixFile : public IFile {
public:
PosixFile(const std::string& path) : IFile{path}, _handle{-1} {}
- virtual ~PosixFile() override;
- virtual Status MkDir() override;
- virtual void RmDir() override;
- virtual Status Rename(const std::string& newName) override;
- virtual Status Access(const int32_t mode) override;
- virtual Status Open(const uint32_t flags) override;
- virtual void Close() override;
- virtual void Remove() override;
- virtual Status Seek2End() override;
- virtual Status Read(void* buffer, size_t size, off64_t offset = -1) override;
- virtual Status Write(const void* buffer, size_t size, off64_t offset = -1) override;
- virtual Status Lock() override;
- virtual Status Lock(uint32_t retryCnt, uint32_t intervalUs) override;
- virtual void Unlock() override;
- virtual Status MMap(off64_t offset, size_t length, void*& addr, bool wr) override;
- virtual Status Stat(struct stat* buffer) override;
- virtual Status Truncate(size_t length) override;
+ ~PosixFile() override;
+ Status MkDir() override;
+ Status RmDir() override;
+ Status Rename(const std::string& newName) override;
+ Status Access(const int32_t mode) override;
+ Status Open(const uint32_t flags) override;
+ void Close() override;
+ void Remove() override;
+ Status Read(void* buffer, size_t size, off64_t offset = -1) override;
+ Status Write(const void* buffer, size_t size, off64_t offset = -1) override;
+ Status Truncate(size_t length) override;
private:
int32_t _handle;
- uint32_t _openMode;
};
} // namespace UC
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/infra/memory/memory.cc b/unifiedcache/csrc/ucmnfsstore/cc/infra/memory/memory.cc
index 2b2f75a2..f8bb05fe 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/infra/memory/memory.cc
+++ b/unifiedcache/csrc/ucmnfsstore/cc/infra/memory/memory.cc
@@ -29,7 +29,7 @@ namespace UC {
std::shared_ptr MakePtr(void* ptr)
{
if (!ptr) {return nullptr;}
- return std::shared_ptr(ptr, [](void* p) { free(p); });
+ return std::shared_ptr(ptr, [](void* ptr) { free(ptr); });
}
std::shared_ptr Memory::Alloc(const size_t size) {return MakePtr(malloc(size));}
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/infra/memory/memory.h b/unifiedcache/csrc/ucmnfsstore/cc/infra/memory/memory.h
index c41ecbce..90758d0d 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/infra/memory/memory.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/infra/memory/memory.h
@@ -31,13 +31,13 @@ namespace UC {
class Memory {
public:
- static bool Aligned(const size_t size){ return size % _alignment == 0;}
- static size_t Align(const size_t size){ return (size + _alignment - 1) / _alignment * _alignment; }
+ static bool Aligned(const size_t size) { return size % _alignment == 0;}
+ static size_t Align(const size_t size) { return (size + _alignment - 1) / _alignment * _alignment; }
static std::shared_ptr Alloc(const size_t size);
static std::shared_ptr AllocAlign(const size_t size);
private:
- static constexpr size_t _alignment{4096};
+ static constexpr size_t _alignment{4096};
};
} // namespace UC
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/infra/status/status.h b/unifiedcache/csrc/ucmnfsstore/cc/infra/status/status.h
index 24590fde..5c52f658 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/infra/status/status.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/infra/status/status.h
@@ -103,7 +103,6 @@ class Status {
}
public:
- Status() : _code{Code::OK} {}
Status(const Status& status) { this->_code = status._code; }
Status& operator=(const Status& status)
{
diff --git a/unifiedcache/csrc/ucmnfsstore/cc/infra/thread/latch.h b/unifiedcache/csrc/ucmnfsstore/cc/infra/thread/latch.h
index 754d10f8..2bb8f14a 100644
--- a/unifiedcache/csrc/ucmnfsstore/cc/infra/thread/latch.h
+++ b/unifiedcache/csrc/ucmnfsstore/cc/infra/thread/latch.h
@@ -32,18 +32,15 @@ namespace UC {
class Latch {
public:
- explicit Latch(const size_t expected = 0) : _counter(expected) {}
- void Up() {++this->_counter;}
- size_t Done()
- {
- auto counter = -- this->_counter;
- if (counter == 0) { this->_cv.notify_all(); }
- return counter;
- }
+ explicit Latch(const size_t expected = 0) : _counter{expected} {}
+ void Up() { ++this->_counter; }
+ size_t Done() { return --this->_counter; }
+ // Wake every thread blocked in Wait().
+ // The counter is changed by Done() without holding _mutex, so the mutex is taken here
+ // before signalling: a waiter that has already evaluated the predicate but has not yet
+ // blocked still owns the mutex, which forces this call to wait until that thread is
+ // actually waiting. Without the lock the notification could be lost and Wait() would
+ // block forever.
+ void Notify()
+ {
+     std::unique_lock lk(this->_mutex);
+     this->_cv.notify_all();
+ }
void Wait()
{
- std::unique_lock lock(this->_mutex);
- this->_cv.wait(lock, [this] {return this->_counter == 0;});
+ std::unique_lock lk(this->_mutex);
+ if (this->_counter == 0) { return; }
+ this->_cv.wait(lk, [this] { return this->_counter == 0; });
}
private:
diff --git a/unifiedcache/csrc/ucmnfsstore/cmake/cuda.cmake b/unifiedcache/csrc/ucmnfsstore/cmake/cuda.cmake
index 17c8723f..2a07a3ad 100644
--- a/unifiedcache/csrc/ucmnfsstore/cmake/cuda.cmake
+++ b/unifiedcache/csrc/ucmnfsstore/cmake/cuda.cmake
@@ -1,4 +1,10 @@
+set(CUDA_ROOT "/usr/local/cuda/" CACHE PATH "Path to CUDA root directory")
add_library(Cuda::cudart UNKNOWN IMPORTED)
+set_target_properties(Cuda::cudart PROPERTIES
+ INTERFACE_INCLUDE_DIRECTORIES "${CUDA_ROOT}/include"
+ IMPORTED_LOCATION "${CUDA_ROOT}/lib64/libcudart.so"
+)
+
add_compile_definitions(CUDA_AVAILABLE=1)
diff --git a/unifiedcache/csrc/ucmnfsstore/cmake/flags.cmake b/unifiedcache/csrc/ucmnfsstore/cmake/flags.cmake
index ed34746e..a7b85f84 100644
--- a/unifiedcache/csrc/ucmnfsstore/cmake/flags.cmake
+++ b/unifiedcache/csrc/ucmnfsstore/cmake/flags.cmake
@@ -6,8 +6,8 @@ set(FLAGS_RELEASE "-O3 -D_FORTIFY_SOURCE=2")
string(TOLOWER "${CMAKE_BUILD_TYPE}" CMAKE_BUILD_TYPE_LOWER)
if(CMAKE_BUILD_TYPE_LOWER STREQUAL "debug")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${FLAGS_PUBLIC} ${FLAGS_DEBUG}")
- set(CMAKE_CXX_FLAGS "${CMAKE_C_FLAGS} ${FLAGS_PUBLIC} ${FLAGS_DEBUG}")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${FLAGS_PUBLIC} ${FLAGS_DEBUG}")
else()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${FLAGS_PUBLIC} ${FLAGS_RELEASE}")
- set(CMAKE_CXX_FLAGS "${CMAKE_C_FLAGS} ${FLAGS_PUBLIC} ${FLAGS_RELEASE}")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${FLAGS_PUBLIC} ${FLAGS_RELEASE}")
endif()
diff --git a/unifiedcache/csrc/ucmnfsstore/cpy/ucmnfsstore.py.cc b/unifiedcache/csrc/ucmnfsstore/cpy/ucmnfsstore.py.cc
index 26bf73a2..ca2d7445 100644
--- a/unifiedcache/csrc/ucmnfsstore/cpy/ucmnfsstore.py.cc
+++ b/unifiedcache/csrc/ucmnfsstore/cpy/ucmnfsstore.py.cc
@@ -23,6 +23,8 @@
* */
#include "ucmnfsstore/ucmnfsstore.h"
#include
+#include
+#include "status/status.h"
namespace py = pybind11;
@@ -59,8 +61,8 @@ inline size_t SubmitTsfTasks(const py::list& blockIdList, const py::list& offset
(length != lengthList.end())) {
tasks.emplace_back(type, location, blockId->cast(), offset->cast(),
address->cast(), length->cast());
- size += length->cast();
number++;
+ size += length->cast();
blockId++;
offset++;
address++;
@@ -113,7 +115,8 @@ PYBIND11_MODULE(ucmnfsstore, module)
.def_readwrite("kvcacheBlockSize", &UC::SetupParam::kvcacheBlockSize)
.def_readwrite("transferEnable", &UC::SetupParam::transferEnable)
.def_readwrite("transferDeviceId", &UC::SetupParam::transferDeviceId)
- .def_readwrite("transferStreamNumber", &UC::SetupParam::transferStreamNumber);
+ .def_readwrite("transferStreamNumber", &UC::SetupParam::transferStreamNumber)
+ .def_readwrite("transferIoSize", &UC::SetupParam::transferIoSize);
module.def("Setup", &UC::Setup);
module.def("Alloc", &UC::AllocBatch);
module.def("Lookup", &UC::LookupBatch);
diff --git a/unifiedcache/integration/vllm/uc_connector.py b/unifiedcache/integration/vllm/uc_connector.py
index 6d9becdd..bcc507c5 100644
--- a/unifiedcache/integration/vllm/uc_connector.py
+++ b/unifiedcache/integration/vllm/uc_connector.py
@@ -1,3 +1,29 @@
+#
+# MIT License
+#
+# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+# Adapted from lmcache/lmcache/integration/vllm/vllm_v1_adapter.py
+#
+
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, List, Optional, Any, Generator
@@ -110,9 +136,6 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
config = self._vllm_config.kv_transfer_config.kv_connector_extra_config["ucm_connector_config"]
config["device"] = self.rank
config["role"] = "scheduler" if role == KVConnectorRole.SCHEDULER else "worker"
- head_size = vllm_config.model_config.get_head_size()
- total_num_kv_heads = vllm_config.model_config.get_total_num_kv_heads()
- config["kv_block_size"] = self.block_size * head_size * total_num_kv_heads * self.element_size
logger.info("init UCConnectorImpl, connector: %s", name)
self.connector = UcmConnectorFactory.create_connector(name, config)
else:
@@ -451,7 +474,7 @@ def get_num_new_matched_tokens(
# we need to recompute the last token. This if condition will be removed
# once vLLM's scheduler provides a better solution in the future.
if num_external_computed_tokens == request.num_tokens:
- num_external_computed_tokens -= 1
+ num_external_computed_tokens -= self.block_size
self.load_paras[request.request_id] = LoadPara(
vllm_cached_tokens=num_computed_tokens,
storage_cached_tokens=num_external_computed_tokens,
diff --git a/unifiedcache/logger.py b/unifiedcache/logger.py
index 3c5e03ba..4eb0ff78 100644
--- a/unifiedcache/logger.py
+++ b/unifiedcache/logger.py
@@ -1,3 +1,27 @@
+#
+# MIT License
+#
+# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+
import logging
import os
diff --git a/unifiedcache/ucm_connector/base.py b/unifiedcache/ucm_connector/base.py
index ec2c3748..e30ec049 100644
--- a/unifiedcache/ucm_connector/base.py
+++ b/unifiedcache/ucm_connector/base.py
@@ -1,3 +1,27 @@
+#
+# MIT License
+#
+# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+
from abc import ABC, abstractmethod
from typing import List, Dict
diff --git a/unifiedcache/ucm_connector/factory.py b/unifiedcache/ucm_connector/factory.py
index f7973676..075e65bc 100644
--- a/unifiedcache/ucm_connector/factory.py
+++ b/unifiedcache/ucm_connector/factory.py
@@ -1,3 +1,27 @@
+#
+# MIT License
+#
+# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+
import importlib
from typing import Callable
@@ -39,11 +63,12 @@ def create_connector(
return connector_cls(config)
-UcmConnectorFactory.register_connector(
- "UcmOceanStore",
- "unifiedcache.ucm_connector.ucm_oceanstor",
- "UcmOceanStore")
UcmConnectorFactory.register_connector(
"UcmDram",
"unifiedcache.ucm_connector.ucm_dram",
- "UcmDram")
\ No newline at end of file
+ "UcmDram")
+UcmConnectorFactory.register_connector(
+ "UcmNfsStore",
+ "unifiedcache.ucm_connector.ucm_nfs_store",
+ "UcmNfsStore"
+)
\ No newline at end of file
diff --git a/unifiedcache/ucm_connector/ucm_dram.py b/unifiedcache/ucm_connector/ucm_dram.py
index de4c2f08..49583bd6 100644
--- a/unifiedcache/ucm_connector/ucm_dram.py
+++ b/unifiedcache/ucm_connector/ucm_dram.py
@@ -1,3 +1,27 @@
+#
+# MIT License
+#
+# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+
import torch
from dataclasses import dataclass
from typing import List, Dict, Optional, Any
@@ -36,7 +60,7 @@ def __init__(self, config: Dict):
super().__init__(config)
self.dram_cache: Dict[str, any] = {}
self.max_cache_byte = int(config.get("max_cache_size", 5368709120))
- self.kv_block_size = config["kv_block_size"]
+ self.kv_block_size = int(config.get("kv_block_size", 262144))
self.max_block_num = self.max_cache_byte // self.kv_block_size
if config["role"] == "scheduler":
self.cached_blocks = set()
diff --git a/unifiedcache/ucm_connector/ucm_nfs_store.py b/unifiedcache/ucm_connector/ucm_nfs_store.py
index 0f2d8e2f..5f421f09 100644
--- a/unifiedcache/ucm_connector/ucm_nfs_store.py
+++ b/unifiedcache/ucm_connector/ucm_nfs_store.py
@@ -1,10 +1,33 @@
+#
+# MIT License
+#
+# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+
import torch
-# import ucmnfsstore
from dataclasses import dataclass
from typing import List, Dict
from unifiedcache.logger import init_logger
from unifiedcache.ucm_connector import Task, UcmKVStoreBase
-import unifiedcache.ucm_connector.ucmnfsstore as ucmnfsstore
+from unifiedcache.ucm_connector import ucmnfsstore
logger = init_logger(__name__)
@@ -25,14 +48,14 @@ class UcmNfsStore(UcmKVStoreBase):
def __init__(self, config: Dict):
super().__init__(config)
-
- param = ucmnfsstore.SetupParam(config["storage_backends"],
- config["block_size"],
- config["transformer_enable"])
- if param.transferEnable:
- param.transferDeviceId = config["transfer_device_id"]
- param.transferStreamNumber = config["transfer_stream_number"]
-
+ storage_backends_config = config["storage_backends"]
+ storage_backends = [path for path in storage_backends_config.split(":") if path]
+ device_id = int(config["device"])
+ block_size = int(config["kv_block_size"])
+ enableTransfer = True if config["role"] == "worker" else False
+ param = ucmnfsstore.SetupParam(storage_backends, block_size, enableTransfer)
+ if enableTransfer:
+ param.transferDeviceId = device_id
ret = ucmnfsstore.Setup(param)
if ret != 0:
msg = f"Failed to initialize ucmnfsstore, errcode: {ret}."
@@ -95,15 +118,9 @@ def load(self, block_ids: List[str], offset: List[int], dst_tensor: List[torch.T
"""
dst_tensor_ptr = [t.data_ptr() for t in dst_tensor]
dst_tensor_size = [t.numel() * t.element_size() for t in dst_tensor]
- device_type = dst_tensor[0].device.type
-
- if device_type == "cpu":
- task_id = ucmnfsstore.LoadToHost(block_ids, offset, dst_tensor_ptr, dst_tensor_size)
- elif device_type == "cuda" or device_type == "npu":
- task_id = ucmnfsstore.LoadToDevice(block_ids, offset, dst_tensor_ptr, dst_tensor_size)
- logger.info(f"Succeed in loading kv cache to {device_type}, task id: {task_id}.")
-
- return Task(task_id=id)
+ task_id = ucmnfsstore.LoadToDevice(block_ids, offset, dst_tensor_ptr, dst_tensor_size)
+ logger.debug(f"Succeed in loading kv cache , task id: {task_id}, offset: {offset}.")
+ return NfsTask(task_id=task_id)
def dump(self, block_ids: List[str], offset: List[int], src_tensor: List[torch.Tensor]) -> Task:
"""
@@ -118,15 +135,9 @@ def dump(self, block_ids: List[str], offset: List[int], src_tensor: List[torch.T
"""
src_tensor_ptr = [t.data_ptr() for t in src_tensor]
src_tensor_size = [t.numel() * t.element_size() for t in src_tensor]
- device_type = src_tensor[0].device.type
-
- if device_type == "cpu":
- task_id = ucmnfsstore.DumpFromHost(block_ids, offset, src_tensor_ptr, src_tensor_size)
- elif device_type == "cuda" or device_type == "npu":
- task_id = ucmnfsstore.DumpFromDevice(block_ids, offset, src_tensor_ptr, src_tensor_size)
- logger.info(f"Succeed in dumping kv cache from {device_type}, task id: {task_id}.")
-
- return Task(task_id=id)
+ task_id = ucmnfsstore.DumpFromDevice(block_ids, offset, src_tensor_ptr, src_tensor_size)
+ logger.debug(f"Succeed in dumping kv cache, task id: {task_id}, offset {offset}.")
+ return NfsTask(task_id=task_id)
def wait(self, task: Task) -> int:
"""
@@ -138,11 +149,14 @@ def wait(self, task: Task) -> int:
0 - success
others - failed.
"""
+ if not isinstance(task, NfsTask):
+ logger.error("This is not NfsTask")
+ return -1
ret = ucmnfsstore.Wait(task.get_id())
if ret != 0:
logger.error(f"Failed to wait for kv cache transfer task, errcode: {ret}.")
else:
- logger.info("Succeed in waiting for kv cache transfer task.")
+ logger.debug("Succeed in waiting for kv cache transfer task.")
return ret
def commit(self, block_ids: List[str], is_success: bool = True) -> None:
@@ -153,5 +167,7 @@ def commit(self, block_ids: List[str], is_success: bool = True) -> None:
block_ids (List[str]): vLLM block hash.
is_success(bool): if False, we need release block
"""
+ if not is_success:
+ logger.warning(f"commit {block_ids} to {is_success}")
ucmnfsstore.Commit(block_ids, is_success)
- logger.info("Succeed in committing kv cache.")
+ logger.debug("Succeed in committing kv cache.")