Skip to content

Commit

Permalink
update retry logic for all uio calls
Browse files Browse the repository at this point in the history
  • Loading branch information
jtimeus-slalom committed May 12, 2023
1 parent 3fd8c36 commit 2d1ac45
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 4 deletions.
16 changes: 15 additions & 1 deletion tapdance/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from logless import get_logger, logged
from typing import Optional, Dict, Any, Tuple, List, Union
import uio
import backoff
from botocore.exceptions import ConnectionClosedError

logging = get_logger("tapdance")

Expand Down Expand Up @@ -194,8 +196,20 @@ def get_taps_dir(override: str = None) -> str:
return value will be a local copy of the remote path.
"""
taps_dir = override or os.environ.get(ENV_TAP_CONFIG_DIR, ".")
return uio.make_local(taps_dir) # if remote path provided, download locally

return make_tap_dir_local(taps_dir) # if remote path provided, download locally

def log_backoff_attempt(details):
logging.warning(f'Error detected communicating with AWS downloading config files, triggering backoff: {details.get("tries")}')

@backoff.on_exception(
backoff.expo,
ConnectionClosedError,
max_tries=3,
on_backoff=log_backoff_attempt
)
def make_tap_dir_local(taps_dir):
return uio.make_local(taps_dir)

def get_log_dir(override: str = None) -> Optional[str]:
"""Return the remote logging dir, or None if logging not configured."""
Expand Down
20 changes: 18 additions & 2 deletions tapdance/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from logless import get_logger
import uio
import backoff
from botocore.exceptions import ConnectionClosedError

logging = get_logger("tapdance")

Expand All @@ -29,16 +31,30 @@ def make_aggregate_state_file(raw_json_lines_file: str, output_json_file: str) -
Path to use when saving the aggregated json file.
"""
try:
uio.create_text_file(
download_state_file(
output_json_file,
get_aggregate_state(uio.get_text_file_contents(raw_json_lines_file)),
get_aggregate_state(uio.get_text_file_contents(raw_json_lines_file))
)
except ValueError as ex:
raise ValueError(
f"State file from '{raw_json_lines_file}' is not valid JSON or JSONL. "
f"Please either delete or fix the file and then retry. {ex}"
)

def log_backoff_attempt(details):
logging.warning(f'Error detected communicating with AWS downloading state file, triggering backoff: {details.get("tries")}')

@backoff.on_exception(
backoff.expo,
ConnectionClosedError,
max_tries=3,
on_backoff=log_backoff_attempt
)
def download_state_file(output_json_file,aggregate_state):
uio.create_text_file(
output_json_file,
aggregate_state)


def get_aggregate_state(raw_json_lines_text: str) -> str:
"""
Expand Down
2 changes: 1 addition & 1 deletion tapdance/syncs.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ def _sync_one_table(
upload_state_file(local_state_file_out, table_state_file)

def log_backoff_attempt(details):
logging.warrning(f'Error dectected communicating with AWS, triggering backoff: {details.get("tries")}')
logging.warning(f'Error detected communicating with AWS uploading state file, triggering backoff: {details.get("tries")}')

@backoff.on_exception(
backoff.expo,
Expand Down

0 comments on commit 2d1ac45

Please sign in to comment.