In [None]:
#!/usr/bin/env python
# coding: utf-8
#######################################################################
# How to setup AndroSensor:
# Install AndroSensor
# Ensure when installing, allow app to run in background.
# Open settings and change recording interval to fast.
#
#
#
#######################################################################
import glob
import os
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.io as pio
pio.renderers.default = 'browser'
import numpy as np
import scipy as sp
import scipy.signal as sg

px.set_mapbox_access_token(open(".mapbox_token").read())

BATTERY_ENERGY_CAPACITY = 752.4 # Kilo Joules

# Parse data from ebike datalogger

In [None]:
# read raw data
my_cols = range(16)
df = pd.read_csv("raw_data/data_28-4-21.csv",
            names=my_cols,
            engine='c')

df['Datetime'] = pd.to_datetime(df[0])
df.sort_values(by='Datetime',inplace = True)

df.rename(columns={0: 'datetime_string',
                       1: 'sensor',
                      }, inplace=True)

df = df[~(df['Datetime'] < '2020-03-12 18:46:00')]

In [None]:
from filter import signal_filter

# Number of PAS magnets
N_PAS_MAGNETS = 12

# pressure at sea level where the readings are being taken.  
qnh=1032.57

def energy_from_power_time(datetime_series, power_series):
    """
    Return power in kilo joules
    """
    time_delta = datetime_series.diff().dt.total_seconds().fillna(0)
    energy = power_series*time_delta
    return energy.sum()/1000000

def pulse_width_pas_to_rpm(pulse_width):   
    return 1000000/pulse_width/N_PAS_MAGNETS

def pulse_width_to_rpm(pulse_width):   
    return 1000000/pulse_width

def get_altitude(pressure,temperature):
    # The temperature should be the outdoor temperature. 
    # Use the manual_temperature variable if temperature adjustments are required.
    altitude = ((pow((qnh / pressure), (1.0 / 5.257)) - 1) * (temperature + 273.15)) / 0.0065
    return altitude

def insert_time(row):
    return row['Datetime'].replace(minute=int(row['minute']),second=int(row['second']),microsecond=int(row['millisecond']*1000))


def process(df):
    mask = df["sensor"] == 'gps'
    df_gps = df[mask]

    df_gps.rename(columns={2: 'hour',
                           3: 'minute',
                           4: 'second',
                           5: 'millisecond',
                           6: 'latitude',
                           7: 'longitude',
                           8: 'altitude',
                           9: 'GPS Speed [km/h]',
                           10: 'sats',
                           11: 'gnssFixOK',
                           12: 'fix_type',
                          }, inplace=True)

    df_gps['Datetime'] = df_gps.apply(lambda r: insert_time(r), axis=1)
    df_gps.sort_values(by='Datetime',inplace = True)

    time_delta = df_gps["Datetime"].diff().dt.total_seconds().fillna(0)
    df_gps['acceleration'] = df_gps["GPS Speed [km/h]"].diff()/time_delta

    df_gps.dropna(axis=1, how='all',inplace=True)
    df_gps.head()

    mask = df["sensor"] == 'pas'
    df_pas = df[mask]


    df_pas.rename(columns={2: 'pulse_delay_us',
                          }, inplace=True)
    df_pas.dropna(axis=1, how='all',inplace=True)
    df_pas = df_pas[df_pas['pulse_delay_us'] > 4000]

    df_pas['rpm'] = df_pas.apply(lambda x: pulse_width_pas_to_rpm(x['pulse_delay_us']), axis=1)

    df_pas.head()

    mask = df["sensor"] == 'motor_speed'
    df_ms = df[mask]

    df_ms.rename(columns={2: 'pulse_delay_us',
                          }, inplace=True)
    df_ms.dropna(axis=1, how='all',inplace=True)

    df_ms = df_ms[df_ms['pulse_delay_us'] > 60000]




    df_ms['rpm'] = df_ms.apply(lambda x: pulse_width_to_rpm(x['pulse_delay_us']), axis=1)

    time_delta = df_ms["Datetime"].diff().dt.total_seconds().fillna(0)
    df_ms['motor_acceleration'] = df_ms["rpm"].diff()/time_delta


    df_ms.head()

    mask = df["sensor"] == 'ina226'
    df_ina = df[mask]

    df_ina.rename(columns={2: 'INA226 ID',
                           3: 'Battery Voltage',
                           4: 'V_shunt',
                           5: 'Current',
                           6: 'Power',
                          }, inplace=True)
    df_ina.dropna(axis=1, how='all',inplace=True)
    df_ina.reset_index()


    df_ina['time_int'] = df_ina['Datetime'].apply(lambda x: x.value)

    Period = 5 # seconds(adjust this)
    N = Period/0.010
    #s = df_ina["Power"].groupby(df_ina["Power"].index // N).mean()
    df_ina["Power_averaged"] = signal_filter(df_ina['Power'], highcut=2, method='butterworth_ba', order=2)



    print("Total Energy Consumption[KiloJoules]",energy_from_power_time(df_ina["Datetime"],df_ina["Power"]))
    df_ina.head()

    mask = df["sensor"] == 'baro'
    df_baro = df[mask]

    df_baro.rename(columns={2: 'temperature[degreeC]',
                           3: 'Pressure[mbar]',
                           4: 'humidity[rh%]',
                          }, inplace=True)





    df_baro['Baro_Altitude'] = df_baro.apply(lambda x: get_altitude(x['Pressure[mbar]'], x['temperature[degreeC]']), axis=1)
    df_baro['filtered_Baro_Altitude'] = signal_filter(df_baro['Baro_Altitude'], highcut=10, method='butterworth_ba', order=2)



    df_baro["time_delta"] = df_baro["Datetime"].diff().dt.total_seconds().fillna(0)
    df_baro['slope'] = df_baro["filtered_Baro_Altitude"].diff()/df_baro["time_delta"]



    df_baro.dropna(axis=1, how='all',inplace=True)

    df_baro.head()
    
    return df_gps, df_ina, df_baro
    

