Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Google Search Console: improve config validation - site_urls #17751

Merged
merged 18 commits into from Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.16
LABEL io.airbyte.version=0.1.17
LABEL io.airbyte.name=airbyte/source-google-search-console
Expand Up @@ -4,15 +4,16 @@

import json
from typing import Any, List, Mapping, Optional, Tuple
from urllib.parse import urlparse

import jsonschema
import pendulum
import requests
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import Oauth2Authenticator
from jsonschema import validate
from source_google_search_console.exceptions import InvalidSiteURLValidationError
from source_google_search_console.service_account_authenticator import ServiceAccountAuthenticator
from source_google_search_console.streams import (
Expand Down Expand Up @@ -41,11 +42,42 @@


class SourceGoogleSearchConsole(AbstractSource):
@staticmethod
def normalize_url(url):
    """
    Return `url` with an explicit root path when it has no path component.

    For example "https://example.com" becomes "https://example.com/".
    Values whose parsed path is already non-empty are returned as
    re-assembled by `geturl()`.

    :param url: site URL taken from the "site_urls" config option.
    :return: the normalized URL string.
    """
    parts = urlparse(url)
    # Only an entirely missing path is rewritten; any existing path is kept as is.
    normalized = parts if parts.path != "" else parts._replace(path="/")
    return normalized.geturl()

def _validate_and_transform(self, config: Mapping[str, Any]):
    """
    Validate the user-supplied config and return a normalized copy of it.

    The caller's mapping and its nested "authorization" mapping are never
    modified. JSON-encoded options are decoded only while they are still raw
    text, so calling this method again on a config it already produced is
    safe instead of failing with a TypeError inside json.loads().

    :param config: connector configuration. "authorization", "start_date"
        and "site_urls" are read unconditionally; "custom_reports" and
        "end_date" are optional.
    :return: a new dict in which "authorization.service_account_info"
        (Service auth only) and "custom_reports" are decoded from JSON,
        "end_date" defaults to today's date when empty or missing, and every
        entry of "site_urls" went through normalize_url().
    :raises Exception: when "service_account_info" or "custom_reports" is
        text that is not valid JSON.
    :raises jsonschema.ValidationError: when the decoded "custom_reports"
        does not match custom_reports_schema.

    Errors raised by pendulum.parse() for "start_date" / "end_date" and
    KeyError for missing required keys propagate unchanged.
    """
    # Shallow copies are enough: only top-level keys and keys of
    # "authorization" are reassigned below.
    config = dict(config)
    authorization = dict(config["authorization"])
    config["authorization"] = authorization

    if authorization["auth_type"] == "Service":
        service_account_info = authorization["service_account_info"]
        # Decode raw JSON text only; an already decoded value is kept as is.
        if isinstance(service_account_info, (str, bytes, bytearray)):
            try:
                authorization["service_account_info"] = json.loads(service_account_info)
            except ValueError as error:
                raise Exception("authorization.service_account_info is not valid JSON") from error

    if "custom_reports" in config:
        custom_reports = config["custom_reports"]
        if isinstance(custom_reports, (str, bytes, bytearray)):
            try:
                custom_reports = json.loads(custom_reports)
            except ValueError as error:
                raise Exception("custom_reports is not valid JSON") from error
        jsonschema.validate(custom_reports, custom_reports_schema)
        config["custom_reports"] = custom_reports

    # Parsing is done purely for validation; the original strings are kept.
    pendulum.parse(config["start_date"])
    end_date = config.get("end_date")
    if end_date:
        pendulum.parse(end_date)
    config["end_date"] = end_date or pendulum.now().to_date_string()

    config["site_urls"] = [self.normalize_url(url) for url in config["site_urls"]]
    return config

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
try:
config = self._validate_and_transform(config)
stream_kwargs = self.get_stream_kwargs(config)
self.validate_site_urls(config, stream_kwargs)

self.validate_site_urls(config["site_urls"], stream_kwargs["authenticator"])
sites = Sites(**stream_kwargs)
stream_slice = sites.stream_slices(SyncMode.full_refresh)

Expand All @@ -56,7 +88,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
next(sites_gen)
return True, None

except InvalidSiteURLValidationError as e:
except (InvalidSiteURLValidationError, jsonschema.ValidationError) as e:
return False, repr(e)
except Exception as error:
return (
Expand All @@ -65,9 +97,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
)

@staticmethod
def validate_site_urls(config, stream_kwargs):
auth = stream_kwargs["authenticator"]

def validate_site_urls(site_urls, auth):
if isinstance(auth, ServiceAccountAuthenticator):
request = auth(requests.Request(method="GET", url="https://www.googleapis.com/webmasters/v3/sites"))
with requests.Session() as s:
Expand All @@ -76,18 +106,16 @@ def validate_site_urls(config, stream_kwargs):
response = requests.get("https://www.googleapis.com/webmasters/v3/sites", headers=auth.get_auth_header())
response_data = response.json()

site_urls = set([s["siteUrl"] for s in response_data["siteEntry"]])
provided_by_client = set(config["site_urls"])

invalid_site_url = provided_by_client - site_urls
remote_site_urls = {s["siteUrl"] for s in response_data["siteEntry"]}
invalid_site_url = set(site_urls) - remote_site_urls
if invalid_site_url:
raise InvalidSiteURLValidationError(f'The following URLs are not permitted: {", ".join(invalid_site_url)}')

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
:param config: A Mapping of the user input configuration as defined in the connector spec.
"""

config = self._validate_and_transform(config)
stream_config = self.get_stream_kwargs(config)

streams = [
Expand All @@ -106,40 +134,32 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
return streams

def get_custom_reports(self, config: Mapping[str, Any], stream_config: Mapping[str, Any]) -> List[Optional[Stream]]:
if "custom_reports" not in config:
return []

reports = json.loads(config["custom_reports"])
validate(reports, custom_reports_schema)

return [
type(report["name"], (SearchAnalyticsByCustomDimensions,), {})(dimensions=report["dimensions"], **stream_config)
for report in reports
for report in config.get("custom_reports", [])
]

@staticmethod
def get_stream_kwargs(config: Mapping[str, Any]) -> Mapping[str, Any]:
authorization = config.get("authorization", {})

stream_kwargs = {
"site_urls": config.get("site_urls"),
"start_date": config.get("start_date"),
"end_date": config.get("end_date") or pendulum.now().to_date_string(),
def get_stream_kwargs(self, config: Mapping[str, Any]) -> Mapping[str, Any]:
return {
"site_urls": config["site_urls"],
"start_date": config["start_date"],
"end_date": config["end_date"],
"authenticator": self.get_authenticator(config),
}

auth_type = authorization.get("auth_type")
def get_authenticator(self, config):
authorization = config["authorization"]
auth_type = authorization["auth_type"]

if auth_type == "Client":
stream_kwargs["authenticator"] = Oauth2Authenticator(
return Oauth2Authenticator(
token_refresh_endpoint="https://oauth2.googleapis.com/token",
client_secret=authorization.get("client_secret"),
client_id=authorization.get("client_id"),
refresh_token=authorization.get("refresh_token"),
client_secret=authorization["client_secret"],
client_id=authorization["client_id"],
refresh_token=authorization["refresh_token"],
)
elif auth_type == "Service":
stream_kwargs["authenticator"] = ServiceAccountAuthenticator(
service_account_info=json.loads(authorization.get("service_account_info")), email=authorization.get("email")
return ServiceAccountAuthenticator(
service_account_info=authorization["service_account_info"],
email=authorization["email"],
)
else:
raise Exception(f"Invalid auth type: {auth_type}")

return stream_kwargs
Expand Up @@ -13,7 +13,7 @@
},
"title": "Website URL Property",
"description": "The URLs of the website property attached to your GSC account. Read more <a href=\"https://support.google.com/webmasters/answer/34592?hl=en\">here</a>.",
"examples": ["https://example1.com", "https://example2.com"],
"examples": ["https://example1.com/", "https://example2.com/"],
"order": 0
},
"start_date": {
Expand All @@ -29,7 +29,7 @@
"title": "End Date",
"description": "UTC date in the format 2017-01-25. Any data after this date will not be replicated. Must be greater or equal to the start date field.",
"examples": ["2021-12-12"],
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$",
"pattern": "^$|^[0-9]{4}-[0-9]{2}-[0-9]{2}$",
"order": 2
},
"authorization": {
Expand Down
Expand Up @@ -2,23 +2,33 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from copy import deepcopy

from pytest import fixture


@fixture(name="config")
def config_fixture(requests_mock):
url = "https://oauth2.googleapis.com/token"
requests_mock.post(url, json={"access_token": "token", "expires_in": 10})
config = {
"site_urls": ["https://example.com"],
"start_date": "start_date",
"end_date": "end_date",
return {
"site_urls": ["https://example.com/"],
"start_date": "2022-01-01",
"end_date": "2022-02-01",
"authorization": {
"auth_type": "Client",
"client_id": "client_id",
"client_secret": "client_secret",
"refresh_token": "refresh_token",
},
"custom_reports": '[{"name": "custom_dimensions", "dimensions": ["date", "country", "device"]}]',
}

return config

@fixture
def config_gen(config):
    """
    Provide a factory that builds config variants from the base `config` fixture.

    Keyword arguments passed to the factory override top-level keys of a deep
    copy of the base config. Passing `...` (Ellipsis) as a value removes that
    key from the resulting config.
    """

    def inner(**kwargs):
        # NOTE: overrides are applied to top-level keys only; nested
        # dictionaries are replaced as a whole, not merged.
        merged = {**deepcopy(config), **kwargs}
        return {key: value for key, value in merged.items() if value is not ...}

    return inner
Expand Up @@ -7,9 +7,10 @@
from urllib.parse import quote_plus

import pytest
from airbyte_cdk.models import SyncMode
from airbyte_cdk.models import AirbyteConnectionStatus, Status, SyncMode
from source_google_search_console.source import SourceGoogleSearchConsole
from source_google_search_console.streams import ROW_LIMIT, GoogleSearchConsole, SearchAnalyticsByCustomDimensions, SearchAnalyticsByDate
from utils import command_check

logger = logging.getLogger("airbyte")

Expand Down Expand Up @@ -135,34 +136,58 @@ def test_parse_response(stream_class, expected):
assert record == expected


def test_check_connection_ok(config, mocker, requests_mock):
url = "https://www.googleapis.com/webmasters/v3/sites/https%3A%2F%2Fexample.com"
requests_mock.get(url, json={})
requests_mock.get("https://www.googleapis.com/webmasters/v3/sites", json={"siteEntry": [{"siteUrl": "https://example.com"}]})
ok, error_msg = SourceGoogleSearchConsole().check_connection(logger, config=config)
def test_check_connection(config_gen, mocker, requests_mock):
requests_mock.get("https://www.googleapis.com/webmasters/v3/sites/https%3A%2F%2Fexample.com%2F", json={})
requests_mock.get("https://www.googleapis.com/webmasters/v3/sites", json={"siteEntry": [{"siteUrl": "https://example.com/"}]})
requests_mock.post("https://oauth2.googleapis.com/token", json={"access_token": "token", "expires_in": 10})

assert ok
assert not error_msg
source = SourceGoogleSearchConsole()

assert command_check(source, config_gen()) == AirbyteConnectionStatus(status=Status.SUCCEEDED)

def test_check_connection_invalid_config(config):
config.pop("start_date")
ok, error_msg = SourceGoogleSearchConsole().check_connection(logger, config=config)

assert not ok
assert error_msg

# test site_urls
assert command_check(source, config_gen(site_urls=["https://example.com"])) == AirbyteConnectionStatus(status=Status.SUCCEEDED)
assert command_check(source, config_gen(site_urls=["https://missed.com"])) == AirbyteConnectionStatus(
status=Status.FAILED, message="\"InvalidSiteURLValidationError('The following URLs are not permitted: https://missed.com/')\""
)

def test_check_connection_exception(config):
ok, error_msg = SourceGoogleSearchConsole().check_connection(logger, config=config)
# test start_date
with pytest.raises(Exception):
assert command_check(source, config_gen(start_date=...))
with pytest.raises(Exception):
assert command_check(source, config_gen(start_date=""))
with pytest.raises(Exception):
assert command_check(source, config_gen(start_date="start_date"))
assert command_check(source, config_gen(start_date="2022-99-99")) == AirbyteConnectionStatus(
status=Status.FAILED,
message="\"Unable to connect to Google Search Console API with the provided credentials - ParserError('Unable to parse string [2022-99-99]')\"",
)

assert not ok
assert error_msg
# test end_date
assert command_check(source, config_gen(end_date=...)) == AirbyteConnectionStatus(status=Status.SUCCEEDED)
assert command_check(source, config_gen(end_date="")) == AirbyteConnectionStatus(status=Status.SUCCEEDED)
with pytest.raises(Exception):
assert command_check(source, config_gen(end_date="end_date"))
assert command_check(source, config_gen(end_date="2022-99-99")) == AirbyteConnectionStatus(
status=Status.FAILED,
message="\"Unable to connect to Google Search Console API with the provided credentials - ParserError('Unable to parse string [2022-99-99]')\"",
)

# test custom_reports
assert command_check(source, config_gen(custom_reports="")) == AirbyteConnectionStatus(
status=Status.FAILED,
message="\"Unable to connect to Google Search Console API with the provided credentials - Exception('custom_reports is not valid JSON')\"",
)
assert command_check(source, config_gen(custom_reports="{}")) == AirbyteConnectionStatus(
status=Status.FAILED, message="'<ValidationError: \"{} is not of type \\'array\\'\">'"
)

def test_streams(config):
streams = SourceGoogleSearchConsole().streams(config)

def test_streams(config_gen):
source = SourceGoogleSearchConsole()
streams = source.streams(config_gen())
assert len(streams) == 9
streams = source.streams(config_gen(custom_reports=...))
assert len(streams) == 8


Expand Down
@@ -0,0 +1,19 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


from unittest import mock

from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config


def command_check(source: Source, config):
    """
    Run the connector's `check` for `config` and return its result.

    When the source has `check_config_against_spec` enabled, the connector part
    of the config (as returned by `split_config`) is first checked against the
    source specification via `check_config_against_spec_or_exit`.

    :param source: source instance under test.
    :param config: full config mapping handed to `source.check`.
    :return: whatever `source.check` returns.
    """
    log = mock.MagicMock()
    user_config, _internal = split_config(config)
    if not source.check_config_against_spec:
        return source.check(log, config)

    spec: ConnectorSpecification = source.spec(log)
    check_config_against_spec_or_exit(user_config, spec)
    return source.check(log, config)
1 change: 1 addition & 0 deletions docs/integrations/sources/google-search-console.md
Expand Up @@ -122,6 +122,7 @@ This connector attempts to back off gracefully when it hits Reports API's rate l

| Version | Date | Pull Request | Subject |
| :------- | :--------- | :------------------------------------------------------------------------------------------------------------ | :---------------------------------------------------------- |
| `0.1.17` | 2022-10-08 | [17751](https://github.com/airbytehq/airbyte/pull/17751) | Improved config validation: start_date, end_date, site_urls |
| `0.1.16` | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream state. |
| `0.1.15` | 2022-09-16 | [16819](https://github.com/airbytehq/airbyte/pull/16819) | Check available site urls to avoid 403 error on sync |
| `0.1.14` | 2022-09-08 | [16433](https://github.com/airbytehq/airbyte/pull/16433) | Add custom analytics stream. |
Expand Down