Skip to content

Commit e6d4fb4

Browse files
feat(flowengine): gut inline orchestration, replace with FlowEngine Submit calls, add workflow visibility API
Session 1 (scaffolding): - flowengine/store.go: add ListWorkflowsFilter, ListWorkflows, GetWorkflowSteps - flowengine/progress.go: add ProgressAdapter, PollAndEmit, WaitForCompletion - activities/app_install.go: fallback "compose_yaml" alias in write_compose - activities/infra.go: forward_port → port fallback in create_proxy Session 2 (gut + wire): - managers/appstore_progress.go: replace 574-line inline install/remove with engine.Submit + PollAndEmit (appstore_install + appstore_remove workflows) - managers/orchestrator.go: gut InstallApp (async Submit, 202), gut InstallFromRegistryWithProgress (pre-alloc port + Submit + PollAndEmit), gut UninstallApp (Submit + WaitForCompletion); delete rollbackInstall and detectContainerPort dead code; add feStore field + update SetFlowEngine - managers/appstore.go: export RemapPorts wrapper for adapter use - activities/appstore.go: add port int to ProcessManifest interface + ProcessManifestInput so allocated port flows through fat envelope - cmd/cubeos-api/adapters.go: implement ReadManifest (disk read bypassing json:"-"), ProcessManifest (ProcessManifestYAML + RemapPorts), RemapVolumes - cmd/cubeos-api/main.go: fix SetFlowEngine(engine, store) call; add WorkflowsHandler + mount /workflows; add @tag.name Workflows - handlers/apps.go: InstallApp returns 202 Accepted (async workflow) - handlers/workflows.go: new GET /workflows + GET /workflows/{id} with full Swagger annotations Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent f40f95e commit e6d4fb4

12 files changed

Lines changed: 602 additions & 991 deletions

File tree

cmd/cubeos-api/adapters.go

Lines changed: 80 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"context"
1111
"encoding/json"
1212
"fmt"
13+
"os"
14+
"path/filepath"
1315

