In [53]:
from typing import Dict, List, Optional
from dataclasses import dataclass
import copy
from file_manager import FileManager
import os
import time
import datetime
import boto3
from ddb_handler import DynamoDBHandler

@dataclass
class NodeInfo:
    """Static description of one cluster node, as read from the node mapping file."""
    name: str  # node identifier; matched against an ECS container-instance attribute value
    ip: str  # address taken from the node's `ip` entry in the mapping file
    ibdev: List[str]  # device names from the `ibdev` entry (presumably InfiniBand devices, e.g. 'mlx5_10')

In [55]:
# Make sure the node table for the nwcd-l4-v1 cluster exists before anything writes to it.
# The second argument is presumably the table's key attribute — confirm in ddb_handler.
DynamoDBHandler.create_table_if_not_exists("nwcd-l4-v1-nodes", 'node_name')

Creating table nwcd-l4-v1-nodes...


True

In [34]:
# Collect every container instance registered in the cluster, then describe them.
cluster_name = "nwcd-l4-v1" # os.environ.get('CLUSTER_NAME', 'default-cluster')
ecs_client = boto3.client('ecs')
container_instance_arns = []
paginator = ecs_client.get_paginator('list_container_instances')

for page in paginator.paginate(cluster=cluster_name):
    container_instance_arns.extend(page['containerInstanceArns'])

# DescribeContainerInstances accepts at most 100 instances per request, so the
# ARNs are described in batches and merged into one response-shaped dict.
# `desp_response` is always defined, even for an empty cluster, so cells that
# read it do not depend on whether any instance was found.
DESCRIBE_BATCH_SIZE = 100
desp_response = {'containerInstances': [], 'failures': []}
for start in range(0, len(container_instance_arns), DESCRIBE_BATCH_SIZE):
    batch_response = ecs_client.describe_container_instances(
        cluster=cluster_name,
        containerInstances=container_instance_arns[start:start + DESCRIBE_BATCH_SIZE],
        include=['TAGS']  # Include tags in the response
    )
    desp_response['containerInstances'].extend(batch_response['containerInstances'])
    desp_response['failures'].extend(batch_response.get('failures', []))

In [35]:
%%bash
# Tag one container instance with a custom `node_name` attribute.
# This is an AWS CLI command, not Python: without the %%bash cell magic the
# cell fails with a SyntaxError before anything is sent to ECS.
aws ecs put-attributes \
    --cluster nwcd-l4-v1 \
    --attributes name=node_name,value=node004,targetId=arn:aws:ecs:us-east-1:633205212955:container-instance/nwcd-l4-v1/fc0f7fbdd633481885e154c98213b0f5


['arn:aws:ecs:us-east-1:633205212955:container-instance/nwcd-l4-v1/a27e3327be4e41a99cb39d326f7e2ebc',
 'arn:aws:ecs:us-east-1:633205212955:container-instance/nwcd-l4-v1/adb5b3187b22471c8994179b11b1e158',
 'arn:aws:ecs:us-east-1:633205212955:container-instance/nwcd-l4-v1/e26b0672b1c149e092d6694f0ba042f1',
 'arn:aws:ecs:us-east-1:633205212955:container-instance/nwcd-l4-v1/fc0f7fbdd633481885e154c98213b0f5']

In [13]:
# Publish the cluster/table names as environment variables.
# os.system() runs each command in its own short-lived subshell, so an
# `export` there never reaches this kernel, and `$CLUSTER_NAME` is empty in
# the following calls. Assigning to os.environ makes the values visible to
# this process and to any child process it starts.
os.environ["CLUSTER_NAME"] = "nwcd-gpus-v2"
os.environ["JOB_MANAGE_TABLE"] = f"{os.environ['CLUSTER_NAME']}-jobs"
os.environ["TASK_MANAGE_TABLE"] = f"{os.environ['CLUSTER_NAME']}-tasks"
os.environ["NODE_MANAGE_TABLE"] = f"{os.environ['CLUSTER_NAME']}-nodes"


