In [None]:
import logging; logging.basicConfig(level=logging.DEBUG)
import time
import uuid
import pprint
import datetime as dt
import json

import dropbot
from dropbot import SerialProxy, metadata
from ipywidgets import widgets
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from base_node import BaseNode

%matplotlib inline

# Shut down any proxy object left over from a previous run of this cell
# before opening a new connection.
try:
    proxy.terminate()
except Exception:
    # Best effort: on the first run `proxy` is not defined yet (NameError),
    # and a stale proxy may fail to terminate.  `Exception` (rather than a
    # bare `except:`) lets KeyboardInterrupt/SystemExit propagate.
    pass

# Connect to the attached DropBot
proxy = SerialProxy()


def system_info(proxy):
    """
    Collect identifying information about the connected control board.

    Parameters
    ----------
    proxy : object
        Control board proxy.  Must expose ``uuid``, ``properties`` and
        ``config`` (both with ``to_dict()``), ``soft_i2c_scan()``,
        ``initialize_switching_boards()`` and ``number_of_channels``.

    Returns
    -------
    dict
        Keys ``'control board'`` (uuid, properties, config),
        ``'soft_i2c_scan'``, ``'number_of_channels'``,
        ``'utc_timestamp'`` (ISO formatted string) and
        ``'test_duration'`` (seconds).
    """
    t_begin = time.time()

    control_board = {'uuid': str(proxy.uuid),
                     'properties': proxy.properties.to_dict(),
                     'config': proxy.config.to_dict()}
    report = {'control board': control_board,
              'soft_i2c_scan': proxy.soft_i2c_scan().tolist()}

    # The switching boards are initialized before the channel count is read.
    proxy.initialize_switching_boards()
    report['number_of_channels'] = proxy.number_of_channels

    report['utc_timestamp'] = dt.datetime.utcnow().isoformat()
    report['test_duration'] = time.time() - t_begin
    return report


def test_i2c(proxy):
    """
    Scan the i2c bus and record what is found at each address.

    Parameters
    ----------
    proxy : object
        Control board proxy providing ``i2c_scan()``,
        ``i2c_eeprom_read()`` and ``config.i2c_address``.

    Returns
    -------
    dict
        ``'i2c_scan'`` maps each responding address (always a built-in
        ``int``) to a dict of identity fields (empty for devices that are
        not recognized); ``'utc_timestamp'`` and ``'test_duration'``
        (seconds) describe the run.  The proxy's own address
        (``proxy.config.i2c_address``) is omitted.
    """
    start_time = time.time()
    scan = {}
    for address in proxy.i2c_scan():
        # Normalize once: every key must be a plain int so the result
        # dictionary can be serialized with `json` (numpy scalars are not
        # accepted as JSON object keys).
        key = int(address)
        if key in (32, 33, 34):
            # Devices at these addresses are queried through BaseNode.
            # Strings are cut at the first NUL character.
            node = BaseNode(proxy, key)
            scan[key] = {
                'name': node.name().split('\0', 1)[0],
                'hardware_version': node.hardware_version().split('\0', 1)[0],
                'software_version': node.software_version().split('\0', 1)[0],
                'uuid': str(node.uuid),
            }
        elif key == 80:
            # EEPROM: the value read at offset 0 is used as the length of
            # the record that starts at offset 1.
            n_bytes = proxy.i2c_eeprom_read(80, 0, 1)
            data = proxy.i2c_eeprom_read(80, 1, n_bytes)
            board = metadata.Hardware.FromString(data.tobytes())
            scan[key] = {
                'name': board.name,
                'hardware_version': board.version,
                'uuid': str(uuid.UUID(bytes=board.uuid)),
            }
        elif key == proxy.config.i2c_address:
            # The control board itself; not reported.
            continue
        else:
            # Unrecognized device: record its presence only.
            scan[key] = {}

    return {'i2c_scan': scan,
            'utc_timestamp': dt.datetime.utcnow().isoformat(),
            'test_duration': time.time() - start_time}
    
    
