
Commit 35589b4: Add a few modifications

lanking520 committed Dec 7, 2022
1 parent b0f7364 commit 35589b4
Showing 4 changed files with 108 additions and 25 deletions.
53 changes: 53 additions & 0 deletions .github/workflows/llm_integration.yml
@@ -144,6 +144,59 @@ jobs:
          name: logs
          path: tests/integration/logs/

  ds-handler-test:
    runs-on: [ self-hosted, g5 ]
    timeout-minutes: 60
    needs: create-runners
    steps:
      - uses: actions/checkout@v3
      - name: Clean env
        run: |
          yes | docker system prune -a --volumes
          sudo rm -rf /home/ubuntu/actions-runner/_work/_tool/Java_Corretto_jdk/
          echo "waiting for dpkg lock..."
          while sudo fuser /var/{lib/{dpkg,apt/lists},cache/apt/archives}/lock >/dev/null 2>&1; do sleep 5; done
      - name: Set up Python3
        uses: actions/setup-python@v4
        with:
          python-version: '3.10.x'
      - name: Install pip dependencies
        run: pip3 install requests
      - name: Build container name
        run: ./serving/docker/scripts/docker_name_builder.sh deepspeed ${{ github.event.inputs.djl-version }}
      - name: Download docker image
        working-directory: tests/integration
        run: |
          docker pull deepjavalibrary/djl-serving:$DJLSERVING_DOCKER_TAG
          mkdir logs
      - name: Test bloom-7b1-int8
        working-directory: tests/integration
        run: |
          sudo python3 llm/prepare.py deepspeed bloom-7b1-int8
          ./launch_container.sh deepjavalibrary/djl-serving:$DJLSERVING_DOCKER_TAG $PWD/models deepspeed \
          serve
          python3 llm/client.py deepspeed bloom-7b1-int8
          docker rm -f $(docker ps -aq)
      - name: Test GPTJ-6B
        working-directory: tests/integration
        run: |
          sudo python3 llm/prepare.py deepspeed gpt-j-6b
          ./launch_container.sh deepjavalibrary/djl-serving:$DJLSERVING_DOCKER_TAG $PWD/models deepspeed \
          serve
          python3 llm/client.py deepspeed gpt-j-6b
          docker rm -f $(docker ps -aq)
          sudo rm -rf models
      - name: On failure
        if: ${{ failure() }}
        working-directory: tests/integration
        run: |
          cat logs/serving.log
      - name: Upload test logs
        uses: actions/upload-artifact@v3
        with:
          name: logs
          path: tests/integration/logs/


  stop-runners:
    if: always()
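For anyone reproducing this job outside CI, the steps reduce to the following Python sketch. This is a rough local equivalent under stated assumptions: docker is installed, the script runs from the repository root, and the image tag (normally produced by docker_name_builder.sh) is supplied by hand; the tag below is hypothetical.

import subprocess

TAG = "deepjavalibrary/djl-serving:0.20.0-deepspeed"  # hypothetical tag; CI derives it via docker_name_builder.sh

def sh(cmd: str) -> None:
    # run a shell command from tests/integration, failing fast like the CI job does
    subprocess.run(cmd, shell=True, check=True, cwd="tests/integration")

sh(f"docker pull {TAG}")
sh("mkdir -p logs")
for model in ("bloom-7b1-int8", "gpt-j-6b"):
    sh(f"sudo python3 llm/prepare.py deepspeed {model}")    # writes models/test/serving.properties
    sh(f"./launch_container.sh {TAG} $PWD/models deepspeed serve")
    sh(f"python3 llm/client.py deepspeed {model}")          # drives requests and checks GPU memory
    sh("docker rm -f $(docker ps -aq)")                     # tear down between models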
35 changes: 20 additions & 15 deletions engines/python/setup/djl_python/deepspeed.py
@@ -103,6 +103,8 @@ def get_torch_dtype_from_str(dtype: str):
        return torch.bfloat16
    elif dtype == "int8":
        return torch.int8
+   elif dtype is None:
+       return None
    else:
        raise ValueError(f"Invalid data type: {dtype}")
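For context, with the new branch in place the helper reads roughly as follows; the fp32 and fp16 branches are not visible in this hunk and are assumed here:

import torch

def get_torch_dtype_from_str(dtype: str):
    if dtype == "fp32":        # assumed branch, not shown in the hunk
        return torch.float32
    elif dtype == "fp16":      # assumed branch, not shown in the hunk
        return torch.float16
    elif dtype == "bf16":
        return torch.bfloat16
    elif dtype == "int8":
        return torch.int8
    elif dtype is None:
        # no dtype configured: callers can now pick a default, or insist on one
        return None
    else:
        raise ValueError(f"Invalid data type: {dtype}")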

@@ -120,47 +122,47 @@ def __init__(self):
        self.data_type = None
        self.max_tokens = None
        self.device = None
-       self.world_size = None
        self.tensor_parallel_degree = None
        self.model_config = None
        self.low_cpu_mem_usage = False

    def initialize(self, properties: dict):
-       self.parse_properties(properties)
-       self.validate_model_type_and_task()
+       self._parse_properties(properties)
+       self._validate_model_type_and_task()
        self.create_model_pipeline()
        self.logger.info(f"Initialized DeepSpeed model with the following configurations: "
                         f"model: {self.model_id}, "
                         f"task: {self.task}, "
-                        f"data_type: {self.data_type}, "
+                        f"data_type: {self.ds_config['dtype']}, "
                         f"tensor_parallel_degree: {self.tensor_parallel_degree}")
        self.initialized = True

    def _parse_properties(self, properties):
        self.model_dir = properties.get("model_dir")
        self.model_id = properties.get("model_id")
        self.task = properties.get("task")
-       self.data_type = get_torch_dtype_from_str(properties.get("data_type", "fp32"))
+       self.data_type = get_torch_dtype_from_str(properties.get("dtype"))
        self.max_tokens = int(properties.get("max_tokens", 1024))
        self.device = int(os.getenv("LOCAL_RANK", 0))
-       self.world_size = int(os.getenv("WORLD_SIZE", 1))
-       self.tensor_parallel_degree = int(properties.get("tensor_parallel_degree", self.world_size))
+       self.tensor_parallel_degree = int(properties.get("tensor_parallel_degree", 1))
        self.low_cpu_mem_usage = properties.get("low_cpu_mem_usage", "true").lower() == "true"
        self.ds_config = {
            "replace_with_kernel_inject": True,
            "dtype": self.data_type,
            "mp_size": self.tensor_parallel_degree,
            "mpu": None,
            "enable_cuda_graph": properties.get("enable_cuda_graph", "false").lower() == "true",
            "triangular_masking": properties.get("triangular_masking", "true").lower() == "true",
            "checkpoint": properties.get("checkpoint"),
            "base_dir": properties.get("base_dir"),
            "return_tuple": properties.get("return_tuple", "true").lower() == "true",
            "training_mp_size": int(properties.get("training_mp_size", 1)),
            "replace_method": "auto",
            "injection_policy": None,
            "max_tokens": self.max_tokens,
        }
+       if properties.get("checkpoint"):
+           self.ds_config["checkpoint"] = os.path.join(self.model_dir, properties.get("checkpoint"))
+           self.ds_config["base_dir"] = self.model_dir
+           if self.data_type is None:
+               raise ValueError("dtype should also be provided for checkpoint loading")
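The checkpoint handling above is the behavioral core of this hunk: the checkpoint path is now resolved relative to the model directory, and checkpoint loading requires an explicit dtype because the old "fp32" default is gone. A minimal standalone sketch of just that rule (property names come from the diff; the paths are hypothetical):

import os

properties = {"model_dir": "/opt/ml/model",         # hypothetical model directory
              "checkpoint": "ds_checkpoint.json",   # hypothetical checkpoint name
              "dtype": None}                        # dtype omitted on purpose
ds_config = {"dtype": None}                         # what get_torch_dtype_from_str(None) yields
if properties.get("checkpoint"):
    ds_config["checkpoint"] = os.path.join(properties["model_dir"], properties["checkpoint"])
    ds_config["base_dir"] = properties["model_dir"]
    if ds_config["dtype"] is None:
        # reproduces the new guard: a checkpoint without a dtype is rejected
        raise ValueError("dtype should also be provided for checkpoint loading")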

    def _validate_model_type_and_task(self):
        if not self.model_id:
@@ -194,14 +196,17 @@ def infer_task_from_model_architecture(self, config: PretrainedConfig):

    def create_model_pipeline(self):
        # If a DS checkpoint is provided, we instantiate the model with meta tensors; weights are loaded when the DS engine is invoked
+       # Workaround for int8: only fp16/fp32/bf16 init is supported
+       dtype = torch.float16 if self.data_type == torch.int8 else self.data_type
+       kwargs = {"torch_dtype": dtype} if dtype else {}
        if self.ds_config["checkpoint"]:
-           dtype = torch.float32 if self.data_type == torch.float32 else torch.float16
            with deepspeed.OnDevice(dtype=dtype, device="meta"):
-               model = TASK_TO_MODEL[self.task].from_config(self.model_config)
+               model = TASK_TO_MODEL[self.task].from_config(self.model_config, **kwargs)
        else:
-           model = TASK_TO_MODEL[self.task].from_pretrained(self.model_id, low_cpu_mem_usage=self.low_cpu_mem_usage)
+           model = TASK_TO_MODEL[self.task].from_pretrained(self.model_id, low_cpu_mem_usage=self.low_cpu_mem_usage,
+                                                            **kwargs)
        model.eval()
+       self.ds_config["dtype"] = torch.int8 if self.data_type == torch.int8 else model.dtype
        tokenizer = AutoTokenizer.from_pretrained(self.model_id)
        self.pipeline = pipeline(task=self.task, model=model, tokenizer=tokenizer, device=self.device)
        if self.model_config.model_type in MODEL_TYPE_TO_INJECTION_POLICY:
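The int8 path deserves a note: Hugging Face from_config/from_pretrained cannot initialize weights in int8, so the model is materialized in fp16 and the int8 request is carried only in ds_config, where DeepSpeed's kernel injection applies the quantization. A minimal sketch of that dtype negotiation, standalone and with the helper name chosen here for illustration:

import torch

def init_kwargs(requested):
    # int8 cannot be used for Hugging Face weight init; fall back to fp16
    # and let deepspeed.init_inference(dtype=torch.int8) quantize afterwards
    dtype = torch.float16 if requested == torch.int8 else requested
    return {"torch_dtype": dtype} if dtype else {}

assert init_kwargs(torch.int8) == {"torch_dtype": torch.float16}
assert init_kwargs(None) == {}                     # no dtype: let HF decide
assert init_kwargs(torch.bfloat16) == {"torch_dtype": torch.bfloat16}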
@@ -244,7 +249,7 @@ def inference(self, inputs: Input):
            json_input = inputs.get_as_json()
            if isinstance(json_input, dict):
                input_data = self.format_input_for_task(json_input.pop("inputs"))
-               model_kwargs = json_input
+               model_kwargs = json_input.pop("parameters", None)
            else:
                input_data = json_input
        else:
26 changes: 17 additions & 9 deletions tests/integration/llm/client.py
@@ -28,6 +28,11 @@
"bloom-7b1": {"max_memory_per_gpu": 10.0, "batch_size": [1, 2, 4, 8], "seq_length": [64, 128]}
}

ds_model_spec = {
"gpt-j-6b": {"max_memory_per_gpu": 14.0, "batch_size": [1, 2, 4, 8], "seq_length": [64, 128, 256], "worker": 2},
"bloom-7b1-int8": {"max_memory_per_gpu": 10.0, "batch_size": [1, 2, 4, 8], "seq_length": [64, 128, 256]}
}


def check_worker_number(desired):
    endpoint = "http://127.0.0.1:8080/models/test"
@@ -64,10 +69,10 @@ def batch_generation(batch_size):
    return input_sentences[:batch_size]


-def test_hf_model(model):
-    if model not in hf_model_spec:
-        raise ValueError(f"{args.model} is not one of the supporting models {list(hf_model_spec.keys())}")
-    spec = hf_model_spec[args.model]
+def test_handler(model, model_spec):
+    if model not in model_spec:
+        raise ValueError(f"{args.model} is not one of the supported models {list(model_spec.keys())}")
+    spec = model_spec[args.model]
    if "worker" in spec:
        check_worker_number(spec["worker"])
    for batch_size in spec["batch_size"]:
@@ -103,10 +108,13 @@ def test_ds_raw_model(model):
        assert float(memory) / 1024.0 < spec["max_memory_per_gpu"]


-supported_handler = {'deepspeed': None, 'huggingface': test_hf_model, "deepspeed_raw": test_ds_raw_model}

if __name__ == '__main__':
    args = parser.parse_args()
-    if args.handler not in supported_handler:
-        raise ValueError(f"{args.handler} is not one of the supporting handler {list(supported_handler.keys())}")
-    supported_handler[args.handler](args.model)
+    if args.handler == "deepspeed_raw":
+        test_ds_raw_model(args.model)
+    elif args.handler == "huggingface":
+        test_handler(args.model, hf_model_spec)
+    elif args.handler == "deepspeed":
+        test_handler(args.model, ds_model_spec)
+    else:
+        raise ValueError(f"{args.handler} is not a supported handler")
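The parser itself sits outside this diff; judging from the call sites here and in the workflow (for example python3 llm/client.py deepspeed bloom-7b1-int8), its wiring is presumably something like the sketch below:

import argparse

# assumed shape of the parser referenced above; it is not part of this diff
parser = argparse.ArgumentParser(description="LLM integration test client")
parser.add_argument("handler", help="one of: deepspeed, huggingface, deepspeed_raw")
parser.add_argument("model", help="model key, e.g. gpt-j-6b or bloom-7b1-int8")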
19 changes: 18 additions & 1 deletion tests/integration/llm/prepare.py
@@ -24,6 +24,13 @@
"option.task": "text-generation", "option.load_in_8bit": "TRUE", "option.device_map": "auto"}
}

ds_handler_list = {
"gpt-j-6b": {"option.s3url": "s3://djl-llm/gpt-j-6b/", "option.task": "text-generation",
"option.tensor_parallel_degree": 2, "option.dtype": "bf16"},
"bloom-7b1-int8": {"option.s3url": "s3://djl-llm/bloom-7b1/", "option.tensor_parallel_degree": 4,
"option.task": "text-generation", "option.dtype": "int8"}
}


def write_properties(properties):
    model_path = "models/test"
@@ -45,14 +52,24 @@ def build_hf_handler_model(model):
    write_properties(options)


+def build_ds_handler_model(model):
+    if model not in ds_handler_list:
+        raise ValueError(f"{model} is not one of the supported models {list(ds_handler_list.keys())}")
+    options = ds_handler_list[model]
+    options["engine"] = "DeepSpeed"
+    options["option.entryPoint"] = "djl_python.deepspeed"
+    write_properties(options)


def build_ds_raw_model(model):
    options = ds_model_list[model]
    options["engine"] = "DeepSpeed"
    write_properties(options)
    shutil.copyfile("llm/deepspeed-model.py", "models/test/model.py")


-supported_handler = {'deepspeed': None, 'huggingface': build_hf_handler_model, "deepspeed_raw": build_ds_raw_model}
+supported_handler = {'deepspeed': build_ds_handler_model, 'huggingface': build_hf_handler_model,
+                     "deepspeed_raw": build_ds_raw_model}

if __name__ == '__main__':
    args = parser.parse_args()
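Putting the pieces together, build_ds_handler_model("gpt-j-6b") should hand write_properties the merged options below. Assuming the writer emits one key=value pair per line (its body is not shown in this diff), models/test/serving.properties would contain these entries:

# expected merged options for gpt-j-6b: the ds_handler_list entry plus the
# two keys build_ds_handler_model adds; the writer's format is an assumption
expected = {
    "option.s3url": "s3://djl-llm/gpt-j-6b/",
    "option.task": "text-generation",
    "option.tensor_parallel_degree": 2,
    "option.dtype": "bf16",
    "engine": "DeepSpeed",
    "option.entryPoint": "djl_python.deepspeed",
}
for key, value in expected.items():
    print(f"{key}={value}")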
