In [None]:
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

# Load the calibrated drone data
file_path = 'data/raw/Drone_CoD.csv'
drone_data = pd.read_csv(file_path)

# Ensure the columns are named correctly: 'Frame', 'Time', 'X', 'Y', 'Z'
drone_data.columns = ['Frame', 'Time', 'X', 'Y', 'Z']

# Convert data to numpy array for easier manipulation
frames = drone_data['Frame'].values
times = drone_data['Time'].values
x = drone_data['X'].values / 100  # Convert from mm to meters
y = drone_data['Y'].values / 100  # Convert from mm to meters
z = drone_data['Z'].values / 100  # Convert from mm to meters

# Function to calculate velocity
def calculate_velocity(x, times):
    return np.diff(x) / np.diff(times)

# Function to calculate acceleration
def calculate_acceleration(vx, times):
    return np.diff(vx) / np.diff(times[1:])

# Calculate velocities
vx = calculate_velocity(x, times)
vy = calculate_velocity(y, times)
vz = calculate_velocity(z, times)

# Calculate accelerations
ax = calculate_acceleration(vx, times)
ay = calculate_acceleration(vy, times)
az = calculate_acceleration(vz, times)

# Ensure all arrays are the same size by trimming
min_length = min(len(vx), len(vy), len(vz), len(ax), len(ay), len(az))
vx, vy, vz = vx[:min_length], vy[:min_length], vz[:min_length]
ax, ay, az = ax[:min_length], ay[:min_length], az[:min_length]

# Combine velocities and accelerations into a single array for anomaly detection
motion_data = np.vstack((vx, vy, vz, ax, ay, az)).T

# Normalize the data for Isolation Forest
scaler = StandardScaler()
motion_data_scaled = scaler.fit_transform(motion_data)

# Apply Isolation Forest for anomaly detection
isolation_forest = IsolationForest(n_estimators=100, contamination=0.05, random_state=42)
anomaly_scores = isolation_forest.fit_predict(motion_data_scaled)

# Find indices of anomalies
anomaly_indices = np.where(anomaly_scores == -1)[0]

# Explanation of anomalies
def explain_anomalies(anomaly_indices, vx, vy, vz, ax, ay, az):
    anomalies = []
    for idx in anomaly_indices:
        anomaly_desc = f"Anomaly at index {idx}: "
        if abs(vx[idx]) > np.mean(vx) + 2 * np.std(vx):
            anomaly_desc += f"Vx ({vx[idx]:.2f}) is abnormally high; "
        if abs(vy[idx]) > np.mean(vy) + 2 * np.std(vy):
            anomaly_desc += f"Vy ({vy[idx]:.2f}) is abnormally high; "
        if abs(vz[idx]) > np.mean(vz) + 2 * np.std(vz):
            anomaly_desc += f"Vz ({vz[idx]:.2f}) is abnormally high; "
        if abs(ax[idx]) > np.mean(ax) + 2 * np.std(ax):
            anomaly_desc += f"Ax ({ax[idx]:.2f}) is abnormally high; "
        if abs(ay[idx]) > np.mean(ay) + 2 * np.std(ay):
            anomaly_desc += f"Ay ({ay[idx]:.2f}) is abnormally high; "
        if abs(az[idx]) > np.mean(az) + 2 * np.std(az):
            anomaly_desc += f"Az ({az[idx]:.2f}) is abnormally high; "
        anomalies.append(anomaly_desc.strip())
    return anomalies

# Generate anomaly explanations
anomaly_explanations = explain_anomalies(anomaly_indices, vx, vy, vz, ax, ay, az)

# Visualize the results
plt.figure(figsize=(12, 8))

# Plot position data with anomalies highlighted
plt.subplot(3, 1, 1)
plt.plot(x[:-2], label='X Position')
plt.scatter(anomaly_indices, x[anomaly_indices], color='red', label='Anomalies')
plt.legend()
plt.title('Position with Anomalies (X)')

