# Testing Rapid's Features

This notebook is my testing ground for some of RAPIDs features to try to understand how they work and whether they are useful for Sanaa's ML code. 

- dask_cuda - Version of Dask that works on GPUs.
- cudf - Pandas for GPU.
- dask_cudf - Dask wrapped Pandas for GPU.
- cuml - Similar funcitons to Sklearn but for GPU.
- cupy - Numpy for GPU.

The idea is that since all these libraries are a part of RAPIDs then they should play together nicely. And it potentially seems that they could replace the ML script that uses Sklearn to do the machine learning.

## The Libraries:

In [2]:
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

import cudf
import cuml
import cupy as cp
import dask_cudf

cluster = LocalCUDACluster()
client = Client(cluster)

distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize


In [3]:
print(client)

<Client: 'tcp://127.0.0.1:40905' processes=2 threads=2, memory=191.00 GiB>


In [4]:
from dask import delayed
import time

## Testing dask delayed

In [100]:
from dask import delayed
import time

In [29]:
@delayed
def inc(x):
    time.sleep(0.5)
    return x + 1

@delayed
def double(x):
    time.sleep(0.5)
    return 2 * x

@delayed
def add(x, y):
    time.sleep(0.5)
    return x + y

In [40]:
data = cp.array([1, 2, 3, 4])
output = cp.array([])

In [62]:
data

array([1, 2, 3, 4])

In [63]:
%%time

#data = [1, 2, 3, 4]
data = cp.array([1, 2, 3, 4])

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = delayed(sum)(output)
total

CPU times: user 2.62 ms, sys: 752 µs, total: 3.38 ms
Wall time: 2.72 ms


Delayed('sum-7af87d9f-4f48-4637-bacc-07ae8b6c7efa')

In [32]:
%%time

total.compute()

CPU times: user 339 ms, sys: 48.6 ms, total: 387 ms
Wall time: 3.05 s


34

## Converting main_MLR2_RF.py to GPU 

In [5]:
load_dir_dataset = "/g/data/w97/sho561/Downscale/BARRA/Training_Testing_new/"

In [6]:
train_grids = cp.array([642, 714, 720, 1207, 1233, 1682, 1728, 2348, 2817, 2855, 3002, 3114, 3346, 3809, 4233, 4322, 4615, 4623, 6081, 6145])
all_years = cp.arange(1990,2019, step=1)
train_years = cp.array([1990, 1991, 1992, 1995, 1996, 2001, 2003, 2004, 2016, 2018])
test_years = cp.array([1993, 1994, 1997, 1998, 1999, 2000, 2002, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2017, 2019]) 

featuresList = ['av_lat_hflx', 'av_mslp', 'av_netlwsfc', 'av_netswsfc', 'av_qsair_scrn', 'av_temp_scrn', 
'av_canopy_height', 'av_uwnd10m', 'av_vwnd10m', 'av_leaf_area_index', 'soil_albedo', 'soil_porosity', 'soil_bulk_density', 'topog' ]

seed = 100
ntrees = 100

- Check to see if the array is using GPU

In [21]:
cp.get_array_module(all_years)

<module 'cupy' from '/opt/conda/envs/rapids/lib/python3.9/site-packages/cupy/__init__.py'>

In [37]:
%%time

dask_sample_cudf = dask_cudf.concat([dask_cudf.read_csv(load_dir_dataset +'%s_%s_predictors_target.csv' %(642, year)) for year in all_years], axis=0).reset_index()

CPU times: user 466 ms, sys: 65.9 ms, total: 532 ms
Wall time: 505 ms


In [8]:
all_sample_cudf = dask_cudf.read_csv(load_dir_dataset +'%s_%s_predictors_target.csv' %(642, 1990))

In [33]:
cp.get_array_module(delayed_sample_cudf)

<module 'numpy' from '/opt/conda/envs/rapids/lib/python3.9/site-packages/numpy/__init__.py'>

In [50]:
dask_sample_cudf

Unnamed: 0_level_0,index,ref_coarse_cell,ref_fine_cell,year,month,day,target,av_lat_hflx,av_mslp,av_netlwsfc,av_netswsfc,av_qsair_scrn,av_temp_scrn,av_uwnd10m,av_vwnd10m,av_canopy_height,av_leaf_area_index,soil_bulk_density,soil_porosity,topog,soil_albedo,ETnw,ETn,ETne,ETw,ETe,ETsw,ETs,ETse
npartitions=29,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1
,int64,int64,int64,int64,int64,int64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [16]:
@dask.delayed
def open_file_delayed(train, yr):
    
    #yr = t0.year
    #mn = str(t0.month).zfill(2)
    #day = str(t0.day).zfill(2)
    #return xr.open_dataset(glob(f'{in_path}/{yr}/gpcp_v01r03_daily_d{yr}{mn}{day}*.nc')[0],
    #                           chunks={'time':1})
    return dask_cudf.read_csv(load_dir_dataset +'%s_%s_predictors_target.csv' %(train, yr)).dropna(axis=0)

