From 7aef0cd65af225fae6d0fdbd309ad55fe6840f58 Mon Sep 17 00:00:00 2001 From: Tjaz Erzen Date: Mon, 22 Sep 2025 13:11:16 +0200 Subject: [PATCH 1/4] Rate limit and attachment support --- base_folder/package.json | 2 +- base_folder/src/core/types.ts | 52 +++ base_folder/src/core/utils.ts | 30 ++ devrev-snapin-template.plain | 30 +- docs/attachment_normalization.md | 52 +++ docs/attachments-extraction.md | 29 +- ...ta-extraction-rules-for-emitting-events.md | 39 +++ docs/data-extraction.md | 2 + docs/function_invocation.mdx | 2 +- docs/incremental_mode.md | 87 ++++- docs/metadata-extraction.mdx | 43 +++ mock_callback_server.py | 2 +- mock_devrev_server.py | 331 ++++++++++++++---- run_devrev_snapin_conformance_tests.sh | 42 ++- ...ttachment_extraction_acceptance_test.plain | 10 + ...external_domain_metadata_boilerplate.plain | 2 + .../initial_domain_mapping_boilerplate.plain | 1 + templates/spawn_method_instructions.plain | 5 +- ...rate_limiting_during_data_extraction.plain | 7 + ...rate_limiting_during_emitting_events.plain | 5 + test_data/data_extraction_check.json | 46 +-- test_data/external_sync_unit_check.json | 46 +-- 22 files changed, 723 insertions(+), 142 deletions(-) create mode 100644 base_folder/src/core/types.ts create mode 100644 base_folder/src/core/utils.ts create mode 100644 docs/attachment_normalization.md create mode 100644 docs/data-extraction-rules-for-emitting-events.md create mode 100644 docs/metadata-extraction.mdx create mode 100644 templates/attachment_extraction_acceptance_test.plain create mode 100644 templates/test_rate_limiting_during_data_extraction.plain create mode 100644 templates/test_rate_limiting_during_emitting_events.plain diff --git a/base_folder/package.json b/base_folder/package.json index 99260d5..a957628 100644 --- a/base_folder/package.json +++ b/base_folder/package.json @@ -52,7 +52,7 @@ "yargs": "^17.6.2" }, "dependencies": { - "@devrev/ts-adaas": "1.5.1", + "@devrev/ts-adaas": "1.8.0", "@devrev/typescript-sdk": "1.1.63", "axios": "^1.9.0", "dotenv": "^16.0.3", diff --git a/base_folder/src/core/types.ts b/base_folder/src/core/types.ts new file mode 100644 index 0000000..228c5d0 --- /dev/null +++ b/base_folder/src/core/types.ts @@ -0,0 +1,52 @@ +/** + * Type definitions for DevRev function inputs and related types + */ + +export type Context = { + // ID of the dev org for which the function is being invoked. + dev_oid: string; + // ID of the automation/command/snap-kit Action/Event Source for which the function is being invoked. + source_id: string; + // ID of the snap-in as part of which the function is being invoked. + snap_in_id: string; + // ID of the snap-in Version as part of which the function is being invoked. + snap_in_version_id: string; + // ID of the service account. + service_account_id: string; + // This secrets map would contain some secrets which platform would provide to the snap-in. + // `service_account_token`: This is the token of the service account which belongs to this snap-in. This can be used to make API calls to DevRev. + // `actor_session_token`: For commands, and snap-kits, where the user is performing some action, this is the token of the user who is performing the action. + secrets: Record; + }; + + export type ExecutionMetadata = { + // A unique id for the function invocation. Can be used to filter logs for a particular invocation. + request_id: string; + // Function name as defined in the manifest being invoked. + function_name: string; + // Type of event that triggered the function invocation as defined in manifest. 
+ event_type: string; + // DevRev endpoint to which the function can make API calls. + // Example : "https://api.devrev.ai/" + devrev_endpoint: string; + }; + + export type InputData = { + // Map of organization inputs and their corresponding values stored in snap-in. + // The values are passed as string and typing need to be handled by the function + global_values: Record; + // Map of event sources and their corresponding ids stored in snap-in. + // These could be used to schedule events on a schedule based event source. + event_sources: Record; + }; + + // Event sent to our app. + export type FunctionInput = { + // Actual payload of the event. + payload: Record; + // Context of the function invocation. + context: Context; + // Metadata of the function invocation. + execution_metadata: ExecutionMetadata; + input_data: InputData; + }; \ No newline at end of file diff --git a/base_folder/src/core/utils.ts b/base_folder/src/core/utils.ts new file mode 100644 index 0000000..b9a376d --- /dev/null +++ b/base_folder/src/core/utils.ts @@ -0,0 +1,30 @@ +import { AirdropEvent, AirdropMessage } from '@devrev/ts-adaas'; +import { FunctionInput } from './types'; + +export function convertToAirdropEvent(fi: FunctionInput): AirdropEvent { + // Create a properly structured AirdropMessage + const airdropMessage: AirdropMessage = { + connection_data: fi.payload.connection_data, + event_context: { + ...fi.payload.event_context, + ...fi.context, + request_id: fi.execution_metadata?.request_id, + }, + event_type: fi.payload.event_type, + event_data: fi.payload.event_data || {}, + }; + + return { + context: { + ...fi.context, + secrets: { + service_account_token: fi.context.secrets?.service_account_token || '', + ...fi.context.secrets, + }, + }, + payload: airdropMessage, + execution_metadata: fi.execution_metadata, + input_data: fi.input_data, + }; + } + \ No newline at end of file diff --git a/devrev-snapin-template.plain b/devrev-snapin-template.plain index 3868342..42808fc 100644 --- a/devrev-snapin-template.plain +++ b/devrev-snapin-template.plain @@ -11,8 +11,10 @@ - Please note that EventContextIn and EventContextOut interfaces are deprecated. - The External Domain Metadata is a JSON object that describes the logical domain model of an external system. It defines the types of external record types, the relationships between those record types, and the metadata associated with each field, including the field’s data type, human-readable name, and other descriptive attributes. + - Note: Resource "attachments" should not be included in The External Domain Metadata JSON object even if the external system supports attachments. This is handled automatically by The Airdrop SDK Library. - The Initial Domain Mapping is a JSON object that defines how the logical domain model described by The External Domain Metadata maps to the target domain model in DevRev. It specifies the correspondence between each external record type and DevRev leaf type, as well as the mapping of individual fields, including transformation logic, value conversions, and any fixed or default values. + - Note: Resource "attachments" should not be included in The Initial Domain Mapping JSON object even if the external system supports attachments. This is handled automatically by The Airdrop SDK Library. - The External Sync Unit Item Count is a numeric field representing the total number of items (such as cards) contained within a given external sync unit. It is accessible as "item_count" field of the external sync unit. 
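A minimal sketch of how The External Sync Unit Item Count could be populated when the external sync units are emitted (the `boards` array and its fields are hypothetical stand-ins for whatever the external system actually returns):

```typescript
import { ExternalSyncUnit, ExtractorEventType, processTask } from '@devrev/ts-adaas';

processTask({
  task: async ({ adapter }) => {
    // Hypothetical data; in practice this comes from the external system's API.
    const boards = [{ id: 'board-1', name: 'Board One', description: 'Example board', cardCount: 42 }];

    const externalSyncUnits: ExternalSyncUnit[] = boards.map((board) => ({
      id: board.id,
      name: board.name,
      description: board.description,
      // The External Sync Unit Item Count: total number of items in this unit.
      item_count: board.cardCount,
    }));

    await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsDone, {
      external_sync_units: externalSyncUnits,
    });
  },
  onTimeout: async ({ adapter }) => {
    await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsError, {
      error: { message: 'Timed out while listing external sync units.' },
    });
  },
});
```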
@@ -57,17 +59,29 @@ - The Conformance Tests should use the The Test Snap-In Server - do not use mock server. - The Conformance Tests should spin a server at http://localhost:8002 for testing callbacks (The Callback Server). + - The Conformance Tests *MUST NOT* send the event directly to The Callback Server. - The DevRev Server runs at http://localhost:8003, so it must not be mocked. - The Worker Data Server is part of The DevRev Server and is accessible at http://localhost:8003/external-worker, so it must not be mocked. +- The API server is running at http://localhost:8004, so it must not be mocked. + - The `jest.setup.js` file is managed externally and cannot be modified within The Conformance Tests. - All test files from The Conformance Tests must use the `.test.ts` suffix to be discoverable by Jest's default test pattern matching. - The Conformance Tests should send a single event to the Snap-In Server in the request body without wrapping it in an array. + - It is safe to assume that event payload will never be undefined, null, or empty. + +- If generating The Acceptance Test that references a JSON file (The Attached JSON Of The Acceptance Test) in the resource, you should: + - Store The Attached JSON Of The Acceptance Test in a separate JSON file. + - Make sure to replace placeholders of the credentials (in `event["payload"]["connection_data"]["key"]` and `event["payload"]["connection_data"]["org_id"]`) in The Attached JSON Of The Acceptance Test with the actual values (credentials from The API). +- The Conformance Tests should encourage code reuse. Most notably, you should avoid duplication of the following: + - Setting up The Callback Server, + - Reading environment variables, + - Defining the test event payload sent to The Snap-In Server. # The TypeScript Node.js application boilerplate @@ -75,23 +89,29 @@ - Logic of pushing data to The DevRev Servers must be implemented in the worker thread and invoked with the "spawn" method from The Airdrop SDK Library. +- The Worker Thread must be implemented in TypeScript and *The Worker Thread MUST BE* referenced with a .ts (TypeScript) extension, not .js (JavaScript). + - The Function being implemented *MUST not* be called "extraction" or "loading". ***Functional Requirements:*** - Implement The Function that only checks if The Function can be invoked. -- Implement The Function called "test_external_sync_units" that provides a test of the 'external sync units' part of the extraction workflow as described in the resource [external-sync-units-extraction.mdx](docs/external-sync-units-extraction.mdx). Only check if the extraction can be triggered and that it successfully completes. Disregard all other details. +- Implement The Function called "test_external_sync_units" (The External Sync Units Test Function) that provides a test of the 'external sync units' part of the extraction workflow as described in the resource [external-sync-units-extraction.mdx](docs/external-sync-units-extraction.mdx). + - If "event_type" equals "EXTRACTION_EXTERNAL_SYNC_UNITS_START", The External Sync Units Test Function should: + - Only emit event "EXTRACTION_EXTERNAL_SYNC_UNITS_DONE". ***Acceptance Tests:*** - Test The Function using the resource [external_sync_unit_check.json](test_data/external_sync_unit_check.json). Test is successful if The Callback Server receives from DevRev an event with "event_type" that equals "EXTRACTION_EXTERNAL_SYNC_UNITS_DONE". The test must not send the event directly to The Callback Server. 
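One way the generated Conformance Tests could satisfy both the callback requirement above and the code-reuse guideline is a small shared helper along the following lines (the file name and helper name are illustrative, not existing code):

```typescript
// test-utils/callback-server.ts (hypothetical shared helper)
import http from 'http';

export interface CallbackCapture {
  events: any[];
  close: () => Promise<void>;
}

// Starts The Callback Server on the given port and records every event posted to it.
export function startCallbackServer(port = 8002): Promise<CallbackCapture> {
  const events: any[] = [];
  const server = http.createServer((req, res) => {
    let body = '';
    req.on('data', (chunk) => (body += chunk));
    req.on('end', () => {
      try {
        events.push(JSON.parse(body));
      } catch {
        // Ignore callbacks that are not valid JSON.
      }
      res.writeHead(200, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({ status: 'ok' }));
    });
  });
  return new Promise((resolve) => {
    server.listen(port, () => {
      resolve({
        events,
        close: () => new Promise<void>((done) => { server.close(() => done()); }),
      });
    });
  });
}
```

A test built on this helper would start it, send the event from the JSON resource to The Test Snap-In Server, and then poll `events` until an entry with the expected "event_type" (for example "EXTRACTION_EXTERNAL_SYNC_UNITS_DONE") arrives.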
- Implement The Function called "data_extraction_check" (The Data Extraction Check Function) that provides a test of the 'data extraction' part of the extraction workflow as described in the resource [data-extraction.md](docs/data-extraction.md). The Data Extraction Check Function should: - - Initialize a single repo "users" - - normalize the users data using The Normalization Function - - push the normalized users data to The DevRev Servers - - Note: The Data Extraction Check Function should be simplified and should disregard all other details. + - if "event_type" equals "EXTRACTION_DATA_START" or "EXTRACTION_ATTACHMENTS_CONTINUE", The Data Extraction Check Function should: + - Initialize a single repo "users" + - normalize the users data using The Normalization Function + - push the normalized users data to The DevRev Servers + - Emit event "EXTRACTION_DATA_DONE". + - Note: The Data Extraction Check Function should be simplified and should disregard all other details. ***Acceptance Tests:*** diff --git a/docs/attachment_normalization.md b/docs/attachment_normalization.md new file mode 100644 index 0000000..0cc8759 --- /dev/null +++ b/docs/attachment_normalization.md @@ -0,0 +1,52 @@ +# Attachment normalization + +Attachments have a special way of normalization compared to other resources. Function for normalizing "attachments" should return an object "NormalizedAttachment" instead of "NormalizedItem". + +## NormalizedAttachment + +NormalizedAttachment represents the standardized structure of an attachment after normalization in the Airdrop platform. This interface defines the essential properties needed to identify and link attachments to their parent items. + +### Properties + +- _url_ + + Required. A **string** representing the URL where the attachment can be accessed. + +- _id_ + + Required. A **string** that uniquely identifies the normalized attachment. + +- _file_name_ + + Required. A **string** representing the name of the attachment file. + +- _parent_id_ + + Required. A **string** identifying the parent item this attachment belongs to. + +- _author_id_ + + Optional. A **string** identifying the author or creator of the attachment. + +- _grand_parent_id_ + + Optional. A **number** identifying a higher-level parent entity, if applicable. + +### Example + +```typescript +export function normalizeAttachment(item: any): NormalizedAttachment { + return { + id: item.gid, + url: item.download_url, + file_name: item.name, + parent_id: item.parent_id, + }; +} +``` + +### Further remarks + +Note: + +- In the example above, parent_id should be the ID of the resource that the attachment belongs to. For example, if we're normalizing an attachment for a task, parent_id should be the ID of the task. \ No newline at end of file diff --git a/docs/attachments-extraction.md b/docs/attachments-extraction.md index 0210bfc..af14c05 100644 --- a/docs/attachments-extraction.md +++ b/docs/attachments-extraction.md @@ -22,19 +22,41 @@ import { The `getAttachmentStream` function is responsible for fetching and streaming attachments from their source URLs. ```typescript +import { + axiosClient, + ExternalSystemAttachmentStreamingParams, + ExternalSystemAttachmentStreamingResponse, + axios, + serializeAxiosError, + ... // Other imports from @devrev/ts-adaas + } from '@devrev/ts-adaas'; + const getAttachmentStream = async ({ item, }: ExternalSystemAttachmentStreamingParams): Promise => { + // IMPORTANT: "url" is not necessarily deployed on the base URL of The API. It could also be an external URL (e.g. 
https://example.com/attachment.pdf, https://devrev.ai, ...) const { id, url } = item; + // NOTE: Import axiosClient directly from @devrev/ts-adaas try { + // IMPORTANT: If the URL is protected by authentication from The API, you should also use the appropriate credentials. const fileStreamResponse = await axiosClient.get(url, { responseType: 'stream', headers: { 'Accept-Encoding': 'identity', + 'Authorization': ... // TODO: Authorization if needed }, }); + // Check if we were rate limited + if (fileStreamResponse.status === 429) { + const delay = ... // TODO: Calculate the delay from The API + return { + delay: delay + }; + } + + // IMPORTANT: "httpStream" should be directly Axios response stream (including headers, data, and everything else) return { httpStream: fileStreamResponse }; } catch (error) { // Error handling logic @@ -102,13 +124,6 @@ processTask({ - Failed attachment metadata is logged for debugging - Error responses include the attachment ID for traceability -### Event Emission -The worker emits different events based on the processing outcome: -- `ExtractionAttachmentsDelay`: When processing needs to be delayed -- `ExtractionAttachmentsError`: When an error occurs -- `ExtractionAttachmentsDone`: When processing completes successfully -- `ExtractionAttachmentsProgress`: During timeout handling - ### Timeout Handling - On timeout, the current state is posted to the platform - A progress event is emitted with 50% completion diff --git a/docs/data-extraction-rules-for-emitting-events.md b/docs/data-extraction-rules-for-emitting-events.md new file mode 100644 index 0000000..9f31dc8 --- /dev/null +++ b/docs/data-extraction-rules-for-emitting-events.md @@ -0,0 +1,39 @@ +# Data Extraction Rules for Emitting Events + +## Input event: `EXTRACTION_DATA_START` or `EXTRACTION_DATA_CONTINUE` + +Here are the rules when `EXTRACTION_DATA_START` or `EXTRACTION_DATA_CONTINUE` event is received by the snap-in: + +- If there's a timeout ("onTimeout" method in "spawn" method), the snap-in should emit an event with type `EXTRACTION_DATA_PROGRESS` (no "data" payload). +- If the extraction is successful and all the resources have been extracted, the snap-in should emit an event with type `EXTRACTION_DATA_DONE` (no "data" payload). +- In case of unexpected error, the snap-in should emit an event with type `EXTRACTION_DATA_ERROR` ("data" payload should contain attribute "error" - error message as a string). +- In case of rate limiting (status code 429) for "data extraction", the snap-in *must* emit an event with type `EXTRACTION_DATA_DELAY`. If emitting event of type `EXTRACTION_DATA_DELAY`, you *must* also provide a "delay" attribute, specifying the delay in seconds as an integer. + - Example: `await adapter.emit(ExtractorEventType.ExtractionDataDelay, { delay: "30" });` + +## Input event: `EXTRACTION_EXTERNAL_SYNC_UNITS_START` + +Here are the rules when `EXTRACTION_EXTERNAL_SYNC_UNITS_START` event is received by the snap-in: + +- If "external sync unit extraction" is successful and the snap-in has extracted all the external sync units, the snap-in should emit an event with type `EXTRACTION_EXTERNAL_SYNC_UNITS_DONE` (no "data" payload). +- In case of unexpected error, the snap-in should emit an event with type `EXTRACTION_EXTERNAL_SYNC_UNITS_ERROR` ("data" payload should contain attribute "error" - error message as a string). +- In case of rate limiting (status code 429) for "external sync unit extraction", the snap-in should also emit an event with type `EXTRACTION_EXTERNAL_SYNC_UNITS_ERROR`. 
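Taken together, the data extraction rules above could be implemented roughly as sketched below. This is a sketch, not the required implementation; it assumes the external API signals rate limiting with a `Retry-After` header, and it passes "delay" as a number, per the integer requirement above:

```typescript
import { axios, ExtractorEventType, processTask } from '@devrev/ts-adaas';

processTask({
  task: async ({ adapter }) => {
    try {
      // ... extraction and pushing of repositories happens here ...
      await adapter.emit(ExtractorEventType.ExtractionDataDone);
    } catch (error) {
      if (axios.isAxiosError(error) && error.response?.status === 429) {
        // Delay in seconds as an integer; 30 is an arbitrary fallback if the header is missing.
        const delay = parseInt(error.response.headers['retry-after'] ?? '30', 10);
        await adapter.emit(ExtractorEventType.ExtractionDataDelay, { delay });
      } else {
        await adapter.emit(ExtractorEventType.ExtractionDataError, {
          error: { message: error instanceof Error ? error.message : String(error) },
        });
      }
    }
  },
  onTimeout: async ({ adapter }) => {
    // On timeout, emit only EXTRACTION_DATA_PROGRESS.
    await adapter.emit(ExtractorEventType.ExtractionDataProgress);
  },
});
```

Note that every code path emits exactly one event, which keeps the sketch consistent with the single-event rule stated at the end of this document.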
+ +## Input event: `EXTRACTION_METADATA_START` + +Here are the rules when `EXTRACTION_METADATA_START` event is received by the snap-in: +- If "metadata extraction" is successful and the snap-in has extracted all the metadata, the snap-in should emit an event with type `EXTRACTION_METADATA_DONE` (no "data" payload). +- In case of unexpected error, the snap-in should emit an event with type `EXTRACTION_METADATA_ERROR` ("data" payload should contain attribute "error" - error message as a string). + +## Input event: `EXTRACTION_ATTACHMENTS_START` or `EXTRACTION_ATTACHMENTS_CONTINUE` + +Here are the rules when `EXTRACTION_ATTACHMENTS_START` or `EXTRACTION_ATTACHMENTS_CONTINUE` event is received by the snap-in: +- If "attachments extraction" is successful and the snap-in has extracted all the attachments, the snap-in should emit an event with type "EXTRACTION_ATTACHMENTS_DONE" +- If case of unexpected error, the snap-in should emit an event with type "EXTRACTION_ATTACHMENTS_ERROR" ("data" payload should contain attribute "error" - error message as a string). +- In case of rate limiting (status code 429) for "attachments extraction", the snap-in should also emit an event with type "EXTRACTION_ATTACHMENTS_DELAY". If emitting event of type "EXTRACTION_ATTACHMENTS_DELAY", you *must* also provide a "delay" attribute, specifying the delay in seconds as an integer. + - Example: `await adapter.emit(ExtractorEventType.ExtractionAttachmentsDelay, { delay: "30" });` +- If there's a timeout ("onTimeout" method in "spawn" method), the snap-in should emit an event with type "EXTRACTION_ATTACHMENTS_PROGRESS". + + +## IMPORTANT FOR ALL INPUT EVENTS + +- In all cases, only a single event should be emitted. \ No newline at end of file diff --git a/docs/data-extraction.md b/docs/data-extraction.md index 10de0e9..6791e2e 100644 --- a/docs/data-extraction.md +++ b/docs/data-extraction.md @@ -100,6 +100,8 @@ processTask({ }); ``` +NOTE: Do not call `initializeRepos` multiple times. You should call it only once with all the repositories. + After initialization of repositories using `initializeRepos`, items should be then retrieved from the external system and stored in the correct repository by calling the `push` function. diff --git a/docs/function_invocation.mdx b/docs/function_invocation.mdx index ada4c2b..991c18e 100644 --- a/docs/function_invocation.mdx +++ b/docs/function_invocation.mdx @@ -3,7 +3,7 @@ A function can be invoked synchronously or asynchronously. You need to implement the run method in your function. The run method is called when the function is invoked. The run method signature is defined below: ```typescript -async function run(events: any[]): any; +async function run(events: []FunctionInput): any; ``` The value returned from the `run` method is passed back in synchronous execution modes, such as commands, snap kit actions, and event source synchronous execution. In asynchronous execution modes, such as automation execution, the return value is ignored. diff --git a/docs/incremental_mode.md b/docs/incremental_mode.md index cfa0294..e6ab2c6 100644 --- a/docs/incremental_mode.md +++ b/docs/incremental_mode.md @@ -1,24 +1,91 @@ ### Implementing Incremental Data Sync -Incremental data synchronization retrieves only records that have been created or updated since the last successful sync. This process requires persistently tracking the timestamp of the last successful data extraction. On subsequent runs, this timestamp is used to query the source system for changes. 
+Incremental data synchronization retrieves only records that have been created or updated since the time of the last successful sync.
 
-Incremental mode should only be handled if the "event_type" is `EXTRACTION_DATA_START`.
+This process requires reading the timestamp of the last successful data extraction from the state object. After every successful sync, this timestamp gets updated automatically.
 
-To check if we're in incremental mode, you should check if the value of `adapter.event.payload.event_context.mode` is `SyncMode.INCREMENTAL`.
+### When to handle incremental mode and how to check if we're in incremental mode
 
-#### How implement incremental mode
+Incremental mode should only be handled if the "event_type" is `EXTRACTION_DATA_START`. To check if we're in incremental mode, you should check if the value of `adapter.event.payload.event_context.mode` is `SyncMode.INCREMENTAL`.
 
-If we're in incremental mode, you should reset The Extraction State, indicating the sync hasn't been completed yet for all data types that we support incremental mode.
+### ts-adaas Library Details
 
-Value of `adapter.state.lastSuccessfulSyncStarted` (of format ISO 8601 Extended Format with timezone) represents you the information since when you should query resources from the 3rd party API.
+The information about the last successful sync is stored in the `adapter.state.lastSuccessfulSyncStarted` property (of format ISO 8601 Extended Format with timezone).
 
-To retrieve only the resources from the API that have to be updated, filtering on The API should be implemented.
+Here are some further remarks:
+- Field `adapter.state.lastSuccessfulSyncStarted` should be the source of truth for the last successful sync time. `adapter.state.lastSuccessfulSyncStarted` should be read *only* from `adapter.state` (e.g. not from the `adapter.event.payload`). There is no need to define `adapter.state.lastSuccessfulSyncStarted` in The State Object explicitly.
+- Field `adapter.state.lastSyncStarted` *should not* play a role in incremental mode (`adapter.state.lastSuccessfulSyncStarted` contains all the information needed).
+
+### High level: How to implement incremental mode
+
+Here's how to handle incremental mode:
+
+- Reset The Extraction State, indicating the sync hasn't been completed yet for all data types for which we support incremental mode.
+- Set field "modifiedSince" (The Modified Since Field) to the value of `adapter.state.lastSuccessfulSyncStarted`.
+- Then, retrieve and push only the resources from the API that have been modified since The Modified Since Field (either server-side, if the API supports filtering, or client-side, if it doesn't).
 
-Note:
-- `adapter.state.lastSuccessfulSyncStarted` and `adapter.state.lastSyncStarted` are internal properties of the ts-adaas library, so no need to define it. This should be a read-only property.
 #### Remember the last successful sync time
 
 If the sync is successful, update the state object with the current time.
 
-Note: No need to modify any time-related properties in adapter.state object.
\ No newline at end of file
+Note: No need to modify any time-related properties in the adapter.state object.
+
+#### Example
+
+Let's imagine the following scenario:
+
+We're extracting the following resources from the 3rd party system:
+
+- Users
+- Tasks
+- Attachments (Attachments are tied to Tasks)
+
+To persist information over multiple syncs, we're using The State Object defined below.
+ +```typescript +export interface ExtractorState { + users: { + completed: boolean; + }; + tasks: { + completed: boolean; + modifiedSince?: string; + }; + attachments: { + completed: boolean; + }; +} + +export const initialState: ExtractorState = { + users: { completed: false }, + tasks: { completed: false }, + attachments: { completed: false }, +}; +``` + +Our goal is to support incremental mode for the Tasks resource. To do that, we need to reset The State Object for the Tasks resource and set the modifiedSince field to the value of the last successful sync time. Since Attachments are tied to Tasks, we need to reset the Attachments resource as well. + +This is how we are going to handle the incremental mode in The Worker Thread: + +```typescript +processTask({ + task: async ({ adapter }) => { + // ... ... + if (adapter.event.payload.event_type === EventType.ExtractionDataStart) { + + // If this is an incremental sync, we need to reset the state for the item types. + if (adapter.event.payload.event_context.mode === SyncMode.INCREMENTAL) { + adapter.state.tasks = initialState.tasks; + adapter.state.attachments = initialState.attachments; + adapter.state.tasks.modifiedSince = adapter.state.lastSuccessfulSyncStarted; + } + } + + // TODO: Here, fetch only the tasks that have been modified since the last successful sync time and push only those to the designated repository. + } + onTimeout: async ({ adapter }) => { + // ... ... + } +}) +``` \ No newline at end of file diff --git a/docs/metadata-extraction.mdx b/docs/metadata-extraction.mdx new file mode 100644 index 0000000..5ff9824 --- /dev/null +++ b/docs/metadata-extraction.mdx @@ -0,0 +1,43 @@ +# Metadata Extraction + +During the metadata extraction phase, the snap-in must provide an +`external_domain_metadata.json` file to Airdrop. +This file provides a structured way of describing the external system's domain model, +including its domain entities, types, relationships, and other metadata. + +## Triggering event + +AirSync initiates the metadata extraction by starting the snap-in with a message with an event +of type `EXTRACTION_METADATA_START`. + +The snap-in must respond to AirSync with a message with an event of type `EXTRACTION_METADATA_DONE` +when done, or `EXTRACTION_METADATA_ERROR` in case of an error. + +## Implementation + +```typescript +import { ExtractorEventType, processTask } from "@devrev/ts-adaas"; +// TODO: Replace "path/to/external_domain_metadata.json" with the actual path to the external domain metadata file. +import externalDomainMetadata from "path/to/external_domain_metadata.json"; + +const repos = [{ itemType: "external_domain_metadata" }]; + +processTask({ + task: async ({ adapter }) => { + adapter.initializeRepos(repos); + await adapter + .getRepo("external_domain_metadata") + ?.push([externalDomainMetadata]); + await adapter.emit(ExtractorEventType.ExtractionMetadataDone); + }, + onTimeout: async ({ adapter }) => { + await adapter.emit(ExtractorEventType.ExtractionMetadataError, { + error: { message: "Failed to extract metadata. Lambda timeout." }, + }); + }, +}); +``` + +The snap-in must always emit a single message. + +IMPORTANT: The External Domain Metadata JSON *MUST NOT* be normalized when pushed to the repository. 
\ No newline at end of file diff --git a/mock_callback_server.py b/mock_callback_server.py index 072becb..5fb62c3 100644 --- a/mock_callback_server.py +++ b/mock_callback_server.py @@ -55,7 +55,7 @@ async def handle_request(request: Request): if __name__ == "__main__": # Set up argument parser parser = argparse.ArgumentParser(description='Run FastAPI server with optional port specification') - parser.add_argument('--port', type=int, default=8001, help='Port number to run the server on (default: 8001)') + parser.add_argument('--port', type=int, default=8002, help='Port number to run the server on (default: 8002)') args = parser.parse_args() print(f"Starting server on http://localhost:{args.port}") diff --git a/mock_devrev_server.py b/mock_devrev_server.py index 2f757a5..155ef82 100644 --- a/mock_devrev_server.py +++ b/mock_devrev_server.py @@ -1,16 +1,27 @@ -from fastapi import FastAPI, HTTPException, Header, Request +import time +from fastapi import FastAPI, HTTPException, Header, Request, File, BackgroundTasks +from fastapi.responses import Response from pydantic import BaseModel from typing import Optional, List, Dict import uuid import json import random import copy +import gzip +import os +import asyncio +from datetime import datetime, timezone, timedelta + app = FastAPI() +# Avoid automatic trailing-slash redirects which can interact badly with streaming uploads +app.router.redirect_slashes = False # Initialize application state containers -app.state.uploaded_states = {} -app.state.uploaded_artifacts = set() +app.state.uploaded_states = {} # mapping sync_unit -> state object +app.state.uploaded_artifacts_length = {} # mapping artifact_id -> content length +app.state.artifact_id_to_name = {} # mapping artifact_id -> artifact name (e.g. "cards", "attachments", ...) +app.state.artifact_contents = {} # mapping artifact_id -> decompressed artifact content) class ArtifactPrepareRequest(BaseModel): file_name: str @@ -36,28 +47,39 @@ class AirdropArtifactResponse(BaseModel): @app.get("/is_uploaded/{artifact_id:path}") async def was_artifact_uploaded(artifact_id: str): """Check if an artifact with the given artifact_id was uploaded""" - if artifact_id in app.state.uploaded_artifacts: - return {"artifact_id": artifact_id, "uploaded": True} + # This endpoint is used for testing the attachments extraction worker. + print(f"Received /is_uploaded/{artifact_id} GET request") + if artifact_id in app.state.uploaded_artifacts_length: + return {"artifact_id": artifact_id, "uploaded": True, "content_length": app.state.uploaded_artifacts_length[artifact_id]} raise HTTPException(status_code=404, detail="Artifact not found") @app.post("/upload/{artifact_id:path}") async def upload_artifact( artifact_id: str, - request: Request, + file: bytes = File(...), ): + # Here, we upload artifact metadata. The data is in "gibberish" form - encoded with gzip. Here, on the server, we decode this and store in the artifact contents dictionary. 
+ print(f"Received /upload/{artifact_id} POST request") try: - # Read the raw body content - content = await request.body() - - print(f"Received file upload with ID: {artifact_id}") - print(f"Content length: {len(content)}") - print(f"Content type: {request.headers.get('content-type', 'unknown')}") - - # Remember that this artifact_id was uploaded - app.state.uploaded_artifacts.add(artifact_id) + content = file + # decode the data on the mock server + # example content variable: + # b'{"id":"68c04a0fc549efffaccb0300","url":"https://devrev.ai/","file_name":"devrev cover","parent_id":"688725db990240b77167efef","author_id":"6752eb529b14a3446b75e69c"}\n{"id":"68c2be83c413a1889bde83df","url":"https://trello.com/1/cards/688725db990240b77167efef/attachments/68c2be83c413a1889bde83df/download/2d7a71f4ebe27d165f5ea1974ca2bfbb529ad90d-1200x627.png","file_name":"2d7a71f4ebe27d165f5ea1974ca2bfbb529ad90d-1200x627.png","parent_id":"688725db990240b77167efef","author_id":"6752eb529b14a3446b75e69c"}' + try: + content = file.decode('utf-8').encode('latin-1') + except (UnicodeDecodeError, UnicodeEncodeError): + # If it fails, proceed with the original content, maybe it's not corrupted. + pass + # Decompress the content before storing + decompressed_content = gzip.decompress(content) + # Store the content length of the original uploaded file + app.state.uploaded_artifacts_length[artifact_id] = len(file) + # Store the decompressed content + app.state.artifact_contents[artifact_id] = decompressed_content return {"status": "success", "message": "File uploaded successfully"} except Exception as e: + print(f"Error uploading artifact: {e}") raise HTTPException(status_code=400, detail=str(e)) @app.post("/artifacts.prepare", response_model=ArtifactPrepareResponse) @@ -65,6 +87,7 @@ async def prepare_artifact( request: ArtifactPrepareRequest, authorization: Optional[str] = Header(None) ): + print("Received /artifacts.prepare POST request") # Generate a unique artifact ID artifact_id = str(uuid.uuid4()) @@ -84,56 +107,105 @@ async def prepare_artifact( form_data=form_data ) +@app.get("/external-worker.get-all-states") +async def get_all_external_worker_states(): + """Not called anywhere, for debugging/internal purposes""" + print("Received /external-worker.get-all-states GET request") + return app.state.uploaded_states + @app.get("/external-worker.get", response_model=ExternalWorkerResponse) async def get_external_worker(sync_unit: str): print(f"Received /external-worker.get GET request for sync_unit: {sync_unit}") - - # Default state - default_state = { - "lastSyncStarted": "2024-06-01T12:00:00Z", - "toDevRev": { - "attachmentsMetadata": { - "artifactIds": [], - "lastProcessed": 0 - } - }, - "test-artifact-1": { - "completed": True, - "offset": 0 - }, - "test-artifact-2": { - "completed": True, - "offset": 0 - } - } - # Check if uploaded_states contains the specific sync_unit if sync_unit in app.state.uploaded_states: - print(f"Found uploaded state for sync_unit: {sync_unit}") - return ExternalWorkerResponse(state=json.dumps(app.state.uploaded_states[sync_unit])) + stored_state = app.state.uploaded_states[sync_unit] + # If the stored state is already a string, return it directly + # Otherwise, serialize it to JSON + if isinstance(stored_state, str): + return ExternalWorkerResponse(state=stored_state) + else: + return ExternalWorkerResponse(state=json.dumps(stored_state)) else: - print(f"No uploaded state found for sync_unit: {sync_unit}, returning default state") - return 
ExternalWorkerResponse(state=json.dumps(default_state)) + print(f"No uploaded state found for sync_unit: {sync_unit}, returning 404") + raise HTTPException(status_code=404, detail="State not found for sync_unit") @app.post("/external-worker.update") async def update_external_worker(sync_unit: str, request: Request): - body = await request.body() print(f"Received /external-worker.update POST request for sync_unit: {sync_unit}") + body = await request.body() try: parsed = json.loads(body.decode("utf-8")) - # Store the uploaded state under the specific sync_unit key - app.state.uploaded_states[sync_unit] = copy.deepcopy(parsed) - print(f"Stored state for sync_unit: {sync_unit}") - print(json.dumps(parsed, indent=2)) + # If the parsed JSON has a 'state' field, use that as the state + # Otherwise, use the entire parsed object as the state + if 'state' in parsed: + app.state.uploaded_states[sync_unit] = json.loads(parsed['state']) + else: + app.state.uploaded_states[sync_unit] = copy.deepcopy(json.loads(parsed)) + + print(f"Updated state for sync_unit: {sync_unit}\n", json.dumps(app.state.uploaded_states[sync_unit], indent=4)) except Exception as e: print("Failed to pretty print JSON:", e) print(body.decode("utf-8")) return {"status": "received"} +@app.post("/external-worker.update-last-successful-sync/{sync_unit}") +async def update_last_successful_sync(sync_unit: str, request: Request): + """Update lastSuccessfulSyncStarted for a sync unit""" + print(f"Received /external-worker.update-last-successful-sync POST request for sync_unit: {sync_unit}") + # Set current time iso to a bit less than the current time (30 seconds), so we can test incremental mode with it. + # current_time_iso = (datetime.now(timezone.utc) - timedelta(seconds=30)).isoformat() + current_time_iso = datetime.now(timezone.utc).isoformat() + print(f"Setting lastSuccessfulSyncStarted to: {current_time_iso}") + + body = await request.json() + if "snap_in_version_id" not in body: + raise HTTPException(status_code=400, detail="snap_in_version_id is required") + + if sync_unit not in app.state.uploaded_states: + base_state = { + "lastSyncStarted": "", + "lastSuccessfulSyncStarted": current_time_iso, + "snapInVersionId": body["snap_in_version_id"], + "toDevRev": { + "attachmentsMetadata": { + "artifactIds": [], + "lastProcessed": 0, + "lastProcessedAttachmentsIdsList": [] + } + } + } + + if "extend_state" in body: + base_state.update(body["extend_state"]) + + app.state.uploaded_states[sync_unit] = base_state + + app.state.uploaded_states[sync_unit]["lastSuccessfulSyncStarted"] = current_time_iso + + print(f"Updated state for sync_unit: {sync_unit}\n", json.dumps(app.state.uploaded_states[sync_unit], indent=4)) + return {"status": "success"} + + @app.get("/internal/snap-ins.get") async def get_snap_ins(request: Request): + # auxiliary endpoint, intended just for testing to run smoothly. print("Received /internal/snap-ins.get GET request") - return {"status": "success"} + return { + "snap_in": { + "imports": [{"name": "test_import_slug"}], + "snap_in_version": {"slug": "test_snap_in_slug"} + } + } + +@app.post("/internal/airdrop.recipe.initial-domain-mappings.install") +async def install_initial_domain_mappings(): + # auxiliary endpoint, intended just for testing to run smoothly. 
+ print("Received /internal/airdrop.recipe.initial-domain-mappings.install POST request") + return { + "success": True, + "message": "Initial domain mappings installed successfully" + } + @app.get("/internal/airdrop.artifacts.upload-url", response_model=AirdropArtifactResponse) async def airdrop_artifacts_upload_url( @@ -142,30 +214,104 @@ async def airdrop_artifacts_upload_url( request_id: Optional[str] = None, authorization: Optional[str] = Header(None) ): - # Generate a unique artifact ID in the required format - partition = "dvrv-us-1" - devOrgID = "1" - random_int = random.randint(1, 1000) - artifact_id = f"don:core:{partition}:devo/{devOrgID}:artifact/{random_int}" - - # Create a mock S3-like URL for the upload - upload_url = f"http://localhost:8003/upload/{artifact_id}" - - # Create form data fields that would typically be required for S3 upload - form_data = { - "key": f"airdrop-artifacts/{artifact_id}/{file_name}", - "Content-Type": file_type, - "x-amz-meta-artifact-id": artifact_id, - } - - return AirdropArtifactResponse( - artifact_id=artifact_id, - upload_url=upload_url, - form_data=form_data - ) + print("Received /internal/airdrop.artifacts.upload-url GET request") + + if file_type == "application/x-gzip": + # In this case, we're generating upload URL for any artifacts (e.g. users, cards, attachments, etc.) + # file_name = "cards.jsonl.gz" -> strip json.gz away + artifact_name = file_name.replace(".jsonl.gz", "") + # Generate a unique artifact ID in the required format + partition = "dvrv-us-1" + devOrgID = "1" + random_int = random.randint(1, 1000) + artifact_id = f"don:core:{partition}:devo/{devOrgID}:artifact/{random_int}" + # artifact_id = f"don:core:dvrv-us-1:devo/1:artifact/211" + + app.state.artifact_id_to_name[artifact_id] = artifact_name + + # Create a mock S3-like URL for the upload + upload_url = f"http://localhost:8003/upload/{artifact_id}" + + # Create form data fields that would typically be required for S3 upload + form_data = { + "key": f"airdrop-artifacts/{artifact_id}/{file_name}", + "Content-Type": file_type, + "x-amz-meta-artifact-id": artifact_id, + } + + return AirdropArtifactResponse( + artifact_id=artifact_id, + upload_url=upload_url, + form_data=form_data + ) + else: + # from app.state.artifact_id_to_name, find the artifact_id where the key in this dictionary is "attachments" + artifact_id = next((k for k, v in app.state.artifact_id_to_name.items() if v == "attachments"), None) + if not artifact_id: + # should not happen - at this point we already uploaded the attachment metadata artifact ID, and how we're streaming it. + print("Attachments artifact not found. Current artifact_id_to_name:", app.state.artifact_id_to_name) + raise HTTPException(status_code=400, detail="Attachments artifact not found") + + # The upload_url is different here, because it's not gzipped. + # This will be handled by the stream_artifact endpoint + upload_url = f"http://localhost:8003/stream_artifact/{artifact_id}" + + form_data = { + "key": f"streaming-attachments/{artifact_id}/{file_name}", + "Content-Type": file_type, + "x-amz-meta-artifact-id": artifact_id, + } + + return AirdropArtifactResponse( + artifact_id=artifact_id, + upload_url=upload_url, + form_data=form_data + ) + +async def process_stream_in_background(request: Request, artifact_id: str): + """ + This function runs in the background. It attempts to read the stream, + but even if it hangs, the client has already received its 200 OK. 
+ """ + print(f"BG Task: Starting to process stream for artifact {artifact_id}.") + actual_length = 0 + try: + # We still use a timeout here as a defensive measure for the background task itself. + stream_iterator = request.stream().__aiter__() + while True: + chunk = await asyncio.wait_for(stream_iterator.__anext__(), timeout=5.0) + actual_length += len(chunk) + except StopAsyncIteration: + print(f"BG Task: Stream for {artifact_id} finished normally.") + except asyncio.TimeoutError: + print(f"BG Task: Stream for {artifact_id} timed out; assuming complete.") + except Exception as e: + print(f"BG Task: An error occurred processing stream for {artifact_id}: {e}") + + if actual_length > 0: + app.state.uploaded_artifacts_length[artifact_id] = actual_length + print(f"BG Task: Successfully streamed artifact {artifact_id} with size {actual_length} bytes.") + else: + print(f"BG Task: No data was read for artifact {artifact_id}.") + + +@app.post("/stream_artifact/{artifact_id:path}") +async def stream_artifact( + artifact_id: str, + request: Request, + background_tasks: BackgroundTasks, +): + """ + This endpoint immediately returns a 200 OK and schedules the actual + stream processing to happen in the background. This breaks the deadlock. + """ + print(f"Received /stream_artifact/{artifact_id} POST request. Scheduling background processing and returning 200 OK immediately.") + background_tasks.add_task(process_stream_in_background, request, artifact_id) + return {"status": "success", "message": "File streaming acknowledged."} @app.post("/internal/airdrop.artifacts.confirm-upload") async def confirm_upload(request: Request): + print("Received /internal/airdrop.artifacts.confirm-upload POST request") try: body = await request.json() print("Received /internal/airdrop.artifacts.confirm-upload POST body:") @@ -177,12 +323,59 @@ async def confirm_upload(request: Request): @app.post("/reset-mock-server") async def reset_mock_server(): - """Reset the mock server state by clearing uploaded_states and uploaded_artifacts""" + """Reset the mock server state by clearing uploaded_states, uploaded_artifacts and artifact_id_to_name""" app.state.uploaded_states = {} - app.state.uploaded_artifacts = set() - print("Mock server state reset - uploaded_states and uploaded_artifacts cleared") + app.state.uploaded_artifacts_length = {} + app.state.artifact_id_to_name = {} + app.state.artifact_contents = {} + print("Server state reset - uploaded_states, uploaded_artifacts, artifact_id_to_name and artifact_contents cleared") return {"status": "success", "message": "Mock server state reset successfully"} +@app.get("/internal/airdrop.artifacts.download-url") +async def airdrop_artifacts_download_url( + artifact_id: str, + request_id: Optional[str] = None, + authorization: Optional[str] = Header(None) +): + """Mock endpoint to get artifact download URL""" + print(f"Received /internal/airdrop.artifacts.download-url GET request for artifact_id: {artifact_id}, request_id: {request_id}") + return { + "download_url": f"http://localhost:8003/download/{artifact_id}.jsonl.gz" + } + +@app.get("/download/{file_name:path}") +async def download_jsonl_gz_file(file_name: str): + """Endpoint to serve .jsonl.gz files for Node.js download""" + print(f"Received download request for file: {file_name}") + + if not file_name.endswith('.jsonl.gz'): + raise HTTPException(status_code=400, detail="Only .jsonl.gz files are supported") + + artifact_id = file_name.replace(".jsonl.gz", "") + + # this content is decompressed, we need to compress it back + 
decompressed_content = app.state.artifact_contents[artifact_id] + + # Compress the content using gzip + artifact_content = gzip.compress(decompressed_content) + + # get artifact name from the state + artifact_name = app.state.artifact_id_to_name[artifact_id] + # file_name should be artifact_name-unix-timestamp.jsonl.gz + output_file_name = f"{artifact_name}-{int(time.time())}.jsonl.gz" + + # Debug: Print compression info + print(f"Decompressed content length: {len(decompressed_content)}") + print(f"Compressed content length: {len(artifact_content)}") + + return Response( + content=artifact_content, + media_type="application/gzip", + headers={ + "Content-Disposition": f"attachment; filename={output_file_name}", + } + ) + if __name__ == "__main__": import uvicorn uvicorn.run(app, host="localhost", port=8003) diff --git a/run_devrev_snapin_conformance_tests.sh b/run_devrev_snapin_conformance_tests.sh index 30f8032..7dccbae 100755 --- a/run_devrev_snapin_conformance_tests.sh +++ b/run_devrev_snapin_conformance_tests.sh @@ -6,8 +6,9 @@ NPM_INSTALL_OUTPUT_FILTER="up to date in|added [0-9]* packages, removed [0-9]* p ANSI_ESCAPE_PATTERN="s/\x1b\[[0-9;]*[mK]//g" # Maximum number of characters to display from log files -SNAP_IN_LOG_MAX_CHARS=8000 -DEVREV_SERVER_LOG_MAX_CHARS=4000 +SNAP_IN_LOG_MAX_CHARS=30000 +DEVREV_SERVER_LOG_MAX_CHARS=15000 +PROXY_SERVER_LOG_MAX_CHARS=10000 # Function to print a log file, truncating it if it's too large print_log_file() { @@ -38,6 +39,7 @@ SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" # Get the directory from where the script is being executed EXEC_DIR="$(pwd)" MOCK_DEVREV_SERVER_LOG="$EXEC_DIR/devrev_server.log" +PROXY_SERVER_LOG="$EXEC_DIR/proxy_server.log" # Source environment variables from .env file - look in execution directory if [ ! -f "$EXEC_DIR/.env" ]; then @@ -88,6 +90,24 @@ start_mock_devrev_server() { printf "\n" } +start_proxy_server() { + python3 "$SCRIPT_DIR/rate_limiting_proxy.py" > "$PROXY_SERVER_LOG" 2>&1 & + PROXY_SERVER_PID=$! + sleep 2 # Give the server time to start + + # Check if the proxy server started successfully. + if ! kill -0 "$PROXY_SERVER_PID" > /dev/null 2>&1; then + wait "$PROXY_SERVER_PID" + EXIT_CODE=$? + if [ "$EXIT_CODE" -eq 69 ]; then + echo "Proxy server failed to start. Error details:" + cat "$PROXY_SERVER_LOG" + exit 69 + fi + fi + printf "\n" +} + # Cleanup function to ensure all processes are terminated cleanup() { # Kill any running npm processes started by this script @@ -123,9 +143,18 @@ cleanup() { fi fi + # Kill proxy server if it exists + if [ ! -z "${PROXY_SERVER_PID+x}" ]; then + kill $PROXY_SERVER_PID 2>/dev/null + if [ "${VERBOSE:-}" -eq 1 ] 2>/dev/null; then + printf "Proxy server terminated!\n" + fi + fi + # Remove temporary files if they exist [ -f "$build_output" ] && rm "$build_output" 2>/dev/null [ -f "$MOCK_DEVREV_SERVER_LOG" ] && rm "$MOCK_DEVREV_SERVER_LOG" 2>/dev/null + [ -f "$PROXY_SERVER_LOG" ] && rm "$PROXY_SERVER_LOG" 2>/dev/null } # Set up trap to call cleanup function on script exit, interrupt, or termination @@ -143,6 +172,13 @@ else MOCK_SERVER_PID=$(lsof -i :8003 -t) fi +if ! lsof -i :8004 -t >/dev/null 2>&1; then + start_proxy_server +else + printf "Proxy server is already running on port 8004\n" + PROXY_SERVER_PID=$(lsof -i :8004 -t) +fi + # Check if chef-cli binary exists at CHEF_CLI_PATH if [ -z "$CHEF_CLI_PATH" ] || [ ! -f "$CHEF_CLI_PATH" ] || [ ! -x "$CHEF_CLI_PATH" ]; then echo "Error: chef-cli not found or not executable at CHEF_CLI_PATH. 
Please ensure CHEF_CLI_PATH is set and points to an executable chef-cli." @@ -315,6 +351,8 @@ conformance_tests_result=$? printf "\n#### Output of the DevRev server log file:\n\n" print_log_file "$MOCK_DEVREV_SERVER_LOG" "$DEVREV_SERVER_LOG_MAX_CHARS" +printf "\n#### Output of the The API Server log file:\n\n" +print_log_file "$PROXY_SERVER_LOG" "$PROXY_SERVER_LOG_MAX_CHARS" printf "\n#### Output of The Snap-In log file:\n" print_log_file "$NODE_SUBFOLDER/app.log" "$SNAP_IN_LOG_MAX_CHARS" printf "\n" diff --git a/templates/attachment_extraction_acceptance_test.plain b/templates/attachment_extraction_acceptance_test.plain new file mode 100644 index 0000000..e6e91fe --- /dev/null +++ b/templates/attachment_extraction_acceptance_test.plain @@ -0,0 +1,10 @@ + - Test The Extraction Function with the following flow (The Attachment Test Flow): + - Step 1: Invoke The Extraction Function using the resource [{{ data_extraction_resource_name }}](test_data/{{ data_extraction_resource_name }}). + - Expect The Callback Server to receive from DevRev a **single** event with "event_type" that equals "EXTRACTION_DATA_DONE". + - Step 2: Invoke The Extraction Function using the resource [{{ attachment_extraction_resource_name }}](test_data/{{ attachment_extraction_resource_name }}). + - Expect The Callback Server to receive from DevRev a **single** event (`:= event_from_callback_server`) with "event_type" that equals "EXTRACTION_ATTACHMENTS_DONE". + - Expect `event_from_callback_server["event_data"]["artifacts"]` to be an array. Expect this array to not be empty. Expect this array to have length 1. + - Let `artifact_object = event_from_callback_server["event_data"]["artifacts"][0]`. + - Expect `artifact_object["item_type"] = "ssor_attachment"` and `artifact_object["item_count"] = {{ expected_attachment_count }}`. + - Make a GET request to "http://localhost:8003/is_uploaded/{artifact_object['id']}". Expect response code to be equal to 200. + - Note: The Attachment Test Flow should be executed within a single test. \ No newline at end of file diff --git a/templates/external_domain_metadata_boilerplate.plain b/templates/external_domain_metadata_boilerplate.plain index d80e336..31da0e8 100644 --- a/templates/external_domain_metadata_boilerplate.plain +++ b/templates/external_domain_metadata_boilerplate.plain @@ -9,6 +9,8 @@ - Store The External Domain Metadata JSON object as a separate JSON file. +- The External Domain Metadata JSON object *MUST* be imported without "as" statement in The Implementation Code. + ***Test Requirements:*** - Validate generated The External Domain Metadata JSON object with the chef-cli tool (The Chef CLI) using the command "chef-cli validate-metadata": diff --git a/templates/initial_domain_mapping_boilerplate.plain b/templates/initial_domain_mapping_boilerplate.plain index d1e4043..1b4c602 100644 --- a/templates/initial_domain_mapping_boilerplate.plain +++ b/templates/initial_domain_mapping_boilerplate.plain @@ -6,6 +6,7 @@ - When working with devrev_leaf_type, be aware that the schema expects different formats depending on context. In most places, it should be passed as an object with object_category and object_type. However, in the airdrop-recipe-create-possible-record-type-mapping context, it must be a string representing only the object_type. - Please note that mappings are split into separate ‘shards’ - one for each record type - for easier manipulation and storage. - Please note that a leaf type or a concrete id has to be selected for use_devrev_record, but not both. 
+ - Please note that field "record_type_mappings" has to be placed inside the "additional_mappings" object. - The Stock Field Mapping Field is a configuration object within the Initial Domain Mapping JSON structure that defines how external system data fields are mapped to DevRev's built-in (stock) fields. It is located within the "stock_field_mappings" object inside a blueprint shard configuration. diff --git a/templates/spawn_method_instructions.plain b/templates/spawn_method_instructions.plain index 3f2f21b..d4be4c7 100644 --- a/templates/spawn_method_instructions.plain +++ b/templates/spawn_method_instructions.plain @@ -3,6 +3,7 @@ - "isLocalDevelopment" *must not* be set. - "initialState" should be The Extraction State Object with all "completed" fields set to false, and all the rest of the fields in The Extraction State Object set to undefined, false, 0, or empty string, depending on the type of the field. - Note: No additional fields should be added to The Extraction State Object. - - Note: The Worker Thread must be implemented in TypeScript and it should be referenced with a .ts extension, not .js. + - Note: The Worker Thread must be implemented in TypeScript and *The Worker Thread MUST BE* referenced with a .ts (TypeScript) extension, not .js (JavaScript). + - Use The Initial Domain Mapping JSON object for initialDomainMapping parameter when spawning a new worker. - - Note: The Initial Domain Mapping JSON object should be read directly from the JSON file. \ No newline at end of file + - Note: The Initial Domain Mapping JSON object should be read directly from the JSON file. diff --git a/templates/test_rate_limiting_during_data_extraction.plain b/templates/test_rate_limiting_during_data_extraction.plain new file mode 100644 index 0000000..ee183fd --- /dev/null +++ b/templates/test_rate_limiting_during_data_extraction.plain @@ -0,0 +1,7 @@ + - Test the rate limiting of The Function "{{ function_name }}" with the following flow: + - Make a POST request to "http://localhost:8004/start_rate_limiting" with body `{ "test_name": }`. + - Invoke The Function "{{ function_name }}" with valid credentials and all required parameters. + - Expect: `status_code = 429`. + - Expect: `api_delay > 0` and `api_delay <= 3`. If api_delay is bigger than 3, assume the problem is that we did not calculate the api_delay correctly in The Implementation Code. + - Make a POST request to "http://localhost:8004/end_rate_limiting". + \ No newline at end of file diff --git a/templates/test_rate_limiting_during_emitting_events.plain b/templates/test_rate_limiting_during_emitting_events.plain new file mode 100644 index 0000000..c01ecb6 --- /dev/null +++ b/templates/test_rate_limiting_during_emitting_events.plain @@ -0,0 +1,5 @@ + - When the input event is "{{ input_event_name }}", the Extraction Function must handle rate limiting using the following test flow: + - Step 1: Make a POST request to "http://localhost:8004/start_rate_limiting" with body `{ "test_name": }`. + - Step 2: Invoke The Extraction Function using the resource [{{ resource_name }}](test_data/{{ resource_name }}). + - Expect The Callback Server to receive *a single* event with "event_type" "{{ expected_output_event_type }}". + - Step 3: Make a POST request to "http://localhost:8004/end_rate_limiting". 
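A sketch of how a generated conformance test could drive this flow. The Snap-In Server URL, the test name, the JSON resource, the expected event type, and the `startCallbackServer` helper are all illustrative assumptions standing in for the template placeholders above:

```typescript
// rate_limiting_during_emitting_events.test.ts (illustrative only)
import axios from 'axios';
import { startCallbackServer } from './test-utils/callback-server'; // hypothetical shared helper
import testEvents from './test_data/data_extraction_check.json'; // stands in for {{ resource_name }}

describe('rate limiting during emitting events', () => {
  it('emits a single delay event while The API is rate limited', async () => {
    const callback = await startCallbackServer(8002);
    try {
      // Step 1: turn on rate limiting on The API Server.
      await axios.post('http://localhost:8004/start_rate_limiting', { test_name: 'emitting_events_rate_limit' });

      // Step 2: invoke The Extraction Function via The Test Snap-In Server (URL is an assumption).
      await axios.post('http://localhost:8000/handle/sync', testEvents[0]);

      // Crude fixed wait for illustration; a real test would poll with a timeout.
      await new Promise((resolve) => setTimeout(resolve, 10000));
      expect(callback.events).toHaveLength(1);
      expect(callback.events[0].event_type).toBe('EXTRACTION_DATA_DELAY'); // stands in for {{ expected_output_event_type }}
    } finally {
      // Step 3: always turn rate limiting back off.
      await axios.post('http://localhost:8004/end_rate_limiting');
      await callback.close();
    }
  }, 60000);
});
```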
\ No newline at end of file diff --git a/test_data/data_extraction_check.json b/test_data/data_extraction_check.json index 3ba6919..c018da3 100644 --- a/test_data/data_extraction_check.json +++ b/test_data/data_extraction_check.json @@ -1,25 +1,27 @@ -{ - "execution_metadata": { - "function_name": "data_extraction_check", - "devrev_endpoint": "http://localhost:8003" - }, - "payload" : { - "event_type": "EXTRACTION_DATA_START", - "event_context": { - "callback_url": "http://localhost:8002/callback", - "dev_org": "test-dev-org", - "external_sync_unit_id": "test-external-sync-unit", - "sync_unit_id": "test-sync-unit", - "worker_data_url": "http://localhost:8003/external-worker" +[ + { + "execution_metadata": { + "function_name": "data_extraction_check", + "devrev_endpoint": "http://localhost:8003" }, - "connection_data": { - "org_id": "test-org-id", - "key": "key=test-key&token=test-token" - } - }, - "context": { - "secrets": { - "service_account_token": "test-token" + "payload" : { + "event_type": "EXTRACTION_DATA_START", + "event_context": { + "callback_url": "http://localhost:8002/callback", + "dev_org": "test-dev-org", + "external_sync_unit_id": "test-external-sync-unit", + "sync_unit_id": "test-sync-unit", + "worker_data_url": "http://localhost:8003/external-worker" + }, + "connection_data": { + "org_id": "test-org-id", + "key": "key=test-key&token=test-token" + } + }, + "context": { + "secrets": { + "service_account_token": "test-token" + } } } -} \ No newline at end of file +] \ No newline at end of file diff --git a/test_data/external_sync_unit_check.json b/test_data/external_sync_unit_check.json index 860c29c..8720c67 100644 --- a/test_data/external_sync_unit_check.json +++ b/test_data/external_sync_unit_check.json @@ -1,25 +1,27 @@ -{ - "execution_metadata": { - "function_name": "extraction_external_sync_unit_check", - "devrev_endpoint": "http://localhost:8003" - }, - "payload" : { - "event_type": "EXTRACTION_EXTERNAL_SYNC_UNITS_START", - "event_context": { - "callback_url": "http://localhost:8002/callback", - "dev_org": "test-dev-org", - "external_sync_unit_id": "test-external-sync-unit", - "sync_unit_id": "test-sync-unit", - "worker_data_url": "http://localhost:8003/external-worker" +[ + { + "execution_metadata": { + "function_name": "test_external_sync_units", + "devrev_endpoint": "http://localhost:8003" }, - "connection_data": { - "org_id": "test-org-id", - "key": "key=test-key&token=test-token" - } - }, - "context": { - "secrets": { - "service_account_token": "test-token" + "payload" : { + "event_type": "EXTRACTION_EXTERNAL_SYNC_UNITS_START", + "event_context": { + "callback_url": "http://localhost:8002/callback", + "dev_org": "test-dev-org", + "external_sync_unit_id": "test-external-sync-unit", + "sync_unit_id": "test-sync-unit", + "worker_data_url": "http://localhost:8003/external-worker" + }, + "connection_data": { + "org_id": "test-org-id", + "key": "key=test-key&token=test-token" + } + }, + "context": { + "secrets": { + "service_account_token": "test-token" + } } } -} \ No newline at end of file +] \ No newline at end of file From 3f71a24d80aa4f9ef5852b4c3f50ef84f1fa3d1c Mon Sep 17 00:00:00 2001 From: Tjaz Erzen Date: Tue, 23 Sep 2025 10:45:40 +0200 Subject: [PATCH 2/4] Add rate limting proxy --- rate_limiting_proxy.py | 170 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 rate_limiting_proxy.py diff --git a/rate_limiting_proxy.py b/rate_limiting_proxy.py new file mode 100644 index 0000000..3801be0 --- /dev/null +++ 
@@ -0,0 +1,170 @@
+# This proxy server requires 'fastapi', 'uvicorn', and 'httpx'.
+# You can install them with: pip install fastapi uvicorn httpx
+
+from fastapi import FastAPI, Request, HTTPException
+from fastapi.responses import JSONResponse, Response, StreamingResponse
+import httpx
+import datetime
+import email.utils
+import os
+from pydantic import BaseModel
+import logging
+
+app = FastAPI()
+
+# Create a single, long-lived client instance for connection pooling
+client = httpx.AsyncClient(timeout=30.0)
+
+
+from contextlib import asynccontextmanager
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    yield
+    await client.aclose()
+
+app = FastAPI(lifespan=lifespan)
+
+# In-memory state for rate limiting
+app.state.rate_limiting_active = False
+app.state.test_name = ""
+
+# The API URL to which requests will be proxied.
+# Configurable via the PROXY_API_URL environment variable.
+API_URL = os.getenv("PROXY_API_URL")
+if not API_URL:
+    print("Error: PROXY_API_URL environment variable not set")
+    exit(69)  # EXIT_SERVICE_UNAVAILABLE
+
+if not API_URL.endswith("/"):
+    print("Error: PROXY_API_URL environment variable must end with a slash. Current API URL: ", API_URL)
+    exit(69)  # EXIT_SERVICE_UNAVAILABLE
+
+RATE_LIMIT_DELAY = 3
+
+
+def is_streaming_response(response: httpx.Response) -> bool:
+    """Check if the response should be streamed."""
+    # 1. The server is explicitly streaming the response.
+    if response.headers.get('transfer-encoding', '').lower() == 'chunked':
+        return True
+
+    # 2. The response content suggests it should be streamed.
+    content_type = response.headers.get('content-type', '').lower()
+    content_disposition = response.headers.get('content-disposition', '').lower()
+
+    # Stream if it's a file attachment
+    if 'attachment' in content_disposition:
+        return True
+
+    # Stream if it's a common large file type
+    if content_type.startswith((
+        'application/octet-stream', 'application/pdf', 'application/zip',
+        'image/', 'video/', 'audio/'
+    )):
+        return True
+
+    # Stream if the content length is over a certain threshold (e.g., 1MB)
+    content_length = int(response.headers.get('content-length', 0))
+    if content_length > 1024 * 1024:
+        return True
+
+    return False
+
+
+class RateLimitStartRequest(BaseModel):
+    test_name: str
+
+
+@app.post("/start_rate_limiting")
+async def start_rate_limiting(request_body: RateLimitStartRequest):
+    """Starts rate limiting all proxied requests."""
+    app.state.rate_limiting_active = True
+    app.state.test_name = request_body.test_name
+    print(f"Rate limiting started for test: {app.state.test_name}")
+    return JSONResponse(content={"status": f"rate limiting started for test: {app.state.test_name}"})
+
+
+@app.post("/end_rate_limiting")
+async def end_rate_limiting():
+    """Ends rate limiting all proxied requests."""
+    app.state.rate_limiting_active = False
+    app.state.test_name = ""
+    print(f"Rate limiting state ended")
+    return JSONResponse(content={"status": "rate limiting ended"})
+
+
+# Catch-all for proxying
+@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
+async def proxy(request: Request, path: str):
+    """
+    Proxies all incoming requests to the API_URL.
+    If rate limiting is active, it returns a 429 status code.
+ """ + if app.state.rate_limiting_active: + retry_after_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=RATE_LIMIT_DELAY) + retry_after_str = email.utils.formatdate( + timeval=retry_after_time.timestamp(), + localtime=False, + usegmt=True + ) + print(f"Rate limit exceeded for test {app.state.test_name}. Returning 429.") + return JSONResponse( + status_code=429, + content={"detail": "Rate limit exceeded"}, + headers={"Retry-After": retry_after_str} + ) + + url = httpx.URL(API_URL).join(path) + + # Pass through headers from the original request, excluding the host header. + headers = {key: value for key, value in request.headers.items() if key.lower() != 'host'} + + try: + print(f"Received request on The API: {request.method} {url}") + + # Stream the request body to the upstream server + req = client.build_request( + method=request.method, + url=url, + headers=headers, + params=request.query_params, + content=request.stream() + ) + + # Stream the response from the upstream server + resp = await client.send(req, stream=True) + + if is_streaming_response(resp): + print("Decision: streaming response") + async def safe_iterator(response): + try: + async for chunk in response.aiter_raw(): + yield chunk + except httpx.ReadError as e: + print(f"Upstream read error while streaming response: {e}") + finally: + await response.aclose() + + return StreamingResponse( + safe_iterator(resp), + status_code=resp.status_code, + headers=resp.headers, + ) + else: + print("Decision: buffering response") + await resp.aread() + return Response( + content=resp.content, + status_code=resp.status_code, + headers=resp.headers, + ) + except httpx.RequestError as exc: + print(f"Error connecting to upstream server: {exc}\n{repr(exc)}") + raise HTTPException(status_code=502, detail=f"Error connecting to upstream server: {exc}") + + +if __name__ == "__main__": + import uvicorn + print("Starting The API Server") + uvicorn.run(app, host="localhost", port=8004) From bec13db37d1a40bbfb59c755f3e190bd5c084ffc Mon Sep 17 00:00:00 2001 From: Tjaz Erzen Date: Tue, 23 Sep 2025 14:45:21 +0200 Subject: [PATCH 3/4] Fix function invocation docs --- docs/function_invocation.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/function_invocation.mdx b/docs/function_invocation.mdx index 991c18e..21f7b3f 100644 --- a/docs/function_invocation.mdx +++ b/docs/function_invocation.mdx @@ -3,7 +3,7 @@ A function can be invoked synchronously or asynchronously. You need to implement the run method in your function. The run method is called when the function is invoked. The run method signature is defined below: ```typescript -async function run(events: []FunctionInput): any; +async function run(events: FunctionInput[]): any; ``` The value returned from the `run` method is passed back in synchronous execution modes, such as commands, snap kit actions, and event source synchronous execution. In asynchronous execution modes, such as automation execution, the return value is ignored. 
From a2ab792b08e938bba24767964ef16bde7678d1b7 Mon Sep 17 00:00:00 2001
From: Tjaz Erzen
Date: Tue, 23 Sep 2025 14:59:02 +0200
Subject: [PATCH 4/4] Update expected adaas version

---
 base_folder/package.json     | 2 +-
 devrev-snapin-template.plain | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/base_folder/package.json b/base_folder/package.json
index a957628..77e3df4 100644
--- a/base_folder/package.json
+++ b/base_folder/package.json
@@ -52,7 +52,7 @@
     "yargs": "^17.6.2"
   },
   "dependencies": {
-    "@devrev/ts-adaas": "1.8.0",
+    "@devrev/ts-adaas": "1.9.0",
     "@devrev/typescript-sdk": "1.1.63",
     "axios": "^1.9.0",
     "dotenv": "^16.0.3",
diff --git a/devrev-snapin-template.plain b/devrev-snapin-template.plain
index 42808fc..b5daacf 100644
--- a/devrev-snapin-template.plain
+++ b/devrev-snapin-template.plain
@@ -29,7 +29,7 @@
 - The Snap-In will run on Node.js as the runtime environment.
 
-- The Snap-In should use The Airdrop SDK Library version "1.8.0" for handling data extraction and loading, pushing data, event-driven actions, state management, and artifact handling.
+- The Snap-In should use The Airdrop SDK Library version "1.9.0" for handling data extraction and loading, pushing data, event-driven actions, state management, and artifact handling.
 
 - Use snake_case for JSON keys.