In [1]:
import time
import unittest
from test.test_crop import Test_max_daylight

import cupy as cp
import dask
import numpy as np
import pandas as pd
import dask.array as da
import src.geocat.comp.crop as geo
import xarray as xr
csvpath = "max_daylight_ported_test_numpy.csv"

## Plotting and Validation

In [2]:
import matplotlib.pyplot as plt
import numpy as np

def plot(allData,name):
    arraysizes = np.unique(allData['ArraySize'])
    sd_numpy = np.zeros(len(arraysizes))
    sd_cupy = np.zeros(len(arraysizes))
    y_numpy = np.zeros(len(arraysizes))
    y_cupy = np.zeros(len(arraysizes))
    plt.rcParams.update({'font.size': 15})
    for i in range(0,len(arraysizes)):
        cupydata = allData.loc[(allData['ArraySize'] == arraysizes[i]) & (allData['Approach'] == 'cupy')]
        numpydata = allData.loc[(allData['ArraySize'] == arraysizes[i]) & (allData['Approach'] == 'numpy')]
        y_cupy[i] = np.mean(cupydata['Runtime(s)'])
        y_numpy[i] = np.mean(numpydata['Runtime(s)'])
        sd_cupy[i] = np.std(cupydata['Runtime(s)'])
        sd_numpy[i] = np.std(numpydata['Runtime(s)'])
    fig, ax = plt.subplots(figsize=(9, 6))
    ax.errorbar(arraysizes, y_numpy, yerr=sd_numpy, fmt='-o',label='numpy')
    ax.errorbar(arraysizes, y_cupy, yerr=sd_cupy, fmt='-o',label='cupy')
    ax.legend();  # Add a legend.
    ax.set_xlabel('ArraySize')  # Add an x-label to the axes.
    ax.set_ylabel('Runtime(s)')  # Add a y-label to the axes.
    ax.set_title(("Test_max_daylight"))
    ax.set_xscale('log')
    ax.set_yscale('log')
    plt.savefig(name,dpi=fig.dpi)

def test_validation(res_numpy,res_cupy):
    assert np.allclose(res_numpy,res_cupy, atol=0.0000001)

## Cluster

### CPU cluster on PBS cluster

In [3]:
# import dask.array as da
from dask_jobqueue import PBSCluster
from dask.distributed import Client

clusterCPU = PBSCluster(memory='100 GB',
                     processes=3,
                     cores=3,
                     queue='casper',
                     walltime='02:00:00',
                     resource_spec='select=1:ncpus=3:mem=100gb')
print(clusterCPU.job_script())
clusterCPU.scale(1)
client = Client(clusterCPU)
#cluster.close()


  from distributed.utils import tmpfile


#!/usr/bin/env bash

#PBS -N dask-worker
#PBS -q casper
#PBS -A NTDD0005
#PBS -l select=1:ncpus=3:mem=100gb
#PBS -l walltime=02:00:00

/glade/work/hkashgar/conda-envs/geocat/bin/python -m distributed.cli.dask_worker tcp://10.12.205.44:36054 --nthreads 1 --nprocs 3 --memory-limit 31.04GiB --name dummy-name --nanny --death-timeout 60 --interface ib0 --protocol tcp://



Perhaps you already have a cluster running?
Hosting the HTTP server on port 32866 instead


In [4]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/hkashgar/proxy/32866/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/hkashgar/proxy/32866/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.12.205.44:36054,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/hkashgar/proxy/32866/status,Total threads: 0
Started: Just now,Total memory: 0 B


### CUDA cluster on PBS cluster

In [5]:
# import dask.array as da
from dask_jobqueue import PBSCluster
from dask.distributed import Client

clusterCUDA = PBSCluster(memory='200 GB',
                     processes=1,
                     cores=1,
                     queue='casper',
                     walltime='02:00:00',
                     resource_spec='select=1:ncpus=1:ngpus=1:mem=200gb')
print(clusterCUDA.job_script())
clusterCUDA.scale(1)
client = Client(clusterCUDA)
#cluster.close()

