Skip to content

Commit

Permalink
Source Google Analytics Data API: slicer updated, unit tests added (#21169)
Browse files Browse the repository at this point in the history

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr committed Jan 11, 2023
1 parent e571b2b commit 59ff2a2
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -651,11 +651,11 @@
- name: Google Analytics 4 (GA4)
sourceDefinitionId: 3cc2eafd-84aa-4dca-93af-322d9dfeec1a
dockerRepository: airbyte/source-google-analytics-data-api
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.com/integrations/sources/google-analytics-v4
icon: google-analytics.svg
sourceType: api
releaseStage: alpha
releaseStage: beta
- name: Google Directory
sourceDefinitionId: d19ae824-e289-4b14-995a-0632eb46d246
dockerRepository: airbyte/source-google-directory
Expand Down
18 changes: 9 additions & 9 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5383,7 +5383,7 @@
oauthFlowOutputParameters:
- - "access_token"
- - "refresh_token"
- dockerImage: "airbyte/source-google-analytics-data-api:0.1.0"
- dockerImage: "airbyte/source-google-analytics-data-api:0.1.1"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/google-analytics-v4"
connectionSpecification:
Expand All @@ -5395,12 +5395,6 @@
- "date_ranges_start_date"
additionalProperties: true
properties:
property_id:
type: "string"
title: "Property ID"
description: "A Google Analytics GA4 property identifier whose events are\
\ tracked. Specified in the URL path and not the body"
order: 1
credentials:
order: 0
type: "object"
Expand All @@ -5422,7 +5416,6 @@
title: "Client ID"
type: "string"
description: "The Client ID of your Google Analytics developer application."
airbyte_secret: true
order: 1
client_secret:
title: "Client Secret"
Expand Down Expand Up @@ -5460,6 +5453,13 @@
- "{ \"type\": \"service_account\", \"project_id\": YOUR_PROJECT_ID,\
\ \"private_key_id\": YOUR_PRIVATE_KEY, ... }"
airbyte_secret: true
order: 1
property_id:
type: "string"
title: "Property ID"
description: "A Google Analytics GA4 property identifier whose events are\
\ tracked. Specified in the URL path and not the body"
order: 1
date_ranges_start_date:
type: "string"
title: "Start Date"
Expand All @@ -5473,7 +5473,7 @@
type: "string"
title: "Custom Reports"
description: "A JSON array describing the custom reports you want to sync\
\ from Google Analytics. See <a href=\"https://docs.airbyte.com/integrations/sources/google-analytics-v4#data-processing-latency\"\
\ from Google Analytics. See <a href=\"https://docs.airbyte.com/integrations/sources/google-analytics-v4/#custom-reports\"\
>the docs</a> for more information about the exact format you can use\
\ to fill out this field."
window_in_days:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ COPY source_google_analytics_data_api ./source_google_analytics_data_api
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-google-analytics-data-api
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "google-analytics-data==0.11.2", "PyJWT==2.4.0", "cryptography==37.0.4", "requests==2.28.1"]
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.16", "PyJWT==2.4.0", "cryptography==37.0.4", "requests==2.28.1"]

