Skip to content

Commit

Permalink
🎉 Source Zuora: support of Unlimited option for Data Query (#7093)
Browse files Browse the repository at this point in the history
  • Loading branch information
bazarnov committed Oct 22, 2021
1 parent cb6408a commit 9a435ec
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "3dc3037c-5ce8-4661-adc2-f7a9e3c5ece5",
"name": "Zuora",
"dockerRepository": "airbyte/source-zuora",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/zuora"
}
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@
- sourceDefinitionId: 3dc3037c-5ce8-4661-adc2-f7a9e3c5ece5
name: Zuora
dockerRepository: airbyte/source-zuora
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/zuora
sourceType: api
- sourceDefinitionId: 47f25999-dd5e-4636-8c39-e7cea2453331
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-zuora/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ COPY source_zuora ./source_zuora
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-zuora
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, Dict, Mapping

import pendulum
import pytest
from airbyte_cdk import AirbyteLogger
from source_zuora.source import (
SourceZuora,
Expand All @@ -17,7 +18,6 @@
ZuoraSubmitJob,
)
from source_zuora.zuora_auth import ZuoraAuthenticator
from source_zuora.zuora_endpoint import get_url_base
from source_zuora.zuora_excluded_streams import ZUORA_EXCLUDED_STREAMS


Expand All @@ -33,15 +33,9 @@ def client(config: Dict):
"""
Create client by extending config dict with authenticator and url_base
"""
url_base = get_url_base(config["tenant_endpoint"])
authenticator = ZuoraAuthenticator(
token_refresh_endpoint=f"{url_base}/oauth/token",
client_id=config["client_id"],
client_secret=config["client_secret"],
refresh_token=None, # Zuora doesn't have Refresh Token parameter.
)
config["authenticator"] = authenticator
config["url_base"] = url_base
auth = ZuoraAuthenticator(config)
config["authenticator"] = auth.get_auth()
config["url_base"] = auth.url_base
return config


Expand Down Expand Up @@ -73,6 +67,9 @@ class TestZuora:

# create client
config = client(config=get_config("secrets/config.json"))
# create client with Data Query Type == "Unlimited" option
unlimited_config = client(config=get_config("secrets/config.json"))
unlimited_config["data_query"] = "Unlimited"

# Define common test input
test_stream = "account"
Expand Down Expand Up @@ -102,18 +99,20 @@ def test_zuora_connection(self):
connection = SourceZuora.check_connection(self, logger=AirbyteLogger, config=self.config)
assert connection == (True, None)

def test_list_all_zuora_objects(self):
@pytest.mark.parametrize("config", [(config)], ids=["LIVE"])
def test_list_all_zuora_objects(self, config):
"""
Test retrieves all the objects (streams) available from Zuora Account and checks if test_stream is in the list.
"""
zuora_objects_list = ZuoraListObjects(self.config).read_records(sync_mode=None)
zuora_objects_list = ZuoraListObjects(config).read_records(sync_mode=None)
assert self.test_stream in zuora_objects_list

def test_excluded_streams_are_not_in_the_list(self):
@pytest.mark.parametrize("config", [(config)], ids=["LIVE"])
def test_excluded_streams_are_not_in_the_list(self, config):
"""
Test retrieves all the objects (streams) available from Zuora Account and checks if excluded streams are not in the list.
"""
zuora_streams_list = SourceZuora.streams(self, config=self.config)
zuora_streams_list = SourceZuora.streams(self, config=config)
# extract stream names from auto-generated stream class
generated_stream_class_names = []
for stream in zuora_streams_list:
Expand All @@ -122,11 +121,12 @@ def test_excluded_streams_are_not_in_the_list(self):
for excluded_stream in ZUORA_EXCLUDED_STREAMS:
assert False if excluded_stream in generated_stream_class_names else True

def test_get_json_schema(self):
@pytest.mark.parametrize("config", [(config)], ids=["LIVE"])
def test_get_json_schema(self, config):
"""
Test of getting schema from Zuora endpoint, check converted JsonSchema Types are correct
Test of getting schema from Zuora endpoint, check converted JsonSchema Types are correct.
"""
schema = list(ZuoraDescribeObject(self.test_stream, config=self.config).read_records(sync_mode=None))
schema = list(ZuoraDescribeObject(self.test_stream, config=config).read_records(sync_mode=None))
schema = {key: d[key] for d in schema for key in d}

