## Importing Libraries

Here, we will import all the libraries we will use in future code snippets.


In [2]:
from brainflow.board_shim import BoardShim, BrainFlowInputParams, BoardIds
from brainflow.data_filter import DataFilter, FilterTypes, WindowOperations
import numpy as np
import time

  import pkg_resources


## Define your device's serial port and board ID

First, create a BrainFlowInputParams object, then set the serial_port attribute to your device's serial port.

In [81]:
# Connection parameters that are passed to BoardShim when the board object is built.
params = BrainFlowInputParams()
# Serial port of the device; change this depending on your device and OS.
params.serial_port = "/dev/tty"


## Create board object

We now create the BoardShim object that allows us to interact with our device. We first try to create the object using the above parameters, then call `prepare_session()` on the board to verify that the device is properly connected. If there is an issue with the device, the code instead creates a synthetic board object. Finally, we release the board's session so that we don't run into any issues trying to prepare a session later (you can only prepare one session at a time).


In [82]:
# Try the physical device first; on any failure fall back to a synthetic board.
board_id = 39  # Change this depending on your device
try:
    board = BoardShim(board_id, params)
    board.prepare_session()
    print("Successfully prepared physical board.")
except Exception as err:
    # Show what went wrong, then build a synthetic board so the rest of
    # the notebook can still run without the hardware.
    print(err)
    print("Device could not be found or is being used by another program, creating synthetic board.")
    board_id = BoardIds.SYNTHETIC_BOARD
    board = BoardShim(board_id, params)
    board.prepare_session()

# Release the session again so a later cell can prepare its own session.
board.release_session()

[2025-11-08 17:56:28.108] [board_logger] [info] incoming json: {
    "file": "",
    "file_anc": "",
    "file_aux": "",
    "ip_address": "",
    "ip_address_anc": "",
    "ip_address_aux": "",
    "ip_port": 0,
    "ip_port_anc": 0,
    "ip_port_aux": 0,
    "ip_protocol": 0,
    "mac_address": "",
    "master_board": -100,
    "other_info": "",
    "serial_number": "",
    "serial_port": "/dev/tty",
    "timeout": 0
}
[2025-11-08 17:56:28.109] [board_logger] [info] Use timeout for discovery: 6
[2025-11-08 17:56:28.223] [board_logger] [info] found 1 BLE adapter(s)
[2025-11-08 17:56:28.641] [board_logger] [info] Found Muse device
2025-11-08 17:56:33.762 Python[16167:214261] Peripheral Connection failed
[2025-11-08 17:56:36.081] [board_logger] [info] Connected to Muse Device
[2025-11-08 17:56:37.086] [board_logger] [info] found control characteristic


Successfully prepared physical board.


2025-11-08 17:56:37.640 Python[16167:308847] Peripheral ready to send: <CBPeripheral: 0x1317720f0, identifier = 5157419F-1563-241C-6EE2-B89592F49332, name = MuseS-79A4, mtu = 158, state = connected>


## Function for collecting data


In [78]:
def readData(duration=5, source=None):
    """Stream from a board for a fixed time and return the buffered data.

    Parameters
    ----------
    duration : float, optional
        Seconds to wait between starting the stream and reading the
        buffer. Defaults to 5.
    source : object, optional
        Object providing ``start_stream()``, ``get_board_data()`` and
        ``stop_stream()``. Defaults to the notebook-level ``board``.

    Returns
    -------
    Whatever ``source.get_board_data()`` returns.

    Notes
    -----
    The stream is stopped in a ``finally`` block, so it is stopped even
    when the wait is interrupted or reading the data raises.
    """
    if source is None:
        # Fall back to the board object created in an earlier cell.
        source = board

    source.start_stream()
    try:
        time.sleep(duration)  # let the board buffer fill
        return source.get_board_data()
    finally:
        # Never leave the stream running, even on error or interrupt.
        source.stop_stream()

## Print out the raw data


In [79]:
print("Starting Stream")
board.prepare_session()
try:
    data = readData()
    print(data)
finally:
    # Release the session even if streaming or reading fails; only one
    # session can be prepared at a time, so a leaked session would block
    # later prepare_session() calls.
    board.release_session()

Starting Stream


