
Remove split batch inside pipeline inference function #1507

Merged
6 commits merged into InternLM:main on May 2, 2024

Conversation

AllentDan
Collaborator

@zhulinJulia24 may want to add a pressure test on pipeline.batch_infer and pipeline.stream_infer.

Tested OK with llama70b tp 4 and 10000 prompts.

@AllentDan AllentDan requested a review from lvhan028 April 26, 2024 12:19
@lvhan028
Collaborator

We need to verify it on a desktop GPU too, such as the 3090.

@lvhan028
Collaborator

or InternStudio

@lvhan028
Collaborator

lvhan028 commented Apr 27, 2024

long context should be tested too, with as many prompts as possible @AllentDan

@AllentDan
Collaborator Author

long context should be tested too, with as many prompts as possible @AllentDan

Tested OK with internlm2-chat-7b, tp=4, session_len of 160k, and 256 prompts.

@lvhan028
Collaborator

Please merge the latest main.

@lvhan028 lvhan028 requested a review from irexyc April 28, 2024 05:53
@lvhan028
Collaborator

@irexyc will it affect the VL pipeline?

@irexyc
Collaborator

irexyc commented Apr 28, 2024

It looks like concurrency used to be throttled manually, processing at most instance_num requests at a time; now safe_run controls it, and the benefit is that we no longer have to wait for the whole current batch to finish. So there is no real impact.

Is there any speed improvement?

@lvhan028
Collaborator

For this API, performance will improve because the requests no longer have to be split into batches. However, for requests that contain images, memory will likely become a problem if the list is too long.
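
To illustrate the change being discussed: previously the pipeline split a batch into chunks of instance_num and waited for each chunk to drain; now a safe_run wrapper caps concurrency so a finished request immediately frees a slot. The sketch below is illustrative only; run_request, old_batch_infer and new_batch_infer are hypothetical names, and only safe_run and instance_num come from the discussion above.

# Illustrative sketch, not lmdeploy code.
import asyncio


async def run_request(prompt: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for model inference
    return f'response to: {prompt}'


async def old_batch_infer(prompts, instance_num=4):
    # Old behaviour: split the batch manually and wait for each chunk
    # to finish completely before starting the next one.
    outputs = []
    for i in range(0, len(prompts), instance_num):
        chunk = prompts[i:i + instance_num]
        outputs += await asyncio.gather(*(run_request(p) for p in chunk))
    return outputs


async def new_batch_infer(prompts, instance_num=4):
    # New behaviour: a semaphore caps concurrency at instance_num, and a
    # finished request immediately frees a slot for the next prompt, so
    # nothing waits for a whole chunk to drain.
    sem = asyncio.Semaphore(instance_num)

    async def safe_run(prompt):
        async with sem:
            return await run_request(prompt)

    return await asyncio.gather(*(safe_run(p) for p in prompts))


if __name__ == '__main__':
    prompts = [f'prompt {i}' for i in range(10)]
    print(asyncio.run(new_batch_infer(prompts)))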

Review comment on the diff hunk:

pass

proc.join()
for i, prompt in enumerate(prompts):
Collaborator

On what occasions is batch streaming inference needed?

Collaborator Author

@AllentDan AllentDan commented Apr 29, 2024

It was just a request from the community and from XTuner.

@lvhan028
Collaborator

lvhan028 commented Apr 29, 2024

I found the stream_infer API unsuitable for benchmarking throughput, because a throughput benchmark requires each prompt to carry its own generation config rather than sharing a single one across the whole batch of prompts.
Can we support a list of generation configs, the same size as the list of prompts, in batch_infer and stream_infer?

@lvhan028
Collaborator

We can discuss whether this is reasonable if there is any doubt.

@zhulinJulia24
Collaborator

from lmdeploy import TurbomindEngineConfig, pipeline
model_path = '/nvme/qa_test_models/internlm/internlm2-chat-7b'
backend_config = TurbomindEngineConfig(cache_max_entry_count=-1)
pipe = pipeline(model_path, backend_config=backend_config)
# batch infer
response = pipe(['Hi, pls intro yourself', 'Shanghai is'])
print(response)

The script got stuck running this code.

@AllentDan
Collaborator Author

I found the stream_infer API unsuitable for benchmarking throughput, because a throughput benchmark requires each prompt to carry its own generation config rather than sharing a single one across the whole batch of prompts. Can we support a list of generation configs, the same size as the list of prompts, in batch_infer and stream_infer?

Yes, we can make it an option to pass in a list of generation configs.
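
A rough sketch of what that option could look like, assuming a GenerationConfig with the fields used elsewhere in this thread; normalize_gen_config is a hypothetical helper, not the lmdeploy API:

# Hypothetical sketch: accept one GenerationConfig for the whole batch or
# one per prompt, and expand it so every prompt has its own config.
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass
class GenerationConfig:
    max_new_tokens: int = 512
    temperature: float = 0.8
    top_p: float = 0.8
    top_k: int = 40
    ignore_eos: bool = False


def normalize_gen_config(
        prompts: List[str],
        gen_config: Optional[Union[GenerationConfig,
                                   List[GenerationConfig]]] = None
) -> List[GenerationConfig]:
    # Expand a single config to the batch size, or validate a per-prompt list.
    if gen_config is None:
        gen_config = GenerationConfig()
    if isinstance(gen_config, GenerationConfig):
        return [gen_config] * len(prompts)
    if len(gen_config) != len(prompts):
        raise ValueError('the list of gen_configs must match the number of prompts')
    return list(gen_config)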

@lvhan028
Collaborator

Exception ignored in: <function BaseEventLoop.__del__ at 0x7f61bfd059d0>
Traceback (most recent call last):
  File "/usr/lib/python3.8/asyncio/base_events.py", line 656, in __del__
    self.close()
  File "/usr/lib/python3.8/asyncio/unix_events.py", line 58, in close
    super().close()
  File "/usr/lib/python3.8/asyncio/selector_events.py", line 92, in close
    self._close_self_pipe()
  File "/usr/lib/python3.8/asyncio/selector_events.py", line 99, in _close_self_pipe
    self._remove_reader(self._ssock.fileno())
  File "/usr/lib/python3.8/asyncio/selector_events.py", line 276, in _remove_reader
    key = self._selector.get_key(fd)
  File "/usr/lib/python3.8/selectors.py", line 190, in get_key
    return mapping[fileobj]
  File "/usr/lib/python3.8/selectors.py", line 71, in __getitem__
    fd = self._selector._fileobj_lookup(fileobj)
  File "/usr/lib/python3.8/selectors.py", line 225, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
  File "/usr/lib/python3.8/selectors.py", line 42, in _fileobj_to_fd
    raise ValueError("Invalid file descriptor: {}".format(fd))
ValueError: Invalid file descriptor: -1

@lvhan028
Collaborator


from lmdeploy import pipeline, GenerationConfig, PytorchEngineConfig, TurbomindEngineConfig
pipe = pipeline('/workspace/models-140/InternLM/internlm2-chat-7b', backend_config=TurbomindEngineConfig())
for outputs in pipe.stream_infer(['hi', 'how are you?', 'who are you']):
    print(outputs)

@lvhan028
Collaborator

I found the stream_infer API unsuitable for benchmarking throughput, because a throughput benchmark requires each prompt to carry its own generation config rather than sharing a single one across the whole batch of prompts. Can we support a list of generation configs, the same size as the list of prompts, in batch_infer and stream_infer?

Yes, we can make it an option to pass in a list of generation configs.

Let's make batch_infer accept a list of GenerationConfig, too.
I am working on profile_pipeline_api.py. Both stream_infer and batch_infer are profiled.

@zhulinJulia24
Collaborator

from lmdeploy import TurbomindEngineConfig, pipeline
model_path = '/nvme/qa_test_models/internlm/internlm2-chat-7b'
backend_config = TurbomindEngineConfig(cache_max_entry_count=-1)
pipe = pipeline(model_path, backend_config=backend_config)
# batch infer
response = pipe(['Hi, pls intro yourself', 'Shanghai is'])
print(response)

The script got stuck running this code.

A follow-up PR is needed to handle invalid config input.
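
Until that follow-up lands, a guard like the one below would catch this particular hang at construction time. This is a hypothetical illustration, assuming cache_max_entry_count is meant to be a ratio of GPU memory in (0, 1]; it is not lmdeploy code:

def validate_cache_max_entry_count(value: float) -> float:
    # Reject values such as -1 up front instead of letting the engine hang.
    if not 0 < value <= 1:
        raise ValueError(
            f'cache_max_entry_count must be in (0, 1], got {value}')
    return value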

@AllentDan
Collaborator Author

AllentDan commented Apr 30, 2024

Exception ignored in: <function BaseEventLoop.__del__ at 0x7f61bfd059d0>
Traceback (most recent call last):
  File "/usr/lib/python3.8/asyncio/base_events.py", line 656, in __del__
    self.close()
  File "/usr/lib/python3.8/asyncio/unix_events.py", line 58, in close
    super().close()
  File "/usr/lib/python3.8/selectors.py", line 42, in _fileobj_to_fd
    raise ValueError("Invalid file descriptor: {}".format(fd))
ValueError: Invalid file descriptor: -1

This seems to be a bug in the Python 3.8 runtime (asyncio event-loop teardown). It happens occasionally when resources are released in the wrong order during interpreter shutdown, and it cannot be reproduced with Python 3.10. See KimiNewt/pyshark#347 (comment)
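
For context, the failure happens when the event loop is only garbage-collected at interpreter shutdown: BaseEventLoop.__del__ then calls close() after the selectors machinery has been partially torn down. Closing the loop explicitly (or using asyncio.run, which does it for you) avoids relying on __del__. A generic illustration, not lmdeploy-specific:

import asyncio


async def work():
    await asyncio.sleep(0)


def risky():
    # The loop is never closed; its __del__ runs at interpreter shutdown,
    # which on Python 3.8 can hit the torn-down selector internals and print
    # "Exception ignored in: <function BaseEventLoop.__del__ ...>".
    loop = asyncio.new_event_loop()
    loop.run_until_complete(work())


def safe():
    # Close the loop while the interpreter is still fully alive.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(work())
    finally:
        loop.close()


if __name__ == '__main__':
    safe()  # asyncio.run(work()) is equivalent and preferred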

@lvhan028
Collaborator

GPU utilization is unstable when profiling the pipeline APIs:

# Copyright (c) OpenMMLab. All rights reserved.
import argparse
import csv
import json
import os
import random
import time
from typing import List, Tuple
from collections import OrderedDict

from tqdm import tqdm

from lmdeploy.cli.utils import ArgumentHelper, DefaultsAndTypesHelpFormatter
from lmdeploy import pipeline, GenerationConfig, TurbomindEngineConfig, PytorchEngineConfig
from transformers import AutoTokenizer

def sample_requests(
    dataset_path: str,
    num_requests: int,
    tokenizer
) -> List[Tuple[str, int, int]]:
    # Load the dataset.
    with open(dataset_path) as f:
        dataset = json.load(f)
    # Filter out the conversations with less than 2 turns.
    dataset = [data for data in dataset if len(data['conversations']) >= 2]
    # Only keep the first two turns of each conversation.
    dataset = [(data['conversations'][0]['value'],
                data['conversations'][1]['value']) for data in dataset]

    # pre-sample to avoid going through the whole dataset
    dataset = random.sample(dataset, max(int(num_requests * 1.2), 1000))

    # Tokenize the prompts and completions.
    prompts = [prompt for prompt, _ in dataset]
    prompt_token_ids = tokenizer(prompts).input_ids
    completions = [completion for _, completion in dataset]
    completion_token_ids = tokenizer(completions).input_ids
    tokenized_dataset = []
    for i in range(len(dataset)):
        output_len = len(completion_token_ids[i])
        tokenized_dataset.append((prompts[i], prompt_token_ids[i], output_len))

    # Filter out too long sequences.
    filtered_dataset: List[Tuple[str, int, int]] = []
    for prompt, prompt_token_ids, output_len in tokenized_dataset:
        prompt_len = len(prompt_token_ids)
        if prompt_len < 4 or output_len < 4:
            # Prune too short sequences.
            continue
        if prompt_len > 1024 or prompt_len + output_len > 2048:
            # Prune too long sequences.
            continue
        filtered_dataset.append((prompt, prompt_len, output_len))

    # Sample the requests.
    sampled_requests = random.sample(filtered_dataset, num_requests)
    return sampled_requests


class Engine:

    def __init__(self, model_path: str,
                 engine_config, csv: str):
        self.pipe = pipeline(model_path, backend_config=engine_config)
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_path,
            trust_remote_code=True)

        self.csv = csv
        self.pbar = None

    def process_request(self, requests, concurrency, temperature, top_p, top_k,
                        stream_output):
  
        stats = OrderedDict((session_id, None) for session_id in range(len(requests)))
        prompts = [prompt for prompt, _, _ in requests]
        gen_configs = [GenerationConfig(temperature=temperature,
                                        top_p=top_p,
                                        top_k=top_k,
                                        ignore_eos=True,
                                        max_new_tokens=output_len) for _, _, output_len in requests]

        start = time.perf_counter()
        if stream_output:
            self.pbar = tqdm(total=len(requests))
            for output in self.pipe.stream_infer(prompts, gen_configs, do_preprocess=False):
                session_id = output.session_id
                n_token = output.generate_token_len
                finish_reason = output.finish_reason
                stats[session_id] = (n_token, finish_reason)
                if finish_reason is not None:
                    self.pbar.update(1)
        else:
            for output in self.pipe(prompts, gen_configs, do_preprocess=False):
                session_id = output.session_id
                n_token = output.generate_token_len
                finish_reason = output.finish_reason
                stats[session_id] = (n_token, finish_reason)

        elapsed_time = time.perf_counter() - start

        completion_tokens = 0
        for session_id, (n_token, finish_reason) in stats.items():
            assert finish_reason == 'length', \
                f'unexpected finish_reason of session_id={session_id}, ' \
                f'prompt={requests[session_id][0]}'
            assert n_token - 1 <= requests[session_id][-1] <= n_token, \
                f'request to generate {requests[session_id][-1]} tokens, ' \
                f'but got {n_token} tokens'
            completion_tokens += n_token
            
        prompt_tokens = 0
        for _, input_len, _ in requests:
            prompt_tokens += input_len

        completion_token_throughput = completion_tokens / elapsed_time
        total_token_throughput = (prompt_tokens + completion_tokens) / elapsed_time
        rps = len(requests) / elapsed_time
        rpm = rps * 60

        print(f'\n{"-" * 50}\nconcurrency: {concurrency}\n'
              f'elapsed_time: {elapsed_time:.3f}s\n')
        
        print(
            f'number of prompt tokens: {prompt_tokens:.0f}\n'
            f'number of completion tokens: {completion_tokens:.0f}\n'
            f'token throughput (completion token): {completion_token_throughput:.3f} token/s\n'  # noqa
            f'token throughput (prompt + completion token): {total_token_throughput:.3f} token/s\n'  # noqa
            f'RPS (request per second): {rps:.3f} req/s\n'
            f'RPM (request per minute): {rpm:.3f} req/min\n'
            f'{"-" * 50}\n')

        if self.csv:
            with open(self.csv, 'w') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow([
                    'batch', 'num_prompts', 'RPS', 'RPM',
                    'throughput(out tok/s)',
                    'throughput(total tok/s)'
                ])
                writer.writerow([
                    concurrency,
                    len(requests), f'{rps:.3f}', f'{rpm:.3f}',
                    f'{completion_token_throughput:.3f}',
                    f'{total_token_throughput:.3f}'
                ])


def parse_args():
    parser = argparse.ArgumentParser(
        description='Benchmark the request throughput of lmdeploy '
        'in localhost',
        formatter_class=DefaultsAndTypesHelpFormatter)
    parser.add_argument('dataset', type=str, help='the path of the dataset')
    parser.add_argument('model_path',
                        type=str,
                        help='the path of the model in localhost or '
                        'the repo_id of the model in huggingface.co')
    parser.add_argument(
        '-c',
        '--concurrency',
        type=int,
        help='Number of working threads to process the sampled prompts',
        default=256)
    parser.add_argument('-n',
                        '--num-prompts',
                        type=int,
                        help='Number of prompts to process',
                        default=5000)
    parser.add_argument('--csv',
                        type=str,
                        help='Where to save the result.',
                        default='./profile_throughput.csv')
    parser.add_argument('--seed',
                        type=int,
                        default=0,
                        help='Seed used in sampling prompts from dataset')
    parser.add_argument('--stream-output',
                        action='store_true',
                        help='Indicator for streaming output')
    # other args
    ArgumentHelper.top_p(parser)
    ArgumentHelper.temperature(parser)
    ArgumentHelper.top_k(parser)
    ArgumentHelper.log_level(parser)
    ArgumentHelper.backend(parser)

    # pytorch engine args
    pt_group = parser.add_argument_group('PyTorch engine arguments')
    tp_act = ArgumentHelper.tp(pt_group)
    session_len_act = ArgumentHelper.session_len(pt_group, default=4096)
    cache_count_act = ArgumentHelper.cache_max_entry_count(pt_group)
    cache_block_seq_len_act = ArgumentHelper.cache_block_seq_len(pt_group)

    # turbomind engine args
    tb_group = parser.add_argument_group('TurboMind engine argument')
    tb_group._group_actions.append(tp_act)
    tb_group._group_actions.append(session_len_act)
    tb_group._group_actions.append(cache_count_act)
    tb_group._group_actions.append(cache_block_seq_len_act)
    ArgumentHelper.model_format(tb_group, default='hf')
    ArgumentHelper.quant_policy(tb_group, default=0)
    ArgumentHelper.num_tokens_per_iter(tb_group)
    ArgumentHelper.max_prefill_iters(tb_group)

    args = parser.parse_args()
    return args


def main():
    args = parse_args()
    random.seed(args.seed)
    os.environ['TM_LOG_LEVEL'] = args.log_level
    if args.backend == 'turbomind':
        engine_config = TurbomindEngineConfig(
            session_len=args.session_len,
            max_batch_size=args.concurrency,
            tp=args.tp,
            cache_max_entry_count=args.cache_max_entry_count,
            cache_block_seq_len=args.cache_block_seq_len,
            model_format=args.model_format,
            quant_policy=args.quant_policy,
            num_tokens_per_iter=args.num_tokens_per_iter,
            max_prefill_iters=args.max_prefill_iters)
    elif args.backend == 'pytorch':
        engine_config = PytorchEngineConfig(
            session_len=args.session_len,
            cache_max_entry_count=args.cache_max_entry_count,
            block_size=args.cache_block_seq_len,
            max_batch_size=args.concurrency,
            tp=args.tp,
            thread_safe=True)

    engine = Engine(args.model_path, engine_config, csv=args.csv)

    requests = sample_requests(args.dataset, args.num_prompts,
                               engine.tokenizer)

    engine.process_request(requests,
                           temperature=args.temperature,
                           top_p=args.top_p,
                           top_k=args.top_k,
                           concurrency=args.concurrency,
                           stream_output=args.stream_output)


if __name__ == '__main__':
    main()

@AllentDan
Collaborator Author

In my testing, GPU utilization was stable at 97%+. Here are my commands:

python benchmark/benchmark_pipeline.py ~/data/ShareGPT_V3_unfiltered_cleaned_split.json /nvme/shared_data/llama2/huggingface/llama-2-7b-chat --stream-output --concurrency 128

python benchmark/benchmark_pipeline.py ~/data/ShareGPT_V3_unfiltered_cleaned_split.json /nvme/shared_data/llama2/huggingface/llama-2-7b-chat

@lvhan028
Collaborator

@irexyc @zhulinJulia24 could you help double check the performance?

@lvhan028
Collaborator

It worked well after I switched to a Python 3.10 environment.

@lvhan028
Collaborator

@AllentDan I tested the PyTorch backend. The performance is much lower than with profile_throughput.py.
@grimoire please take a look at it.

@lvhan028 lvhan028 requested a review from grimoire April 30, 2024 05:48
@grimoire
Collaborator

@AllentDan I tested the PyTorch backend. The performance is much lower than with profile_throughput.py. @grimoire please take a look at it.

https://github.com/InternLM/lmdeploy/pull/1528/files#r1584267353

@lvhan028
Collaborator

@AllentDan I tested the PyTorch backend. The performance is much lower than with profile_throughput.py. @grimoire please take a look at it.

https://github.com/InternLM/lmdeploy/pull/1528/files#r1584267353

I set thread_safe=False, but the performance of the pipeline API is still much lower than profile_throughput.py for the PyTorch engine.
GPU utilization is low.

CUDA_VISIBLE_DEVICES=1 python benchmark_api.py ShareGPT_V3_unfiltered_cleaned_split.json /mnt/140/InternLM/internlm2-chat-7b/ --concurrency 256 --num-prompts 10000 --stream-output --backend pytorch

@lvhan028 lvhan028 merged commit 40f340e into InternLM:main May 2, 2024
5 checks passed