plt.subplot(3, 1, 2)
plt.plot(y[:-2], label='Y Position')
plt.scatter(anomaly_indices, y[anomaly_indices], color='red', label='Anomalies')
plt.legend()
plt.title('Position with Anomalies (Y)')

plt.subplot(3, 1, 3)
plt.plot(z[:-2], label='Z Position')
plt.scatter(anomaly_indices, z[anomaly_indices], color='red', label='Anomalies')
plt.legend()
plt.title('Position with Anomalies (Z)')

plt.tight_layout()
plt.show()

# Print detected anomalies and explanations
print("Anomalies detected at indices:", anomaly_indices)
for explanation in anomaly_explanations:
    print(explanation)

# Print average velocity and acceleration
print(f"\nAverage Velocity: Vx = {np.mean(vx):.2f} m/s, Vy = {np.mean(vy):.2f} m/s, Vz = {np.mean(vz):.2f} m/s")
print(f"Average Acceleration: Ax = {np.mean(ax):.2f} m/s^2, Ay = {np.mean(ay):.2f} m/s^2, Az = {np.mean(az):.2f} m/s^2")


In [None]:
import pandas as pd
import numpy as np
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

# Load the calibrated drone data
file_path = 'data/raw/Drone_CoD.csv'  # Your file path
drone_data = pd.read_csv(file_path)

# Ensure the columns are named correctly: 'Frame', 'Time', 'X', 'Y', 'Z'
drone_data.columns = ['Frame', 'Time', 'X', 'Y', 'Z']

# Convert data to numpy array for easier manipulation
frames = drone_data['Frame'].values
times = drone_data['Time'].values
x = drone_data['X'].values / 100  # Convert from mm to meters
y = drone_data['Y'].values / 100  # Convert from mm to meters
z = drone_data['Z'].values / 100  # Convert from mm to meters

# Function to calculate velocity
def calculate_velocity(x, times):
    return np.diff(x) / np.diff(times)

# Function to calculate acceleration
def calculate_acceleration(vx, times):
    return np.diff(vx) / np.diff(times[1:])

# Calculate velocities
vx = calculate_velocity(x, times)
vy = calculate_velocity(y, times)
vz = calculate_velocity(z, times)

# Calculate accelerations
ax = calculate_acceleration(vx, times)
ay = calculate_acceleration(vy, times)
az = calculate_acceleration(vz, times)

# Ensure all arrays are the same size by trimming
min_length = min(len(vx), len(vy), len(vz), len(ax), len(ay), len(az))
vx, vy, vz = vx[:min_length], vy[:min_length], vz[:min_length]
ax, ay, az = ax[:min_length], ay[:min_length], az[:min_length]

# Combine velocities and accelerations into a single array for anomaly detection
motion_data = np.vstack((vx, vy, vz, ax, ay, az)).T

# Normalize the data for One-Class SVM
scaler = StandardScaler()
motion_data_scaled = scaler.fit_transform(motion_data)

# Apply One-Class SVM for anomaly detection
one_class_svm = OneClassSVM(kernel='rbf', gamma='auto', nu=0.05)  # nu controls the proportion of outliers
anomaly_scores = one_class_svm.fit_predict(motion_data_scaled)

# Find indices of anomalies
anomaly_indices = np.where(anomaly_scores == -1)[0]

# Explanation of anomalies
def explain_anomalies(anomaly_indices, vx, vy, vz, ax, ay, az):
    anomalies = []
    for idx in anomaly_indices:
        anomaly_desc = f"Anomaly at index {idx}: "
        if abs(vx[idx]) > np.mean(vx) + 2 * np.std(vx):
            anomaly_desc += f"Vx ({vx[idx]:.2f}) is abnormally high; "
        if abs(vy[idx]) > np.mean(vy) + 2 * np.std(vy):
            anomaly_desc += f"Vy ({vy[idx]:.2f}) is abnormally high; "
        if abs(vz[idx]) > np.mean(vz) + 2 * np.std(vz):
            anomaly_desc += f"Vz ({vz[idx]:.2f}) is abnormally high; "
        if abs(ax[idx]) > np.mean(ax) + 2 * np.std(ax):
            anomaly_desc += f"Ax ({ax[idx]:.2f}) is abnormally high; "
        if abs(ay[idx]) > np.mean(ay) + 2 * np.std(ay):
            anomaly_desc += f"Ay ({ay[idx]:.2f}) is abnormally high; "
        if abs(az[idx]) > np.mean(az) + 2 * np.std(az):
            anomaly_desc += f"Az ({az[idx]:.2f}) is abnormally high; "
        anomalies.append(anomaly_desc.strip())
    return anomalies