TEST_REQUIREMENTS = [
"freezegun",
"pytest~=6.1",
"pytest-mock~=3.6.1",
"requests-mock~=1.9",
"source-acceptance-test",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,19 +264,15 @@ def stream_slices(
else:
start_date = self.config["date_ranges_start_date"]

timedelta: int = self.config["window_in_days"]

while start_date <= today:
end_date: datetime.date = start_date + datetime.timedelta(days=timedelta)
if timedelta > 1 and end_date > today:
end_date: datetime.date = start_date + datetime.timedelta(days=timedelta - (end_date - today).days)

if self._stop_iteration:
return

yield {"startDate": utils.date_to_string(start_date), "endDate": utils.date_to_string(end_date)}

start_date: datetime.date = end_date + datetime.timedelta(days=1)
yield {
"startDate": utils.date_to_string(start_date),
"endDate": utils.date_to_string(min(start_date + datetime.timedelta(days=self.config["window_in_days"] - 1), today)),
}
start_date += datetime.timedelta(days=self.config["window_in_days"])


class GoogleAnalyticsDataApiMetadataStream(GoogleAnalyticsDataApiAbstractStream):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,6 @@
"required": ["property_id", "date_ranges_start_date"],
"additionalProperties": true,
"properties": {
"property_id": {
"type": "string",
"title": "Property ID",
"description": "A Google Analytics GA4 property identifier whose events are tracked. Specified in the URL path and not the body",
"order": 1
},
"credentials": {
"order": 0,
"type": "object",
Expand All @@ -33,7 +27,6 @@
"title": "Client ID",
"type": "string",
"description": "The Client ID of your Google Analytics developer application.",
"airbyte_secret": true,
"order": 1
},
"client_secret": {
Expand Down Expand Up @@ -76,12 +69,19 @@
"examples": [
"{ \"type\": \"service_account\", \"project_id\": YOUR_PROJECT_ID, \"private_key_id\": YOUR_PRIVATE_KEY, ... }"
],
"airbyte_secret": true
"airbyte_secret": true,
"order": 1
}
}
}
]
},
"property_id": {
"type": "string",
"title": "Property ID",
"description": "A Google Analytics GA4 property identifier whose events are tracked. Specified in the URL path and not the body",
"order": 1
},
"date_ranges_start_date": {
"type": "string",
"title": "Start Date",
Expand All @@ -93,7 +93,7 @@
"order": 3,
"type": "string",
"title": "Custom Reports",
"description": "A JSON array describing the custom reports you want to sync from Google Analytics. See <a href=\"https://docs.airbyte.com/integrations/sources/google-analytics-v4#data-processing-latency\">the docs</a> for more information about the exact format you can use to fill out this field."
"description": "A JSON array describing the custom reports you want to sync from Google Analytics. See <a href=\"https://docs.airbyte.com/integrations/sources/google-analytics-v4/#custom-reports\">the docs</a> for more information about the exact format you can use to fill out this field."
},
"window_in_days": {
"type": "integer",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import requests
from freezegun import freeze_time
from source_google_analytics_data_api.authenticator import GoogleServiceKeyAuthenticator


@freeze_time("2023-01-01 00:00:00")
def test_token_rotation(requests_mock):
    """Check the service-key authenticator's token request and resulting header.

    The clock is frozen so that the signed JWT assertion sent to the token
    endpoint and the computed ``expires_at`` value are reproducible.
    """
    # Throwaway key material used only to sign the test assertion.
    private_key = "-----BEGIN PRIVATE KEY-----\nMIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEA3slcXL+dA36ESmOi\n1xBhZmp5Hn0WkaHDtW4naba3plva0ibloBNWhFhjQOh7Ff01PVjhT4D5jgqXBIgc\nz9Gv3QIDAQABAkEArlhYPoD5SB2/O1PjwHgiMPrL1C9B9S/pr1cH4vPJnpY3VKE3\n5hvdil14YwRrcbmIxMkK2iRLi9lM4mJmdWPy4QIhAPsRFXZSGx0TZsDxD9V0ZJmZ\n0AuDCj/NF1xB5KPLmp7pAiEA4yoFox6w7ql/a1pUVaLt0NJkDfE+22pxYGNQaiXU\nuNUCIQCsFLaIJZiN4jlgbxlyLVeya9lLuqIwvqqPQl6q4ad12QIgS9gG48xmdHig\n8z3IdIMedZ8ZCtKmEun6Cp1+BsK0wDUCIF0nHfSuU+eTQ2qAON2SHIrJf8UeFO7N\nzdTN1IwwQqjI\n-----END PRIVATE KEY-----\n"
    # NOTE(review): the expected assertion is entirely lower-case, presumably
    # because requests_mock normalises the captured query string - confirm.
    expected_assertion = 'eyj0exaioijkv1qilcjhbgcioijsuzi1niisimtpzci6imnsawvudf9pzcj9.eyjpc3mioijjbgllbnrfzw1hawwilcjzy29wzsi6imh0dhbzoi8vd3d3lmdvb2dszwfwaxmuy29tl2f1dggvyw5hbhl0awnzlnjlywrvbmx5iiwiyxvkijoiahr0chm6ly9vyxv0adiuz29vz2xlyxbpcy5jb20vdg9rzw4ilcjlehaioje2nzi1mzq4mdasimlhdci6mty3mjuzmtiwmh0.u1gpfmncrtlsy_ujxpc2iazpvdzb6eq4mobq3xez5v6gqtj0xgou__c6neu9d7qvb8h0jkynggsfibkoci_g7a'

    authenticator = GoogleServiceKeyAuthenticator(
        {
            "client_email": "client_email",
            "private_key": private_key,
            "client_id": "client_id",
        }
    )

    token_payload = {"access_token": "bearer_token", "expires_in": 3600}
    auth_request = requests_mock.register_uri(
        "POST",
        authenticator._google_oauth2_token_endpoint,
        json=token_payload,
    )

    # Applying the authenticator to a request is what triggers the token fetch.
    authenticated_request = authenticator(requests.Request())

    # Exactly one call must have reached the token endpoint.
    assert auth_request.call_count == 1

    sent_query = auth_request.last_request.qs
    assert sent_query.get("assertion") == [expected_assertion]
    assert sent_query.get("grant_type") == ["urn:ietf:params:oauth:grant-type:jwt-bearer"]

    # 1672534800 is the frozen time (2023-01-01T00:00:00Z) plus the 3600 s "expires_in".
    assert authenticator._token.get("expires_at") == 1672534800
    assert authenticated_request.headers.get("Authorization") == "Bearer bearer_token"
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
from unittest.mock import MagicMock

import pytest
from freezegun import freeze_time
from source_google_analytics_data_api.source import GoogleAnalyticsDataApiBaseStream

from .utils import read_incremental

json_credentials = """
{
"type": "service_account",
Expand Down Expand Up @@ -271,3 +274,118 @@ def test_backoff_time(patch_base_class):
stream = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"])
expected_backoff_time = None
assert stream.backoff_time(response_mock) == expected_backoff_time


@freeze_time("2023-01-01 00:00:00")
def test_stream_slices():
    """Check the date windows produced by ``stream_slices`` for several window sizes.

    "Today" is frozen at 2023-01-01. Each scenario lists the configured start
    date, the ``window_in_days`` value and the expected (startDate, endDate)
    pairs; the final window is expected to stop at the frozen date.
    """
    scenarios = [
        (
            datetime.date(2022, 12, 29),
            1,
            [
                ("2022-12-29", "2022-12-29"),
                ("2022-12-30", "2022-12-30"),
                ("2022-12-31", "2022-12-31"),
                ("2023-01-01", "2023-01-01"),
            ],
        ),
        (
            datetime.date(2022, 12, 28),
            2,
            [
                ("2022-12-28", "2022-12-29"),
                ("2022-12-30", "2022-12-31"),
                ("2023-01-01", "2023-01-01"),
            ],
        ),
        (
            datetime.date(2022, 12, 20),
            5,
            [
                ("2022-12-20", "2022-12-24"),
                ("2022-12-25", "2022-12-29"),
                ("2022-12-30", "2023-01-01"),
            ],
        ),
    ]

    for first_day, window, expected_ranges in scenarios:
        config = {"date_ranges_start_date": first_day, "window_in_days": window}
        stream = GoogleAnalyticsDataApiBaseStream(authenticator=None, config=config)
        produced = list(stream.stream_slices(sync_mode=None))
        assert produced == [{"startDate": begin, "endDate": end} for begin, end in expected_ranges]


def test_read_incremental(requests_mock):
    """Run two incremental syncs against a mocked ``runReport`` endpoint.

    The first sync runs with the clock frozen on 2023-01-01 and an empty
    state; the second runs on 2023-01-02 and reuses the state object left by
    the first. Each mocked POST consumes the next queued response.
    """
    config = {
        "property_id": 123,
        "date_ranges_start_date": datetime.date(2022, 12, 29),
        "window_in_days": 1,
        "dimensions": ["date"],
        "metrics": ["totalUsers"],
    }

    stream = GoogleAnalyticsDataApiBaseStream(authenticator=None, config=config)
    stream_state = {}

    def report_page(day, users):
        # One single-row report body, shaped like the payloads the mock serves.
        return {
            "dimensionHeaders": [{"name": "date"}],
            "metricHeaders": [{"name": "totalUsers", "type": "TYPE_INTEGER"}],
            "rows": [{"dimensionValues": [{"value": day}], "metricValues": [{"value": users}]}],
            "rowCount": 1,
        }

    # Responses are served strictly in this order, one per request.
    responses = [
        report_page(day, users)
        for day, users in (
            ("20221229", "100"),
            ("20221230", "110"),
            ("20221231", "120"),
            ("20230101", "130"),
            ("20230101", "140"),
            ("20230102", "150"),
        )
    ]

    requests_mock.register_uri(
        "POST",
        "https://analyticsdata.googleapis.com/v1beta/properties/123:runReport",
        json=lambda request, context: responses.pop(0),
    )

    def sync_at(moment):
        # Read everything with the clock frozen at `moment`, then strip the
        # "uuid" field from each record so the rest can be compared exactly.
        with freeze_time(moment):
            fetched = list(read_incremental(stream, stream_state))
        for item in fetched:
            del item["uuid"]
        return fetched

    first_sync = sync_at("2023-01-01 12:00:00")
    assert first_sync == [
        {"date": "20221229", "totalUsers": 100, "property_id": 123},
        {"date": "20221230", "totalUsers": 110, "property_id": 123},
        {"date": "20221231", "totalUsers": 120, "property_id": 123},
        {"date": "20230101", "totalUsers": 130, "property_id": 123},
    ]

    # The state object handed to the stream now carries the latest date seen.
    assert stream_state == {"date": "20230101"}

    second_sync = sync_at("2023-01-02 12:00:00")
    assert second_sync == [
        {"date": "20230101", "totalUsers": 140, "property_id": 123},
        {"date": "20230102", "totalUsers": 150, "property_id": 123},
    ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, MutableMapping

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream


def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, Any]):
    """Yield every record of an incremental read, slice by slice.

    Slices are requested once, using the state passed in. After each record
    the state is passed through ``get_updated_state`` before the record is
    yielded.

    Note that the result of ``get_updated_state`` is only rebound to the local
    name, so the caller's mapping changes only if the stream mutates the
    object it is given.
    """
    incremental = SyncMode.incremental
    for window in stream_instance.stream_slices(sync_mode=incremental, stream_state=stream_state):
        for item in stream_instance.read_records(sync_mode=incremental, stream_slice=window, stream_state=stream_state):
            stream_state = stream_instance.get_updated_state(stream_state, item)
            yield item
Loading

0 comments on commit 59ff2a2

Please sign in to comment.