Skip to content

Commit

Permalink
🎉Source Chartmogul: Migrate connector from Alpha (Python) to Beta (YA…
Browse files Browse the repository at this point in the history
…ML) (#19276)

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr committed Dec 9, 2022
1 parent b691841 commit 80e5f29
Show file tree
Hide file tree
Showing 14 changed files with 197 additions and 332 deletions.
Expand Up @@ -232,7 +232,7 @@
- name: Chartmogul
sourceDefinitionId: b6604cbd-1b12-4c08-8767-e140d0fb0877
dockerRepository: airbyte/source-chartmogul
dockerImageTag: 0.1.1
dockerImageTag: 0.2.0
documentationUrl: https://docs.airbyte.com/integrations/sources/chartmogul
icon: chartmogul.svg
sourceType: api
Expand Down
9 changes: 6 additions & 3 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Expand Up @@ -2186,7 +2186,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-chartmogul:0.1.1"
- dockerImage: "airbyte/source-chartmogul:0.2.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/chartmogul"
connectionSpecification:
Expand All @@ -2197,15 +2197,17 @@
- "api_key"
- "start_date"
- "interval"
additionalProperties: false
properties:
api_key:
type: "string"
description: "Chartmogul API key"
title: "API key"
description: "Your Chartmogul API key. See <a href=\"https://help.chartmogul.com/hc/en-us/articles/4407796325906-Creating-and-Managing-API-keys#creating-an-api-key\"\
> the docs </a> for info on how to obtain this."
airbyte_secret: true
order: 0
start_date:
type: "string"
title: "Start date"
pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$"
description: "UTC date and time in the format 2017-01-25T00:00:00Z. When\
\ feasible, any data before this date will not be replicated."
Expand All @@ -2214,6 +2216,7 @@
order: 1
interval:
type: "string"
title: "Interval"
description: "Some APIs such as <a href=\"https://dev.chartmogul.com/reference/endpoint-overview-metrics-api\"\
>Metrics</a> require intervals to cluster data."
enum:
Expand Down
Expand Up @@ -34,5 +34,5 @@ COPY source_chartmogul ./source_chartmogul
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.name=airbyte/source-chartmogul
@@ -1,20 +1,29 @@
# See [Source Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/source-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-chartmogul:dev
tests:
acceptance_tests:
spec:
- spec_path: "source_chartmogul/spec.json"
tests:
- spec_path: "source_chartmogul/spec.yaml"
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
tests:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: ["activities"]
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
expect_records:
path: "integration_tests/expected_records.txt"
extra_fields: no
exact_order: no
extra_records: yes
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Expand Up @@ -4,7 +4,8 @@
"stream": {
"name": "customers",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
Expand All @@ -13,7 +14,8 @@
"stream": {
"name": "activities",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["uuid"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
Expand All @@ -22,7 +24,8 @@
"stream": {
"name": "customer_count",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["date"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
Expand Down
@@ -0,0 +1,3 @@
{"stream":"customers","data":{"id":72038151,"uuid":"cus_e6e6b1e6-72d4-11ec-ac99-47c8cf90d52c","external_id":"cus_0001","name":"Adam Smith","email":"integration-test@airbyte.io","status":"Past Due","customer-since":"2015-11-01T00:00:00+00:00","attributes":{"custom":{},"clearbit":{},"stripe":{},"tags":[]},"data_source_uuid":"ds_2dbe3b80-72d4-11ec-bdef-0f585abe5bf3","data_source_uuids":["ds_2dbe3b80-72d4-11ec-bdef-0f585abe5bf3"],"external_ids":["cus_0001"],"company":"Airbyte","country":"US","state":"CA","city":"San Francisco","zip":"94121","lead_created_at":"2022-01-11T00:00:00.000Z","free_trial_started_at":"2022-01-11T00:00:00.000Z","address":{"country":"United States","state":"California","city":"San Francisco","address_zip":"94121"},"mrr":4100,"arr":49200,"billing-system-url":null,"chartmogul-url":"https://app.chartmogul.com/#/customers/72038151-Airbyte","billing-system-type":"Import API","currency":"USD","currency-sign":"$"},"emitted_at":1668512244585}
{"stream":"activities","data":{"description":"purchased the Bronze Plan plan with $10.00 discount applied","activity-mrr-movement":4100,"activity-mrr":4100,"activity-arr":49200,"date":"2015-11-01T00:00:00+00:00","type":"new_biz","currency":"USD","subscription-external-id":"sub_0001","plan-external-id":"bb8dcfe0-5505-013a-5d44-263d116b0774","customer-name":"Airbyte","customer-uuid":"cus_e6e6b1e6-72d4-11ec-ac99-47c8cf90d52c","customer-external-id":"cus_0001","billing-connector-uuid":"ds_2dbe3b80-72d4-11ec-bdef-0f585abe5bf3","uuid":"02a72371-72b1-440e-9816-d61435fd2e45"},"emitted_at":1668512245112}
{"stream":"customer_count","data":{"date":"2022-01-01","customers":1,"percentage-change":0},"emitted_at":1668512245914}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-chartmogul/setup.py
Expand Up @@ -6,7 +6,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1",
"airbyte-cdk~=0.10",
]

TEST_REQUIREMENTS = [
Expand Down
@@ -0,0 +1,88 @@
version: "0.2.0"

definitions:
selector:
extractor:
field_pointer: ["entries"]
requester:
url_base: "https://api.chartmogul.com"
http_method: "GET"
authenticator:
type: BasicHttpAuthenticator
username: "{{ config['api_key'] }}"
retriever:
record_selector:
$ref: "*ref(definitions.selector)"
requester:
$ref: "*ref(definitions.requester)"
customers_stream:
retriever:
$ref: "*ref(definitions.retriever)"
paginator:
type: DefaultPaginator
url_base: "*ref(definitions.requester.url_base)"
pagination_strategy:
type: "PageIncrement"
start_from_page: 1
page_size: 200
page_size_option:
inject_into: request_parameter
field_name: per_page
page_token_option:
inject_into: request_parameter
field_name: page
$options:
name: "customers"
primary_key: "id"
path: "/v1/customers"
activities_stream:
retriever:
$ref: "*ref(definitions.retriever)"
requester:
$ref: "*ref(definitions.requester)"
request_options_provider:
request_parameters:
start-date: "{{ config.start_date }}"
paginator:
type: DefaultPaginator
url_base: "*ref(definitions.requester.url_base)"
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ response['entries'][-1]['uuid'] }}"
stop_condition: "{{ not response.has_more }}"
page_size: 200
page_size_option:
inject_into: request_parameter
field_name: per_page
page_token_option:
inject_into: request_parameter
field_name: start-after
$options:
name: "activities"
primary_key: "uuid"
path: "/v1/activities"
customer_count_stream:
retriever:
$ref: "*ref(definitions.retriever)"
requester:
$ref: "*ref(definitions.requester)"
request_options_provider:
request_body_data:
start-date: "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}"
end-date: "{{ now_utc().strftime('%Y-%m-%d') }}"
interval: "{{ config['interval'] }}"
paginator:
type: NoPagination
$options:
name: "customer_count"
primary_key: "date"
path: "/v1/metrics/customer-count"

streams:
- "*ref(definitions.customers_stream)"
- "*ref(definitions.activities_stream)"
- "*ref(definitions.customer_count_stream)"

check:
stream_names:
- "customers"
Expand Up @@ -2,146 +2,17 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import ABC
from base64 import b64encode
from datetime import datetime
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from urllib.parse import urljoin
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource

import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.streams.http.exceptions import RequestBodyException
"""
This file provides the necessary constructs to interpret a provided declarative YAML configuration file into
source connector.
WARNING: Do not modify this file.
"""

# Basic full refresh stream
class ChartmogulStream(HttpStream, ABC):
url_base = "https://api.chartmogul.com"

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
yield from response.json().get("entries", [])


class Customers(ChartmogulStream):
primary_key = "id"

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
json_response = response.json()
if json_response.get("has_more", False):
return {"page": json_response.get("current_page") + 1}

return None

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return {"page": 1 if not next_page_token else next_page_token["page"]}

def path(self, **kwargs) -> str:
return "v1/customers"


class Activities(ChartmogulStream):
primary_key = "uuid"

def __init__(self, start_date: str, **kwargs):
super().__init__(**kwargs)
self.start_date = start_date

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
json_response = response.json()
if not json_response.get("has_more", False):
return None

return {"start-after": json_response["entries"][-1][self.primary_key]}

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
params = {}

if next_page_token:
params.update(next_page_token)
elif self.start_date:
params["start-date"] = self.start_date

return params

def path(self, **kwargs) -> str:
return "v1/activities"


class CustomerCount(ChartmogulStream):
primary_key = "date"

def __init__(self, start_date: str, interval: str, **kwargs):
super().__init__(**kwargs)
self.start_date = start_date
self.end_date = datetime.now().strftime("%Y-%m-%d")
self.interval = interval

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
return None

def request_body_data(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Optional[Mapping]:
return {
"start-date": self.start_date,
"end-date": self.end_date,
"interval": self.interval,
}

def _create_prepared_request(
self, path: str, headers: Mapping = None, params: Mapping = None, json: Any = None, data: Any = None
) -> requests.PreparedRequest:
"""
Override to make possible sending http body with GET request.
"""
args = {"method": self.http_method, "url": urljoin(self.url_base, path), "headers": headers, "params": params}
if json and data:
raise RequestBodyException(
"At the same time only one of the 'request_body_data' and 'request_body_json' functions can return data"
)
elif json:
args["json"] = json
elif data:
args["data"] = data

return self._session.prepare_request(requests.Request(**args))

def path(self, **kwargs) -> str:
return "v1/metrics/customer-count"


class HttpBasicAuthenticator(TokenAuthenticator):
def __init__(self, token: str, auth_method: str = "Basic", **kwargs):
auth_string = f"{token}:".encode("utf8")
b64_encoded = b64encode(auth_string).decode("utf8")
super().__init__(token=b64_encoded, auth_method=auth_method, **kwargs)


# Source
class SourceChartmogul(AbstractSource):
def check_connection(self, logger, config) -> Tuple[bool, any]:
auth = HttpBasicAuthenticator(config["api_key"], auth_method="Basic").get_auth_header()
url = f"{ChartmogulStream.url_base}/v1/ping"
try:
resp = requests.get(url, headers=auth)
resp.raise_for_status()
return True, None
except Exception as e:
return False, e

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth = HttpBasicAuthenticator(config["api_key"], auth_method="Basic")
return [
Customers(authenticator=auth),
CustomerCount(authenticator=auth, start_date=config.get("start_date"), interval=config.get("interval")),
Activities(authenticator=auth, start_date=config.get("start_date")),
]
# Declarative Source
class SourceChartmogul(YamlDeclarativeSource):
def __init__(self):
super().__init__(**{"path_to_yaml": "chartmogul.yaml"})

0 comments on commit 80e5f29

Please sign in to comment.