In [4]:
import os
import math
from sympy import *
import numpy as np
import pandas as pd
from scipy.optimize import curve_fit, least_squares
from lmfit import minimize, Minimizer, Parameters, Parameter, report_fit
import matplotlib.pyplot as plt
import lmfit

In [None]:
def load_csv_data(file_name, subdir=''):
    """
    Loads data from .csv file in to DataFrame

    :param file_name: .csv file name in string
    :param subdir: optional parameter to specify the subdirectory of the file
    :return: extracted data in DataFrame
    """

    file_dir = os.path.realpath('../')
    print(file_dir)
    for root, dirs, files in os.walk(file_dir):
        if root.endswith(subdir):
            for name in files:
                if name == file_name:
                    file_path = os.path.join(root, name)

    df = pd.read_csv(file_path)

    return df

In [None]:
def LF_Noise(component = 1):
    """ Inital parameters and bounds for each paramter according to LF (0-0.01Hz)in the paper
    """
    LMparams = Parameters()

    # The code below is to load the initial paramters if we are running the NLLSR on all 3 components at once

    LMparams.add('A1_FS', value = 10.)
    LMparams.add('A2_FS', value = 10.)
    LMparams.add('A3_FS', value = 10.)
    LMparams.add('w1_FS', value = 0, min = 0, max = 0.01*2*math.pi)
    LMparams.add('w2_FS', value = 0.005*2*math.pi, min = 0, max = 0.01*2*math.pi)
    LMparams.add('w3_FS', value = 0.01*2*math.pi, min = 0, max = 0.01*2*math.pi)
    LMparams.add('phi1_FS', value = 0, min = -math.pi, max = math.pi)
    LMparams.add('phi2_FS', value = 0, min = -math.pi, max = math.pi)
    LMparams.add('phi3_FS', value = 0, min = -math.pi, max = math.pi)

    # The code below is to load the initial paramters if we are running the NLLSR on one component at a time

    # if component == 1:
    #     LMparams.add('A_FS', value = 10.)
    #     LMparams.add('w_FS', value = 0, min = 0, max = 0.01*2*math.pi)
    #     LMparams.add('phi_FS', value = 0, min = -math.pi, max = math.pi)
    # elif component == 2:
    #     LMparams.add('A_FS', value = 10.)
    #     LMparams.add('w_FS', value = 0.005*2*math.pi, min = 0, max = 0.01*2*math.pi)
    #     LMparams.add('phi_FS', value = 0, min = -math.pi, max = math.pi)
    # elif component == 3:
    #     LMparams.add('A_FS', value = 10.)
    #     LMparams.add('w_FS', value = 0.01*2*math.pi, min = 0, max = 0.01*2*math.pi)
    #     LMparams.add('phi_FS', value = 0, min = -math.pi, max = math.pi)

    return LMparams

def MF_Noise(component = 1):
    """ Inital parameters and bounds for each paramter according to MF (0.01-0.25Hz) in the paper
    """
    LMparams = Parameters()

    # The code below is to load the initial paramters if we are running the NLLSR on all 3 components at once

    LMparams.add('A1_FS', value = 10.)
    LMparams.add('A2_FS', value = 10.)
    LMparams.add('A3_FS', value = 10.)
    LMparams.add('w1_FS', value = 0.02*2*math.pi, min = 0.01*2*math.pi, max = 0.25*2*math.pi)
    LMparams.add('w2_FS', value = 0.03*2*math.pi, min = 0.01*2*math.pi, max = 0.25*2*math.pi)
    LMparams.add('w3_FS', value = 0.03*2*math.pi, min = 0.01*2*math.pi, max = 0.25*2*math.pi)
    LMparams.add('phi1_FS', value = 0, min = -math.pi, max = math.pi)
    LMparams.add('phi2_FS', value = 0, min = -math.pi, max = math.pi)
    LMparams.add('phi3_FS', value = 0, min = -math.pi, max = math.pi)

    # The code below is to load the initial paramters if we are running the NLLSR on one component at a time

    # if component == 1:
    #     LMparams.add('A_FS', value = 10.)
    #     LMparams.add('w_FS', value = 0.02*2*math.pi, min = 0.01*2*math.pi, max = 0.25*2*math.pi)
    #     LMparams.add('phi_FS', value = 0, min = -math.pi, max = math.pi)
    # elif component == 2:
    #     LMparams.add('A_FS', value = 10.)
    #     LMparams.add('w_FS', value = 0.03*2*math.pi, min = 0.01*2*math.pi, max = 0.25*2*math.pi)
    #     LMparams.add('phi_FS', value = 0, min = -math.pi, max = math.pi)
    # elif component == 3:
    #     LMparams.add('A_FS', value = 10.)
    #     LMparams.add('w_FS', value = 0.03*2*math.pi, min = 0.01*2*math.pi, max = 0.25*2*math.pi)
    #     LMparams.add('phi_FS', value = 0, min = -math.pi, max = math.pi)

    return LMparams