def test_voltage(proxy, delay=0):
    """
    Sweep the waveform voltage and record the measured value at each step.

    The targets are the 8 interior points of a 10-point linear ramp from
    ``proxy.min_waveform_voltage`` to ``proxy.max_waveform_voltage`` (both
    end points are excluded).  When the sweep is done the voltage is set to
    ``min_waveform_voltage + 5``; the high-voltage output is left enabled.

    Parameters
    ----------
    proxy : object
        Control board proxy.
    delay : float, optional
        Seconds to wait after setting each target before measuring.

    Returns
    -------
    dict
        ``'target'`` and ``'measured'`` (numpy arrays), ``'utc_timestamp'``
        and ``'test_duration'`` (seconds).
    """
    t_begin = time.time()

    # Start from the lowest voltage before switching the output on.
    proxy.voltage = proxy.min_waveform_voltage
    proxy.hv_output_enabled = True

    readings = []
    ramp = np.linspace(proxy.min_waveform_voltage,
                       proxy.max_waveform_voltage,
                       10)
    target_voltage = ramp[1:-1]
    for setpoint in target_voltage:
        proxy.voltage = setpoint
        time.sleep(delay)
        readings.append(proxy.measured_voltage)

    measured_voltage = np.array(readings)
    proxy.voltage = proxy.min_waveform_voltage + 5

    report = {'utc_timestamp': dt.datetime.utcnow().isoformat(),
              'target': target_voltage,
              'measured': measured_voltage}
    report['test_duration'] = time.time() - t_begin
    return report


def test_shorts(proxy):
    """
    Run the proxy's short detection and time it.

    Returns
    -------
    dict
        ``'shorts'`` (whatever ``proxy.detect_shorts()`` returned),
        ``'utc_timestamp'`` and ``'test_duration'`` (seconds).
    """
    t_begin = time.time()
    detected = proxy.detect_shorts()

    report = {'utc_timestamp': dt.datetime.utcnow().isoformat()}
    report['shorts'] = detected
    report['test_duration'] = time.time() - t_begin
    return report


def sample_feedback(i, plot=False, n_samples=50, gain_power=5):
    """
    Estimate a capacitance from a burst of analog feedback samples.

    NOTE: this function talks to the module-level ``proxy`` object; it does
    not take the proxy as an argument.

    Parameters
    ----------
    i : int
        Index passed to ``proxy.select_on_board_test_capacitor`` before
        sampling.  ``-1`` is selected again once sampling is done.
    plot : bool, optional
        If true, plot the centered samples and a histogram of their
        absolute values together with the filter threshold.
    n_samples : int, optional
        Number of samples requested from ``proxy.analog_reads_simple``.
    gain_power : int, optional
        Unused; kept so existing callers keep working.

    Returns
    -------
    float
        ``mean(|v - mean(v)|, restricted to values below 1.5x their mean)
        / proxy.voltage * 0.15e-6``.  NaN if no sample passes the filter.
    """
    proxy.select_on_board_test_capacitor(i)
    time.sleep(.05)
    # Plain numpy is used throughout so the result does not depend on how a
    # particular pandas version reduces or indexes a one-column DataFrame.
    # NOTE(review): 3.3 / 2**16 looks like a 16-bit count -> volts
    # conversion with a 3.3 V reference -- confirm.
    volts = np.asarray(proxy.analog_reads_simple(11, n_samples),
                       dtype=float) * 3.3 / 2**16
    proxy.select_on_board_test_capacitor(-1)

    # Remove the DC level, then rectify.
    v_gnd = volts.mean()
    v_abs = np.abs(volts - v_gnd)

    # Discard samples whose deviation reaches 1.5x the mean deviation.
    filter_th = v_abs.mean() * 1.5
    kept = v_abs[v_abs < filter_th]
    v_filtered_mean = kept.mean() if kept.size else np.nan

    # NOTE(review): 0.15e-6 is presumably a reference capacitance in
    # farads -- confirm against the hardware.
    c_meas = v_filtered_mean / proxy.voltage * 0.15e-6

    if plot:
        centered = volts - v_gnd
        plt.figure()
        plt.plot(centered)
        plt.plot(plt.xlim(), [filter_th, filter_th], 'k--')
        plt.plot(plt.xlim(), [-filter_th, -filter_th], 'k--')
        plt.figure()
        plt.hist(np.abs(centered), 30)
        plt.plot([filter_th, filter_th], plt.ylim(), 'k--')
    return c_meas


