[DeepSpeed] Fix datatype and allow checkpoint loading #366

Merged (1 commit) on Dec 8, 2022
53 changes: 53 additions & 0 deletions .github/workflows/llm_integration.yml
@@ -144,6 +144,59 @@ jobs:
           name: logs
           path: tests/integration/logs/
 
+  ds-handler-test:
+    runs-on: [ self-hosted, g5 ]
+    timeout-minutes: 60
+    needs: create-runners
+    steps:
+      - uses: actions/checkout@v3
+      - name: Clean env
+        run: |
+          yes | docker system prune -a --volumes
+          sudo rm -rf /home/ubuntu/actions-runner/_work/_tool/Java_Corretto_jdk/
+          echo "wait dpkg lock..."
+          while sudo fuser /var/{lib/{dpkg,apt/lists},cache/apt/archives}/lock >/dev/null 2>&1; do sleep 5; done
+      - name: Set up Python3
+        uses: actions/setup-python@v4
+        with:
+          python-version: '3.10.x'
+      - name: Install pip dependencies
+        run: pip3 install requests
+      - name: Build container name
+        run: ./serving/docker/scripts/docker_name_builder.sh deepspeed ${{ github.event.inputs.djl-version }}
+      - name: Download models and dockers
+        working-directory: tests/integration
+        run: |
+          docker pull deepjavalibrary/djl-serving:$DJLSERVING_DOCKER_TAG
+          mkdir logs
+      - name: Test bloom-7b
+        working-directory: tests/integration
+        run: |
+          sudo python3 llm/prepare.py deepspeed bloom-7b1-int8
+          ./launch_container.sh deepjavalibrary/djl-serving:$DJLSERVING_DOCKER_TAG $PWD/models deepspeed \
+            serve
+          python3 llm/client.py deepspeed bloom-7b1-int8
+          docker rm -f $(docker ps -aq)
+      - name: Test GPTJ-6B
+        working-directory: tests/integration
+        run: |
+          sudo python3 llm/prepare.py deepspeed gpt-j-6b
+          ./launch_container.sh deepjavalibrary/djl-serving:$DJLSERVING_DOCKER_TAG $PWD/models deepspeed \
+            serve
+          python3 llm/client.py deepspeed gpt-j-6b
+          docker rm -f $(docker ps -aq)
+          sudo rm -rf models
+      - name: On fail step
+        if: ${{ failure() }}
+        working-directory: tests/integration
+        run: |
+          cat logs/serving.log
+      - name: Upload test logs
+        uses: actions/upload-artifact@v3
+        with:
+          name: logs
+          path: tests/integration/logs/
+
 
   stop-runners:
     if: always()
35 changes: 20 additions & 15 deletions engines/python/setup/djl_python/deepspeed.py
@@ -103,6 +103,8 @@ def get_torch_dtype_from_str(dtype: str):
         return torch.bfloat16
     elif dtype == "int8":
         return torch.int8
+    elif dtype is None:
+        return None
     else:
         raise ValueError(f"Invalid data type: {dtype}")
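
The new branch lets an unset dtype pass through as None instead of raising, which is what lets the property parser below detect a missing dtype only when a checkpoint is configured. A quick sketch of the resulting contract (REPL-style; the fp32/fp16/bf16 branches are elided above):

    get_torch_dtype_from_str("int8")  # -> torch.int8
    get_torch_dtype_from_str(None)    # -> None (new: dtype was never configured)
    get_torch_dtype_from_str("fp64")  # -> ValueError: Invalid data type: fp64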

@@ -120,47 +122,47 @@ def __init__(self):
         self.data_type = None
         self.max_tokens = None
         self.device = None
-        self.world_size = None
         self.tensor_parallel_degree = None
         self.model_config = None
         self.low_cpu_mem_usage = False
 
     def initialize(self, properties: dict):
-        self.parse_properties(properties)
-        self.validate_model_type_and_task()
+        self._parse_properties(properties)
+        self._validate_model_type_and_task()
         self.create_model_pipeline()
         self.logger.info(f"Initialized DeepSpeed model with the following configurations"
                          f"model: {self.model_id}"
                          f"task: {self.task}"
-                         f"data_type: {self.data_type}"
+                         f"data_type: {self.ds_config['dtype']}"
                          f"tensor_parallel_degree: {self.tensor_parallel_degree}")
         self.initialized = True
 
-    def parse_properties(self, properties):
+    def _parse_properties(self, properties):
         self.model_dir = properties.get("model_dir")
         self.model_id = properties.get("model_id")
         self.task = properties.get("task")
-        self.data_type = get_torch_dtype_from_str(properties.get("data_type", "fp32"))
+        self.data_type = get_torch_dtype_from_str(properties.get("dtype"))
         self.max_tokens = int(properties.get("max_tokens", 1024))
         self.device = int(os.getenv("LOCAL_RANK", 0))
-        self.world_size = int(os.getenv("WORLD_SIZE", 1))
-        self.tensor_parallel_degree = int(properties.get("tensor_parallel_degree", self.world_size))
+        self.tensor_parallel_degree = int(properties.get("tensor_parallel_degree", 1))
         self.low_cpu_mem_usage = properties.get("low_cpu_mem_usage", "true").lower() == "true"
         self.ds_config = {
             "replace_with_kernel_inject": True,
             "dtype": self.data_type,
             "mp_size": self.tensor_parallel_degree,
             "mpu": None,
             "enable_cuda_graph": properties.get("enable_cuda_graph", "false").lower() == "true",
             "triangular_masking": properties.get("triangular_masking", "true").lower() == "true",
             "checkpoint": properties.get("checkpoint"),
             "base_dir": properties.get("base_dir"),
             "return_tuple": properties.get("return_tuple", "true").lower() == "true",
             "training_mp_size": int(properties.get("training_mp_size", 1)),
             "replace_method": "auto",
             "injection_policy": None,
             "max_tokens": self.max_tokens,
         }
+        if properties.get("checkpoint"):
+            self.ds_config["checkpoint"] = os.path.join(self.model_dir, properties.get("checkpoint"))
+            self.ds_config["base_dir"] = self.model_dir
+            if self.data_type is None:
+                raise ValueError("dtype should also be provided for checkpoint loading")
 
-    def validate_model_type_and_task(self):
+    def _validate_model_type_and_task(self):
         if not self.model_id:
@@ -194,14 +196,17 @@ def infer_task_from_model_architecture(self, config: PretrainedConfig):
 
     def create_model_pipeline(self):
         # If a ds checkpoint is provided, we instantiate model with meta tensors. weights loaded when DS engine invoked
+        # Workaround for int8: only fp16/fp32/bf16 init is supported, so int8 models are instantiated as fp16
+        dtype = torch.float16 if self.data_type == torch.int8 else self.data_type
+        kwargs = {"torch_dtype": dtype} if dtype else {}
         if self.ds_config["checkpoint"]:
-            dtype = torch.float32 if self.data_type == torch.float32 else torch.float16
             with deepspeed.OnDevice(dtype=dtype, device="meta"):
-                model = TASK_TO_MODEL[self.task].from_config(self.model_config)
+                model = TASK_TO_MODEL[self.task].from_config(self.model_config, **kwargs)
         else:
-            model = TASK_TO_MODEL[self.task].from_pretrained(self.model_id, low_cpu_mem_usage=self.low_cpu_mem_usage)
-
+            model = TASK_TO_MODEL[self.task].from_pretrained(self.model_id, low_cpu_mem_usage=self.low_cpu_mem_usage,
+                                                             **kwargs)
         model.eval()
+        self.ds_config["dtype"] = torch.int8 if self.data_type == torch.int8 else model.dtype
         tokenizer = AutoTokenizer.from_pretrained(self.model_id)
         self.pipeline = pipeline(task=self.task, model=model, tokenizer=tokenizer, device=self.device)
         if self.model_config.model_type in MODEL_TYPE_TO_INJECTION_POLICY:
@@ -244,7 +249,7 @@ def inference(self, inputs: Input):
             json_input = inputs.get_as_json()
             if isinstance(json_input, dict):
                 input_data = self.format_input_for_task(json_input.pop("inputs"))
-                model_kwargs = json_input
+                model_kwargs = json_input.pop("parameters", None)
             else:
                 input_data = json_input
         else:
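
Taken together, the handler now builds the model on meta tensors whenever a DeepSpeed checkpoint is configured and defers weight loading to the engine. A minimal standalone sketch of that flow, assuming DeepSpeed's kwargs-style init_inference and placeholder paths (this is not code from the PR):

    import deepspeed
    import torch
    from transformers import AutoConfig, AutoModelForCausalLM

    model_dir = "/opt/ml/model"                          # placeholder model directory
    model_config = AutoConfig.from_pretrained(model_dir)

    ds_config = {
        "replace_with_kernel_inject": True,
        "dtype": torch.float16,                          # must be provided when a checkpoint is used
        "mp_size": 2,
        "replace_method": "auto",
        "checkpoint": model_dir + "/checkpoints.json",   # placeholder checkpoint descriptor
        "base_dir": model_dir,
        "max_tokens": 1024,
    }

    # Instantiate on meta tensors; DeepSpeed materializes the sharded weights
    # from the checkpoint when the inference engine is created.
    with deepspeed.OnDevice(dtype=ds_config["dtype"], device="meta"):
        model = AutoModelForCausalLM.from_config(model_config, torch_dtype=ds_config["dtype"])
    model.eval()

    engine = deepspeed.init_inference(model, **ds_config)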
26 changes: 17 additions & 9 deletions tests/integration/llm/client.py
@@ -28,6 +28,11 @@
"bloom-7b1": {"max_memory_per_gpu": 10.0, "batch_size": [1, 2, 4, 8], "seq_length": [64, 128]}
}

ds_model_spec = {
"gpt-j-6b": {"max_memory_per_gpu": 14.0, "batch_size": [1, 2, 4, 8], "seq_length": [64, 128, 256], "worker": 2},
"bloom-7b1-int8": {"max_memory_per_gpu": 10.0, "batch_size": [1, 2, 4, 8], "seq_length": [64, 128, 256]}
}


def check_worker_number(desired):
endpoint = "http://127.0.0.1:8080/models/test"
@@ -64,10 +69,10 @@ def batch_generation(batch_size):
     return input_sentences[: batch_size]
 
 
-def test_hf_model(model):
-    if model not in hf_model_spec:
-        raise ValueError(f"{args.model} is not one of the supporting models {list(hf_model_spec.keys())}")
-    spec = hf_model_spec[args.model]
+def test_handler(model, model_spec):
+    if model not in model_spec:
+        raise ValueError(f"{args.model} is not one of the supported models {list(model_spec.keys())}")
+    spec = model_spec[args.model]
     if "worker" in spec:
         check_worker_number(spec["worker"])
     for batch_size in spec["batch_size"]:
@@ -103,10 +108,13 @@ def test_ds_raw_model(model):
         assert float(memory) / 1024.0 < spec["max_memory_per_gpu"]
 
 
-supported_handler = {'deepspeed': None, 'huggingface': test_hf_model, "deepspeed_raw": test_ds_raw_model}
-
 if __name__ == '__main__':
     args = parser.parse_args()
-    if args.handler not in supported_handler:
-        raise ValueError(f"{args.handler} is not one of the supporting handler {list(supported_handler.keys())}")
-    supported_handler[args.handler](args.model)
+    if args.handler == "deepspeed_raw":
+        test_ds_raw_model(args.model)
+    elif args.handler == "huggingface":
+        test_handler(args.model, hf_model_spec)
+    elif args.handler == "deepspeed":
+        test_handler(args.model, ds_model_spec)
+    else:
+        raise ValueError(f"{args.handler} is not one of the supported handlers")
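
The refactored dispatch drives the same request shape the handler change above now parses: an "inputs" field plus optional "parameters". A minimal hand-rolled request might look like this (the predictions endpoint path is an assumption, mirroring the models endpoint used in check_worker_number):

    import requests

    payload = {
        "inputs": ["Hello, my name is"],   # wrapped per task by format_input_for_task
        "parameters": {"max_length": 64},  # popped into model_kwargs by the handler
    }
    res = requests.post("http://127.0.0.1:8080/predictions/test", json=payload)
    print(res.status_code, res.json())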
19 changes: 18 additions & 1 deletion tests/integration/llm/prepare.py
@@ -24,6 +24,13 @@
"option.task": "text-generation", "option.load_in_8bit": "TRUE", "option.device_map": "auto"}
}

ds_handler_list = {
"gpt-j-6b": {"option.s3url": "s3://djl-llm/gpt-j-6b/", "option.task": "text-generation",
"option.tensor_parallel_degree": 2, "option.dtype": "bf16"},
"bloom-7b1-int8": {"option.s3url": "s3://djl-llm/bloom-7b1/", "option.tensor_parallel_degree": 4,
"option.task": "text-generation", "option.dtype": "int8"}
}


def write_prperties(properties):
model_path = "models/test"
@@ -45,14 +52,24 @@ def build_hf_handler_model(model):
     write_prperties(options)
 
 
+def build_ds_handler_model(model):
+    if model not in ds_handler_list:
+        raise ValueError(f"{model} is not one of the supported models {list(ds_handler_list.keys())}")
+    options = ds_handler_list[model]
+    options["engine"] = "DeepSpeed"
+    options["option.entryPoint"] = "djl_python.deepspeed"
+    write_prperties(options)
+
+
 def build_ds_raw_model(model):
     options = ds_model_list[model]
     options["engine"] = "DeepSpeed"
     write_prperties(options)
     shutil.copyfile("llm/deepspeed-model.py", "models/test/model.py")
 
 
-supported_handler = {'deepspeed': None, 'huggingface': build_hf_handler_model, "deepspeed_raw": build_ds_raw_model}
+supported_handler = {'deepspeed': build_ds_handler_model, 'huggingface': build_hf_handler_model,
+                     "deepspeed_raw": build_ds_raw_model}
 
 if __name__ == '__main__':
     args = parser.parse_args()
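
For the deepspeed handler, prepare.py now emits the DeepSpeed options alongside the engine and entry point. Assuming write_prperties writes one key=value pair per line, the generated models/test/serving.properties for gpt-j-6b would look roughly like:

    engine=DeepSpeed
    option.entryPoint=djl_python.deepspeed
    option.s3url=s3://djl-llm/gpt-j-6b/
    option.task=text-generation
    option.tensor_parallel_degree=2
    option.dtype=bf16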