# Generate anomaly explanations
anomaly_explanations = explain_anomalies(anomaly_indices, vx, vy, vz, ax, ay, az)

# Visualize the results
plt.figure(figsize=(12, 8))

# Plot position data with anomalies highlighted
plt.subplot(3, 1, 1)
plt.plot(x[:-2], label='X Position')
plt.scatter(anomaly_indices, x[anomaly_indices], color='red', label='Anomalies')
plt.legend()
plt.title('Position with Anomalies (X)')

plt.subplot(3, 1, 2)
plt.plot(y[:-2], label='Y Position')
plt.scatter(anomaly_indices, y[anomaly_indices], color='red', label='Anomalies')
plt.legend()
plt.title('Position with Anomalies (Y)')

plt.subplot(3, 1, 3)
plt.plot(z[:-2], label='Z Position')
plt.scatter(anomaly_indices, z[anomaly_indices], color='red', label='Anomalies')
plt.legend()
plt.title('Position with Anomalies (Z)')

plt.tight_layout()
plt.show()

# Print detected anomalies and explanations
print("Anomalies detected at indices:", anomaly_indices)
for explanation in anomaly_explanations:
    print(explanation)

# Print average velocity and acceleration
print(f"\nAverage Velocity: Vx = {np.mean(vx):.2f} m/s, Vy = {np.mean(vy):.2f} m/s, Vz = {np.mean(vz):.2f} m/s")
print(f"Average Acceleration: Ax = {np.mean(ax):.2f} m/s^2, Ay = {np.mean(ay):.2f} m/s^2, Az = {np.mean(az):.2f} m/s^2")


In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
import matplotlib.pyplot as plt
from keras._tf_keras.keras.models import Model
from keras._tf_keras.keras.layers import Input, Dense, Dropout, BatchNormalization
from keras._tf_keras.keras.optimizers import Adam
from keras._tf_keras.keras.callbacks import EarlyStopping

# Load the calibrated drone data
file_path = 'data/raw/Drone_CoD.csv'  # Your file path
drone_data = pd.read_csv(file_path)

# Ensure the columns are named correctly: 'Frame', 'Time', 'X', 'Y', 'Z'
drone_data.columns = ['Frame', 'Time', 'X', 'Y', 'Z']

# Convert data to numpy array for easier manipulation
frames = drone_data['Frame'].values
times = drone_data['Time'].values
x = drone_data['X'].values / 100  # Convert from mm to meters
y = drone_data['Y'].values / 100  # Convert from mm to meters
z = drone_data['Z'].values / 100  # Convert from mm to meters

# Function to calculate velocity
def calculate_velocity(coord, times):
    return np.diff(coord) / np.diff(times)

# Function to calculate acceleration
def calculate_acceleration(velocity, times):
    return np.diff(velocity) / np.diff(times[1:])

# Calculate velocities
vx = calculate_velocity(x, times)
vy = calculate_velocity(y, times)
vz = calculate_velocity(z, times)

# Calculate accelerations
ax = calculate_acceleration(vx, times)
ay = calculate_acceleration(vy, times)
az = calculate_acceleration(vz, times)

# Ensure all arrays are the same size by trimming
min_length = min(len(vx), len(vy), len(vz), len(ax), len(ay), len(az))
vx, vy, vz = vx[:min_length], vy[:min_length], vz[:min_length]
ax, ay, az = ax[:min_length], ay[:min_length], az[:min_length]

