# Waveforms to Xarray

<div class="alert alert-warning">

**Warning**: This part of obsplus is still very experimental and subject to rapid changes, proceed with caution.

</div>


The [xarray library](http://xarray.pydata.org/en/stable/) offers pandas-like data structures that are not limited to 2 dimensions (rows and columns). We have found working with seismic waveform data in such a way can be useful, but certainly is not as general as [obspy streams](https://docs.obspy.org/packages/autogen/obspy.core.stream.Stream.html). Particularly, Xarray data structures don't work well with gappy data or data with non-uniform sampling rates.

Before attempting to use these features in obsplus we highly recommend you read through the [xarray documentation](http://xarray.pydata.org/en/stable/) as the API may take a bit of time to learn. 

## Creating data arrays

Creating DataArrays from ObsPy objects is straight-forward. A single `Trace`, `Stream`, or collection (list-like) of either, or a mapping (dict-like) of either are all valid inputs. If a mapping is used, they keys will often correspond to an event id.

Conceptually the DataArray looks like this:

<img src="../../images/data_array2.png" width="350" height="350" align="left"/>
<br clear="all">

Each `DataArray` instance created by ObsPlus has three dimensions:

1. __stream_id__: The keys used in the dictionary or an integer starting at 0.

2. __seed_id__: The seed id (ie network.station.location.channel) of each trace.

3. __time__: Floating point values beginning at zero and incrementing by the sampling period.


In [1]:
import obspy
import obsplus
from obsplus import obspy_to_array

In [2]:
st = obspy.read()
# create data array from a trace
from_trace = obsplus.obspy_to_array(st[0])
# create data array from stream
from_stream = obsplus.obspy_to_array(st)
# create a data array from a list of streams
st_list = [st.copy() for _ in range(3)]
from_list = obsplus.obspy_to_array(st_list)
# create data array from a dict of streams
st_dict = {f'event{x}': st.copy() for x in range(3)}
from_dict = obsplus.obspy_to_array(st_dict)

In [3]:
print(from_trace.dims)

('stream_id', 'seed_id', 'time')


The data array created from a single trace will only have a size of one in the stream_id and seed_id columns, and the time dimension will be as long as the trace.

In [4]:
print(from_trace.shape)

(1, 1, 3000)


The dataarray created from a dict of streams, however, will be larger:

In [5]:
print(from_dict.shape)

(3, 3, 3000)


The xarray `str` representation is fairly large, but very descriptive:

In [6]:
# print value for the seed_id dimension
print(from_dict)

<xarray.DataArray (stream_id: 3, seed_id: 3, time: 3000)>
array([[[ 0.      , -0.014434, ...,  0.435445,  0.197664],
        [ 0.      ,  0.006044, ...,  0.492601,  0.254383],
        [ 0.      ,  0.006946, ...,  0.981962,  0.441969]],

       [[ 0.      , -0.014434, ...,  0.435445,  0.197664],
        [ 0.      ,  0.006044, ...,  0.492601,  0.254383],
        [ 0.      ,  0.006946, ...,  0.981962,  0.441969]],

       [[ 0.      , -0.014434, ...,  0.435445,  0.197664],
        [ 0.      ,  0.006044, ...,  0.492601,  0.254383],
        [ 0.      ,  0.006946, ...,  0.981962,  0.441969]]])
Coordinates:
  * time       (time) float64 0.0 0.01 0.02 0.03 ... 29.96 29.97 29.98 29.99
  * seed_id    (seed_id) object 'BW.RJOB..EHE' 'BW.RJOB..EHN' 'BW.RJOB..EHZ'
  * stream_id  (stream_id) object 'event0' 'event1' 'event2'
    starttime  (seed_id, stream_id) float64 1.251e+09 1.251e+09 ... 1.251e+09
Attributes:
    sampling_rate:  100.0
    stats:          {'event0': {'BW.RJOB..EHE': Stats({'sampl

DataArrays can also be converted back to dictionaries of `Stream` objects. The transformation *should* be lossless:

In [7]:
print(from_dict.ops.to_stream())

{'event0': <obspy.core.stream.Stream object at 0x7fdf64a5c470>, 'event1': <obspy.core.stream.Stream object at 0x7fdf64a5af98>, 'event2': <obspy.core.stream.Stream object at 0x7fdf64abd470>}


## Advantages of the DataArray

The DataArray has two potential advantages over the Stream representation:

   1. Efficiency 
    
   2. Organization

### Efficiency

There have been many efforts to improve efficiency of numpy/scipy functionality. Some of these, such as [Intel's MKL](https://software.intel.com/en-us/mkl) ship with scientific python distributions, like [Anaconda](https://www.anaconda.com/). These optimizations are great because you don't need to change anything about your code; it just runs faster.

Some of these optimizations involve making better use of modern hardware, particularly processors with many cores. It is much better to let the well-tested low-level libraries handle parallelism rather than implementing messy multiprocessing/multithreading python code when possible. For example, let's compare the time required to calculating FFTs for each `Trace` in a large `Stream` vs doing it all at once on a `DataArray` created with ObsPlus, both of which should return the same result. The latter will be more efficient because it allows numpy to better plan optimization strategies, as well as avoids python loops.

In [8]:
import numpy as np
import xarray as xr

print(f'numpy version: {np.__version__}')
print(f'xarray version: {xr.__version__}')

numpy version: 1.15.4
xarray version: 0.10.9


In [9]:
# create test streams with random data
import numpy as np
import obspy


num_stations = 200
sr = 100
data_length = sr * 60

traces = []
for station in ('{x:03d}'.format(x=x) for x in range(num_stations)):
    data = np.random.rand(data_length)
    stats = dict(network='OP', station=station, location='', channel='HHZ',
                 sampling_rate=sr)
    traces.append(obspy.Trace(data=data, header=stats))

st = obspy.Stream(traces=traces)
print(st)


200 Trace(s) in Stream:

OP.000..HHZ | 1970-01-01T00:00:00.000000Z - 1970-01-01T00:00:59.990000Z | 100.0 Hz, 6000 samples
...
(198 other traces)
...
OP.199..HHZ | 1970-01-01T00:00:00.000000Z - 1970-01-01T00:00:59.990000Z | 100.0 Hz, 6000 samples

[Use "print(Stream.__str__(extended=True))" to print all Traces]


In [10]:
# convert to data array
dar = obspy_to_array(st)

In [11]:
%%time
# Time looping through traces and performing fft
out1 = np.array([np.fft.rfft(tr.data) for tr in st])

CPU times: user 21.5 ms, sys: 4.6 ms, total: 26.1 ms
Wall time: 25 ms


In [12]:
%%time
# Time performing fft in one-go on large numpy block
out2 = np.fft.rfft(dar.data, axis=-1)

CPU times: user 17.2 ms, sys: 4.49 ms, total: 21.7 ms
Wall time: 20.4 ms


In [13]:
# after flattening one dimension in out2, the results should be (nearly) the same
assert np.allclose(out1, out2[0])

The xarray version is usually between 2 and 20 times faster, depending on the number of cores in your CPU and the python distribution you are using. However, this notebook may not show much of a difference if it was executed on the ReadTheDocs server. The best way to assess performance gains is to download and run this notebook yourself.

Moreover xarray provides ways of easily working with dask for distributed computing. This would be a bit more difficult using `Stream`s. See [this](http://xarray.pydata.org/en/stable/dask.html) for more details. 

### Organization

With the data organized in a 3D cube of sorts, it becomes fairly natural to slice and manipulate the data because xarray, like pandas, has intuitive and efficient indexing and sensible broadcasting. Here are a few examples of what you can do:

In [14]:
# get middle 10 seconds of data
time_mean = dar.time.mean()
duration = dar.time.max() - dar.time.min()
dar.sel(time=slice(time_mean - 5, time_mean + 5))

<xarray.DataArray (stream_id: 1, seed_id: 200, time: 1000)>
array([[[ 0.780687,  0.780221, ...,  0.089228,  0.346005],
        [ 0.505011,  0.624251, ...,  0.859623,  0.261464],
        ..., 
        [ 0.262803,  0.364587, ...,  0.120129,  0.577477],
        [ 0.381148,  0.826762, ...,  0.081735,  0.834522]]])
Coordinates:
  * time       (time) float64 25.0 25.01 25.02 25.03 ... 34.96 34.97 34.98 34.99
  * seed_id    (seed_id) object 'OP.000..HHZ' 'OP.001..HHZ' ... 'OP.199..HHZ'
  * stream_id  (stream_id) int64 0
    starttime  (seed_id, stream_id) float64 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0
Attributes:
    sampling_rate:  100.0
    stats:          {0: {'OP.000..HHZ': Stats({'sampling_rate': 100.0, 'delta...

In [15]:
# Trim seed_id (channels) to only include data from every 13th station
stations = list('OP.{x:03d}..HHZ'.format(x=x) for x in range(0, data_length, 13))
dar.where(dar.seed_id.isin(stations), drop=True)

<xarray.DataArray (stream_id: 1, seed_id: 16, time: 6000)>
array([[[ 0.810927,  0.261386, ...,  0.287553,  0.560684],
        [ 0.234642,  0.867585, ...,  0.375144,  0.748289],
        ..., 
        [ 0.062722,  0.948112, ...,  0.539999,  0.853875],
        [ 0.863649,  0.690813, ...,  0.082165,  0.423382]]])
Coordinates:
  * time       (time) float64 0.0 0.01 0.02 0.03 ... 59.96 59.97 59.98 59.99
  * seed_id    (seed_id) object 'OP.000..HHZ' 'OP.013..HHZ' ... 'OP.195..HHZ'
  * stream_id  (stream_id) int64 0
    starttime  (seed_id, stream_id) float64 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0
Attributes:
    sampling_rate:  100.0
    stats:          {0: {'OP.000..HHZ': Stats({'sampling_rate': 100.0, 'delta...

In [16]:
# simple detrend using mean
dar - dar.mean(dim='time')

<xarray.DataArray (stream_id: 1, seed_id: 200, time: 6000)>
array([[[ 0.309818, -0.239723, ..., -0.213556,  0.059575],
        [-0.315749, -0.083579, ...,  0.436938,  0.37874 ],
        ..., 
        [ 0.001515, -0.148361, ...,  0.022049, -0.364494],
        [-0.395892,  0.480718, ...,  0.221098,  0.436548]]])
Coordinates:
  * time       (time) float64 0.0 0.01 0.02 0.03 ... 59.96 59.97 59.98 59.99
  * seed_id    (seed_id) object 'OP.000..HHZ' 'OP.001..HHZ' ... 'OP.199..HHZ'
  * stream_id  (stream_id) int64 0
    starttime  (seed_id, stream_id) float64 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0

In [17]:
# calculate rolling sta/lta on each channel
sta_samples = 25
lta_samples = 200

sta = dar.rolling(time=sta_samples).mean()
lta = dar.rolling(time=lta_samples).mean()

result = sta / lta
print(result.dropna(dim='time'))

<xarray.DataArray (stream_id: 1, seed_id: 200, time: 5801)>
array([[[ 1.19446 ,  1.146797, ...,  0.856023,  0.829708],
        [ 1.14479 ,  1.113254, ...,  1.232959,  1.286623],
        ..., 
        [ 0.807364,  0.79146 , ...,  1.065028,  1.026563],
        [ 1.333705,  1.346979, ...,  1.194895,  1.206242]]])
Coordinates:
  * time       (time) float64 1.99 2.0 2.01 2.02 ... 59.96 59.97 59.98 59.99
  * seed_id    (seed_id) object 'OP.000..HHZ' 'OP.001..HHZ' ... 'OP.199..HHZ'
  * stream_id  (stream_id) int64 0
    starttime  (seed_id, stream_id) float64 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0


### Obsplus Accessor methods
Obsplus registers an [xarray accessor](http://xarray.pydata.org/en/stable/internals.html#extending-xarray) to add seismic specific functionality. These are accessed via the `ops` attribute which is available as long as obsplus is imported. Here is a brief tour of some of the methods:

In [18]:
# build a data array from the crandall dataset
import obsplus
ds = obsplus.load_dataset('crandall')
# get catalog, inventory and fetcher
cat = ds.event_client.get_events()
inv = ds.station_client.get_stations(network='TA')
fetcher = ds.get_fetcher()
# init dict of {event_id: stream} and get catalog/inventory
st_dict = dict(fetcher.yield_event_waveforms(time_before=5, time_after=30))
# create datarray
dar = obsplus.obspy_to_array_dict(st_dict)[40]

In [19]:
print(cat)

8 Event(s) in Catalog:
2007-08-06T01:44:48.810000Z | +39.462, -111.238 | 2.32 ml
2007-08-06T10:47:25.600000Z | +39.462, -111.232 | 1.92 ml
2007-08-06T08:48:40.010000Z | +39.464, -111.228 | 4.2 mb
2007-08-07T02:14:24.080000Z | +39.463, -111.223 | 1.17 ml
2007-08-07T03:44:18.470000Z | +39.462, -111.215 | 1.68 ml
2007-08-07T02:05:04.490000Z | +39.465, -111.225 | 2.44 ml
2007-08-07T21:42:51.130000Z | +39.463, -111.220 | 1.88 ml
2007-08-07T07:13:05.760000Z | +39.461, -111.224 | 2.55 ml


In [20]:
# select channels based on seed_ids (wildcards permitted)
filtered_dar = dar.ops.sel_sid('TA.*.*.BHZ')

# filter check
assert len(filtered_dar.seed_id)
for seed_id in filtered_dar.seed_id.values:
    assert seed_id.endswith('BHZ')
    assert seed_id.startswith('TA')

print(filtered_dar.seed_id)

<xarray.DataArray 'seed_id' (seed_id: 12)>
array(['TA.O15A..BHZ', 'TA.O16A..BHZ', 'TA.O18A..BHZ', 'TA.P15A..BHZ',
       'TA.P16A..BHZ', 'TA.P17A..BHZ', 'TA.P18A..BHZ', 'TA.Q15A..BHZ',
       'TA.Q16A..BHZ', 'TA.Q18A..BHZ', 'TA.R16A..BHZ', 'TA.R17A..BHZ'], dtype=object)
Coordinates:
  * seed_id  (seed_id) object 'TA.O15A..BHZ' 'TA.O16A..BHZ' ... 'TA.R17A..BHZ'


In [21]:
# Calcule rffts (note the "time" dimension has changed to "frequency")
rfft = dar.ops.rfft()
# print dimensions and corresponding size
print(dict(zip(rfft.dims, rfft.shape)))

{'stream_id': 8, 'seed_id': 36, 'frequency': 701}


In [22]:
# Calculate irffts
irfft = rfft.ops.irfft()
# print dimensions and corresponding size
print(dict(zip(irfft.dims, irfft.shape)))

{'stream_id': 8, 'seed_id': 36, 'time': 1400}


In [23]:
# convert back into a dict of streams
st_dict = dar.ops.to_stream()
for event_id, st in st_dict.items():
    print(f'event_id: {event_id} stream_size: {len(st)}')

event_id: smi:local/248828 stream_size: 33
event_id: smi:local/248843 stream_size: 33
event_id: smi:local/248839 stream_size: 33
event_id: smi:local/248883 stream_size: 36
event_id: smi:local/248887 stream_size: 36
event_id: smi:local/248882 stream_size: 36
event_id: smi:local/248925 stream_size: 36
event_id: smi:local/248891 stream_size: 36


In [24]:
# attach event information
dar_with_events = dar.ops.attach_events(cat)
# note the extra coords that have now been attached
print(dar_with_events.coords)

Coordinates:
  * seed_id      (seed_id) object 'TA.O15A..BHE' ... 'TA.R17A..BHZ'
  * time         (time) float64 0.0 0.025 0.05 0.075 ... 34.92 34.95 34.98 35.0
  * stream_id    (stream_id) object 'smi:local/248828' ... 'smi:local/248891'
    starttime    (seed_id, stream_id) float64 1.186e+09 1.186e+09 ... 1.186e+09
    origin_time  (stream_id) float64 1.186e+09 1.186e+09 ... 1.187e+09 1.186e+09
    p_time       (stream_id, seed_id) float64 nan nan ... nan 1.186e+09
    s_time       (stream_id, seed_id) float64 nan nan nan nan ... nan nan nan


In [25]:
# trim all waveforms to the mean of the P times picked on the same station
# if no such P pick exists the channels will not be trimmed
out = dar_with_events.ops.trim('p_time', aggregate_by='station')
print(out)

<xarray.DataArray (stream_id: 8, seed_id: 36, time: 0)>
array([], shape=(8, 36, 0), dtype=float64)
Coordinates:
  * time         (time) float64 
    starttime    (stream_id, seed_id) float64 1.186e+09 1.186e+09 ... 1.186e+09
    origin_time  (stream_id, seed_id) float64 1.186e+09 1.186e+09 ... 1.186e+09
    p_time       (stream_id, seed_id) float64 nan nan ... nan 1.186e+09
    s_time       (stream_id, seed_id) float64 nan nan nan nan ... nan nan nan
    station      (stream_id, seed_id) object 'TA.O15A' 'TA.O15A' ... 'TA.R17A'
  * stream_id    (stream_id) object 'smi:local/248828' ... 'smi:local/248891'
  * seed_id      (seed_id) object 'TA.O15A..BHE' ... 'TA.R17A..BHZ'
Attributes:
    sampling_rate:  40.0
    stats:          {'smi:local/248828': {'TA.O15A..BHE': Stats({'sampling_ra...
    events:         8 Event(s) in Catalog:\n2007-08-06T01:44:48.810000Z | +39...


In [26]:
# iterate over slices of stations
for seed_dar in dar.ops.iter_seed('station'):
    print(f'got channels: {set(seed_dar.seed_id.values)} in yielded data array')

got channels: {'TA.O15A..BHE', 'TA.O15A..BHN', 'TA.O15A..BHZ'} in yielded data array
got channels: {'TA.O16A..BHE', 'TA.O16A..BHZ', 'TA.O16A..BHN'} in yielded data array
got channels: {'TA.O18A..BHZ', 'TA.O18A..BHE', 'TA.O18A..BHN'} in yielded data array
got channels: {'TA.P15A..BHE', 'TA.P15A..BHZ', 'TA.P15A..BHN'} in yielded data array
got channels: {'TA.P16A..BHN', 'TA.P16A..BHE', 'TA.P16A..BHZ'} in yielded data array
got channels: {'TA.P17A..BHZ', 'TA.P17A..BHE', 'TA.P17A..BHN'} in yielded data array
got channels: {'TA.P18A..BHE', 'TA.P18A..BHN', 'TA.P18A..BHZ'} in yielded data array
got channels: {'TA.Q15A..BHZ', 'TA.Q15A..BHE', 'TA.Q15A..BHN'} in yielded data array
got channels: {'TA.Q16A..BHZ', 'TA.Q16A..BHN', 'TA.Q16A..BHE'} in yielded data array
got channels: {'TA.Q18A..BHZ', 'TA.Q18A..BHE', 'TA.Q18A..BHN'} in yielded data array
got channels: {'TA.R16A..BHZ', 'TA.R16A..BHN', 'TA.R16A..BHE'} in yielded data array
got channels: {'TA.R17A..BHE', 'TA.R17A..BHZ', 'TA.R17A..BHN'} in

In [27]:
# calculate the norm of the data recorded at each station
from numpy.linalg import norm
dar.ops.agg(np.linalg.norm, level='station')

<xarray.DataArray (stream_id: 8, seed_id: 12, time: 1401)>
array([[[ 1080.547084,  1072.754865, ...,  1154.632409,  1180.935646],
        [  799.563006,   842.579967, ...,   750.613749,   769.885706],
        ..., 
        [         nan,          nan, ...,          nan,          nan],
        [ 1126.902392,  1130.455218, ...,  1136.072181,  1130.280496]],

       [[  953.637772,   966.333276, ...,  1107.365342,  1080.58734 ],
        [  778.481856,   765.363312, ...,   787.934642,   826.899631],
        ..., 
        [         nan,          nan, ...,          nan,          nan],
        [ 1072.4486  ,  1071.925837, ...,  1092.773078,  1094.025137]],

       ..., 
       [[ 1096.374024,  1092.910792, ...,  1125.569189,  1144.432611],
        [  888.015203,   824.355506, ...,   883.946831,   913.840249],
        ..., 
        [  500.027999,   483.141801, ...,   320.170267,   226.072997],
        [ 1168.363813,  1173.752955, ...,  1103.527979,  1131.438907]],

       [[ 1115.726669,  1124