## Utilities to Upload, Ingest and Verify Logfiles From SSDs to s3 to Aletheia


In [1]:
import boto3
import requests
import json
import os
import argparse
import pandas as pd
import time

import numpy as np
from datetime import datetime 
#from brtdevkit.ml import Workflow
import pytz

#from brtdevkit.ml import Workflow
from brtdevkit.ml.core import KubeflowPipeline, KubeflowPipelineRun

image_ingestion_pipeline = None
isp_target_rom = '07090103'

utc = pytz.UTC

from warnings import filterwarnings
filterwarnings("ignore")

### To Complete the Process of Getting Logfiles On an SSD into Images in Aletheia:</br>
* Upload the SSD to AWS via the AWS CLI
* Verify the Upload
* Kick off ingestion workflows (will typically take several hours including ISP processing)
* Verify Ingestion
</br>

#### Outside of This Loop for SSDs, Verify That All Logs Collected to Date Have Been Ingested Into Aletheia

### Functions to Carry Out Each Step


In [2]:
# Initialize s3 and set environment variables
import time
s3 = boto3.client('s3')
image_ingestion_experiment_name = 'shasta_image_ingest'
image_ingestion_pipeline_name = 'Shasta Image Data Processor'
image_ingestion_pipeline = None

def get_logfile_directories(bucket, prefix, just_fsize =False):
    """
    Finds S3 directories with flatbuffer logfiles in them.
    :param bucket: S3 bucket to search in
    :param prefix: the prefix (pathname) underneath the bucket to be searched
    """

    s3 = boto3.client('s3')

    dir_list = []
    fsize = 0
    kwargs = {'Bucket': bucket, 'Prefix': prefix}
    while True:
        resp = s3.list_objects_v2(MaxKeys =10000, **kwargs)
        
        # This bit of code handles pagination in the boto library.
        if 'Contents' in resp:
            for entry in resp['Contents']:
                if entry['Key'].endswith('.bfbs'):
                    dir_list.append(os.path.dirname(entry['Key']))
                    fsize += entry['Size']
            try:
                kwargs['ContinuationToken'] = resp['NextContinuationToken']
            except KeyError:
                break
        else:
            break
    if just_fsize == True:
        return fsize
    else:
        return dir_list, fsize

def get_SSD_file_size(start_path):
    """
    Given a path to an SSD, get the size of the files. 
    """
    
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(start_path):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            # skip if it is symbolic link
            if not os.path.islink(fp):
                total_size += os.path.getsize(fp)

    return total_size

def check_upload_complete(bucket, prefix, SSD_path):
    """
    Given a bucket and prefix in S3, as well as the path to the SSD from my laptop
    Verify that eveything on the SSD was in fact uploaded to S3. 
    
    ** TESTED: Works great for DCM logs. Probably not for machines with all their VPUs.
    """
    
    # Retrieve all log file names on the SSD
    SSD_dirs = []
    for f in os.listdir(SSD_path):
        li = os.listdir(SSD_path + f)
        SSD_dirs.append(prefix  +f)

    # Retrieve all logfile names in S3
    s3_dirs, fsize = get_logfile_directories(bucket = bucket, prefix = prefix)
    s3_dirs = set(s3_dirs)

    print(f'There are {len(SSD_dirs)} logfiles on the SSD and {len(s3_dirs)} in s3.')
    
    # Check SSD against S3
    s3_fsize = 0
    for n in np.unique(SSD_dirs):
        if n in s3_dirs:
            fsize = get_logfile_directories(bucket = bucket, prefix = n, just_fsize=True)
            s3_fsize = s3_fsize + fsize
    
    # Compare filesizes
    s3_size = s3_fsize/1000000
    SSD_fsize = get_SSD_file_size(SSD_path)/1000000
    equal_fsize = abs(SSD_fsize - s3_size) < 1000
    
    # Compare logs
    all_logs_in_s3 = set(SSD_dirs).discard(set(s3_dirs)) == None
    
    if all_logs_in_s3 and equal_fsize:
        print(f'File size of {s3_size:.0f} MB confirmed')
        print('All files on drive accounted for in S3.')
        #return dirs, s3_size
    elif all_logs_in_s3:
        print(f'Could not verify filesize. There is a discepancy of {SSD_fsize - s3_size} out of {SSD_fsize}. Check paths')
    
    else:
        print('Upload looks strange. Verify path.')
        return set(SSD_dirs).discard(set(s3_dirs)), s3_size

