Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions sources/facebook_ads/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
)
from .settings import (
FACEBOOK_INSIGHTS_RETENTION_PERIOD,
ALL_ACTION_BREAKDOWNS,
ALL_ACTION_ATTRIBUTION_WINDOWS,
DEFAULT_INSIGHT_FIELDS,
INSIGHT_FIELDS_TYPES,
Expand Down Expand Up @@ -126,8 +125,8 @@ def facebook_insights_source(
fields: Sequence[str] = DEFAULT_INSIGHT_FIELDS,
attribution_window_days_lag: int = 7,
time_increment_days: int = 1,
breakdowns: TInsightsBreakdownOptions = "ads_insights",
action_breakdowns: Sequence[str] = ALL_ACTION_BREAKDOWNS,
breakdowns: TInsightsBreakdownOptions = None,
action_breakdowns: Sequence[str] = None,
level: TInsightsLevels = "ad",
action_attribution_windows: Sequence[str] = ALL_ACTION_ATTRIBUTION_WINDOWS,
batch_size: int = 50,
Expand All @@ -149,8 +148,8 @@ def facebook_insights_source(
fields (Sequence[str], optional): A list of fields to include in each reports. Note that `breakdowns` option adds fields automatically. Defaults to DEFAULT_INSIGHT_FIELDS.
attribution_window_days_lag (int, optional): Attribution window in days. The reports in attribution window are refreshed on each run.. Defaults to 7.
time_increment_days (int, optional): The report aggregation window in days. use 7 for weekly aggregation. Defaults to 1.
breakdowns (TInsightsBreakdownOptions, optional): A presents with common aggregations. See settings.py for details. Defaults to "ads_insights_age_and_gender".
action_breakdowns (Sequence[str], optional): Action aggregation types. See settings.py for details. Defaults to ALL_ACTION_BREAKDOWNS.
breakdowns (TInsightsBreakdownOptions, optional): A presents with common aggregations. See settings.py for details. Defaults to None (no breakdowns).
action_breakdowns (Sequence[str], optional): Action aggregation types. See settings.py for details. Defaults to None (no action breakdowns).
level (TInsightsLevels, optional): The granularity level. Defaults to "ad".
action_attribution_windows (Sequence[str], optional): Attribution windows for actions. Defaults to ALL_ACTION_ATTRIBUTION_WINDOWS.
batch_size (int, optional): Page size when reading data from particular report. Defaults to 50.
Expand Down Expand Up @@ -186,16 +185,7 @@ def facebook_insights(
while start_date <= end_date:
query = {
"level": level,
"action_breakdowns": list(action_breakdowns),
"breakdowns": list(
INSIGHTS_BREAKDOWNS_OPTIONS[breakdowns]["breakdowns"]
),
"limit": batch_size,
"fields": list(
set(fields)
.union(INSIGHTS_BREAKDOWNS_OPTIONS[breakdowns]["fields"])
.difference(INVALID_INSIGHTS_FIELDS)
),
"time_increment": time_increment_days,
"action_attribution_windows": list(action_attribution_windows),
"time_ranges": [
Expand All @@ -207,6 +197,22 @@ def facebook_insights(
}
],
}

fields_to_use = set(fields)
# Only add breakdowns if explicitly provided
if breakdowns is not None:
query["breakdowns"] = list(
INSIGHTS_BREAKDOWNS_OPTIONS[breakdowns]["breakdowns"]
)
fields_to_use = fields_to_use.union(
INSIGHTS_BREAKDOWNS_OPTIONS[breakdowns]["fields"]
)
query["fields"] = list(fields_to_use.difference(INVALID_INSIGHTS_FIELDS))

# Only add action_breakdowns if explicitly provided
if action_breakdowns is not None:
query["action_breakdowns"] = list(action_breakdowns)

job = execute_job(account.get_insights(params=query, is_async=True))
yield list(map(process_report_item, job.get_result()))
start_date = start_date.add(days=time_increment_days)
Expand Down
20 changes: 20 additions & 0 deletions sources/facebook_ads_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,30 @@ def load_insights() -> None:
print(info)


def load_insights_with_breakdowns() -> None:
"""Shows how to load insights with custom breakdowns and action breakdowns"""
pipeline = dlt.pipeline(
pipeline_name="facebook_insights_breakdowns",
destination="duckdb",
dataset_name="facebook_insights_data",
dev_mode=True,
)
# Load insights with age and gender breakdowns
i_with_breakdowns = facebook_insights_source(
initial_load_past_days=7,
breakdowns="ads_insights_age_and_gender",
# Uncomment to add action breakdowns:
# action_breakdowns=["action_type", "action_target_id"]
)
info = pipeline.run(i_with_breakdowns)
print(info)


if __name__ == "__main__":
# load_all_ads_objects()
merge_ads_objects()
# load_ads_with_custom_fields()
# load_only_disapproved_ads()
# load_and_enrich_objects()
# load_insights()
# load_insights_with_breakdowns()
6 changes: 3 additions & 3 deletions tests/test_dlt_ai.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

import dlt
from dlt.common.utils import set_working_dir
from dlt.cli import ai_command
from dlt.cli.ai_command import TSupportedIde
from dlt.cli.plugins import DEFAULT_VERIFIED_SOURCES_REPO
from dlt._workspace.cli import _ai_command as ai_command
from dlt._workspace.cli._ai_command import TSupportedIde
from dlt._workspace.cli import DEFAULT_VERIFIED_SOURCES_REPO

from tests.utils import TEST_STORAGE_ROOT

Expand Down
8 changes: 6 additions & 2 deletions tests/test_dlt_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@

from dlt.sources import SourceReference

from dlt.cli import init_command, echo
from dlt.cli.init_command import SOURCES_MODULE_NAME, utils as cli_utils, files_ops
from dlt._workspace.cli import echo, _init_command as init_command
from dlt._workspace.cli._init_command import (
SOURCES_MODULE_NAME,
utils as cli_utils,
files_ops,
)
from dlt.reflection import names as n

from tests.utils import TEST_STORAGE_ROOT
Expand Down
5 changes: 3 additions & 2 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
SecretsTomlProvider,
)
from dlt.common.configuration.specs.pluggable_run_context import (
SupportsRunContext,
RunContextBase,
)

from dlt.common.runtime.run_context import DOT_DLT, RunContext
from dlt.common.pipeline import LoadInfo, PipelineContext, ExtractInfo
from dlt.common.storages import FileStorage
Expand Down Expand Up @@ -94,7 +95,7 @@ def data_dir(self) -> str:
_data_dir: str

@classmethod
def from_context(cls, ctx: SupportsRunContext) -> "MockableRunContext":
def from_context(cls, ctx: RunContextBase) -> "MockableRunContext":
cls_ = cls(ctx.run_dir)
cls_._name = ctx.name
cls_._global_dir = ctx.global_dir
Expand Down
Loading