Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions apps/hash-ai-worker-ts/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,14 @@
"@local/hash-graph-sdk": "workspace:*",
"@local/hash-isomorphic-utils": "workspace:*",
"@local/status": "workspace:*",
"@opentelemetry/api": "1.9.0",
"@opentelemetry/api-logs": "0.207.0",
"@opentelemetry/instrumentation": "0.207.0",
"@opentelemetry/instrumentation-grpc": "0.207.0",
"@sentry/node": "10.42.0",
"@temporalio/activity": "1.12.1",
"@temporalio/common": "1.12.1",
"@temporalio/interceptors-opentelemetry": "1.12.1",
"@temporalio/proto": "1.12.1",
"@temporalio/worker": "1.12.1",
"@temporalio/workflow": "1.12.1",
Expand Down
8 changes: 8 additions & 0 deletions apps/hash-ai-worker-ts/scripts/bundle-workflow-code.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ async function bundle() {
require.resolve(
"@local/hash-backend-utils/temporal/interceptors/workflows/sentry",
),
// OTEL workflow interceptor must be in the bundle: when the
// worker boots with `workflowBundle`, the `interceptors.workflowModules`
// option on `Worker.create` is ignored. The interceptor is a no-op
// when no global TracerProvider is registered, so it's safe to
// include unconditionally.
require.resolve(
"@local/hash-backend-utils/temporal/interceptors/workflows/opentelemetry",
),
],
});
const codePath = path.join(__dirname, "../dist/workflow-bundle.js");
Expand Down
51 changes: 51 additions & 0 deletions apps/hash-ai-worker-ts/src/instrument.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* OpenTelemetry bootstrap for the AI worker. Imported as the very first
* statement of `main.ts` so the auto-instrumentations can patch http
* and gRPC modules before any other code requires them.
*/
import {
createHttpInstrumentation,
createUndiciInstrumentation,
registerOpenTelemetry,
} from "@local/hash-backend-utils/opentelemetry";
import { GrpcInstrumentation } from "@opentelemetry/instrumentation-grpc";

/**
* Setup handles. `undefined` when no `HASH_OTLP_ENDPOINT` is configured
* (no collector) or when bootstrap throws.
*/
export const otelSetup: ReturnType<typeof registerOpenTelemetry> = (() => {
const otlpEndpoint = process.env.HASH_OTLP_ENDPOINT;
if (!otlpEndpoint) {
return undefined;
}
try {
return registerOpenTelemetry({
endpoint: otlpEndpoint,
serviceName: process.env.OTEL_SERVICE_NAME ?? "AI Worker",
instrumentations: [
createHttpInstrumentation(otlpEndpoint),
new GrpcInstrumentation(),
// Native `fetch` (used by openai / @anthropic-ai/sdk / Vertex AI
// SDKs) goes through undici, which the http instrumentation does
// not patch. The shared helper sets `peer.service` so Tempo's
// service_graphs processor renders external dependencies as
// edges in the service map.
createUndiciInstrumentation(),
],
});
} catch (error) {
// Outside production, fail loud: realistic causes here are coding
// errors (bad URL, malformed instrumentation config) and hiding
// them in dev/CI loses regressions.
if (process.env.NODE_ENV !== "production") {
throw error;
}
// eslint-disable-next-line no-console
console.error(
"OpenTelemetry bootstrap failed; AI worker will start without telemetry.",
error,
);
return undefined;
}
})();
165 changes: 56 additions & 109 deletions apps/hash-ai-worker-ts/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
/* eslint-disable import/first */
/* eslint-disable import/first, import/order, simple-import-sort/imports */

// Must be the first import so OTEL auto-instrumentations can patch
// http / grpc / Sentry's own monkey-patches before they apply.
import { otelSetup } from "./instrument.js";

import * as Sentry from "@sentry/node";

