Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Mixpanel: added HttpAvailabilityStrategy and undeclared fields for export and annotations streams #25056

Merged
merged 16 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mixpanel/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.1.31
LABEL io.airbyte.version=0.1.32
LABEL io.airbyte.name=airbyte/source-mixpanel
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ acceptance_tests:
extra_fields: no
exact_order: no
extra_records: yes
fail_on_extra_columns: false
full_refresh:
tests:
- config_path: "secrets/config_old.json"
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@
},
"description": {
"type": ["null", "string"]
},
"user": {
"type": "object",
"properties": {
"id": {
"type": ["null", "integer"]
},
"first_name": {
"type": ["null", "string"]
},
"last_name": {
"type": ["null", "string"]
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,207 @@
"type": ["null", "string"],
"format": "date-time"
},
"browser": {
"type": ["null", "string"]
},
"created": {
"type": ["null", "string"]
},
"email": {
"type": ["null", "string"]
},
"first_name": {
"type": ["null", "string"]
},
"last_name": {
"type": ["null", "string"]
},
"initial_referrer": {
"type": ["null", "string"]
},
"os": {
"type": ["null", "string"]
},
"Abandon Cart Count": {
"type": ["null", "integer"]
},
"Account Created Count": {
"type": ["null", "integer"]
},
"Add To Cart Count": {
"type": ["null", "integer"]
},
"Affiliate": {
"type": ["null", "string"]
},
"Browse Count": {
"type": ["null", "string"]
},
"Browse Filter": {
"type": ["null", "array"],
"items": {
"type": ["null", "array"]
}
},
"Campaign Name": {
"type": ["null", "string"]
},
"Campaign Source": {
"type": ["null", "string"]
},
"Card Type": {
"type": ["null", "string"]
},
"Cart Items": {
"type": ["null", "string"]
},
"Cart Size": {
"type": ["null", "string"]
},
"Cart Size (# of Items)": {
"type": ["null", "integer"]
},
"Cart Value": {
"type": ["null", "integer"]
},
"Complete Purchase Count": {
"type": ["null", "integer"]
},
"Coupon": {
"type": ["null", "string"]
},
"Coupon Count Used": {
"type": ["null", "integer"]
},
"Date of Last Item Detail View": {
"type": ["null", "string"]
},
"Delivery Day": {
"type": ["null", "string"]
},
"Delivery Fee": {
"type": ["null", "integer"]
},
"Delivery Fees": {
"type": ["null", "integer"]
},
"Delivery Method": {
"type": ["null", "string"]
},
"Delivery Method Added Count": {
"type": ["null", "integer"]
},
"Gender": {
"type": ["null", "string"]
},
"Item Category": {
"type": ["null", "string"]
},
"Item Cost": {
"type": ["null", "integer"]
},
"Item Detail Page Count": {
"type": ["null", "integer"]
},
"Item Name": {
"type": ["null", "string"]
},
"Item Rating": {
"type": ["null", "integer"]
},
"Items in Browse": {
"type": ["null", "integer"]
},
"Landing Page Loaded Count": {
"type": ["null", "integer"]
},
"Last Cart Abandonment": {
"type": ["null", "string"]
},
"Last Event": {
"type": ["null", "string"]
},
"Last Purchase": {
"type": ["null", "string"]
},
"Last Search": {
"type": ["null", "string"]
},
"Marketing A/B Test": {
"type": ["null", "string"]
},
"Misc Fee": {
"type": ["null", "integer"]
},
"Misc Fees": {
"type": ["null", "integer"]
},
"Number of Cards Added": {
"type": ["null", "integer"]
},
"Number of Cart Abandons": {
"type": ["null", "integer"]
},
"Number of Item Details Viewed": {
"type": ["null", "integer"]
},
"Number of Purchases": {
"type": ["null", "integer"]
},
"Number of Searches": {
"type": ["null", "integer"]
},
"Page Version": {
"type": ["null", "string"]
},
"Payment Method Added Count": {
"type": ["null", "integer"]
},
"Platform": {
"type": ["null", "string"]
},
"Registration Date": {
"type": ["null", "string"]
},
"Registration Method": {
"type": ["null", "string"]
},
"Review Payment Count": {
"type": ["null", "integer"]
},
"Search Count": {
"type": ["null", "integer"]
},
"Search Page": {
"type": ["null", "string"]
},
"Search Results Count": {
"type": ["null", "integer"]
},
"Search Term": {
"type": ["null", "string"]
},
"Suggested Item": {
"type": ["null", "boolean"]
},
"Total Charge": {
"type": ["null", "integer"]
},
"UTM_Medium": {
"type": ["null", "string"]
},
"UTM_Term": {
"type": ["null", "string"]
},
"UTM_source": {
"type": ["null", "string"]
},
"Within Checkout Process": {
"type": ["null", "boolean"]
},
"mp_lib": {
"type": ["null", "string"]
},
"labels": {
"type": ["null", "array"],
"items": {
Expand All @@ -23,6 +224,27 @@
},
"dataset": {
"type": ["null", "string"]
},
"Referred by": {
"type": ["null", "string"]
},
"import": {
"type": ["null", "boolean"]
},
"URL": {
"type": ["null", "string"]
},
"insert_id": {
"type": ["null", "string"]
},
"mp_api_timestamp_ms": {
"type": ["null", "string"]
},
"mp_api_endpoint": {
"type": ["null", "string"]
},
"mp_processing_time_ms": {
"type": ["null", "string"]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import time
from abc import ABC
from datetime import timedelta
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
Expand All @@ -12,6 +11,7 @@
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from pendulum import Date


Expand All @@ -20,8 +20,6 @@ class MixpanelStream(HttpStream, ABC):
Formatted API Rate Limit (https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-API-Endpoints):
A maximum of 5 concurrent queries
60 queries per hour.

API Rate Limit Handler: after each request freeze for the time period: 3600/reqs_per_hour_limit seconds
"""

@property
Expand All @@ -31,6 +29,7 @@ def url_base(self):

# https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-Export-API-Endpoints#api-export-endpoint-rate-limits
reqs_per_hour_limit: int = 60 # 1 query per minute
retries: int = 0

def __init__(
self,
Expand All @@ -56,10 +55,6 @@ def __init__(

super().__init__(authenticator=authenticator)

@property
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
# Always returns None, i.e. this stream supplies no availability strategy object.
# NOTE(review): presumably this makes the CDK skip its availability check for the stream - confirm.
return None

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""Define the abstract method; this implementation ignores `response` and always returns None."""
return None
Expand Down Expand Up @@ -100,18 +95,18 @@ def parse_response(
# parse the whole response
yield from self.process_response(response, stream_state=stream_state, **kwargs)

if self.reqs_per_hour_limit > 0:
# we skip this block, if self.reqs_per_hour_limit = 0,
# in all other cases wait for X seconds to match API limitations
self.logger.info("Sleep for %s seconds to match API limitations", 3600 / self.reqs_per_hour_limit)
time.sleep(3600 / self.reqs_per_hour_limit)

def backoff_time(self, response: requests.Response) -> float:
    """
    Return how many seconds to wait before retrying a failed request.

    Some API endpoints do not return a "Retry-After" header. When the header
    is present and numeric, its value is honoured as-is; otherwise the delay
    follows an exponential schedule (120s, 240s, 480s, ...), see
    https://developer.mixpanel.com/reference/import-events#rate-limits

    :param response: the response that triggered the retry; only its headers are read.
    :return: delay in seconds, always as a float.
    """
    retry_after = response.headers.get("Retry-After")
    if retry_after:
        try:
            return float(retry_after)
        except (TypeError, ValueError):
            # The header may also carry an HTTP-date, which float() cannot
            # parse; use the exponential schedule below instead of crashing.
            pass

    # The augmented assignment stores the counter on the instance, so it keeps
    # growing across calls on the same stream object.
    self.retries += 1
    return float(2**self.retries * 60)

def get_stream_params(self) -> Mapping[str, Any]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

from ..property_transformation import transform_property_names
from .base import DateSlicesMixin, IncrementalMixpanelStream, MixpanelStream
Expand Down Expand Up @@ -77,6 +78,8 @@ class Export(DateSlicesMixin, IncrementalMixpanelStream):
primary_key: str = None
cursor_field: str = "time"

transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)

@property
def url_base(self):
prefix = "-eu" if self.region == "EU" else ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from .streams import Funnels

AVAILABLE_TESTING_RANGE_DAYS = 30
AVAILABLE_TESTING_RANGE_DAYS = 10


def funnel_slices_patched(self: Funnels, sync_mode):
Expand Down
Loading