In [1]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
plt.rc('figure', figsize=(8,8))

import random
import os.path

import pycamhd.lazycache as lazycache     ## For accessing data through a Lazycache instance
import pycamhd.lazyqt as lazyqt           ## For accessing data on disk or through HTTP

import json

## Local imports
from timer import Timer


from dask import compute,delayed,threaded,multiprocessing

In [2]:
## Memo of movie path -> frame count, filled in lazily by movie_length()
video_lengths = {}

In [3]:
## Default values.  `local_path` and `reps` may be overridden by the per-host
## JSON config file named in `config_file`.

## Every entry in `paths` must exist under each data source being benchmarked
## (CI, the local disk copy, the local HTTP servers) ... the code doesn't check.
ci_url_root = "https://rawdata.oceanobservatories.org/files/"

paths = ['/RS03ASHS/PN03B/06-CAMHDA301/2016/01/01/CAMHDA301-20160101T000000Z.mov',
         '/RS03ASHS/PN03B/06-CAMHDA301/2016/01/01/CAMHDA301-20160101T030000Z.mov',
         '/RS03ASHS/PN03B/06-CAMHDA301/2016/01/01/CAMHDA301-20160101T060000Z.mov',
         '/RS03ASHS/PN03B/06-CAMHDA301/2016/01/01/CAMHDA301-20160101T090000Z.mov',
         '/RS03ASHS/PN03B/06-CAMHDA301/2016/01/01/CAMHDA301-20160101T120000Z.mov',
         '/RS03ASHS/PN03B/06-CAMHDA301/2016/01/01/CAMHDA301-20160101T150000Z.mov',
         '/RS03ASHS/PN03B/06-CAMHDA301/2016/01/01/CAMHDA301-20160101T180000Z.mov',
         '/RS03ASHS/PN03B/06-CAMHDA301/2016/01/01/CAMHDA301-20160101T210000Z.mov'
        ]
## Root of the on-disk copy of the data (handed to lazyqt.LazyQtAccessor)
local_path = "/data"

## Base URLs of the local Lazycache servers, with and without caching
local_cached_url   = 'http://localhost:8080/'
local_uncached_url = 'http://localhost:9080/'

## Base URL of the local HTTP server that serves the local data
local_nginx_url    = 'http://localhost:9081/'

## Base URLs of the Google App Engine Lazycache instances
gae_uncached_url   = 'https://camhd-app-dev-nocache.appspot.com/'
gae_cached_url   = 'https://camhd-app-dev.appspot.com/'

## Number of random frames requested in each benchmark run (passed as `count`)
reps = 100

## Thread counts that run_analysis() iterates over
nthreads = [1,2,4,8]

import platform
## Config and results files are named after this machine's hostname
hostname = platform.node()
config_file = "%s_config.json" % hostname

results_file = "%s_results.json" % hostname


In [4]:
## Per-host overrides: when a JSON config file exists for this machine, let it
## replace the defaults for `local_path` and `reps`; absent keys keep the default.
if os.path.isfile(config_file):
    print("Loading config file %s" % config_file)

    with open( config_file, 'r' ) as f:
        conf = json.load( f )

    local_path = conf.get('local_path', local_path)
    reps = conf.get('reps', reps)

Loading config file ursine_config.json


In [5]:
## Echo the effective settings (defaults plus any per-host overrides)
print("Configuration")
print("Local_path: %s" % local_path )
print("Reps: %d" % reps )

Configuration
Local_path: /home/aaron/canine/workspace/camhd_analysis/rawdata_mirror/rawdata.oceanobservatories.org/files
Reps: 100


In [9]:

def movie_length( movie ):
    """Return the frame count of `movie`, memoised in `video_lengths`.

    `movie` is a path that gets appended to `ci_url_root`.  On the first
    request for a given path the metadata is fetched with
    `lazyqt.get_metadata` and its 'num_frames' entry is stored; later
    requests for the same path are answered from the memo without a query.
    """
    if movie not in video_lengths:
        ## Not seen yet: fetch the metadata once and remember the frame count
        print("need to query %s" % movie )
        video_lengths[movie] = lazyqt.get_metadata( ci_url_root + movie )['num_frames']

    return video_lengths[ movie ]

def random_frames( count, seed = -1 ):
    """Draw `count` random [movie_path, frame_number] pairs from `paths`.

    A non-negative `seed` makes the draw reproducible (and is announced on
    stdout); any negative value leaves the generator seeded by the system.
    Movies are chosen with replacement, then one frame number is drawn per
    chosen movie.
    """
    seeded = seed >= 0
    if seeded:
        print("Setting random seed to %d" % seed)
    rng = random.Random(seed) if seeded else random.Random()

    ## All movies are picked first, then the frame numbers, so the sequence
    ## of RNG calls is fixed for a given seed.
    picks = []
    for movie in rng.choices( paths, k=count ):
        ## NOTE(review): randrange excludes its upper bound, so the frame
        ## numbered movie_length(movie) is never drawn -- confirm intended.
        picks.append( [movie, rng.randrange( 1, movie_length(movie) )] )
    return picks