# Combine velocities and accelerations into a single array for anomaly detection
motion_data = np.vstack((vx, vy, vz, ax, ay, az)).T

# Create a placeholder for the ground truth array
num_samples = motion_data.shape[0]
y_true = np.random.choice([0, 1], size=(num_samples,), p=[0.9, 0.1])  # Example ground truth

# Step 1: Scale the data
scaler = StandardScaler()
motion_data_scaled = scaler.fit_transform(motion_data)

# Step 2: Split the data
X_train, X_test, y_train, y_test = train_test_split(motion_data_scaled, y_true, test_size=0.2, random_state=42)

# Step 3: Fit Isolation Forest Model
iso_forest = IsolationForest(contamination=0.1, random_state=42)
iso_forest.fit(X_train)

# Predict anomalies
y_pred_iso = iso_forest.predict(X_test)
y_pred_iso = np.where(y_pred_iso == -1, 1, 0)  # Map -1 to 1 (anomaly), 1 to 0 (normal)

# Step 4: Fit One-Class SVM Model
svm = OneClassSVM(kernel='rbf', gamma='scale', nu=0.1)
svm.fit(X_train)

# Predict anomalies
y_pred_svm = svm.predict(X_test)
y_pred_svm = np.where(y_pred_svm == -1, 1, 0)  # Map -1 to 1 (anomaly), 1 to 0 (normal)

# Step 5: Fit Autoencoder Model
input_dim = motion_data_scaled.shape[1]  # Number of features
encoding_dim = 3  # Adjust as needed

# Define the Autoencoder architecture
input_layer = Input(shape=(input_dim,))
encoded = Dense(encoding_dim, activation='relu')(input_layer)
decoded = Dense(input_dim, activation='sigmoid')(encoded)

# Create the Autoencoder model
autoencoder = Model(inputs=input_layer, outputs=decoded)
autoencoder.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')

# Train the Autoencoder
autoencoder.fit(X_train, X_train, epochs=100, batch_size=32, shuffle=True, validation_split=0.2)

# Step 6: Make predictions with the Autoencoder
predictions = autoencoder.predict(X_test)

# Compute reconstruction error
reconstruction_error = np.mean(np.power(X_test - predictions, 2), axis=1)

# Set a threshold for anomalies (e.g., 95th percentile)
threshold = np.percentile(reconstruction_error, 95)
y_pred_autoencoder = [1 if e > threshold else 0 for e in reconstruction_error]

# Step 7: Evaluate the models
print("Isolation Forest Metrics:")
print("Precision:", precision_score(y_test, y_pred_iso))
print("Recall:", recall_score(y_test, y_pred_iso))
print("F1 Score:", f1_score(y_test, y_pred_iso))
print("ROC AUC Score:", roc_auc_score(y_test, y_pred_iso))

print("\nOne-Class SVM Metrics:")
print("Precision:", precision_score(y_test, y_pred_svm))
print("Recall:", recall_score(y_test, y_pred_svm))
print("F1 Score:", f1_score(y_test, y_pred_svm))
print("ROC AUC Score:", roc_auc_score(y_test, y_pred_svm))

print("\nAutoencoder Metrics:")
print("Precision:", precision_score(y_test, y_pred_autoencoder))
print("Recall:", recall_score(y_test, y_pred_autoencoder))
print("F1 Score:", f1_score(y_test, y_pred_autoencoder))
print("ROC AUC Score:", roc_auc_score(y_test, y_pred_autoencoder))


In [None]:
import cv2
import numpy as np
import time
import csv
import os

# Camera calibration parameters for each camera
fx1, fy1 = 700, 700  # Focal lengths for the front camera
cx1, cy1 = 320, 240  # Principal points for the front camera
fx2, fy2 = 700, 700  # Focal lengths for the side camera
cx2, cy2 = 320, 240  # Principal points for the side camera

baseline = 1000  # Distance between the two cameras in mm (baseline for triangulation)

