refactor(data-import): refactor source credential handlers#30260
refactor(data-import): refactor source credential handlers#30260
Conversation
There was a problem hiding this comment.
PR Summary
This PR introduces a significant refactoring of data source handlers in PostHog, implementing a standardized pattern for managing external data source integrations.
Key changes:
- Introduces base
SourceHandlerclass in/pipelines/source/handlers.pyto standardize credential validation and schema retrieval - Adds specialized handlers for multiple data sources (Stripe, Hubspot, BigQuery, etc.) with consistent validation patterns
- Implements SSH tunnel support and improved error handling in SQL database handlers
- Adds schema diff checking capabilities to detect changes between source and database
- Consolidates common validation patterns and removes redundant code across handlers
The changes appear well-structured and improve maintainability, though there are some potential concerns around error handling consistency and edge cases in schema validation.
12 file(s) reviewed, 18 comment(s)
Edit PR Review Bot Settings | Greptile
| api_key = self.request_data.get("api_key", "") | ||
| site_name = self.request_data.get("site_name", "") |
There was a problem hiding this comment.
style: consider trimming whitespace from api_key and site_name inputs, as seen in test_trimming_payload
| api_key = self.request_data.get("api_key", "") | |
| site_name = self.request_data.get("site_name", "") | |
| api_key = self.request_data.get("api_key", "").strip() | |
| site_name = self.request_data.get("site_name", "").strip() |
| if not salesforce_integration_id: | ||
| return False, "Missing required parameters: Salesforce integration ID" |
There was a problem hiding this comment.
style: Error message inconsistent with other handlers - should be 'Invalid credentials: Salesforce integration ID is missing' to match pattern
| for column_name, column_type in columns | ||
| ], | ||
| "incremental_available": True, | ||
| "incremental_field": columns[0][0] if len(columns) > 0 and len(columns[0]) > 0 else None, |
There was a problem hiding this comment.
style: Assumes first column is always suitable for incremental field. Should validate column type is appropriate for incremental updates.
| auth_type_obj = self.request_data.get("auth_type", {}) | ||
| auth_type = auth_type_obj.get("selection", None) | ||
| auth_type_username = auth_type_obj.get("username", None) |
There was a problem hiding this comment.
logic: auth_type is allowed to be None here, but there's no validation to ensure a valid auth_type is provided. This could lead to authentication failures.
| auth_type_obj = self.request_data.get("auth_type", {}) | |
| auth_type = auth_type_obj.get("selection", None) | |
| auth_type_username = auth_type_obj.get("username", None) | |
| auth_type_obj = self.request_data.get("auth_type", {}) | |
| auth_type = auth_type_obj.get("selection", None) | |
| if auth_type not in ["password", "keypair"]: | |
| return False, "Invalid auth_type: must be 'password' or 'keypair'" | |
| auth_type_username = auth_type_obj.get("username", None) |
|
|
||
| class StripeSourceHandler(SourceHandler): | ||
| def validate_credentials(self) -> tuple[bool, str | None]: | ||
| key = self.request_data.get("stripe_secret_key", "") |
There was a problem hiding this comment.
logic: Missing key existence check before validation. Empty string will be used if key doesn't exist.
| subdomain_regex = re.compile("^[a-zA-Z-]+$") | ||
| if region == "US" and not subdomain_regex.match(subdomain): |
There was a problem hiding this comment.
logic: Subdomain regex will reject valid subdomains containing numbers. Consider using ^[a-zA-Z0-9-]+$ instead
| subdomain_regex = re.compile("^[a-zA-Z-]+$") | |
| if region == "US" and not subdomain_regex.match(subdomain): | |
| subdomain_regex = re.compile("^[a-zA-Z0-9-]+$") | |
| if region == "US" and not subdomain_regex.match(subdomain): |
| if region == "US" and not subdomain_regex.match(subdomain): | ||
| return False, "Invalid credentials: Vitally subdomain is incorrect" |
There was a problem hiding this comment.
logic: Subdomain validation only happens for US region. Should validate for all regions or document why US-only
| api_key = self.request_data.get("api_key", "") | ||
| email_address = self.request_data.get("email_address", "") | ||
|
|
||
| subdomain_regex = re.compile("^[a-zA-Z-]+$") |
There was a problem hiding this comment.
logic: regex pattern could be too restrictive - some Zendesk subdomains may contain numbers
| subdomain_regex = re.compile("^[a-zA-Z-]+$") | |
| subdomain_regex = re.compile("^[a-zA-Z0-9-]+$") |
| subdomain = self.request_data.get("subdomain", "") | ||
| api_key = self.request_data.get("api_key", "") | ||
| email_address = self.request_data.get("email_address", "") |
There was a problem hiding this comment.
style: consider validating that none of these required fields are empty before proceeding with validation
| current_schemas = handler.get_schema_options() | ||
| current_schemas = [schema["table"] for schema in current_schemas] |
There was a problem hiding this comment.
logic: schema["table"] access could raise KeyError if schema options format changes
| current_schemas = handler.get_schema_options() | |
| current_schemas = [schema["table"] for schema in current_schemas] | |
| current_schemas = handler.get_schema_options() | |
| current_schemas = [schema.get("table") for schema in current_schemas if isinstance(schema, dict) and schema.get("table")] |
| return Response( | ||
| status=status.HTTP_400_BAD_REQUEST, | ||
| data={"message": "Invalid parameter: source_type"}, | ||
| data={"message": str(e)}, |
Check warning
Code scanning / CodeQL
Information exposure through an exception Medium
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 11 months ago
To fix the problem, we need to ensure that detailed exception messages are not exposed to the end user. Instead, we should log the detailed error message on the server and return a generic error message to the user. This can be achieved by modifying the exception handling block to log the exception and return a generic error message.
Specifically, we will:
- Log the detailed exception message using
logger.exception. - Return a generic error message in the HTTP response.
| @@ -978,5 +978,6 @@ | ||
| except ValidationError as e: | ||
| logger.exception(f"Validation error for source type {source_type}", exc_info=e) | ||
| return Response( | ||
| status=status.HTTP_400_BAD_REQUEST, | ||
| data={"message": str(e)}, | ||
| data={"message": "Validation error occurred"}, | ||
| ) | ||
| @@ -987,3 +988,3 @@ | ||
| status=status.HTTP_400_BAD_REQUEST, | ||
| data={"message": f"Error handling source type {source_type}"}, | ||
| data={"message": "An internal error has occurred"}, | ||
| ) |
| logger.exception(f"Error checking schema changes for source type {source_type}", exc_info=e) | ||
| return Response( | ||
| status=status.HTTP_400_BAD_REQUEST, | ||
| data={"message": f"Error checking schema changes: {str(e)}"}, |
Check warning
Code scanning / CodeQL
Information exposure through an exception Medium
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 11 months ago
To fix the problem, we need to replace the detailed error message returned to the user with a more generic message. This can be done by modifying the response data in the exception handling block. The detailed error message should be logged using logger.exception, which already captures the stack trace.
Steps to fix:
- Modify the response data in the exception handling block to return a generic error message.
- Ensure that the detailed error message is logged for debugging purposes.
| @@ -1124,3 +1124,3 @@ | ||
| status=status.HTTP_400_BAD_REQUEST, | ||
| data={"message": f"Error checking schema changes: {str(e)}"}, | ||
| data={"message": "An internal error has occurred while checking schema changes."}, | ||
| ) |
|
This PR hasn't seen activity in a week! Should it be merged, closed, or further worked on? If you want to keep it open, post a comment or remove the |
|
This PR was closed due to lack of activity. Feel free to reopen if it's still relevant. |
Problem
Changes
👉 Stay up-to-date with PostHog coding conventions for a smoother review.
Does this work well for both Cloud and self-hosted?
How did you test this code?