Opentelemetry fixes #4966

Merged
merged 3 commits into from
Nov 28, 2023
32 changes: 28 additions & 4 deletions distributor-node/src/services/parsers/ConfigParserService.ts
@@ -1,11 +1,11 @@
import { ValidationError, ValidationService } from '../validation/ValidationService'
import { Config } from '../../types'
import fs from 'fs'
import { JSONSchema4, JSONSchema4TypeName } from 'json-schema'
import _ from 'lodash'
import path from 'path'
import YAML from 'yaml'
import _ from 'lodash'
import configSchema, { bytesizeUnits } from '../../schemas/configSchema'
import { JSONSchema4, JSONSchema4TypeName } from 'json-schema'
import { Config } from '../../types'
import { ValidationError, ValidationService } from '../validation/ValidationService'

const MIN_CACHE_SIZE = '20G'
const MIN_MAX_CACHED_ITEM_SIZE = '1M'
@@ -131,6 +131,26 @@ export class ConfigParserService {
return String(packageJSON.version)
}

private setEnvVarsFromInputConfig(inputConfig: Record<string, unknown>) {
this.populateEnvVars(inputConfig)
}

private populateEnvVars(config: Record<string, unknown>, prefix = 'JOYSTREAM_DISTRIBUTOR__', path = '') {
Object.entries(config).forEach(([key, value]) => {
// Transform camelCase to snake_case and then to uppercase
const envKey = `${prefix}${path}${this.toSnakeCase(key)}`.toUpperCase()
if (typeof value === 'object' && value !== null && !Array.isArray(value)) {
this.populateEnvVars(value as Record<string, unknown>, prefix, `${path}${this.toSnakeCase(key)}__`)
} else {
process.env[envKey] = String(value)
}
})
}

private toSnakeCase(str: string): string {
return str.replace(/[\w]([A-Z])/g, (m) => `${m[0]}_${m[1]}`).toLowerCase()
}

public parse(): Config {
const { configPath } = this
let inputConfig: Record<string, unknown> = {}
@@ -149,6 +169,10 @@ export class ConfigParserService {
// Override config with env variables
this.mergeEnvConfigWith(inputConfig)

// Export the JSON/YML config as env vars too, so that code that does not have
// access to the config object (e.g. the opentelemetry module) can read the config values.
this.setEnvVarsFromInputConfig(inputConfig)

// Validate the config
const configJson = this.validator.validate('Config', inputConfig)
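
The key flattening performed by `populateEnvVars` and `toSnakeCase` in this hunk can be illustrated with a standalone sketch. This is not the PR's code: `flattenToEnv` is a hypothetical helper that returns a map instead of mutating `process.env`, and the sample config values are made up for illustration.

```typescript
// Same regex as in the diff: insert "_" between a word character and the capital that follows it.
function toSnakeCase(str: string): string {
  return str.replace(/[\w]([A-Z])/g, (m) => `${m[0]}_${m[1]}`).toLowerCase()
}

// Hypothetical helper: build the env var names the diff would export, without touching process.env.
function flattenToEnv(
  config: Record<string, unknown>,
  prefix = 'JOYSTREAM_DISTRIBUTOR__',
  path = ''
): Record<string, string> {
  const out: Record<string, string> = {}
  for (const [key, value] of Object.entries(config)) {
    const segment = toSnakeCase(key)
    if (typeof value === 'object' && value !== null && !Array.isArray(value)) {
      // Nested objects contribute a "SEGMENT__" path component.
      Object.assign(out, flattenToEnv(value as Record<string, unknown>, prefix, `${path}${segment}__`))
    } else {
      // Scalars and arrays are stringified as-is (arrays become comma-joined).
      out[`${prefix}${path}${segment}`.toUpperCase()] = String(value)
    }
  }
  return out
}

// Illustrative input only.
const flattened = flattenToEnv({ id: 'node-1', limits: { maxCachedItemSize: '1M' }, buckets: [1, 2] })
// flattened:
//   JOYSTREAM_DISTRIBUTOR__ID = 'node-1'
//   JOYSTREAM_DISTRIBUTOR__LIMITS__MAX_CACHED_ITEM_SIZE = '1M'
//   JOYSTREAM_DISTRIBUTOR__BUCKETS = '1,2'
```

Note that arrays go through `String(value)`, so in this sketch they end up comma-joined; whether that matches the format `mergeEnvConfigWith` expects when reading the variables back is not shown in this diff.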

1 change: 1 addition & 0 deletions entrypoints/distributor.sh
@@ -2,6 +2,7 @@

# docker entrypoint for distributor node, to allow running with telemetry
if [[ "$TELEMETRY_ENABLED" = "yes" ]] && [[ $1 = "start" ]]; then
export OTEL_APPLICATION=distributor-node
node --require @joystream/opentelemetry /joystream/distributor-node/bin/run $*
else
/joystream/distributor-node/bin/run $*
1 change: 1 addition & 0 deletions entrypoints/graphql-server.sh
@@ -2,6 +2,7 @@

# docker entrypoint for graphql-server, to allow running with telemetry
if [[ "$TELEMETRY_ENABLED" = "yes" ]]; then
export OTEL_APPLICATION=query-node
yarn workspace query-node-root query-node:start:prod:with-instrumentation $*
else
yarn workspace query-node-root query-node:start:prod:pm2 $*
1 change: 1 addition & 0 deletions entrypoints/storage.sh
@@ -2,6 +2,7 @@

# docker entrypoint for storage node, to allow running with telemetry
if [[ "$TELEMETRY_ENABLED" = "yes" ]] && [[ $1 = "server" ]]; then
export OTEL_APPLICATION=storage-node
node --require @joystream/opentelemetry /joystream/storage-node/bin/run $*
else
/joystream/storage-node/bin/run $*
4 changes: 4 additions & 0 deletions opentelemetry/.env
@@ -5,3 +5,7 @@ OTEL_APPLICATION=
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:8200
OTEL_RESOURCE_ATTRIBUTES="service.name=test-service,deployment.environment=development"
OTEL_METRICS_EXPORTER="otlp"

# Optional env vars to configure the opentelemetry exporters
OTEL_MAX_QUEUE_SIZE=8192 # 4 times of default queue size
OTEL_MAX_EXPORT_BATCH_SIZE=1024 # 2 times of default batch size
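
The instrumentation files in this PR read these two variables with `parseInt(process.env.… || '8192')`, which yields `NaN` if a variable is set to a non-numeric string. A slightly more defensive reader could look like the sketch below. `intFromEnv` and the `Env` type are hypothetical names introduced here, not part of the PR, and the env map is passed in explicitly (in the real code one would pass `process.env`).

```typescript
type Env = Record<string, string | undefined>

// Hypothetical helper: return the variable as a positive integer, or the fallback
// when it is missing, empty, or does not parse to a positive integer.
function intFromEnv(env: Env, name: string, fallback: number): number {
  const raw = env[name]
  if (raw === undefined || raw.trim() === '') return fallback
  const parsed = Number.parseInt(raw, 10) // explicit radix
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback
}

// Illustrative values only.
const exampleEnv: Env = { OTEL_MAX_QUEUE_SIZE: '4096', OTEL_MAX_EXPORT_BATCH_SIZE: 'not-a-number' }
const maxQueueSize = intFromEnv(exampleEnv, 'OTEL_MAX_QUEUE_SIZE', 8192) // 4096
const maxExportBatchSize = intFromEnv(exampleEnv, 'OTEL_MAX_EXPORT_BATCH_SIZE', 1024) // 1024 (fallback)
```

The fallbacks 8192 and 1024 are the same defaults the diff hard-codes.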
6 changes: 1 addition & 5 deletions opentelemetry/index.ts
@@ -1,16 +1,12 @@
import { DiagConsoleLogger, DiagLogLevel, diag } from '@opentelemetry/api'
import { NodeSDK } from '@opentelemetry/sdk-node'
import { config } from 'dotenv'
import path from 'path'
import 'dotenv/config'
import { DefaultInstrumentation, DistributorNodeInstrumentation, StorageNodeInstrumentation } from './instrumentations'

// For troubleshooting, set the log level to DiagLogLevel.DEBUG
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.INFO)

async function addInstrumentation() {
// Load env variables
config({ path: path.join(__dirname, '../.env') })

const applicationName = process.env.OTEL_APPLICATION

let instrumentation: NodeSDK
4 changes: 2 additions & 2 deletions opentelemetry/instrumentations/default.ts
@@ -9,8 +9,8 @@ import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-node'

export const DefaultInstrumentation = new NodeSDK({
spanProcessor: new BatchSpanProcessor(new OTLPTraceExporter(), {
maxQueueSize: 8192 /* 4 times of default queue size */,
maxExportBatchSize: 1024 /* 2 times of default batch size */,
maxQueueSize: parseInt(process.env.OTEL_MAX_QUEUE_SIZE || '8192'),
maxExportBatchSize: parseInt(process.env.OTEL_MAX_EXPORT_BATCH_SIZE || '1024'),
}),
metricReader: new PeriodicExportingMetricReader({
exporter: new OTLPMetricExporter(),
14 changes: 10 additions & 4 deletions opentelemetry/instrumentations/distributor-node.ts
@@ -5,15 +5,21 @@ import { FsInstrumentation } from '@opentelemetry/instrumentation-fs'
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http'
import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics'
import { NodeSDK } from '@opentelemetry/sdk-node'
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-node'
import { BatchSpanProcessor, Span } from '@opentelemetry/sdk-trace-node'
import { ClientRequest, ServerResponse } from 'http'

/** Opentelemetry Instrumentation for Joystream Distributor Node */

class CustomSpanProcessor extends BatchSpanProcessor {
onStart(span: Span) {
span.setAttribute('nodeId', process.env.JOYSTREAM_DISTRIBUTOR__ID)
Member

Question: I see the code in the config parser setting this env variable, but it happens in the distributor code. Do we know that onStart() is called after the config parser has done its job?

Contributor Author

@zeeshanakram3 zeeshanakram3 Nov 23, 2023

Good question. The instrumentation code is loaded and initialized first, and only then does the config parser do its job, i.e. export the env vars. As a result, the first few spans emitted when the distributor node starts will have nodeId as undefined (if the config was set through the config.yml file and not via env vars).
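
The startup ordering described in this thread can be sketched as follows. This is only an illustration under assumptions: `SpanLike` is a minimal stand-in for the span interface rather than the OpenTelemetry type, `tagWithNodeId` is a hypothetical helper, and the env map is passed in explicitly instead of using `process.env`. Unlike the code in the diff, the sketch skips the attribute while the variable is still unset instead of passing `undefined`.

```typescript
// Minimal stand-in for the part of a span used here (assumption for illustration).
interface SpanLike {
  setAttribute(key: string, value: string): void
}

type Env = Record<string, string | undefined>

// Hypothetical helper: read the env var at span start and tag the span only if it is set.
function tagWithNodeId(span: SpanLike, env: Env): boolean {
  const nodeId = env['JOYSTREAM_DISTRIBUTOR__ID']
  if (nodeId === undefined) return false // config parser has not exported the value yet
  span.setAttribute('nodeId', nodeId)
  return true
}

// Record the attributes of each simulated span.
const attributes: Record<string, string>[] = []
const makeSpan = (): SpanLike => {
  const attrs: Record<string, string> = {}
  attributes.push(attrs)
  return {
    setAttribute: (key, value) => {
      attrs[key] = value
    },
  }
}

const env: Env = {}
const earlyTagged = tagWithNodeId(makeSpan(), env) // span started before the config is parsed
env['JOYSTREAM_DISTRIBUTOR__ID'] = 'node-1' // what the config parser's export does; value is illustrative
const lateTagged = tagWithNodeId(makeSpan(), env) // span started afterwards
// earlyTagged === false, lateTagged === true
```

Because the variable is read on every span start rather than once at module load, spans created after the config parser has run pick up the value without any further coordination.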

}
}

export const DistributorNodeInstrumentation = new NodeSDK({
spanProcessor: new BatchSpanProcessor(new OTLPTraceExporter(), {
maxQueueSize: 8192 /* 4 times of default queue size */,
maxExportBatchSize: 1024 /* 2 times of default batch size */,
spanProcessor: new CustomSpanProcessor(new OTLPTraceExporter(), {
maxQueueSize: parseInt(process.env.OTEL_MAX_QUEUE_SIZE || '8192'),
maxExportBatchSize: parseInt(process.env.OTEL_MAX_EXPORT_BATCH_SIZE || '1024'),
}),
metricReader: new PeriodicExportingMetricReader({
exporter: new OTLPMetricExporter(),
4 changes: 2 additions & 2 deletions opentelemetry/instrumentations/storage-node.ts
@@ -12,8 +12,8 @@ import { ClientRequest, ServerResponse } from 'http'

export const StorageNodeInstrumentation = new NodeSDK({
spanProcessor: new BatchSpanProcessor(new OTLPTraceExporter(), {
maxQueueSize: 8192 /* 4 times of default queue size */,
maxExportBatchSize: 1024 /* 2 times of default batch size */,
maxQueueSize: parseInt(process.env.OTEL_MAX_QUEUE_SIZE || '8192'),
maxExportBatchSize: parseInt(process.env.OTEL_MAX_EXPORT_BATCH_SIZE || '1024'),
}),
metricReader: new PeriodicExportingMetricReader({
exporter: new OTLPMetricExporter(),