# 레이 데이터셋을 활용한 데이터 분산 처리

### 생성

In [1]:
import ray

In [None]:
# In case of `ImportError: cannot import name 'ExtensionArrayFormatter'`, pin pandas to 2.1.4:
!pip install pandas==2.1.4

In [2]:
# Build a Ray Dataset of the integers produced by range(10000). This is the
# first Ray call in the notebook, and it starts a local Ray instance (see log).
ds = ray.data.range(10000)

print(ds.count())   # total number of rows (10000)
print(ds.take(5))   # first five rows: [0, 1, 2, 3, 4]
print(ds.schema())  # row type of this simple dataset (<class 'int'>)

2024-03-15 12:35:07,266	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265


10000
[0, 1, 2, 3, 4]
<class 'int'>


### 스토리지에 읽고 쓰기

In [4]:
# Storage round-trip: split the data into 5 blocks, write them as CSV under
# ./local_dir, then read the whole directory back and count the rows.
ray.data.range(10000).repartition(5).write_csv("local_dir")
ds = ray.data.read_csv("local_dir")
print(ds.count())

Read: 100%|███████████████████████████████████| 33/33 [00:00<00:00, 1737.60it/s]
Repartition: 100%|███████████████████████████████| 5/5 [00:00<00:00, 906.25it/s]
Write Progress: 100%|███████████████████████████| 5/5 [00:00<00:00, 1736.34it/s]
Read progress: 100%|████████████████████████████| 5/5 [00:00<00:00, 2529.74it/s]

10000





### 빌트인 변환 기능

In [6]:
# Built-in transformations: union, filter and sort.
ds1 = ray.data.range(10000)
ds2 = ray.data.range(10000)
ds3 = ds1.union(ds2)  # rows of both datasets together -> 20000 rows
print(ds3.count())

ds3 = ds3.filter(lambda x: x % 2 == 0)  # keep only even values
print(ds3.count())

ds3 = ds3.sort()  # ascending; each value appears twice because of the union
print(ds3.take(5))





20000


Read->Filter: 100%|███████████████████████████| 66/66 [00:00<00:00, 1177.32it/s]


10000


Sort Sample: 100%|███████████████████████████| 66/66 [00:00<00:00, 14758.44it/s]
Shuffle Map: 100%|█████████████████████████████| 66/66 [00:00<00:00, 986.65it/s]
Shuffle Reduce: 100%|██████████████████████████| 66/66 [00:00<00:00, 555.64it/s]

[0, 0, 2, 2, 4]





### 블록 및 파티셔닝

In [7]:
# Inspect how datasets are split into blocks (Ray's internal partitions).
ds1 = ray.data.range(10000)
print(ds1.num_blocks())
ds2 = ray.data.range(10000)
print(ds2.num_blocks())
ds3 = ds1.union(ds2)
print(ds3.num_blocks())  # union keeps the inputs' blocks (33 + 33 = 66 in the run below)

# repartition redistributes the rows into an explicit number of blocks.
print(ds3.repartition(200).num_blocks())

33
33
66


Read: 100%|████████████████████████████████████| 66/66 [00:00<00:00, 231.95it/s]
Repartition: 100%|██████████████████████████| 200/200 [00:00<00:00, 3337.41it/s]

200





### 스키마와 데이터 포맷

In [9]:
# Creating a dataset from dicts yields a tabular schema with one column per
# key; the types are inferred from the values (id: string, value: int64).
ds = ray.data.from_items([{"id": "abc", "value":1}])
print(ds.schema())

id: string
value: int64


### 데이터셋 연산

In [11]:

# Per-row operation: the lambda is called once for every element.
ds = ray.data.range(10000).map(lambda x: x ** 2)
print(ds.take(5))

# Vectorized operation: the lambda receives a whole batch and squares it with NumPy.
import numpy as np
ds = ray.data.range(10000).map_batches(lambda batch: np.square(batch).tolist())
print(ds.take(5))

Read->Map: 100%|██████████████████████████████| 33/33 [00:00<00:00, 1158.77it/s]


[0, 1, 4, 9, 16]


Read->Map_Batches: 100%|███████████████████████| 33/33 [00:00<00:00, 567.66it/s]

[0, 1, 4, 9, 16]





