In [1]:
import struct
import math
import pyarrow as pa
import pyarrow.parquet as pq
from ctypes import *
import sys, os
from pathlib import Path
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import dask.dataframe as dd
from time import sleep

In [2]:
logs_dir="/p/gpfs1/haridev/recorder_logs/lbann_jag_32"
parquet_folder="/p/gpfs1/haridev/parquet/lbann_jag_32_3"
os.makedirs(parquet_folder, exist_ok=True)
MEMORY_TO_USE=1024.0*1024.0*1024.0

In [3]:
class RecorderReader:
    
    def __init__(self, logs_dir, parquet_folder, generate_parquet = True, mem_use = 1024*1024*1024, threads = os.cpu_count(), num_ranks = -1):
        self.MEMORY_TO_USE = mem_use
        self.parquet_folder = parquet_folder
        self.global_log_file = os.path.join(logs_dir, "recorder.mt")
        self.local_rank_metadata =  os.path.join(logs_dir, '{}.mt')
        self.local_rank_data =  os.path.join(logs_dir, '{}.itf')
        self.GM = self.get_global_metadata()
        self.load_func_list()
        self.LMs = self.get_local_metadata()
        if num_ranks == -1:
            num_ranks = self.GM.total_ranks
        if generate_parquet:
            self.generate_parquet_files(threads, num_ranks)
        self.records = self.get_records()
        
    def str2char_p(self, s):
        return c_char_p( s.encode('utf-8') )
    
    
    def get_global_metadata(self):
        global_metadata_fs = Path(self.global_log_file).stat().st_size
        #print(global_metadata_fs)
        global_metadata = GlobalMetadata()
        with open(self.global_log_file, mode='rb') as file:
            content = file.read(sizeof(GlobalMetadata))
            memmove(pointer(global_metadata), content, sizeof(GlobalMetadata))
            global_metadata.update_py();
            #print(global_metadata)
        return global_metadata
    
    def get_local_metadata(self):
        local_metadatas = []
        for rank in range(self.GM.total_ranks):
            local_metadata_fn = self.local_rank_metadata.format(rank)
            local_metadata_fs = Path(local_metadata_fn).stat().st_size
            read_bf_size = local_metadata_fs
            local_metadata = LocalMetadata()
            with open(local_metadata_fn, mode='rb') as file:
                content = file.read(read_bf_size)
                memmove(pointer(local_metadata), content[:sizeof(LocalMetadata)], sizeof(LocalMetadata))
                local_metadata.update_py()

                offset = sizeof(LocalMetadata)
                local_metadata.file_sizes = []
                local_metadata.filemap = []
                for i in range(local_metadata.num_files):
                    #skip file_id
                    offset += sizeof(c_int) 

                    #read file size
                    local_file_size=c_size_t()
                    memmove(pointer(local_file_size), content[offset:offset+sizeof(c_size_t)], sizeof(c_size_t))
                    local_metadata.file_sizes.append(local_file_size.value)
                    offset += sizeof(c_size_t)

                    #read file name length
                    file_name_len = int.from_bytes(content[offset:offset+sizeof(c_int)], byteorder='little')
                    offset += sizeof(c_int)

                    #read file name
                    filename_bytes = bytearray(content[offset:offset+file_name_len])
                    filename = filename_bytes.decode('utf-8')
                    local_metadata.filemap.append(filename)
                    offset += file_name_len
                    #print("iter",i, local_metadata.filemap[i], local_metadata.file_sizes[i])
            local_metadatas.append(local_metadata)
        return local_metadatas
    
    def thread_print(self, string):
        print(f'{string}\n', end='')
    
    def generate_parquet_file_rank(self, rank):
        num_funcs = len(self.funcs)
        local_metadata = self.LMs[rank]
        local_data_fn = self.local_rank_data.format(rank)
        with open(local_data_fn, mode='rb') as file:        
            file.seek(0,os.SEEK_END)        
            local_data_fs = file.tell()
            file.seek(0,0)
            #print("\n",rank, local_data_fs)
            read_bf_size = local_data_fs
            if local_data_fs > self.MEMORY_TO_USE:
                read_bf_size = self.MEMORY_TO_USE
            record_offset = 0
            record_index = 0
            total_iterations = math.ceil(local_data_fs/MEMORY_TO_USE)
            iteration_index = 0
            write = True
            while record_offset < local_data_fs:
                iteration_index+=1
                pq_filename=os.path.join(self.parquet_folder,"{}_{}.parquet".format(rank,iteration_index))
                if os.path.exists(pq_filename):
                    self.thread_print("Skiping file {} as it exists.".format(pq_filename))
                    sleep(1)
                    write = False
                    break
                content = file.read(read_bf_size)
                records = {
                    'index': [],
                    'rank': [],
                    'status': [],
                    'tstart': [],
                    'tend': [],
                    'func_id': [],
                    'res': [],
                    'arg_count': [],
                    'args_str': [],
                    'args_1': [],
                    'args_2': [],
                    'args_3': [],
                    'args_4': [],
                    'args_5': []
                }
                schema = pa.schema({
                        "index" : pa.int32(),
                        "rank" : pa.int32(),
                        "status"  : pa.int32(),
                        "tstart" : pa.float32(),
                        "tend" : pa.float32(),
                        "func_id" : pa.string(),
                        "res" : pa.int32(),
                        "arg_count" : pa.int32(),
                        "args_str": pa.binary(),
                        "args_1": pa.binary(),
                        "args_2": pa.binary(),
                        "args_3": pa.binary(),
                        "args_4": pa.binary(),
                        "args_5": pa.binary()
                  })
                while record_offset < read_bf_size:
                    if record_offset + 15 > read_bf_size:
                        record_offset = read_bf_size
                    else:
                        record = Record()
                        offset = 0
                        #record status
                        status=c_ubyte()
                        memmove(pointer(status), content[record_offset + offset :
                                                         record_offset + offset + sizeof(c_ubyte)],
                                sizeof(c_ubyte))
                        record.status = status.value
                        offset += sizeof(c_ubyte)
                        #record tstart
                        tstart=c_int()
                        memmove(pointer(tstart), content[record_offset + offset : 
                                                         offset + sizeof(c_int)], 
                                sizeof(c_int))
                        record.tstart = tstart.value * self.GM.time_resolution
                        offset += sizeof(c_int)
                        #record tend
                        tend=c_int()
                        memmove(pointer(tend), content[record_offset + offset : 
                                                       record_offset + offset + sizeof(c_int)], 
                                sizeof(c_int))
                        record.tend = tend.value * self.GM.time_resolution
                        offset += sizeof(c_int)
                        #record res
                        res=c_int()
                        memmove(pointer(res), content[record_offset + offset : 
                                                      record_offset + offset + sizeof(c_int)], 
                                sizeof(c_int))
                        record.res = res.value
                        offset += sizeof(c_int)
                        #record func_id
                        func_id=c_ubyte()
                        memmove(pointer(func_id), content[record_offset + offset : 
                                                          record_offset + offset + sizeof(c_ubyte)], 
                                sizeof(c_ubyte))
                        #print(func_id)
                        record.func_id = func_id.value
                        offset += sizeof(c_ubyte)
                        #record args
                        last_arg_index = content[record_offset + offset + 1:].find(bytes('\n', 'utf-8'))
                        arg_str = content[record_offset + offset + 1: record_offset + offset + 1 + last_arg_index]
                        record.args = arg_str.split(bytes(' ', 'utf-8'))
                        record.arg_count = len(record.args)
                        record_size = offset + 1 + last_arg_index
                        record_offset += record_size + 1
                        #print(record)
                        file.seek(record_offset, 0)


                        record_index += 1
                        #create dict
                        records['index'].append(record_index)
                        records['rank'].append(rank)
                        records['status'].append(record.status)
                        records['tstart'].append(record.tstart)
                        records['tend'].append(record.tend)
                        
                        if num_funcs > record.func_id:
                            records['func_id'].append(self.funcs[record.func_id])
                            #print(record, arg_str,self.funcs[record.func_id])
                        else:
                            #print(record.func_id)
                            records['func_id'].append(str(record.func_id))
                        records['res'].append(record.res)
                        records['arg_count'].append(record.arg_count)
                        records['args_str'].append(arg_str)
                        for i in range(5):
                            if i < record.arg_count:
                                records['args_{}'.format(i+1)].append(record.args[i])
                            else:
                                records['args_{}'.format(i+1)].append("")

                        #print("Appended {} {} of {}".format(record_offset, record_index,local_metadata.total_records), end='\r')
                
                offset = record_offset % int(MEMORY_TO_USE)
                if write:
                    arrow_table = pa.Table.from_pydict(dict(
                        zip(schema.names, (records['index'],records['rank'], records['status'], 
                                           records['tstart'], records['tend'],
                                           records['func_id'], records['res'],
                                          records['arg_count'], records['args_str'],
                                          records['args_1'], records['args_2'],
                                          records['args_3'], records['args_4'],
                                          records['args_5']))
                    ),schema=schema)
                    self.thread_print("Writing file {} with record {}.".format(pq_filename,record_index))
                    pq.write_table(arrow_table, pq_filename, use_dictionary=False)
                #print(record_read_bytes, len(records))
        return rank
    
    def generate_parquet_files(self, threads, num_ranks):
        #self.generate_parquet_file_rank(0)
        
        with ThreadPoolExecutor(max_workers = threads) as executor:
            future_gen = {executor.submit(self.generate_parquet_file_rank, rank): rank for rank in range(num_ranks)}
            for future in concurrent.futures.as_completed(future_gen):
                rank = future_gen[future]
                try:
                    data = future.result()
                except Exception as exc:
                    self.thread_print('%r generated an exception: %s' % (rank, exc))
                else:
                    self.thread_print('%r data written' % (rank))
            
    def get_records(self):
        return dd.read_parquet("{}/*.parquet".format(parquet_folder), engine="pyarrow")
    
    


    def load_func_list(self):
        #print(sizeof(GlobalMetadata))
        with open(self.global_log_file, 'rb') as f:
            f.seek(24, 0)
            self.funcs = f.read().splitlines()
            self.funcs = [func.decode('utf-8').replace("PMPI", "MPI") for func in self.funcs]

