Skip to content

Commit 1be7ecd

Browse files
authored
feat: trace refactor (#1058)
1 parent b02a182 commit 1be7ecd

File tree

14 files changed

+171
-108
lines changed

14 files changed

+171
-108
lines changed

packages/core/src/call-step-file.ts

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ export const callStepFile = <TData>(options: CallStepFileOptions, motia: Motia):
7070
processManager.kill()
7171
const errorMessage = `Step execution timed out after ${timeoutSeconds} seconds`
7272
logger.error(errorMessage, { step: step.config.name, timeout: timeoutSeconds })
73-
await tracer.end({ message: errorMessage })
73+
tracer.end({ message: errorMessage })
7474
trackEvent('step_execution_timeout', {
7575
stepName: step.config.name,
7676
traceId,
@@ -94,7 +94,7 @@ export const callStepFile = <TData>(options: CallStepFileOptions, motia: Motia):
9494
message: err.message,
9595
})
9696

97-
await tracer.end({
97+
tracer.end({
9898
message: err.message,
9999
code: err.code,
100100
stack: err.stack?.replace(new RegExp(`${motia.lockedData.baseDir}/`), ''),
@@ -104,35 +104,35 @@ export const callStepFile = <TData>(options: CallStepFileOptions, motia: Motia):
104104

105105
reject(error)
106106
} else {
107-
await tracer.end()
107+
tracer.end()
108108
}
109109

110110
processManager.kill()
111111
})
112112
processManager.handler<unknown>('log', async (input: unknown) => logger.log(input))
113113

114114
processManager.handler<StateGetInput, unknown>('state.get', async (input) => {
115-
await tracer.stateOperation('get', input)
115+
tracer.stateOperation('get', input)
116116
return motia.state.get(input.traceId, input.key)
117117
})
118118

119119
processManager.handler<StateSetInput, unknown>('state.set', async (input) => {
120-
await tracer.stateOperation('set', { traceId: input.traceId, key: input.key, value: input.value })
120+
tracer.stateOperation('set', { traceId: input.traceId, key: input.key, value: input.value })
121121
return motia.state.set(input.traceId, input.key, input.value)
122122
})
123123

124124
processManager.handler<StateDeleteInput, unknown>('state.delete', async (input) => {
125-
await tracer.stateOperation('delete', input)
125+
tracer.stateOperation('delete', input)
126126
return motia.state.delete(input.traceId, input.key)
127127
})
128128

129129
processManager.handler<StateClearInput, void>('state.clear', async (input) => {
130-
await tracer.stateOperation('clear', input)
130+
tracer.stateOperation('clear', input)
131131
return motia.state.clear(input.traceId)
132132
})
133133

134134
processManager.handler<StateStreamGetInput>(`state.getGroup`, async (input) => {
135-
await tracer.stateOperation('getGroup', input)
135+
tracer.stateOperation('getGroup', input)
136136
return motia.state.getGroup(input.groupId)
137137
})
138138

@@ -149,39 +149,39 @@ export const callStepFile = <TData>(options: CallStepFileOptions, motia: Motia):
149149
const flows = step.config.flows
150150

151151
if (!isAllowedToEmit(step, input.topic)) {
152-
await tracer.emitOperation(input.topic, input.data, false)
152+
tracer.emitOperation(input.topic, input.data, false)
153153
return motia.printer.printInvalidEmit(step, input.topic)
154154
}
155155

156-
await tracer.emitOperation(input.topic, input.data, true)
156+
tracer.emitOperation(input.topic, input.data, true)
157157
return motia.eventAdapter.emit({ ...input, traceId, flows, logger, tracer })
158158
})
159159