[2025-11-08 17:54:44.852] [board_logger] [info] incoming json: {
    "file": "",
    "file_anc": "",
    "file_aux": "",
    "ip_address": "",
    "ip_address_anc": "",
    "ip_address_aux": "",
    "ip_port": 0,
    "ip_port_anc": 0,
    "ip_port_aux": 0,
    "ip_protocol": 0,
    "mac_address": "",
    "master_board": -100,
    "other_info": "",
    "serial_number": "",
    "serial_port": "/dev/tty",
    "timeout": 0
}
[2025-11-08 17:54:44.852] [board_logger] [info] Use timeout for discovery: 6
[2025-11-08 17:54:44.961] [board_logger] [info] found 1 BLE adapter(s)
[2025-11-08 17:54:45.339] [board_logger] [info] Found Muse device
2025-11-08 17:54:50.509 Python[16167:214261] Peripheral Connection failed
[2025-11-08 17:54:52.901] [board_logger] [info] Connected to Muse Device
[2025-11-08 17:54:53.907] [board_logger] [info] found control characteristic
2025-11-08 17:54:54.530 Python[16167:307691] Peripheral ready to send: <CBPeripheral: 0x1170f7320, identifier = FCF06D8C-2C26-4E5D-B506-B

[[ 6.95000000e+02  6.95000000e+02  6.95000000e+02 ...  7.97000000e+02
   7.97000000e+02  7.97000000e+02]
 [ 9.65332031e+02  9.65332031e+02  9.65332031e+02 ...  5.78125000e+02
   4.82421875e+02  3.24218750e+02]
 [ 9.65820312e+02  9.65820312e+02  9.65820312e+02 ... -9.67285156e+02
  -9.30664062e+02 -9.09179688e+02]
 ...
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00 ...  0.00000000e+00
   0.00000000e+00  0.00000000e+00]
 [ 1.76264969e+09  1.76264969e+09  1.76264969e+09 ...  1.76264970e+09
   1.76264970e+09  1.76264970e+09]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00 ...  0.00000000e+00
   0.00000000e+00  0.00000000e+00]]


2025-11-08 17:54:59.538 Python[16167:307691] Peripheral ready to send: <CBPeripheral: 0x1170f7320, identifier = FCF06D8C-2C26-4E5D-B506-B5B1EEC924EB, name = Muse-F4A7, mtu = 158, state = connected>


## Filter out the unnecessary channels

Keep only the EEG signals.

In [83]:
# Look up which rows of the board data hold EEG channels.
eeg_channels = board.get_eeg_channels(board_id)
print(eeg_channels)

# Keep only those rows, then show the resulting shape and values.
eeg_data = data[eeg_channels]
print(eeg_data.shape)
print(eeg_data)

[1, 2, 3, 4]
(4, 1236)
[[  965.33203125   965.33203125   965.33203125 ...   578.125
    482.421875     324.21875   ]
 [  965.8203125    965.8203125    965.8203125  ...  -967.28515625
   -930.6640625   -909.1796875 ]
 [ -504.39453125  -625.9765625   -637.6953125  ...  -690.4296875
   -893.06640625 -1000.        ]
 [-1000.         -1000.         -1000.         ...  -228.515625
   -860.3515625  -1000.        ]]


## Function to extract EEG frequency bands and save to DataFrame

In [34]:
import numpy as np
import pandas as pd
from scipy.signal import butter, lfilter

In [84]:
import pandas as pd
from scipy.signal import butter, lfilter, sosfilt

# Function to create a bandpass filter
def bandpass_filter(data, lowcut, highcut, fs, order=4):
    """Apply a causal Butterworth band-pass filter to ``data``.

    The filter is designed and applied as second-order sections (SOS)
    rather than as a single (b, a) transfer function. The response is the
    same, but the SOS form is less sensitive to floating-point rounding.

    Parameters
    ----------
    data : array_like
        Signal to filter; filtering runs along the last axis.
    lowcut, highcut : float
        Lower and upper band edges, in the same units as ``fs``.
    fs : float
        Sampling rate of ``data``.
    order : int, optional
        Order passed to ``scipy.signal.butter``. Defaults to 4.

    Returns
    -------
    numpy.ndarray
        The filtered signal, same shape as ``data``.

    Raises
    ------
    ValueError
        If the band edges do not satisfy ``0 < lowcut < highcut < fs / 2``.
    """
    nyquist = 0.5 * fs
    # Fail early with a readable message instead of relying on the error
    # raised deep inside the filter design routine.
    if not 0 < lowcut < highcut < nyquist:
        raise ValueError(
            "Band edges must satisfy 0 < lowcut < highcut < fs/2; "
            f"got lowcut={lowcut}, highcut={highcut}, fs={fs}"
        )

    # Band edges are normalized to the Nyquist frequency.
    low = lowcut / nyquist
    high = highcut / nyquist
    sos = butter(order, [low, high], btype='band', output='sos')
    return sosfilt(sos, data)

