Skip to content

Commit

Permalink
🐛 Source Google Analytics: Unit and Acceptance tests (#11512)
Browse files Browse the repository at this point in the history
* #11477 source GA: fix stream slices and abnormal state processing

* #11477 GA to beta: unit tests and SAT

* #11477 GA to beta: run black on master and merge into branch

* #11477 GA to beta: review fix

* Revert "#11477 GA to beta: run black on master and merge into branch"

This reverts commit b86fdc2.

* #11477 GA to beta: add changelog

* #11477 ga to beta: roll back to

* #11477 ga to beta: rm odd init

* #11477 ga to beta: rollback source definition

* #11477 ga to beta: upd source definition
  • Loading branch information
davydov-d committed Apr 4, 2022
1 parent 24d7ab1 commit 291a86e
Show file tree
Hide file tree
Showing 13 changed files with 179 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@
- name: Google Analytics
sourceDefinitionId: eff3616a-f9c3-11eb-9a03-0242ac130003
dockerRepository: airbyte/source-google-analytics-v4
dockerImageTag: 0.1.16
dockerImageTag: 0.1.17
documentationUrl: https://docs.airbyte.io/integrations/sources/google-analytics-v4
icon: google-analytics.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2754,7 +2754,7 @@
oauthFlowOutputParameters:
- - "access_token"
- - "refresh_token"
- dockerImage: "airbyte/source-google-analytics-v4:0.1.16"
- dockerImage: "airbyte/source-google-analytics-v4:0.1.17"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/google-analytics-v4"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ FROM python:3.7-slim
RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*

WORKDIR /airbyte/integration_code
COPY source_google_analytics_v4 ./source_google_analytics_v4
COPY main.py ./
COPY setup.py ./
RUN pip install .
COPY source_google_analytics_v4 ./source_google_analytics_v4
COPY main.py ./

ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.16
LABEL io.airbyte.version=0.1.17
LABEL io.airbyte.name=airbyte/source-google-analytics-v4
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference)
# for more information about how to configure these tests

# The 'future_state_path' field is commented out to skip the test `test_state_with_abnormally_large_values`
# as a temporary solution so as not to block publishing of new versions. The reason is that
# when a future date is specified in the state, the current implementation of the connector produces records for the [current_date, current_date] slice,
# which makes SAT fail, because the connector should produce no records when the state holds abnormally large values.
connector_image: airbyte/source-google-analytics-v4:dev
tests:
spec:
Expand All @@ -27,7 +23,7 @@ tests:
incremental:
- config_path: "secrets/service_config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
# future_state_path: "integration_tests/abnormal_state.json"
future_state_path: "integration_tests/abnormal_state.json"
full_refresh:
- config_path: "secrets/service_config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
"daily_active_users": { "ga_date": "2050-05-01" },
"devices": { "ga_date": "2050-05-01" },
"users_per_day": { "ga_date": "2050-05-01" },
"sessions_per_country_day": { "ga_date": "2050-05-01" }
"new_users_per_day": {"ga_date": "2050-05-01"}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
"daily_active_users": { "ga_date": "2021-02-11" },
"devices": { "ga_date": "2021-02-11" },
"users_per_day": { "ga_date": "2021-02-11" },
"sessions_per_country_day": { "ga_date": "2021-02-11" }
"new_users_per_day": { "ga_date": "2021-02-11" }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import jwt
import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
Expand Down Expand Up @@ -92,7 +93,6 @@ class GoogleAnalyticsV4Stream(HttpStream, ABC):

url_base = "https://analyticsreporting.googleapis.com/v4/"
report_field = "reports"
data_fields = ["data", "rows"]

map_type = dict(INTEGER="integer", FLOAT="number", PERCENT="number", TIME="number")