# Conversion factors for pixel to mm conversion (based on calibration)
conversion_factor_x_front = 1 / fx1
conversion_factor_y_front = 1 / fy1
conversion_factor_x_side = 1 / fx2
conversion_factor_y_side = 1 / fy2

# Open video files or cameras
cap_front = cv2.VideoCapture('data/video/Drone2.mp4')
cap_side = cv2.VideoCapture('data/video/Drone1.mp4')

# Initialize variables
previous_position = None
previous_time = None
trajectory_data = []

def calculate_3d_position(x1, y1, x2):
    """Calculate 3D position using triangulation based on the front and side view coordinates."""
    disparity = abs(x1 - x2)
    z = fx1 * baseline / disparity if disparity != 0 else 0
    x_mm = (x1 - cx1) * conversion_factor_x_front * z
    y_mm = (y1 - cy1) * conversion_factor_y_front * z
    return np.array([x_mm, y_mm, z])

def calculate_velocity_acceleration(new_position, previous_position, dt):
    """Calculate velocity and acceleration vectors."""
    velocity = (new_position - previous_position) / dt
    if len(trajectory_data) > 1:
        prev_velocity = trajectory_data[-1]['velocity']
        acceleration = (velocity - prev_velocity) / dt
    else:
        acceleration = np.array([0, 0, 0])
    return velocity, acceleration

# Specify the path where the CSV file will be created
csv_file_path = 'C:/Users/SAKET JHA/Desktop/Drone_Motion_Data/trajectory_data.csv'

# Create the directory if it does not exist
os.makedirs(os.path.dirname(csv_file_path), exist_ok=True)

