In [1]:
import boto3, re, json
from datetime import datetime
from pathlib import Path
import pandas as pd
import pytz

In [2]:
# Named-group variant of the result-path regex, checked against one sample path;
# for the sample below groupdict() gives {'partition': '75MB', 'data_size': '1GB', 'run_no': '1'}.
result_path_pattern = re.compile(
    r"result-partition-(?P<partition>[0-9]+MB)/(?P<data_size>[0-9]+GB)/(run )?(?P<run_no>[0-9]+)"
)
search_result = re.search(result_path_pattern, "result-partition-75MB/1GB/1")
search_dict = search_result.groupdict()

In [3]:
# Result paths look like "result-partition-100MB/1GB/1" or
# "result-partition-100MB/total/run 2": the data-size segment is either 'total'
# or a whole number of GB, optionally followed by a "run " prefix on the run number.
pattern = re.compile(
    r"result-partition-[0-9]+MB/((total)|([0-9]+GB))/(run )?[0-9]+",
    flags=0,
)

In [4]:
def get_event_details(event):
    """Return the first non-empty details payload attached to a Step Functions event.

    The execution-succeeded, execution-failed, execution-started and
    state-entered detail fields are tried in that order; an empty dict is
    returned when none of them holds a truthy value.
    """
    candidate_keys = (
        'executionSucceededEventDetails',
        'executionFailedEventDetails',
        'executionStartedEventDetails',
        'stateEnteredEventDetails',
    )
    for key in candidate_keys:
        payload = event.get(key)
        if payload:
            return payload
    return {}

def _parse_result_path(result_path):
    """Split a run's result path into ``(data_size, run_no)`` strings.

    Expected layouts: "result-partition-100MB/1GB/1" or
    "result-partition-100MB/1GB/run 1". Paths containing 'Batches' hold the
    data size four segments from the end instead of two. A data-size segment
    of 'total' is reported as '12.6GB'. Raises IndexError when the path has
    too few '/'-separated segments.
    """
    segments = result_path.split('/')
    data_size = segments[-4] if 'Batches' in result_path else segments[-2]
    if data_size == 'total':
        data_size = '12.6GB'
    run_no = segments[-1].replace('run ', '')
    return data_size, run_no


def extract_event_details(history, results, verbose):
    """Append one row of timings for a single execution history to ``results``.

    :param history: a ``get_execution_history`` response; only its 'events' list is read
    :param results: dict of column name -> list; a row is appended to it in place
    :param verbose: if True, print each event, its details and the computed durations
    :returns: ``results``. No row is added when the result path contains "demo"
        or is already in ``results['result_path']``, when an event cannot be
        parsed (the error is printed), or when the history lacks one of the
        events needed for the timings (a message is printed).
    """
    # Counts task/map state events seen; only the first task state's input
    # (state == 0) is parsed for the run configuration.
    state = 0
    execution_start = execution_end = None
    inference_start = inference_end = None
    result_path = None

    for event in history['events']:
        timestamp = event['timestamp']
        event_type = event['type']
        details = get_event_details(event)

        if verbose:
            print(f"{timestamp} - {event_type}")
            if details is not None:
                print(f"  Details: {details}")

        try:
            if event_type == 'ExecutionStarted':
                execution_start = timestamp
            elif event_type == 'ExecutionSucceeded':
                execution_end = timestamp
            elif event_type == 'TaskStateEntered':
                if state == 0:
                    input_json = json.loads(details['input'])
                    result_path = input_json['result_path']

                    # Skip demo runs and result paths already recorded from an
                    # earlier history.
                    if "demo" in result_path or result_path in results['result_path']:
                        return results

                    data_size, run_no = _parse_result_path(result_path)
                    partition_size = input_json['data_prefix']
                    batch_size = input_json['batch_size']
                state += 1
            elif event_type == 'MapStateEntered':
                inference_start = timestamp
                state += 1
            elif event_type == 'MapStateExited':
                inference_end = timestamp
                state += 1
        except Exception as e:
            # Best effort: report why this execution is unusable and skip it.
            print(f"  Error {e!r}")
            return results

    if verbose:
        print("-" * 80)

    # Without these events the row would be incomplete and the timestamp
    # subtraction below would fail on None (e.g. for a truncated history),
    # which would abort the caller's loop over all executions.
    required = {
        'TaskStateEntered': result_path,
        'ExecutionStarted': execution_start,
        'ExecutionSucceeded': execution_end,
        'MapStateEntered': inference_start,
        'MapStateExited': inference_end,
    }
    missing = [name for name, value in required.items() if value is None]
    if missing:
        print(f"  Skipping {result_path}: missing {', '.join(missing)} event(s).")
        return results

    # total_seconds() also counts delta.days, which seconds + microseconds dropped.
    total_duration = (execution_end - execution_start).total_seconds()
    if verbose:
        print(f"Total Duration: {total_duration:.2f} seconds.")

    inference_duration = (inference_end - inference_start).total_seconds()
    if verbose:
        print(f"Inference Duration: {inference_duration:.2f} seconds.")

    row = {
        'result_path': result_path,
        'data (GB)': data_size,
        'run': run_no,
        'partition (MB)': partition_size,
        'total_duration (s)': total_duration,
        'inference_duration (s)': inference_duration,
        'batch_size': batch_size,
        'batch_varying': 'Batches' in result_path,
    }
    for column, value in row.items():
        results[column].append(value)

    return results

