# Using chunkindex over a slow network connection simulated with lighttpd
This notebook shows how to use the chunkindex module to access data in a compressed netCDF file through an HTTP server.

In [1]:
# Append the root of the project to sys.path so that modules living there
# (e.g. chunkindex, imported further down) can be imported from this notebook.
import sys
sys.path.append('..')

# Define some parameters used throughout this notebook

from pathlib import Path
# Import the submodule explicitly: `import urllib` alone does not guarantee that
# `urllib.parse` is loaded (it only works if another module imported it first).
import urllib.parse

# Dataset filename
dataset_file = "ramp.nc"

# Directory holding the dataset; it is also the document root served by the HTTP server
dataset_dir = Path('data') / 'www'
dataset_dir = dataset_dir.resolve()
dataset_dir.mkdir(parents=True, exist_ok=True)
dataset_path = dataset_dir / dataset_file

# Data URL (the port must match `server.port` in lighttpd.conf)
base_url = 'http://127.0.0.1:8000'
dataset_url = urllib.parse.urljoin(base_url, dataset_file)

# HTTP server bandwidth limit in kilobytes per second
# (passed to lighttpd's `server.kbytes-per-second` to simulate a slow connection)
kbps = 100

## Set up the HTTP server

Create a configuration file for the HTTP server: [lighttpd.conf](lighttpd.conf).

In [2]:
%%writefile lighttpd.conf
# Minimal lighttpd configuration used to simulate a slow network connection.
# Values prefixed with `env.` are read from environment variables set by the
# cell that launches lighttpd (LIGHTTPD_DOC_ROOT and LIGHTTPD_KBPS).
# Directory served over HTTP (the notebook's dataset_dir)
server.document-root = env.LIGHTTPD_DOC_ROOT
# Bandwidth limit in kilobytes per second (the notebook's kbps)
server.kbytes-per-second = env.LIGHTTPD_KBPS
# Must match the port used in the notebook's base_url
server.port = 8000
# Enable directory listings so the served folder can be browsed
dir-listing.activate = "enable" 

Overwriting lighttpd.conf


Start the HTTP server in the background. It serves `dataset_dir`, with its bandwidth limited to `kbps` kB/s.

In [3]:
%%bash --bg -s "$dataset_dir" "$kbps"
# Start lighttpd as a background job of the notebook (--bg), so this cell returns immediately.
# Positional arguments: $1 = document root (dataset_dir), $2 = bandwidth limit in kB/s (kbps).

# Kill a previous lighttpd server, if any. pkill exits non-zero when nothing
# matches, which is harmless here. -x matches the exact process name only.
pkill -x lighttpd

# Give the previous server a moment to shut down and release port 8000 before restarting
sleep 1

# Run the HTTP server without daemonizing (-D) using lighttpd.conf (-f).
# This command blocks for the lifetime of the server, so nothing placed after it
# would run while the server is up; readiness is therefore checked from the notebook side.
LIGHTTPD_DOC_ROOT="${1}" LIGHTTPD_KBPS="${2}" lighttpd -D -f lighttpd.conf

Check that the HTTP server is running by showing the content of the data folder served by the server we have just started: http://127.0.0.1:8000

In [4]:
# The HTTP server was started as a background job, so it may not be listening yet.
# Poll it for up to ~10 s before embedding the directory listing it serves.
import time
import urllib.request

from IPython.display import IFrame

for _attempt in range(20):
    try:
        # Only the successful connection matters; the listing body is not read here
        with urllib.request.urlopen(base_url, timeout=1):
            break
    except OSError:  # URLError, connection refused and timeouts are all OSError subclasses
        time.sleep(0.5)
else:
    raise RuntimeError(f'HTTP server at {base_url} did not start')

IFrame(base_url, width=800, height=150)

## Create a simple dataset

In [5]:
# Create a compressed netCDF dataset in the directory served by the HTTP server

from pathlib import Path
import xarray as xr
import numpy as np

# Define the datatype
dtype = 'int32'
sizeof_dtype = np.dtype(dtype).itemsize