# Open a CSV file to overwrite the previous data
with open(csv_file_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(["Timestamp", "X (mm)", "Y (mm)", "Z (mm)", "Velocity (mm/s)", "Acceleration (mm/s²)"])

    while cap_front.isOpened() and cap_side.isOpened():
        ret1, frame1 = cap_front.read()
        ret2, frame2 = cap_side.read()

        if not ret1 or not ret2:
            break

        # Resize frames for easier viewing if needed
        frame1 = cv2.resize(frame1, (640, 480))
        frame2 = cv2.resize(frame2, (640, 480))

        gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
        gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

        # Simple thresholding for drone detection (replace with actual method if possible)
        ret1, thresh1 = cv2.threshold(gray1, 127, 255, cv2.THRESH_BINARY)
        ret2, thresh2 = cv2.threshold(gray2, 127, 255, cv2.THRESH_BINARY)

        contours1, _ = cv2.findContours(thresh1, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
        contours2, _ = cv2.findContours(thresh2, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

        if contours1 and contours2:
            largest_contour1 = max(contours1, key=cv2.contourArea)
            largest_contour2 = max(contours2, key=cv2.contourArea)

            x1, y1, w1, h1 = cv2.boundingRect(largest_contour1)
            x2, y2, w2, h2 = cv2.boundingRect(largest_contour2)

            position = calculate_3d_position(x1 + w1 // 2, y1 + h1 // 2, x2 + w2 // 2)

            current_time = time.time()
            if previous_time:
                dt = current_time - previous_time
            else:
                dt = 0
            previous_time = current_time

            if previous_position is not None:
                velocity, acceleration = calculate_velocity_acceleration(position, previous_position, dt)
            else:
                velocity, acceleration = np.array([0, 0, 0]), np.array([0, 0, 0])

            # Write the data point to the CSV file, overwriting previous data
            writer.writerow([current_time, position[0], position[1], position[2], np.linalg.norm(velocity), np.linalg.norm(acceleration)])

            previous_position = position

            # Draw bounding boxes
            cv2.rectangle(frame1, (x1, y1), (x1 + w1, y1 + h1), (0, 255, 0), 2)
            cv2.rectangle(frame2, (x2, y2), (x2 + w2, y2 + h2), (0, 255, 0), 2)
            cv2.putText(frame1, f"3D Position (mm): {position}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        cv2.imshow('Front View Tracking', frame1)
        cv2.imshow('Side View Tracking', frame2)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

cap_front.release()
cap_side.release()
cv2.destroyAllWindows()

print(f"Data extraction complete. Data saved to '{csv_file_path}'.")


In [None]:
import cv2
import matplotlib.pyplot as plt
import numpy as np

# Load the video file
video_path = 'data/video/Drone2.mp4'
cap = cv2.VideoCapture(video_path)

# Check if video opened successfully
if not cap.isOpened():
    print("Error opening video file")
    exit()

# Initialize the tracker
# For OpenCV 4.5 and newer, use the cv2.legacy module if needed
try:
    tracker = cv2.TrackerMIL_create()  # Ensure this method is available in your version
except AttributeError:
    print("TrackerMIL_create() method is not available. Try using an alternative tracker.")
    cap.release()
    cv2.destroyAllWindows()
    exit()

# Read the first frame
ret, frame = cap.read()
if not ret:
    print("Failed to grab frame")
    cap.release()
    cv2.destroyAllWindows()
    exit()

# Define the initial bounding box
# If cv2.selectROI() is not working, you can manually define bbox
bbox = (100, 100, 200, 200)  # Example: (x, y, width, height)
# Uncomment the following line if you want to use GUI ROI selection
# bbox = cv2.selectROI(frame, False)

# Initialize the tracker with the first frame and bounding box
tracker.init(frame, bbox)

# Variables to store data
frame_numbers = []
x_positions = []
y_positions = []

# Process video frames
frame_count = 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # Update the tracker
    success, bbox = tracker.update(frame)
    
    if success:
        # Extract the center of the bounding box as the object's position
        x_center = int(bbox[0] + bbox[2] / 2)
        y_center = int(bbox[1] + bbox[3] / 2)
        frame_numbers.append(frame_count)
        x_positions.append(x_center)
        y_positions.append(y_center)
    
    frame_count += 1

cap.release()

# Plotting the extracted data
plt.figure(figsize=(12, 6))

# Plot X and Y positions
plt.subplot(2, 1, 1)
plt.plot(frame_numbers, x_positions, label='X Position', color='blue')
plt.xlabel('Frame Number')
plt.ylabel('X Position (pixels)')
plt.legend()

plt.subplot(2, 1, 2)
plt.plot(frame_numbers, y_positions, label='Y Position', color='red')
plt.xlabel('Frame Number')
plt.ylabel('Y Position (pixels)')
plt.legend()

plt.tight_layout()
plt.show()


In [None]:
import cv2
import numpy as np
import time
from scipy.signal import savgol_filter
import matplotlib.pyplot as plt

# Camera calibration parameters for each camera
# Replace these with actual values obtained from camera calibration
fx1, fy1 = 700, 700  # Focal lengths for the front camera
cx1, cy1 = 320, 240  # Principal points for the front camera
fx2, fy2 = 700, 700  # Focal lengths for the side camera
cx2, cy2 = 320, 240  # Principal points for the side camera

baseline = 1000  # Distance between the two cameras in mm (baseline for triangulation)

# Conversion factors for pixel to mm conversion (based on calibration)
conversion_factor_x_front = 1 / fx1
conversion_factor_y_front = 1 / fy1
conversion_factor_x_side = 1 / fx2
conversion_factor_y_side = 1 / fy2

# Open video files or cameras
cap_front = cv2.VideoCapture('data/video/Drone2.mp4')
cap_side = cv2.VideoCapture('data/video/Drone1.mp4')

# Initialize variables
previous_position = None
previous_time = None
trajectory_data = []

def calculate_3d_position(x1, y1, x2):
    """Calculate 3D position using triangulation based on the front and side view coordinates."""
    disparity = abs(x1 - x2)
    z = fx1 * baseline / disparity if disparity != 0 else 0
    x_mm = (x1 - cx1) * conversion_factor_x_front * z
    y_mm = (y1 - cy1) * conversion_factor_y_front * z
    return np.array([x_mm, y_mm, z])

def calculate_velocity_acceleration(new_position, previous_position, dt):
    """Calculate velocity and acceleration vectors."""
    velocity = (new_position - previous_position) / dt
    if len(trajectory_data) > 1:
        prev_velocity = trajectory_data[-1]['velocity']
        acceleration = (velocity - prev_velocity) / dt
    else:
        acceleration = np.array([0, 0, 0])
    return velocity, acceleration

while cap_front.isOpened() and cap_side.isOpened():
    ret1, frame1 = cap_front.read()
    ret2, frame2 = cap_side.read()

    if not ret1 or not ret2:
        break

    # Resize frames for easier viewing if needed
    frame1 = cv2.resize(frame1, (640, 480))
    frame2 = cv2.resize(frame2, (640, 480))

    gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
    gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

    # Simple thresholding for drone detection (replace with actual method if possible)
    ret1, thresh1 = cv2.threshold(gray1, 127, 255, cv2.THRESH_BINARY)
    ret2, thresh2 = cv2.threshold(gray2, 127, 255, cv2.THRESH_BINARY)

    contours1, _ = cv2.findContours(thresh1, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    contours2, _ = cv2.findContours(thresh2, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

    if contours1 and contours2:
        largest_contour1 = max(contours1, key=cv2.contourArea)
        largest_contour2 = max(contours2, key=cv2.contourArea)

        x1, y1, w1, h1 = cv2.boundingRect(largest_contour1)
        x2, y2, w2, h2 = cv2.boundingRect(largest_contour2)

        position = calculate_3d_position(x1 + w1 // 2, y1 + h1 // 2, x2 + w2 // 2)

        current_time = time.time()
        if previous_time:
            dt = current_time - previous_time
        else:
            dt = 0
        previous_time = current_time

        if previous_position is not None:
            velocity, acceleration = calculate_velocity_acceleration(position, previous_position, dt)
        else:
            velocity, acceleration = np.array([0, 0, 0]), np.array([0, 0, 0])

        trajectory_data.append({
            'position': position,
            'velocity': velocity,
            'acceleration': acceleration,
            'timestamp': current_time
        })

        previous_position = position

        # Draw bounding boxes
        cv2.rectangle(frame1, (x1, y1), (x1 + w1, y1 + h1), (0, 255, 0), 2)
        cv2.rectangle(frame2, (x2, y2), (x2 + w2, y2 + h2), (0, 255, 0), 2)
        cv2.putText(frame1, f"3D Position (mm): {position}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

    cv2.imshow('Front View Tracking', frame1)
    cv2.imshow('Side View Tracking', frame2)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap_front.release()
cap_side.release()
cv2.destroyAllWindows()

# Analyze trajectory data for anomalies or deviations
positions = np.array([data['position'] for data in trajectory_data])

# Smooth trajectory data
smoothed_positions = savgol_filter(positions, window_length=5, polyorder=2, axis=0)

# Calculate deviations (anomalies)
deviation = np.linalg.norm(positions - smoothed_positions, axis=1)
threshold = 50  # Static threshold in mm for anomaly detection
anomalies = deviation > threshold
anomaly_indices = np.where(anomalies)[0]

print("Trajectory Analysis Complete.")
print("Detected Anomalies at Frames:", anomaly_indices)

# Explain anomalies based on velocity, acceleration, and position deviation
for idx in anomaly_indices:
    position_deviation = deviation[idx]
    velocity = trajectory_data[idx]['velocity']
    acceleration = trajectory_data[idx]['acceleration']
    
    explanation = f"Anomaly detected at frame {idx}:\n"
    explanation += f"  - Deviation from smoothed path: {position_deviation:.2f} mm\n"
    explanation += f"  - Velocity: {np.linalg.norm(velocity):.2f} mm/s\n"
    explanation += f"  - Acceleration: {np.linalg.norm(acceleration):.2f} mm/s²\n"
    
    if np.linalg.norm(velocity) > 50:
        explanation += "  - High velocity, possible rapid movement or drone jerk.\n"
    if np.linalg.norm(acceleration) > 50:
        explanation += "  - High acceleration, possibly due to a sudden change in direction or speed.\n"
    
    print(explanation)

# Plotting the results using matplotlib
plt.figure(figsize=(10, 6))
plt.plot(range(len(deviation)), deviation, label="Deviation from Smoothed Path")
plt.axhline(y=threshold, color='r', linestyle='--', label="Anomaly Threshold")

# Highlight detected anomalies
plt.scatter(anomaly_indices, deviation[anomaly_indices], color='red', label="Anomalies")

plt.xlabel("Frame Number")
plt.ylabel("Deviation (mm)")
plt.title("Drone Trajectory Deviation and Anomaly Detection")
plt.legend()
plt.show()


In [None]:
import cv2
import numpy as np
from scipy.spatial import distance
from scipy.signal import savgol_filter
import time

# Initialize the video capture
cap = cv2.VideoCapture(0)  # Adjust for your video source or camera index

# Initialize parameters
previous_position = None
previous_time = None
trajectory_data = []

def calculate_velocity_acceleration(new_position, previous_position, dt):
    # Calculate velocity (distance per unit time)
    velocity = np.array((new_position - previous_position) / dt)
    
    # Calculate acceleration if enough data points are available
    if len(trajectory_data) > 1:
        prev_velocity = trajectory_data[-1]['velocity']
        acceleration = (velocity - prev_velocity) / dt
    else:
        acceleration = np.array([0, 0, 0])  # No acceleration for the first frame
    
    return velocity, acceleration

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    
    # Detect the drone in frame (Here, we assume some object detection mechanism is used)
    # For this example, we’re simply tracking a specific color range
    hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    # Assuming your drone is of a specific color (modify this range as necessary)
    mask = cv2.inRange(hsv_frame, (40, 70, 70), (80, 255, 255))
    contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    
    # If the drone is detected, calculate its position in 3D space
    if contours:
        # Assume largest contour is the drone
        largest_contour = max(contours, key=cv2.contourArea)
        x, y, w, h = cv2.boundingRect(largest_contour)
        
        # Dummy Z coordinate (replace with actual depth data from depth camera)
        z = 1000  # Replace with depth information in mm
        
        # Convert (x, y, z) to millimeters based on camera calibration data (this part depends on your camera setup)
        x_mm = x * conversion_factor_x
        y_mm = y * conversion_factor_y
        position = np.array([x_mm, y_mm, z])
        
        # Calculate time delta
        current_time = time.time()
        if previous_time:
            dt = current_time - previous_time
        else:
            dt = 0
        previous_time = current_time
        
        # Calculate velocity and acceleration
        if previous_position is not None:
            velocity, acceleration = calculate_velocity_acceleration(position, previous_position, dt)
        else:
            velocity, acceleration = np.array([0, 0, 0]), np.array([0, 0, 0])
        
        # Append data for trajectory analysis
        trajectory_data.append({
            'position': position,
            'velocity': velocity,
            'acceleration': acceleration,
            'timestamp': current_time
        })
        
        previous_position = position
        
        # Display tracking information
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
        cv2.putText(frame, f"Position (mm): {position}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
    
    cv2.imshow("Drone Tracking", frame)
    
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

# Trajectory Analysis: smoothness, accuracy, precision
positions = np.array([data['position'] for data in trajectory_data])

# Smoothing trajectory data for analysis (e.g., Savitzky-Golay filter)
smoothed_positions = savgol_filter(positions, window_length=5, polyorder=2, axis=0)

# Analyze deviations or anomalies in smoothness, accuracy
deviation = np.linalg.norm(positions - smoothed_positions, axis=1)
anomalies = deviation > threshold  # Define a threshold for deviations

print("Trajectory Analysis Complete.")
print("Detected Anomalies:", np.where(anomalies)[0])  # Indices of anomaly frames
