diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/config_migrations.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/config_migrations.py index 7af98ed13c4f0..c7560659d8ad9 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/config_migrations.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/config_migrations.py @@ -3,6 +3,7 @@ # import logging from typing import Any, List, Mapping + from airbyte_cdk.config_observation import create_connector_config_control_message from airbyte_cdk.entrypoint import AirbyteEntrypoint from airbyte_cdk.sources import Source @@ -30,10 +31,10 @@ def should_migrate(cls, config: Mapping[str, Any]) -> bool: """ return "project_id" in config - @classmethod - def move_project_id(cls, config: Mapping[str, Any], source: Source = None) -> Mapping[str, Any]: + @staticmethod + def move_project_id(config: Mapping[str, Any]) -> Mapping[str, Any]: # assign old values to new property that will be used within the new version - if isinstance(config.get("credentials")): + if isinstance(config.get("credentials", 0), dict): config["credentials"]["project_id"] = config["project_id"] else: config["credentials"] = {"project_id": config["project_id"]} @@ -43,7 +44,7 @@ def move_project_id(cls, config: Mapping[str, Any], source: Source = None) -> Ma @classmethod def modify_and_save(cls, config_path: str, source: Source, config: Mapping[str, Any]) -> Mapping[str, Any]: # modify the config - migrated_config = cls.move_project_id(config, source) + migrated_config = cls.move_project_id(config) # save the config source.write_config(migrated_config, config_path) # return modified config diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json index ca6546f1d94ff..39bd5024268e5 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json @@ -36,7 +36,7 @@ "airbyte_secret": true }, "project_id": { - "order": 1, + "order": 3, "title": "Project ID", "description": "Your project ID number. See the docs for more information on how to obtain this. Required if you are using a service account to authenticate.", "type": "integer" diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py index 49a19b987e151..17ac3c3e0b14a 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py @@ -12,6 +12,7 @@ from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator from pendulum import Date +from source_mixpanel.utils import fix_date_time class MixpanelStream(HttpStream, ABC): @@ -90,6 +91,7 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma data = [json_response] for record in data: + fix_date_time(record) yield record def parse_response( diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/utils.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/utils.py index cea753dbee29d..cc1d1402647d8 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/utils.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/utils.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import re from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams import Stream @@ -13,3 +14,50 @@ def read_full_refresh(stream_instance: Stream): records = stream_instance.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh) for record in records: yield record + + +# Precompile the regex pattern. +ISO_FORMAT_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2})[ t](\d{2}:\d{2}:\d{2})$") + + +def to_iso_format(s: str) -> str: + """ + Convert a date string to ISO format if it matches recognized patterns. + + Args: + - s (str): Input string to be converted. + + Returns: + - str: Converted string in ISO format or the original string if no recognized pattern is found. + """ + # Use the precompiled regex pattern to match the date format. + match = ISO_FORMAT_PATTERN.match(s) + if match: + return match.group(1) + "T" + match.group(2) + + return s + + +def fix_date_time(record): + """ + Recursively process a data structure to fix date and time formats. + + Args: + - record (dict or list): The input data structure, which can be a dictionary or a list. + + Returns: + - None: The function modifies the input data structure in place. + """ + # Define the list of fields that might contain date and time values. + date_time_fields = {"last_seen", "created", "last_authenticated"} + + if isinstance(record, dict): + for field, value in list(record.items()): # Convert to list to avoid runtime errors during iteration. + if field in date_time_fields and isinstance(value, str): + record[field] = to_iso_format(value) + elif isinstance(value, (dict, list)): + fix_date_time(value) + + elif isinstance(record, list): + for entry in record: + fix_date_time(entry)