df_gps, df_ina, df_baro = process(df)

In [None]:
dataframes = {"GPS":df_gps,"Battery_Parameters":df_ina,"Environmental":df_baro}

for df in dataframes:
    dataframes[df].to_csv(df+".csv")


## Display GPS positions

In [None]:
# Display GPS positions
fig = px.scatter_mapbox(df_gps,
                        lat="latitude",
                        lon="longitude",
                        color="GPS Speed [km/h]",#"slope",#"LOCATION Altitude ( m)",,#"Speed(km/h)", # "abs_acceleration" or "gps_acceleration" or "power"
                        zoom=14,
                        hover_data=["Datetime", "altitude","sats"],
                        #size="LOCATION Accuracy ( m)"
                       )



fig.update_layout(mapbox_style="open-street-map")
fig.update_layout(margin={"r": 0, "t": 0, "l": 0, "b": 0})
fig.write_html("output/plot_accurate_gps.html")
fig.show()

# Do machine learning

### Try using different variables for ML

In [None]:
import xgboost as xgb
from xgboost import plot_importance
from sklearn.datasets import load_boston
from sklearn.model_selection import train_test_split
from sklearn.model_selection import cross_val_score, KFold
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt 

### Combine Altitude with power variables

In [None]:
df_ml = None
df_ml = pd.merge_asof(df_ina,df_baro , on = 'Datetime', direction = 'nearest')
df_ml.head()

In [None]:
x, y = df_ml[["slope","filtered_Baro_Altitude"]], df_ml["Power_averaged"]

In [None]:
xtrain, xtest, ytrain, ytest = train_test_split(x, y, test_size=0.25, shuffle=False)

xgbr = xgb.XGBRegressor(verbosity=0)
print(xgbr)

xgbr.fit(xtrain, ytrain)

score = xgbr.score(xtrain, ytrain)   

print("Training score: ", score)

# - cross validataion 
scores = cross_val_score(xgbr, xtrain, ytrain, cv=5)
print("Mean cross-validation score: %.2f" % scores.mean())

kfold = KFold(n_splits=10, shuffle=True)
kf_cv_scores = cross_val_score(xgbr, xtrain, ytrain, cv=kfold )
print("K-fold CV average score: %.2f" % kf_cv_scores.mean())

 

ypred = xgbr.predict(xtest)
mse = mean_squared_error(ytest, ypred)
print("MSE: %.2f" % mse)
print("RMSE: %.2f" % (mse**(1/2.0)))

_ = plot_importance(xgbr, height=0.9)


## Plot predicted and actual data

In [None]:
import numpy as np
import plotly.express as px
import plotly.graph_objects as go

fig = None
ypred = xgbr.predict(xtest)
fig = go.Figure()
fig.add_traces(go.Scatter(x=df_ml["Datetime"].tail(len(ytest)), y=ytest, name='Actual data'))
fig.add_traces(go.Scatter(x=df_ml["Datetime"].tail(len(ytest)), y=ypred, name='Regression Fit'))
fig.show()

In [None]:
actual_energy = energy_from_power_time(df_ml["Datetime"].tail(len(ytest)),df_ml["Power"].tail(len(ytest)))
actual_energy

In [None]:
predicted_energy = energy_from_power_time(df_ml["Datetime"].tail(len(ytest)),ypred)
predicted_energy

In [None]:
Accuracy = 1 - (predicted_energy - actual_energy)/actual_energy
Accuracy

## Display all variables

In [None]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go

fig = make_subplots(rows=9, cols=1,
                    shared_xaxes=True,
                    vertical_spacing=0.01)

fig.update_layout(hovermode="x unified")
 

# fig = make_subplots(rows=2, cols=1,
#                     specs=[[{'type': 'xy'}],
#                            [{'type': 'mapbox'}]])

