In [25]:
import numpy as np
import time

In [26]:
sc

<pyspark.context.SparkContext at 0x7fa189af51d0>

In [39]:
def raise_random_ints_by_two(N, partitions=32):
    """Time a Spark round trip that squares N random integers.

    Generates N random integers in [0, 10] on the driver, distributes them
    across `partitions` RDD partitions using the notebook-global SparkContext
    `sc`, squares every element with a `map`, and collects the result back to
    the driver. The elapsed wall-clock time (data generation included) is
    printed as "<partitions>: <seconds>".

    Args:
        N: number of random integers to generate.
        partitions: number of partitions handed to `sc.parallelize`.

    Returns:
        None. The timing is only printed.
    """
    t0 = time.time()
    # randint's upper bound is exclusive, so 11 keeps the inclusive [0, 10]
    # range of the deprecated np.random.random_integers(0, 10, ...).
    DATA = np.random.randint(0, 11, (N, ))
    # collect() forces the job to run; the squared values are not needed.
    sc.parallelize(DATA, partitions).map(lambda x: x**2).collect()
    # Report the argument actually used for this run rather than a global
    # loop variable that happens to exist in the calling cell.
    print("{}: {}".format(partitions, time.time() - t0))

In [41]:
[raise_random_ints_by_two(1000, p) for p in [16, 32, 64, 128, 256, 512]]    

16: 0.0662159919739
32: 0.0676469802856
64: 0.0822758674622
128: 0.137326955795
256: 0.185862064362
512: 0.305863857269


[None, None, None, None, None, None]

In [42]:
[raise_random_ints_by_two(10000, p) for p in [16, 32, 64, 128, 256, 512]]

16: 0.0904250144958
32: 0.0744869709015
64: 0.0944068431854
128: 0.140532016754
256: 0.192093133926
512: 0.308105945587


[None, None, None, None, None, None]

In [43]:
[raise_random_ints_by_two(100000, p) for p in [16, 32, 64, 128, 256, 512]]

16: 0.129706859589
32: 0.126039981842
64: 0.129119873047
128: 0.185903787613
256: 0.241034030914
512: 0.361915111542


[None, None, None, None, None, None]

In [44]:
[raise_random_ints_by_two(1000000, p) for p in [16, 32, 64, 128, 256, 512]]

16: 0.640254020691
32: 0.5771048069
64: 0.592895030975
128: 0.53816986084
256: 0.631277799606
512: 0.728481054306


[None, None, None, None, None, None]

In [45]:
[raise_random_ints_by_two(10000000, p) for p in [16, 32, 64, 128, 256, 512]]

16: 6.87504887581
32: 6.04728484154
64: 5.30131006241
128: 5.08445286751
256: 5.0289618969
512: 5.4452791214


[None, None, None, None, None, None]

In [46]:
# Largest run: N = 100,000,000 with the default number of partitions.
# The recorded output below shows this call failing on the JVM side with
# java.lang.OutOfMemoryError (GC overhead limit exceeded) inside
# PythonRDD.readRDDFromFile.
raise_random_ints_by_two(100000000)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
: java.lang.OutOfMemoryError: GC overhead limit exceeded


## Mean Contour Example

In [15]:
import os
import sys
import tempfile
import time

import numpy as np
from netCDF4 import Dataset

In [3]:
def debug(s):
    """Print `s` to stdout and flush immediately.

    To disable debugging output, replace the body of this function with a
    no-op (`pass`).

    Args:
        s: any printable value.
    """
    # The parenthesised form prints the same text under the Python 2 print
    # statement and the Python 3 print function.
    print(s)
    # Flush right away so the message is not held back in the stdout buffer.
    sys.stdout.flush()

