In [1]:
from utils import Policy, logging
# from forward import flexgen
from test import test_hf_gen

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

checkpoint = "facebook/opt-125m" # 125m 6.7b 13b 30b
# checkpoint = "Salesforce/codegen-350M-mono"
# checkpoint = 'bigscience/bloom-560m'

policy = Policy(
    gpu_batch_size=2, 
    num_gpu_batches=4, 
    weights_gpu_percent=0.0, 
    weights_cpu_percent=0.3, 
    cache_gpu_percent=0.0, 
    cache_cpu_percent=0.2, 
    act_gpu_percent=0.0, 
    act_cpu_percent=0.5, 
    overlap=True, 
    pin_weight=True,
)


In [2]:
# forward.py: rewrite layer forward function

import torch
import functools 
import contextlib

# from minibatch import get_size_info, load_kth_batch_inputs, concat_outputs
from utils import get_module_from_name


def reset_forward(model, layer_name):        
    layer = get_module_from_name(model, layer_name) 

    if hasattr(layer, "_flexgen_old_forward"):
        layer.forward = layer._flexgen_old_forward
        delattr(layer, "_flexgen_old_forward")
        logger.debug(f'{layer_name} from flexgen to old.')

    if hasattr(layer, "_test_old_forward"):
        layer.forward = layer._test_old_forward
        delattr(layer, "_test_old_forward")
        logger.debug(f'{layer_name} from test to old.')

def to_test_forward(mpl, layer_name, call_layer_log):
    layer = get_module_from_name(mpl.model, layer_name) 
    compute_device = 'cpu' 
    layer._test_old_forward = old_forward = layer.forward 

    @functools.wraps(old_forward)
    def new_forward(*args, **kwargs):
        mpl.load_layer_weights(layer_name, compute_device) 

        call_layer_log.append(layer_name)  # 

        with torch.no_grad():
            output = old_forward(*args, **kwargs)

        mpl.offload_layer_weights(layer_name)
        return output

    layer.forward = new_forward
    logger.debug(f'{layer_name} to test forward') 

@contextlib.contextmanager
def test(mpl, call_layer_log):
    model = mpl.model
    layer_names = mpl.layer_names

    # test run to get layer calling order
    for layer_name in layer_names:
        to_test_forward(mpl, layer_name, call_layer_log)
    yield 
    for layer_name in layer_names:
        reset_forward(model, layer_name)



In [3]:
from typing import Mapping, Tuple
import numpy as np 
import os 
import torch
from math import floor

