Skip to content

Commit

Permalink
Source Google Search Console: improve config validation - site_urls (#…
Browse files Browse the repository at this point in the history
…17751)

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr committed Oct 11, 2022
1 parent 397f6fe commit 0da8d7b
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@
- name: Google Search Console
sourceDefinitionId: eb4c9e00-db83-4d63-a386-39cfa91012a8
dockerRepository: airbyte/source-google-search-console
dockerImageTag: 0.1.16
dockerImageTag: 0.1.17
documentationUrl: https://docs.airbyte.io/integrations/sources/google-search-console
icon: googlesearchconsole.svg
sourceType: api
Expand Down
8 changes: 4 additions & 4 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4217,7 +4217,7 @@
- - "client_secret"
oauthFlowOutputParameters:
- - "refresh_token"
- dockerImage: "airbyte/source-google-search-console:0.1.16"
- dockerImage: "airbyte/source-google-search-console:0.1.17"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/google-search-console"
connectionSpecification:
Expand All @@ -4238,8 +4238,8 @@
\ Read more <a href=\"https://support.google.com/webmasters/answer/34592?hl=en\"\
>here</a>."
examples:
- "https://example1.com"
- "https://example2.com"
- "https://example1.com/"
- "https://example2.com/"
order: 0
start_date:
type: "string"
Expand All @@ -4257,7 +4257,7 @@
\ will not be replicated. Must be greater or equal to the start date field."
examples:
- "2021-12-12"
pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
pattern: "^$|^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
order: 2
authorization:
type: "object"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.16
LABEL io.airbyte.version=0.1.17
LABEL io.airbyte.name=airbyte/source-google-search-console
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@

import json
from typing import Any, List, Mapping, Optional, Tuple
from urllib.parse import urlparse