def get_step_function_logs(
    state_machine_arn, start_date=None, end_date=None,
    verbose=False, profile_name=None, region_name='us-east-1'
):
    """
    Collects per-execution timing results for the SUCCEEDED executions of an AWS Step Functions state machine.

    :param state_machine_arn: The ARN of the Step Functions state machine
    :param start_date: Optional, skip executions that started before this date (timezone-aware datetime object)
    :param end_date: Optional, skip executions that stopped after this date (timezone-aware datetime object)
    :param verbose: Optional, print each execution and its events
    :param profile_name: Optional, AWS profile name to use
    :param region_name: Optional, AWS region of the state machine (default 'us-east-1')
    :returns: dict mapping column name -> list of values, one entry per parsed
        execution; the internal 'result_path' column is removed before returning
    """
    # Initialize boto3 session with profile if provided
    if profile_name:
        session = boto3.Session(profile_name=profile_name)
        stepfunctions_client = session.client('stepfunctions', region_name=region_name)
    else:
        stepfunctions_client = boto3.client('stepfunctions', region_name=region_name)

    results = {
        key: [] for key in [
            'result_path', 'data (GB)', 'batch_size', 'run',
            'partition (MB)', 'total_duration (s)',
            'inference_duration (s)', 'batch_varying']
    }

    try:
        # Normalize the filter bounds to UTC once, not on every execution.
        if start_date:
            start_date = start_date.astimezone(pytz.utc)
        if end_date:
            end_date = end_date.astimezone(pytz.utc)

        # A single list_executions call returns one page only; walk all pages.
        execution_pages = stepfunctions_client.get_paginator('list_executions').paginate(
            stateMachineArn=state_machine_arn,
            statusFilter='SUCCEEDED',  # You can filter by RUNNING, FAILED, etc.
        )
        executions = [execution for page in execution_pages for execution in page['executions']]

        print(f"Fetching logs for Step Functions state machine: {state_machine_arn}\n")
        print(f"Found {len(executions)} executions.\n")

        history_paginator = stepfunctions_client.get_paginator('get_execution_history')
        for execution in executions:
            execution_arn = execution['executionArn']
            start_time = execution['startDate']
            stop_time = execution['stopDate']

            # Filter by start_date and end_date if provided
            if start_date and start_time < start_date:
                continue
            if end_date and stop_time > end_date:
                continue

            if verbose:
                print(f"Execution ARN: {execution_arn}")
                print(f"Start Time: {start_time}, Stop Time: {stop_time}")

            # get_execution_history is paged too; without merging every page the
            # later events (MapStateExited, ExecutionSucceeded) can be cut off.
            history_pages = history_paginator.paginate(
                executionArn=execution_arn,
                reverseOrder=False,
            )
            history = {'events': [event for page in history_pages for event in page['events']]}

            results = extract_event_details(history, results, verbose)

    except (stepfunctions_client.exceptions.ResourceNotFound,
            stepfunctions_client.exceptions.StateMachineDoesNotExist):
        print(f"The state machine ARN {state_machine_arn} does not exist.")
    except Exception as e:
        print(f"An error occurred: {e}")

    del results['result_path']
    return results

