Converting pip cells

In [None]:
# NOTE(review): `%horus` is a custom/extension magic that is not defined in this notebook.
# The heading above says it converts pip cells; confirm what `convert` actually rewrites.
%horus convert

In [None]:
pip install --upgrade pip

In [None]:
# Install requirements
!pip install -r Requirements.txt 

In [None]:
# Set the default AWS region (this modifies the AWS CLI config), then print the effective configuration
!aws configure set default.region us-east-1
!aws configure list


Restart the kernel after installing the requirements so the newly installed package versions are loaded.

----Code Start----

Import statements

In [2]:
# Imports 
import os
from ray.util import inspect_serializability  # NOTE(review): not used in any visible cell
import ray
import pyarrow.fs as pq  # NOTE: `pq` is pyarrow.fs (provides S3FileSystem), not pyarrow.parquet
import pandas as pd  # NOTE(review): not used in any visible cell

# Making use of datetime for dates, works for every day of the year (30,31,28 day problems go away)
from datetime import date, timedelta
from dateutil.relativedelta import relativedelta

Connect to the Ray cluster

In [3]:
# Ray Client handle, used here only to ask whether a client session is already open.
from ray.util.client import ray as rayclient

# Close any existing Ray Client session first, so re-running this cell starts a fresh connection.
if rayclient.is_connected():
    ray.util.disconnect()

# Host of the cluster head comes from RAY_CLUSTER; 10001 is the Ray Client server port.
ray_address = f"{os.environ['RAY_CLUSTER']}:10001"
ray.util.connect(ray_address)

{'num_clients': 1,
 'python_version': '3.8.12',
 'ray_version': '1.12.1',
 'ray_commit': '4863e33856b54ccf8add5cbe75e41558850a1b75',
 'protocol_version': '2022-03-16'}

Reusable definitions here

In [4]:
# Reusable definitions here
bucketName = 'DH-SECURE-THANOS-RAY-USE'
endpoint = 'https://s3.upshift.redhat.com'

prefixPathRead = 'raydev'
prefixPathWrite = 'raydev-write-demo'
metricName = 'cluster_version'

# S3 credentials are read from the environment with plain Python rather than the IPython-only
# `%env` magic, so this cell also runs outside IPython (e.g. when exported as a script).
# Raises KeyError if either variable is unset.
AWS_ACCESS_KEY = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_KEY = os.environ['AWS_SECRET_ACCESS_KEY']

# Default partition, as zero-padded strings that match the `year=/month=/day=` path layout
year = '2021'
month = '01'
day = '01'

# Read path
read_path = f'{prefixPathRead}/metric={metricName}'

# Write path
write_path = f'{prefixPathWrite}/metric={metricName}'


In [5]:
# PyArrow S3 filesystem pointed at the custom S3 endpoint, using the explicit credentials above
fs_pyarrow = pq.S3FileSystem(
    access_key=AWS_ACCESS_KEY,
    secret_key=AWS_SECRET_KEY,
    endpoint_override=endpoint,
)

The following cells read one day's worth of parquet files (month 1, day 1) and write them back as a single parquet file.
1. First attempt: failed because worker nodes kept dying.
2. Second attempt: success! Compaction of one day's worth of parquet files works. Next step: a whole month.

In [None]:
# S3 URIs for the single day selected by `year` / `month` / `day`.
read_path = f'{prefixPathRead}/metric={metricName}'
read_bucket_uri = 's3://{}/{}/year={}/month={}/day={}'.format(bucketName, read_path, year, month, day)

write_path = f'{prefixPathWrite}/metric={metricName}'
write_bucket_uri = 's3://{}/{}/year={}/month={}/day={}'.format(bucketName, write_path, year, month, day)

In [None]:
# Rebuild the PyArrow S3 filesystem (same endpoint and credentials as the config cell).
fs_pyarrow = pq.S3FileSystem(
    access_key=AWS_ACCESS_KEY,
    secret_key=AWS_SECRET_KEY,
    endpoint_override=endpoint,
)

# Load every parquet file under the day's partition into one Ray Dataset.
df = ray.data.read_parquet(paths=read_bucket_uri, filesystem=fs_pyarrow)

# Collapse to a single block so the day is written back as a single parquet file.
df.repartition(1).write_parquet(path=write_bucket_uri, filesystem=fs_pyarrow)

The following cells try compaction for a whole month:
1. Reading from multiple directories in one `read_parquet` call does not seem to be supported.
2. Unioning Datasets is supported, so the daily Datasets can be unioned before the compaction step. https://github.com/ray-project/ray/issues/24598
Status: fails due to memory limits; a Dataset covering more than several days' worth of parquet files cannot be loaded.

In [None]:
# Read path
read_path = f'{prefixPathRead}/metric={metricName}'
read_bucket_uri = f's3://{bucketName}/{read_path}/year={year}/month={month}/day={day}'

# Actual length of the selected month (28/29/30/31 days) instead of assuming 31,
# so no URIs are produced for days that do not exist (e.g. February 29-31).
monthStart = date(int(year), int(month), 1)
daysInMonth = (monthStart + relativedelta(months=1) - monthStart).days

