In [1]:
# importing all required packages
# ray start --head --port=6379 
import ray
import time
import psutil
import numpy as np

In [2]:
# starting up a ray instance
ray.init(address="auto")

2025-01-07 02:17:10,313	INFO worker.py:1636 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2025-01-07 02:17:10,391	INFO worker.py:1812 -- Connected to Ray cluster. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.10.16
Ray version:,2.40.0
Dashboard:,http://127.0.0.1:8265


[33m(raylet)[0m [2025-01-07 02:17:18,935 E 38180 182883] (raylet) file_system_monitor.cc:116: /tmp/ray/session_2025-01-07_02-08-53_797992_38125 is over 95% full, available space: 8.99148 GB; capacity: 278.466 GB. Object creation will fail if spilling is required.
[33m(raylet)[0m [2025-01-07 02:17:28,994 E 38180 182883] (raylet) file_system_monitor.cc:116: /tmp/ray/session_2025-01-07_02-08-53_797992_38125 is over 95% full, available space: 8.99083 GB; capacity: 278.466 GB. Object creation will fail if spilling is required.
[33m(raylet)[0m [2025-01-07 02:17:39,084 E 38180 182883] (raylet) file_system_monitor.cc:116: /tmp/ray/session_2025-01-07_02-08-53_797992_38125 is over 95% full, available space: 8.99057 GB; capacity: 278.466 GB. Object creation will fail if spilling is required.
[33m(raylet)[0m [2025-01-07 02:17:49,109 E 38180 182883] (raylet) file_system_monitor.cc:116: /tmp/ray/session_2025-01-07_02-08-53_797992_38125 is over 95% full, available space: 8.99014 GB; capacity:

In [3]:
# checking if ray cluster is initialized
ray.is_initialized()

True

In [4]:
@ray.remote
def compute(x):
    return x * 2

In [5]:
@ray.remote
def compute_heavy(x):
    time.sleep(1)
    return x * 2

In [6]:
@ray.remote(num_cpus=2)
def compute_heavier(x):
    # Simulate a computationally intensive task with matrix multiplication
    matrix_size = 1000  # Size of the matrix
    matrix_a = np.random.rand(matrix_size, matrix_size)
    matrix_b = np.random.rand(matrix_size, matrix_size)
    result = np.dot(matrix_a, matrix_b)
    return np.sum(result)

In [7]:
futures = [compute_heavy.remote(i) for i in range(10)]
results = ray.get(futures)
print(results)

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


In [8]:
# Get cluster-wide resources
cluster_resources = ray.cluster_resources()
print("Cluster Resources:", cluster_resources)

Cluster Resources: {'CPU': 8.0, 'memory': 5271680205.0, 'node:127.0.0.1': 1.0, 'object_store_memory': 2147483648.0, 'node:__internal_head__': 1.0}


In [9]:
print("Resources before task execution:", ray.cluster_resources() , "\n")
futures = [compute_heavy.remote(i) for i in range(10)]
results = ray.get(futures)
print("Resources after task execution:", ray.cluster_resources() , "\n")
print(results)

Resources before task execution: {'CPU': 8.0, 'object_store_memory': 2147483648.0, 'node:127.0.0.1': 1.0, 'memory': 5271680205.0, 'node:__internal_head__': 1.0} 

Resources after task execution: {'CPU': 8.0, 'object_store_memory': 2147483648.0, 'node:127.0.0.1': 1.0, 'memory': 5271680205.0, 'node:__internal_head__': 1.0} 

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


In [10]:
# Function to check current CPU usage
def get_cpu_usage():
    return psutil.cpu_percent(interval=1)

In [11]:
# checking resources before executing the task
before_resources = ray.cluster_resources()
print("Resources before task execution:", before_resources)

# cehcking cpu usage before the executing the task
before_cpu = get_cpu_usage()
print("CPU usage before task execution:", before_cpu)

# submitting multiple tasks in parallel
futures = [compute_heavier.remote(i) for i in range(10)]

# checking cpu usage while there are tasks running
during_cpu = get_cpu_usage()
print("CPU usage during task execution:", during_cpu)

# checking resources used during task execution
during_resources = ray.cluster_resources()
print("Resources during task execution:", during_resources)

# running the tasks and collecting the results
results = ray.get(futures)

# checking cpu usage after task execution
after_cpu = get_cpu_usage()
print("CPU usage after task execution:", after_cpu)

# checking resources after executing tasks
after_resources = ray.cluster_resources()
print("Resources after task execution:", after_resources)

# printing out the results
print(results)

Resources before task execution: {'CPU': 8.0, 'node:__internal_head__': 1.0, 'node:127.0.0.1': 1.0, 'object_store_memory': 2147483648.0, 'memory': 5271680205.0}
CPU usage before task execution: 87.8
CPU usage during task execution: 94.9
Resources during task execution: {'CPU': 8.0, 'object_store_memory': 2147483648.0, 'node:127.0.0.1': 1.0, 'node:__internal_head__': 1.0, 'memory': 5271680205.0}
CPU usage after task execution: 88.7
Resources after task execution: {'CPU': 8.0, 'memory': 5271680205.0, 'node:127.0.0.1': 1.0, 'node:__internal_head__': 1.0, 'object_store_memory': 2147483648.0}
[np.float64(249771080.616331), np.float64(250205369.64775905), np.float64(249944264.83820343), np.float64(250103880.57130757), np.float64(249758699.81612733), np.float64(250043783.98052943), np.float64(250222259.6777461), np.float64(249909234.6633425), np.float64(249877197.6409919), np.float64(250104497.27232122)]


In [12]:
import ray
import numpy as np
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor

@ray.remote(num_cpus=8)
def compute_heavier(x):
    matrix_size = 9000
    matrix_a = np.random.rand(matrix_size, matrix_size)
    matrix_b = np.random.rand(matrix_size, matrix_size)
    result = np.dot(matrix_a, matrix_b)
    return np.sum(result)

# Run tasks in parallel using tqdm to show progress, limiting to 2 concurrent tasks
task_ids = [compute_heavier.remote(i) for i in range(10)]

# Use ThreadPoolExecutor to limit concurrent execution to 2 tasks
results = []
with ThreadPoolExecutor(max_workers=2) as executor:
    # Run the tasks and collect results as they complete
    for task_id in tqdm(executor.map(ray.get, task_ids), total=len(task_ids), desc="Executing tasks", unit="task"):
        results.append(task_id)

# Print the results
for i, result in enumerate(results):
    print(f"Result of task {i}: {result}")

Executing tasks: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████| 10/10 [04:40<00:00, 28.06s/task]

Result of task 0: 182221432534.01727
Result of task 1: 182284692871.89856
Result of task 2: 182253309120.48502
Result of task 3: 182272822676.4526
Result of task 4: 182236392323.33167
Result of task 5: 182263253087.99643
Result of task 6: 182243708527.96887
Result of task 7: 182245353196.41867
Result of task 8: 182243042875.1554
Result of task 9: 182267758528.59763





In [13]:
# shutting down ray instance
ray.shutdown()

In [None]:
import json
from tqdm import tqdm  # Import tqdm for progress bar
import pandas as pd

# Load your trace data from a JSON file
with open('timeline-03000000-2025-01-07_01-37-15.json') as f:
    trace_data = json.load(f)

# Extract relevant information: task name, start time, duration, category, etc.
events = []

# Iterate through trace data with tqdm to display progress
for event in tqdm(trace_data, desc="Processing events", unit="event"):
    try:
        # Check if 'ts' exists in the event, if not, skip this event
        if 'ts' in event and 'dur' in event and 'args' in event and 'task_id' in event['args']:
            event_data = {
                'name': event['name'],
                'start_time': event['ts'],
                'duration': event['dur'],
                'category': event['cat'],
                'task_id': event['args']['task_id'],
            }
            events.append(event_data)
    except KeyError as e:
        # Handle any unexpected missing fields
        print(f"Missing key: {e}, skipping event: {event}")

# Now events contains a list of dictionaries that can be used for visualization

# Convert events to a DataFrame for easier manipulation
df = pd.DataFrame(events)

# Prepare a Gantt chart-style visualization using Plotly (example)
import plotly.express as px

fig = px.timeline(df, 
                  x_start="start_time", 
                  x_end="start_time",  # Default, adjust based on task duration
                  y="name", 
                  color="category",  # Use category for coloring
                  title="Task Execution Timeline")

fig.update_yaxes(categoryorder="total ascending")  # Sort tasks by their name
fig.show()


In [None]:
print(df['category'].unique())

In [None]:
import pandas as pd
import plotly.express as px

# Assuming 'events' is the list of dictionaries you've processed
df = pd.DataFrame(events)

# Check unique categories for coloring
print(df['category'].unique())

# Fill NaN categories if any
df['category'] = df['category'].fillna('Unknown')

# Convert the category column to a categorical type
df['category'] = pd.Categorical(df['category'])

# Convert start_time and end_time to proper datetime objects
# Assuming 'start_time' is in microseconds (adjust if in nanoseconds)
df['start_time'] = pd.to_datetime(df['start_time'], unit='us')

# Calculate the end time for each event
df['end_time'] = df['start_time'] + pd.to_timedelta(df['duration'], unit='us')

# Prepare the Gantt chart visualization
fig = px.timeline(df, 
                   x_start="start_time", 
                   x_end="end_time",  # Use the calculated end_time
                   y="name", 
                   color="category",  # Use category for coloring
                   title="Task Execution Timeline")

# Sort tasks by their name if needed
fig.update_yaxes(categoryorder="total ascending")  

# Display the plot
fig.show()


In [None]:
import json

# Load the trace data from your JSON file
with open('timeline-03000000-2025-01-07_01-37-15.json') as f:
    trace_data = json.load(f)

# Extract all unique pid values
pids = set()

for event in trace_data:
    pids.add(event.get('pid', None))  # Use .get() to avoid KeyError if 'pid' is missing

# Print the unique pid values
print("Unique PIDs:", pids)