import jsonschema
import pendulum
import requests
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import Oauth2Authenticator
from jsonschema import validate
from source_google_search_console.exceptions import InvalidSiteURLValidationError
from source_google_search_console.service_account_authenticator import ServiceAccountAuthenticator
from source_google_search_console.streams import (
Expand Down Expand Up @@ -41,11 +42,42 @@


class SourceGoogleSearchConsole(AbstractSource):
@staticmethod
def normalize_url(url):
parse_result = urlparse(url)
if parse_result.path == "":
parse_result = parse_result._replace(path="/")
return parse_result.geturl()

def _validate_and_transform(self, config: Mapping[str, Any]):
authorization = config["authorization"]
if authorization["auth_type"] == "Service":
try:
authorization["service_account_info"] = json.loads(authorization["service_account_info"])
except ValueError:
raise Exception("authorization.service_account_info is not valid JSON")

if "custom_reports" in config:
try:
config["custom_reports"] = json.loads(config["custom_reports"])
except ValueError:
raise Exception("custom_reports is not valid JSON")
jsonschema.validate(config["custom_reports"], custom_reports_schema)

pendulum.parse(config["start_date"])
end_date = config.get("end_date")
if end_date:
pendulum.parse(end_date)
config["end_date"] = end_date or pendulum.now().to_date_string()

config["site_urls"] = [self.normalize_url(url) for url in config["site_urls"]]
return config

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
try:
config = self._validate_and_transform(config)
stream_kwargs = self.get_stream_kwargs(config)
self.validate_site_urls(config, stream_kwargs)

self.validate_site_urls(config["site_urls"], stream_kwargs["authenticator"])
sites = Sites(**stream_kwargs)
stream_slice = sites.stream_slices(SyncMode.full_refresh)

Expand All @@ -56,7 +88,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
next(sites_gen)
return True, None

except InvalidSiteURLValidationError as e:
except (InvalidSiteURLValidationError, jsonschema.ValidationError) as e:
return False, repr(e)
except Exception as error:
return (
Expand All @@ -65,9 +97,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
)

@staticmethod
def validate_site_urls(config, stream_kwargs):
auth = stream_kwargs["authenticator"]

def validate_site_urls(site_urls, auth):
if isinstance(auth, ServiceAccountAuthenticator):
request = auth(requests.Request(method="GET", url="https://www.googleapis.com/webmasters/v3/sites"))
with requests.Session() as s:
Expand All @@ -76,18 +106,16 @@ def validate_site_urls(config, stream_kwargs):
response = requests.get("https://www.googleapis.com/webmasters/v3/sites", headers=auth.get_auth_header())
response_data = response.json()

site_urls = set([s["siteUrl"] for s in response_data["siteEntry"]])
provided_by_client = set(config["site_urls"])

invalid_site_url = provided_by_client - site_urls
remote_site_urls = {s["siteUrl"] for s in response_data["siteEntry"]}
invalid_site_url = set(site_urls) - remote_site_urls
if invalid_site_url:
raise InvalidSiteURLValidationError(f'The following URLs are not permitted: {", ".join(invalid_site_url)}')

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
:param config: A Mapping of the user input configuration as defined in the connector spec.
"""

config = self._validate_and_transform(config)
stream_config = self.get_stream_kwargs(config)

streams = [
Expand All @@ -106,40 +134,32 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
return streams

def get_custom_reports(self, config: Mapping[str, Any], stream_config: Mapping[str, Any]) -> List[Optional[Stream]]:
if "custom_reports" not in config:
return []

reports = json.loads(config["custom_reports"])
validate(reports, custom_reports_schema)

return [
type(report["name"], (SearchAnalyticsByCustomDimensions,), {})(dimensions=report["dimensions"], **stream_config)
for report in reports
for report in config.get("custom_reports", [])
]

@staticmethod
def get_stream_kwargs(config: Mapping[str, Any]) -> Mapping[str, Any]:
authorization = config.get("authorization", {})

stream_kwargs = {
"site_urls": config.get("site_urls"),
"start_date": config.get("start_date"),
"end_date": config.get("end_date") or pendulum.now().to_date_string(),
def get_stream_kwargs(self, config: Mapping[str, Any]) -> Mapping[str, Any]:
return {
"site_urls": config["site_urls"],
"start_date": config["start_date"],
"end_date": config["end_date"],
"authenticator": self.get_authenticator(config),
}

auth_type = authorization.get("auth_type")
def get_authenticator(self, config):
authorization = config["authorization"]
auth_type = authorization["auth_type"]

if auth_type == "Client":
stream_kwargs["authenticator"] = Oauth2Authenticator(
return Oauth2Authenticator(
token_refresh_endpoint="https://oauth2.googleapis.com/token",
client_secret=authorization.get("client_secret"),
client_id=authorization.get("client_id"),
refresh_token=authorization.get("refresh_token"),
client_secret=authorization["client_secret"],
client_id=authorization["client_id"],
refresh_token=authorization["refresh_token"],
)
elif auth_type == "Service":
stream_kwargs["authenticator"] = ServiceAccountAuthenticator(
service_account_info=json.loads(authorization.get("service_account_info")), email=authorization.get("email")
return ServiceAccountAuthenticator(
service_account_info=authorization["service_account_info"],
email=authorization["email"],
)
else:
raise Exception(f"Invalid auth type: {auth_type}")

return stream_kwargs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
},
"title": "Website URL Property",
"description": "The URLs of the website property attached to your GSC account. Read more <a href=\"https://support.google.com/webmasters/answer/34592?hl=en\">here</a>.",
"examples": ["https://example1.com", "https://example2.com"],
"examples": ["https://example1.com/", "https://example2.com/"],
"order": 0
},
"start_date": {
Expand All @@ -29,7 +29,7 @@
"title": "End Date",
"description": "UTC date in the format 2017-01-25. Any data after this date will not be replicated. Must be greater or equal to the start date field.",
"examples": ["2021-12-12"],
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$",
"pattern": "^$|^[0-9]{4}-[0-9]{2}-[0-9]{2}$",
"order": 2
},
"authorization": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,33 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from copy import deepcopy

from pytest import fixture


@fixture(name="config")
def config_fixture(requests_mock):
url = "https://oauth2.googleapis.com/token"
requests_mock.post(url, json={"access_token": "token", "expires_in": 10})
config = {
"site_urls": ["https://example.com"],
"start_date": "start_date",
"end_date": "end_date",
return {
"site_urls": ["https://example.com/"],
"start_date": "2022-01-01",
"end_date": "2022-02-01",
"authorization": {
"auth_type": "Client",
"client_id": "client_id",
"client_secret": "client_secret",
"refresh_token": "refresh_token",
},
"custom_reports": '[{"name": "custom_dimensions", "dimensions": ["date", "country", "device"]}]',
}

return config

@fixture
def config_gen(config):
def inner(**kwargs):
new_config = deepcopy(config)
# WARNING, no support deep dictionaries
new_config.update(kwargs)
return {k: v for k, v in new_config.items() if v is not ...}

return inner
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from urllib.parse import quote_plus

import pytest
from airbyte_cdk.models import SyncMode
from airbyte_cdk.models import AirbyteConnectionStatus, Status, SyncMode
from source_google_search_console.source import SourceGoogleSearchConsole
from source_google_search_console.streams import ROW_LIMIT, GoogleSearchConsole, SearchAnalyticsByCustomDimensions, SearchAnalyticsByDate
from utils import command_check

logger = logging.getLogger("airbyte")

Expand Down Expand Up @@ -135,34 +136,58 @@ def test_parse_response(stream_class, expected):
assert record == expected


def test_check_connection_ok(config, mocker, requests_mock):
url = "https://www.googleapis.com/webmasters/v3/sites/https%3A%2F%2Fexample.com"
requests_mock.get(url, json={})
requests_mock.get("https://www.googleapis.com/webmasters/v3/sites", json={"siteEntry": [{"siteUrl": "https://example.com"}]})
ok, error_msg = SourceGoogleSearchConsole().check_connection(logger, config=config)
def test_check_connection(config_gen, mocker, requests_mock):
requests_mock.get("https://www.googleapis.com/webmasters/v3/sites/https%3A%2F%2Fexample.com%2F", json={})
requests_mock.get("https://www.googleapis.com/webmasters/v3/sites", json={"siteEntry": [{"siteUrl": "https://example.com/"}]})
requests_mock.post("https://oauth2.googleapis.com/token", json={"access_token": "token", "expires_in": 10})

assert ok
assert not error_msg
source = SourceGoogleSearchConsole()

assert command_check(source, config_gen()) == AirbyteConnectionStatus(status=Status.SUCCEEDED)

def test_check_connection_invalid_config(config):
config.pop("start_date")
ok, error_msg = SourceGoogleSearchConsole().check_connection(logger, config=config)

assert not ok
assert error_msg

# test site_urls
assert command_check(source, config_gen(site_urls=["https://example.com"])) == AirbyteConnectionStatus(status=Status.SUCCEEDED)
assert command_check(source, config_gen(site_urls=["https://missed.com"])) == AirbyteConnectionStatus(
status=Status.FAILED, message="\"InvalidSiteURLValidationError('The following URLs are not permitted: https://missed.com/')\""
)

def test_check_connection_exception(config):
ok, error_msg = SourceGoogleSearchConsole().check_connection(logger, config=config)
# test start_date
with pytest.raises(Exception):
assert command_check(source, config_gen(start_date=...))
with pytest.raises(Exception):
assert command_check(source, config_gen(start_date=""))
with pytest.raises(Exception):
assert command_check(source, config_gen(start_date="start_date"))
assert command_check(source, config_gen(start_date="2022-99-99")) == AirbyteConnectionStatus(
status=Status.FAILED,
message="\"Unable to connect to Google Search Console API with the provided credentials - ParserError('Unable to parse string [2022-99-99]')\"",
)

assert not ok
assert error_msg
# test end_date
assert command_check(source, config_gen(end_date=...)) == AirbyteConnectionStatus(status=Status.SUCCEEDED)
assert command_check(source, config_gen(end_date="")) == AirbyteConnectionStatus(status=Status.SUCCEEDED)
with pytest.raises(Exception):
assert command_check(source, config_gen(end_date="end_date"))
assert command_check(source, config_gen(end_date="2022-99-99")) == AirbyteConnectionStatus(
status=Status.FAILED,
message="\"Unable to connect to Google Search Console API with the provided credentials - ParserError('Unable to parse string [2022-99-99]')\"",
)

# test custom_reports
assert command_check(source, config_gen(custom_reports="")) == AirbyteConnectionStatus(
status=Status.FAILED,
message="\"Unable to connect to Google Search Console API with the provided credentials - Exception('custom_reports is not valid JSON')\"",
)
assert command_check(source, config_gen(custom_reports="{}")) == AirbyteConnectionStatus(
status=Status.FAILED, message="'<ValidationError: \"{} is not of type \\'array\\'\">'"
)

def test_streams(config):
streams = SourceGoogleSearchConsole().streams(config)

def test_streams(config_gen):
source = SourceGoogleSearchConsole()
streams = source.streams(config_gen())
assert len(streams) == 9
streams = source.streams(config_gen(custom_reports=...))
assert len(streams) == 8


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


from unittest import mock

from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config


def command_check(source: Source, config):
logger = mock.MagicMock()
connector_config, _ = split_config(config)
if source.check_config_against_spec:
source_spec: ConnectorSpecification = source.spec(logger)
check_config_against_spec_or_exit(connector_config, source_spec)
return source.check(logger, config)
1 change: 1 addition & 0 deletions docs/integrations/sources/google-search-console.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ This connector attempts to back off gracefully when it hits Reports API's rate l

| Version | Date | Pull Request | Subject |
| :------- | :--------- | :------------------------------------------------------------------------------------------------------------ | :---------------------------------------------------------- |
| `0.1.17` | 2022-10-08 | [17751](https://github.com/airbytehq/airbyte/pull/17751) | Improved config validation: start_date, end_date, site_urls |
| `0.1.16` | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream state. |
| `0.1.15` | 2022-09-16 | [16819](https://github.com/airbytehq/airbyte/pull/16819) | Check available site urls to avoid 403 error on sync |
| `0.1.14` | 2022-09-08 | [16433](https://github.com/airbytehq/airbyte/pull/16433) | Add custom analytics stream. |
Expand Down

0 comments on commit 0da8d7b

Please sign in to comment.