def submit_image_ingestion_workflow(bucket, s3_key, retry=50):
    """
    Wrapper around the brtdevkit image ingestion pipeline object that is error tolerant.
    :param bucket: S3 bucket name
    :param s3_key: s3 key of object to be processed
    :param retry: number of times to retry the worflow submission in case of failure
    :return: tuple of pipeline id and s3_key
    """

    global image_ingestion_pipeline  # NOQA. Use a global so we only have to set this var once and
                                     # can run tests importing this file without retrieving the pipeline.
                                     # TODO: Encapsulate in a class.
    if not image_ingestion_pipeline:
        image_ingestion_pipeline = KubeflowPipeline.retrieve(name=image_ingestion_pipeline_name)
    for i in range(retry):
        try:
            now = datetime.now().strftime("%d %B %Y %I:%M:%S %p")
            run = image_ingestion_pipeline.submit(
                job_name=f'Run of {image_ingestion_pipeline.name} at {now}',
                experiment_name=image_ingestion_experiment_name,
                params={
                    's3_bucket': bucket,
                    's3_key': s3_key,
                    'optional_args': '-u'
                    #'optional_args':{'camer'}
                })
        except Exception as e:
            print(f'Attempt {i+1}: error encountered while processing {s3_key}, retrying...')
            print(e)
            time.sleep(5)
            continue
        break
    run_id = run['id']
    print(f"launched image ingestion pipeline {run_id} on key {s3_key}")
    return run_id, s3_key

def ingest_new_SSD_logs(SSD_path, dcm):
    """
    Given a list of logfiles in S3, launch ingestion workflows for those S3 logfiles. 
    """
    SSD_dirs = []
    for f in os.listdir(SSD_path):
        li = os.listdir(SSD_path + f)
        SSD_dirs.append(prefix +f)
    print(f'There are {len(SSD_dirs)} logs to ingest.') 
    for i in SSD_dirs[0:5]:
        submit_image_ingestion_workflow('brt-dcm-data', dcm + '/0/full/' +i)
        time.sleep(30)
        
def verify_ingest(df, in_s3):
    """
    Given a dataframe of all DCM images from 2021, and a list of logfiels in S3, find the difference between the two.
    These will be the logfiles we still need to ingest. 
    """
    # Check which s3_paths are present in Aletheia (2021 data only)
    paths = list(df.s3_path.unique())
    in_db = set([p.split('s3://brt-dcm-data/')[1] for p in paths])
    missing = list(in_s3.difference(in_db))
    print(f'There are {len(missing)} s3 logs not accounted for in Aletheia.')
    return sorted(missing)

In [4]:
bucket = 'brt-dcm-data'
prefix = 'porpoise-sysbox1/0/full/'
d, f = get_logfile_directories(bucket, prefix)
print(len(d))
d[0:5]

3364


['porpoise-sysbox1/0/full/2018-01-28_15-59-09.238302761',
 'porpoise-sysbox1/0/full/2018-01-28_15-59-09.238302761',
 'porpoise-sysbox1/0/full/2018-01-28_15-59-09.238302761',
 'porpoise-sysbox1/0/full/2018-01-28_15-59-09.238302761',
 'porpoise-sysbox1/0/full/2018-01-28_15-59-18.355660711']

In [23]:
new_logs = []

for m in d:
    #print(m[32:34])
    if (m[24:32] == '2021-04-') and (int(m[32:34]) > 16):
        new_logs.append(m)
        
nl = np.unique(new_logs)

In [25]:
import time

def submit_image_ingestion_workflow(bucket, s3_key, retry=50):
    """
    Wrapper around the brtdevkit image ingestion pipeline object that is error tolerant.
    :param bucket: S3 bucket name
    :param s3_key: s3 key of object to be processed
    :param retry: number of times to retry the worflow submission in case of failure
    :return: tuple of pipeline id and s3_key
    """

    global image_ingestion_pipeline  # NOQA. Use a global so we only have to set this var once and
                                     # can run tests importing this file without retrieving the pipeline.
                                     # TODO: Encapsulate in a class.
    if not image_ingestion_pipeline:
        image_ingestion_pipeline = KubeflowPipeline.retrieve(name=image_ingestion_pipeline_name)
    for i in range(retry):
        try:
            now = datetime.now().strftime("%d %B %Y %I:%M:%S %p")
            run = image_ingestion_pipeline.submit(
                job_name=f'Run of {image_ingestion_pipeline.name} at {now}',
                experiment_name=image_ingestion_experiment_name,
                params={
                    's3_bucket': bucket,
                    's3_key': s3_key,
                    'optional_args': '-u'
                })
        except Exception as e:
            print(f'Attempt {i+1}: error encountered while processing {s3_key}, retrying...')
            print(e)
            time.sleep(5)
            continue
        break
    run_id = run['id']
    print(f"launched image ingestion pipeline {run_id} on key {s3_key}")
    return run_id, s3_key

