Skip to content

Commit

Permalink
Source Github: implement client-side throttling of requests (#25793)
Browse files Browse the repository at this point in the history
Signed-off-by: Serhii Chvaliuk <grubberr@gmail.com>
  • Loading branch information
grubberr committed May 17, 2023
1 parent 5575d86 commit 30d8bb4
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 6 deletions.

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-github/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.4.11
LABEL io.airbyte.version=0.5.0
LABEL io.airbyte.name=airbyte/source-github
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
dockerImageTag: 0.4.11
dockerImageTag: 0.5.0
maxSecondsBetweenMessages: 5400
dockerRepository: airbyte/source-github
githubIssueLabel: source-github
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-github/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "pendulum~=2.1.2", "sgqlc"]

TEST_REQUIREMENTS = ["pytest~=6.1", "connector-acceptance-test", "responses~=0.19.0"]
TEST_REQUIREMENTS = ["pytest~=6.1", "connector-acceptance-test", "responses~=0.23.1", "freezegun~=1.2.0"]

setup(
name="source_github",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import MultipleTokenAuthenticator
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from source_github.utils import MultipleTokenAuthenticatorWithRateLimiter

from . import constants
from .streams import (
Expand Down Expand Up @@ -158,6 +159,13 @@ def get_access_token(config: Mapping[str, Any]):
def _get_authenticator(self, config: Mapping[str, Any]):
_, token = self.get_access_token(config)
tokens = [t.strip() for t in token.split(constants.TOKEN_SEPARATOR)]
requests_per_hour = config.get("requests_per_hour")
if requests_per_hour:
return MultipleTokenAuthenticatorWithRateLimiter(
tokens=tokens,
auth_method="token",
requests_per_hour=requests_per_hour,
)
return MultipleTokenAuthenticator(tokens=tokens, auth_method="token")

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@
"default": 10,
"description": "The Github connector contains several streams with a large amount of data. The page size of such streams depends on the size of your repository. We recommended that you specify values between 10 and 30.",
"order": 4
},
"requests_per_hour": {
"type": "integer",
"title": "Max requests per hour",
"description": "The GitHub API allows for a maximum of 5000 requests per hour (15000 for Github Enterprise). You can specify a lower value to limit your use of the API quota.",
"minimum": 1,
"order": 5
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
import time
from itertools import cycle
from types import SimpleNamespace
from typing import List

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator


def getter(D: dict, key_or_keys, strict=True):
Expand All @@ -24,3 +30,59 @@ def read_full_refresh(stream_instance: Stream):
records = stream_instance.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh)
for record in records:
yield record


class MultipleTokenAuthenticatorWithRateLimiter(AbstractHeaderAuthenticator):
"""
Each token in the cycle is checked against the rate limiter.
If a token exceeds the capacity limit, the system switches to another token.
If all tokens are exhausted, the system will enter a sleep state until
the first token becomes available again.
"""

DURATION = 3600 # seconds

def __init__(self, tokens: List[str], requests_per_hour: int, auth_method: str = "Bearer", auth_header: str = "Authorization"):
self._auth_method = auth_method
self._auth_header = auth_header
now = time.time()
self._requests_per_hour = requests_per_hour
self._tokens = {t: SimpleNamespace(count=self._requests_per_hour, update_at=now) for t in tokens}
self._tokens_iter = cycle(self._tokens)

@property
def auth_header(self) -> str:
return self._auth_header

@property
def token(self) -> str:
while True:
token = next(self._tokens_iter)
if self._check_token(token):
return f"{self._auth_method} {token}"

def _check_token(self, token: str):
"""check that token is not limited"""
self._refill()
if self._sleep():
self._refill()
if self._tokens[token].count > 0:
self._tokens[token].count -= 1
return True

def _refill(self):
"""refill all needed tokens"""
now = time.time()
for token, ns in self._tokens.items():
if now - ns.update_at >= self.DURATION:
ns.update_at = now
ns.count = self._requests_per_hour

def _sleep(self):
"""sleep only if all tokens is exhausted"""
now = time.time()
if sum([ns.count for ns in self._tokens.values()]) == 0:
sleep_time = self.DURATION - (now - min([ns.update_at for ns in self._tokens.values()]))
logging.warning("Sleeping for %.1f seconds to enforce the limit of %d requests per hour.", sleep_time, self._requests_per_hour)
time.sleep(sleep_time)
return True
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import datetime
import time
from unittest.mock import MagicMock

import pytest
import responses
from airbyte_cdk.models import AirbyteConnectionStatus, Status
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from freezegun import freeze_time
from source_github.source import SourceGithub
from source_github.utils import MultipleTokenAuthenticatorWithRateLimiter


def check_source(repo_line: str) -> AirbyteConnectionStatus:
Expand Down Expand Up @@ -172,3 +176,43 @@ def test_streams_no_streams_available_error():
with pytest.raises(AirbyteTracedException) as e:
SourceGithub().streams(config={"access_token": "test_token", "repository": "airbytehq/airbyte-test"})
assert str(e.value) == "No streams available. Please check permissions"


def test_multiple_token_authenticator_with_rate_limiter(monkeypatch):

called_args = []

def sleep_mock(seconds):
frozen_time.tick(delta=datetime.timedelta(seconds=seconds))
called_args.append(seconds)

monkeypatch.setattr(time, 'sleep', sleep_mock)

with freeze_time("2021-01-01 12:00:00") as frozen_time:

authenticator = MultipleTokenAuthenticatorWithRateLimiter(tokens=["token1", "token2"], requests_per_hour=4)
authenticator._tokens["token1"].count = 2

assert authenticator.token == "Bearer token1"
frozen_time.tick(delta=datetime.timedelta(seconds=1))
assert authenticator.token == "Bearer token2"
frozen_time.tick(delta=datetime.timedelta(seconds=1))
assert authenticator.token == "Bearer token1"
frozen_time.tick(delta=datetime.timedelta(seconds=1))
assert authenticator.token == "Bearer token2"
frozen_time.tick(delta=datetime.timedelta(seconds=1))

# token1 is fully exhausted, token2 is still used
assert authenticator._tokens["token1"].count == 0
assert authenticator.token == "Bearer token2"
frozen_time.tick(delta=datetime.timedelta(seconds=1))
assert authenticator.token == "Bearer token2"
frozen_time.tick(delta=datetime.timedelta(seconds=1))
assert called_args == []

# now we have to sleep because all tokens are exhausted
assert authenticator.token == "Bearer token1"
assert called_args == [3594.0]

assert authenticator._tokens["token1"].count == 3
assert authenticator._tokens["token2"].count == 4
6 changes: 4 additions & 2 deletions docs/integrations/sources/github.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Log into [GitHub](https://github.com) and then generate a [personal access token
7. **GitHub Repositories** - Space-delimited list of GitHub organizations/repositories, e.g. `airbytehq/airbyte` for single repository, `airbytehq/airbyte airbytehq/another-repo` for multiple repositories. If you want to specify the organization to receive data from all its repositories, then you should specify it according to the following example: `airbytehq/*`. Repositories with the wrong name, or repositories that do not exist, or have the wrong name format are not allowed.
8. **Branch (Optional)** - Space-delimited list of GitHub repository branches to pull commits for, e.g. `airbytehq/airbyte/master`. If no branches are specified for a repository, the default branch will be pulled. (e.g. `airbytehq/airbyte/master airbytehq/airbyte/my-branch`).
9. **Page size for large streams (Optional)** - The GitHub connector contains several streams with a large load. The page size of such streams depends on the size of your repository. Recommended to specify values between 10 and 30.
10. **Max requests per hour (Optional)** - The GitHub API allows for a maximum of 5000 requests per hour (15000 for Github Enterprise). You can specify a lower value to limit your use of the API quota.
<!-- /env:cloud -->

<!-- env:oss -->
Expand Down Expand Up @@ -163,6 +164,7 @@ The GitHub connector should not run into GitHub API limitations under normal usa

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.5.0 | 2023-05-16 | [25793](https://github.com/airbytehq/airbyte/pull/25793) | Implement client-side throttling of requests |
| 0.4.11 | 2023-05-12 | [26025](https://github.com/airbytehq/airbyte/pull/26025) | Added more transparent depiction of the personal access token expired |
| 0.4.10 | 2023-05-15 | [26075](https://github.com/airbytehq/airbyte/pull/26075) | Add more specific error message description for no repos case. |
| 0.4.9 | 2023-05-01 | [24523](https://github.com/airbytehq/airbyte/pull/24523) | Add undeclared columns to spec |
Expand All @@ -171,8 +173,8 @@ The GitHub connector should not run into GitHub API limitations under normal usa
| 0.4.6 | 2023-03-24 | [24398](https://github.com/airbytehq/airbyte/pull/24398) | Fix caching for `get_starting_point` in stream "Commits" |
| 0.4.5 | 2023-03-23 | [24417](https://github.com/airbytehq/airbyte/pull/24417) | Add pattern_descriptors to fields with an expected format |
| 0.4.4 | 2023-03-17 | [24255](https://github.com/airbytehq/airbyte/pull/24255) | Add field groups and titles to improve display of connector setup form |
| 0.4.3 | 2023-03-04 | [22993](https://github.com/airbytehq/airbyte/pull/22993) | Specified date formatting in specification |
| 0.4.2 | 2023-03-03 | [23467](https://github.com/airbytehq/airbyte/pull/23467) | added user friendly messages, added AirbyteTracedException config_error, updated SAT |
| 0.4.3 | 2023-03-04 | [22993](https://github.com/airbytehq/airbyte/pull/22993) | Specified date formatting in specification |
| 0.4.2 | 2023-03-03 | [23467](https://github.com/airbytehq/airbyte/pull/23467) | added user friendly messages, added AirbyteTracedException config_error, updated SAT |
| 0.4.1 | 2023-01-27 | [22039](https://github.com/airbytehq/airbyte/pull/22039) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| 0.4.0 | 2023-01-20 | [21457](https://github.com/airbytehq/airbyte/pull/21457) | Use GraphQL for `issue_reactions` stream |
| 0.3.12 | 2023-01-18 | [21481](https://github.com/airbytehq/airbyte/pull/21481) | Handle 502 Bad Gateway error with proper log message |
Expand Down

0 comments on commit 30d8bb4

Please sign in to comment.