def convert(data, variable, timestep):
    """Build a contour-description dict for one timestep of a variable.

    Args:
        data: dataset-like object exposing `variables` (name -> indexable
            variable) and `dimensions` (iterable of dimension names).
        variable: name of the variable to extract.
        timestep: index along the variable's first axis.

    Returns:
        A dict with keys:
            'gridWidth'  -- size of the second axis of the selected slab,
            'gridHeight' -- size of the first axis of the selected slab,
            'x0', 'y0'   -- first value of the lon / lat coordinate variable,
            'dx', 'dy'   -- difference between the first two lon / lat values,
            'values'     -- the slab flattened to a plain Python list.

    Raises:
        ValueError: if no dimension name starts with 'lat' or with 'lon'.
    """
    # Read the slab once; every later use works on this in-memory array
    # instead of indexing the variable again.
    grid = data.variables[variable][timestep]

    # The full grid is used (no sub-selection).
    grid_height = grid.shape[0]
    grid_width = grid.shape[1]

    # Dimension names are not fixed, so match on their prefix. If several
    # names match, the last one seen wins.
    lat_name = None
    lon_name = None
    for dim in data.dimensions:
        if dim.startswith('lat'):
            lat_name = dim
        elif dim.startswith('lon'):
            lon_name = dim

    if lat_name is None or lon_name is None:
        raise ValueError("dataset needs dimensions starting with 'lat' and "
                         "'lon'; found: %s" % ', '.join(data.dimensions))

    lats = data.variables[lat_name]
    lons = data.variables[lon_name]

    # dx / dy are taken from the first two coordinate values only, so each
    # coordinate variable must hold at least two entries.
    return {
        'gridWidth': grid_width,
        'gridHeight': grid_height,
        'x0': float(lons[0]),
        'y0': float(lats[0]),
        'dx': float(lons[1] - lons[0]),
        'dy': float(lats[1] - lats[0]),
        'values': grid.reshape(grid.size).tolist()
    }

def toNetCDFDataset(source, variable, data):
    """Write `data` into a new netCDF dataset that mirrors `source`'s grid.

    A temporary file is created and opened for writing. The lat, lon and
    'time' dimensions are re-created from `source`, the lat and lon
    coordinate variables are copied across (values and attributes), and a
    new variable named `variable` with dimensions (time, lat, lon) is filled
    from `data`.

    Args:
        source: open netCDF4 Dataset providing dimensions and coordinates.
        variable: name of the variable to create in the output.
        data: either a list of arrays (written one per index along the first
            axis) or a single array assigned to the whole variable.

    Returns:
        The output Dataset, still open in write mode. The backing temporary
        file is not removed here; closing and cleaning up are the caller's
        responsibility.

    Raises:
        ValueError: if `data` is an empty list, or if `source` has no
            dimension starting with 'lat' or with 'lon'.
    """

    def _copy_variable(target_file, source_file, name):
        # Re-create `name` in the target with the same type, dimensions,
        # attributes and values as in the source.
        src_var = source_file.variables[name]
        target_var = target_file.createVariable(name, src_var.datatype, src_var.dimensions)
        target_var.setncatts({k: src_var.getncattr(k) for k in src_var.ncattrs()})
        target_var[:] = src_var[:]

    def _mirror_dimension(target_file, name):
        # Same length as in the source; unlimited dimensions stay unlimited.
        src_dim = source.dimensions[name]
        target_file.createDimension(
            name, None if src_dim.isunlimited() else len(src_dim))

    # Validate the inputs before touching the filesystem so a bad call does
    # not leave a stray temporary file behind.
    is_panel_list = isinstance(data, list)
    if is_panel_list and not data:
        raise ValueError('data must contain at least one array')
    data_type = data[0].dtype if is_panel_list else data.dtype

    # Extract out the lat lon names,  these are not
    # consistent across the netCDF files
    lat_name = None
    lon_name = None
    for dim in source.dimensions:
        if dim.startswith('lat'):
            lat_name = dim
        elif dim.startswith('lon'):
            lon_name = dim
    if lat_name is None or lon_name is None:
        raise ValueError("source needs dimensions starting with 'lat' and "
                         "'lon'; found: %s" % ', '.join(source.dimensions))

    # mkstemp returns an open descriptor; close it because the file is
    # reopened by path through netCDF4.
    (fd, filepath) = tempfile.mkstemp()
    os.close(fd)
    output = Dataset(filepath, 'w')

    # NOTE(review): 'time' is sized from the source even when `data` holds
    # fewer entries than the source has timesteps -- confirm this is intended.
    for dim_name in (lat_name, lon_name, 'time'):
        _mirror_dimension(output, dim_name)

    _copy_variable(output, source, lat_name)
    _copy_variable(output, source, lon_name)

    output.createVariable(variable, data_type, ('time', lat_name, lon_name))
    target = output.variables[variable]

    if is_panel_list:
        for index, panel in enumerate(data):
            target[index] = panel
    else:
        target[:] = data

    return output



