Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source GitHub: migrate repo and branches to array in spec #31056

Merged
merged 10 commits into from Oct 4, 2023
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-github/Dockerfile
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.4.5
LABEL io.airbyte.version=1.4.6
LABEL io.airbyte.name=airbyte/source-github
3 changes: 3 additions & 0 deletions airbyte-integrations/connectors/source-github/main.py
Expand Up @@ -7,7 +7,10 @@

from airbyte_cdk.entrypoint import launch
from source_github import SourceGithub
from source_github.config_migrations import MigrateBranch, MigrateRepository

if __name__ == "__main__":
    source = SourceGithub()
    # Run the config migrations (repository first, then branch) before handing
    # control to the CDK entrypoint; each call gets its own copy of the CLI args.
    for migration in (MigrateRepository, MigrateBranch):
        migration.migrate(sys.argv[1:], source)
    launch(source, sys.argv[1:])
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
dockerImageTag: 1.4.5
dockerImageTag: 1.4.6
maxSecondsBetweenMessages: 5400
dockerRepository: airbyte/source-github
githubIssueLabel: source-github
Expand Down
@@ -0,0 +1,106 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import abc
import logging
from abc import ABC
from typing import Any, List, Mapping

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository

from .source import SourceGithub

# Module-level logger, looked up under the "airbyte_logger" name.
logger = logging.getLogger("airbyte_logger")


class MigrateStringToArray(ABC):
    """
    Migrate a legacy space-delimited string property to its array counterpart at runtime.

    The legacy property is left in place, which keeps the config usable when
    falling back to the previous source version.

    Specifically, starting from `1.4.6`, the `repository` and `branch` properties should be like:
        > List(["<repository_1>", "<repository_2>", ..., "<repository_n>"])
    instead of, in `1.4.5`:
        > JSON STR: "repository_1 repository_2"

    Concrete subclasses define `migrate_from_key` and `migrate_to_key` as plain
    class attributes so the classmethods below can read them from `cls`.
    """

    # Defined once on the base class, so every subclass appends to the same queue.
    message_repository: MessageRepository = InMemoryMessageRepository()

    @property
    @abc.abstractmethod
    def migrate_from_key(self) -> str:
        """Name of the legacy (string) config property."""
        ...

    @property
    @abc.abstractmethod
    def migrate_to_key(self) -> str:
        """Name of the new (array) config property."""
        ...

    @classmethod
    def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
        """
        Determine whether the config requires migration.

        Returns:
            > True, if the legacy key is present and the new key is not
            > False, otherwise.
        """
        return cls.migrate_from_key in config and cls.migrate_to_key not in config

    @classmethod
    def _transform_to_array(cls, config: Mapping[str, Any], source: SourceGithub = None) -> Mapping[str, Any]:
        """
        Populate the new array property from the legacy string property.

        The passed `config` is modified in place and also returned. `source` is
        accepted for signature compatibility and is not used.
        """
        # A missing or null value on either side is treated as "nothing to merge".
        already_migrated = config.get(cls.migrate_to_key) or []
        legacy_value = config.get(cls.migrate_from_key) or ""
        # `split()` drops empty chunks and accepts any whitespace separator.
        # `dict.fromkeys` de-duplicates while keeping the order the user typed,
        # so the saved config is identical from one run to the next.
        merged = dict.fromkeys([*already_migrated, *legacy_value.split()])
        config[cls.migrate_to_key] = list(merged)
        return config

    @classmethod
    def _modify_and_save(cls, config_path: str, source: SourceGithub, config: Mapping[str, Any]) -> Mapping[str, Any]:
        """Transform `config`, write it to `config_path` through `source`, and return it."""
        migrated_config = cls._transform_to_array(config, source)
        source.write_config(migrated_config, config_path)
        return migrated_config

    @classmethod
    def _emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
        """Record a CONTROL message for `migrated_config` and print it to stdout."""
        control_message = create_connector_config_control_message(migrated_config)
        # keep the message in the repository queue so it remains inspectable
        cls.message_repository.emit_message(control_message)
        # Print only the message created here: the queue is shared by all
        # subclasses and is not drained, so replaying it would print the
        # messages of earlier migrations a second time.
        print(control_message.json(exclude_unset=True))

    @classmethod
    def migrate(cls, args: List[str], source: SourceGithub) -> None:
        """
        Check the input args, decide whether the config should be migrated,
        transform it if necessary and emit the CONTROL message.
        """
        config_path = AirbyteEntrypoint(source).extract_config(args)
        # proceed only if `--config` arg is provided
        if not config_path:
            return
        config = source.read_config(config_path)
        if cls._should_migrate(config):
            cls._emit_control_message(cls._modify_and_save(config_path, source, config))


