diff --git a/weather_dl/download_pipeline/pipeline.py b/weather_dl/download_pipeline/pipeline.py
index f4df9966..09bbc630 100644
--- a/weather_dl/download_pipeline/pipeline.py
+++ b/weather_dl/download_pipeline/pipeline.py
@@ -76,6 +76,8 @@ class PipelineArgs:
 
 def pipeline(args: PipelineArgs) -> None:
     """Main pipeline entrypoint."""
+    import builtins
+    import typing as t
     logger.info(f"Using '{args.num_requesters_per_key}' requests per subsection (license).")
 
     subsections = get_subsections(args.config)
@@ -83,8 +85,8 @@ def pipeline(args: PipelineArgs) -> None:
     request_idxs = {name: itertools.cycle(range(args.num_requesters_per_key)) for name, _ in subsections}
 
     def subsection_and_request(it: Config) -> t.Tuple[str, int]:
-        subsection = t.cast(str, it.get('parameters', {}).get('__subsection__', 'default'))
-        return subsection, next(request_idxs[subsection])
+        subsection = t.cast(builtins.str, it.get('parameters', {}).get('__subsection__', 'default'))
+        return subsection, builtins.next(request_idxs[subsection])
 
     subsections_cycle = itertools.cycle(subsections)
 
@@ -135,7 +137,8 @@ def run(argv: t.List[str], save_main_session: bool = True) -> PipelineArgs:
 
     # We use the save_main_session option because one or more DoFn's in this
     # workflow rely on global context (e.g., a module imported at module level).
-    pipeline_options = PipelineOptions(pipeline_args + '--save_main_session True'.split())
+    save_main_session_args = ['--save_main_session'] + ['True' if save_main_session else 'False']
+    pipeline_options = PipelineOptions(pipeline_args + save_main_session_args)
 
     client_name = config['parameters']['client']
     store = None  # will default to using FileSystems()