Expand Down Expand Up @@ -226,33 +226,29 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs: Any) -
...]
"""

today = pendulum.now().date()
start_date = pendulum.parse(self.start_date).date()
end_date = pendulum.now().date()

# Determine stream_state, if no stream_state we use start_date
if stream_state:
start_date = pendulum.parse(stream_state.get(self.cursor_field)).date()
prev_end_date = pendulum.parse(stream_state.get(self.cursor_field)).date()
start_date = prev_end_date.add(days=1)
end_date = today
if start_date > end_date:
return [None]

# use the lowest date between start_date and self.end_date, otherwise API fails if start_date is in future
start_date = min(start_date, end_date)
date_slices = []
while start_date <= end_date:
end_date_slice = start_date.add(days=self.window_in_days)
slice_start_date = start_date
while slice_start_date <= end_date:
slice_end_date = slice_start_date.add(days=self.window_in_days)
# limit the slice range with end_date
end_date_slice = min(end_date_slice, end_date)
date_slices.append({"startDate": self.to_datetime_str(start_date), "endDate": self.to_datetime_str(end_date_slice)})
# add 1 day for start next slice from next day and not duplicate data from previous slice end date.
start_date = end_date_slice.add(days=1)
return date_slices

# TODO: the method has to be updated for more logical and obvious
def get_data(self, data):  # type: ignore[no-untyped-def]
    """Drill into *data* one level per key in ``self.data_fields``.

    With ``data_fields = ["data", "rows"]`` this resolves
    ``data["data"]["rows"]``, returning ``[]`` as soon as the current
    level is missing, empty, or not a dict.
    """
    for data_field in self.data_fields:
        if data and isinstance(data, dict):
            data = data.get(data_field, [])
        else:
            return []
    return data
slice_end_date = min(slice_end_date, end_date)
date_slices.append({"startDate": self.to_datetime_str(slice_start_date), "endDate": self.to_datetime_str(slice_end_date)})
# start next slice 1 day after previous slice ended to prevent duplicate reads
slice_start_date = slice_end_date.add(days=1)
return date_slices or [None]

@staticmethod
def report_rows(report_body: MutableMapping[Any, Any]) -> List[MutableMapping[Any, Any]]:
return report_body.get("data", {}).get("rows", [])

def lookup_data_type(self, field_type: str, attribute: str) -> str:
"""
Expand All @@ -268,7 +264,7 @@ def lookup_data_type(self, field_type: str, attribute: str) -> str:
attr_type = self.dimensions_ref[attribute]
elif field_type == "metric":
# Custom Google Analytics Metrics {ga:goalXXStarts, ga:metricXX, ... }
# We always treat them as as strings as we can not be sure of their data type
# We always treat them as strings as we can not be sure of their data type
if attribute.startswith("ga:goal") and attribute.endswith(
("Starts", "Completions", "Value", "ConversionRate", "Abandons", "AbandonRate")
):
Expand All @@ -282,10 +278,10 @@ def lookup_data_type(self, field_type: str, attribute: str) -> str:
attr_type = self.metrics_ref[attribute]
else:
attr_type = None
self.logger.error(f"Unsuported GA type: {field_type}")
self.logger.error(f"Unsupported GA type: {field_type}")
except KeyError:
attr_type = None
self.logger.error(f"Unsuported GA {field_type}: {attribute}")
self.logger.error(f"Unsupported GA {field_type}: {attribute}")

return self.map_type.get(attr_type, "string")

Expand Down Expand Up @@ -387,7 +383,7 @@ def parse_response(self, response: requests.Response, **kwargs: Any) -> Iterable

self.check_for_sampled_result(report.get("data", {}))

for row in self.get_data(report):
for row in self.report_rows(report):
record = {}
dimensions = row.get("dimensions", [])
metrics = row.get("metrics", [])
Expand Down Expand Up @@ -421,11 +417,19 @@ class GoogleAnalyticsV4IncrementalObjectsBase(GoogleAnalyticsV4Stream):
cursor_field = "ga_date"

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
    """Advance the stream state: keep whichever cursor value is greater —
    the one already stored or the one carried by the latest record (default CDK hook)."""
    stored_value = current_stream_state.get(self.cursor_field, "")
    record_value = latest_record.get(self.cursor_field, "")
    return {self.cursor_field: max(record_value, stored_value)}

def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: List[str] = None,
    stream_slice: Mapping[str, Any] = None,
    stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
    # A falsy slice (None) is what stream_slices yields when the saved state
    # cursor is already past today (`date_slices or [None]`), i.e. there is
    # no date range left to read — short-circuit instead of calling the API
    # with an empty/invalid range.
    if not stream_slice:
        return []
    return super().read_records(sync_mode, cursor_field, stream_slice, stream_state)


class GoogleAnalyticsServiceOauth2Authenticator(Oauth2Authenticator):
"""Request example for API token extraction:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
"reports": [
{
"columnHeader": {
"dimensions": ["ga: date"],
"dimensions": ["ga:date"],
"metricHeader": {
"metricHeaderEntries": [
{
"name": "ga: 14dayUsers",
"name": "ga:14dayUsers",
"type": "INTEGER"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"reports": [
{
"columnHeader": {
"dimensions": ["ga: date"],
"dimensions": ["ga:date"],
"metricHeader": {
"metricHeaderEntries": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
"reports": [
{
"columnHeader": {
"dimensions": ["ga: date"],
"dimensions": ["ga:date"],
"metricHeader": {
"metricHeaderEntries": [
{
"name": "ga: 14dayUsers",
"name": "ga:14dayUsers",
"type": "INTEGER"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
"reports": [
{
"columnHeader": {
"dimensions": ["ga: date"],
"dimensions": ["ga:date"],
"metricHeader": {
"metricHeaderEntries": [
{
"name": "ga: 14dayUsers",
"name": "ga:14dayUsers",
"type": "INTEGER"
}
]
Expand Down
Loading

0 comments on commit 291a86e

Please sign in to comment.