In [1]:
import json
import pandas as pd
import pathlib
from nsys_events import filter_time
import numpy as np
from tqdm import tqdm

In [2]:
# Load the intermediate JSON dump; later cells use it as the reference ("old")
# side when checking the CSV tables.
with open("Llama7B_trace_unmerged/nsys_events_intermediate_output.json", "r") as trace_file:
    raw_json_text = trace_file.read()
data = json.loads(raw_json_text)

In [3]:
# Communicator section of the JSON dump (looked up by commId in later cells).
comm_info = data['comm_info']

# Directory holding the CSV tables that are compared against the JSON dump.
new_output_dir = pathlib.Path("Llama7B_N4_GPU16_TP1_PP1_DP16_BS32_output")


def _read_output_csv(file_name):
    """Read one CSV table located in ``new_output_dir`` into a DataFrame."""
    return pd.read_csv(new_output_dir / file_name)


new_comm_info = _read_output_csv("comm_info.csv")
new_comm_ring_info = _read_output_csv("comm_ring_info.csv")
new_comm_tree_info = _read_output_csv("comm_tree_info.csv")
new_kernel_events = _read_output_csv("kernel_events.csv")
new_profiling_interval = _read_output_csv("profiling_interval.csv")
new_coll_info = _read_output_csv("coll_info.csv")
new_coll_kernels = _read_output_csv("coll_kernels.csv")
new_comm_data = _read_output_csv("comm_data.csv")

# Pair each comm_data row with the coll_info row whose `association` equals its
# `eventId`; the inner join drops rows that have no partner on either side.
new_comm_data_coll = pd.merge(
    new_comm_data,
    new_coll_info,
    left_on="eventId",
    right_on="association",
    how="inner",
)

In [4]:
# Display the table read from coll_kernels.csv as a quick visual sanity check.
new_coll_kernels

Unnamed: 0,nWarps,count,chunkCount,workCount,lastChunkCount,workOffset,sendbuff,recvbuff,association
0,3,1,8192,1,70368744185856,0,70386895028736,70386895028736,796755
1,3,1,8192,1,70368744185856,0,70386895028224,70386895028224,796760
2,3,1,4096,1,187647121166336,0,70386895028224,70386895028224,796795
3,3,1,8192,1,70368744185856,0,70386895028736,70386895028736,796815
4,3,1,8192,1,70368744185856,0,70386895029248,70386895029248,796835
...,...,...,...,...,...,...,...,...,...
302667,17,18481664,2097152,2314240,70368746274816,9256960,70420320361984,70420043137024,796617
302668,17,18481664,2097152,2314240,70368746274816,11571200,70420320361984,70420043137024,796617
302669,17,18481664,2097152,2314240,70368746274816,13885440,70420320361984,70420043137024,796617
302670,17,18481664,2097152,2281984,70368746274816,16199680,70420320361984,70420043137024,796617


