In [None]:
import gc
from model_llama3_2s import *

In [2]:
import json

# Hyper-parameters shipped next to the checkpoint; they are passed verbatim to ModelArgs.
with open("Llama3.1-8B-Instruct/params.json", "r") as st_json:
    params = json.load(st_json)

args = ModelArgs(**params)
transformer = Transformer(args)

# Load the checkpoint on CPU; weights_only=True restricts what the unpickler may build.
model_pth = torch.load("Llama3.1-8B-Instruct/consolidated.00.pth", map_location="cpu", weights_only=True)

# strict=False tolerates key mismatches, so report them instead of letting a
# partially initialised model go unnoticed.
load_result = transformer.load_state_dict(model_pth, strict=False)
if load_result.missing_keys or load_result.unexpected_keys:
    print("missing keys:", load_result.missing_keys)
    print("unexpected keys:", load_result.unexpected_keys)
transformer.eval()

from tokenizer import Tokenizer, ChatFormat
tok = Tokenizer("Llama3.1-8B-Instruct/tokenizer.model")
formatter = ChatFormat(tok)

In [5]:
# One conversation per batch entry; each conversation is a list of role/content messages.
dialogs = [[{"role": "user", "content": "hello!😆"}]]

# Encode every dialog through the chat formatter, then turn the nested lists into a tensor.
prompt_tokens = list(map(formatter.encode_dialog_prompt, dialogs))
prompt = torch.tensor(prompt_tokens)

In [None]:

# Pre-allocated generation buffer: one row per prompt, 1000 slots, filled with pad_id.
pad_id = tok.pad_id
batch_size = len(prompt_tokens)
tokens = torch.full((batch_size, 1000), pad_id, dtype=torch.long)

# Copy each prompt into the left edge of its row. Iterating the Python lists
# (rather than rows of the `prompt` tensor) avoids re-wrapping a tensor in torch.tensor().
for k, t in enumerate(prompt_tokens):
    tokens[k, : len(t)] = torch.tensor(t, dtype=torch.long)
token_logprobs = torch.zeros_like(tokens, dtype=torch.float)

# Decoding bookkeeping consumed by the generation loop.
prev_pos = 0
eos_reached = torch.tensor([False] * batch_size)
input_text_mask = tokens != pad_id  # True wherever a prompt token was written

temperature = 0
stop_tokens = torch.tensor(list(tok.stop_tokens))

In [8]:
cache = KVCache(
    transformer.caches_shape, dtype=torch.float16
)

# Greedy decoding with the PyTorch model. The first step feeds the whole prompt,
# every later step feeds only the token produced by the previous step.
# Pure inference: no_grad keeps autograd from tracking the forward passes
# (unless the model already disables it internally).
with torch.no_grad():
    for cur_pos in range(len(prompt[0]), 800):
        seqlen = tokens[:, prev_pos:cur_pos].size(1)

        # Causal mask over the new tokens, left-padded with zeros so that all
        # prev_pos earlier positions stay visible; final shape (1, 1, seqlen, cur_pos).
        mask = torch.full((seqlen, seqlen), -1e9)
        mask = torch.triu(mask, diagonal=1)
        mask = torch.hstack(
            [torch.zeros((seqlen, prev_pos)), mask]
        )[None, None, :, :]

        logits = transformer.forward(tokens[:, prev_pos:cur_pos], mask, cache)
        next_token = torch.argmax(logits[..., -1, :], dim=-1)

        # Keep prompt tokens where the buffer already holds one at cur_pos.
        next_token = next_token.reshape(-1)
        next_token = torch.where(
            input_text_mask[:, cur_pos], tokens[:, cur_pos], next_token
        )
        tokens[:, cur_pos] = next_token

        # A row is finished once it emits a stop token outside of its prompt.
        eos_reached |= (~input_text_mask[:, cur_pos]) & (
            torch.isin(next_token, stop_tokens)
        )
        prev_pos = cur_pos
        if all(eos_reached):
            break

In [None]:
# Decode the first row of the token buffer up to (not including) index prev_pos.
print(tok.decode(tokens[0, :prev_pos].tolist()))

In [None]:
# Wrap the PyTorch model in Llama_coreML; this wrapper is what gets traced and converted below.
coreML_transformer = Llama_coreML(transformer= transformer)
# NOTE(review): if Llama_coreML stores the passed-in module as `.transformer`,
# this reload copies the weights onto themselves — confirm it is needed.
coreML_transformer.transformer.load_state_dict(transformer.state_dict())

In [None]:
# Drop this notebook's reference to the original model, then force a collection pass.
del transformer
gc.collect()