def process_eeg_data(eeg_data, sampling_rate, channel_labels=None, bands=None):
    """Band-pass every EEG channel into frequency bands and tabulate them.

    Parameters
    ----------
    eeg_data : array
        Two-dimensional array indexed as ``eeg_data[channel, sample]``.
        Row ``i`` is paired with ``channel_labels[i]``.
    sampling_rate : float
        Sampling rate passed to ``bandpass_filter``.
    channel_labels : sequence of str, optional
        One label per row to process. Defaults to the Muse layout
        ``['TP9', 'AF7', 'AF8', 'TP10']``.
    bands : dict, optional
        Maps a band name to a ``(low, high)`` pair in Hz. Defaults to
        Alpha (7.5-13), Beta (13-30) and Gamma (30-44).

    Returns
    -------
    pandas.DataFrame
        One ``"<band>_<channel>"`` column per band and channel, followed
        by one ``"RAW_<channel>"`` column per channel holding the
        unfiltered signal.

    Raises
    ------
    IndexError
        If ``eeg_data`` has fewer rows than there are channel labels.
    """
    if bands is None:
        # Frequency bands in Hz
        bands = {
            "Alpha": (7.5, 13),
            "Beta": (13, 30),
            "Gamma": (30, 44)
        }
    if channel_labels is None:
        # Channel labels corresponding to the Muse EEG device
        channel_labels = ['TP9', 'AF7', 'AF8', 'TP10']

    # Report a row/label mismatch up front instead of failing part-way
    # through the filtering loop.
    n_rows = len(eeg_data)
    if n_rows < len(channel_labels):
        raise IndexError(
            f"eeg_data has {n_rows} row(s) but {len(channel_labels)} "
            "channel label(s) were given"
        )

    data_dict = {}

    # Filtered signals first: one column per (band, channel) pair.
    for band_name, (low, high) in bands.items():
        for i, channel in enumerate(channel_labels):
            data_dict[f"{band_name}_{channel}"] = bandpass_filter(
                eeg_data[i, :], low, high, sampling_rate
            )

    # Then the unfiltered signal of every channel.
    for i, channel in enumerate(channel_labels):
        data_dict[f"RAW_{channel}"] = eeg_data[i, :]

    return pd.DataFrame(data_dict)


In [85]:
# Show the names of the board's EEG channels.
eeg_channel_names = board.get_eeg_names(board_id)
print(eeg_channel_names)

# Select the EEG rows and split them into frequency bands at the
# board's sampling rate.
eeg_channels = board.get_eeg_channels(board_id)
sampling_rate = board.get_sampling_rate(board_id)
eeg_data = data[eeg_channels]
df = process_eeg_data(eeg_data, sampling_rate)

print(df)

['TP9', 'AF7', 'AF8', 'TP10']
       Alpha_TP9  Alpha_AF7  Alpha_AF8  Alpha_TP10    Beta_TP9   Beta_AF7  \
0       0.016894   0.016903  -0.008827   -0.017501    1.118844   1.119410   
1       0.142311   0.142383  -0.076486   -0.147421    7.949686   7.953707   
2       0.594784   0.595085  -0.328909   -0.616144   26.184055  26.197299   
3       1.679127   1.679977  -0.954332   -1.739430   52.883084  52.909833   
4       3.661721   3.663581  -2.134763   -3.793233   70.854067  70.890472   
...          ...        ...        ...         ...         ...        ...   
1231   46.713829 -10.617693  22.251262   42.332878 -277.219867 -13.370484   
1232   76.505295  -5.398770  21.312482   40.874665 -164.254863 -11.709184   
1233  100.367329   0.176833  18.805082   35.987096   -6.944868  -8.048820   
1234  118.432652   5.760965  15.033529   28.320779  148.014972  -2.843769   
1235  131.189653  10.998472  10.403586   18.801614  254.200643   3.023555   

       Beta_AF8   Beta_TP10  Gamma_TP9  Gamma

