Skip to content

Commit

Permalink
(#123) Fixed beam not being able to access global namespace. (#124)
Browse files Browse the repository at this point in the history
Also fixed related bug where save_main_session arg for run() wasn't doing anything.
  • Loading branch information
pbattaglia committed Mar 28, 2022
1 parent fd0c5e4 commit e3e3ee7
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions weather_dl/download_pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,17 @@ class PipelineArgs:

def pipeline(args: PipelineArgs) -> None:
"""Main pipeline entrypoint."""
import builtins
import typing as t
logger.info(f"Using '{args.num_requesters_per_key}' requests per subsection (license).")

subsections = get_subsections(args.config)

request_idxs = {name: itertools.cycle(range(args.num_requesters_per_key)) for name, _ in subsections}

def subsection_and_request(it: Config) -> t.Tuple[str, int]:
subsection = t.cast(str, it.get('parameters', {}).get('__subsection__', 'default'))
return subsection, next(request_idxs[subsection])
subsection = t.cast(builtins.str, it.get('parameters', {}).get('__subsection__', 'default'))
return subsection, builtins.next(request_idxs[subsection])

subsections_cycle = itertools.cycle(subsections)

Expand Down Expand Up @@ -135,7 +137,8 @@ def run(argv: t.List[str], save_main_session: bool = True) -> PipelineArgs:

# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args + '--save_main_session True'.split())
save_main_session_args = ['--save_main_session'] + ['True' if save_main_session else 'False']
pipeline_options = PipelineOptions(pipeline_args + save_main_session_args)

client_name = config['parameters']['client']
store = None # will default to using FileSystems()
Expand Down

0 comments on commit e3e3ee7

Please sign in to comment.