Skip to content

Commit

Permalink
Fix dates
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed Sep 20, 2023
1 parent efd6d21 commit 603d9c7
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#
import logging
from typing import Any, List, Mapping

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.sources import Source
Expand Down Expand Up @@ -30,10 +31,10 @@ def should_migrate(cls, config: Mapping[str, Any]) -> bool:
"""
return "project_id" in config

@classmethod
def move_project_id(cls, config: Mapping[str, Any], source: Source = None) -> Mapping[str, Any]:
@staticmethod
def move_project_id(config: Mapping[str, Any]) -> Mapping[str, Any]:
# assign old values to new property that will be used within the new version
if isinstance(config.get("credentials")):
if isinstance(config.get("credentials", 0), dict):
config["credentials"]["project_id"] = config["project_id"]
else:
config["credentials"] = {"project_id": config["project_id"]}
Expand All @@ -43,7 +44,7 @@ def move_project_id(cls, config: Mapping[str, Any], source: Source = None) -> Ma
@classmethod
def modify_and_save(cls, config_path: str, source: Source, config: Mapping[str, Any]) -> Mapping[str, Any]:
# modify the config
migrated_config = cls.move_project_id(config, source)
migrated_config = cls.move_project_id(config)
# save the config
source.write_config(migrated_config, config_path)
# return modified config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"airbyte_secret": true
},
"project_id": {
"order": 1,
"order": 3,
"title": "Project ID",
"description": "Your project ID number. See the <a href=\"https://help.mixpanel.com/hc/en-us/articles/115004490503-Project-Settings#project-id\">docs</a> for more information on how to obtain this. Required if you are using a service account to authenticate.",
"type": "integer"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator
from pendulum import Date
from source_mixpanel.utils import fix_date_time


class MixpanelStream(HttpStream, ABC):
Expand Down Expand Up @@ -90,6 +91,7 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma
data = [json_response]

for record in data:
fix_date_time(record)
yield record

def parse_response(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import re

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
Expand All @@ -13,3 +14,50 @@ def read_full_refresh(stream_instance: Stream):
records = stream_instance.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh)
for record in records:
yield record


# Compiled once at import time: a date, one space or lowercase "t", then a time.
ISO_FORMAT_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2})[ t](\d{2}:\d{2}:\d{2})$")


def to_iso_format(s: str) -> str:
    """
    Rewrite a "YYYY-MM-DD hh:mm:ss" style string so date and time are joined by "T".

    Args:
    - s (str): Candidate date-time string.

    Returns:
    - str: The captured date and time joined by "T" when ``s`` matches
      ISO_FORMAT_PATTERN, otherwise ``s`` exactly as it was passed in.
    """
    found = ISO_FORMAT_PATTERN.match(s)
    if found is None:
        # Unrecognized shape (including values already separated by "T"): leave as is.
        return s

    date_part, time_part = found.groups()
    return f"{date_part}T{time_part}"


def fix_date_time(record):
    """
    Walk a nested dict/list structure and normalize selected date-time string fields.

    String values stored under the keys "last_seen", "created" or
    "last_authenticated" are replaced by the result of ``to_iso_format``.
    Nested dictionaries and lists are visited recursively; every other value
    is left untouched.

    Args:
    - record (dict or list): Structure to process. Any other type is ignored.

    Returns:
    - None: The structure is updated in place.
    """
    if isinstance(record, list):
        for item in record:
            fix_date_time(item)
        return

    if not isinstance(record, dict):
        # Scalars and other objects carry nothing to rewrite.
        return

    # Keys whose string values are passed through to_iso_format.
    timestamp_keys = {"last_seen", "created", "last_authenticated"}

    # Iterate over a snapshot so assignments into `record` never touch the live view.
    for key, val in tuple(record.items()):
        if isinstance(val, str):
            if key in timestamp_keys:
                record[key] = to_iso_format(val)
        elif isinstance(val, (dict, list)):
            fix_date_time(val)

0 comments on commit 603d9c7

Please sign in to comment.