Skip to content

Commit

Permalink
Merge branch 'master' into evan/redshift-fails-on-big-data
Browse files Browse the repository at this point in the history
  • Loading branch information
evantahler committed Jul 13, 2023
2 parents 58f607f + 792878b commit c6b4ef0
Show file tree
Hide file tree
Showing 338 changed files with 12,688 additions and 2,093 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.50.6
current_version = 0.50.7
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Try it out yourself with our [demo app](https://demo.airbyte.io/), visit our [fu

### Join the Airbyte Community

The Airbyte community can be found in the [Airbyte Community Slack](https://airbyte.com/community), where you can ask questions and voice ideas. You can also ask for help in our [Discourse forum](https://discuss.airbyte.io/), or join our [office hours](https://airbyte.io/weekly-office-hours/). Airbyte's roadmap is publicly viewable on [GitHub](https://github.com/orgs/airbytehq/projects/37/views/1?pane=issue&itemId=26937554).
The Airbyte community can be found in the [Airbyte Community Slack](https://airbyte.com/community), where you can ask questions and voice ideas. You can also ask for help in our [Airbyte Forum](https://github.com/airbytehq/airbyte/discussions), or join our [Office Hours](https://airbyte.io/daily-office-hours/). Airbyte's roadmap is publicly viewable on [GitHub](https://github.com/orgs/airbytehq/projects/37/views/1?pane=issue&itemId=26937554).

For videos and blogs on data engineering and building your data stack, check out Airbyte's [Content Hub](https://airbyte.com/content-hub), [Youtube](https://www.youtube.com/c/AirbyteHQ), and sign up for our [newsletter](https://airbyte.com/newsletter).

Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.44.3
current_version = 0.44.4
commit = False

[bumpversion:file:setup.py]
Expand Down
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.44.4
Connector builder: Show all request/responses as part of the testing panel

## 0.44.3
[ISSUE #27494] allow for state to rely on transformed field

Expand Down
4 changes: 2 additions & 2 deletions airbyte-cdk/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ RUN apk --no-cache upgrade \
&& apk --no-cache add tzdata build-base

# install airbyte-cdk
RUN pip install --prefix=/install airbyte-cdk==0.44.3
RUN pip install --prefix=/install airbyte-cdk==0.44.4

# build a clean environment
FROM base
Expand All @@ -32,5 +32,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

# needs to be the same as CDK
LABEL io.airbyte.version=0.44.3
LABEL io.airbyte.version=0.44.4
LABEL io.airbyte.name=airbyte/source-declarative-manifest
125 changes: 83 additions & 42 deletions airbyte-cdk/python/airbyte_cdk/connector_builder/message_grouper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,15 @@
from typing import Any, Dict, Iterable, Iterator, List, Mapping, Optional, Union
from urllib.parse import parse_qs, urlparse

from airbyte_cdk.connector_builder.models import HttpRequest, HttpResponse, LogMessage, StreamRead, StreamReadPages, StreamReadSlices
from airbyte_cdk.connector_builder.models import (
AuxiliaryRequest,
HttpRequest,
HttpResponse,
LogMessage,
StreamRead,
StreamReadPages,
StreamReadSlices,
)
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
Expand Down Expand Up @@ -57,6 +65,7 @@ def get_message_groups(
slices = []
log_messages = []
latest_config_update: AirbyteControlMessage = None
auxiliary_requests = []
for message_group in self._get_message_groups(
self._read_stream(source, config, configured_catalog),
schema_inferrer,
Expand All @@ -72,13 +81,16 @@ def get_message_groups(
elif isinstance(message_group, AirbyteControlMessage):
if not latest_config_update or latest_config_update.emitted_at <= message_group.emitted_at:
latest_config_update = message_group
elif isinstance(message_group, AuxiliaryRequest):
auxiliary_requests.append(message_group)
else:
slices.append(message_group)

return StreamRead(
logs=log_messages,
slices=slices,
test_read_limit_reached=self._has_reached_limit(slices),
auxiliary_requests=auxiliary_requests,
inferred_schema=schema_inferrer.get_stream_schema(
configured_catalog.streams[0].stream.name
), # The connector builder currently only supports reading from a single stream at a time
Expand All @@ -88,7 +100,7 @@ def get_message_groups(

def _get_message_groups(
self, messages: Iterator[AirbyteMessage], schema_inferrer: SchemaInferrer, datetime_format_inferrer: DatetimeFormatInferrer, limit: int
) -> Iterable[Union[StreamReadPages, AirbyteControlMessage, AirbyteLogMessage, AirbyteTraceMessage]]:
) -> Iterable[Union[StreamReadPages, AirbyteControlMessage, AirbyteLogMessage, AirbyteTraceMessage, AuxiliaryRequest]]:
"""
Message groups are partitioned according to when request log messages are received. Subsequent response log messages
and record messages belong to the prior request log message and when we encounter another request, append the latest
Expand All @@ -114,7 +126,8 @@ def _get_message_groups(
had_error = False

while records_count < limit and (message := next(messages, None)):
if self._need_to_close_page(at_least_one_page_in_group, message):
json_message = self._parse_json(message.log) if message.type == MessageType.LOG else None
if self._need_to_close_page(at_least_one_page_in_group, message, json_message):
self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records, True)
current_page_request = None
current_page_response = None
Expand All @@ -127,16 +140,26 @@ def _get_message_groups(
elif message.type == MessageType.LOG and message.log.message.startswith(AbstractSource.SLICE_LOG_PREFIX):
# parsing the first slice
current_slice_descriptor = self._parse_slice_description(message.log.message)
elif message.type == MessageType.LOG and message.log.message.startswith("request:"):
if not at_least_one_page_in_group:
at_least_one_page_in_group = True
current_page_request = self._create_request_from_log_message(message.log)
elif message.type == MessageType.LOG and message.log.message.startswith("response:"):
current_page_response = self._create_response_from_log_message(message.log)
elif message.type == MessageType.LOG:
if message.log.level == Level.ERROR:
had_error = True
yield message.log
if self._is_http_log(json_message):
if self._is_auxiliary_http_request(json_message):
title_prefix = (
"Parent stream: " if json_message.get("airbyte_cdk", {}).get("stream", {}).get("is_substream", False) else ""
)
yield AuxiliaryRequest(
title=title_prefix + json_message.get("http", {}).get("title", None),
description=json_message.get("http", {}).get("description", None),
request=self._create_request_from_log_message(json_message),
response=self._create_response_from_log_message(json_message),
)
else:
at_least_one_page_in_group = True
current_page_request = self._create_request_from_log_message(json_message)
current_page_response = self._create_response_from_log_message(json_message)
else:
if message.log.level == Level.ERROR:
had_error = True
yield message.log
elif message.type == MessageType.TRACE:
if message.trace.type == TraceType.ERROR:
had_error = True
Expand All @@ -153,13 +176,35 @@ def _get_message_groups(
yield StreamReadSlices(pages=current_slice_pages, slice_descriptor=current_slice_descriptor)

@staticmethod
def _need_to_close_page(at_least_one_page_in_group: bool, message: AirbyteMessage) -> bool:
def _need_to_close_page(at_least_one_page_in_group: bool, message: AirbyteMessage, json_message: Optional[dict]) -> bool:
return (
at_least_one_page_in_group
and message.type == MessageType.LOG
and (message.log.message.startswith("request:") or message.log.message.startswith("slice:"))
and (MessageGrouper._is_page_http_request(json_message) or message.log.message.startswith("slice:"))
)

@staticmethod
def _is_page_http_request(json_message: Optional[dict]) -> bool:
    # A "page" request is any structured HTTP log that is NOT auxiliary: i.e. the
    # request that actually fetched a page of records for the stream under test,
    # as opposed to e.g. an OAuth refresh or parent-stream slice request.
    return MessageGrouper._is_http_log(json_message) and not MessageGrouper._is_auxiliary_http_request(json_message)

@staticmethod
def _is_http_log(message: Optional[dict]) -> bool:
return message and bool(message.get("http", False))

@staticmethod
def _is_auxiliary_http_request(message: Optional[dict]) -> bool:
"""
A auxiliary request is a request that is performed and will not directly lead to record for the specific stream it is being queried.
A couple of examples are:
* OAuth authentication
* Substream slice generation
"""
if not message:
return False

is_http = MessageGrouper._is_http_log(message)
return is_http and message.get("http", {}).get("is_auxiliary", False)

@staticmethod
def _close_page(current_page_request, current_page_response, current_slice_pages, current_page_records, validate_page_complete: bool):
"""
Expand All @@ -184,39 +229,35 @@ def _read_stream(self, source: DeclarativeSource, config: Mapping[str, Any], con
error_message = f"{e.args[0] if len(e.args) > 0 else str(e)}"
yield AirbyteTracedException.from_exception(e, message=error_message).as_airbyte_message()

def _create_request_from_log_message(self, log_message: AirbyteLogMessage) -> Optional[HttpRequest]:
# TODO: As a temporary stopgap, the CDK emits request data as a log message string. Ideally this should come in the
@staticmethod
def _parse_json(log_message: AirbyteLogMessage):
# TODO: As a temporary stopgap, the CDK emits request/response data as a log message string. Ideally this should come in the
# form of a custom message object defined in the Airbyte protocol, but this unblocks us in the immediate while the
# protocol change is worked on.
raw_request = log_message.message.partition("request:")[2]
try:
request = json.loads(raw_request)
url = urlparse(request.get("url", ""))
full_path = f"{url.scheme}://{url.hostname}{url.path}" if url else ""
parameters = parse_qs(url.query) or None
return HttpRequest(
url=full_path,
http_method=request.get("http_method", ""),
headers=request.get("headers"),
parameters=parameters,
body=request.get("body"),
)
except JSONDecodeError as error:
self.logger.warning(f"Failed to parse log message into request object with error: {error}")
return json.loads(log_message.message)
except JSONDecodeError:
return None

def _create_response_from_log_message(self, log_message: AirbyteLogMessage) -> Optional[HttpResponse]:
# TODO: As a temporary stopgap, the CDK emits response data as a log message string. Ideally this should come in the
# form of a custom message object defined in the Airbyte protocol, but this unblocks us in the immediate while the
# protocol change is worked on.
raw_response = log_message.message.partition("response:")[2]
try:
response = json.loads(raw_response)
body = response.get("body", "{}")
return HttpResponse(status=response.get("status_code"), body=body, headers=response.get("headers"))
except JSONDecodeError as error:
self.logger.warning(f"Failed to parse log message into response object with error: {error}")
return None
@staticmethod
def _create_request_from_log_message(json_http_message: dict) -> HttpRequest:
    """Build an HttpRequest view model from a parsed structured HTTP log message."""
    # NOTE(review): the URL is read from a top-level "url"/"full" key while the
    # rest of the request data lives under "http"/"request" — confirm this split
    # matches the CDK's structured HTTP log format.
    url = urlparse(json_http_message.get("url", {}).get("full", ""))
    # Rebuild the URL without its query string; query parameters are surfaced
    # separately below. NOTE(review): urlparse always returns a (truthy) 6-tuple,
    # so the `if url` guard never takes the "" branch — presumably it was meant
    # to guard against a missing/empty URL.
    full_path = f"{url.scheme}://{url.hostname}{url.path}" if url else ""
    request = json_http_message.get("http", {}).get("request", {})
    # parse_qs returns {} for an empty query string; normalize that to None.
    parameters = parse_qs(url.query) or None
    return HttpRequest(
        url=full_path,
        http_method=request.get("method", ""),
        headers=request.get("headers"),
        parameters=parameters,
        body=request.get("body", {}).get("content", ""),
    )

@staticmethod
def _create_response_from_log_message(json_http_message: dict) -> HttpResponse:
    """Build an HttpResponse view model from a parsed structured HTTP log message."""
    http_response = json_http_message.get("http", {}).get("response", {})
    return HttpResponse(
        status=http_response.get("status_code"),
        body=http_response.get("body", {}).get("content", ""),
        headers=http_response.get("headers"),
    )

def _has_reached_limit(self, slices: List[StreamReadPages]):
if len(slices) >= self._max_slices:
Expand Down
9 changes: 9 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/connector_builder/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,20 @@ class LogMessage:
level: str


@dataclass
class AuxiliaryRequest:
    """
    An HTTP request/response pair that was performed during a test read but does
    not directly produce records for the stream being read (e.g. OAuth
    authentication or parent-stream slice generation).
    """

    # Short human-readable label for this request (may be prefixed to indicate a parent stream).
    title: str
    # Longer human-readable explanation of why the request was made.
    description: str
    request: HttpRequest
    response: HttpResponse


@dataclass
class StreamRead(object):
logs: List[LogMessage]
slices: List[StreamReadSlices]
test_read_limit_reached: bool
auxiliary_requests: List[AuxiliaryRequest]
inferred_schema: Optional[Dict[str, Any]]
inferred_datetime_formats: Optional[Dict[str, str]]
latest_config_update: Optional[Dict[str, Any]]
Expand Down
10 changes: 10 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_oauth import AbstractOauth2Authenticator
from airbyte_cdk.sources.streams.http.requests_native_auth.oauth import SingleUseRefreshTokenOauth2Authenticator

Expand All @@ -33,6 +34,7 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAut
token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
"""

token_refresh_endpoint: Union[InterpolatedString, str]
Expand All @@ -49,6 +51,7 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAut
expires_in_name: Union[InterpolatedString, str] = "expires_in"
refresh_request_body: Optional[Mapping[str, Any]] = None
grant_type: Union[InterpolatedString, str] = "refresh_token"
message_repository: MessageRepository = NoopMessageRepository()

def __post_init__(self, parameters: Mapping[str, Any]):
self.token_refresh_endpoint = InterpolatedString.create(self.token_refresh_endpoint, parameters=parameters)
Expand Down Expand Up @@ -135,6 +138,13 @@ def access_token(self) -> str:
def access_token(self, value: str):
self._access_token = value

@property
def _message_repository(self) -> MessageRepository:
    """
    Override AbstractOauth2Authenticator._message_repository so that this
    component's configured `message_repository` is used, allowing the
    token-refresh HTTP requests to be emitted as logs.
    """
    return self.message_repository


@dataclass
class DeclarativeSingleUseRefreshTokenOauth2Authenticator(SingleUseRefreshTokenOauth2Authenticator, DeclarativeAuthenticator):
Expand Down

0 comments on commit c6b4ef0

Please sign in to comment.