Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate Sentry for performance and errors tracking. #8248

Merged
merged 10 commits into from Dec 21, 2021
19 changes: 15 additions & 4 deletions airbyte-cdk/python/airbyte_cdk/entrypoint.py
Expand Up @@ -9,12 +9,14 @@
import os.path
import sys
import tempfile
from typing import Iterable, List
from typing import Any, Dict, Iterable, List

from airbyte_cdk.logger import AirbyteLogFormatter, init_logger
from airbyte_cdk.models import AirbyteMessage, Status, Type
from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, get_secret_values, split_config
from airbyte_cdk.sources.utils.sentry import AirbyteSentry
from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets

logger = init_logger("airbyte")
Expand Down Expand Up @@ -59,14 +61,23 @@ def parse_args(args: List[str]) -> argparse.Namespace:

return main_parser.parse_args(args)

def configure_sentry(self, spec_schema: Dict[str, Any], parsed_args: argparse.Namespace):
secret_values = []
if "config" in parsed_args:
config = self.source.read_config(parsed_args.config)
secret_values = get_secret_values(spec_schema, config)
source_name = self.source.__module__.split(".")[0]
source_name = source_name.split("_", 1)[-1]
AirbyteSentry.init(source_tag=source_name, secret_values=secret_values)

def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
cmd = parsed_args.command
if not cmd:
raise Exception("No command passed")

# todo: add try catch for exceptions with different exit codes
source_spec = self.source.spec(self.logger)

source_spec: ConnectorSpecification = self.source.spec(self.logger)
self.configure_sentry(source_spec.connectionSpecification, parsed_args)
with tempfile.TemporaryDirectory() as temp_dir:
if cmd == "spec":
message = AirbyteMessage(type=Type.SPEC, spec=source_spec)
Expand Down
70 changes: 40 additions & 30 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Expand Up @@ -3,6 +3,7 @@
#


import logging
import os
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
Expand All @@ -13,6 +14,7 @@
import vcr.cassette as Cassette
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.utils.sentry import AirbyteSentry
from requests.auth import AuthBase

from .auth.core import HttpAuthenticator, NoAuth
Expand All @@ -22,6 +24,8 @@
# list of all possible HTTP methods which can be used for sending of request bodies
BODY_REQUEST_METHODS = ("POST", "PUT", "PATCH")

logging.getLogger("vcr").setLevel(logging.ERROR)


class HttpStream(Stream, ABC):
"""
Expand Down Expand Up @@ -272,7 +276,9 @@ def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str,
Unexpected transient exceptions use the default backoff parameters.
Unexpected persistent exceptions are not handled and will cause the sync to fail.
"""
response: requests.Response = self._session.send(request, **request_kwargs)
AirbyteSentry.add_breadcrumb(message=f"Issue {request.url}", data=request_kwargs)
with AirbyteSentry.start_transaction_span(op="_send", description=request.url):
response: requests.Response = self._session.send(request, **request_kwargs)

if self.should_retry(response):
custom_backoff_time = self.backoff_time(response)
Expand Down Expand Up @@ -313,10 +319,12 @@ def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mappi
"""
if max_tries is not None:
max_tries = max(0, max_tries) + 1
AirbyteSentry.set_context("request", {"url": request.url, "headers": request.headers, "args": request_kwargs})

user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries)(self._send)
backoff_handler = default_backoff_handler(max_tries=max_tries, factor=self.retry_factor)
return backoff_handler(user_backoff_handler)(request, request_kwargs)
with AirbyteSentry.start_transaction_span(op="_send_request"):
user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries)(self._send)
backoff_handler = default_backoff_handler(max_tries=max_tries, factor=self.retry_factor)
return backoff_handler(user_backoff_handler)(request, request_kwargs)

def read_records(
self,
Expand All @@ -329,36 +337,38 @@ def read_records(
pagination_complete = False

next_page_token = None
while not pagination_complete:
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
request = self._create_prepared_request(
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
headers=dict(request_headers, **self.authenticator.get_auth_header()),
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
)
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

if self.use_cache:
# use context manager to handle and store cassette metadata
with self.cache_file as cass:
self.cassete = cass
# vcr tries to find records based on the request, if such records exist, return from cache file
# else make a request and save record in cache file
response = self._send_request(request, request_kwargs)
with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"):
while not pagination_complete:
request_headers = self.request_headers(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
)
request = self._create_prepared_request(
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
headers=dict(request_headers, **self.authenticator.get_auth_header()),
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
)
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

else:
response = self._send_request(request, request_kwargs)
if self.use_cache:
# use context manager to handle and store cassette metadata
with self.cache_file as cass:
self.cassete = cass
# vcr tries to find records based on the request, if such records exist, return from cache file
# else make a request and save record in cache file
response = self._send_request(request, request_kwargs)

yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
else:
response = self._send_request(request, request_kwargs)
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)

next_page_token = self.next_page_token(response)
if not next_page_token:
pagination_complete = True
next_page_token = self.next_page_token(response)
if not next_page_token:
pagination_complete = True

# Always return an empty generator just in case no records were ever yielded
yield from []
# Always return an empty generator just in case no records were ever yielded
yield from []


class HttpSubStream(HttpStream, ABC):
Expand Down
32 changes: 31 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py
Expand Up @@ -7,8 +7,9 @@
import json
import os
import pkgutil
from typing import Any, ClassVar, Dict, Mapping, Tuple
from typing import Any, ClassVar, Dict, List, Mapping, Set, Tuple

import dpath.util
import jsonref
from airbyte_cdk.models import ConnectorSpecification
from jsonschema import validate
Expand Down Expand Up @@ -144,3 +145,32 @@ def split_config(config: Mapping[str, Any]) -> Tuple[dict, InternalConfig]:
else:
main_config[k] = v
return main_config, InternalConfig.parse_obj(internal_config)


def get_secret_values(schema: Mapping[str, Any], config: Mapping[str, Any]) -> List[str]:
def get_secret_pathes(schema: Mapping[str, Any]) -> Set[str]:
pathes = set()

def traverse_schema(schema: Any, path: List[str]):
if isinstance(schema, dict):
for k, v in schema.items():
traverse_schema(v, [*path, k])
elif isinstance(schema, list):
for i in schema:
traverse_schema(i, path)
else:
if path[-1] == "airbyte_secret" and schema is True:
path = "/".join([p for p in path[:-1] if p not in ["properties", "oneOf"]])
pathes.add(path)

traverse_schema(schema, [])
return pathes

secret_pathes = get_secret_pathes(schema)
result = []
for path in secret_pathes:
try:
result.append(dpath.util.get(config, path))
except KeyError:
pass
return result