def netcdf_mean(filepath, parameter, grid_chunk_size, partitions):
    """Compute the time-mean of a netCDF variable in parallel with Spark.

    The lat/lon grid of `parameter` is cut into square tiles of
    `grid_chunk_size` cells per side (edge tiles may be smaller). Each tile
    is processed as one element of an RDD: the worker opens `filepath`
    itself, reads the tile for every timestep range and averages over the
    time axis. The tile means are collected on the driver, stitched back
    into full-grid arrays and written out via `toNetCDFDataset`.

    Args:
        filepath: path of the netCDF file. Workers open it by this same
            path, so it must be readable wherever tasks run.
        parameter: name of the variable to average.
        grid_chunk_size: tile edge length in grid cells.
        partitions: number of RDD partitions for the tile list.

    Returns:
        The Dataset returned by `toNetCDFDataset`, holding one mean panel
        per timestep range.
    """
    data = Dataset(filepath)
    try:
        pr = data.variables[parameter]

        # Get the number of timesteps
        num_timesteps = data.variables['time'].size

        # For now don't break up timesteps, just take the mean across the
        # whole series. If this were set to some other value the result
        # would hold (num_timesteps / timesteps) panels, each the mean of
        # one group of timesteps: e.g. with timesteps = 10 and
        # num_timesteps = 50 there would be 5 panels averaging timesteps
        # 0-10, 10-20, 20-30, etc.
        timesteps = num_timesteps

        # Shape of a single timestep's grid.
        shape = pr[0].shape

        # (start, end) ranges along the time axis; the last one is clamped
        # to the end of the series.
        timestep_chunks = [(start, min(start + timesteps, num_timesteps))
                           for start in range(0, num_timesteps, timesteps)]

        # Upper-left (lat, lon) index of every tile, in row-major order.
        grid_origins = [(lat, lon)
                        for lat in range(0, shape[0], grid_chunk_size)
                        for lon in range(0, shape[1], grid_chunk_size)]

        debug('Grid chunks: %d' % len(grid_origins))

        # Function to process one tile. It deliberately opens its own
        # Dataset and does not reference the driver-side `data`, so only
        # plain values travel to the workers with the function.
        def calculate_means(grid_chunk):
            from netCDF4 import Dataset
            import numpy as np

            (lat, lon) = grid_chunk
            worker_data = Dataset(filepath)
            try:
                pr = worker_data.variables[parameter]
                return [np.mean(pr[start:end,
                                   lat:lat+grid_chunk_size,
                                   lon:lon+grid_chunk_size], axis=0)
                        for (start, end) in timestep_chunks]
            finally:
                # The means are in-memory arrays by now; release the file
                # handle instead of leaking one per task.
                worker_data.close()

        # Parallelize the tiles and compute their means. collect() returns
        # results in the same order as `grid_origins`.
        means = sc.parallelize(grid_origins, partitions) \
                  .map(calculate_means) \
                  .collect()

        # One full-grid masked array per timestep range.
        timestep_means = [np.ma.empty(shape) for _ in timestep_chunks]

        # Paste every tile back at its origin. The tile's own shape is used
        # for the target slice because edge tiles can be smaller.
        for (lat, lon), tile_means in zip(grid_origins, means):
            for panel, chunk in zip(timestep_means, tile_means):
                panel[lat:lat+chunk.shape[0], lon:lon+chunk.shape[1]] = chunk

        return toNetCDFDataset(data, parameter, timestep_means)
    finally:
        # toNetCDFDataset has finished copying from the source by the time
        # it returns, so the input file can be closed here.
        data.close()

In [4]:
# Download the first URL listed in 1997_http_files.txt into /data/tmp/.
# NOTE(review): in the recorded run below wget saved the file with a ".2"
# suffix, which suggests earlier copies were already present in /data/tmp/.
! wget --timeout=10 -P /data/tmp/ $(head /home/ubuntu/1997_http_files.txt -n 1)

--2015-10-20 20:22:48--  http://nasanex.s3.amazonaws.com/NEX-GDDP/BCSD/historical/day/atmos/pr/r1i1p1/v1.0/pr_day_BCSD_historical_r1i1p1_ACCESS1-0_1997.nc
Resolving nasanex.s3.amazonaws.com (nasanex.s3.amazonaws.com)... 54.231.168.13
Connecting to nasanex.s3.amazonaws.com (nasanex.s3.amazonaws.com)|54.231.168.13|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 669108492 (638M) [application/x-netcdf]
Saving to: ‘/data/tmp/pr_day_BCSD_historical_r1i1p1_ACCESS1-0_1997.nc.2’


