In [1]:
import xarray as xr
import cartopy.crs as ccrs
import matplotlib.pyplot as plt
import numpy as np
import os, glob
import pandas as pd
from datetime import datetime, timedelta
from scipy.interpolate import CubicSpline
import importlib as il
from tqdm import tqdm
import apexpy

In [2]:
# Keep this as general as possible so we don't have to go changing it later. 

In [3]:
# Directory holding the per-satellite netCDF (.nc) files, relative to this notebook.
path_to_sat_files = "../dmsp-eph/"

In [5]:
# glob's ordering depends on the filesystem; sort so the files (and the names derived
# from them) always come out in the same, reproducible order.
sat_full_files = sorted(glob.glob(os.path.join(path_to_sat_files, '*.nc')))
# Satellite name = first three characters of the file name, e.g. 'F16_2013_...nc' -> 'F16'.
# os.path.basename works with either path separator, unlike split('/').
sat_names = [os.path.basename(path)[:3] for path in sat_full_files]

In [6]:
# Quick check that each satellite name lines up with its file.
sat_names, sat_full_files

(['F16', 'F17', 'F18'],
 ['../dmsp-eph/F16_2013_75-77_interped_SAMI-INTERP.nc',
  '../dmsp-eph/F17_2013_75-77_interped_SAMI-INTERP.nc',
  '../dmsp-eph/F18_2013_75-77_interped_SAMI-INTERP.nc'])

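Use `open_dataset` instead of `open_mfdataset` so the variables are plain (non-dask) arrays rather than lazy dask futures or pointers. With the default `cache=True`, each variable is read from disk the first time it's accessed and is then kept in memory. Use `xr.load_dataset` (or call `.load()`) to read everything up front.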

This is the first step to speed things up, but won't solve the problem completely.

The way you were doing this required a TON of calls to the data. With `open_mfdataset`, each of those calls reads from disk, which will always be slower than reading from memory.

FYI, the advantage of using dask is for datasets (or arrays) too big to fit into memory, or for doing really long & complex calculations on a small subset of the data. Since our datasets are just 1-2 GB (and nothing is that complex), dask will actually slow them down.



In [7]:
import dask

In [10]:
# this is just to prove my point...
N_DEMO_STEPS = 2500  # stop after this many sat_steps so the comparison runs quickly


def _read_glat_one_step_at_a_time(ds, desc, max_step=N_DEMO_STEPS):
    """Read ``glat`` from ``ds`` one sat_step at a time (the slow access pattern being timed).

    Stops once the step index exceeds ``max_step`` and returns the list of per-step values.
    """
    values = []
    for i in tqdm(ds.sat_step, desc=desc):
        values.append(ds.isel(sat_step=i).glat)
        if i > max_step:
            break
    return values


onesatds = xr.open_mfdataset(sat_full_files[0])
print('is dask?: ', dask.is_dask_collection(onesatds))
tmp_list = _read_glat_one_step_at_a_time(onesatds, 'using dask (reading from disk)')

# `del` alone doesn't guarantee the file handle behind the dask-backed dataset is released.
onesatds.close()

onesatds = xr.open_dataset(sat_full_files[0])
print('is dask?: ', dask.is_dask_collection(onesatds))
tmp_list = _read_glat_one_step_at_a_time(onesatds, 'NOT reading from disk')
    

is dask?:  True


using dask (reading from disk):   1%|█▏                                                                                                                               | 2501/258508 [00:09<15:27, 275.92it/s]


is dask?:  False


NOT reading from disk:   1%|█▎                                                                                                                                        | 2501/258508 [00:06<11:38, 366.74it/s]


In [11]:
# small speedup, but this is a VERY basic example and more calls to the dask dataset will really slow things down.

In [12]:
# Open the first satellite file as a plain (non-dask) dataset for the rest of the notebook.
first_sat_file = sat_full_files[0]
onesatds = xr.open_dataset(first_sat_file)

In [13]:
# Show the dataset summary (dimensions, coordinates, variables).
onesatds

OK, here's my first stab at this.


In your code you pull the hour, minute, and second out individually. I don't think we need to do that, so let me try a single for loop over the interpolations themselves.


Why? Python for loops are really, really slow. Each iteration is interpreted and has to grab data from scattered points in memory, which is inefficient, so it's best to use as few loops as possible.

I'll stop it after 2500 iterations so we can see how long it takes and I don't lose my sanity.

In [14]:
# Sanity check: pull out a single sat_step to see what one interpolation's inputs look like.
# (It looks as expected.)
onesatds.isel({'sat_step': 5})

In [15]:
# first try at a speedup:

In [16]:
interpd_edens = []  # interpolated edens, one entry per sat_step

for step in tqdm(onesatds.sat_step.values):
    step_ds = onesatds.isel(sat_step=step)  # everything for this one sat_step (see the cell above)
    spline = CubicSpline(step_ds.sami_time, step_ds.edens)  # fit edens vs. SAMI time
    interpd_edens.append(spline(step_ds.sat_time))  # evaluate at the satellite's time

    # stop early: we only want a timing estimate here
    if step > 2500:
        break
    

  1%|█▌                                                                                                                                                              | 2501/258508 [01:08<1:57:08, 36.42it/s]


In [17]:
# LOVELY! Much better!

# I want to do better though. I think we can get this done in ~10 mins.

Next thing to optimize: growing a Python list is slow. By pre-allocating the memory (making a NumPy array to hold the data) and then placing the data into it, we don't make Python keep resizing the memory allocated to the list.

Plus, lists are slow in general. NumPy arrays are faster and better at some things (not all).


I'm not expecting a huge speedup here. Just a small one, if anything. Most likely they'll be comparable. But having this done now will help later...


In [18]:
# Pre-allocate the output (one float per sat_step) instead of growing a list.
interpd_edens = np.zeros(onesatds.sat_step.shape)

for step in tqdm(onesatds.sat_step.values):
    step_ds = onesatds.isel(sat_step=step)
    interpd_edens[step] = CubicSpline(step_ds.sami_time, step_ds.edens)(step_ds.sat_time)

    if step > 2500:  # timing estimate only
        break

  1%|█▌                                                                                                                                                              | 2501/258508 [00:59<1:41:47, 41.91it/s]


Yeah, marginal (if anything). Not great. But this will depend a lot on processor architecture and the system I'm running this on isn't optimized for single-core tasks so it doesn't see much of a difference. Some systems will be better at this than others.


Next order of business: we got rid of all but one for loop. Let's see if we can get rid of the last one too...


To do this, I'm going to make a function that does the interpolating and then use `multiprocessing` to "loop" over all of the values. Then we can use more than a single core to get this done really quickly. First, the naive approach then some refinement.


In [19]:
# First 10,000 sat_steps, for quick timing tests. This relies on sat_step values being the
# positions 0..N-1, as the loops above already assume. A positional slice avoids
# `where(..., drop=True)`, which masks every variable (which can promote ints to float and
# broadcast variables lacking the sat_step dimension) only to throw most of it away.
shorter_ds = onesatds.isel(sat_step=slice(0, 10000))

In [20]:
# Confirm the subset: sat_step should now have 10000 entries.
shorter_ds

In [21]:
from multiprocessing import Pool
from itertools import repeat

In [22]:
# How many CPUs this machine reports; on a shared machine, keep the Pool size below this.
os.cpu_count()

48

In [23]:
# Sometimes it's faster to use all cores, sometimes not. 

# Since we're only doing this once (ideally), it doesn't really make sense to optimize it too far.

# BUT, x2go is shared so it would not be nice to use every core. specify the number of cores below.


In [24]:
def do_interpolations(ds, t_step):
    """Cubic-spline interpolate ``edens`` for one sat_step of ``ds``.

    ``ds`` is indexed positionally along ``sat_step`` at ``t_step``. A spline of ``edens``
    versus ``sami_time`` is fit for that step and evaluated at that step's ``sat_time``.
    """
    step = ds.isel(sat_step=t_step)
    return CubicSpline(step.sami_time, step.edens)(step.sat_time)
    

In [25]:
do_interpolations(shorter_ds, t_step=5)  # sanity check that the function runs on one step

array(701.49067631)

In [26]:
n_demo_cores = 48  # number of worker processes; x2go is shared, so consider fewer

t_start = datetime.now()  # just a timer
with Pool(n_demo_cores) as pool:
    # repeat(shorter_ds) pairs the whole dataset with every sat_step index
    interpd = pool.starmap(do_interpolations, zip(repeat(shorter_ds), shorter_ds.sat_step))

t_total = datetime.now() - t_start

# Scale by the NUMBER of sat_steps in the full file (sat_step.max() is the last index,
# one less than the count).
n_total_steps = onesatds.sizes['sat_step']
print('took %s for %i points, so the full file will take %s' % (str(t_total),
                                                               len(interpd),
                                                               str(t_total * (n_total_steps / len(interpd)))))

took 0:00:33.027569 for 10000 points, so the full file will take 0:14:13.785778


Normally, I'd say that this is fine. But just for fun let's go even faster!


So, some things slowing this down:

- the overhead of Python having to start and manage so many worker processes (sometimes programs run faster with *fewer* cores)
    - I'm sitting here watching this run, and I know that using ~8-16 cores will actually be faster.
        - Why? One parent process hands out the tasks and collects the results from all the workers. The more workers it has to manage, the more time it spends on that bookkeeping; with only a few it would be faster. This is fun to tune & experiment with, but it's not worth it for this single problem since we only need to do it once.
- Having to repeat the dataset: `starmap` pickles the task arguments to send them to the workers, so `repeat(shorter_ds)` ships a copy of the whole dataset over and over (once per chunk of tasks).


Let's fix the second and maybe tweak the first.



# Final result

takes 15 seconds per sat file (on the machine I'm using).

In [27]:
# i think we can speed this up by using numpy arrays to send the data into the interpolation function instead of xarray.


# rewrite the interpolation function:



In [28]:
# Since all X's are the same (same set of sami_time, define it once and only once.)

In [29]:
# The SAMI time grid (the spline's x-values) is the same for every sat_step, so pull it out
# once. This cell runs at notebook (module) level, so the assignment already creates the
# global that do_interpolations reads; a top-level `global` statement is a no-op.
# Under the fork start method (Linux), Pool workers see X_s as it was when the Pool was created.
X_s = onesatds.sami_time.values

In [30]:
def do_interpolations(y_values, t_step, *, x_values=None):
    """Cubic-spline interpolate one sat_step's values at a single time.

    Parameters
    ----------
    y_values : array_like
        Values to interpolate (one row of ``edens.values.T``), one per x-value.
    t_step : scalar
        Time at which to evaluate the spline (the satellite time for this step).
        Despite the name, this is a time value, not a step index.
    x_values : array_like, optional
        The spline's x-values. Defaults to the notebook-level ``X_s`` (the shared SAMI
        time grid), so ``pool.starmap(do_interpolations, zip(ys, times))`` keeps working.

    Returns
    -------
    numpy.ndarray
        0-d array holding the interpolated value.
    """
    if x_values is None:
        x_values = X_s  # notebook-level global; forked Pool workers inherit it
    return CubicSpline(x_values, y_values)(t_step)
    

In [31]:
# now make the arrays with the input data and I'll explain more later.

In [32]:
# One row per sat_step: the edens values to fit (y) and the satellite time to evaluate at.
y_arr_in = onesatds['edens'].values.T
times_out = onesatds['sat_time'].values

In [33]:
# Both inputs need one row per sat_step (same length along axis 0) so zip() can pair them.
y_arr_in.shape, times_out.shape

((258508, 596), (258508,))

So in the first multiprocessing example we used `repeat(shorter_ds)` and then pulled out the values for the step we wanted inside the function.


Here, we'll feed the function only the relevant info, so it does not have to be pulled out later.

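So I just made sure the input arrays (`y_arr_in` and `times_out`) are in the correct shape. We will be running the code on these arrays, which need to be formatted correctly: `zip` pairs them up row by row, so they must have the same length along the first axis/dimension (one row per sat_step).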

watch this:


In [34]:
# Sanity check: this should match do_interpolations(shorter_ds, 5) from earlier.
do_interpolations(y_values=y_arr_in[5], t_step=times_out[5])

array(701.49067631)

In [35]:
# lovely, now do the interpolating on the whole thing.

# copying the cell above & making the changes we need ( when you adapt this to your code, don't copy it exactly. You don't need timers or the print statement, just the code to make the result):

In [36]:
n_demo_cores = 48  # number of worker processes; x2go is shared, so consider fewer

t_start = datetime.now()  # just a timer
with Pool(n_demo_cores) as pool:
    # each task gets only its own row of y-values and its own target time
    interpd = pool.starmap(do_interpolations, zip(y_arr_in, times_out))

t_total = datetime.now() - t_start

# Scale by the NUMBER of sat_steps in the full file (sat_step.max() is the last index,
# one less than the count).
n_total_steps = onesatds.sizes['sat_step']
print('took %s for %i points, so the full file will take %s' % (str(t_total),
                                                               len(interpd),
                                                               str(t_total * (n_total_steps / len(interpd)))))

took 0:00:15.416273 for 258508 points, so the full file will take 0:00:15.416213


In [37]:
# Cool. I would say that 15 seconds is just fine. No further optimization necessary!

# Using the outputs...

The results come back as a list of 0-d (single-value) numpy arrays. I'll show you how to put them into a pandas DataFrame. You can use that directly, or use the same approach to make an xarray Dataset/DataArray.

In [38]:
# putting the interpolated values from above into a usable data structure (pandas, not xarray).

# xarray is good for multi-dimensional stuff, but pandas DataFrames are great for single dimensions.
# Here every column shares the same single dimension (one row per sat_step).


df = pd.DataFrame()  # empty frame; the columns are filled in one at a time in the next cell

In [39]:
# satellite time, position, then the interpolated edens (one value per sat_step)
df['time'] = onesatds['sat_time'].values
for column in ('glat', 'glon', 'alt'):
    df[column] = onesatds[column].values
df['edens'] = np.asarray(interpd).ravel()

In [40]:
# Preview the table: time, satellite position, and interpolated edens.
df

Unnamed: 0,time,glat,glon,alt,edens
0,2013-03-16 00:00:00,2.8,261.2,854.7,0.000000e+00
1,2013-03-16 00:00:01,2.9,261.2,854.7,1.414053e+02
2,2013-03-16 00:00:02,2.9,261.2,854.7,2.822699e+02
3,2013-03-16 00:00:03,3.0,261.2,854.7,4.225535e+02
4,2013-03-16 00:00:04,3.1,261.2,854.7,5.622703e+02
...,...,...,...,...,...
258503,2013-03-18 23:59:55,39.8,88.8,850.0,-3.789539e+07
258504,2013-03-18 23:59:56,39.7,88.8,850.0,-3.843019e+07
258505,2013-03-18 23:59:57,39.7,88.8,850.0,-3.843164e+07
258506,2013-03-18 23:59:58,39.6,88.8,850.0,-3.896650e+07


# where to go from here...

write this to a file. 

`df.to_csv('path/to/filename.csv', index=None)`

Then read it with 

`pd.read_csv()`

> and then you can use the time as the index column rather than the row number, or something else.

## OR,

If you love xarray now, you don't have to use pandas. But trust me, pandas is easier for this.


In [None]:
# earlier we defined a bunch of things that I said would make this more useful. Watch how easy it is to do every file, on every day.

In [None]:
# Number of worker processes for the Pool below. x2go is shared, so don't grab every core:
# use about three quarters of what's available (at least 1), and lower it if others are busy.
# os.cpu_count() can return None, hence the fallback.
NUM_WORKERS = max(1, (os.cpu_count() or 1) * 3 // 4)



In [211]:
# redefining so you know what to copy from
def do_interpolations(y_values, t_step, *, x_values=None):
    """Cubic-spline interpolate one sat_step's values at a single time.

    Parameters
    ----------
    y_values : array_like
        Values to interpolate (one row of ``edens.values.T``), one per x-value.
    t_step : scalar
        Time at which to evaluate the spline (the satellite time for this step).
        Despite the name, this is a time value, not a step index.
    x_values : array_like, optional
        The spline's x-values. Defaults to the notebook-level ``X_s`` (the shared SAMI
        time grid), so ``pool.starmap(do_interpolations, zip(ys, times))`` keeps working.

    Returns
    -------
    numpy.ndarray
        0-d array holding the interpolated value.
    """
    if x_values is None:
        x_values = X_s  # notebook-level global; forked Pool workers inherit it
    return CubicSpline(x_values, y_values)(t_step)

In [None]:
OUTPUT_DIR = 'path/to/some/folder/'  # TODO: set this to where the CSV files should go
os.makedirs(OUTPUT_DIR, exist_ok=True)

for sat_name, single_sat_file in zip(sat_names, sat_full_files):
    # `with` closes each file once its data has been pulled into memory.
    with xr.open_dataset(single_sat_file) as ds:
        # Module-level assignment (no `global` needed). The Pool is created AFTER this so
        # that forked workers inherit THIS file's SAMI time grid.
        X_s = ds.sami_time.values

        y_vals = ds.edens.values.T
        times_vals = ds.sat_time.values

        # Feed this file's arrays, not y_arr_in/times_out from the single-file demo above.
        with Pool(NUM_WORKERS) as pool:
            interpd = pool.starmap(do_interpolations, zip(y_vals, times_vals))

        df = pd.DataFrame()
        df['time'] = ds['sat_time'].values
        df['glat'] = ds['glat'].values
        df['glon'] = ds['glon'].values
        df['alt'] = ds['alt'].values
        df['edens'] = np.array(interpd).flatten()

    filename = os.path.join(OUTPUT_DIR, sat_name + '_time_interpolated.csv')
    df.to_csv(filename, index=False)
    
    

> ***_NOTE:_*** I have not tested this. Some things may need to be changed slightly