def test_kernel( repo, frame ):
    """Fetch one frame from `repo` and report whether it looks right and how long it took.

    `frame` is a [movie_path, frame_number] pair.  The fetch counts as valid
    when `repo.get_frame` returns something other than None whose shape
    starts with (1080, 1920).

    Returns a dict with 'valid', 'path' and 'frame'; 'msecs' (the elapsed
    time reported by the Timer) is only present for valid fetches.
    """
    path, frame_num = frame[0], frame[1]

    with Timer() as t:
        ## Todo.   Check quality of outcome
        img = repo.get_frame( path, frame_num )
        if img is None:
            valid = False
        else:
            dims = img.shape
            valid = bool( (dims[0] == 1080) and (dims[1] == 1920) )

    out = {'valid': valid, 'path': path, 'frame': frame_num}
    if valid:
        out['msecs'] = t.msecs

    return out
                
def do_test_repo( repo, count, seed = -1, nthreads = 1 ):
    """Time `count` random frame fetches against `repo`.

    Parameters
    ----------
    repo : accessor object exposing `get_frame(path, frame_number)`
    count : number of random frames to fetch
    seed : passed through to random_frames(); negative means unseeded
    nthreads : 1 runs the fetches serially in this thread; any other value
        runs them through Dask's threaded scheduler with that many workers

    Returns
    -------
    list of the per-frame dicts produced by test_kernel(), in draw order.
    """
    frames = random_frames( count, seed=seed )

    if nthreads == 1:
        return [test_kernel( repo, frame ) for frame in frames]

    tasks = [delayed(test_kernel)( repo, frame ) for frame in frames]
    ## `get=` was deprecated in Dask 0.18 and later removed; `scheduler=` is
    ## the supported way to select the threaded scheduler.  compute() hands
    ## back a tuple, so convert it to match the serial branch's list.
    return list( compute( *tasks, scheduler='threads', num_workers=nthreads ) )
    

def plot_times( results, desc ):
    """Print the mean fetch time and draw a histogram of the valid timings.

    Parameters
    ----------
    results : iterable of dicts as produced by test_kernel(); only entries
        whose 'valid' is truthy contribute (those are the ones with 'msecs')
    desc : label printed in front of the mean (formatted with %s)

    Returns None.  Does nothing at all when there are no valid timings.
    """
    msec = [t['msecs'] for t in results if t['valid']]
    if len(msec) < 1: return

    print("%s: %f ms per frame" % (desc,np.mean(msec)) )

    ## `normed` was removed from hist() in Matplotlib 3.1; `density=True` is
    ## its replacement and gives the same normalised histogram.
    plt.hist( msec, bins = 20, density=True )
    plt.xlabel('msec')
    plt.ylabel('frequency')

    
def save_results( results, keys, misc = None, filename = None ):
    """Merge one benchmark run into the JSON results file.

    Parameters
    ----------
    results : JSON-serialisable sequence of per-frame result dicts
    keys : non-empty list of nested dictionary keys; the run is stored at
        jresults[keys[0]][keys[1]]...[keys[-1]], replacing whatever was there
    misc : optional dict of extra fields stored next to 'results'
    filename : file to update; defaults to the module-level `results_file`

    An existing file that cannot be parsed, or whose top level is not a JSON
    object, is treated as empty.

    Raises
    ------
    TypeError / ValueError from json.dumps if the data cannot be serialised;
    in that case the file on disk is left untouched.
    """
    if filename is None:
        filename = results_file

    jresults = dict()
    if os.path.isfile(filename):
        with open(filename) as f:
            try:
                loaded = json.load( f )
            except json.JSONDecodeError:
                loaded = None
        ## Only a JSON object can hold the nested keys; anything else is
        ## handled the same way as an unparseable file.
        if isinstance(loaded, dict):
            jresults = loaded

    newdata = dict()
    newdata['results'] = results
    if misc:
        newdata.update(misc)

    ## Nested keys
    d = jresults
    for k in keys[:-1]:
        d = d.setdefault(k, {})

    d[keys[-1]] = newdata

    ## Serialise before opening for writing: opening with 'w' truncates the
    ## file, so a serialisation error afterwards would wipe earlier results.
    payload = json.dumps( jresults, indent=2 )
    with open(filename,'w') as f:
        f.write( payload )
   
    