for i in nl[5:]:
    submit_image_ingestion_workflow(bucket='brt-dcm-data', s3_key = i)
    time.sleep(30)

launched image ingestion pipeline b78d5a46-1215-4b67-a03a-9879dbace002 on key porpoise-sysbox1/0/full/2021-04-20_14-56-14.874151309
launched image ingestion pipeline 5133a244-6a5f-419e-ab69-bb37e7979945 on key porpoise-sysbox1/0/full/2021-04-20_15-01-19.445722962
launched image ingestion pipeline ffea8990-afb8-41e6-9062-c3e82e951b36 on key porpoise-sysbox1/0/full/2021-04-20_15-03-57.144754216
launched image ingestion pipeline 88382aeb-56e9-4917-a6b9-a9220d6ab049 on key porpoise-sysbox1/0/full/2021-04-20_15-13-57.877413844
launched image ingestion pipeline 854ae98b-07b1-4483-81d9-f58b970fa7cc on key porpoise-sysbox1/0/full/2021-04-20_15-19-00.285750208
launched image ingestion pipeline 35db1d27-6995-439c-9cf5-0686d014a909 on key porpoise-sysbox1/0/full/2021-04-20_15-22-28.231276854
launched image ingestion pipeline 6c1cced6-d44b-46e9-a73b-74f73a10c0f5 on key porpoise-sysbox1/0/full/2021-04-20_15-25-40.097126456
launched image ingestion pipeline 9a9a1a7d-efbd-405e-8360-531bcdc14a0e on ke

### With the SSD Still Plugged in, Verify Upload and Ingest

In [55]:
# Check To See if All Files Have Been uploaded from a SSD

def check_upload_complete(bucket, prefix, SSD_path):
    """
    Given a bucket and prefix in S3, as well as the path to the SSD from my laptop
    Verify that eveything on the SSD was in fact uploaded to S3. 
    
    ** TESTED: Works great for DCM logs. Probably not for machines with all their VPUs.
    """
    
    # Retrieve all log file names on the SSD
    SSD_dirs = []
    for f in os.listdir(SSD_path):
        li = os.listdir(SSD_path + f)
        SSD_dirs.append(prefix  + 'vpu0-0a/full/' +f)

    # Retrieve all logfile names in S3
    s3_dirs, fsize = get_logfile_directories(bucket = bucket, prefix = prefix)
    s3_dirs = set(s3_dirs)

    print(f'There are {len(SSD_dirs)} logfiles on the SSD and {len(s3_dirs)} in s3.')
    
    # Check SSD against S3
    s3_fsize = 0
    for n in np.unique(SSD_dirs):
        if n in s3_dirs:
            fsize = get_logfile_directories(bucket = bucket, prefix = n, just_fsize=True)
            s3_fsize = s3_fsize + fsize
    
    # Compare filesizes
    s3_size = s3_fsize/1000000
    SSD_fsize = get_SSD_file_size(SSD_path)/1000000
    equal_fsize = abs(SSD_fsize - s3_size) < 1000
    
    # Compare logs
    all_logs_in_s3 = set(SSD_dirs).discard(set(s3_dirs)) == None
    
    if all_logs_in_s3 and equal_fsize:
        print(f'File size of {s3_size:.0f} MB confirmed')
        print('All files on drive accounted for in S3.')
        #return dirs, s3_size
    elif all_logs_in_s3:
        print(f'Could not verify filesize. There is a discepancy of {SSD_fsize - s3_size} out of {SSD_fsize}. Check paths')
    
    else:
        print('Upload looks strange. Verify path.')
        return set(SSD_dirs).discard(set(s3_dirs)), s3_size

bucket = 'brt-dcm-data'
prefix = 'dcm18/'
SSD_path ='/media/williamroberts/SSD223/dcm18-sysbox1/vpu0-0a/full/'

check_upload_complete(bucket , prefix , SSD_path )

KeyboardInterrupt: 

