# Module 1: Primer on ray.data with Google Cloud Storage

This module covers reading data from, transforming it, and writing it back to Google Cloud Storage with ray.data.

## Imports and initialization

In [None]:
# Uncomment the install lines below, run them, restart the runtime, and then comment them out again
# Note: This was tested with Ray 2.4.0
# ! pip install pyarrow==14.0.0
# ! pip install -U "ipywidgets>=8"

In [None]:
# Capture the output lines of `gcloud config list` (stderr discarded); the
# format expression asks gcloud for the core.project value only.
project_id_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
# The first captured output line is taken as the project ID.
# NOTE(review): indexing [0] assumes gcloud printed at least one line - confirm a project is configured.
PROJECT_ID = project_id_output[0]

# Look up the project number for PROJECT_ID; it is used below to build the
# bucket and cluster URIs.
project_nbr_output = !gcloud projects describe $PROJECT_ID --format='value(projectNumber)'
PROJECT_NBR = project_nbr_output[0]

import ray
import pyarrow
from google.cloud import aiplatform
from google.cloud.aiplatform.preview import vertex_ray

# Address of the Ray cluster on Vertex AI, keyed by the project number.
RAY_ADDRESS = f"vertex_ray://projects/{PROJECT_NBR}/locations/us-central1/persistentResources/ray-kicking-tires-cluster"

# GCS locations of the Iris CSV input and the Parquet output written later on.
IRIS_DATA_CSV_SRC_GCS_FQ_URI = f"gs://ray_lab_data_bucket_{PROJECT_NBR}/sample-input-data/iris.csv"
IRIS_DATA_PARQUET_TARGET_GCS_FQ_URI = f"gs://ray_lab_data_bucket_{PROJECT_NBR}/sample-output-data/iris.parquet"

# Echo the resolved settings so they can be checked before running the rest.
print("PROJECT_ID: ", PROJECT_ID)
print("PROJECT_NBR: ", PROJECT_NBR)
print("RAY_ADDRESS:", RAY_ADDRESS)
print("IRIS_DATA_CSV_SRC_GCS_FQ_URI:", IRIS_DATA_CSV_SRC_GCS_FQ_URI)
print("IRIS_DATA_PARQUET_TARGET_GCS: ", IRIS_DATA_PARQUET_TARGET_GCS_FQ_URI)

PROJECT_ID:  ray-of-sunshine
PROJECT_NBR:  567162267085
RAY_ADDRESS: vertex_ray://projects/567162267085/locations/us-central1/persistentResources/ray-kicking-tires-cluster
IRIS_DATA_CSV_SRC_GCS_FQ_URI: gs://ray_lab_data_bucket_567162267085/sample-input-data/iris.csv
IRIS_DATA_PARQUET_TARGET_GCS:  gs://ray_lab_data_bucket_567162267085/sample-output-data/iris.parquet


In [None]:
ray.__version__

'2.4.0'

In [None]:
pyarrow.show_info()

pyarrow version info
--------------------
Package kind              : python-wheel-manylinux228
Arrow C++ library version : 14.0.0  
Arrow C++ compiler        : GNU 12.2.1
Arrow C++ compiler flags  :  -fdiagnostics-color=always
Arrow C++ git revision    :         
Arrow C++ git description :         
Arrow C++ build type      : release 

Platform:
  OS / Arch           : Linux x86_64
  SIMD Level          : avx2    
  Detected SIMD Level : avx2    

Memory:
  Default backend     : jemalloc
  Bytes allocated     : 0 bytes 
  Max memory          : 0 bytes 
  Supported Backends  : jemalloc, mimalloc, system

Optional modules:
  csv                 : Enabled 
  cuda                : -       
  dataset             : Enabled 
  feather             : Enabled 
  flight              : Enabled 
  fs                  : Enabled 
  gandiva             : -       
  json                : Enabled 
  orc                 : Enabled 
  parquet             : Enabled 

Filesystems:
  GcsFileSystem       : E

## 1. Read CSV data in GCS

In [None]:
! gsutil cat $IRIS_DATA_CSV_SRC_GCS_FQ_URI | head -2

Id,SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species
1,5.1,3.5,1.4,0.2,Iris-setosa


In [None]:
# Close any connection left over from a previous run of this cell.
ray.shutdown()

# pip packages requested for the Ray runtime environment.
runtime_env = {"pip": ["google-cloud-aiplatform[ray]", "pyarrow==14.0.0"]}

# Connect to the cluster; the returned context is displayed as the cell output.
ray.init(address=RAY_ADDRESS, runtime_env=runtime_env)