class MixTensor:
    def __init__(
        self, 
        mix_data: Tuple, 
        split_dim: int, 
        device: torch.device, 
        shape: torch.Size,
        percents: Mapping[str, float],
        file_path: str,
        dtype
    ):
        self.mix_data = mix_data
        self.split_dim = split_dim 
        self.device = device 
        self.shape = shape 
        self.percents = percents
        self.file_path = file_path
        self.dtype = dtype
    
    def size(self):
        return self.shape 
    
    @staticmethod
    def get_split_dim(tensor):
        dim_sizes = tensor.size()
        max_dim, max_size = -1, -1
        for dim, size in enumerate(dim_sizes):
            if size > max_size:
                max_size = size
                max_dim = dim 
        return max_dim 
    
    @staticmethod
    def tensor_dim_slice(tensor, dim, dim_slice):
        return tensor[(dim if dim >= 0 else dim + tensor.dim()) * (slice(None), ) + (dim_slice, )]
    
    @staticmethod
    def split_tensor(tensor, dim, percents):
        dim_size = tensor.size(dim)
        g_per, c_per, _ = [percents[dev] for dev in ['cuda', 'cpu', 'disk']]
        
        g_cut = floor(dim_size * g_per)
        c_cut = floor(dim_size * (g_per + c_per))

        g_data = MixTensor.tensor_dim_slice(tensor, dim, slice(0, g_cut))
        c_data = MixTensor.tensor_dim_slice(tensor, dim, slice(g_cut, c_cut))
        d_data = MixTensor.tensor_dim_slice(tensor, dim, slice(c_cut, dim_size))
        return g_data, c_data, d_data 

    @classmethod
    def from_tensor(
        cls, 
        tensor: torch.Tensor, 
        percents: Mapping[str, float],
        file_path: str 
    ):
        split_dim = cls.get_split_dim(tensor) 
        device = tensor.device 
        shape = tensor.shape
        dtype = tensor.dtype
        
        g_data, c_data, d_data = cls.split_tensor(tensor, split_dim, percents) 
        
        g_data = g_data.to('cuda' if torch.cuda.is_available() else 'cpu') if g_data.numel() else None
        c_data = c_data.to('cpu') if c_data.numel() else None
        if d_data.numel():
            d_data = d_data.cpu().numpy()
            shape = d_data.shape
            np_dtype = d_data.dtype 

            fp = np.memmap(file_path, mode="w+", shape=shape, dtype=np_dtype)
            fp[:] = d_data[:]
            d_data = (shape, np_dtype)
        else:
            d_data = None 
        mix_data = (g_data, c_data, d_data)

        return cls(
            mix_data=mix_data,
            split_dim=split_dim,
            device=device,
            shape=shape,
            percents=percents,
            file_path=file_path,
            dtype=dtype
        )

    @classmethod 
    def from_mixtensor(cls, mix_tensor):
        self = mix_tensor 
        return self 

    def to_tensor(self):
        g_data, c_data, d_data = self.mix_data 
        compute_device = self.device 

        tensor = []
        if g_data is not None:
            if g_data.device != torch.device(compute_device):
                g_data = g_data.to(compute_device) 
            tensor.append(g_data)
        if c_data is not None:
            if c_data.device != torch.device(compute_device):
                c_data = c_data.to(compute_device) 
            tensor.append(c_data)
        if d_data is not None:
            (shape, np_dtype) = d_data 
            d_data = np.memmap(self.file_path, shape=shape, dtype=np_dtype, mode='r')
            d_data = torch.from_numpy(d_data).to(compute_device)
            tensor.append(d_data)
            
        tensor = torch.cat(tensor, dim=self.split_dim) 

        return tensor        

    def __add__(self, mix_tensor):
        assert self.shape == mix_tensor.shape and type(self) == type(mix_tensor) # is same shape mix tensor
        res = self.to_tensor() + mix_tensor.to_tensor() 
        return self.from_tensor(res, self.percents, self.file_path)

if __name__ == '__main__':
    
    x = torch.tensor([1,2,3])
    m = MixTensor.from_tensor(x, percents={'cuda':0, 'cpu':0.5, 'disk':0.5}, file_path='test/m.dat')
    m2 = MixTensor.from_tensor(x, percents={'cuda':0, 'cpu':0.5, 'disk':0.5}, file_path='test/m2.dat')
    m = m + m2
    print(m.to_tensor())



tensor([2, 4, 6])


  d_data = torch.from_numpy(d_data).to(compute_device)


In [10]:

import torch 
from accelerate.utils import honor_type
from typing import Mapping
from utils import logging 


logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

def get_type_size_info(obj): # recursive
    if isinstance(obj, (tuple, list)):
        return honor_type(obj, (get_type_size_info(o) for o in obj))
    elif isinstance(obj, Mapping):
        return type(obj)({k:get_type_size_info(v) for k, v in obj.items()})
    
    elif isinstance(obj, (torch.Tensor, MixTensor, BatchMixTensor)):
        return f'{type(obj)}: {obj.size()}'

    elif isinstance(obj, (int, bool, type(None))): 
        return f'{type(obj)}: {obj}'
    else:
        logger.warning(f'inputs: {obj} of type \'{type(obj)}\' is not implemented.')
        return f'{type(obj)}: {obj}'

def to_mixed_device(obj, policy, prefix): 
    if isinstance(obj, tuple) and len(obj) == 2 and isinstance(obj[0], torch.Tensor): # KV cache
        m0 = MixTensor.from_tensor(
            obj[0], 
            percents={
                'cuda':policy.cache_gpu_percent, 
                'cpu':policy.cache_cpu_percent, 
                'disk':policy.cache_disk_percent, 
            }, 
            file_path=f'{prefix}_key'
        )
        m1 = MixTensor.from_tensor(
            obj[1], 
            percents={
                'cuda':policy.cache_gpu_percent, 
                'cpu':policy.cache_cpu_percent, 
                'disk':policy.cache_disk_percent, 
            }, 
            file_path=f'{prefix}_value'
        )
        return (m0, m1)
    elif isinstance(obj, torch.Tensor):
        return MixTensor.from_tensor(
            obj, percents={
                'cuda':policy.act_gpu_percent, 
                'cpu':policy.act_cpu_percent, 
                'disk':policy.act_disk_percent, 
            }, 
            file_path=f'{prefix}'
        )
    elif isinstance(obj, tuple):
        return honor_type(obj, (to_mixed_device(o, policy, f'{prefix}[{i}]') for i, o in enumerate(obj)))
    else:
        logger.warning(f'inputs: {obj} of type \'{type(obj)}\' is not implemented.')
        return obj