In [None]:
SSD_path = '/media/williamroberts/SSD209/dcm12-sysbox1/vpu0-0a/full/'

def ingest_new_SSD_logs(SSD_path, dcm, v='vpu0-0a'):
    """
    Given a list of logfiles in S3, launch ingestion workflows for those S3 logfiles. 
    """
    SSD_dirs = []
    for f in os.listdir(SSD_path):
        li = os.listdir(SSD_path + f)
        SSD_dirs.append(f)
    print(f'There are {len(SSD_dirs)} logs to ingest.') 
    for i in SSD_dirs:
        #print('brt-dcm-data', dcm + '/vpu0-0a/full/' +i)
        submit_image_ingestion_workflow('brt-dcm-data', dcm + '/'+ v +'/full/' +i)
        time.sleep(30)
        
vpus = ['vpu0-1a', 'vpu0-2a', 'vpu0-3a', 'vpu0-4a', 'vpu0-10a', 'vpu0-11a', 'vpu0-12a', 'vpu0-13a', 'vpu0-14a']

#for v in vpus:
    #ingest_new_SSD_logs('/media/williamroberts/SSD118/db4-sysbox1/' + v + '/full/', 'db8')
        
ingest_new_SSD_logs(SSD_path, 'dcm12') # Ingest after 130

There are 219 logs to ingest.
launched image ingestion pipeline b5c77cc7-3000-4335-92a7-a99081e409f2 on key dcm12/vpu0-0a/full/2021-06-29_01-29-14.940401284_197_1818.335926628
launched image ingestion pipeline b9690ebb-30d0-4c16-a229-57a0850ccbe3 on key dcm12/vpu0-0a/full/2021-07-06_16-04-58.677250304_221_793.701921197
launched image ingestion pipeline d77a1ec7-738a-4fac-8493-b805cee31bdf on key dcm12/vpu0-0a/full/2021-07-06_16-39-30.602806674_221_2865.601380021
launched image ingestion pipeline 9d809da1-9148-4054-9a98-32e08ee0e323 on key dcm12/vpu0-0a/full/2021-07-06_16-35-38.557842347_221_2633.556415470
launched image ingestion pipeline 54312d35-acae-4f78-8276-fdce8e3ebab5 on key dcm12/vpu0-0a/full/2021-06-26_23-06-27.793663147_189_1158.997155979
launched image ingestion pipeline 46452bc9-e653-43f5-a0bb-44098064e90b on key dcm12/vpu0-0a/full/2021-07-02_22-24-47.569261365_208_392.996689781
launched image ingestion pipeline 4f845531-56b1-4e93-b5b3-82ec388bba67 on key dcm12/vpu0-0a/full

In [96]:
# This will take some time depending on the number of logfiles on the SSD
# Follow the Kubeflow runs at: https://kubeflow.brtws.com/_/pipeline/#/experiments/details/638eda8c-17cd-42be-bbb8-ab7a45647283
# You can also follow ISP processing progress at: https://grafana.brtws.com/d/AWSSQS000/aws-sqs?from=now-24h&orgId=1&to=now&var-datasource=CloudWatch&var-queue=isp_medium_priority_prod&var-region=default
for i in SSD_dirs:
    submit_image_ingestion_workflow('brt-dcm-data', i)
    time.sleep(30)


launched image ingestion pipeline 65ae65b7-3877-446b-9b3c-cd0541a7157e on key dcm14/0/full/2021-04-27_20-05-55.439067087_83_4210.580051503
launched image ingestion pipeline fa0a3422-ea0e-49f9-a149-abcf5c60a3e5 on key dcm14/0/full/2021-05-05_19-46-51.726701077_104_3476.726666805
launched image ingestion pipeline 7bf913a9-6603-40ae-9f62-be9f6a961dae on key dcm14/0/full/2021-05-05_19-08-17.343964765_104_1162.343930237
launched image ingestion pipeline 1b6aaaad-9677-4878-93ca-577073976ded on key dcm14/0/full/2021-04-28_16-32-27.657436520_92_255.658392136
launched image ingestion pipeline 08150ad4-4e9e-4bd4-a685-424b87b8b8b3 on key dcm14/0/full/2021-05-06_01-05-08.497570586_112_2075.684745274
launched image ingestion pipeline 73c69a48-9592-4a68-9fb1-5515d2db4f36 on key dcm14/0/full/2021-04-30_17-49-26.123169205_98_3094.338461013
launched image ingestion pipeline f59fc750-933f-4f6a-8932-62e3b5d0a68a on key dcm14/0/full/2021-04-30_19-10-37.565067440_100_1139.721681424
launched image ingestion

