In [None]:
import copy
import numpy as np
import pandas as pd
import holoviews as hv
from daq.pico import CSV
from scipy.optimize import fmin, minimize, basinhopping, fsolve
from easier import ParamState, shade, Item
from sklearn.linear_model import LinearRegression, RidgeCV, LassoCV, Ridge
from harmonic import Harmonic
hv.extension('bokeh')

In [None]:
%opts Curve [width=400, height=400 show_grid=True tools=['hover']]


In [None]:
file_name, fundamental_freq = './data_20171220/20171220-0003.csv', 20
# file_name, fundamental_freq = './data_20171220/20171220-0004.csv', 20
# file_name, fundamental_freq = './data_20171220/20171220-0005.csv', 20
# file_name, fundamental_freq = './data_20171220/20171220-0001.csv', 20  # air

params = ParamState(
    alpha=None,
    method='lassocv',
    num_freqs=3,
    simple_freq_fit=True,
    periods=2,
    max_sample_freq=1e7,
    normalize=True
)
params

In [None]:
class HarmonicAnalyzer:
    def __init__(self, params):
        self.params = params
        self.df = None
        self.df_fit = None
        self.file_name = None
        self.delta = None
        
    def normalizer(self, y, scale=None, return_scale=False):
        if scale is None:
            scale = 1. / (np.sqrt(2) * np.std(y))
        if return_scale:
            return scale * y, scale
        else:
            return scale * y

    def demeaner(self, df):
        for col in df.columns:
            if col == 't':
                continue
            df.loc[:, col] = df.loc[:, col] - df.loc[:, col].mean()
        return df
    
    def fit(self, file_name, fundamental_freq):
        self.file_name = file_name
        self.df, self.df_fit = self._get_fit_frames(file_name, fundamental_freq)
        
    def _get_phase_delta(self, h_obj1, h_obj2, t_ref):
        zero1 = fsolve(h_obj1.predict, t_ref)
        zero2 = fsolve(h_obj2.predict, zero1)
        delta = zero2 - zero1
        return delta
        
    def _get_fit_frames(self, file_name, fundamental_freq):
        # make a local reference to the params
        params = self.params
        
        # load the file and make sure all data has zero-mean
        df = CSV(file_name, a='sig_gen', b='res_volt', c='sec_volt', max_sample_freq=params.max_sample_freq).df
        df = self.demeaner(df)  

        # refine the guess of the fundamental frequency
        h = Harmonic(freq=fundamental_freq, num_freqs=1)
        f0 = h.refine_frequency(df.t, df.sig_gen, simple=params.simple_freq_fit).f0
        

        # initialize harmonic objects for resistor and secondary voltages
        h_resistor = Harmonic(freq=f0, num_freqs=params.num_freqs)
        h_secondary = Harmonic(freq=f0, num_freqs=params.num_freqs)

        # fit the harmonic objects to the data
        h_resistor.fit(df.t, df.res_volt, method=params.method, alpha=params.alpha)
        h_secondary.fit(df.t, df.sec_volt, method=params.method, alpha=params.alpha)
        
        # set a reference time to be near the center of the dataset centered on
        # an ascending zero crossing of the secondary voltage
        t_ref = df.t.iloc[len(df) // 2]
        t_ref = fsolve(h_secondary.predict, t_ref)[0]
        if h_secondary.derivative().predict(t_ref) < 0:
            t_ref = t_ref + 0.5 * h.period
        self.t_ref = t_ref

        # take derivatives so that I can set the phases so that actual/expected have peaks at same time
        h_actual_deriv = h_secondary.derivative()
        h_expected_deriv = -h_resistor.derivative(2)

        # normalize all signals to have fits of amplitudes of about 1
        if params.normalize:
            # run a fit for the entire input dataframe
            res_fit = h_resistor.predict(df.t)
            sec_fit = h_secondary.predict(df.t)
            
            # find the scale parameters from the fits
            _, res_scale = self.normalizer(res_fit, return_scale=True)
            _, sec_scale = self.normalizer(sec_fit, return_scale=True)
            
            # normalize the raw data by the fit scales
            df.loc[:, 'res_volt'] = res_scale * df.res_volt
            df.loc[:, 'sec_volt'] = sec_scale * df.sec_volt

        # limit the dataframe to be only the number of periods anchored to t_ref
        df = df[(df.t >= t_ref) & (df.t < t_ref + params.periods * h.period)].reset_index(drop=True)
        
        # compute a delta that will make it so that the peaks of actual and expected signals
        # coincide as closely as possible
        delta1 = self._get_phase_delta(h_expected_deriv, h_actual_deriv, t_ref + .2 * h.period)
        delta2 = self._get_phase_delta(h_expected_deriv, h_actual_deriv, t_ref + .7 * h.period)
        self.delta = delta = .5 * (delta1 + delta2)

        # make fits for actual signal and phase adjusted expected signal
        actual_secondary = h_secondary.predict(df.t)
        expected_secondary = -h_resistor.derivative().predict(df.t - delta)
        
        # normalize the fits if requested
        if params.normalize:
            actual_secondary = self.normalizer(actual_secondary)
            expected_secondary = self.normalizer(expected_secondary)

        # create a dataframe of the fits
        kwargs = dict(
            t=df.t,
            actual=actual_secondary,
            expected=expected_secondary,
        )
        df_fit = pd.DataFrame(kwargs, columns=kwargs.keys())
        
        # return the frames with raw data and fits
        return df, df_fit    

    
ha = HarmonicAnalyzer(params)
ha.fit(file_name, fundamental_freq,)
df = ha.df
dff = ha.df_fit
            
            
display(df.head())
display(dff.head())

In [None]:
(
    hv.Curve((dff.actual, dff.expected), kdims=['actual'], vdims=['expected'])
    + hv.Curve((dff.actual, dff.expected - dff.actual), kdims=['actual'], vdims=['expected - actual'])
    + (
        hv.Curve((df.t, df.sec_volt), label='data', kdims=['time'], vdims=['amplitude'])
        * hv.Curve((dff.t, dff.actual), label='actual')
        * hv.Curve((dff.t, dff.expected), label='expected')
    )
    
).cols(1)