Imports

In [19]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from sklearn.metrics import mean_squared_error, mean_absolute_error
from scipy.stats import pearsonr
from scipy.optimize import fsolve, curve_fit

Plot Function

In [20]:
def plot_comparison(df, df_packet_loss, metric, fit_type):
    fig = go.Figure()

    metric_fit = metric + '_Fit'

    # Plot the original data line
    fig.add_trace(go.Scatter(x=df['Time'], y=df[metric], mode='lines', name='Original Sensor Data'))

    # Plot the packet loss data line with basic interpolation
    fig.add_trace(go.Scatter(x=df_packet_loss['Time'], y=df_packet_loss[metric], mode='lines', name='Basic interpolation', opacity=1))

    # Plot the packet loss data line with better fitted interpolation based on cumulative data field
    fig.add_trace(go.Scatter(x=df_packet_loss['Time'], y=df_packet_loss[metric_fit], mode='lines', name=f'Cumulative fitted', opacity=1))


    fig.update_layout(
        title=f'{metric}: Cumulative Data fit using <b>{fit_type}</b> method<br>',
        xaxis_title='Time (s)',
        yaxis_title=metric,
        legend_title='Dataset'
    )

    fig.show()

Summary Stats Function

In [21]:
summary_power_df = pd.DataFrame({
                     f'Power': ['Average Difference', 'Mean Absolute Error', 
                    'Pearson Correlation Coefficient', 'Standard Deviation of Differences'],
                    })
summary_pedal_df = pd.DataFrame({
                     f'Pedal Angle': ['Average Difference', 'Mean Absolute Error', 
                    'Pearson Correlation Coefficient', 'Standard Deviation of Differences'],
                    })

# statistical values for comparison between interpolated values and the original values 
def get_stats(df_orig,df_loss,metric,fit_type):
    original_data = df_orig.loc[df_orig['Signal'] == False, metric]
    if fit_type == 'Basic':
        interpolated_data = df_loss.loc[df_loss['Signal'] == False, metric]
    else:
        interpolated_data = df_loss.loc[df_loss['Signal'] == False, metric+'_Fit']

    # Calculate difference in the averages 
    avg_diff = original_data.mean() - interpolated_data.mean()
    # Calculate MAE
    mae = mean_absolute_error(original_data, interpolated_data)
    # Calculate Pearson Correlation Coefficient
    corr_coefficient, _ = pearsonr(original_data, interpolated_data)
    # Calculate Standard Deviation of Differences
    std_diff = np.std(original_data - interpolated_data)

    return (avg_diff, mae, corr_coefficient, std_diff)


# formatting for the summary output
def format_float(value):
    if isinstance(value, float):
        return "{:.3f}".format(value)
    return value


Generation of time-series data that represents example power and pedal_angle data.   
<sub>
Time Interval: Time is generated from 0 to 100 seconds, at intervals of 0.01 seconds.  
Power: The initial power starts at 250 Watts and varies randomly within the range of -2 to +2 Watts.  
pedal_angle: A sinusoidal function is used to generate the pedal_angle, varying over a period of 0.7 seconds.  </sub>  


In [22]:
# Time column: 0 to 100 seconds in 0.01-second intervals
time = np.arange(0, 20, 0.01)

# Power column: Starts at 250 and varies randomly between [-1, +1]
power = 250 + np.random.uniform(-2, 2, len(time)).cumsum()

# pedal_angle column: Sinusoidal variation over 0.7 seconds (85rpm, 70 records) 
pedal_angle = 180 * np.sin(2 * np.pi * time / 0.7)

# Create the DataFrame
df = pd.DataFrame({
    'Time': time,
    'Power': power,
    'Pedal_Angle': pedal_angle,
    'Signal': True
})

# Add Cumulative Power and Cumulative Pedal_Angle columns
df['Cumulative Power'] = df['Power'].cumsum()
offset = 180  # required to keep all angles positive
df['Cumulative Pedal_Angle'] = (df['Pedal_Angle'] + offset).cumsum()

Simulation of packet loss in sensor data  
<sub>
The code simulates packet loss by iterating through each record in the DataFrame.  
For each record, a 1 in 50 chance is used to decide whether to set the next 20 records for 'Power' and 'Pedal_Angle' to NaN.  
These NaN values will be interpolated, simulating a real-world scenario where data might be lost and estimated. </sub>

