From c5d6349f031c84fbd0b52a711474567201706502 Mon Sep 17 00:00:00 2001 From: Homero Roman <17214754+HomeroRR@users.noreply.github.com> Date: Mon, 11 May 2026 13:01:43 +0000 Subject: [PATCH] fix(ds4): Implement MoE low-memory streaming to work around macOS's kernel bug. --- LICENSE | 1 + ds4.c | 1034 +++++++++++++++++++++++++++++++++++++++++++++----- ds4.h | 1 + ds4_cli.c | 5 + ds4_metal.h | 9 + ds4_metal.m | 106 ++++++ ds4_server.c | 5 + 7 files changed, 1072 insertions(+), 89 deletions(-) diff --git a/LICENSE b/LICENSE index 5973a4c9..1e267f70 100644 --- a/LICENSE +++ b/LICENSE @@ -1,5 +1,6 @@ MIT License +Copyright (c) 2026-2036 Homero Roman Copyright (c) 2026 The ds4.c authors Copyright (c) 2023-2026 The ggml authors diff --git a/ds4.c b/ds4.c index 3142bf89..9099927e 100644 --- a/ds4.c +++ b/ds4.c @@ -14,6 +14,7 @@ * no-copy MTLBuffers. */ +#define _XOPEN_SOURCE 600 #include #include #include @@ -52,6 +53,9 @@ #define DS4_RMS_EPS ( 1.0e-6f) #define DS4_HC_EPS ( 1.0e-6f) #define DS4_EXPERT_WEIGHT_SCALE (1.5f) + +/* Engine flags for low-memory streaming mode */ +#define DS4_FLAG_LOW_MEM (1 << 0) #define DS4_SWIGLU_CLAMP_EXP (10.0f) #define DS4_ROPE_FREQ_BASE (10000.0f) #define DS4_ROPE_SCALE_FACTOR (16.0f) @@ -212,8 +216,67 @@ typedef struct { float *output_weights; float *output_embd; float *output_norm; + + int32_t stream_selected[DS4_N_EXPERT_USED]; + float stream_router_weights[DS4_N_EXPERT_USED]; + uint8_t *layer_buf; + uint8_t *expert_buf; } ds4_cpu_decode_scratch; +typedef struct { + int fd; /* GGUF file descriptor (kept open for pread) */ + uint64_t attn_offsets [DS4_N_LAYER][8]; /* up to 8 attention tensors per layer */ + uint64_t attn_bytes [DS4_N_LAYER][8]; + int n_attn_tensors; + uint64_t gate_base [DS4_N_LAYER]; + uint64_t gate_stride; + uint64_t up_base [DS4_N_LAYER]; + uint64_t up_stride; + uint64_t down_base [DS4_N_LAYER]; + uint64_t down_stride; + uint64_t embd_base; + uint64_t embd_row_bytes; + uint64_t lmhead_offset; + uint64_t lmhead_bytes; +} ds4_stream_ctx; + +/* Offsets of each tensor within a packed streaming buffer for a single layer */ +typedef struct { + size_t attn_norm; + size_t attn_q_a; + size_t attn_q_a_norm; + size_t attn_q_b; + size_t attn_kv; + size_t attn_kv_a_norm; + size_t attn_sinks; + size_t attn_output_a; + size_t attn_output_b; + size_t hc_attn_fn; + size_t hc_attn_scale; + size_t hc_attn_base; + size_t hc_ffn_fn; + size_t hc_ffn_scale; + size_t hc_ffn_base; + size_t ffn_norm; + size_t ffn_gate_inp; + size_t ffn_exp_probs_b; + size_t ffn_gate_tid2eid; + size_t ffn_gate_shexp; + size_t ffn_up_shexp; + size_t ffn_down_shexp; + size_t attn_compressor_kv; + size_t attn_compressor_gate; + size_t attn_compressor_ape; + size_t attn_compressor_norm; + size_t indexer_compressor_kv; + size_t indexer_compressor_gate; + size_t indexer_compressor_ape; + size_t indexer_compressor_norm; + size_t indexer_attn_q_b; + size_t indexer_proj; + size_t total_bytes; +} ds4_stream_layer_offsets; + static const uint8_t kmask_iq2xs[8] = { 1, 2, 4, 8, 16, 32, 64, 128 }; @@ -890,6 +953,8 @@ typedef struct { uint64_t abs_offset; uint64_t elements; uint64_t bytes; + uint64_t file_offset; // byte offset in GGUF file (= abs_offset, for low-mem streaming) + uint64_t file_bytes; // total byte size (= bytes, for clarity in low-mem code) } ds4_tensor; typedef struct { @@ -905,6 +970,9 @@ typedef struct { ds4_kv *kv; ds4_tensor *tensors; + + uint8_t *header_buf; /* malloc'd GGUF header for skip_mmap mode; NULL in normal mode */ + uint64_t header_buf_size; /* bytes valid in header_buf 
*/ } ds4_model; static uint64_t scalar_value_size(uint32_t type) { @@ -990,9 +1058,11 @@ static bool tensor_nbytes(uint32_t type, uint64_t elements, uint64_t *bytes) { } static ds4_cursor cursor_at(const ds4_model *m, uint64_t pos) { + const uint8_t *base = (m->map != NULL) ? m->map : m->header_buf; + uint64_t size = (m->map != NULL) ? m->size : m->header_buf_size; ds4_cursor c = { - .base = m->map, - .size = m->size, + .base = base, + .size = size, .pos = pos, .error = {0}, }; @@ -1058,6 +1128,7 @@ static void model_close(ds4_model *m) { if (!m) return; free(m->kv); free(m->tensors); + free(m->header_buf); if (m->map) munmap((void *)m->map, (size_t)m->size); if (m->fd >= 0) close(m->fd); memset(m, 0, sizeof(*m)); @@ -1161,6 +1232,8 @@ static void parse_tensors(ds4_model *m, ds4_cursor *c) { ds4_die("tensor offset overflow"); } t->abs_offset = m->tensor_data_pos + t->rel_offset; + t->file_offset = t->abs_offset; // store for low-mem streaming + t->file_bytes = t->bytes; // store for low-mem streaming if (t->bytes != 0 && (t->abs_offset > m->size || t->bytes > m->size - t->abs_offset)) { @@ -1174,7 +1247,7 @@ static void parse_tensors(ds4_model *m, ds4_cursor *c) { * Tokenizer-only callers pass prefetch_cpu=false so inspecting tokens never * walks the huge tensor payload. */ static void model_open(ds4_model *m, const char *path, bool metal_mapping, - bool prefetch_cpu) { + bool prefetch_cpu, bool skip_mmap) { memset(m, 0, sizeof(*m)); m->fd = -1; @@ -1185,40 +1258,67 @@ static void model_open(ds4_model *m, const char *path, bool metal_mapping, if (fstat(fd, &st) == -1) ds4_die_errno("cannot stat model", path); if (st.st_size < 32) ds4_die("model file is too small to be GGUF"); - /* - * Metal wraps slices of this mapping as no-copy MTLBuffers, so the Metal - * path keeps the file-backed shared mapping. The CPU path only reads the - * weights through normal pointers and should not inherit Metal's VM policy: - * use a private read-only mapping there. - * - * This is deliberately defensive against an OS-level Darwin VM bug observed - * while the CPU backend streams the very large GGUF through a shared mmap: - * the kernel can panic in VM map-count accounting instead of returning a - * normal user-space failure. Keeping CPU inference off the shared mapping - * avoids that VM accounting path while preserving normal file-backed reads. - */ - const int mmap_flags = metal_mapping ? MAP_SHARED : MAP_PRIVATE; - void *map = mmap(NULL, (size_t)st.st_size, PROT_READ, mmap_flags, fd, 0); - if (map == MAP_FAILED) ds4_die_errno("cannot mmap model", path); - m->fd = fd; - m->map = map; m->size = (uint64_t)st.st_size; - ds4_cursor c = cursor_at(m, 0); - uint32_t magic; - if (!cursor_u32(&c, &magic)) ds4_die(c.error); - if (magic != DS4_GGUF_MAGIC) ds4_die("model is not a GGUF file"); - if (!cursor_u32(&c, &m->version)) ds4_die(c.error); - if (!cursor_u64(&c, &m->n_tensors)) ds4_die(c.error); - if (!cursor_u64(&c, &m->n_kv)) ds4_die(c.error); + if (skip_mmap) { + /* Low-memory streaming mode: parse header without persistent mmap. + * Read GGUF header into a buffer, parse metadata/tensors, keep buffer alive for + * string pointers (kv[i].key, tensors[i].name). This avoids creating large VM + * entries on macOS while keeping fd open for pread tensor access. */ + const size_t header_buf_size = (size_t)((uint64_t)(64*1024*1024) < (uint64_t)st.st_size ? 
+ (uint64_t)(64*1024*1024) : (uint64_t)st.st_size); + uint8_t *header_buf = xmalloc(header_buf_size); + + ssize_t n = read(fd, header_buf, header_buf_size); + if (n < 32) ds4_die("cannot read GGUF header"); + + m->map = header_buf; /* Temporarily point to buffer for parsing */ + ds4_cursor c = cursor_at(m, 0); + uint32_t magic; + if (!cursor_u32(&c, &magic)) ds4_die(c.error); + if (magic != DS4_GGUF_MAGIC) ds4_die("model is not a GGUF file"); + if (!cursor_u32(&c, &m->version)) ds4_die(c.error); + if (!cursor_u64(&c, &m->n_tensors)) ds4_die(c.error); + if (!cursor_u64(&c, &m->n_kv)) ds4_die(c.error); + + if (m->version != 3) ds4_die("only GGUF v3 is supported"); + + parse_metadata(m, &c); + parse_tensors(m, &c); + + /* Verify header fits in our read buffer (tensor_data_pos is the end of header) */ + if (m->tensor_data_pos > (uint64_t)header_buf_size) + ds4_die("GGUF header exceeds 64 MB read buffer (likely corrupted or unsupported model)"); + + /* Keep buffer alive for string pointers; m->map will be set to NULL for Metal logic */ + m->header_buf = header_buf; + m->header_buf_size = (uint64_t)n; + m->map = NULL; /* No persistent mmap; all tensor data via pread(m->fd, ...) */ + } else { + /* Standard mode: mmap entire model file. + * Metal: MAP_SHARED for zero-copy MTLBuffer wrapping. + * CPU: MAP_PRIVATE to avoid Darwin VM accounting panic with shared mappings. */ + const int mmap_flags = metal_mapping ? MAP_SHARED : MAP_PRIVATE; + void *map = mmap(NULL, (size_t)st.st_size, PROT_READ, mmap_flags, fd, 0); + if (map == MAP_FAILED) ds4_die_errno("cannot mmap model", path); + m->map = map; + + ds4_cursor c = cursor_at(m, 0); + uint32_t magic; + if (!cursor_u32(&c, &magic)) ds4_die(c.error); + if (magic != DS4_GGUF_MAGIC) ds4_die("model is not a GGUF file"); + if (!cursor_u32(&c, &m->version)) ds4_die(c.error); + if (!cursor_u64(&c, &m->n_tensors)) ds4_die(c.error); + if (!cursor_u64(&c, &m->n_kv)) ds4_die(c.error); - if (m->version != 3) ds4_die("only GGUF v3 is supported"); + if (m->version != 3) ds4_die("only GGUF v3 is supported"); - parse_metadata(m, &c); - parse_tensors(m, &c); + parse_metadata(m, &c); + parse_tensors(m, &c); - if (!metal_mapping && prefetch_cpu) model_prefetch_cpu_mapping(m); + if (!metal_mapping && prefetch_cpu) model_prefetch_cpu_mapping(m); + } } static void print_size(uint64_t bytes) { @@ -2429,7 +2529,19 @@ static void config_validate_model(const ds4_model *m) { /* Bind tensor names once into the fixed DS4 layer layout. This is the point * where stringly GGUF metadata becomes direct model-specific pointers. 
*/ -static void weights_bind(ds4_weights *w, const ds4_model *m) { +/* Compute the byte size of one expert in a 3D expert weight tensor */ +static uint64_t compute_expert_bytes(const ds4_tensor *t) { + if (!t || t->ndim != 3) return 0; + uint64_t in_dim = t->dim[0]; + uint64_t out_dim = t->dim[1]; + const gguf_type_info *info = tensor_type(t->type); + if (!info || info->block_elems == 0) return 0; + uint64_t blocks = (in_dim + info->block_elems - 1) / info->block_elems; + uint64_t row_bytes = blocks * info->block_bytes; + return out_dim * row_bytes; +} + +static void weights_bind(ds4_weights *w, const ds4_model *m, ds4_stream_ctx *stream) { memset(w, 0, sizeof(*w)); w->token_embd = required_tensor(m, "token_embd.weight"); w->output_hc_base = required_tensor(m, "output_hc_base.weight"); @@ -2487,6 +2599,31 @@ static void weights_bind(ds4_weights *w, const ds4_model *m) { } weights_validate_layout(w); + + /* Populate low-memory streaming context if provided */ + if (stream) { + memset(stream, 0, sizeof(*stream)); + stream->fd = m->fd; /* store GGUF fd for pread */ + /* Store token embedding and output weight offsets */ + stream->embd_base = w->token_embd->file_offset; + stream->embd_row_bytes = (4096 * 2); /* 4096 dims, F16 = 2 bytes each */ + stream->lmhead_offset = w->output->file_offset; + stream->lmhead_bytes = w->output->file_bytes; + + /* Per-layer expert weight offsets and strides */ + for (uint32_t il = 0; il < DS4_N_LAYER; il++) { + ds4_layer_weights *l = &w->layer[il]; + stream->gate_base[il] = l->ffn_gate_exps->file_offset; + stream->up_base[il] = l->ffn_up_exps->file_offset; + stream->down_base[il] = l->ffn_down_exps->file_offset; + } + /* Expert stride is per-expert bytes (shared across all layers) */ + if (stream->gate_base[0] != 0) { + stream->gate_stride = compute_expert_bytes(w->layer[0].ffn_gate_exps); + stream->up_stride = compute_expert_bytes(w->layer[0].ffn_up_exps); + stream->down_stride = compute_expert_bytes(w->layer[0].ffn_down_exps); + } + } } static void mtp_weights_bind(ds4_mtp_weights *w, const ds4_model *m) { @@ -5237,11 +5374,17 @@ static void layer_routed_moe_one_prealloc( float clamp, float * mid_all, block_q8_K * xq, - block_q8_K * midq) { - int selected[DS4_N_EXPERT_USED]; - float expert_weight[DS4_N_EXPERT_USED]; + block_q8_K * midq, + int streaming_phase, + const ds4_model * expert_model, + ds4_cpu_decode_scratch * scratch) { + int selected_stack[DS4_N_EXPERT_USED]; + float expert_weight_stack[DS4_N_EXPERT_USED]; + int *selected = (scratch != NULL) ? scratch->stream_selected : selected_stack; + float *expert_weight = (scratch != NULL) ? scratch->stream_router_weights : expert_weight_stack; const uint64_t expert_in_dim = layer->ffn_gate_exps->dim[0]; const uint64_t down_in_dim = layer->ffn_down_exps->dim[0]; + const ds4_model *moe_model = (streaming_phase == 2 && expert_model) ? 
expert_model : model; if (expert_in_dim % QK_K != 0) ds4_die("IQ2_XXS expert input is not QK_K aligned"); if (down_in_dim != DS4_N_FF_EXP || down_in_dim % QK_K != 0) ds4_die("Q2_K expert input has an unexpected layout"); @@ -5249,14 +5392,24 @@ static void layer_routed_moe_one_prealloc( memset(out, 0, (size_t)DS4_N_EMBD * sizeof(out[0])); ds4_quantize_row_q8_K(x, xq, (int64_t)expert_in_dim); - if (layer->ffn_gate_tid2eid) { - layer_hash_selected_experts(selected, model, layer, token); - layer_hash_router_weights_one(expert_weight, model, layer, x, selected); - } else { - layer_topk_selected_experts(selected, expert_weight, model, layer, x); + /* Phase 1: Compute routing (expert selection and softmax weights) */ + if (streaming_phase != 2) { + if (layer->ffn_gate_tid2eid) { + layer_hash_selected_experts(selected, model, layer, token); + layer_hash_router_weights_one(expert_weight, model, layer, x, selected); + } else { + layer_topk_selected_experts(selected, expert_weight, model, layer, x); + } + + /* Phase 1 only: return after routing, skip MoE compute */ + if (streaming_phase == 1) { + (void)il; + return; + } } + /* Phase 2: Skip routing, use pre-computed selected[] and expert_weight[] from phase 1 */ - matvec_iq2_xxs_experts_mid_prequant(mid_all, model, + matvec_iq2_xxs_experts_mid_prequant(mid_all, moe_model, layer->ffn_gate_exps, layer->ffn_up_exps, xq, @@ -5270,7 +5423,7 @@ static void layer_routed_moe_one_prealloc( midq + (uint64_t)i * (down_in_dim / QK_K), (int64_t)down_in_dim); } - matvec_q2_k_experts_accum_prequant(out, model, layer->ffn_down_exps, midq, selected, DS4_N_EXPERT_USED); + matvec_q2_k_experts_accum_prequant(out, moe_model, layer->ffn_down_exps, midq, selected, DS4_N_EXPERT_USED); (void)il; } @@ -5536,7 +5689,9 @@ static void layer_ffn_one_decode_scratch( const float * inp_hc, uint32_t il, int token, - ds4_cpu_decode_scratch * scratch) { + ds4_cpu_decode_scratch * scratch, + int streaming_phase, + const ds4_model * expert_model) { const uint32_t n_hc = DS4_N_HC; const bool profile = getenv("DS4_DECODE_PROFILE_DETAIL") != NULL; const double t_start = profile ? now_sec() : 0.0; @@ -5573,7 +5728,10 @@ static void layer_ffn_one_decode_scratch( DS4_SWIGLU_CLAMP_EXP, scratch->routed_mid_all, scratch->routed_xq, - scratch->routed_midq); + scratch->routed_midq, + streaming_phase, + expert_model, + scratch); if (profile) t_routed = now_sec() - t0; t0 = profile ? now_sec() : 0.0; @@ -5682,7 +5840,10 @@ static void routed_moe_tokens_worker(void *vctx, uint64_t t0, uint64_t t1) { DS4_SWIGLU_CLAMP_EXP, routed_mid, routed_xq, - routed_midq); + routed_midq, + 0, + NULL, + NULL); } free(routed_midq); @@ -5974,6 +6135,7 @@ static void cpu_decode_scratch_init(ds4_cpu_decode_scratch *scratch, uint32_t ct scratch->output_weights = xmalloc((size_t)DS4_N_HC * sizeof(float)); scratch->output_embd = xmalloc((size_t)DS4_N_EMBD * sizeof(float)); scratch->output_norm = xmalloc((size_t)DS4_N_EMBD * sizeof(float)); + } static void cpu_decode_scratch_free(ds4_cpu_decode_scratch *scratch) { @@ -7218,7 +7380,9 @@ static void layer_forward_raw_swa_one( uint32_t il, uint32_t pos, int token, - ds4_cpu_decode_scratch * scratch) { + ds4_cpu_decode_scratch * scratch, + int streaming_phase, + const ds4_model * expert_model) { const uint32_t n_hc = DS4_N_HC; const bool profile = getenv("DS4_DECODE_PROFILE_DETAIL") != NULL; const double t_start = profile ? now_sec() : 0.0; @@ -7348,7 +7512,7 @@ static void layer_forward_raw_swa_one( if (profile) t_post = now_sec() - t0; t0 = profile ? 
now_sec() : 0.0; - layer_ffn_one_decode_scratch(out_hc, model, layer, scratch->after_attn_hc, il, token, scratch); + layer_ffn_one_decode_scratch(out_hc, model, layer, scratch->after_attn_hc, il, token, scratch, streaming_phase, expert_model); if (profile) t_ffn = now_sec() - t0; if (profile) { @@ -7396,7 +7560,77 @@ static void forward_token_raw_swa_cpu_decode_scratch( for (uint32_t il = 0; il < DS4_N_LAYER; il++) { layer_forward_raw_swa_one(next, model, &weights->layer[il], &cache->layer[il], - cur, il, pos, token, scratch); + cur, il, pos, token, scratch, 0, NULL); + float *tmp = cur; + cur = next; + next = tmp; + } + + if (logits) { + output_logits_one_decode_scratch(logits, model, weights, cur, scratch); + } +} + +#define LAYER_BUF_BYTES (256 * 1024 * 1024) +#define EXPERT_BUF_BYTES (64 * 1024 * 1024) + +/* CPU streaming decode: load non-expert weights per-layer, stream experts per-token. */ +static void forward_token_raw_swa_cpu_streaming( + float * logits, + const ds4_model * model, + const ds4_weights * weights, + ds4_layer_cache * cache, + ds4_cpu_decode_scratch * scratch, + const float * inp_hc, + int token, + uint32_t pos, + ds4_engine * engine) { + const uint32_t n_hc = DS4_N_HC; + uint8_t *layer_buf = scratch->layer_buf; + uint8_t *expert_buf = scratch->expert_buf; + + float *cur = scratch->cur; + float *next = scratch->next; + + embed_token_f16(model, weights, token, scratch->plain); + hc_from_plain_embedding(cur, scratch->plain, DS4_N_EMBD, DS4_N_HC); + + for (uint32_t il = 0; il < DS4_N_LAYER; il++) { + /* Phase 1: Load non-expert layer weights and run attention + routing */ + ds4_stream_layer_offsets layer_off; + ssize_t lbytes = stream_load_layer(engine, il, layer_buf, LAYER_BUF_BYTES, &layer_off); + if (lbytes < 0) return; + + ds4_model layer_model; + ds4_layer_weights layer_weights; + ds4_tensor tensors_storage[32]; + stream_layer_build_temp(engine, il, layer_buf, (uint64_t)lbytes, + &layer_off, &layer_model, &layer_weights, tensors_storage); + + /* Phase 1 (streaming_phase=1): attention + routing only */ + layer_forward_raw_swa_one(next, &layer_model, &layer_weights, &cache->layer[il], + cur, il, pos, token, scratch, 1, NULL); + + /* Phase 2: Stream selected experts and run MoE + output */ + int32_t *sel = scratch->stream_selected; + + ssize_t ebytes = stream_load_experts(engine, il, sel, DS4_N_EXPERT_USED, + expert_buf, EXPERT_BUF_BYTES); + if (ebytes < 0) return; + + /* Remap expert IDs to 0..5 (indices in the streamed buffer) */ + for (int i = 0; i < DS4_N_EXPERT_USED; i++) sel[i] = i; + + /* Build fake expert model pointing to streamed buffer */ + ds4_model expert_model = {0}; + expert_model.map = expert_buf; + expert_model.size = (uint64_t)ebytes; + expert_model.fd = -1; + + /* Phase 2 (streaming_phase=2): MoE + output only, use pre-computed routing */ + layer_forward_raw_swa_one(next, &layer_model, &layer_weights, &cache->layer[il], + cur, il, pos, token, scratch, 2, &expert_model); + float *tmp = cur; cur = next; next = tmp; @@ -7551,7 +7785,9 @@ static void prefill_layer_major_cpu( il, (uint32_t)t, prompt->v[t], - &decode_scratch); + &decode_scratch, + 0, + NULL); } } @@ -8694,7 +8930,10 @@ static bool metal_graph_encode_decode_layer( uint32_t raw_cap, uint32_t raw_row, uint32_t n_raw, - int token) { + int token, + const void *expert_map, /* NULL = use model->map */ + uint64_t expert_size, /* DS4_STREAM_BUF_BYTES or 0 */ + int streaming_phase) /* 0=full, 1=attn+routing only, 2=moe+output only */ { const uint64_t hc_dim = (uint64_t)DS4_N_HC * DS4_N_EMBD; const 
uint64_t mix_hc = 2ull * DS4_N_HC + (uint64_t)DS4_N_HC * DS4_N_HC; const uint64_t q_rank = layer->attn_q_a->dim[1]; @@ -8726,6 +8965,16 @@ static bool metal_graph_encode_decode_layer( ok = metal_graph_layer_stage_profile_boundary("decode", (name), il, pos, 1, &decode_stage_t0); \ } \ } while (0) + + /* Expert stride constants (moved before attention section for Phase 2 scope) */ + const uint64_t gate_row_bytes = routed_expert_row_bytes(layer->ffn_gate_exps); + const uint64_t gate_expert_bytes = expert_mid_dim * gate_row_bytes; + const uint64_t up_row_bytes = routed_expert_row_bytes(layer->ffn_up_exps); + const uint64_t up_expert_bytes = layer->ffn_up_exps->dim[1] * up_row_bytes; + const uint64_t down_row_bytes = routed_expert_row_bytes(layer->ffn_down_exps); + const uint64_t down_expert_bytes = routed_out_dim * down_row_bytes; + + if (streaming_phase != 2) { if (ok) ok = ds4_metal_rms_norm_plain_tensor(g->flat_hc, g->cur_hc, (uint32_t)hc_dim, DS4_RMS_EPS) != 0; if (ok) ok = metal_graph_matmul_plain_tensor(g->hc_mix, model, layer->hc_attn_fn, hc_dim, mix_hc, g->flat_hc, 1); @@ -9279,10 +9528,6 @@ static bool metal_graph_encode_decode_layer( if (ok) { metal_graph_debug_dump_tensor("ffn_norm", g->ffn_norm, DS4_N_EMBD, il, pos); } - const uint64_t gate_row_bytes = routed_expert_row_bytes(layer->ffn_gate_exps); - const uint64_t gate_expert_bytes = expert_mid_dim * gate_row_bytes; - const uint64_t down_row_bytes = routed_expert_row_bytes(layer->ffn_down_exps); - const uint64_t down_expert_bytes = routed_out_dim * down_row_bytes; if (ok) ok = metal_graph_matmul_plain_tensor(g->router_logits, model, layer->ffn_gate_inp, DS4_N_EMBD, DS4_N_EXPERT, g->ffn_norm, 1); if (ok) ok = ds4_metal_router_select_tensor(g->router_selected, g->router_weights, g->router_probs, @@ -9303,15 +9548,30 @@ static bool metal_graph_encode_decode_layer( metal_graph_debug_dump_i32_tensor("ffn_moe_topk", g->router_selected, DS4_N_EXPERT_USED, il, pos); metal_graph_debug_dump_tensor("ffn_moe_weights_scaled", g->router_weights, DS4_N_EXPERT_USED, il, pos); } + if (streaming_phase == 1) return ok; + } + /* Expert weight override: if expert_map is provided (streaming mode), use it with + * adjusted offsets for the contiguous-per-type layout. Otherwise use main model. */ + const void *emap = expert_map ? expert_map : model->map; + uint64_t esize = expert_map ? expert_size : model->size; + uint64_t gate_off = expert_map ? 0 + : layer->ffn_gate_exps->abs_offset; + uint64_t up_off = expert_map + ? (uint64_t)DS4_N_EXPERT_USED * gate_expert_bytes + : layer->ffn_up_exps->abs_offset; + uint64_t down_off = expert_map + ? 
(uint64_t)DS4_N_EXPERT_USED * (gate_expert_bytes + up_expert_bytes) + : layer->ffn_down_exps->abs_offset; + if (ok) ok = ds4_metal_routed_moe_one_tensor(g->routed_out, g->routed_gate, g->routed_up, g->routed_mid, g->routed_down, - model->map, model->size, - layer->ffn_gate_exps->abs_offset, - layer->ffn_up_exps->abs_offset, - layer->ffn_down_exps->abs_offset, + emap, esize, + gate_off, + up_off, + down_off, layer->ffn_gate_exps->type, layer->ffn_down_exps->type, gate_expert_bytes, gate_row_bytes, @@ -9989,7 +10249,7 @@ static int metal_graph_decode_test( g.raw_cap, 0, 1, - token); + token, NULL, 0, 0); if (ok) { ds4_metal_tensor *embedded_hc = g.cur_hc; g.cur_hc = g.after_ffn_hc; @@ -10147,7 +10407,7 @@ static int metal_graph_first_token_full_test( } ok = ds4_metal_begin_commands() != 0; if (ok) ok = metal_graph_encode_decode_layer(&g, model, &weights->layer[il], - il, 0, g.layer_raw_cache[il], g.raw_cap, 0, 1, token); + il, 0, g.layer_raw_cache[il], g.raw_cap, 0, 1, token, NULL, 0, 0); ds4_metal_tensor *tmp = g.cur_hc; g.cur_hc = g.after_ffn_hc; g.after_ffn_hc = tmp; @@ -10192,7 +10452,7 @@ static int metal_graph_first_token_full_test( for (uint32_t il = 0; ok && il < DS4_N_LAYER; il++) { ok = metal_graph_encode_decode_layer(&g, model, &weights->layer[il], il, 0, g.layer_raw_cache[il], - g.raw_cap, 0, 1, token); + g.raw_cap, 0, 1, token, NULL, 0, 0); ds4_metal_tensor *tmp = g.cur_hc; g.cur_hc = g.after_ffn_hc; g.after_ffn_hc = tmp; @@ -10295,7 +10555,7 @@ static bool metal_graph_encode_token_raw_swa( g->raw_cap, raw_row, n_raw, - token); + token, NULL, 0, 0); ds4_metal_tensor *tmp = g->cur_hc; g->cur_hc = g->after_ffn_hc; g->after_ffn_hc = tmp; @@ -12187,6 +12447,202 @@ static bool metal_graph_eval_token_raw_swa( return ok; } +/* Forward declaration for streaming helper functions */ +static ssize_t stream_load_experts(ds4_engine *e, int layer_idx, + const int32_t *expert_ids, int n_experts, + uint8_t *buf, size_t buf_size); + +/* Low-memory streaming decode: alternative to metal_graph_eval_token_raw_swa + * that loads weights layer-by-layer from disk. Requires DS4_FLAG_LOW_MEM set + * and Metal streaming buffers initialized. Uses ping-pong buffers (two 210 MB + * regions) to allow prefetching next layer while GPU processes current layer. 
*/ +static bool metal_graph_eval_token_streaming( + ds4_engine *e, + ds4_metal_graph *g, + int token, + uint32_t pos, + float *logits) { + if (!e || !g || e->stream.fd < 0) return false; + + bool ok = true; + int32_t expert_ids[DS4_N_EXPERT_USED]; /* 6 selected expert IDs per layer */ + + /* Step 1: Load and set token embedding (shared across all layers) */ + uint64_t embd_offset = e->stream.embd_base + (uint64_t)token * e->stream.embd_row_bytes; + ssize_t embd_bytes = pread(e->stream.fd, ds4_metal_stream_mem(0), e->stream.embd_row_bytes, (off_t)embd_offset); + if (embd_bytes < 0 || (uint64_t)embd_bytes < e->stream.embd_row_bytes) return false; + + /* Decode embedding and embed token as float into g->cur_hc */ + const uint16_t *embd_f16 = (const uint16_t *)ds4_metal_stream_mem(0); + for (uint32_t i = 0; i < DS4_N_EMBD; i++) { + ((float *)ds4_metal_stream_mem(1))[i] = f16_to_f32(embd_f16[i]); + } + hc_from_plain_embedding((float *)ds4_metal_stream_mem(1), (float *)ds4_metal_stream_mem(1), + DS4_N_EMBD, DS4_N_HC); + if (!ds4_metal_tensor_write(g->cur_hc, 0, ds4_metal_stream_mem(1), DS4_N_HC * DS4_N_EMBD * sizeof(float))) { + return false; + } + + /* Step 1.5: Stream output head weights (5 tensors) into dedicated buffer for later use */ + ds4_model lmhead_model = {0}; + ds4_weights lmhead_weights = {0}; + ds4_tensor lmhead_tensors[5] = {0}; + if (logits) { + uint8_t *buf = (uint8_t *)ds4_metal_stream_mem(DS4_LMHEAD_BUF_IDX); + size_t lmhead_pos = 0; + + /* Pack all 5 output tensors into the lm_head buffer */ + const ds4_tensor *tensors[5] = { + e->weights.output_norm, + e->weights.output_hc_scale, + e->weights.output_hc_base, + e->weights.output_hc_fn, + e->weights.output + }; + int tensor_count = 0; + for (int i = 0; i < 5; i++) { + if (tensors[i] && tensors[i]->file_bytes > 0) { + if (lmhead_pos + tensors[i]->file_bytes > DS4_LMHEAD_BUF_BYTES) { + ok = false; + break; + } + ssize_t n = pread(e->stream.fd, buf + lmhead_pos, (size_t)tensors[i]->file_bytes, + (off_t)tensors[i]->file_offset); + if (n != (ssize_t)tensors[i]->file_bytes) { + ok = false; + break; + } + lmhead_tensors[tensor_count] = *tensors[i]; + lmhead_tensors[tensor_count].abs_offset = lmhead_pos; + lmhead_pos += tensors[i]->file_bytes; + tensor_count++; + } + } + + if (ok && tensor_count > 0) { + lmhead_model.map = buf; + lmhead_model.size = DS4_LMHEAD_BUF_BYTES; + lmhead_weights = e->weights; /* Shallow copy */ + + /* Update weights to point to temp tensor storage */ + int idx = 0; + if (e->weights.output_norm) lmhead_weights.output_norm = &lmhead_tensors[idx++]; + if (e->weights.output_hc_scale) lmhead_weights.output_hc_scale = &lmhead_tensors[idx++]; + if (e->weights.output_hc_base) lmhead_weights.output_hc_base = &lmhead_tensors[idx++]; + if (e->weights.output_hc_fn) lmhead_weights.output_hc_fn = &lmhead_tensors[idx++]; + if (e->weights.output) lmhead_weights.output = &lmhead_tensors[idx++]; + } + } + if (!ok) return false; + + /* Step 2: Layer-by-layer decode using ping-pong buffering */ + const uint32_t raw_row = pos % g->raw_cap; + const uint32_t n_raw = metal_graph_raw_span_for_batch(g, pos, 1); + for (uint32_t il = 0; il < DS4_N_LAYER && ok; il++) { + int lb = il & 1; /* Ping-pong buffer index: 0 or 1 */ + + /* Phase 0: Load all non-expert layer weights into attention stream buffer (buf 2) */ + ds4_stream_layer_offsets layer_off; + ds4_model layer_model; + ds4_layer_weights layer_weights; + /* IMPORTANT: tensors_storage[32] must match the number of UPDATE_TENSOR calls + * in stream_layer_build_temp(). 
Currently 32 offset fields in ds4_stream_layer_offsets + * require exactly 32 storage slots. Adding any new tensor without growing this array + * will cause silent overflow. */ + ds4_tensor tensors_storage[32]; + + ssize_t layer_bytes = stream_load_layer(e, il, + (uint8_t *)ds4_metal_stream_mem(DS4_ATTN_BUF_IDX), + DS4_STREAM_BUF_BYTES, + &layer_off); + if (layer_bytes < 0) return false; + + stream_layer_build_temp(e, il, + ds4_metal_stream_mem(DS4_ATTN_BUF_IDX), + DS4_STREAM_BUF_BYTES, + &layer_off, + &layer_model, + &layer_weights, + tensors_storage); + + /* Phase A: Full attention + routing with streamed non-expert weights (streaming_phase=1) */ + if (!ds4_metal_begin_commands()) return false; + ok = metal_graph_encode_decode_layer(g, + &layer_model, + &layer_weights, + il, + pos, + g->layer_raw_cache[il], + g->raw_cap, + raw_row, + n_raw, + token, + NULL, 0, + 1); /* streaming_phase=1: attn+routing only */ + if (ok) ok = ds4_metal_end_commands() != 0; + if (!ok) return false; + + /* Phase B: Read expert IDs from GPU */ + if (!ds4_metal_tensor_read(g->router_selected, 0, expert_ids, DS4_N_EXPERT_USED * sizeof(int32_t))) { + return false; + } + + /* Phase C: Stream 6 selected experts into buffer lb */ + ssize_t expert_bytes = stream_load_experts(e, il, expert_ids, DS4_N_EXPERT_USED, + (uint8_t *)ds4_metal_stream_mem(lb), + DS4_STREAM_BUF_BYTES); + if (expert_bytes < 0) return false; + + /* Phase D: Remap router_selected to indices 0..5 for the stream buffer layout */ + int32_t remapped[DS4_N_EXPERT_USED]; + for (int i = 0; i < DS4_N_EXPERT_USED; i++) remapped[i] = i; + if (!ds4_metal_tensor_write(g->router_selected, 0, remapped, DS4_N_EXPERT_USED * sizeof(int32_t))) { + return false; + } + + /* Phase E: Full layer decode with streamed attention weights and expert_map override */ + if (!ds4_metal_begin_commands()) return false; + ok = metal_graph_encode_decode_layer(g, + &layer_model, + &layer_weights, + il, + pos, + g->layer_raw_cache[il], + g->raw_cap, + raw_row, + n_raw, + token, + ds4_metal_stream_mem(lb), DS4_STREAM_BUF_BYTES, + 2); /* streaming_phase=2: moe+output only */ + if (ok) ok = ds4_metal_end_commands() != 0; + if (!ok) return false; + + /* Phase F: Swap cur_hc <-> after_ffn_hc for next iteration */ + ds4_metal_tensor *tmp = g->cur_hc; + g->cur_hc = g->after_ffn_hc; + g->after_ffn_hc = tmp; + } + + /* Step 3: Encode output head with streamed lm_head weights if available */ + if (logits && ok) { + if (!ds4_metal_begin_commands()) return false; + + const ds4_model *output_model = (lmhead_model.map) ? &lmhead_model : &e->model; + const ds4_weights *output_weights = (lmhead_model.map) ? &lmhead_weights : &e->weights; + + ok = metal_graph_encode_output_head(g, output_model, output_weights, DS4_N_VOCAB); + if (ok) ok = ds4_metal_end_commands() != 0; + if (!ok) return false; + + /* Step 4: Read logits from GPU */ + if (!ds4_metal_tensor_read(g->logits, 0, logits, (uint64_t)DS4_N_VOCAB * sizeof(float))) { + return false; + } + } + + return ok; +} + /* Greedy verifier helper. Speculative decoding only needs the target model's * top token after most accepted draft rows; the full vocabulary row is needed * once, for the final committed state that normal sampling will continue from. 
@@ -12307,7 +12763,7 @@ static bool metal_graph_eval_mtp_draft_from_hc( g->raw_cap, raw_row, n_raw, - token); + token, NULL, 0, 0); } if (ok) g->cur_hc = out_hc; if (ok) ok = metal_graph_encode_output_head_mtp(g, @@ -13006,7 +13462,7 @@ static bool metal_graph_verify_decode2_exact( g->raw_cap, pos0 % g->raw_cap, metal_graph_raw_span_for_batch(g, pos0, 1), - token0); + token0, NULL, 0, 0); if (!ok) break; ok = metal_graph_capture_prefix1_attn_state(g, il) && metal_graph_capture_prefix1_index_state(g, il); @@ -13023,7 +13479,7 @@ static bool metal_graph_verify_decode2_exact( g->raw_cap, pos1 % g->raw_cap, metal_graph_raw_span_for_batch(g, pos1, 1), - token1); + token1, NULL, 0, 0); if (!ok) break; ds4_metal_tensor *tmp = cur0; cur0 = next0; next0 = tmp; @@ -13532,6 +13988,8 @@ struct ds4_vocab { str_i32_table merge_rank; }; +/* Low-memory streaming context for loading weights on demand from GGUF file. + * Used when DS4_FLAG_LOW_MEM is set. */ struct ds4_engine { ds4_model model; ds4_model mtp_model; @@ -13544,8 +14002,304 @@ struct ds4_engine { bool quality; bool metal_ready; bool mtp_ready; + ds4_stream_ctx stream; // low-memory streaming context + int flags; // DS4_FLAG_LOW_MEM, etc }; +/* Stream-mode I/O: load all layer weights from GGUF file via pread + * Packs tensors contiguously and returns offsets via out_offsets. + * Returns total bytes loaded or -1 on error. + * Used by Metal low-memory streaming path (--low-mem). */ +static ssize_t stream_load_layer(ds4_engine *e, int layer_idx, + uint8_t *buf, size_t buf_size, + ds4_stream_layer_offsets *out_offsets) { + if (!buf || !out_offsets || e->stream.fd < 0) return -1; + + size_t pos = 0; + ds4_layer_weights *lw = &e->weights.layer[layer_idx]; + memset(out_offsets, 0, sizeof(*out_offsets)); + + /* Helper macro to load a tensor and record its offset. + * Absent tensors are marked with (size_t)-1 sentinel. 
*/ +#define LOAD_TENSOR(field, offset_field) do { \ + if (lw->field && lw->field->file_bytes > 0) { \ + if (pos + lw->field->file_bytes > buf_size) { \ + ds4_log(stderr, DS4_LOG_WARNING, \ + "ds4: stream buffer overflow loading " #field ": %zu + %zu > %zu\n", \ + pos, lw->field->file_bytes, buf_size); \ + return -1; \ + } \ + ssize_t n = pread(e->stream.fd, buf + pos, (size_t)lw->field->file_bytes, \ + (off_t)lw->field->file_offset); \ + if (n != (ssize_t)lw->field->file_bytes) { \ + ds4_log(stderr, DS4_LOG_WARNING, \ + "ds4: pread failed for " #field ": got %zd, expected %zu\n", \ + n, (size_t)lw->field->file_bytes); \ + return -1; \ + } \ + out_offsets->offset_field = pos; \ + pos += n; \ + } else { \ + out_offsets->offset_field = (size_t)-1; /* Sentinel: tensor not loaded */ \ + } \ +} while(0) + + /* Load all tensors in order */ + LOAD_TENSOR(attn_norm, attn_norm); + LOAD_TENSOR(attn_q_a, attn_q_a); + LOAD_TENSOR(attn_q_a_norm, attn_q_a_norm); + LOAD_TENSOR(attn_q_b, attn_q_b); + LOAD_TENSOR(attn_kv, attn_kv); + LOAD_TENSOR(attn_kv_a_norm, attn_kv_a_norm); + LOAD_TENSOR(attn_sinks, attn_sinks); + LOAD_TENSOR(attn_output_a, attn_output_a); + LOAD_TENSOR(attn_output_b, attn_output_b); + LOAD_TENSOR(hc_attn_fn, hc_attn_fn); + LOAD_TENSOR(hc_attn_scale, hc_attn_scale); + LOAD_TENSOR(hc_attn_base, hc_attn_base); + LOAD_TENSOR(hc_ffn_fn, hc_ffn_fn); + LOAD_TENSOR(hc_ffn_scale, hc_ffn_scale); + LOAD_TENSOR(hc_ffn_base, hc_ffn_base); + LOAD_TENSOR(ffn_norm, ffn_norm); + LOAD_TENSOR(ffn_gate_inp, ffn_gate_inp); + LOAD_TENSOR(ffn_exp_probs_b, ffn_exp_probs_b); + LOAD_TENSOR(ffn_gate_tid2eid, ffn_gate_tid2eid); + LOAD_TENSOR(ffn_gate_shexp, ffn_gate_shexp); + LOAD_TENSOR(ffn_up_shexp, ffn_up_shexp); + LOAD_TENSOR(ffn_down_shexp, ffn_down_shexp); + LOAD_TENSOR(attn_compressor_kv, attn_compressor_kv); + LOAD_TENSOR(attn_compressor_gate, attn_compressor_gate); + LOAD_TENSOR(attn_compressor_ape, attn_compressor_ape); + LOAD_TENSOR(attn_compressor_norm, attn_compressor_norm); + LOAD_TENSOR(indexer_compressor_kv, indexer_compressor_kv); + LOAD_TENSOR(indexer_compressor_gate, indexer_compressor_gate); + LOAD_TENSOR(indexer_compressor_ape, indexer_compressor_ape); + LOAD_TENSOR(indexer_compressor_norm, indexer_compressor_norm); + LOAD_TENSOR(indexer_attn_q_b, indexer_attn_q_b); + LOAD_TENSOR(indexer_proj, indexer_proj); + +#undef LOAD_TENSOR + + out_offsets->total_bytes = pos; + return (ssize_t)pos; +} + +/* Build temporary model and layer weights pointing into a stream buffer + * with adjusted abs_offset values. No allocation needed; uses caller-provided storage. */ +static void stream_layer_build_temp( + const ds4_engine *e, + int il, + void *stream_buf, + uint64_t stream_size, + const ds4_stream_layer_offsets *off, + ds4_model *out_model, + ds4_layer_weights *out_layer, + ds4_tensor *tensors_storage) { + out_model->map = stream_buf; + out_model->size = stream_size; + + *out_layer = e->weights.layer[il]; /* Shallow copy */ + + /* Helper macro to update a tensor pointer to point to adjusted offset in stream buffer. + * Only processes tensors that were actually loaded (offset != sentinel). 
*/ +#define UPDATE_TENSOR(field, offset_field) do { \ + if (e->weights.layer[il].field && off->offset_field != (size_t)-1) { \ + tensors_storage[0] = *e->weights.layer[il].field; \ + tensors_storage[0].abs_offset = off->offset_field; \ + out_layer->field = &tensors_storage[0]; \ + tensors_storage++; \ + } \ +} while(0) + + UPDATE_TENSOR(attn_norm, attn_norm); + UPDATE_TENSOR(attn_q_a, attn_q_a); + UPDATE_TENSOR(attn_q_a_norm, attn_q_a_norm); + UPDATE_TENSOR(attn_q_b, attn_q_b); + UPDATE_TENSOR(attn_kv, attn_kv); + UPDATE_TENSOR(attn_kv_a_norm, attn_kv_a_norm); + UPDATE_TENSOR(attn_sinks, attn_sinks); + UPDATE_TENSOR(attn_output_a, attn_output_a); + UPDATE_TENSOR(attn_output_b, attn_output_b); + UPDATE_TENSOR(hc_attn_fn, hc_attn_fn); + UPDATE_TENSOR(hc_attn_scale, hc_attn_scale); + UPDATE_TENSOR(hc_attn_base, hc_attn_base); + UPDATE_TENSOR(hc_ffn_fn, hc_ffn_fn); + UPDATE_TENSOR(hc_ffn_scale, hc_ffn_scale); + UPDATE_TENSOR(hc_ffn_base, hc_ffn_base); + UPDATE_TENSOR(ffn_norm, ffn_norm); + UPDATE_TENSOR(ffn_gate_inp, ffn_gate_inp); + UPDATE_TENSOR(ffn_exp_probs_b, ffn_exp_probs_b); + UPDATE_TENSOR(ffn_gate_tid2eid, ffn_gate_tid2eid); + UPDATE_TENSOR(ffn_gate_shexp, ffn_gate_shexp); + UPDATE_TENSOR(ffn_up_shexp, ffn_up_shexp); + UPDATE_TENSOR(ffn_down_shexp, ffn_down_shexp); + UPDATE_TENSOR(attn_compressor_kv, attn_compressor_kv); + UPDATE_TENSOR(attn_compressor_gate, attn_compressor_gate); + UPDATE_TENSOR(attn_compressor_ape, attn_compressor_ape); + UPDATE_TENSOR(attn_compressor_norm, attn_compressor_norm); + UPDATE_TENSOR(indexer_compressor_kv, indexer_compressor_kv); + UPDATE_TENSOR(indexer_compressor_gate, indexer_compressor_gate); + UPDATE_TENSOR(indexer_compressor_ape, indexer_compressor_ape); + UPDATE_TENSOR(indexer_compressor_norm, indexer_compressor_norm); + UPDATE_TENSOR(indexer_attn_q_b, indexer_attn_q_b); + UPDATE_TENSOR(indexer_proj, indexer_proj); + +#undef UPDATE_TENSOR +} + +/* Load selected expert weights (6 experts per layer) into streaming buffer in contiguous-per-type layout. + * Packing: [gate0 gate1 gate2 ... | up0 up1 up2 ... | down0 down1 down2 ...] + * Metal routed_moe kernel indexes as gate_offset + expert_id * gate_stride, so offsets are: + * gate_offset = 0 + * up_offset = n_experts * gate_stride + * down_offset = n_experts * (gate_stride + up_stride) + * Used by Metal low-memory streaming path (--low-mem). 
*/ +static ssize_t stream_load_experts(ds4_engine *e, int layer_idx, + const int32_t *expert_ids, int n_experts, + uint8_t *buf, size_t buf_size) { + if (e->stream.fd < 0) return -1; + if (n_experts < 1 || n_experts > 6) return -1; + + /* Validate expert IDs */ + for (int k = 0; k < n_experts; k++) { + if (expert_ids[k] < 0 || expert_ids[k] >= 256) { + ds4_log(stderr, DS4_LOG_WARNING, "ds4: invalid expert id: %d\n", expert_ids[k]); + return -1; + } + } + + size_t total = 0; + const uint64_t gate_stride = e->stream.gate_stride; + const uint64_t up_stride = e->stream.up_stride; + const uint64_t down_stride = e->stream.down_stride; + + /* Phase 1: Load all gate weights contiguously */ + size_t gate_section_size = (size_t)n_experts * gate_stride; + if (gate_section_size > buf_size) { + ds4_log(stderr, DS4_LOG_WARNING, + "ds4: stream buffer too small for gate section: need %zu, have %zu\n", + gate_section_size, buf_size); + return -1; + } + for (int k = 0; k < n_experts; k++) { + int32_t expert_id = expert_ids[k]; + uint64_t offset = e->stream.gate_base[layer_idx] + expert_id * gate_stride; + ssize_t n = pread(e->stream.fd, buf + total, (size_t)gate_stride, (off_t)offset); + if (n != (ssize_t)gate_stride) { + ds4_log(stderr, DS4_LOG_WARNING, + "ds4: pread failed for gate expert %d: got %zd, expected %zu\n", + expert_id, n, (size_t)gate_stride); + return -1; + } + total += n; + } + + /* Phase 2: Load all up weights contiguously */ + size_t up_section_size = (size_t)n_experts * up_stride; + if (total + up_section_size > buf_size) { + ds4_log(stderr, DS4_LOG_WARNING, + "ds4: stream buffer too small for up section: need %zu, have %zu\n", + total + up_section_size, buf_size); + return -1; + } + for (int k = 0; k < n_experts; k++) { + int32_t expert_id = expert_ids[k]; + uint64_t offset = e->stream.up_base[layer_idx] + expert_id * up_stride; + ssize_t n = pread(e->stream.fd, buf + total, (size_t)up_stride, (off_t)offset); + if (n != (ssize_t)up_stride) { + ds4_log(stderr, DS4_LOG_WARNING, + "ds4: pread failed for up expert %d: got %zd, expected %zu\n", + expert_id, n, (size_t)up_stride); + return -1; + } + total += n; + } + + /* Phase 3: Load all down weights contiguously */ + size_t down_section_size = (size_t)n_experts * down_stride; + if (total + down_section_size > buf_size) { + ds4_log(stderr, DS4_LOG_WARNING, + "ds4: stream buffer too small for down section: need %zu, have %zu\n", + total + down_section_size, buf_size); + return -1; + } + for (int k = 0; k < n_experts; k++) { + int32_t expert_id = expert_ids[k]; + uint64_t offset = e->stream.down_base[layer_idx] + expert_id * down_stride; + ssize_t n = pread(e->stream.fd, buf + total, (size_t)down_stride, (off_t)offset); + if (n != (ssize_t)down_stride) { + ds4_log(stderr, DS4_LOG_WARNING, + "ds4: pread failed for down expert %d: got %zd, expected %zu\n", + expert_id, n, (size_t)down_stride); + return -1; + } + total += n; + } + + return (ssize_t)total; +} + +/* CPU decode using streaming pread I/O (for CPU-only inference without mmap). + * PLACEHOLDER: This function is non-functional and kept for reference only. + * CPU streaming has a fundamental limitation: all CPU compute kernels use + * tensor_data(model, tensor) = model->map + tensor->abs_offset internally, + * where abs_offset is relative to the full GGUF file. When weights are pread'd + * into a stream buffer, these abs_offset values are wildly wrong (they point far + * beyond the buffer). Fixing this would require modifying ~60 inner functions. 
+ * CPU inference uses MAP_PRIVATE + MADV_WILLNEED to avoid the Darwin VM panic. + * For 8GB RAM systems, use Metal backend with --low-mem instead. */ +static void __attribute__((unused)) forward_token_stream_cpu_decode_scratch( + float * logits, + ds4_engine * engine, + const ds4_weights * weights, + ds4_kv_cache * cache, + int token, + uint32_t pos, + ds4_cpu_decode_scratch * scratch) { + /* NONFUNCTIONAL PLACEHOLDER: This function body has been removed. + * CPU streaming is not feasible because all CPU compute kernels internally + * use tensor_data(model, tensor) = model->map + tensor->abs_offset, where + * abs_offset is relative to the full GGUF file. Fixing this would require + * modifying ~60 inner compute functions. Instead, CPU inference uses + * MAP_PRIVATE mmap + MADV_WILLNEED prefetch to avoid Darwin VM panic. + * Metal backend --low-mem implements true weight streaming. */ + (void)logits; (void)engine; (void)weights; (void)cache; + (void)token; (void)pos; (void)scratch; + return; +} + +/* ======================================================================== + * STREAMING DECODE LOOP FOR 8GB RAM OPERATION + * ======================================================================== + * + * The following functions implement layer-sequential inference for low-memory + * (8GB RAM) operation. Instead of loading all weights into GPU memory at once, + * weights are streamed from the GGUF file on disk using pread(). + * + * Overview: + * 1. For each of 43 layers: + * a. Load non-expert weights (attention + shared expert) via pread into stream buffer + * b. Set Metal model_map to point to stream buffer for this layer's kernels + * c. Run routing-only phase (gate projection + top-k) to determine 6 active experts + * d. Synchronize with GPU and read expert IDs from g->router_selected + * e. Load the 6 selected expert weights via pread into the same stream buffer + * f. Run full layer compute (attention + MoE FFN) + * g. Proceed to next layer + * + * Key design decisions: + * - Two ping-pong buffers (210 MB each) allow CPU to load layer i+1 while GPU + * processes layer i, reducing I/O overhead. + * - Routing is separated from MoE dispatch to allow expert selection before load. + * - Expert weights in GGUF file are stored contiguously per-expert, enabling + * direct pread() of specific expert slices. + * - Token embeddings (1 GB) are streamed per-token (8 KB per row). + * - Output weights (563 MB) are pinned in memory since needed every token. + * + * Integration point: This path is used when DS4_FLAG_LOW_MEM is set on engine. + * The streaming decode loop should replace metal_graph_eval_token_raw_swa() in + * the session decode path (ds4_session_decode_step). + * ======================================================================== */ + static void utf8_put(char **p, uint32_t cp) { if (cp <= 0x7f) { *(*p)++ = (char)cp; @@ -14426,7 +15180,7 @@ static void print_top_logits( /* CPU generation entry point. It runs layer-major prefill once, then decodes * one token at a time using the persistent KV cache and scratch arena. 
*/ static int generate_raw_swa_cpu( - const ds4_model * model, + ds4_engine * engine, const ds4_vocab * vocab, const ds4_weights * weights, const token_vec * prompt, @@ -14437,15 +15191,22 @@ static int generate_raw_swa_cpu( void * emit_ud, ds4_session_progress_fn progress, void * progress_ud) { - (void)progress; - (void)progress_ud; - fprintf(stderr, "ds4: using CPU generation with layer-major prefill\n"); + const ds4_model *model = &engine->model; + const bool low_mem = (engine->flags & DS4_FLAG_LOW_MEM) != 0; + fprintf(stderr, "ds4: using CPU generation with %s prefill\n", + low_mem ? "streaming" : "layer-major"); ds4_kv_cache cache; kv_cache_init(&cache, (uint32_t)ctx_size, 0); ds4_cpu_decode_scratch decode_scratch; cpu_decode_scratch_init(&decode_scratch, (uint32_t)ctx_size); + /* Allocate streaming buffers if needed */ + if (low_mem) { + decode_scratch.layer_buf = xmalloc(LAYER_BUF_BYTES); + decode_scratch.expert_buf = xmalloc(EXPERT_BUF_BYTES); + } + float *logits = xmalloc((size_t)DS4_N_VOCAB * sizeof(logits[0])); int pos = prompt->len; const bool trace_top = getenv("DS4_TRACE_TOP") != NULL; @@ -14454,12 +15215,33 @@ static int generate_raw_swa_cpu( if (prompt->len <= 0 || prompt->len > ctx_size) { fprintf(stderr, "ds4: prompt is empty or exceeds context size\n"); free(logits); + if (low_mem) { + free(decode_scratch.layer_buf); + free(decode_scratch.expert_buf); + } cpu_decode_scratch_free(&decode_scratch); kv_cache_free(&cache); return 1; } - prefill_layer_major_cpu(logits, model, weights, &cache, prompt); + /* Streaming prefill: token-by-token through all layers */ + if (low_mem) { + for (int i = 0; i < (int)prompt->len - 1; i++) { + forward_token_raw_swa_cpu_streaming(NULL, model, weights, &cache, &decode_scratch, + NULL, prompt->v[i], (uint32_t)i, engine); + } + /* Last prefill token: compute logits */ + if (prompt->len > 0) { + forward_token_raw_swa_cpu_streaming(logits, model, weights, &cache, &decode_scratch, + NULL, prompt->v[prompt->len - 1], + (uint32_t)(prompt->len - 1), engine); + } + } else { + prefill_layer_major_cpu(logits, model, weights, &cache, prompt); + } + + /* Fire progress completion event — matches cli_prefill_progress_cb expectation */ + if (progress) progress(progress_ud, "prefill_chunk", prompt->len, prompt->len); const double t_prefill1 = now_sec(); fprintf(stderr, "ds4: prefill %d/%d done\n", prompt->len, prompt->len); @@ -14498,8 +15280,17 @@ static int generate_raw_swa_cpu( } const double t_eval0 = token_timing ? now_sec() : 0.0; - forward_token_raw_swa_cpu_decode_scratch(logits, model, weights, &cache, token, (uint32_t)pos, - &decode_scratch); + if (low_mem) { + forward_token_raw_swa_cpu_streaming(logits, model, weights, &cache, &decode_scratch, + NULL, token, (uint32_t)pos, engine); + } else if (model->map) { + forward_token_raw_swa_cpu_decode_scratch(logits, model, weights, &cache, token, (uint32_t)pos, + &decode_scratch); + } else { + /* No mmap and no streaming - cannot proceed */ + fprintf(stderr, "ds4: error: CPU inference requires either model mmap or --low-mem streaming\n"); + break; + } if (token_timing) { const double t_eval1 = now_sec(); fprintf(stderr, "ds4: decode eval %d took %.3f ms\n", n_decode_eval + 1, (t_eval1 - t_eval0) * 1000.0); @@ -14520,6 +15311,10 @@ static int generate_raw_swa_cpu( decode_s > 0.0 ? 
(double)n_generated / decode_s : 0.0); free(logits); + if (low_mem) { + free(decode_scratch.layer_buf); + free(decode_scratch.expert_buf); + } cpu_decode_scratch_free(&decode_scratch); kv_cache_free(&cache); return 0; @@ -14529,7 +15324,7 @@ static int generate_raw_swa_cpu( /* Metal generation entry point. The model runs as one local whole-graph * pipeline: chunked/layer-major prefill followed by graph decode steps. */ static int generate_metal_graph_raw_swa( - const ds4_model * model, + ds4_engine * engine, const ds4_vocab * vocab, const ds4_weights * weights, const token_vec * prompt, @@ -14541,6 +15336,7 @@ static int generate_metal_graph_raw_swa( void * emit_ud, ds4_session_progress_fn progress, void * progress_ud) { + const ds4_model *model = &engine->model; fprintf(stderr, "ds4: using Metal graph generation with layer-major graph prefill\n"); if (prompt->len <= 0 || prompt->len > ctx_size) { @@ -14572,7 +15368,23 @@ static int generate_metal_graph_raw_swa( const bool token_timing = getenv("DS4_TOKEN_TIMING") != NULL; const double t_prefill0 = now_sec(); - if (prefill_cap < (uint32_t)prompt->len) { + if ((engine->flags & DS4_FLAG_LOW_MEM) && engine->stream.fd >= 0) { + /* Streaming prefill: process one token at a time through all layers */ + for (int i = 0; i < (int)prompt->len - 1 && ok; i++) { + ok = metal_graph_eval_token_streaming(engine, &g, + prompt->v[i], + (uint32_t)i, + NULL); /* No logits except last token */ + if (progress) progress(progress_ud, "prefill", i + 1, prompt->len); + } + /* Last prefill token: get logits to seed decode */ + if (ok && prompt->len > 0) { + ok = metal_graph_eval_token_streaming(engine, &g, + prompt->v[prompt->len - 1], + (uint32_t)(prompt->len - 1), + logits); + } + } else if (prefill_cap < (uint32_t)prompt->len) { ok = metal_graph_prefill_chunked(&g, model, weights, prompt, prompt->len, logits, false, progress, progress_ud); } else { ok = metal_graph_prefill_raw_swa(&g, model, weights, prompt, prompt->len, logits, true); @@ -14618,12 +15430,17 @@ static int generate_metal_graph_raw_swa( } const double t_eval0 = token_timing ? now_sec() : 0.0; - ok = metal_graph_eval_token_raw_swa(&g, - model, - weights, - (uint32_t)token, - (uint32_t)pos, - logits); + /* Dispatch to streaming or non-streaming decode based on --low-mem flag */ + if ((engine->flags & DS4_FLAG_LOW_MEM) && engine->stream.fd >= 0) { + ok = metal_graph_eval_token_streaming(engine, &g, token, (uint32_t)pos, logits); + } else { + ok = metal_graph_eval_token_raw_swa(&g, + model, + weights, + (uint32_t)token, + (uint32_t)pos, + logits); + } if (!ok) break; if (token_timing) { const double t_eval1 = now_sec(); @@ -15475,7 +16292,7 @@ int ds4_dump_text_tokenization(const char *model_path, const char *text, FILE *f token_vec tokens = {0}; if (!fp) fp = stdout; - model_open(&model, model_path, false, false); + model_open(&model, model_path, false, false, false); vocab_load(&vocab, &model); tokenize_rendered_chat_vocab(&vocab, text ? 
text : "", &tokens); @@ -15496,7 +16313,6 @@ int ds4_engine_generate_argmax( void *emit_ud, ds4_session_progress_fn progress, void *progress_ud) { - const ds4_model *model = &e->model; const ds4_vocab *vocab = &e->vocab; const ds4_weights *weights = &e->weights; @@ -15506,7 +16322,7 @@ int ds4_engine_generate_argmax( fprintf(stderr, "ds4: Metal generation requested but Metal is unavailable\n"); return 1; } - return generate_metal_graph_raw_swa(model, vocab, weights, prompt, + return generate_metal_graph_raw_swa(e, vocab, weights, prompt, n_predict, ctx_size, e->quality, emit, done, emit_ud, progress, progress_ud); #else @@ -15515,12 +16331,16 @@ int ds4_engine_generate_argmax( #endif } - return generate_raw_swa_cpu(model, vocab, weights, prompt, n_predict, + return generate_raw_swa_cpu(e, vocab, weights, prompt, n_predict, ctx_size, emit, done, emit_ud, progress, progress_ud); } int ds4_engine_metal_graph_test(ds4_engine *e, const ds4_tokens *prompt) { #ifndef DS4_NO_METAL + if (e->flags & DS4_FLAG_LOW_MEM) { + fprintf(stderr, "ds4: Metal graph test not available in --low-mem mode\n"); + return 1; + } if (!e->metal_ready) { fprintf(stderr, "ds4: Metal graph test requested but Metal is unavailable\n"); return 1; @@ -15536,6 +16356,10 @@ int ds4_engine_metal_graph_test(ds4_engine *e, const ds4_tokens *prompt) { int ds4_engine_metal_graph_full_test(ds4_engine *e, const ds4_tokens *prompt) { #ifndef DS4_NO_METAL + if (e->flags & DS4_FLAG_LOW_MEM) { + fprintf(stderr, "ds4: Metal full graph test not available in --low-mem mode\n"); + return 1; + } if (!e->metal_ready) { fprintf(stderr, "ds4: Metal full graph test requested but Metal is unavailable\n"); return 1; @@ -15551,6 +16375,10 @@ int ds4_engine_metal_graph_full_test(ds4_engine *e, const ds4_tokens *prompt) { int ds4_engine_metal_graph_prompt_test(ds4_engine *e, const ds4_tokens *prompt, int ctx_size) { #ifndef DS4_NO_METAL + if (e->flags & DS4_FLAG_LOW_MEM) { + fprintf(stderr, "ds4: Metal prompt graph test not available in --low-mem mode\n"); + return 1; + } if (!e->metal_ready) { fprintf(stderr, "ds4: Metal prompt graph test requested but Metal is unavailable\n"); return 1; @@ -15566,6 +16394,10 @@ int ds4_engine_metal_graph_prompt_test(ds4_engine *e, const ds4_tokens *prompt, } int ds4_engine_head_test(ds4_engine *e, const ds4_tokens *prompt) { + if (e->flags & DS4_FLAG_LOW_MEM) { + fprintf(stderr, "ds4: head test not available in --low-mem mode\n"); + return 1; + } if (!prompt || prompt->len <= 0) { fprintf(stderr, "ds4: head test requires a non-empty prompt\n"); return 1; @@ -15714,17 +16546,25 @@ int ds4_engine_open(ds4_engine **out, const ds4_engine_options *opt) { if (e->mtp_draft_tokens > 16) e->mtp_draft_tokens = 16; e->mtp_margin = opt->mtp_margin >= 0.0f ? opt->mtp_margin : 3.0f; if (opt->n_threads > 0) g_requested_threads = (uint32_t)opt->n_threads; + if (opt->low_mem) e->flags |= DS4_FLAG_LOW_MEM; + e->stream.fd = -1; /* initialize stream fd */ ds4_acquire_instance_lock(); model_open(&e->model, opt->model_path, - opt->backend == DS4_BACKEND_METAL, true); - if (opt->warm_weights) model_warm_weights(&e->model); + opt->backend == DS4_BACKEND_METAL, true, opt->low_mem); + /* Warming only applies to mmap'd models; skip in low-mem mode (model.map is NULL) */ + if (opt->warm_weights && !(opt->low_mem)) model_warm_weights(&e->model); vocab_load(&e->vocab, &e->model); config_validate_model(&e->model); - weights_bind(&e->weights, &e->model); + /* Initialize stream context only for low-mem Metal. 
+ * Note: CPU streaming requires tensor offset adjustment (not yet implemented). + * With skip_mmap=true, model->map is NULL, so pread is required for any weight access. + * CPU currently uses MAP_PRIVATE mmap (when not skip_mmap) to avoid Darwin VM panic. */ + bool use_streaming = (e->flags & DS4_FLAG_LOW_MEM); + weights_bind(&e->weights, &e->model, use_streaming ? &e->stream : NULL); if (opt->mtp_path && opt->mtp_path[0]) { model_open(&e->mtp_model, opt->mtp_path, - opt->backend == DS4_BACKEND_METAL, true); + opt->backend == DS4_BACKEND_METAL, true, false); mtp_weights_bind(&e->mtp_weights, &e->mtp_model); e->mtp_ready = true; fprintf(stderr, "ds4: MTP support model loaded: %s (draft=%d)\n", @@ -15742,7 +16582,10 @@ int ds4_engine_open(ds4_engine **out, const ds4_engine_options *opt) { return 1; } ds4_metal_set_quality(e->quality); - if (!ds4_metal_set_model_map_range(e->model.map, + /* In low-mem mode, model.map is NULL and we use pread for all weight access. + * Skip Metal model view registration to avoid spurious "failed to map" error. */ + if (!(e->flags & DS4_FLAG_LOW_MEM) && + !ds4_metal_set_model_map_range(e->model.map, e->model.size, e->model.tensor_data_pos, e->model.size - e->model.tensor_data_pos)) @@ -15768,6 +16611,17 @@ int ds4_engine_open(ds4_engine **out, const ds4_engine_options *opt) { return 1; } fprintf(stderr, "ds4: Metal backend initialized for graph diagnostics\n"); + + /* Low-memory streaming mode: allocate streaming buffer pool and populate stream context */ + if (e->flags & DS4_FLAG_LOW_MEM) { + if (ds4_metal_stream_buf_alloc() != 0) { + fprintf(stderr, "ds4: failed to allocate Metal streaming buffers; aborting startup\n"); + ds4_engine_close(e); + *out = NULL; + return 1; + } + fprintf(stderr, "[low-mem mode: streaming weights from disk, expect ~2-3 s/token]\n"); + } } #else if (e->backend == DS4_BACKEND_METAL) { @@ -15855,6 +16709,7 @@ typedef struct { void *user_ud; } ds4_sync_progress; +#ifndef DS4_NO_METAL static void ds4_session_note_prefill_progress(void *ud, const char *event, int current, int total) { ds4_sync_progress *p = ud; if (!p || !p->session || !p->prompt) return; @@ -15866,6 +16721,7 @@ static void ds4_session_note_prefill_progress(void *ud, const char *event, int c } if (p->user) p->user(p->user_ud, event, current, total); } +#endif /* Bring the Metal graph to exactly the supplied token prefix. * diff --git a/ds4.h b/ds4.h index 1e5ad66b..912acbd1 100644 --- a/ds4.h +++ b/ds4.h @@ -63,6 +63,7 @@ typedef struct { float mtp_margin; bool warm_weights; bool quality; + bool low_mem; /* enable low-memory streaming mode for 8GB RAM operation */ } ds4_engine_options; typedef void (*ds4_token_emit_fn)(void *ud, int token); diff --git a/ds4_cli.c b/ds4_cli.c index 9851346d..80c2932e 100644 --- a/ds4_cli.c +++ b/ds4_cli.c @@ -1,3 +1,4 @@ +#define _XOPEN_SOURCE 600 #include "ds4.h" #include "linenoise.h" @@ -99,6 +100,8 @@ static void usage(FILE *fp) { " Prefer exact kernels where faster approximate paths exist; MTP uses strict verification.\n" " --warm-weights\n" " Touch mapped tensor pages before generation. Slower startup, fewer first-use stalls.\n" + " --low-mem\n" + " Stream weights from disk for 8GB RAM operation. Expect ~2-3 s/token. 
Requires NVMe.\n" "\n" "Prompt and generation:\n" " -p, --prompt TEXT\n" @@ -1251,6 +1254,8 @@ static cli_config parse_options(int argc, char **argv) { c.inspect = true; } else if (!strcmp(arg, "--warm-weights")) { c.engine.warm_weights = true; + } else if (!strcmp(arg, "--low-mem")) { + c.engine.low_mem = true; } else if (!strcmp(arg, "--server")) { fprintf(stderr, "ds4: use ds4-server for the HTTP server\n"); exit(2); diff --git a/ds4_metal.h b/ds4_metal.h index f84f78b8..d59a29e8 100644 --- a/ds4_metal.h +++ b/ds4_metal.h @@ -39,6 +39,15 @@ int ds4_metal_set_model_map_range(const void *model_map, uint64_t model_size, ui void ds4_metal_set_quality(bool quality); void ds4_metal_print_memory_report(const char *label); +/* Low-memory streaming buffer pool for weight loading from disk */ +#define DS4_STREAM_BUF_BYTES (210 * 1024 * 1024) +#define DS4_ATTN_BUF_IDX 2 +#define DS4_LMHEAD_BUF_IDX 3 +#define DS4_LMHEAD_BUF_BYTES (600 * 1024 * 1024) +int ds4_metal_stream_buf_alloc(void); +void *ds4_metal_stream_mem(int buf_idx); +void ds4_metal_stream_fill(int buf_idx, const void *src, size_t offset, size_t n); + /* ========================================================================= * Embeddings and Indexer Helpers. * ========================================================================= diff --git a/ds4_metal.m b/ds4_metal.m index f6d3a7d9..5f34658b 100644 --- a/ds4_metal.m +++ b/ds4_metal.m @@ -132,6 +132,10 @@ static id g_attn_out_group_ids_buffer; static const void *g_model_map_ptr; static uint64_t g_model_map_size; + +/* Low-memory streaming buffers for layer-sequential weight loading */ +static id g_stream_buf[4]; +static void *g_stream_mem[4]; static uint64_t g_model_mapped_offset; static uint64_t g_model_mapped_size; static uint64_t g_tensor_alloc_live_bytes; @@ -3940,6 +3944,101 @@ int ds4_metal_synchronize(void) { return ds4_metal_finish_command_buffer(cb, 1, "synchronize"); } +/* Low-memory streaming buffer allocation for layer-sequential weight loading */ +int ds4_metal_stream_buf_alloc(void) { + if (!g_device) return 1; /* Metal not initialized */ + + @autoreleasepool { + /* Allocate buffers 0, 1 (expert streaming) with DS4_STREAM_BUF_BYTES size */ + for (int i = 0; i < 2; i++) { + int ret = posix_memalign(&g_stream_mem[i], 16384, DS4_STREAM_BUF_BYTES); + if (ret != 0) { + fprintf(stderr, "ds4: posix_memalign failed for stream buffer %d: %s\n", + i, strerror(ret)); + return 1; + } + + g_stream_buf[i] = [g_device newBufferWithBytesNoCopy:g_stream_mem[i] + length:DS4_STREAM_BUF_BYTES + options:MTLResourceStorageModeShared + deallocator:nil]; + if (!g_stream_buf[i]) { + fprintf(stderr, "ds4: failed to create MTLBuffer for stream buffer %d\n", i); + return 1; + } + + /* Register stream buffer as a Metal model view so Metal kernels can look it up */ + ds4_metal_set_model_map_range(g_stream_mem[i], DS4_STREAM_BUF_BYTES, 0, DS4_STREAM_BUF_BYTES); + } + + /* Allocate buffer 2 (attention weights) with DS4_STREAM_BUF_BYTES size */ + int ret = posix_memalign(&g_stream_mem[2], 16384, DS4_STREAM_BUF_BYTES); + if (ret != 0) { + fprintf(stderr, "ds4: posix_memalign failed for stream buffer 2: %s\n", strerror(ret)); + return 1; + } + g_stream_buf[2] = [g_device newBufferWithBytesNoCopy:g_stream_mem[2] + length:DS4_STREAM_BUF_BYTES + options:MTLResourceStorageModeShared + deallocator:nil]; + if (!g_stream_buf[2]) { + fprintf(stderr, "ds4: failed to create MTLBuffer for stream buffer 2\n"); + return 1; + } + ds4_metal_set_model_map_range(g_stream_mem[2], DS4_STREAM_BUF_BYTES, 0, 
DS4_STREAM_BUF_BYTES); + + /* Allocate buffer 3 (lm_head) with DS4_LMHEAD_BUF_BYTES size */ + ret = posix_memalign(&g_stream_mem[3], 16384, DS4_LMHEAD_BUF_BYTES); + if (ret != 0) { + fprintf(stderr, "ds4: posix_memalign failed for stream buffer 3: %s\n", strerror(ret)); + return 1; + } + g_stream_buf[3] = [g_device newBufferWithBytesNoCopy:g_stream_mem[3] + length:DS4_LMHEAD_BUF_BYTES + options:MTLResourceStorageModeShared + deallocator:nil]; + if (!g_stream_buf[3]) { + fprintf(stderr, "ds4: failed to create MTLBuffer for stream buffer 3\n"); + return 1; + } + ds4_metal_set_model_map_range(g_stream_mem[3], DS4_LMHEAD_BUF_BYTES, 0, DS4_LMHEAD_BUF_BYTES); + } + return 0; +} + +void *ds4_metal_stream_mem(int buf_idx) { + if (buf_idx < 0 || buf_idx >= 4) return NULL; + return g_stream_mem[buf_idx]; +} + +void ds4_metal_stream_fill(int buf_idx, const void *src, size_t offset, size_t n) { + if (buf_idx < 0 || buf_idx >= 4 || !g_stream_mem[buf_idx] || !src) return; + size_t capacity = (buf_idx == DS4_LMHEAD_BUF_IDX) ? DS4_LMHEAD_BUF_BYTES : DS4_STREAM_BUF_BYTES; + if (offset + n > capacity) { + fprintf(stderr, "ds4: stream buffer overflow: buf_idx=%d, offset=%zu, n=%zu, capacity=%zu\n", + buf_idx, offset, n, capacity); + return; + } + memcpy((char *)g_stream_mem[buf_idx] + offset, src, n); +} + +/* Get MTLBuffer from stream buffer, setting inner_offset for indexing */ +static id ds4_metal_wrap_stream_range( + int buf_idx, + uint64_t offset, + uint64_t len, + uint64_t *inner_offset) { + if (buf_idx < 0 || buf_idx >= 4 || !g_stream_buf[buf_idx]) return nil; + uint64_t capacity = (buf_idx == DS4_LMHEAD_BUF_IDX) ? DS4_LMHEAD_BUF_BYTES : DS4_STREAM_BUF_BYTES; + if (len > capacity || offset > capacity - len) { + fprintf(stderr, "ds4: stream buffer range out of bounds: buf_idx=%d, offset=%llu, len=%llu, capacity=%llu\n", + buf_idx, offset, len, capacity); + return nil; + } + *inner_offset = offset; + return g_stream_buf[buf_idx]; +} + void ds4_metal_cleanup(void) { if (!g_initialized) return; @@ -4042,6 +4141,13 @@ void ds4_metal_cleanup(void) { g_moe_id_map_buffer = nil; g_attn_out_group_ids_buffer = nil; g_model_map_ptr = NULL; + for (int i = 0; i < 4; i++) { + g_stream_buf[i] = nil; + if (g_stream_mem[i]) { + free(g_stream_mem[i]); + g_stream_mem[i] = NULL; + } + } g_model_map_size = 0; g_model_mapped_offset = 0; g_model_mapped_size = 0; diff --git a/ds4_server.c b/ds4_server.c index 1ba23293..e5f45194 100644 --- a/ds4_server.c +++ b/ds4_server.c @@ -1,3 +1,4 @@ +#define _XOPEN_SOURCE 600 #include "ds4.h" #include "rax.h" @@ -7417,6 +7418,8 @@ static void usage(FILE *fp) { " Prefer exact kernels where faster approximate paths exist; MTP uses strict verification.\n" " --warm-weights\n" " Touch mapped tensor pages before serving. Slower startup, fewer first-use stalls.\n" + " --low-mem\n" + " Stream weights from disk for 8GB RAM operation. Expect ~2-3 s/token. Requires NVMe.\n" "\n" "HTTP API:\n" " --host HOST\n" @@ -7539,6 +7542,8 @@ static server_config parse_options(int argc, char **argv) { c.engine.quality = true; } else if (!strcmp(arg, "--warm-weights")) { c.engine.warm_weights = true; + } else if (!strcmp(arg, "--low-mem")) { + c.engine.low_mem = true; } else if (!strcmp(arg, "--cpu") || !strcmp(arg, "--backend")) { server_log(DS4_LOG_DEFAULT, "ds4-server: server mode is Metal-only"); exit(2);
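
The --low-mem switch in both front ends maps onto the new ds4_engine_options.low_mem field, so embedders can opt in without going through the CLI. A minimal sketch follows; it assumes a zero-initialized options struct is a valid baseline and that model_path is the borrowed string ds4_engine_open already reads via opt->model_path. The helper name and error message are illustrative, not part of the patch.

    #include <stdbool.h>
    #include <stdio.h>
    #include <string.h>
    #include "ds4.h"

    /* Sketch: open the engine in low-memory streaming mode.
     * Assumption: a zero-initialized ds4_engine_options is an acceptable
     * baseline; only .low_mem (added by this patch) and .model_path are set. */
    static int open_low_mem_engine(const char *model_path, ds4_engine **out) {
        ds4_engine_options opt;
        memset(&opt, 0, sizeof(opt));
        opt.model_path = model_path;  /* field used by ds4_engine_open; exact type per ds4.h */
        opt.low_mem    = true;        /* stream weights from disk instead of relying on mmap */

        if (ds4_engine_open(out, &opt) != 0) {
            /* ds4_engine_open reports failure with a nonzero return on the paths
             * shown in the patch, including a failed ds4_metal_stream_buf_alloc. */
            fprintf(stderr, "engine open failed\n");
            return 1;
        }
        return 0;
    }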
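
On the sizing of the pool itself: ds4_metal_stream_buf_alloc reserves two expert buffers and one attention buffer of DS4_STREAM_BUF_BYTES (210 MiB) each, plus one DS4_LMHEAD_BUF_BYTES (600 MiB) lm_head buffer, roughly 1.2 GiB resident in total. Both constants are exact multiples of the 16 KiB alignment passed to posix_memalign, which matters because newBufferWithBytesNoCopy requires a page-aligned pointer and a page-multiple length. A small self-check; DS4_METAL_PAGE is an illustrative macro, not something the patch defines.

    #include <assert.h>
    #include <stdio.h>
    #include "ds4_metal.h"

    #define DS4_METAL_PAGE (16384u)  /* matches the posix_memalign alignment in the patch */

    static void check_stream_pool_constants(void) {
        /* Both buffer sizes are whole numbers of 16 KiB pages, as
         * newBufferWithBytesNoCopy expects for no-copy shared storage. */
        assert(DS4_STREAM_BUF_BYTES % DS4_METAL_PAGE == 0);
        assert(DS4_LMHEAD_BUF_BYTES % DS4_METAL_PAGE == 0);

        /* Resident streaming footprint: 3 x 210 MiB + 600 MiB = 1230 MiB,
         * leaving headroom for KV cache and activations on an 8 GB machine. */
        size_t pool = 3u * (size_t)DS4_STREAM_BUF_BYTES + (size_t)DS4_LMHEAD_BUF_BYTES;
        printf("streaming pool: %zu MiB\n", pool / (1024u * 1024u));
    }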
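
The stream context keeps the GGUF descriptor open precisely so weights can be pulled in with pread as decoding walks the layers, and the patch exposes ds4_metal_stream_mem and ds4_metal_stream_fill as the two ways to land bytes in the pool. The helper below is a sketch of the pread-direct variant, not code from the patch: the name stream_read_into_buf, its argument order, and the idea of passing the loader-recorded file offset straight through are assumptions; pread, ds4_metal_stream_mem, and the buffer-index and size constants are the real pieces.

    #define _XOPEN_SOURCE 600        /* for pread, matching the patch */
    #include <stdint.h>
    #include <unistd.h>
    #include "ds4_metal.h"

    /* Illustrative helper (not in the patch): pread a tensor's bytes from the
     * GGUF file straight into one of the Metal streaming buffers.  The caller
     * supplies the absolute file offset and byte count recorded by the loader. */
    static int stream_read_into_buf(int fd, uint64_t file_off, size_t nbytes,
                                    int buf_idx, size_t buf_off) {
        char *dst = (char *)ds4_metal_stream_mem(buf_idx);
        if (!dst) return 1;

        size_t capacity = (buf_idx == DS4_LMHEAD_BUF_IDX) ? DS4_LMHEAD_BUF_BYTES
                                                          : DS4_STREAM_BUF_BYTES;
        if (nbytes > capacity || buf_off > capacity - nbytes) return 1;

        size_t done = 0;
        while (done < nbytes) {
            ssize_t n = pread(fd, dst + buf_off + done, nbytes - done,
                              (off_t)(file_off + done));
            if (n <= 0) return 1;    /* short read, EOF, or error */
            done += (size_t)n;
        }
        return 0;
    }

The lm_head block, for instance, could be brought in once at startup into DS4_LMHEAD_BUF_IDX at offset 0, while per-layer attention tensors are refreshed into DS4_ATTN_BUF_IDX as each layer is processed; whether the real loader preads directly like this or stages through ds4_metal_stream_fill is not fixed by this sketch.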
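
ds4_metal_wrap_stream_range validates its range as len > capacity || offset > capacity - len rather than the more obvious offset + len > capacity. The subtractive form cannot wrap: once len is known to fit, capacity - len is well defined and the comparison stays in range, whereas the additive form silently accepts a huge len whose sum wraps past zero. A tiny standalone demonstration with made-up values:

    #include <stdint.h>
    #include <stdio.h>

    int main(void) {
        uint64_t capacity = 210ull * 1024 * 1024;  /* DS4_STREAM_BUF_BYTES */
        uint64_t offset   = 16;                    /* small, valid-looking offset */
        uint64_t len      = UINT64_MAX - 8;        /* absurd length, e.g. from corrupt metadata */

        /* Naive check: offset + len wraps to 7, which is < capacity, so it passes. */
        printf("naive  : %s\n", (offset + len > capacity) ? "rejected" : "ACCEPTED (bug)");

        /* Guard used by ds4_metal_wrap_stream_range: rejects before any addition. */
        printf("guarded: %s\n",
               (len > capacity || offset > capacity - len) ? "rejected" : "accepted");
        return 0;
    }

ds4_metal_stream_fill still uses the additive form; with the current trusted callers that is harmless, but the subtractive check is the safer choice if these offsets ever come from file metadata.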
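
Finally, the Metal graph, full-graph, prompt-graph, and head tests now refuse to run when DS4_FLAG_LOW_MEM is set, returning 1 after printing their own reason, the same contract as the existing metal_ready checks. A caller-side sketch, assuming the zero-on-success convention these entry points appear to follow and that ds4_engine_metal_graph_test and ds4_tokens are declared in ds4.h:

    #include <stdio.h>
    #include "ds4.h"

    /* Sketch: run the Metal graph diagnostic only if the engine allows it.
     * In --low-mem mode (or when Metal is unavailable) the call returns nonzero
     * and prints its own reason, so the caller simply skips the diagnostic. */
    static void maybe_run_graph_test(ds4_engine *e, const ds4_tokens *prompt) {
        if (ds4_engine_metal_graph_test(e, prompt) != 0) {
            fprintf(stderr, "skipping Metal graph diagnostic\n");
            return;
        }
        fprintf(stderr, "Metal graph diagnostic completed\n");
    }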