In [4]:
class LocalMetadata(Structure):
    _fields_ = [
            ("c_start_timestamp", c_double),
            ("c_end_timestamp", c_double),
            ("c_num_files", c_int),
            ("c_total_records", c_int),
            ("c_filemap", POINTER(c_char_p)),
            ("c_file_sizes", POINTER(c_size_t)),
            ("c_function_count", c_int*256),
    ]
    def __init__(self):
        self.filenames=[]
        self.start_timestamp = 0
        self.end_timestamp = 0
        self.num_files = 0
        self.total_records = 0
        self.filemap = []
        self.file_sizes = []
        self.function_count = []
    
    def update_py(self):
        self.start_timestamp = self.c_start_timestamp
        self.end_timestamp = self.c_start_timestamp
        self.num_files = self.c_num_files
        self.total_records = self.c_total_records
    
    def __repr__ (self):
        return 'LocalMetadata(start_timestamp=' + str(self.start_timestamp) + \
                ' ,end_timestamp=' + str(self.end_timestamp) + \
                ' ,num_files=' + str(self.num_files) + \
                ' ,total_records=' + str(self.total_records) + \
                ')'

class GlobalMetadata(Structure):
    _fields_ = [
            ("c_time_resolution", c_double),
            ("c_total_ranks", c_int),
            ("c_peephole_window_size", c_int),
    ]
    def __init__(self):
        self.time_resolution = 0
        self.total_ranks = 0
        self.peephole_window_size = 0
    def update_py(self):
        self.time_resolution = self.c_time_resolution
        self.total_ranks = self.c_total_ranks
        self.peephole_window_size = self.c_peephole_window_size
        #deinit c structs
    
    def __repr__ (self):
        return 'GlobalMetadata(time_resolution=' + str(self.time_resolution) + \
                ' ,total_ranks=' + str(self.total_ranks) + \
                ' ,peephole_window_size=' + str(self.peephole_window_size) + \
                ')'