In [23]:
###
# SIMULATE PACKET LOSS
###
# Initialize an empty list to hold the indices to be dropped 
drop_indices = []

# Add random dropouts to dataset (except first and last 100 recs)
for i in range(100, len(df)):
    # Decide with a 1 in 50 chance whether to set the next 20 records to NaN
    if np.random.choice([True, False], p=[1/50, 1 - 1/50]):
        # Add the indices of the next 20 records to the list, provided they are within bounds
        drop_indices.extend(range(i, min(i + 20, len(df))))

# Create a copy for our new Packet Loss dataFrame 
df_packet_loss = df.copy()

# Set the 'Power' and 'Pedal_Angle' to NaN for the selected indices 
df_packet_loss.loc[drop_indices, ['Power', 'Pedal_Angle', 'Signal']] = np.nan
# Also keep a permanent record of dropouts in Signal field 
df_packet_loss.loc[drop_indices, ['Signal']] = False
df.loc[drop_indices, ['Signal']] = False

# Initialize columns for fitted power and pedal_angle
df_packet_loss['Power_Fit'] = df_packet_loss['Power']
df_packet_loss['Pedal_Angle_Fit'] = df_packet_loss['Pedal_Angle']


Basic Interpolation

In [24]:
# Interpolate the NaN values in Power and pedal_angle fields
df_packet_loss[['Power', 'Pedal_Angle']] = df_packet_loss[['Power', 'Pedal_Angle']].interpolate()


# Summary Stats
summary_power_df['Basic'] = get_stats(df,df_packet_loss,'Power','Basic')
formatted_df = summary_power_df.applymap(format_float)
print(formatted_df.to_markdown(index=False))
print()
summary_pedal_df['Basic'] = get_stats(df,df_packet_loss,'Pedal_Angle','Basic')
formatted_df = summary_pedal_df.applymap(format_float)
print(formatted_df.to_markdown(index=False))


| Power                             |   Basic |
|:----------------------------------|--------:|
| Average Difference                |  -0.085 |
| Mean Absolute Error               |   1.559 |
| Pearson Correlation Coefficient   |   0.994 |
| Standard Deviation of Differences |   1.99  |

| Pedal Angle                       |   Basic |
|:----------------------------------|--------:|
| Average Difference                | -11.153 |
| Mean Absolute Error               |  51.856 |
| Pearson Correlation Coefficient   |   0.826 |
| Standard Deviation of Differences |  73.085 |


Avg Value Interpolation based on cumulative data

In [25]:
# Function to handle filling missing data with average recovered from cumulative field
def avg_value_fill(start, end, df):
    num_missing = end - start + 1
    
    # Calculate average power of missing records 
    tot_power = (df.loc[end + 1, 'Cumulative Power'] - df.loc[start - 1, 'Cumulative Power']) 
    avg_power = tot_power / (num_missing + 1)
    df.loc[start:end, 'Power_Fit'] = avg_power
    
    # Calculate average pedal_angle of missing records (accounting for offset)
    tot_pedal_angle = (df.loc[end + 1, 'Cumulative Pedal_Angle'] - df.loc[start - 1, 'Cumulative Pedal_Angle'] - (offset * (num_missing + 1))) 
    avg_pedal_angle = tot_pedal_angle / (num_missing + 1)
    df.loc[start:end, 'Pedal_Angle_Fit'] = avg_pedal_angle
    
    
    
# Initialize start_index
start_index = None

# Reset the lost data again
df_packet_loss.loc[df_packet_loss['Signal']==False, ['Power_Fit','Pedal_Angle_Fit']] = np.NaN

# Loop through DataFrame to find missing signal sections
for i in range(len(df_packet_loss)):
    if df_packet_loss.loc[i, 'Signal'] == False:
        if start_index is None:
            start_index = i
    elif start_index is not None:
        end_index = i - 1  # The last index of the contiguous signal lost range
        avg_value_fill(start_index, end_index, df_packet_loss)
        start_index = None  # Reset for the next contiguous block

# Plot         
plot_comparison(df, df_packet_loss, 'Power', 'Avg Cumulative')
plot_comparison(df, df_packet_loss, 'Pedal_Angle', 'Avg Cumulative')

