# FPV telemetry overlay for Ardupilot/Betaflight
Convert ardupilot/betaflight telemetry log to SSA/SRT subtitles

In [None]:
# import & prepare log

import os
import datetime
import pandas as pd
# import pysrt
import geopy.distance
import numpy as np
from scipy import stats
import yaml
import matplotlib.pyplot as plt

# load config
def load_config(config_file):
    with open(config_file, "r") as stream:
        try:
            config = yaml.safe_load(stream)
            return config
        except yaml.YAMLError as exc:
            print(exc)

# import log
def import_log_ardupilot(path):
    df = pd.read_csv(path)
    df['time'] = pd.to_datetime(df['Date'] + ' ' + df['Time'])
    gps = df['GPS'].str.split(' ', 1, expand=True).rename(columns={0:'lat', 1:'lon'})
    df['lat'] = gps['lat'].astype(float)
    df['lon'] = gps['lon'].astype(float)
    df['alt'] = df['Alt(m)'].round(0).astype(int)
    # ASpd & GSpd - These sensors value actually in m/s, not knots, see
    # https://github.com/Clooney82/MavLink_FrSkySPort/wiki/1.2.-FrSky-Taranis-Telemetry
    df['air_spd'] = (3.6*df['GSpd(kts)']).round(0).astype(int) # m/s to km/h, from AIR speed sensor
    df['bat'] = df['VFAS(V)']
    df['curr'] = df['CURR(A)']
    df['thr'] = (df['Thr'] / 20.48 + 50).round(0).astype(int) # -1024/1024 to percent
    return df[['time', 'lat', 'lon', 'alt', 'RSSI(dB)', 'air_spd', 'bat', 'curr', 'thr', 'ARM']]

def import_log_betaflight(path):
    df = pd.read_csv(path)
    df['time'] = pd.to_datetime(df['Date'] + ' ' + df['Time'])
    gps = df['GPS'].str.split(' ', 1, expand=True).rename(columns={0:'lat', 1:'lon'})
    df['lat'] = gps['lat'].astype(float)
    df['lon'] = gps['lon'].astype(float)
    df['alt'] = df['Alt(m)']
    df['spd'] = (1.852*df['GSpd(kts)']).round(1) # knots to km/h
    df['bat'] = df['VFAS(V)']
    df['curr'] = df['Curr(A)']
    df['mah'] = df['Fuel(%)']
    df['sats'] = df['Tmp2(@C)'] % 100
    df['thr'] = (df['Thr'] / 20.48 + 50).round(0).astype(int) # -1024/1024 to percent
    df.drop(['Date', 'Time', 'GSpd(kts)', 'Alt(m)', 'Thr', 'Curr(A)', 'Fuel(%)'], axis=1, inplace=True)
    return df[['time', 'lat', 'lon', 'alt', '0420', 'RSSI(dB)', 'spd', 'sats', 'bat', 'curr', 'mah', 'thr']]

# import DJI FPV Goggles subtiltes
def import_dji_srt(path):
    sub_cols = ['signal', 'ch', 'flightTime',\
                'uavBat', 'glsBat', 'uavBatCells', 'glsBatCells',\
                'delay', 'bitrate', 'rcSignal']
    subs = pysrt.open(path)
    df = pd.DataFrame([[o.start.to_time(), o.end.to_time(), o.text] for o in subs], columns=['start', 'end', 'text'])
    df[sub_cols] = df['text'].str.split(' ',expand=True)
    df.drop('text', 1, inplace=True)
    for col in sub_cols:
        df[col] = df[col].str.split(':').str[1]            
    df.replace('[a-zA-Z]*$', '', regex=True, inplace=True) # strip units (V, ms, Mbps)
    return df

def clear_broken_GPS(df):
    gps_broken = (np.abs(stats.zscore(df['lat'])) > 3) | (np.abs(stats.zscore(df['lon'])) > 3)
    df.loc[gps_broken, ['lat', 'lon']] = np.NaN, np.NaN    

# calc speed from previous and next points GPS coordinates
def calc_gps_speed(df, shift=3):
    def calc_speed(row):
        dist = geopy.distance.distance(row.gps_prev, row.gps_next).km
        time = (row.time_next - row.time_prev) / pd.Timedelta(hours=1)
        return dist / time
    
    t = df[['time', 'lat', 'lon']].copy()
    t['gps'] = t.apply(lambda x: np.NaN if np.isnan(x.lat) else (x.lat, x.lon), axis=1)    
    t['gps_prev'] = t.shift(shift)['gps']
    t['gps_next'] = t.shift(-shift)['gps']
    t['time_prev'] = t.shift(shift)['time']
    t['time_next'] = t.shift(-shift)['time']
    t = t.fillna(method='bfill').fillna(method='ffill')
    return t.apply(calc_speed, axis=1)

