In [None]:
from functools import partial
from io import FileIO
from itertools import chain
from multiprocessing import Pool
from pathlib import Path
import re
import time
from typing import MutableSequence
import psutil

from bs4 import BeautifulSoup
from cytoolz import merge
from google.protobuf.json_format import MessageToDict
from hostess.directory import (
    index_breadth_first, make_level_table, make_treeframe
)
from hostess.monitors import (
    make_monitors, make_stat_printer, make_stat_records
)
from hostess.utilities import notary
from more_itertools import divide, distribute
import pandas as pd
from rich import print as rp, inspect as ri
from yamcs.client import YamcsClient

# from mdbparser import find_toplevel_def_files, parse_ground_parameters
from yamcs_server_utilz import (
    get_yamcsd_url,
    run_yamcs_server, 
    open_yamcs_socket, 
    serve_packet, 
    read_packet,
    toggle_yamcsd_tcp
)

In [None]:
of_interest = (
    # these denote image file publications rather than blobs
#     '/ViperGround/Images/Hazcam_back_left_image',
#     '/ViperGround/Images/Hazcam_back_right_image',
#     '/ViperGround/Images/Hazcam_front_left_image',
#     '/ViperGround/Images/Hazcam_front_right_image',
    '/ViperGround/Images/ImageData/Hazcam_back_left_icer',
#     '/ViperGround/Images/ImageData/Hazcam_back_left_jpeg',
#     '/ViperGround/Images/ImageData/Hazcam_back_left_slog',
    '/ViperGround/Images/ImageData/Hazcam_back_right_icer',
#     '/ViperGround/Images/ImageData/Hazcam_back_right_jpeg',
#     '/ViperGround/Images/ImageData/Hazcam_back_right_slog',
    '/ViperGround/Images/ImageData/Hazcam_front_left_icer',
#     '/ViperGround/Images/ImageData/Hazcam_front_left_jpeg',
#     '/ViperGround/Images/ImageData/Hazcam_front_left_slog',
    '/ViperGround/Images/ImageData/Hazcam_front_right_icer',
#     '/ViperGround/Images/ImageData/Hazcam_front_right_jpeg',
#     '/ViperGround/Images/ImageData/Hazcam_front_right_slog',
    "/ViperGround/Images/ImageData/Navcam_left_icer",
    "/ViperGround/Images/ImageData/Navcam_right_icer",
    "/ViperGround/Images/ImageData/Aftcam_left_icer",
    "/ViperGround/Images/ImageData/Aftcam_right_icer",
#     '/ViperGround/Mapping/navcamDEM',
#     '/ViperGround/Mapping/navcamDEMHighRes',
#     '/ViperGround/Mapping/navcamHazmapColor',
#     '/ViperGround/Mapping/navcamOrthoimage',
#     '/ViperGround/Mapping/navcamOrthoimageHighRes',
#     '/ViperRover/CameraIo/aftCamLeft',
#     '/ViperRover/CameraIo/aftCamRight',
#     '/ViperRover/CameraIo/hazcam1',
#     '/ViperRover/CameraIo/hazcam2',
#     '/ViperRover/CameraIo/hazcam3',
#     '/ViperRover/CameraIo/hazcam4',
#     '/ViperRover/CameraIo/imageHeaderMsg',
#     '/ViperRover/CameraIo/imagePointer',
#     '/ViperRover/CameraIo/inputImageType',
#     '/ViperRover/CameraIo/navcamLeft',
#     '/ViperRover/CameraIo/navcamRight',
#     # only the top of the queue
# #     '/ViperRover/ImageCache/downlinkQueue',
#     # raw image data? we don't have to watch it probably
#     '/ViperRover/ImageCache/imageData',
#     '/ViperRover/ImageCache/inputImageType',
#     # probably not a thing
# #     '/ViperRover/ImageCache/losslessCache',
#     '/ViperRover/ImageCache/outputImageType',
#     '/ViperRover/ImageProcessing/cacheQueueItems',
#     '/ViperRover/ImageProcessing/cacheQueueNumEntries',
#     '/ViperRover/ImageProcessing/cameraQueueItems',
#     '/ViperRover/ImageProcessing/cameraQueueNumEntries',
#     '/ViperRover/ImageProcessing/imageHeaderMsg',
#     # probably memory pointer in classic sense
#     '/ViperRover/ImageProcessing/imagePointer',
#     '/ViperRover/ImageProcessing/inputImageType',
#     '/ViperRover/ImageProcessing/latestCameraId',
#     '/ViperRover/ImageProcessing/latestImageId',
#     '/ViperRover/ImageProcessing/latestProcessingStage',
#     # probably, again, onboard processing only
#     '/ViperRover/ImageProcessing/rawPoolFreeSlots',
#     '/ViperRover/ImageProcessing/slogPoolFreeSlots',
#     '/ViperRover/LightsControl/state'
)
    # includes a 'reserved' value but probably does not refer
    # to slots