# Summary Stats
summary_power_df['Avg Cumulative'] = get_stats(df,df_packet_loss,'Power','Avg Cumulative')
formatted_df = summary_power_df.applymap(format_float)
print(formatted_df.to_markdown(index=False))
print()
summary_pedal_df['Avg Cumulative'] = get_stats(df,df_packet_loss,'Pedal_Angle','Avg Cumulative')
formatted_df = summary_pedal_df.applymap(format_float)
print(formatted_df.to_markdown(index=False))

| Power                             |   Basic |   Avg Cumulative |
|:----------------------------------|--------:|-----------------:|
| Average Difference                |  -0.085 |           -0.022 |
| Mean Absolute Error               |   1.559 |            1.559 |
| Pearson Correlation Coefficient   |   0.994 |            0.994 |
| Standard Deviation of Differences |   1.99  |            2     |

| Pedal Angle                       |   Basic |   Avg Cumulative |
|:----------------------------------|--------:|-----------------:|
| Average Difference                | -11.153 |           -0.781 |
| Mean Absolute Error               |  51.856 |           60.958 |
| Pearson Correlation Coefficient   |   0.826 |            0.807 |
| Standard Deviation of Differences |  73.085 |           75.027 |


Quadratic Interpolation based on cumulative data

In [26]:
# equations for the quadratic
def equations(vars, start_val, end_val, num_miss, tot_val):
    a, b, c = vars
    up_lim = int(num_miss + 1)  # values range from n=0 to n=num_miss+1 (i.e. includes the known start and end values)
    eq1 = a*0**2 + b*0 + c - start_val
    eq2 = a*up_lim**2 + b*up_lim + c - end_val
    eq3 = sum([a*x**2 + b*x + c for x in range(up_lim+1)]) - (tot_val + start_val)
    return [eq1, eq2, eq3]

# Fill missing data from cumulative field using quadratic
def quadratic_fill(start, end, df):
    num_missing = end - start + 1
    
    # POWER
    # Calculate average power of missing records 
    tot_power = (df.loc[end + 1, 'Cumulative Power'] - df.loc[start - 1, 'Cumulative Power']) 
    start_power = df.loc[start - 1, 'Power']
    end_power = df.loc[end + 1, 'Power']
    # Solve the equations
    initial_guess = [1, 1, 1]
    a, b, c = fsolve(equations, initial_guess, args=(start_power, end_power, num_missing, tot_power)) 
    # Generate the missing values
    values = [a*x**2 + b*x + c for x in range(num_missing+2)]  # range includes start and ending known values
    # Fill in the missing values 
    df.loc[start:end, 'Power_Fit'] = values[1:-1]

    # PEDAL_ANGLE
    # Calculate average power of missing records 
    tot_pedal_angle = (df.loc[end + 1, 'Cumulative Pedal_Angle'] - df.loc[start - 1, 'Cumulative Pedal_Angle']) - (num_missing * offset)  # 180 was offset used
    start_pedal_angle = df.loc[start - 1, 'Pedal_Angle']
    end_pedal_angle = df.loc[end + 1, 'Pedal_Angle']
    # Solve the equations
    initial_guess = [1, 1, 1]
    a, b, c = fsolve(equations, initial_guess, args=(start_pedal_angle, end_pedal_angle, num_missing, tot_pedal_angle)) 
    # Generate the missing values
    values = [a*x**2 + b*x + c for x in range(num_missing+2)]  # range includes start and ending known values
    # Fill in the missing values 
    df.loc[start:end, 'Pedal_Angle_Fit'] = values[1:-1]
    
# Initialize start_index
start_index = None

# Reset the lost data again
df_packet_loss.loc[df_packet_loss['Signal']==False, ['Power_Fit','Pedal_Angle_Fit']] = np.NaN

# Loop through DataFrame to find missing signal sections
for i in range(len(df_packet_loss)):
    if df_packet_loss.loc[i, 'Signal'] == False:
        if start_index is None:
            start_index = i
    elif start_index is not None:
        end_index = i - 1  # The last index of the contiguous signal lost range
        quadratic_fill(start_index, end_index, df_packet_loss)
        start_index = None  # Reset for the next contiguous block

# Plot         
plot_comparison(df, df_packet_loss, 'Power', 'Quadratic')
plot_comparison(df, df_packet_loss, 'Pedal_Angle', 'Quadratic')


