# M2.4 - Processing Long Climate Data Records Concurrently

*Part of:* [**Computational Climate Science**](https://github.com/OpenClimateScience/M2-Computational-Climate-Science) | **Previous Lesson** | **Next Lesson**

**Contents:**

- [Resource limitations in computing](#Resource-limitations-in-computing)
  - [CPU-bound problems](#CPU-bound-problems)
- [Concurrent processing for large climate datasets](#Concurrent-processing-for-large-climate-datasets)
- [Computing PET using Hargreaves equation](#Computing-PET-using-Hargreaves-equation)
  - [Computing top-of-atmosphere (TOA) radiation](#Computing-top-of-atmosphere-(TOA)-radiation)
  - [Well-documented functions](#Well-documented-functions)
  - [Vectorized functions](#Vectorized-functions)
  - [Deriving variables from `xarray` coordinates](#Deriving-variables-from-xarray-coordinates)
- [Applying a function to independent chunks](#Applying-a-function-to-independent-chunks)
  - [Mapping an arbitrary function](#Mapping-an-arbitrary-function)
- [Profiling computational resources](#Profiling-computational-resources)
  - [Measuring the wall time of a task with `timeit`](#Measuring-the-wall-time-of-a-task-with-timeit)

## Overview

In the previous lesson, we discussed how a simple bucket model can be used to quantify the difference between water supply (precipitation) and water loss (potential evapotranspiration or PET). The ratio of these two quantities is also useful as an index of how much of the water loss is replenished by precipitation:
$$
\text{Percentage replenished} \approx 100\times \frac{\text{Precipitation}}{\text{PET}}
$$
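
As a quick illustration of this index, here is a minimal sketch. The function name `percent_replenished` and the input values are hypothetical; they are not taken from CHIRPS or MERRA-2.

```python
def percent_replenished(precipitation, pet):
    '''
    Percentage of potential water loss (PET) replenished by precipitation.
    Both inputs must be in the same units (e.g., mm day-1).
    '''
    return 100 * precipitation / pet

# Hypothetical values: 2 mm day-1 of precipitation, 5 mm day-1 of PET
ratio = percent_replenished(2.0, 5.0)
print(ratio)
```

A value below 100 suggests that precipitation replaces only part of the potential water loss.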

**The method for calculating PET that we will use is [the Hargreaves method](https://www.fao.org/4/X0490E/x0490e07.htm#minimum%20data%20requirements) (Allen et al. 2000), because it only requires temperature data.** We'll use temperature data from MERRA-2 to calculate PET. Then, we'll use precipitation data from CHIRPS, again, to derive our hydrologic drought index.

**While there are many sources of PET data, we're going to calculate PET on our own so that we can get more experience working with large climate datasets.** Along the way, we'll learn how large climate datasets can be processed **concurrently,** which can help to address two common problems:

1. The entire dataset is too large to load into memory all at once;
2. Data processing can be time-consuming, either because the dataset is so large or because the computations are complex.

---

## Resource limitations in computing

Generally, the bigger the dataset, the more computational resources are required to analyze it. But exactly what resources are needed depends on both the data and the kind of analysis we want to perform.

**In computing problems, there are three major kinds of resource limitations or *bottlenecks,* i.e., limiting factors to running a computer algorithm:**

1. **Read and write speed from a file system**
2. **Computer memory**
3. **Central processing unit (CPU) clock speed (e.g., 3 GHz)**
   
A bottleneck of **Type 1** occurs when we have either very large datasets or slow file-system read-write speeds. The speed of reading and writing from a file system (or hard disk) depends on the medium; solid-state drives are generally faster than spinning-disk hard drives. If the drive is a network-attached storage (NAS) device instead of the hard drive on your computer, then the speed of the network connection also contributes to Type 1 bottlenecks. **Problems that are limited by a Type 1 bottleneck are called I/O-Bound (Input/Output-Bound).**

A bottleneck of **Type 2** can occur if the dataset is very large and we try to store it all in memory at once, or if our analysis generates too much data in memory. Of course, memory is finite, so data either fits in memory or it doesn't. If our computer program is very sophisticated, it can offload some data stored in memory onto the computer's hard disk. This is called *swapping* and it is extremely slow. Hence, if you are running out of computer memory, your program may not stop due to a lack of memory, but it will slow down severely as it tries to juggle data between memory and hard disk. **Problems that are limited by a Type 2 bottleneck are called Memory-Bound.**

A bottleneck of **Type 3** has a lot less to do with the data and more to do with the algorithm we're running. If we're reading in a huge dataset and just doing a simple unit conversion (for example, multiplying the data by 1000 and then saving it back to disk), then CPU clock speed probably isn't an issue: computers can multiply numbers very fast. But exactly how fast depends on how fast the CPU is. **Problems that are limited by a Type 3 bottleneck are called CPU-Bound.**

### CPU-bound problems

Historically, Type 3 bottlenecks have received the most attention. Improvements in the manufacturing process for CPUs have led to faster and faster chips. Gordon Moore was one of the first to notice the rate of this upward trend, and **Moore's Law** has been an article of faith in the industry for a long time: the tendency for the number of transistors on a chip, and with it computing performance, to double about every 2 years (Moore 1965).

[But there are recent signs that this rate of doubling may be slowing down.](https://www.tomshardware.com/tech-industry/semiconductors/intels-ceo-says-moores-law-is-slowing-to-a-three-year-cadence-but-its-not-dead-yet) There are several reasons for this that are beyond the scope of this lesson (Bohr 2007). A major reason is the problem of heat dissipation. Trying to maintain the same rate of growth in transistors has required making transistors smaller. But the smaller they get, the hotter they get when electricity flows through them. Modern chip design is primarily concerned with trying to keep things from melting!

However, if we combine multiple low-power CPUs together, we can actually get better performance than from a single, high-power CPU. Consider the figure below. With a single CPU, it is only possible to process data in a Sequential or Concurrent scheme. Sequential processing means that each task must be finished before the next task can be started.

![](./assets/M2_concurrency.jpg)

*Image by [Kevin Wahome](https://kwahome.medium.com/concurrency-is-not-parallelism-a5451d1cde8d)*

In a **Concurrent scheme,** computers can seamlessly switch between tasks so fast that it appears as if multiple tasks, or **threads,** are being worked on simultaneously. **Concurrency** or **multi-threading** is how single CPUs have allowed us to do multiple tasks for the first few decades of the personal computer. **To get faster computers and mobile phones today, we are now using multiple, low-power CPUs to work on independent tasks simultaneously.** This is the **Parallel scheme.** 

**Today, we'll see how multiple CPUs can be used to break a problem down into smaller parts that can be executed simultaneously.** Some of the tools we're working with make it so easy to use a Concurrent or Parallel processing scheme that it can be hard to tell the difference between the two. So, in this lesson, we'll use the terms "Concurrent processing" or "Concurrency" to refer to both the Concurrent and Parallel processing schemes.
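
To make the difference between the schemes concrete, here is a minimal sketch using `concurrent.futures` from the Python standard library. This is not the machinery that `xarray` uses; the function `convert_units()` and the list of tasks are hypothetical stand-ins for independent pieces of work.

```python
from concurrent.futures import ThreadPoolExecutor

def convert_units(value):
    # A stand-in for any task that does not depend on the other tasks
    return value * 1000

tasks = [0.5, 1.5, 2.5, 3.5]

# Sequential scheme: one task is finished before the next one starts
sequential = [convert_units(v) for v in tasks]

# Concurrent scheme: up to 4 worker threads share the list of tasks;
#   map() returns the results in the same order as the inputs
with ThreadPoolExecutor(max_workers = 4) as executor:
    concurrent = list(executor.map(convert_units, tasks))

print(sequential == concurrent)
```

Because the tasks are independent, both schemes give the same answer. Whether the threads actually run at the same time depends on the Python interpreter and on the kind of work each task does.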

---

## Concurrent processing for large climate datasets

As we've seen previously, we can use `earthaccess` to download [MERRA-2](https://gmao.gsfc.nasa.gov/reanalysis/MERRA-2/) data from NASA EarthData Search. We'll be using the daily, aggregated data we used before, with the `short_name` `"M2SDNXSLV"`.

In [None]:
import earthaccess
import xarray as xr
from matplotlib import pyplot

auth = earthaccess.login()

results = earthaccess.search_data(
    short_name = 'M2SDNXSLV',
    temporal = ("2024-01-01", "2024-05-31"))

#### &#x1F3AF; Best Practice

**Remember: We want to make sure we don't accidentally change our raw data, so these data should be downloaded to a folder reserved for raw data.**

In [None]:
# Could take about 1 minute on a broadband connection
earthaccess.download(results, 'data_raw/MERRA2')

Once again, we'll use `xr.open_mfdataset()` to open our collection of files as a single `xarray.Dataset`.

In [None]:
ds = xr.open_mfdataset('./data_raw/MERRA2/*2024*.nc4')
ds

The MERRA-2 data variables we are interested in are:

- `T2MMAX`, the maximum daily temperature (degrees C)
- `T2MMEAN`, the mean daily temperature (degrees C)
- `T2MMIN`, the minimum daily temperature (degrees C)

Note that we have 122 days of data, so the resulting data cube has a time axis of 122 daily time steps. `xarray` has automatically broken our dataset into equal-sized **chunks** that could be processed independently.

&#x1F449; In `xarray`, a **chunk** (also called a **block**) is a piece of our dataset: a defined subset along one or more axes.

In [None]:
ds['T2MMEAN']

**The size and shape of the chunks are important if we are going to use concurrency.** Consider, for example, if we wanted to calculate long-term trends. With the chunks we currently have, we could not calculate trends because each chunk contains only one time step.

We could try using [the `chunks` argument of `open_mfdataset()`](https://docs.xarray.dev/en/stable/generated/xarray.open_mfdataset.html) to specify that chunks should have 122 elements along the `time` axis...

In [None]:
# The "chunks" argument tells xarray what size the chunks should be on one or more axes
ds = xr.open_mfdataset('./data_raw/MERRA2/*2024*.nc4', chunks = {'time': 122})
ds['T2MMEAN'].data

However, it's clear that didn't work; each chunk still only has one time step.

#### &#x1F6A9; <span style="color:red">Pay Attention</span>

**This is because the `chunks` argument is evaluated separately for each file.** `xr.open_mfdataset()` opens multiple files and combines them into a single dataset but, in this case, because each file represents a different time step, it can't create chunks that span multiple files.

Alternatively, we can tell `xarray` how big each chunk should be along the `lat` and `lon` axes, because this doesn't require spanning multiple files. Below, we specify chunk sizes that result in just 4 chunks for every file.

In [None]:
ds = xr.open_mfdataset('./data_raw/MERRA2/*2024*.nc4', chunks = {'lat': 182, 'lon': 288})
ds['T2MMEAN'].data
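
The count of 4 chunks per file can be checked by hand. The sketch below assumes the grid dimensions used in this lesson (361 latitudes and 576 longitudes); the variable names are only for illustration.

```python
import math

# Grid size assumed from the lesson: 361 latitudes x 576 longitudes
n_lat, n_lon = 361, 576
chunk_lat, chunk_lon = 182, 288

# Number of chunks along each axis; the last chunk may be smaller
chunks_along_lat = math.ceil(n_lat / chunk_lat)
chunks_along_lon = math.ceil(n_lon / chunk_lon)
chunks_per_file = chunks_along_lat * chunks_along_lon
print(chunks_per_file)
```

Note that a chunk size of 180 along `lat` would give 3 chunks along that axis instead of 2, because 361 is not evenly divisible by 180.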

**If we really needed each chunk to contain the entire `time` axis (122 time steps), we would need to re-chunk the data *after* reading in all the files.** We can do this using [the `chunk()` method of an `xarray.Dataset` or `xarray.DataArray`.](https://docs.xarray.dev/en/stable/generated/xarray.DataArray.chunk.html)

In [None]:
# Re-chunking the data *after* loading is generally inefficient, but it may be
#   necessary; e.g., if we wanted to calculate trends along the time axis

ds = xr.open_mfdataset('./data_raw/MERRA2/*2024*.nc4')
ds = ds.chunk({'time': 122})
ds['T2MMEAN'].data

#### &#x1F3AF; Best Practice

In general, it's best to use the `chunks` argument because re-chunking the data is inefficient. However, in cases where you need chunks to span multiple files, you will have to re-chunk the data using the `chunk()` method.

In this case, we don't actually need chunks with 122 time steps. We are fine with whatever chunking `xarray` does by default. If we set `chunks = 'auto'`, then `xarray` will choose the chunk sizes for us; here, each file is small enough to be treated as a single chunk, so there is one chunk per file.

In [None]:
ds = xr.open_mfdataset('./data_raw/MERRA2/*2024*.nc4', chunks = 'auto')
ds['T2MMEAN'].data

&#x1F449; **Notice how fast each of the above code blocks was executed.** That's because `xarray` hasn't actually loaded any data into memory yet. Remember **lazy evaluation?** Again, `xarray` will not load data into memory until the last minute, when it's absolutely necessary to do so. And `xarray` doesn't need to load data into memory in order to give us the information we were looking at above. It just read a little bit of information from each file to learn how to *represent* the complete dataset.
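
The idea behind lazy evaluation can be sketched in a few lines of plain Python. The `LazyValue` class below is a hypothetical toy, not how `xarray` or `dask` are implemented: it records what should be done and only does the work when `compute()` is called.

```python
class LazyValue:
    '''
    Hypothetical stand-in for a lazily evaluated array: it remembers
    how to load the data and which operations to apply, but does no
    work until compute() is called.
    '''
    def __init__(self, loader, operations = ()):
        self.loader = loader
        self.operations = tuple(operations)

    def multiply(self, factor):
        # Record the operation instead of performing it
        return LazyValue(self.loader, self.operations + (factor,))

    def compute(self):
        values = self.loader() # The (slow) data are only read now
        for factor in self.operations:
            values = [v * factor for v in values]
        return values

load_calls = []

def slow_loader():
    load_calls.append(1) # Keep track of how often data are read
    return [1, 2, 3]

lazy = LazyValue(slow_loader).multiply(2).multiply(10)
calls_before = len(load_calls) # Nothing has been loaded yet
result = lazy.compute()
calls_after = len(load_calls)
print(calls_before, calls_after, result)
```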

---

## Computing PET using Hargreaves equation

In order to calculate the Precipitation-to-PET ratio, we'll first need to use the Hargreaves equation to calculate PET:
$$
\text{PET} = 0.0023 \times R_A \times \sqrt{T_{max} - T_{min}} \times (T + 17.8)
$$

Above, $R_A$ is the top-of-atmosphere (TOA) solar radiation and $T$, $T_{max}$, and $T_{min}$ are the mean, maximum, and minimum temperatures, respectively.

#### &#x1F3AF; Best Practice

The Hargreaves equation is just complex enough that we need to develop multiple data-processing steps to get to our goal, which is the Precipitation-to-PET ratio for a defined region. This effort will require that we pay attention to several potential pitfalls of computational data science:

- Ensuring that processing steps are done in the correct order, so that data structures and/or Python variables are correctly initialized.
- Ensuring that measurement units are correct and compatible between different data processing steps.
- Documenting each processing step so that we can identify potential errors and so that a third party can verify or reproduce our analysis.

A technique from computer science called **decomposition** can help us to plan our analysis. **Decomposition** involves breaking a problem down into a series of independent, manageable steps. We might decompose our problem into these steps:

1. Load the required temperature data inputs.
2. Calculate top-of-atmosphere (TOA) solar radiation.
3. Calculate potential evapotranspiration (PET) using the Hargreaves equation.
4. Compute the Precipitation-to-PET ratio.

**These ordered steps should help us to organize our workflow in a way that someone else can easily understand.** We've already loaded the required temperature data (Step 1), so let's move on to calculating TOA radiation.

### Computing top-of-atmosphere (TOA) radiation

Here is a function for calculating TOA radiation, [based on FAO guidance.](https://www.fao.org/4/X0490E/x0490e07.htm#radiation)

In [None]:
import numpy as np

def toa_radiation(latitude, doy):
    '''
    Top-of-atmosphere (TOA) radiation for a given latitude (L) and day of year
    (DOY) can be calculated as:

    R = ((24 * 60) / pi) * G * d * (w * sin(L) * sin(D) + cos(L) * cos(D) * sin(w))

    Where G is the solar constant, 0.0820 [MJ m-2 min-1]; d is the (inverse)
    relative earth-sun distance; w is the sunset hour angle; and D is the solar
    declination angle.
    
    For more information, consult the FAO documentation:

        https://www.fao.org/4/X0490E/x0490e07.htm#radiation
    
    Parameters
    ----------
    latitude : float
        The latitude on earth, in degrees, where southern latitudes
        are represented as negative numbers
    doy : int
        The day of the year (DOY), an integer on [1,366]
    
    Returns
    -------
    Number
        Top-of-atmosphere (TOA) radiation, in [MJ m-2 day-1]
    '''
    assert np.issubdtype(np.asarray(doy).dtype, np.integer), 'The "doy" argument must be an integer'
    assert np.all(doy >= 1) and np.all(doy <= 366), 'The "doy" argument must be between 1 and 366, inclusive'
    
    solar_constant = 0.0820 # [MJ m-2 min-1]
    pi = 3.14159
    
    # Convert latitude from degrees to radians
    latitude_radians = np.deg2rad(latitude)
    # Inverse Earth-Sun distance (relative), as a function of day-of-year (DOY)
    earth_sun_dist = 1 + 0.033 * np.cos((doy * 2 * pi) / 365)
    # Solar declination, as a function of DOY
    declination = 0.409 * np.sin(((doy * 2 * pi) / 365) - 1.39)
    
    # Sunset hour angle; we use np.where() below to guard against
    #   warnings where arccos() would return invalid values, which
    #   happens when the argument is outside [-1, 1]
    _hour_angle = -np.tan(latitude_radians) * np.tan(declination)
    _hour_angle = np.where(np.abs(_hour_angle) > 1, np.nan, _hour_angle)
    sunset_hour_angle = np.arccos(_hour_angle)

    # Incident radiation, depends only on the relative earth-sun distance
    inc_radiation = ((24 * 60) / pi) * solar_constant * earth_sun_dist
    return inc_radiation * (sunset_hour_angle * np.sin(latitude_radians) * np.sin(declination) +
            np.cos(latitude_radians) * np.cos(declination) * np.sin(sunset_hour_angle))
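
One way to gain confidence in a calculation like this is to compare it against a published worked example. The sketch below is a scalar version written with the standard `math` module, using the coefficients from the FAO equations; the name `toa_radiation_scalar()` is hypothetical. To the best of our reading, the FAO guidance includes a worked example for 3 September at 20 degrees South that reports about 32.2 MJ m-2 day-1.

```python
import math

def toa_radiation_scalar(latitude, doy):
    # Scalar version of the FAO equations, for checking by hand
    solar_constant = 0.0820 # [MJ m-2 min-1]
    lat = math.radians(latitude)
    # Inverse relative Earth-Sun distance
    dist = 1 + 0.033 * math.cos(2 * math.pi * doy / 365)
    # Solar declination [radians]
    decl = 0.409 * math.sin(2 * math.pi * doy / 365 - 1.39)
    # Sunset hour angle [radians]
    sunset = math.acos(-math.tan(lat) * math.tan(decl))
    return ((24 * 60) / math.pi) * solar_constant * dist * (
        sunset * math.sin(lat) * math.sin(decl) +
        math.cos(lat) * math.cos(decl) * math.sin(sunset))

# 3 September (day-of-year 246) at 20 degrees South
check = toa_radiation_scalar(-20, 246)
print(round(check, 1))
```

A small difference from a published value is expected, because intermediate quantities in worked examples are usually rounded.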

### Well-documented functions

**There are several things to note about this function.**

There is a **function-level docstring** that provides rich information about the purpose and use of the function. In addition to the important "Parameters" and "Return" value sections, we have provided a simple, human-readable form of the equation we're using to calculate TOA radiation. We also provided a link to the FAO document where this equation came from. These are all very important things to include so that someone else can figure out how we're calculating TOA radiation. These things also help us to later verify that we're performing calculations correctly.

In the **Parameters** section, we made sure to define the measurement units required for each input parameter. This is *extremely* important. In the above example, we would get a different, and incorrect, answer if `latitude` was given in radians instead of degrees. We also indicated the Python **data type,** e.g., `float`. This is also important to include because, when a computation involves the wrong data type, it is often difficult to figure out that the error is due to an incorrect data type.

**Variable names** are chosen carefully. We use `latitude` instead of a name like `x`, which is too short and could signify multiple things. We also defined a variable `latitude_radians` to distinguish when we are using latitude in radians, as opposed to degrees. While `latitude` could have been written as `latitude_degrees`, we decided to compromise clarity for a shorter name in this case, although clarity is usually most important. Ultimately, there are some subjective choices to be made, but you should consider choosing variable names that communicate the meaning *and* the measurement units of the quantity they represent. If that is hard to do, **inline comments** can help to keep track of units, as we did with the inline comment next to `solar_constant`.

**Constants** are defined at the top of our function: `pi` and `solar_constant`. While many people might recognize a number like 3.14 as the number pi, defining it as a variable, `pi`, in our function makes this more clear and allows for us to control the precision of this number in one place. In general, constants should be defined only once!

**Comments** are used frequently. In particular, where there are complex calculation steps to obtain the `sunset_hour_angle`, we have a long comment above the code to explain what it does. If we need to use intermediate variables in our calculation, we can use less informative variable names, like `_hour_angle`. In Python, variable names that begin with the underscore, `_`, signal to users that the variable is less important or can be ignored.

For long calculations, like the `return` value of our function, it can be helpful to break them up into smaller, more meaningful quantities, paying attention to the order of operations. This is why we defined the `inc_radiation` variable. When a calculation can't be broken down into meaningful parts, it can improve readability to break the equation across multiple lines, as we did by creating a line break after a `+` operation.

Finally, note that we included **assertions,** using the `assert` keyword, to help ensure that users call this function correctly. Consider what happens when the wrong data type, or an out-of-range value, is provided for the `doy` argument:

In [None]:
toa_radiation(36.1, doy = 14.0)

In [None]:
toa_radiation(36.1, doy = 500)
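
The pattern can also be seen in isolation. The sketch below uses a hypothetical helper, `validate_doy()`, that mirrors the checks on `doy` described above for plain Python numbers, and it catches each `AssertionError` so that we can read the messages.

```python
def validate_doy(doy):
    # Hypothetical helper that mirrors the checks on "doy" described above
    assert isinstance(doy, int), 'The "doy" argument must be an integer'
    assert 1 <= doy <= 366, 'The "doy" argument must be between 1 and 366, inclusive'
    return doy

messages = []
for value in (14, 14.0, 500):
    try:
        validate_doy(value)
        messages.append('ok')
    except AssertionError as error:
        messages.append(str(error))

print(messages)
```

Keep in mind that Python skips `assert` statements when it is run with the `-O` option, so assertions are a development aid rather than a guarantee.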

#### &#x1F3C1; Challenge: Writing a well-documented function

Now that we've reviewed what makes a well-documented function, **write the function for the next step of our analysis.** The equation below can be used to calculate PET. Write a well-documented Python function called `potential_et()` that returns PET in units of millimeters per day (mm day$^{-1}$).

$$
\text{PET} = 0.0023 \times R_A \times \sqrt{T_{max} - T_{min}} \times (T + 17.8)
$$

The inputs to the `potential_et()` function are:

- $R_A$ is the top-of-atmosphere solar radiation, in mm H$_2$O equivalent per day
- $T_{max}$ is the monthly maximum temperature, in degrees C
- $T_{min}$ is the monthly minimum temperature, in degrees C
- $T$ is the monthly average temperature, in degrees C

**Hint:** There is an `np.sqrt()` function for calculating square roots.

Start with editing the function below.

In [None]:
def potential_et(toa_radiation, temp_max, temp_min, temp_mean):
    pass

Expand the cell below to see one solution to this problem.

In [None]:
def potential_et(toa_radiation, temp_max, temp_min, temp_mean):
    '''
    Calculates potential evapotranspiration, according to the Hargreaves
    equation:

    PET = 0.0023 * R * sqrt(Tmax - Tmin) * (Tmean + 17.8)

    Where R is the top-of-atmosphere (TOA) radiation (mm day-1); Tmax and
    Tmin are the maximum and minimum monthly air temperatures (degrees C),
    respectively; and Tmean is monthly mean air temperature (degrees C).

    Parameters
    ----------
    toa_radiation : Number
        The top-of-atmosphere (TOA) radiation (mm day-1)
    temp_max : Number
        Maximum monthly air temperature (degrees C)
    temp_min : Number
        Minimum monthly air temperature (degrees C)
    temp_mean : Number
        Average monthly air temperature (degrees C)

    Returns
    -------
    Number
        The potential evapotranspiration (PET) in [mm day-1]
    '''
    return 0.0023 * toa_radiation * np.sqrt(temp_max - temp_min) * (temp_mean + 17.8)

If the function is written correctly, when it is called with the arguments below, you should get a value close to `3.1`.

In [None]:
potential_et(10, 30, 20, 25)
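
To see where that value comes from, the arithmetic can be written out step by step. This is only a check of the example call above, using the standard `math` module; the intermediate variable names are for illustration.

```python
import math

toa, temp_max, temp_min, temp_mean = 10, 30, 20, 25

temp_range_term = math.sqrt(temp_max - temp_min)  # sqrt(10), about 3.162
temp_mean_term = temp_mean + 17.8                 # 42.8
pet = 0.0023 * toa * temp_range_term * temp_mean_term
print(round(pet, 2))
```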

### Vectorized functions

Let's review how to call the Python functions we wrote.

In [None]:
toa_radiation(32, 200)

Recall that, because NumPy arrays are treated just like numbers, we can call the `toa_radiation()` function with an array of numbers for one of the arguments.

In [None]:
lats = np.array([22, 32, 42])

toa_radiation(lats, 200)

In computer science, using a function in this way is called **vectorization** and functions that are compatible with both a single number, as in `toa_radiation(32, 200)`, or an array of numbers, as in `toa_radiation(lats, 200)`, are called **vectorized functions.** Because we are almost always working with arrays of data, rather than single numbers, **vectorized functions** are very important.

For example, vectorization allows us to plot the TOA radiation as a function of a range of dates. The latitude value remains the same for each value of `doy`, so that mathematical operations that depend both on day-of-year and latitude look something like adding or multiplying a single number (latitude) to an array of numbers (days of the year).

In [None]:
from matplotlib import pyplot

doy = np.arange(1, 366).astype(np.int32)

rad = toa_radiation(32, doy)
pyplot.plot(doy, rad, 'k-')
pyplot.xlabel('Day of Year')
pyplot.ylabel('TOA Radiation [MJ m-2 day-1]')
pyplot.title('TOA Radiation at 32 deg N latitude')

**However, if we have two or more input arrays, they need to be compatible.** Below, we get an error because we're trying to multiply two arrays with incompatible shapes. While `lats` has only 3 elements, `doy` has 365 elements.

In [None]:
# Won't work because array shapes are incompatible
toa_radiation(lats, doy)

The only way to fix this without changing our function is to make our two arrays compatible. We could reshape them so that they are both 2-dimensional: `lats` becomes a $1\times N$ array and `doy` becomes a $T\times 1$ array, where $T$ is the number of days in `doy` and $N$ is the number of different latitudes in `lats`. NumPy then combines them into a $T\times N$ result. This works well but may be confusing in some cases, because now we have a much larger, 2-dimensional array to work with.

In [None]:
toa_radiation(lats.reshape((1,3)), doy.reshape((365,1)))
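
The rule NumPy applies here is called broadcasting. As a minimal sketch, independent of `toa_radiation()`, we can add the two reshaped arrays and inspect the shape of the result.

```python
import numpy as np

lats = np.array([22, 32, 42]).reshape((1, 3))  # 1 x N
doy = np.arange(1, 366).reshape((365, 1))      # T x 1

# NumPy "stretches" each axis of length 1 to match the other array
combined = lats + doy
print(combined.shape)

# np.broadcast_shapes() reports the resulting shape without computing anything
print(np.broadcast_shapes(lats.shape, doy.shape))
```

Each row of the result corresponds to one day of the year and each column to one latitude.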

### Deriving variables from `xarray` coordinates

Vectorization will be key to computing PET for our gridded temperatures dataset, because we have multiple latitudes and multiple days-of-the-year to consider when computing TOA radiation. Consider our dataset's coordinates, below.

In [None]:
ds.coords

**We need to derive:**

- The latitude for every pixel
- The day-of-year for every pixel

The MERRA-2 temperatures dataset has 361 latitude bins represented.

In [None]:
ds.lat.shape

One step towards vectorization is to convert our 1-dimensional array of latitudes into a *grid* of latitudes, since every pixel must have a latitude value!

In [None]:
lats = ds['lat'].values
lats = lats.reshape((361, 1)).repeat(ds['lon'].size, axis = 1)
lats.shape
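
The reshape-and-repeat step is easier to follow on a small example. The sketch below uses a hypothetical grid of 4 latitudes and 3 longitudes, and also shows `np.broadcast_to()` as one possible alternative that avoids copying the data.

```python
import numpy as np

# A small, hypothetical grid: 4 latitudes and 3 longitudes
lat_values = np.array([-90.0, -30.0, 30.0, 90.0])
n_lon = 3

# Approach used above: reshape to a column, then repeat along the columns
grid_repeat = lat_values.reshape((4, 1)).repeat(n_lon, axis = 1)

# Alternative: a read-only broadcast "view" that does not copy the data
grid_view = np.broadcast_to(lat_values.reshape((4, 1)), (4, n_lon))

print(np.array_equal(grid_repeat, grid_view))
```

In both cases, every row holds a single latitude value repeated across all longitudes.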

In [None]:
pyplot.imshow(lats)
pyplot.colorbar()

#### &#x1F6A9; <span style="color:red">Pay Attention</span>

Our latitudes seem to be upside-down! Making a plot like the one above is a great way to check our intuition.

**However, this is not a problem.** Recall that, in our `xarray.Dataset`, the `lat` coordinates go from -90 (south latitude) to +90 (north latitude). We want our latitudes grid to match the coordinates of the existing `xarray.Dataset` because that is how the data variables are also structured. We can verify this by looking at the raw `values` of one of those variables.

In [None]:
pyplot.imshow(ds['T2MMIN'].values[0])

**Hence, when we add the latitude grid to our `xarray.Dataset` as a new variable, we'll want to tell `xarray` what the corresponding axes of the grid are.** Below, we do this by assigning a *tuple* to the new `"lat_grid"` variable:

- The first element of the tuple is another tuple, `('lat', 'lon')` that specifies the order and name of the axes. These should be axes that already exist in the `ds` Dataset.
- The second element, `lats`, is our grid of latitudes.

Below, the new `"lat_grid"` variable is shown as having `(lat, lon)` dimensions.

In [None]:
ds['lat_grid'] = (('lat', 'lon'), lats)
ds

However, the other data variables (e.g., `T2MMIN`, `T2MMAX`) have `(time, lat, lon)` dimensions. It will be much easier to do some computations later if `"lat_grid"` has the same dimensions as all the other data variables.

In [None]:
lats2 = lats.reshape((1, 361, 576)).repeat(122, axis = 0)
lats2.shape

In [None]:
ds['lat_grid'] = (('time', 'lat', 'lon'), lats2)
ds

**Now we have latitude for every pixel. What about day-of-year?** Fortunately, this is easy to derive from our `time` coordinates because they are represented as the `datetime64[ns]` data type and therefore have date-time components like `'month'` and `'dayofyear'`. [You can read more about date-time components here.](https://docs.xarray.dev/en/stable/user-guide/time-series.html#datetime-components)

In [None]:
ds['time.dayofyear']
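
If you want to check a day-of-year value by hand, the standard library's `datetime` module gives the same kind of answer for a single date. This is only a sketch for verification; it is not needed for the analysis.

```python
from datetime import date

# Day-of-year for 1 May 2024; 2024 is a leap year, so February has 29 days
doy = date(2024, 5, 1).timetuple().tm_yday
print(doy)

# The last day of a leap year is day 366, the upper limit of "doy"
last_doy = date(2024, 12, 31).timetuple().tm_yday
print(last_doy)
```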

---

## Applying a function to independent chunks

Now that we have the data needed to compute TOA radiation, let's test our `toa_radiation()` function using data from a single date.

In [None]:
test = ds.sel(time = '2024-05-01')

rad = toa_radiation(test['lat_grid'], test['time.dayofyear'])
rad

Our result has a shape of `(1, 361, 576)` because `xarray` combines the `"lat_grid"` array, which has that shape, with the `"time.dayofyear"` array, which has one value for the single time step we selected.

We could easily add this result to our `xarray.Dataset`, `test`, using assignment:

In [None]:
test['toa_radiation'] = rad
test

In this case, the variable `rad` is already an `xarray.DataArray` with the proper dimensions. However, it's a good idea to review the more general way to add a data variable to an `xarray.Dataset`, where we specify the names and the order of the dimensions:

In [None]:
# NOTE: When rad is already an xarray.DataArray, we need to write rad.data
test['toa_radiation'] = (('time', 'lat', 'lon'), rad.data)

Let's plot the result to check if everything makes sense. We can see that TOA radiation is highest in the northern hemisphere, in a band around 25 N latitude, which makes sense for this time of year (May).

In [None]:
test['toa_radiation'].plot()

**How can we apply what we just did to the entire dataset?** Well, we didn't need to subset the dataset to a single day, as we did above. 

&#x1F449; **However, if our time-series dataset was very long, we might want to think about how best to take advantage of concurrency.** That is, we know that our problem (computing TOA radiation) is independent over space and time: the result of the calculation doesn't depend on adjacent pixels or adjacent time steps. This is a type of problem referred to as **embarrassingly parallel** because, in theory, we could have an indefinitely large number of tasks executing in parallel without affecting the result.

Before we try to take advantage of concurrency, let's review how `xarray` is currently chunking our data.

In [None]:
ds['lat_grid'].chunks

Whoops. Our new `"lat_grid"` variable doesn't actually have any chunks. Let's go ahead and make some. The simplest approach would be to let each time step (each day) be a different chunk. The `'lat'` and `'lon'` dimensions can be set to `'auto'` to indicate we don't care how `xarray` chunks them.

In [None]:
# The size of the 'time' axis in each chunk should be 1: a different chunk for each time step
ds['lat_grid'] = ds['lat_grid'].chunk({'lat': 'auto', 'lon': 'auto', 'time': 1})
ds['lat_grid']

### Mapping an arbitrary function

Now we're ready to apply a function to our independent chunks. With **concurrent processing,** we call this **mapping** a function over independent subsets of our data. In `xarray`, those independent subsets are called either **chunks** or **blocks** (confusingly, `xarray` uses both terms to refer to the same thing). Hence, [the function we want to use to assign a task to independent chunks is called `xarray.map_blocks()`.](https://docs.xarray.dev/en/stable/generated/xarray.map_blocks.html)

Consider the example below, where we have an arbitrary function called `my_function()`. This function takes an `xarray.Dataset` as its single argument and returns a modified version of the `"lat_grid"` variable.

When we map `my_function()` over the independent blocks (chunks), the result is clearly a data cube with the same shape and dimensions.

In [None]:
# Trivial function that multiplies latitudes by two
def my_function(dataset):
    return dataset['lat_grid'] * 2

xr.map_blocks(my_function, ds)

Again, note how fast that code was executed. Remember that `xarray` uses **lazy evaluation** and nothing has happened yet. We need to call the `compute()` method to tell `xarray` we're ready to actually load the data into memory and perform our computation.

In [None]:
result = xr.map_blocks(my_function, ds).compute()
result
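
The same split, map, and combine idea can be sketched with plain NumPy, which may help to show why independent blocks are safe to process separately. The small data cube and the function `double()` below are hypothetical; `xr.map_blocks()` handles the splitting and recombining for us.

```python
import numpy as np

def double(block):
    # Works on any block, because no value depends on its neighbors
    return block * 2

# Hypothetical data cube: 6 time steps on a 2 x 3 grid
cube = np.arange(36, dtype = np.float32).reshape((6, 2, 3))

# Split into one block per time step, map the function, then recombine
blocks = np.array_split(cube, 6, axis = 0)
mapped = [double(block) for block in blocks]
combined = np.concatenate(mapped, axis = 0)

# Same answer as applying the function to the whole cube at once
print(np.array_equal(combined, double(cube)))
```

Here the blocks are processed one after another; because each block is independent, they could just as well be handed to different threads or processes.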

Now let's consider our `toa_radiation()` function. That function doesn't know anything about `xarray` variables; it assumes we are calling it with numbers or `numpy` arrays. So, we need to first create a function that wraps `toa_radiation()`, as below.

In [None]:
def toa_radiation_wrapper(dataset):
    return toa_radiation(dataset['lat_grid'], dataset['time.dayofyear'])

Again, we need to follow `map_blocks()` with `compute()` if we are ready to do the computation.

In [None]:
result = xr.map_blocks(toa_radiation_wrapper, ds)

In [None]:
toa_data = result.compute()

If we plot the same time step as before, we can see that we got the same result.

In [None]:
toa_data.sel(time = '2024-05-01').plot()

&#x1F449; One last thing we need to do is a unit conversion. $R_A$ should be multiplied by 0.408 to convert it from [MJ m-2 day-1] to [mm day-1]. Let's go ahead and store this result in our `xarray.Dataset` as well, calling the variable `'toa_radiation'`.

In [None]:
# Converting TOA Radiation from [MJ m-2 day-1] to [mm H2O day-1]
ds['toa_radiation'] = toa_data * 0.408
ds
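
The factor of 0.408 is not arbitrary. As a sketch of where it comes from, assuming the latent heat of vaporization of 2.45 MJ kg-1 used in the FAO guidance:

```python
# Latent heat of vaporization of water, as used in the FAO guidance
latent_heat = 2.45 # [MJ kg-1]

# 1 kg of water spread over 1 m2 is a layer 1 mm deep, so dividing
#   energy [MJ m-2 day-1] by latent heat [MJ kg-1] gives [mm day-1]
conversion_factor = 1 / latent_heat
print(round(conversion_factor, 3))

# Hypothetical TOA radiation of 30 MJ m-2 day-1, as equivalent evaporation
equivalent_evaporation = 30 * conversion_factor
print(round(equivalent_evaporation, 2))
```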

#### &#x1F3AF; Best Practice

**We now have all the data we need to compute PET, on a consistent global grid, as a single `xarray.Dataset`! Let's make sure to include some field-level metadata, in case we end up sharing this dataset with others.**

In [None]:
ds['toa_radiation'].attrs

There are currently no attributes. We should at least make one that clarifies the measurement units of our `'toa_radiation'` field.

In [None]:
ds['toa_radiation'].attrs['units'] = 'mm H2O day-1'
ds['toa_radiation'].attrs

--- 

## Profiling computational resources

Combining `xarray` and `dask` together makes it easy to use concurrent processing. However, computers are still pretty fast, and there may be several tasks we want to complete that run very quickly on a single CPU, even if the dataset is large. Concurrent processing may be unnecessary.

#### &#x1F3AF; Best Practice

In general, we should **profile** the resource requirements of a computational workflow before we implement concurrent processing or some other solution. How long does it take to run? How much memory does it require? We should answer these questions first instead of assuming it will take too long, or it won't fit into memory.

In this case, because we're still learning, we're using a dataset that is small enough to fit into memory. We know this because, when we open the dataset using `open_mfdataset()`, `xarray` will always show us how much memory each `xarray.DataArray` requires. Below, we can see that a single MERRA-2 variable requires only about 96.77 MB.

In [None]:
ds['T2MMEAN'].data

**Our task is clearly not limited by memory size; that is, it's not *memory-bound.*** But is it CPU-bound? One way we can answer that question is to time how long it takes to run on a small part of the dataset. For example, let's try running a single day.

In [None]:
# Note there is exactly one chunk; i.e., the subsequent computation will not use more than one process
first_day = ds.sel(time = '2024-01-01')
first_day['T2MMEAN'].data

Below, we've re-written the `potential_et()` function so that it works with a MERRA-2 `xarray.Dataset`.

In [None]:
def potential_et(dataset):
    '''
    Calculates potential evapotranspiration, according to the Hargreaves
    equation:

    PET = 0.0023 * R * sqrt(Tmax - Tmin) * (Tmean + 17.8)

    Where R is the top-of-atmosphere (TOA) radiation (mm day-1); Tmax and
    Tmin are the maximum and minimum daily air temperatures (degrees C),
    respectively; and Tmean is daily mean air temperature (degrees C).

    Single input argument should be an xarray.Dataset with the following
    data variables:

        T2MMIN: Minimum daily air temperature (degrees C)
        T2MMAX: Maximum daily air temperature (degrees C)
        T2MMEAN: Average daily air temperature (degrees C)
        toa_radiation: The top-of-atmosphere (TOA) radiation (mm day-1)

    Parameters
    ----------
    dataset: xarray.Dataset

    Returns
    -------
    xarray.DataArray
        The potential evapotranspiration (PET) in [mm day-1]
    '''
    return 0.0023 * dataset['toa_radiation'] * np.sqrt(dataset['T2MMAX'] - dataset['T2MMIN']) * (dataset['T2MMEAN'] + 17.8)
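
As a quick sanity check on the formula, the sketch below evaluates the same Hargreaves expression for a single grid cell using plain Python numbers. The function name `hargreaves_scalar()` and the input values are hypothetical, chosen only to show the order of magnitude we should expect.

```python
import math

def hargreaves_scalar(toa_radiation, tmax, tmin, tmean):
    # Same expression as potential_et(), but for plain numbers
    return 0.0023 * toa_radiation * math.sqrt(tmax - tmin) * (tmean + 17.8)

# Hypothetical values: TOA radiation in mm day-1, temperatures in degrees C
pet_example = hargreaves_scalar(toa_radiation=10.0, tmax=30.0, tmin=16.0, tmean=23.0)
print(f'{pet_example:.2f} mm day-1')  # 3.51 mm day-1
```

A value of a few millimeters per day is plausible for a warm day, which suggests the coefficients and units are being combined correctly.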

### Measuring the wall time of a task with `timeit`

We can use Python's built-in `timeit` module to time how long it takes to run a block of code. `timeit` will report the **wall time** for our task: the time that elapses on a wall clock while the task is running. The **wall time** is usually what we're interested in when we are evaluating how long it will take to complete a task and whether it is worth trying to divide that task among multiple (concurrent) processes.

&#x1F449; If we want to use `timeit` inside a Jupyter Notebook, we can just add the magic command `%%timeit` to the top of the Python code block we want to time.

In [None]:
%%timeit

potential_et(first_day)

**But wait!** Remember that `xarray` uses **lazy evaluation.** We need to add a call to `compute()` to make sure we're actually measuring the processing time.

In [None]:
%%timeit

# Note that we shouldn't try to assign any variables inside a %%timeit cell
potential_et(first_day).compute()

[The `timeit` module](https://docs.python.org/3/library/timeit.html) will actually run the code cell multiple times to get an average of how long it took each time. The time it takes will differ from machine to machine but, on one machine we tried, it took about 20 milliseconds (20 ms).
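
Outside of a Jupyter Notebook, the same kind of measurement can be made by calling the `timeit` module directly. This is a minimal sketch; the function `workload()` is a hypothetical stand-in for a fast task, and the repeat counts are arbitrary choices.

```python
import timeit

def workload():
    # Hypothetical stand-in for a fast task we want to time
    return sum(i * i for i in range(10_000))

# Five trials, each calling workload() 100 times
n_calls = 100
trials = timeit.repeat(workload, number=n_calls, repeat=5)

# Each trial reports the total time for all calls, in seconds
per_call = [total / n_calls for total in trials]
print(f'Fastest trial: {min(per_call) * 1e3:.3f} ms per call')
```

Repeating the measurement matters because any single run can be slowed down by other activity on the machine.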

**If we want to know how long it would take to run `potential_et()` on the entire dataset using a single process,** we can make an initial estimate by multiplying the number of chunks (number of time steps) by the time it took to run a single chunk, `first_day`.

In [None]:
# About 20 ms for a single day, multiplied by the number of days

20e-3 * ds.time.size

So, it would take only about 2.4 seconds. Because we're working with a smaller dataset for teaching purposes, and because `potential_et()` is still a relatively simple function, processing the entire dataset will be fast.

Let's run `potential_et()` over the entire dataset.

In [None]:
%%timeit

potential_et(ds).compute()

&#x1F449; **Actually, it took a lot less than 2.4 seconds to run `potential_et()` over the entire dataset. Why?** Our estimate based on running a single chunk will usually be an overestimate. Every call to `compute()` carries some fixed overhead, which our estimate counts once per chunk but which is shared across chunks when the whole dataset is processed at once. In addition, software like `xarray` and the Python interpreter itself will often figure out ways to optimize a task that is executed in a loop, which is essentially what is happening when we run `potential_et()` over multiple chunks using a single CPU.

Now, let's compare the single-CPU wall time to a multiple-CPU wall time, below.

In [None]:
%%timeit

xr.map_blocks(potential_et, ds).compute()

&#x1F449; **Now our task runs even slower! What happened?** This is because, when we use multiple CPUs or **concurrent processes,** there is a certain amount of overhead associated with scheduling a task and collecting the results. Some of that overhead might include multiple processes (CPUs) waiting on their turn to read or write data, as one example. When our task, `potential_et()`, is already quite fast (as we saw above), this overhead can be significant compared to the time it actually takes to run the task.

Multi-process overhead can also be significant when there is a large number of chunks, because the scheduler has more work to do in coordinating the workload for each process. **Selecting the right chunk size is a balance.** If chunks are too large, there won't be much of a decrease in **wall time** associated with **concurrent processing** because fewer processes are being used. Memory might also be an issue with large chunks. But if chunks are too small, then processes (CPUs) will often be waiting for new data.

[Read more about chunk size and concurrency here.](https://docs.xarray.dev/en/stable/user-guide/dask.html#chunking-and-performance)
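
The trade-off described above can be illustrated with a toy cost model. This is only a sketch under loud assumptions: the function `estimated_wall_time()`, the 4 workers, and the 1 ms of scheduling overhead per chunk are all hypothetical numbers, not measurements of `dask`.

```python
import math

def estimated_wall_time(n_chunks, total_work=2.4, n_workers=4,
                        overhead_per_chunk=0.001):
    # Chunks are processed in "rounds" of up to n_workers chunks at a time
    rounds = math.ceil(n_chunks / n_workers)
    compute_time = rounds * (total_work / n_chunks)
    # Assume the scheduler spends a fixed amount of time on every chunk
    scheduling_time = n_chunks * overhead_per_chunk
    return compute_time + scheduling_time

# Compare one big chunk, one chunk per worker, and many small chunks
for n_chunks in (1, 4, 122, 10_000):
    print(n_chunks, round(estimated_wall_time(n_chunks), 3))
```

In this model, a single large chunk leaves most workers idle, while a very large number of tiny chunks is dominated by scheduling time; the shortest wall time lies somewhere in between.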

#### &#x1F6A9; <span style="color:red">Pay Attention</span>

**When we use the `%%timeit` magic command in a code cell, any variables assigned in that cell are not kept afterwards.** In general, we should only use `%%timeit` to estimate the wall time. If we want the *result* of a computation, we should run it in a code cell *without* the `%%timeit` magic command.

In [None]:
result = potential_et(ds).compute()

# If we applied a custom function, the result might inherit its name from
#   an existing input variable; we can reset the name this way
result.name = 'Potential ET (mm day-1)'
result.sel(time = '2024-01-01').plot()

Read more about the `timeit` module here:

- https://docs.python.org/3/library/timeit.html
- https://sjvrijn.github.io/2019/09/28/how-to-timeit.html

---

In [None]:
pet = potential_et(ds)
pet

In [None]:
pet_tiaret = pet.sel(lon = -1.32, lat = 35.37, method = 'nearest')
pet_tiaret

In [None]:
pet_tiaret.plot()

In [None]:
chirps = xr.open_mfdataset('data_raw/CHIRPS/CHIRPS-v2_Africa_monthly_2014-2024.nc')
chirps_tiaret = chirps['precip'].sel(x = slice(0.8, 1.8), y = slice(36.1, 35.1))
chirps_tiaret

In [None]:
# Increasing the frequency of our monthly dataset to daily using nearest-neighbor interpolation

chirps_tiaret_resampled = chirps_tiaret.isel(time = slice(120, 125)).resample(time = 'D').nearest()
chirps_tiaret_resampled

In [None]:
# Note that we're using a rough approximation (30) of the number of days in a month

chirps_tiaret_daily = chirps_tiaret_resampled.mean(['x', 'y']) / 30
chirps_tiaret_daily
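
To get a sense of how rough the 30-day approximation is, the sketch below compares it with the actual number of days in each month. The monthly precipitation totals are hypothetical values, not CHIRPS data.

```python
import calendar

# Hypothetical monthly precipitation totals (mm month-1), January-May 2024
monthly_totals = {1: 45.0, 2: 38.0, 3: 30.0, 4: 25.0, 5: 20.0}

# Actual number of days in each month of 2024 (a leap year)
days = [calendar.monthrange(2024, month)[1] for month in monthly_totals]

for (month, total), n_days in zip(monthly_totals.items(), days):
    approx = total / 30     # Rough approximation: every month has 30 days
    exact = total / n_days  # Using the actual number of days
    print(f'{calendar.month_abbr[month]}: {approx:.3f} vs. {exact:.3f} mm day-1')
```

The difference is at most a few percent, which is small for a first look at the data. If more precision is needed, `xarray` exposes the same information for a datetime coordinate through the `dt.days_in_month` accessor.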

In [None]:
ratio = chirps_tiaret_daily.values / pet_tiaret.values

pyplot.figure(figsize = (12, 4))
pyplot.plot(pet['time'].values, ratio, 'k-')

On its own, the graph above doesn't tell us how severe the drought in Tiaret is. Although precipitation in the region has replenished less than 5% of its lost water over the past few months, this could be part of the normal seasonal cycle. Actually, we know that January through April is a relatively wet period for Tiaret, but the question remains: **Can we compare this year to past years?**

---

## Summary

- When using **concurrent processing,** the chunk size must be selected carefully. Too many small chunks may result in a lot of overhead. With too much overhead, a task might run slower with multiple processes than with a single process.

---

### More resources

- The National Center for Atmospheric Research (NCAR) has an excellent article on ["Using `dask` to scale up your data analysis."](https://ncar.github.io/Xarray-Dask-ESDS-2024/notebooks/02-dask-intro.html)
- [Parallel computing with `dask`](https://docs.xarray.dev/en/stable/user-guide/dask.html)
- Sander van Rijn's [tutorial on using the `timeit` module.](https://sjvrijn.github.io/2019/09/28/how-to-timeit.html)

### References

Bohr, Mark. 2007. "A 30-year retrospective on Dennard's MOSFET scaling paper." [https://www.eng.auburn.edu/~agrawvd/COURSE/READING/LOWP/Boh07.pdf](https://www.eng.auburn.edu/~agrawvd/COURSE/READING/LOWP/Boh07.pdf)

Moore, Gordon E. 1965. "Cramming more components onto integrated circuits." *Electronics Magazine.*