Perhaps you already have a cluster running?
Hosting the HTTP server on port 40117 instead


#!/usr/bin/env bash

#PBS -N dask-worker
#PBS -q casper
#PBS -A NTDD0005
#PBS -l select=1:ncpus=1:ngpus=1:mem=200gb
#PBS -l walltime=02:00:00

/glade/work/hkashgar/conda-envs/geocat/bin/python -m distributed.cli.dask_worker tcp://10.12.205.44:40999 --nthreads 1 --memory-limit 186.26GiB --name dummy-name --nanny --death-timeout 60 --interface ib0 --protocol tcp://



In [6]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/hkashgar/proxy/40117/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/hkashgar/proxy/40117/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.12.205.44:40999,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/hkashgar/proxy/40117/status,Total threads: 0
Started: Just now,Total memory: 0 B


## Initializing Arrays

In [7]:
max_power = 5
chunksize = 10**5
jday_gt_arrays = []
lat_gt_arrays = []
for i in range(1,max_power):
    jday_gt_arrays.append(xr.DataArray(np.linspace(1, 365, num=10**i)))
    lat_gt_arrays.append(xr.DataArray(np.linspace(-66, 66, num=10**i)))
numpy_results = []
cupy_results = []

## Testing Numpy input and CPU

In [8]:
xp = np
for i in range(max_power-1):
    jday_gt = jday_gt_arrays[i].data
    lat_gt = lat_gt_arrays[i].data
    ncl_gt1 = geo.max_daylight(jday_gt, lat_gt, use_gpu = False)

<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>


In [9]:
type(ncl_gt1)

numpy.ndarray

## Testing Numpy input and GPU

In [10]:
for i in range(max_power-1):
    jday_gt = jday_gt_arrays[i].data
    lat_gt = lat_gt_arrays[i].data
    ncl_gt2 = geo.max_daylight(jday_gt, lat_gt, use_gpu = True)

<class 'cupy._core.core.ndarray'>
<class 'cupy._core.core.ndarray'>
<class 'cupy._core.core.ndarray'>
<class 'cupy._core.core.ndarray'>


In [11]:
type(ncl_gt2)

cupy._core.core.ndarray

## Testing Xarray input and CPU

In [12]:
from src.geocat.comp.comp_util import _is_duck_array

In [13]:
for i in range(max_power-1):
    jday_gt = jday_gt_arrays[i]
    lat_gt = lat_gt_arrays[i]
    ncl_gt1 = geo.max_daylight(jday_gt, lat_gt, use_gpu = False)

<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>


In [14]:
type(ncl_gt1.data)

numpy.ndarray

## Testing Xarray input and GPU

In [15]:
for i in range(max_power-1):
    jday_gt = jday_gt_arrays[i]
    lat_gt = lat_gt_arrays[i]
    ncl_gt2 = geo.max_daylight(jday_gt, lat_gt, use_gpu = True)

<class 'cupy._core.core.ndarray'>
<class 'cupy._core.core.ndarray'>
<class 'cupy._core.core.ndarray'>
<class 'cupy._core.core.ndarray'>


In [16]:
ncl_gt2.data.device

<CUDA Device 0>

## Testing Dask input with CPU

In [17]:
for i in range(2):
    jday_gt = jday_gt_arrays[i].chunk(10)
    lat_gt = lat_gt_arrays[i].chunk(10)
    ncl_gt1 = geo.max_daylight(jday_gt, lat_gt, use_gpu = False)

<class 'numpy.ndarray'>
<class 'numpy.ndarray'>


In [18]:
ncl_gt1

## Testing Dask input with GPU

In [19]:
client = Client(clusterCUDA)
for i in range(2):
    jday_gt = jday_gt_arrays[i].chunk(10)
    lat_gt = lat_gt_arrays[i].chunk(10)
    ncl_gt2 = geo.max_daylight(jday_gt, lat_gt, use_gpu = True).compute()

<class 'cupy._core.core.ndarray'>
<class 'cupy._core.core.ndarray'>