# Summary Stats
summary_power_df['Quadratic'] = get_stats(df,df_packet_loss,'Power','Quadratic')
formatted_df = summary_power_df.applymap(format_float)
print(formatted_df.to_markdown(index=False))
print()
summary_pedal_df['Quadratic'] = get_stats(df,df_packet_loss,'Pedal_Angle','Quadratic')
formatted_df = summary_pedal_df.applymap(format_float)
print(formatted_df.to_markdown(index=False))

| Power                             |   Basic |   Avg Cumulative |   Quadratic |
|:----------------------------------|--------:|-----------------:|------------:|
| Average Difference                |  -0.085 |           -0.022 |      -0     |
| Mean Absolute Error               |   1.559 |            1.559 |       1.198 |
| Pearson Correlation Coefficient   |   0.994 |            0.994 |       0.996 |
| Standard Deviation of Differences |   1.99  |            2     |       1.506 |

| Pedal Angle                       |   Basic |   Avg Cumulative |   Quadratic |
|:----------------------------------|--------:|-----------------:|------------:|
| Average Difference                | -11.153 |           -0.781 |      -7.782 |
| Mean Absolute Error               |  51.856 |           60.958 |      13.162 |
| Pearson Correlation Coefficient   |   0.826 |            0.807 |       0.992 |
| Standard Deviation of Differences |  73.085 |           75.027 |      16.359 |


Triangulated fit of the missing Power data based on the cumulated value 