def HF_Noise(component = 1):
    """ Inital parameters and bounds for each paramter according to HF (0.25-0.5Hz) in the paper
    """
    LMparams = Parameters()

    # The code below is to load the initial paramters if we are running the NLLSR on all 3 components at once

    LMparams.add('A1_FS', value = 1.)
    LMparams.add('A2_FS', value = 1.)
    LMparams.add('A3_FS', value = 1.)
    LMparams.add('w1_FS', value = 0.25*2*math.pi, min = 0.25*2*math.pi, max = 0.5*2*math.pi)
    LMparams.add('w2_FS', value = 0.375*2*math.pi, min = 0.25*2*math.pi, max = 0.5*2*math.pi)
    LMparams.add('w3_FS', value = 0.5*2*math.pi, min = 0.25*2*math.pi, max = 0.5*2*math.pi)
    LMparams.add('phi1_FS', value = 0, min = -math.pi, max = math.pi)
    LMparams.add('phi2_FS', value = 0, min = -math.pi, max = math.pi)
    LMparams.add('phi3_FS', value = 0, min = -math.pi, max = math.pi)

    # The code below is to load the initial paramters if we are running the NLLSR on one component at a time

    # if component == 1:
    #     LMparams.add('A_FS', value = 1.)
    #     LMparams.add('w_FS', value = 0.25*2*math.pi, min = 0.25*2*math.pi, max = 0.5*2*math.pi)
    #     LMparams.add('phi_FS', value = 0, min = -math.pi, max = math.pi)
    # elif component == 2:
    #     LMparams.add('A_FS', value = 1.)
    #     LMparams.add('w_FS', value = 0.375*2*math.pi, min = 0.25*2*math.pi, max = 0.5*2*math.pi)
    #     LMparams.add('phi_FS', value = 0, min = -math.pi, max = math.pi)
    # elif component == 3:
    #     LMparams.add('A_FS', value = 1.)
    #     LMparams.add('w_FS', value = 0.5*2*math.pi, min = 0.25*2*math.pi, max = 0.5*2*math.pi)
    #     LMparams.add('phi_FS', value = 0, min = -math.pi, max = math.pi)

    return LMparams

In [None]:
class DS(object):
    """ Module 2: Drive Scenario
    """
    def __init__(self):
        

In [None]:
class DC(object):
    """ Module 1: Drive Cycle
    """
    def __init__(self, dc_length):
        
        t_DC = dc_length
        sum_t_DS = 0
        while t_DC > sum_t_DS:
            pass
        pass


In [12]:
class Acceleration(object):
    def __init__(self):
        self.bins = pd.DataFrame([0.2,0.4,0.6,0.8,1,1.2,1.4,1.6,1.8,2])
        self.data_points = pd.DataFrame([1,4,12,24,20,29,9,1,0,0])