Expand All @@ -12,21 +16,26 @@ Sentry.init({
process.env.ENVIRONMENT ||
(process.env.NODE_ENV === "production" ? "production" : "development"),
tracesSampleRate: process.env.NODE_ENV === "production" ? 1.0 : 0,
// Sentry registers its own global `NodeTracerProvider` by default.
// Letting it run after `registerOpenTelemetry` would replace the
// provider that the OTEL workflow client interceptor (set up in
// `createTemporalClient`) holds via `trace.getTracer(...)`, breaking
// caller → workflow → activity context propagation. With this flag
// Sentry shares our provider so its spans flow through the same
// OTLP pipeline.
skipOpenTelemetrySetup: !!otelSetup,
});

import * as http from "node:http";
import { createRequire } from "node:module";
import path from "node:path";
import { fileURLToPath } from "node:url";

import { createGraphClient } from "@local/hash-backend-utils/create-graph-client";
import { getRequiredEnv } from "@local/hash-backend-utils/environment";
import { createCommonFlowActivities } from "@local/hash-backend-utils/flows";
import { SentryActivityInboundInterceptor } from "@local/hash-backend-utils/temporal/interceptors/activities/sentry";
import { sentrySinks } from "@local/hash-backend-utils/temporal/sinks/sentry";
import type { WorkflowSource } from "@local/hash-backend-utils/temporal/worker-bootstrap";
import { runWorker } from "@local/hash-backend-utils/temporal/worker-bootstrap";
import { createVaultClient } from "@local/hash-backend-utils/vault";
import type { WorkerOptions } from "@temporalio/worker";
import { defaultSinks, NativeConnection, Worker } from "@temporalio/worker";
import { config } from "dotenv-flow";
import { TsconfigPathsPlugin } from "tsconfig-paths-webpack-plugin";

Expand All @@ -43,138 +52,76 @@ export const monorepoRootDir = path.resolve(__dirname, "../../..");

config({ silent: true, path: monorepoRootDir });

const TEMPORAL_HOST = new URL(
process.env.HASH_TEMPORAL_SERVER_HOST ?? "http://localhost",
).hostname;
const TEMPORAL_PORT = process.env.HASH_TEMPORAL_SERVER_PORT
? parseInt(process.env.HASH_TEMPORAL_SERVER_PORT, 10)
: 7233;

const createHealthCheckServer = () => {
const server = http.createServer((req, res) => {
if (req.method === "GET" && req.url === "/health") {
res.setHeader("Content-Type", "application/json");
res.writeHead(200);
res.end(
JSON.stringify({
msg: "worker healthy",
}),
);
return;
}
res.writeHead(404);
res.end("");
});

return server;
};

const workflowOptions: Partial<WorkerOptions> =
const workflowSource: WorkflowSource =
process.env.NODE_ENV === "production"
? {
workflowBundle: {
codePath: require.resolve("../dist/workflow-bundle.js"),
},
kind: "bundle",
bundle: { codePath: require.resolve("../dist/workflow-bundle.js") },
}
: {
kind: "path",
workflowsPath: require.resolve("./workflows"),
bundlerOptions: {
webpackConfigHook: (webpackConfig) => {
return {
...webpackConfig,
resolve: {
...webpackConfig.resolve,
plugins: [
...((webpackConfig.plugins as [] | undefined) ?? []),
/**
* Because we run TypeScript directly in development, we need to use the 'paths' in the base tsconfig.json
* This tells TypeScript where to resolve the imports from, overwriting the 'exports' in local dependencies' package.jsons,
* which refer to the transpiled JavaScript code. This plugin converts the 'paths' to webpack 'alias'.
*/
new TsconfigPathsPlugin({
configFile:
"../../libs/@local/tsconfig/legacy-base-tsconfig-to-refactor.json",
}),
],
},
};
},
webpackConfigHook: (webpackConfig) => ({
...webpackConfig,
resolve: {
...webpackConfig.resolve,
plugins: [
...((webpackConfig.plugins as [] | undefined) ?? []),
/**
* We run TypeScript directly in development, so the 'paths' in
* the base tsconfig.json need to be honoured to override the
* 'exports' in local dependencies' package.jsons (which point
* at transpiled JavaScript). This plugin converts the 'paths'
* to webpack 'alias'.
*/
new TsconfigPathsPlugin({
configFile:
"../../libs/@local/tsconfig/legacy-base-tsconfig-to-refactor.json",
}),
],
},
}),
},
workflowsPath: require.resolve("./workflows"),
};

