Skip to content

Commit

Permalink
Source Instagram: fix bug with transient failures (#2666)
Browse files Browse the repository at this point in the history
* Issue #2626: update the backoff function to handle transient failures, and add explicit pagination for media resources so that the backoff is always applied.

Co-authored-by: ykurochkin <y.kurochkin@zazmic.com>
  • Loading branch information
yevhenii-ldv and ykurochkin authored Mar 31, 2021
1 parent 6c6ea54 commit 473034d
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "6acf6b55-4f1e-4fca-944e-1a3caef8aba8",
"name": "Instagram",
"dockerRepository": "airbyte/source-instagram",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-instagram"
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@
- sourceDefinitionId: 6acf6b55-4f1e-4fca-944e-1a3caef8aba8
name: Instagram
dockerRepository: airbyte/source-instagram
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://hub.docker.com/r/airbyte/source-instagram
- sourceDefinitionId: 5e6175e5-68e1-4c17-bff9-56103bbb0d80
name: Gitlab
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ COPY $CODE_PATH ./$CODE_PATH
COPY setup.py ./
RUN pip install ".[main]"

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-instagram
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@
},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": false
}
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
Expand Down Expand Up @@ -168,7 +170,9 @@
},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": false
}
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
Expand Down Expand Up @@ -213,7 +217,9 @@
},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": false
}
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": false
}
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
Expand Down Expand Up @@ -88,7 +90,8 @@
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true
},
"sync_mode": "incremental"
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
Expand Down Expand Up @@ -136,7 +139,9 @@
},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": false
}
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
Expand Down Expand Up @@ -175,7 +180,9 @@
},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": false
}
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@
from base_python.entrypoint import logger
from facebook_business.adobjects.igmedia import IGMedia
from facebook_business.adobjects.iguser import IGUser
from facebook_business.api import Cursor
from facebook_business.exceptions import FacebookRequestError

from .common import retry_pattern

backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=7, factor=5)


def clear_url(record_data: dict = None):
Expand Down Expand Up @@ -78,6 +79,28 @@ def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
def filter_input_fields(self, fields: Sequence[str] = None):
    """Return the requested fields with the non-object fields removed.

    :param fields: field names requested by the caller. ``None`` (the declared
        default) or an empty sequence yields an empty list instead of raising.
    :return: a new list holding each name from ``fields`` that is not listed in
        ``self.non_object_fields``, de-duplicated and in first-seen order.
    """
    if not fields:
        # set(None) would raise TypeError; "no fields requested" filters to nothing.
        return []
    excluded = set(self.non_object_fields)
    # dict.fromkeys de-duplicates like set() did, but keeps a deterministic
    # order instead of depending on per-process string hashing.
    return [name for name in dict.fromkeys(fields) if name not in excluded]

@backoff_policy
def load_next_page(self, instance: Cursor) -> None:
    """Ask the given cursor to load its next page.

    The call is wrapped in the module-level ``backoff_policy`` decorator
    (built with ``retry_pattern`` for ``FacebookRequestError``) rather than
    being made bare.

    :param instance: cursor whose ``load_next_page()`` is invoked.
    """
    # The result of the cursor call is discarded; this method returns None.
    instance.load_next_page()

@backoff_policy
def get_instance_cursor(self, ig_user: IGUser, method_name: str, params: dict = None, fields: Sequence[str] = None) -> Cursor:
    """Look up ``method_name`` on ``ig_user`` and call it to obtain a cursor.

    The call is wrapped in the module-level ``backoff_policy`` decorator.

    :param ig_user: object exposing the method to call.
    :param method_name: attribute name of the method on ``ig_user``.
    :param params: forwarded as the ``params`` keyword argument.
    :param fields: forwarded as the ``fields`` keyword argument.
    :return: whatever the looked-up method returns.
    """
    edge_method = getattr(ig_user, method_name)
    return edge_method(params=params, fields=fields)

def pagination(self, ig_user: IGUser, method_name: str, params: dict = None, fields: Sequence[str] = None) -> Iterator[Any]:
    """Yield every record of a paged resource, one page at a time.

    The cursor is obtained through ``get_instance_cursor`` and each further
    page is requested through ``load_next_page``, so every fetch goes
    through those wrapped methods.

    NOTE: this reads the private ``_queue`` and ``_finished_iteration``
    attributes of the Cursor class.
    todo: Should be careful when updating the library version.
    """
    cursor = self.get_instance_cursor(ig_user, method_name, params, fields)
    while True:
        # Emit whatever the cursor currently holds for the loaded page.
        yield from cursor._queue
        if cursor._finished_iteration:
            return
        self.load_next_page(cursor)


class IncrementalStreamAPI(StreamAPI, ABC):
@property
Expand Down Expand Up @@ -251,9 +274,6 @@ class MediaAPI(StreamAPI):
def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
children_fields = self.filter_input_fields(list(set(fields) - set(self.INVALID_CHILDREN_FIELDS)))
for account in self._api.accounts:
# We get a Cursor object holding the specified number of Media (in our case, the value of result_return_limit).
# We then iterate over it; once the Cursor has reached and read its last Media,
# the facebook_business Cursor implementation pulls the next {value of result_return_limit} Media (if they exist, of course).
media = self._get_media(
account["instagram_business_account"], {"limit": self.result_return_limit}, self.filter_input_fields(fields)
)
Expand All @@ -272,13 +292,8 @@ def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
)
yield clear_url(record_data)

@backoff_policy
def _get_media(self, instagram_user: IGUser, params: dict = None, fields: Sequence[str] = None) -> Iterator[Any]:
"""
This is necessary because the functions that call this endpoint return
a generator, whose calls need decorated with a backoff.
"""
return instagram_user.get_media(params=params, fields=fields)
yield from self.pagination(instagram_user, "get_media", params=params, fields=fields)

@backoff_policy
def _get_single_record(self, media_id: str, fields: Sequence[str] = None) -> IGMedia:
Expand All @@ -301,13 +316,8 @@ def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
)
yield clear_url(record_data)

@backoff_policy
def _get_stories(self, instagram_user: IGUser, params: dict, fields: Sequence[str] = None) -> Iterator[Any]:
"""
This is necessary because the functions that call this endpoint return
a generator, whose calls need decorated with a backoff.
"""
return instagram_user.get_stories(params=params, fields=fields)
yield from self.pagination(instagram_user, "get_stories", params=params, fields=fields)


class MediaInsightsAPI(MediaAPI):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ def log_retry_attempt(details):
logger.info(f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} more seconds then retrying...")

def should_retry_api_error(exc):
if exc.http_status() == status_codes.TOO_MANY_REQUESTS or (
exc.http_status() == status_codes.FORBIDDEN and exc.api_error_message() == "(#4) Application request limit reached"
if (
exc.http_status() == status_codes.TOO_MANY_REQUESTS
or (exc.http_status() == status_codes.FORBIDDEN and exc.api_error_message() == "(#4) Application request limit reached")
or exc.api_transient_error()
):
return True
return False
Expand Down

0 comments on commit 473034d

Please sign in to comment.