# One URI per remaining day of the month; day 01 is read separately as the seed of the union.
read_bucket_multiple = [
    f's3://{bucketName}/{read_path}/year={year}/month={month}/day={d:02d}'
    for d in range(2, daysInMonth + 1)
]

# Write path (one output partition for the whole month)
write_path = f"{prefixPathWrite}/metric={metricName}"
write_bucket_uri = f's3://{bucketName}/{write_path}/year={year}/month={month}'


In [None]:
# S3FileSystem in PyArrow. The explicit credentials match every other cell, rather than
# relying on PyArrow's default credential lookup.
fs_pyarrow = pq.S3FileSystem(access_key=AWS_ACCESS_KEY, secret_key=AWS_SECRET_KEY, endpoint_override=endpoint)

# Initial read - the first day seeds the month's Dataset
master_df = ray.data.read_parquet(paths=f's3://{bucketName}/{prefixPathRead}/metric={metricName}/year={year}/month={month}/day=01', filesystem=fs_pyarrow)

# Read each remaining day (URIs from `read_bucket_multiple`) and union it onto the month's Dataset
for day_uri in read_bucket_multiple:
    df = ray.data.read_parquet(paths=day_uri, filesystem=fs_pyarrow)
    master_df = master_df.union(df)
    


In [None]:
# Write the whole month (the unioned `master_df`) back as a single parquet file.
# `df` would only hold the last day read by the union loop.
master_df.repartition(1).write_parquet(path=write_bucket_uri, filesystem=fs_pyarrow)

Use `datetime` to iterate over every day of each month, union the daily Datasets, and finally write each month back as a single parquet file.

In [None]:
# Compact 2021 month by month: read each day's parquet files, union them into one
# Dataset per month, and write each month back as a single parquet file.
# Loop variables use their own names so the `year` / `month` / `day` config strings are not overwritten.
firstDay = date(2021, 1, 1)
for _ in range(12):
    lastDay = firstDay + relativedelta(months=1)
    # Recomputed for every month so 28/30-day months do not spill into the following month.
    duration = lastDay - firstDay
    master_df = None
    for i in range(duration.days):
        currentDay = firstDay + timedelta(days=i)
        currentPath = (f's3://{bucketName}/{read_path}/year={currentDay.year}'
                       f'/month={currentDay.month:02d}/day={currentDay.day:02d}')
        print(currentPath)
        # <Read dataframe>
        df = ray.data.read_parquet(paths=currentPath, filesystem=fs_pyarrow)
        # <Union dataframe>: the first day seeds the month's Dataset, later days are unioned onto it.
        master_df = df if master_df is None else master_df.union(df)

    # <Writeback single parquet for the month>: the unioned month, not just the last day read.
    monthPath = f's3://{bucketName}/{write_path}/year={firstDay.year}/month={firstDay.month:02d}'
    master_df.repartition(1).write_parquet(path=monthPath, filesystem=fs_pyarrow)
    firstDay = lastDay


Compact every day and store each day as a single parquet file (365 files)

In [6]:
#%%capture output 
# Compact each day of the 12 months starting 1 Feb 2021 into one parquet file per day.
# Loop variables use their own names so the `year` / `month` / `day` config strings are not overwritten.
firstDay = date(2021, 2, 1)  # February onwards
stopDay = firstDay + relativedelta(months=12)  # exclusive bound
# Iterating over days directly avoids the per-month `duration` bookkeeping.
for i in range((stopDay - firstDay).days):
    currentDay = firstDay + timedelta(days=i)
    partition = f'year={currentDay.year}/month={currentDay.month:02d}/day={currentDay.day:02d}'
    # <Read dataframe>
    currentReadPath = f's3://{bucketName}/{read_path}/{partition}'
    print("Reading from:", currentReadPath)
    df = ray.data.read_parquet(paths=currentReadPath, filesystem=fs_pyarrow)
    # <Writeback single parquet for the day>
    currentWritePath = f's3://{bucketName}/{write_path}/{partition}'
    print("Writing to:", currentWritePath)
    df.repartition(1).write_parquet(path=currentWritePath, filesystem=fs_pyarrow)

Reading from: s3://DH-SECURE-THANOS-RAY-USE/raydev/metric=cluster_version/year=2021/month=02/day=01