2015-10-20 20:23:03 (44.3 MB/s) - ‘/data/tmp/pr_day_BCSD_historical_r1i1p1_ACCESS1-0_1997.nc.2’ saved [669108492/669108492]



In [6]:
# Copy everything in /data/tmp/ to the same directory on every host listed
# in ip.list. netcdf_mean's worker function opens the input file by path,
# so presumably these hosts are the Spark workers -- TODO confirm.
! for ip in $(cat /home/ubuntu/ip.list); do scp /data/tmp/* $ip:/data/tmp/; done

pr_day_BCSD_historical_r1i1p1_ACCESS1-0_1997. 100%  638MB 159.5MB/s   00:04    
pr_day_BCSD_historical_r1i1p1_ACCESS1-0_1997. 100%  638MB 127.6MB/s   00:05    
pr_day_BCSD_historical_r1i1p1_ACCESS1-0_1997. 100%  638MB 127.6MB/s   00:05    
pr_day_BCSD_historical_r1i1p1_ACCESS1-0_1997. 100%  638MB 127.6MB/s   00:05    
pr_day_BCSD_historical_r1i1p1_ACCESS1-0_1997. 100%  638MB 127.6MB/s   00:05    
pr_day_BCSD_historical_r1i1p1_ACCESS1-0_1997. 100%  638MB 127.6MB/s   00:05    
pr_day_BCSD_historical_r1i1p1_ACCESS1-0_1997. 100%  638MB 127.6MB/s   00:05    
pr_day_BCSD_historical_r1i1p1_ACCESS1-0_1997. 100%  638MB 127.6MB/s   00:05    
pr_day_BCSD_historical_r1i1p1_ACCESS1-0_1997. 100%  638MB 127.6MB/s   00:05    
pr_day_BCSD_historical_r1i1p1_ACCESS1-0_1997. 100%  638MB 127.6MB/s   00:05    
pr_day_BCSD_historical_r1i1p1_ACCESS1-0_1997. 100%  638MB 127.6MB/s   00:05    
pr_day_BCSD_historical_r1i1p1_ACCESS1-0_1997. 100%  638MB 127.6MB/s   00:05    
pr_day_BCSD_historical_r1i1p1_ACCESS1-0_

In [13]:
def run_mean_contour(grid_chunk_size = 20, partitions = 32,
                     file_path = "/data/tmp/pr_day_BCSD_historical_r1i1p1_ACCESS1-0_1997.nc",
                     variable = "pr"):
    """Time the mean + contour-conversion pipeline for one netCDF file.

    Runs `netcdf_mean` on `file_path`, converts timestep 0 of the result
    with `convert`, and reports the elapsed wall-clock time.

    Args:
        grid_chunk_size: tile edge length passed to `netcdf_mean`.
        partitions: number of RDD partitions passed to `netcdf_mean`.
        file_path: netCDF file to process; defaults to the 1997 file used
            throughout this notebook.
        variable: name of the variable to average and convert.

    Returns:
        Elapsed seconds as a float; the same value is also passed to
        `debug`.
    """
    t0 = time.time()

    data = netcdf_mean(file_path, variable, grid_chunk_size, partitions)
    try:
        # The conversion result is not used; it is run so that its cost is
        # part of the measurement.
        convert(data, variable, 0)
        # Take the reading once so the printed and returned values agree,
        # and before close() so cleanup is not part of the timing.
        elapsed = time.time() - t0
    finally:
        # Close the dataset returned by netcdf_mean instead of leaving it
        # open after every benchmark run.
        data.close()

    debug(elapsed)
    return elapsed

In [17]:
run_mean_contour(partitions=16)

Grid chunks: 2592
19.5290880203


In [16]:
# Default 32 partitions
run_mean_contour()

Grid chunks: 2592
11.2182228565


In [18]:
run_mean_contour(partitions=64)

Grid chunks: 2592
7.78277301788


In [19]:
run_mean_contour(partitions=128)

Grid chunks: 2592
5.35389208794


In [20]:
run_mean_contour(partitions=256)

Grid chunks: 2592
4.77282094955


In [21]:
run_mean_contour(partitions=512)

Grid chunks: 2592
4.16521501541


In [22]:
run_mean_contour(partitions=1024)

Grid chunks: 2592
4.04609704018


In [23]:
run_mean_contour(partitions=2048)

Grid chunks: 2592
4.16026520729


In [24]:
run_mean_contour(partitions=2592)

Grid chunks: 2592
4.17134904861