fig.add_trace(go.Scatter(
    x=df_ina["Datetime"],
    y=df_ina["Power"],
    name="Power[mW]",
    hoverinfo='y'),
    row=1, col=1)

fig.add_trace(go.Scatter(
    x=df_ina["Datetime"],
    y=df_ina["Power_averaged"],
    name="Power[mW] averaged",
    hoverinfo='y'),
    row=1, col=1)

fig.add_trace(go.Scatter(
    x=df_ina["Datetime"],
    y=df_ina["Battery Voltage"],
    name="Battery Voltage[V]",
    hoverinfo='y'),
    row=2, col=1)
             

fig.add_trace(go.Scatter(
    
    x=df_gps["Datetime"],
    y=df_gps["GPS Speed [km/h]"],
    name="GPS Speed [km/h]",
    hoverinfo='y'),
    row=3, col=1)

fig.add_trace(go.Scatter(
    
    x=df_ms["Datetime"],
    y=df_ms["motor_acceleration"],
    name="Motor Acceleration(km/h^2)",
    hoverinfo='y'),
    row=4, col=1)

fig.add_trace(go.Scatter(
    
    x=df_gps["Datetime"],
    y=df_gps["altitude"],
    name="GPS Altitude[m]",
    hoverinfo='y'),
    row=5, col=1)

fig.add_trace(go.Scatter(
    
    x=df_baro["Datetime"],
    y=df_baro["Baro_Altitude"],
    name="Baro_Altitude[m]",
    hoverinfo='y'),
    row=6, col=1)

fig.add_trace(go.Scatter(
    
    x=df_baro["Datetime"],
    y=df_baro["filtered_Baro_Altitude"],
    name="Baro_Altitude[m]",
    hoverinfo='y'),
    row=6, col=1)


fig.add_trace(go.Scatter(
    
    x=df_baro["Datetime"],
    y=df_baro["slope"],
    name="Slope[m/s]",
    hoverinfo='y'),
    row=7, col=1)

fig.add_trace(go.Scatter(
    x=df_ms["Datetime"],
    y=df_ms["rpm"],
    name="Motor RPM",
    hoverinfo='y'),
    row=8, col=1)

fig.add_trace(go.Scatter(
    x=df_pas["Datetime"],
    y=df_pas["rpm"],
    name="Pedal Assist Sensor RPM",
    hoverinfo='y'),
    row=9, col=1)
              






fig.update_layout(title_text="Battery Power, Voltage, GPS speed and GPS Acceleration, GPS Altitude")

fig.write_html("output/gps_data_ina_baro.html")

fig.show()

## Display the interesting variables

In [None]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go

fig = make_subplots(rows=5, cols=1,
                    shared_xaxes=True,
                    vertical_spacing=0.01)

fig.update_layout(hovermode="x unified")
 

# fig = make_subplots(rows=2, cols=1,
#                     specs=[[{'type': 'xy'}],
#                            [{'type': 'mapbox'}]])

fig.add_trace(go.Scatter(
    x=df_ina["Datetime"],
    y=df_ina["Power"],
    name="Power[mW]",
    hoverinfo='y'),
    row=1, col=1)

fig.add_trace(go.Scatter(
    x=df_ina["Datetime"],
    y=df_ina["Power_averaged"],
    name="Power[mW] Averaged",
    hoverinfo='y'),
    row=1, col=1)

fig.add_trace(go.Scatter(
    x=df_ml["Datetime"],
    y=xgbr.predict(df_ml[["slope","filtered_Baro_Altitude"]]),
    name="Power[mW] Prediction",
    hoverinfo='y'),
    row=1, col=1)



fig.add_trace(go.Scatter(
    x=df_ina["Datetime"],
    y=df_ina["Battery Voltage"],
    name="Battery Voltage[V]",
    hoverinfo='y'),
    row=2, col=1)
             

    
fig.add_trace(go.Scatter(
    
    x=df_baro["Datetime"],
    y=df_baro["Baro_Altitude"],
    name="Baro_Altitude[m]",
    hoverinfo='y'),
    row=3, col=1)

fig.add_trace(go.Scatter(
    
    x=df_baro["Datetime"],
    y=df_baro["filtered_Baro_Altitude"],
    name="Baro_Altitude[m] low pass filtered",
    hoverinfo='y'),
    row=3, col=1)

fig.add_trace(go.Scatter(
    x=df_ms["Datetime"],
    y=df_ms["rpm"],
    name="Motor RPM",
    hoverinfo='y'),
    row=4, col=1)

fig.add_trace(go.Scatter(
    x=df_pas["Datetime"],
    y=df_pas["rpm"],
    name="Pedal Assist Sensor RPM",
    hoverinfo='y'),
    row=5, col=1)
              

fig.update_layout(title_text="Battery Power, Voltage, Barometric Altitude, Motor RPM, Pedal Assist Sensor RPM")

fig.write_html("output/gps_data_ina_baro.html")

fig.show()