def open_datefile(t0, ds0):
    # Prepare to open the file
    #var_data = open_datefile_delayed(t0)['precip'].data
    
    #var_data = all_sample_cudf.dropna(axis=0)
    
    # Tell Dask the delayed function returns an array, and the size and type of that array
    return dask.array.from_delayed(var_data, (1, 180, 360), ds0['precip'].dtype)

SyntaxError: invalid syntax (2362713222.py, line 9)

In [None]:
delayed_sample_cudf = dask_cudf.concat([open_datefile(train_grids[0], year) for year in all_years], axis=0)

In [None]:
@dask.delayed
def opendata(file)

- Training data

In [133]:
%%time

all_sample_df = cudf.DataFrame()
for year in all_years:
        
    filename_dataset = load_dir_dataset +'%s_%s_predictors_target.csv' %(train_grids[0], year)
    single_year_df = cudf.read_csv(filename_dataset)
    # Multi layer perceptron doesn't like Null values
    single_year_df = single_year_df.dropna(axis=0)
    all_sample_df = cudf.concat([all_sample_df, single_year_df])
        
    
# concatening will mess up with the index of the combined dataframe
all_sample_df= all_sample_df.reset_index()

CPU times: user 0 ns, sys: 797 ms, total: 797 ms
Wall time: 612 ms


In [128]:
cp.get_array_module(single_year_df.to_cupy())

<module 'cupy' from '/opt/conda/envs/rapids/lib/python3.9/site-packages/cupy/__init__.py'>

In [141]:
# split the data into training and testing
index_testing = all_sample_df.where(all_sample_df['year'].isin(test_years))
#index_testing = index_testing.dropna(axis=0).to_cupy()
index_training = all_sample_df.where(all_sample_df['year'].isin(train_years))
#index_training = index_training.dropna(axis=0).to_cupy()
#in_sample_df = all_sample_df.iloc[index_training.to_cupy()]
#out_sample_df = all_sample_df.iloc[index_testing.to_cupy()]

In [39]:
index_testing = dask_sample_cudf.where(dask_sample_cudf['year'].isin(test_years))
index_training = dask_sample_cudf.where(dask_sample_cudf['year'].isin(train_years))

In [40]:
index_training

Unnamed: 0_level_0,index,ref_coarse_cell,ref_fine_cell,year,month,day,target,av_lat_hflx,av_mslp,av_netlwsfc,av_netswsfc,av_qsair_scrn,av_temp_scrn,av_uwnd10m,av_vwnd10m,av_canopy_height,av_leaf_area_index,soil_bulk_density,soil_porosity,topog,soil_albedo,ETnw,ETn,ETne,ETw,ETe,ETsw,ETs,ETse
npartitions=29,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1
,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [None]:
X = dask_sample_cudf[featuresList]
X = cuml.preprocessing.StandardScaler().fit_transform(X.compute()) 
X = X[index_training[featuresList].compute()]
#print('shape of X is ', X.shape)
#print(index_training.shape)

In [73]:
print('shape of X is ', X.shape)
print('shape of index is ', index_training[featuresList].compute().shape)

distributed.scheduler - ERROR - 
Traceback (most recent call last):
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/scheduler.py", line 4811, in update_graph
    plugin.update_graph(
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/diagnostics/websocket.py", line 36, in update_graph
    self.socket.send("update_graph", {"client": client})
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/http/scheduler/info.py", line 197, in send
    self.write_message(data)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/tornado/websocket.py", line 337, in write_message
    raise WebSocketClosedError()
tornado.websocket.WebSocketClosedError


shape of X is  (677888, 14)
shape of index is  (677888, 14)


In [78]:
# target
y = dask_sample_cudf.iloc[index_training.compute()]
y  = y[['target']].to_cupy()

distributed.scheduler - ERROR - 
Traceback (most recent call last):
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/scheduler.py", line 4811, in update_graph
    plugin.update_graph(
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/diagnostics/websocket.py", line 36, in update_graph
    self.socket.send("update_graph", {"client": client})
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/http/scheduler/info.py", line 197, in send
    self.write_message(data)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/tornado/websocket.py", line 337, in write_message
    raise WebSocketClosedError()
tornado.websocket.WebSocketClosedError


TypeError: Implicit conversion to a NumPy array is not allowed. Please use `.get()` to construct a NumPy array explicitly.

In [153]:
%%time
import random

random.seed(seed)
regr = cuml.linear_model.LinearRegression()
regr.fit(X, y)

ValueError: Expected 677888 rows but got 233856 rows.