### After Waiting For ISP Processing of NRG Images to Complete, Verify Ingest Has Concluded Successfully
First, query aletheia for all 2021 DCM data (or a subset if you know where to look for the specific logs)

In [49]:
import pandas as pd
import time
import numpy as np
from matplotlib import pyplot as plt, rcParams

import brtdevkit
from brtdevkit.core.db import DBConnector, DatetimeFilter
from brtdevkit.core.db.db_filters import *  # We need this for pre-defined filters, e.g., ProjectFilter, DatetimeFilter

def get_shasta_data(filters={}, start=None, end=None, limit=None):
    """
    Query relevant Shasta data for calculations. 
    """
    start_time = time.time()
    connector = DBConnector()
    img_filters = {'project_name': 'shasta', **filters}
    if start is not None or end is not None:
        img_filters = [img_filters, DatetimeFilter(key="collected_on", start=start, end=end)]
    df = connector.get_documents_df('image', img_filters, limit=limit)
    elapsed_time = time.time() - start_time
    return df, elapsed_time

start = datetime(2021,3, 29) # The first field was collected on 3/30/2021
end = datetime(2021, 7, 28)

dcms_2021_1 = ['DCM11', 'DCM12', 'DCM13', 'DCM14', 'DCM15', 'DCM16', 'DCM17', 'DCM18', 'DCM19']
dcms_2021_2 = ['DCM20', 'DCM21', 'DCM22', 'DCM23', 'DCM24', 'DCM25', 'DCM26', 'DCM27', 'DCM28']
machines_2021 = ['DB1', 'DB2', 'DB3', 'DB4', 'DB5', 'DB6', 'DB7', 'DB8', 'DB9', 'DBMULE2']

dcms_2020 =  ['DCM-MANATEE', 'DCM-WALRUS', 'DCM-SEAL', 'DCM-OTTER', 'DCM-PORPOISE', 'DCM-DOLPHIN']
#{'$in': dcms_2021} 

filters = {"artifacts.kind": "nrg", 'robot_name' : 'DB9'
          }

df, elapsed_time = get_shasta_data(filters=filters, start=start, end=end)
#df['grower_farm_field'] = df['grower'] + '_' + df['farm'] + '_' + df['operating_field_name']
print(f"Queried {len(df)} images in {elapsed_time:.2f} s.")

Queried 427997 images in 189.23 s.


In [35]:
# For Brazilian DCMs

dolphin , f = get_logfile_directories('brt-dcm-data', 'dolphin-sysbox1')
otter , f = get_logfile_directories('brt-dcm-data', 'otter-sysbox1')

in_s3_br = set(otter + dolphin)
len(in_s3_br)

1345

In [52]:
# Check to see if all DCM logs in S3 are accounted for in the Aletheia database
# If Desired, Verify all DCM Logs in s3 (recommended periodically)

dcm11 , f = get_logfile_directories('brt-dcm-data', 'dcm11')
dcm12 , f = get_logfile_directories('brt-dcm-data', 'dcm12')
dcm13 , f = get_logfile_directories('brt-dcm-data', 'dcm13')
dcm14 , f = get_logfile_directories('brt-dcm-data', 'dcm14')
dcm15, f = get_logfile_directories('brt-dcm-data', 'dcm15')
dcm16 , f = get_logfile_directories('brt-dcm-data', 'dcm16')
dcm17, f = get_logfile_directories('brt-dcm-data', 'dcm17')
dcm18 , f = get_logfile_directories('brt-dcm-data', 'dcm18')
dcm19, f = get_logfile_directories('brt-dcm-data', 'dcm19')

NameError: name 'dcm20' is not defined

In [84]:
dcm20 , f = get_logfile_directories('brt-dcm-data', 'dcm20')
dcm21 , f = get_logfile_directories('brt-dcm-data', 'dcm21')
dcm22 , f = get_logfile_directories('brt-dcm-data', 'dcm22')
dcm23 , f = get_logfile_directories('brt-dcm-data', 'dcm23')
dcm24, f = get_logfile_directories('brt-dcm-data', 'dcm24')
dcm25 , f = get_logfile_directories('brt-dcm-data', 'dcm25')
dcm26, f = get_logfile_directories('brt-dcm-data', 'dcm26')
dcm27 , f = get_logfile_directories('brt-dcm-data', 'dcm27')
dcm28, f = get_logfile_directories('brt-dcm-data', 'dcm28')