In [10]:
# State machine whose executions are analysed.
state_machine_arn = "arn:aws:states:us-east-1:448324707516:stateMachine:DataParallel-CosmicAI"

# Optional time window: timezone-aware UTC datetimes, or None for no bound.
start_date = datetime(2024, 11, 11, tzinfo=pytz.utc)
end_date = None  # e.g. datetime(2024, 11, 22, 23, 0, 0, tzinfo=pytz.utc)

results = get_step_function_logs(
    state_machine_arn,
    start_date=start_date,
    end_date=end_date,
    profile_name="cylon",
)

Fetching logs for Step Functions state machine: arn:aws:states:us-east-1:448324707516:stateMachine:DataParallel-CosmicAI

Found 41 executions.



In [11]:
# Build the per-run table, dropping rows whose data size is '100MB' or whose
# partition is 'data'. The newest executions come first, so keep='first'
# retains the latest run of each configuration.
df = (
    pd.DataFrame(results)
    .loc[lambda frame: (frame['data (GB)'] != '100MB') & (frame['partition (MB)'] != 'data')]
    .drop_duplicates(
        subset=['data (GB)', 'batch_size', 'run', 'partition (MB)', 'batch_varying'],
        keep='first',
    )
)
df.head(30)

Unnamed: 0,data (GB),batch_size,run,partition (MB),total_duration (s),inference_duration (s),batch_varying
0,768GB,512,3,100MB,189.391,137.992,False
1,768GB,512,2,100MB,237.553,183.381,False
2,768GB,512,1,100MB,206.236,149.544,False
3,512GB,512,3,100MB,226.658,171.852,False
4,512GB,512,2,100MB,159.606,106.549,False
5,512GB,512,1,100MB,168.03,112.27,False
6,256GB,512,3,100MB,124.557,71.847,False
7,256GB,512,2,100MB,144.661,93.242,False
8,256GB,512,1,100MB,136.589,81.686,False
9,983GB,512,3,100MB,109.01,58.997,False


In [12]:
# Parse sizes such as '768GB', '12.6GB' or '1.5TB' into numeric GB (1 TB counted
# as 1000 GB). Splitting value and unit avoids the old string replace, which
# turned '1.5TB' into '1.5000', i.e. 1.5 GB.
df['data (GB)'] = (
    df['data (GB)']
    .str.extract(r'^(?P<value>\d+(?:\.\d+)?)(?P<unit>[GT]B)$')
    .pipe(lambda parts: parts['value'].astype(float) * parts['unit'].map({'GB': 1, 'TB': 1000}))
)
assert df['data (GB)'].notna().all(), "unrecognised data size (expected e.g. '768GB' or '1TB')"

df['partition (MB)'] = df['partition (MB)'].str.replace('MB', '').astype(int)

# Number of partitions = ceil(data in MB / partition MB). Ceiling division is
# written as -(-a // b): it stays exact for fractional totals (e.g. 12.6GB),
# where (a + b - 1) // b under-counts when the remainder is below 1 MB.
df['num_worlds'] = (-(-(df['data (GB)'] * 1024) // df['partition (MB)'])).astype(int)

In [13]:
# Put configuration columns first, then order rows by configuration and run.
df = df[[
    'partition (MB)', 'data (GB)', 'batch_size', 'batch_varying', 'run',
    'num_worlds', 'total_duration (s)', 'inference_duration (s)',
]].sort_values(by=['partition (MB)', 'data (GB)', 'batch_size', 'batch_varying', 'run'])

In [14]:
# Create the output folder first so a fresh checkout (Restart & Run All) does
# not fail on a missing ./results directory.
Path('./results').mkdir(parents=True, exist_ok=True)
df.round(2).to_csv('./results/state_machine_logs.csv', index=False)