def run_analysis( repo, keys, count, seed = -1 ):
    """Benchmark `repo` once per entry in `nthreads`, plotting and saving each run.

    Parameters
    ----------
    repo : accessor object handed to do_test_repo()
    keys : list of nested result keys; "<n>_threads" is appended for each run
    count : number of random frames fetched per run
    seed : base random seed.  When non-negative, each run uses `seed + nt`
        (and records it under 'seed'), so runs with different thread counts
        fetch different frames while staying reproducible.  Negative means
        unseeded.
    """
    for nt in nthreads:

        ## Derive the per-run seed from the untouched base seed.  Reassigning
        ## `seed` itself made the offsets accumulate across iterations
        ## (base+1, +3, +7, +15 for the default thread counts).
        run_seed = seed + nt if seed >= 0 else seed

        with Timer() as t:
            results = do_test_repo( repo, count, nthreads=nt, seed=run_seed )

        newkey = keys + ["%d_threads" % nt]
        plot_times( results, newkey )

        print("Total time: %f" % t.msecs )

        misc={'nthreads': nt, 'total_msec': t.msecs }
        if run_seed >= 0: misc['seed'] = run_seed

        with Timer() as save_timer:
            save_results(results, newkey, misc )

        print("%f msec to save results" % save_timer.msecs )

# Direct LazyQT conversion

### Direct disk access

In [None]:
## LazyQtAccessor rooted at the on-disk data directory `local_path`
run_analysis( lazyqt.LazyQtAccessor( local_path ), ["lazyqt","local_disk"], count=reps)

### Local HTTP server serving local data

In [None]:
## LazyQtAccessor rooted at the local HTTP server URL `local_nginx_url`
run_analysis( lazyqt.LazyQtAccessor( local_nginx_url ), ["lazyqt","local_nginx"], count=reps )

### Contacting CI directly 

In [None]:
## LazyQtAccessor rooted at the CI raw-data URL `ci_url_root`
run_analysis( lazyqt.LazyQtAccessor( ci_url_root ), ["lazyqt","ci"], count=reps )

# Local lazycache server, non-caching


### Using local disk

In [None]:
def lazycache_berna( host ):
    """Build a LazycacheAccessor for the `v1/berna/data/` endpoint under `host`.

    `host` is spliced in verbatim ahead of "/v1/...", so a trailing slash on
    it ends up as a double slash in the resulting URL.
    """
    endpoint = "%s/v1/berna/data/" % host
    return lazycache.LazycacheAccessor(endpoint)

def lazycache_nginx( host ):
    """Build a LazycacheAccessor for the `v1/nginx_data/` endpoint under `host`.

    `host` is spliced in verbatim ahead of "/v1/...", so a trailing slash on
    it ends up as a double slash in the resulting URL.
    """
    endpoint = "%s/v1/nginx_data/" % host
    return lazycache.LazycacheAccessor(endpoint)

def lazycache_ci( host ):
    """Build a LazycacheAccessor for the `v1/org/oceanobservatories/rawdata/files/` endpoint under `host`.

    `host` is spliced in verbatim ahead of "/v1/...", so a trailing slash on
    it ends up as a double slash in the resulting URL.
    """
    endpoint = "%s/v1/org/oceanobservatories/rawdata/files/" % host
    return lazycache.LazycacheAccessor(endpoint)
    
## Result-key prefix for the local, non-caching Lazycache runs; later cells rebind `keys`
keys = ["lazycache","local","uncached"]

In [None]:

## Same seed, same accessor, two passes: with caching disabled the second
## pass should be no faster than the first.  Both passes store under the same
## result keys, so the second overwrites the first in the results file.

seed = random.randint(0,65535)
repo = lazycache_berna( local_uncached_url )

for attempt in range(2):
    run_analysis( repo, keys + ["local_disk"], seed = seed, count=reps  )

### Connecting to local HTTP server

In [None]:
## Non-caching local Lazycache, `nginx_data` endpoint (unseeded, single pass)
run_analysis( lazycache_nginx( local_uncached_url ), keys + ["local_nginx"], count=reps  )

### Contacting CI directly

In [None]:
## Non-caching local Lazycache, CI raw-data endpoint (unseeded, single pass)
run_analysis( lazycache_ci( local_uncached_url ), keys + ["ci"], count=reps  )

# Local Lazycache server, with caching enabled


In [None]:
keys = ["lazycache","local","cached"]

## For each endpoint: pick a fresh seed, then run twice with it (building a
## new accessor each time) so the second pass requests the same frames again.
for make_repo, label in ( (lazycache_berna, "local_disk"),
                          (lazycache_nginx, "local_nginx"),
                          (lazycache_ci,    "ci") ):
    seed = random.randint(0,65535)
    for attempt in range(2):
        run_analysis( make_repo( local_cached_url ), keys + [label], seed = seed, count=reps )

# Google App Engine instance of Lazycache

### Without cache

In [None]:
keys = ["lazycache", "gae", "uncached"]

## Two passes with one seed, each through a freshly built accessor
seed = random.randint(0,65535)
for attempt in range(2):
    run_analysis( lazycache_ci( gae_uncached_url ), keys + ["ci"], seed = seed, count=reps )

### With cache

In [None]:
keys = ["lazycache", "gae", "cached"]

## Two passes with one seed, each through a freshly built accessor
seed = random.randint(0,65535)
for attempt in range(2):
    run_analysis( lazycache_ci( gae_cached_url ), keys + ["ci"], seed = seed, count=reps  )