In [16]:
dcm11 , f = get_logfile_directories('brt-dcm-data', 'dcm11')
dcm12 , f = get_logfile_directories('brt-dcm-data', 'dcm12')
dcm14 , f = get_logfile_directories('brt-dcm-data', 'dcm14')

jft = set(dcm11 + dcm12 + dcm14)
len(jft)

2480

In [15]:
db, f = get_logfile_directories('brt-dcm-data', 'db2')
missing = verify_ingest(df, set(db))
print(len(np.unique(missing)))
print(sorted(np.unique(missing))[0:10])

There are 52 s3 logs not accounted for in Aletheia.
52
['dcm33/0/full/2021-05-19_19-03-12.248710678_17_2305.474838075', 'dcm33/0/full/2021-05-19_22-15-14.277825634_20_408.951241649', 'dcm33/0/full/2021-05-19_22-15-19.302720335_20_413.976136446', 'dcm33/0/full/2021-05-19_22-15-27.334432646_20_422.007848597', 'dcm33/vpu0-0a/full/2021-06-01_18-51-31.526544343_28_1516.611694551', 'dcm33/vpu0-0a/full/2021-06-01_18-52-41.842929970_28_1586.928080082', 'dcm33/vpu0-0a/full/2021-06-01_18-54-03.172883551_28_1668.258033727', 'dcm33/vpu0-0a/full/2021-06-01_18-55-08.471716609_28_1733.556866817', 'dcm33/vpu0-0a/full/2021-06-01_18-56-20.808882251_28_1805.894032427', 'dcm33/vpu0-0a/full/2021-06-01_18-57-40.149592492_28_1885.234742700']


In [106]:
mi = [i for i in missing if i[0:13] == 'dcm16/vpu0-0a']
mi[0:5]

['dcm16/vpu0-0a/full/2021-05-31_15-34-54.251941500_116_355.405043484',
 'dcm16/vpu0-0a/full/2021-05-31_15-40-33.545393449_116_694.698495209',
 'dcm16/vpu0-0a/full/2021-05-31_15-44-15.366226473_116_916.519328457',
 'dcm16/vpu0-0a/full/2021-05-31_15-48-35.337014065_116_1176.490115761',
 'dcm16/vpu0-0a/full/2021-05-31_15-51-54.102441530_116_1375.255543353']

In [50]:
def verify_ingest_db(df, in_s3, robot_name):
    """
    Given a dataframe of all DCM images from 2021, and a list of logfiles in S3, find the difference between the two.
    These will be the logfiles we still need to ingest. 
    """
    # Check which s3_paths are present in Aletheia (2021 data only)
    paths = list(df.s3_path.unique())
    paths = [ p for p in paths if (p[0:18] == 's3://brt-dcm-data/') ]
    paths = [p.split('s3://brt-dcm-data/')[1] for p in paths]
    in_db = [p.split('/log')[0] for p in paths]
    print(in_s3[0:5])
    # omit logs from vpu0-0a
    in_s3 = [v for v in in_s3  if (v.split(robot_name)[1][0:8] != '/vpu0-0a')]
    # omit anthing but full logs
    in_s3 = [v for v in in_s3  if len(v.split('full'))>1]
    in_s3 = set([d.split('/log')[0] for d in in_s3])
    missing = list(in_s3.difference(in_db))
    print(f'There are {len(missing)} s3 logs not accounted for in Aletheia.')
    return missing

robot = 'db9'
db, f = get_logfile_directories('brt-dcm-data', robot)
indb = verify_ingest_db(df, db, robot)

len(indb)

['db9/vpu0-0a/full/2021-04-20_19-44-39.934046338_79_13567.642429221', 'db9/vpu0-0a/full/2021-04-20_19-44-39.934046338_79_13567.642429221/log_timestamps/0/aos/remote_timestamps/vpu0-10a/0/aos/aos-message_bridge-Timestamp', 'db9/vpu0-0a/full/2021-04-20_19-44-39.934046338_79_13567.642429221/log_timestamps/0/aos/remote_timestamps/vpu0-10a/lead/brt-SystemParameters', 'db9/vpu0-0a/full/2021-04-20_19-44-39.934046338_79_13567.642429221/log_timestamps/0/aos/remote_timestamps/vpu0-10a/pose/brt-vpu-pose-fbs-Origin', 'db9/vpu0-0a/full/2021-04-20_19-44-39.934046338_79_13567.642429221/log_timestamps/0/aos/remote_timestamps/vpu0-11a/0/aos/aos-message_bridge-Timestamp']
There are 755 s3 logs not accounted for in Aletheia.