[Ray on Vertex AI]: Cluster State = State.RUNNING


0,1
Python version:,3.10.13
Ray version:,2.4.0
Vertex SDK version:,1.39.0
Dashboard:,755d3a0b41a330d0-dot-us-central1.aiplatform-training.googleusercontent.com
Interactive Terminal Uri:,87d1c5fdddbfd3fe-dot-us-central1.aiplatform-training.googleusercontent.com
Cluster Name:,ray-kicking-tires-cluster


### 1.1. Read CSV, infer schema, print schema

In [None]:
# Read CSV in GCS & print schema
@ray.remote
def fntReadCSVDisplaySchema(path):
    """Ray task: read the CSV at ``path`` with ray.data and return its schema."""
    return ray.data.read_csv(paths=path).schema()


# Submit the task, then resolve its object reference to display the schema.
iris_raw_obj_ref = fntReadCSVDisplaySchema.remote(IRIS_DATA_CSV_SRC_GCS_FQ_URI)
ray.get(iris_raw_obj_ref)



Id: int64
SepalLengthCm: double
SepalWidthCm: double
PetalLengthCm: double
PetalWidthCm: double
Species: string

### 1.2. Read CSV, and print content

In [None]:
# Read CSV in GCS & print 3 records
@ray.remote
def fntReadCSVAndDisplay(path):
    """Ray task: read the CSV at ``path`` with ray.data and return its first 3 rows."""
    csv_dataset = ray.data.read_csv(paths=path)
    first_rows = csv_dataset.take(3)
    return first_rows


# Run the task on the cluster and display the rows it returns.
ray.get(fntReadCSVAndDisplay.remote(IRIS_DATA_CSV_SRC_GCS_FQ_URI))