160160
Object.entries(streamConfig).forEach(([name, streamFactory]) => {
161161
const stateStream = streamFactory()
162162

163163
processManager.handler<StateStreamGetInput>(`streams.${name}.get`, async (input) => {
164-
await tracer.streamOperation(name, 'get', input)
164+
tracer.streamOperation(name, 'get', input)
165165
return stateStream.get(input.groupId, input.id)
166166
})
167167

168168
processManager.handler<StateStreamMutateInput>(`streams.${name}.set`, async (input) => {
169-
await tracer.streamOperation(name, 'set', { groupId: input.groupId, id: input.id, data: input.data })
169+
tracer.streamOperation(name, 'set', { groupId: input.groupId, id: input.id, data: input.data })
170170
return stateStream.set(input.groupId, input.id, input.data)
171171
})
172172

173173
processManager.handler<StateStreamGetInput>(`streams.${name}.delete`, async (input) => {
174-
await tracer.streamOperation(name, 'delete', input)
174+
tracer.streamOperation(name, 'delete', input)
175175
return stateStream.delete(input.groupId, input.id)
176176
})
177177

178178
processManager.handler<StateStreamGetInput>(`streams.${name}.getGroup`, async (input) => {
179-
await tracer.streamOperation(name, 'getGroup', input)
179+
tracer.streamOperation(name, 'getGroup', input)
180180
return stateStream.getGroup(input.groupId)
181181
})
182182

183183
processManager.handler<StateStreamSendInput>(`streams.${name}.send`, async (input) => {
184-
await tracer.streamOperation(name, 'send', input)
184+
tracer.streamOperation(name, 'send', input)
185185
return stateStream.send(input.channel, input.event)
186186
})
187187
})
@@ -203,19 +203,19 @@ export const callStepFile = <TData>(options: CallStepFileOptions, motia: Motia):
203203

204204
if (code !== 0 && code !== null) {
205205
const error = { message: `Process exited with code ${code}`, code }
206-
await tracer.end(error)
206+
tracer.end(error)
207207
trackEvent('step_execution_error', { stepName: step.config.name, traceId, code })
208208
reject(`Process exited with code ${code}`)
209209
} else {
210-
await tracer.end()
210+
tracer.end()
211211
resolve(result)
212212
}
213213
})
214214