# calc distance from home in meters
def calc_0420(df):
    t = df[['lat', 'lon']].copy()
    t['gps'] = t.apply(lambda x: np.NaN if np.isnan(x.lat) else (x.lat, x.lon), axis=1)    
    gps0 = t.iloc[0].gps
    return t.apply(lambda x: np.NaN if np.isnan(x.gps).any() else round(geopy.distance.distance(gps0, x.gps).m), axis=1)

# # calc values from Betaflight log format
# def calc_log_betaflight(df):
#     df['spd_avg'] = calc_gps_speed(df)
#     df['curr'] = df.rolling(window=4, min_periods=1, center=True, win_type='gaussian')['Curr(A)'].mean(std=2.5)
#     df['eff'] = (df['curr']/df['spd_avg'] * 1000)
#     df.loc[df['eff']>999, 'eff'] = 0
#     return df

# def format_log(df):
#     df['spd'] = df['spd'].round(0).astype(int)
#     df['spd_avg'] = df['spd_avg'].round(0).astype(int)
#     df['bat'] = df['bat'].round(1)
#     df['curr'] = df['curr'].round(0).astype(int)
#     df['mah'] = df['mah'].round(0).astype(int)
#     df['eff'] = df['eff'].round(0).astype(int)
#     df['Alt(m)'] = df['Alt(m)'].round(0).astype(int)
#     return df[['time', 'lat', 'lon', 'Alt(m)', '0420', 'RSSI(dB)', 'spd', 'spd_avg', 'sats', 'bat', 'curr', 'mah', 'eff', 'thr']]

In [None]:
# analyse log

# find log sections by gap in Time
def find_log_sections(df, gap = '0:1:0'):
    df = df.copy(deep=True)
    df['SectionStart'] = False
    df.iloc[0, df.columns.get_loc('SectionStart')] = True
    df.loc[df['time'].diff() > gap, 'SectionStart'] = True
    sections = pd.concat([df[(df.SectionStart)|(df.SectionStart.shift(-1))],df.tail(1)])
    
    sections['FlightTime'] = sections.time.shift(-1) - sections.time
    return sections[sections.SectionStart][['time', 'lat', 'lon', 'FlightTime']]

def plot_log(df):
    t = df[['alt', 'spd', 'eff']].copy()
    t.loc[t['eff']==0, 'eff'] = float('NaN')
    t.loc[t['eff']>400, 'eff'] = float('NaN')
    t.loc[t['spd']>100, 'spd'] = float('NaN')
    t['eff'] = t.rolling(window=20, min_periods=1, center=True)['eff'].mean()
#     t['eff'] = t['eff']/2
    t['alt'] = t['alt']/10
    t.plot(figsize=(16,10), grid=True)

In [None]:
# export_subtitles

def format_time_srt(s):
    hours, remainder = divmod(s, 3600)
    minutes, remainder = divmod(remainder, 60)
    seconds, remainder = divmod(remainder, 1)
    return '{:02}:{:02}:{:02},{:03}'.format(int(hours), int(minutes), int(seconds), int(remainder*1000))

def format_srt(x):
#     + 'SG=' + str(x.SG) + '   '\
    return str(x.name + 1) + '\n' + format_time_srt(x.start) + ' --> ' + format_time_srt(x.end) + '\n'\
    + str(x.alt) + ' m   '\
    + str(x.spd) + ' km/h   '\
    + str(x.curr) + ' A\n\n'

ssa_header = '''[Script Info]
PlayResX: 1280
PlayResY: 720
WrapStyle: 1

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, Alignment, Outline
Style: Default, Segoe UI Symbol,36,&HFFFFFF,5,1
Style: A1, Segoe UI Symbol,36,&HFFFFFF,7,1
Style: A2, Segoe UI Symbol,36,&HFFFFFF,4,1
Style: A3, Segoe UI Symbol,36,&HFFFFFF,1,1

[Events]
Format: Start, End, Style, Text
'''

def format_time_ssa(s):
    hours, remainder = divmod(s, 3600)
    minutes, remainder = divmod(remainder, 60)
    seconds, remainder = divmod(remainder, 1)
    return f'{int(hours):02}:{int(minutes):02}:{int(seconds):02}.{int(remainder*100):02}'

def format_ssa(columns, x):
    s = f'Dialogue: {format_time_ssa(x.start)},{format_time_ssa(x.end)}' + ',A1,{\pos(16,16)}'
    if "sats" in columns: s += f'📡 {x.sats}\\N'
    if "RSSI(dB)" in columns: s += f' {x["RSSI(dB)"]}\\N'
    if "0420" in columns: s += f' {x["0420"]}\\N'
    s+= '\n'
    
    s += f'Dialogue: {format_time_ssa(x.start)},{format_time_ssa(x.end)}' + ',A2,{\pos(16,350)}'
    if "alt" in columns: s += f'{x.alt} m\\N'
    if "spd" in columns: s += f'{x.spd:.0f} ㎞/h\\N'
    if "air_spd" in columns: s += f'air  {x.air_spd:.0f} ㎞/h\\N'
    if "gps_spd" in columns: s += f'gps {x.gps_spd:.0f} ㎞/h\\N'
    if "eff" in columns: s += f'EFF {x.eff:.0f}\\N'
    s+= '\n'
    