In [20]:
i = 0
jday_gt = jday_gt_arrays[i].chunk(10)
lat_gt = lat_gt_arrays[i].chunk(10)
geo.max_daylight(jday_gt, lat_gt, use_gpu = True)

<class 'cupy._core.core.ndarray'>


In [22]:
jday_gt = jday_gt_arrays[0].data
lat_gt = lat_gt_arrays[0].data

from src.geocat.comp.comp_util import _is_duck_array, _import_cupy, _convert_to_gpu_array, _convert_to_cpu_array

jday = da.from_array(jday_gt)
lat = da.from_array(lat_gt)

a = geo.max_daylight(jday, lat, True)

<class 'xarray.core.dataarray.DataArray'>


TypeError: Unsupported type <class 'xarray.core.dataarray.DataArray'>

In [None]:
isinstance(lat, p.ndarray)

In [None]:
lat

In [None]:
cp.asarray(lat, dtype='float32')

In [None]:
lat=lat.map_blocks(cp.asnumpy)

In [None]:
lat

In [None]:
lat_gt

In [None]:
_is_duck_array(lat_gt, cp)

In [None]:
cp.asarray(lat, dtype='float32')

In [None]:
a.compute()

In [None]:
a,b = _convert_to_gpu_array([jday,jday])

In [None]:
print(jday.map_blocks(cp.asarray))

In [None]:
jday.map_blocks(cp.asarray)

In [None]:
for items in jday:
    items = items.map_blocks(cp.asarray)

In [None]:
items

## Unittests with CPU!

In [None]:
client = Client(clusterCPU)
test = Test_max_daylight()

test.setUpClass()
test.test_float_input(use_gpu = False)
test.test_list_input(use_gpu = False)
test.test_numpy_input(use_gpu = False)
test.test_dask_unchunked_input(use_gpu = False)
test.test_dask_chunked_input(use_gpu = False)
test.test_input_dim(use_gpu = False)
test.test_lat_bound_warning(use_gpu = False)
test.test_lat_bound_second_warning(use_gpu = False)

## Unittests with GPU

In [None]:
client = Client(clusterCUDA)
test = Test_max_daylight()

test.setUpClass()
test.test_float_input(use_gpu = True)
test.test_list_input(use_gpu = True)
test.test_numpy_input(use_gpu = True)
#test.test_dask_unchunked_input(use_gpu = True)
#test.test_dask_chunked_input(use_gpu = True)
test.test_input_dim(use_gpu = True)
test.test_lat_bound_warning(use_gpu = True)
test.test_lat_bound_second_warning(use_gpu = True)

## Benchmark Results for different Array Sizes (NUMPY/CUPY)

In [None]:
#Test_max_daylight main body
Routine = "max_daylight"
print(Routine)
allData = pd.DataFrame()
#For different Array sizes
for i in range(1,max_power):
    ArraySize = 10**i
    tk_def = tk_def_arrays[i-1].data
    rh_def = rh_def_arrays[i-1].data
    print("Array size: ", ArraySize)
    #for numpy and cupy both
    for xp in [np,cp]:
        #calculation will be repeated 10 time to get the less biased performance results
        repsize = 10
        repeat = np.zeros([repsize])
        for rep in range(0,repsize):
            #create different sizes of arrays
            if(xp == cp):
                res_cupy = geo.max_daylight(tk_def, rh_def,True)
                cp.cuda.runtime.deviceSynchronize()
                time1 = time.time()
                res_cupy = geo.max_daylight(tk_def, rh_def,True)
                cp.cuda.runtime.deviceSynchronize()
                time2 = time.time()
                repeat[rep] = time2-time1
            else:
                time1 = time.time()
                res_numpy = geo.max_daylight(tk_def, rh_def,False)
                time2 = time.time()
                repeat[rep] = time2-time1
        #save times
        data = {'Routine': np.repeat(Routine, repsize),
                'Input':"NumPy input",
                'Approach': np.repeat(xp.__name__ , repsize),
                'ArraySize': np.repeat(ArraySize , repsize),
                'iteration' : np.arange(1,repsize+1),
                'Runtime(s)': repeat}

        allData = pd.concat([allData,pd.DataFrame(data)], ignore_index=True)
        print(xp.__name__,np.mean(repeat), "seconds")
    test_validation(res_numpy,res_cupy)
