Skip to content

Commit

Permalink
šŸ› Source Google Analytics Data API: Add CohortSpec to custom reportā€¦
Browse files Browse the repository at this point in the history
ā€¦ in specification (#33802)
  • Loading branch information
artem1205 committed Jan 4, 2024
1 parent 2533008 commit ce84d11
Show file tree
Hide file tree
Showing 10 changed files with 415 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ acceptance_tests:
tests:
- spec_path: "source_google_analytics_data_api/spec.json"
backward_compatibility_tests_config:
# changed the structure of `custom_reports`
# from `json string` to `list[reports]`
disable_for_version: 1.5.1
# changed the structure of `custom_reports` -> `cohortSpec`
disable_for_version: 2.1.0
connection:
tests:
- config_path: "secrets/config.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@

from airbyte_cdk.entrypoint import launch
from source_google_analytics_data_api import SourceGoogleAnalyticsDataApi
from source_google_analytics_data_api.config_migrations import MigrateCustomReports, MigratePropertyID
from source_google_analytics_data_api.config_migrations import MigrateCustomReports, MigrateCustomReportsCohortSpec, MigratePropertyID

if __name__ == "__main__":
source = SourceGoogleAnalyticsDataApi()
MigratePropertyID.migrate(sys.argv[1:], source)
MigrateCustomReports.migrate(sys.argv[1:], source)
MigrateCustomReportsCohortSpec.migrate(sys.argv[1:], source)
launch(source, sys.argv[1:])
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ data:
- www.googleapis.com
- analyticsdata.googleapis.com
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:1.1.0@sha256:bd98f6505c6764b1b5f99d3aedc23dfc9e9af631a62533f60eb32b1d3dbab20c
baseImage: docker.io/airbyte/python-connector-base:1.2.0@sha256:c22a9d97464b69d6ef01898edf3f8612dc11614f05a84984451dde195f337db9
connectorSubtype: api
connectorType: source
definitionId: 3cc2eafd-84aa-4dca-93af-322d9dfeec1a
dockerImageTag: 2.0.3
dockerImageTag: 2.1.0
dockerRepository: airbyte/source-google-analytics-data-api
documentationUrl: https://docs.airbyte.com/integrations/sources/google-analytics-data-api
githubIssueLabel: source-google-analytics-data-api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
from typing import Any, List, Mapping

import dpath.util
from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
Expand Down Expand Up @@ -33,9 +34,9 @@ class MigratePropertyID:
@classmethod
def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
"""
This method determines whether config require migration.
This method determines whether config requires migration.
Returns:
> True, if the transformation is neccessary
> True, if the transformation is necessary
> False, otherwise.
"""
if cls.migrate_from_key in config:
Expand Down Expand Up @@ -72,7 +73,7 @@ def _emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
def migrate(cls, args: List[str], source: SourceGoogleAnalyticsDataApi) -> None:
"""
This method checks the input args, should the config be migrated,
transform if neccessary and emit the CONTROL message.
transform if necessary and emit the CONTROL message.
"""
# get config path
config_path = AirbyteEntrypoint(source).extract_config(args)
Expand Down Expand Up @@ -104,7 +105,7 @@ class MigrateCustomReports:
@classmethod
def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
"""
This method determines whether or not the config should be migrated to have the new structure for the `custom_reports`,
This method determines whether the config should be migrated to have the new structure for the `custom_reports`,
based on the source spec.
Returns:
> True, if the transformation is necessary
Expand All @@ -126,7 +127,7 @@ def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
def _transform_to_array(cls, config: Mapping[str, Any], source: SourceGoogleAnalyticsDataApi = None) -> Mapping[str, Any]:
# assign old values to new property that will be used within the new version
config[cls.migrate_to_key] = config[cls.migrate_from_key]
# transfom `json_str` to `list` of objects
# transform `json_str` to `list` of objects
return source._validate_custom_reports(config)

@classmethod
Expand All @@ -150,7 +151,77 @@ def _emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
def migrate(cls, args: List[str], source: SourceGoogleAnalyticsDataApi) -> None:
"""
This method checks the input args, should the config be migrated,
transform if neccessary and emit the CONTROL message.
transform if necessary and emit the CONTROL message.
"""
# get config path
config_path = AirbyteEntrypoint(source).extract_config(args)
# proceed only if `--config` arg is provided
if config_path:
# read the existing config
config = source.read_config(config_path)
# migration check
if cls._should_migrate(config):
cls._emit_control_message(
cls._modify_and_save(config_path, source, config),
)


class MigrateCustomReportsCohortSpec:
"""
This class stands for migrating the config at runtime,
Specifically, starting from `2.1.0`; the `cohortSpec` property will be added tp `custom_reports_array` with flag `enabled`:
> List([{name: my_report, "cohortSpec": { "enabled": "true" } }, ...])
"""

message_repository: MessageRepository = InMemoryMessageRepository()

@classmethod
def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
"""
This method determines whether the config should be migrated to have the new structure for the `cohortSpec` inside `custom_reports`,
based on the source spec.
Returns:
> True, if the transformation is necessary
> False, otherwise.
"""

return not dpath.util.search(config, "custom_reports_array/**/cohortSpec/enabled")

@classmethod
def _transform_custom_reports_cohort_spec(
cls,
config: Mapping[str, Any],
) -> Mapping[str, Any]:
"""Assign `enabled` property that will be used within the new version"""
for report in config.get("custom_reports_array", []):
if report.get("cohortSpec"):
report["cohortSpec"]["enabled"] = "true"
else:
report.setdefault("cohortSpec", {})["enabled"] = "false"
return config

@classmethod
def _modify_and_save(cls, config_path: str, source: SourceGoogleAnalyticsDataApi, config: Mapping[str, Any]) -> Mapping[str, Any]:
# modify the config
migrated_config = cls._transform_custom_reports_cohort_spec(config)
# save the config
source.write_config(migrated_config, config_path)
# return modified config
return migrated_config

@classmethod
def _emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
# add the Airbyte Control Message to message repo
cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
# emit the Airbyte Control Message from message queue to stdout
for message in cls.message_repository.consume_queue():
print(message.json(exclude_unset=True))

@classmethod
def migrate(cls, args: List[str], source: SourceGoogleAnalyticsDataApi) -> None:
"""
This method checks the input args, should the config be migrated,
transform if necessary and emit the CONTROL message.
"""
# get config path
config_path = AirbyteEntrypoint(source).extract_config(args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ def instantiate_report_streams(self, report: dict, config: Mapping[str, Any], **
def instantiate_report_class(
report: dict, add_name_suffix: bool, config: Mapping[str, Any], **extra_kwargs
) -> GoogleAnalyticsDataApiBaseStream:
cohort_spec = report.get("cohortSpec")
cohort_spec = report.get("cohortSpec", {})
pivots = report.get("pivots")
stream_config = {
**config,
Expand All @@ -558,7 +558,7 @@ def instantiate_report_class(
if pivots:
stream_config["pivots"] = pivots
report_class_tuple = (PivotReport,)
if cohort_spec:
if cohort_spec.pop("enabled", "") == "true":
stream_config["cohort_spec"] = cohort_spec
report_class_tuple = (CohortReportMixin, *report_class_tuple)
name = report["name"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2095,6 +2095,132 @@
"required": ["field_name", "filter"]
}
]
},
"cohortSpec": {
"title": "Cohort Reports",
"description": "Cohort reports creates a time series of user retention for the cohort.",
"type": "object",
"order": 5,
"oneOf": [
{
"title": "Disabled",
"type": "object",
"properties": {
"enabled": {
"type": "string",
"const": "false"
}
}
},
{
"title": "Enabled",
"type": "object",
"properties": {
"enabled": {
"type": "string",
"const": "true"
},
"cohorts": {
"name": "Cohorts",
"order": 0,
"type": "array",
"always_show": true,
"items": {
"title": "Cohorts",
"type": "object",
"required": ["dimension", "dateRange"],
"properties": {
"name": {
"title": "Name",
"type": "string",
"always_show": true,
"pattern": "^(?!(cohort_|RESERVED_)).*$",
"description": "Assigns a name to this cohort. If not set, cohorts are named by their zero based index cohort_0, cohort_1, etc.",
"order": 0
},
"dimension": {
"title": "Dimension",
"description": "Dimension used by the cohort. Required and only supports `firstSessionDate`",
"type": "string",
"enum": ["firstSessionDate"],
"order": 1
},
"dateRange": {
"type": "object",
"required": ["startDate", "endDate"],
"properties": {
"startDate": {
"title": "Start Date",
"type": "string",
"format": "date",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$",
"pattern_descriptor": "YYYY-MM-DD",
"examples": ["2021-01-01"],
"order": 2
},
"endDate": {
"title": "End Date",
"type": "string",
"format": "date",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$",
"pattern_descriptor": "YYYY-MM-DD",
"examples": ["2021-01-01"],
"order": 3
}
}
}
}
}
},
"cohortsRange": {
"type": "object",
"order": 1,
"required": ["granularity", "endOffset"],
"properties": {
"granularity": {
"title": "Granularity",
"description": "The granularity used to interpret the startOffset and endOffset for the extended reporting date range for a cohort report.",
"type": "string",
"enum": [
"GRANULARITY_UNSPECIFIED",
"DAILY",
"WEEKLY",
"MONTHLY"
],
"order": 0
},
"startOffset": {
"title": "Start Offset",
"description": "Specifies the start date of the extended reporting date range for a cohort report.",
"type": "integer",
"minimum": 0,
"order": 1
},
"endOffset": {
"title": "End Offset",
"description": "Specifies the end date of the extended reporting date range for a cohort report.",
"type": "integer",
"minimum": 0,
"order": 2
}
}
},
"cohortReportSettings": {
"type": "object",
"title": "Cohort Report Settings",
"description": "Optional settings for a cohort report.",
"properties": {
"accumulate": {
"always_show": true,
"title": "Accumulate",
"description": "If true, accumulates the result from first touch day to the end day",
"type": "boolean"
}
}
}
}
}
]
}
},
"required": ["name", "dimensions", "metrics"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
"credentials": {
"auth_type": "Service",
"credentials_json": ""
},
"date_ranges_start_date": "2023-09-01",
"window_in_days": 30,
"property_ids": "314186564",
"custom_reports_array": [
{
"name": "cohort_report",
"dimensions": ["cohort", "cohortNthDay"],
"metrics": ["cohortActiveUsers"],
"cohortSpec": {
"cohorts": [
{
"dimension": "firstSessionDate",
"dateRange": {
"startDate": "2023-04-24",
"endDate": "2023-04-24"
}
}
],
"cohortsRange": {
"endOffset": 100,
"granularity": "DAILY"
},
"cohortReportSettings": {
"accumulate": false
}
}
},
{
"name": "pivot_report",
"dateRanges": [
{
"startDate": "2020-09-01",
"endDate": "2020-09-15"
}
],
"dimensions": ["browser", "country", "language"],
"metrics": ["sessions"],
"pivots": [
{
"fieldNames": ["browser"],
"limit": 5
},
{
"fieldNames": ["country"],
"limit": 250
},
{
"fieldNames": ["language"],
"limit": 15
}
]
}
]
}
Loading

0 comments on commit ce84d11

Please sign in to comment.