In [None]:
class Cruising_Duration(object):
    def __init__(self):
        self.bins = pd.DataFrame([0,12,24,36,48,60,72,84,96,108,120])
        self.data_points = pd.DataFrame([3,14,28,26,12,7,2,4,3,0,1])

In [None]:
class Average_Crusing_Speed(object):
    def __init__(self):
        self.bins = pd.DataFrame([2,4,6,8,10,12,14,16,18,20])
        self.data_points = pd.DataFrame([0,1,2,6,18,35,22,11,4,1])

In [None]:
class Deceleration(object):
    def __init__(self):
        self.bins = pd.DataFrame([0,0.5,1,1.5,2,2.5,3,3.5,4,4.5])
        self.data_points = pd.DataFrame([0,0,6,14,22,20,16,13,6,3])

In [6]:
def Gaussian_param():
    """ Inital parameters for each Gaussian characteristic paramter 
    """
    LMparams = Parameters()
    # there are two sets of values because the paper used 2 curve (n=2) to fit the distributions
    LMparams.add('alpha_1', value = 1.)
    LMparams.add('alpha_2', value = 1.)
    LMparams.add('sigma_1', value = 1.)
    LMparams.add('sigma_2', value = 1.)
    LMparams.add('meu_1', value = 0.)
    LMparams.add('meu_2', value = 0.)

    return LMparams

In [7]:
class Probability_Functions(object):
    def __init__(self, x, y, n):
        self.x = x
        self.y = y
        self.original_y = y
        # the number of Gaussian distribution used to describe the data
        self.n = n

    def single_component(self, alpha_i, sigma_i, meu_i):
        """ Returns the single Gaussian component as described in the sum of eqn (10)
        """
        exp_component = -((self.x-meu_i)^2)/(2*(sigma_i^2))
        fcn = (alpha_i/(sigma_i*math.sqrt(2*math.pi)))*math.exp(exp_component)
        return fcn

    def eqn_model(self, params):
        """ Returns the Gaussian component sum as described in eqn (10)
        """
        model = None
        # runs a loop for the number of of Gaussian distibutions used to describe the data, and sums the single components 
        for component in range(1,self.n+1):
            alpha_i = 'alpha_'+str(component)
            sigma_i = 'sigma_'+str(component)
            meu_i = 'meu_'+str(component)
            model += self.single_component(params[alpha_i],params[sigma_i],params[meu_i])

        return model

    def fnc2min(self, params):
        """ Returns the residuals for the model
        """
        return (self.y - self.eqn_model(params))

    def NLLSR(self, LMparams):
        """ Returns the result of the NLLSR using LMFit
        """
        # uses least swuares method to minimize the parameters given by LMparams according to the residuals given by self.fnc2min
        LMFitmin = Minimizer(self.fnc2min, LMparams)
        LMFitResult = LMFitmin.minimize(method='least_squares')
        lmfit.printfuncs.report_fit(LMFitResult.params)

        return LMFitResult

In [13]:
accel_obj = Acceleration()
print(accel_obj.bins)
param_obj = Gaussian_param()
accel_prob_obj = Probability_Functions(accel_obj.bins, accel_obj.data_points,2)
hey = accel_prob_obj.NLLSR(param_obj)


plt.plot(accel_prob_obj.x,accel_prob_obj.original_y,'b')
yy = accel_prob_obj.eqn_model(hey.params)
plt.plot(accel_prob_obj.x, yy,'r', label = 'best_fit')
plt.legend(loc='best')
plt.show()



     0
0  0.2
1  0.4
2  0.6
3  0.8
4  1.0
5  1.2
6  1.4
7  1.6
8  1.8
9  2.0


NameError: name 'n' is not defined