try:
    previous = pd.read_csv(csvpath)
    previous = pd.concat([previous,allData])
except FileNotFoundError:
    previous = allData
previous.to_csv(csvpath, index=False)
# plot(allData,"Test_max_daylight_ported_numpy.jpg")

## Results for different ArraySizes Xarray (with NumPy/CuPy arrays inside the Xarray)

### Test max_daylight on CPU

In [None]:
client = Client(clusterCPU)
client

In [None]:
csvpath = "max_daylight_ported_test_xarray.csv"
#Test_max_daylight main body
Routine = "max_daylight"
print(Routine)
allData = pd.DataFrame()
#For different Array sizes
for i in range(1,max_power):
    ArraySize = 10**i
    print("Array size: ", ArraySize)
    tk_def = tk_def_arrays[i-1]
    rh_def = rh_def_arrays[i-1]
    #for numpy and cupy both
    xp = np 
    repsize = 10
    repeat = np.zeros([repsize])
    for rep in range(0,repsize):
    #create different sizes of arrays
        numpy_res = geo.max_daylight(tk_def, rh_def, False).compute()
        time1 = time.time()
        numpy_res = geo.max_daylight(tk_def, rh_def, False).compute()
        time2 = time.time()
        repeat[rep] = time2-time1
    numpy_results.append(numpy_res)
    #save times
    data = {'Routine': np.repeat(Routine, repsize),
            'Input':"Xarray with NumPy input",
            'Approach': np.repeat(xp.__name__ , repsize),
            'ArraySize': np.repeat(ArraySize , repsize),
            'iteration' : np.arange(1,repsize+1),
            'Runtime(s)': repeat}
    allData = pd.concat([allData,pd.DataFrame(data)], ignore_index=True)
    print(xp.__name__,np.mean(repeat), "seconds")
try:
    previous = pd.read_csv(csvpath)
    previous = pd.concat([previous,allData])
except FileNotFoundError:
    previous = allData
previous.to_csv(csvpath, index=False)

### Test max_daylight on GPU

In [None]:
client = Client(clusterCUDA)
client

In [None]:
#Test_max_daylight main body
Routine = "max_daylight"
print(Routine)
#For different Array sizes
for i in range(1,max_power):
    ArraySize = 10**i
    rh_def = rh_def_arrays[i-1]
    tk_def = tk_def_arrays[i-1]
    print("Array size: ", ArraySize)
    #for numpy and cupy both
    xp = cp
        #calculation will be repeated 10 time to get the less biased performance results
    repsize = 10
    repeat = np.zeros([repsize])
    for rep in range(0,repsize):
        #create different sizes of arrays
        cupy_res = geo.max_daylight(tk_def, rh_def, True).compute()
        cp.cuda.runtime.deviceSynchronize()
        time1 = time.time()
        cupy_res = geo.max_daylight(tk_def, rh_def, True).compute()
        cp.cuda.runtime.deviceSynchronize()
        time2 = time.time()
        repeat[rep] = time2-time1
    cupy_results.append(cupy_res)
    #save times
    data = {'Routine': np.repeat(Routine, repsize),
            'Input':"Xarray with NumPy input",
            'Approach': np.repeat(xp.__name__ , repsize),
            'ArraySize': np.repeat(ArraySize , repsize),
            'iteration' : np.arange(1,repsize+1),
            'Runtime(s)': repeat}
    allData = pd.concat([allData,pd.DataFrame(data)], ignore_index=True)
    print(xp.__name__,np.mean(repeat), "seconds")

try:
    previous = pd.read_csv(csvpath)
    previous = pd.concat([previous,allData])
except FileNotFoundError:
    previous = allData
previous.to_csv(csvpath, index=False)
plot(allData,"Test_max_daylight_ported_xarray.jpg")

### Validation

In [None]:
#validation 
for i in range(len(numpy_results)):
    test_validation(cupy_results[i].data,numpy_results[i].data)