class MigrateRepository(MigrateStringToArray):
    """Migration of the legacy `repository` string into the `repositories` array."""

    migrate_from_key: str = "repository"
    migrate_to_key: str = "repositories"


class MigrateBranch(MigrateStringToArray):
    """Migration of the legacy `branch` string into the `branches` array."""

    migrate_from_key: str = "branch"
    migrate_to_key: str = "branches"
Expand Up @@ -3,7 +3,7 @@
#

from os import getenv
from typing import Any, Dict, List, Mapping, MutableMapping, Set, Tuple
from typing import Any, Dict, List, Mapping, MutableMapping, Tuple
from urllib.parse import urlparse

from airbyte_cdk import AirbyteLogger
Expand Down Expand Up @@ -61,27 +61,15 @@


class SourceGithub(AbstractSource):
@staticmethod
def _get_and_prepare_repositories_config(config: Mapping[str, Any]) -> Set[str]:
"""
_get_and_prepare_repositories_config gets set of repositories names from config and removes simple errors that user could provide
Args:
config: Dict representing connector's config
Returns:
set of provided repositories
"""
config_repositories = set(filter(None, config["repository"].split(" ")))
return config_repositories

@staticmethod
def _get_org_repositories(config: Mapping[str, Any], authenticator: MultipleTokenAuthenticator) -> Tuple[List[str], List[str]]:
"""
Parse config.repository and produce two lists: organizations, repositories.
Parse config/repositories and produce two lists: organizations, repositories.
Args:
config (dict): Dict representing connector's config
authenticator(MultipleTokenAuthenticator): authenticator object
"""
config_repositories = SourceGithub._get_and_prepare_repositories_config(config)
config_repositories = set(config.get("repositories"))

repositories = set()
organizations = set()
Expand Down Expand Up @@ -144,6 +132,12 @@ def _get_authenticator(self, config: Mapping[str, Any]):
)
return MultipleTokenAuthenticator(tokens=tokens, auth_method="token")