755

In [51]:
vpus = [n for n in indb if n[4] == 'v']
print(len(indb))
len(np.unique(vpus))

755


755

In [52]:
vpus[0:10]

['db9/vpu0-4a/full/2021-06-04_19-31-43.089322973_261_4519.567481783',
 'db9/vpu0-3a/full/2021-06-10_19-45-28.298933093_289_9469.614497722',
 'db9/vpu0-12a/full/2021-06-05_21-05-13.838737011_277_892.415336059',
 'db9/vpu0-14a/full/2021-05-08_17-34-34.281011488_173_12894.541370553',
 'db9/vpu0-12a/full/2021-06-08_17-34-34.440144858_294_588.713912895',
 'db9/vpu0-14a/full/2021-06-16_21-00-19.237840838_344_12466.923374850',
 'db9/vpu0-13a/full/2021-05-11_20-01-39.059219095_181_6080.488913710',
 'db9/vpu0-4a/full/2021-05-08_16-57-22.701320519_168_10663.101777460',
 'db9/vpu0-11a/full/2021-06-15_15-27-27.702873433_334_2705.108433287',
 'db9/vpu0-2a/full/2021-06-05_21-27-14.976913604_268_2213.786690860']

In [53]:
df = None

In [85]:
in_s3 =  set(dcm20 + dcm21 + dcm22 + dcm23  + dcm24 + dcm25 + dcm26 +dcm27 + dcm28 )
len(in_s3)

907

In [16]:
for i in vpus:
    submit_image_ingestion_workflow('brt-dcm-data', i)
    time.sleep(30)

launched image ingestion pipeline cee6ccfe-9bc6-4488-8215-b810e866c5df on key dcm25/vpu0-0a/full/2021-05-24_18-20-55.241924798_56_2152.541077182
launched image ingestion pipeline aa20edac-1213-4de1-8ef0-344a3445cc2d on key dcm25/vpu0-0a/full/2021-05-20_22-25-51.571510008_49_664.776981464
launched image ingestion pipeline cb6bbfcc-75e7-4393-89c3-d3f1065d9e3e on key dcm25/vpu0-0a/full/2021-05-27_14-54-29.040817782_76_384.445881078
launched image ingestion pipeline c9ccc417-c052-4f02-9c4e-c5f67864b3bc on key dcm25/vpu0-0a/full/2021-05-27_15-13-58.655300800_76_1554.060364160
launched image ingestion pipeline 31d79524-8963-401a-bee6-f4b9d85be1af on key dcm25/vpu0-0a/full/2021-05-21_15-38-17.129990254_53_842.342395054
launched image ingestion pipeline c83f044a-efec-45ac-a954-f046161ddbd9 on key dcm25/vpu0-0a/full/2021-05-24_21-16-24.963515086_58_855.438916430
launched image ingestion pipeline a44a6784-6304-42f7-a90b-bf4e9d8832f5 on key dcm25/vpu0-0a/full/2021-05-25_20-11-44.319817129_66_1145

In [31]:
lr= sorted(np.unique(vpus))
mr = sorted(vpus)
print(lr[0])
print(mr[0])

print(mr == lr)

db2/vpu0-10a/full/2021-06-09_18-12-42.454382431_443_6052.370048027
db2/vpu0-10a/full/2021-06-09_18-12-42.454382431_443_6052.370048027
True


In [124]:
robot_name = 'db8'
in_s3 = [v for v in db8_in_s3  if (v.split(robot_name)[1][0:8] != '/vpu0-0a')]
    # omit anthing but full logs
in_s3 = [v for v in in_s3  if len(v.split('full'))>1]
in_s3 = set([d.split('/log')[0] for d in in_s3])
print(len(in_s3))
list(in_s3)[10:20]

228


