diff --git a/devrev-snapin-template.plain b/devrev-snapin-template.plain
index b5daacf..b862479 100644
--- a/devrev-snapin-template.plain
+++ b/devrev-snapin-template.plain
@@ -11,10 +11,10 @@
 - Please note that EventContextIn and EventContextOut interfaces are deprecated.
 
 - The External Domain Metadata is a JSON object that describes the logical domain model of an external system. It defines the types of external record types, the relationships between those record types, and the metadata associated with each field, including the field’s data type, human-readable name, and other descriptive attributes.
-  - Note: Resource "attachments" should not be included in The External Domain Metadata JSON object even if the external system supports attachments. This is handled automatically by The Airdrop SDK Library.
+  - Note: Resource "attachments" *MUST NOT* be included in The External Domain Metadata JSON object even if the external system supports attachments. This is handled automatically by The Airdrop SDK Library.
 
 - The Initial Domain Mapping is a JSON object that defines how the logical domain model described by The External Domain Metadata maps to the target domain model in DevRev. It specifies the correspondence between each external record type and DevRev leaf type, as well as the mapping of individual fields, including transformation logic, value conversions, and any fixed or default values.
-  - Note: Resource "attachments" should not be included in The Initial Domain Mapping JSON object even if the external system supports attachments. This is handled automatically by The Airdrop SDK Library.
+  - Note: Resource "attachments" *MUST NOT* be included in The Initial Domain Mapping JSON object even if the external system supports attachments. This is handled automatically by The Airdrop SDK Library.
 
 - The External Sync Unit Item Count is a numeric field representing the total number of items (such as cards) contained within a given external sync unit. It is accessible as "item_count" field of the external sync unit.
 
diff --git a/docs/attachments-extraction.md b/docs/attachments-extraction.md
index af14c05..7f60c36 100644
--- a/docs/attachments-extraction.md
+++ b/docs/attachments-extraction.md
@@ -33,6 +33,7 @@ import {
 const getAttachmentStream = async ({
   item,
+  event,
 }: ExternalSystemAttachmentStreamingParams): Promise<ExternalSystemAttachmentStreamingResponse> => {
   // IMPORTANT: "url" is not necessarily deployed on the base URL of The API. It could also be an external URL (e.g. https://example.com/attachment.pdf, https://devrev.ai, ...)
   const { id, url } = item;
 
@@ -44,7 +45,7 @@ const getAttachmentStream = async ({
     responseType: 'stream',
     headers: {
       'Accept-Encoding': 'identity',
-      'Authorization': ... // TODO: Authorization if needed
+      'Authorization': ... // TODO: Authorization if needed. Credentials should be read from event["payload"]["connection_data"]["key"]
     },
   });
 
diff --git a/rate_limiting_proxy.py b/rate_limiting_proxy.py
index 3801be0..51fdcb1 100644
--- a/rate_limiting_proxy.py
+++ b/rate_limiting_proxy.py
@@ -1,170 +1,324 @@
-# This proxy server requires 'fastapi', 'uvicorn', and 'httpx'.
-# You can install them with: pip install fastapi uvicorn httpx
-
-from fastapi import FastAPI, Request, HTTPException
-from fastapi.responses import JSONResponse, Response, StreamingResponse
-import httpx
+import socket
+import threading
+import socketserver
+import time
+import sys
+import ssl
+import json
 import datetime
 import email.utils
-import os
-from pydantic import BaseModel
-import logging
-
-app = FastAPI()
-
-# Create a single, long-lived client instance for connection pooling
-client = httpx.AsyncClient(timeout=30.0)
-
-
-from contextlib import asynccontextmanager
-
-@asynccontextmanager
-async def lifespan(app: FastAPI):
-    yield
-    await client.aclose()
-
-app = FastAPI(lifespan=lifespan)
-
-# In-memory state for rate limiting
-app.state.rate_limiting_active = False
-app.state.test_name = ""
-
-# The API URL to which requests will be proxied.
-# Configurable via the PROXY_API_URL environment variable.
-API_URL = os.getenv("PROXY_API_URL")
-if not API_URL:
-    print("Error: PROXY_API_URL environment variable not set")
-    exit(69)  # EXIT_SERVICE_UNAVAILABLE
-
-if not API_URL.endswith("/"):
-    print("Error: PROXY_API_URL environment variable must end with a slash. Current API URL: ", API_URL)
-    exit(69)  # EXIT_SERVICE_UNAVAILABLE
-
-RATE_LIMIT_DELAY = 3
-
-
-def is_streaming_response(response: httpx.Response) -> bool:
-    """Check if the response should be streamed."""
-    # 1. The server is explicitly streaming the response.
-    if response.headers.get('transfer-encoding', '').lower() == 'chunked':
-        return True
-
-    # 2. The response content suggests it should be streamed.
-    content_type = response.headers.get('content-type', '').lower()
-    content_disposition = response.headers.get('content-disposition', '').lower()
-
-    # Stream if it's a file attachment
-    if 'attachment' in content_disposition:
-        return True
-
-    # Stream if it's a common large file type
-    if content_type.startswith((
-        'application/octet-stream', 'application/pdf', 'application/zip',
-        'image/', 'video/', 'audio/'
-    )):
-        return True
-
-    # Stream if the content length is over a certain threshold (e.g., 1MB)
-    content_length = int(response.headers.get('content-length', 0))
-    if content_length > 1024 * 1024:
-        return True
-
-    return False
-
-
-class RateLimitStartRequest(BaseModel):
-    test_name: str
-
-
-@app.post("/start_rate_limiting")
-async def start_rate_limiting(request_body: RateLimitStartRequest):
-    """Starts rate limiting all proxied requests."""
-    app.state.rate_limiting_active = True
-    app.state.test_name = request_body.test_name
-    print(f"Rate limiting started for test: {app.state.test_name}")
-    return JSONResponse(content={"status": f"rate limiting started for test: {app.state.test_name}"})
-
-
-@app.post("/end_rate_limiting")
-async def end_rate_limiting():
-    """Ends rate limiting all proxied requests."""
-    app.state.rate_limiting_active = False
-    app.state.test_name = ""
-    print(f"Rate limiting state ended")
-    return JSONResponse(content={"status": "rate limiting ended"})
-
-
-# Catch-all for proxying
-@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
-async def proxy(request: Request, path: str):
-    """
-    Proxies all incoming requests to the API_URL.
-    If rate limiting is active, it returns a 429 status code.
- """ - if app.state.rate_limiting_active: - retry_after_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=RATE_LIMIT_DELAY) - retry_after_str = email.utils.formatdate( - timeval=retry_after_time.timestamp(), - localtime=False, - usegmt=True - ) - print(f"Rate limit exceeded for test {app.state.test_name}. Returning 429.") - return JSONResponse( - status_code=429, - content={"detail": "Rate limit exceeded"}, - headers={"Retry-After": retry_after_str} - ) +from urllib.parse import urlparse + +# Rate limiting settings +TOKEN_BUCKET_CAPACITY = 100 # requests +REFILL_RATE = 10 # requests per second +RATE_LIMIT_DELAY = 3 # seconds + +class RateLimiterState: + """A thread-safe class to manage the global rate limiting state.""" + def __init__(self): + self.lock = threading.Lock() + self.rate_limiting_active = False + self.test_name = None + + def start_rate_limiting(self, test_name): + with self.lock: + self.rate_limiting_active = True + self.test_name = test_name + + def end_rate_limiting(self): + with self.lock: + self.rate_limiting_active = False + self.test_name = None + + def is_rate_limiting_active(self): + with self.lock: + return self.rate_limiting_active, self.test_name + +rate_limiter_state = RateLimiterState() + +class TokenBucket: + """A thread-safe token bucket for rate limiting.""" + def __init__(self, capacity, refill_rate): + self.capacity = float(capacity) + self.refill_rate = float(refill_rate) + self.tokens = float(capacity) + self.last_refill = time.time() + self.lock = threading.Lock() + + def consume(self, tokens): + """Consumes tokens from the bucket. Returns True if successful, False otherwise.""" + with self.lock: + now = time.time() + time_since_refill = now - self.last_refill + new_tokens = time_since_refill * self.refill_rate + self.tokens = min(self.capacity, self.tokens + new_tokens) + self.last_refill = now + + if self.tokens >= tokens: + self.tokens -= tokens + return True + return False + +rate_limiter = TokenBucket(TOKEN_BUCKET_CAPACITY, REFILL_RATE) + +class ProxyHandler(socketserver.BaseRequestHandler): + """Handles incoming proxy requests.""" + def handle(self): + if not rate_limiter.consume(1): + print("Rate limit exceeded. Dropping connection.") + try: + self.request.sendall(b'HTTP/1.1 429 Too Many Requests\r\n\r\n') + except OSError: + pass # Client might have already closed the connection. + finally: + self.request.close() + return + + try: + data = self.request.recv(4096) + except ConnectionResetError: + return # Client closed connection. + + if not data: + return + + first_line = data.split(b'\r\n')[0] + try: + method, target, _ = first_line.split() + except ValueError: + print(f"Could not parse request: {first_line}") + self.request.close() + return + + print(f"Received request: {method.decode('utf-8')} {target.decode('utf-8')}") + + path = target.decode('utf-8') + # Check for control plane endpoints on the proxy itself + if path.startswith(('/start_rate_limiting', '/end_rate_limiting')): + self.handle_control_request(method, path, data) + return + + # Check if global rate limiting is active + is_active, test_name = rate_limiter_state.is_rate_limiting_active() + if is_active: + print(f"Rate limiting is active for test: '{test_name}'. 
Blocking request.") + + retry_after_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=RATE_LIMIT_DELAY) + retry_after_str = email.utils.formatdate( + timeval=retry_after_time.timestamp(), + localtime=False, + usegmt=True + ) - url = httpx.URL(API_URL).join(path) - - # Pass through headers from the original request, excluding the host header. - headers = {key: value for key, value in request.headers.items() if key.lower() != 'host'} + response_body = {"detail": "Rate limit exceeded"} + self.send_json_response(429, "Too Many Requests", response_body, headers={"Retry-After": retry_after_str}) + return - try: - print(f"Received request on The API: {request.method} {url}") - - # Stream the request body to the upstream server - req = client.build_request( - method=request.method, - url=url, - headers=headers, - params=request.query_params, - content=request.stream() - ) - - # Stream the response from the upstream server - resp = await client.send(req, stream=True) - - if is_streaming_response(resp): - print("Decision: streaming response") - async def safe_iterator(response): - try: - async for chunk in response.aiter_raw(): - yield chunk - except httpx.ReadError as e: - print(f"Upstream read error while streaming response: {e}") - finally: - await response.aclose() - - return StreamingResponse( - safe_iterator(resp), - status_code=resp.status_code, - headers=resp.headers, - ) + if method == b'CONNECT': + self.handle_connect(target) else: - print("Decision: buffering response") - await resp.aread() - return Response( - content=resp.content, - status_code=resp.status_code, - headers=resp.headers, - ) - except httpx.RequestError as exc: - print(f"Error connecting to upstream server: {exc}\n{repr(exc)}") - raise HTTPException(status_code=502, detail=f"Error connecting to upstream server: {exc}") - + self.handle_http_request(target, data) + + def get_request_body(self, data): + header_end = data.find(b'\r\n\r\n') + if header_end != -1: + return data[header_end + 4:].decode('utf-8') + return "" + + def send_json_response(self, status_code, status_message, body_json, headers=None): + body_bytes = json.dumps(body_json).encode('utf-8') + + response_headers = [ + f"HTTP/1.1 {status_code} {status_message}", + "Content-Type: application/json", + f"Content-Length: {len(body_bytes)}", + "Connection: close", + ] + + if headers: + for key, value in headers.items(): + response_headers.append(f"{key}: {value}") + + response_headers.append("") + response_headers.append("") + + response = '\r\n'.join(response_headers).encode('utf-8') + body_bytes + try: + self.request.sendall(response) + except OSError: + pass # Client might have closed the connection. 
+        finally:
+            self.request.close()
+
+    def handle_control_request(self, method, path, data):
+        if method != b'POST':
+            self.send_json_response(405, "Method Not Allowed", {"error": "Only POST method is allowed"})
+            return
+
+        if path == '/start_rate_limiting':
+            body_str = self.get_request_body(data)
+            if not body_str:
+                self.send_json_response(400, "Bad Request", {"error": "Request body is missing or empty"})
+                return
+            try:
+                body_json = json.loads(body_str)
+                test_name = body_json.get('test_name')
+                if not test_name or not isinstance(test_name, str):
+                    self.send_json_response(400, "Bad Request", {"error": "'test_name' is missing or not a string"})
+                    return
+            except json.JSONDecodeError:
+                self.send_json_response(400, "Bad Request", {"error": "Invalid JSON in request body"})
+                return
+
+            rate_limiter_state.start_rate_limiting(test_name)
+            response_body = {"status": f"rate limiting started for test: {test_name}"}
+            self.send_json_response(200, "OK", response_body)
+
+        elif path == '/end_rate_limiting':
+            rate_limiter_state.end_rate_limiting()
+            response_body = {"status": "rate limiting ended"}
+            self.send_json_response(200, "OK", response_body)
+        else:
+            self.send_json_response(404, "Not Found", {"error": "Endpoint not found"})
+
+    def handle_http_request(self, target, data):
+        """Handles HTTP requests like GET, POST, etc."""
+        try:
+            parsed_url = urlparse(target.decode('utf-8'))
+            host = parsed_url.hostname
+            port = parsed_url.port
+            if port is None:
+                port = 443 if parsed_url.scheme == 'https' else 80
+        except Exception as e:
+            print(f"Could not parse URL for HTTP request: {target}. Error: {e}")
+            self.request.close()
+            return
+
+        if not host:
+            print(f"Invalid host in URL: {target}")
+            self.request.close()
+            return
+
+        try:
+            remote_socket = socket.create_connection((host, port), timeout=10)
+            if parsed_url.scheme == 'https':
+                context = ssl.create_default_context()
+                remote_socket = context.wrap_socket(remote_socket, server_hostname=host)
+        except (socket.error, ssl.SSLError) as e:
+            print(f"Failed to connect or SSL wrap to {host}:{port}: {e}")
+            self.request.close()
+            return
+
+        # Modify the request to use a relative path and force connection closing.
+        # This ensures each request gets its own connection and is logged.
+        header_end = data.find(b'\r\n\r\n')
+        if header_end == -1:
+            # If no header-body separator is found, assume it's a simple request with no body.
+            header_end = len(data)
+
+        header_data = data[:header_end]
+        body = data[header_end:]
+
+        lines = header_data.split(b'\r\n')
+        first_line = lines[0]
+        headers = lines[1:]
+
+        method, _, http_version = first_line.split(b' ', 2)
+
+        path = parsed_url.path or '/'
+        if parsed_url.query:
+            path += '?' + parsed_url.query
+
+        new_first_line = b' '.join([method, path.encode('utf-8'), http_version])
+
+        new_headers = []
+        for header in headers:
+            # Remove existing connection-related headers, as we're forcing it to close.
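+            # (Connection and Proxy-Connection are hop-by-hop headers; dropping them
+            # keeps the upstream from holding the socket open after the response.)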
+            if not header.lower().startswith(b'connection:') and \
+               not header.lower().startswith(b'proxy-connection:'):
+                new_headers.append(header)
+        new_headers.append(b'Connection: close')
+
+        modified_header_part = new_first_line + b'\r\n' + b'\r\n'.join(new_headers)
+        modified_request = modified_header_part + body
+
+        try:
+            remote_socket.sendall(modified_request)
+        except OSError:
+            remote_socket.close()
+            return
+
+        self.tunnel(self.request, remote_socket)
+
+    def handle_connect(self, target):
+        """Handles CONNECT requests for HTTPS traffic."""
+        try:
+            host, port_str = target.split(b':')
+            port = int(port_str)
+        except ValueError:
+            print(f"Invalid target for CONNECT: {target}")
+            self.request.close()
+            return
+
+        try:
+            remote_socket = socket.create_connection((host.decode('utf-8'), port), timeout=10)
+        except socket.error as e:
+            print(f"Failed to connect to {host.decode('utf-8')}:{port}: {e}")
+            self.request.close()
+            return
+
+        try:
+            self.request.sendall(b'HTTP/1.1 200 Connection Established\r\n\r\n')
+        except OSError:
+            remote_socket.close()
+            return
+
+        self.tunnel(self.request, remote_socket)
+
+    def tunnel(self, client_socket, remote_socket):
+        """Tunnels data between the client and the remote server."""
+        stop_event = threading.Event()
+
+        def forward(src, dst):
+            try:
+                while not stop_event.is_set():
+                    data = src.recv(4096)
+                    if not data:
+                        break
+                    dst.sendall(data)
+            except OSError:
+                pass
+            finally:
+                stop_event.set()
+
+        client_thread = threading.Thread(target=forward, args=(client_socket, remote_socket))
+        remote_thread = threading.Thread(target=forward, args=(remote_socket, client_socket))
+
+        client_thread.start()
+        remote_thread.start()
+
+        client_thread.join()
+        remote_thread.join()
+
+        client_socket.close()
+        remote_socket.close()
+
+class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
+    daemon_threads = True
+    allow_reuse_address = True
+
+def main():
+    HOST, PORT = "localhost", 8004
+
+    try:
+        server = ThreadingTCPServer((HOST, PORT), ProxyHandler)
+        print(f"Starting proxy server on {HOST}:{PORT}")
+        server.serve_forever()
+    except Exception as e:
+        print(f"Could not start proxy server: {e}", file=sys.stderr)
+        # The script `run_devrev_snapin_conformance_tests.sh` checks for exit code 69.
+        sys.exit(69)
 
 if __name__ == "__main__":
-    import uvicorn
-    print("Starting The API Server")
-    uvicorn.run(app, host="localhost", port=8004)
+    main()
diff --git a/run_devrev_snapin_conformance_tests.sh b/run_devrev_snapin_conformance_tests.sh
index 7dccbae..bc66301 100755
--- a/run_devrev_snapin_conformance_tests.sh
+++ b/run_devrev_snapin_conformance_tests.sh
@@ -172,6 +172,9 @@ else
   MOCK_SERVER_PID=$(lsof -i :8003 -t)
 fi
 
+# Set HTTPS_PROXY environment variable to point to proxy server
+export HTTPS_PROXY="http://localhost:8004"
+
 if ! lsof -i :8004 -t >/dev/null 2>&1; then
   start_proxy_server
 else
@@ -351,8 +354,6 @@ conformance_tests_result=$?
printf "\n#### Output of the DevRev server log file:\n\n" print_log_file "$MOCK_DEVREV_SERVER_LOG" "$DEVREV_SERVER_LOG_MAX_CHARS" -printf "\n#### Output of the The API Server log file:\n\n" -print_log_file "$PROXY_SERVER_LOG" "$PROXY_SERVER_LOG_MAX_CHARS" printf "\n#### Output of The Snap-In log file:\n" print_log_file "$NODE_SUBFOLDER/app.log" "$SNAP_IN_LOG_MAX_CHARS" printf "\n" diff --git a/templates/data_extraction_acceptance_test.plain b/templates/data_extraction_acceptance_test.plain new file mode 100644 index 0000000..012b89d --- /dev/null +++ b/templates/data_extraction_acceptance_test.plain @@ -0,0 +1,6 @@ + - Test The Extraction Function using the resource [{{ resource_name }}](test_data/{{ resource_name }}). + - Expect The Callback Server to receive from DevRev a **single** event with "event_type" that equals "EXTRACTION_DATA_DONE". + - Let `artifact_array = ["event_data"]["artifacts"]` (array of objects). + - Expect `len(artifact_array) > 0`. + - Out of `artifact_array`, expect one of the elements to have "item_type" equal to "{{ artifact_name }}" (`:= {{ artifact_name }}_artifact`) and `{{ artifact_name }}_artifact["item_count"]={{ expected_item_count }}`. + - Note: If `{{ artifact_name }}_artifact["item_count"] < {{ expected_item_count }}`, this indicates that not all the {{ artifact_name }} data was extracted. \ No newline at end of file