Skip to content

Commit

Permalink
allow for multiple otel span processors & metric readers (#2623)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Apr 26, 2024
1 parent e7d1607 commit 8492f5b
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 31 deletions.
5 changes: 5 additions & 0 deletions .changeset/metal-fireants-remember.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/opentelemetry": patch
---

allow for multiple otel span processors & metric readers
9 changes: 6 additions & 3 deletions packages/opentelemetry/src/Metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* @since 1.0.0
*/
import type { MetricProducer, MetricReader } from "@opentelemetry/sdk-metrics"
import type { NonEmptyReadonlyArray } from "effect/Array"
import type * as Effect from "effect/Effect"
import type { LazyArg } from "effect/Function"
import type { Layer } from "effect/Layer"
Expand All @@ -21,11 +22,13 @@ export const makeProducer: Effect.Effect<MetricProducer, never, Resource> = inte
*/
export const registerProducer: (
self: MetricProducer,
metricReader: LazyArg<MetricReader>
) => Effect.Effect<MetricReader, never, Scope.Scope> = internal.registerProducer
metricReader: LazyArg<MetricReader | NonEmptyReadonlyArray<MetricReader>>
) => Effect.Effect<Array<any>, never, Scope.Scope> = internal.registerProducer

/**
* @since 1.0.0
* @category layers
*/
export const layer: (evaluate: LazyArg<MetricReader>) => Layer<never, never, Resource> = internal.layer
export const layer: (
evaluate: LazyArg<MetricReader | NonEmptyReadonlyArray<MetricReader>>
) => Layer<never, never, Resource> = internal.layer
31 changes: 19 additions & 12 deletions packages/opentelemetry/src/NodeSdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type * as Resources from "@opentelemetry/resources"
import type { MetricReader } from "@opentelemetry/sdk-metrics"
import type { SpanProcessor, TracerConfig } from "@opentelemetry/sdk-trace-base"
import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node"
import type { NonEmptyReadonlyArray } from "effect/Array"
import * as Effect from "effect/Effect"
import type { LazyArg } from "effect/Function"
import * as Layer from "effect/Layer"
Expand All @@ -18,9 +19,9 @@ import * as Tracer from "./Tracer.js"
* @category model
*/
export interface Configuration {
readonly spanProcessor?: SpanProcessor | undefined
readonly spanProcessor?: SpanProcessor | ReadonlyArray<SpanProcessor> | undefined
readonly tracerConfig?: Omit<TracerConfig, "resource"> | undefined
readonly metricReader?: MetricReader | undefined
readonly metricReader?: MetricReader | ReadonlyArray<MetricReader> | undefined
readonly resource?: {
readonly serviceName: string
readonly serviceVersion?: string
Expand All @@ -33,7 +34,7 @@ export interface Configuration {
* @category layers
*/
export const layerTracerProvider = (
processor: SpanProcessor,
processor: SpanProcessor | NonEmptyReadonlyArray<SpanProcessor>,
config?: Omit<TracerConfig, "resource">
): Layer.Layer<TracerProvider, never, Resource.Resource> =>
Layer.scoped(
Expand All @@ -47,7 +48,11 @@ export const layerTracerProvider = (
...(config ?? undefined),
resource
})
provider.addSpanProcessor(processor)
if (Array.isArray(processor)) {
processor.forEach((p) => provider.addSpanProcessor(p))
} else {
provider.addSpanProcessor(processor as any)
}
return provider
}),
(provider) => Effect.ignoreLogged(Effect.promise(() => provider.shutdown()))
Expand All @@ -74,14 +79,16 @@ export const layer: {
const ResourceLive = config.resource === undefined
? Resource.layerFromEnv()
: Resource.layer(config.resource)
const TracerLive = config.spanProcessor ?
Tracer.layer.pipe(
Layer.provide(layerTracerProvider(config.spanProcessor, config.tracerConfig))
)
: Layer.empty
const MetricsLive = config.metricReader
? Metrics.layer(() => config.metricReader!)
: Layer.empty
const TracerLive =
config.spanProcessor && !(Array.isArray(config.spanProcessor) && config.spanProcessor.length === 0) ?
Tracer.layer.pipe(
Layer.provide(layerTracerProvider(config.spanProcessor as any, config.tracerConfig))
)
: Layer.empty
const MetricsLive =
config.metricReader && !(Array.isArray(config.metricReader) && config.metricReader.length === 0)
? Metrics.layer(() => config.metricReader as any)
: Layer.empty
return Layer.merge(TracerLive, MetricsLive).pipe(
Layer.provideMerge(ResourceLive)
)
Expand Down
27 changes: 17 additions & 10 deletions packages/opentelemetry/src/WebSdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type * as Resources from "@opentelemetry/resources"
import type { MetricReader } from "@opentelemetry/sdk-metrics"
import type { SpanProcessor, TracerConfig } from "@opentelemetry/sdk-trace-base"
import { WebTracerProvider } from "@opentelemetry/sdk-trace-web"
import type { NonEmptyReadonlyArray } from "effect/Array"
import * as Effect from "effect/Effect"
import type { LazyArg } from "effect/Function"
import * as Layer from "effect/Layer"
Expand All @@ -18,9 +19,9 @@ import * as Tracer from "./Tracer.js"
* @category model
*/
export interface Configuration {
readonly spanProcessor?: SpanProcessor
readonly spanProcessor?: SpanProcessor | ReadonlyArray<SpanProcessor> | undefined
readonly tracerConfig?: Omit<TracerConfig, "resource">
readonly metricReader?: MetricReader
readonly metricReader?: MetricReader | ReadonlyArray<MetricReader> | undefined
readonly resource: {
readonly serviceName: string
readonly serviceVersion?: string
Expand All @@ -33,7 +34,7 @@ export interface Configuration {
* @category layers
*/
export const layerTracerProvider = (
processor: SpanProcessor,
processor: SpanProcessor | NonEmptyReadonlyArray<SpanProcessor>,
config?: Omit<TracerConfig, "resource">
): Layer.Layer<TracerProvider, never, Resource.Resource> =>
Layer.scoped(
Expand All @@ -47,7 +48,11 @@ export const layerTracerProvider = (
...(config ?? undefined),
resource
})
provider.addSpanProcessor(processor)
if (Array.isArray(processor)) {
processor.forEach((p) => provider.addSpanProcessor(p))
} else {
provider.addSpanProcessor(processor as any)
}
return provider
}),
(provider) => Effect.ignoreLogged(Effect.promise(() => provider.shutdown()))
Expand All @@ -72,12 +77,14 @@ export const layer: {
: Effect.sync(evaluate),
(config) => {
const ResourceLive = Resource.layer(config.resource)
const TracerLive = config.spanProcessor ?
Tracer.layer.pipe(Layer.provide(layerTracerProvider(config.spanProcessor, config.tracerConfig)))
: Layer.effectDiscard(Effect.void)
const MetricsLive = config.metricReader
? Metrics.layer(() => config.metricReader!)
: Layer.effectDiscard(Effect.void)
const TracerLive =
config.spanProcessor && !(Array.isArray(config.spanProcessor) && config.spanProcessor.length === 0) ?
Tracer.layer.pipe(Layer.provide(layerTracerProvider(config.spanProcessor as any, config.tracerConfig)))
: Layer.effectDiscard(Effect.void)
const MetricsLive =
config.metricReader && !(Array.isArray(config.metricReader) && config.metricReader.length === 0)
? Metrics.layer(() => config.metricReader as any)
: Layer.effectDiscard(Effect.void)
return Layer.merge(TracerLive, MetricsLive).pipe(
Layer.provideMerge(ResourceLive)
)
Expand Down
19 changes: 14 additions & 5 deletions packages/opentelemetry/src/internal/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,18 +290,27 @@ export const makeProducer = Effect.map(
)

/** @internal */
export const registerProducer = (self: MetricProducer, metricReader: LazyArg<MetricReader>) =>
export const registerProducer = (
self: MetricProducer,
metricReader: LazyArg<MetricReader | Arr.NonEmptyReadonlyArray<MetricReader>>
) =>
Effect.acquireRelease(
Effect.sync(() => {
const reader = metricReader()
reader.setMetricProducer(self)
return reader
const readers: Array<MetricReader> = Array.isArray(reader) ? reader : [reader] as any
readers.forEach((reader) => reader.setMetricProducer(self))
return readers
}),
(reader) => Effect.ignoreLogged(Effect.promise(() => reader.shutdown()))
(readers) =>
Effect.ignoreLogged(Effect.promise(() =>
Promise.all(
readers.map((reader) => reader.shutdown())
)
))
)

/** @internal */
export const layer = (evaluate: LazyArg<MetricReader>) =>
export const layer = (evaluate: LazyArg<MetricReader | Arr.NonEmptyReadonlyArray<MetricReader>>) =>
Layer.scopedDiscard(Effect.flatMap(
makeProducer,
(producer) => registerProducer(producer, evaluate)
Expand Down
2 changes: 1 addition & 1 deletion packages/opentelemetry/test/Tracer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const TracingLive = NodeSdk.layer(Effect.sync(() => ({
resource: {
serviceName: "test"
},
spanProcessor: new SimpleSpanProcessor(new InMemorySpanExporter())
spanProcessor: [new SimpleSpanProcessor(new InMemorySpanExporter())]
})))

// needed to test context propagation
Expand Down

0 comments on commit 8492f5b

Please sign in to comment.