Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ SpaceCat Task Processor is a Node.js service that processes messages from the AW
## Features
- Receives and processes messages from SQS
- Supports multiple task types via modular handlers
- Handlers for audit status, demo URL preparation, and disabling imports/audits
- Built-in handlers for audit status, demo URL preparation, disabling imports/audits, generic agent execution, and Slack notifications
- Extensible and easy to add new handlers

## Handlers
- **opportunity-status-processor**: Checks and reports status audits for a site
- **disable-import-audit-processor**: Disables specified imports and audits for a site
- **demo-url-processor**: Prepares and shares a demo URL for a site
- **agent-executor**: Runs registered AI/LLM agents (e.g., the brand-profile agent) asynchronously after onboarding flows
- **slack-notify**: Sends Slack notifications (text or block messages) from workflows

## Setup
1. Clone the repository
Expand All @@ -25,13 +27,32 @@ SpaceCat Task Processor is a Node.js service that processes messages from the AW

## Usage
- The service is designed to run as a serverless function or background worker.
- It listens for messages on the SQS queue and processes them automatically.
- It can be invoked in two ways:
- **SQS mode:** listens to the `SPACECAT-TASK-PROCESSOR-JOBS` queue and processes messages automatically (default path for existing workflows).
- **Direct mode:** the Lambda entrypoint auto-detects single-message payloads (e.g., from AWS Step Functions) and executes the corresponding handler synchronously. This is used by the new agent workflows to obtain immediate results before triggering follow-up actions.

## Development
- To run tests:
```sh
npm test
```
- To run the optional brand-profile integration test (requires Azure OpenAI env variables):
```sh
npm run test:brand-profile-it
```

### Agent Executor Environment

The `agent-executor` (and the provided brand-profile agent) rely on the Azure OpenAI credentials consumed by `@adobe/spacecat-shared-gpt-client`. Ensure the following variables are configured in the Lambda/runner environment (and locally when running the IT test):

| Variable | Purpose |
| --- | --- |
| `AZURE_OPENAI_ENDPOINT` | Azure OpenAI endpoint URL |
| `AZURE_OPENAI_KEY` | API key for the Azure OpenAI resource |
| `AZURE_API_VERSION` | API version used for the chat completions |
| `AZURE_COMPLETION_DEPLOYMENT` | Deployment/model name (e.g., `gpt-4o`) |

