Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions apps/api/src/routes/query-engine.http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
ServiceReleasesResponse,
ServiceDependenciesResponse,
ServiceDbEdgesResponse,
ServiceExternalEdgesResponse,
ServicePlatformsResponse,
ServiceWorkloadsResponse,
ServiceUsageResponse,
Expand Down Expand Up @@ -436,6 +437,26 @@ export const HttpQueryEngineLive = HttpApiBuilder.group(MapleApi, "queryEngine",
return new ServiceDbEdgesResponse({ data: compiled.castRows(rows) as any[] })
}),
)
.handle("serviceExternalEdges", ({ payload }) =>
Effect.gen(function* () {
const tenant = yield* CurrentTenant.Context
const compiled = CH.serviceExternalEdgesSQL(
{
deploymentEnv: payload.deploymentEnv,
serviceName: payload.serviceName,
},
{ orgId: tenant.orgId, startTime: payload.startTime, endTime: payload.endTime },
)
const rows = yield* mapExecError(
warehouse.sqlQuery(tenant, compiled.sql, {
profile: "aggregation",
context: "serviceExternalEdges",
}),
"serviceExternalEdges query failed",
)
return new ServiceExternalEdgesResponse({ data: compiled.castRows(rows) as any[] })
}),
)
.handle("servicePlatforms", ({ payload }) =>
Effect.gen(function* () {
const tenant = yield* CurrentTenant.Context
Expand Down
52 changes: 45 additions & 7 deletions apps/api/src/services/ServiceMapRollupService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export interface ServiceMapRollupResult {
readonly orgsProcessed: number
readonly hoursRolledUp: number
readonly edgesWritten: number
readonly resolutionsWritten: number
readonly orgFailures: number
}

Expand Down Expand Up @@ -92,12 +93,12 @@ export class ServiceMapRollupService extends Context.Service<

let hoursRolledUp = 0
let edgesWritten = 0
let resolutionsWritten = 0
for (const hourMs of missing) {
const rollup = CH.serviceMapEdgesRollupSQL({
orgId,
hourStart: toTinybirdDateTime(hourMs),
hourEnd: toTinybirdDateTime(hourMs + HOUR_MS),
})
const hourStart = toTinybirdDateTime(hourMs)
const hourEnd = toTinybirdDateTime(hourMs + HOUR_MS)

const rollup = CH.serviceMapEdgesRollupSQL({ orgId, hourStart, hourEnd })
const raw = yield* warehouse.sqlQuery(tenant, rollup.sql, {
context: "serviceMapRollup",
})
Expand All @@ -106,9 +107,40 @@ export class ServiceMapRollupService extends Context.Service<
yield* warehouse.ingest(tenant, "service_map_edges_hourly", rows)
edgesWritten += rows.length
}

// Companion write: address-resolutions, used by the external-edges
// query's anti-join to suppress internal-service overlap. Separate
// SQL pass (~same cost as the edges JOIN) so the existing rollup
// query keeps its tight shape; failure of one ingest doesn't
// invalidate the other (per-org Effect failure already isolated).
//
// NOTE: the hour is marked "done" purely by the presence of an edges
// row (see `serviceMapEdgesExistingHoursSQL`). If the edges write
// succeeds but the resolutions write fails (warehouse error, ingest
// throttling), the next tick will skip the hour and the resolutions
// gap is permanent — manifesting as internal-service HTTP calls
// leaking into the Dependencies "External" tab for that window.
// Backfill = re-running this rollup with the edges row deleted.
const resolutionsRollup = CH.serviceMapResolutionsRollupSQL({
orgId,
hourStart,
hourEnd,
})
const resolutionsRaw = yield* warehouse.sqlQuery(tenant, resolutionsRollup.sql, {
context: "serviceMapResolutionsRollup",
})
const resolutionsRows = resolutionsRollup.castRows(resolutionsRaw)
if (resolutionsRows.length > 0) {
yield* warehouse.ingest(
tenant,
"service_address_resolutions_hourly",
resolutionsRows,
)
resolutionsWritten += resolutionsRows.length
}
hoursRolledUp += 1
}
return { hoursRolledUp, edgesWritten, failed: false }
return { hoursRolledUp, edgesWritten, resolutionsWritten, failed: false }
})