Metadata Fetch Progress:   0%|          | 0/8 [00:00<?, ?it/s]
Metadata Fetch Progress:   0%|          | 0/8 [00:00<?, ?it/s]
Metadata Fetch Progress:   0%|          | 0/8 [00:00<?, ?it/s]
Metadata Fetch Progress:   0%|          | 0/8 [00:00<?, ?it/s]
Metadata Fetch Progress:   0%|          | 0/8 [00:00<?, ?it/s]
Metadata Fetch Progress:   0%|          | 0/8 [00:00<?, ?it/s]
Metadata Fetch Progress:   0%|          | 0/8 [00:00<?, ?it/s]
Metadata Fetch Progress:   0%|          | 0/8 [00:00<?, ?it/s]
Metadata Fetch Progress:   0%|          | 0/8 [00:00<?, ?it/s]
Metadata Fetch Progress:   0%|          | 0/8 [00:00<?, ?it/s]
Metadata Fetch Progress:   0%|          | 0/8 [00:01<?, ?it/s]
Metadata Fetch Progress:   0%|          | 0/8 [00:01<?, ?it/s]
Metadata Fetch Progress:   0%|          | 0/8 [00:01<?, ?it/s]
Metadata Fetch Progress:   0%|          | 0/8 [00:01<?, ?it/s]
Metadata Fetch Progress:   0%|          | 0/8 [00:01<?, ?it/s]
Metadata Fetch Progress:   0%|          | 0/8 [00:01<?,

Writing to: s3://DH-SECURE-THANOS-RAY-USE/raydev-write-demo/metric=cluster_version/year=2021/month=02/day=01


Map Progress:  44%|████▍     | 21/48 [00:30<00:38,  1.42s/it][2m[36m(raylet, ip=10.130.2.35)[0m Spilled 3022 MiB, 5 objects, write throughput 458 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
[2m[36m(raylet, ip=10.128.3.222)[0m Spilled 2053 MiB, 3 objects, write throughput 259 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
Map Progress:  54%|█████▍    | 26/48 [00:34<00:17,  1.28it/s][2m[36m(raylet, ip=10.131.0.109)[0m Spilled 2542 MiB, 4 objects, write throughput 287 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
[2m[36m(raylet, ip=10.129.3.136)[0m Spilled 3134 MiB, 6 objects, write throughput 324 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
Map Progress:  75%|███████▌  | 36/48 [00:47<00:15,  1.32s/it][2m[36m(raylet, ip=10.129.3.136)[0m Spilled 4137 MiB, 8 objects, write throughput 359 MiB/s.
[2m[36m(raylet, ip=10.128.3.222)[0m Spilled 4150 MiB, 6 objects, write throughput 334 MiB/s.
Map Progress:  81%|███████

ConnectionError: Client is shutting down.

Try running the compaction inside a Ray remote function to see whether it makes a difference

In [None]:
#%%capture output 
@ray.remote
def compaction():
    """Compact every day of 2021 into one parquet file per day, running as a Ray task.

    For each day from 2021-01-01 up to (not including) 2022-01-01, reads the day's partition
    under ``read_path`` and writes it back as a single parquet file under the same partition
    of ``write_path``. Uses the notebook-level ``fs_pyarrow`` filesystem. Returns None.
    """
    # NOTE(review): an `InitializeS3()` helper was called here but is not defined anywhere in
    # this notebook (NameError inside the task); the existing `fs_pyarrow` is used instead.
    firstDay = date(2021, 1, 1)
    stopDay = firstDay + relativedelta(months=12)  # exclusive bound
    for i in range((stopDay - firstDay).days):
        currentDay = firstDay + timedelta(days=i)
        partition = f'year={currentDay.year}/month={currentDay.month:02d}/day={currentDay.day:02d}'
        currentReadPath = f's3://{bucketName}/{read_path}/{partition}'
        print("Reading from:", currentReadPath)
        # <Read dataframe>
        df = ray.data.read_parquet(paths=currentReadPath, filesystem=fs_pyarrow)
        # <Writeback single parquet for the day>
        currentWritePath = f's3://{bucketName}/{write_path}/{partition}'
        df.repartition(1).write_parquet(path=currentWritePath, filesystem=fs_pyarrow)
        print("Writing to:", currentWritePath)

In [None]:
# `.remote()` only schedules the task and returns an ObjectRef. `ray.get` waits for the task to
# finish and re-raises any exception from inside it, so failures surface in this cell.
ray.get(compaction.remote())

Display captured output (only available if a cell above was run with `%%capture output`)

In [None]:
# `output` only exists after a cell has been run with `%%capture output` (commented out in the
# cells above). Without this guard the cell raises NameError on a fresh kernel.
if 'output' in globals():
    output.show()
else:
    print("No captured output: enable `%%capture output` in a cell above and re-run it.")

Breaking down the code for read and write

In [None]:
# Read one day's worth (2021-02-01) into `df`, so the read and write steps can be inspected separately.
currentReadPath = 's3://{}/{}/year=2021/month=02/day=01'.format(bucketName, read_path)
print("Reading from:", currentReadPath)
df = ray.data.read_parquet(filesystem=fs_pyarrow, paths=currentReadPath)


In [None]:
# Show the Dataset summary via the notebook's rich display (last expression) rather than print().
df

In [None]:
# Preview the single-partition Dataset. The result is only displayed, not assigned,
# so `df` itself is unchanged.
df.repartition(1)

In [None]:
#%%capture output 
# Write the day read above (2021-02-01) back to its matching partition as a single parquet file.
# The partition must match the read path; otherwise another day's output would be overwritten.
currentWritePath = f's3://{bucketName}/{write_path}/year=2021/month=02/day=01'
print("Writing to:", currentWritePath)
df.repartition(1).write_parquet(path=currentWritePath, filesystem=fs_pyarrow)