# Working with large data volumes

> Authors: Ashley Smith
>
> Abstract: Some strategies for requesting and handling larger data volumes

Note that this code can take a long time to run, so shorten the time ranges if you are just testing it out.

In [None]:
%load_ext watermark
%watermark -i -v -p viresclient,pandas,xarray,matplotlib

In [None]:
from viresclient import SwarmRequest
import datetime as dt
import xarray as xr
import glob

## Set up the request parameters - magnetic data and model evaluations

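We fetch measurements (`F`, `B_NEC`) and model values (named `F_CHAOS`, `B_NEC_CHAOS`). Here the custom model is named "CHAOS", but you can call it anything. In the cell below only `F` is requested and the model lines are commented out to keep the request fast; uncomment one of them to fetch model values as well.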

It is also possible to fetch data from all satellites at once with `request.set_collection("SW_OPER_MAGA_LR_1B", "SW_OPER_MAGB_LR_1B", "SW_OPER_MAGC_LR_1B")`. The satellite for each record is then identified in the returned data by the `Spacecraft` column.

In [None]:
request = SwarmRequest()
request.set_collection("SW_OPER_MAGA_LR_1B")  # Swarm Alpha
request.set_products(
    measurements=["F"],
    # Choose between the full CHAOS model (will be a lot slower - the MMA part could use some optimisation(?))
    # models=["CHAOS = 'CHAOS-6-Core' + 'CHAOS-6-Static' + 'CHAOS-6-MMA-Primary' + 'CHAOS-6-MMA-Secondary'"],
    # ...or just the core part:
    # models=["CHAOS = 'CHAOS-Core'"],
    sampling_step="PT60S"
)
# Quality Flags
# https://earth.esa.int/web/guest/missions/esa-eo-missions/swarm/data-handbook/level-1b-product-definitions#label-Flags_F-and-Flags_B-Values-of-MDR_MAG_LR
# NB: will need to do something different for Charlie because the ASM broke so Flags_F are bad
request.set_range_filter("Flags_F", 0, 1)
request.set_range_filter("Flags_B", 0, 1)

## Look at one day to see what the output data will look like

In [None]:
data = request.get_between(
    start_time=dt.datetime(2014,1,1),
    end_time=dt.datetime(2014,1,2)
)

In [None]:
data.as_dataframe(expand=True).head()

In [None]:
data.as_xarray()

## Three options suggested for how to deal with larger volumes (option 2 is recommended)

### 1. a) Fetch two years of data and save them directly

For this example we keep the request simple: a single measurement (`F`), sampled every 5 seconds (`PT5S`).

In [None]:
request = SwarmRequest()
request.set_collection("SW_OPER_MAGA_LR_1B")  # Swarm Alpha
request.set_products(
    measurements=["F"],
    sampling_step="PT5S"
)
# Quality Flags
# https://earth.esa.int/web/guest/missions/esa-eo-missions/swarm/data-handbook/level-1b-product-definitions#label-Flags_F-and-Flags_B-Values-of-MDR_MAG_LR
# NB: will need to do something different for Charlie because the ASM broke so Flags_F are bad
request.set_range_filter("Flags_F", 0, 1)
request.set_range_filter("Flags_B", 0, 1)

The request is automatically split up and sequentially processed.

In [None]:
data = request.get_between(
    start_time=dt.datetime(2014,1,1),
    end_time=dt.datetime(2016,1,1)
)

`data` is a kind of wrapper around some temporary CDF files (more data -> more files). This means the data is accessible but not yet loaded into memory. When the variable `data` is deleted, or the current program is closed, the files will be removed. `data.contents` is a list of objects which point to each file, and each on-disk filename can be retrieved as below.

Warning: this behaviour is likely to change in the future (an underscore in the name, `_file`, indicates a private variable whose behaviour in the future is not guaranteed).

In [None]:
data.contents[0]._file.name

`data.as_dataframe()` / `data.as_xarray()` will read the files and concatenate them, but will fail if you don't have the memory to load them all simultaneously.
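
To judge in advance whether a request is likely to fit in memory, a rough upper bound on the number of records can be worked out from the time range and the sampling step. The sketch below is only an estimate: `estimate_records` and `estimate_megabytes` are hypothetical helpers (not part of viresclient), the 8 bytes per value assumes double-precision floats, the number of values per record is an illustrative guess, and the quality-flag filters will reduce the real count.

```python
import datetime as dt


def estimate_records(start, end, step_seconds):
    """Upper bound on the number of samples between start and end."""
    total_seconds = (end - start).total_seconds()
    return int(total_seconds // step_seconds)


def estimate_megabytes(n_records, values_per_record, bytes_per_value=8):
    """Rough in-memory size, assuming each value occupies bytes_per_value bytes."""
    return n_records * values_per_record * bytes_per_value / 1e6


# Two years at a 5-second sampling step, as in the request above
n = estimate_records(dt.datetime(2014, 1, 1), dt.datetime(2016, 1, 1), step_seconds=5)
# Assumed for illustration: 6 values per record (e.g. time, position, F)
size_mb = estimate_megabytes(n, values_per_record=6)
print(n, round(size_mb))
```

Numbers like these are only a guide, but they help decide whether to save the files directly or to split the request into smaller pieces.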

#### Save the files directly

The length of `data.contents` tells us the number of temporary files. Use this to make up some file names to give them.

In [None]:
filenames = [f"testfile_{n:03}.cdf" for n in range(len(data.contents))]
filenames

In [None]:
print(data.to_files.__doc__)

In [None]:
data.to_files(filenames)

From here you may use some other tool to work with the files. I will just remove them now:

In [None]:
!rm testfile_*

### 1. b) Tune the size of each generated file by making multiple requests manually

Generate lists of start and end times to use...

In [None]:
def gen_start_ends(
        start=dt.datetime(2014, 1, 1),
        end=dt.datetime(2014, 2, 1),
        ndays=1):
    delta_t = dt.timedelta(days=ndays)
    start_times = []
    end_times = []
    start_i = start
    end_i = start_i + delta_t  # use ndays here too, not a fixed one day
    while end_i <= end:
    #     print(start, end)
        start_times.append(start_i)
        end_times.append(end_i)
        start_i, end_i = end_i, end_i + delta_t
    # Append an uneven ending segment if necessary
    # (also covers the case where the whole range is shorter than ndays)
    if not end_times or end_times[-1] < end:
        start_times.append(start_i)
        end_times.append(end)
    return start_times, end_times
    
start_times, end_times = gen_start_ends(
    start=dt.datetime(2014, 1, 1),
    end=dt.datetime(2014, 2, 1),
    ndays=1
)
list(zip(start_times, end_times))
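
Before launching many requests it can be worth checking that the generated intervals cover the whole range with no gaps or overlaps. A minimal sketch, where `check_intervals` is a hypothetical helper written for this notebook and not part of viresclient:

```python
import datetime as dt


def check_intervals(start_times, end_times, start, end):
    """Return True if the intervals tile [start, end] with no gaps or overlaps."""
    if not start_times or len(start_times) != len(end_times):
        return False
    if start_times[0] != start or end_times[-1] != end:
        return False
    # Each interval must have positive length
    if any(s >= e for s, e in zip(start_times, end_times)):
        return False
    # Each interval must begin exactly where the previous one ended
    return all(e == s for e, s in zip(end_times[:-1], start_times[1:]))


# Hand-written example: three daily intervals covering 1-4 January 2014
days = [dt.datetime(2014, 1, d) for d in range(1, 5)]
print(check_intervals(days[:-1], days[1:], days[0], days[-1]))
```

Applied to the lists generated in the cell above, `check_intervals(start_times, end_times, dt.datetime(2014, 1, 1), dt.datetime(2014, 2, 1))` should return `True`.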

Generate some file names to use

In [None]:
filenames = [f"data_{start.strftime('%Y-%m-%d')}.cdf" for start in start_times]
filenames

Loop through these dates and file names to fetch and save each.

(Here we remove the progress bars with `show_progress=False` just to keep this notebook cleaner - in reality you might want to keep them)

In [None]:
for start, end, filename in zip(start_times, end_times, filenames):
    data = request.get_between(start, end, show_progress=False)
    data.to_file(filename, overwrite=True)
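
A long series of requests may be interrupted part-way through. One possible approach, sketched below, is to skip any chunk whose output file already exists so that the loop can simply be re-run. `pending_chunks` is a hypothetical helper, not part of viresclient. It only checks that the file exists, so it assumes an interrupted download does not leave a partial file behind under the final name.

```python
import os


def pending_chunks(start_times, end_times, filenames):
    """Yield (start, end, filename) for chunks that have no file on disk yet."""
    for start, end, filename in zip(start_times, end_times, filenames):
        if not os.path.exists(filename):
            yield start, end, filename
```

The loop above could then iterate over `pending_chunks(start_times, end_times, filenames)` instead of the plain `zip(...)`, in which case `overwrite=True` is no longer needed.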

In [None]:
!rm data_2014*.cdf

### 2. Use viresclient to translate the data from CDF to xarray then to netCDF - using the chunks defined as above

There are some nicer tools for working with netCDF4/HDF files in Python so this may be preferable. This is also a point at which you may do some pre-processing before saving the files.

In [None]:
start_times, end_times = gen_start_ends(
    start=dt.datetime(2014, 1, 1),
    end=dt.datetime(2014, 2, 1))
filenames_nc = [f"data_{start.strftime('%Y-%m-%d')}.nc" for start in start_times]

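for start, end, filename in zip(start_times, end_times, filenames_nc):
    try:
        data = request.get_between(start, end, show_progress=False)
        ds = data.as_xarray()
    except RuntimeError:
        print(f"No data for {filename} - data not downloaded")
        # Skip saving, so a stale `ds` from an earlier iteration is not written
        continue
    try:
        ds.to_netcdf(filename)
        # Only report success once the file has actually been written
        print(f"saved {filename}")
    except AttributeError:
        # `ds` is not a Dataset (nothing to save)
        print(f"No data for {filename} - file not created")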

#### Use xarray+dask to lazily load the data

See https://xarray.pydata.org/en/stable/dask.html

Note: there is currently a bug when loading files in which the variables are empty (Swarm Alpha had a few problem days in January 2014, but this does not happen often). We can identify these problem files like this:

In [None]:
filenames = glob.glob("data_*.nc")
empty_files = []
for filename in filenames:
    try:
        # Open only to test readability, then close the file handle again
        xr.open_dataset(filename).close()
    except ValueError:
        empty_files.append(filename)
empty_files

In [None]:
# Remove the problem files above
filenames = [f for f in filenames if f not in empty_files]
filenames.sort()
filenames

In [None]:
# ds = xr.open_mfdataset("data_2014*.nc", combine="by_coords")
ds = xr.open_mfdataset(filenames, combine="by_coords")
ds

Note: Loading in this way has lost the source information - only the first one has been kept.

In [None]:
ds.Sources
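
If the complete list of source products matters, one option is to read the `Sources` attribute from each file separately and merge the lists. The sketch below shows only the merging step. `merge_sources` is a hypothetical helper, and it assumes each file's `Sources` is either a single string or a sequence of strings.

```python
def merge_sources(per_file_sources):
    """Merge several source lists into one, dropping duplicates but keeping order."""
    merged = []
    seen = set()
    for sources in per_file_sources:
        # A lone string would otherwise be iterated character by character
        if isinstance(sources, str):
            sources = [sources]
        for source in sources:
            if source not in seen:
                seen.add(source)
                merged.append(source)
    return merged


# Made-up product names, for illustration only
example = merge_sources([["product_A", "product_B"], "product_B", ["product_C"]])
print(example)
```

With the files used here, this might be called as `merge_sources(xr.open_dataset(f).Sources for f in filenames)`.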

We now have access to the dataset which is stored on disk as multiple files:

In [None]:
ds["F"].plot();

In [None]:
del ds, data

In [None]:
!rm data_*

### 3. Leave the handling to viresclient and just try to load the full data directly

There are some performance issues here, and if the total size is too big for your machine's memory then it won't be possible. We could make some changes to viresclient in the future to perform the lazy loading as above.

In [None]:
data = request.get_between(
    start_time=dt.datetime(2014,1,1),
    end_time=dt.datetime(2015,1,1)
)

In [None]:
%%time
ds = data.as_xarray()
ds

In [None]:
for i in ds.Sources[:3]:
    print(i)
print("...")
for i in ds.Sources[-3:]:
    print(i)

#### It is possible to access each file and load as xarray Dataset (rather than automatically concatenating them all)

In [None]:
data.contents[0].as_xarray()

In [None]:
# %%time
# filenames = [f"testfile_{n:03}.nc" for n in range(len(data.contents))]
# for data_part, filename in zip(data.contents, filenames):
#     data_part.as_xarray().to_netcdf(filename)

In [None]:
# ds = xr.open_mfdataset("testfile*.nc", combine="by_coords")
# ds

In [None]:
# !rm testfile*.nc