🐛 Source Facebook Marketing: Fix error during transforming state (#35467)
tolik0 committed Feb 21, 2024
1 parent 198971b commit 6dbbcb0
Showing 16 changed files with 322 additions and 81 deletions.
@@ -75,8 +75,12 @@
"type": "STREAM",
"stream": {
"stream_state": {
"updated_time": "2121-07-25T13:34:26Z",
"filter_statuses": ["ARCHIVED"]
"212551616838260": {
"updated_time": "2121-07-25T13:34:26Z",
"filter_statuses": ["ARCHIVED"],
"include_deleted": true
},
"include_deleted": true
},
"stream_descriptor": {
"name": "ads"

@@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerImageTag: 1.4.0
dockerImageTag: 1.4.1
dockerRepository: airbyte/source-facebook-marketing
documentationUrl: https://docs.airbyte.com/integrations/sources/facebook-marketing
githubIssueLabel: source-facebook-marketing
@@ -23,7 +23,6 @@ data:
packageName: airbyte-source-facebook-marketing
registries:
cloud:
dockerImageTag: 1.3.2
enabled: true
oss:
enabled: true

@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "1.4.0"
version = "1.4.1"
name = "source-facebook-marketing"
description = "Source implementation for Facebook Marketing."
authors = [ "Airbyte <contact@airbyte.io>",]

@@ -10,6 +10,7 @@
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
from source_facebook_marketing.spec import ValidAdSetStatuses, ValidAdStatuses, ValidCampaignStatuses

logger = logging.getLogger("airbyte_logger")

@@ -80,3 +81,43 @@ def migrate(cls, args: List[str], source: Source) -> None:
cls.emit_control_message(
cls.modify_and_save(config_path, source, config),
)


class MigrateIncludeDeletedToStatusFilters(MigrateAccountIdToArray):
"""
This class stands for migrating the config at runtime.
This migration is backwards compatible with the previous version, as only new properties are added.
When falling back to the previous source version, the connector will use the old `include_deleted` property.
Starting from `1.4.0`, the `include_deleted` property is replaced with `ad_statuses`,
`adset_statuses` and `campaign_statuses`, which represent status filters.
"""

migrate_from_key: str = "include_deleted"
migrate_to_key: str = "ad_statuses"
stream_filter_to_statuses: Mapping[str, List[str]] = {
"ad_statuses": [status.value for status in ValidAdStatuses],
"adset_statuses": [status.value for status in ValidAdSetStatuses],
"campaign_statuses": [status.value for status in ValidCampaignStatuses],
}

@classmethod
def should_migrate(cls, config: Mapping[str, Any]) -> bool:
"""
This method determines whether the config should be migrated to include the new status filter properties.
Returns:
> True, if the transformation is necessary
> False, otherwise.
> Raises an exception if the structure could not be migrated.
"""
config_is_updated = config.get(cls.migrate_to_key)
no_include_deleted = not config.get(cls.migrate_from_key)
return False if config_is_updated or no_include_deleted else True

@classmethod
def transform(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
# transform the config
for stream_filter, statuses in cls.stream_filter_to_statuses.items():
config[stream_filter] = statuses
# return transformed config
return config
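A minimal sketch of what `transform` does to a legacy config; the status lists below are placeholders standing in for the members of `ValidAdStatuses`, `ValidAdSetStatuses` and `ValidCampaignStatuses`, whose exact values are not shown in this diff.

# Placeholder status values -- the real lists come from the Valid*Statuses enums
# in source_facebook_marketing.spec, which are not part of this diff.
stream_filter_to_statuses = {
    "ad_statuses": ["ACTIVE", "PAUSED", "ARCHIVED"],
    "adset_statuses": ["ACTIVE", "PAUSED", "ARCHIVED"],
    "campaign_statuses": ["ACTIVE", "PAUSED", "ARCHIVED"],
}

legacy_config = {"account_ids": ["01234567890"], "include_deleted": True}

# Mirrors MigrateIncludeDeletedToStatusFilters.transform(): each filter property
# is set to the full status list, and include_deleted is left in place so the
# config still works after a rollback to an older connector version.
for stream_filter, statuses in stream_filter_to_statuses.items():
    legacy_config[stream_filter] = statuses

assert legacy_config["include_deleted"] is True
assert set(legacy_config) >= {"ad_statuses", "adset_statuses", "campaign_statuses"}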

@@ -152,10 +152,6 @@ def _transform_state_from_one_account_format(self, state: Mapping[str, Any], mov
return {}

def _transform_state_from_old_deleted_format(self, state: Mapping[str, Any]):
if all("filter_statuses" in account_state for account_state in state.values()):
# state is already in the new format
return state

# transform from the old format with `include_deleted`
for account_id in self._account_ids:
account_state = state.get(account_id, {})
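The guard removed above iterated over every value in the state and ran a membership test on it. With the new state layout shown earlier, one of those values is the top-level boolean `include_deleted`, and `"filter_statuses" in True` raises a `TypeError`; that appears to be the transformation error this commit fixes. A minimal reproduction, inferred from the diff rather than taken from a recorded traceback:

state = {
    "212551616838260": {"updated_time": "2121-07-25T13:34:26Z", "filter_statuses": ["ARCHIVED"]},
    "include_deleted": True,
}
try:
    all("filter_statuses" in account_state for account_state in state.values())
except TypeError as error:
    # "argument of type 'bool' is not iterable" -- the membership test hits the
    # boolean top-level value, so the early-return check had to be dropped.
    print(error)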

@@ -164,27 +164,29 @@ def test_stream_slices_single_account_empty_state(self, incremental_class_instan
[
# Test case 1: State date is used because fewer filters are used
(
{"123": {"date": "2021-01-30T00:00:00+00:00", "include_deleted": True}},
{"123": {"date": "2021-01-30T00:00:00+00:00", "include_deleted": True}, "include_deleted": True},
{"account_id": "123", "date": "2021-01-20T00:00:00+00:00"},
{
"123": {
"date": "2021-01-30T00:00:00+00:00",
"filter_statuses": ["ACTIVE"],
"include_deleted": True,
}
},
"include_deleted": True,
},
["ACTIVE"],
),
# Test case 2: State date is used because filter_statuses is the same as include_deleted
(
{"123": {"date": "2021-01-30T00:00:00+00:00", "include_deleted": True}},
{"123": {"date": "2021-01-30T00:00:00+00:00", "include_deleted": True}, "include_deleted": True},
{"account_id": "123", "date": "2021-01-20T00:00:00+00:00"},
{
"123": {
"date": "2021-01-30T00:00:00+00:00",
"filter_statuses": ["ACTIVE", "PAUSED", "DELETED"],
"include_deleted": True,
}
},
"include_deleted": True,
},
["ACTIVE", "PAUSED", "DELETED"],
),
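The updated test parameters add the top-level `include_deleted: True` flag next to the per-account entry so the stored state matches the new layout. The case comments describe when the saved cursor date may be reused; roughly, the previously synced statuses must cover the statuses being requested now. A hypothetical helper illustrating that rule, not the connector's actual implementation:

def can_reuse_state_date(saved_statuses, requested_statuses):
    # Hypothetical: reuse the saved cursor only if the earlier sync already
    # covered every status about to be requested.
    return set(requested_statuses) <= set(saved_statuses)

# Test case 1: fewer filters requested now than the old include_deleted sync covered.
assert can_reuse_state_date(["ACTIVE", "PAUSED", "DELETED"], ["ACTIVE"])
# Test case 2: the requested filters match what include_deleted=True used to imply.
assert can_reuse_state_date(["ACTIVE", "PAUSED", "DELETED"], ["ACTIVE", "PAUSED", "DELETED"])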

@@ -6,83 +6,144 @@
import json
from typing import Any, Mapping

import pytest
from airbyte_cdk.models import OrchestratorType, Type
from airbyte_cdk.sources import Source
from source_facebook_marketing.config_migrations import MigrateAccountIdToArray
from source_facebook_marketing.config_migrations import MigrateAccountIdToArray, MigrateIncludeDeletedToStatusFilters
from source_facebook_marketing.source import SourceFacebookMarketing

# BASE ARGS
CMD = "check"
TEST_CONFIG_PATH = "unit_tests/test_migrations/test_old_config.json"
NEW_TEST_CONFIG_PATH = "unit_tests/test_migrations/test_new_config.json"
UPGRADED_TEST_CONFIG_PATH = "unit_tests/test_migrations/test_upgraded_config.json"
SOURCE_INPUT_ARGS = [CMD, "--config", TEST_CONFIG_PATH]
SOURCE: Source = SourceFacebookMarketing()


# HELPERS
def load_config(config_path: str = TEST_CONFIG_PATH) -> Mapping[str, Any]:
def load_config(config_path: str) -> Mapping[str, Any]:
with open(config_path, "r") as config:
return json.load(config)


def revert_migration(config_path: str = TEST_CONFIG_PATH) -> None:
with open(config_path, "r") as test_config:
config = json.load(test_config)
config.pop("account_ids")
with open(config_path, "w") as updated_config:
config = json.dumps(config)
updated_config.write(config)


def test_migrate_config():
migration_instance = MigrateAccountIdToArray()
original_config = load_config()
# migrate the test_config
migration_instance.migrate(SOURCE_INPUT_ARGS, SOURCE)
# load the updated config
test_migrated_config = load_config()
# check migrated property
assert "account_ids" in test_migrated_config
assert isinstance(test_migrated_config["account_ids"], list)
# check the old property is in place
assert "account_id" in test_migrated_config
assert isinstance(test_migrated_config["account_id"], str)
# check the migration should be skipped, once already done
assert not migration_instance.should_migrate(test_migrated_config)
# compare the old account_id with the migrated account_ids
assert [original_config["account_id"]] == test_migrated_config["account_ids"]
# test CONTROL MESSAGE was emitted
control_msg = migration_instance.message_repository._message_queue[0]
assert control_msg.type == Type.CONTROL
assert control_msg.control.type == OrchestratorType.CONNECTOR_CONFIG
# the old account_id is still type(str)
assert isinstance(control_msg.control.connectorConfig.config["account_id"], str)
# the new account_ids is type(list)
assert isinstance(control_msg.control.connectorConfig.config["account_ids"], list)
# check the migrated values
assert control_msg.control.connectorConfig.config["account_ids"] == ["01234567890"]
# revert the test_config to the starting point
revert_migration()


def test_config_is_reverted():
# check the test_config state, it has to be the same as before tests
test_config = load_config()
# check the config no longer has the migrated property
assert "account_ids" not in test_config
# check the old property is still there
assert "account_id" in test_config
assert isinstance(test_config["account_id"], str)


def test_should_not_migrate_new_config():
new_config = load_config(NEW_TEST_CONFIG_PATH)
migration_instance = MigrateAccountIdToArray()
assert not migration_instance.should_migrate(new_config)


def test_should_not_migrate_upgraded_config():
new_config = load_config(UPGRADED_TEST_CONFIG_PATH)
migration_instance = MigrateAccountIdToArray()
assert not migration_instance.should_migrate(new_config)
class TestMigrateAccountIdToArray:
TEST_CONFIG_PATH = "unit_tests/test_migrations/account_id_to_array/test_old_config.json"
NEW_TEST_CONFIG_PATH = "unit_tests/test_migrations/account_id_to_array/test_new_config.json"
UPGRADED_TEST_CONFIG_PATH = "unit_tests/test_migrations/account_id_to_array/test_upgraded_config.json"

@staticmethod
def revert_migration(config_path: str = TEST_CONFIG_PATH) -> None:
with open(config_path, "r") as test_config:
config = json.load(test_config)
config.pop("account_ids")
with open(config_path, "w") as updated_config:
config = json.dumps(config)
updated_config.write(config)

def test_migrate_config(self):
migration_instance = MigrateAccountIdToArray()
original_config = load_config(self.TEST_CONFIG_PATH)
# migrate the test_config
migration_instance.migrate([CMD, "--config", self.TEST_CONFIG_PATH], SOURCE)
# load the updated config
test_migrated_config = load_config(self.TEST_CONFIG_PATH)
# check migrated property
assert "account_ids" in test_migrated_config
assert isinstance(test_migrated_config["account_ids"], list)
# check the old property is in place
assert "account_id" in test_migrated_config
assert isinstance(test_migrated_config["account_id"], str)
# check the migration should be skipped, once already done
assert not migration_instance.should_migrate(test_migrated_config)
# compare the old account_id with the migrated account_ids
assert [original_config["account_id"]] == test_migrated_config["account_ids"]
# test CONTROL MESSAGE was emitted
control_msg = migration_instance.message_repository._message_queue[0]
assert control_msg.type == Type.CONTROL
assert control_msg.control.type == OrchestratorType.CONNECTOR_CONFIG
# the old account_id is still type(str)
assert isinstance(control_msg.control.connectorConfig.config["account_id"], str)
# the new account_ids is type(list)
assert isinstance(control_msg.control.connectorConfig.config["account_ids"], list)
# check the migrated values
assert control_msg.control.connectorConfig.config["account_ids"] == ["01234567890"]
# revert the test_config to the starting point
self.revert_migration()

def test_config_is_reverted(self):
# check the test_config state, it has to be the same as before tests
test_config = load_config(self.TEST_CONFIG_PATH)
# check the config no longer has the migrated property
assert "account_ids" not in test_config
# check the old property is still there
assert "account_id" in test_config
assert isinstance(test_config["account_id"], str)

def test_should_not_migrate_new_config(self):
new_config = load_config(self.NEW_TEST_CONFIG_PATH)
migration_instance = MigrateAccountIdToArray()
assert not migration_instance.should_migrate(new_config)

def test_should_not_migrate_upgraded_config(self):
new_config = load_config(self.UPGRADED_TEST_CONFIG_PATH)
migration_instance = MigrateAccountIdToArray()
assert not migration_instance.should_migrate(new_config)


class TestMigrateIncludeDeletedToStatusFilters:
OLD_TEST1_CONFIG_PATH = "unit_tests/test_migrations/include_deleted_to_status_filters/include_deleted_false/test_old_config.json"
NEW_TEST1_CONFIG_PATH = "unit_tests/test_migrations/include_deleted_to_status_filters/include_deleted_false/test_new_config.json"
OLD_TEST2_CONFIG_PATH = "unit_tests/test_migrations/include_deleted_to_status_filters/include_deleted_true/test_old_config.json"
NEW_TEST2_CONFIG_PATH = "unit_tests/test_migrations/include_deleted_to_status_filters/include_deleted_true/test_new_config.json"

UPGRADED_TEST_CONFIG_PATH = "unit_tests/test_migrations/account_id_to_array/test_upgraded_config.json"

filter_properties = ["ad_statuses", "adset_statuses", "campaign_statuses"]

def revert_migration(self, config_path: str) -> None:
with open(config_path, "r") as test_config:
config = json.load(test_config)
for filter in self.filter_properties:
config.pop(filter)
with open(config_path, "w") as updated_config:
config = json.dumps(config)
updated_config.write(config)

@pytest.mark.parametrize(
"old_config_path, new_config_path, include_deleted",
[(OLD_TEST1_CONFIG_PATH, NEW_TEST1_CONFIG_PATH, False), (OLD_TEST2_CONFIG_PATH, NEW_TEST2_CONFIG_PATH, True)],
)
def test_migrate_config(self, old_config_path, new_config_path, include_deleted):
migration_instance = MigrateIncludeDeletedToStatusFilters()
original_config = load_config(old_config_path)
# migrate the test_config
migration_instance.migrate([CMD, "--config", old_config_path], SOURCE)
# load the updated config
test_migrated_config = load_config(old_config_path)
# load expected updated config
expected_new_config = load_config(new_config_path)
# compare expected with migrated
assert expected_new_config == test_migrated_config
# check migrated property
if include_deleted:
assert all([filter in test_migrated_config for filter in self.filter_properties])
# check the old property is in place
assert "include_deleted" in test_migrated_config
assert test_migrated_config["include_deleted"] == include_deleted
# check the migration should be skipped, once already done
assert not migration_instance.should_migrate(test_migrated_config)
if include_deleted:
# test CONTROL MESSAGE was emitted
control_msg = migration_instance.message_repository._message_queue[0]
assert control_msg.type == Type.CONTROL
assert control_msg.control.type == OrchestratorType.CONNECTOR_CONFIG
# revert the test_config to the starting point
self.revert_migration(old_config_path)

@pytest.mark.parametrize("new_config_path", [NEW_TEST1_CONFIG_PATH, NEW_TEST2_CONFIG_PATH])
def test_should_not_migrate_new_config(self, new_config_path):
new_config = load_config(new_config_path)
migration_instance = MigrateIncludeDeletedToStatusFilters()
assert not migration_instance.should_migrate(new_config)

def test_should_not_migrate_upgraded_config(self):
new_config = load_config(self.UPGRADED_TEST_CONFIG_PATH)
migration_instance = MigrateIncludeDeletedToStatusFilters()
assert not migration_instance.should_migrate(new_config)
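The entry-point wiring is not part of this diff, but runtime config migrations like these are typically invoked before the connector launches; a sketch of that assumed wiring, using only the `migrate(args, source)` classmethod shown above:

import sys

from airbyte_cdk.entrypoint import launch
from source_facebook_marketing.config_migrations import MigrateAccountIdToArray, MigrateIncludeDeletedToStatusFilters
from source_facebook_marketing.source import SourceFacebookMarketing

def run() -> None:
    source = SourceFacebookMarketing()
    # Each migration checks should_migrate() and, when needed, rewrites the stored
    # config and emits a CONNECTOR_CONFIG control message before the command runs.
    MigrateAccountIdToArray.migrate(sys.argv[1:], source)
    MigrateIncludeDeletedToStatusFilters.migrate(sys.argv[1:], source)
    launch(source, sys.argv[1:])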
@@ -0,0 +1,15 @@
{
"start_date": "2021-02-08T00:00:00Z",
"end_date": "2021-02-15T00:00:00Z",
"custom_insights": [
{
"name": "custom_insight_stream",
"fields": ["account_name", "clicks", "cpc", "account_id", "ad_id"],
"breakdowns": ["gender"],
"action_breakdowns": []
}
],
"account_ids": ["01234567890"],
"access_token": "access_token",
"include_deleted": false
}
@@ -0,0 +1,15 @@
{
"start_date": "2021-02-08T00:00:00Z",
"end_date": "2021-02-15T00:00:00Z",
"custom_insights": [
{
"name": "custom_insight_stream",
"fields": ["account_name", "clicks", "cpc", "account_id", "ad_id"],
"breakdowns": ["gender"],
"action_breakdowns": []
}
],
"include_deleted": false,
"account_ids": ["01234567890"],
"access_token": "access_token"
}