class NodeManager:
    """Loads the node mapping file and records matching ECS container instances in DynamoDB.

    On construction the manager:
      1. reads ``node_mapping_info.yaml`` and builds ``self.nodes`` (name -> NodeInfo),
      2. calls ``DynamoDBHandler.create_table_if_not_exists`` for the node table, and
      3. if that call returned a truthy value, writes one item per ECS container
         instance whose ``name`` attribute equals a configured node name.

    Args:
        cluster_name: ECS cluster to inspect. Defaults to the value that used
            to be hard-coded, ``"nwcd-gpus-v2"``.
        table_name: DynamoDB table to write to. Defaults to
            ``"<cluster_name>-nodes"``.
    """

    # DescribeContainerInstances accepts at most 100 instances per request.
    _DESCRIBE_BATCH_SIZE = 100

    def __init__(self, cluster_name: str = "nwcd-gpus-v2", table_name: Optional[str] = None):
        print('Node Manager initiated..')
        self.node_config = FileManager.load_yaml("node_mapping_info.yaml")

        # Initialize nodes from config (an empty mapping file yields no nodes
        # instead of crashing on `.items()`).
        self.nodes = {
            name: NodeInfo(name=name, ip=info['ip'], ibdev=info['ibdev'])
            for name, info in (self.node_config or {}).items()
        }

        # Initialize DynamoDB table if needed
        self.cluster_name = cluster_name
        self.table_name = table_name or f"{cluster_name}-nodes"
        print('## start creating table ##')
        self.ddb_initialized = DynamoDBHandler.create_table_if_not_exists(self.table_name, 'node_name')

        # Sync node status with DDB only when the table is usable
        if self.ddb_initialized:
            self._sync_nodes_with_ecs()

    def _list_container_instance_arns(self, ecs_client) -> List[str]:
        """Return the ARNs of all container instances in the cluster, following pagination."""
        paginator = ecs_client.get_paginator('list_container_instances')
        arns: List[str] = []
        for page in paginator.paginate(cluster=self.cluster_name):
            arns.extend(page['containerInstanceArns'])
        return arns

    def _describe_container_instances(self, ecs_client, arns: List[str]) -> List[dict]:
        """Describe the given instances in batches that respect the per-request limit."""
        described: List[dict] = []
        for start in range(0, len(arns), self._DESCRIBE_BATCH_SIZE):
            response = ecs_client.describe_container_instances(
                cluster=self.cluster_name,
                containerInstances=arns[start:start + self._DESCRIBE_BATCH_SIZE],
                include=['TAGS']  # Include tags in the response
            )
            described.extend(response['containerInstances'])
        return described

    def _sync_nodes_with_ecs(self) -> None:
        """Write one DynamoDB item per container instance that maps to a configured node."""
        ecs_client = boto3.client('ecs')
        arns = self._list_container_instance_arns(ecs_client)

        for instance in self._describe_container_instances(ecs_client, arns):
            # Take the ARN from the description itself: pairing by list index
            # breaks if the response order differs from the request or if an
            # instance is reported under `failures` instead.
            inst_arn = instance['containerInstanceArn']

            for attrdict in instance.get('attributes', []):
                # NOTE(review): the put-attributes cell earlier in this notebook
                # sets an attribute called `node_name`, while this matches on
                # `name` — confirm which attribute the cluster really carries.
                if attrdict['name'] != 'name':
                    continue

                # Attributes may come without a value; such entries cannot match a node.
                node_name = attrdict.get('value')
                node = self.nodes.get(node_name)
                if node is None:
                    continue

                # One timestamp so created_at and updated_at agree exactly.
                timestamp = datetime.datetime.now().isoformat()

                # NOTE(review): if write_item replaces the whole item, every
                # construction resets last_status and created_at — confirm.
                DynamoDBHandler.write_item(
                    table_name=self.table_name,
                    item={
                        'node_name': node_name,
                        'container_instance_id': inst_arn.split('/')[-1],
                        'container_instance_arn': inst_arn,
                        'cluster_name': self.cluster_name,
                        'last_status': 'AVAILABLE',
                        'ip': node.ip,
                        'ibdev': node.ibdev,
                        'created_at': timestamp,
                        'updated_at': timestamp
                    }
                )

# Build the manager once; its constructor loads the node mapping and, when the
# table check succeeds, syncs the matching ECS container instances into DynamoDB.
xx = NodeManager()

Node Manager initiated..
## start creating table ##
Table nwcd-gpus-v2-nodes already exists


In [52]:
# Show the items returned by scanning the node table of the nwcd-l4-v1 cluster.
from ddb_handler import DynamoDBHandler
DynamoDBHandler.scan_table("nwcd-l4-v1-nodes")