['db8/vpu0-2a/full/2021-06-09_20-39-21.317797596_359_764.712383189',
 'db8/vpu0-14a/full/2021-06-10_15-59-25.134464136_281_11706.818771931',
 'db8/vpu0-3a/full/2021-06-10_15-59-05.875493610_343_11687.338647656',
 'db8/vpu0-14a/full/2021-06-10_15-58-57.398867230_281_11679.083174993',
 'db8/vpu0-13a/full/2021-06-11_14-56-27.545658460_373_2393.529725240',
 'db8/vpu0-1a/full/2021-06-10_16-32-26.883237997_361_13688.579352407',
 'db8/vpu0-11a/full/2021-06-11_14-56-59.433583804_374_2425.661908635',
 'db8/vpu0-14a/full/2021-06-11_14-46-14.245065688_286_1780.507646301',
 'db8/vpu0-12a/full/2021-06-10_15-54-27.629244018_364_11409.007836013',
 'db8/vpu0-13a/full/2021-06-10_14-08-33.026096031_368_5054.520648780']

In [125]:
for i in list(in_s3):
    submit_image_ingestion_workflow('brt-dcm-data', i)
    time.sleep(30)

launched image ingestion pipeline 71400bf8-7980-43f7-bac0-02d4e698385e on key db8/vpu0-1a/full/2021-06-11_14-48-04.276660442_366_1853.561335462
launched image ingestion pipeline 657cd0b4-22db-4061-9750-375a16feef40 on key db8/vpu0-10a/full/2021-06-10_14-06-33.727684440_383_4935.345559932
launched image ingestion pipeline 3d33d8c1-0a02-4efc-9966-44654f89e46a on key db8/vpu0-12a/full/2021-06-10_15-59-05.890441111_364_11687.269032850
launched image ingestion pipeline 70fade3e-1b15-416d-a6c2-565e927f5995 on key db8/vpu0-12a/full/2021-06-11_14-36-15.789576550_369_1181.731651484
launched image ingestion pipeline 4c67dc9f-ee00-4fed-b1d1-014f2bdf0d16 on key db8/vpu0-11a/full/2021-06-10_14-08-33.034139113_370_5054.784347962
launched image ingestion pipeline dcfcb61d-efeb-4d78-81ee-e020c9352220 on key db8/vpu0-1a/full/2021-06-11_14-56-27.517447561_366_2356.802122741
launched image ingestion pipeline 15872d28-0121-406d-8040-a515b84b7b96 on key db8/vpu0-1a/full/2021-06-11_14-36-15.837744995_366_11

In [5]:
def submit_image_ingestion_workflow_db(bucket, s3_key, retry=50):
    """
    Wrapper around the brtdevkit image ingestion pipeline object that is error tolerant.
    :param bucket: S3 bucket name
    :param s3_key: s3 key of object to be processed
    :param retry: number of times to retry the worflow submission in case of failure
    :return: tuple of pipeline id and s3_key
    """

    global image_ingestion_pipeline  # NOQA. Use a global so we only have to set this var once and
                                     # can run tests importing this file without retrieving the pipeline.
                                     # TODO: Encapsulate in a class.
    if not image_ingestion_pipeline:
        image_ingestion_pipeline = KubeflowPipeline.retrieve(name=image_ingestion_pipeline_name)
    for i in range(retry):
        try:
            now = datetime.now().strftime("%d %B %Y %I:%M:%S %p")
            run = image_ingestion_pipeline.submit(
                job_name=f'Run of {image_ingestion_pipeline.name} at {now}',
                experiment_name=image_ingestion_experiment_name,
                params={
                    's3_bucket': bucket,
                    's3_key': s3_key,
                    'optional_args': '-u',
                    'optional_args': '-c0000'
                })
        except Exception as e:
            print(f'Attempt {i+1}: error encountered while processing {s3_key}, retrying...')
            print(e)
            time.sleep(5)
            continue
        break
    run_id = run['id']
    print(f"launched image ingestion pipeline {run_id} on key {s3_key}")
    return run_id, s3_key

In [28]:
# These logs cannot be ingested because of errors (jobs always fail with exit codes 134, 139)

bad_logs = ['dcm11/0/full/2021-04-12_20-11-21.560889820_117_457.700285692', 
           'dcm11/0/full/2021-04-12_20-20-15.207594838_117_991.346990646',
          'dcm13/0/full/2021-04-14_17-35-08.530899564_95_570.222910779',
           'dcm13/0/full/2021-04-06_19-48-18.536140039_68_1117.581160359',
        'dcm11/0/full/2021-04-12_20-20-15.207594838_117_991.346990646', 
        'dcm11/0/full/2021-04-12_20-11-21.560889820_117_457.700285692',
        'dcm13/0/full/2021-03-29_17-10-18.706782732_37_5593.882559140',
        'dcm11/0/full/2021-03-31_22-58-21.127151520_90_322.236384096'
          ]