✨ Source Github: marked start date as optional field (#30971)
Co-authored-by: darynaishchenko <darynaishchenko@users.noreply.github.com>
2 people authored and girarda committed Oct 4, 2023
1 parent c4ce9ed commit a718708
Showing 8 changed files with 105 additions and 31 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-github/Dockerfile
@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.4.3
LABEL io.airbyte.version=1.4.4
LABEL io.airbyte.name=airbyte/source-github
airbyte-integrations/connectors/source-github/acceptance-test-config.yml
@@ -5,7 +5,7 @@ acceptance_tests:
tests:
- spec_path: "source_github/spec.json"
backward_compatibility_tests_config:
disable_for_version: "0.5.0"
disable_for_version: "1.4.3"
connection:
tests:
- config_path: "secrets/config.json"
@@ -18,7 +18,7 @@ acceptance_tests:
tests:
- config_path: "secrets/config.json"
backward_compatibility_tests_config:
disable_for_version: "0.4.8"
disable_for_version: "1.4.3"
- config_path: "secrets/config_oauth.json"
backward_compatibility_tests_config:
disable_for_version: "0.4.8"
airbyte-integrations/connectors/source-github/metadata.yaml
@@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
dockerImageTag: 1.4.3
dockerImageTag: 1.4.4
maxSecondsBetweenMessages: 5400
dockerRepository: airbyte/source-github
githubIssueLabel: source-github
airbyte-integrations/connectors/source-github/source_github/source.py
@@ -271,7 +271,8 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"api_url": config.get("api_url"),
"access_token_type": access_token_type,
}
organization_args_with_start_date = {**organization_args, "start_date": config["start_date"]}
start_date = config.get("start_date")
organization_args_with_start_date = {**organization_args, "start_date": start_date}

repository_args = {
"authenticator": authenticator,
@@ -280,7 +281,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"page_size_for_large_streams": page_size,
"access_token_type": access_token_type,
}
repository_args_with_start_date = {**repository_args, "start_date": config["start_date"]}
repository_args_with_start_date = {**repository_args, "start_date": start_date}

default_branches, branches_to_pull = self._get_branches_data(config.get("branch", ""), repository_args)
pull_requests_stream = PullRequests(**repository_args_with_start_date)
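The point of this hunk is that `config.get("start_date")` no longer assumes the field exists. A minimal sketch of the behavior, assuming a made-up `build_args` helper (not the connector's actual code):

```python
# Illustrative only: "build_args" is a hypothetical helper, not connector code.
config_without_start = {"access_token": "<token>", "repository": "airbytehq/airbyte"}
config_with_start = {**config_without_start, "start_date": "2021-03-01T00:00:00Z"}

def build_args(config):
    # dict.get returns None when "start_date" is absent instead of raising
    # KeyError, which is what the change above relies on.
    return {"repositories": config["repository"].split(), "start_date": config.get("start_date")}

print(build_args(config_without_start))  # start_date is None -> streams fall back to "sync everything"
print(build_args(config_with_start))     # start_date is the configured timestamp
```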
airbyte-integrations/connectors/source-github/source_github/spec.json
@@ -4,7 +4,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "GitHub Source Spec",
"type": "object",
"required": ["start_date", "repository"],
"required": ["repository"],
"additionalProperties": true,
"properties": {
"credentials": {
@@ -64,15 +64,6 @@
}
]
},
"start_date": {
"type": "string",
"title": "Start date",
"description": "The date from which you'd like to replicate data from GitHub in the format YYYY-MM-DDT00:00:00Z. For the streams which support this configuration, only data generated on or after the start date will be replicated. This field doesn't apply to all streams, see the <a href=\"https://docs.airbyte.com/integrations/sources/github\">docs</a> for more info",
"examples": ["2021-03-01T00:00:00Z"],
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"order": 1,
"format": "date-time"
},
"repository": {
"type": "string",
"examples": [
@@ -82,10 +73,19 @@
],
"title": "GitHub Repositories",
"description": "Space-delimited list of GitHub organizations/repositories, e.g. `airbytehq/airbyte` for single repository, `airbytehq/*` for get all repositories from organization and `airbytehq/airbyte airbytehq/another-repo` for multiple repositories.",
"order": 2,
"order": 1,
"pattern": "^([\\w.-]+/(\\*|[\\w.-]+(?<!\\.git))\\s+)*[\\w.-]+/(\\*|[\\w.-]+(?<!\\.git))$",
"pattern_descriptor": "org/repo org/another-repo org/*"
},
"start_date": {
"type": "string",
"title": "Start date",
"description": "The date from which you'd like to replicate data from GitHub in the format YYYY-MM-DDT00:00:00Z. If the date is not set, all data will be replicated. For the streams which support this configuration, only data generated on or after the start date will be replicated. This field doesn't apply to all streams, see the <a href=\"https://docs.airbyte.com/integrations/sources/github\">docs</a> for more info",
"examples": ["2021-03-01T00:00:00Z"],
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"order": 2,
"format": "date-time"
},
"api_url": {
"type": "string",
"examples": ["https://github.com", "https://github.company.org"],
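With `start_date` dropped from `required`, a config that omits it should now pass spec validation. A small illustrative check using the `jsonschema` package against a trimmed-down copy of the schema above (a sketch, not the full spec):

```python
from jsonschema import validate  # pip install jsonschema

# Trimmed-down copy of the updated spec: only "repository" is required now.
spec = {
    "type": "object",
    "required": ["repository"],
    "properties": {
        "repository": {"type": "string"},
        "start_date": {
            "type": "string",
            "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
        },
    },
}

# Both configs validate; before this change the first would have failed
# because "start_date" sat in "required".
validate(instance={"repository": "airbytehq/airbyte"}, schema=spec)
validate(instance={"repository": "airbytehq/airbyte", "start_date": "2021-03-01T00:00:00Z"}, schema=spec)
```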
airbyte-integrations/connectors/source-github/source_github/streams.py
@@ -292,7 +292,9 @@ def _get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Map
state_path = [stream_slice[k] for k in self.slice_keys] + [self.cursor_field]
stream_state_value = getter(stream_state, state_path, strict=False)
if stream_state_value:
return max(self._start_date, stream_state_value)
if self._start_date:
return max(self._start_date, stream_state_value)
return stream_state_value
return self._start_date

def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any]) -> str:
@@ -313,7 +315,7 @@ def read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
):
cursor_value = self.convert_cursor_value(record[self.cursor_field])
if cursor_value > start_point:
if not start_point or cursor_value > start_point:
yield record
elif self.is_sorted == "desc" and cursor_value < start_point:
break
@@ -661,7 +663,9 @@ def __init__(self, branches_to_pull: Mapping[str, List[str]], default_branches:

def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
params = super(IncrementalMixin, self).request_params(stream_state=stream_state, stream_slice=stream_slice, **kwargs)
params["since"] = self.get_starting_point(stream_state=stream_state, stream_slice=stream_slice)
since = self.get_starting_point(stream_state=stream_state, stream_slice=stream_slice)
if since:
params["since"] = since
params["sha"] = stream_slice["branch"]
return params

@@ -985,7 +989,9 @@ def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapp
parent_id = str(stream_slice[self.copy_parent_key])
stream_state_value = stream_state.get(repository, {}).get(parent_id, {}).get(self.cursor_field)
if stream_state_value:
return max(self._start_date, stream_state_value)
if self._start_date:
return max(self._start_date, stream_state_value)
return stream_state_value
return self._start_date

def read_records(
Expand All @@ -999,7 +1005,7 @@ def read_records(
for record in super().read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
):
if record[self.cursor_field] > starting_point:
if not starting_point or record[self.cursor_field] > starting_point:
yield record

def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str, Any]) -> MutableMapping[str, Any]:
@@ -1254,7 +1260,7 @@ def read_records(
for record in super().read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
):
if record[self.cursor_field] > starting_point:
if not starting_point or record[self.cursor_field] > starting_point:
yield record

def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any]) -> str:
Expand All @@ -1263,7 +1269,9 @@ def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapp
project_id = str(stream_slice["project_id"])
stream_state_value = stream_state.get(repository, {}).get(project_id, {}).get(self.cursor_field)
if stream_state_value:
return max(self._start_date, stream_state_value)
if self._start_date:
return max(self._start_date, stream_state_value)
return stream_state_value
return self._start_date

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
@@ -1322,7 +1330,7 @@ def read_records(
for record in super().read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
):
if record[self.cursor_field] > starting_point:
if not starting_point or record[self.cursor_field] > starting_point:
yield record

def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any]) -> str:
Expand All @@ -1332,7 +1340,9 @@ def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapp
column_id = str(stream_slice["column_id"])
stream_state_value = stream_state.get(repository, {}).get(project_id, {}).get(column_id, {}).get(self.cursor_field)
if stream_state_value:
return max(self._start_date, stream_state_value)
if self._start_date:
return max(self._start_date, stream_state_value)
return stream_state_value
return self._start_date

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
@@ -1407,15 +1417,17 @@ def read_records(
# workflows_runs records cannot be updated. It means if we initially fully synced stream on subsequent incremental sync we need
# only to look behind on 30 days to find all records which were updated.
start_point = self.get_starting_point(stream_state=stream_state, stream_slice=stream_slice)
break_point = (pendulum.parse(start_point) - pendulum.duration(days=self.re_run_period)).to_iso8601_string()
break_point = None
if start_point:
break_point = (pendulum.parse(start_point) - pendulum.duration(days=self.re_run_period)).to_iso8601_string()
for record in super(SemiIncrementalMixin, self).read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
):
cursor_value = record[self.cursor_field]
created_at = record["created_at"]
if cursor_value > start_point:
if not start_point or cursor_value > start_point:
yield record
if created_at < break_point:
if break_point and created_at < break_point:
break


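Across these hunks the same pattern recurs: when `start_date` is absent, the saved stream state alone determines the starting point, and a missing starting point means no filtering at all. A condensed stand-alone sketch of that pattern, using illustrative helpers rather than the connector's actual classes:

```python
from typing import Iterable, Mapping, Optional

# Hypothetical stand-ins for the logic repeated above in streams.py.
def starting_point(start_date: Optional[str], state_value: Optional[str]) -> Optional[str]:
    if state_value:
        # With a configured start_date, never go further back than it;
        # without one, the saved state alone drives the cursor.
        return max(start_date, state_value) if start_date else state_value
    return start_date  # may be None when start_date was omitted from the config

def filter_records(records: Iterable[Mapping], cursor_field: str, start_point: Optional[str]) -> Iterable[Mapping]:
    for record in records:
        # A missing starting point means "replicate everything".
        if not start_point or record[cursor_field] > start_point:
            yield record

records = [{"updated_at": "2021-01-01T00:00:00Z"}, {"updated_at": "2023-01-01T00:00:00Z"}]
print(list(filter_records(records, "updated_at", starting_point(None, None))))                    # both records
print(list(filter_records(records, "updated_at", starting_point("2022-01-01T00:00:00Z", None))))  # only the 2023 record
```

ISO-8601 timestamps in UTC compare correctly as strings, which is why plain `max()` and `>` are enough here.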
airbyte-integrations/connectors/source-github/unit_tests/test_source.py
@@ -27,6 +27,28 @@ def check_source(repo_line: str) -> AirbyteConnectionStatus:
return source.check(logger_mock, config)


@responses.activate
@pytest.mark.parametrize(
"config, expected",
(
(
{
"start_date": "2021-08-27T00:00:46Z",
"access_token": "test_token",
"repository": "airbyte/test",
},
True,
),
({"access_token": "test_token", "repository": "airbyte/test"}, True),
),
)
def test_check_start_date(config, expected):
responses.add(responses.GET, "https://api.github.com/repos/airbyte/test?per_page=100", json={"full_name": "test_full_name"})
source = SourceGithub()
status, _ = source.check_connection(logger=logging.getLogger("airbyte"), config=config)
assert status == expected


@pytest.mark.parametrize(
"api_url, deployment_env, expected_message",
(
@@ -301,3 +323,41 @@ def test_streams_page_size():
assert stream.page_size == constants.DEFAULT_PAGE_SIZE_FOR_LARGE_STREAM
else:
assert stream.page_size == constants.DEFAULT_PAGE_SIZE


@responses.activate
@pytest.mark.parametrize(
"config, expected",
(
(
{
"start_date": "2021-08-27T00:00:46Z",
"access_token": "test_token",
"repository": "airbyte/test",
},
39,
),
({"access_token": "test_token", "repository": "airbyte/test"}, 39),
),
)
def test_streams_config_start_date(config, expected):
responses.add(responses.GET, "https://api.github.com/repos/airbyte/test?per_page=100", json={"full_name": "airbyte/test"})
responses.add(
responses.GET,
"https://api.github.com/repos/airbyte/test?per_page=100",
json={"full_name": "airbyte/test", "default_branch": "default_branch"},
)
responses.add(
responses.GET,
"https://api.github.com/repos/airbyte/test/branches?per_page=100",
json=[{"repository": "airbyte/test", "name": "name"}],
)
source = SourceGithub()
streams = source.streams(config=config)
# projects stream that uses start date
project_stream = streams[4]
assert len(streams) == expected
if config.get("start_date"):
assert project_stream._start_date == "2021-08-27T00:00:46Z"
else:
assert not project_stream._start_date
5 changes: 3 additions & 2 deletions docs/integrations/sources/github.md
@@ -44,8 +44,8 @@ Log into [GitHub](https://github.com) and then generate a [personal access token
3. On the source setup page, select **GitHub** from the Source type dropdown and enter a name for this connector.
4. Click `Authenticate your GitHub account` by selecting Oauth or Personal Access Token for Authentication.
5. Log in and Authorize to the GitHub account.
6. **Start date** - The date from which you'd like to replicate data for streams: `comments`, `commit_comment_reactions`, `commit_comments`, `commits`, `deployments`, `events`, `issue_comment_reactions`, `issue_events`, `issue_milestones`, `issue_reactions`, `issues`, `project_cards`, `project_columns`, `projects`, `pull_request_comment_reactions`, `pull_requests`, `pull_requeststats`, `releases`, `review_comments`, `reviews`, `stargazers`, `workflow_runs`, `workflows`.
7. **GitHub Repositories** - Space-delimited list of GitHub organizations/repositories, e.g. `airbytehq/airbyte` for single repository, `airbytehq/airbyte airbytehq/another-repo` for multiple repositories. If you want to specify the organization to receive data from all its repositories, then you should specify it according to the following example: `airbytehq/*`. Repositories with the wrong name, or repositories that do not exist, or have the wrong name format are not allowed.
6. **GitHub Repositories** - Space-delimited list of GitHub organizations/repositories, e.g. `airbytehq/airbyte` for single repository, `airbytehq/airbyte airbytehq/another-repo` for multiple repositories. If you want to specify the organization to receive data from all its repositories, then you should specify it according to the following example: `airbytehq/*`. Repositories with the wrong name, or repositories that do not exist, or have the wrong name format are not allowed.
7. **Start date (Optional)** - The date from which you'd like to replicate data for streams. If the date is not set, all data will be replicated. Applies to the following streams: `Comments`, `Commit comment reactions`, `Commit comments`, `Commits`, `Deployments`, `Events`, `Issue comment reactions`, `Issue events`, `Issue milestones`, `Issue reactions`, `Issues`, `Project cards`, `Project columns`, `Projects`, `Pull request comment reactions`, `Pull requests`, `Pull request stats`, `Releases`, `Review comments`, `Reviews`, `Stargazers`, `Workflow runs`, `Workflows`.
8. **Branch (Optional)** - Space-delimited list of GitHub repository branches to pull commits for, e.g. `airbytehq/airbyte/master`. If no branches are specified for a repository, the default branch will be pulled. (e.g. `airbytehq/airbyte/master airbytehq/airbyte/my-branch`).
9. **Max requests per hour (Optional)** - The GitHub API allows for a maximum of 5000 requests per hour (15000 for Github Enterprise). You can specify a lower value to limit your use of the API quota.
<!-- /env:cloud -->
@@ -166,6 +166,7 @@ The GitHub connector should not run into GitHub API limitations under normal usa

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.4.4 | 2023-10-02 | [30971](https://github.com/airbytehq/airbyte/pull/30971) | Mark `start_date` as optional. |
| 1.4.3 | 2023-10-02 | [30979](https://github.com/airbytehq/airbyte/pull/30979) | Fetch archived records in `Project Cards` |
| 1.4.2 | 2023-09-30 | [30927](https://github.com/airbytehq/airbyte/pull/30927) | Provide actionable user error messages |
| 1.4.1 | 2023-09-30 | [30839](https://github.com/airbytehq/airbyte/pull/30839) | Update CDK to Latest version |