In [None]:
class Velocity_Noise(object):
    def __init__(self,t,y):
        self.t = t
        self.y = y
        self.original_y = y
        self.original_y_mean = self.original_y.mean()

    def subtract_avg(self):
        """Removes the average speed from the observations
        """
        self.y = self.y - self.original_y_mean
        return self.y

    def subtract(self, array):
        self.y = self.y - array
        return self.y

    def single_component(self, A_i_FS, w_i_FS, phi_i_FS):
        """ Returns a single velocity noise component as described in the sum of eqn (5)
        """
        return A_i_FS * np.sin( (w_i_FS*self.t) + phi_i_FS )
     
    def eqn_model(self, params):
        """ Returns the velocity noise FS model as described in eqn (5)
        """
        # put all the paramters in a list
        A_FS = [params['A1_FS'],params['A2_FS'],params['A3_FS']]
        w_FS = [params['w1_FS'],params['w2_FS'],params['w3_FS']]
        phi_FS = [params['phi1_FS'],params['phi2_FS'],params['phi3_FS']]

        # equation (5), sum of 3 components
        model = self.single_component(A_FS[0],w_FS[0], phi_FS[0])
        model += self.single_component(A_FS[1],w_FS[1], phi_FS[1])
        model += self.single_component(A_FS[2],w_FS[2], phi_FS[2])

        return model

    def fnc2min(self, params):
        """ Returns the residuals (eqn 7) for the model for when running all 3 components at once
        """
        return (self.y - self.eqn_model(params))

    # def fnc2min(self, params):
    #     """ Returns the residuals (eqn 7) for the model for when running one component at a time
    #     """
    #     return (self.y - self.single_component(params['A_FS'], params['w_FS'], params['phi_FS']))


    def NLLSR(self, LMparams):
        """ Returns the result of the NLLSR using LMFit
        """
        # uses least swuares method to minimize the parameters given by LMparams according to the residuals given by self.fnc2min
        LMFitmin = Minimizer(self.fnc2min, LMparams)
        LMFitResult = LMFitmin.minimize(method='least_squares')
        lmfit.printfuncs.report_fit(LMFitResult.params)

        return LMFitResult

In [None]:
class DP(object):
    """ Module 3: Drive Pulse
    """
    def __init__(self,t,y):
        # velocity_noise = None
        # accel = None
        # cruise = None
        # decel = None
        # idle = None
        # t_DP = None
        self.t = t
        self.y = y


# The code below is for when we are balancing all 3 components of a Frequency Spectrum (FS) at ONCE
## Each block corresponds to one FS

In [None]:
if __name__ == '__main__':
    # loads the csv file
    subdir = 'caltrans_processed_drive_cycles/data/1035198_1'
    file_name = '2012-05-22.csv'
    data = load_csv_data(file_name, subdir)
    # get a slice of the data with a relatively long driving pulse
    data = data.iloc[1002:1096,:]
    # show the slice of data
    plt.plot(data.loc[:,'timestamp'], data.loc[:,'speed_mph'])
    plt.show()
    # get the slice of ONLY cruising period
    cruising_data = data.iloc[25:86,:]

    # create a numpy array of just t values starting at t=1
    t = np.linspace(1,len(cruising_data),len(cruising_data))
    # create a numpy array of speed_mph values
    y = cruising_data.loc[:,'speed_mph'].to_numpy()
    # initialise the DP object
    vn_obj = Velocity_Noise(t,y)
    # deduct the average from the cruising period speed values (from fig3a to fig3b) and store as y
    y = vn_obj.subtract_avg()

    original_y = y
    
    # perform NLLSR with the initial parameters suggested by LMParams
    hi = vn_obj.NLLSR(LF_Noise())
    plt.plot(t,y,'b')
    yy = hi.params['A1_FS'] * np.sin( (hi.params['w1_FS']*t) + hi.params['phi1_FS'])
    yy = yy + hi.params['A2_FS'] * np.sin( (hi.params['w2_FS']*t) + hi.params['phi2_FS'])
    yy = yy + hi.params['A3_FS'] * np.sin( (hi.params['w3_FS']*t) + hi.params['phi3_FS'])
    plt.plot(t, yy,'r', label = 'best_fit')
    plt.legend(loc='best')
    plt.show()
    # remove previous component of LF spectrum
    y = vn_obj.subtract(yy)
    reconstructed = yy