const runRollupTick: ServiceMapRollupServiceShape["runRollupTick"] = Effect.fn(
Expand All @@ -130,7 +162,12 @@ export class ServiceMapRollupService extends Context.Service<
error: Cause.pretty(cause),
}),
),
{ hoursRolledUp: 0, edgesWritten: 0, failed: true },
{
hoursRolledUp: 0,
edgesWritten: 0,
resolutionsWritten: 0,
failed: true,
},
),
),
),
Expand All @@ -141,6 +178,7 @@ export class ServiceMapRollupService extends Context.Service<
orgsProcessed: orgRows.length,
hoursRolledUp: results.reduce((sum, r) => sum + r.hoursRolledUp, 0),
edgesWritten: results.reduce((sum, r) => sum + r.edgesWritten, 0),
resolutionsWritten: results.reduce((sum, r) => sum + r.resolutionsWritten, 0),
orgFailures: results.filter((r) => r.failed).length,
}
})
Expand Down
111 changes: 111 additions & 0 deletions apps/web/src/api/tinybird/service-external-edges.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import { Effect, Schema } from "effect"
import { ServiceExternalEdgesRequest } from "@maple/domain/http"
import { MapleApiAtomClient } from "@/lib/services/common/atom-client"
import { summarizeSampling } from "@/lib/sampling"
import { TinybirdDateTimeString, decodeInput, runTinybirdQuery } from "@/api/tinybird/effect-utils"

export type ServiceExternalTargetType = "http" | "messaging" | "rpc"

export interface ServiceExternalEdge {
sourceService: string
targetType: ServiceExternalTargetType
targetSystem: string
targetName: string
callCount: number
estimatedCallCount: number
errorCount: number
errorRate: number
avgDurationMs: number
p95DurationMs: number
hasSampling: boolean
samplingWeight: number
}

export interface ServiceExternalEdgesResponse {
edges: ServiceExternalEdge[]
}

const GetServiceExternalEdgesInputSchema = Schema.Struct({
serviceName: Schema.String,
startTime: Schema.optional(TinybirdDateTimeString),
endTime: Schema.optional(TinybirdDateTimeString),
deploymentEnv: Schema.optional(Schema.String),
})

export type GetServiceExternalEdgesInput = Schema.Schema.Type<typeof GetServiceExternalEdgesInputSchema>

const defaultTimeRange = () => {
const now = new Date()
const dayAgo = new Date(now.getTime() - 24 * 60 * 60 * 1000)
const fmt = (d: Date) => d.toISOString().replace("T", " ").slice(0, 19)
return { startTime: fmt(dayAgo), endTime: fmt(now) }
}

const knownTargetTypes: ReadonlySet<ServiceExternalTargetType> = new Set([
"http",
"messaging",
"rpc",
])

function coerceTargetType(value: unknown): ServiceExternalTargetType {
return knownTargetTypes.has(value as ServiceExternalTargetType)
? (value as ServiceExternalTargetType)
: "http"
}

function transformEdge(row: Record<string, unknown>, durationSeconds: number): ServiceExternalEdge {
const callCount = Number(row.callCount ?? 0)
const errorCount = Number(row.errorCount ?? 0)
const estimatedSpanCount = Number(row.estimatedSpanCount ?? 0)
const sampling = summarizeSampling(estimatedSpanCount, callCount, durationSeconds)
const estimatedCallCount = sampling.hasSampling ? Math.round(estimatedSpanCount) : callCount
return {
sourceService: String(row.sourceService ?? ""),
targetType: coerceTargetType(row.targetType),
targetSystem: String(row.targetSystem ?? ""),
targetName: String(row.targetName ?? ""),
callCount,
estimatedCallCount,
errorCount,
errorRate: callCount > 0 ? errorCount / callCount : 0,
avgDurationMs: Number(row.avgDurationMs ?? 0),
p95DurationMs: Number(row.p95DurationMs ?? 0),
hasSampling: sampling.hasSampling,
samplingWeight: sampling.weight,
}
}

export const getServiceExternalEdges = Effect.fn("QueryEngine.getServiceExternalEdges")(function* ({
data,
}: {
data: GetServiceExternalEdgesInput
}) {
const input = yield* decodeInput(
GetServiceExternalEdgesInputSchema,
data ?? {},
"getServiceExternalEdges",
)
const fallback = defaultTimeRange()

const result = yield* runTinybirdQuery("serviceExternalEdges", () =>
Effect.gen(function* () {
const client = yield* MapleApiAtomClient
return yield* client.queryEngine.serviceExternalEdges({
payload: new ServiceExternalEdgesRequest({
serviceName: input.serviceName,
startTime: input.startTime ?? fallback.startTime,
endTime: input.endTime ?? fallback.endTime,
deploymentEnv: input.deploymentEnv,
}),
})
}),
)

const startMs = input.startTime ? new Date(input.startTime.replace(" ", "T") + "Z").getTime() : 0
const endMs = input.endTime ? new Date(input.endTime.replace(" ", "T") + "Z").getTime() : 0
const durationSeconds = startMs > 0 && endMs > 0 ? Math.max((endMs - startMs) / 1000, 1) : 3600

return {
edges: result.data.map((row) => transformEdge(row, durationSeconds)),
}
})
Loading
Loading