In [None]:
cupy_results[0]

In [None]:
numpy_results[0]

## Benchmark Results for different ArraySizes Xarray (with Dask arrays inside the Xarray, then dask array type is either NumPy or CuPy)

### Test max_daylight on CPU

In [None]:
client = Client(clusterCPU)
client

In [None]:
csvpath = "max_daylight_ported_test_dask.csv"
#Test_max_daylight main body
Routine = "max_daylight"
print(Routine)
allData = pd.DataFrame()
#For different Array sizes
for i in range(1,max_power):
    ArraySize = 10**i
    print("Array size: ", ArraySize)
    rh_def = rh_def_arrays[i-1].chunk(chunksize)
    tk_def = tk_def_arrays[i-1].chunk(chunksize)
    #for numpy and cupy both
    xp = np
    #calculation will be repeated 10 time to get the less biased performance results
    repsize = 10
    repeat = np.zeros([repsize])
    for rep in range(0,repsize):
        #create different sizes of arrays
        numpy_res = geo.max_daylight(tk_def, rh_def, False).compute()
        time1 = time.time()
        numpy_res = geo.max_daylight(tk_def, rh_def, False).compute()
        time2 = time.time()
        repeat[rep] = time2-time1
        #save times
    numpy_results.append(numpy_res)
    data = {'Routine': np.repeat(Routine, repsize),
            'Input':"Xarray with Dask array input",
            'Approach': np.repeat(xp.__name__ , repsize),
            'ArraySize': np.repeat(ArraySize , repsize),
            'iteration' : np.arange(1,repsize+1),
            'Runtime(s)': repeat}
    allData = pd.concat([allData,pd.DataFrame(data)], ignore_index=True)
    print(xp.__name__,np.mean(repeat), "seconds")

try:
    previous = pd.read_csv(csvpath)
    previous = pd.concat([previous,allData])
except FileNotFoundError:
    previous = allData
previous.to_csv(csvpath, index=False)

### Test max_daylight on GPU

In [None]:
client = Client(clusterCUDA)
client

In [None]:
#Test_max_daylight main body
Routine = "max_daylight"
print(Routine)
#For different Array sizes
for i in range(1,max_power):
    ArraySize = 10**i
    print("Array size: ", ArraySize)
    rh_def = rh_def_arrays[i-1].chunk(chunksize)
    tk_def = tk_def_arrays[i-1].chunk(chunksize)
    #for numpy and cupy both
    xp = cp
    #calculation will be repeated 10 time to get the less biased performance results
    repsize = 10
    repeat = np.zeros([repsize])
    for rep in range(0,repsize):
        #create different sizes of arrays
        cupy_res = geo.max_daylight(tk_def, rh_def, True).compute()
        cp.cuda.runtime.deviceSynchronize()
        time1 = time.time()
        cupy_res = geo.max_daylight(tk_def, rh_def, True).compute()
        cp.cuda.runtime.deviceSynchronize()
        time2 = time.time()
        repeat[rep] = time2-time1
        #save times
    cupy_results.append(cupy_res)
    data = {'Routine': np.repeat(Routine, repsize),
            'Input':"Xarray with Dask array input",
            'Approach': np.repeat(xp.__name__ , repsize),
            'ArraySize': np.repeat(ArraySize , repsize),
            'iteration' : np.arange(1,repsize+1),
            'Runtime(s)': repeat}
    new = pd.DataFrame(data)
    allData = pd.concat([allData,new], ignore_index=True)
    print(xp.__name__,np.mean(repeat), "seconds")
    #print(np.allclose(cupy_res.data,numpy_res.data,atol=0.0000001))
try:
    previous = pd.read_csv(csvpath)
    previous = pd.concat([previous,allData])
except FileNotFoundError:
    previous = allData
previous.to_csv(csvpath, index=False)

plot(allData,"Test_max_daylight_ported_dask.jpg")

In [None]:
cupy_res.data

In [None]:
numpy_res.data

### Validation

