In [3]:
import numpy as np
import pandas as pd

np.random.seed(42)

# Parameters
n_samples = 100  # Number of sequences
window_size = 50  # 5 seconds at 10 Hz
features = 13    # Number of sensor features

# Base data generation with increased noise and overlap
def generate_base_data(n):
    is_healthy = np.random.choice([0, 1], n)  # 0: affected, 1: healthy
    data = {
        "HR": np.where(is_healthy, np.random.normal(70, 20, n), np.random.normal(68, 20, n)),  # Even more overlap
        "HRV": np.where(is_healthy, np.random.normal(50, 20, n), np.random.normal(40, 15, n)),  # Even more overlap
        "QRS": np.random.normal(1.0, 0.5, n),  # Increased noise
        "AccX": np.random.normal(0.2, 1.5, n),  # Increased noise
        "AccY": np.random.normal(0.2, 1.5, n),
        "AccZ": np.random.normal(9.8, 0.8, n),
        "GyroX": np.random.normal(5, 40, n),  # Increased noise
        "GyroY": np.random.normal(5, 40, n),
        "GyroZ": np.random.normal(5, 40, n),
        "EMG_Amp": np.random.normal(0.5, 0.4, n),  # Increased noise
        "EMG_Freq": np.random.normal(15, 15, n),  # Even more overlap
    }
    df = pd.DataFrame(data)
    df["Total_Acc"] = np.sqrt(df["AccX"]**2 + df["AccY"]**2 + df["AccZ"]**2)
    df["Tilt"] = np.degrees(np.arctan2(df["AccY"], df["AccZ"]))
    return df

# Generate time-series sequences
sequences = []
labels = {"Parkinson’s": [], "Fall": [], "Stroke": []}
for _ in range(n_samples):
    base = generate_base_data(1).iloc[0]
    sequence = []
    for t in range(window_size):
        noise = np.random.normal(0, 0.2, features)  # Increased noise
        sample = base + noise
        sequence.append(sample)
    sequence = np.array(sequence)

    # Assign labels with even more ambiguity
    parkinson = 1 if (base["HRV"] < 45 and base["GyroX"] > 10 and base["EMG_Freq"] > 10) else 0  # Very relaxed thresholds
    fall = 1 if (base["Total_Acc"] > 10) else 0  # Lowered threshold
    stroke = 1 if (base["HR"] > 75 and base["HRV"] < 30 and base["EMG_Amp"] < 0.5) else 0  # Very relaxed thresholds

    # Simulate effects with less intensity
    if fall:
        fall_idx = np.random.randint(0, window_size-10)
        sequence[fall_idx:, [3, 4, 5]] += np.random.normal(6, 2, (window_size-fall_idx, 3))  # Reduced intensity
    if parkinson:
        sequence[:, [6, 7, 8]] += np.random.normal(20, 10, (window_size, 3))  # Reduced intensity
        sequence[:, 10] += np.random.normal(5, 3, window_size)  # Reduced intensity

    sequences.append(sequence)
    labels["Parkinson’s"].append(parkinson)
    labels["Fall"].append(fall)
    labels["Stroke"].append(stroke)

X = np.array(sequences)
y = np.array([labels["Parkinson’s"], labels["Fall"], labels["Stroke"]]).T

# Save for later use
np.save("X_sequences.npy", X)
np.save("y_labels.npy", y)

print("Dataset generated successfully!")
print("X shape:", X.shape)
print("y shape:", y.shape)

Dataset generated successfully!
X shape: (100, 50, 13)
y shape: (100, 3)


In [5]:
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, models
from sklearn.metrics import accuracy_score

# Parameters
window_size = 50
features = 13

# Load data
X = np.load("X_sequences.npy")
y = np.load("y_labels.npy")

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Build model
model = models.Sequential([
    layers.Conv1D(32, kernel_size=3, activation="relu", input_shape=(window_size, features)),
    layers.MaxPooling1D(pool_size=2),
    layers.LSTM(30, return_sequences=False),
    layers.Dense(16, activation="relu", kernel_regularizer=tf.keras.regularizers.l2(0.01)),
    layers.Dropout(0.5),
    layers.Dense(3, activation="sigmoid")
])

# Compile model
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0005)
model.compile(optimizer=optimizer, loss="binary_crossentropy", metrics=["accuracy"])

