Skip to content

Commit

Permalink
Merge branch 'master' into jonathan/mysql-parallel-source-poc
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed Jun 13, 2023
2 parents 385ea1e + 6879cdf commit e3b41a1
Show file tree
Hide file tree
Showing 114 changed files with 1,711 additions and 307 deletions.
69 changes: 69 additions & 0 deletions .github/connectors_weekly_build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Weekly run of the airbyte-ci connector test pipeline.
# Triggered every Sunday by cron, or manually with a custom runner label and
# custom `airbyte-ci connectors` options.
name: Connectors weekly build

on:
  schedule:
    # 12PM UTC on Sunday is 2PM CEST, 3PM EEST, 5AM PDT.
    - cron: "0 12 * * 0"
  workflow_dispatch:
    inputs:
      runs-on:
        description: "Label of the runner the job is scheduled on"
        type: string
        default: dev-large-runner
        required: true
      test-connectors-options:
        description: "Options placed between `airbyte-ci connectors` and the `test` subcommand"
        type: string
        default: "--concurrency=5 --release-stage=alpha"
        required: true

run-name: "Test connectors: ${{ inputs.test-connectors-options || 'nightly build for Alpha connectors' }} - on ${{ inputs.runs-on || 'dev-large-runner' }}"

jobs:
  test_connectors:
    name: "Test connectors: ${{ inputs.test-connectors-options || 'nightly build for Alpha connectors' }} - on ${{ inputs.runs-on || 'dev-large-runner' }}"
    timeout-minutes: 8640  # 6 days
    # `inputs.*` is empty on scheduled runs, hence the fallback label.
    runs-on: ${{ inputs.runs-on || 'dev-large-runner' }}
    steps:
      - name: Get start timestamp
        id: get-start-timestamp
        # Step outputs are written to $GITHUB_OUTPUT; the `::set-output`
        # workflow command is deprecated.
        run: |
          echo "start-timestamp=$(date +%s)" >> "$GITHUB_OUTPUT"
      - name: Login to DockerHub
        # Pass the password on stdin so it never appears in the process
        # argument list (and docker does not warn about `-p`).
        run: |
          printf '%s' "${DOCKER_HUB_PASSWORD}" | docker login -u "${DOCKER_HUB_USERNAME}" --password-stdin
        env:
          DOCKER_HUB_USERNAME: ${{ secrets.DOCKER_HUB_USERNAME }}
          DOCKER_HUB_PASSWORD: ${{ secrets.DOCKER_HUB_PASSWORD }}
      - name: Checkout Airbyte
        uses: actions/checkout@v3
        with:
          # NOTE(review): this workflow declares no `repo` or `gitref` inputs,
          # so both expressions are always empty and checkout presumably falls
          # back to its defaults - confirm, then drop or declare the inputs.
          repository: ${{ github.event.inputs.repo }}
          ref: ${{ github.event.inputs.gitref }}
      - name: Extract branch name
        id: extract_branch
        # Skipped on scheduled runs, which leaves CI_GIT_BRANCH empty below.
        if: github.event_name == 'workflow_dispatch'
        shell: bash
        run: |
          echo "branch=${GITHUB_REF#refs/heads/}" >> "$GITHUB_OUTPUT"
      - name: Install Python 3.10
        uses: actions/setup-python@v4
        with:
          # Quoted so YAML does not read the version as the float 3.1.
          python-version: "3.10"
          token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
      - name: Install ci-connector-ops package
        # Quote the path instead of backslash-escaping the extras brackets.
        run: pip install "./tools/ci_connector_ops[pipelines]"
      - name: Test connectors
        run: |
          export _EXPERIMENTAL_DAGGER_RUNNER_HOST="unix:///var/run/buildkit/buildkitd.sock"
          DAGGER_CLI_COMMIT="6ed6264f1c4efbf84d310a104b57ef1bc57d57b0"
          DAGGER_TMP_BINDIR="/tmp/dagger_${DAGGER_CLI_COMMIT}"
          export _EXPERIMENTAL_DAGGER_CLI_BIN="$DAGGER_TMP_BINDIR/dagger"
          # Download the pinned dagger CLI only if it is not already on the runner.
          if [ ! -f "$_EXPERIMENTAL_DAGGER_CLI_BIN" ]; then
            mkdir -p "$DAGGER_TMP_BINDIR"
            curl "https://dl.dagger.io/dagger/main/${DAGGER_CLI_COMMIT}/dagger_${DAGGER_CLI_COMMIT}_$(uname -s | tr A-Z a-z)_$(uname -m | sed s/x86_64/amd64/).tar.gz" | tar xvz -C "$DAGGER_TMP_BINDIR"
          fi
          airbyte-ci --is-ci --gha-workflow-run-id=${{ github.run_id }} connectors ${{ inputs.test-connectors-options || '--concurrency=5 --release-stage=alpha' }} test
        env:
          # NOTE(review): this token is committed in clear text. Move it to a
          # repository secret and rotate the exposed value.
          _EXPERIMENTAL_DAGGER_CLOUD_TOKEN: "p.eyJ1IjogIjFiZjEwMmRjLWYyZmQtNDVhNi1iNzM1LTgxNzI1NGFkZDU2ZiIsICJpZCI6ICJlNjk3YzZiYy0yMDhiLTRlMTktODBjZC0yNjIyNGI3ZDBjMDEifQ.hT6eMOYt3KZgNoVGNYI3_v4CC-s19z8uQsBkGrBhU3k"
          GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
          CI_REPORT_BUCKET_NAME: "airbyte-ci-reports"
          CI_GITHUB_ACCESS_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
          CI_GIT_BRANCH: ${{ steps.extract_branch.outputs.branch }}
          CI_CONTEXT: "nightly_builds"
          CI_PIPELINE_START_TIMESTAMP: ${{ steps.get-start-timestamp.outputs.start-timestamp }}
          CI_JOB_KEY: "weekly_alpha_test"