class Record(Structure):
    _fields_ = [
            ("c_status", c_char),
            ("c_tstart", c_double),
            ("c_tend", c_double),
            ("c_func_id", c_ubyte),
            ("c_arg_count", c_int),
            ("c_args", POINTER(c_char_p)),    # Note in python3, args[i] is 'bytes' type
            ("c_res", c_int),
    ]
    def __init__(self):
        self.status = ''
        self.tstart = 0
        self.tend = 0
        self.func_id = 0
        self.arg_count = 0
        self.res = 0
        self.args = []
    def update_py(self):
        self.status = self.c_status
        self.tstart = self.c_tstart
        self.tend = self.c_tend
        self.func_id = self.c_func_id
        self.arg_count = self.c_arg_count
        self.res = self.c_res
        
        #deinit c structs
        self.c_status = None
        self.c_tstart =  None
        self.c_tend =  None
        self.c_func_id = None
        self.c_arg_count = None
        self.c_res = None
    def to_dict(self):
        return {
            'status': self.status,
            'tstart': self.tstart,
            'tend': self.tend,
            'func_id': self.func_id,
            'arg_count': self.arg_count,
            'res': self.res,
            'args': self.args            
        }
    def __repr__ (self):
        return 'Record(status=' + str(self.status) + \
                ' ,tstart=' + str(self.tstart) + \
                ' ,tend=' + str(self.tend) + \
                ' ,c_func_id=' + str(self.func_id) + \
                ' ,c_arg_count=' + str(self.arg_count) + \
                ' ,c_res=' + str(self.res) + \
                ')'

