### Homework
**Part 1 - CuPy**

1. Implement a function using only numpy that takes more than one operation, so that the function does something meaningful.
2. Implement the function from step 1 using cupy.
3. Implement it using cupy + @cp.fuse()

Compare the execution times.

**Part 2 - CuDF**

Compare the average probability of death for men and women by age group, based on the death_ind column. Do the same for the probability of hospitalization, converting the hosp_yn variable the same way as death_yn.

Use cudf and save the result to disk.

**Part 3 - Dask-CuDF**

Complete a task analogous to Part 2, but using Dask-CuDF (an implementation for computing on multiple GPUs).

## Environment setup

In [1]:
!git clone https://github.com/rapidsai/rapidsai-csp-utils.git
!python rapidsai-csp-utils/colab/pip-install.py
!pip uninstall cupy-cuda11x --yes
!pip install cupy-cuda12x
!pip install graphviz

Cloning into 'rapidsai-csp-utils'...
remote: Enumerating objects: 413, done.
remote: Counting objects: 100% (144/144), done.
remote: Compressing objects: 100% (90/90), done.
remote: Total 413 (delta 103), reused 58 (delta 54), pack-reused 269
Receiving objects: 100% (413/413), 113.29 KiB | 2.06 MiB/s, done.
Resolving deltas: 100% (205/205), done.
Collecting pynvml
  Downloading pynvml-11.5.0-py3-none-any.whl (53 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 53.1/53.1 kB 819.6 kB/s eta 0:00:00
Installing collected packages: pynvml
Successfully installed pynvml-11.5.0
***********************************************************************
Woo! Your instance has the right kind of GPU, a Tesla T4!
We will now install RAPIDS cuDF, cuML, and cuGraph via pip! 
Please stand by, should be quick...
***********************************************************************

Looking in indexes: https://pypi.org/simple, https://pypi.nvidia.com
Collecting cudf-cu11==23.10.*
  Downloadin

## Library imports

In [73]:
import os
import cupy as cp
import numpy as np

import cudf as cd

import subprocess
import dask_cudf
import dask.dataframe as dd

from dask_cuda import LocalCUDACluster
from dask.distributed import Client, progress

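# Append the directory holding the Graphviz `dot` executable (/usr/bin/dot) to PATH
# instead of overwriting both PATH and os.pathsep
os.environ["PATH"] += os.pathsep + "/usr/bin"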

## Part 1 - CuPy

In [74]:
np_a_arrays, np_b_arrays = np.random.random_sample(1_000_000).reshape(1000, 1000),\
                           np.random.random_sample(1_000_000).reshape(1000, 1000)

cp_a_arrays, cp_b_arrays = cp.random.random_sample(1_000_000).reshape(1000, 1000),\
                           cp.random.random_sample(1_000_000).reshape(1000, 1000)

def numpy_cos_vectors(vectors_1:"ArrayLike", vectors_2:"ArrayLike"):
    calc_norm_vectors = lambda vectors: np.sqrt(np.sum(np.power(vectors, 2), axis=1))
    return np.sum(vectors_1 * vectors_2, axis=1) / (calc_norm_vectors(vectors_1) * calc_norm_vectors(vectors_2))

def cupy_cos_vectors(vectors_1:"ArrayLike", vectors_2:"ArrayLike"):
    calc_norm_vectors = lambda vectors: cp.sqrt(cp.sum(cp.power(vectors, 2), axis=1))
    return cp.sum(vectors_1 * vectors_2, axis=1) / (calc_norm_vectors(vectors_1) * calc_norm_vectors(vectors_2))

# Wrap the CuPy version with cp.fuse (equivalent to applying the @cp.fuse() decorator)
fuse_cupy_cos_vectors = cp.fuse(cupy_cos_vectors)

In [75]:
%timeit -r 10 -n 50 numpy_cos_vectors(np_a_arrays, np_b_arrays)

61.9 ms ± 7.22 ms per loop (mean ± std. dev. of 10 runs, 50 loops each)


In [76]:
%timeit -r 10 -n 50 cupy_cos_vectors(cp_a_arrays, cp_b_arrays)

4.37 ms ± 844 µs per loop (mean ± std. dev. of 10 runs, 50 loops each)


In [77]:
%timeit -r 10 -n 50 fuse_cupy_cos_vectors(cp_a_arrays, cp_b_arrays)

35.4 ms ± 935 µs per loop (mean ± std. dev. of 10 runs, 50 loops each)


**Conclusion**

The timings show that the CuPy implementation is roughly 14 times faster than the numpy one (4.37 ms vs 61.9 ms per loop).

However, applying the @cupy.fuse decorator to the CuPy implementation did not speed it up. On the contrary, it slowed the function down to 35.4 ms per loop, about 8 times slower than plain CuPy.
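For reference, the function timed above computes a row-wise cosine similarity: one value per pair of matching rows. The sketch below repeats the notebook's numpy formula on a tiny hand-checkable input and compares it with an independent formulation. The name `cos_rows` and the toy arrays are illustrative assumptions, not part of the original notebook.

```python
import numpy as np

def cos_rows(a, b):
    """Row-wise cosine similarity, mirroring numpy_cos_vectors from the notebook."""
    norm = lambda v: np.sqrt(np.sum(np.power(v, 2), axis=1))
    return np.sum(a * b, axis=1) / (norm(a) * norm(b))

# Hypothetical toy inputs (not from the notebook), chosen so the answers are easy to verify.
a = np.array([[1.0, 0.0], [1.0, 1.0], [3.0, 4.0]])
b = np.array([[0.0, 2.0], [2.0, 2.0], [4.0, 3.0]])

result = cos_rows(a, b)

# Independent formulation: einsum for the row-wise dot product, linalg.norm for the norms.
reference = np.einsum("ij,ij->i", a, b) / (
    np.linalg.norm(a, axis=1) * np.linalg.norm(b, axis=1)
)

print(result)     # orthogonal rows give 0, parallel rows give 1
print(reference)
```

For the 1000 x 1000 inputs used in the benchmark, the same formula returns 1000 values, one per row pair.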

## Part 2 - CuDF

In [78]:
cdf = cd.read_parquet("covid.gzip").dropna().rename(columns={"sex": "gender"})

# Create 0/1 indicator columns for death and hospitalization, used to compute the probabilities
for case_name in ["death", "hosp"]:
  cdf[f"{case_name}_ind"] = (cdf[f"{case_name}_yn"] == "Yes") * 1

filter_mask = ((cdf["gender"] == "Male") | (cdf["gender"] == "Female")) & (cdf["age_group"] != "Unknown")

In [79]:
%%time
cdf_stats = cdf[filter_mask]\
            .groupby(["gender", "age_group"], as_index=False)\
            .agg({"death_ind": "mean", "hosp_ind": "mean"})\
            .sort_values(by=["age_group", "gender"])

CPU times: user 65.2 ms, sys: 42 ms, total: 107 ms
Wall time: 106 ms


In [80]:
cdf_stats.to_parquet("cdf_stats.gzip")
cd.read_parquet("cdf_stats.gzip")

| index | gender | age_group | death_ind | hosp_ind |
|---|---|---|---|---|
| 2 | Female | 0 - 9 Years | 0.000223 | 0.014042 |
| 15 | Male | 0 - 9 Years | 0.000257 | 0.0159 |
| 14 | Female | 10 - 19 Years | 0.000134 | 0.009225 |
| 13 | Male | 10 - 19 Years | 0.000191 | 0.008124 |
| 0 | Female | 20 - 29 Years | 0.000345 | 0.019351 |
| 1 | Male | 20 - 29 Years | 0.000725 | 0.013744 |
| 10 | Female | 30 - 39 Years | 0.001076 | 0.030775 |
| 11 | Male | 30 - 39 Years | 0.00229 | 0.031092 |
| 7 | Female | 40 - 49 Years | 0.002635 | 0.039122 |
| 6 | Male | 40 - 49 Years | 0.006098 | 0.054479 |


**Conclusion**

Working with data in CuDF is in many ways very similar to working with pandas.

The computed statistics also show that the probability of death among those infected is higher for men than for women in every age group. The same holds for the probability of hospitalization, except for men aged 10 to 29.
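To illustrate the similarity with pandas, here is a minimal sketch of the same pipeline written with pandas on a few made-up records. The column names follow the notebook. The data values, the use of `isin` and `astype`, and the final `reset_index` are assumptions added for illustration only.

```python
import pandas as pd

# Hypothetical toy records; the values are made up and are not from the COVID dataset.
df = pd.DataFrame({
    "gender":    ["Male", "Male", "Female", "Female", "Male", "Unknown"],
    "age_group": ["0 - 9 Years", "0 - 9 Years", "0 - 9 Years",
                  "0 - 9 Years", "Unknown", "0 - 9 Years"],
    "death_yn":  ["Yes", "No", "No", "No", "Yes", "Yes"],
    "hosp_yn":   ["Yes", "Yes", "No", "Yes", "No", "No"],
})

# Turn the Yes/No columns into 0/1 indicators; their mean is then a probability.
for case_name in ["death", "hosp"]:
    df[f"{case_name}_ind"] = (df[f"{case_name}_yn"] == "Yes").astype("int8")

# Keep only rows with a known gender and a known age group.
mask = df["gender"].isin(["Male", "Female"]) & (df["age_group"] != "Unknown")

stats = (
    df[mask]
    .groupby(["gender", "age_group"], as_index=False)
    .agg({"death_ind": "mean", "hosp_ind": "mean"})
    .sort_values(by=["age_group", "gender"])
    .reset_index(drop=True)  # addition: gives the sorted result a clean 0..n-1 index
)

print(stats)
```

Apart from the import and the constructor, the groupby, aggregation, and sorting calls have the same shape as the cuDF cell above.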

## Part 3 - Dask-CuDF

In [81]:
cluster = LocalCUDACluster(dashboard_address=":8902")
client  = Client(cluster)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 37753 instead
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:34411
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:37753/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:33467'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:42465', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:42465
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:42056
INFO:distributed.scheduler:Receive client connection: Client-15ea54bb-9e9f-11ee-83bf-0242ac1c000c
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:42060


Connection method: Cluster object
Cluster type: dask_cuda.LocalCUDACluster
Dashboard: http://127.0.0.1:37753/status

Dashboard: http://127.0.0.1:37753/status
Workers: 1
Total threads: 1
Total memory: 12.67 GiB
Status: running
Using processes: True

Comm: tcp://127.0.0.1:34411
Workers: 1
Dashboard: http://127.0.0.1:37753/status
Total threads: 1
Started: Just now
Total memory: 12.67 GiB

Comm: tcp://127.0.0.1:42465
Total threads: 1
Dashboard: http://127.0.0.1:45935/status
Memory: 12.67 GiB
Nanny: tcp://127.0.0.1:33467
Local directory: /tmp/dask-scratch-space/worker-sy8gb6ug
GPU: Tesla T4
GPU memory: 15.00 GiB


In [82]:
ddf = dask_cudf.read_parquet("covid.gzip").dropna().rename(columns={"sex": "gender"})

for case_name in ["death", "hosp"]:
  ddf[f"{case_name}_ind"] = (ddf[f"{case_name}_yn"] == "Yes") * 1

filter_mask = ((ddf["gender"] == "Male") | (ddf["gender"] == "Female")) & (ddf["age_group"] != "Unknown")

In [83]:
%%time
ddf_stats = ddf[filter_mask]\
            .groupby(["gender", "age_group"], as_index=False)\
            .agg({"death_ind": "mean", "hosp_ind": "mean"})\
            .sort_values(by=["age_group", "gender"])\
            .compute()

ddf_stats.to_parquet("ddf_stats.gzip")
dask_cudf.read_parquet("ddf_stats.gzip").compute()

CPU times: user 321 ms, sys: 89.5 ms, total: 411 ms
Wall time: 2.75 s


| index | gender | age_group | death_ind | hosp_ind |
|---|---|---|---|---|
| 7 | Female | 0 - 9 Years | 0.000223 | 0.014042 |
| 6 | Male | 0 - 9 Years | 0.000257 | 0.0159 |
| 2 | Female | 10 - 19 Years | 0.000134 | 0.009225 |
| 11 | Male | 10 - 19 Years | 0.000191 | 0.008124 |
| 10 | Female | 20 - 29 Years | 0.000345 | 0.019351 |
| 9 | Male | 20 - 29 Years | 0.000725 | 0.013744 |
| 8 | Female | 30 - 39 Years | 0.001076 | 0.030775 |
| 5 | Male | 30 - 39 Years | 0.00229 | 0.031092 |
| 12 | Female | 40 - 49 Years | 0.002635 | 0.039122 |
| 1 | Male | 40 - 49 Years | 0.006098 | 0.054479 |