In [8]:
# Dummy example inputs for tracing: 5 token ids and an all-zero 5x5 mask.
input_ids: torch.Tensor = torch.zeros(1, 5, dtype=torch.int32)
causal_mask: torch.Tensor = torch.zeros(1, 1, 5, 5, dtype=torch.float32)

# Switch to eval mode first, then record the forward pass as a TorchScript trace.
coreML_transformer.eval()
traced_transformer = torch.jit.trace(coreML_transformer, [input_ids, causal_mask])

In [None]:
# Keep the cache shape (needed for the Core ML state specs) before dropping the wrapper.
caches_shape = coreML_transformer.transformer.caches_shape
# NOTE(review): later cells still call `coreML_transformer.forward(...)`; they fail after this `del`.
del coreML_transformer
gc.collect()

In [None]:
import coremltools as ct
import numpy as np
ct.__version__

In [12]:

# Flexible dimensions: number of query tokens per call and mask width, each 1..2000.
query_length = ct.RangeDim(lower_bound=1, upper_bound=2000, default=1)
end_step_dim = ct.RangeDim(lower_bound=1, upper_bound=2000, default=1)

# NOTE(review): causal_mask is declared int32 here, while the traced example mask is
# float32 and the runtime masks hold float -1e9 values — confirm the dtype is intended.
inputs = [
    ct.TensorType(name="input_ids", shape=(1, query_length), dtype=np.int32),
    ct.TensorType(name="causal_mask", shape=(1, 1, query_length, end_step_dim), dtype=np.int32),
]

# Two float16 state tensors with identical shape: one for keys, one for values.
states = [
    ct.StateType(
        name=state_name,
        wrapped_type=ct.TensorType(shape=caches_shape, dtype=np.float16),
    )
    for state_name in ("keyCache", "valueCache")
]

outputs = [ct.TensorType(name="logits", dtype=np.float32)]


In [None]:

# Convert the traced module to a Core ML model using the input, state and output
# specs defined earlier, targeting iOS 18.
mlmodel_fp16 = ct.convert(
    traced_transformer,
    inputs=inputs,
    states=states,
    outputs=outputs,
    minimum_deployment_target=ct.target.iOS18,
    # skip_model_load=True
)

In [None]:
# The trace is no longer needed once conversion is done; release it.
# NOTE(review): a later cell still calls `traced_transformer.forward(...)` and fails after this `del`.
del traced_transformer
gc.collect()

In [None]:
# Int4 weight quantization: symmetric linear, per-block with block_size [1, 32].
# NOTE(review): MODEL_ID names Llama-3.2-1B while the checkpoint loaded above is
# Llama3.1-8B-Instruct; both constants are only referenced by the disabled metadata line.
MODEL_ID: str = "meta-llama/Llama-3.2-1B-Instruct"
METADATA_TOKENIZER: str = "tokenizer.model"

op_config = ct.optimize.coreml.OpLinearQuantizerConfig(
    dtype="int4",
    mode="linear_symmetric",
    block_size=[1, 32],
    granularity="per_block",
)
config = ct.optimize.coreml.OptimizationConfig(global_config=op_config)

# Apply the same quantizer config to every supported op in the fp16 model.
mlmodel_int4 = ct.optimize.coreml.linear_quantize_weights(
    mlmodel_fp16,
    config=config,
)
# mlmodel_int4._spec.description.metadata.userDefined.update({METADATA_TOKENIZER: MODEL_ID})


In [20]:
# Write the quantized model to disk as an .mlpackage.
mlmodel_int4.save("coreml_1B_INT4.mlpackage")

In [20]:
state = mlmodel_fp16.make_state()  # fresh state object for the stateful Core ML model
index = 0
# NOTE(review): `c` starts as a list here, but later cells rebind it to an int counter.
c = []

In [None]:
# Greedy decoding through the PyTorch wrapper (prefill with the prompt, then token by token).
# NOTE(review): `coreML_transformer` is deleted by an earlier cell (`del coreML_transformer`),
# so on a top-to-bottom run this cell raises NameError.
# prompt_tokens = torch.arange(1, 10)[None, :]
c = seq_len = len(prompt_tokens[0])


# Prefill mask: square causal mask over the prompt (the zero-width hstack adds no columns).
mask = torch.full((seq_len, seq_len), -1e9)
mask = torch.triu(mask, diagonal=1)
mask = torch.hstack(
    [torch.zeros((seq_len, 0)), mask]
)[None, None, :, :]

print(mask.shape)

logits = coreML_transformer.forward(torch.tensor(prompt_tokens), mask)
pre = torch.argmax(logits, -1)
print(torch.argmax(logits, -1))