from typing import Iterable
class BatchMixTensor:
    def __init__(self, batches: Iterable[MixTensor]):
        self.dtype = batches[0].dtype
        self.device = batches[0].device
        self.batches = batches 

    def __getitem__(self, i):
        return self.batches[i]
    
    def __setitem__(self, i, mt: MixTensor):
        self.batches[i] = mt

    def __len__(self):
        return len(self.batches)
    
    def size(self):
        shape = list(self[0].size()) 
        shape[0] *= len(self)
        return torch.Size(shape)

    def __add__(self, bmt):
        for k in range(len(self)): # K batches 
            # TODO flexgen: parallelly load k+1
            self_k = self[k].to_tensor()
            bmt_k = bmt[k].to_tensor()
            res = self_k + bmt_k 
            self[k] = MixTensor.from_tensor(res, self[k].percents, self[k].file_path)
        return self 

def concat_outputs(outputs): # concatenate K outputs to one output
    assert len(outputs), 'empty outputs.'
    assert isinstance(outputs[0], (MixTensor, torch.Tensor, tuple)), f'not supported type: {type(outputs[0])}.'
    
    if isinstance(outputs[0], torch.Tensor):
        return torch.cat(outputs, dim=0)
    elif isinstance(outputs[0], MixTensor):
        return BatchMixTensor(outputs)
    elif isinstance(outputs[0], tuple):
        def f(outputs):
            ans = []
            for elem in zip(*outputs):
                if isinstance(elem[0], torch.Tensor):
                    ans.append(torch.cat(elem, dim=0))
                elif isinstance(elem[0], MixTensor):
                    ans.append(BatchMixTensor(elem))
                elif isinstance(elem[0], tuple):
                    ans.append(f(elem))
                else:
                    logger.warning(f'outputs: {elem[0]} of type \'{type(elem[0])}\' is not implemented.')
                    ans.append(elem[0])
            return tuple(ans)

        return f(outputs)


def load_kth_batch_inputs(inputs, k, ngb): # for both args, kwargs, with a nested structure of tuple/list/dict/Tensor
    if isinstance(inputs, (tuple, list)): # e.g. args
        return honor_type(inputs, (load_kth_batch_inputs(inp, k, ngb) for inp in inputs))
    elif isinstance(inputs, Mapping): # e.g. kwargs
        return type(inputs)({key:load_kth_batch_inputs(value, k, ngb) for key, value in inputs.items()})
    elif isinstance(inputs, torch.Tensor):
        mini_size = inputs.size(0) // ngb
        return inputs[k * mini_size:(k + 1) * mini_size]
    elif isinstance(inputs, BatchMixTensor):
        mini_batch = inputs.batches[k]
        return mini_batch.to_tensor()
    elif isinstance(inputs, (int, bool, type(None))): 
        return inputs
    else:
        logger.warning(f'inputs: {inputs} of type \'{type(inputs)}\' is not implemented.')
        return inputs



In [11]:



def to_flexgen_forward(mpl, j, compute_device):
    # rewrite the j-th layer's forward
    layer_name = mpl.layer_names[j]
    next_layer_name = mpl.layer_names[(j + 1) % len(mpl.layer_names)]

    policy = mpl.policy
    ngb = policy.num_gpu_batches

    layer = get_module_from_name(mpl.model, layer_name)  
    if hasattr(layer, "_flexgen_old_forward"): return  
    
    layer._flexgen_old_forward = old_forward = layer.forward 

    @functools.wraps(old_forward)
    def new_forward(*args, **kwargs):
        # pre fwd: load curr & next weights, TODO: cuda stream
        mpl.load_layer_weights(layer_name, compute_device) 
        mpl.load_layer_weights(next_layer_name, compute_device) 
        
        # loop forward pass of K minibatches, TODO: cuda stream
        with torch.no_grad():
            logger.debug(f'args: {get_type_size_info(args)}')
            logger.debug(f'kwargs: {get_type_size_info(kwargs)}')
            
            outputs = []
            for k in range(ngb):
                logger.debug(f'layer: {layer_name}, batch: {k}')

                # 'pre' fwd: load curr & next inputs (activations, KV cache)
                args_k = load_kth_batch_inputs(args, k, ngb)
                kwargs_k = load_kth_batch_inputs(kwargs, k, ngb)

                # TODO: load args, kwargs to compute device

                # the k-th fwd pass
                output = old_forward(*args_k, **kwargs_k)

                # TODO: 1) output: to mix, 2) args_k, kwargs_k: free
                output = to_mixed_device(output, policy, prefix=f'tmp/{layer_name}_output')
                outputs.append(output) 

            output = concat_outputs(outputs)
            logger.debug(f'outputs after concat: {get_type_size_info(output)}')                

        # post fwd: free curr weights
        mpl.offload_layer_weights(layer_name)
        return output

    layer.forward = new_forward
    logger.debug(f'{layer_name} to flexgen forward')

@contextlib.contextmanager 
def flexgen(checkpoint, policy):
    # init model 
    from model import ModelPolicyLoader
    mpl = ModelPolicyLoader(checkpoint, policy)
    mpl.init_all_weights() # init 

    # test run, get layer order
    call_layer_log = []
    with test(mpl, call_layer_log):
        from test import test_hf_gen
        test_hf_gen(mpl.checkpoint, mpl.model, 1,1, prompts=['0'])

    assert len(call_layer_log) == len(mpl.layer_names) and set(call_layer_log) == set(mpl.layer_names)
    mpl.layer_names = call_layer_log

    # rewrite layer forward
    for j, _ in enumerate(mpl.layer_names):
        compute_device = 'cpu'
        to_flexgen_forward(mpl, j, compute_device)
    yield mpl.model 
    for layer_name in mpl.layer_names:
        reset_forward(mpl.model, layer_name)
        

In [12]:
with flexgen(checkpoint, policy) as model:
    num_prompts = policy.gpu_batch_size * policy.num_gpu_batches
    test_hf_gen(checkpoint, model, num_prompts)


2023-10-11 12:27:00,277 [connectionpool.py:456 in _make_request] DEBUG - https://huggingface.co:443 "HEAD /facebook/opt-125m/resolve/main/config.json HTTP/1.1" 200 0
2023-10-11 12:27:00,408 [connectionpool.py:456 in _make_request] DEBUG - https://huggingface.co:443 "HEAD /facebook/opt-125m/resolve/main/config.json HTTP/1.1" 200 0
2023-10-11 12:27:00,492 [model.py:159 in is_on_disk] INFO - [], ['lm_head.weight']


2023-10-11 12:27:00,532 [connectionpool.py:456 in _make_request] DEBUG - https://huggingface.co:443 "HEAD /facebook/opt-125m/resolve/main/config.json HTTP/1.1" 200 0
2023-10-11 12:27:00,627 [model.py:159 in is_on_disk] INFO - [], ['lm_head.weight']
2023-10-11 12:27:00,629 [model.py:182 in download] INFO - The whole model has been downloaded an processed to offload_folder: 'offload_dir/facebook.opt-125m'
2023-10-11 12:27:00,639 [model.py:138 in get_policy_weight_map] DEBUG - model.decoder.embed_tokens, [0. 0. 1.], size_todo: 86630400
2023-10-11 12:27:00,640 [model.py:138 in get_policy_weight_map] DEBUG - model.decoder.embed_positions, [0. 0. 1.], size_todo: 85056000
2023-10-11 12:27:00,642 [model.py:138 in get_policy_weight_map] DEBUG - model.decoder.final_layer_norm, [0.00000000e+00 1.91116887e-05 9.99980888e-01], size_todo: 85054464
2023-10-11 12:27:00,645 [model.py:138 in get_policy_weight_map] DEBUG - model.decoder.layers.0, [0.         0.05002193 0.94997807], size_todo: 77966592
20

AttributeError: 'tuple' object has no attribute 'size'