# Filter the schema up to the test_schema_fields
Expand Down Expand Up @@ -176,7 +176,8 @@ def test_query_full_object(self):
# If the query is correctly build using connector class return True
assert example_query == test_query

def test_submit_job(self):
@pytest.mark.parametrize("config", [(config)], ids=["LIVE"])
def test_submit_job(self, config):
"""
Test submits the job to the server and returns the `job_id` as confirmation that the job was submitted successfully.
"""
Expand All @@ -187,13 +188,14 @@ def test_submit_job(self):
# Submitting the job to the server
job_id = ZuoraSubmitJob(
ZuoraObjectsBase.query(self, stream_name=self.test_stream, cursor_field=self.test_cursor_field, date_slice=test_date_slice),
self.config,
config,
).read_records(sync_mode=None)

# Return True if we have submitted job_id
assert len(list(job_id)) > 0

def test_check_job_status(self):
@pytest.mark.parametrize("config", [(config)], ids=["LIVE"])
def test_check_job_status(self, config):
"""
Test checks submitted job for status, if status is "completed" job_data_url will contain URL for jsonl dataFile,
Otherwise, if the status of the job is in ["failed", "canceled", "aborted"] it will raise the error message to the output,
Expand All @@ -206,7 +208,7 @@ def test_check_job_status(self):
# Submitting a job first
job_id = ZuoraSubmitJob(
ZuoraObjectsBase.query(self, stream_name=self.test_stream, cursor_field=self.test_cursor_field, date_slice=test_date_slice),
self.config,
config,
).read_records(sync_mode=None)

# checking iteratively if the job is completed, then return the URL with jsonl datafile
Expand All @@ -215,7 +217,8 @@ def test_check_job_status(self):
# Return True if there is a URL leading to a file
assert "https://" in list(job_data_url)[0]

def test_get_job_result(self):
@pytest.mark.parametrize("config", [(config)], ids=["LIVE"])
def test_get_job_result(self, config):
"""
Test reads the dataFile from URL of submitted, checked and successfully completed job.
"""
Expand All @@ -226,7 +229,7 @@ def test_get_job_result(self):
# Submitting a job first
job_id = ZuoraSubmitJob(
ZuoraObjectsBase.query(self, stream_name=self.test_stream, cursor_field=self.test_cursor_field, date_slice=test_date_slice),
self.config,
config,
).read_records(sync_mode=None)

# checking iteratively if the job is completed, then return the URL with jsonl datafile
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
{
"start_date": "2020-01-01",
"window_in_days": 30,
"window_in_days": "30",
"tenant_endpoint": "US Cloud API Sandbox",
"data_query": "Live",
"client_id": "some_client_id",
"client_secret": "some_client_secret",
"tenant_endpoint": "US Cloud API Sandbox"
}
"client_secret": "some_client_secret"
}
70 changes: 29 additions & 41 deletions airbyte-integrations/connectors/source-zuora/source_zuora/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@

import pendulum
import requests
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams.http import HttpStream

from .zuora_auth import ZuoraAuthenticator
from .zuora_endpoint import get_url_base
from .zuora_errors import (
QueryWindowError,
ZOQLQueryCannotProcessObject,
ZOQLQueryFailed,
ZOQLQueryFieldCannotResolveAltCursor,
Expand All @@ -40,14 +40,22 @@ class ZuoraStream(HttpStream, ABC):

def __init__(self, config: Dict):
super().__init__(authenticator=config["authenticator"])
self._url_base = config["url_base"]
self.start_date = config["start_date"]
self.window_in_days = config["window_in_days"]
self._config = config

@property
def url_base(self) -> str:
return self._url_base
return self._config["url_base"]

@property
def window_in_days(self) -> float:
    """
    Return the `Query Window` config parameter converted from string type into type float.

    Reads the `window_in_days` entry of the connector config held in `self._config`.

    :raises QueryWindowError: if the configured value cannot be converted to float.
    """
    # Look the value up outside the `try` so only the conversion itself is guarded.
    value = self._config["window_in_days"]
    try:
        return float(value)
    except (TypeError, ValueError) as error:
        # float() raises ValueError for malformed strings and TypeError for
        # non-string/non-numeric input such as None; report both the same way
        # and keep the original exception as the cause.
        raise QueryWindowError(value) from error

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
""" Abstractmethod HTTPStream CDK dependency """
Expand All @@ -57,11 +65,14 @@ def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMa
""" Abstractmethod HTTPStream CDK dependency """
return {}

def base_query_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
def base_query_params(self) -> MutableMapping[str, Any]:
"""
Returns base query parameters for default CDK request_json_body method
"""
return {"compression": "NONE", "output": {"target": "S3"}, "outputFormat": "JSON"}
params = {"compression": "NONE", "output": {"target": "S3"}, "outputFormat": "JSON"}
if self._config["data_query"] == "Unlimited":
params["sourceData"] = "DATAHUB"
return params


class ZuoraBase(ZuoraStream):
Expand Down Expand Up @@ -152,7 +163,7 @@ class ZuoraObjectsBase(ZuoraBase):
"""

