From d7d8b6f817194a91808cb0f90edcdf14970c450c Mon Sep 17 00:00:00 2001 From: Tjaz Erzen Date: Wed, 1 Oct 2025 14:42:23 +0200 Subject: [PATCH 01/12] Fix error in conformance test script --- run_devrev_snapin_conformance_tests.sh | 29 ++++++++++++++++---------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/run_devrev_snapin_conformance_tests.sh b/run_devrev_snapin_conformance_tests.sh index bc66301..f04d506 100755 --- a/run_devrev_snapin_conformance_tests.sh +++ b/run_devrev_snapin_conformance_tests.sh @@ -164,23 +164,30 @@ trap cleanup EXIT SIGINT SIGTERM check_and_kill_node_server 8000 check_and_kill_node_server 8002 -# Start the mock DevRev server if it's not already running -if ! lsof -i :8003 -t >/dev/null 2>&1; then - start_mock_devrev_server -else - printf "Mock DevRev server is already running on port 8003\n" - MOCK_SERVER_PID=$(lsof -i :8003 -t) +# Ensure nothing is running on port 8003, then start the mock DevRev server +existing_pids=$(lsof -i :8003 -t 2>/dev/null) +if [ ! -z "$existing_pids" ]; then + printf "Killing existing process(es) on port 8003: %s\n" "$existing_pids" + for pid in $existing_pids; do + kill $pid 2>/dev/null + done + sleep 1 fi +start_mock_devrev_server # Set HTTPS_PROXY environment variable to point to proxy server export HTTPS_PROXY="http://localhost:8004" -if ! lsof -i :8004 -t >/dev/null 2>&1; then - start_proxy_server -else - printf "Proxy server is already running on port 8004\n" - PROXY_SERVER_PID=$(lsof -i :8004 -t) +# Ensure nothing is running on port 8004, then start the proxy server +existing_pids_8004=$(lsof -i :8004 -t 2>/dev/null) +if [ ! -z "$existing_pids_8004" ]; then + printf "Killing existing process(es) on port 8004: %s\n" "$existing_pids_8004" + for pid in $existing_pids_8004; do + kill $pid 2>/dev/null + done + sleep 1 fi +start_proxy_server # Check if chef-cli binary exists at CHEF_CLI_PATH if [ -z "$CHEF_CLI_PATH" ] || [ ! -f "$CHEF_CLI_PATH" ] || [ ! -x "$CHEF_CLI_PATH" ]; then From 1b7cb013b26b74613ae3ed12b01295dba132d644 Mon Sep 17 00:00:00 2001 From: Tjaz Erzen Date: Tue, 7 Oct 2025 17:49:14 +0200 Subject: [PATCH 02/12] Streamline acceptance test templating --- devrev-snapin-template.plain | 1 + run_devrev_snapin_conformance_tests.sh | 6 +++--- ...ttachment_extraction_acceptance_test.plain | 20 +++++++++---------- .../chef_cli_normalization_validation.plain | 20 +++++++++---------- .../data_extraction_acceptance_test.plain | 12 +++++------ ...rate_limiting_during_data_extraction.plain | 12 +++++------ ...rate_limiting_during_emitting_events.plain | 10 +++++----- 7 files changed, 41 insertions(+), 40 deletions(-) diff --git a/devrev-snapin-template.plain b/devrev-snapin-template.plain index b862479..4f9d041 100644 --- a/devrev-snapin-template.plain +++ b/devrev-snapin-template.plain @@ -60,6 +60,7 @@ - The Conformance Tests should spin a server at http://localhost:8002 for testing callbacks (The Callback Server). - The Conformance Tests *MUST NOT* send the event directly to The Callback Server. + - The Implementation Code *MUST NOT* send the event directly to The Callback Server. - The DevRev Server runs at http://localhost:8003, so it must not be mocked. diff --git a/run_devrev_snapin_conformance_tests.sh b/run_devrev_snapin_conformance_tests.sh index f04d506..5491dfc 100755 --- a/run_devrev_snapin_conformance_tests.sh +++ b/run_devrev_snapin_conformance_tests.sh @@ -6,9 +6,9 @@ NPM_INSTALL_OUTPUT_FILTER="up to date in|added [0-9]* packages, removed [0-9]* p ANSI_ESCAPE_PATTERN="s/\x1b\[[0-9;]*[mK]//g" # Maximum number of characters to display from log files -SNAP_IN_LOG_MAX_CHARS=30000 -DEVREV_SERVER_LOG_MAX_CHARS=15000 -PROXY_SERVER_LOG_MAX_CHARS=10000 +SNAP_IN_LOG_MAX_CHARS=80000 +DEVREV_SERVER_LOG_MAX_CHARS=40000 +PROXY_SERVER_LOG_MAX_CHARS=20000 # Function to print a log file, truncating it if it's too large print_log_file() { diff --git a/templates/attachment_extraction_acceptance_test.plain b/templates/attachment_extraction_acceptance_test.plain index e6e91fe..156a185 100644 --- a/templates/attachment_extraction_acceptance_test.plain +++ b/templates/attachment_extraction_acceptance_test.plain @@ -1,10 +1,10 @@ - - Test The Extraction Function with the following flow (The Attachment Test Flow): - - Step 1: Invoke The Extraction Function using the resource [{{ data_extraction_resource_name }}](test_data/{{ data_extraction_resource_name }}). - - Expect The Callback Server to receive from DevRev a **single** event with "event_type" that equals "EXTRACTION_DATA_DONE". - - Step 2: Invoke The Extraction Function using the resource [{{ attachment_extraction_resource_name }}](test_data/{{ attachment_extraction_resource_name }}). - - Expect The Callback Server to receive from DevRev a **single** event (`:= event_from_callback_server`) with "event_type" that equals "EXTRACTION_ATTACHMENTS_DONE". - - Expect `event_from_callback_server["event_data"]["artifacts"]` to be an array. Expect this array to not be empty. Expect this array to have length 1. - - Let `artifact_object = event_from_callback_server["event_data"]["artifacts"][0]`. - - Expect `artifact_object["item_type"] = "ssor_attachment"` and `artifact_object["item_count"] = {{ expected_attachment_count }}`. - - Make a GET request to "http://localhost:8003/is_uploaded/{artifact_object['id']}". Expect response code to be equal to 200. - - Note: The Attachment Test Flow should be executed within a single test. \ No newline at end of file +Test The Extraction Function with the following flow (The Attachment Test Flow): + - Step 1: Invoke The Extraction Function using the resource [{{ data_extraction_resource_name }}](test_data/{{ data_extraction_resource_name }}). + - Expect The Callback Server to receive from DevRev a **single** event with "event_type" that equals "EXTRACTION_DATA_DONE". + - Step 2: Invoke The Extraction Function using the resource [{{ attachment_extraction_resource_name }}](test_data/{{ attachment_extraction_resource_name }}). + - Expect The Callback Server to receive from DevRev a **single** event (`:= event_from_callback_server`) with "event_type" that equals "EXTRACTION_ATTACHMENTS_DONE". + - Expect `event_from_callback_server["event_data"]["artifacts"]` to be an array. Expect this array to not be empty. Expect this array to have length 1. + - Let `artifact_object = event_from_callback_server["event_data"]["artifacts"][0]`. + - Expect `artifact_object["item_type"] = "ssor_attachment"` and `artifact_object["item_count"] = {{ expected_attachment_count }}`. + - Make a GET request to "http://localhost:8003/is_uploaded/{artifact_object['id']}". Expect response code to be equal to 200. + - Note: The Attachment Test Flow should be executed within a single test. \ No newline at end of file diff --git a/templates/chef_cli_normalization_validation.plain b/templates/chef_cli_normalization_validation.plain index 19a8cb8..101f428 100644 --- a/templates/chef_cli_normalization_validation.plain +++ b/templates/chef_cli_normalization_validation.plain @@ -1,10 +1,10 @@ - - Then, validate The Normalization Function with the chef-cli tool (The Chef CLI) using the command "chef-cli validate-data -m -r {{ record_type }}": - - To invoke the function that returns The External Domain Metadata JSON object, send the event [external_domain_metadata_event_payload.json](test_data/external_domain_metadata_event_payload.json) to The Snap-In server. - - The External Domain Metadata JSON object should then be stored in temporary file and provided as path to the validation of The Normalization Function. - - The Chef CLI reads the extracted file (The Extracted File) from stdin. - - The Extracted File is inside folder provided in the environment variable "EXTRACTED_FILES_FOLDER_PATH". To retrieve the relevant file name, command `ls EXTRACTED_FILES_FOLDER_PATH | grep extractor_{{ record_type }} | sort -r | head -n 1` should be executed. The test should always fail if The Extracted File is not present. - - Please note that the expected behaviour is that only after the invocation of The Extraction Function, the folder provided in the environment variable "EXTRACTED_FILES_FOLDER_PATH" should exist. The test should always fail if before the execution of The Chef CLI, the folder "EXTRACTED_FILES_FOLDER_PATH" does not exist or if The Extracted File is not found. - - The path to The Chef CLI executable is provided in the environment variable `CHEF_CLI_PATH`. - - For the validation of The Normalization Function to be successful The Chef CLI must return an empty output. The implementation of The Extraction Function directly infuences the output of The Chef CLI. - - The stdout and stderr of The Chef CLI *must* be printed to the console. - - The test should always fail if The Chef CLI is not available. \ No newline at end of file +validate The Normalization Function with the chef-cli tool (The Chef CLI) using the command "chef-cli validate-data -m -r {{ record_type }}": + - To invoke the function that returns The External Domain Metadata JSON object, send the event [external_domain_metadata_event_payload.json](test_data/external_domain_metadata_event_payload.json) to The Snap-In server. + - The External Domain Metadata JSON object should then be stored in temporary file and provided as path to the validation of The Normalization Function. + - The Chef CLI reads the extracted file (The Extracted File) from stdin. + - The Extracted File is inside folder provided in the environment variable "EXTRACTED_FILES_FOLDER_PATH". To retrieve the relevant file name, command `ls EXTRACTED_FILES_FOLDER_PATH | grep extractor_{{ record_type }} | sort -r | head -n 1` should be executed. The test should always fail if The Extracted File is not present. + - Please note that the expected behaviour is that only after the invocation of The Extraction Function, the folder provided in the environment variable "EXTRACTED_FILES_FOLDER_PATH" should exist. The test should always fail if before the execution of The Chef CLI, the folder "EXTRACTED_FILES_FOLDER_PATH" does not exist or if The Extracted File is not found. + - The path to The Chef CLI executable is provided in the environment variable `CHEF_CLI_PATH`. + - For the validation of The Normalization Function to be successful The Chef CLI must return an empty output. The implementation of The Extraction Function directly infuences the output of The Chef CLI. + - The stdout and stderr of The Chef CLI *must* be printed to the console. + - The test should always fail if The Chef CLI is not available. \ No newline at end of file diff --git a/templates/data_extraction_acceptance_test.plain b/templates/data_extraction_acceptance_test.plain index 012b89d..c517aca 100644 --- a/templates/data_extraction_acceptance_test.plain +++ b/templates/data_extraction_acceptance_test.plain @@ -1,6 +1,6 @@ - - Test The Extraction Function using the resource [{{ resource_name }}](test_data/{{ resource_name }}). - - Expect The Callback Server to receive from DevRev a **single** event with "event_type" that equals "EXTRACTION_DATA_DONE". - - Let `artifact_array = ["event_data"]["artifacts"]` (array of objects). - - Expect `len(artifact_array) > 0`. - - Out of `artifact_array`, expect one of the elements to have "item_type" equal to "{{ artifact_name }}" (`:= {{ artifact_name }}_artifact`) and `{{ artifact_name }}_artifact["item_count"]={{ expected_item_count }}`. - - Note: If `{{ artifact_name }}_artifact["item_count"] < {{ expected_item_count }}`, this indicates that not all the {{ artifact_name }} data was extracted. \ No newline at end of file +Test The Extraction Function using the resource [{{ resource_name }}](test_data/{{ resource_name }}). + - Expect The Callback Server to receive from DevRev a **single** event with "event_type" that equals "EXTRACTION_DATA_DONE". + - Let `artifact_array = ["event_data"]["artifacts"]` (array of objects). + - Expect `len(artifact_array) > 0`. + - Out of `artifact_array`, expect one of the elements to have "item_type" equal to "{{ artifact_name }}" (`:= {{ artifact_name }}_artifact`) and `{{ artifact_name }}_artifact["item_count"]={{ expected_item_count }}`. + - Note: If `{{ artifact_name }}_artifact["item_count"] < {{ expected_item_count }}`, this indicates that not all the {{ artifact_name }} data was extracted. \ No newline at end of file diff --git a/templates/test_rate_limiting_during_data_extraction.plain b/templates/test_rate_limiting_during_data_extraction.plain index ee183fd..64ed389 100644 --- a/templates/test_rate_limiting_during_data_extraction.plain +++ b/templates/test_rate_limiting_during_data_extraction.plain @@ -1,7 +1,7 @@ - - Test the rate limiting of The Function "{{ function_name }}" with the following flow: - - Make a POST request to "http://localhost:8004/start_rate_limiting" with body `{ "test_name": }`. - - Invoke The Function "{{ function_name }}" with valid credentials and all required parameters. - - Expect: `status_code = 429`. - - Expect: `api_delay > 0` and `api_delay <= 3`. If api_delay is bigger than 3, assume the problem is that we did not calculate the api_delay correctly in The Implementation Code. - - Make a POST request to "http://localhost:8004/end_rate_limiting". +Test the rate limiting of The Function "{{ function_name }}" with the following flow: + - Make a POST request to "http://localhost:8004/start_rate_limiting" with body `{ "test_name": }`. + - Invoke The Function "{{ function_name }}" with valid credentials and all required parameters. + - Expect: `status_code = 429`. + - Expect: `api_delay > 0` and `api_delay <= 3`. If api_delay is bigger than 3, assume the problem is that we did not calculate the api_delay correctly in The Implementation Code. + - Make a POST request to "http://localhost:8004/end_rate_limiting". \ No newline at end of file diff --git a/templates/test_rate_limiting_during_emitting_events.plain b/templates/test_rate_limiting_during_emitting_events.plain index c01ecb6..7744647 100644 --- a/templates/test_rate_limiting_during_emitting_events.plain +++ b/templates/test_rate_limiting_during_emitting_events.plain @@ -1,5 +1,5 @@ - - When the input event is "{{ input_event_name }}", the Extraction Function must handle rate limiting using the following test flow: - - Step 1: Make a POST request to "http://localhost:8004/start_rate_limiting" with body `{ "test_name": }`. - - Step 2: Invoke The Extraction Function using the resource [{{ resource_name }}](test_data/{{ resource_name }}). - - Expect The Callback Server to receive *a single* event with "event_type" "{{ expected_output_event_type }}". - - Step 3: Make a POST request to "http://localhost:8004/end_rate_limiting". \ No newline at end of file +When the input event is "{{ input_event_name }}", the Extraction Function must handle rate limiting using the following test flow: + - Step 1: Make a POST request to "http://localhost:8004/start_rate_limiting" with body `{ "test_name": }`. + - Step 2: Invoke The Extraction Function using the resource [{{ resource_name }}](test_data/{{ resource_name }}). + - Expect The Callback Server to receive *a single* event with "event_type" "{{ expected_output_event_type }}". + - Step 3: Make a POST request to "http://localhost:8004/end_rate_limiting". \ No newline at end of file From 928e31ddb163c81f762a7435f2a55ad517ae0a48 Mon Sep 17 00:00:00 2001 From: Tjaz Erzen Date: Tue, 7 Oct 2025 21:01:12 +0200 Subject: [PATCH 03/12] Update base folder error handling --- base_folder/test/runner.ts | 60 +++++++++++++++++++++++++++++++++++--- 1 file changed, 56 insertions(+), 4 deletions(-) diff --git a/base_folder/test/runner.ts b/base_folder/test/runner.ts index 6a392b9..5d78707 100644 --- a/base_folder/test/runner.ts +++ b/base_folder/test/runner.ts @@ -75,6 +75,18 @@ async function handleEvent(events: any[], isAsync: boolean, resp: Response) { resp.status(400).send(errMsg); return; } + // crash the process if an empty array is provided + if (Array.isArray(events) && events.length === 0) { + let errMsg = 'Invalid request format: body is an empty array'; + error = { + err_type: RuntimeErrorType.InvalidRequest, + err_msg: errMsg, + } as RuntimeError; + console.error(error.err_msg); + // Return validation error status for empty events input + resp.status(400).send(errMsg); + return; + } // if the request is synchronous, there should be a single event if (!isAsync) { if (events.length > 1) { @@ -88,6 +100,29 @@ async function handleEvent(events: any[], isAsync: boolean, resp: Response) { return; } } else { + // Preflight validation for async requests: ensure each event is minimally valid + for (let event of events) { + if (!event || !event.execution_metadata) { + let errMsg = 'Invalid request format: missing execution_metadata'; + console.error(errMsg); + resp.status(400).send(errMsg); + return; + } + const functionName: FunctionFactoryType = event.execution_metadata.function_name as FunctionFactoryType; + if (functionName === undefined) { + let errMsg = 'Function name not provided in event'; + console.error(errMsg); + resp.status(400).send(errMsg); + return; + } + const f = functionFactory[functionName]; + if (f == undefined) { + let errMsg = `Function ${event.execution_metadata.function_name} not found in factory`; + console.error(errMsg); + resp.status(400).send(errMsg); + return; + } + } // return a success response back to the server resp.status(200).send(); } @@ -101,8 +136,13 @@ async function handleEvent(events: any[], isAsync: boolean, resp: Response) { err_msg: errMsg, } as RuntimeError; console.error(error.err_msg); - resp.status(400).send(errMsg); - return; + if (!isAsync) { + resp.status(400).send(errMsg); + return; + } else { + // For async requests, response has already been sent; skip further processing for this event + continue; + } } const functionName: FunctionFactoryType = event.execution_metadata.function_name as FunctionFactoryType; if (functionName === undefined) { @@ -111,7 +151,13 @@ async function handleEvent(events: any[], isAsync: boolean, resp: Response) { err_msg: 'Function name not provided in event', } as RuntimeError; console.error(error.err_msg); - receivedError = true; + if (!isAsync) { + resp.status(400).send(error.err_msg); + return; + } else { + receivedError = true; + continue; + } } else { const f = functionFactory[functionName]; try { @@ -121,7 +167,13 @@ async function handleEvent(events: any[], isAsync: boolean, resp: Response) { err_msg: `Function ${event.execution_metadata.function_name} not found in factory`, } as RuntimeError; console.error(error.err_msg); - receivedError = true; + if (!isAsync) { + resp.status(400).send(error.err_msg); + return; + } else { + receivedError = true; + continue; + } } else { result = await run(f, [event]); } From b241a52b1d019d211ca6d8d39ff38f6dae03af33 Mon Sep 17 00:00:00 2001 From: Tjaz Erzen Date: Mon, 13 Oct 2025 11:16:48 +0200 Subject: [PATCH 04/12] Add extraction function scaffolding in base folder --- base_folder/src/function-factory.ts | 3 ++ base_folder/src/functions/extraction/index.ts | 37 +++++++++++++++++++ .../workers/attachments-extraction.ts | 6 +++ .../extraction/workers/data-extraction.ts | 6 +++ .../workers/external-sync-units-extraction.ts | 6 +++ .../extraction/workers/metadata-extraction.ts | 6 +++ 6 files changed, 64 insertions(+) create mode 100644 base_folder/src/functions/extraction/index.ts create mode 100644 base_folder/src/functions/extraction/workers/attachments-extraction.ts create mode 100644 base_folder/src/functions/extraction/workers/data-extraction.ts create mode 100644 base_folder/src/functions/extraction/workers/external-sync-units-extraction.ts create mode 100644 base_folder/src/functions/extraction/workers/metadata-extraction.ts diff --git a/base_folder/src/function-factory.ts b/base_folder/src/function-factory.ts index 0702e7d..cc27561 100644 --- a/base_folder/src/function-factory.ts +++ b/base_folder/src/function-factory.ts @@ -1,5 +1,8 @@ +import extraction from './functions/extraction'; + export const functionFactory = { // Add your functions here + extraction, } as const; export type FunctionFactoryType = keyof typeof functionFactory; diff --git a/base_folder/src/functions/extraction/index.ts b/base_folder/src/functions/extraction/index.ts new file mode 100644 index 0000000..bb5c0c0 --- /dev/null +++ b/base_folder/src/functions/extraction/index.ts @@ -0,0 +1,37 @@ +import { convertToAirdropEvent } from '../../core/utils'; +import { FunctionInput } from '../../core/types'; +import { spawn, EventType } from '@devrev/ts-adaas'; + +function getWorkerPerExtractionPhase(event: FunctionInput) { + let path; + switch (event.payload.event_type) { + case EventType.ExtractionExternalSyncUnitsStart: + path = __dirname + '/workers/external-sync-units-extraction'; + break; + case EventType.ExtractionMetadataStart: + path = __dirname + '/workers/metadata-extraction'; + break; + case EventType.ExtractionDataStart: + case EventType.ExtractionDataContinue: + path = __dirname + '/workers/data-extraction'; + break; + case EventType.ExtractionAttachmentsStart: + case EventType.ExtractionAttachmentsContinue: + path = __dirname + '/workers/attachments-extraction'; + break; + } + return path; +} + +const run = async (events: FunctionInput[]) => { + for (const event of events) { + const file = getWorkerPerExtractionPhase(event); + await spawn({ + event: convertToAirdropEvent(event), + workerPath: file, + initialState: {}, + }); + } +}; + +export default run; diff --git a/base_folder/src/functions/extraction/workers/attachments-extraction.ts b/base_folder/src/functions/extraction/workers/attachments-extraction.ts new file mode 100644 index 0000000..8198fbd --- /dev/null +++ b/base_folder/src/functions/extraction/workers/attachments-extraction.ts @@ -0,0 +1,6 @@ +import { processTask } from '@devrev/ts-adaas'; + +processTask({ + task: async ({ adapter }) => {}, + onTimeout: async ({ adapter }) => {}, +}); \ No newline at end of file diff --git a/base_folder/src/functions/extraction/workers/data-extraction.ts b/base_folder/src/functions/extraction/workers/data-extraction.ts new file mode 100644 index 0000000..87a4895 --- /dev/null +++ b/base_folder/src/functions/extraction/workers/data-extraction.ts @@ -0,0 +1,6 @@ +import { processTask } from "@devrev/ts-adaas"; + +processTask({ + task: async ({ adapter }) => {}, + onTimeout: async ({ adapter }) => {}, +}); diff --git a/base_folder/src/functions/extraction/workers/external-sync-units-extraction.ts b/base_folder/src/functions/extraction/workers/external-sync-units-extraction.ts new file mode 100644 index 0000000..87a4895 --- /dev/null +++ b/base_folder/src/functions/extraction/workers/external-sync-units-extraction.ts @@ -0,0 +1,6 @@ +import { processTask } from "@devrev/ts-adaas"; + +processTask({ + task: async ({ adapter }) => {}, + onTimeout: async ({ adapter }) => {}, +}); diff --git a/base_folder/src/functions/extraction/workers/metadata-extraction.ts b/base_folder/src/functions/extraction/workers/metadata-extraction.ts new file mode 100644 index 0000000..8198fbd --- /dev/null +++ b/base_folder/src/functions/extraction/workers/metadata-extraction.ts @@ -0,0 +1,6 @@ +import { processTask } from '@devrev/ts-adaas'; + +processTask({ + task: async ({ adapter }) => {}, + onTimeout: async ({ adapter }) => {}, +}); \ No newline at end of file From 35d2934a0b71dc514cec3713645ce767267de470 Mon Sep 17 00:00:00 2001 From: Tjaz Erzen Date: Mon, 13 Oct 2025 11:22:10 +0200 Subject: [PATCH 05/12] Should not modify extraction function specification --- devrev-snapin-template.plain | 2 ++ templates/external_domain_metadata_boilerplate.plain | 2 ++ templates/initial_domain_mapping_boilerplate.plain | 2 ++ templates/should_not_modify_extraction_function.plain | 1 + 4 files changed, 7 insertions(+) create mode 100644 templates/should_not_modify_extraction_function.plain diff --git a/devrev-snapin-template.plain b/devrev-snapin-template.plain index e2e5f08..6eb199b 100644 --- a/devrev-snapin-template.plain +++ b/devrev-snapin-template.plain @@ -94,6 +94,8 @@ - The Function being implemented *MUST not* be called "extraction" or "loading". +- {% include "templates/should_not_modify_extraction_function.plain" %} + ***Functional Requirements:*** - Implement The Function that only checks if The Function can be invoked. diff --git a/templates/external_domain_metadata_boilerplate.plain b/templates/external_domain_metadata_boilerplate.plain index 31da0e8..07b7cf7 100644 --- a/templates/external_domain_metadata_boilerplate.plain +++ b/templates/external_domain_metadata_boilerplate.plain @@ -11,6 +11,8 @@ - The External Domain Metadata JSON object *MUST* be imported without "as" statement in The Implementation Code. +- {% include "templates/should_not_modify_extraction_function.plain" %} + ***Test Requirements:*** - Validate generated The External Domain Metadata JSON object with the chef-cli tool (The Chef CLI) using the command "chef-cli validate-metadata": diff --git a/templates/initial_domain_mapping_boilerplate.plain b/templates/initial_domain_mapping_boilerplate.plain index 1b4c602..ce930d8 100644 --- a/templates/initial_domain_mapping_boilerplate.plain +++ b/templates/initial_domain_mapping_boilerplate.plain @@ -27,6 +27,8 @@ - The External Domain Metadata JSON object should not be modified. If there are discrepancies between The External Domain Metadata JSON object and The Initial Domain Mapping JSON object, assume The External Domain Metadata JSON object is correct and The Initial Domain Mapping JSON object needs to be adjusted. +- {% include "templates/should_not_modify_extraction_function.plain" %} + ***Test Requirements:*** - Validate generated The Initial Domain Mapping JSON object with the chef-cli tool (The Chef CLI) using the command "chef-cli initial-mapping check -m ": diff --git a/templates/should_not_modify_extraction_function.plain b/templates/should_not_modify_extraction_function.plain new file mode 100644 index 0000000..f296b2b --- /dev/null +++ b/templates/should_not_modify_extraction_function.plain @@ -0,0 +1 @@ +The Extraction Function *must not* be modified. \ No newline at end of file From 6134739e1c1d728511d83ec7ba39a6a2952a3cf5 Mon Sep 17 00:00:00 2001 From: Tjaz Erzen Date: Mon, 13 Oct 2025 11:22:23 +0200 Subject: [PATCH 06/12] Simplify specification for emitting events --- ...ta-extraction-rules-for-emitting-events.md | 1 + docs/data-extraction.md | 22 +------------------ 2 files changed, 2 insertions(+), 21 deletions(-) diff --git a/docs/data-extraction-rules-for-emitting-events.md b/docs/data-extraction-rules-for-emitting-events.md index 9f31dc8..6ac59aa 100644 --- a/docs/data-extraction-rules-for-emitting-events.md +++ b/docs/data-extraction-rules-for-emitting-events.md @@ -5,6 +5,7 @@ Here are the rules when `EXTRACTION_DATA_START` or `EXTRACTION_DATA_CONTINUE` event is received by the snap-in: - If there's a timeout ("onTimeout" method in "spawn" method), the snap-in should emit an event with type `EXTRACTION_DATA_PROGRESS` (no "data" payload). + - Note: Event `EXTRACTION_DATA_PROGRESS` **MUST NEVER** be emitted if the the "task" method in The Worker Thread. The Only place where `EXTRACTION_DATA_PROGRESS` can be emitted is in the "onTimeout" method in The Worker Thread. - If the extraction is successful and all the resources have been extracted, the snap-in should emit an event with type `EXTRACTION_DATA_DONE` (no "data" payload). - In case of unexpected error, the snap-in should emit an event with type `EXTRACTION_DATA_ERROR` ("data" payload should contain attribute "error" - error message as a string). - In case of rate limiting (status code 429) for "data extraction", the snap-in *must* emit an event with type `EXTRACTION_DATA_DELAY`. If emitting event of type `EXTRACTION_DATA_DELAY`, you *must* also provide a "delay" attribute, specifying the delay in seconds as an integer. diff --git a/docs/data-extraction.md b/docs/data-extraction.md index 6791e2e..397ea54 100644 --- a/docs/data-extraction.md +++ b/docs/data-extraction.md @@ -15,27 +15,7 @@ This state object is shared across all invocations and keeps track of where the ## Triggering event -Airdrop initiates data extraction by starting the snap-in with a message with event type -`EXTRACTION_DATA_START` when transitioning to the data extraction phase. - -During the data extraction phase, the snap-in extracts data from an external system, -prepares batches of data and uploads them in the form of artifacts (files) to DevRev. - -The snap-in must respond to Airdrop with a message with event type of `EXTRACTION_DATA_PROGRESS`, -together with an optional progress estimate when the maximum Airdrop snap-in runtime (13 minutes) has been reached. - -If the extraction has been rate-limited by the external system and back-off is required, the snap-in -must respond to Airdrop with a message with event type `EXTRACTION_DATA_DELAY` and specifying -back-off time with `delay` attribute (in seconds as an integer). - -In both cases, Airdrop starts the snap-in with a message with event type `EXTRACTION_DATA_CONTINUE`. -In case of `EXTRACTION_DATA_PROGRESS` the restarting is immediate, -meanwhile in case of `EXTRACTION_DATA_DELAY` the restarting is delayed for the given number of seconds. - -Once the data extraction is done, the snap-in must respond to Airdrop with a message with event type `EXTRACTION_DATA_DONE`. - -If data extraction failed in any moment of extraction, the snap-in must respond to Airdrop with a -message with event type `EXTRACTION_DATA_ERROR`. +Refer to the resource [data-extraction-rules-for-emitting-events.md](docs/data-extraction-rules-for-emitting-events.md) for the rules for emitting events for The Extraction Function. ## Implementation From 346f5e15c69b51624f7f5598ce18554d4ae2dafb Mon Sep 17 00:00:00 2001 From: Tjaz Erzen Date: Mon, 13 Oct 2025 11:22:49 +0200 Subject: [PATCH 07/12] Abstract not stringying error messages --- templates/spawn_method_instructions.plain | 2 ++ 1 file changed, 2 insertions(+) diff --git a/templates/spawn_method_instructions.plain b/templates/spawn_method_instructions.plain index d4be4c7..e421ff5 100644 --- a/templates/spawn_method_instructions.plain +++ b/templates/spawn_method_instructions.plain @@ -7,3 +7,5 @@ - Use The Initial Domain Mapping JSON object for initialDomainMapping parameter when spawning a new worker. - Note: The Initial Domain Mapping JSON object should be read directly from the JSON file. + +- The Extraction Function should not stringify error messages. If an error is thrown, it should be logged before throwing it. \ No newline at end of file From 42353eb576a39790fcbaa0b67977e09005e158e2 Mon Sep 17 00:00:00 2001 From: Tjaz Erzen Date: Mon, 13 Oct 2025 11:23:00 +0200 Subject: [PATCH 08/12] Template for data fetching return expectation --- templates/data_fetching_return_expectation.plain | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 templates/data_fetching_return_expectation.plain diff --git a/templates/data_fetching_return_expectation.plain b/templates/data_fetching_return_expectation.plain new file mode 100644 index 0000000..1e92e62 --- /dev/null +++ b/templates/data_fetching_return_expectation.plain @@ -0,0 +1,5 @@ +The Functions implemented *should* also return: + - `status_code` - a status code from The API. + - `api_delay` - Delay in seconds to wait before retrying the request to The API (if no rate limit, set to 0) + - `message` - Information about the call the The Function. + - Note: Every The Function should handle rate limiting. \ No newline at end of file From e7840b42eb5b3a0a8888dc5b861e8ad67eb5c0f0 Mon Sep 17 00:00:00 2001 From: Tjaz Erzen Date: Mon, 13 Oct 2025 11:23:17 +0200 Subject: [PATCH 09/12] Template for validating external sync unit --- templates/external_sync_unit_acceptance_test.plain | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 templates/external_sync_unit_acceptance_test.plain diff --git a/templates/external_sync_unit_acceptance_test.plain b/templates/external_sync_unit_acceptance_test.plain new file mode 100644 index 0000000..0a24cda --- /dev/null +++ b/templates/external_sync_unit_acceptance_test.plain @@ -0,0 +1,6 @@ +Test The Extraction Function using the resource [{{ resource_name }}](test_data/{{ resource_name }}). + - Expect The Callback Server to receive *a single* event with "event_type" "EXTRACTION_EXTERNAL_SYNC_UNITS_DONE". + - Let `external_sync_units_array = ["event_data"]["external_sync_units"]` (array of objects). + - Expect `external_sync_units_array` to exist and be an array. + - Expect `len(external_sync_units_array) = {{ expected_external_sync_unit_count }}`. + - Out of `external_sync_units_array`, expect that there exists and element that has "name" equal to "{{ expected_external_sync_unit_name }}". \ No newline at end of file From 0fcdd64d9c313f95e54120941b4d61b92f0c1f0a Mon Sep 17 00:00:00 2001 From: Tjaz Erzen Date: Mon, 13 Oct 2025 11:23:39 +0200 Subject: [PATCH 10/12] Template for "internal" client --- templates/internal_client.plain | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 templates/internal_client.plain diff --git a/templates/internal_client.plain b/templates/internal_client.plain new file mode 100644 index 0000000..a41807e --- /dev/null +++ b/templates/internal_client.plain @@ -0,0 +1,3 @@ +The {{ external_system_name }} Internal Client is a TypeScript service that communicates with The API. These are the rules for The {{ external_system_name }} Internal Client: + - If we need to create a new request to The API, we must create a new method in The {{ external_system_name }} Internal Client. + - Communication with The API must be completely abstracted away from The Function. The Function must be able to initialize The {{ external_system_name }} Internal Client, call the relevant method from The {{ external_system_name }} Internal Client and get the response from The API. \ No newline at end of file From 0cd65aae7fe26e55a36f4406ebf4c7794da24104 Mon Sep 17 00:00:00 2001 From: Tjaz Erzen Date: Mon, 13 Oct 2025 14:43:44 +0200 Subject: [PATCH 11/12] Remove rate limiting proxy --- devrev-snapin-template.plain | 1 - rate_limiting_proxy.py | 324 ------------------------- run_devrev_snapin_conformance_tests.sh | 13 +- 3 files changed, 12 insertions(+), 326 deletions(-) delete mode 100644 rate_limiting_proxy.py diff --git a/devrev-snapin-template.plain b/devrev-snapin-template.plain index 6eb199b..6cc2bea 100644 --- a/devrev-snapin-template.plain +++ b/devrev-snapin-template.plain @@ -21,7 +21,6 @@ - The Normalization Function is a function that is responsible for normalizing the data from The API to the data expected by The External Domain Metadata JSON object's record type. - The Extraction Function is The Function named "extraction". It is responsible for pushing data to the DevRev servers (The DevRev Servers) based on various event types. - - "event_type" equals "EXTRACTION_DATA_START" or "event_type" equals "EXTRACTION_DATA_CONTINUE" and we get a timeout, The Worker Thread should emit event "EXTRACTION_DATA_PROGRESS". ***Non-Functional Requirements:*** diff --git a/rate_limiting_proxy.py b/rate_limiting_proxy.py deleted file mode 100644 index 51fdcb1..0000000 --- a/rate_limiting_proxy.py +++ /dev/null @@ -1,324 +0,0 @@ -import socket -import threading -import socketserver -import time -import sys -import ssl -import json -import datetime -import email.utils -from urllib.parse import urlparse - -# Rate limiting settings -TOKEN_BUCKET_CAPACITY = 100 # requests -REFILL_RATE = 10 # requests per second -RATE_LIMIT_DELAY = 3 # seconds - -class RateLimiterState: - """A thread-safe class to manage the global rate limiting state.""" - def __init__(self): - self.lock = threading.Lock() - self.rate_limiting_active = False - self.test_name = None - - def start_rate_limiting(self, test_name): - with self.lock: - self.rate_limiting_active = True - self.test_name = test_name - - def end_rate_limiting(self): - with self.lock: - self.rate_limiting_active = False - self.test_name = None - - def is_rate_limiting_active(self): - with self.lock: - return self.rate_limiting_active, self.test_name - -rate_limiter_state = RateLimiterState() - -class TokenBucket: - """A thread-safe token bucket for rate limiting.""" - def __init__(self, capacity, refill_rate): - self.capacity = float(capacity) - self.refill_rate = float(refill_rate) - self.tokens = float(capacity) - self.last_refill = time.time() - self.lock = threading.Lock() - - def consume(self, tokens): - """Consumes tokens from the bucket. Returns True if successful, False otherwise.""" - with self.lock: - now = time.time() - time_since_refill = now - self.last_refill - new_tokens = time_since_refill * self.refill_rate - self.tokens = min(self.capacity, self.tokens + new_tokens) - self.last_refill = now - - if self.tokens >= tokens: - self.tokens -= tokens - return True - return False - -rate_limiter = TokenBucket(TOKEN_BUCKET_CAPACITY, REFILL_RATE) - -class ProxyHandler(socketserver.BaseRequestHandler): - """Handles incoming proxy requests.""" - def handle(self): - if not rate_limiter.consume(1): - print("Rate limit exceeded. Dropping connection.") - try: - self.request.sendall(b'HTTP/1.1 429 Too Many Requests\r\n\r\n') - except OSError: - pass # Client might have already closed the connection. - finally: - self.request.close() - return - - try: - data = self.request.recv(4096) - except ConnectionResetError: - return # Client closed connection. - - if not data: - return - - first_line = data.split(b'\r\n')[0] - try: - method, target, _ = first_line.split() - except ValueError: - print(f"Could not parse request: {first_line}") - self.request.close() - return - - print(f"Received request: {method.decode('utf-8')} {target.decode('utf-8')}") - - path = target.decode('utf-8') - # Check for control plane endpoints on the proxy itself - if path.startswith(('/start_rate_limiting', '/end_rate_limiting')): - self.handle_control_request(method, path, data) - return - - # Check if global rate limiting is active - is_active, test_name = rate_limiter_state.is_rate_limiting_active() - if is_active: - print(f"Rate limiting is active for test: '{test_name}'. Blocking request.") - - retry_after_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=RATE_LIMIT_DELAY) - retry_after_str = email.utils.formatdate( - timeval=retry_after_time.timestamp(), - localtime=False, - usegmt=True - ) - - response_body = {"detail": "Rate limit exceeded"} - self.send_json_response(429, "Too Many Requests", response_body, headers={"Retry-After": retry_after_str}) - return - - if method == b'CONNECT': - self.handle_connect(target) - else: - self.handle_http_request(target, data) - - def get_request_body(self, data): - header_end = data.find(b'\r\n\r\n') - if header_end != -1: - return data[header_end + 4:].decode('utf-8') - return "" - - def send_json_response(self, status_code, status_message, body_json, headers=None): - body_bytes = json.dumps(body_json).encode('utf-8') - - response_headers = [ - f"HTTP/1.1 {status_code} {status_message}", - "Content-Type: application/json", - f"Content-Length: {len(body_bytes)}", - "Connection: close", - ] - - if headers: - for key, value in headers.items(): - response_headers.append(f"{key}: {value}") - - response_headers.append("") - response_headers.append("") - - response = '\r\n'.join(response_headers).encode('utf-8') + body_bytes - try: - self.request.sendall(response) - except OSError: - pass # Client might have closed the connection. - finally: - self.request.close() - - def handle_control_request(self, method, path, data): - if method != b'POST': - self.send_json_response(405, "Method Not Allowed", {"error": "Only POST method is allowed"}) - return - - if path == '/start_rate_limiting': - body_str = self.get_request_body(data) - if not body_str: - self.send_json_response(400, "Bad Request", {"error": "Request body is missing or empty"}) - return - try: - body_json = json.loads(body_str) - test_name = body_json.get('test_name') - if not test_name or not isinstance(test_name, str): - self.send_json_response(400, "Bad Request", {"error": "'test_name' is missing or not a string"}) - return - except json.JSONDecodeError: - self.send_json_response(400, "Bad Request", {"error": "Invalid JSON in request body"}) - return - - rate_limiter_state.start_rate_limiting(test_name) - response_body = {"status": f"rate limiting started for test: {test_name}"} - self.send_json_response(200, "OK", response_body) - - elif path == '/end_rate_limiting': - rate_limiter_state.end_rate_limiting() - response_body = {"status": "rate limiting ended"} - self.send_json_response(200, "OK", response_body) - else: - self.send_json_response(404, "Not Found", {"error": "Endpoint not found"}) - - def handle_http_request(self, target, data): - """Handles HTTP requests like GET, POST, etc.""" - try: - parsed_url = urlparse(target.decode('utf-8')) - host = parsed_url.hostname - port = parsed_url.port - if port is None: - port = 443 if parsed_url.scheme == 'https' else 80 - except Exception as e: - print(f"Could not parse URL for HTTP request: {target}. Error: {e}") - self.request.close() - return - - if not host: - print(f"Invalid host in URL: {target}") - self.request.close() - return - - try: - remote_socket = socket.create_connection((host, port), timeout=10) - if parsed_url.scheme == 'https': - context = ssl.create_default_context() - remote_socket = context.wrap_socket(remote_socket, server_hostname=host) - except (socket.error, ssl.SSLError) as e: - print(f"Failed to connect or SSL wrap to {host}:{port}: {e}") - self.request.close() - return - - # Modify the request to use a relative path and force connection closing - # This ensures each request gets its own connection and is logged. - header_end = data.find(b'\r\n\r\n') - if header_end == -1: - # If no header-body separator is found, assume it's a simple request with no body. - header_end = len(data) - - header_data = data[:header_end] - body = data[header_end:] - - lines = header_data.split(b'\r\n') - first_line = lines[0] - headers = lines[1:] - - method, _, http_version = first_line.split(b' ', 2) - - path = parsed_url.path or '/' - if parsed_url.query: - path += '?' + parsed_url.query - - new_first_line = b' '.join([method, path.encode('utf-8'), http_version]) - - new_headers = [] - for header in headers: - # Remove existing connection-related headers, as we're forcing it to close. - if not header.lower().startswith(b'connection:') and \ - not header.lower().startswith(b'proxy-connection:'): - new_headers.append(header) - new_headers.append(b'Connection: close') - - modified_header_part = new_first_line + b'\r\n' + b'\r\n'.join(new_headers) - modified_request = modified_header_part + body - - try: - remote_socket.sendall(modified_request) - except OSError: - remote_socket.close() - return - - self.tunnel(self.request, remote_socket) - - def handle_connect(self, target): - """Handles CONNECT requests for HTTPS traffic.""" - try: - host, port_str = target.split(b':') - port = int(port_str) - except ValueError: - print(f"Invalid target for CONNECT: {target}") - self.request.close() - return - - try: - remote_socket = socket.create_connection((host.decode('utf-8'), port), timeout=10) - except socket.error as e: - print(f"Failed to connect to {host.decode('utf-8')}:{port}: {e}") - self.request.close() - return - - try: - self.request.sendall(b'HTTP/1.1 200 Connection Established\r\n\r\n') - except OSError: - remote_socket.close() - return - - self.tunnel(self.request, remote_socket) - - def tunnel(self, client_socket, remote_socket): - """Tunnels data between the client and the remote server.""" - stop_event = threading.Event() - - def forward(src, dst): - try: - while not stop_event.is_set(): - data = src.recv(4096) - if not data: - break - dst.sendall(data) - except OSError: - pass - finally: - stop_event.set() - - client_thread = threading.Thread(target=forward, args=(client_socket, remote_socket)) - remote_thread = threading.Thread(target=forward, args=(remote_socket, client_socket)) - - client_thread.start() - remote_thread.start() - - client_thread.join() - remote_thread.join() - - client_socket.close() - remote_socket.close() - -class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): - daemon_threads = True - allow_reuse_address = True - -def main(): - HOST, PORT = "localhost", 8004 - - try: - server = ThreadingTCPServer((HOST, PORT), ProxyHandler) - print(f"Starting proxy server on {HOST}:{PORT}") - server.serve_forever() - except Exception as e: - print(f"Could not start proxy server: {e}", file=sys.stderr) - # The script `run_devrev_snapin_conformance_tests.sh` checks for exit code 69. - sys.exit(69) - -if __name__ == "__main__": - main() diff --git a/run_devrev_snapin_conformance_tests.sh b/run_devrev_snapin_conformance_tests.sh index 5491dfc..182b2e0 100755 --- a/run_devrev_snapin_conformance_tests.sh +++ b/run_devrev_snapin_conformance_tests.sh @@ -91,7 +91,12 @@ start_mock_devrev_server() { } start_proxy_server() { - python3 "$SCRIPT_DIR/rate_limiting_proxy.py" > "$PROXY_SERVER_LOG" 2>&1 & + # Check if the rate limiting proxy file exists + if [ ! -f "$EXEC_DIR/rate_limiting_proxy.py" ]; then + printf "Error: rate_limiting_proxy.py file not found in $EXEC_DIR/rate_limiting_proxy.py. This file should exist (and should be adopted for 3rd party service's rate limiting response format).\n" + exit 69 + fi + python3 "$EXEC_DIR/rate_limiting_proxy.py" > "$PROXY_SERVER_LOG" 2>&1 & PROXY_SERVER_PID=$! sleep 2 # Give the server time to start @@ -201,6 +206,12 @@ if [ -z "$EXTRACTED_FILES_FOLDER_PATH" ]; then exit 69 # EXIT_SERVICE_UNAVAILABLE fi +# Check if EXTRACTED_FILES_FOLDER_PATH does not end with "node_$1/build" +if [[ "$EXTRACTED_FILES_FOLDER_PATH" != *"node_$1/extracted_files" ]]; then + echo "Error: EXTRACTED_FILES_FOLDER_PATH should end with 'node_$1/extracted_files'." + echo "Note: The value of EXTRACTED_FILES_FOLDER_PATH should be /node_$1/extracted_files." + exit 69 # EXIT_SERVICE_UNAVAILABLE +fi # Check if build folder name is provided if [ -z "$1" ]; then From f7eb9e55901be2ffad7a79207c2597cb45f38493 Mon Sep 17 00:00:00 2001 From: Tjaz Erzen Date: Tue, 14 Oct 2025 12:07:24 +0200 Subject: [PATCH 12/12] Revert base folder runner --- base_folder/test/runner.ts | 60 +++----------------------------------- 1 file changed, 4 insertions(+), 56 deletions(-) diff --git a/base_folder/test/runner.ts b/base_folder/test/runner.ts index 5d78707..6a392b9 100644 --- a/base_folder/test/runner.ts +++ b/base_folder/test/runner.ts @@ -75,18 +75,6 @@ async function handleEvent(events: any[], isAsync: boolean, resp: Response) { resp.status(400).send(errMsg); return; } - // crash the process if an empty array is provided - if (Array.isArray(events) && events.length === 0) { - let errMsg = 'Invalid request format: body is an empty array'; - error = { - err_type: RuntimeErrorType.InvalidRequest, - err_msg: errMsg, - } as RuntimeError; - console.error(error.err_msg); - // Return validation error status for empty events input - resp.status(400).send(errMsg); - return; - } // if the request is synchronous, there should be a single event if (!isAsync) { if (events.length > 1) { @@ -100,29 +88,6 @@ async function handleEvent(events: any[], isAsync: boolean, resp: Response) { return; } } else { - // Preflight validation for async requests: ensure each event is minimally valid - for (let event of events) { - if (!event || !event.execution_metadata) { - let errMsg = 'Invalid request format: missing execution_metadata'; - console.error(errMsg); - resp.status(400).send(errMsg); - return; - } - const functionName: FunctionFactoryType = event.execution_metadata.function_name as FunctionFactoryType; - if (functionName === undefined) { - let errMsg = 'Function name not provided in event'; - console.error(errMsg); - resp.status(400).send(errMsg); - return; - } - const f = functionFactory[functionName]; - if (f == undefined) { - let errMsg = `Function ${event.execution_metadata.function_name} not found in factory`; - console.error(errMsg); - resp.status(400).send(errMsg); - return; - } - } // return a success response back to the server resp.status(200).send(); } @@ -136,13 +101,8 @@ async function handleEvent(events: any[], isAsync: boolean, resp: Response) { err_msg: errMsg, } as RuntimeError; console.error(error.err_msg); - if (!isAsync) { - resp.status(400).send(errMsg); - return; - } else { - // For async requests, response has already been sent; skip further processing for this event - continue; - } + resp.status(400).send(errMsg); + return; } const functionName: FunctionFactoryType = event.execution_metadata.function_name as FunctionFactoryType; if (functionName === undefined) { @@ -151,13 +111,7 @@ async function handleEvent(events: any[], isAsync: boolean, resp: Response) { err_msg: 'Function name not provided in event', } as RuntimeError; console.error(error.err_msg); - if (!isAsync) { - resp.status(400).send(error.err_msg); - return; - } else { - receivedError = true; - continue; - } + receivedError = true; } else { const f = functionFactory[functionName]; try { @@ -167,13 +121,7 @@ async function handleEvent(events: any[], isAsync: boolean, resp: Response) { err_msg: `Function ${event.execution_metadata.function_name} not found in factory`, } as RuntimeError; console.error(error.err_msg); - if (!isAsync) { - resp.status(400).send(error.err_msg); - return; - } else { - receivedError = true; - continue; - } + receivedError = true; } else { result = await run(f, [event]); }