def _validate_and_transform_config(self, config: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
    """Pass the config through default filling, then repositories and branches handling, in that order."""
    pipeline = (
        self._ensure_default_values,
        self._validate_repositories,
        self._validate_branches,
    )
    for step in pipeline:
        # each step returns the config that the next step receives
        config = step(config)
    return config

def _ensure_default_values(self, config: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
config.setdefault("api_url", "https://api.github.com")
api_url_parsed = urlparse(config["api_url"])
Expand All @@ -159,13 +153,31 @@ def _ensure_default_values(self, config: MutableMapping[str, Any]) -> MutableMap

raise AirbyteTracedException(message=message, failure_type=FailureType.config_error)

def _validate_repositories(self, config: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
if config.get("repositories"):
pass
elif config.get("repository"):
config["repositories"] = set(filter(None, config["repository"].split(" ")))

return config

def _validate_branches(self, config: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
if config.get("branches"):
pass
elif config.get("branch"):
config["branches"] = set(filter(None, config["branch"].split(" ")))

return config

@staticmethod
def _is_http_allowed() -> bool:
return getenv("DEPLOYMENT_MODE", "").upper() != "CLOUD"

@staticmethod
def _get_branches_data(selected_branches: str, full_refresh_args: Dict[str, Any] = None) -> Tuple[Dict[str, str], Dict[str, List[str]]]:
selected_branches = set(filter(None, selected_branches.split(" ")))
def _get_branches_data(
selected_branches: List, full_refresh_args: Dict[str, Any] = None
) -> Tuple[Dict[str, str], Dict[str, List[str]]]:
selected_branches = set(selected_branches)

# Get the default branch for each repository
default_branches = {}
Expand Down Expand Up @@ -218,7 +230,7 @@ def user_friendly_error_message(self, message: str) -> str:
return user_message

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
config = self._ensure_default_values(config)
config = self._validate_and_transform_config(config)
try:
authenticator = self._get_authenticator(config)
_, repositories = self._get_org_repositories(config=config, authenticator=authenticator)
Expand All @@ -236,7 +248,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = self._get_authenticator(config)
config = self._ensure_default_values(config)
config = self._validate_and_transform_config(config)
try:
organizations, repositories = self._get_org_repositories(config=config, authenticator=authenticator)
except Exception as e:
Expand Down Expand Up @@ -283,7 +295,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
}
repository_args_with_start_date = {**repository_args, "start_date": start_date}

default_branches, branches_to_pull = self._get_branches_data(config.get("branch", ""), repository_args)
default_branches, branches_to_pull = self._get_branches_data(config.get("branch", []), repository_args)
pull_requests_stream = PullRequests(**repository_args_with_start_date)
projects_stream = Projects(**repository_args_with_start_date)
project_columns_stream = ProjectColumns(projects_stream, **repository_args_with_start_date)
Expand Down
Expand Up @@ -4,7 +4,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "GitHub Source Spec",
"type": "object",
"required": ["repository"],
"required": ["credentials", "repositories"],
"additionalProperties": true,
"properties": {
"credentials": {
Expand Down Expand Up @@ -72,17 +72,35 @@
"airbytehq/airbyte"
],
"title": "GitHub Repositories",
"description": "Space-delimited list of GitHub organizations/repositories, e.g. `airbytehq/airbyte` for single repository, `airbytehq/*` for get all repositories from organization and `airbytehq/airbyte airbytehq/another-repo` for multiple repositories.",
"order": 1,
"description": "(DEPRECATED) Space-delimited list of GitHub organizations/repositories, e.g. `airbytehq/airbyte` for single repository, `airbytehq/*` for get all repositories from organization and `airbytehq/airbyte airbytehq/another-repo` for multiple repositories.",
"airbyte_hidden": true,
"pattern": "^([\\w.-]+/(\\*|[\\w.-]+(?<!\\.git))\\s+)*[\\w.-]+/(\\*|[\\w.-]+(?<!\\.git))$",
"pattern_descriptor": "org/repo org/another-repo org/*"
},
"repositories": {
"type": "array",
"items": {
"type": "string",
"pattern": "^([\\w.-]+/(\\*|[\\w.-]+(?<!\\.git))\\s+)*[\\w.-]+/(\\*|[\\w.-]+(?<!\\.git))$"
},
"minItems": 1,
"examples": [
"airbytehq/airbyte airbytehq/another-repo",
"airbytehq/*",
"airbytehq/airbyte"
],
"title": "GitHub Repositories",
"description": "List of GitHub organizations/repositories, e.g. `airbytehq/airbyte` for single repository, `airbytehq/*` for get all repositories from organization and `airbytehq/airbyte airbytehq/another-repo` for multiple repositories.",
"order": 1,
"pattern_descriptor": "org/repo org/another-repo org/*"
},
"start_date": {
"type": "string",
"title": "Start date",
"description": "The date from which you'd like to replicate data from GitHub in the format YYYY-MM-DDT00:00:00Z. If the date is not set, all data will be replicated. For the streams which support this configuration, only data generated on or after the start date will be replicated. This field doesn't apply to all streams, see the <a href=\"https://docs.airbyte.com/integrations/sources/github\">docs</a> for more info",
"examples": ["2021-03-01T00:00:00Z"],
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"pattern_descriptor": "YYYY-MM-DDTHH:mm:ssZ",
"order": 2,
"format": "date-time"
},
Expand All @@ -98,7 +116,18 @@
"type": "string",
"title": "Branch",
"examples": ["airbytehq/airbyte/master airbytehq/airbyte/my-branch"],
"description": "Space-delimited list of GitHub repository branches to pull commits for, e.g. `airbytehq/airbyte/master`. If no branches are specified for a repository, the default branch will be pulled.",
"description": "(DEPRECATED) Space-delimited list of GitHub repository branches to pull commits for, e.g. `airbytehq/airbyte/master`. If no branches are specified for a repository, the default branch will be pulled.",
"airbyte_hidden": true,
"pattern_descriptor": "org/repo/branch1 org/repo/branch2"
},
"branches": {
"type": "array",
"items": {
"type": "string"
},
"title": "Branches",
"examples": ["airbytehq/airbyte/master airbytehq/airbyte/my-branch"],
"description": "List of GitHub repository branches to pull commits for, e.g. `airbytehq/airbyte/master`. If no branches are specified for a repository, the default branch will be pulled.",
"order": 4,
"pattern_descriptor": "org/repo/branch1 org/repo/branch2"
},
Expand Down
@@ -0,0 +1,8 @@
{
"credentials": {
"personal_access_token": "personal_access_token"
},
"repository": "airbytehq/airbyte airbytehq/airbyte-platform",
"start_date": "2000-01-01T00:00:00Z",
"branch": "airbytehq/airbyte/master airbytehq/airbyte-platform/main"
}
@@ -0,0 +1,76 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import json
import os
from typing import Any, Mapping

from airbyte_cdk.models import OrchestratorType, Type
from airbyte_cdk.sources import Source
from source_github.config_migrations import MigrateBranch, MigrateRepository
from source_github.source import SourceGithub

# BASE ARGS
# Connector command placed first in the argument list below.
CMD = "check"
# Both config fixtures are resolved relative to the directory of this test module.
TEST_CONFIG_PATH = f"{os.path.dirname(__file__)}/test_config.json"
NEW_TEST_CONFIG_PATH = f"{os.path.dirname(__file__)}/test_new_config.json"
# Argument list handed to `migrate()`: the command followed by `--config <path>`.
SOURCE_INPUT_ARGS = [CMD, "--config", TEST_CONFIG_PATH]
# Single source instance shared by the tests in this module.
SOURCE: Source = SourceGithub()


# HELPERS
def load_config(config_path: str = TEST_CONFIG_PATH) -> Mapping[str, Any]:
    """Read the JSON file at `config_path` and return its parsed content."""
    with open(config_path, "r") as config_file:
        raw_config = config_file.read()
    return json.loads(raw_config)


def revert_migration(config_path: str = TEST_CONFIG_PATH) -> None:
    """
    Remove the migrated `repositories` property from the JSON config at `config_path`.

    Safe to call when the file was never migrated: a missing key is ignored
    instead of raising `KeyError`, so this can be used as unconditional cleanup.
    """
    # read first and close the handle before the same file is reopened for writing
    with open(config_path, "r") as test_config:
        config = json.load(test_config)
    config.pop("repositories", None)
    with open(config_path, "w") as updated_config:
        updated_config.write(json.dumps(config))


def test_migrate_config():
    """Migrating the legacy test config adds `repositories`, keeps `repository` and emits a CONTROL message."""
    migration_instance = MigrateRepository
    try:
        # migrate the test_config
        migration_instance.migrate(SOURCE_INPUT_ARGS, SOURCE)
        # load the updated config
        test_migrated_config = load_config()
        # check migrated property
        assert "repositories" in test_migrated_config
        assert isinstance(test_migrated_config["repositories"], list)
        # check the old property is in place
        assert "repository" in test_migrated_config
        assert isinstance(test_migrated_config["repository"], str)
        # test CONTROL MESSAGE was emitted
        control_msg = migration_instance.message_repository._message_queue[0]
        assert control_msg.type == Type.CONTROL
        assert control_msg.control.type == OrchestratorType.CONNECTOR_CONFIG
        # new repositories is of type(list)
        assert isinstance(control_msg.control.connectorConfig.config["repositories"], list)
    finally:
        # Restore the on-disk fixture even when an assertion above fails, so a
        # failure here does not leave a modified config for the other tests.
        revert_migration()


def test_config_is_reverted():
    """The on-disk test config must be back in its pre-migration shape."""
    test_config = load_config()
    # the migrated properties are gone
    for migrated_key in ("repositories", "branches"):
        assert migrated_key not in test_config
    # the legacy properties are still present ...
    for legacy_key in ("repository", "branch"):
        assert legacy_key in test_config
    # ... and still hold plain strings
    for legacy_key in ("repository", "branch"):
        assert isinstance(test_config[legacy_key], str)


def test_should_not_migrate_new_config():
    """A config loaded from the new-format fixture must not be flagged for migration."""
    new_config = load_config(NEW_TEST_CONFIG_PATH)
    assert not MigrateBranch._should_migrate(new_config)
    assert not MigrateRepository._should_migrate(new_config)