In [26]:
# ML 모델에 batch 적용
import numpy as np
def load_model():
    """Return a stand-in "model" for the batch-inference demo.

    The returned object is callable. Given a batch, it squares every element
    with NumPy and returns the result as a plain Python list, imitating a
    model's prediction step without any real weights.
    """

    class DummyModel:
        """Trivial callable model: element-wise square of the input batch."""

        def __call__(self, batch):
            squared = np.square(batch)
            return squared.tolist()

    model = DummyModel()
    return model
        
class MLModel:
    """Callable batch transform that wraps a model loaded once at construction.

    Passed as a class to ``map_batches(..., compute="actors")``, Ray constructs
    it inside each actor. The model is therefore loaded in ``__init__`` once per
    actor and reused for every batch that actor processes.
    """

    def __init__(self):
        self._model = load_model()

    def __call__(self, batch):
        """Run the wrapped model on ``batch`` and return its output unchanged."""
        model = self._model
        return model(batch)

# Apply the stateful MLModel batch by batch. compute="actors" runs it in Ray
# actors (the progress bar below shows 1 actor) rather than in stateless tasks.
ds = ray.data.range(10000)
ds = ds.map_batches(MLModel, compute="actors")
print(ds.take(5))

Map Progress (1 actors 0 pending): 100%|████████| 33/33 [00:00<00:00, 50.48it/s]

[0, 1, 4, 9, 16]





### 파이프라인

In [None]:
# NOTE: this only works if you create an S3 bucket and upload the data there.
# NOTE(review): `cpu_intensive_preprocessing` and `gpu_intensive_inference` are
# placeholders; they are not defined in this notebook and must be supplied
# before running this cell.
# Read Parquet input, preprocess row by row on CPU, run batched inference in
# GPU actors (one GPU each), regroup into 10 blocks, and write the results back.
ds = (ray.data.read_parquet("s3://my_bucket/input_data")
      .map(cpu_intensive_preprocessing)
      .map_batches(gpu_intensive_inference, compute="actors", num_gpus=1)
      .repartition(10))

ds.write_parquet("s3://my_bucket/output_predictions")

In [None]:
# NOTE: this only works if you create an S3 bucket and upload the data there.
# NOTE(review): `cpu_intensive_preprocessing` and `gpu_intensive_inference` are
# placeholders that must be defined before running this cell.
# .window() turns the Dataset into a DatasetPipeline that processes 5 blocks at
# a time, so CPU preprocessing and GPU inference of different windows can overlap.
ds = (ray.data.read_parquet("s3://my_bucket/input_data")
      .window(blocks_per_window=5)
      .map(cpu_intensive_preprocessing)
      .map_batches(gpu_intensive_inference, compute="actors", num_gpus=1)
      # A DatasetPipeline has no .repartition(); holistic ops are applied per
      # window via the *_each_window variants (like random_shuffle_each_window).
      .repartition_each_window(10))
ds.write_parquet("s3://my_bucket/output_predictions")

### 병렬 분류기 복사본 훈련

In [35]:
from sklearn import datasets
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import train_test_split

@ray.remote
class TrainingWorker:
    """Ray actor that trains one ``SGDClassifier`` replica on its own data shard.

    Several actors are created with different ``alpha`` values, so the
    replicas can be trained, and later scored, in parallel.
    """

    def __init__(self, alpha: float, classes=(0, 1)):
        """Create the replica.

        Args:
            alpha: Regularization strength passed to ``SGDClassifier``.
            classes: Every label value the model may ever see. ``partial_fit``
                needs the full label set up front, because a single epoch may
                not contain all classes. Defaults to the binary labels (0, 1).
        """
        self._model = SGDClassifier(alpha=alpha)
        self._classes = list(classes)

    def train(self, train_shard: ray.data.Dataset):
        """Incrementally fit the model on every epoch of ``train_shard``.

        Args:
            train_shard: Shard whose rows are ``(features, label)`` pairs and
                which supports ``iter_epochs()``. NOTE(review): the calling
                cell passes a shard produced by ``DatasetPipeline.split``, not
                a plain ``Dataset`` as the annotation suggests.

        Returns:
            The fitted ``SGDClassifier``.
        """
        for epoch in train_shard.iter_epochs():
            rows = list(epoch.iter_rows())
            if not rows:
                # zip(*[]) yields nothing, so unpacking into X, Y would raise
                # ValueError; an empty epoch has nothing to learn from anyway.
                continue
            X, Y = zip(*rows)
            self._model.partial_fit(X, Y, classes=self._classes)
        return self._model

    def test(self, X_test: np.ndarray, Y_test: np.ndarray):
        """Return the model's ``score`` (mean accuracy) on the given test data."""
        return self._model.score(X_test, Y_test)