## Feature extraction


In [86]:
import numpy as np

# Function to calculate the average power of the filtered signal
def calculate_power(filtered_signal):
    """Return the mean of the squared samples of ``filtered_signal``."""
    squared = filtered_signal * filtered_signal
    return np.mean(squared)

# Function to calculate the mean amplitude of the filtered signal
def calculate_mean_amplitude(filtered_signal):
    """Return the mean of the absolute values of ``filtered_signal``."""
    magnitudes = np.abs(filtered_signal)
    return np.mean(magnitudes)

# Extract power and mean amplitude for each band
def extract_features(filtered_data):
    """Compute power and mean amplitude for every signal in ``filtered_data``.

    ``filtered_data`` must provide ``.items()`` yielding ``(name, signal)``
    pairs. The result maps each name to a dict with the keys ``"power"``
    and ``"mean_amplitude"``.
    """
    return {
        name: {
            "power": calculate_power(signal),
            "mean_amplitude": calculate_mean_amplitude(signal),
        }
        for name, signal in filtered_data.items()
    }


In [66]:
def average_features(features):
    """Average per-signal features within the Alpha, Beta and Gamma bands.

    Parameters
    ----------
    features : dict
        Maps a key such as ``"Alpha_TP9"`` to a dict holding ``"power"``
        and ``"mean_amplitude"``.

    Returns
    -------
    dict
        Maps each of ``'Alpha'``, ``'Beta'`` and ``'Gamma'`` to a dict with
        ``"avg_power"`` and ``"avg_mean_amplitude"``. A band with no
        matching keys gets NaN for both values.

    Notes
    -----
    A key belongs to a band when it equals the band name or starts with
    ``"<band>_"``. Matching on the prefix rather than on any substring
    keeps keys that merely contain a band name (for example a raw-signal
    key) out of the average.
    """
    averaged_features = {}

    for band in ['Alpha', 'Beta', 'Gamma']:
        prefix = f"{band}_"
        band_features = [
            value for key, value in features.items()
            if key == band or key.startswith(prefix)
        ]

        if band_features:
            avg_power = np.mean([f["power"] for f in band_features])
            avg_mean_amplitude = np.mean([f["mean_amplitude"] for f in band_features])
        else:
            # Averaging an empty list would emit a NumPy warning and give
            # NaN anyway; return NaN explicitly instead.
            avg_power = np.nan
            avg_mean_amplitude = np.nan

        averaged_features[band] = {
            "avg_power": avg_power,
            "avg_mean_amplitude": avg_mean_amplitude
        }

    return averaged_features




In [67]:

def detect_stress(averaged_features, alpha_threshold=0.5, beta_threshold=1.5):
    """Flag stress when Alpha power is low and Beta power is high.

    Parameters
    ----------
    averaged_features : dict
        Must provide ``averaged_features["Alpha"]["avg_power"]`` and
        ``averaged_features["Beta"]["avg_power"]``.
    alpha_threshold : float, optional
        Stress requires Alpha power strictly below this value. The
        default of 0.5 is only an example value.
    beta_threshold : float, optional
        Stress requires Beta power strictly above this value. The
        default of 1.5 is only an example value.

    Returns
    -------
    bool
        ``True`` when both conditions hold, otherwise ``False``.
    """
    # The Beta value is only looked up when the Alpha condition holds.
    if not averaged_features["Alpha"]["avg_power"] < alpha_threshold:
        return False
    return bool(averaged_features["Beta"]["avg_power"] > beta_threshold)


In [87]:
# Pipeline: band-filter the EEG data, compute per-signal features,
# average them per band, then apply the stress check.
filtered_data = process_eeg_data(eeg_data, sampling_rate)
features = extract_features(filtered_data)
averaged_features = average_features(features)
stress = detect_stress(averaged_features)

# Recommend a stress-relief action only when stress was flagged.
if stress:
    print("Stress detected! Recommend breathing exercises.")

print(averaged_features)



{'Alpha': {'avg_power': np.float64(7740.249499348779), 'avg_mean_amplitude': np.float64(43.00533860754599)}, 'Beta': {'avg_power': np.float64(6821.637856843211), 'avg_mean_amplitude': np.float64(43.829178842249476)}, 'Gamma': {'avg_power': np.float64(1219.3623144880673), 'avg_mean_amplitude': np.float64(20.207517521298236)}}
