Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement SEC-C style internal chunking of frequency domain correlations #285

Merged
merged 11 commits into from
Jan 23, 2019
84 changes: 61 additions & 23 deletions eqcorrscan/tests/correlate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import pytest
from multiprocessing import cpu_count
from obspy import Trace, Stream, read
from scipy.fftpack import next_fast_len

import eqcorrscan.utils.correlate as corr
from eqcorrscan.utils.correlate import register_array_xcorr
Expand Down Expand Up @@ -60,6 +61,8 @@ def wrapper(*args, **kwargs):
stream_len = 100000
template_len = 200
gap_start = 5000
fft_len = next_fast_len(max(stream_len // 4, template_len))
# fft_len = None


def generate_multichannel_stream():
Expand Down Expand Up @@ -144,9 +147,10 @@ def print_class_name(request):
@pytest.fixture(scope='module')
def array_template():
"""
return a set of templates, generated with randomn, for correlation tests.
return a set of templates, generated with random, for correlation tests.
"""
return random.randn(200, 200)
# return random.randn(2, 200)


@pytest.fixture(scope='module')
Expand Down Expand Up @@ -181,6 +185,13 @@ def array_ccs(array_template, array_stream, pads):
print("Running %s" % name)
cc, _ = time_func(func, name, array_template, array_stream, pads)
out[name] = cc
if "fftw" in name:
print("Running fixed len fft")
fft_len = next_fast_len(
max(len(array_stream) // 4, len(array_template)))
cc, _ = time_func(func, name, array_template, array_stream, pads,
fft_len=fft_len)
out[name] = cc
return out


Expand Down Expand Up @@ -250,14 +261,21 @@ def stream_cc_output_dict(multichannel_templates, multichannel_stream):
cc_out = time_func(func, name, multichannel_templates,
multichannel_stream, cores=cores)
out["{0}.{1}".format(name, cores)] = cc_out
if "fftw" in name and cores > 1:
print("Running outer core parallel")
# Make sure that using both parallel methods gives the same
# result
if "fftw" in name:
if cores > 1:
print("Running outer core parallel")
# Make sure that using both parallel methods gives the same
# result
cc_out = time_func(
func, name, multichannel_templates, multichannel_stream,
calum-chamberlain marked this conversation as resolved.
Show resolved Hide resolved
cores=1, cores_outer=cores)
out["{0}.{1}_outer".format(name, cores)] = cc_out
print("Running shorter, fixed fft-len")
# Make sure that running with a pre-defined fft-len works
cc_out = time_func(
func, name, multichannel_templates, multichannel_stream,
cores=1, cores_outer=cores)
out["{0}.{1}_outer".format(name, cores)] = cc_out
cores=cores, fft_len=fft_len)
out["{0}.{1}_fixed_fft".format(name, cores)] = cc_out
return out


Expand All @@ -281,15 +299,23 @@ def gappy_stream_cc_output_dict(
gappy_multichannel_stream, cores=cores)
out["{0}.{1}".format(name, cores)] = (
cc_out, copy.deepcopy(log_messages['warning']))
if "fftw" in name and cores > 1:
print("Running outer core parallel")
_log_handler.reset()
# Make sure that using both parallel methods gives the same
# result
if "fftw" in name:
if cores > 1:
print("Running outer core parallel")
_log_handler.reset()
# Make sure that using both parallel methods gives the same
# result
cc_out = time_func(
func, name, multichannel_templates,
gappy_multichannel_stream, cores=1, cores_outer=cores)
out["{0}.{1}_outer".format(name, cores)] = (
cc_out, copy.deepcopy(log_messages['warning']))
print("Running shorter, fixed fft-len")
# Make sure that running with a pre-defined fft-len works
cc_out = time_func(
func, name, multichannel_templates,
gappy_multichannel_stream, cores=1, cores_outer=cores)
out["{0}.{1}_outer".format(name, cores)] = (
gappy_multichannel_stream, cores=cores, fft_len=fft_len)
out["{0}.{1}_fixed_fft".format(name, cores)] = (
cc_out, copy.deepcopy(log_messages['warning']))
return out

Expand All @@ -315,15 +341,24 @@ def gappy_real_cc_output_dict(
gappy_real_data, cores=cores)
out["{0}.{1}".format(name, cores)] = (
cc_out, copy.deepcopy(log_messages['warning']))
if "fftw" in name and cores > 1:
if "fftw" in name:
if cores > 1:
_log_handler.reset()
print("Running outer core parallel")
# Make sure that using both parallel methods gives the same
# result
cc_out = time_func(
func, name, gappy_real_data_template,
gappy_real_data, cores=1, cores_outer=cores)
out["{0}.{1}_outer".format(name, cores)] = (
cc_out, copy.deepcopy(log_messages['warning']))
_log_handler.reset()
print("Running outer core parallel")
# Make sure that using both parallel methods gives the same
# result
print("Running shorter, fixed fft-len")
# Make sure that running with a pre-defined fft-len works
cc_out = time_func(
func, name, gappy_real_data_template,
gappy_real_data, cores=1, cores_outer=cores)
out["{0}.{1}_outer".format(name, cores)] = (
func, name, gappy_real_data_template, gappy_real_data,
cores=cores, fft_len=fft_len)
out["{0}.{1}_fixed_fft".format(name, cores)] = (
cc_out, copy.deepcopy(log_messages['warning']))
return out

Expand All @@ -349,10 +384,13 @@ def test_single_channel_similar(self, array_ccs):
given the same input data """
cc_list = list(array_ccs.values())
for cc1, cc2 in itertools.combinations(cc_list, 2):
if not np.allclose(cc1, cc2, atol=self.atol):
np.save("cc.npy", cc1)
np.save("cc_1.npy", cc2)
assert np.allclose(cc1, cc2, atol=self.atol)

def test_test_autocorrelation(self, array_ccs):
""" ensure an auto correlationoccurred in each of ccs where it is
def test_autocorrelation(self, array_ccs):
""" ensure an autocorrelation occurred in each of ccs where it is
expected, defined by starting_index variable """
for name, cc in array_ccs.items():
assert np.isclose(cc[0, starting_index], 1., atol=self.atol)
Expand Down
29 changes: 16 additions & 13 deletions eqcorrscan/utils/correlate.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,10 @@ def fftw_normxcorr(templates, stream, pads, threaded=False, *args, **kwargs):
template_length = templates.shape[1]
stream_length = len(stream)
n_templates = templates.shape[0]
fftshape = next_fast_len(template_length + stream_length - 1)

fftshape = kwargs.get("fft_len")
if fftshape is None:
fftshape = next_fast_len(template_length + stream_length - 1)
assert(fftshape >= template_length)
# # Normalize and flip the templates
norm = ((templates - templates.mean(axis=-1, keepdims=True)) / (
templates.std(axis=-1, keepdims=True) * template_length))
Expand Down Expand Up @@ -652,8 +654,8 @@ def _fftw_stream_xcorr(templates, stream, *args, **kwargs):
# if `cores` or `cores_outer` passed in then use that
# else if OMP_NUM_THREADS set use that
# otherwise use all available
num_cores_inner = kwargs.get('cores')
num_cores_outer = kwargs.get('cores_outer')
num_cores_inner = kwargs.pop('cores', None)
num_cores_outer = kwargs.pop('cores_outer', None)
if num_cores_inner is None and num_cores_outer is None:
num_cores_inner = int(os.getenv("OMP_NUM_THREADS", cpu_count()))
num_cores_outer = 1
Expand All @@ -669,7 +671,7 @@ def _fftw_stream_xcorr(templates, stream, *args, **kwargs):
cccsums, tr_chans = fftw_multi_normxcorr(
template_array=template_dict, stream_array=stream_dict,
pad_array=pad_dict, seed_ids=seed_ids, cores_inner=num_cores_inner,
cores_outer=num_cores_outer)
cores_outer=num_cores_outer, *args, **kwargs)
no_chans = np.sum(np.array(tr_chans).astype(np.int), axis=0)
for seed_id, tr_chan in zip(seed_ids, tr_chans):
for chan, state in zip(chans, tr_chan):
Expand All @@ -680,7 +682,7 @@ def _fftw_stream_xcorr(templates, stream, *args, **kwargs):


def fftw_multi_normxcorr(template_array, stream_array, pad_array, seed_ids,
cores_inner, cores_outer):
cores_inner, cores_outer, *args, **kwargs):
"""
Use a C loop rather than a Python loop - in some cases this will be fast.

Expand Down Expand Up @@ -744,10 +746,12 @@ def fftw_multi_normxcorr(template_array, stream_array, pad_array, seed_ids,
n_channels = len(seed_ids)
n_templates = template_array[seed_ids[0]].shape[0]
image_len = stream_array[seed_ids[0]].shape[0]
fft_len = next_fast_len(template_len + image_len - 1)
template_array = np.ascontiguousarray([template_array[x]
for x in seed_ids],
dtype=np.float32)
fft_len = kwargs.get("fft_len")
if fft_len is None:
fft_len = next_fast_len(template_len + image_len - 1)
assert(fft_len >= template_len)
template_array = np.ascontiguousarray(
[template_array[x] for x in seed_ids], dtype=np.float32)
for x in seed_ids:
# Check that stream is non-zero and above variance threshold
if not np.all(stream_array[x] == 0) and np.var(stream_array[x]) < 1e-8:
Expand All @@ -760,9 +764,8 @@ def fftw_multi_normxcorr(template_array, stream_array, pad_array, seed_ids,
cccs = np.zeros((n_templates, image_len - template_len + 1),
np.float32)
used_chans_np = np.ascontiguousarray(used_chans, dtype=np.intc)
pad_array_np = np.ascontiguousarray([pad_array[seed_id]
for seed_id in seed_ids],
dtype=np.intc)
pad_array_np = np.ascontiguousarray(
[pad_array[seed_id] for seed_id in seed_ids], dtype=np.intc)
variance_warnings = np.ascontiguousarray(
np.zeros(n_channels), dtype=np.intc)

Expand Down
83 changes: 67 additions & 16 deletions eqcorrscan/utils/src/multi_corr.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ static inline int set_ncc(long t, long i, long template_len, long image_len, flo
int normxcorr_fftw_main(float*, long, long, float*, long, float*, long, float*, float*, float*,
fftwf_complex*, fftwf_complex*, fftwf_complex*, fftwf_plan, fftwf_plan, fftwf_plan, int*, int*, int, int*);

int normxcorr_fftw_internal(long, long, float*, long, float*, long, long, float*, float*, float*, float*, fftwf_complex*,
fftwf_complex*, fftwf_complex*, fftwf_plan, fftwf_plan, int*, int*, int,
int*);

int normxcorr_fftw_threaded(float*, long, long, float*, long, float*, long, int*, int*, int*);

void free_fftwf_arrays(int, float**, float**, float**, fftwf_complex**, fftwf_complex**, fftwf_complex**);
Expand Down Expand Up @@ -301,12 +305,8 @@ int normxcorr_fftw_main(float *templates, long template_len, long n_templates,
pb: Forward plan for image
px: Reverse plan
*/
long N2 = fft_len / 2 + 1;
long i, t, startind;
int status = 0, unused_corr = 0;
int * flatline_count = (int *) calloc(image_len - template_len + 1, sizeof(int));
double *mean, *var;
double new_samp, old_samp, sum=0.0;
long i, t, chunk, n_chunks, chunk_len, remainder, startind;
int status = 0;
float * norm_sums = (float *) calloc(n_templates, sizeof(float));

if (norm_sums == NULL) {
Expand All @@ -322,14 +322,66 @@ int normxcorr_fftw_main(float *templates, long template_len, long n_templates,
norm_sums[t] += templates[(t * template_len) + i];
}
}
for (i = 0; i < image_len; ++i)
{
image_ext[i] = image[i];
}

// Compute fft of template
fftwf_execute_dft_r2c(pa, template_ext, outa);


if (fft_len >= image_len)
{
n_chunks = 1;
chunk_len = image_len;
remainder = image_len;
} else
{
n_chunks = image_len / fft_len + (image_len % fft_len > 0);
chunk_len = image_len / n_chunks;
remainder = image_len - ((n_chunks * chunk_len) - ((template_len - 1) * n_chunks));
if (remainder > chunk_len){
n_chunks += remainder / chunk_len + (remainder % chunk_len > 0);
remainder = image_len - ((n_chunks * chunk_len) - ((template_len - 1) * n_chunks));
}
if (remainder > 0){
n_chunks += 1;
} else {
remainder = chunk_len;
}
}
for (chunk = 0; chunk < n_chunks; ++chunk)
{
startind = (chunk * chunk_len) - (chunk * (template_len - 1));
if (chunk == n_chunks - 1)
{
chunk_len = remainder;
}
memset(image_ext, 0, (size_t) fft_len * sizeof(float));
for (i = 0; i < chunk_len; ++i)
{
image_ext[i] = image[startind + i];
}
status += normxcorr_fftw_internal(
template_len, n_templates, &image[startind], chunk_len,
&ncc[startind], image_len, fft_len, template_ext,
image_ext, norm_sums, ccc, outa, outb, out, pb, px, used_chans,
pad_array, num_threads, variance_warning);
}
free(norm_sums);
return status;
}

int normxcorr_fftw_internal(long template_len, long n_templates, float *image,
long image_len, float *ncc, long ncc_len, long fft_len, float *template_ext,
float *image_ext, float *norm_sums, float *ccc, fftwf_complex *outa,
fftwf_complex *outb, fftwf_complex *out, fftwf_plan pb,
fftwf_plan px, int *used_chans, int *pad_array, int num_threads,
int *variance_warning)
{
long i, t, startind;
long N2 = fft_len / 2 + 1;
int status = 0, unused_corr = 0;
int * flatline_count = (int *) calloc(image_len - template_len + 1, sizeof(int));
double *mean, *var;
double new_samp, old_samp, sum=0.0;

// Compute fft of image
fftwf_execute_dft_r2c(pb, image_ext, outb);

Expand Down Expand Up @@ -361,7 +413,7 @@ int normxcorr_fftw_main(float *templates, long template_len, long n_templates,
return 1;
}

// Procedures for normalisation
// Procedures for normalisation
// Compute starting mean, will update this
sum = 0.0;
for (i=0; i < template_len; ++i){
Expand All @@ -383,7 +435,7 @@ int normxcorr_fftw_main(float *templates, long template_len, long n_templates,
for (t = 0; t < n_templates; ++t){
double c = ((ccc[(t * fft_len) + startind] / (fft_len * n_templates)) - norm_sums[t] * mean[0]);
c /= stdev;
status += set_ncc(t, 0, template_len, image_len, (float) c, used_chans, pad_array, ncc);
status += set_ncc(t, 0, template_len, ncc_len, (float) c, used_chans, pad_array, ncc);

}
if (var[0] <= WARN_DIFF){
Expand Down Expand Up @@ -419,7 +471,7 @@ int normxcorr_fftw_main(float *templates, long template_len, long n_templates,
for (t = 0; t < n_templates; ++t){
double c = ((ccc[(t * fft_len) + i + startind] / (fft_len * n_templates)) - norm_sums[t] * mean[i]);
c /= stdev;
status += set_ncc(t, i, template_len, image_len, (float) c, used_chans, pad_array, ncc);
status += set_ncc(t, i, template_len, ncc_len, (float) c, used_chans, pad_array, ncc);
}
}
else {
Expand All @@ -439,7 +491,6 @@ int normxcorr_fftw_main(float *templates, long template_len, long n_templates,
}

// Clean up
free(norm_sums);
free(mean);
free(var);
free(flatline_count);
Expand Down Expand Up @@ -679,7 +730,7 @@ int multi_normxcorr_fftw(float *templates, long n_templates, long template_len,
#endif
/* initialise memory to zero */
memset(template_ext[tid], 0, (size_t) fft_len * n_templates * sizeof(float));
memset(image_ext[tid], 0, (size_t) fft_len * sizeof(float));
// Done internally now. memset(image_ext[tid], 0, (size_t) fft_len * sizeof(float));

/* call the routine */
results[i] = normxcorr_fftw_main(&templates[(size_t) n_templates * template_len * i], template_len,
Expand Down