In [5]:
# Cross-check every communicator listed in comm_info.csv against the JSON dump
# and, as a by-product, build `gpuId_relationships`: JSON gpuId (int) ->
# (nodeId, pid) tuple taken from the CSV. Later cells index this mapping.
gpuId_relationships = {}
# verify all communicators
for commId in new_comm_info["commId"].unique():
    assert commId in comm_info, f"commId {commId} is missing from the JSON comm_info"
    df_rows = new_comm_info[new_comm_info["commId"] == commId]
    comm_dict = comm_info[commId]
    rank_infos = comm_dict["rank_To_rankInfo"]

    # verify size: all CSV rows of this communicator must agree on nRanks,
    # and that value must match the JSON side.
    nRanks_values = df_rows["nRanks"].unique()
    assert len(nRanks_values) == 1, f"commId {commId}: inconsistent nRanks values {list(nRanks_values)}"
    new_nRanks = nRanks_values[0]
    assert comm_dict["nranks"] == new_nRanks, (
        f"commId {commId}: nranks mismatch, old {comm_dict['nranks']} vs new {new_nRanks}"
    )

    # construct and verify relationship
    gpuId_to_rank = comm_dict["gpuId_To_rank"]
    rank_to_gpuId = {v: k for k, v in gpuId_to_rank.items()}
    for rank, gpuId in rank_to_gpuId.items():
        rank_df_row = df_rows[df_rows["rank"] == int(rank)]
        assert len(rank_df_row) == 1, (
            f"commId {commId}: expected exactly one CSV row for rank {rank}, found {len(rank_df_row)}"
        )
        new_gpuId = tuple(rank_df_row.iloc[0][["nodeId", "pid"]])
        # Register the mapping OUTSIDE the assert: assert statements are removed
        # when Python runs with -O, which would leave the mapping empty.
        known_gpuId = gpuId_relationships.setdefault(int(gpuId), new_gpuId)
        assert known_gpuId == new_gpuId, (
            f"gpuId {gpuId} maps to both {known_gpuId} and {new_gpuId}"
        )

    # verify rank to gpuId mapping: rank_To_rankInfo and gpuId_To_rank must agree
    rank_to_gpuId_dict = {int(rank): rank_infos[rank]["gpuId"] for rank in rank_infos}
    for rank, gpuId in rank_to_gpuId_dict.items():
        assert str(rank) == str(gpuId_to_rank[str(gpuId)]), (
            f"commId {commId}: rank {rank} / gpuId {gpuId} disagree with gpuId_To_rank"
        )

    # verify ring topology: per rank, channel -> (previous rank, next rank)
    ring_rows = new_comm_ring_info[new_comm_ring_info["commId"] == commId]
    for rank, rank_info in rank_infos.items():
        ring_row = ring_rows[ring_rows["myRank"] == int(rank)]
        chnl_to_topo_dict = {
            int(topo["channel_Id"]): (int(topo["previous_rank"]), int(topo["next_rank"]))
            for topo in rank_info["channel_info"]["Ring"]
        }
        assert len(chnl_to_topo_dict) == len(ring_row), (
            f"commId {commId}, rank {rank}: ring channel count mismatch, "
            f"old {len(chnl_to_topo_dict)} vs new {len(ring_row)}"
        )
        # Zip the needed columns instead of iterrows(): iterrows builds one
        # Series per row and may change dtypes across a mixed-type row.
        chnl_to_topo_df_dict = {
            chnl: (prev_rank, next_rank)
            for chnl, prev_rank, next_rank in zip(
                ring_row["channelId"], ring_row["prevRank"], ring_row["nextRank"]
            )
        }
        assert chnl_to_topo_dict == chnl_to_topo_df_dict, (
            f"commId {commId}, rank {rank}: ring topology mismatch, "
            f"old {chnl_to_topo_dict} vs new {chnl_to_topo_df_dict}"
        )

    # verify tree topology: per rank, channel -> (parent rank, (child1, child2, child3))
    tree_rows = new_comm_tree_info[new_comm_tree_info["commId"] == commId]
    for rank, rank_info in rank_infos.items():
        tree_row = tree_rows[tree_rows["myRank"] == int(rank)]
        chnl_to_topo_dict = {
            int(topo["channel_Id"]): (
                int(topo["parent_rank"]),
                tuple(int(topo[f"child_{i}_rank"]) for i in range(1, 4)),
            )
            for topo in rank_info["channel_info"]["Tree"]
        }
        assert len(chnl_to_topo_dict) == len(tree_row), (
            f"commId {commId}, rank {rank}: tree channel count mismatch, "
            f"old {len(chnl_to_topo_dict)} vs new {len(tree_row)}"
        )
        chnl_to_topo_df_dict = {
            chnl: (parent, (child1, child2, child3))
            for chnl, parent, child1, child2, child3 in zip(
                tree_row["channelId"],
                tree_row["parentRank"],
                tree_row["child1Rank"],
                tree_row["child2Rank"],
                tree_row["child3Rank"],
            )
        }
        assert chnl_to_topo_dict == chnl_to_topo_df_dict, (
            f"commId {commId}, rank {rank}: tree topology mismatch, "
            f"old {chnl_to_topo_dict} vs new {chnl_to_topo_df_dict}"
        )

In [6]:
# verify all the cupti kernel events
# Flatten the JSON structure {goalRank: {gpuId: streams}} into
# {(nodeId, pid): streams} so it can be indexed with the CSV groupby keys.
cupti_events = data["cupti_kernel_results"]
cupti_events_flat = {}
for goalRank, rank_dict in cupti_events.items():
    for gpuId, streams in rank_dict.items():
        gpu_key = gpuId_relationships[int(gpuId)]
        # Check the key that is actually inserted. Testing the raw string gpuId
        # against a dict keyed by (nodeId, pid) tuples could never fail.
        assert gpu_key not in cupti_events_flat, (
            f"gpuId {gpuId} (goalRank {goalRank}) maps to {gpu_key}, which is already registered"
        )
        cupti_events_flat[gpu_key] = streams

# NOTE(review): filter_time is a project helper; presumably it keeps only the
# events inside the profiling interval - confirm. The name is rebound, so
# re-running this cell filters the already-filtered frame.
new_kernel_events = filter_time(new_profiling_interval, new_kernel_events) # questionable here