# NOTE(review): the token predicted by the prefill (`pre`) is fed back below but never
# appended to `a`, so the decoded text starts at the second generated token.
a = []
for i in range(len(prompt_tokens[0])+1, 200):
    seq_len = len(pre)
    # NOTE(review): `c` is incremented before the mask is built, so the first decode step
    # uses prompt_len + 2 mask columns; the PyTorch loop uses cur_pos columns — confirm.
    c += 1

    mask = torch.full((seq_len, seq_len), -1e9)
    mask = torch.triu(mask, diagonal=1)
    mask = torch.hstack(
        [torch.zeros((seq_len, c)), mask]
    )[None, None, :, :]
    
    # print(mask.shape)
    # print(pre.shape)
    logits = coreML_transformer.forward(pre[None, :], mask)
    pre = torch.argmax(logits, -1)

    a.append(int(pre[0]))
print(a)
print(tok.decode(a))


In [None]:
# One forward pass of the prompt through the TorchScript trace, printing the argmax token.
# NOTE(review): `traced_transformer` is deleted by an earlier cell (`del traced_transformer`),
# so on a top-to-bottom run this cell raises NameError.
# prompt_tokens = torch.arange(1, 10)[None, :]
seq_len = len(prompt_tokens[0])

# NOTE(review): the mask is left-padded with seq_len zero columns here (width 2 * seq_len),
# whereas the other prefill cells pad with 0 columns — confirm which one is intended.
mask = torch.full((seq_len, seq_len), -1e9)
mask = torch.triu(mask, diagonal=1)
mask = torch.hstack(
    [torch.zeros((seq_len, seq_len)), mask]
)[None, None, :, :]
logits = traced_transformer.forward(torch.tensor(prompt_tokens), mask)
print(torch.argmax(logits, -1))

In [None]:
# Prefill the fp16 Core ML model with the whole prompt and take the first greedy token.
a = []

seq_len = len(prompt_tokens[0])
c = seq_len

# Square causal mask over the prompt; no earlier positions exist, so zero extra columns.
mask = np.triu(np.full((seq_len, seq_len), -1e9), k=1)
mask = np.concatenate((np.zeros((seq_len, 0)), mask), axis=1)[None, None]
print(mask.shape)

state = mlmodel_fp16.make_state()  # fresh state for this generation run

inputs = dict(
    input_ids=np.array(prompt_tokens, dtype=np.int32),
    causal_mask=mask,
)
logits = mlmodel_fp16.predict(inputs, state=state)["logits"]
pre = np.argmax(logits, axis=-1)
a.append(int(pre[0]))
print(pre)

In [None]:
# Greedy decode with the fp16 Core ML model, continuing from the prefill cell's
# `pre`, `a`, `c` and `state`. Stops at the first stop token or at position 200.
# NOTE(review): `c` is incremented before the mask is built, so the first step's mask
# is prompt_len + 2 columns wide; the PyTorch loop uses cur_pos columns — confirm.
stop_ids = stop_tokens.tolist()

for i in range(len(prompt_tokens[0]) + 1, 200):
    c += 1
    seq_len = len(pre)

    # c zero columns (already-processed positions) followed by the causal block.
    mask = np.triu(np.full((seq_len, seq_len), -1e9), k=1)
    mask = np.concatenate((np.zeros((seq_len, c)), mask), axis=1)[None, None]

    inputs = dict(
        input_ids=np.array([pre], dtype=np.int32),
        causal_mask=mask,
    )
    preds = mlmodel_fp16.predict(inputs, state=state)
    logits = preds["logits"]
    pre = np.argmax(logits, axis=-1)

    next_id = int(pre[0])
    a.append(next_id)
    if next_id in stop_ids:
        break

print(a)
print(tok.decode(prompt_tokens[0] + a))


In [59]:
# Reload the quantized model from the package saved earlier.
mlmodel_int4 = ct.models.MLModel("coreml_1B_INT4.mlpackage")

In [37]:
def apply_repetition_penalty(logits, generated_tokens, penalty):
    """
    Repetition Penalty를 적용한 logits 값을 반환하는 함수.
    
    Parameters:
    - logits: (numpy array) 현재 스텝에서의 logits 값
    - generated_tokens: (list) 이전에 생성된 토큰들의 리스트
    - penalty: (float) 패널티 값 (보통 1.0보다 큰 값을 설정)
    
    Returns:
    - 수정된 logits 값
    """
    for token in set(generated_tokens):  # 중복을 방지하기 위해 set 사용
        if logits[..., token] > 0:
            logits[..., token] /= penalty  # 패널티 적용
        else:
            logits[..., token] *= penalty  # 음수일 경우 반대로 곱함

    return logits