215215
processManager.onProcessError(async (error) => {
216216
if (timeoutId) clearTimeout(timeoutId)
217217
processManager.close()
218-
await tracer.end({
218+
tracer.end({
219219
message: error.message,
220220
code: error.code,
221221
stack: error.stack,
@@ -236,7 +236,7 @@ export const callStepFile = <TData>(options: CallStepFileOptions, motia: Motia):
236236
})
237237
.catch(async (error) => {
238238
if (timeoutId) clearTimeout(timeoutId)
239-
await tracer.end({
239+
tracer.end({
240240
message: error.message,
241241
code: error.code,
242242
stack: error.stack,
@@ -253,7 +253,7 @@ export const callStepFile = <TData>(options: CallStepFileOptions, motia: Motia):
253253
})
254254
} catch (error: unknown) {
255255
const err = error as Error & { code?: string }
256-
await tracer.end({
256+
tracer.end({
257257
message: err.message,
258258
code: err.code,
259259
stack: err.stack,

packages/core/src/observability/create-trace.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ export const createTrace = (traceGroup: TraceGroup, step: Step) => {
77
const trace: Trace = {
88
id,
99
name: step.config.name,
10-
correlationId: traceGroup.correlationId,
1110
parentTraceId: traceGroup.id,
1211
status: 'running',
1312
startTime: Date.now(),
@@ -16,8 +15,5 @@ export const createTrace = (traceGroup: TraceGroup, step: Step) => {
1615
events: [],
1716
}
1817

19-
traceGroup.metadata.totalSteps++
20-
traceGroup.metadata.activeSteps++
21-
2218
return trace
2319
}

packages/core/src/observability/stream-tracer.ts

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -33,39 +33,6 @@ export class StreamTracer implements Tracer {
3333
this.trace.error = err
3434

3535
await this.manager.updateTrace()
36-
await this.recomputeTraceGroupStatus()
37-
await this.manager.updateTraceGroup()
38-
}
39-
40-
private async recomputeTraceGroupStatus() {
41-
const allTracesFromStorage = await this.manager.getAllTracesForGroup()
42-
43-
const currentTraceIndex = allTracesFromStorage.findIndex((t) => t.id === this.trace.id)
44-
const allTraces = [...allTracesFromStorage]
45-
46-
if (currentTraceIndex >= 0) {
47-
allTraces[currentTraceIndex] = this.trace
48-
} else {
49-
allTraces.push(this.trace)
50-
}
51-
52-
const completedCount = allTraces.filter((t) => t.status === 'completed').length
53-
const failedCount = allTraces.filter((t) => t.status === 'failed').length
54-
const runningCount = allTraces.filter((t) => t.status === 'running').length
55-
56-
this.traceGroup.metadata.completedSteps = completedCount
57-
this.traceGroup.metadata.activeSteps = runningCount
58-
59-
if (failedCount > 0) {
60-
this.traceGroup.status = 'failed'
61-
} else if (runningCount === 0 && completedCount > 0) {
62-
this.traceGroup.status = 'completed'
63-
if (!this.traceGroup.endTime) {
64-
this.traceGroup.endTime = Date.now()
65-
}
66-
} else {
67-
this.traceGroup.status = 'running'
68-
}
6936
}
7037

7138
async stateOperation(operation: StateOperation, input: unknown) {
@@ -106,9 +73,7 @@ export class StreamTracer implements Tracer {
10673
lastEvent.data.data = input.data
10774
lastEvent.maxTimestamp = Date.now()
10875

109-
this.traceGroup.lastActivity = lastEvent.maxTimestamp
11076
await this.manager.updateTrace()
111-
await this.manager.updateTraceGroup()
11277

11378
return
11479
}
@@ -133,9 +98,7 @@ export class StreamTracer implements Tracer {
13398

13499
private async addEvent(event: TraceEvent) {
135100
this.trace.events.push(event)
136-
this.traceGroup.lastActivity = event.timestamp
137101

138102
await this.manager.updateTrace()
139-
await this.manager.updateTraceGroup()
140103
}
141104
}

packages/core/src/observability/trace-manager.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ export class TraceManager {
99
private readonly trace: Trace,
1010
) {
1111
this.updateTrace().catch(() => {})
12-
this.updateTraceGroup().catch(() => {})
1312
}
1413

1514
async updateTrace() {

packages/core/src/observability/tracer.ts

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,14 @@ export class BaseTracerFactory implements TracerFactory {
4040
const traceGroup: TraceGroup = {
4141
id: traceId,
4242
name: step.config.name,
43-
lastActivity: Date.now(),
44-
metadata: {
45-
completedSteps: 0,
46-
activeSteps: 0,
47-
totalSteps: 0,
48-
},
49-
correlationId: undefined,
50-
status: 'running',
5143
startTime: Date.now(),
5244
}
5345

5446
const trace = createTrace(traceGroup, step)
5547
const manager = new TraceManager(this.traceStream, this.traceGroupStream, traceGroup, trace)
5648

49+
await manager.updateTraceGroup()
50+
5751
return new StreamTracer(manager, traceGroup, trace, logger)
5852
}
5953

packages/core/src/observability/types.ts

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,8 @@ import type { StepConfig } from '../types'
22

33
export interface TraceGroup {
44
id: string
5-
correlationId: string | undefined
65
name: string
7-
status: 'running' | 'completed' | 'failed'
86
startTime: number
9-
endTime?: number
10-
lastActivity: number
11-
metadata: {
12-
completedSteps: number
13-
activeSteps: number
14-
totalSteps: number
15-
}
167
}
178

189
export type TraceError = {

packages/e2e/tests/traces/traces.spec.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ test.describe('Traces tests', () => {
1010
await workbench.open()
1111
await workbench.navigateToTraces()
1212
await tracesPage.verifyTracesInterface()
13-
await expect(tracesPage.traceDetailsContainer).toHaveText('Select a trace or trace group to view the timeline')
13+
await expect(tracesPage.traceDetailsContainer).toBeVisible()
14+
await expect(tracesPage.traceDetailsContainer).toContainText(
15+
/Select a trace or trace group to view the timeline|0ms/,
16+
)
1417
})
1518

1619
await test.step('Execute basic tutorial flow', async () => {

plugins/plugin-observability/src/components/trace-timeline.tsx

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { Button } from '@motiadev/ui'
22
import { Minus, Plus } from 'lucide-react'
33
import type React from 'react'
44
import { memo, useMemo, useState } from 'react'
5+
import { deriveTraceGroup } from '../hooks/use-derive-trace-group'
56
import { useGetEndTime } from '../hooks/use-get-endtime'
67
import { useTracesStream } from '../hooks/use-traces-stream'
78
import { formatDuration } from '../lib/utils'
@@ -22,9 +23,25 @@ interface TraceTimelineComponentProps {
2223
const TraceTimelineComponent: React.FC<TraceTimelineComponentProps> = memo(({ groupId }) => {
2324
useTracesStream()
2425

25-
const traceGroups = useObservabilityStore((state) => state.traceGroups)
26+
const traceGroupMetas = useObservabilityStore((state) => state.traceGroupMetas)
2627
const traces = useObservabilityStore((state) => state.traces)
27-
const group = useMemo(() => traceGroups.find((traceGroup) => traceGroup.id === groupId), [traceGroups, groupId])
28+
const group = useMemo(() => {
29+
const meta = traceGroupMetas.find((m) => m.id === groupId)
30+
if (!meta) return null
31+
if (traces.length === 0) {
32+
return {
33+
...meta,
34+
status: 'running' as const,
35+
lastActivity: meta.startTime,
36+
metadata: {
37+
completedSteps: 0,
38+
activeSteps: 0,
39+
totalSteps: 0,
40+
},
41+
}
42+
}
43+
return deriveTraceGroup(meta, traces)
44+
}, [traceGroupMetas, traces, groupId])
2845

2946
const endTime = useGetEndTime(group)
3047
const [zoom, setZoom] = useState(100)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import type { Trace, TraceGroup, TraceGroupMeta } from '../types/observability'
2+
3+
export const deriveTraceGroup = (meta: TraceGroupMeta, traces: Trace[]): TraceGroup => {
4+
const completedSteps = traces.filter((t) => t.status === 'completed').length
5+
const failedCount = traces.filter((t) => t.status === 'failed').length
6+
const runningCount = traces.filter((t) => t.status === 'running').length
7+
8+
const status = failedCount > 0 ? 'failed' : runningCount > 0 ? 'running' : 'completed'
9+
10+
const endTimes = traces.filter((t) => t.endTime).map((t) => t.endTime!)
11+
const endTime = status === 'completed' && endTimes.length > 0 ? Math.max(...endTimes) : undefined
12+
13+
const lastActivity = Math.max(
14+
meta.startTime,
15+
...traces.map((t) => t.endTime || t.startTime),
16+
...traces.flatMap((t) => t.events.map((e) => e.timestamp)),
17+
)
18+
19+
return {
20+
...meta,
21+
status,
22+
endTime,
23+
lastActivity,
24+
metadata: {
25+
completedSteps,
26+
activeSteps: runningCount,
27+
totalSteps: traces.length,
28+
},
29+
}
30+
}
Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,34 @@
11
import { useMemo } from 'react'
22
import { useObservabilityStore } from '../stores/use-observability-store'
3+
import { deriveTraceGroup } from './use-derive-trace-group'
34

45
export const useFilteredTraceGroups = () => {
5-
const traceGroups = useObservabilityStore((state) => state.traceGroups)
6+
const traceGroupMetas = useObservabilityStore((state) => state.traceGroupMetas)
7+
const tracesByGroupId = useObservabilityStore((state) => state.tracesByGroupId)
68
const search = useObservabilityStore((state) => state.search)
79

8-
return useMemo(
9-
() =>
10-
traceGroups.filter(
11-
(group) =>
12-
group.name.toLowerCase().includes(search.toLowerCase()) ||
13-
group.id.toLowerCase().includes(search.toLowerCase()),
14-
),
15-
[traceGroups, search],
16-
)
10+
return useMemo(() => {
11+
const traceGroups = traceGroupMetas.map((meta) => {
12+
const traces = tracesByGroupId[meta.id] || []
13+
if (traces.length === 0) {
14+
return {
15+
...meta,
16+
status: 'running' as const,
17+
lastActivity: meta.startTime,
18+
metadata: {
19+
completedSteps: 0,
20+
activeSteps: 0,
21+
totalSteps: 0,
22+
},
23+
}
24+
}
25+
return deriveTraceGroup(meta, traces)
26+
})
27+
28+
return traceGroups.filter(
29+
(group) =>
30+
group.name.toLowerCase().includes(search.toLowerCase()) ||
31+
group.id.toLowerCase().includes(search.toLowerCase()),
32+
)
33+
}, [traceGroupMetas, tracesByGroupId, search])
1734
}

0 commit comments

Comments
 (0)