for gpu, gpu_df in new_kernel_events.groupby(["nodeId", "pid"]):
    cupti_events_dict = cupti_events_flat[gpu]
    for streamId, stream_df in gpu_df.groupby("streamId"):
        # Order both sides by GPU start time so events can be compared pairwise.
        cupti_events_list = sorted(cupti_events_dict[str(streamId)], key=lambda x: x["ts_gpu_start"])
        assert len(cupti_events_list) == len(stream_df), (
            f"gpu {gpu}, stream {streamId}: event count mismatch, "
            f"old {len(cupti_events_list)} vs new {len(stream_df)}"
        )
        stream_df_sorted = stream_df.sort_values(by="start")
        for old_event, (_, new_event) in zip(cupti_events_list, stream_df_sorted.iterrows()):
            assert old_event["gpu_event_type"] == new_event["collective"], (
                f"gpu {gpu}, stream {streamId}: type mismatch, "
                f"old {old_event['gpu_event_type']} vs new {new_event['collective']}"
            )
            assert old_event["ts_gpu_start"] == new_event["start"], (
                f"gpu {gpu}, stream {streamId}: start mismatch, "
                f"old {old_event['ts_gpu_start']} vs new {new_event['start']}"
            )
            assert old_event["ts_gpu_end"] == new_event["end"], (
                f"gpu {gpu}, stream {streamId}: end mismatch, "
                f"old {old_event['ts_gpu_end']} vs new {new_event['end']}"
            )


In [10]:
# verify all the host side kernel events
with open("Llama7B_trace_unmerged/nsys_events_merged_output.json", "r") as f:
    nvtx_events = json.load(f)

# Flatten {goalRank: {gpuId: streams}} into {(nodeId, pid): streams}.
nvtx_events_flat = {}
for goalRank, rank_dict in nvtx_events.items():
    for gpuId, streams in rank_dict.items():
        gpu_key = gpuId_relationships[int(gpuId)]
        # Check the key that is actually inserted. Testing the raw string gpuId
        # against a dict keyed by (nodeId, pid) tuples could never fail.
        assert gpu_key not in nvtx_events_flat, (
            f"gpuId {gpuId} (goalRank {goalRank}) maps to {gpu_key}, which is already registered"
        )
        nvtx_events_flat[gpu_key] = streams


def _inflate_stream(stream):
    """Expand every "GroupColl" event of `stream` into one event per member.

    Each member starts from a copy of the group event (with "coll_type" moved
    to "event_type" and "coll_events" removed), is overlaid with the member's
    own fields, and is flagged "in_group" = True. The first member gets the
    group's "ts_gpu_start"; the remaining members get the group's "ts_gpu_end"
    as their start. Non-group events are tagged "in_group" = False in place
    and passed through unchanged otherwise.
    """
    inflated_stream = []
    for event in stream:
        if event["event_type"] == "GroupColl":
            coll_template = event.copy()
            group_gpu_start = coll_template["ts_gpu_start"]
            coll_template["event_type"] = coll_template.pop("coll_type")
            coll_template.pop("coll_events")
            coll_template["in_group"] = True
            coll_template["ts_gpu_start"] = coll_template["ts_gpu_end"]
            for coll_idx, coll_event in enumerate(event["coll_events"]):
                inflated_event = coll_template.copy()
                inflated_event.update(coll_event)
                if coll_idx == 0:
                    inflated_event["ts_gpu_start"] = group_gpu_start
                inflated_stream.append(inflated_event)
        else:
            event["in_group"] = False
            inflated_stream.append(event)
    return inflated_stream


# removing the streamId keys for easier matching
for gpu, streams in nvtx_events_flat.items():
    # inflate the grouped collectives; the helper keeps its own loop index, so
    # the per-stream position can no longer be overwritten by the member index.
    streams = [_inflate_stream(stream) for stream in streams.values()]

    stream_lengths = [len(stream) for stream in streams]
    assert len(np.unique(stream_lengths)) == len(stream_lengths) # for the matching to work properly
    # Streams are re-keyed by their event count; the CSV side is keyed the same
    # way below, which is how streams from the two sources are paired.
    nvtx_events_flat[gpu] = {len(stream): stream for stream in streams}