# Train model
history = model.fit(X_train, y_train, epochs=10, batch_size=32, validation_data=(X_test, y_test))

# Evaluate model on test data
y_pred = model.predict(X_test)
y_pred_binary = (y_pred > 0.5).astype(int)

# Calculate accuracy for each label
parkinson_acc = accuracy_score(y_test[:, 0], y_pred_binary[:, 0])
fall_acc = accuracy_score(y_test[:, 1], y_pred_binary[:, 1])
stroke_acc = accuracy_score(y_test[:, 2], y_pred_binary[:, 2])
overall_acc = accuracy_score(y_test, y_pred_binary, normalize=True)

# Print accuracies
print("\nModel Accuracy Metrics:")
print(f"Parkinson’s Prediction Accuracy: {parkinson_acc:.4f}")
print(f"Fall Detection Accuracy: {fall_acc:.4f}")
print(f"Stroke Prediction Accuracy: {stroke_acc:.4f}")
print(f"Overall Accuracy (averaged across labels): {overall_acc:.4f}")

# Save the model using the recommended method, with .keras extension
tf.keras.models.save_model(model, "stroke_parkinson_fall_model.keras")
# The '.keras' extension automatically sets the save format to 'tf'

Epoch 1/10


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 231ms/step - accuracy: 0.4508 - loss: 0.9861 - val_accuracy: 0.4000 - val_loss: 0.9139
Epoch 2/10
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 103ms/step - accuracy: 0.5445 - loss: 0.9503 - val_accuracy: 0.4500 - val_loss: 0.8903
Epoch 3/10
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 46ms/step - accuracy: 0.4914 - loss: 0.9062 - val_accuracy: 0.5500 - val_loss: 0.8707
Epoch 4/10
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step - accuracy: 0.4719 - loss: 0.8791 - val_accuracy: 0.4500 - val_loss: 0.8587
Epoch 5/10
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 44ms/step - accuracy: 0.4867 - loss: 0.8648 - val_accuracy: 0.4000 - val_loss: 0.8409
Epoch 6/10
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step - accuracy: 0.4602 - loss: 0.8683 - val_accuracy: 0.5000 - val_loss: 0.8298
Epoch 7/10
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[3

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
import matplotlib.pyplot as plt

# 1. Load and Prepare Data
df = pd.read_csv('/content/drive/MyDrive/Hackathon2.0/parkinsons_dataset3.csv')  # Assuming the data is saved from previous step

# Reshape data: (samples, time_steps, features)
time_steps = 10
n_features = 7  # EMG, Ax, Ay, Az, Gx, Gy, Gz
n_samples = len(df)
X = df.drop('Label', axis=1).values.reshape(n_samples, time_steps, n_features)
y = df['Label'].values

# Scale features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X.reshape(-1, n_features)).reshape(X.shape)

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y, test_size=0.2, random_state=42
)

# 2. Build LSTM Model
def build_model():
    model = Sequential([
        LSTM(128, input_shape=(time_steps, n_features), return_sequences=True),
        Dropout(0.4),
        LSTM(64),
        Dropout(0.4),
        Dense(32, activation='relu'),
        Dropout(0.3),
        Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy',
                 metrics=['accuracy'])
    return model

# 3. Train Model
model = build_model()
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True
)

history = model.fit(
    X_train, y_train,
    epochs=100,
    batch_size=32,
    validation_split=0.2,
    callbacks=[early_stopping],
    verbose=1
)

# 4. Evaluate Model
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print(f"\nTest Accuracy: {test_accuracy:.4f}")
print(f"Test Loss: {test_loss:.4f}")

# 5. Early Detection and SOS Function
def predict_parkinsons_early(new_data, threshold=0.7, time_window=5):
    """
    new_data: array of shape (time_steps, 7) [EMG, Ax, Ay, Az, Gx, Gy, Gz]
    threshold: probability threshold for early detection
    time_window: seconds before visible effects (5-30)
    """
    new_data_scaled = scaler.transform(new_data)
    new_data_reshaped = new_data_scaled.reshape(1, time_steps, n_features)
    prediction_prob = model.predict(new_data_reshaped, verbose=0)[0][0]

    if prediction_prob > threshold:
        print(f"ALERT: Potential Parkinson's detected {time_window} seconds early!")
        print(f"Probability: {prediction_prob:.4f}")
        send_sos_alert()
        return True, prediction_prob
    else:
        print(f"No early detection. Probability: {prediction_prob:.4f}")
        return False, prediction_prob