In [None]:
    # perform NLLSR with the initial parameters suggested by LMParams
    hi2 = vn_obj.NLLSR(MF_Noise())

    plt.plot(t,y,'b')
    yy = hi2.params['A1_FS'] * np.sin( (hi2.params['w1_FS']*t) + hi2.params['phi1_FS'])
    yy = yy + hi2.params['A2_FS'] * np.sin( (hi2.params['w2_FS']*t) + hi2.params['phi2_FS'])
    yy = yy + hi2.params['A3_FS'] * np.sin( (hi2.params['w3_FS']*t) + hi2.params['phi3_FS'])
    plt.plot(t, yy,'r', label = 'best_fit')
    plt.legend(loc='best')
    plt.show()
    # remove previous component of LF spectrum
    y = vn_obj.subtract(yy)
    reconstructed += yy

In [None]:
    # perform NLLSR with the initial parameters suggested by LMParams
    hi3 = vn_obj.NLLSR(HF_Noise())
    plt.plot(t,y,'b')
    yy = hi3.params['A1_FS'] * np.sin( (hi3.params['w1_FS']*t) + hi3.params['phi1_FS'])
    yy = yy + hi3.params['A2_FS'] * np.sin( (hi3.params['w2_FS']*t) + hi3.params['phi2_FS'])
    yy = yy + hi3.params['A3_FS'] * np.sin( (hi3.params['w3_FS']*t) + hi3.params['phi3_FS'])
    plt.plot(t, yy,'r', label = 'best_fit')
    plt.legend(loc='best')hd t5 .
    plt.show()
    # remove previous component of LF spectrum
    y = vn_obj.subtract(yy)
    reconstructed += yy

    plt.plot(t,original_y,'b')
    plt.plot(t, reconstructed,'r', label = 'best_fit')
    plt.legend(loc='best')
    plt.show()



# The code below is for when we are running one component at once 
## Each block is for one Frequency Spectrum

In [None]:
if __name__ == '__main__':
    # loads the csv file
    subdir = 'caltrans_processed_drive_cycles/data/1035198_1'
    file_name = '2012-05-22.csv'
    data = load_csv_data(file_name, subdir)
    # get a slice of the data with a relatively long driving pulse
    data = data.iloc[1002:1096,:]
    plt.plot(data.loc[:,'timestamp'], data.loc[:,'speed_mph'])
    plt.show()
    # get the slice of ONLY cruising period
    cruising_data = data.iloc[25:86,:]

    # create a numpy array of just t values starting at t=1
    t = np.linspace(1,len(cruising_data),len(cruising_data))
    # create a numpy array of speed_mph values
    y = cruising_data.loc[:,'speed_mph'].to_numpy()
    # initialise the DP object
    vn_obj = Velocity_Noise(t,y)
    # deduct the average from the cruising period speed values (from fig3a to fig3b) and store as y
    y = vn_obj.subtract_avg()

    original_y = y
    
    # perform NLLSR with the initial parameters suggested by LMParams
    hi = vn_obj.NLLSR(LF_Noise(1))
    plt.plot(t,y,'b')
    yy = vn_obj.single_component(hi.params['A_FS'],hi.params['w_FS'],hi.params['phi_FS'])
    plt.plot(t, yy,'r', label = 'best_fit')
    plt.legend(loc='best')
    plt.show()
    # remove previous component of LF spectrum
    y = vn_obj.subtract(yy)
    reconstructed = yy

    # perform NLLSR with the initial parameters suggested by LMParams
    hi = vn_obj.NLLSR(LF_Noise(2))
    plt.plot(t,y,'b')
    yy = vn_obj.single_component(hi.params['A_FS'],hi.params['w_FS'],hi.params['phi_FS'])
    plt.plot(t, yy,'r', label = 'best_fit')
    plt.legend(loc='best')
    plt.show()
    # remove previous component of LF spectrum
    y = vn_obj.subtract(yy)
    reconstructed += yy

    # perform NLLSR with the initial parameters suggested by LMParams
    hi = vn_obj.NLLSR(LF_Noise(3))
    plt.plot(t,y,'b')
    yy = vn_obj.single_component(hi.params['A_FS'],hi.params['w_FS'],hi.params['phi_FS'])
    plt.plot(t, yy,'r', label = 'best_fit')
    plt.legend(loc='best')
    plt.show()
    # remove previous component of LF spectrum
    y = vn_obj.subtract(yy)
    reconstructed += yy