In [27]:
def triangular_interpolation(start_val, end_val, num_missing, total_sum):
   
    # Calculate the current average between start and end values
    current_avg = (start_val + end_val) / 2.0
    
    # Calculate the difference between the total missing sum and the current average
    total_diff = total_sum - current_avg * num_missing
    
    # Determine the value at the midpoint
    mid_value = current_avg + (total_diff / (num_missing / 2))
    
    # Initialize the list of interpolated values
    interpolated_values = []
    
    # # Determine the direction of interpolation
    # if mid_value >= current_avg:
    #     # Interpolate linearly to the midpoint, then back down
    #     for i in range(1, num_missing + 1):
    #         if i <= num_missing // 2:
    #             value = np.linspace(start_val, mid_value, num_missing // 2 + 1)[i]
    #         else:
    #             value = np.linspace(mid_value, end_val, (num_missing + 1) // 2)[i - num_missing // 2]
    #         interpolated_values.append(value)
    # else:
    #     # Interpolate linearly down to the midpoint, then back up
    #     for i in range(1, num_missing + 1):
    #         if i <= num_missing // 2:
    #             value = np.linspace(start_val, mid_value, num_missing // 2 + 1)[i]
    #         else:
    #             value = np.linspace(mid_value, end_val, (num_missing + 1) // 2)[i - num_missing // 2]
    #         interpolated_values.append(value)
    # Interpolate linearly to the midpoint, then back down
    first_half = np.linspace(start_val, mid_value, num_missing // 2 + 1)
    second_half = np.linspace(mid_value, end_val, (num_missing + 1) // 2)

    # Interpolate linearly to the midpoint, then back down
    for i in range(1, num_missing + 1):
        if i <= num_missing // 2:
            value = first_half[min(i, len(first_half) - 1)]
        else:
            value = second_half[min(i - num_missing // 2, len(second_half) - 1)]
        interpolated_values.append(value)

    
    return interpolated_values

def triangular_fill(start, end, df):
    
    # Data required for triangular interpolation function 
    num_missing = end - start + 1
    start_power = df.loc[start - 1, 'Power']
    end_power = df.loc[end + 1, 'Power']
    tot_missing_power = (df.loc[end + 1, 'Cumulative Power'] - df.loc[start - 1, 'Cumulative Power']) - end_power
    interpolated_values = triangular_interpolation(start_power, end_power, num_missing, tot_missing_power)
    df.loc[start:end, 'Power_Fit'] = interpolated_values



# Initialize start_index
start_index = None

# Reset the lost data again
df_packet_loss.loc[df_packet_loss['Signal']==False, ['Power_Fit','Pedal_Angle_Fit']] = np.NaN

# Loop through DataFrame
for i in range(len(df_packet_loss)):
    if pd.isna(df_packet_loss.loc[i, 'Power_Fit']):
        if start_index is None:
            start_index = i
    elif start_index is not None:
        end_index = i - 1  # The last index of the contiguous NaN range
        triangular_fill(start_index, end_index, df_packet_loss)
        start_index = None  # Reset for the next contiguous block


# Plot         
plot_comparison(df, df_packet_loss, 'Power', 'Triangulation')

# Summary Stats
summary_power_df['Triangulated'] = get_stats(df,df_packet_loss,'Power','Triangulated')
formatted_df = summary_power_df.applymap(format_float)
print(formatted_df.to_markdown(index=False))
print()

formatted_df = summary_pedal_df.applymap(format_float)
print(formatted_df.to_markdown(index=False))


| Power                             |   Basic |   Avg Cumulative |   Quadratic |   Triangulated |
|:----------------------------------|--------:|-----------------:|------------:|---------------:|
| Average Difference                |  -0.085 |           -0.022 |      -0     |         -0.035 |
| Mean Absolute Error               |   1.559 |            1.559 |       1.198 |          1.237 |
| Pearson Correlation Coefficient   |   0.994 |            0.994 |       0.996 |          0.996 |
| Standard Deviation of Differences |   1.99  |            2     |       1.506 |          1.56  |

| Pedal Angle                       |   Basic |   Avg Cumulative |   Quadratic |
|:----------------------------------|--------:|-----------------:|------------:|
| Average Difference                | -11.153 |           -0.781 |      -7.782 |
| Mean Absolute Error               |  51.856 |           60.958 |      13.162 |
| Pearson Correlation Coefficient   |   0.826 |            0.807 |       0.992 |
| Stan

Sinusoidal Interpolation based on cumulative data

In [28]:
# # Define the sine function for curve fitting
# def sine_function(x, A, f, phi):
#     return A * np.sin(2 * np.pi * f * x + phi)

# def get_missing_values(start, end, df, metric):
#     # no. of recs to sample for sine wave characteristics (phase)
#     sample_size = 100 
#     # Known values before and after the missing data
#     known_x = np.concatenate([np.arange(start-sample_size, start), np.arange(end+1, end+sample_size+1)])
#     known_y = df.loc[known_x, metric].values
#     # Fit the sine function to the known data
#     params, _ = curve_fit(sine_function, known_x, known_y)
#     # Generate the missing values
#     missing_x = np.arange(start, end+1)
#     missing_y = sine_function(missing_x, *params)
    
#     return missing_y

# # Fill missing data using sinusoidal method
# def sinusoidal_fill(start, end, df):
#     num_missing = end - start + 1
    
#     # PEDAL_ANGLE
#     # Known sum of the missing values
#     tot_pedal_angle = (df.loc[end + 1, 'Cumulative Pedal_Angle'] - df.loc[start - 1, 'Cumulative Pedal_Angle']) - (num_missing * offset)  # 180 was offset used
    
#     # Get the initial interpolated values
#     missing_values = get_missing_values(start, end, df, 'Pedal_Angle')
    
#     # Calculate the initial sum
#     initial_sum = np.sum(missing_values)
    
#     # Calculate the difference
#     difference = tot_pedal_angle - initial_sum
    
#     # Calculate weights for each point based on its value
#     weights = np.sin(np.linspace(0, np.pi, num_missing))
    
#     # Normalize the weights so they sum to 1
#     weights = weights / np.sum(weights)
    
#     # Distribute the difference according to the weights
#     missing_values += difference * weights
    
#     # Fill in the missing values
#     df.loc[start:end, 'Pedal_Angle_Fit'] = missing_values

# # Initialize start_index
# start_index = None
 
# Reset the lost data again
# df_packet_loss.loc[df_packet_loss['Signal']==False, ['Power_Fit','Pedal_Angle_Fit']] = np.NaN

# # Loop through DataFrame to find missing signal sections
# for i in range(len(df_packet_loss)):
#     if df_packet_loss.loc[i, 'Signal'] == False:
#         if start_index is None:
#             start_index = i
#     elif start_index is not None:
#         end_index = i - 1  # The last index of the contiguous signal lost range
#         sinusoidal_fill(start_index, end_index, df_packet_loss)
#         start_index = None  # Reset for the next contiguous block

# # Plot         
# plot_comparison(df, df_packet_loss, 'Pedal_Angle', 'Sinusoidal')


In [29]:
print("Original Data Vs Basic and Fitted Interpolation")

Original Data Vs Basic and Fitted Interpolation
