In [None]:
import dask
import dask.dataframe as dd
import itertools as it
import numpy as np
import pandas as pd
from time import time
import os
import ray
import ray.data as rd
from ray.data.aggregate import AggregateFn
import gc

# Initialize Ray
ray.init(_temp_dir="/tmp/ray", object_store_memory=10**9)

indices = ['filename', 'application', 'io_zone', 'redundancy_type']
combinations = list(it.combinations(indices, r=2))[:5]

# Function to log results to a file
def log_results_to_file(result_file, time, memory):
    with open(result_file, 'a') as f:
        f.write(f"Times: {time}, Memory: {memory}\n")
    
    # Check if the file was created
    if os.path.exists(result_file):
        print(f"File {result_file} created/updated successfully!")
    else:
        print(f"Error: {result_file} was not created.")

@ray.remote
def base_hard_ray_queries(log_dir, result_file):
    print(f",base_hard_ray_queries")
    
    # Load dataset using Ray
    df = ray.data.read_csv(log_dir)
    query_index = 0

    for ix, iy in combinations:
        for col in ['request_io_size_bytes', 'disk_time']:
            query_index += 1
            t1 = time()

            try:
                grouped = df.groupby([ix]).aggregate(
                    AggregateFn(
                        init=lambda _: [],
                        accumulate_row=lambda acc, row: acc + [row[iy]],
                        merge=lambda acc1, acc2: acc1 + acc2,
                        name=f"{iy}_list"
                    ),
                    AggregateFn(
                        init=lambda _: 0,
                        accumulate_row=lambda acc, row: acc + row[col],
                        merge=lambda acc1, acc2: acc1 + acc2,
                        name=f"{col}_sum"
                    )
                ).materialize()

                exploded = grouped.flat_map(
                    lambda row: [
                        {**row, iy: item}
                        for item in row[f"{iy}_list"]
                    ]
                ).materialize()

                result = exploded.groupby([iy]).aggregate(
                    AggregateFn(
                        init=lambda _: [],
                        accumulate_row=lambda acc, row: acc + [row[ix]],
                        merge=lambda acc1, acc2: acc1 + acc2,
                        name=f"{ix}_list"
                    ),
                    AggregateFn(
                        init=lambda _: 0,
                        accumulate_row=lambda acc, row: acc + row[f"{col}_sum"],
                        merge=lambda acc1, acc2: acc1 + acc2,
                        name=f"{col}_sum"
                    )
                ).materialize()

                # Compute memory usage and elapsed time
                time_elapsed = time() - t1
                memory_usage = result.size_bytes()

                # Log results
                with open(result_file, "a") as f:
                    f.write(f"hard,Q{query_index},{time_elapsed},{memory_usage}\n")

                print(f"hard,Q{query_index},{time_elapsed},{memory_usage}")

            except Exception as e:
                print(f"Error in query {query_index}: {e}")
                continue

@ray.remote
def iomax_hard_ray_queries(log_dir, result_file):
    print(f",iomax_hard_ray_queries")
    
    # Load dataset using Ray
    df = ray.data.read_csv(log_dir)
    query_index = 0
    grouped_df = None 

    for ix, iy in combinations:
        for col in ['request_io_size_bytes', 'disk_time']:
            query_index += 1
            t1 = time()
            memory_usage = 0

            try:
                if query_index == 1:
                    x = df.groupby(indices).aggregate(
                        AggregateFn(
                            init=lambda _: 0,
                            accumulate_row=lambda acc, row: acc + row['request_io_size_bytes'],
                            merge=lambda acc1, acc2: acc1 + acc2,
                            name="request_io_size_bytes"
                        ),
                        AggregateFn(
                            init=lambda _: 0,
                            accumulate_row=lambda acc, row: acc + row['disk_time'],
                            merge=lambda acc1, acc2: acc1 + acc2,
                            name="disk_time"
                        )
                    ).materialize()

                    # Compute memory usage (approximation)
                    if isinstance(x, ray.data.Dataset):
                        memory_usage = x.size_bytes()
                    else:
                        memory_usage = x.memory_usage(deep=True).sum()

                grouped = x.groupby([ix]).aggregate(
                    AggregateFn(
                        init=lambda _: [],
                        accumulate_row=lambda acc, row: acc + [row[iy]],
                        merge=lambda acc1, acc2: acc1 + acc2,
                        name=f"{iy}_list"
                    ),
                    AggregateFn(
                        init=lambda _: 0,
                        accumulate_row=lambda acc, row: acc + row[col],
                        merge=lambda acc1, acc2: acc1 + acc2,
                        name=f"{col}_sum"
                    )
                ).materialize()

                exploded = grouped.flat_map(
                    lambda row: [
                        {**row, iy: item}
                        for item in row[f"{iy}_list"]
                    ]
                ).materialize()

                result = exploded.groupby([iy]).aggregate(
                    AggregateFn(
                        init=lambda _: [],
                        accumulate_row=lambda acc, row: acc + [row[ix]],
                        merge=lambda acc1, acc2: acc1 + acc2,
                        name=f"{ix}_list"
                    ),
                    AggregateFn(
                        init=lambda _: 0,
                        accumulate_row=lambda acc, row: acc + row[f"{col}_sum"],
                        merge=lambda acc1, acc2: acc1 + acc2,
                        name=f"{col}_sum"
                    )
                ).materialize()

                # Compute elapsed time
                time_elapsed = time() - t1

                # Log results
                log_results_to_file(result_file, time_elapsed, memory_usage)
                print(f"hard,Q{query_index},{time_elapsed},{memory_usage}")

            except Exception as e:
                import traceback
                print(f"Error in query {query_index}: {e}")
                traceback.print_exc()
                continue
    del df, result
    gc.collect()