[2m[36m(fntReadCSVAndDisplay pid=14810, ip=10.126.0.4)[0m 2024-02-28 18:20:36,114	INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[DoRead]
[2m[36m(fntReadCSVAndDisplay pid=14810, ip=10.126.0.4)[0m 2024-02-28 18:20:36,114	INFO streaming_executor.py:84 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)


[{'Id': 1,
  'SepalLengthCm': 5.1,
  'SepalWidthCm': 3.5,
  'PetalLengthCm': 1.4,
  'PetalWidthCm': 0.2,
  'Species': 'Iris-setosa'},
 {'Id': 2,
  'SepalLengthCm': 4.9,
  'SepalWidthCm': 3.0,
  'PetalLengthCm': 1.4,
  'PetalWidthCm': 0.2,
  'Species': 'Iris-setosa'},
 {'Id': 3,
  'SepalLengthCm': 4.7,
  'SepalWidthCm': 3.2,
  'PetalLengthCm': 1.3,
  'PetalWidthCm': 0.2,
  'Species': 'Iris-setosa'}]

## 2. Transform data with Ray

In [None]:
import numpy as np
import ray
import pyarrow


# Close any existing connection so this section can be run on its own.
ray.shutdown()

# pip packages requested for the Ray runtime environment.
runtime_env = {"pip": ["google-cloud-aiplatform[ray]", "pyarrow==14.0.0"]}

# Reconnect to the cluster; the returned context is displayed as the cell output.
ray.init(address=RAY_ADDRESS, runtime_env=runtime_env)

[Ray on Vertex AI]: Cluster State = State.RUNNING


0,1
Python version:,3.10.13
Ray version:,2.4.0
Vertex SDK version:,1.39.0
Dashboard:,755d3a0b41a330d0-dot-us-central1.aiplatform-training.googleusercontent.com
Interactive Terminal Uri:,87d1c5fdddbfd3fe-dot-us-central1.aiplatform-training.googleusercontent.com
Cluster Name:,ray-kicking-tires-cluster


In [None]:
# Read the CSV from GCS and transform the Iris data: calculate the petal area and append it as a new column

from typing import Dict
import numpy as np

# Compute a "petal area" attribute.
def fnTransformIrisBatch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    """Add a "PetalAreaCm" column to a batch and return the same batch.

    Args:
        batch: Mapping of column name to values; must contain
            "PetalLengthCm" and "PetalWidthCm".

    Returns:
        The input mapping itself (modified in place), with "PetalAreaCm"
        set to the element-wise product of petal length and petal width.
    """
    petal_length = batch["PetalLengthCm"]
    petal_width = batch["PetalWidthCm"]
    petal_area = petal_length * petal_width
    batch["PetalAreaCm"] = petal_area
    return batch

# Python remote function / task
@ray.remote
def fntTransformIris(path):
    """Ray task: read the CSV at ``path``, apply fnTransformIrisBatch, return 3 rows."""
    iris_dataset = ray.data.read_csv(paths=path)
    return iris_dataset.map_batches(fnTransformIrisBatch).take(3)


# Run the task on the cluster and display the transformed rows.
ray.get(fntTransformIris.remote(IRIS_DATA_CSV_SRC_GCS_FQ_URI))

[2m[36m(fntTransformIris pid=15041, ip=10.126.0.4)[0m 2024-02-28 18:22:07,364	INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->MapBatches(fnTransformIrisBatch)]
[2m[36m(fntTransformIris pid=15041, ip=10.126.0.4)[0m 2024-02-28 18:22:07,364	INFO streaming_executor.py:84 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)


[{'Id': 1,
  'SepalLengthCm': 5.1,
  'SepalWidthCm': 3.5,
  'PetalLengthCm': 1.4,
  'PetalWidthCm': 0.2,
  'Species': 'Iris-setosa',
  'PetalAreaCm': 0.27999999999999997},
 {'Id': 2,
  'SepalLengthCm': 4.9,
  'SepalWidthCm': 3.0,
  'PetalLengthCm': 1.4,
  'PetalWidthCm': 0.2,
  'Species': 'Iris-setosa',
  'PetalAreaCm': 0.27999999999999997},
 {'Id': 3,
  'SepalLengthCm': 4.7,
  'SepalWidthCm': 3.2,
  'PetalLengthCm': 1.3,
  'PetalWidthCm': 0.2,
  'Species': 'Iris-setosa',
  'PetalAreaCm': 0.26}]

## 3. Persist data with Ray

We will reuse the function we created to transform the Iris data, persist the result to GCS as Parquet, and then learn how to read it back.

In [None]:
import numpy as np
import ray
import pyarrow


# Close any existing connection so this section can be run on its own.
ray.shutdown()

# pip packages requested for the Ray runtime environment.
runtime_env = {"pip": ["google-cloud-aiplatform[ray]", "pyarrow==14.0.0"]}

# Reconnect to the cluster; the returned context is displayed as the cell output.
ray.init(address=RAY_ADDRESS, runtime_env=runtime_env)

[Ray on Vertex AI]: Cluster State = State.RUNNING


0,1
Python version:,3.10.13
Ray version:,2.4.0
Vertex SDK version:,1.39.0
Dashboard:,755d3a0b41a330d0-dot-us-central1.aiplatform-training.googleusercontent.com
Interactive Terminal Uri:,87d1c5fdddbfd3fe-dot-us-central1.aiplatform-training.googleusercontent.com
Cluster Name:,ray-kicking-tires-cluster


In [None]:
# Read CSV in GCS, transform iris and persist as Parquet to GCS

from typing import Dict
import numpy as np

# Compute a "petal area" attribute.
def fnTransformIrisBatch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    """Add a "PetalAreaCm" column to a batch and return the same batch.

    Args:
        batch: Mapping of column name to values; must contain
            "PetalLengthCm" and "PetalWidthCm".

    Returns:
        The input mapping itself (modified in place), with "PetalAreaCm"
        set to the element-wise product of petal length and petal width.
    """
    petal_length = batch["PetalLengthCm"]
    petal_width = batch["PetalWidthCm"]
    petal_area = petal_length * petal_width
    batch["PetalAreaCm"] = petal_area
    return batch

# Remote function/task to transform Iris & persist to GCS
@ray.remote
def fntTransformAndPersistIris(path, target_path=None):
    """Ray task: transform the Iris CSV, write it as Parquet, return 3 rows.

    Args:
        path: URI of the source CSV to read with ray.data.
        target_path: URI to write the Parquet output to. When omitted, the
            notebook-level IRIS_DATA_PARQUET_TARGET_GCS_FQ_URI is used, which
            keeps existing one-argument calls working.

    Returns:
        The first 3 rows of the transformed (not repartitioned) dataset.
    """
    if target_path is None:
        target_path = IRIS_DATA_PARQUET_TARGET_GCS_FQ_URI

    ds = ray.data.read_csv(paths=path)
    transformed_ds = ds.map_batches(fnTransformIrisBatch)
    # Repartition to one block before writing - presumably so the output is a
    # single Parquet file; confirm against the bucket contents.
    transformed_ds.repartition(1).write_parquet(target_path)
    return transformed_ds.take(3)

# Execute the function, passing the source and target URIs explicitly
ray.get(
    fntTransformAndPersistIris.remote(
        IRIS_DATA_CSV_SRC_GCS_FQ_URI,
        IRIS_DATA_PARQUET_TARGET_GCS_FQ_URI,
    )
)

[2m[36m(fntTransformAndPersistIris pid=15146, ip=10.126.0.4)[0m 2024-02-28 18:22:40,291	INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->MapBatches(fnTransformIrisBatch)] -> AllToAllOperator[Repartition] -> TaskPoolMapOperator[Write]
[2m[36m(fntTransformAndPersistIris pid=15146, ip=10.126.0.4)[0m 2024-02-28 18:22:40,291	INFO streaming_executor.py:84 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(fntTransformAndPersistIris pid=15146, ip=10.126.0.4)[0m 2024-02-28 18:22:40,675	INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->MapBatches(fnTransformIrisBatch)]
[2m[36m(fntTransformAndPersistIris pid=15146, ip=10.126.0.4)[0m 2024-02-28 18:22:40,675	INFO streaming_executor.py:84 -- Execution confi

[{'Id': 1,
  'SepalLengthCm': 5.1,
  'SepalWidthCm': 3.5,
  'PetalLengthCm': 1.4,
  'PetalWidthCm': 0.2,
  'Species': 'Iris-setosa',
  'PetalAreaCm': 0.27999999999999997},
 {'Id': 2,
  'SepalLengthCm': 4.9,
  'SepalWidthCm': 3.0,
  'PetalLengthCm': 1.4,
  'PetalWidthCm': 0.2,
  'Species': 'Iris-setosa',
  'PetalAreaCm': 0.27999999999999997},
 {'Id': 3,
  'SepalLengthCm': 4.7,
  'SepalWidthCm': 3.2,
  'PetalLengthCm': 1.3,
  'PetalWidthCm': 0.2,
  'Species': 'Iris-setosa',
  'PetalAreaCm': 0.26}]

In [None]:
# Validate by reading the Parquet data back from GCS; while doing so, let's select specific columns, keep only rows with sepal length > 1 cm, and print 3 records

import pyarrow as pa
# Arrow schema applied when reading the Parquet output. It lists the
# measurement columns as float32 plus the Species string, and omits "Id".
irisFields = [
    ("SepalLengthCm", pa.float32()),
    ("SepalWidthCm", pa.float32()),
    ("PetalLengthCm", pa.float32()),
    ("PetalWidthCm", pa.float32()),
    ("Species", pa.string()),
    ("PetalAreaCm", pa.float32()),
]

@ray.remote
def fntReadParquetAndDisplay(path):
    """Ray task: read Parquet at ``path`` with a column projection and row filter.

    Only the "Species" and "SepalLengthCm" columns are read, and only rows
    whose "SepalLengthCm" is greater than 1.0 are kept.

    Args:
        path: URI of the Parquet data to read with ray.data.

    Returns:
        The first 3 matching rows.
    """
    # Import the submodule explicitly: `import pyarrow as pa` on its own does
    # not load `pyarrow.dataset`, so `pa.dataset.field` would only work if
    # something else had already imported it in this process.
    import pyarrow.dataset as pa_dataset

    ds = ray.data.read_parquet(
        paths=path,
        schema=pa.schema(irisFields),
        columns=["Species", "SepalLengthCm"],
        filter=pa_dataset.field("SepalLengthCm") > 1.0,
    )
    return ds.take(3)

ray.get(fntReadParquetAndDisplay.remote(IRIS_DATA_PARQUET_TARGET_GCS_FQ_URI))

[2m[36m(_get_read_tasks pid=15187, ip=10.126.0.4)[0m   pq_ds.pieces, **prefetch_remote_args
[2m[36m(_get_read_tasks pid=15187, ip=10.126.0.4)[0m   num_files = len(self._pq_ds.pieces)
[2m[36m(_get_read_tasks pid=15187, ip=10.126.0.4)[0m   self._pq_ds.pieces[idx]
[2m[36m(fntReadParquetAndDisplay pid=15146, ip=10.126.0.4)[0m 2024-02-28 18:22:47,551	INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[DoRead]
[2m[36m(fntReadParquetAndDisplay pid=15146, ip=10.126.0.4)[0m 2024-02-28 18:22:47,551	INFO streaming_executor.py:84 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(_get_read_tasks pid=15187, ip=10.126.0.4)[0m   np.array_split(self._pq_ds.pieces, parallelism),


[{'Species': 'Iris-setosa', 'SepalLengthCm': 5.099999904632568},
 {'Species': 'Iris-setosa', 'SepalLengthCm': 4.900000095367432},
 {'Species': 'Iris-setosa', 'SepalLengthCm': 4.699999809265137}]