🎉 Source Facebook: migrate to CDK #3743
Conversation
…rsions" fields and add other available fields that have been tested and confirmed to work.
/test connector=source-facebook-marketing
11912b0 to 9439e6d (compare)
Some questions, but mostly looks good.
Taking a step back, this feels like a good opportunity to explore an async read pattern in the CDK, e.g. having an `AsyncStream` class where `read_records` is `async`. Definitely out of scope here, but this might be a candidate for implementation when we get to it.
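A minimal sketch of what such an `AsyncStream` could look like — the class and method shapes here are assumptions for illustration, not part of the actual CDK:

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, AsyncIterator, Mapping, Optional


class AsyncStream(ABC):
    """Hypothetical CDK stream whose read path is a native async generator."""

    @abstractmethod
    def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None) -> AsyncIterator[Mapping[str, Any]]:
        """Subclasses implement this as an async generator."""


class ExampleStream(AsyncStream):
    async def read_records(self, stream_slice=None):
        for record_id in range(3):
            await asyncio.sleep(0)  # stand-in for awaiting an HTTP response
            yield {"id": record_id}


async def read_all(stream: AsyncStream):
    # Drain the async generator into a plain list.
    return [record async for record in stream.read_records()]
```

With this shape, many slow API calls (like the async insights jobs below) could be awaited concurrently instead of blocking the read loop.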
```python
config = ConnectorConfig.parse_obj(config)  # FIXME: this will not be needed after we fix the CDK
api = API(account_id=config.account_id, access_token=config.access_token)

try:
```
Is the whole code block supposed to be inside `try`?
```python
}

@backoff_policy
def _get_insights(self, params) -> AdReportRun:
```
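For readers unfamiliar with the pattern, `backoff_policy` is presumably a retry decorator applied to the API call; a generic stdlib-only sketch of one (the name and parameters here are assumptions, not the connector's actual code):

```python
import functools
import time


def backoff_policy(func=None, *, max_tries: int = 3, delay: float = 0.0):
    """Retry the wrapped call on exception, pausing between attempts."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_tries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_tries:
                        raise  # out of retries: surface the error
                    time.sleep(delay)
        return wrapper

    # Support both @backoff_policy and @backoff_policy(max_tries=...)
    return decorator(func) if func else decorator
```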
Can we rename it to `_create_insights_job` or something that indicates it is creating a job?
```python
date_ranges = list(self._date_ranges(stream_state=stream_state))

# accumulate MAX_ASYNC_JOBS jobs in the buffer to schedule them all before trying to wait
for params in date_ranges[: self.MAX_ASYNC_JOBS]:
```
Shouldn't we process `MAX_ASYNC_JOBS` at a time? The current implementation seems to process `MAX_ASYNC_JOBS` the first time, then creates as many jobs as are left, which may be greater than the max.
Because we yield each job, we don't advance to the next job before we read the result of the previous one, so each `yield` is a wait for a job.
So this launches `MAX_ASYNC_JOBS` in parallel, waits for all of them to complete, then runs jobs one-by-one after that?
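The scheduling pattern under discussion can be sketched roughly like this — `Job` is a simplified stand-in for the connector's async insights job, not the actual implementation:

```python
from typing import Iterator, List, Mapping


class Job:
    """Stand-in for an async insights job: started eagerly, awaited lazily."""

    def __init__(self, params: Mapping):
        self.params = params
        self.started = False

    def start(self):
        self.started = True  # real code would submit the job to the API


MAX_ASYNC_JOBS = 3


def schedule_jobs(date_ranges: List[Mapping]) -> Iterator[Job]:
    buffer = []
    # Pre-start the first MAX_ASYNC_JOBS jobs so they run in parallel...
    for params in date_ranges[:MAX_ASYNC_JOBS]:
        job = Job(params)
        job.start()
        buffer.append(job)
    # ...then start everything that is left in one go. This is the behavior
    # the reviewer is questioning: the remainder may exceed MAX_ASYNC_JOBS.
    for params in date_ranges[MAX_ASYNC_JOBS:]:
        job = Job(params)
        job.start()
        buffer.append(job)
    # Each job is yielded one at a time, so the caller waits on the previous
    # result before the next job is consumed.
    yield from buffer
```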
```python
if pendulum.parse(obj[self.cursor_field]) >= min_cursor:
    yield obj.export_all_data()

def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
```
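For context, `stream_slices` in this kind of connector typically yields one date-range dict per async job (the `insights_days_per_job` idea from the PR description); a generic stdlib-only sketch with assumed field names:

```python
from datetime import date, timedelta
from typing import Iterator, Mapping


def date_slices(start: date, end: date, days_per_job: int) -> Iterator[Mapping[str, str]]:
    """Split [start, end] into consecutive windows of days_per_job days."""
    cursor = start
    while cursor <= end:
        window_end = min(cursor + timedelta(days=days_per_job - 1), end)
        yield {"since": cursor.isoformat(), "until": window_end.isoformat()}
        cursor = window_end + timedelta(days=1)
```

Each yielded slice becomes the parameters of one insights job, which is what makes per-slice state checkpointing possible.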
Is the retry functionality working correctly? We never retry a job if it fails. Is that the desired behavior?
Usually there is a very good reason for a job to fail. The response doesn't contain any useful information about that reason; in my experience a failed job appears only when something is wrong with the query itself (fields or breakdowns not supported). So here we only retry checking the status of the job. We save state for every async job, so if the next job fails we have a checkpoint.
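The behavior described here — retrying only the status check while treating a failed job as terminal — can be sketched like this (function names and status strings are illustrative assumptions, not the connector's actual code):

```python
import time


class JobStatusError(Exception):
    """Transient error while checking a job's status."""


def check_status(job: dict) -> str:
    # Illustrative: in the real connector this is a Facebook API call.
    return job["status"]


def wait_for_job(job: dict, max_attempts: int = 5, delay: float = 0.0) -> str:
    """Retry only the status check; a terminal 'Job Failed' is not retried."""
    for _ in range(max_attempts):
        try:
            status = check_status(job)
        except JobStatusError:
            time.sleep(delay)  # transient error: back off and re-check
            continue
        if status == "Job Failed":
            # Terminal: the query itself is wrong, retrying won't help.
            raise RuntimeError("Job has failed. Review your query and try again.")
        if status == "Job Completed":
            return status
        time.sleep(delay)  # still running: poll again
    raise TimeoutError("job did not finish in time")
```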
From the docs (https://developers.facebook.com/docs/marketing-api/insights/best-practices/):
> Job Failed | Job has failed. Review your query and try again.
Facebook is leading by counter-example on how to give a good error message.
/test connector=source-facebook-marketing
LGTM once build errors are fixed
/publish connector=connectors/source-facebook-marketing
What
Closes #3525.
This is quite an old PR. Because effectively fixing all the issues and testing the fixes was difficult, I decided to migrate to the CDK and SAT at the same time as fixing the issues above (#3525).
The PR contains:
- `insights_days_per_job`
- `adsets` renamed to `ad_sets`