4 changes: 2 additions & 2 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ jobs:
- name: Cache Build Artifacts
uses: ./.github/actions/cache-build-artifacts
with:
cache-key: ${{ secrets.CACHE_VERSION }}-connectors
cache-key: ${{ secrets.CACHE_VERSION }}-connectors-gradle-build

- uses: actions/setup-java@v3
with:
Expand All @@ -270,7 +270,7 @@ jobs:
python-version: "3.9"

- name: Install Pyenv
run: python3 -m pip install virtualenv==16.7.9 --user
run: python3 -m pip install virtualenv --user

- name: Install automake
run: apt-get update && apt-get install -y automake build-essential libtool libtool-bin autoconf
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.40.3
current_version = 0.40.4
commit = False

[bumpversion:file:setup.py]
Expand Down
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.40.4
Emit messages using message repository

## 0.40.3
Add utils for inferring datetime formats

Expand Down
4 changes: 2 additions & 2 deletions airbyte-cdk/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ RUN apk --no-cache upgrade \
&& apk --no-cache add tzdata build-base

# install airbyte-cdk
RUN pip install --prefix=/install airbyte-cdk==0.40.3
RUN pip install --prefix=/install airbyte-cdk==0.40.4

# build a clean environment
FROM base
Expand All @@ -32,5 +32,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