When invoking the integration test, you can also set `BRAND_PROFILE_TEST_BASE_URL` to control which site is analyzed and `BRAND_PROFILE_IT_FULL=1` to print the complete agent response (otherwise the preview is truncated for readability).
- To lint code:
```sh
npm run lint
Expand Down
23,879 changes: 17,667 additions & 6,212 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
"prepare": "husky",
"local-build": "sam build",
"local-run": "sam local invoke",
"local-watch": "node scripts/watch-and-copy.js"
"local-watch": "node scripts/watch-and-copy.js",
"test:brand-profile-it": "BRAND_PROFILE_IT=1 mocha test/it/agent-executor/brand-profile.test.js"
},
"wsk": {
"target": "aws",
Expand Down Expand Up @@ -78,6 +79,7 @@
"@aws-sdk/client-cloudwatch-logs": "3.932.0",
"@adobe/spacecat-shared-ims-client": "1.11.1",
"@adobe/spacecat-shared-data-access": "2.83.2",
"@adobe/spacecat-shared-gpt-client": "1.6.9",
"@adobe/spacecat-shared-http-utils": "1.18.2",
"@adobe/spacecat-shared-slack-client": "1.5.30",
"@adobe/spacecat-shared-utils": "1.73.1",
Expand Down
29 changes: 27 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ import { runOpportunityStatusProcessor as opportunityStatusProcessor } from './t
import { runDisableImportAuditProcessor as disableImportAuditProcessor } from './tasks/disable-import-audit-processor/handler.js';
import { runDemoUrlProcessor as demoUrlProcessor } from './tasks/demo-url-processor/handler.js';
import { runCwvDemoSuggestionsProcessor as cwvDemoSuggestionsProcessor } from './tasks/cwv-demo-suggestions-processor/handler.js';
import { runAgentExecutor as agentExecutor } from './tasks/agent-executor/handler.js';
import { runSlackNotify as slackNotify } from './tasks/slack-notify/handler.js';

const HANDLERS = {
'opportunity-status-processor': opportunityStatusProcessor,
'disable-import-audit-processor': disableImportAuditProcessor,
'demo-url-processor': demoUrlProcessor,
'agent-executor': agentExecutor,
'slack-notify': slackNotify,
'cwv-demo-suggestions-processor': cwvDemoSuggestionsProcessor,
dummy: (message) => ok(message),
};
Expand All @@ -50,7 +54,7 @@ function getElapsedSeconds(startTime) {
* @param {UniversalContext} context the context of the universal serverless function
* @returns {Response} a response
*/
async function run(message, context) {
async function processTask(message, context) {
const { log } = context;
const { type, siteId } = message;

Expand All @@ -76,9 +80,30 @@ async function run(message, context) {
}
}

export const main = wrap(run)
const runSQS = wrap(processTask)
.with(dataAccess)
.with(sqsEventAdapter)
.with(imsClientWrapper)
.with(secrets, { name: getSecretName })
.with(helixStatus);

const runDirect = wrap(processTask)
.with(dataAccess)
.with(imsClientWrapper)
.with(secrets, { name: getSecretName })
.with(helixStatus);

function isSqsEvent(event, context) {
if (Array.isArray(event?.Records)) {
return true;
}
if (Array.isArray(context?.invocation?.event?.Records)) {
return true;
}
return typeof event?.type !== 'string';
}

export const main = async (event, context) => {
const handler = isSqsEvent(event, context) ? runSQS : runDirect;
return handler(event, context);
};
61 changes: 61 additions & 0 deletions src/tasks/agent-executor/handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2025 Adobe. All rights reserved.
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may obtain a copy
* of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
import { ok, badRequest } from '@adobe/spacecat-shared-http-utils';
import { hasText, isNonEmptyObject } from '@adobe/spacecat-shared-utils';

import { getAgent } from '../agents/registry.js';

/**
* Message shape:
* {
* type: 'agent-executor',
* agentId: 'brand-profile',
* context: { siteId?, baseURL, provider?, model?, params? },
* slackContext?: { channelId, threadTs },
* idempotencyKey?: string
* }
*/
export async function runAgentExecutor(message, context) {
const { log } = context;
const { agentId, context: agentContext } = message || {};

if (!hasText(agentId)) {
return badRequest('agentId is required');
}

const agent = getAgent(agentId);
if (!agent) {
return badRequest(`Unknown agentId: ${agentId}`);
}

// Run the agent (returns plain result object)
const result = await agent.run(agentContext, context.env, log);
// Optionally persist if the agent supports persistence
let persistMeta;
if (typeof agent.persist === 'function') {
try {
persistMeta = await agent.persist(message, context, result);
} catch (e) {
log.error(`agent-executor: persist failed for agent ${agentId}`, { error: e.message });
throw e;
}
}
const payload = {
agentId,
context: agentContext,
result,
};
if (isNonEmptyObject(persistMeta)) {
payload.persistMeta = persistMeta;
}
return ok(payload);
}
28 changes: 28 additions & 0 deletions src/tasks/agents/base.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2025 Adobe. All rights reserved.
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may obtain a copy
* of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
import { readFileSync } from 'fs';
import { fileURLToPath } from 'url';
import path from 'path';

export function readPromptFile(importMetaUrl, relPath) {
const filename = fileURLToPath(importMetaUrl);
const dirname = path.dirname(filename);
const fullPath = path.resolve(dirname, relPath);
return readFileSync(fullPath, 'utf-8');
}

export function renderTemplate(template, vars = {}) {
return template.replace(/{{\s*([\w.]+)\s*}}/g, (_, key) => {
const v = vars[key];
return v == null ? '' : String(v);
});
}
104 changes: 104 additions & 0 deletions src/tasks/agents/brand-profile/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2025 Adobe. All rights reserved.
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may obtain a copy
* of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
import { Config } from '@adobe/spacecat-shared-data-access/src/models/site/config.js';
import { AzureOpenAIClient } from '@adobe/spacecat-shared-gpt-client';
import { isNonEmptyObject, isValidUrl, isValidUUID } from '@adobe/spacecat-shared-utils';

import { readPromptFile, renderTemplate } from '../base.js';

async function callModel({
env, log, systemPrompt, userPrompt,
}) {
const gpt = AzureOpenAIClient.createFrom({ env, log });
const resp = await gpt.fetchChatCompletion(userPrompt, {
systemPrompt,
responseFormat: 'json_object',
});
const content = resp?.choices?.[0]?.message?.content || '{}';
try {
return JSON.parse(content);
} catch (e) {
log.error('brand-profile: failed to parse model JSON response', { error: e.message, contentPreview: String(content).slice(0, 500) });
throw new Error('brand-profile: invalid JSON returned by model');
}
}

async function run(context, env, log) {
const {
baseURL,
params = {},
} = context;

if (!isValidUrl(baseURL)) {
throw new Error('brand-profile: context.baseURL is required');
}

const systemPrompt = readPromptFile(import.meta.url, './prompts/system.prompt');
const userTemplate = readPromptFile(import.meta.url, './prompts/user.prompt');
const userPrompt = renderTemplate(userTemplate, { baseURL, params: JSON.stringify(params) });

return callModel({
env, log, systemPrompt, userPrompt,
});
}

async function persist(message, context, result) {
const { log, dataAccess } = context;
const siteId = message?.context?.siteId;

if (!isValidUUID(siteId)) {
log.warn(`brand-profile persist: invalid siteId ${siteId}`);
return;
}

if (!isNonEmptyObject(result)) {
log.warn(`brand-profile persist: empty result for site ${siteId}`);
return;
}

const { Site } = dataAccess;
const site = await Site.findById(siteId);
if (!site) {
log.warn(`brand-profile persist: site not found ${siteId}`);
return;
}
const cfg = site.getConfig();
const before = cfg.getBrandProfile?.() || {};
const beforeHash = before?.contentHash || null;
cfg.updateBrandProfile(result);
const after = cfg.getBrandProfile?.() || {};
const afterHash = after?.contentHash || null;
const changed = beforeHash !== afterHash;
site.setConfig(Config.toDynamoItem(cfg));
await site.save();

// Emit concise summary for observability/Slack step consumers via logs
const baseURL = message?.context?.baseURL;
const version = after?.version;
const summary = changed
? `Brand profile updated to v${version} for site ${siteId}${baseURL}.`
: `Brand profile unchanged (v${version}) for site ${siteId}${baseURL}.`;
log.info('brand-profile persist:', {
siteId,
version,
changed,
contentHash: afterHash,
baseURL,
summary,
});
}

export default {
id: 'brand-profile',
run,
persist,
};
Loading