def send_sos_alert():
    print("SOS Alert Sent: Emergency services notified.")

# 6. Simulate Real-time Prediction
sample_idx = 0
sample_data = X_test[sample_idx]
true_label = y_test[sample_idx]

detected, prob = predict_parkinsons_early(sample_data, threshold=0.7, time_window=5)
label_text = "Parkinson's" if true_label == 1 else "No Parkinson's"
print(f"True Label: {label_text}")

# 7. Plot Training History
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.tight_layout()
plt.show()

# 8. Batch Prediction for Validation
def validate_early_detection(X_data, y_data, threshold=0.7, time_window=5):
    correct_detections = 0
    false_positives = 0
    false_negatives = 0

    for i in range(len(X_data)):
        detected, prob = predict_parkinsons_early(X_data[i], threshold, time_window)
        true_label = y_data[i]

        if detected and true_label == 1:
            correct_detections += 1
        elif detected and true_label == 0:
            false_positives += 1
        elif not detected and true_label == 1:
            false_negatives += 1

    print("\nValidation Results:")
    print(f"Correct Early Detections: {correct_detections}")
    print(f"False Positives: {false_positives}")
    print(f"False Negatives: {false_negatives}")
    detection_rate = correct_detections / (correct_detections + false_negatives) if (correct_detections + false_negatives) > 0 else 0
    print(f"Detection Rate: {detection_rate:.4f}")

validate_early_detection(X_test[:10], y_test[:10])

# 9. Validation with Specific Input for Positive Detection
parkinsons_input = np.array([
    [200, 0.3, -0.2, 0.4, 5.0, -6.0, 7.0],
    [210, 0.4, -0.3, 0.5, 6.0, -5.0, 8.0],
    [205, 0.5, -0.4, 0.6, 7.0, -4.0, 9.0],
    [220, 0.6, -0.5, 0.7, 8.0, -3.0, 10.0],
    [215, 0.5, -0.6, 0.8, 9.0, -2.0, 11.0],
    [225, 0.4, -0.7, 0.9, 10.0, -1.0, 12.0],
    [230, 0.3, -0.8, 1.0, 11.0, 0.0, 13.0],
    [235, 0.2, -0.9, 1.1, 12.0, 1.0, 14.0],
    [240, 0.1, -1.0, 1.2, 13.0, 2.0, 15.0],
    [245, 0.0, -1.1, 1.3, 14.0, 3.0, 16.0]
])

print("\nTesting with synthetic Parkinson's input:")
detected, prob = predict_parkinsons_early(parkinsons_input, threshold=0.7, time_window=5)
print(f"Expected Outcome: Parkinson's should be detected")

# 10. User Input Testing Function
def test_with_user_input():
    print("\n=== User Input Testing Mode ===")
    print("Enter 10 sets of sensor readings (EMG, Ax, Ay, Az, Gx, Gy, Gz)")
    print("Format: space-separated values (e.g., '200 0.3 -0.2 0.4 5.0 -6.0 7.0')")
    print("Enter 'quit' to exit")

    user_data = []

    while len(user_data) < time_steps:
        try:
            user_input = input(f"Reading {len(user_data) + 1}/{time_steps}: ")

            if user_input.lower() == 'quit':
                print("Exiting user input mode...")
                return

            # Split and convert input to float
            values = [float(x) for x in user_input.split()]

            # Verify correct number of features
            if len(values) != n_features:
                print(f"Error: Please enter exactly {n_features} values (EMG, Ax, Ay, Az, Gx, Gy, Gz)")
                continue

            user_data.append(values)

            # When we have enough readings, make prediction
            if len(user_data) == time_steps:
                user_array = np.array(user_data)
                print("\nProcessing your input...")
                detected, prob = predict_parkinsons_early(user_array, threshold=0.7, time_window=5)
                user_data = []  # Reset for next set of readings

        except ValueError:
            print("Error: Please enter valid numeric values separated by spaces")
            continue
        except Exception as e:
            print(f"An error occurred: {e}")
            continue

# Run the user input testing
if __name__ == "__main__":
    # Run all previous steps first
    # [Assuming the model is trained and variables are available]

    # Start user input testing
    test_with_user_input()