@property
def state_checkpoint_interval(self) -> int:
def state_checkpoint_interval(self) -> float:
return self.window_in_days

@staticmethod
Expand Down Expand Up @@ -235,25 +246,22 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
...
"""

start_date = pendulum.parse(self.start_date).astimezone()
start_date = pendulum.parse(self._config["start_date"]).astimezone()
end_date = pendulum.now().astimezone()

# Determine stream_state, if no stream_state we use start_date
if stream_state:
state = stream_state.get(self.cursor_field, stream_state.get(self.alt_cursor_field))
start_date = pendulum.parse(state) if state else self.start_date
start_date = pendulum.parse(state) if state else self._config["start_date"]

# use the lowest date between start_date and self.end_date, otherwise API fails if start_date is in future
start_date = min(start_date, end_date)
date_slices = []

while start_date <= end_date:
end_date_slice = start_date.add(days=self.window_in_days)
date_slices.append({"start_date": self.to_datetime_str(start_date), "end_date": self.to_datetime_str(end_date_slice)})
yield {"start_date": self.to_datetime_str(start_date), "end_date": self.to_datetime_str(end_date_slice)}
start_date = end_date_slice

return date_slices


class ZuoraListObjects(ZuoraBase):
"""
Expand All @@ -264,7 +272,6 @@ def query(self, **kwargs) -> str:
return "SHOW TABLES"

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
self.logger.info("Retrieving the list of available Objects from Zuora")
return [name["Table"] for name in response]


Expand All @@ -279,7 +286,6 @@ def __init__(self, zuora_object_name: str, config: Dict):
self.zuora_object_name = zuora_object_name

def query(self, **kwargs) -> str:
self.logger.info(f"Getting schema information for {self.zuora_object_name}")
return f"DESCRIBE {self.zuora_object_name}"

def parse_response(self, response: requests.Response, **kwargs) -> List[Dict]:
Expand Down Expand Up @@ -350,7 +356,7 @@ def request_body_json(self, **kwargs) -> Optional[Mapping]:
"""
Override of default CDK method to return SQL-like query and use it in _send_request method.
"""
params = self.base_query_params(stream_state=None)
params = self.base_query_params()
params["query"] = self.query
return params

Expand Down Expand Up @@ -497,16 +503,9 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
"""
Testing connection availability for the connector by granting the token.
"""

# Define the endpoint from user's config
url_base = get_url_base(config["tenant_endpoint"])
auth = ZuoraAuthenticator(config).get_auth()
try:
ZuoraAuthenticator(
token_refresh_endpoint=f"{url_base}/oauth/token",
client_id=config["client_id"],
client_secret=config["client_secret"],
refresh_token=None, # Zuora doesn't have Refresh Token parameter.
).get_auth_header()
auth.get_auth_header()
return True, None
except Exception as e:
return False, e
Expand All @@ -516,20 +515,9 @@ def streams(self, config: Mapping[str, Any]) -> List[ZuoraStream]:
Mapping an input config of the user input configuration as defined in the connector spec.
Defining streams to run by building stream classes dynamically.
"""

# Define the endpoint from user's config
url_base = get_url_base(config["tenant_endpoint"])

# Get Authorization Header with Access Token
authenticator = ZuoraAuthenticator(
token_refresh_endpoint=f"{url_base}/oauth/token",
client_id=config["client_id"],
client_secret=config["client_secret"],
refresh_token=None, # Zuora doesn't have Refresh Token parameter.
)

config["authenticator"] = authenticator
config["url_base"] = url_base
auth = ZuoraAuthenticator(config)
config["authenticator"] = auth.get_auth()
config["url_base"] = auth.url_base

# List available objects (streams) names from Zuora
# Example: zuora_stream_names = ["account", "country", "user"]
Expand Down

0 comments on commit 9a435ec

Please sign in to comment.