def test_on_board_feedback_calibration(proxy, plot=False):
    """
    Measure each on-board test capacitor selection with ``sample_feedback``.

    The voltage is set to 100 and the high-voltage output is enabled for the
    measurements, then disabled again.  Selections ``-1, 0, 1, 2`` are
    sampled in that order.

    Parameters
    ----------
    proxy : object
        Control board proxy.
    plot : bool, optional
        Forwarded to ``sample_feedback``.

    Returns
    -------
    dict
        ``'c_measured'`` (list, one value per selection),
        ``'utc_timestamp'`` and ``'test_duration'`` (seconds).
    """
    t_begin = time.time()

    proxy.voltage = 100
    proxy.hv_output_enabled = True

    capacitances = [sample_feedback(selection, plot)
                    for selection in [-1, 0, 1, 2]]

    proxy.hv_output_enabled = False

    report = {'utc_timestamp': dt.datetime.utcnow().isoformat(),
              'c_measured': capacitances}
    report['test_duration'] = time.time() - t_begin
    return report


def test_channels(proxy):
    """
    Measure the capacitance of every channel, one channel at a time.

    Shorted channels (as reported by ``proxy.detect_shorts()``) are skipped
    and keep a value of 0.  The voltage is set to 100 and the high-voltage
    output is enabled for the measurements; afterwards all channels are
    switched off and the output is disabled.

    Parameters
    ----------
    proxy : object
        Control board proxy.

    Returns
    -------
    dict
        ``'test_channels'`` (numpy array of channel numbers), ``'shorts'``,
        ``'c'`` (numpy array, shape ``(n_channels, n_reps)``), ``'n_reps'``,
        ``'utc_timestamp'`` and ``'test_duration'`` (seconds, measured from
        function entry so it includes the shorts scan, like the other
        tests).
    """
    start_time = time.time()
    shorts = proxy.detect_shorts()
    proxy.voltage = 100
    proxy.hv_output_enabled = True

    n_reps = 1
    n_channels = proxy.number_of_channels
    channels = np.arange(0, n_channels)
    c = np.zeros([n_channels, n_reps])

    for i, channel_i in enumerate(channels):
        # A shorted channel is never actuated; its row stays at zero.
        if channel_i in shorts:
            continue
        # Actuate only the channel under test.
        state = np.zeros(n_channels)
        state[channel_i] = 1
        for j in range(n_reps):
            proxy.state_of_channels = state
            c[i, j] = sample_feedback(-1, n_samples=50)

    # Leave the hardware in a safe state.
    proxy.state_of_channels = np.zeros(n_channels)
    proxy.hv_output_enabled = False

    return {'utc_timestamp': dt.datetime.utcnow().isoformat(),
            'test_channels': channels,
            'shorts': shorts,
            'c': c,
            'n_reps': n_reps,
            'test_duration': time.time() - start_time}
        
        
def display_system_info_results(info):    
    print '\nControl board:\n' + '-' * 80
    
    print '\n  uuid:', info['control board']['uuid']
    print '\n  Properties:'
    for k, v in info['control board']['properties'].items(): print '    %s: %s' % (k, v)   
    print '\n  Config:'
    for k, v in info['control board']['config'].items(): print '    %s: %s' % (k, v)

    print '\nsoft i2c scan:', info['soft_i2c_scan']
    print 'number_of_channels:', info['number_of_channels']        


def display_test_i2c_results(results):
    print '\ni2c scan:\n' + '-' * 80
    
    for address in sorted(results['i2c_scan']):
        data = results['i2c_scan'][address]
        info_string = ''
        if 'name' in data.keys():
            info_string = data['name']
        if 'hardware_version' in data.keys():
            info_string += " v%s" % data['hardware_version']
        if 'software_version' in data.keys():
            info_string += ", firmware: v%s" % data['software_version']
        if 'uuid' in data.keys():
            info_string += ", uuid: %s" % data['uuid']

        print '  %s: %s' % (address, info_string)    
    
    
def display_test_voltage_results(results):
    print '\nTest voltage results:\n' + '-' * 80
    
    measured_voltage = np.array(results['measured'])
    target_voltage = np.array(results['target'])

    print '  target_voltage:', target_voltage
    print '  measured_voltage:', measured_voltage

    # calculate the average rms error
    r = measured_voltage - target_voltage
    print '  rms_error = %.1f%%' % (100 * np.sqrt(np.mean((r / target_voltage)**2)))
    
    plt.figure()
    # plot the measured vs target votage
    plt.plot(target_voltage, measured_voltage, 'o')
    plt.plot(target_voltage, target_voltage, 'k--')
    plt.xlabel('Target voltage')
    plt.ylabel('Measured voltage')
    
    
