In [None]:
# import all the libraries we need
import os

import parsl
from parsl import python_app
from parsl.config import Config
from parsl.channels import LocalChannel
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider

# helper functions
from grouputils import initialize_stager
from grouputils import plot_tiles


## Background

The first step in our workflow is to "stage" our data. Staging the data encompasses the following pre-processing tasks:

- simplify the polygons 
- set an input CRS if one is missing
- reproject the data when required
- add additional properties to each polygon, including: the centroid x and y
  coordinates, the area, a unique ID, and the name of the file that the
  polygon originated from
- break each input file into [standardized tiles](https://docs.opengeospatial.org/is/17-083r2/17-083r2.html)
- save them to disk, following a file hierarchy and naming format for x, y, and z coordinates of the tiles

Here is a diagram showing what the most important step, the last one, looks like.

![](https://raw.githubusercontent.com/PermafrostDiscoveryGateway/viz-staging/develop/docs/images/staging_tldr.png)
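
To make the tiling and file-naming idea concrete, here is a minimal sketch of how a point could be assigned to a tile and how that tile could map to a file path. It is an illustration only, not the `pdgstaging` implementation: the grid definition (a global grid in degrees with 2 × 2^z columns and 2^z rows), the `z/x/y` directory order, and the function names `tile_index` and `tile_path` are all assumptions made for this example.

```python
from pathlib import Path

def tile_index(lon, lat, z):
    """Return (x, y) of the tile containing a lon/lat point at zoom level z.

    Assumes a simple global grid in degrees with 2 * 2**z columns and
    2**z rows, with the origin at the top-left corner (-180, 90).
    """
    n_cols = 2 * 2 ** z
    n_rows = 2 ** z
    tile_size = 360.0 / n_cols  # tile width and height in degrees
    # Clamp so points on the right or bottom edge fall in the last tile
    x = min(int((lon + 180.0) // tile_size), n_cols - 1)
    y = min(int((90.0 - lat) // tile_size), n_rows - 1)
    return x, y

def tile_path(base_dir, z, x, y, ext=".gpkg"):
    # Hypothetical layout: <base_dir>/<z>/<x>/<y>.gpkg
    return Path(base_dir) / str(z) / str(x) / f"{y}{ext}"

x, y = tile_index(lon=0, lat=45, z=1)
print(tile_path("staged", 1, x, y).as_posix())  # staged/1/2/0.gpkg
```

Every polygon that falls in the same tile ends up in the same file, which is why re-running the stager without cleaning up first can duplicate polygons.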

We will use some methods from the `pdgstaging` library to stage our tiles. The first step is to initialize the `TileStager`, a class whose `stage` method works on a single vector file.

### Initialize the stager

First, we need to use the `initialize_stager` function to instantiate the `TileStager` object. The only argument to this function is `dir_input`, the directory of input data.

Input vector files are located **in `/home/shares/example-pdg-data`**.

In [None]:
# execute the initialize_stager function with the filepath for the input data
# save the result to a variable called iwp_stager
iwp_stager = initialize_stager("/home/shares/example-pdg-data")

The `iwp_stager` object works as a tool that communicates between the configuration settings and the staging function. The stager tells the staging function:
- where to pull the input files from
- where to write the staged tiles
- the coordinate reference system to use
- whether the input data should be deduplicated, etc.

Next let's use it to get a list of files to stage.

In [None]:
files_to_stage = iwp_stager.tiles.get_filenames_from_dir('input')

In [None]:
len(files_to_stage)

In [None]:
# # split the input files into batches
# def batch(all_files, batch_size):
#     return [all_files[i:i + batch_size] for i in range(0, len(all_files), batch_size)]

# file_batches = batch(files_to_stage, 2)

In [None]:
# len(file_batches)

## Stage one file

Here is an example of how to run the stager on one file. We use the `stage` method on the `iwp_stager` object, with a path to a file as the argument to the method.

In [None]:
example_file = files_to_stage[0]

iwp_stager.stage(example_file)

Based on how long staging one file took, estimate how long it would take to stage all of the input files in this example serially. How long would it take with 100 files? With 1000?
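
If you did not note the run time, one way to measure it is sketched here. The helper names `time_call` and `estimate_serial_minutes` are made up for this example, and `fake_stage` is only a stand-in for the real `iwp_stager.stage(example_file)` call, so the numbers it prints are not meaningful. The extrapolation assumes every file takes about as long as the one you timed, which will not hold exactly when file sizes vary.

```python
import time

def time_call(func, *args):
    """Run func(*args) once and return the elapsed wall-clock seconds."""
    start = time.perf_counter()
    func(*args)
    return time.perf_counter() - start

def estimate_serial_minutes(seconds_per_file, n_files):
    """Extrapolate serial run time, assuming each file takes about as long."""
    return seconds_per_file * n_files / 60

def fake_stage(path):
    # Stand-in for iwp_stager.stage(path); replace with the real call
    time.sleep(0.01)

seconds_per_file = time_call(fake_stage, "example.gpkg")
for n_files in (100, 1000):
    minutes = estimate_serial_minutes(seconds_per_file, n_files)
    print(n_files, "files:", round(minutes, 2), "minutes")
```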

In [None]:
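# estimate serial computation time, assuming about 66 seconds per file
seconds = 100 * 66      # 100 files, answer in seconds
minutes = seconds / 60  # answer in minutes
print(seconds, "seconds =", minutes, "minutes")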

For these example data, the run time is manageable. But as the number of files grows, serial staging quickly becomes impractical. Luckily for us, this problem is pleasingly parallel: the staging of each file is completely independent of the others. So, let's set this up as a `parsl` workflow using the skills we learned in Section 4.

Just to get a sense of what happened, let's plot the result of our test staging effort using a `plot_tiles` helper we wrote for this activity.

In [None]:
plot_tiles(iwp_stager)

Finally, let's remove the files we just created (including the staging summary csv file that's generated with the staged files) to prepare to run this over all of the files.

If we don't do this, polygons will be appended to the existing staged files, which will result in duplication.
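
The cleanup cell that follows shells out to `rm`. As an alternative, the same cleanup could be sketched with only the standard library, which avoids errors when the outputs do not exist yet. The function name `remove_staging_outputs` is made up for this example, and the demonstration runs on throwaway stand-in files. In the notebook, the two arguments would come from `iwp_stager.config.get("dir_staged")` and `iwp_stager.config.get("filename_staging_summary")`.

```python
import shutil
import tempfile
from pathlib import Path

def remove_staging_outputs(dir_staged, summary_file):
    """Delete the staged-tile directory and the summary file, if present."""
    shutil.rmtree(dir_staged, ignore_errors=True)
    Path(summary_file).unlink(missing_ok=True)  # requires Python 3.8+

# Demonstration on throwaway stand-ins for the real outputs
workdir = Path(tempfile.mkdtemp())
dir_staged = workdir / "staged"
summary_file = workdir / "staging_summary.csv"
(dir_staged / "1" / "2").mkdir(parents=True)
(dir_staged / "1" / "2" / "0.gpkg").touch()
summary_file.touch()

remove_staging_outputs(dir_staged, summary_file)
remove_staging_outputs(dir_staged, summary_file)  # safe to call twice
print(dir_staged.exists(), summary_file.exists())  # False False
```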

In [None]:
os.system(f'rm -rf {iwp_stager.config.get("dir_staged")}')
os.system(f'rm {iwp_stager.config.get("filename_staging_summary")}')

### Staging in parallel

First, set up the configuration for `parsl` using `Config` and a `HighThroughputExecutor`. For the executor, set `max_workers` to 32, and set the provider's `max_blocks` to 1. This will spread our work over 32 processes on the server. Make sure you pass the shell command that activates your virtual environment to the provider's `worker_init` argument as a string.

In [None]:
# TEMPLATE FOR PARSL CONFIG:
# htex_config = Config(
#   executors=[
#       HighThroughputExecutor(
#           ..., 
#           provider = LocalProvider(...)
#       )
#   ]
# )

activate_env = 'workon scomp2023-03-20'
htex_local = Config(
    executors=[
        HighThroughputExecutor(
            max_workers=32,  # can be reduced when there are fewer input files
            provider=LocalProvider(
                worker_init=activate_env,
                max_blocks=1  # default
            )
        )
    ],
)
parsl.clear()
parsl.load(htex_local)

Next, set up your Parsl app to run the `stage` method in parallel. You'll need to pass 2 arguments to the app function:
1. The path to the input file.
2. The `TileStager` instance we created earlier.

Note how the function returns the input path. This gives us something to iterate over, since the `stage` method itself returns `None` (it writes files instead).
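
The submit-then-collect pattern behind this can be tried without a Parsl configuration. The sketch below uses `ThreadPoolExecutor` from the standard library as a stand-in for Parsl, and `stage_and_return` as a stand-in for a function that calls `stager.stage(path)`. Both are assumptions for illustration, and the file names are made up.

```python
from concurrent.futures import ThreadPoolExecutor

def stage_and_return(path, staged):
    # Stand-in for stager.stage(path): the real work is a side effect,
    # so we return the path to have something to collect afterwards
    staged.add(path)
    return path

paths = ["a.gpkg", "b.gpkg", "c.gpkg"]
staged = set()

with ThreadPoolExecutor(max_workers=2) as pool:
    # Submitting returns immediately with one future per task
    futures = [pool.submit(stage_and_return, p, staged) for p in paths]
    # Calling result() blocks until that task has finished
    done = [f.result() for f in futures]

print(done)  # ['a.gpkg', 'b.gpkg', 'c.gpkg']
```

With Parsl, calling the decorated app plays the role of `pool.submit`, and the returned app future has the same blocking `result()` method.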

In [None]:
# Decorators seem to be ignored as the first line of a cell, so print something first
#print("Stage in parallel")

# Make a Parsl app that uses the stage method
# function arguments: path, stager

@python_app
def stage_file(path, stager):
    stager.stage(path)
    return path

# @python_app
# def stage_file(batch, stager):
#     for path in batch:
#         stager.stage(path)
#     return True

Now, execute the app in parallel over all of the `files_to_stage`. In this solution, we use a simple loop to launch our Parsl app for each file, and then a list comprehension to retrieve the results from the futures.

In [None]:
# all_app_futures = []
# for batch in file_batches:
#     app_future = stage_file(batch, iwp_stager)
#     all_app_futures.append(app_future)

# [app_future.result() for app_future in all_app_futures]

In [None]:
# launch the app for every file and collect the app futures
all_app_futures = []
for path in files_to_stage:
    app_future = stage_file(path, iwp_stager)
    all_app_futures.append(app_future)

# By getting the `result()` of each app future, this block won't continue to
# the shutdown step until all the files are staged.
[app_future.result() for app_future in all_app_futures] 

# shutdown and clear the parsl executor
htex_local.executors[0].shutdown()
parsl.clear() # took 25 min to produce 2249 gpkg files

### Alternative Syntax

Here is a second solution, similar to the syntax in the Course Materials section ["parallel processing with parsl"](https://learning.nceas.ucsb.edu/2023-03-arctic/sections/parallel-programming.html#parallel-processing-with-parsl).

This approach wraps the loop and futures blocks into their own functions. Either solution will work!

In [None]:
# #os.system(f'rm -rf {iwp_stager.config.get("dir_staged")}')
# #os.system(f'rm {iwp_stager.config.get("filename_staging_summary")}')

# activate_env = 'workon scomp2023-03-20'
# htex_local = Config(
#     executors=[
#         HighThroughputExecutor(
#             max_workers=32,
#             provider=LocalProvider(
#                 worker_init=activate_env
#             )
#         )
#     ],
# )
# parsl.clear()
# parsl.load(htex_local)

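# # plain function, not a Parsl app: it runs locally and launches one
# # stage_file app per file, returning the list of app futures
# def stage_files(files):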
#     all_results = []
#     for file in files:
#         result = stage_file(file, iwp_stager)
#         all_results.append(result)
#     return(all_results)

# def wait_for_futures(files):
#     all_results = stage_files(files)
#     done = [app_future.result() for app_future in all_results]
#     print(done)

# wait_for_futures(files_to_stage) # takes 5-11 min when run before SCC

In [7]:
# # don't forget to shutdown and clear your executor
# htex_local.executors[0].shutdown()
# parsl.clear()

Now we can check out the `plot_tiles` result again (which will only plot the first 45 of our tiled files).

In [None]:
plot_tiles(iwp_stager)

## Bonus

This process took the original 35 files, ranging in size from 20 MB to 500 MB (6 GB total), and tiled them into around 2200 files. If you set up your executor as described, it should have taken around 15 minutes.

Discuss in your groups whether you suspect this process is CPU bound, I/O bound, memory bound, or network bound. How would you figure it out for sure? Why would you want to know?

### Answer
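
One possible starting point for the "how would you figure it out" question is to compare CPU time with wall-clock time for a single task. A ratio near 1 suggests the task spends its time computing, while a ratio near 0 suggests it spends its time waiting, for example on disk or network. The sketch below is only an illustration: `cpu_fraction`, `busy`, and `waiting` are made-up names, and the two workloads are stand-ins rather than the real staging step.

```python
import time

def cpu_fraction(func, *args):
    """Return CPU time divided by wall-clock time for one call of func."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    func(*args)
    cpu = time.process_time() - cpu_start
    wall = time.perf_counter() - wall_start
    return cpu / wall

def busy(n):
    # CPU-heavy stand-in: pure arithmetic in a loop
    total = 0
    for i in range(n):
        total += i * i
    return total

def waiting(seconds):
    # Stand-in for a task that mostly waits on disk or network
    time.sleep(seconds)

print("busy:   ", round(cpu_fraction(busy, 2_000_000), 2))
print("waiting:", round(cpu_fraction(waiting, 0.2), 2))
```

Note that `time.process_time` only counts the current process, so to apply this to Parsl workers the measurement would have to happen inside the app function. System tools such as `top` give a complementary view of CPU and memory use across all worker processes.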