# Define the number of samples in the dataset
shape = (1024, 1024, 4)  # 16 MiB uncompressed (4-byte int32 samples)
n = np.prod(shape)
chunk_size = tuple(s // 2 for s in shape)  # (512, 512, 2), i.e. chunks of 2 MiB uncompressed

# Create the dataset: a noisy ramp with n samples.
# A fixed seed makes the generated data (and therefore the values printed
# by the read tests) reproducible from one run of the notebook to the next.
rng = np.random.default_rng(seed=0)
x = np.arange(n) + 10 * rng.random(n)
x = x.reshape(shape).astype(dtype)

# Create a data array with xarray
x_xr = xr.DataArray(x)
# Create a dataset
ds = xr.Dataset({'x': x_xr})

# Define the encoding options
encoding = {
    'x': {
        'dtype': dtype,
        'zlib': True,
        'complevel': 1,
        'shuffle': True,        # Shuffle enabled: an unfavorable condition for chunkindex performance
        'chunksizes': chunk_size
    }
}
# Write the dataset to a netCDF file
ds.to_netcdf(dataset_path, encoding=encoding)
ds

## Create an index with chunkindex

Chunkindex creates a zran index that provides decompression starting points within each chunk.

File location: data/www/ramp_indexchunk.nc. The index sits next to the dataset, so it is also served by the HTTP server.

In [6]:
import chunkindex
import contextlib
import os
import netCDF4

# The index file is written next to the dataset, i.e. in the directory served over
# HTTP, so it can be read either locally (index_path) or remotely (index_url).
index_filename = dataset_path.stem + '_indexchunk.nc'
index_path = dataset_path.with_name(index_filename)
# index URL
index_url = urllib.parse.urljoin(base_url, index_filename)

# Remove an index left over from a previous run, if any
with contextlib.suppress(FileNotFoundError):
    os.remove(index_path)

# Create the zran index for all variables and chunks of the dataset and write it to the netCDF4 file at index_path
chunkindex.create_index(index_path, dataset_path)

# Display the resulting index for one chunk. load_dataset reads the group into memory
# and closes the file, so no cached handle on the index survives. Otherwise, a re-run of
# this cell could reuse a handle on the deleted file and display a stale index.
index_x00 = xr.load_dataset(index_path, group='x/0.0.0')
index_x00

## Time to access the data

We define the slice of data to access. It spans two chunks along the third dimension, whose chunk size is 2.

In [7]:
# Rows 598-599, columns 598-599 and all 4 elements of the last axis (2 chunks along that axis)
slice1 = (slice(598, 600), slice(598, 600), slice(0, 4))

### Time to access the data __without__ the index

#### With xarray

In [8]:
%%time
# Read the netcdf dataset without the use of the index (xarray and #mode=bytes style)

import time
import xarray as xr

start_time = time.time()

# Open the netCDF dataset
with xr.open_dataset(dataset_url + "#mode=bytes") as ds:
    data = ds['x'][slice1]
    print(data.max().values)

end_time = time.time() - start_time
print('Elapsed time: %.2fms' % (end_time*1000))

# Check the data decompressed
print(data.values)
assert(np.all(data == x[slice1]))

2455911
Elapsed time: 9072.70ms
[[[2451805 2451806 2451809 2451805]
  [2451807 2451809 2451815 2451812]]

 [[2455899 2455903 2455901 2455902]
  [2455901 2455910 2455904 2455911]]]
CPU times: user 51.1 ms, sys: 12.4 ms, total: 63.5 ms
Wall time: 9.08 s


#### With xarray and fsspec

In [9]:
%%time
# Read the netcdf dataset without the use of the index (fsspec and xarray style)

import time
import xarray as xr
import fsspec

start_time = time.time()

# Open the netCDF dataset
with fsspec.open(dataset_url, 'rb', block_size=32*2**10) as f:
    with xr.open_dataset(f) as ds:
        data = ds['x'][slice1]
        print(data.max().values)
        
end_time = time.time() - start_time
print('Elapsed time: %.2fms' % (end_time*1000))

# Check the data decompressed
print(data.values)
assert(np.all(data == x[slice1]))

2455911
Elapsed time: 9479.92ms
[[[2451805 2451806 2451809 2451805]
  [2451807 2451809 2451815 2451812]]

 [[2455899 2455903 2455901 2455902]
  [2455901 2455910 2455904 2455911]]]
CPU times: user 425 ms, sys: 39.7 ms, total: 464 ms
Wall time: 9.48 s


#### With h5py and fsspec

In [10]:
%%time
# Read the netcdf dataset without the use of the index (fsspec and h5py style)

import time
import h5py as h5

start_time = time.time()

# Open the netCDF dataset
with fsspec.open(dataset_url, block_size=32*2**10) as f:
    with h5.File(f) as ds:
        # Access to h5py low-level API to have a direct access to the compressed data
        data = ds['x'][slice1]
        print(data.max())

end_time = time.time() - start_time
print('Elapsed time: %.2fms' % (end_time*1000))

# Check the data decompressed
print(data)
assert(np.all(data == x[slice1]))

2455911
Elapsed time: 10094.58ms
[[[2451805 2451806 2451809 2451805]
  [2451807 2451809 2451815 2451812]]

 [[2455899 2455903 2455901 2455902]
  [2455901 2455910 2455904 2455911]]]
CPU times: user 62.9 ms, sys: 1.74 ms, total: 64.6 ms
Wall time: 10.1 s


### Time to access the data __with__ the index

#### With the index locally

In [11]:
%%time
# Read the netcdf dataset with the use of the local index

import time

start_time = time.time()

with fsspec.open(dataset_url, block_size=32*2**10) as f:
    with open(index_path, mode='rb') as index:
        data = chunkindex.read_slice(f, index, 'x', slice1)
        print(data.max())

end_time = time.time() - start_time
print('Elapsed time: %.2fms' % (end_time*1000))

# Check the data decompressed
print(data)
assert(np.all(data == x[slice1]))

2455911
Elapsed time: 4387.58ms
[[[2451805 2451806 2451809 2451805]
  [2451807 2451809 2451815 2451812]]

 [[2455899 2455903 2455901 2455902]
  [2455901 2455910 2455904 2455911]]]
CPU times: user 309 ms, sys: 25.2 ms, total: 335 ms
Wall time: 4.39 s


#### With the index remote

*This case needs improvement: fetching the index remotely is currently slower than reading the data without any index.*

In [12]:
%%time
# Read the netcdf dataset with the use of the local index

import time

start_time = time.time()

with fsspec.open(dataset_url, block_size=32*2**10) as f:
    with fsspec.open(index_url, block_size=32*2**10) as index:
        data = chunkindex.read_slice(f, index, 'x', slice1)
        print(data.max())

end_time = time.time() - start_time
print('Elapsed time: %.2fms' % (end_time*1000))

# Check the data decompressed
print(data)
assert(np.all(data == x[slice1]))

2455911
Elapsed time: 16552.05ms
[[[2451805 2451806 2451809 2451805]
  [2451807 2451809 2451815 2451812]]

 [[2455899 2455903 2455901 2455902]
  [2455901 2455910 2455904 2455911]]]
CPU times: user 414 ms, sys: 42.5 ms, total: 456 ms
Wall time: 16.6 s