#     '/ViperRover/CameraIo/encodedBothCCUsHousekeeping',
    # probably just a packet stream counter.
#     '/ViperRover/ImageCache/chunkId',
    # cacheQueueItems and cameraQueue items, while possibly interesting,
    # do not appear to contain information about the CCU slots.
    # note that NIRVSS parameters do not appear to contain useful information,
# at least not explicitly.

In [None]:
# note: throwing exceptions inside threads appears to terminate 
# the server process due to some kind of shielding thing. unclear 
# whether this just happens in Jupyter. anyway, make sure
# to use timeouts consistently. 
# note: if the server quits ungracefully, it may leave rockDB
# lock files in _global.rdb and viper.rdb, and you will have
# to clean them. it can also corrupt other files in yamcs-data,
# so it might be best to just delete the whole subdirectory 
# (it automatically recreates it at runtime)
server_process = run_yamcs_server(enable_tcp=True)
# unless loglevel is 0...
# yamcsd_url = get_yamcsd_url(server_process)
yamcsd_url = 'localhost:8090/yamcs'
server_process.running

In [None]:
def unpack_parameter_value(value):
    rec = {}
    for key in (
        'eng_value',
        'generation_time',
        'monitoring_result',
        'name',
        'processing_status',
        'range_condition',
        'raw_value',
        'reception_time',
        'validity_duration',
        'validity_status'
    ):
        rec[key] = getattr(value, key)
    return rec

def unpack_parameters(messages):
    return [
        unpack_parameter_value(value) 
        for value in messages.parameters
    ]

def unpack_parameters_into(cache: MutableSequence):
    def unpacker(message):
        cache.extend(unpack_parameters(message))
    return unpacker


def subscribe_and_report(parameters, url, pid):
    client = YamcsClient(url)
    processor = client.get_processor('viper', 'realtime')
    cache = []
    unpacker = unpack_parameters_into(cache)
    subscription = processor.create_parameter_subscription(
        parameters, on_data=unpacker
    )
    exception = None
    is_done, is_closed = False, False
    while subscription.running():
        response = None
        try:
            response = subscription.exception(0.2)
            if isinstance(response, Exception):
                if not psutil.pid_exists(server_process.pid):
                    status = 'server shutdown'
                else:
                    status = 'unknown'
                return {
                    'status': status,
                    'response': response, 
                    'cache': cache,
                    'parameters': parameters
                }
        except Exception as ex:
            if "timed" in str(ex).lower():
                continue
            return {
                'status': 'failed', 
                'response': response, 
                'request_exception': ex,
                'cache': cache,
                'parameters': parameters
            }
    if psutil.pid_exists(server_process.pid):
        status = 'server shutdown'
    else:
        status = 'unknown'
    return {
        'status': 'ok', 'cache': cache, 'parameters': parameters
    }

In [None]:
# client = YamcsClient(yamcsd_url)
# processor = client.get_processor('viper', 'realtime')
# cache = []
# unpacker = unpack_parameters_into(cache)
# subscription = processor.create_parameter_subscription(
#     of_interest + ('/Ccsds/apid',), on_data=unpacker
# )


In [None]:
yamcsd_url = 'localhost:8090/yamcs'
client = YamcsClient(yamcsd_url)
mdb = client.get_mdb('viper')
params = tuple(mdb.list_parameters())
parameter_paths = [p.qualified_name for p in params]
iio = [p.qualified_name for p in params if 'wheel' in p.qualified_name.lower()]
# ri(iio[0], all=True)

In [None]:
chunks = [tuple(d) for d in distribute(10, of_interest)]
pool, results = Pool(10), []
for chunk in chunks:
    future = pool.apply_async(
        subscribe_and_report, (chunk, yamcsd_url, server_process.pid)
    )
    results.append(future)
pool.close()