async function run() {
logger.info("Starting AI worker...");

const graphApiClient = createGraphClient(logger, {
host: getRequiredEnv("HASH_GRAPH_HTTP_HOST"),
port: parseInt(getRequiredEnv("HASH_GRAPH_HTTP_PORT"), 10),
});

logger.info("Created Graph client");

const vaultClient = await createVaultClient({ logger });

if (!vaultClient) {
throw new Error("Failed to create Vault client, check preceding logs.");
}

logger.info("Created Vault client");

const connection = await NativeConnection.connect({
address: `${TEMPORAL_HOST}:${TEMPORAL_PORT}`,
});
logger.info("Created Temporal connection");

const worker = await Worker.create({
...workflowOptions,
await runWorker({
serviceName: "AI worker",
taskQueue: "ai",
healthCheckPort: 4100,
activities: {
...createAiActivities({
graphApiClient,
}),
...createGraphActivities({
graphApiClient,
}),
...createAiActivities({ graphApiClient }),
...createGraphActivities({ graphApiClient }),
...createFlowActivities({ vaultClient }),
...createCommonFlowActivities({ graphApiClient }),
},
connection,
/**
* The maximum time that may elapse between heartbeats being processed by the server.
* The default maxHeartbeatThrottleInterval is 60s.
* Throttling is also capped at 80% of the heartbeatTimeout set when proxying an activity.
*/
maxHeartbeatThrottleInterval: "10 seconds",
namespace: "HASH",
taskQueue: "ai",
sinks: { ...defaultSinks(), ...sentrySinks() },
interceptors: {
workflowModules: [
require.resolve(
"@local/hash-backend-utils/temporal/interceptors/workflows/sentry",
),
],
activityInbound: [(ctx) => new SentryActivityInboundInterceptor(ctx)],
workflowSource,
workerOptions: {
/**
* Maximum interval between heartbeats being processed by the server.
* Default `maxHeartbeatThrottleInterval` is 60s; throttling is also
* capped at 80% of `heartbeatTimeout` set when proxying an activity.
*/
maxHeartbeatThrottleInterval: "10 seconds",
},
otelSetup,
logger,
});

const httpServer = createHealthCheckServer();
const port = 4100;
httpServer.listen({ host: "0.0.0.0", port });

logger.info(`HTTP server listening on port ${port}`);

await worker.run();
}

process.on("SIGINT", () => {
logger.info("Received SIGINT, exiting...");
process.exit(1);
});
process.on("SIGTERM", () => {
logger.info("Received SIGTERM, exiting...");
process.exit(1);
});

run().catch((error: unknown) => {
logger.error("Error running worker", { error });
process.exit(1);
Expand Down
1 change: 0 additions & 1 deletion apps/hash-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
"@opentelemetry/instrumentation": "0.207.0",
"@opentelemetry/instrumentation-express": "0.56.0",
"@opentelemetry/instrumentation-graphql": "0.55.0",
"@opentelemetry/instrumentation-http": "0.207.0",
"@opentelemetry/resources": "2.2.0",
"@opentelemetry/sdk-logs": "0.207.0",
"@opentelemetry/sdk-trace-base": "2.2.0",
Expand Down
2 changes: 1 addition & 1 deletion apps/hash-api/src/ensure-system-graph-is-initialized.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const context: ImpureGraphContext<false, true> = {
host: getRequiredEnv("HASH_GRAPH_HTTP_HOST"),
port: Number.parseInt(getRequiredEnv("HASH_GRAPH_HTTP_PORT"), 10),
}),
temporalClient: await createTemporalClient(logger),
temporalClient: await createTemporalClient(),
};

await ensureSystemGraphIsInitialized({
Expand Down
Loading
Loading