1416
"cubeos-api/internal/flowengine/activities"
1517
"cubeos-api/internal/managers"
@@ -82,35 +84,101 @@ func (a *appConflictAdapter) AppExists(ctx context.Context, name string) (bool,
8284

8385
// --- appStoreManifestAdapter: activities.AppStoreManifestReader via *managers.AppStoreManager ---
8486
//
85-
// Session 1: ReadManifest is functional. ProcessManifest, RemapVolumes, and DetectWebUIType
86-
// are stubs that return errors — the active install path still uses InstallAppWithProgress.
87-
// Full implementations land in Session 3 when AppStoreManager is gutted.
87+
// ReadManifest reads the manifest YAML from disk and returns a JSON blob containing
88+
// the manifest content (manifest_yaml), app metadata, and store-specific hints.
89+
// This bypasses the json:"-" tag on StoreApp.ManifestPath.
90+
//
91+
// ProcessManifest applies CasaOS variable substitution, Swarm sanitization, and
92+
// port remapping using the allocated port from the fat envelope.
93+
//
94+
// RemapVolumes remaps external bind-mount paths to safe defaults under /cubeos/apps/.
95+
//
96+
// DetectWebUIType is a stub — the actual detection is performed by the app.detect_webui
97+
// activity (database.go) via HTTP probe. This method is never called.
8898

8999
type appStoreManifestAdapter struct{ mgr *managers.AppStoreManager }
90100

101+
// manifestPayload is the JSON structure embedded in ReadManifestOutput.Manifest.
102+
// All fields are consumed by ProcessManifest and RemapVolumes via the fat envelope.
103+
type manifestPayload struct {
104+
AppName string `json:"app_name"`
105+
DataPath string `json:"data_path"`
106+
ManifestYAML string `json:"manifest_yaml"`
107+
PortMap string `json:"port_map,omitempty"` // CasaOS x-casaos port hint
108+
}
109+
91110
func (a *appStoreManifestAdapter) ReadManifest(ctx context.Context, storeID, appName string) (json.RawMessage, error) {
92111
app := a.mgr.GetApp(storeID, appName)
93112
if app == nil {
94113
return nil, fmt.Errorf("app %s/%s not found in catalog", storeID, appName)
95114
}
96-
data, err := json.Marshal(app)
115+
if app.ManifestPath == "" {
116+
return nil, fmt.Errorf("manifest path not set for %s/%s", storeID, appName)
117+
}
118+
119+
// Read the raw YAML from disk (ManifestPath has json:"-" so it's not in catalog JSON).
120+
raw, err := os.ReadFile(app.ManifestPath)
121+
if err != nil {
122+
return nil, fmt.Errorf("failed to read manifest for %s/%s: %w", storeID, appName, err)
123+
}
124+
125+
payload := manifestPayload{
126+
AppName: appName,
127+
DataPath: filepath.Join("/cubeos/apps", appName, "appdata"),
128+
ManifestYAML: string(raw),
129+
PortMap: app.PortMap,
130+
}
131+
data, err := json.Marshal(payload)
97132
if err != nil {
98-
return nil, fmt.Errorf("marshal manifest for %s/%s: %w", storeID, appName, err)
133+
return nil, fmt.Errorf("failed to marshal manifest payload for %s/%s: %w", storeID, appName, err)
99134
}
100135
return json.RawMessage(data), nil
101136
}
102137

103-
func (a *appStoreManifestAdapter) ProcessManifest(ctx context.Context, manifest json.RawMessage) (*activities.ProcessedManifest, error) {
104-
// Session 1 stub: full implementation in Session 3.
105-
return nil, fmt.Errorf("ProcessManifest: not yet implemented (Session 3)")
138+
func (a *appStoreManifestAdapter) ProcessManifest(ctx context.Context, manifest json.RawMessage, port int) (*activities.ProcessedManifest, error) {
139+
var payload manifestPayload
140+
if err := json.Unmarshal(manifest, &payload); err != nil {
141+
return nil, fmt.Errorf("failed to unmarshal manifest payload: %w", err)
142+
}
143+
if payload.ManifestYAML == "" {
144+
return nil, fmt.Errorf("manifest_yaml is empty")
145+
}
146+
if payload.AppName == "" {
147+
return nil, fmt.Errorf("app_name is missing from manifest payload")
148+
}
149+
if payload.DataPath == "" {
150+
payload.DataPath = filepath.Join("/cubeos/apps", payload.AppName, "appdata")
151+
}
152+
153+
// Apply CasaOS variable substitution + Swarm sanitization.
154+
processed := a.mgr.ProcessManifestYAML(payload.ManifestYAML, payload.AppName, payload.DataPath)
155+
156+
// Remap the published host port to the CubeOS-allocated port.
157+
if port > 0 {
158+
remapped, err := managers.RemapPorts(processed, port, payload.PortMap)
159+
if err == nil {
160+
processed = remapped
161+
}
162+
// non-fatal: if remapping fails the original is used
163+
}
164+
165+
return &activities.ProcessedManifest{
166+
ComposeYAML: processed,
167+
}, nil
106168
}
107169

108170
func (a *appStoreManifestAdapter) RemapVolumes(ctx context.Context, compose string, appName string) (string, error) {
109-
// Session 1 stub: return compose unchanged; full implementation in Session 3.
110-
return compose, nil
171+
dataPath := filepath.Join("/cubeos/apps", appName, "appdata")
172+
remapped, _, err := managers.RemapExternalVolumes(compose, appName, dataPath, map[string]string{})
173+
if err != nil {
174+
// non-fatal: return original compose if remapping fails
175+
return compose, nil
176+
}
177+
return remapped, nil
111178
}
112179

113180
func (a *appStoreManifestAdapter) DetectWebUIType(ctx context.Context, manifest json.RawMessage) (string, error) {
114-
// Session 1 stub: default to "http".
115-
return "http", nil
181+
// Detection is performed by the app.detect_webui activity (database.go) via HTTP probe.
182+
// This method satisfies the interface but is never called by any registered activity.
183+
return "browser", nil
116184
}

cmd/cubeos-api/main.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@
8787
// @tag.name Registry
8888
// @tag.description Local Docker registry management
8989
//
90+
// @tag.name Workflows
91+
// @tag.description FlowEngine workflow run visibility (install, remove, progress)
92+
//
9093
// @tag.name Setup
9194
// @tag.description First boot setup wizard
9295
//
@@ -404,7 +407,7 @@ func main() {
404407
log.Info().Msg("FlowEngine started")
405408

406409
// Wire engine into managers
407-
orchestrator.SetFlowEngine(flowEngine)
410+
orchestrator.SetFlowEngine(flowEngine, feStore)
408411
appStoreMgr.SetFlowEngine(flowEngine, feStore)
409412

410413
// Create VPN manager (Sprint 3 - with HAL client)
@@ -453,6 +456,10 @@ func main() {
453456
log.Info().Msg("AppsHandler and ProfilesHandler initialized")
454457
}
455458

459+
// Create Workflows handler (FlowEngine visibility API)
460+
workflowsHandler := handlers.NewWorkflowsHandler(feStore)
461+
log.Info().Msg("WorkflowsHandler initialized")
462+
456463
// Create NetworkHandler for network mode management (Sprint 3)
457464
networkHandler := handlers.NewNetworkHandler(networkMgr, halClient)
458465

@@ -763,6 +770,9 @@ func main() {
763770
// Documentation (offline docs viewer)
764771
r.Mount("/documentation", docsHandler.Routes())
765772

773+
// Workflows API (FlowEngine visibility)
774+
r.Mount("/workflows", workflowsHandler.Routes())
775+
766776
// Unified Apps API (Sprint 3)
767777
if appsHandler != nil {
768778
r.Mount("/apps", appsHandler.Routes())

internal/flowengine/activities/app_install.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,14 @@ func makeWriteCompose() flowengine.ActivityFunc {
213213
if err := json.Unmarshal(input, &in); err != nil {
214214
return nil, flowengine.NewPermanentError(fmt.Errorf("invalid write_compose input: %w", err))
215215
}
216+
// Fallback: accept "compose_yaml" key as alias for "content" (used by appstore pipeline).
217+
if in.Content == "" {
218+
var composeFallback struct {
219+
ComposeYAML string `json:"compose_yaml"`
220+
}
221+
_ = json.Unmarshal(input, &composeFallback)
222+
in.Content = composeFallback.ComposeYAML
223+
}
216224
if in.ComposePath == "" || in.Content == "" {
217225
return nil, flowengine.NewPermanentError(fmt.Errorf("compose_path and content are required"))
218226
}

internal/flowengine/activities/appstore.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ type AppStoreManifestReader interface {
1616
// ReadManifest fetches the raw manifest for a store app.
1717
ReadManifest(ctx context.Context, storeID, appName string) (json.RawMessage, error)
1818
// ProcessManifest transforms a raw manifest into a compose-ready config.
19-
ProcessManifest(ctx context.Context, manifest json.RawMessage) (*ProcessedManifest, error)
19+
// port is the allocated host port from the fat envelope (used to remap published ports).
20+
ProcessManifest(ctx context.Context, manifest json.RawMessage, port int) (*ProcessedManifest, error)
2021
// RemapVolumes adjusts volume paths for the CubeOS directory structure.
2122
RemapVolumes(ctx context.Context, compose string, appName string) (string, error)
2223
// DetectWebUIType determines the app's web UI access type.
@@ -62,9 +63,11 @@ type ReadManifestOutput struct {
6263
}
6364

6465
// ProcessManifestInput is the input for the appstore.process_manifest activity.
66+
// Port is auto-populated from the fat envelope (set by the allocate_port step).
6567
type ProcessManifestInput struct {
6668
AppName string `json:"app_name"`
6769
Manifest json.RawMessage `json:"manifest"`
70+
Port int `json:"port,omitempty"` // allocated host port from fat envelope
6871
}
6972

7073
// ProcessManifestOutput is the output of the appstore.process_manifest activity.
@@ -162,8 +165,8 @@ func makeProcessManifest(storeMgr AppStoreManifestReader) flowengine.ActivityFun
162165
return nil, flowengine.NewPermanentError(fmt.Errorf("manifest is required"))
163166
}
164167

165-
log.Info().Str("app", in.AppName).Msg("process_manifest: processing manifest")
166-
processed, err := storeMgr.ProcessManifest(ctx, in.Manifest)
168+
log.Info().Str("app", in.AppName).Int("port", in.Port).Msg("process_manifest: processing manifest")
169+
processed, err := storeMgr.ProcessManifest(ctx, in.Manifest, in.Port)
167170
if err != nil {
168171
return nil, flowengine.ClassifyError(err)
169172
}

internal/flowengine/activities/infra.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,19 @@ func makeCreateProxy(proxyMgr ProxyManager) flowengine.ActivityFunc {
208208
})
209209
}
210210

211+
// Fallback: fat-envelope passes allocated port as "port", but this struct reads "forward_port".
212+
// Accept either key so callers don't need to rename the field in workflow input.
213+
if in.ForwardPort == 0 {
214+
var portFallback struct {
215+
Port int `json:"port"`
216+
}
217+
_ = json.Unmarshal(input, &portFallback)
218+
in.ForwardPort = portFallback.Port
219+
}
220+
if in.ForwardPort == 0 {
221+
return nil, flowengine.NewPermanentError(fmt.Errorf("forward_port is required"))
222+
}
223+
211224
log.Info().Str("domain", in.Domain).Int("port", in.ForwardPort).Msg("create_proxy: creating proxy host")
212225
hostID, err := proxyMgr.CreateProxyHost(ctx, in.Domain, in.ForwardHost, in.ForwardPort, in.ForwardScheme)
213226
if err != nil {

internal/flowengine/progress.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
package flowengine
22

3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
)
8+
39
// ProgressEmitter is the interface that a Job must satisfy for the ProgressAdapter.
410
// Matched by *managers.Job so the adapter bridges FlowEngine → SSE without
511
// importing the managers package from here.
@@ -76,6 +82,97 @@ func (p *ProgressAdapter) OnWorkflowComplete(workflowType, externalID string, st
7682
}
7783
}
7884

85+
// PollAndEmit polls the WorkflowStore for step status changes and emits SSE progress events
86+
// to the wrapped ProgressEmitter. Blocks until the workflow reaches a terminal state.
87+
//
88+
// Returns nil if the workflow completed successfully, or an error if it failed/compensated.
89+
// The caller is responsible for emitting EmitDone/EmitError after this returns.
90+
func (p *ProgressAdapter) PollAndEmit(ctx context.Context, store *WorkflowStore, workflowID string) error {
91+
ticker := time.NewTicker(300 * time.Millisecond)
92+
defer ticker.Stop()
93+
94+
emitted := make(map[string]StepStatus) // track last-known status per step
95+
96+
for {
97+
select {
98+
case <-ctx.Done():
99+
return ctx.Err()
100+
case <-ticker.C:
101+
}
102+
103+
wf, err := store.GetWorkflow(workflowID)
104+
if err != nil {
105+
continue
106+
}
107+
108+
steps, err := store.GetWorkflowSteps(workflowID)
109+
if err != nil {
110+
continue
111+
}
112+
113+
for _, step := range steps {
114+
prev, seen := emitted[step.StepName]
115+
if seen && prev == step.Status {
116+
continue
117+
}
118+
emitted[step.StepName] = step.Status
119+
120+
switch step.Status {
121+
case StepRunning:
122+
p.OnStepStart(step.StepName)
123+
case StepCompleted:
124+
p.OnStepComplete(step.StepName)
125+
case StepFailed:
126+
p.OnStepFail(step.StepName, step.Error)
127+
}
128+
}
129+
130+
switch wf.CurrentState {
131+
case StateCompleted:
132+
return nil
133+
case StateFailed, StateCompensated:
134+
msg := wf.Error
135+
if msg == "" {
136+
msg = string(wf.CurrentState)
137+
}
138+
return fmt.Errorf("workflow %s", msg)
139+
}
140+
}
141+
}
142+
143+
// WaitForCompletion polls the WorkflowStore until the workflow reaches a terminal state.
144+
// Unlike PollAndEmit, it emits no SSE events. Useful for synchronous callers.
145+
//
146+
// Returns nil on success, error on failure/compensation.
147+
func WaitForCompletion(ctx context.Context, store *WorkflowStore, workflowID string) error {
148+
ticker := time.NewTicker(500 * time.Millisecond)
149+
defer ticker.Stop()
150+
151+
for {
152+
select {
153+
case <-ctx.Done():
154+
return ctx.Err()
155+
case <-ticker.C:
156+
}
157+
158+
wf, err := store.GetWorkflow(workflowID)
159+
if err != nil {
160+
continue
161+
}
162+
163+
switch wf.CurrentState {
164+
case StateCompleted:
165+
return nil
166+
case StateFailed, StateCompensated:
167+
msg := wf.Error
168+
if msg == "" {
169+
msg = string(wf.CurrentState)
170+
}
171+
return fmt.Errorf("workflow %s", msg)
172+
}
173+
}
174+
}
175+
79176
// stepProgressPct returns the SSE progress percentage for a step name.
80177
// Unknown step names default to 50.
81178
func stepProgressPct(stepName string) int {

internal/flowengine/store.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,57 @@ func (s *WorkflowStore) GetWorkflowSteps(workflowID string) ([]WorkflowStep, err
225225
return steps, rows.Err()
226226
}
227227

228+
// ListWorkflowsFilter holds query parameters for listing workflows.
229+
type ListWorkflowsFilter struct {
230+
WorkflowType string // optional: filter by workflow type
231+
State string // optional: filter by state (e.g. "running", "completed")
232+
Limit int // 0 = default 50
233+
Offset int
234+
}
235+
236+
// ListWorkflows returns workflows matching the filter, ordered by created_at DESC.
237+
func (s *WorkflowStore) ListWorkflows(filter ListWorkflowsFilter) ([]WorkflowRun, error) {
238+
if filter.Limit <= 0 {
239+
filter.Limit = 50
240+
}
241+
242+
query := `
243+
SELECT id, workflow_type, version, external_id, current_state, current_step,
244+
input, output, error, metadata, locked_by, locked_until,
245+
max_retries, retry_count, created_at, updated_at
246+
FROM workflow_runs
247+
WHERE 1=1`
248+
args := []interface{}{}
249+
250+
if filter.WorkflowType != "" {
251+
query += " AND workflow_type = ?"
252+
args = append(args, filter.WorkflowType)
253+
}
254+
if filter.State != "" {
255+
query += " AND current_state = ?"
256+
args = append(args, filter.State)
257+
}
258+
259+
query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
260+
args = append(args, filter.Limit, filter.Offset)
261+
262+
rows, err := s.db.Query(query, args...)
263+
if err != nil {
264+
return nil, fmt.Errorf("list workflows: %w", err)
265+
}
266+
defer rows.Close()
267+
268+
var workflows []WorkflowRun
269+
for rows.Next() {
270+
wf, err := scanWorkflowRunFromRows(rows)
271+
if err != nil {
272+
return nil, err
273+
}
274+
workflows = append(workflows, *wf)
275+
}
276+
return workflows, rows.Err()
277+
}
278+
228279
// GetIncompleteWorkflows retrieves all workflows that are not in a terminal state.
229280
// Used by the engine on startup to recover in-flight workflows.
230281
func (s *WorkflowStore) GetIncompleteWorkflows() ([]WorkflowRun, error) {

0 commit comments

Comments
 (0)