ray.get(base_hard_ray_queries.remote("../datasets_thesios_io_traces/dataset-200k.csv", "results_ray_hard_queries_csv/results_ray_plain_hard_200k.txt"))

# Shut down Ray after queries complete
ray.shutdown()

2024-11-18 20:08:55,155	INFO worker.py:1816 -- Started a local Ray instance.


[36m(base_hard_ray_queries pid=118969)[0m ,base_hard_ray_queries


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

(pid=118969) Sort Sample 2:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Map 3:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Reduce 4:   0%|                                                      | 0.00/1.00 [00:00<?…

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AllToAllOperator[Aggregate]


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(<lambda>)]
[36m(raylet)[0m Spilled 2991 MiB, 100 objects, write throughput 548 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

(pid=118969) Sort Sample 2:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Map 3:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Reduce 4:   0%|                                                      | 0.00/1.00 [00:00<?…

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> AllToAllOperator[Aggregate]


[36m(base_hard_ray_queries pid=118969)[0m hard,Q1,14.658638000488281,13606312


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

(pid=118969) Sort Sample 2:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Map 3:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Reduce 4:   0%|                                                      | 0.00/1.00 [00:00<?…

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AllToAllOperator[Aggregate]
[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(<lambda>)]


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

[36m(raylet)[0m Spilled 4675 MiB, 228 objects, write throughput 684 MiB/s.


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

(pid=118969) Sort Sample 2:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Map 3:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Reduce 4:   0%|                                                      | 0.00/1.00 [00:00<?…

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> AllToAllOperator[Aggregate]


[36m(base_hard_ray_queries pid=118969)[0m hard,Q2,9.577636480331421,13606312


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AllToAllOperator[Aggregate]


(pid=118969) Sort Sample 2:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Map 3:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Reduce 4:   0%|                                                      | 0.00/1.00 [00:00<?…

(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(<lambda>)]


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

(pid=118969) Sort Sample 2:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Map 3:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Reduce 4:   0%|                                                      | 0.00/1.00 [00:00<?…

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> AllToAllOperator[Aggregate]


[36m(base_hard_ray_queries pid=118969)[0m hard,Q3,3.8548502922058105,13600063


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

(pid=118969) Sort Sample 2:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Map 3:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Reduce 4:   0%|                                                      | 0.00/1.00 [00:00<?…

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AllToAllOperator[Aggregate]


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(<lambda>)]


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

(pid=118969) Sort Sample 2:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Map 3:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Reduce 4:   0%|                                                      | 0.00/1.00 [00:00<?…

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> AllToAllOperator[Aggregate]


[36m(base_hard_ray_queries pid=118969)[0m hard,Q4,4.077474355697632,13600063


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

(pid=118969) Sort Sample 2:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Map 3:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Reduce 4:   0%|                                                      | 0.00/1.00 [00:00<?…

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AllToAllOperator[Aggregate]


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(<lambda>)]


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

(pid=118969) Sort Sample 2:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Map 3:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Reduce 4:   0%|                                                      | 0.00/1.00 [00:00<?…

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> AllToAllOperator[Aggregate]
[36m(raylet)[0m Spilled 8516 MiB, 452 objects, write throughput 854 MiB/s.


[36m(base_hard_ray_queries pid=118969)[0m hard,Q5,6.6014084815979,13600055


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

(pid=118969) Sort Sample 2:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Map 3:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Reduce 4:   0%|                                                      | 0.00/1.00 [00:00<?…

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AllToAllOperator[Aggregate]


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(<lambda>)]


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

(pid=118969) Sort Sample 2:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Map 3:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Reduce 4:   0%|                                                      | 0.00/1.00 [00:00<?…

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> AllToAllOperator[Aggregate]


[36m(base_hard_ray_queries pid=118969)[0m hard,Q6,6.532519340515137,13600055


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

(pid=118969) Sort Sample 2:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Map 3:   0%|                                                         | 0.00/1.00 [00:00<?…

(pid=118969) Shuffle Reduce 4:   0%|                                                      | 0.00/1.00 [00:00<?…

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AllToAllOperator[Aggregate]


(pid=118969) Running 0: 0.00 row [00:00, ? row/s]

[36m(base_hard_ray_queries pid=118969)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-18_20-08-53_320657_118638/logs/ray-data
[36m(base_hard_ray_queries pid=118969)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(<lambda>)]
[36m(raylet)[0m Spilled 16641 MiB, 597 objects, write throughput 983 MiB/s.
[36m(raylet)[0m Spilled 33331 MiB, 705 objects, write throughput 1066 MiB/s.
[36m(raylet)[0m Spilled 65886 MiB, 915 objects, write throughput 837 MiB/s.


In [2]:
import gc
gc.collect()

0