# 1) Introduction

Radial forging is widely used in industry to manufacture components for a broad range of sectors including automotive, medical, aerospace, rail and industrial. 

The Advanced Forming Research Centre (AFRC) at the University of Strathclyde, Glasgow, houses a GFM SKK10/R radial forge that has been used as a testbed for this project. A total of 81 parts were forged over one day of operation.
The forging process involves 99 sensors. The data provided for building these tutorials contains many sensor signals that might be correlated with deviations from the nominal dimensions of the production parts.

Each forged part was then measured using a CMM to provide dimensional output relative to a target specification and tolerances. 

Some of these sensors are not in use, and some were chosen as relevant sensors for a specific production phase (heating/transfer/forging) in *Strathcylde_AFRC_machine_learning_tutorials*.

The workflow of this and the other notebooks is shown in the image below:
![Workflow.PNG](attachment:Workflow.PNG)

The first step is to examine whether other sensors can also be considered relevant for a production phase. This is done for only one part, because the same process is repeated for all other parts.

In [None]:
import pandas as pd 
import time
%pip install openpyxl
from pathlib import Path
import matplotlib.pyplot as plt
%matplotlib inline
import matplotlib as mpl
import numpy as np
np.random.seed(42)
from scipy.signal import find_peaks
from scipy import integrate
import matplotlib.cm as cm
from mpl_toolkits.mplot3d import Axes3D
import ipywidgets as widgets
from ipywidgets import interact, interact_manual
from matplotlib._png import read_png
from matplotlib.cbook import get_sample_data

font = {'family' : 'Times New Roman', 'weight' : 'normal', 'size'   : 20}
mpl.rcParams['figure.figsize'] = (20,10)
mpl.rc('font', **font)

# 2) Relevant sensors

The list `data` has 81 elements, one table per part, whose columns are the time series of the sensor measurements. The column names are the sensor names, and the row indices are the sample numbers.

In [None]:
data=[0]*81
for i in range(81):
    print("|"*(i+1),(i+1), end="\r")  # simple progress bar
    file_name="Scope{:04d}.csv".format(i+1)  # Scope0001.csv ... Scope0081.csv
    data[i] = pd.read_csv(Path('Data')/'AFRC Radial Forge - Zenodoo Upload v3'/'Data'/'ScopeTraces'/file_name, header=0, encoding = 'unicode_escape')
    time.sleep(1)

Looking at column names of imported data gives us insight into sensors used for this measurement:

In [None]:
data[0].head()

Concatenating the data from all parts produces one long time series. Part labels are added before concatenating; afterwards the index is recreated and the timeline is adjusted.

In [None]:
for index,df in enumerate(data):
    df['traceID'] = index+1

In [None]:
merged_data = pd.concat(data, ignore_index=True)
merged_data['Time [s]']=(merged_data.index.values)/100.0
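
The labelling and concatenation steps above can be sketched on toy data. This is only a minimal illustration: the two traces `trace_a` and `trace_b` and the column `signal` are made up, while the 100 Hz sampling rate and the `traceID` and `Time [s]` column names are taken from the notebook.

```python
import pandas as pd

# Two hypothetical traces of different length (stand-ins for the Scope CSV files).
trace_a = pd.DataFrame({"signal": [0.1, 0.2, 0.3]})
trace_b = pd.DataFrame({"signal": [1.0, 1.1]})
toy_data = [trace_a, trace_b]

# Label every row with the (1-based) number of the part it belongs to.
for index, df in enumerate(toy_data):
    df["traceID"] = index + 1

# Stack the traces and rebuild a continuous index.
toy_merged = pd.concat(toy_data, ignore_index=True)

# One sample every 0.01 s (100 Hz), so the row index divided by 100 is the time in seconds.
toy_merged["Time [s]"] = toy_merged.index.values / 100.0

print(toy_merged)
```

Because `ignore_index=True` rebuilds the index, the time axis runs continuously across part boundaries, and `traceID` is the only column that still tells the parts apart.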

Now we will look at the data for only one part, because the production cycle of the other parts consists of the same phases.

In [None]:
merged_data.iloc[0:23328].to_excel("Merged_data_from_1st_sensor.xlsx")

In [None]:
first_sensor=pd.read_excel("Merged_data_from_1st_sensor.xlsx")

Sensors that are not in use or are part of auxiliary process measurements will be dropped:

In [None]:
used_sensors=first_sensor.drop(['hydraulic low pressure [bar]','hydraulic high pressure [bar]','A_NOM_Force [kN]','B_ACTpos [mm]','B_ACT_Force [kN]',"B_ACTspd [mm/min]","B_NOMpos [mm]","B_OUT [%]","B_NOMspd [mm/min]","B_NOM_Force [kN]","Feedback B [%]","DB_NOM_Force [kN]","D_ACTpos [mm]","D_ACT_Force [kN]","D_ACTspd [mm/min]","D_NOMpos [mm]","D_OUT [%]","D_NOMspd [mm/min]","D_NOM_Force [kN]", "Feedback D [%]","Lub_ActSpd [rpm]","Hyd_ActSpd [rpm]","O_EMERG","STP || EM","O_MASTOP","$U_GH_NOMVAL_1 (U25W1)","$H1P_Y12 (U11S17)","$H1P_Y11 (U11S7)","$U_GH_NOMEXT_2 (U26S1)","$U_GH_HEATON_2 (U26S0)","$U_GH_NOMEXT_1 (U25S1)"],axis=1)

We can now look at the rest of the data:

In [None]:
%matplotlib inline
font = {'family' : 'Times New Roman', 'weight' : 'normal', 'size'   : 20}
mpl.rcParams['figure.figsize'] = (20,15)
mpl.rc('font', **font)

def overview(num_of_sensor):
    names=list(used_sensors.columns.values)
    plt.plot(used_sensors['Time [s]'],used_sensors[names[num_of_sensor+3]])
    plt.xlabel('Time [s]')
    plt.title(names[num_of_sensor+3])
    plt.ylabel('Values');
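# the slider range follows the number of remaining columns (the first three are skipped)
interact(overview,num_of_sensor=widgets.IntSlider(min=0, max=len(used_sensors.columns)-4, step=1))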

Sensors chosen as relevant for a specific production phase (heating/transfer/forging) in *Strathcylde_AFRC_machine_learning_tutorials*, as well as those that have been dropped as non-relevant, will not be considered in the next plot. Only the remaining sensors will be analyzed.

In [None]:
other_sensors=used_sensors.drop(['Schlagzahl [1/min]',"Power [kW]","Force [kN]","A_ACT_Force [kN]","A_NOMpos [mm]","A_ACTpos [mm]","DB_ACT_Force [kN]","SBA_ActPos [mm]","IP_ActPos [mm]","IP_NomPos","TMP_Ind_U1 [°C]","TMP_Ind_F [°C]","L_ACTpos [mm]","L_NOMpos [mm]","R_ACTpos [mm]","R_NOMpos [mm]","EXZ_pos [deg]",'A_ACTspd [mm/min]','A_NOMspd [mm/min]','A_OUT [%]','Feedback A [%]','DB_ACTpos [mm]','DB_ACTspd [mm/min]','DB_NOMpos [mm]','DB_OUT [%]','DB_NOMspd [mm/min]','Feedback DB [%]','L_ACTspd [mm/min]','L_OUT [%]','L_NOMspd [mm/min]','Feedback L [%]','R_ACTspd [mm/min]','R_OUT [%]','R_NOMspd [mm/min]','Feedback R [%]','SBA_NomPos [mm] [mm]','SBA_OUT [%]','Feedback SBA [%]',"ForgingBox_Temp","$U_GH_HEATON_1 (U25S0).1"],axis=1)

In [None]:
%matplotlib inline
font = {'family' : 'Times New Roman', 'weight' : 'normal', 'size'   : 20}
mpl.rcParams['figure.figsize'] = (20,5)
mpl.rc('font', **font)
def overview_other_sensors(num_of_sensor):
   
    plt.plot(other_sensors['Time [s]'], other_sensors[[num_of_sensor]])
    plt.xlabel('Time [s]')
    plt.title(num_of_sensor)
    plt.ylabel('Values');
interact(overview_other_sensors,num_of_sensor=list(other_sensors.columns[3:].values))

Based on the plot, additional sensors that will be considered are:
- A_ges_vibr
- INDA_NOMpos [deg]
- Frc_Volt
- RamRetract_ActSpd [rpm]
- W1 Durchfluss [l]
- W2 Durchfluss [l]
- L1.R_B41 (bar)

In [None]:
other_sensors.columns.values

In [None]:
used_sensors=used_sensors.drop(['EXZ_pos [deg]','DB_ACTpos [mm]',"IP_ActSpd [mm/min]","IP_NomSpd [mm/min]",'INDA_ACTpos [deg]','INDA_NOMspd [U/min]','INDA_OUT [%]','INDA_ACTspd [U/min]',
       'Speed Vn_1 [rpm]', 'NOMforceSPA [kN]', '$F_F41L (I14S8)','SPA_OUT [%]','Feedback_SPA [%]','IP_Out [%]',
       'ACTforceSPA [kN]', '$U_GH_HEATON_1 (U25S0)','$E_GH_FAULT_2 (I26S21)','L_ACTspd [mm/min]',
       'R_ACTspd [mm/min]', 'SBA_NomPos [mm] [mm]', 'A_ACTspd [mm/min]',
       'DB_ACTspd [mm/min]','SBA_OUT [%]', 'DB_NOMpos [mm]', 'L_OUT [%]', 'R_OUT [%]', 'Feedback SBA [%]',
       'A_OUT [%]', 'DB_OUT [%]', 'L_NOMspd [mm/min]',
       'R_NOMspd [mm/min]','A_NOMspd [mm/min]',"Feedback A [%]",
       'Feedback DB [%]','DB_NOMspd [mm/min]', 'Feedback L [%]', 'Feedback R [%]','$E_GH_FAULT_1 (I25S21)', '$B12R_Y11 (U14S16)', 'Unnamed: 0','c01w', 'c02w', 'Timer Tick [ms]', 'Block-Nr','traceID','Time [s]'],axis=1)

Total number of sensors which will be analyzed is:

In [None]:
len(used_sensors.columns)

and they are:

In [None]:
used_sensors.columns.values

These sensors will be analyzed for all parts:

In [None]:
parts=[None]*81
for i in range(len(data)):
    parts[i]=data[i][used_sensors.columns.values].copy()

In [None]:
parts[0].head()

In [None]:
digital_sig_heating = merged_data['$U_GH_HEATON_1 (U25S0).1']>0
heating_diff = digital_sig_heating.astype('int').diff()

digital_sig_forge = merged_data['Force [kN]']>0
forge_diff = digital_sig_forge.astype('int').diff()

In [None]:
heating_start=heating_diff[heating_diff==1].index.values
heating_stop=heating_diff[heating_diff==-1].index.values
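
The phase detection used in the two cells above (threshold the signal, then take the difference of the resulting 0/1 series) can be sketched on a toy signal. The values in `toy_signal` are made up for illustration; only the thresholding and `diff()` pattern follows the notebook.

```python
import pandas as pd

# Hypothetical on/off signal: two "on" phases separated by idle samples.
toy_signal = pd.Series([0, 0, 5, 7, 6, 0, 0, 3, 4, 0])

# Boolean mask -> 0/1 integers; diff() is +1 on a rising edge and -1 on a falling edge.
is_on = (toy_signal > 0).astype("int")
edges = is_on.diff()

phase_start = edges[edges == 1].index.values   # first sample of each "on" phase
phase_stop = edges[edges == -1].index.values   # first sample after each "on" phase

# Slice out every phase; the stop index is exclusive.
phases = [toy_signal.iloc[a:b] for a, b in zip(phase_start, phase_stop)]
print(phase_start, phase_stop)
```

Note that the first element of `diff()` is `NaN`, so a signal that is already "on" at the very first sample would not produce a rising edge there.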


In [None]:
forged_sensors=used_sensors[['Power [kW]', 'Force [kN]', 'A_ges_vibr','Schlagzahl [1/min]', 'RamRetract_ActSpd [rpm]',
       'A_ACTpos [mm]', 'L_ACTpos [mm]', 'R_ACTpos [mm]',
       'SBA_ActPos [mm]', 'A_ACT_Force [kN]', 'DB_ACT_Force [kN]',
       'L_NOMpos [mm]', 'R_NOMpos [mm]', 'INDA_NOMpos [deg]',
       'A_NOMpos [mm]', 'Frc_Volt','ForgingBox_Temp', 'TMP_Ind_U1 [°C]', 'TMP_Ind_F [°C]','W2 Durchfluss [l]', 'W1 Durchfluss [l]']]

In [None]:
forged_ph_parts=[None]*81
for i in range(len(parts)):
    forged_ph_parts[i]=merged_data[forged_sensors.columns.values][forge_diff[forge_diff==1].index.values[i]:forge_diff[forge_diff==-1].index.values[i]].copy()
    print(forged_ph_parts[i].shape)

In [None]:
heated_sensors=used_sensors[['TMP_Ind_U1 [°C]','IP_ActPos [mm]', 'IP_NomPos']]

In [None]:
heatedparts=[None]*81
for i in range(len(parts)):
    heatedparts[i]=merged_data[heated_sensors.columns.values][heating_start[i]:heating_stop[i]].copy()
    print(heatedparts[i].shape)

In [None]:
cycle_length_forge=[None]*(len(forged_ph_parts))
for i in range(len(forged_ph_parts)):
    cycle_length_forge[i]=(forged_ph_parts[i].shape[0])  # length of the forging phase only

In [None]:
min_length_forge=min(cycle_length_forge)
print("Minimum length of time signals for the forging phase is :",min_length_forge, "and it is in",cycle_length_forge.index(min_length_forge),". cycle")

In [None]:
cycle_length_heat=[None]*(len(heatedparts))
for i in range(len(heatedparts)):
    cycle_length_heat[i]=(heatedparts[i].shape[0])

In [None]:
min_length_heat=min(cycle_length_heat)
print("Minimum length of time signals  for the heating phase is :",min_length_heat, "and it is in",cycle_length_heat.index(min_length_heat),". cycle")

In [None]:
forging_sensors=[None]*(forged_ph_parts[0].shape[1])
for p in range (forged_ph_parts[0].shape[1]):
    forging_sensors[p] = np.zeros((len(forged_ph_parts),min_length_forge))
    for k in range(len(forged_ph_parts)):
        # truncate every forging phase to the length of the shortest one
        forging_sensors[p][k]=forged_ph_parts[k].iloc[:min_length_forge,p].values

In [None]:
forging_sensors[0].shape

In [None]:
sigma=float(input("Enter the value of white noise standard deviation:"))

In [None]:
#adding white noise
for i in range((len(forging_sensors))):
    for k in range((len(forging_sensors[i]))):
           forging_sensors[i][k]=forging_sensors[i][k]+ np.random.randn(len(forging_sensors[i][k]))*sigma

In [None]:
from PyDynamic.uncertainty.propagate_DFT import GUM_DFT

In [None]:
from PyDynamic.uncertainty.propagate_DFT import GUM_DFT,DFT2AmpPhase,Time2AmpPhase_multi

In [None]:

A_df_forged=[None]*len(forging_sensors)
UAP_df_forged=[None]*len(forging_sensors)
P_df_forged=[None]*len(forging_sensors)
for i in range(len(forging_sensors)):
    A_df_forged[i],P_df_forged[i],UAP_df_forged[i]=Time2AmpPhase_multi(forging_sensors[i],np.ones(forging_sensors[i].shape[0])*sigma**2)

In [None]:
import h5py

In [None]:
hf_a_forged = h5py.File('Amplitudes_forged.hdf5', 'w')
hf_p_forged = h5py.File('Phases_forged.hdf5', 'w')
hf_uap_forged=h5py.File('Uncertainties_forged.hdf5', 'w')

In [None]:
for i in range(len(forging_sensors)):
    hf_a_forged["A_df"+str(i)]= A_df_forged[i]
    hf_p_forged["P_df"+str(i)]=P_df_forged[i]
    hf_uap_forged["UAP"+str(i)]=UAP_df_forged[i]

In [None]:
hf_a_forged.close()
hf_p_forged.close()
hf_uap_forged.close()

So, there were 99 sensors at the beginning. Sensors that are not in use or are used in auxiliary process measurements were dropped. The other sensors were analyzed, and 22 were considered relevant.

# 3) Features extraction

## 3.1. Uncertainty propagation

For the uncertainty propagation, the software package PyDynamic will be used. Its propagation routines are based on the Discrete Fourier Transform (DFT), computed with an FFT algorithm.
The time domain signal is *x(t)*. White noise will be added to this signal:

$$x_{n}(t) = x(t)+\epsilon$$

White noise has a normal distribution ${\mathcal {N}}(0 ,\sigma ^{2})$ with a standard deviation $\sigma$ that can be specified by the user.
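
The noise model above can be sketched with NumPy as follows. The sine signal, the seed and the value of `sigma_demo` are assumptions chosen for illustration; the notebook itself asks the user for the standard deviation.

```python
import numpy as np

rng = np.random.default_rng(42)          # seeded generator for reproducibility

# Hypothetical clean signal: 10 s of a 2 Hz sine sampled at 100 Hz.
t = np.arange(1000) * 0.01
x = np.sin(2 * np.pi * 2.0 * t)

sigma_demo = 0.1                          # assumed noise standard deviation
noise = rng.normal(loc=0.0, scale=sigma_demo, size=x.shape)
x_noisy = x + noise                       # x_n(t) = x(t) + epsilon

# The sample standard deviation of the noise should be close to sigma_demo.
print(noise.std(ddof=1))
```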

For every sensor, the measurements of each part will be considered as one cycle. This means that for every sensor there will be 81 cycles, because there are 81 parts.

In [None]:
import PyDynamic

In [None]:
from PyDynamic import __version__ as version
version

The sampling period is 0.01 s, which means that the signals are sampled at a frequency of 100 Hz. The number of sampling points (signal length) varies from part to part, so the cycles have to be stored as separate list elements. The unequal signal lengths will be dealt with in the next steps.

In [None]:
sensors=[None]*(parts[0].shape[1])
for p in range (parts[0].shape[1]):
    sensors[p] = [None]*(len(parts))
    for k in range(len(parts)):
            sensors[p][k]=parts[k].iloc[:,p].values


The time domain signal will be presented in the frequency domain with an associated uncertainty *ux*, the squared standard deviation (noise variance) $\sigma ^{2}$, where the standard deviation $\sigma$ can be specified by the user.

In [None]:
sigma=float(input("Enter the value of white noise standard deviation:"))

In [None]:
#adding white noise
for i in range((len(sensors))):
    for k in range((len(sensors[i]))):
           sensors[i][k]=sensors[i][k]+ np.random.randn(len(sensors[i][k]))*sigma

In [None]:
sensors[0][0]

Number of sampling points for the first sensor and first cycle (part):

In [None]:
len(sensors[0][0])

In [None]:
from PyDynamic.uncertainty.propagate_DFT import GUM_DFT

In [None]:
from PyDynamic.uncertainty.propagate_DFT import GUM_DFT,DFT2AmpPhase

The function `Perform_Fourier` uses three functions from PyDynamic: *GUM_DFT*, *DFT2AmpPhase* and *GUM_DFTfreq*. First, *GUM_DFT* is applied to the time signal and its uncertainty, which gives the real and imaginary parts in the vector *F*, as well as their uncertainties (vector *UF*). When only white noise is considered, all off-diagonal elements of *UF* are equal to zero. For this reason, *UF* is a vector and not a covariance matrix. Then, from the results of *GUM_DFT*, the function *DFT2AmpPhase* provides the amplitudes, the phases and their uncertainties. *GUM_DFTfreq* provides the corresponding frequencies.
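
To illustrate why white noise leads to a diagonal uncertainty for the real and imaginary parts, the linear propagation can be written out by hand with NumPy. This is a sketch of the underlying idea, not PyDynamic's implementation; the signal length `n`, the value of `sigma_demo` and the names `C`, `S`, `J` and `U_F` are chosen for illustration only.

```python
import numpy as np

n = 8                       # short hypothetical signal length
sigma_demo = 0.5            # assumed white-noise standard deviation
k = np.arange(n // 2 + 1)   # indices of the one-sided spectrum
m = np.arange(n)

# Real and imaginary parts of the DFT are linear in x:
#   Re F = C @ x,   Im F = -S @ x
C = np.cos(2 * np.pi * np.outer(k, m) / n)
S = np.sin(2 * np.pi * np.outer(k, m) / n)

# For x with covariance sigma^2 * I, linear propagation gives sigma^2 * J @ J.T.
J = np.vstack([C, -S])
U_F = sigma_demo**2 * (J @ J.T)

var_real = np.diag(U_F)[: len(k)]
var_imag = np.diag(U_F)[len(k):]
off_diag = U_F - np.diag(np.diag(U_F))

print(var_real)                    # n*sigma^2 at k = 0 and k = n/2, n/2*sigma^2 otherwise
print(var_imag)                    # 0 at k = 0 and k = n/2, n/2*sigma^2 otherwise
print(np.abs(off_diag).max())      # numerically zero
```

Since the off-diagonal entries vanish, storing only the diagonal as a vector loses no information in the white-noise case.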

It is possible that PyDynamic raises a warning, such as:
*Some amplitude values are below the defined threshold.
The GUM formulas may become unreliable and a Monte Carlo approach is recommended instead.*

This means that some amplitudes are small relative to the uncertainty associated with the real and imaginary parts, so the GUM uncertainty propagation becomes unreliable and a Monte Carlo method is recommended instead. The default threshold in *DFT2AmpPhase* is 1.0, but it may be adjusted for specific applications.

The function returns:
- freq - the frequencies of the spectrum. By the Nyquist theorem, the largest frequency that can be represented is half the sampling frequency. So, from a signal with *n_of_sampling_pts* points sampled at 100 Hz we get (*n_of_sampling_pts*/2+1) unique spectral points covering the range 0 to 50 Hz,
- A - amplitudes of the signal in the frequency domain,
- P - phases of the signal in the frequency domain,
- UAP - uncertainties associated with the amplitudes and phases (squared standard uncertainties of the amplitudes, covariances between amplitudes and phases, and squared standard uncertainties of the phases).
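
The frequency grid described in the list above can be reproduced with NumPy alone. In this sketch `np.fft.rfftfreq` is used as a stand-in for the frequency helper of PyDynamic, and the signal length `n_points` is a made-up example value.

```python
import numpy as np

sample_period_demo = 0.01        # 0.01 s sampling period, i.e. 100 Hz
n_points = 1000                  # hypothetical (even) signal length

# One-sided frequency grid of a real-valued signal.
freq_demo = np.fft.rfftfreq(n_points, d=sample_period_demo)

print(len(freq_demo))                # n_points // 2 + 1 bins
print(freq_demo[0], freq_demo[-1])   # 0 Hz up to the Nyquist frequency
print(freq_demo[1] - freq_demo[0])   # bin spacing 1 / (n_points * sample_period_demo)
```

For an odd number of points the last bin lies slightly below the Nyquist frequency, and signals of different length get a different bin spacing.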

In [None]:
def Perform_Fourier(sensor,sigma):
    n_of_sampling_pts=len(sensor)
    sample_period=0.01  # sampling period in seconds (100 Hz)
    # frequencies of the one-sided spectrum
    freq=PyDynamic.uncertainty.propagate_DFT.GUM_DFTfreq(n_of_sampling_pts,sample_period)
    ux=sigma**2  # white noise variance
    N=n_of_sampling_pts//2+1  # number of unique spectral points
    selector=np.arange(N)
    ns=len(selector)
    UAP=np.zeros(3*ns)
    X,UX=GUM_DFT(sensor,ux)
    A, P, UAP_m = DFT2AmpPhase(X, UX, keep_sparse=True)
    UAP[:ns] = UAP_m.data[0][:N][selector]  # squared uncertainties of amplitudes
    UAP[ns:2*ns] = UAP_m.data[1][UAP_m.offsets[1]:2*N+UAP_m.offsets[1]][selector]  # amplitude-phase covariances
    UAP[ 2*ns:] = UAP_m.data[0][N:][selector]  # squared uncertainties of phases
    return freq,A,P,UAP

### 3.1.1 Unequal length of time signals

Since the length of the time signals varies, the Fourier transform will first be performed for the cycles with the minimum and maximum length. These two time signals are treated as the two extreme cases of this problem.

In [None]:
cycle_length=[None]*(len(parts))
for i in range(len(parts)):
    cycle_length[i]=(parts[i].shape[0])

In [None]:
max_length=max(cycle_length)
print("Maximum length of time signals is:",max_length, "and it is in",cycle_length.index(max_length),". cycle")

In [None]:
min_length=min(cycle_length)
print("Minimum length of time signals is:",min_length, "and it is in",cycle_length.index(min_length),". cycle")

We will have a look at the Fourier transform of the time signals in cycle 42 (the longest one) for all 22 sensors.

In [None]:
A_sensors_42=[None]*len(sensors)
P_sensors_42=[None]*len(sensors)
UAP_sensors_42=[None]*len(sensors)
freq_42=[None]*len(sensors)
for i in range(len(sensors)):
    freq_42[i],A_sensors_42[i], P_sensors_42[i], UAP_sensors_42[i]=Perform_Fourier(sensors[i][42],sigma)
                        

We will have a look at the Fourier transform of the time signals in cycle 17 (the shortest one) for all 22 sensors.

In [None]:
A_sensors_17=[None]*len(sensors)
P_sensors_17=[None]*len(sensors)
UAP_sensors_17=[None]*len(sensors)
freq_17=[None]*len(sensors)
for i in range(len(sensors)):
    freq_17[i],A_sensors_17[i], P_sensors_17[i], UAP_sensors_17[i]=Perform_Fourier(sensors[i][17],sigma)

The next plot shows the differences in amplitudes between the two cycles. Note that the frequency bins differ because the time signals have different lengths.

In [None]:
%matplotlib inline
font = {'family' : 'Times New Roman', 'weight' : 'normal', 'size'   : 20}
mpl.rcParams['figure.figsize'] = (20,10)
mpl.rc('font', **font)
def overview_cycles(i):
    plt.subplot(2,1,1)
    plt.plot(freq_17[i],A_sensors_17[i], label="17th cycle")
    plt.xlabel('Frequencies [Hz]')
    plt.ylabel('Amplitudes');
    plt.yscale('log')
    num_of_sensor=list(parts[0].columns.values)
    plt.title(num_of_sensor[i])
    plt.legend()
    plt.subplot(2,1,2)
    plt.yscale('log')
    plt.plot(freq_42[i],A_sensors_42[i],label="42nd cycle")
    plt.xlabel('Frequencies [Hz]')
    plt.ylabel('Amplitudes');
    plt.legend()
interact(overview_cycles,i=widgets.IntSlider(min=0, max=len(sensors)-1, step=1))

#### 3.1.1.1 Value padding

In order to analyze the amplitudes at the same frequencies for all cycles, the simplest way is to add values at the end of the shorter time series so that they have the length of the longest one. By appending artificial values such as zeros to the signal, the frequency grid becomes denser when applying the DFT.
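
The effect of padding on the frequency grid can be checked with a small NumPy sketch. The 5 Hz sine and the lengths of 200 and 500 samples are made-up example values; only the sampling period of 0.01 s is taken from the notebook.

```python
import numpy as np

dt = 0.01                                   # sampling period in seconds
short_signal = np.sin(2 * np.pi * 5.0 * np.arange(200) * dt)   # hypothetical 2 s signal
padded_signal = np.pad(short_signal, (0, 300))                 # zero-padded to 500 samples

freq_short = np.fft.rfftfreq(len(short_signal), d=dt)
freq_padded = np.fft.rfftfreq(len(padded_signal), d=dt)

# Same 0 ... 50 Hz range, but a finer frequency spacing after padding.
print(freq_short[1] - freq_short[0])     # 1 / (200 * 0.01) = 0.5 Hz
print(freq_padded[1] - freq_padded[0])   # 1 / (500 * 0.01) = 0.2 Hz
```

Padding does not add information to the signal; it only evaluates the spectrum on a finer grid, which is what makes cycles of different length comparable bin by bin.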

As a reminder, the longest time signal has length: 

In [None]:
max_length=max(cycle_length)
print("Maximum length of time signals is:",max_length, "and it is in",cycle_length.index(max_length),". cycle")

The shortest time signal has length:

In [None]:
min_length=min(cycle_length)
print("Minimum length of time signals is:",min_length, "and it is in",cycle_length.index(min_length),". cycle")

Three cases of adding values will be examined:
- zeros
- mean value of the time signal (in the code below, the mean of its last 100 samples)
- last value of the time signal.

The effect of adding values will be analyzed on the shortest time signal (17) for all sensors.

The function `add_value` is created for this. It takes the time signals of all parts as its argument and returns *sensors_value_0*, *sensors_value_mean* and *sensors_value_last*, in which the value 0, the mean value and the last element value, respectively, are added to the end of the time signals.
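
For a single signal, the three padding variants can be sketched with `np.pad`. This is an alternative formulation for illustration, not the `add_value` function of this notebook; `signal` and `target_length` are hypothetical example values.

```python
import numpy as np

signal = np.array([2.0, 4.0, 6.0, 8.0])   # hypothetical short time signal
target_length = 7                          # hypothetical length of the longest signal
n_missing = target_length - len(signal)

padded_zero = np.pad(signal, (0, n_missing), mode="constant", constant_values=0.0)
padded_mean = np.pad(signal, (0, n_missing), mode="mean")   # mean of the whole signal
padded_last = np.pad(signal, (0, n_missing), mode="edge")   # repeat the last sample

print(padded_zero)   # [2. 4. 6. 8. 0. 0. 0.]
print(padded_mean)   # [2. 4. 6. 8. 5. 5. 5.]
print(padded_last)   # [2. 4. 6. 8. 8. 8. 8.]
```

With `mode="mean"`, the optional `stat_length` argument of `np.pad` limits the mean to the samples at the edge of the signal, which would correspond to averaging only the last samples.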

In [None]:
def add_value(all_parts):
    sensors_value_0=[None]*(all_parts[0].shape[1])
    sensors_value_mean=[None]*(all_parts[0].shape[1])
    sensors_value_last=[None]*(all_parts[0].shape[1])
    for p in range (all_parts[0].shape[1]):
        sensors_value_0[p]=np.zeros((len(all_parts),max_length))
        sensors_value_mean[p]=np.zeros_like(sensors_value_0[p])
        sensors_value_last[p]=np.zeros_like(sensors_value_0[p])
        for k in range(len(all_parts)):
                if all_parts[k].shape[0]==max_length:
                    sensors_value_0[p][k,:]=all_parts[k].iloc[:,p].values
                    sensors_value_mean[p][k,:]=sensors_value_0[p][k,:]
                    sensors_value_last[p][k,:]=sensors_value_0[p][k,:]
                else:
                    # use the function argument all_parts throughout, not the global list parts
                    sensors_value_0[p][k,:all_parts[k].shape[0]]=all_parts[k].iloc[:,p].values
                    sensors_value_mean[p][k,:all_parts[k].shape[0]]= sensors_value_0[p][k,:all_parts[k].shape[0]]
                    sensors_value_last[p][k,:all_parts[k].shape[0]]= sensors_value_0[p][k,:all_parts[k].shape[0]]
                    sensors_value_0[p][k,all_parts[k].shape[0]:]=0
                    sensors_value_mean[p][k,all_parts[k].shape[0]:]=np.mean(all_parts[k].iloc[-100:,p].values)
                    sensors_value_last[p][k,all_parts[k].shape[0]:]=all_parts[k].iloc[-1,p]
    return sensors_value_0,sensors_value_mean,sensors_value_last

##### Function execution

In [None]:
sensors_zero,sensors_mean,sensors_last=add_value(parts)

In [None]:
sensors_zero[0].shape

In [None]:
# adding white noise:

for i in range((len(sensors_zero))):
    for k in range((sensors_zero[0].shape[0])):
        sensors_zero[i][k,:]=sensors_zero[i][k,:]+ np.random.randn(sensors_zero[i].shape[1])*sigma
        sensors_mean[i][k,:]=sensors_mean[i][k,:]+ np.random.randn(sensors_mean[i].shape[1])*sigma
        sensors_last[i][k,:]=sensors_last[i][k,:]+ np.random.randn(sensors_last[i].shape[1])*sigma

Fourier transform into frequency domain will be performed for all cases.

In [None]:
A_sensors_17_zero=[None]*len(sensors_zero)
P_sensors_17_zero=[None]*len(sensors_zero)
UAP_sensors_17_zero=[None]*len(sensors_zero)
freq_17_zero=[None]*len(sensors_zero)
for i in range(len(sensors_zero)):
    freq_17_zero[i],A_sensors_17_zero[i], P_sensors_17_zero[i], UAP_sensors_17_zero[i]=Perform_Fourier(sensors_zero[i][17],sigma)
                    

In [None]:
A_sensors_17_mean=[None]*len(sensors_mean)
P_sensors_17_mean=[None]*len(sensors_mean)
UAP_sensors_17_mean=[None]*len(sensors_mean)
freq_17_mean=[None]*len(sensors_mean)
for i in range(len(sensors_mean)):
    freq_17_mean[i],A_sensors_17_mean[i], P_sensors_17_mean[i], UAP_sensors_17_mean[i]=Perform_Fourier(sensors_mean[i][17],sigma)
                    

In [None]:
A_sensors_17_last=[None]*len(sensors_last)
P_sensors_17_last=[None]*len(sensors_last)
UAP_sensors_17_last=[None]*len(sensors_last)
freq_17_last=[None]*len(sensors_last)
for i in range(len(sensors_last)):
    freq_17_last[i],A_sensors_17_last[i], P_sensors_17_last[i], UAP_sensors_17_last[i]=Perform_Fourier(sensors_last[i][17],sigma)
                    

#### 3.1.1.2 Padding based on cubic spline interpolation

Spline interpolation requires two essential steps: 

- (1) a spline representation of the curve is computed, and 
- (2) the spline is evaluated at the desired points. 

The direct method of representing a curve and obtaining spline coefficients in a two-dimensional plane uses the function `splrep`. The first two arguments are the only ones required, and these provide the x-axis and y-axis components of the curve. The normal output is a 3-tuple, (*t*,*c*,*k*), containing the knot points *t*, the coefficients *c* and the order *k* of the spline. The default spline order is cubic.

The keyword argument *s* is used to specify the amount of smoothing to perform during the spline fit. The default value of *s* is $s=m-\sqrt{2m}$, where $m$ is the number of data points being fit. Therefore, if no smoothing is desired, a value of *s*=0 should be passed to the routines.

Once the spline representation of the data has been determined, functions are available for evaluating the spline (splev). 
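
The two steps described above can be sketched with SciPy on a toy curve. The sine samples in `x_demo` and `y_demo` are made up for illustration; `splrep` and `splev` are used with the same arguments as later in this notebook.

```python
import numpy as np
from scipy import interpolate

# Hypothetical samples of a smooth curve on a 0.01 s grid.
x_demo = np.arange(0, 1.0, 0.01)
y_demo = np.sin(2 * np.pi * x_demo)

# Step 1: spline representation (knots t, coefficients c, order k); s=0 means no smoothing.
tck_demo = interpolate.splrep(x_demo, y_demo, s=0)
t_knots, coeffs, order = tck_demo

# Step 2: evaluate the spline, here halfway between the original samples.
x_new = x_demo[:-1] + 0.005
y_new = interpolate.splev(x_new, tck_demo, der=0)

print(order)                                              # 3, i.e. cubic
print(np.max(np.abs(y_new - np.sin(2 * np.pi * x_new))))  # small interpolation error
```

With `s=0` the spline passes through the original samples, so evaluating it at `x_demo` returns `y_demo` up to rounding.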

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy import interpolate

To evaluate the spline, new x-axis values are needed; here the x-axis values are time steps. For each cycle, the number of additional points on the x-axis depends on the difference between its cycle length and the longest cycle length. The difference is divided by 100 because the sampling period is 10 ms (0.01 s).

In [None]:
time_diff=[None]*len(cycle_length)
for i in range(len(cycle_length)):
    time_diff[i]=(cycle_length[42]-cycle_length[i])/100

First, a cubic spline is created from the original signal, which gives the variable `tck`. Then, this spline is used to obtain new y-axis values.

In [None]:
import math
sensors_spl=[None]*(parts[0].shape[1])

for p in range (parts[0].shape[1]):
    sensors_spl[p]=np.zeros((len(parts),max_length))

    for k in range(len(parts)):
        if parts[k].shape[0]==max_length:
            sensors_spl[p][k,:]=parts[k].iloc[:,p].values

        else:
            n_missing = max_length-cycle_length[k]   # number of samples to append
            x = np.arange(cycle_length[k])*0.01      # time steps of the original signal
            y = parts[k].iloc[:,p].values
            tck = interpolate.splrep(x, y, s=0)      # cubic spline without smoothing
            xnew = np.arange(n_missing)*0.01         # n_missing time steps starting at t = 0
            ynew = interpolate.splev(xnew, tck, der=0)
            sensors_spl[p][k,:parts[k].shape[0]]=y
            sensors_spl[p][k,parts[k].shape[0]:]=ynew

In [None]:
# adding white noise:

for i in range((len(sensors_spl))):
    for k in range((sensors_spl[0].shape[0])):
        sensors_spl[i][k,:]=sensors_spl[i][k,:]+ np.random.randn(sensors_spl[i].shape[1])*sigma

Now, the Fourier transform can be performed.

In [None]:
A_sensors_17_spl=[None]*len(sensors_spl)
P_sensors_17_spl=[None]*len(sensors_spl)
UAP_sensors_17_spl=[None]*len(sensors_spl)
freq_17_spl=[None]*len(sensors_spl)
for i in range(len(sensors_spl)):
    freq_17_spl[i],A_sensors_17_spl[i], P_sensors_17_spl[i], UAP_sensors_17_spl[i]=Perform_Fourier(sensors_spl[i][17],sigma)
                    

The next plot shows the amplitudes at the given frequencies for the 17th cycle padded with zeros, with the mean value, with the last element value and with the cubic spline. The deviation of the amplitudes is often most significant for zero padding. This is shown in a joint plot and also in separate plots. Note that, compared with the unpadded result ('Amplitudes - without padding'), all of the padded results have different frequency bins.

In [None]:
%matplotlib inline
font = {'family' : 'Times New Roman', 'weight' : 'normal', 'size'   : 20}
mpl.rcParams['figure.figsize'] = (25,25)
mpl.rc('font', **font)
def overview_amplitudes(i):
    plt.plot(freq_17[i],A_sensors_17[i],label=  'Amplitudes - without padding')
    plt.plot(freq_17_zero[i],A_sensors_17_zero[i], label=  'Amplitudes - zero padding')
    plt.plot(freq_17_mean[i],A_sensors_17_mean[i],label=  'Amplitudes - mean padding')
    plt.plot(freq_17_last[i],A_sensors_17_last[i], label='Amplitudes - last element padding' )
    plt.plot(freq_17_spl[i],A_sensors_17_spl[i], label=  'Amplitudes - spline interpolation')
    plt.yscale('log')
    plt.ylabel('Amplitudes');
    num_of_sensor=list(parts[0].columns.values)
    plt.title(num_of_sensor[i])
    plt.legend()

interact(overview_amplitudes,i=widgets.IntSlider(min=0, max=len(sensors)-1, step=1))

In [None]:
%matplotlib inline
font = {'family' : 'Times New Roman', 'weight' : 'normal', 'size'   : 20}
mpl.rcParams['figure.figsize'] = (25,25)
mpl.rc('font', **font)
def overview_amplitudes(i):

    num_of_sensor=list(parts[0].columns.values)  # the title is set on the first subplot below
    plt.subplot(4,1,1)
    plt.plot(freq_17_zero[i],A_sensors_17_zero[i],label=  'Amplitudes - zero padding')
    plt.plot(freq_17[i],A_sensors_17[i],label=  'Amplitudes - without padding')
    plt.yscale('log')
    plt.ylabel('Amplitudes ');
    plt.legend()
    plt.title(num_of_sensor[i])
    plt.subplot(4,1,2)
    plt.plot(freq_17_mean[i],A_sensors_17_mean[i],label=  'Amplitudes - mean value padding')
    plt.plot(freq_17[i],A_sensors_17[i],label=  'Amplitudes - without padding')
    plt.yscale('log')
    plt.ylabel('Amplitudes ');
    plt.legend()
    plt.subplot(4,1,3)
    plt.plot(freq_17_last[i],A_sensors_17_last[i],label=  'Amplitudes - last element padding')
    plt.plot(freq_17[i],A_sensors_17[i],label=  'Amplitudes - without padding')
    plt.yscale('log')
    plt.ylabel('Amplitudes ');
    plt.legend()
    plt.subplot(4,1,4)
    plt.plot(freq_17_spl[i],A_sensors_17_spl[i],label=  'Amplitudes - spline interpolation')
    plt.plot(freq_17[i],A_sensors_17[i],label=  'Amplitudes - without padding')
    plt.xlabel('Frequencies [Hz]')
    plt.yscale('log')
    plt.ylabel('Amplitudes ' );
    plt.legend()

interact(overview_amplitudes,i=widgets.IntSlider(min=0, max=len(sensors)-1, step=1))


The effect of padding will also be checked by rescaling the DFT values with (1/(cycle_length * 0.01)), where cycle_length is the number of points of the signal (its length) and 0.01 is the sampling period of 0.01 s. Comparisons of the different padding cases will be shown in joint and separate plots.
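
Why such a rescaling helps when signals of different length are compared can be seen on an unpadded toy example. The sketch below uses NumPy's `rfft` rather than PyDynamic, and the sine amplitude, frequency and the two lengths are assumptions chosen so that both signals contain a whole number of periods.

```python
import numpy as np

dt = 0.01          # sampling period in seconds
amplitude = 3.0    # hypothetical sine amplitude
f0 = 5.0           # hypothetical sine frequency in Hz

def scaled_peak(n_points):
    """Return the raw peak of |DFT| and the peak rescaled by 1 / (n_points * dt)."""
    t = np.arange(n_points) * dt
    x = amplitude * np.sin(2 * np.pi * f0 * t)
    raw = np.abs(np.fft.rfft(x))
    return raw.max(), (raw / (n_points * dt)).max()

raw_200, scaled_200 = scaled_peak(200)   # 2 s signal, 10 full periods
raw_500, scaled_500 = scaled_peak(500)   # 5 s signal, 25 full periods

print(raw_200, raw_500)         # raw peaks grow with the signal length
print(scaled_200, scaled_500)   # rescaled peaks agree: amplitude / (2 * dt)
```

The raw DFT peak of a sine grows in proportion to the number of samples, so dividing by the signal duration removes the dependence on length.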

In [None]:
A_sensors_17_zero_scaled=[None]*len(sensors_last)
A_sensors_17_last_scaled=[None]*len(sensors_last)
A_sensors_17_mean_scaled=[None]*len(sensors_last)
A_sensors_17_scaled=[None]*len(sensors_last)
A_sensors_17_spl_scaled=[None]*len(sensors_spl)

for k in range(len(sensors_mean)):
    A_sensors_17_scaled[k]=A_sensors_17[k]*(1/(0.01*cycle_length[17]))
    A_sensors_17_spl_scaled[k]=A_sensors_17_spl[k]*(1/(0.01*cycle_length[17]))
    A_sensors_17_zero_scaled[k]=A_sensors_17_zero[k]*(1/(0.01*cycle_length[17]))
    A_sensors_17_last_scaled[k]=A_sensors_17_last[k]*(1/(0.01*cycle_length[17]))
    A_sensors_17_mean_scaled[k]=A_sensors_17_mean[k]*(1/(0.01*cycle_length[17]))

In [None]:
%matplotlib inline
font = {'family' : 'Times New Roman', 'weight' : 'normal', 'size'   : 20}
mpl.rcParams['figure.figsize'] = (25,20)
mpl.rc('font', **font)
def overview_amplitudes_scaled(i):
    
    plt.plot(freq_17[i],A_sensors_17_scaled[i],label=  'Amplitudes - without padding')
    plt.plot(freq_17_zero[i],A_sensors_17_zero_scaled[i], label=  'Amplitudes - zero padding')
    plt.plot(freq_17_mean[i],A_sensors_17_mean_scaled[i], label='Amplitudes - mean element padding' )
    plt.plot(freq_17_last[i],A_sensors_17_last_scaled[i], label='Amplitudes - last element padding' )
    plt.plot(freq_17_spl[i],A_sensors_17_spl_scaled[i], label=  'Amplitudes - spline interpolation')
    plt.yscale('log')
    plt.ylabel('Amplitudes');
    num_of_sensor=list(parts[0].columns.values)
    plt.title(num_of_sensor[i])
    plt.legend()
    plt.xlabel('Frequencies [Hz]')
interact(overview_amplitudes_scaled,i=widgets.IntSlider(min=0, max=len(sensors)-1, step=1))

In [None]:
A_sensors_17_xxscaled=[None]*len(sensors)
for k in range(len(sensors)):
    A_sensors_17_xxscaled[k]=A_sensors_17[k]*(1/(0.01*cycle_length[17]))

In [None]:
%matplotlib inline
font = {'family' : 'Times New Roman', 'weight' : 'normal', 'size'   : 20}
mpl.rcParams['figure.figsize'] = (25,25)
mpl.rc('font', **font)
def overview_amplitudes(i):

    num_of_sensor=list(parts[0].columns.values)
    
    plt.subplot(4,1,1)
    plt.plot(freq_17_zero[i],A_sensors_17_zero_scaled[i],label=  'Amplitudes - zero padding')
    plt.plot(freq_17[i],A_sensors_17_scaled[i],label=  'Amplitudes - without padding')
    plt.yscale('log')
    plt.ylabel('Amplitudes ');
    plt.title(num_of_sensor[i])
    plt.legend()
    plt.subplot(4,1,2)
    plt.plot(freq_17_mean[i],A_sensors_17_mean_scaled[i],label=  'Amplitudes - mean value padding')
    plt.plot(freq_17[i],A_sensors_17_scaled[i],label=  'Amplitudes - without padding')
    plt.yscale('log')
    plt.ylabel('Amplitudes ');
    plt.legend()
    plt.subplot(4,1,3)
    plt.plot(freq_17_last[i],A_sensors_17_last_scaled[i],label=  'Amplitudes - last element padding')
    plt.plot(freq_17[i],A_sensors_17_scaled[i],label=  'Amplitudes - without padding')
    plt.yscale('log')
    plt.ylabel('Amplitudes ');
    plt.legend()
    plt.subplot(4,1,4)
    plt.plot(freq_17_spl[i],A_sensors_17_spl_scaled[i],label=  'Amplitudes - spline interpolation')
    plt.plot(freq_17[i],A_sensors_17_scaled[i],label=  'Amplitudes - without padding')
    plt.xlabel('Frequencies [Hz]')
    plt.yscale('log')
    plt.ylabel('Amplitudes ' );
    plt.legend()
interact(overview_amplitudes,i=widgets.IntSlider(min=0, max=24, step=1))


After scaling, the numerical values of the amplitudes differ considerably between the cases. However, the shape of the signal in the frequency domain is similar in all of them.
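The difference in numerical values can be reproduced on a synthetic signal. The sketch below is only an illustration: the sampling interval `dt`, the lengths `n_short` and `n_long`, and the 5 Hz test tone are assumptions, and the scaling used here (dividing by the number of samples) may differ from the one applied in this notebook.

```python
import numpy as np

dt = 0.01                    # assumed sampling interval in seconds
n_short, n_long = 200, 500   # hypothetical original and padded lengths

t = np.arange(n_short) * dt
signal = np.sin(2 * np.pi * 5.0 * t)  # 5 Hz test tone, exactly 10 periods

# Append zeros at the end until the longer length is reached
padded = np.pad(signal, (0, n_long - n_short), mode="constant")

# Amplitude spectra, each divided by its own number of samples
amp_short = np.abs(np.fft.rfft(signal)) / n_short
amp_padded = np.abs(np.fft.rfft(padded)) / n_long

freq_short = np.fft.rfftfreq(n_short, d=dt)
freq_padded = np.fft.rfftfreq(n_long, d=dt)

# Peak position and height in both spectra
peak_freq_short = freq_short[amp_short.argmax()]
peak_freq_padded = freq_padded[amp_padded.argmax()]
peak_short = amp_short.max()
peak_padded = amp_padded.max()
```

In this example both spectra peak at the same frequency, while the peak height of the padded signal is reduced by the factor `n_short / n_long`. The shape is preserved, the numerical values are not.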

As a further check, the N% highest amplitudes in the frequency domain of the shortest time signal will be compared for the signal in:

a) its original length

b) length with zeros added at the end

c) length with mean value added at the end

d) length with last element value added at the end

e) its spline-interpolated form

Note that the frequency bins in a) differ from those in b), c) and d), because the spacing between bins depends on the length of the signal. The percentage is an arbitrary number and is entered by the user.
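The three padding variants and the resulting change of frequency bins can be sketched with `np.pad`. This is one possible way to build such signals, not necessarily the one used earlier in this notebook; the values in `short`, the `target_length` and the sampling interval `dt` are made up for illustration.

```python
import numpy as np

dt = 0.01                                # assumed sampling interval in seconds
short = np.array([1.0, 2.0, 3.0, 6.0])   # hypothetical short time signal
target_length = 7                        # hypothetical length of the longest signal
n_pad = target_length - short.size

# b) zeros, c) mean value, d) last element appended at the end
zero_padded = np.pad(short, (0, n_pad), mode="constant", constant_values=0.0)
mean_padded = np.pad(short, (0, n_pad), mode="mean")
last_padded = np.pad(short, (0, n_pad), mode="edge")

# Frequency bins of the original and of the padded signals
freq_original = np.fft.rfftfreq(short.size, d=dt)
freq_padded = np.fft.rfftfreq(target_length, d=dt)
```

Here `freq_original` has 3 bins spaced 25 Hz apart, while `freq_padded` has 4 bins spaced about 14.3 Hz apart, so the amplitudes of a) and of the padded cases are not evaluated at the same frequencies.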

Function `sort_amplitudes` sorts the amplitudes in descending order and returns the *N* highest ones. It is applied to every case of the 17th cycle.

In [None]:
percentage=input("Enter the percentage of spectrum to be sorted:")
# number of amplitudes to keep: the given percentage of max_length, rounded to an integer
N=round((float(percentage)/100)*max_length)

In [None]:
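def sort_amplitudes(amplitudes,N):
    """Return the N highest amplitudes in descending order."""
    # indices that sort the amplitudes from highest to lowest
    Sorted_amplitudes=np.argsort(amplitudes)[::-1]
    N_highest_amplitudes= amplitudes[Sorted_amplitudes[:N]]

    return N_highest_amplitudes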
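The behaviour of the function can be checked on a small made-up array. The helper `top_n_amplitudes` below is a hypothetical stand-alone copy of the same logic, and the values in `example` are invented; the percentage-to-count step mirrors the cell above under the assumption of a spectrum with 50 entries.

```python
import numpy as np

def top_n_amplitudes(amplitudes, n):
    """Return the n largest values of a 1-D array in descending order."""
    order = np.argsort(amplitudes)[::-1]   # indices from highest to lowest
    return amplitudes[order[:n]]

example = np.array([0.2, 3.0, 0.7, 1.5, 0.1])   # made-up amplitudes
top_two = top_n_amplitudes(example, 2)

# Converting a percentage into a number of amplitudes (assumed length of 50)
n_from_percentage = round((float("10") / 100) * 50)
```

For this input `top_two` contains 3.0 and 1.5, and 10% of 50 entries gives 5 amplitudes.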

##### Function execution

In [None]:
N_highest_amplitudes_17=[None]*len(sensors)
N_highest_amplitudes_17_zero=[None]*len(sensors_zero)
N_highest_amplitudes_17_mean=[None]*len(sensors_mean)
N_highest_amplitudes_17_last=[None]*len(sensors_last)
N_highest_amplitudes_17_spl=[None]*len(sensors_spl)
for i in range(len(sensors)):
    N_highest_amplitudes_17[i]= sort_amplitudes(A_sensors_17[i],N)
    N_highest_amplitudes_17_zero[i]= sort_amplitudes(A_sensors_17_zero[i],N)
    N_highest_amplitudes_17_mean[i]=sort_amplitudes(A_sensors_17_mean[i],N)
    N_highest_amplitudes_17_last[i]=sort_amplitudes(A_sensors_17_last[i],N)
    N_highest_amplitudes_17_spl[i]=sort_amplitudes(A_sensors_17_spl[i],N)

The highest amplitudes for the 17th cycle of all sensors are presented in the plot below. None of the padding cases perfectly matches the highest amplitudes without padding, but last element padding appears to be the most similar. It is therefore chosen as the way of bringing the time signals of all sensors to the same length.
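The visual comparison could be complemented by a number. The sketch below is not part of the original analysis: `mean_log_difference` is a hypothetical helper, the amplitude values are invented, and the mean absolute difference of log10 amplitudes is just one possible distance, chosen here because the plot uses a logarithmic axis. It assumes strictly positive amplitudes.

```python
import numpy as np

def mean_log_difference(reference, candidate):
    """Mean absolute difference of log10 amplitudes (0 means identical curves)."""
    reference = np.asarray(reference, dtype=float)
    candidate = np.asarray(candidate, dtype=float)
    return float(np.mean(np.abs(np.log10(candidate) - np.log10(reference))))

# Made-up sorted amplitudes standing in for the real spectra
reference = np.array([10.0, 5.0, 2.0, 1.0])      # without padding
candidates = {
    "zero": np.array([4.0, 2.0, 0.8, 0.4]),      # hypothetical zero padding result
    "last": np.array([9.0, 5.5, 2.1, 0.9]),      # hypothetical last element padding result
}

scores = {name: mean_log_difference(reference, values)
          for name, values in candidates.items()}
closest = min(scores, key=scores.get)   # case with the smallest distance
```

With these invented numbers `closest` is `"last"`. Applied to the real `N_highest_amplitudes_17*` lists, the same idea would give one score per sensor and padding case.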

In [None]:
%matplotlib inline
font = {'family' : 'Times New Roman', 'weight' : 'normal', 'size'   : 20}
mpl.rcParams['figure.figsize'] = (20,25)
mpl.rc('font', **font)
def amplitudes_zero(i):
    num_of_sensor=list(parts[0].columns.values)
    plt.plot(np.arange(N),(N_highest_amplitudes_17[i]),label="Amplitudes - without padding" )
    plt.ylabel("Amplitudes") 
    plt.title(num_of_sensor[i])
    plt.plot(np.arange(N),(N_highest_amplitudes_17_zero[i]),label="Amplitudes - zero padding")

    plt.plot(np.arange(N),(N_highest_amplitudes_17_mean[i]),label="Amplitudes mean value padding")

    plt.plot(np.arange(N),(N_highest_amplitudes_17_last[i]),label="Amplitudes - last element padding")
    plt.plot(np.arange(N),(N_highest_amplitudes_17_spl[i]),label="Amplitudes - spline")
    plt.yscale("log")
    plt.legend()
interact(amplitudes_zero,i=widgets.IntSlider(min=0, max=24, step=1))    

Using the padding with the value of the last element of the time domain signals, the Fourier transform will be performed for all sensors and cycles. This results in amplitudes, phases and their uncertainties. For simplicity, the focus will be on the amplitudes and their squared standard uncertainties.

After that, two ways of extracting 200 amplitudes will be presented. One way is to sort all the columns of amplitudes (corresponding to the frequency bins) in descending order and to extract 200 of the highest ones. 

The second approach is to compute the Pearson correlation between these columns and one column of measured data and to choose the columns with the highest correlation coefficients. This approach will be explained in detail later.
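Both selection strategies can be sketched on synthetic data. Everything below is an illustration under assumptions: the amplitude matrix and the target are random numbers, `n_keep = 5` stands in for the 200 amplitudes mentioned above, columns are ranked by their mean amplitude over all parts in the first approach, and by the absolute value of the Pearson coefficient in the second.

```python
import numpy as np

rng = np.random.default_rng(0)
n_parts, n_bins, n_keep = 81, 50, 5

# Synthetic amplitude matrix: rows are parts, columns are frequency bins
amplitudes = rng.random((n_parts, n_bins))
target = rng.random(n_parts)                 # stand-in for one measured dimension

amplitudes[:, 7] = 2.0 * target + 0.01 * rng.random(n_parts)  # bin tied to the target
amplitudes[:, 3] += 5.0                                       # bin with large amplitude

# Approach 1: keep the bins with the highest mean amplitude
mean_amplitude = amplitudes.mean(axis=0)
top_by_amplitude = np.argsort(mean_amplitude)[::-1][:n_keep]

# Approach 2: keep the bins with the highest absolute Pearson correlation
centered_a = amplitudes - amplitudes.mean(axis=0)
centered_t = target - target.mean()
pearson_r = (centered_a * centered_t[:, None]).sum(axis=0) / (
    np.sqrt((centered_a ** 2).sum(axis=0)) * np.sqrt((centered_t ** 2).sum())
)
top_by_correlation = np.argsort(np.abs(pearson_r))[::-1][:n_keep]
```

In this example the first approach ranks bin 3 highest although it carries no information about the target, while the second approach ranks bin 7 highest. This is the motivation for looking at the correlation in addition to the amplitude size.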

In [None]:
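from PyDynamic.uncertainty.propagate_DFT import Time2AmpPhase_multi
# Amplitudes, phases and their uncertainties for each sensor.
# Note: the spline-interpolated signals (sensors_spl) are transformed in this cell.
A_df=[None]*len(sensors_spl)
UAP_df=[None]*len(sensors_spl)
P_df=[None]*len(sensors_spl)
for i in range(len(sensors_spl)):
    # one noise variance (sigma**2) is passed for every time signal (row) of the sensor
    A_df[i],P_df[i],UAP_df[i]=Time2AmpPhase_multi(sensors_spl[i],np.ones(sensors_spl[i].shape[0])*sigma**2)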

The data will be written to .hdf5 files in order to avoid repeating the long calculation every time the Jupyter Notebook is opened.

In [None]:
import h5py
hf_a = h5py.File('Amplitudes_spl.hdf5', 'w')
hf_p=h5py.File('Phases_spl.hdf5', 'w')
hf_uap=h5py.File('Uncertainties_spl.hdf5', 'w')

In [None]:
for i in range(len(sensors_spl)):
    hf_a["A_df"+str(i)]=A_df[i]
    hf_p["P_df"+str(i)]=P_df[i]
    hf_uap["UAP"+str(i)]=UAP_df[i]

In [None]:

A_df_zero=[None]*len(sensors_zero)
UAP_df_zero=[None]*len(sensors_zero)
P_df_zero=[None]*len(sensors_zero)
for i in range(len(sensors_zero)):
    A_df_zero[i],P_df_zero[i],UAP_df_zero[i]=Time2AmpPhase_multi(sensors_zero[i],np.ones(sensors_zero[i].shape[0])*sigma**2)

In [None]:
hf_a_zero = h5py.File('Amplitudes_zero.hdf5', 'w')
hf_p_zero = h5py.File('Phases_zero.hdf5', 'w')
hf_uap_zero=h5py.File('Uncertainties_zero.hdf5', 'w')

In [None]:
for i in range(len(sensors_zero)):
    hf_a_zero["A_df"+str(i)]=A_df_zero[i]
    hf_p_zero["P_df"+str(i)]=P_df_zero[i]
    hf_uap_zero["UAP"+str(i)]=UAP_df_zero[i]

In [None]:
hf_a.close()
hf_p.close()
hf_uap.close()
hf_a_zero.close()
hf_p_zero.close()
hf_uap_zero.close()
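Once the files are closed, the stored arrays can be loaded again instead of recomputing the transform. The sketch below shows a write and read round trip with `h5py` on small stand-in arrays; the file name `amplitudes_example.hdf5` and the array contents are hypothetical, and the dataset names follow the `"A_df"+str(i)` pattern used above.

```python
import os
import tempfile

import h5py
import numpy as np

stored = [np.arange(6.0).reshape(2, 3), np.ones((2, 3))]  # stand-ins for A_df

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "amplitudes_example.hdf5")  # hypothetical file name

    # The context manager closes the file even if writing raises an error
    with h5py.File(path, "w") as hf:
        for i, array in enumerate(stored):
            hf["A_df" + str(i)] = array

    # Reopen read-only and load every dataset back into memory
    with h5py.File(path, "r") as hf:
        loaded = [hf["A_df" + str(i)][()] for i in range(len(hf.keys()))]
```

Using `with` blocks makes the explicit `close()` calls unnecessary, which can be convenient when cells are re-run in a different order.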

In [None]:
SS=pd.DataFrame(UAP_df[0])
SS

https://docs.scipy.org/doc/scipy/reference/tutorial/interpolate.html