In [None]:
sock = open_yamcs_socket()
rawfile = 'packets.b6s3.part-square.fixed.raw'
packetstream = FileIO(rawfile, 'rb')
mons = make_monitors()
mons.pop('cpu')
mons.pop('memory')
mons.pop('diskio')
mons.pop('networkio')
mons.pop('disk')
dashrecs = make_stat_records(mons)
dash = make_stat_printer(mons)

In [None]:
# repeat ad nauseam -- 1071166 total packets
# packetstream.seek(0)
dash()
i = 0
try:
    reading = True
    while reading:
#     for n in range(100000):
        packet = read_packet(packetstream)
        serve_packet(sock, packet)
        i += 1
        if i % 30000 == 0:
            print(i)
            print(dash(which='interval').replace(';','\n'))
            print(
                'crashed processes:', 
                len(tuple(filter(lambda r: r.ready(), results)))
            )
#             time.sleep(0.00001)
except Exception as ex:
    if str(ex) == "unpack requires a buffer of 6 bytes":
        print("reached end of file.")
        reading = False
    else:
        raise
finally:
    sock.close()
#     server_process.terminate()
    packetstream.close()

In [None]:
print(dash(which='total'))
print(
    'crashed processes:', 
    len(tuple(filter(lambda r: r.ready(), results)))
)

In [None]:
server_process.terminate()
recs = []
for ix, result in enumerate(results):
    print(ix)
    recs.append(result.get())
pool.terminate()

In [None]:
import pickle

with open('of_interest_23_07_21.pkl', 'wb') as stream:
    pickle.dump(recs, stream)

In [None]:
archive = client.get_archive('viper')

In [None]:
for group in archive.list_processed_parameter_groups():
    frame_count = 0
    for pp_group in archive.list_processed_parameter_group_histogram(group):
        for rec in pp_group.records:
            frame_count += rec.count
    print(f"  {group: <40} {frame_count: >20}")

In [None]:
roverground = [p for p in parameters if p.qualified_name.startswith('/Viper')]

In [None]:
roverground[3].qualified_name

In [None]:
for p in roverground:
    history = archive.list_parameter_ranges(p)
    if len(history) > 0:
        break

In [None]:
try:
    next(iter(values))
except Exception as e:
    print(type(e))

In [None]:
from yamcs.core.exceptions import NotFound

In [None]:
unpack_parameter_value(value)

In [None]:
records = []
for parameter in roverground:
    qname = parameter.qualified_name.lower()
    if 'wheel' in qname:
        continue
    if ('rover' in qname) and ('image' not in qname):
        continue
    try:
        values = tuple(archive.list_parameter_values(parameter))
    except NotFound:
        print(f"{parameter}: no values")
        continue
    print(f"{parameter}: {len(values)}")
    for value in values:
        records.append(unpack_parameter_value(value))
    break

In [None]:
pv = next(iter(phist))

In [None]:
iteriter = iter(phist)

In [None]:
next(iter(phist)

In [None]:

# lower level stuff:
# viper = next(filter(lambda i: i.name == 'viper', client.list_instances()))
# instance = viper._proto
# mdb = instance.missionDatabase
# spacesystems = list(map(MessageToDict, mdb.spaceSystem))

In [None]:
1

In [None]:
results[2].get()

In [None]:
server_process.terminate()

In [None]:
results[0].ready()

In [None]:
results = [r.get() for r in results]

In [None]:
from hostess.utilities import mb
from hostess.profilers import asizeof

In [None]:
mb(asizeof(cache))

In [None]:
len(cache)

In [None]:
unpack_parameters(values[3])

In [None]:
dir(values[3].parameters[0])

In [None]:
server_process.err[-30:]

In [None]:
psubscription.result(1)

In [None]:
psubscription.done()

In [None]:
ri(psubscription, all=True)

In [None]:
server_process.err[-3:]

In [None]:
psubscription.result()

In [None]:
1

In [None]:
ri(subscription, methods=True)

In [None]:
sub = system.sub[0]

In [None]:
sub.sub

In [None]:
type(sub)

In [None]:
subscriptions = ground.sub

In [None]:
link = links[0]._proto

In [None]:
import json

In [None]:
json.loads(link.spec)

In [None]:
ri(link)

In [None]:
links = tuple(client.list_links("viper"))
ri(links[0], all=True)

In [None]:
type(viper._proto)

In [None]:
ri(client.ctx, all=True)

In [None]:
dir(client)

In [None]:
defs = find_toplevel_def_files()
systems, params = parse_ground_parameters(defs['ground'])