# needs to be the same as CDK
LABEL io.airbyte.version=0.40.3
LABEL io.airbyte.version=0.40.4
LABEL io.airbyte.name=airbyte/source-declarative-manifest
12 changes: 10 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/config_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,18 @@ def observe_connector_config(non_observed_connector_config: MutableMapping[str,


def emit_configuration_as_airbyte_control_message(config: MutableMapping):
    """
    Print the connector-config control message built from ``config`` as a JSON string.

    WARNING: deprecated - this function is being phased out in favor of the MessageRepository mechanism.
    See the airbyte_cdk.sources.message package.
    """
    print(create_connector_config_control_message(config).json(exclude_unset=True))


def create_connector_config_control_message(config):
control_message = AirbyteControlMessage(
type=OrchestratorType.CONNECTOR_CONFIG,
emitted_at=time.time() * 1000,
connectorConfig=AirbyteControlConnectorConfigMessage(config=config),
)
airbyte_message = AirbyteMessage(type=Type.CONTROL, control=control_message)
print(airbyte_message.json(exclude_unset=True))
return AirbyteMessage(type=Type.CONTROL, control=control_message)
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def get_message_groups(
inferred_schema=schema_inferrer.get_stream_schema(
configured_catalog.streams[0].stream.name
), # The connector builder currently only supports reading from a single stream at a time
latest_config_update=latest_config_update.connectorConfig.config if latest_config_update else self._clean_config(config),
latest_config_update=self._clean_config(latest_config_update.connectorConfig.config) if latest_config_update else None,
inferred_datetime_formats=datetime_format_inferrer.get_inferred_datetime_formats(),
)

Expand Down
53 changes: 34 additions & 19 deletions airbyte-cdk/python/airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,27 +77,32 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
else:
self.logger.setLevel(logging.INFO)

# todo: add try catch for exceptions with different exit codes
source_spec: ConnectorSpecification = self.source.spec(self.logger)
with tempfile.TemporaryDirectory() as temp_dir:
if cmd == "spec":
message = AirbyteMessage(type=Type.SPEC, spec=source_spec)
yield message.json(exclude_unset=True)
else:
raw_config = self.source.read_config(parsed_args.config)
config = self.source.configure(raw_config, temp_dir)

if cmd == "check":
yield from map(AirbyteEntrypoint.airbyte_message_to_string, self.check(source_spec, config))
elif cmd == "discover":
yield from map(AirbyteEntrypoint.airbyte_message_to_string, self.discover(source_spec, config))
elif cmd == "read":
config_catalog = self.source.read_catalog(parsed_args.catalog)
state = self.source.read_state(parsed_args.state)

yield from map(AirbyteEntrypoint.airbyte_message_to_string, self.read(source_spec, config, config_catalog, state))
try:
with tempfile.TemporaryDirectory() as temp_dir:
if cmd == "spec":
message = AirbyteMessage(type=Type.SPEC, spec=source_spec)
yield from [
self.airbyte_message_to_string(queued_message) for queued_message in self._emit_queued_messages(self.source)
]
yield self.airbyte_message_to_string(message)
else:
raise Exception("Unexpected command " + cmd)
raw_config = self.source.read_config(parsed_args.config)
config = self.source.configure(raw_config, temp_dir)

if cmd == "check":
yield from map(AirbyteEntrypoint.airbyte_message_to_string, self.check(source_spec, config))
elif cmd == "discover":
yield from map(AirbyteEntrypoint.airbyte_message_to_string, self.discover(source_spec, config))
elif cmd == "read":
config_catalog = self.source.read_catalog(parsed_args.catalog)
state = self.source.read_state(parsed_args.state)

yield from map(AirbyteEntrypoint.airbyte_message_to_string, self.read(source_spec, config, config_catalog, state))
else:
raise Exception("Unexpected command " + cmd)
finally:
yield from [self.airbyte_message_to_string(queued_message) for queued_message in self._emit_queued_messages(self.source)]

def check(self, source_spec: ConnectorSpecification, config: TConfig) -> Iterable[AirbyteMessage]:
self.set_up_secret_filter(config, source_spec.connectionSpecification)
Expand All @@ -106,6 +111,7 @@ def check(self, source_spec: ConnectorSpecification, config: TConfig) -> Iterabl
except AirbyteTracedException as traced_exc:
connection_status = traced_exc.as_connection_status_message()
if connection_status:
yield from self._emit_queued_messages(self.source)
yield connection_status
return

Expand All @@ -115,13 +121,16 @@ def check(self, source_spec: ConnectorSpecification, config: TConfig) -> Iterabl
else:
self.logger.error("Check failed")

yield from self._emit_queued_messages(self.source)
yield AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result)

def discover(self, source_spec: ConnectorSpecification, config: TConfig) -> Iterable[AirbyteMessage]:
    """
    Run discovery on the source and yield the resulting catalog.

    Being a generator, nothing below executes until the result is iterated.

    :param source_spec: connector specification; its ``connectionSpecification`` is passed to the secret filter
        and, when the source asks for it, used to validate ``config``
    :param config: connector configuration handed to the source's ``discover``
    :return: any messages queued on the source, followed by a single CATALOG message
    """
    self.set_up_secret_filter(config, source_spec.connectionSpecification)
    if self.source.check_config_against_spec:
        self.validate_connection(source_spec, config)
    catalog = self.source.discover(self.logger, config)

    # Flush messages queued during discovery before the catalog itself is emitted.
    yield from self._emit_queued_messages(self.source)
    yield AirbyteMessage(type=Type.CATALOG, catalog=catalog)

def read(self, source_spec: ConnectorSpecification, config: TConfig, catalog: TCatalog, state: TState) -> Iterable[AirbyteMessage]:
Expand All @@ -130,6 +139,7 @@ def read(self, source_spec: ConnectorSpecification, config: TConfig, catalog: TC
self.validate_connection(source_spec, config)

yield from self.source.read(self.logger, config, catalog, state)
yield from self._emit_queued_messages(self.source)

@staticmethod
def validate_connection(source_spec: ConnectorSpecification, config: Mapping[str, Any]) -> None:
Expand All @@ -149,6 +159,11 @@ def set_up_secret_filter(config, connection_specification: Mapping[str, Any]):
def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> str:
return airbyte_message.json(exclude_unset=True)

def _emit_queued_messages(self, source) -> Iterable[AirbyteMessage]:
if hasattr(source, "message_repository") and source.message_repository:
yield from source.message_repository.consume_queue()
return


def launch(source: Source, args: List[str]):
source_entrypoint = AirbyteEntrypoint(source)
Expand Down
13 changes: 13 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.core import StreamData
Expand Down Expand Up @@ -130,6 +131,7 @@ def read(
yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.INCOMPLETE)
raise e
except Exception as e:
yield from self._emit_queued_messages()
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.INCOMPLETE)
Expand Down Expand Up @@ -198,6 +200,7 @@ def _read_stream(
logger.info(f"Marking stream {stream_name} as RUNNING")
# If we just read the first record of the stream, emit the transition to the RUNNING state
yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.RUNNING)
yield from self._emit_queued_messages()
yield record

logger.info(f"Read {record_counter} records from {stream_name} stream")
Expand Down Expand Up @@ -264,6 +267,7 @@ def _read_incremental(
record_counter = 0
for message_counter, record_data_or_message in enumerate(records, start=1):
message = self._get_message(record_data_or_message, stream_instance)
yield from self._emit_queued_messages()
yield message
if message.type == MessageType.RECORD:
record = message.record
Expand Down Expand Up @@ -298,6 +302,11 @@ def should_log_slice_message(self, logger: logging.Logger):
"""
return logger.isEnabledFor(logging.DEBUG)

def _emit_queued_messages(self):
if self.message_repository:
yield from self.message_repository.consume_queue()
return

def _read_full_refresh(
self,
logger: logging.Logger,
Expand Down Expand Up @@ -357,3 +366,7 @@ def _get_message(self, record_data_or_message: Union[StreamData, AirbyteMessage]
return record_data_or_message
else:
return stream_data_to_airbyte_message(stream.name, record_data_or_message, stream.transformer, stream.get_json_schema())

@property
def message_repository(self) -> Union[None, MessageRepository]:
    """
    Message repository read by ``_emit_queued_messages`` to find queued messages to yield.

    This implementation returns ``None``, in which case ``_emit_queued_messages`` yields nothing.
    NOTE(review): presumably meant to be overridden by sources that own a MessageRepository - confirm.
    """
    return None
Original file line number Diff line number Diff line change
Expand Up @@ -580,10 +580,40 @@ definitions:
- "{{ config['record_cursor'] }}"
datetime_format:
title: Cursor Field Datetime Format
description: The datetime format of the Cursor Field.
description: |
The datetime format of the Cursor Field. Use placeholders starting with "%" to describe the format the API is using. The following placeholders are available:
* **%s**: Epoch unix timestamp - `1686218963`
* **%a**: Weekday (abbreviated) - `Sun`
* **%A**: Weekday (full) - `Sunday`
* **%w**: Weekday (decimal) - `0` (Sunday), `6` (Saturday)
* **%d**: Day of the month (zero-padded) - `01`, `02`, ..., `31`
* **%b**: Month (abbreviated) - `Jan`
* **%B**: Month (full) - `January`
* **%m**: Month (zero-padded) - `01`, `02`, ..., `12`
* **%y**: Year (without century, zero-padded) - `00`, `01`, ..., `99`
* **%Y**: Year (with century) - `0001`, `0002`, ..., `9999`
* **%H**: Hour (24-hour, zero-padded) - `00`, `01`, ..., `23`
* **%I**: Hour (12-hour, zero-padded) - `01`, `02`, ..., `12`
* **%p**: AM/PM indicator
* **%M**: Minute (zero-padded) - `00`, `01`, ..., `59`
* **%S**: Second (zero-padded) - `00`, `01`, ..., `59`
* **%f**: Microsecond (zero-padded to 6 digits) - `000000`
* **%z**: UTC offset - `(empty)`, `+0000`, `-0400`
* **%Z**: Time zone name - `(empty)`, `UTC`, `GMT`
* **%j**: Day of the year (zero-padded) - `001`, `002`, ..., `366`
* **%U**: Week number of the year (starting Sunday) - `00`, ..., `53`
* **%W**: Week number of the year (starting Monday) - `00`, ..., `53`
* **%c**: Date and time - `Tue Aug 16 21:30:00 1988`
* **%x**: Date standard format - `08/16/1988`
* **%X**: Time standard format - `21:30:00`
* **%%**: Literal '%' character
Some placeholders depend on the locale of the underlying system - in most cases this locale is configured as en_US. For more information see the [Python documentation](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).
type: string
examples:
- "%Y-%m-%dT%H:%M:%S.%f%z"
- "%Y-%m-%d"
- "%s"
cursor_granularity:
title: Cursor Granularity
description:
Expand Down Expand Up @@ -1283,11 +1313,39 @@ definitions:
- "{{ config['start_time'] }}"
datetime_format:
title: Datetime Format
description: Format of the datetime value. Defaults to "%Y-%m-%dT%H:%M:%S.%f%z" if left empty. Use %s if the datetime value is in epoch time (Unix timestamp).
description: |
Format of the datetime value. Defaults to "%Y-%m-%dT%H:%M:%S.%f%z" if left empty. Use placeholders starting with "%" to describe the format the API is using. The following placeholders are available:
* **%s**: Epoch unix timestamp - `1686218963`
* **%a**: Weekday (abbreviated) - `Sun`
* **%A**: Weekday (full) - `Sunday`
* **%w**: Weekday (decimal) - `0` (Sunday), `6` (Saturday)
* **%d**: Day of the month (zero-padded) - `01`, `02`, ..., `31`
* **%b**: Month (abbreviated) - `Jan`
* **%B**: Month (full) - `January`
* **%m**: Month (zero-padded) - `01`, `02`, ..., `12`
* **%y**: Year (without century, zero-padded) - `00`, `01`, ..., `99`
* **%Y**: Year (with century) - `0001`, `0002`, ..., `9999`
* **%H**: Hour (24-hour, zero-padded) - `00`, `01`, ..., `23`
* **%I**: Hour (12-hour, zero-padded) - `01`, `02`, ..., `12`
* **%p**: AM/PM indicator
* **%M**: Minute (zero-padded) - `00`, `01`, ..., `59`
* **%S**: Second (zero-padded) - `00`, `01`, ..., `59`
* **%f**: Microsecond (zero-padded to 6 digits) - `000000`, `000001`, ..., `999999`
* **%z**: UTC offset - `(empty)`, `+0000`, `-0400`, `+1030`, `+063415`, `-030712.345216`
* **%Z**: Time zone name - `(empty)`, `UTC`, `GMT`
* **%j**: Day of the year (zero-padded) - `001`, `002`, ..., `366`
* **%U**: Week number of the year (Sunday as first day) - `00`, `01`, ..., `53`
* **%W**: Week number of the year (Monday as first day) - `00`, `01`, ..., `53`
* **%c**: Date and time representation - `Tue Aug 16 21:30:00 1988`
* **%x**: Date representation - `08/16/1988`
* **%X**: Time representation - `21:30:00`
* **%%**: Literal '%' character
Some placeholders depend on the locale of the underlying system - in most cases this locale is configured as en_US. For more information see the [Python documentation](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).
type: string
default: ""
examples:
- "%Y-%m-%dT%H:%M:%S.%f%"
- "%Y-%m-%dT%H:%M:%S.%f%z"
- "%Y-%m-%d"
- "%s"
max_datetime:
Expand Down
Loading

0 comments on commit e3b41a1

Please sign in to comment.