In [None]:
#validation 
for i in range(len(numpy_results)):
    test_validation(cupy_results[i].data,numpy_results[i].data)

In [None]:
numpy_results[0]

In [None]:
cupy_results[0]

In [None]:
len(numpy_results)

### Only comparing "compute()" runtime

In [None]:
chunksize = 10**5

#### on CPU

In [None]:
client = Client(clusterCPU)
client

In [None]:
csvpath = "max_daylight_ported_test_dask_compute_10.csv"
#Test_max_daylight main body
Routine = "max_daylight"
print(Routine)
allData = pd.DataFrame()
#For different Array sizes
for i in range(1,7):
    ArraySize = 10**i
    print("Array size: ", ArraySize)
    rh_def = rh_def_arrays[i-1].chunk(chunksize)
    tk_def = tk_def_arrays[i-1].chunk(chunksize)
    #for numpy and cupy both
    xp = np
    #calculation will be repeated 10 time to get the less biased performance results
    repsize = 10
    repeat = np.zeros([repsize])
    for rep in range(0,repsize):
        #create different sizes of arrays
        numpy_res = geo.max_daylight(tk_def, rh_def,False)
        numpy_res.compute()
        time1 = time.time()
        numpy_res = numpy_res.compute()
        time2 = time.time()
        repeat[rep] = time2-time1
        #save times
    numpy_results.append(numpy_res)
    data = {'Routine': np.repeat(Routine, repsize),
            'Input':"Xarray with Dask array input compute",
            'Approach': np.repeat(xp.__name__ , repsize),
            'ArraySize': np.repeat(ArraySize , repsize),
            'iteration' : np.arange(1,repsize+1),
            'Runtime(s)': repeat}
    allData = pd.concat([allData,pd.DataFrame(data)], ignore_index=True)
    print(xp.__name__,np.mean(repeat), "seconds")

try:
    previous = pd.read_csv(csvpath)
    previous = pd.concat([previous,allData])
except FileNotFoundError:
    previous = allData
previous.to_csv(csvpath, index=False)

#### on GPU

In [None]:
client = Client(clusterCUDA)
client

In [None]:
#Test_max_daylight main body
Routine = "max_daylight"
print(Routine)
#For different Array sizes
for i in range(1,7):
    ArraySize = 10**i
    print("Array size: ", ArraySize)
    rh_def = rh_def_arrays[i-1].chunk(chunksize)
    tk_def = tk_def_arrays[i-1].chunk(chunksize)
    #for numpy and cupy both
    xp = cp
    #calculation will be repeated 10 time to get the less biased performance results
    repsize = 10
    repeat = np.zeros([repsize])
    for rep in range(0,repsize):
        #create different sizes of arrays
        cupy_res = geo.max_daylight(tk_def, rh_def,True)
        cupy_res.compute()
        cp.cuda.runtime.deviceSynchronize()
        time1 = time.time()
        cupy_res = cupy_res.compute()
        cp.cuda.runtime.deviceSynchronize()
        time2 = time.time()
        repeat[rep] = time2-time1
        #save times
    cupy_results.append(cupy_res)
    data = {'Routine': np.repeat(Routine, repsize),
            'Input':"Xarray with Dask array input compute",
            'Approach': np.repeat(xp.__name__ , repsize),
            'ArraySize': np.repeat(ArraySize , repsize),
            'iteration' : np.arange(1,repsize+1),
            'Runtime(s)': repeat}
    new = pd.DataFrame(data)
    allData = pd.concat([allData,new], ignore_index=True)
    print(xp.__name__,np.mean(repeat), "seconds")
    #print(np.allclose(cupy_res.data,numpy_res.data,atol=0.0000001))
try:
    previous = pd.read_csv(csvpath)
    previous = pd.concat([previous,allData])
except FileNotFoundError:
    previous = allData
previous.to_csv(csvpath, index=False)

In [None]:
plot(allData,"Test_max_daylight_ported_compute")

#### Validation

In [None]:
#validation 
for i in range(len(numpy_results)):
    test_validation(cupy_results[i].data,numpy_results[i].data)