In [None]:
    # perform NLLSR with the initial parameters suggested by LMParams
    hi = vn_obj.NLLSR(MF_Noise(1))
    plt.plot(t,y,'b')
    yy = vn_obj.single_component(hi.params['A_FS'],hi.params['w_FS'],hi.params['phi_FS'])
    plt.plot(t, yy,'r', label = 'best_fit')
    plt.legend(loc='best')
    plt.show()
    # remove previous component of MF spectrum
    y = vn_obj.subtract(yy)
    reconstructed += yy

    # perform NLLSR with the initial parameters suggested by LMParams
    hi = vn_obj.NLLSR(MF_Noise(2))
    plt.plot(t,y,'b')
    yy = vn_obj.single_component(hi.params['A_FS'],hi.params['w_FS'],hi.params['phi_FS'])
    plt.plot(t, yy,'r', label = 'best_fit')
    plt.legend(loc='best')
    plt.show()
    # remove previous component of MF spectrum
    y = vn_obj.subtract(yy)
    reconstructed += yy

    # perform NLLSR with the initial parameters suggested by LMParams
    hi = vn_obj.NLLSR(MF_Noise(3))
    plt.plot(t,y,'b')
    yy = vn_obj.single_component(hi.params['A_FS'],hi.params['w_FS'],hi.params['phi_FS'])
    plt.plot(t, yy,'r', label = 'best_fit')
    plt.legend(loc='best')
    plt.show()
    # remove previous component of MF spectrum
    y = vn_obj.subtract(yy)
    reconstructed += yy

In [None]:
    # perform NLLSR with the initial parameters suggested by LMParams
    hi = vn_obj.NLLSR(HF_Noise(1))
    plt.plot(t,y,'b')
    yy = vn_obj.single_component(hi.params['A_FS'],hi.params['w_FS'],hi.params['phi_FS'])
    plt.plot(t, yy,'r', label = 'best_fit')
    plt.legend(loc='best')
    plt.show()
    # remove previous component of HF spectrum
    y = vn_obj.subtract(yy)
    reconstructed += yy

    # perform NLLSR with the initial parameters suggested by LMParams
    hi = vn_obj.NLLSR(HF_Noise(2))
    plt.plot(t,y,'b')
    yy = vn_obj.single_component(hi.params['A_FS'],hi.params['w_FS'],hi.params['phi_FS'])
    plt.plot(t, yy,'r', label = 'best_fit')
    plt.legend(loc='best')
    plt.show()
    # remove previous component of HF spectrum
    y = vn_obj.subtract(yy)
    reconstructed += yy

    # perform NLLSR with the initial parameters suggested by LMParams
    hi = vn_obj.NLLSR(HF_Noise(3))
    plt.plot(t,y,'b')
    yy = vn_obj.single_component(hi.params['A_FS'],hi.params['w_FS'],hi.params['phi_FS'])
    plt.plot(t, yy,'r', label = 'best_fit')
    plt.legend(loc='best')
    plt.show()
    # remove previous component of HF spectrum
    y = vn_obj.subtract(yy)
    reconstructed += yy

    plt.plot(t,original_y,'b')
    plt.plot(t, reconstructed,'r', label = 'best_fit')
    plt.legend(loc='best')
    plt.show()