# verify the collective data
new_comm_data_coll = filter_time(new_profiling_interval, new_comm_data_coll)
# new_comm_data_with_commId = new_comm_data_coll.merge(new_comm_info[["commId", "commHash", "nodeId", "pid"]], on=["commHash", "nodeId", "pid"], how="left")
for gpu, gpu_df in tqdm(new_comm_data_coll.groupby(["nodeId", "pid"])):
    gpu_streams = nvtx_events_flat[gpu]
    streams_df = [stream_df for _, stream_df in gpu_df.groupby("stream")]
    streams_df = sorted(streams_df, key=lambda x: len(x))
    stream_df_lengths = [len(stream_df) for stream_df in streams_df]
    assert len(np.unique(stream_df_lengths)) == len(stream_df_lengths) # for the matching
    streams_df = {len(stream_df): stream_df for stream_df in streams_df}
    for length, stream_df in streams_df.items():
        if length not in gpu_streams:
            # Best-effort: a stream without a same-length partner is reported
            # and skipped rather than failing the whole verification.
            print(f"Length {length} not in gpu_streams keys {list(gpu_streams.keys())} for gpu {gpu}, skipping...")
            continue
        gpu_stream = gpu_streams[length]
        stream_df = stream_df.sort_values(by="start")
        gpu_stream = sorted(gpu_stream, key=lambda x: x["ts_start"])
        for old_event, (_, new_event) in zip(gpu_stream, stream_df.iterrows()):
            try:
                assert old_event["event_type"] == new_event["collective"], f"Mismatch collective type: old {old_event['event_type']} vs new {new_event['collective']}"
                assert old_event["data_size"] == new_event["data_size"], f"Mismatch data_size: old {old_event['data_size']} vs new {new_event['data_size']}"
                assert old_event["commId"] == new_event["commId"], f"Mismatch commId: old {old_event['commId']} vs new {new_event['commId']}"
                assert int(old_event.get("red_op", 0)) == new_event["redOp"], f"Mismatch red_op: old {old_event.get('red_op', 0)} vs new {new_event['redOp']}"
                assert int(old_event["algorithm"]) == new_event["algo"], f"Mismatch algorithm: old {old_event['algorithm']} vs new {new_event['algo']}"
                assert int(old_event["protocol"]) == new_event["proto"], f"Mismatch protocol: old {old_event['protocol']} vs new {new_event['proto']}"
                assert old_event["chunkSteps"] == new_event["chunkSteps"], f"Mismatch chunkSteps: old {old_event['chunkSteps']} vs new {new_event['chunkSteps']}"
                assert old_event["sliceSteps"] == new_event["sliceSteps"], f"Mismatch sliceSteps: old {old_event['sliceSteps']} vs new {new_event['sliceSteps']}"
                assert old_event["stepSize"] == new_event["stepSize"], f"Mismatch stepSize: old {old_event['stepSize']} vs new {new_event['stepSize']}"
                assert old_event["in_group"] == (new_event["groupId"] != -1), f"Mismatch in_group: old {old_event['in_group']} vs new {new_event['groupId'] != -1}"
                assert old_event["ts_gpu_start"] == new_event["start"], f"Mismatch ts_gpu_start: old {old_event['ts_gpu_start']} vs new {new_event['start']}"
                assert old_event["ts_gpu_end"] == new_event["end"], f"Mismatch ts_gpu_end: old {old_event['ts_gpu_end']} vs new {new_event['end']}"
                # TODO: currently failing the test for type_size
                if old_event["event_type"] != "AllGather":
                    assert old_event["type_size"] == new_event["type_size"], f"Mismatch type_size: old {old_event['type_size']} vs new {new_event['type_size']}"
            except AssertionError:
                # Print both records for context, then re-raise with the
                # original traceback intact.
                print(f"AssertionError for gpu {gpu}, event old: {old_event}, new: {new_event.to_dict()}")
                raise

            # Compare the per-event work elements, paired after sorting both
            # sides by sendbuff.
            new_coll_elements = new_coll_kernels[new_coll_kernels["association"] == new_event["eventId"]]
            new_coll_elements = new_coll_elements.sort_values(by="sendbuff")
            old_coll_elements = sorted(old_event["elems"], key=lambda x: x["sendbuff"])
            assert len(old_coll_elements) == len(new_coll_elements), f"Mismatch number of collective elements: old {len(old_coll_elements)} vs new {len(new_coll_elements)}"
            for old_elem, (_, new_elem) in zip(old_coll_elements, new_coll_elements.iterrows()):
                assert old_elem["sendbuff"] == new_elem["sendbuff"], f"Mismatch sendbuff: old {old_elem['sendbuff']} vs new {new_elem['sendbuff']}"
                assert old_elem["recvbuff"] == new_elem["recvbuff"], f"Mismatch recvbuff: old {old_elem['recvbuff']} vs new {new_elem['recvbuff']}"
                assert old_elem["count"] == new_elem["count"], f"Mismatch count: old {old_elem['count']} vs new {new_elem['count']}"
                assert old_elem["chunkCount"] == new_elem["chunkCount"], f"Mismatch chunkCount: old {old_elem['chunkCount']} vs new {new_elem['chunkCount']}"
                assert old_elem["workCount"] == new_elem["workCount"], f"Mismatch workCount: old {old_elem['workCount']} vs new {new_elem['workCount']}"
                assert old_elem["lastChunkCount"] == new_elem["lastChunkCount"], f"Mismatch lastChunkCount: old {old_elem['lastChunkCount']} vs new {new_elem['lastChunkCount']}"
                assert old_elem["workOffset"] == new_elem["workOffset"], f"Mismatch workOffset: old {old_elem['workOffset']} vs new {new_elem['workOffset']}"


100%|██████████| 16/16 [00:03<00:00,  4.91it/s]
