## Handling Big Data 

Here we will discuss tools for dealing with Big Data. What counts as Big Data depends on the resources at hand, but a common convention is to call data 'Medium' when it can't fit in RAM, and 'Big' when it can't fit in RAM and also takes up a lot of disk space (e.g. on the order of terabytes).
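Whether an array is 'Medium' is mostly arithmetic: its uncompressed in-memory size is the number of elements times the bytes per element. The sketch below makes that concrete under stated assumptions. The helper names, the hypothetical array of 1000 time steps on a 721 × 1440 (0.25°) grid stored as float32, the 16 GiB of RAM, and the 1 TB threshold for 'Big' are all illustrative choices, not standard definitions. Uncompressed size is also only a rough proxy for disk usage.

```python
import math

import numpy as np


def estimated_nbytes(shape, dtype="float32"):
    """Uncompressed in-memory size: number of elements x bytes per element."""
    return math.prod(shape) * np.dtype(dtype).itemsize


def classify_size(nbytes, ram_bytes, big_threshold_bytes=10**12):
    """Rough labels following the text (the 1 TB threshold is hypothetical):
    'medium' if it does not fit in RAM, 'big' if it is also ~terabytes."""
    if nbytes < ram_bytes:
        return "fits in RAM"
    if nbytes < big_threshold_bytes:
        return "medium"
    return "big"


# Hypothetical array: 1000 time steps of a 721 x 1440 grid in float32
nbytes = estimated_nbytes((1000, 721, 1440), "float32")
print(f"{nbytes / 1024**3:.2f} GiB")                 # 3.87 GiB
print(classify_size(nbytes, ram_bytes=16 * 1024**3))  # fits in RAM
print(classify_size(nbytes, ram_bytes=2 * 1024**3))   # medium
```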

For Medium or Big data, there are useful libraries that parallelise numpy and xarray tasks by splitting the work into chunks.

In [None]:
import os
import datetime
import xarray as xr
import dask
from pathlib import Path

notebook_dir = os.getcwd()
data_dir = str(Path(notebook_dir).parents[1] / 'data_samples' )
WEATHERBENCH_BUCKET = 'gs://weatherbench2/datasets/era5/1959-2022-full_37-6h-0p25deg_derived.zarr'

## Introduction to Dask

Dask is one of many computing libraries that use 'lazy' evaluation to deal with large amounts of data. The key concept is that, rather than evaluating each operation immediately, the program records the operations and waits until it is explicitly asked to compute the result. It then tries to carry out the whole computation as efficiently as possible, e.g. by processing chunks in parallel.
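The same lazy idea can be seen with `dask.delayed`, which wraps ordinary Python functions. In this minimal sketch, `load_chunk` and `chunk_total` are hypothetical stand-ins for expensive steps such as reading files. Calling them only builds a task graph, and nothing runs until `.compute()`.

```python
from dask import delayed


@delayed
def load_chunk(i):
    # Hypothetical stand-in for an expensive step, e.g. reading one file
    return list(range(i * 10, (i + 1) * 10))


@delayed
def chunk_total(values):
    return sum(values)


# Nothing is evaluated here: each call just adds a task to the graph
partial_sums = [chunk_total(load_chunk(i)) for i in range(4)]
grand_total = delayed(sum)(partial_sums)
print(grand_total)  # a Delayed object, not a number

# compute() runs the whole graph; independent tasks can run in parallel
result = grand_total.compute()
print(result)  # 780
```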

Resources:

- https://tristansalles.github.io/EnviReef/6-addson/dask.html
- https://docs.xarray.dev/en/stable/user-guide/dask.html
- https://tutorial.xarray.dev/intermediate/xarray_and_dask.html

Let's start by creating a basic dask array.

In [None]:
import dask.array as da

ones = da.ones((2000, 5000))
ones

In [None]:
# Specify the size of each chunk of data using chunks
darray = da.ones((2000, 5000), chunks=(1000, 1000))
darray
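To see how the `chunks` argument partitions the array, dask arrays expose the chunk layout directly. This short sketch uses the same shape as above plus a hypothetical uneven shape to show that the last chunk along an axis may be smaller.

```python
import dask.array as da

darray = da.ones((2000, 5000), chunks=(1000, 1000))

# Chunk sizes along each axis: 2 blocks of rows, 5 blocks of columns
print(darray.chunks)       # ((1000, 1000), (1000, 1000, 1000, 1000, 1000))
print(darray.numblocks)    # (2, 5)
print(darray.npartitions)  # 10

# Sizes need not divide evenly: the final chunk along an axis is smaller
uneven = da.ones((2500, 5000), chunks=(1000, 1000))
print(uneven.chunks[0])    # (1000, 1000, 500)
```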

In [None]:
# We can perform operations on this array, but nothing is computed yet: dask just records the operations in a task graph
total_sum = darray.sum()
total_sum

In [None]:
# To actually get the answer, we can finally call compute
total_sum.compute()
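When several results depend on the same input, they can be computed together with `dask.compute`. It merges the graphs, so tasks they share (here, creating the chunks of ones) are executed once rather than once per result. A minimal sketch:

```python
import dask
import dask.array as da

darray = da.ones((2000, 5000), chunks=(1000, 1000))

# Two lazy results built from the same input chunks
total = darray.sum()
mean = darray.mean()

# One call evaluates both; the shared input tasks run only once
total_value, mean_value = dask.compute(total, mean)
print(total_value, mean_value)  # 10000000.0 1.0
```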

From the xarray documentation:

> A good rule of thumb is to create arrays with a minimum chunksize of at least one million elements (e.g., a 1000x1000 matrix). With large arrays (10+ GB), the cost of queueing up Dask operations can be noticeable, and you may need even larger chunksizes.
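A quick way to check a chunking against this rule of thumb is to compute the elements and bytes in one full-size chunk. This sketch reuses the 1000 × 1000 chunks from earlier, and the default float64 dtype of `da.ones` gives 8 bytes per element.

```python
import math

import dask.array as da

darray = da.ones((2000, 5000), chunks=(1000, 1000))

# Elements and bytes in one full-size chunk (float64 -> 8 bytes each)
elements_per_chunk = math.prod(darray.chunksize)
bytes_per_chunk = elements_per_chunk * darray.dtype.itemsize
print(elements_per_chunk, bytes_per_chunk / 1e6)  # 1000000 8.0 (MB)

# Rule of thumb from the quote: at least ~one million elements per chunk
print(elements_per_chunk >= 1_000_000)  # True
```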

## Loading a dataset with dask

Most of the time, we don't have to interact with dask directly, because xarray has great integration with dask.

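We can simply specify the chunk sizes when opening a netCDF file, and under the hood xarray will store the data as dask arrays.

The `chunks` dictionary gives the size of each chunk (the number of elements along that dimension), not the number of chunks. Here only `time` and `latitude` are specified. Dimensions that are not listed (here, `longitude`) fall back to the file's own on-disk chunking if the backend reports one, and otherwise stay as a single chunk spanning the whole dimension.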
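To see that the values are chunk sizes, here is a minimal sketch on a small synthetic in-memory dataset. The dimension sizes are hypothetical and the data are zeros rather than the E-OBS file. For in-memory data, the unlisted `longitude` dimension stays as one chunk.

```python
import numpy as np
import xarray as xr

# Hypothetical synthetic stand-in for the E-OBS file
ds = xr.Dataset(
    {"pp": (("time", "latitude", "longitude"), np.zeros((500, 65, 20)))}
)

# Values are elements per chunk, so 500 time steps -> 200 + 200 + 100
chunked = ds.chunk({"time": 200, "latitude": 30})
time_chunks, lat_chunks, lon_chunks = chunked["pp"].chunks
print(time_chunks)  # (200, 200, 100)
print(lat_chunks)   # (30, 30, 5)
print(lon_chunks)   # (20,): longitude was not listed, so it stays whole
```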

In [None]:
ds = xr.open_dataset(os.path.join(data_dir, 'netcdf', 'E-OBS', 'UK_monthly.nc'), chunks={"time": 200, 'latitude': 30})

In [None]:
# Note that the data hasn't been loaded yet: it is a dask array, and only its shape, dtype and chunk layout are shown
ds['pp']

In [None]:
# Alternatively, open the file without chunks and then call the .chunk() method (.chunks, without brackets, is an attribute reporting the current chunk sizes)

ds = xr.open_dataset(os.path.join(data_dir, 'netcdf', 'E-OBS', 'UK_monthly.nc'))
ds = ds.chunk({'time': 200})
ds['pp']

In [None]:
# Perform operations like on a regular xarray Dataset / Dataarray
# Nearly all xarray operations have been extended so that they are compatible with dask

final_ds = ds.isel(time=slice(0,100)).groupby('time.year').mean()

In [None]:
# Let's have a look at the task graph dask will execute (this requires the graphviz package). Only one chunk of the data is acted on,
# because the first 100 time steps all fall within the first 200-step time chunk
dask.visualize(final_ds.pp.data, rankdir="LR")

In [None]:
# Finally, when you are ready, call .compute() or .load() to make the computation actually happen
# The difference: .load() operates in place (the original object is overwritten with the in-memory data), whereas .compute() returns a new in-memory object and leaves the original unchanged

# so we can do this
computed_ds = final_ds.compute()

# Or alternatively overwrite final_ds with the in-memory dataset
final_ds.load()

# Alternatively, write the result straight to file with final_ds.to_netcdf("filename.nc"); dask then computes and writes it chunk by chunk instead of loading everything into memory at once
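The difference between `.compute()` and `.load()` can be checked by looking at the type of the underlying data. This minimal sketch uses a tiny hypothetical dataset backed by a dask array instead of the E-OBS file.

```python
import dask.array as da
import numpy as np
import xarray as xr

# Hypothetical lazy dataset backed by a dask array
lazy = xr.Dataset({"x": (("time",), da.arange(10, chunks=5))})

# compute() returns a new in-memory object and leaves `lazy` untouched
computed = lazy.compute()
print(type(computed["x"].data))  # <class 'numpy.ndarray'>
still_lazy = isinstance(lazy["x"].data, da.Array)
print(still_lazy)                # True

# load() computes in place: `lazy` itself now holds numpy data
lazy.load()
print(type(lazy["x"].data))      # <class 'numpy.ndarray'>
```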

In [None]:
# Accessing .values forces the data into memory as a numpy array, whilst .data returns the underlying dask array without computing it
# So .data is useful for passing the lazy array to functions that work directly on dask arrays
print(type(ds['pp'].values))
print(type(ds['pp'].data))
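If you have a plain NumPy function that knows nothing about dask, there are two standard ways to apply it lazily. One is `map_blocks` on the dask array obtained from `.data`. The other is `xr.apply_ufunc` with `dask="parallelized"`, which keeps the xarray labels. This is a minimal sketch; `clip_negative` and the tiny array are hypothetical.

```python
import dask.array as da
import numpy as np
import xarray as xr


def clip_negative(values):
    # Hypothetical NumPy-only function, applied to one chunk at a time
    return np.where(values < 0, 0.0, values)


arr = xr.DataArray(
    da.from_array(np.array([-1.0, 2.0, -3.0, 4.0]), chunks=2), dims="time"
)

# Option 1: work on the underlying dask array, chunk by chunk
via_map_blocks = arr.data.map_blocks(clip_negative)

# Option 2: let xarray handle it and keep a labelled DataArray
via_apply_ufunc = xr.apply_ufunc(
    clip_negative, arr, dask="parallelized", output_dtypes=[arr.dtype]
)

print(via_map_blocks.compute())          # [0. 2. 0. 4.]
print(via_apply_ufunc.compute().values)  # [0. 2. 0. 4.]
```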

## Multifile datasets


Here we will look at opening a dataset made up of many files, which is a common occurrence. One way of doing this is to loop over the files yourself and combine them using e.g. `xr.concat`, but xarray also provides a handy function, `xr.open_mfdataset`, that does this in one step.
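For comparison, the manual route amounts to concatenating per-file datasets along `time`. This sketch skips the files and splits a small hypothetical monthly dataset in memory, with each one-step piece standing in for one file. `open_mfdataset` performs the combining step for you, and it opens the pieces lazily.

```python
import numpy as np
import pandas as pd
import xarray as xr

# Hypothetical monthly dataset standing in for UK_monthly.nc
times = pd.date_range("2000-01-01", periods=6, freq="MS")
ds = xr.Dataset({"pp": (("time",), np.arange(6.0))}, coords={"time": times})

# One-step pieces, as if each had been read from a separate file
pieces = [ds.isel(time=[i]) for i in range(ds.sizes["time"])]

# Combine them back along time, as a manual loop over files would
combined = xr.concat(pieces, dim="time")
print(combined.equals(ds))  # True
```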

In [None]:
# First we create a new multifile dataset in a new temporary directory 
uk_monthly_ds = xr.open_dataset(os.path.join(data_dir, 'netcdf', 'E-OBS', 'UK_monthly.nc'))

# Create a temporary directory, if it doesn't already exist
temp_dir_path = os.path.join(data_dir, 'tmp')
os.makedirs(temp_dir_path, exist_ok=True)


for n, t in enumerate(uk_monthly_ds.time.values):

    tmp_ds = uk_monthly_ds.sel(time=t)

    tmp_ds.expand_dims(time=1).to_netcdf(os.path.join(temp_dir_path, f"uk_monthly_{n}.nc"))

In [None]:
# Now we can load it easily using open_mfdataset
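# First find all the relevant filepaths. There are many ways to do this, including a manual loop, but glob is often handy, as it uses Unix shell-style wildcards

from glob import glob

# Find all filepaths of the form uk_monthly_*.nc (where * is a wildcard matching any sequence of characters)
# glob returns paths in arbitrary order, so sort them by their integer suffix to keep time in order, then take the first 20
relevant_fps = sorted(glob(os.path.join(temp_dir_path, 'uk_monthly_*.nc')),
                      key=lambda fp: int(Path(fp).stem.split('_')[-1]))[:20]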

print(relevant_fps[:5])
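File order matters here because `combine='nested'` concatenates files in list order. `glob` returns paths in arbitrary order, and plain string sorting puts `uk_monthly_10.nc` before `uk_monthly_2.nc`. This sketch shows the pitfall with hypothetical file names and sorts on the integer suffix instead. Another option is `combine='by_coords'`, which orders the datasets by their coordinate values.

```python
from pathlib import Path

# Hypothetical file names in an arbitrary order, as glob might return them
paths = ["tmp/uk_monthly_10.nc", "tmp/uk_monthly_2.nc",
         "tmp/uk_monthly_1.nc", "tmp/uk_monthly_0.nc"]

# String sorting compares character by character, so "10" sorts before "2"
print(sorted(paths))
# ['tmp/uk_monthly_0.nc', 'tmp/uk_monthly_1.nc', 'tmp/uk_monthly_10.nc', 'tmp/uk_monthly_2.nc']


def file_index(path):
    """Integer suffix n of a file named 'uk_monthly_<n>.nc'."""
    return int(Path(path).stem.split("_")[-1])


print(sorted(paths, key=file_index))
# ['tmp/uk_monthly_0.nc', 'tmp/uk_monthly_1.nc', 'tmp/uk_monthly_2.nc', 'tmp/uk_monthly_10.nc']
```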

In [None]:
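# Now pass this list of filepaths into open_mfdataset
multifile_dataset = xr.open_mfdataset(relevant_fps,
                                      combine='nested',
                                      concat_dim='time')

In [None]:
# This has created a dataset made up of dask arrays
# By default each file becomes (at least) one separate chunk, so here there is one chunk per time step
multifile_dataset['pp']

In [None]:
# If you need a particular chunking, it's best to specify the chunks when opening the data, rather than rechunking after opening with the default
# Note that `chunks` is applied to each file separately: here every file holds a single time step, so the time chunks stay at size 1,
# and merging steps from several files into one chunk needs a .chunk() call after opening
# parallel=True opens the files in parallel using dask.delayed
multifile_dataset = xr.open_mfdataset(relevant_fps,
                                      combine='nested',
                                      concat_dim='time',
                                      parallel=True,
                                      chunks={'time': 4})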

## Using zarr arrays

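Zarr is a common format for storing large multi-dimensional array data, often used for storing data in the cloud.

The data is stored in a chunked and compressed format, typically with each chunk saved as a separate object, so a reader can fetch only the chunks it needs.

Xarray can often open zarr data directly, but not always: it expects the store to carry extra metadata naming the dimensions of each array, which xarray writes itself when saving with `to_zarr`.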

In [None]:
# One example is the data stored as part of the WeatherBench 2 project: ERA5 reanalysis data stored in zarr format on Google Cloud Storage
# Opening a gs:// path requires the gcsfs package to be installed
ds = xr.open_zarr(WEATHERBENCH_BUCKET)

In [None]:
# The result is a dataset where data is stored as dask arrays
ds['2m_temperature']

In [None]:
# As above, we can subselect the data and then call compute() to fetch it. Here we select the 2-metre temperature at 00:00 UTC on 1 January 2016 at the point 0°N, 0°E

ds['2m_temperature'].sel(latitude=0, longitude=0, time=datetime.datetime(2016, 1, 1, 0)).compute()

## Visualising computations using the dask client

In [None]:
from dask.distributed import Client

# On mybinder.org or other JupyterHub deployments, the dashboard link may need adjusting
# through dask's "distributed.dashboard.link" configuration setting before creating the client

client = Client()
client

Click on the Dashboard link above to open the dashboard.

Then run the code below: the dashboard shows information about the run, such as the task stream, memory use and CPU usage.

In [None]:
xr.open_mfdataset(relevant_fps,
                  combine='nested',
                  concat_dim='time',
                  chunks={'time': 4}).groupby('time.year').mean().compute()

In [None]:
# Finally, we need to close the client

client.close()