In [5]:
recorder = RecorderReader(logs_dir, parquet_folder, threads=8, num_ranks = 1)

Skiping file /p/gpfs1/haridev/parquet/lbann_jag_32_3/0_1.parquet as it exists.
0 data written


In [100]:
ddf = recorder.get_records()

In [101]:
ddf=ddf.reset_index().set_index('index')

In [102]:
ddf['rank'].value_counts().compute()

0    327451
Name: rank, dtype: int64

In [103]:
ddf_rank_0 = ddf[ddf["rank"] ==0]

In [104]:
ddf_rank_0.head()

Unnamed: 0_level_0,level_0,rank,status,tstart,tend,func_id,res,arg_count,args_str,args_1,args_2,args_3,args_4,args_5
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
1,0,0,0,2.338808,2.338814,open,-1,3,b'/dev/shm/job2154496201-35619-OMPI_COLL_IBM-0...,b'/dev/shm/job2154496201-35619-OMPI_COLL_IBM-0...,b'557250',b'384',b'',b''
2,1,0,0,3.747584,2.338855,open,53,2,b'/dev/shm/job2154496201-35619-OMPI_COLL_IBM-0...,b'/dev/shm/job2154496201-35619-OMPI_COLL_IBM-0...,b'557058',b'',b'',b''
3,2,0,0,3.747584,2.338876,ftruncate,0,2,b'53 2691072',b'53',b'2691072',b'',b'',b''
4,3,0,0,3.747584,2.338881,mmap,0,6,b'%p 2691072 3 1 53 0',b'%p',b'2691072',b'3',b'1',b'53'
5,4,0,0,3.747584,2.338888,close,0,1,b'53',b'53',b'',b'',b'',b''


In [105]:
ddf_rank_0_open = ddf_rank_0[(ddf_rank_0["func_id"] == "open") &
                             (ddf_rank_0["arg_count"] == 2) &
                             (~ddf_rank_0["res"].isin([-1,0]))]

In [106]:
ddf_rank_0_open=ddf_rank_0_open.reset_index().set_index('index')

In [107]:
ddf_rank_0_open['filename'] = ddf_rank_0_open['args_1'].str.decode("utf-8", errors='ignore').compute()

In [108]:
filenames = ddf[ddf["func_id"] == "open"]['args_1'].unique().compute()

In [109]:
for i in range(10):
    print(filenames[i*100:(i+1)*100])

0     b'/dev/shm/job2154496201-35619-OMPI_COLL_IBM-0...
1     b'/dev/shm/job2154496201-35619-OMPI_COLL_IBM-0...
2     b'\x80\xbc\xce%\x00\xc1\xce%\x00\x00\x00\x00\x...
3     b'\x80\xc6\xce%\x00\xc9\xce%\x00\x00\x00\x00\x...
4     b'\x80\xb5\xcf%\x00\xba\xcf%\x00\x00\x00\x00\x...
                            ...                        
95            b'\x80cK1\x01zK1\x01\x00\x00\x00\x00\x02'
96    b'/sys/devices/pci0004:00/0004:00:00.0/0004:01...
97      b'\x80\xbcK1\x01\xd4K1\x01\x00\x00\x00\x00\x02'
98    b'/sys/devices/pci0004:00/0004:00:00.0/0004:01...
99           b'\x80EL1\x01\\L1\x01\x00\x00\x00\x00\x02'
Name: args_1, Length: 100, dtype: object
100    b'/sys/devices/pci0004:00/0004:00:00.0/0004:01...
101      b'\x80\x90L1\x01\xa9L1\x01\x00\x00\x00\x00\x02'
102    b'/sys/devices/pci0004:00/0004:00:00.0/0004:01...
103      b'\x80\xe0L1\x01\xf7L1\x01\x00\x00\x00\x00\x02'
104    b'/sys/devices/pci0004:00/0004:00:00.0/0004:01...
                             ...                        
1