In [None]:
import numpy as np

def top_p_sampling(logits, p=0.9):
    """
    Pick the next token with top-p (nucleus) sampling.

    Parameters:
    - logits: (numpy array) logits of shape (1, vocab) or (1, seq_len, vocab).
      Only the first batch entry is used; when a sequence axis is present the
      last position is the one that is sampled from.
    - p: (float) cumulative-probability threshold (0 < p <= 1).

    Returns:
    - numpy array of shape (1,) holding the index of the selected token.
    """
    # float64 keeps the normalised probabilities within np.random.choice's sum tolerance.
    scores = np.asarray(logits, dtype=np.float64)[0]
    if scores.ndim > 1:
        # A sequence axis is present: keep only the last position.
        scores = scores.reshape(-1, scores.shape[-1])[-1]

    # Numerically stable softmax: subtracting the max avoids exp() overflow to inf/NaN.
    exp_scores = np.exp(scores - np.max(scores))
    probabilities = exp_scores / exp_scores.sum()

    # Token ids ordered from most to least likely.
    sorted_indices = np.argsort(probabilities)[::-1]
    sorted_probabilities = probabilities[sorted_indices]

    # Smallest prefix whose cumulative probability reaches p.
    cumulative_probs = np.cumsum(sorted_probabilities)
    cutoff_index = np.searchsorted(cumulative_probs, p)

    top_p_indices = sorted_indices[:cutoff_index + 1]
    top_p_probs = sorted_probabilities[:cutoff_index + 1]

    # Sample among the candidates after renormalising their probabilities.
    selected_token = np.random.choice(top_p_indices, p=top_p_probs / top_p_probs.sum())

    return np.array([selected_token])

# Example usage
np.random.seed(0)  # fixed seed for reproducibility
logits = np.random.rand(1, 10)  # random logits of shape (1, 10)
p = 0.9  # cumulative-probability threshold

# Apply top-p sampling
selected_token = top_p_sampling(logits, p)
print("Selected token index:", selected_token)

In [None]:
# Prefill the int4 Core ML model with the whole prompt and take the first greedy token.
a = []

seq_len = len(prompt_tokens[0])
c = seq_len

# Square causal mask over the prompt; no earlier positions exist, so zero extra columns.
mask = np.triu(np.full((seq_len, seq_len), -1e9), k=1)
mask = np.concatenate((np.zeros((seq_len, 0)), mask), axis=1)[None, None]
print(mask.shape)

state = mlmodel_int4.make_state()  # fresh state for this generation run

inputs = dict(
    input_ids=np.array(prompt_tokens, dtype=np.int32),
    causal_mask=mask,
)

logits = mlmodel_int4.predict(inputs, state=state)["logits"]

pre = np.argmax(logits, axis=-1)
a.append(int(pre[0]))
print(pre)

In [None]:
# Greedy decode with the int4 model and a repetition penalty, continuing from the
# prefill cell's `pre`, `a` and `state`.
# `a` is deliberately NOT reset here: it already holds the first generated token from
# the prefill cell, which must stay in both the decoded text and the penalty set.
REPETITION_PENALTY = 4.20
stop_ids = set(stop_tokens.tolist())  # built once instead of on every iteration

for i in range(len(prompt_tokens[0])+1, len(prompt_tokens[0])+100):
    seq_len = len(pre)
    # NOTE(review): with c = i the first step's mask is prompt_len + 2 columns wide;
    # the PyTorch loop uses cur_pos columns — confirm this is what the model expects.
    c = i
    mask = np.full((seq_len, seq_len), -1e9)
    mask = np.triu(mask, k=1)
    mask = np.hstack(
        [np.zeros((seq_len, c)), mask]
    )[None, None, :, :]

    inputs = {
        "input_ids": np.array([pre], dtype=np.int32),
        "causal_mask": mask,
    }
    preds = mlmodel_int4.predict(inputs, state=state)
    logits = preds['logits']
    # Penalise every token seen so far, prompt included.
    logits = apply_repetition_penalty(logits, prompt_tokens[0] + a, REPETITION_PENALTY)
    # logits = top_p_sampling(logits, p= .9)
    pre = np.argmax(logits, axis=-1)

    a.append(int(pre[0]))
    if int(pre[0]) in stop_ids:
        break

# print(len(a))
# print(tok.decode(prompt_tokens[0] + a))
print(tok.decode(a))

In [None]:
# Total number of tokens: prompt plus everything collected in `a`.
len(prompt_tokens[0] + a)