def display_test_on_board_feedback_calibration_results(results):
    c_measured = np.array(results['c_measured'])
    
    print '\nTest on-board feedback calibration results:\n' + '-' * 80

    print '  Measured capacitance:', c_measured
    
    C_nominal = np.array([0, 10e-12, 100e-12, 470e-12])
    plt.figure()
    plt.plot(C_nominal * 1e12, c_measured * 1e12, 'o')
    plt.plot(C_nominal * 1e12, C_nominal * 1e12, 'k--')
    plt.xlabel('Nominal capacitance (pF)')
    plt.ylabel('Measured capacitance (pF)')
    
    
def display_test_shorts_results(results):
    shorts = results['shorts']
    print '\nTest shorts results:\n' + '-' * 80
    if len(shorts):
        print "  Shorts on channels %s" % ", ".join([str(x) for x in shorts])
    else:
        print "  No shorts"
        
        
def display_test_channels_results(results):
    print '\nTest channels results:\n' + '-' * 80
    
    c = np.array(results['c'])
    test_channels = np.array(results['test_channels'])
    shorts = results['shorts']
    n_channels = len(test_channels)
    n_reps = results['n_reps']
    
    if len(c) == 0:
        return
    
    nc = test_channels[np.min(c, 1) < 5e-12].tolist()
    for x in shorts:
        nc.remove(x)
    
    plt.figure()
    plt.bar(range(c.shape[0]), np.mean(c, 1) / 1e-12, yerr=np.std(c, 1) / 1e-12)
    plt.title('DropBot system: %s' % str(proxy.uuid)[-8:])
    plt.xlabel("Channel");
    plt.ylabel("Capacitance (pF)")

    plt.figure()
    plt.hist(c.flatten() / 1e-12, 20)
    plt.ylabel("# of channels")
    plt.xlabel("Capacitance (pF)")

    if len(shorts) or len(nc):
        print "  The following channels failed (%d of %d / %.1f %%):" % (len(shorts) + len(nc),
                                                                        n_channels,
                                                                        float(len(shorts) + len(nc)) /
                                                                             n_channels * 100)
        if len(shorts):
            print "    shorts (%d of %d / %.1f %%): %s" % (len(shorts), n_channels,
                                                             float(len(shorts)) / n_channels * 100,
                                                             ", ".join([str(x) for x in shorts]))                                                
        if len(nc):
            print "    no connection (%d of %d / %.1f %%): %s" % (len(nc), n_channels,
                                                             float(len(nc)) / n_channels * 100,
                                                             ", ".join([str(x) for x in nc]))
            if n_reps > 1:
                for x in nc:
                    n_fails = np.count_nonzero(c[x, :] < 5e-12)
                    print "\n    Channel %d failed %d of %d reps (%.1f %%)" % (x, n_fails, n_reps, 100.0 * n_fails / n_reps)
    else:
        print "  All channels passed"

In [None]:
# perform QC tests
total_time = 0

tests = ['system_info',
         'test_i2c',
         'test_voltage',
         'test_shorts',
         'test_on_board_feedback_calibration',
         'test_channels']
results = {}

for test in tests:
    exec('results["%s"] = %s(proxy); duration = results["%s"]["test_duration"]' % (test, test, test))
    print "%s: %.1f s" % (test, duration)
    total_time += duration

print "-" * 80
print "Total time: %.1f s\n" % total_time

# need to create a custom encoder to serialize numpy datatypes
class MyEncoder(json.JSONEncoder):
    """JSON encoder that converts numpy scalars and arrays to built-ins."""

    def default(self, obj):
        """
        Return a JSON-serializable equivalent of `obj`.

        numpy booleans, integers and floats become ``bool``, ``int`` and
        ``float``; arrays become (nested) lists.  Anything else is passed
        to the base class, which raises ``TypeError``.
        """
        if isinstance(obj, np.bool_):
            return bool(obj)
        elif isinstance(obj, np.integer):
            # Covers every numpy integer width, signed or unsigned.
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        else:
            return super(MyEncoder, self).default(obj)

# write the results to a file
filepath = ('.results/results-%s.json' % 
            dt.datetime.utcnow().isoformat().replace(':', '.'))

with open(filepath, 'w') as output:
    json.dump(results, output, cls=MyEncoder)
    
print 'Results saved to "%s"' % filepath

In [None]:
# load the results from the file
print 'Loading results from "%s"...\n' % filepath
with open(filepath, 'r') as input:
    results = json.load(input)

# display the results of all QC tests
for test in tests:
    exec('display_%s_results(results["%s"])' % (test, test))