#     + f'🔋{x.curr:2d} A\\N'\
    s += f'Dialogue: {format_time_ssa(x.start)},{format_time_ssa(x.end)}' + ',A3,{\pos(16,700)}'
#     if "thr" in columns: s += f'THR {x.thr} %\\N'
    if "curr" in columns: s += f'🔋{x.curr} A\\N'
    if "bat" in columns: s += f'{x.bat} V\\N'
    if "mah" in columns: s += f'{x.mah} mAh\\N'
    s+= '\n'
    return s

def export_subtitles_file(df, out_file, config):
    if config["output-format"] == "ssa":
        ext = '.ssa'
        formatter = format_ssa
    elif config["output-format"] == "ssa":
        ext = '.srt'
        formatter = format_srt
    else:
        raise Exception('unknown config["output-format"]')
    
    with open(out_file + ext, "w", encoding='utf-8') as f:
        if formatter == format_ssa:
            f.write(ssa_header)
        for index, row in df.iterrows():
            f.write(formatter(df.columns, row))

def export_subtitles(df, config):
    max_length_sec = 5
    t = df.iloc[config["skip-log-rows"]:].reset_index(drop=True)
    t['start'] = (t['time'] - t.iloc[0]['time']).dt.total_seconds() # seconds from start
    t['start'] = t['start'] * config["speed-correction"] # recorder speed deviation compensation
    t['start'] = t['start'] + config["shift-sec"]
    t['end'] = t.shift(-1)['start']   
    mask = t['end'] - t['start'] > max_length_sec
    t.loc[mask, 'end'] = t.loc[mask, 'start'] + max_length_sec    
    t.dropna(subset=['end'], inplace=True) # drop last row
        
    for file, length_sec in config["video-files"].items():
        out_file = config["output-dir"] + os.path.splitext(file)[0]
        export_subtitles_file(t[t.start < length_sec], out_file, config)
        t = t[t.start >= length_sec]
        t['start'] = t['start'] - length_sec
        t['end'] = t['end'] - length_sec

In [None]:
# import log
def import_log(config):
    log_type = config["log"]["type"]
    log_file = config["log"]["file"]
    output_dir = config["output-dir"]
    video_files = config["video-files"]
    shift_sec = config["shift-sec"]
    if log_type == "betaflight":
        df = import_log_betaflight(log_file)
        return df
    if log_type == "ardupilot":
        df = import_log_ardupilot(log_file)
        df = df[df['ARM']>0].reset_index(drop=True)
        clear_broken_GPS(df)
        df['0420'] = calc_0420(df)
        df['gps_spd'] = calc_gps_speed(df,7)
        df['curr_avg'] = df.rolling(window=100, min_periods=1, center=True, win_type='gaussian')['curr'].mean(std=2.5)
        df['eff'] = (df['curr_avg']/df['gps_spd'] * 1000)
        df.loc[df['eff']>1000, 'eff'] = 0
        return df


# config_file = "c:/temp/Video/2021-07-25_Titan@/GH010371.yaml"
# config_file = "c:/temp/Video/2020-12-25_Bixler3@/TelemetryOverlay.yaml"
# config_file = "c:/Projects/FPV/_Video/2020-12-25_Bixler3@/TelemetryOverlay-PICT0001.yaml"
config_file = "c:/Projects/FPV/_Video/2020-12-25_Bixler3@/TelemetryOverlay-RC_ALL.yaml"
# log_file = 'c:/Dropbox/Projects/FPV/_Logs/Taranis/Titan-2022-04-15.csv'

config = load_config(config_file)
df = import_log(config)

# pd.set_option('display.max_rows', 5000)
df

In [None]:
# overview

# calc curr_avg
df['curr_avg'] = df.rolling(window=100, min_periods=1, center=True, win_type='gaussian')['curr'].mean(std=2.5)

# chart
fig, ax = plt.subplots()
df[['alt']].plot(ax=ax)
ax = df[['curr_avg']].plot(ax=ax, secondary_y=True, figsize=(27,13), grid=True)
plt.show()

# flights
print('Flights:')
find_log_sections(df)

In [None]:
# generate subtitles
# sync subtitle in video player (z, Z keys in MPV player), than set shift_sec to that value

export_subtitles(df, config)

In [None]:
# video processing

# embed ssa into video
# ffmpeg -i 2021-07-25_Titan-fragment.mp4 -vf "ass=2021-07-25_Titan-fragment.ssa" -b:v 20M -c:a copy subtitled_movie.mp4
# copy part of video
# ffmpeg -ss 00:01:30 -to 00:07:30 -i C:\Projects\FPV\_Video\2021-07-25_Titan@\GH010371.MP4 -c copy output.mp4

In [None]:
# EFF plot

df[['alt', 'air_spd', 'gps_spd', 'eff']].plot(figsize=(27,15), grid=True)