# DJI Flight Data Next-Step Prediction

This notebook builds a neural-network model that predicts the next-step values of drone telemetry (OSD.pitch, OSD.roll, OSD.yaw, OSD.xSpeed, OSD.ySpeed, OSD.zSpeed) from the current telemetry values and the current RC control inputs (aileron, elevator, throttle, rudder) recorded in DJI flight logs.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras

# Custom train_test_split, StandardScaler, mean_squared_error, r2_score will be defined below


## Load and Explore Dataset
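First define small NumPy-based replacements for the usual scikit-learn helpers (`train_test_split`, `StandardScaler`, `mean_squared_error`, `r2_score`), then load the DJI flight log and inspect its structure.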

In [None]:
def train_test_split(X, y, test_size=0.2, shuffle=False):
    """Split X and y into a leading training part and a trailing test part.

    Parameters
    ----------
    X, y : array-like with a ``shape`` attribute
        Samples and targets; both must have the same number of rows.
    test_size : float, default 0.2
        Fraction of rows (between 0 and 1) placed in the test part. The test
        row count is ``int(n_samples * test_size)``, i.e. rounded down.
    shuffle : bool, default False
        When True, rows are permuted with ``np.random.permutation`` before
        splitting; when False the original order is preserved.

    Returns
    -------
    X_train, X_test, y_train, y_test

    Raises
    ------
    ValueError
        If ``test_size`` is outside [0, 1] or X and y have different lengths.
    """
    if not 0 <= test_size <= 1:
        raise ValueError(f"test_size must be between 0 and 1, got {test_size}")
    n_samples = X.shape[0]
    if y.shape[0] != n_samples:
        raise ValueError(
            f"X and y have different numbers of rows: {n_samples} vs {y.shape[0]}"
        )
    n_test = int(n_samples * test_size)
    if shuffle:
        idx = np.random.permutation(n_samples)
        X, y = X[idx], y[idx]
    # Slice by the train count instead of a negative test count: with
    # n_test == 0 the expression X[:-0] is empty and X[-0:] is everything,
    # which would silently swap the roles of the two parts.
    n_train = n_samples - n_test
    X_train, X_test = X[:n_train], X[n_train:]
    y_train, y_test = y[:n_train], y[n_train:]
    return X_train, X_test, y_train, y_test

class StandardScaler:
    """Standardise data to zero mean and unit standard deviation along axis 0.

    After ``fit`` the instance exposes ``mean_`` and ``std_``. Entries of
    ``std_`` that would be exactly zero (constant columns) are stored as 1.0
    so that ``transform`` never divides by zero.
    """

    def fit(self, X):
        """Compute ``mean_`` and ``std_`` from X along axis 0 and return self."""
        self.mean_ = np.mean(X, axis=0)
        std = np.std(X, axis=0)
        # np.where works for both the 2-D case (std is an array) and the 1-D
        # case (std is a scalar); in-place boolean assignment fails on the
        # scalar because NumPy scalars do not support item assignment.
        self.std_ = np.where(std == 0, 1.0, std)
        return self

    def transform(self, X):
        """Return X shifted by ``mean_`` and divided by ``std_``."""
        return (X - self.mean_) / self.std_

    def fit_transform(self, X):
        """Fit on X, then return the transformed X."""
        return self.fit(X).transform(X)

    def inverse_transform(self, X):
        """Undo ``transform``: multiply by ``std_`` and add ``mean_``."""
        return X * self.std_ + self.mean_

def mean_squared_error(y_true, y_pred):
    """Return the mean of the squared element-wise differences between
    ``y_true`` and ``y_pred``, averaged over every element."""
    residual = y_true - y_pred
    return np.mean(residual * residual)

def r2_score(y_true, y_pred):
    """Coefficient of determination, pooled over all elements.

    Computes ``1 - ss_res / ss_tot`` where ``ss_res`` is the sum of squared
    residuals and ``ss_tot`` is the sum of squared deviations of ``y_true``
    from its mean along axis 0 (for 2-D input both sums run over all columns).

    When ``y_true`` is constant, ``ss_tot`` is zero and the ratio is
    undefined. In that case the function returns 1.0 for a perfect prediction
    and 0.0 otherwise instead of producing nan/-inf and a RuntimeWarning.
    """
    ss_res = np.sum((y_true - y_pred) ** 2)
    ss_tot = np.sum((y_true - np.mean(y_true, axis=0)) ** 2)
    if ss_tot == 0:
        # No variance to explain: avoid 0/0 and x/0.
        return 1.0 if ss_res == 0 else 0.0
    return 1 - ss_res / ss_tot


In [None]:
# Read the flight record CSV into a DataFrame (default integer row index).
df = pd.read_csv('DJIFlightRecord_2025-02-17_5D.csv')

# Show the first rows together with the full list of column names.
(df.head(), list(df.columns))

## Preprocess Data
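Select the telemetry and RC control columns used by the model and drop rows with missing values. Normalization is applied later, after the train/test split.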

In [None]:
# Columns used by the model: six OSD telemetry channels followed by the
# four RC stick channels.
cols = [
    'OSD.pitch',
    'OSD.roll',
    'OSD.yaw',
    'OSD.xSpeed [MPH]',
    'OSD.ySpeed [MPH]',
    'OSD.zSpeed [MPH]',
    'RC.aileron',
    'RC.elevator',
    'RC.throttle',
    'RC.rudder',
]

# Keep only those columns and discard every row that has a missing value
# in any of them.
df_selected = df.loc[:, cols].dropna()

df_selected.info()
df_selected.describe()

## Feature Engineering
Create input and target features for next-step prediction.

In [None]:
# Features: all columns at time t
# Targets: OSD.pitch, OSD.roll, OSD.yaw, OSD.xSpeed, OSD.ySpeed, OSD.zSpeed at time t+1
feature_cols = [
    'OSD.pitch', 'OSD.roll', 'OSD.yaw',
    'OSD.xSpeed [MPH]', 'OSD.ySpeed [MPH]', 'OSD.zSpeed [MPH]',
    'RC.aileron', 'RC.elevator', 'RC.throttle', 'RC.rudder'
]
target_cols = [
    'OSD.pitch', 'OSD.roll', 'OSD.yaw',
    'OSD.xSpeed [MPH]', 'OSD.ySpeed [MPH]', 'OSD.zSpeed [MPH]',
]

# dropna() removes rows but keeps the original integer row labels from
# read_csv. Two rows that are adjacent in df_selected are therefore true
# consecutive log samples only when their labels differ by exactly 1.
# Pairing across a removed row would teach the model a multi-step jump as if
# it were a single step, so those pairs are filtered out.
row_labels = df_selected.index.to_numpy()
is_consecutive = np.diff(row_labels) == 1

X = df_selected[feature_cols].iloc[:-1].values[is_consecutive]
y = df_selected[target_cols].iloc[1:].values[is_consecutive]
assert len(X) == len(y), "features and targets must be paired one-to-one"

print('X shape:', X.shape)
print('y shape:', y.shape)
print('Pairs skipped because they span a dropped row:', int((~is_consecutive).sum()))

## Split Data into Training and Testing Sets
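Split the data chronologically (no shuffling) into training and test sets, then standardize features and targets using statistics computed from the training set only.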

In [None]:
# Chronological split: shuffling stays off, so the trailing 20% of the rows
# become the test set.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, shuffle=False
)

# Standardise features and targets. Both scalers learn their mean/std from
# the training portion only and are then applied unchanged to the test portion.
scaler_X = StandardScaler().fit(X_train)
scaler_y = StandardScaler().fit(y_train)

X_train_scaled = scaler_X.transform(X_train)
X_test_scaled = scaler_X.transform(X_test)
y_train_scaled = scaler_y.transform(y_train)
y_test_scaled = scaler_y.transform(y_test)

print('X_train_scaled shape:', X_train_scaled.shape)
print('y_train_scaled shape:', y_train_scaled.shape)

## Build Prediction Model
Define a neural network to predict next-step telemetry.

In [None]:
# Seed NumPy and TensorFlow before any layer is created so that a
# Restart & Run All starts from the same random state each time (results can
# still differ slightly across hardware / library versions).
# Note: seeding NumPy here also makes later np.random draws in this notebook
# repeatable.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Fully connected regressor: one input per feature column, two hidden ReLU
# layers of 64 units, and a linear output with one unit per target column.
model = keras.Sequential([
    keras.layers.Input(shape=(X_train_scaled.shape[1],)),
    keras.layers.Dense(64, activation='relu'),
    keras.layers.Dense(64, activation='relu'),
    keras.layers.Dense(y_train_scaled.shape[1])
])

# MSE is the training loss; MAE is reported as an additional metric.
model.compile(optimizer='adam', loss='mse', metrics=['mae'])
model.summary()

## Train the Model
Train the neural network on the training data.

In [None]:
# Stop once the validation loss has not improved for 20 epochs and roll the
# weights back to the best epoch, instead of always running all 1000 epochs.
early_stopping = keras.callbacks.EarlyStopping(
    monitor='val_loss', patience=20, restore_best_weights=True
)

# Validation data is carved out of the *training* arrays (Keras takes the last
# 10% of the provided samples, before shuffling), so the test set is never
# seen during training or early stopping and remains an unbiased final check.
history = model.fit(
    X_train_scaled, y_train_scaled,
    validation_split=0.1,
    epochs=1000, batch_size=64,
    callbacks=[early_stopping],
    verbose=2,  # one summary line per epoch keeps the cell output compact
)

fig, ax = plt.subplots()
ax.plot(history.history['loss'], label='train loss')
ax.plot(history.history['val_loss'], label='val loss')
ax.legend()
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss (MSE)')
ax.set_title('Training History')
plt.show()

## Evaluate the Model
Evaluate model performance on the test set.

In [None]:
# Predict on the held-out test set and map predictions and targets back to
# the original (unscaled) values.
y_pred_scaled = model.predict(X_test_scaled)
y_pred = scaler_y.inverse_transform(y_pred_scaled)
y_test_orig = scaler_y.inverse_transform(y_test_scaled)

# Pooled metrics over all targets. These mix attitude angles with speeds in
# MPH, so targets with a large numeric range dominate them.
mse = mean_squared_error(y_test_orig, y_pred)
r2 = r2_score(y_test_orig, y_pred)
print(f"Test MSE: {mse:.4f}")
print(f"Test R^2: {r2:.4f}")

# Per-target metrics next to a persistence baseline ("next value == current
# value"). For next-step prediction the baseline is usually strong, so the
# model is only useful where it beats it.
target_positions = [feature_cols.index(col) for col in target_cols]
y_persistence = X_test[:, target_positions]

n_targets = len(target_cols)
per_target_metrics = pd.DataFrame(
    {
        'model_mse': [
            mean_squared_error(y_test_orig[:, j], y_pred[:, j])
            for j in range(n_targets)
        ],
        'persistence_mse': [
            mean_squared_error(y_test_orig[:, j], y_persistence[:, j])
            for j in range(n_targets)
        ],
        'model_r2': [
            r2_score(y_test_orig[:, j], y_pred[:, j])
            for j in range(n_targets)
        ],
    },
    index=target_cols,
)
per_target_metrics

## Predict Next Step
Use the trained model to predict the next step for a sample input.

In [None]:
# Pick one test row at random to serve as a worked example.
idx = np.random.randint(X_test_scaled.shape[0])

# The model expects a 2-D batch, so add a leading axis to the single row.
sample_input = X_test_scaled[idx][np.newaxis, :]
predicted_next_scaled = model.predict(sample_input)

# Convert the scaled prediction back to the original values.
predicted_next = scaler_y.inverse_transform(predicted_next_scaled)

print(f'Predicted next step (OSD.pitch, OSD.roll, OSD.yaw, OSD.xSpeed, OSD.ySpeed, OSD.zSpeed) for test index {idx}:')
print(predicted_next[0])

# Ground truth for the same index, un-scaled the same way for comparison.
print('Actual next step:')
print(scaler_y.inverse_transform(y_test_scaled[idx][np.newaxis, :])[0])