# One training actor per regularization strength (alpha) to compare in parallel.
ALPHA_VALS = [0.00008, 0.00009, 0.0001, 0.00011, 0.00012]
print(f"Starting {len(ALPHA_VALS)} training workers")
workers = [TrainingWorker.remote(alpha) for alpha in ALPHA_VALS]

Starting 5 training workers


In [36]:
# Generate a synthetic classification dataset and split it into train/test sets.
X_train, X_test, Y_train, Y_test = train_test_split(
    *datasets.make_classification()
)
# Each dataset row is a (features, label) tuple.
train_ds = ray.data.from_items(list(zip(X_train, Y_train)))
# repeat(10) produces a pipeline of 10 epochs. Each epoch is shuffled on its
# own, then the pipeline is split into one shard per worker. locality_hints
# asks Ray to place each shard's data near the worker that consumes it.
shards = (train_ds.repeat(10)
          .random_shuffle_each_window()
          .split(len(workers), locality_hints=workers))
# Start training on all workers in parallel and wait for every one to finish.
ray.get([
    worker.train.remote(shard)
    for worker, shard in zip(workers, shards)
])

Stage 0:   0%|          | 0/10 [00:00<?, ?it/s]179254)[0m 
  0%|          | 0/10 [00:00<?, ?it/s][Ar pid=179254)[0m 
Stage 1:   0%|          | 0/10 [00:00<?, ?it/s][A254)[0m 
[2m[36m(PipelineSplitExecutorCoordinator pid=179254)[0m 
Stage 0:  20%|██        | 2/10 [00:01<00:04,  1.63it/s][A 
[2m[36m(PipelineSplitExecutorCoordinator pid=179254)[0m 
Stage 0:  30%|███       | 3/10 [00:01<00:03,  1.95it/s][A 
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
[2m[36m(PipelineSplitExecutorCoordinator pid=179254)[0m 
Stage 0:  40%|████      | 4/10 [00:01<00:02,  2.77it/s][A 
Stage 0: 100%|██████████| 1/1 [00:00<00:00,  1.96it/s]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00,  2.23it/s]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0: 100%|███████

[SGDClassifier(alpha=8e-05),
 SGDClassifier(alpha=9e-05),
 SGDClassifier(),
 SGDClassifier(alpha=0.00011),
 SGDClassifier(alpha=0.00012)]

Stage 0: 100%|██████████| 1/1 [00:00<00:00,  9.17it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00, 119.67it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00, 84.64it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00,  8.95it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00, 124.85it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00, 83.27it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00,  9.23it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00, 127.76it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00, 84.65it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00,  9.11it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00, 118.84it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00, 82.24it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00,  8.87it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00, 117.50it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00, 82.45it/s]


In [37]:
# Score every trained replica on the held-out test set (one value per alpha).
print(ray.get([worker.test.remote(X_test, Y_test) for worker in workers]))

[0.88, 0.8, 0.88, 0.6, 0.72]


## 외부 라이브러리와 통합(dask)

* !pip install dask==2022.2.0

In [None]:
import ray
from ray.util.dask import enable_dask_on_ray

# Earlier cells already started Ray implicitly (see the "Started a local Ray
# instance" log). A bare ray.init() would then raise RuntimeError, so tolerate
# an existing instance instead.
ray.init(ignore_reinit_error=True)
# Make Dask execute its task graphs on the Ray cluster.
enable_dask_on_ray()

In [None]:
import dask

# Build Dask's demo time-series DataFrame, keep rows with y > 0, and compute
# the standard deviation of x per name. If enable_dask_on_ray() was called,
# the resulting tasks are scheduled on Ray.
df = dask.datasets.timeseries()
df = df[df.y > 0].groupby("name").x.std()
df.compute()  # Trigger the task graph to be evaluated.

In [None]:
import ray
# Round-trip between a Ray Dataset and a Dask DataFrame. Both std() calls are
# expected to agree: 2886.89568 is the sample (ddof=1) standard deviation of 0..9999.
ds = ray.data.range(10000)

# Convert the Dataset to a Dask DataFrame.
df = ds.to_dask()
print(df.std().compute())  # -> 2886.89568

# Convert the Dask DataFrame back to a Dataset.
ds = ray.data.from_dask(df)
print(ds.std())  # -> 2886.89568