[{'updated_at': '2025-03-06T07:59:26.112900',
  'ibdev': ['mlx5_10', 'mlx5_11', 'mlx5_12', 'mlx5_13'],
  'created_at': '2025-03-06T05:13:25.725112',
  'cluster_name': 'nwcd-l4-v1',
  'container_instance_arn': 'arn:aws:ecs:us-east-1:633205212955:container-instance/nwcd-l4-v1/fc0f7fbdd633481885e154c98213b0f5',
  'ip': '10.11.141.120',
  'container_instance_id': 'fc0f7fbdd633481885e154c98213b0f5',
  'node_name': 'node004',
  'last_status': 'AVAILABLE'},
 {'updated_at': '2025-03-06T07:59:26.183550',
  'ibdev': ['mlx5_10', 'mlx5_11', 'mlx5_12', 'mlx5_13'],
  'created_at': '2025-03-06T05:13:25.593195',
  'cluster_name': 'nwcd-l4-v1',
  'container_instance_arn': 'arn:aws:ecs:us-east-1:633205212955:container-instance/nwcd-l4-v1/e26b0672b1c149e092d6694f0ba042f1',
  'ip': '10.11.133.25',
  'container_instance_id': 'e26b0672b1c149e092d6694f0ba042f1',
  'node_name': 'node003',
  'last_status': 'AVAILABLE'},
 {'updated_at': '2025-03-06T08:02:37.913181',
  'ibdev': ['mlx5_10', 'mlx5_11', 'mlx5_12', 

In [56]:
# Show the items returned by scanning the job table of the nwcd-l4-v1 cluster.
from ddb_handler import DynamoDBHandler
DynamoDBHandler.scan_table("nwcd-l4-v1-jobs")

[{'updated_at': '2025-03-06T07:54:48.071986',
  'assigned_nodes': ['node003', 'node004'],
  'created_at': '2025-03-06T07:54:48.071994',
  'cluster_name': 'nwcd-l4-v1',
  'submittd_ecs_task_ids': ['ff0e11f17cd7440f967dc2d1f923d0bc',
   'd101c24b45a0402bbcc640cae81d74fd'],
  'num_nodes': Decimal('2'),
  'status': 'IN_PROGRESS',
  'job_timestamp': '20250306-075445',
  'submittd_container_inst_ids': ['e26b0672b1c149e092d6694f0ba042f1',
   'fc0f7fbdd633481885e154c98213b0f5'],
  'job_id': 'job1-20250306-075445-158e4cae',
  'retry': Decimal('0')},
 {'updated_at': '2025-03-06T07:39:52.655532',
  'assigned_nodes': ['node002', 'node001'],
  'created_at': '2025-03-06T07:39:52.655545',
  'cluster_name': 'nwcd-l4-v1',
  'submittd_ecs_task_ids': ['e80093343bd64dfabda415662e498387',
   '99a00fb5992a4af3980a3e8993092aea'],
  'num_nodes': Decimal('2'),
  'status': 'IN_PROGRESS',
  'job_timestamp': '20250306-073949',
  'submittd_container_inst_ids': ['adb5b3187b22471c8994179b11b1e158',
   'a27e3327be4e4

In [None]:
# Show the items returned by scanning the task table of the nwcd-l4-v1 cluster.
# NOTE(review): this cell has no execution count and its saved output lists
# cluster 'nwcd-gpus-v2', so the output may be stale — re-run to refresh.
from ddb_handler import DynamoDBHandler
DynamoDBHandler.scan_table("nwcd-l4-v1-tasks")

[{'task_def_revision': '18',
  'created_at': '2025-03-06T03:12:43.663541',
  'cluster_name': 'nwcd-gpus-v2',
  'status': 'IN_PROGRESS',
  'job_timestamp': '20250306-031242',
  'ecs_task_id': '4c27005017904caa8bb33e9deaad28a8',
  'job_id': 'job1-20250306-031242-dd271b89',
  'node_name': 'node003',
  'retry': Decimal('0'),
  'task_def_arn': 'ECSTrainingJob-job1:18',
  'updated_at': '2025-03-06T03:12:43.663531',
  'container_inst_id': 'b5fcea53241b4d30aeaf6c9f6287b580',
  'node_index_in_job': Decimal('0'),
  'task_def_name': 'ECSTrainingJob-job1',
  'job_num_nodes': Decimal('1')}]

In [48]:
# Fetch one job record by its job_id and display it.
# NOTE(review): `dynamodb` is not used in this cell — confirm whether any other cell needs it.
dynamodb = boto3.resource('dynamodb')
resp = DynamoDBHandler.get_item('nwcd-l4-v1-jobs', {'job_id':'job1-20250306-080121-5c33e936'})
# resp['submittd_ecs_task_ids']
resp

{'updated_at': '2025-03-06T08:01:24.089932',
 'assigned_nodes': ['node002', 'node001'],
 'created_at': '2025-03-06T08:01:24.089944',
 'cluster_name': 'nwcd-l4-v1',
 'submittd_ecs_task_ids': ['da31c3194df747a09cfe70d094ef8eee',
  '2526374ed1ed4d4296aa20135ee2a3f8'],
 'num_nodes': Decimal('2'),
 'status': 'IN_PROGRESS',
 'job_timestamp': '20250306-080121',
 'submittd_container_inst_ids': ['adb5b3187b22471c8994179b11b1e158',
  'a27e3327be4e41a99cb39d326f7e2ebc'],
 'job_id': 'job1-20250306-080121-5c33e936',
 'retry': Decimal('0')}

In [57]:
# Mark one job as stopped by the user and refresh its `updated_at` timestamp.
#
# `datetime` is imported and used as a module on purpose. The previous
# `from datetime import datetime` rebound the notebook-global `datetime`
# (a module in the imports cell) to the class, which made
# `datetime.datetime.now()` inside NodeManager fail on any later re-run.
import datetime

# NOTE(review): job items shown in this notebook's scan output store the state
# under `status`, while this expression sets `job_status`. `status` is a
# DynamoDB reserved word, so fixing that needs an expression attribute name —
# confirm what DynamoDBHandler.update_item supports.
success = DynamoDBHandler.update_item(
    table_name='nwcd-l4-v1-jobs',
    key={'job_id': 'job1-20250306-082133-ac89ee5b'},
    update_expression="SET job_status = :s, updated_at = :t",
    expression_values={
        ':s': 'USER_STOPPED',
        ':t': datetime.datetime.now().isoformat()
    }
)