**Author:** Johannes Peter Knoll

# Introduction

Within this notebook you will learn and test everything that was implemented to preprocess the data
for the neural network.

Note:   This notebook is intended for those who want to verify that everything works correctly. It is very thorough
        and therefore unnecessary if you only want a quick start with the predictions. In that case, head
        to 'Classification_Demo.ipynb'.


# Thorough Demonstration of 'dataset_processing.py'

In [1]:
# The autoreload extension allows you to tweak the code in the imported modules
# and rerun cells to reflect the changes.
%load_ext autoreload
%autoreload 2

%matplotlib inline
%config InlineBackend.figure_format = 'retina'

## Managing Data

In this section we demonstrate the implemented class that helps you to manage the data you want to pass to the
neural network model. \
Its main purpose is to store the data in a uniform way, distribute it into pids and make it
easily accessible in a memory efficient way.

In [None]:
from dataset_processing import *

import random
import copy
import os

import numpy as np  # used throughout; imported explicitly instead of relying on the star import

### Basics

The 'Database' is a directory that consists of multiple .pkl files: one for the database configuration, the others
for storing data in one or multiple pids.

Each datapoint is saved as dictionary and can contain the following keys:
- unique identifier (key: "ID")
- RRI signal (key: "RRI")
- MAD signal (key: "MAD")
- SLP signal (key: "SLP")
- predicted Sleep-Labels (key: "SLP_predicted")
- predicted individual probabilities for every sleep stage (key: "SLP_predicted_probability")

### Creating Database

When initializing a new database (calling 'SleepDataManager' on a non-existent path), the class will
automatically create a directory containing the default database configuration saved as a .pkl file. \
When initializing on an existing path, the class reads the database configuration from the existing file.
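
The create-or-load behaviour described above can be pictured with a minimal sketch. It is not the code of 'SleepDataManager': the function name `load_or_create_configuration`, the file name `configuration.pkl` and the shortened default dictionary are assumptions made for illustration only.

```python
import pickle
import tempfile
from pathlib import Path

# Hypothetical, shortened default configuration (the real one has more keys).
DEFAULT_CONFIGURATION = {"RRI_frequency": 4, "MAD_frequency": 1, "SLP_frequency": 1 / 30}


def load_or_create_configuration(directory_path, file_name="configuration.pkl"):
    """Return the stored configuration, creating it with defaults if missing."""
    directory = Path(directory_path)
    directory.mkdir(parents=True, exist_ok=True)
    configuration_file = directory / file_name

    if configuration_file.exists():
        # Existing database: read the configuration from disk.
        with configuration_file.open("rb") as file:
            return pickle.load(file)

    # New database: write the defaults so later instances see the same values.
    configuration = dict(DEFAULT_CONFIGURATION)
    with configuration_file.open("wb") as file:
        pickle.dump(configuration, file)
    return configuration


with tempfile.TemporaryDirectory() as temporary_directory:
    first = load_or_create_configuration(temporary_directory)   # creates the file
    second = load_or_create_configuration(temporary_directory)  # loads the file
```

Because the configuration lives on disk rather than in the instance, every new instance on the same path sees the same values.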

In [20]:
data_manager = SleepDataManager(directory_path = "Processing_Demonstration/")
database_configuration = data_manager.database_configuration

for key in database_configuration.keys():
    print(f"{key}: {database_configuration[key]}")

RRI_frequency: 4
MAD_frequency: 1
SLP_frequency: 0.03333333333333333
sleep_stage_label: None
signal_length_seconds: None
wanted_shift_length_seconds: None
absolute_shift_deviation_seconds: None
number_datapoints: [0, 0, 0, 0]


Don't mind all the parameters yet. Necessary ones will be explained below.

### Changing Database Configuration

Of the above parameters, only the uniform signal frequencies can be changed, as demonstrated below.

In [None]:
updated_frequencies = {"RRI_frequency": 2, "SLP_frequency": 2}
data_manager.change_uniform_frequencies(updated_frequencies)

del data_manager, database_configuration

# the change is saved globally:
another_data_manager = SleepDataManager(directory_path = "Processing_Demonstration/")
database_configuration = another_data_manager.database_configuration

print("\nDatabase configuration in new instance on same path:\n")
for key in database_configuration.keys():
    print(f"{key}: {database_configuration[key]}")


Database configuration in new instance on same path:

RRI_frequency: 2
MAD_frequency: 1
SLP_frequency: 2
sleep_stage_label: None
signal_length_seconds: None
wanted_shift_length_seconds: None
absolute_shift_deviation_seconds: None
number_datapoints: [0, 0, 0, 0]


In [None]:
clean_and_remove_directory("Processing_Demonstration/")

### Saving Data

To ensure the data is uniform, you must always provide the sampling frequency for each signal when saving
(keys: "RRI_frequency", "MAD_frequency", "SLP_frequency"). \
Furthermore, when adding SLP signals, you need to provide the key "sleep_stage_label": a dictionary
that states which sleep stage each of your labels corresponds to (example below).

In [None]:
# sleep stage labels in shhs dataset:
# "wake": 0,    "N1": 1,    "N2": 2,    "N3": 3,    "REM": 5,   "artifact": "other integers"

# The network only distinguishes wake, LS, DS, REM, and artifact. Therefore, N1 is mapped to "wake",
# N2 to "LS" and N3 to "DS":
shhs_labels = {"wake": [0, 1], "LS": [2], "DS": [3], "REM": [5], "artifact": ["other"]}
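
To illustrate how such a dictionary can be applied, here is a minimal sketch that converts raw labels into uniform classes. The function `relabel_sleep_stages` and the integer codes in `target_classes` are hypothetical; the codes actually used by 'dataset_processing.py' are not shown in this notebook.

```python
import numpy as np


def relabel_sleep_stages(raw_labels, sleep_stage_label, target_classes):
    """Map dataset-specific labels onto uniform classes via a lookup table."""
    lookup = {}
    fallback = None
    for stage, raw_values in sleep_stage_label.items():
        for raw in raw_values:
            if raw == "other":
                # "other" catches every label that is not listed explicitly
                fallback = target_classes[stage]
            else:
                lookup[raw] = target_classes[stage]

    relabeled = []
    for value in raw_labels:
        value = int(value)
        if value in lookup:
            relabeled.append(lookup[value])
        elif fallback is not None:
            relabeled.append(fallback)
        else:
            raise ValueError(f"No sleep stage declared for label {value}.")
    return np.array(relabeled)


shhs_labels = {"wake": [0, 1], "LS": [2], "DS": [3], "REM": [5], "artifact": ["other"]}
# Hypothetical integer codes, chosen only for this example.
target_classes = {"artifact": 0, "wake": 1, "LS": 2, "DS": 3, "REM": 4}

relabeled = relabel_sleep_stages([0, 1, 2, 3, 5, 9, 4], shhs_labels, target_classes)
```

In this sketch the labels 9 and 4 are not listed explicitly, so both fall into the "artifact" class.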

Let's create some data with differing sampling frequencies:

In [22]:
signal_time_in_seconds = 120
rri_frequency = 3 # instead of default: 4 Hz
slp_frequency = 1/20 # instead of default: 1/30 Hz

# creating signals and printing their first values (before scaling)
rri_signal = np.array([random.randint(1, 5) for i in range(int(signal_time_in_seconds * rri_frequency))], dtype=np.float64)
print(f"First datapoints of RRI signal: {rri_signal[:10]} (shape: {rri_signal.shape})")
slp_signal = [random.randint(1, 5) for i in range(int(signal_time_in_seconds * slp_frequency))]
print(f"First datapoints of SLP signal: {slp_signal[:10]} (shape: {len(slp_signal)})")

random_sleep_stage_labels = {"wake": [0], "LS": [1], "DS": [2], "REM": [3], "artifact": ["other"]}

new_datapoint = {
    "ID": "any",
    "RRI": rri_signal,
    "RRI_frequency": rri_frequency,
    "SLP": slp_signal,
    "SLP_frequency": slp_frequency,
    "sleep_stage_label": random_sleep_stage_labels
}

First datapoints of RRI signal: [4. 5. 1. 4. 5. 1. 3. 1. 3. 5.] (shape: (360,))
First datapoints of SLP signal: [4, 4, 5, 3, 2, 1] (shape: 6)


Now we save the data and investigate the scaling applied to the signals.
(Note: Saving a datapoint with an already existing ID overwrites the "old" values. You are notified in this case.)

The idea is to assign a time stamp to each datapoint in the original and in the new (not yet existing) scaled signal (index within signal / sampling frequency -> recording time in seconds).
For signals like RRI and MAD, which contain continuous values, each scaled datapoint is then interpolated from the
two original datapoints whose time stamps enclose its own.
For signals like SLP, which contain classification labels, we simply take the value of the original datapoint with the closest time stamp.
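
The two scaling rules can be sketched with NumPy as follows. This is an illustration under stated assumptions, not the implementation in 'dataset_processing.py': the function names are hypothetical, time stamps are rounded to suppress floating point noise, and a label that lies exactly between two original time stamps takes the earlier one.

```python
import numpy as np


def _time_stamps(number_samples, frequency):
    # index / sampling frequency -> recording time in seconds
    return np.round(np.arange(number_samples) / frequency, 9)


def resample_continuous(signal, source_frequency, target_frequency):
    """Linear interpolation between the two enclosing original samples."""
    signal = np.asarray(signal, dtype=np.float64)
    number_target = int(round(len(signal) / source_frequency * target_frequency))
    source_times = _time_stamps(len(signal), source_frequency)
    target_times = _time_stamps(number_target, target_frequency)
    # np.interp repeats the last value for time stamps beyond the final sample
    return np.interp(target_times, source_times, signal)


def resample_labels(labels, source_frequency, target_frequency):
    """Take the label of the original sample with the closest time stamp."""
    labels = np.asarray(labels)
    number_target = int(round(len(labels) / source_frequency * target_frequency))
    source_times = _time_stamps(len(labels), source_frequency)
    target_times = _time_stamps(number_target, target_frequency)
    distances = np.abs(target_times[:, None] - source_times[None, :])
    return labels[distances.argmin(axis=1)]  # argmin keeps the earlier sample on ties


# first values of the signals created above: 3 Hz -> 4 Hz and 1/20 Hz -> 1/30 Hz
scaled_rri = resample_continuous([4, 5, 1, 4, 5, 1, 3, 1, 3, 5], 3, 4)
scaled_slp = resample_labels([4, 4, 5, 3, 2, 1], 1 / 20, 1 / 30)
```

For example, the second scaled RRI value belongs to the time stamp 0.25 s, which lies three quarters of the way between the original samples at 0 s (value 4) and 1/3 s (value 5), giving 4.75.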

In [None]:
# initialize database
data_manager = SleepDataManager(directory_path = "Processing_Demonstration/")

# saving the new datapoint
data_manager.save(copy.deepcopy(new_datapoint))

# overwriting old datapoint (with same values for demonstration)
data_manager.save(copy.deepcopy(new_datapoint))

ID 'any' already exists in the data file. Existing keys will be overwritten with new values.


In [30]:
# load and print the data
data_dict = data_manager.load(0)

for key in data_dict.keys(): # type: ignore
    if key in ["RRI", "MAD", "SLP"]:
        print(key + ":", data_dict[key][:10], data_dict[key].shape) # type: ignore
    else:
        print(key + ":", data_dict[key]) # type: ignore

ID: any
RRI: [4.   4.75 3.   1.75 4.   4.75 3.   1.5  3.   1.5 ] (480,)
SLP: [4 4 3 2] (4,)


#### Speed up data saving

As indicated above, every ID in the database is checked when saving a new datapoint, which costs unnecessary
computation time when saving many datapoints. To speed up saving, it is recommended to check beforehand whether all IDs you are
about to save are unique and then disable the ID check (by setting 'unique_id=True').

In [48]:
# ID's of new datapoints:
list_of_ids = ["1", "two", "11"]

# check if IDs are unique (raises an error if not)
data_manager.check_if_ids_are_unique(list_of_ids)

# save new datapoints without checking for uniqueness
for id in list_of_ids:
    new_datapoint["ID"] = id
    data_manager.save(copy.deepcopy(new_datapoint), unique_id=True)


All ID's are unique.


### Load Data

Data can be loaded in multiple ways using a string or an integer:
- If it's a string that equals a key in the data dictionaries, it will return all entities of that specific key in the database.
- If it's a different string, then it will treat it as an ID and return the corresponding data dictionary.
- If it's an integer, it will treat it as position in the database and return the corresponding data dictionary.
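
The dispatch rules above can be mimicked with a small in-memory stand-in. The class `TinyDatabase` is hypothetical and only mirrors the three cases; the real class reads from .pkl files and may treat edge cases (such as entries lacking the requested key) differently.

```python
class TinyDatabase:
    """In-memory stand-in that mirrors the three loading cases."""

    KNOWN_KEYS = {"ID", "RRI", "MAD", "SLP", "SLP_predicted", "SLP_predicted_probability"}

    def __init__(self, entries):
        self.entries = list(entries)

    def load(self, key_id_index):
        if isinstance(key_id_index, int):
            # integer -> position in the database
            return self.entries[key_id_index]
        if isinstance(key_id_index, str):
            if key_id_index in self.KNOWN_KEYS:
                # known key -> that value from every entry holding it
                return [entry[key_id_index] for entry in self.entries if key_id_index in entry]
            # any other string -> treated as an ID
            for entry in self.entries:
                if entry["ID"] == key_id_index:
                    return entry
            raise KeyError(f"ID '{key_id_index}' not found.")
        raise TypeError("Expected a string or an integer.")

    __getitem__ = load  # enables the bracket syntax database[...]


database = TinyDatabase([
    {"ID": "any", "RRI": [1.0, 2.0]},
    {"ID": "1", "RRI": [3.0]},
    {"ID": "two", "RRI": [4.0]},
    {"ID": "11", "RRI": [5.0]},
])
```

Note that the string "1" and the integer 1 are different requests: `database["1"]` looks up an ID, whereas `database[1]` returns the entry at position 1.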

In [40]:
loaded_data = data_manager.load("RRI")
# loaded_data = data_manager["RRI"] # same as above
print(len(loaded_data)) # type: ignore

4


In [37]:
# load data by ID
loaded_data = data_manager.load("1") # or data_manager["1"]

for key in loaded_data.keys(): # type: ignore
    if key in ["RRI", "MAD", "SLP"]:
        print(key + ":", loaded_data[key].shape) # type: ignore
    else:
        print(key + ":", loaded_data[key]) # type: ignore

ID: 1
RRI: (480,)
SLP: (4,)


In [None]:
# load data by index
loaded_data = data_manager.load(2) # or data_manager[2]

for key in loaded_data.keys(): # type: ignore
    if key in ["RRI", "MAD", "SLP"]:
        print(key + ":", loaded_data[key].shape) # type: ignore
    else:
        print(key + ":", loaded_data[key]) # type: ignore

ID: two
RRI: (480,)
SLP: (4,)


### Remove Data

Data can be removed in multiple ways using a string or an integer:
- If it's a string that equals a key in the data dictionaries, it will remove this key and corresponding value from all dictionaries within the database.
- If it's a different string, it will treat it as an ID and remove the corresponding data dictionary.
- If it's an integer, it will treat it as position in the database and remove the corresponding data dictionary.

In [49]:
data_manager.remove("RRI")

# print all data
for data_dict in data_manager:  # avoid shadowing the built-in `dict`
    print("-"*20)
    for key in data_dict.keys():
        if key in ["RRI", "MAD", "SLP"]:
            print(key + ":", data_dict[key].shape) # type: ignore
        else:
            print(key + ":", data_dict[key]) # type: ignore
print("-"*20)

--------------------
ID: any
SLP: (4,)
--------------------
ID: 1
SLP: (4,)
--------------------
ID: two
SLP: (4,)
--------------------
ID: 11
SLP: (4,)
--------------------


In [50]:
data_manager.remove("two")

# print all data ID's
print(data_manager["ID"])

['any', '1', '11']


In [51]:
data_manager.remove(0)

# print all data ID's
print(data_manager["ID"])

['1', '11']


### Other Minor Operations:

To provide an overview of other minor operations, we'll restore the datapoints first (by emptying the database and resaving them).

In [70]:
# delete all data
data_manager.empty_database()

new_datapoint["ID"] = "any"  # reset ID for new datapoint
data_manager.save(copy.deepcopy(new_datapoint))

list_of_ids = ["1", "two", "11"]
for id in list_of_ids:
    new_datapoint["ID"] = id
    data_manager.save(copy.deepcopy(new_datapoint), unique_id=True)

#### Iterating over Database

In [71]:
for datapoint in data_manager:
    print(datapoint["ID"])

any
1
two
11


#### Checking if database holds datapoint with certain ID

In [75]:
specific_id = "1"
if specific_id in data_manager:
    print(f"Datapoint with \"ID\" = {specific_id} is in the database.")

Datapoint with "ID" = 1 is in the database.


#### Printing class instance

In [76]:
print(data_manager)

File path: Processing_Demonstration/data.pkl
Database Configuration: {'RRI_frequency': 4, 'MAD_frequency': 1, 'SLP_frequency': 0.03333333333333333, 'sleep_stage_label': {'wake': [0], 'LS': [1], 'DS': [2], 'REM': [3], 'artifact': ['other']}, 'signal_length_seconds': None, 'wanted_shift_length_seconds': None, 'absolute_shift_deviation_seconds': None, 'number_datapoints': [4, 0, 0, 0]}


### Cropping Signal Length

A neural network can only process data of the same shape. Datapoints with reduced size can be padded to ensure they match the required size.
This will of course not be done beforehand, as it would cost unnecessary storage space and is therefore not demonstrated here.

Unlike shorter signals, longer ones need to be cropped or split into multiple datapoints to ensure they are not oversized.
Splitting oversized datapoints within the database into multiple ones with the desired length is shown below.

Every datapoint holds multiple signals (RRI, MAD, SLP, etc.).
The number of splits resulting from an oversized datapoint depends on the parameter: 'wanted_shift_length_seconds'.
The starting points of consecutive parts (splits) are shifted by approximately this value, which determines the number of resulting parts.

Of course, not all shift lengths are useful.
We want each starting point to correspond with a value for each signal.
As each signal might have a different sampling frequency, the algorithm will look for a shift length close to the user desired one ('wanted_shift_length_seconds') so that the shift length multiplied by the sampling frequency equals a natural number for every sampling frequency.
The parameter 'absolute_shift_deviation_seconds' defines how far this shift length is allowed to deviate from the wanted shift length.

Note that the algorithm will raise an error if no suitable shift length can be found for the set parameters.
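
The constraint on the shift length can be sketched as a small search. This is only an illustration of the condition stated above, assuming whole-second candidates and a hypothetical function name `find_shift_length`. The actual algorithm evidently applies further criteria: in the long-signal example of this notebook it reports a shift of 6000 seconds for one datapoint although 5400 seconds already satisfies the condition.

```python
from fractions import Fraction


def find_shift_length(wanted_shift, allowed_deviation, frequencies):
    """Return the whole-second shift closest to the wanted one for which
    shift * frequency is a natural number for every sampling frequency."""
    # exact fractions avoid floating point trouble with values like 1/30
    exact = [Fraction(frequency).limit_denominator(10_000) for frequency in frequencies]

    for deviation in range(allowed_deviation + 1):
        # on equal distance, the shorter shift is tried first
        for candidate in (wanted_shift - deviation, wanted_shift + deviation):
            if candidate > 0 and all((candidate * f).denominator == 1 for f in exact):
                return candidate

    raise ValueError("No suitable shift length within the allowed deviation.")


frequencies = [4, 1, 1 / 30]  # RRI, MAD, SLP
shift_a = find_shift_length(5400, 1800, frequencies)
shift_b = find_shift_length(50, 10, frequencies)
```

With these frequencies every valid shift is a multiple of 30 seconds, so a wanted shift of 50 seconds with a deviation of 10 seconds ends up at 60 seconds, while a wanted shift of 45 seconds with the same deviation raises an error.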


#### Long Maximum Signal Length

In [107]:
# delete all data
data_manager.empty_database()

# create and save data of different lengths
signal_times = [28800, 36000, 36030, 54000] # [8h, 10h, 10h + 30s, 15h] in seconds
rri_frequency, mad_frequency, slp_frequency = 4, 1, 1/30
random_sleep_stage_labels = {"wake": [0], "LS": [1], "DS": [2], "REM": [3], "artifact": ["other"]}

for i in range(len(signal_times)):
    signal_time_in_seconds = signal_times[i]

    rri_signal = np.array([random.randint(1, 5) for _ in range(int(signal_time_in_seconds * rri_frequency))], dtype=np.float64)
    mad_signal = np.array([random.randint(1, 5) for _ in range(int(signal_time_in_seconds * mad_frequency))], dtype=np.float64)
    slp_signal = [random.randint(1, 5) for _ in range(int(signal_time_in_seconds * slp_frequency))]

    new_datapoint = {
        "ID": str(i),
        "RRI": rri_signal,
        "RRI_frequency": rri_frequency,
        "MAD": mad_signal,
        "MAD_frequency": mad_frequency,
        "SLP": slp_signal,
        "SLP_frequency": slp_frequency,
        "sleep_stage_label": random_sleep_stage_labels
    }

    data_manager.save(new_datapoint)

In [109]:
# printing some pre-split information
signal_durations = np.array([len(entry) for entry in data_manager["RRI"]])/rri_frequency # type: ignore
print(f"Datapoints within the database: {len(data_manager)}")
print(f"ID: {data_manager['ID']}")  # single quotes inside the f-string also work before Python 3.12
print(f"Corresponding Signal duration in seconds: {signal_durations}")
print(f"Total duration of all signals in seconds: {np.sum(signal_durations)} (= {data_manager.calculate_total_signal_duration()} ?)")

Datapoints within the database: 4
ID: ['0', '1', '2', '3']
Corresponding Signal duration in seconds: [28800. 36000. 36030. 54000.]
Total duration of all signals in seconds: 154830.0 (= 154830.0 ?)


In [110]:
# splitting oversized data into multiple database entries with signal lengths of at most 10 hours
data_manager.crop_oversized_data(
    signal_length_seconds = 36000,  # 10 hours in seconds
    wanted_shift_length_seconds = 5400, # 1.5 hours in seconds
    absolute_shift_deviation_seconds = 1800, # 30 minutes in seconds
)


Splitting entries within Processing_Demonstration/data.pkl into multiple ones to ensure the contained signals span at most across: 36000 seconds.
Initializing progress bar...

   ✅: 100.0% [█████████████████████████] 4 / 4 | 11 ms / 11 ms (2.6 ms/it) |


In [111]:
# printing some post-split information
signal_durations = np.array([len(entry) for entry in data_manager["RRI"]])/rri_frequency # type: ignore
print(f"Datapoints within the database: {len(data_manager)}")
print(f"ID: {data_manager['ID']}")  # single quotes inside the f-string also work before Python 3.12
print(f"Corresponding Signal duration in seconds: {signal_durations}")
print(f"Total duration of all signals in seconds: {np.sum(signal_durations)} (= {data_manager.calculate_total_signal_duration()} ?)")

Datapoints within the database: 8
ID: ['0', '1', '2', '2*', '3', '3*', '3*', '3*']
Corresponding Signal duration in seconds: [28800. 36000. 36000. 30630. 36000. 36000. 36000. 36000.]
Total duration of all signals in seconds: 275430.0 (= 275430.0 ?)


We now have more datapoints than before (8 instead of 4) and the total signal duration increased, as the split parts overlap due to our settings.
As soon as a signal exceeds the maximum length of 10 hours, the datapoint is split (see ID=2).

As we can see, they do not all have a duration of exactly 10 hours.
This is because the last part contains whatever remains of the signal after shifting the starting position.
It may, but need not, be 10 hours long (compare ID = 2 and 3).
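
The durations printed above can be reproduced with a short sketch of the windowing logic. The function `split_windows` is hypothetical and works on durations in seconds only; it assumes that windows are added until one of them reaches the end of the signal, which is consistent with the output above but not confirmed by the source code.

```python
def split_windows(total_seconds, signal_length_seconds, shift_length_seconds):
    """Return (start, end) pairs of the overlapping parts of one signal."""
    windows = []
    start = 0
    while True:
        end = min(start + signal_length_seconds, total_seconds)
        windows.append((start, end))
        if end >= total_seconds:
            # this part contains the end of the signal, so it is the last one
            break
        start += shift_length_seconds
    return windows


# ID 2: 10 h + 30 s with a shift of 5400 s, ID 3: 15 h with a shift of 6000 s
durations_id_2 = [end - start for start, end in split_windows(36030, 36000, 5400)]
durations_id_3 = [end - start for start, end in split_windows(54000, 36000, 6000)]
```

For ID 2 the second part starts at 5400 s and ends with the signal at 36030 s, hence its duration of 30630 s. For ID 3 the fourth part starts at 18000 s and ends exactly at 54000 s, so all four parts are 10 hours long.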

The split parts have similar IDs: one keeps the original ID, the others carry the original ID plus a '*'.
Do not worry too much about this; the one without the star just holds information that applies to all others (see below).

To ensure the datapoints can later be fused again correctly, the algorithm that splits the individual datapoints saves some
additional information in the split datapoints. For us, it is not relevant.

In [114]:
# print keys of split datapoints
for data_point in data_manager:
    print("-"*30)
    for key in data_point.keys(): # type: ignore
        if key not in ["RRI", "MAD", "SLP"]:
            print(f"{key}: {data_point[key]}")
print("-"*30)

------------------------------
ID: 0
------------------------------
ID: 1
------------------------------
ID: 2
shift_length_seconds: 5400
shift: 0
------------------------------
ID: 2*
shift: 1
------------------------------
ID: 3
shift_length_seconds: 6000
shift: 0
------------------------------
ID: 3*
shift: 1
------------------------------
ID: 3*
shift: 2
------------------------------
ID: 3*
shift: 3
------------------------------


#### Short Maximum Signal Length

Essentially, nothing changes compared to the case above.
This section serves as a test to ensure that massive splitting does not take too much computation time.

In [117]:
# delete all data
data_manager.empty_database()

# create and save data of different lengths
signal_time_in_seconds = 54000 # [15h] in seconds
rri_frequency, mad_frequency, slp_frequency = 4, 1, 1/30
random_sleep_stage_labels = {"wake": [0], "LS": [1], "DS": [2], "REM": [3], "artifact": ["other"]}

rri_signal = np.array([random.randint(1, 5) for _ in range(int(signal_time_in_seconds * rri_frequency))], dtype=np.float64)
mad_signal = np.array([random.randint(1, 5) for _ in range(int(signal_time_in_seconds * mad_frequency))], dtype=np.float64)
slp_signal = [random.randint(1, 5) for _ in range(int(signal_time_in_seconds * slp_frequency))]

new_datapoint = {
    "ID": "0",
    "RRI": rri_signal,
    "RRI_frequency": rri_frequency,
    "MAD": mad_signal,
    "MAD_frequency": mad_frequency,
    "SLP": slp_signal,
    "SLP_frequency": slp_frequency,
    "sleep_stage_label": random_sleep_stage_labels
}

data_manager.save(new_datapoint)

In [118]:
# printing some pre-split information
signal_durations = np.array([len(entry) for entry in data_manager["RRI"]])/rri_frequency # type: ignore
print(f"Datapoints within the database: {len(data_manager)}")
print(f"ID: {data_manager['ID']}")  # single quotes inside the f-string also work before Python 3.12
print(f"Corresponding Signal duration in seconds: {signal_durations}")
print(f"Total duration of all signals in seconds: {np.sum(signal_durations)} (= {data_manager.calculate_total_signal_duration()} ?)")

Datapoints within the database: 1
ID: ['0']
Corresponding Signal duration in seconds: [54000.]
Total duration of all signals in seconds: 54000.0 (= 54000.0 ?)


In [119]:
# splitting oversized data into multiple database entries with signal lengths of at most 30 seconds
data_manager.crop_oversized_data(
    signal_length_seconds = 30,
    wanted_shift_length_seconds = 30,
    absolute_shift_deviation_seconds = 0,
)


Splitting entries within Processing_Demonstration/data.pkl into multiple ones to ensure the contained signals span at most across: 30 seconds.
   ✅: 100.0% [██████████████████████████] 1 / 1 | 34 ms / 34 ms (34 ms/it) |


An increase in computation time per datapoint by more than a factor of 10 seems like a lot, but it is actually fine. It used to be worse: 'maybe get a coffee and a book and go on a two-week trip' worse, to be exact.

In [121]:
# printing some post-split information
signal_durations = np.array([len(entry) for entry in data_manager["RRI"]])/rri_frequency # type: ignore
print(f"Datapoints within the database: {len(data_manager)}")
print(f"ID: {np.array(data_manager['ID'])}")  # single quotes inside the f-string also work before Python 3.12
print(f"Corresponding Signal duration in seconds: {signal_durations}")
print(f"Total duration of all signals in seconds: {np.sum(signal_durations)} (= {data_manager.calculate_total_signal_duration()} ?)")

Datapoints within the database: 1800
ID: ['0' '0*' '0*' ... '0*' '0*' '0*']
Corresponding Signal duration in seconds: [30. 30. 30. ... 30. 30. 30.]
Total duration of all signals in seconds: 54000.0 (= 54000.0 ?)


### Reversing Signal Split

The ultimate goal is to either train and validate the network model or to predict sleep stages for some data.
In the latter scenario, after predicting, you might want to reverse the signal split to restore the original shape of your data.

Depending on your settings, some parts may overlap, providing you with multiple predictions for certain parts of the data.

In [25]:
# delete all data
data_manager.empty_database()

# create and save data of different lengths
signal_times = [36000, 54000] # [10h, 15h] in seconds
rri_frequency, mad_frequency, slp_frequency = 4, 1, 1/30
random_sleep_stage_labels = {"wake": [0], "LS": [1], "DS": [2], "REM": [3], "artifact": ["other"]}

for i in range(len(signal_times)):
    signal_time_in_seconds = signal_times[i]

    rri_signal = np.array([random.randint(1, 5) for _ in range(int(signal_time_in_seconds * rri_frequency))], dtype=np.float64)
    mad_signal = np.array([random.randint(1, 5) for _ in range(int(signal_time_in_seconds * mad_frequency))], dtype=np.float64)
    slp_signal = [random.randint(1, 5) for _ in range(int(signal_time_in_seconds * slp_frequency))]

    new_datapoint = {
        "ID": str(i),
        "RRI": rri_signal,
        "RRI_frequency": rri_frequency,
        "MAD": mad_signal,
        "MAD_frequency": mad_frequency,
        "SLP": slp_signal,
        "SLP_frequency": slp_frequency,
        "sleep_stage_label": random_sleep_stage_labels
    }

    data_manager.save(new_datapoint)

original_rri = data_manager["RRI"] # type: ignore
original_mad = data_manager["MAD"] # type: ignore
original_slp = data_manager["SLP"] # type: ignore

# splitting oversized data into multiple database entries with signal lengths of at most 10 hours
data_manager.crop_oversized_data(
    signal_length_seconds = 36000,  # 10 hours in seconds
    wanted_shift_length_seconds = 5400, # 1.5 hours in seconds
    absolute_shift_deviation_seconds = 1800, # 30 minutes in seconds
)

# add artificial data in shape of predicted sleep stages to the database
for data_point in data_manager:
    artificial_data = {
        "ID": data_point["ID"],
        "SLP_predicted": np.array([random.randint(0, 4) for _ in range(len(data_point["SLP"]))], dtype=np.int64),
        "SLP_predicted_probability": np.array([[random.random() for _ in range(5)] for _ in range(len(data_point["SLP"]))], dtype=np.float64),
        "SLP_frequency": data_manager.database_configuration["SLP_frequency"],
    }
    data_manager.save(artificial_data)

# print data
print("-"*30)
for data_point in data_manager:
    if data_point["ID"][-1] == "*": # just print the first split datapoint
        for key in data_point.keys(): # type: ignore
            if key in ["RRI", "MAD", "SLP", "SLP_predicted", "SLP_predicted_probability"]:
                print(f"{key}: {data_point[key].shape}") # type: ignore
            else:
                print(f"{key}: {data_point[key]}")
        break
print("-"*30)


Splitting entries within Processing_Demonstration/data.pkl into multiple ones to ensure the contained signals span at most across: 36000 seconds.
   ✅: 100.0% [███████████████████████] 2 / 2 | 5.9 ms / 5.9 ms (2.9 ms/it) |
ID '0' already exists in the data file. Existing keys will be overwritten with new values.
ID '1' already exists in the data file. Existing keys will be overwritten with new values.
ID '1*' already exists in the data file. Existing keys will be overwritten with new values.
ID '1*' already exists in the data file. Existing keys will be overwritten with new values.
ID '1*' already exists in the data file. Existing keys will be overwritten with new values.
ID '1*' already exists in the data file. Existing keys will be overwritten with new values.
ID '1*' already exists in the data file. Existing keys will be overwritten with new values.
ID '1*' already exists in the data file. Existing keys will be overwritten with new values.
ID '1*' already exists in the data file. E

In [26]:
data_manager.reverse_signal_crop()


Distributing splitted data parts into individual files (Subprocess of Reversing Signal Split):
   ✅: 100.0% [███████████████████████] 5 / 5 | 4.0 ms / 4.0 ms (798 µs/it) |

Merging data points back into the main file and reversing the Signal Split:
   ✅: 100.0% [███████████████████████] 1 / 1 | 252 ms / 252 ms (252 ms/it) |


In [27]:
# check if the original data is restored
split_reversed_rri = data_manager["RRI"] # type: ignore
split_reversed_mad = data_manager["MAD"] # type: ignore
split_reversed_slp = data_manager["SLP"] # type: ignore

rri_distances, mad_distances, slp_distances = 0, 0, 0
for i in range(len(original_rri)): # type: ignore
    rri_distances += np.abs(np.array(original_rri[i]) - np.array(split_reversed_rri[i])).sum() # type: ignore
for i in range(len(original_mad)): # type: ignore
    mad_distances += np.abs(np.array(original_mad[i]) - np.array(split_reversed_mad[i])).sum() # type: ignore
for i in range(len(original_slp)): # type: ignore
    slp_distances += np.abs(np.array(original_slp[i]) - np.array(split_reversed_slp[i])).sum() # type: ignore

if rri_distances == 0 and mad_distances == 0 and slp_distances == 0:
    print("Original data restored successfully!")
else:
    print("Data restoration failed!")
    print(f"RRI distances: {rri_distances}, MAD distances: {mad_distances}, SLP distances: {slp_distances}")

Original data restored successfully!


In [30]:
reconstructed_dict = data_manager.load("1")
print("SLP_predicted_probability shape:", reconstructed_dict["SLP_predicted_probability"].shape) # type: ignore

different_number_entries = []
for slp in reconstructed_dict["SLP_predicted"]: # type: ignore
    different_number_entries.append(len(slp))
print("Unique lengths of entries within SLP_predicted:", np.unique(different_number_entries))

SLP_predicted_probability shape: (1800, 5)
Unique lengths of entries within SLP_predicted: [1 2 3 4]


As we can see, everything worked as expected.
The original signals were restored successfully and the predicted signals have the expected shape.

`SLP_predicted_probability` holds, for every position, the probabilities of every sleep stage (a list).
The overlapping parts are fused by taking the mean of the probabilities calculated for the same position in the different split parts.
Therefore, for every original SLP position (1800 in total = 15 h * 3600 s/h * 1/30 Hz), it holds the probabilities for every classification label.

In contrast, `SLP_predicted` is an array similar to `SLP`, holding the predicted sleep stages (the most probable stage).
In the reverse process, the overlapping sleep stages were simply collected in a list.
As different regions of the signal are covered by a different number of split parts (the middle the most, start and end the least), we accumulate a different number of predicted sleep stages across the signal.
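
How overlapping predictions can be fused is sketched below. The function `merge_overlapping_predictions` is hypothetical; it assumes that every position is covered by at least one part and derives the predicted stage from the highest probability, whereas the demonstration above fills both keys with independent random values.

```python
import numpy as np


def merge_overlapping_predictions(parts, total_length):
    """parts: list of (start_index, probabilities), probabilities of shape (n, classes).
    Returns the mean probabilities and, per position, the list of predicted stages."""
    number_classes = parts[0][1].shape[1]
    probability_sum = np.zeros((total_length, number_classes))
    coverage = np.zeros(total_length, dtype=int)
    stage_lists = [[] for _ in range(total_length)]

    for start, probabilities in parts:
        stop = start + len(probabilities)
        probability_sum[start:stop] += probabilities
        coverage[start:stop] += 1
        for offset, stage in enumerate(probabilities.argmax(axis=1)):
            stage_lists[start + offset].append(int(stage))

    # mean over all parts that cover a position
    mean_probabilities = probability_sum / coverage[:, None]
    return mean_probabilities, stage_lists


# 15 h at 1/30 Hz -> 1800 positions; parts of 1200 positions, shifted by 200 positions
generator = np.random.default_rng(0)
parts = [(start, generator.random((1200, 5))) for start in (0, 200, 400, 600)]
mean_probabilities, stage_lists = merge_overlapping_predictions(parts, 1800)
unique_lengths = sorted({len(stages) for stages in stage_lists})
```

The first and last 200 positions are covered by a single part, whereas positions 600 to 1199 are covered by all four, which explains why the lists differ in length.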

### Train-, Validation-, Test- Split

We aim to train a machine learning model with the data handled by this class.
Therefore, we want to be able to separate the data into training-, validation- and test- pids.

In [104]:
# delete all data
data_manager.empty_database()

# add a lot of data to the database
add_number_datapoints = 100

signal_time_in_seconds = 25200 # 7h in seconds
rri_frequency = 4
mad_frequency = 1
slp_frequency = 1/30

random_sleep_stage_labels = {"wake": [0, 1], "LS": [2], "DS": [3], "REM": [5], "artifact": ["other"]}

for i in range(add_number_datapoints):
    rri_signal = np.array([random.randint(1, 5) for _ in range(int(signal_time_in_seconds * rri_frequency))], dtype=np.float64)
    mad_signal = [random.randint(1, 5) for _ in range(int(signal_time_in_seconds * mad_frequency))]
    slp_signal = [random.randint(1, 5) for _ in range(int(signal_time_in_seconds * slp_frequency))]
    
    signal_time_in_seconds += 252 # increase signal time by 252 seconds for each new datapoint (a 101st datapoint would span 14h)

    decide_what_data_to_add = random.randint(0, 2)

    # add data with RRI, MAD and SLP signals (valid training data)
    if decide_what_data_to_add == 0:
        new_datapoint = {
            "ID": str(i),
            "RRI": rri_signal,
            "RRI_frequency": rri_frequency,
            "MAD": mad_signal,
            "MAD_frequency": mad_frequency,
            "SLP": slp_signal,
            "SLP_frequency": slp_frequency,
            "sleep_stage_label": random_sleep_stage_labels
        }
    # add data with RRI and MAD signals, but no SLP signal (invalid training data)
    elif decide_what_data_to_add == 1:
        new_datapoint = {
            "ID": str(i),
            "RRI": rri_signal,
            "RRI_frequency": rri_frequency,
            "MAD": mad_signal,
            "MAD_frequency": mad_frequency,
        }
    # add data with RRI and SLP signals, but no MAD signal (valid training data)
    else:
        new_datapoint = {
            "ID": str(i),
            "RRI": rri_signal,
            "RRI_frequency": rri_frequency,
            "SLP": slp_signal,
            "SLP_frequency": slp_frequency,
            "sleep_stage_label": random_sleep_stage_labels
        }
    
    data_manager.save(new_datapoint, unique_id=True)

print(f"Number of datapoints in file: {len(data_manager)}")

Number of datapoints in file: 100


Depending on whether we want to distribute our data into two or three pids (`test_size` is provided or `None`), the algorithm will distribute the valid data into corresponding separate files.

Data that cannot be used to train the network (i.e. data missing "RRI" and/or "SLP") will be left in the main file (`data.pkl`).
        
As we can manage data with "RRI" and "MAD" as well as data with "RRI" only, the algorithm makes sure that only one of the two types of data is used (the one with more samples).
The other type will be left in the main file.
This must be done to ensure each batch contains the same kind of data.
To also train with the type of data left behind, we must save these datapoints to another file from the beginning and train the network in separate steps on the individual datasets.
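
The selection rule can be sketched as follows. The function `select_trainable_ids` is hypothetical, and preferring the data with "MAD" when both types are equally frequent is an assumption of this example.

```python
def select_trainable_ids(datapoints):
    """Return (IDs used for training, IDs left in the main file)."""
    unusable, with_mad, without_mad = [], [], []
    for datapoint in datapoints:
        if "RRI" not in datapoint or "SLP" not in datapoint:
            unusable.append(datapoint["ID"])  # cannot be used for training
        elif "MAD" in datapoint:
            with_mad.append(datapoint["ID"])
        else:
            without_mad.append(datapoint["ID"])

    # keep only the more frequent type so every batch holds the same signals
    if len(with_mad) >= len(without_mad):
        return with_mad, unusable + without_mad
    return without_mad, unusable + with_mad


example_datapoints = [
    {"ID": "0", "RRI": [1.0], "MAD": [1.0], "SLP": [0]},
    {"ID": "1", "RRI": [1.0], "MAD": [1.0]},           # no SLP
    {"ID": "2", "RRI": [1.0], "SLP": [0]},             # no MAD
    {"ID": "3", "RRI": [1.0], "MAD": [1.0], "SLP": [0]},
]
used_ids, left_ids = select_trainable_ids(example_datapoints)
```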

In [105]:
data_manager.separate_train_test_validation(
    train_size = 0.8, 
    validation_size = 0.1, 
    test_size = 0.1,
)

Attention: 32 datapoints do not contain a SLP and/or RRI signal and will be left in the main file.
Attention: 31 datapoints without MAD signal will be left in the main file.

Distributing 80.0% / 10.0% / 10.0% of datapoints into training / validation / test pids, respectively:
   ✅: 100.0% [█████████████████████] 100 / 100 | 82 ms / 82 ms (819 µs/it) |


The training-, validation- or test- files can be accessed by stating the pid when creating a new instance of the class on the database:

ATTENTION:  
-   The instances on all files will have reduced functionality from now on. As the data should
    be fully prepared for the network now, the instances are designed to only load data and
    not save or edit it.

In [106]:
data_manager = SleepDataManager(directory_path = "Processing_Demonstration/")
train_data_manager = SleepDataManager(directory_path = "Processing_Demonstration/", pid = "train")
validation_data_manager = SleepDataManager(directory_path = "Processing_Demonstration/", pid = "validation")
test_data_manager = SleepDataManager(directory_path = "Processing_Demonstration/", pid = "test")

# print some basic information of the datasets
total_duration = data_manager.calculate_total_signal_duration(only_current_pid = False)
main_duration = data_manager.calculate_total_signal_duration(only_current_pid = True)
train_duration = train_data_manager.calculate_total_signal_duration(only_current_pid = True)
validation_duration = validation_data_manager.calculate_total_signal_duration(only_current_pid = True)
test_duration = test_data_manager.calculate_total_signal_duration(only_current_pid = True)

print("Number Datapoints / Total Duration of each pid:")
print("-"*50)
print(f"Main: {len(data_manager)} / {main_duration} seconds ({len(data_manager)/add_number_datapoints*100:.2f} / {main_duration/total_duration*100:.2f}%)")
print(f"Train: {len(train_data_manager)} / {train_duration} seconds ({len(train_data_manager)/add_number_datapoints*100:.2f} / {train_duration/total_duration*100:.2f}%)")
print(f"Validation: {len(validation_data_manager)} / {validation_duration} seconds ({len(validation_data_manager)/add_number_datapoints*100:.2f} / {validation_duration/total_duration*100:.2f}%)")
print(f"Test: {len(test_data_manager)} / {test_duration} seconds ({len(test_data_manager)/add_number_datapoints*100:.2f} / {test_duration/total_duration*100:.2f}%)")

Number Datapoints / Total Duration of each pid:
--------------------------------------------------
Main: 63 / 2419452.0 seconds (63.00 / 64.22%)
Train: 29 / 1052352.0 seconds (29.00 / 27.93%)
Validation: 4 / 150696.0 seconds (4.00 / 4.00%)
Test: 4 / 144900.0 seconds (4.00 / 3.85%)


Furthermore, the boolean parameters `join_splitted_parts` (default: True) and `equally_distribute_signal_durations` (default: True) control whether all database entries that result from splitting an originally saved datapoint (due to overlength) are placed in the same pid, and whether the datapoints are distributed equally with respect to their signal duration.
Note that the latter can only be True if the former is True as well.
(If you do not join split parts, then it does not matter how long the original signal was.)

`join_splitted_parts` works whether `crop_oversized_data` is called before or after `separate_train_test_validation`.
During the distribution, the algorithm checks whether the cropping was already performed and acts accordingly.

So let's quickly check the impact of `equally_distribute_signal_durations`.
Saving only valid data will make this easier for us.
We'll see that the share of signal duration in each pid ends up closer to the desired ratio.
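
To make the idea of distributing by signal duration concrete, here is a minimal sketch. It is not the algorithm used in `dataset_processing.py`; `duration_balanced_split` is a hypothetical helper that sorts the datapoints by duration and always serves the pid that is furthest behind its quota, so every pid receives short and long signals alike. It also does no shuffling.

```python
def duration_balanced_split(durations, ratios):
    """Assign datapoint indices to pids so that both the number of datapoints
    and the summed duration per pid stay close to the wanted ratios.

    Illustrative only, not the project's implementation.
    """
    order = sorted(range(len(durations)), key=lambda i: durations[i])
    assigned = {name: [] for name in ratios}
    for seen, index in enumerate(order, start=1):
        # serve the pid that is furthest behind its share of datapoints
        behind = max(ratios, key=lambda name: ratios[name] * seen - len(assigned[name]))
        assigned[behind].append(index)
    return assigned


# durations as generated in this demonstration: 7 h, growing by 252 s per datapoint
durations = [25200 + 252 * i for i in range(100)]
split = duration_balanced_split(durations, {"train": 0.8, "validation": 0.2})

total = sum(durations)
for name, indices in split.items():
    share = sum(durations[i] for i in indices) / total
    print(f"{name}: {len(indices)} datapoints, {share * 100:.2f}% of total duration")
```

Because the assignment walks through the datapoints in order of duration, neither pid ends up with only the short or only the long recordings.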

In [107]:
# delete all data
data_manager.empty_database()

# add a lot of data to the database
add_number_datapoints = 100

signal_time_in_seconds = 25200 # 7h in seconds
rri_frequency = 4
mad_frequency = 1
slp_frequency = 1/30

random_sleep_stage_labels = {"wake": [0, 1], "LS": [2], "DS": [3], "REM": [5], "artifect": ["other"]}

for i in range(add_number_datapoints):
    rri_signal = np.array([random.randint(1, 5) for _ in range(int(signal_time_in_seconds * rri_frequency))], dtype=np.float64)
    mad_signal = [random.randint(1, 5) for _ in range(int(signal_time_in_seconds * mad_frequency))]
    slp_signal = [random.randint(1, 5) for _ in range(int(signal_time_in_seconds * slp_frequency))]
    
    signal_time_in_seconds += 252 # increase signal time by 252 seconds for each new datapoint (a 101st datapoint would be 14 h long)

    new_datapoint = {
        "ID": str(i),
        "RRI": rri_signal,
        "RRI_frequency": rri_frequency,
        "MAD": mad_signal,
        "MAD_frequency": mad_frequency,
        "SLP": slp_signal,
        "SLP_frequency": slp_frequency,
        "sleep_stage_label": random_sleep_stage_labels
    }
    
    data_manager.save(new_datapoint, unique_id=True)

print(f"Number of datapoints in file: {len(data_manager)}")

Number of datapoints in file: 100


In [110]:
data_manager.separate_train_test_validation(
    train_size = 0.8, 
    validation_size = 0.2, 
    random_state = None,
    shuffle = True,
    join_splitted_parts = True,
    equally_distribute_signal_durations = False,
)

data_manager = SleepDataManager(directory_path = "Processing_Demonstration/")
train_data_manager = SleepDataManager(directory_path = "Processing_Demonstration/", pid = "train")
validation_data_manager = SleepDataManager(directory_path = "Processing_Demonstration/", pid = "validation")

# print some basic information of the datasets
total_duration = data_manager.calculate_total_signal_duration(only_current_pid = False)
main_duration = data_manager.calculate_total_signal_duration(only_current_pid = True)
train_duration = train_data_manager.calculate_total_signal_duration(only_current_pid = True)
validation_duration = validation_data_manager.calculate_total_signal_duration(only_current_pid = True)

print("\nNumber Datapoints / Total Duration of each pid:")
print("-"*50)
print(f"Main: {len(data_manager)} / {main_duration} seconds ({len(data_manager)/add_number_datapoints*100:.2f} / {main_duration/total_duration*100:.2f}%)")
print(f"Train: {len(train_data_manager)} / {train_duration} seconds ({len(train_data_manager)/add_number_datapoints*100:.2f} / {train_duration/total_duration*100:.2f}%)")
print(f"Validation: {len(validation_data_manager)} / {validation_duration} seconds ({len(validation_data_manager)/add_number_datapoints*100:.2f} / {validation_duration/total_duration*100:.2f}%)")


Distributing 80.0% / 20.0% of datapoints into training / validation pids, respectively:
   ✅: 100.0% [█████████████████████] 100 / 100 | 81 ms / 81 ms (802 µs/it) |

Number Datapoints / Total Duration of each pid:
--------------------------------------------------
Main: 0 / 0 seconds (0.00 / 0.00%)
Train: 80 / 2967552.0 seconds (80.00 / 78.77%)
Validation: 20 / 799848.0 seconds (20.00 / 21.23%)


In [111]:
data_manager.separate_train_test_validation(
    train_size = 0.8, 
    validation_size = 0.2, 
    random_state = None,
    shuffle = True,
    join_splitted_parts = True,
    equally_distribute_signal_durations = True,
)

data_manager = SleepDataManager(directory_path = "Processing_Demonstration/")
train_data_manager = SleepDataManager(directory_path = "Processing_Demonstration/", pid = "train")
validation_data_manager = SleepDataManager(directory_path = "Processing_Demonstration/", pid = "validation")

# print some basic information of the datasets
total_duration = data_manager.calculate_total_signal_duration(only_current_pid = False)
main_duration = data_manager.calculate_total_signal_duration(only_current_pid = True)
train_duration = train_data_manager.calculate_total_signal_duration(only_current_pid = True)
validation_duration = validation_data_manager.calculate_total_signal_duration(only_current_pid = True)

print("\nNumber Datapoints / Total Duration of each pid:")
print("-"*50)
print(f"Main: {len(data_manager)} / {main_duration} seconds ({len(data_manager)/add_number_datapoints*100:.2f} / {main_duration/total_duration*100:.2f}%)")
print(f"Train: {len(train_data_manager)} / {train_duration} seconds ({len(train_data_manager)/add_number_datapoints*100:.2f} / {train_duration/total_duration*100:.2f}%)")
print(f"Validation: {len(validation_data_manager)} / {validation_duration} seconds ({len(validation_data_manager)/add_number_datapoints*100:.2f} / {validation_duration/total_duration*100:.2f}%)")


Distributing 80.0% / 20.0% of datapoints into training / validation pids, respectively:
   ✅: 100.0% [█████████████████████] 100 / 100 | 74 ms / 74 ms (739 µs/it) |

Number Datapoints / Total Duration of each pid:
--------------------------------------------------
Main: 0 / 0 seconds (0.00 / 0.00%)
Train: 80 / 3017448.0 seconds (80.00 / 80.09%)
Validation: 20 / 749952.0 seconds (20.00 / 19.91%)


Applying signal cropping now splits the signals within each pid separately:

In [112]:
data_manager.crop_oversized_data(
    signal_length_seconds = 36000,  # 10 hours in seconds
    wanted_shift_length_seconds = 5400, # 1.5 hours in seconds
    absolute_shift_deviation_seconds = 1800, # 30 minutes in seconds
)


ATTENTION: No matter in which pid you are calling this function, the data will be split in all of them.

Splitting entries within Processing_Demonstration/training_pid.pkl into multiple ones to ensure the contained signals span at most across: 36000 seconds.
   ✅: 100.0% [█████████████████████] 80 / 80 | 201 ms / 201 ms (2.5 ms/it) |

Splitting entries within Processing_Demonstration/validation_pid.pkl into multiple ones to ensure the contained signals span at most across: 36000 seconds.
   ✅: 100.0% [███████████████████████] 20 / 20 | 70 ms / 70 ms (3.5 ms/it) |


In [113]:
data_manager = SleepDataManager(directory_path = "Processing_Demonstration/")
train_data_manager = SleepDataManager(directory_path = "Processing_Demonstration/", pid = "train")
validation_data_manager = SleepDataManager(directory_path = "Processing_Demonstration/", pid = "validation")

total_splitted_datapoints = sum(data_manager.database_configuration["number_datapoints"])

# print some basic information of the datasets
total_duration = data_manager.calculate_total_signal_duration(only_current_pid = False)
main_duration = data_manager.calculate_total_signal_duration(only_current_pid = True)
train_duration = train_data_manager.calculate_total_signal_duration(only_current_pid = True)
validation_duration = validation_data_manager.calculate_total_signal_duration(only_current_pid = True)

print("\nNumber Datapoints / Total Duration of each pid:")
print("-"*50)
print(f"Main: {len(data_manager)} / {main_duration} seconds ({len(data_manager)/total_splitted_datapoints*100:.2f} / {main_duration/total_duration*100:.2f}%)")
print(f"Train: {len(train_data_manager)} / {train_duration} seconds ({len(train_data_manager)/total_splitted_datapoints*100:.2f} / {train_duration/total_duration*100:.2f}%)")
print(f"Validation: {len(validation_data_manager)} / {validation_duration} seconds ({len(validation_data_manager)/total_splitted_datapoints*100:.2f} / {validation_duration/total_duration*100:.2f}%)")


Number Datapoints / Total Duration of each pid:
--------------------------------------------------
Main: 0 / 0 seconds (0.00 / 0.00%)
Train: 152 / 5242008.0 seconds (80.00 / 80.05%)
Validation: 38 / 1306332.0 seconds (20.00 / 19.95%)


Proving that all split parts of the same original datapoint ended up in the same pid (`join_splitted_parts` = True):

In [None]:
# proving that all split parts of the same original datapoint ended up in the same pid
train_ids = train_data_manager["ID"]
validation_ids = validation_data_manager["ID"]

for train_id in train_ids: # type: ignore
    for val_id in validation_ids: # type: ignore
        if train_id == val_id or train_id + "*" == val_id or train_id == val_id + "*":
            print(f"Splitted parts of the same original datapoint ended up in different pids.")
            break

Let's fuse all (still split) data back into the main pid and redistribute it with `join_splitted_parts` = False.
Note: Signals must already be cropped before choosing this setting; we did this above.

In [None]:
data_manager.fuse_train_test_validation()
print(len(data_manager))

190


In [121]:
data_manager.separate_train_test_validation(
    train_size = 0.8, 
    validation_size = 0.2, 
    random_state = None,
    shuffle = True,
    join_splitted_parts = False,
    equally_distribute_signal_durations = False,
)

data_manager = SleepDataManager(directory_path = "Processing_Demonstration/")
train_data_manager = SleepDataManager(directory_path = "Processing_Demonstration/", pid = "train")
validation_data_manager = SleepDataManager(directory_path = "Processing_Demonstration/", pid = "validation")


Distributing 80.0% / 20.0% of datapoints into training / validation pids, respectively:
   ✅: 100.0% [███████████████████] 190 / 190 | 119 ms / 119 ms (623 µs/it) |


In [122]:
# print some basic information of the datasets
total_duration = data_manager.calculate_total_signal_duration(only_current_pid = False)
main_duration = data_manager.calculate_total_signal_duration(only_current_pid = True)
train_duration = train_data_manager.calculate_total_signal_duration(only_current_pid = True)
validation_duration = validation_data_manager.calculate_total_signal_duration(only_current_pid = True)

print("\nNumber Datapoints / Total Duration of each pid:")
print("-"*50)
print(f"Main: {len(data_manager)} / {main_duration} seconds ({len(data_manager)/total_splitted_datapoints*100:.2f} / {main_duration/total_duration*100:.2f}%)")
print(f"Train: {len(train_data_manager)} / {train_duration} seconds ({len(train_data_manager)/total_splitted_datapoints*100:.2f} / {train_duration/total_duration*100:.2f}%)")
print(f"Validation: {len(validation_data_manager)} / {validation_duration} seconds ({len(validation_data_manager)/total_splitted_datapoints*100:.2f} / {validation_duration/total_duration*100:.2f}%)")


Number Datapoints / Total Duration of each pid:
--------------------------------------------------
Main: 0 / 0 seconds (0.00 / 0.00%)
Train: 152 / 5224410.0 seconds (80.00 / 79.78%)
Validation: 38 / 1323930.0 seconds (20.00 / 20.22%)


In [None]:
# proving that not all split parts of the same original datapoint ended up in the same pid
train_ids = train_data_manager["ID"]
validation_ids = validation_data_manager["ID"]

stop_loop = False
for train_id in train_ids: # type: ignore
    for val_id in validation_ids: # type: ignore
        if train_id == val_id or train_id + "*" == val_id or train_id == val_id + "*":
            print(f"Splitted parts of the same original datapoint ended up in different pids.")
            stop_loop = True
            break
    if stop_loop:
        break

Splitted parts of the same original datapoint ended up in different pids: train and validation.


### Cleaning up

In [None]:
def clean_and_remove_directory(directory):
    """
    Recursively deletes all files and subdirectories in the specified directory
    and then removes the directory itself. Does nothing if it does not exist.
    """
    if not os.path.isdir(directory):
        return

    for entry in os.listdir(directory):
        entry_path = os.path.join(directory, entry)
        if os.path.isdir(entry_path):
            clean_and_remove_directory(entry_path)
        else:
            os.remove(entry_path)
    os.rmdir(directory)

clean_and_remove_directory("Processing_Demonstration")
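
As a side note, the standard library offers the same recursive removal through `shutil.rmtree`. The sketch below runs on a throwaway directory created with `tempfile` (a stand-in for the demonstration directory), so it is safe to execute anywhere.

```python
import os
import shutil
import tempfile

# build a throwaway directory tree to delete (stand-in for the demonstration directory)
demo_directory = tempfile.mkdtemp()
os.makedirs(os.path.join(demo_directory, "nested"))
with open(os.path.join(demo_directory, "nested", "file.pkl"), "wb") as file:
    file.write(b"demo")

# ignore_errors=True turns the call into a no-op if the directory does not exist
shutil.rmtree(demo_directory, ignore_errors=True)
print(os.path.exists(demo_directory))
```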

## Introduction to the implemented functions

In [40]:
import numpy as np # type: ignore
import random
import h5py # type: ignore

In this section you can check whether the implemented functions in this project work correctly.

### Scaling number of datapoints from signal- to target- frequency:

I highly recommend providing data whose signals do not need to be rescaled to the frequencies of the data
used to train the neural network.

If that is not possible, here is a demonstration of the functions that will be applied to
your data:

#### Classification Signal

In [41]:
classification_array = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14])
classification_frequency = 1/20
target_frequency = 1/30

print("-"*71)
print(f"Classification Frequency: {classification_frequency} -> Target Frequency: {target_frequency}")
print("-"*71)
print("\nClassification array: ", classification_array)
print("Classification array shape: ", classification_array.shape)

reshaped_array = scale_classification_signal(
        signal = classification_array, # type: ignore
        signal_frequency = classification_frequency,
        target_frequency = target_frequency
        )

print("\nScaled array: ", reshaped_array)
print("Scaled array shape: ", reshaped_array.shape)

classification_array = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8])
classification_frequency = 1/50
target_frequency = 1/30

print("\n")
print("-"*71)
print(f"Classification Frequency: {classification_frequency} -> Target Frequency: {target_frequency}")
print("-"*71)
print("\nClassification array: ", classification_array)
print("Classification array shape: ", classification_array.shape)

reshaped_array = scale_classification_signal(
        signal = classification_array, # type: ignore
        signal_frequency = classification_frequency,
        target_frequency = target_frequency
        )

print("\nScaled array: ", reshaped_array)
print("Scaled array shape: ", reshaped_array.shape)

del reshaped_array, classification_array, classification_frequency, target_frequency

-----------------------------------------------------------------------
Classification Frequency: 0.05 -> Target Frequency: 0.03333333333333333
-----------------------------------------------------------------------

Classification array:  [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14]
Classification array shape:  (15,)

Scaled array:  [ 0  1  3  4  6  7  9 10 12 13]
Scaled array shape:  (10,)


-----------------------------------------------------------------------
Classification Frequency: 0.02 -> Target Frequency: 0.03333333333333333
-----------------------------------------------------------------------

Classification array:  [0 1 2 3 4 5 6 7 8]
Classification array shape:  (9,)

Scaled array:  [0 1 1 2 2 3 4 4 5 5 6 7 7 8 8]
Scaled array shape:  (15,)
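
Both printed results are consistent with a nearest-sample rule: every target sample takes the label of the source sample closest in time, and a tie goes to the earlier sample. The sketch below reproduces the two outputs under that assumption. `rescale_labels` is a hypothetical helper, and the actual implementation of `scale_classification_signal` may work differently.

```python
import numpy as np

def rescale_labels(signal, signal_period, target_period):
    """Resample a label signal by nearest sample in time (ties: earlier sample).

    Periods are given as integer seconds per sample (20 means a frequency of
    1/20) so that all index arithmetic stays exact. Illustration only.
    """
    signal = np.asarray(signal)
    number_target = len(signal) * signal_period // target_period
    indices = []
    for i in range(number_target):
        # ceil(i * target_period / signal_period - 0.5) in integer arithmetic
        numerator = 2 * i * target_period - signal_period
        index = -((-numerator) // (2 * signal_period))
        indices.append(min(max(index, 0), len(signal) - 1))
    return signal[indices]


print(rescale_labels(np.arange(15), signal_period=20, target_period=30))
# [ 0  1  3  4  6  7  9 10 12 13]
print(rescale_labels(np.arange(9), signal_period=50, target_period=30))
# [0 1 1 2 2 3 4 4 5 5 6 7 7 8 8]
```

Labels are never averaged here, which is the reason classification signals need a different treatment than continuous ones.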


#### Continuous Signal

In [42]:
continuous_array_int = np.array([0, 1, 2, 3, 4, 5])
continuous_array_float = np.array([0, 1, 2, 3, 4, 5], dtype = float)
continuous_frequency = 3
target_frequency = 4

print("-"*75)
print(f"Continuous Frequency: {continuous_frequency} -> Target Frequency: {target_frequency}")
print("-"*75)
print(f"Continuous array: {continuous_array_int} / {continuous_array_float}")
print("Continuous array shape: ", continuous_array_int.shape)

reshaped_array_int = interpolate_signal(
        signal = continuous_array_int, # type: ignore
        signal_frequency = continuous_frequency,
        target_frequency = target_frequency
        )

reshaped_array_float = interpolate_signal(
        signal = continuous_array_float, # type: ignore
        signal_frequency = continuous_frequency,
        target_frequency = target_frequency
        )

print(f"\nScaled array: {reshaped_array_int} / {reshaped_array_float}")
print("Scaled array shape: ", reshaped_array_int.shape)

print("\n")

continuous_array_int = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
continuous_array_float = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype = float)
continuous_frequency = 5
target_frequency = 4

print("-"*75)
print(f"Continuous Frequency: {continuous_frequency} -> Target Frequency: {target_frequency}")
print("-"*75)
print(f"Continuous array: {continuous_array_int} / {continuous_array_float}")
print("Continuous array shape: ", continuous_array_int.shape)

reshaped_array_int = interpolate_signal(
        signal = continuous_array_int, # type: ignore
        signal_frequency = continuous_frequency,
        target_frequency = target_frequency
        )

reshaped_array_float = interpolate_signal(
        signal = continuous_array_float, # type: ignore
        signal_frequency = continuous_frequency,
        target_frequency = target_frequency
        )

print(f"\nScaled array: {reshaped_array_int} / {reshaped_array_float}")
print("Scaled array shape: ", reshaped_array_int.shape)

del reshaped_array_int, reshaped_array_float, continuous_array_int, continuous_array_float, continuous_frequency, target_frequency

---------------------------------------------------------------------------
Continuous Frequency: 3 -> Target Frequency: 4
---------------------------------------------------------------------------
Continuous array: [0 1 2 3 4 5] / [0. 1. 2. 3. 4. 5.]
Continuous array shape:  (6,)

Scaled array: [0 1 2 2 3 4 4 5] / [0.   0.75 1.5  2.25 3.   3.75 4.5  5.  ]
Scaled array shape:  (8,)


---------------------------------------------------------------------------
Continuous Frequency: 5 -> Target Frequency: 4
---------------------------------------------------------------------------
Continuous array: [0 1 2 3 4 5 6 7 8 9] / [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.]
Continuous array shape:  (10,)

Scaled array: [0 1 2 4 5 6 8 9] / [0.   1.25 2.5  3.75 5.   6.25 7.5  8.75]
Scaled array shape:  (8,)
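
The printed values match plain linear interpolation, with integer signals rounded back to integers afterwards. Here is a minimal sketch using `np.interp`; `resample_linear` is a hypothetical helper, and it is an assumption that `interpolate_signal` behaves the same way for other inputs.

```python
import numpy as np

def resample_linear(signal, signal_frequency, target_frequency):
    """Linearly interpolate a continuous signal onto the target frequency."""
    signal = np.asarray(signal)
    number_target = int(len(signal) * target_frequency / signal_frequency)
    # position of every target sample, measured in source-sample indices
    positions = np.arange(number_target) * signal_frequency / target_frequency
    # np.interp clamps positions beyond the last source sample to its value
    resampled = np.interp(positions, np.arange(len(signal)), signal)
    if np.issubdtype(signal.dtype, np.integer):
        # keep integer signals integer (np.rint rounds halves to the even neighbour)
        return np.rint(resampled).astype(signal.dtype)
    return resampled


print(resample_linear(np.arange(6, dtype=float), 3, 4))
print(resample_linear(np.arange(6), 3, 4))
print(resample_linear(np.arange(10, dtype=float), 5, 4))
print(resample_linear(np.arange(10), 5, 4))
```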


### Splitting a signal which is too long for the neural network model

A signal which is too short will be padded with zeros. No big deal. On the other hand, a signal which is too
long will be split into multiple signals. To create more data, the 10 hour range will be shifted along the
signal.

This shift should not be too small, as that would create redundant data, but also not too big, because the more data the
better. So we try to find a shift size close to 1 hour that lets us shift the range a whole number of times.

#### Finding optimal shift size

In [None]:
signal_length_addition_hours = 2.5
desired_length_hours = 10

optimal_shift_length = calculate_optimal_shift_length(
        signal_length_seconds = (desired_length_hours + signal_length_addition_hours) * 3600, # type: ignore
        desired_length_seconds = desired_length_hours*3600, 
        wanted_shift_length_seconds = 3600,
        absolute_shift_deviation_seconds = 1800,
        all_signal_frequencies = [4, 1, 1/30, 1/120]
)
print(optimal_shift_length)

print(f"Optimal shift length for signal which is {signal_length_addition_hours} hours longer than desired length of {desired_length_hours} hours: {round(optimal_shift_length/3600, 3)} hours")

3000
Optimal shift length for signal which is 2.5 hours longer than desired length of 10 hours: 0.833 hours
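
The result can be checked by hand: the signal exceeds the desired length by 2.5 h = 9000 s, and 9000 / 3 = 3000 s is the candidate closest to the wanted 3600 s that still covers a whole number of samples at every frequency (9000 / 2 = 4500 s fails for the frequency 1/120). A minimal sketch of such a search follows; `closest_exact_shift` is a hypothetical helper, it only handles the case where an exact shift exists, and it is not the implementation of `calculate_optimal_shift_length`.

```python
def closest_exact_shift(signal_length_seconds, desired_length_seconds,
                        wanted_shift_length_seconds, absolute_shift_deviation_seconds,
                        all_signal_frequencies):
    """Return the shift (in seconds) closest to the wanted one that divides the
    excess length evenly, or None if no such shift lies within the allowed range."""
    excess = signal_length_seconds - desired_length_seconds
    lower = wanted_shift_length_seconds - absolute_shift_deviation_seconds
    upper = wanted_shift_length_seconds + absolute_shift_deviation_seconds
    if lower <= 0:
        raise ValueError("the deviation must be smaller than the wanted shift length")

    best = None
    number_shifts = 1
    while excess / number_shifts >= lower:
        shift = excess / number_shifts
        # the shift must cover a whole number of samples at every frequency
        whole_samples = all(
            abs(shift * frequency - round(shift * frequency)) < 1e-9
            for frequency in all_signal_frequencies
        )
        if shift <= upper and whole_samples:
            if best is None or abs(shift - wanted_shift_length_seconds) < abs(best - wanted_shift_length_seconds):
                best = shift
        number_shifts += 1
    return best


print(closest_exact_shift(12.5 * 3600, 10 * 3600, 3600, 1800, [4, 1, 1/30, 1/120]))
```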



#### Splitting Signal

The above function for finding the optimal shift length is embedded in the following split function. The optimal
shift size will be estimated for every signal individually.

If there is no integer shift size in range that lets the shifted range end exactly on the
last datapoints of the long signal, then the last shift will be altered so that it does.
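
One possible reading of this rule, sketched with a fixed shift given in datapoints: windows start at multiples of the shift, and if the last of them does not end on the final datapoint, one more window is anchored at the end of the signal. `split_with_shift` is a hypothetical helper; how `split_long_signal` treats the final part in detail is not shown here and may differ.

```python
import numpy as np

def split_with_shift(signal, window_length, shift_length):
    """Cut a long signal into windows of equal length (illustration only)."""
    signal = np.asarray(signal)
    last_start = len(signal) - window_length
    if last_start <= 0:
        return [signal]                 # nothing to split
    starts = list(range(0, last_start + 1, shift_length))
    if starts[-1] != last_start:
        starts.append(last_start)       # altered (shorter) final shift
    return [signal[start:start + window_length] for start in starts]


for piece in split_with_shift(np.arange(22), window_length=10, shift_length=6):
    print(piece)
print()
for piece in split_with_shift(np.arange(23), window_length=10, shift_length=6):
    print(piece)
```

With 22 datapoints the shift of 6 fits exactly (starts 0, 6, 12). With 23 datapoints a fourth window starting at 13 is added so that the last datapoint is included.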

In [None]:
# Create random signal
frequency = 4
length_signal_seconds = 12.1 * 3600
signal = np.random.rand(int(length_signal_seconds * frequency))

# Only important parameters here:
nn_signal_seconds = 10 * 3600
shift_length_seconds = 3600
absolute_shift_deviation_seconds = 1800

signals_from_splitting, shift_length = split_long_signal(
        signal = signal, # type: ignore
        sampling_frequency = frequency,
        target_frequency = frequency,
        nn_signal_duration_seconds = nn_signal_seconds,
        wanted_shift_length_seconds = shift_length_seconds,
        absolute_shift_deviation_seconds = absolute_shift_deviation_seconds
        )

print("Shift length:", shift_length)
print(f"Shift length: {shift_length / frequency} seconds")
print("Signal shape: ", signal.shape)
print(f"Datapoints in NN: {nn_signal_seconds * frequency}")
print("Signals from splitting shape: ", list_shape(signals_from_splitting))

del signals_from_splitting, signal, shift_length, frequency, nn_signal_seconds, shift_length_seconds, absolute_shift_deviation_seconds

#### Splitting signals within dictionary

In [None]:
# Create random signal
length_signal_seconds = 12.1 * 3600
rri_frequency = 4
mad_frequency = 1
rri_signal = np.random.rand(int(length_signal_seconds * rri_frequency))
mad_signal = np.random.rand(int(length_signal_seconds * mad_frequency))

data_dict = {
    "ID": "1",
    "RRI": rri_signal,
    "RRI_frequency": rri_frequency,
    "MAD": mad_signal,
    "MAD_frequency": mad_frequency,
}

new_dictionaries = split_signals_within_dictionary(
    data_dict = data_dict,
    id_key = "ID",
    valid_signal_keys = ["RRI", "MAD"],
    signal_frequencies = [rri_frequency, mad_frequency],
    signal_target_frequencies = [rri_frequency, mad_frequency],
    nn_signal_duration_seconds = 10 * 3600,
    wanted_shift_length_seconds = 3600,
    absolute_shift_deviation_seconds = 1800,
    all_signal_frequencies = [rri_frequency, mad_frequency]
)

print("Original dictionary:")
print("-"*20)
for key, value in data_dict.items():
    if key == "RRI" or key == "MAD" or key == "SLP":
        print(f"{key}: {value.shape}")
    else:
        print(f"{key}: {value}")
print("\nNew dictionaries:")
print("-"*20)
for new_dict in new_dictionaries:
    for key, value in new_dict.items():
        if key == "RRI" or key == "MAD" or key == "SLP":
            print(f"{key}: {value.shape}")
        else:
            print(f"{key}: {value}")
    print("")

Original dictionary:
--------------------
ID: 1
RRI: (174240,)
RRI_frequency: 4
MAD: (43560,)
MAD_frequency: 1

New dictionaries:
--------------------
ID: 1
RRI: (144000,)
RRI_frequency: 4
MAD: (36000,)
MAD_frequency: 1
shift_length_seconds: 3780

ID: 1_shift_x1
RRI: (144000,)
RRI_frequency: 4
MAD: (36000,)
MAD_frequency: 1
shift_length_seconds: 3780

ID: 1_shift_x2
RRI: (144000,)
RRI_frequency: 4
MAD: (36000,)
MAD_frequency: 1
shift_length_seconds: 3780



#### Fusing signals back together

In [None]:
signals_from_splitting, shift_length = split_long_signal(
        signal = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21], # type: ignore
        sampling_frequency = 1,
        target_frequency = 1,
        nn_signal_duration_seconds = 10,
        wanted_shift_length_seconds = 5,
        absolute_shift_deviation_seconds = 1,
        all_signal_frequencies = [1]
        )

print("Splitted Signals:\n", signals_from_splitting)

fused_signal = fuse_splitted_signals(
    signals = signals_from_splitting, # type: ignore
    shift_length = int(shift_length), # type: ignore
    signal_type = "feature"
)

print("\nFused signal:\n", fused_signal)

Splitted Signals:
 [array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), array([ 6,  7,  8,  9, 10, 11, 12, 13, 14, 15]), array([12, 13, 14, 15, 16, 17, 18, 19, 20, 21])]

Fused signal:
 [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21]
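
For feature signals the overlapping samples of neighbouring parts are identical, so fusing boils down to writing every part back at its offset. A minimal sketch, assuming a constant shift between parts; `fuse_pieces` is a hypothetical helper and does not cover how overlapping target labels are combined.

```python
import numpy as np

def fuse_pieces(pieces, shift_length):
    """Write every piece back at offset number * shift_length (illustration only)."""
    pieces = [np.asarray(piece) for piece in pieces]
    total_length = (len(pieces) - 1) * shift_length + len(pieces[-1])
    fused = np.empty(total_length, dtype=pieces[0].dtype)
    for number, piece in enumerate(pieces):
        start = number * shift_length
        fused[start:start + len(piece)] = piece   # overlapping samples are overwritten
    return fused


pieces = [np.arange(0, 10), np.arange(6, 16), np.arange(12, 22)]
print(fuse_pieces(pieces, shift_length=6))
```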


#### Fusing splitted dictionaries

In [None]:
data_dict = {
    "ID": "1",
    "RRI": np.array([0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10, 10, 11, 11, 12, 12, 13, 13, 14, 14, 15, 15, 16, 16, 17, 17, 18, 18, 19, 19, 20, 20, 21, 21, 22, 22]),
    "RRI_frequency": 2,
    "MAD": np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]),
    "MAD_frequency": 1,
    "SLP": np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]),
    "SLP_frequency": 0.5
}

new_dictionaries = split_signals_within_dictionary(
    data_dict = data_dict,
    id_key = "ID",
    valid_signal_keys = ["RRI", "MAD", "SLP"],
    signal_frequencies = [2, 1, 0.5],
    signal_target_frequencies = [2, 1, 0.5],
    nn_signal_duration_seconds = 10,
    wanted_shift_length_seconds = 5,
    absolute_shift_deviation_seconds = 2,
    all_signal_frequencies = [2, 1, 0.5]
)

print("Original dictionary:")
print("-"*20)
for key, value in data_dict.items():
    if key == "RRI" or key == "MAD" or key == "SLP":
        print(f"{key}: {value.shape}")
    else:
        print(f"{key}: {value}")

Original dictionary:
--------------------
ID: 1
RRI: (46,)
RRI_frequency: 2
MAD: (23,)
MAD_frequency: 1
SLP: (12,)
SLP_frequency: 0.5


In [None]:
print("\nNew dictionaries:")
print("-"*20)
for new_dict in new_dictionaries:
    for key, value in new_dict.items():
        print(f"{key}: {value}")
    print("")


New dictionaries:
--------------------
ID: 1
RRI: [0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9]
RRI_frequency: 2
MAD: [0 1 2 3 4 5 6 7 8 9]
MAD_frequency: 1
SLP: [0 1 2 3 4]
SLP_frequency: 0.5
shift_length_seconds: 6

ID: 1_shift_x1
RRI: [ 6  6  7  7  8  8  9  9 10 10 11 11 12 12 13 13 14 14 15 15]
RRI_frequency: 2
MAD: [ 6  7  8  9 10 11 12 13 14 15]
MAD_frequency: 1
SLP: [3 4 5 6 7]
SLP_frequency: 0.5
shift_length_seconds: 6

ID: 1_shift_x2
RRI: [12 12 13 13 14 14 15 15 16 16 17 17 18 18 19 19 20 20 21 21]
RRI_frequency: 2
MAD: [12 13 14 15 16 17 18 19 20 21]
MAD_frequency: 1
SLP: [ 6  7  8  9 10]
SLP_frequency: 0.5
shift_length_seconds: 6

ID: 1_shift_x3
RRI: [18 18 19 19 20 20 21 21 22 22]
RRI_frequency: 2
MAD: [18 19 20 21 22]
MAD_frequency: 1
SLP: [ 9 10 11]
SLP_frequency: 0.5
shift_length_seconds: 6



In [None]:
fused_dictionary = fuse_splitted_signals_within_dictionaries(
    data_dictionaries = new_dictionaries,
    valid_signal_keys = ["RRI", "MAD", "SLP"],
    valid_signal_frequencies = [2, 1, 0.5],
)

for key, value in fused_dictionary.items():
    print(f"{key}: {value}")

ID: 1
RRI: [ 0  0  1  1  2  2  3  3  4  4  5  5  6  6  7  7  8  8  9  9 10 10 11 11
 12 12 13 13 14 14 15 15 16 16 17 17 18 18 19 19 20 20 21 21 22 22]
MAD: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22]
SLP: [ 0  1  2  3  4  5  6  7  8  9 10  9 10 11]
RRI_frequency: 2
MAD_frequency: 1
SLP_frequency: 0.5


### Reading a .h5 - file

One of the available training datasets for our neural network model is stored in a .h5 file. So we need
to be able to read it. These are the important operations:

In [None]:
shhs_dataset = h5py.File("Raw_Data/SHHS_dataset.h5", 'r')
patients = list(shhs_dataset['slp'].keys()) # type: ignore

random_patient = patients[np.random.randint(0, len(patients))]
print(f"Random patient: {random_patient}")

print(np.unique(shhs_dataset["slp"][random_patient][:])) # type: ignore

for key in ["slp", "rri"]:
    print(f"\nkey: {key}")

    patient_data = shhs_dataset[key][random_patient][:] # type: ignore
    print(f"Data shape: {patient_data.shape}") # type: ignore

    data_freq = shhs_dataset[key].attrs["freq"] # type: ignore
    print(f"Data frequency: {data_freq}")
    print(f"Inverse data frequency: {1/data_freq}") # type: ignore

    print(f"Data length: {patient_data.shape[0]/data_freq} s") # type: ignore

del shhs_dataset, patients, random_patient, key, patient_data, data_freq

Random patient: 201966_1
[0 1 2 3 5]

key: slp
Data shape: (747,)
Data frequency: 0.03333333333333333
Inverse data frequency: 30.0
Data length: 22410.0 s

key: rri
Data shape: (89640,)
Data frequency: 4
Inverse data frequency: 0.25
Data length: 22410.0 s


### Divide up a signal into overlapping windows

The hardest thing about this is that 'window_overlap' and 'datapoints_per_window' must be chosen so that
the whole signal fits perfectly into n windows.

Additionally, those values must be integers. This means that 'window_duration_seconds' and 'overlap_seconds'
multiplied by 'target_frequency' as well as 'sampling_frequency' must be integers. (The features and the target labels
must fit equally well into the windows, so that we can find the correlation between a feature window and a target window.)

We have the RRI and MAD values as features and the sleep phase as target classification. As we will see,
RRI and MAD values were recorded with an integer sampling frequency, while the sampling frequency of the
sleep classification is 1/30 Hz.

Finding window parameters that fulfill the conditions mentioned is easier than it sounds. We will always pass data
to the neural network that is 10 hours long. Now, we just need to think in seconds and find integer values
for 'window_duration_seconds' and 'overlap_seconds' that are multiples of 30:

#### Finding optimal window_parameters:

In [None]:
find_suitable_window_parameters(
        signal_length = 10 * 3600,
        number_windows_range = (1000, 1400),
        window_size_range = (120, 180),
        minimum_window_size_overlap_difference = 30
    )

Suitable window parameters for signal of length: 36000:
-------------------------------------------------------
Number of windows: 1025, Window size: 160, Overlap: 125.0
Number of windows: 1026, Window size: 125, Overlap: 90.0
Number of windows: 1055, Window size: 164, Overlap: 130.0
Number of windows: 1056, Window size: 130, Overlap: 96.0
Number of windows: 1087, Window size: 162, Overlap: 129.0
Number of windows: 1088, Window size: 129, Overlap: 96.0
Number of windows: 1121, Window size: 160, Overlap: 128.0
Number of windows: 1122, Window size: 128, Overlap: 96.0
Number of windows: 1157, Window size: 164, Overlap: 133.0
Number of windows: 1158, Window size: 133, Overlap: 102.0
Number of windows: 1196, Window size: 150, Overlap: 120.0
Number of windows: 1197, Window size: 120, Overlap: 90.0


Only two of these candidates have a window size and an overlap that are both multiples of 30, so our options are:

Number of windows: 1196, Window size: 150, Overlap: 120.0 \
Number of windows: 1197, Window size: 120, Overlap: 90.0

We will choose the latter, because we don't want the window_size to be too large.
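
The numbers in the list follow from a simple relation: consecutive windows start `window_size - overlap` seconds apart, so a signal of length L holds (L - window_size) / (window_size - overlap) + 1 windows, provided the division leaves no remainder. A short check of the two options (`number_of_windows` is a hypothetical helper):

```python
def number_of_windows(signal_length, window_size, overlap):
    """Number of overlapping windows that cover the signal exactly, else None."""
    step = window_size - overlap
    if (signal_length - window_size) % step != 0:
        return None   # the windows do not end exactly on the last datapoint
    return (signal_length - window_size) // step + 1


for window_size, overlap in [(150, 120), (120, 90)]:
    fits_labels = window_size % 30 == 0 and overlap % 30 == 0
    print(window_size, overlap, number_of_windows(36000, window_size, overlap), fits_labels)
```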

#### Classification Signal

When a classification signal, which is supposed to be the target in the neural
network, is transformed into windows, each window is represented only by its most common sleep stage. If there is a tie
between the labels, the one with the highest priority is chosen.

In [None]:
signal_length_seconds = 10 * 3600
frequency = 1/30
signal_length = int(signal_length_seconds * frequency)

signal = np.array([random.randint(0, 5) for _ in range(signal_length)])

signal_in_windows = signal_to_windows(
    signal = signal, # type: ignore
    datapoints_per_window = int(120 * frequency),
    window_overlap = int(90 * frequency),
    signal_type = "target",
    priority_order = [0, 1, 2, 3, 4, 5, -1]
    )

print(f"Signal shape: {signal.shape}")
print(f"Signal in windows shape: {signal_in_windows.shape}")

del signal, signal_in_windows, signal_length_seconds, frequency, signal_length

Signal shape: (1200,)
Signal in windows shape: (1197,)
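
To illustrate the tie-breaking rule described above, here is a minimal sketch of a majority vote with a
priority order. The helper `most_common_label` is hypothetical, and the sketch assumes that a label appearing
earlier in 'priority_order' has the higher priority. The actual implementation in 'signal_to_windows' may differ.

```python
from collections import Counter


def most_common_label(window, priority_order):
    """Return the most frequent label; ties are resolved by priority."""
    counts = Counter(window)
    highest_count = max(counts.values())
    tied_labels = [label for label, count in counts.items() if count == highest_count]
    # Assumption: a smaller index in priority_order means a higher priority.
    return min(tied_labels, key=priority_order.index)


priority_order = [0, 1, 2, 3, 4, 5, -1]

print(most_common_label([3, 3, 3, 0], priority_order))  # 3 (clear majority)
print(most_common_label([2, 2, 1, 1], priority_order))  # 1 (tie, 1 is listed before 2)
```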


#### Continuous Signal

In [None]:
signal = np.random.rand(36000)

signal_in_windows = signal_to_windows(
    signal = signal, # type: ignore
    datapoints_per_window = 120,
    window_overlap = 90,
    signal_type = "feature"
    )

print(f"Signal shape: {signal.shape}")
print(f"Signal in windows shape: {signal_in_windows.shape}")

del signal, signal_in_windows

Signal shape: (36000,)
Signal in windows shape: (1197, 120)
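
The resulting shape can be reproduced with plain NumPy. The following sketch assumes that consecutive windows
start 'datapoints_per_window - window_overlap' datapoints apart. It only illustrates the idea and is not
necessarily how 'signal_to_windows' is implemented.

```python
import numpy as np

signal = np.arange(36000)
datapoints_per_window = 120
window_overlap = 90
step = datapoints_per_window - window_overlap  # 30 datapoints between window starts

# All windows with step size 1, then keep every 'step'-th window.
all_windows = np.lib.stride_tricks.sliding_window_view(signal, datapoints_per_window)
windows = all_windows[::step]

print(windows.shape)  # (1197, 120)
print(windows[1, 0])  # 30: the second window starts 30 datapoints after the first
```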


#### Reshape Signal

The following function will be applied to transform a signal into overlapping windows. It will make sure
that the data is passed correctly to the function mentioned above. 

This means it will:
- check that 'number_nn_datapoints', 'datapoints_per_window' and 'window_overlap' are integers
- check that 'datapoints_per_window' and 'window_overlap' fit perfectly into 'number_nn_datapoints'
- compare the length of the provided signal to the length of the signal in the nn ('number_nn_datapoints')
    - if smaller: pad with zeros
    - if bigger: print a warning, but continue by cropping the last datapoints
- check that the signal transformed to windows has the right shape

In [None]:
random_array = np.random.rand(36000)
reshaped_array = reshape_signal_to_overlapping_windows(
    signal = random_array, # type: ignore
    target_frequency = 4, 
    number_windows = 1197, 
    window_duration_seconds = 120, 
    overlap_seconds = 90,
    signal_type = "feature",
    nn_signal_duration_seconds = 10*3600,
    )

print(f"Random array shape: {random_array.shape}")
print(f"Reshaped array shape: {reshaped_array.shape}")

random_array = np.array([random.randint(0, 3) for _ in range(int(36000/30))])
reshaped_array = reshape_signal_to_overlapping_windows(
    signal = random_array, # type: ignore
    target_frequency = 1/30, 
    number_windows = 1197, 
    window_duration_seconds = 120, 
    overlap_seconds = 90,
    signal_type = "target",
    nn_signal_duration_seconds = 10*3600,
    )

print(f"Random array shape: {random_array.shape}")
print(f"Reshaped array shape: {reshaped_array.shape}")

del random_array, reshaped_array

Random array shape: (36000,)
Reshaped array shape: (1197, 480)
Random array shape: (1200,)
Reshaped array shape: (1197,)
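
The length check described in the list above (pad if too short, crop with a warning if too long) can be sketched
as follows. The helper `fit_signal_to_length` is hypothetical and only illustrates the behavior; it is not the
code used in 'reshape_signal_to_overlapping_windows'.

```python
import warnings

import numpy as np


def fit_signal_to_length(signal, number_nn_datapoints, pad_value=0):
    """Pad or crop a 1D signal so that it has exactly number_nn_datapoints entries."""
    signal = np.asarray(signal)
    missing = number_nn_datapoints - len(signal)
    if missing > 0:
        # Signal too short: append pad_value at the end.
        return np.pad(signal, (0, missing), constant_values=pad_value)
    if missing < 0:
        # Signal too long: warn and drop the last datapoints.
        warnings.warn("Signal is longer than expected. Cropping last datapoints.")
        return signal[:number_nn_datapoints]
    return signal


print(fit_signal_to_length([1, 2, 3], 5))         # [1 2 3 0 0]
print(fit_signal_to_length([1, 2, 3, 4, 5], 3))   # [1 2 3]
```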


#### Reverse Reshape

Reversing the reshape of a feature signal:

In [None]:
print("Original signal:")
test = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
print(test)

print("\nSignal reshaped to overlapping windows:")
reshaped_test = reshape_signal_to_overlapping_windows(
    signal = test,
    target_frequency = 1,
    nn_signal_duration_seconds = 16,
    number_windows = 12,
    window_duration_seconds = 5,
    overlap_seconds = 4,
    signal_type = "feature"
    )
print(reshaped_test)

print("\nLast window when padding was cropped:")
cropped_padding = remove_padding_from_windows(
    signal_in_windows = copy.deepcopy(reshaped_test), # type: ignore
    target_frequency = 1,
    original_signal_length = 10,
    window_duration_seconds = 5, 
    overlap_seconds = 4,
    )
print(cropped_padding[-1])

print("\nSignal reshaped back to original:")
reversed_test = reverse_signal_to_windows_reshape(
    signal_in_windows = reshaped_test, # type: ignore
    target_frequency = 1,
    original_signal_length = 10,
    number_windows = 12,
    window_duration_seconds = 5,
    overlap_seconds = 4
    )
print(reversed_test)

Original signal:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Signal reshaped to overlapping windows:
[[ 1  2  3  4  5]
 [ 2  3  4  5  6]
 [ 3  4  5  6  7]
 [ 4  5  6  7  8]
 [ 5  6  7  8  9]
 [ 6  7  8  9 10]
 [ 7  8  9 10  0]
 [ 8  9 10  0  0]
 [ 9 10  0  0  0]
 [10  0  0  0  0]
 [ 0  0  0  0  0]
 [ 0  0  0  0  0]]

Last window when padding was cropped:
[10  0  0  0  0]

Signal reshaped back to original:
[ 1.  2.  3.  4.  5.  6.  7.  8.  9. 10.]


The sleep stage labels are reshaped differently: we keep only one label per window and therefore do not
create a 2D array.

After predicting the sleep stage labels, we transform them into a 2D array that our reverse reshape function
can process. Effectively, each label is expanded into an array that contains only that label:

In [None]:
print("Original signal:")
test = [1, 1, 1, 1, 1, 2, 2, 2, 3, 3, 3, 3]
print(test)
reshaped_test = reshape_signal_to_overlapping_windows(
    signal = test,
    target_frequency = 1/3,
    nn_signal_duration_seconds = 36,
    number_windows = 9,
    window_duration_seconds = 12,
    overlap_seconds = 9,
    signal_type = "target"
    )

print("\nSignal reshaped to overlapping windows:")
print(reshaped_test)

expanded_reshaped_test = []
for slp_stg in reshaped_test:
    expanded_reshaped_test.append([slp_stg for _ in range(int(12 * 1/3))])

print("\nExpanded signal:")
print(expanded_reshaped_test)

reversed_test = reverse_signal_to_windows_reshape(
    signal_in_windows = expanded_reshaped_test, # type: ignore
    target_frequency = 1/3, # type: ignore
    original_signal_length = 12,
    number_windows = 9,
    window_duration_seconds = 12,
    overlap_seconds = 9
    )

print("\nExpanded signal reshaped to original:")
print(reversed_test)
print([round(i) for i in reversed_test])

Original signal:
[1, 1, 1, 1, 1, 2, 2, 2, 3, 3, 3, 3]

Signal reshaped to overlapping windows:
[1 1 1 2 2 2 3 3 3]

Expanded signal:
[[1, 1, 1, 1], [1, 1, 1, 1], [1, 1, 1, 1], [2, 2, 2, 2], [2, 2, 2, 2], [2, 2, 2, 2], [3, 3, 3, 3], [3, 3, 3, 3], [3, 3, 3, 3]]

Expanded signal reshaped to original:
[1.   1.   1.   1.25 1.5  1.75 2.25 2.5  2.75 3.   3.   3.  ]
[1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]
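
The non-integer values in the output above are what one gets when every datapoint is averaged over all windows
that cover it. The following minimal sketch reproduces these numbers under that assumption. The helper
`average_overlapping_windows` is hypothetical, and 'reverse_signal_to_windows_reshape' may be implemented
differently.

```python
import numpy as np


def average_overlapping_windows(windows, step, original_length):
    """Rebuild a 1D signal by averaging all window values that cover each datapoint."""
    windows = np.asarray(windows, dtype=float)
    number_windows, window_size = windows.shape
    total_length = (number_windows - 1) * step + window_size
    summed = np.zeros(total_length)
    counts = np.zeros(total_length)
    for index, window in enumerate(windows):
        start = index * step
        summed[start:start + window_size] += window
        counts[start:start + window_size] += 1
    return (summed / counts)[:original_length]


labels_per_window = [1, 1, 1, 2, 2, 2, 3, 3, 3]
expanded = [[label] * 4 for label in labels_per_window]  # 4 datapoints per window

restored = average_overlapping_windows(expanded, step=1, original_length=12)
print(restored)
# [1.   1.   1.   1.25 1.5  1.75 2.25 2.5  2.75 3.   3.   3.  ]
print([round(float(value)) for value in restored])
# [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]
```

Note that Python's built-in `round` rounds halves to the nearest even integer, so 1.5 becomes 2 and 2.5 also
becomes 2. The rounded result therefore differs from the original signal at the fifth datapoint.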


### Normalize Signal

The implemented unity normalization function can either normalize a multi-dimensional array across all
arrays (global) or normalize each array individually (local).

In [None]:
one_dimensional = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
two_dimensional = np.array([[0, 2, 4], [4, 5, 6], [6, 8, 10]])
three_dimensional = np.array([[[0, 1, 2], [3, 4, 5]], [[6, 7, 8], [8, 9, 10]]])

In [None]:
message = "Normalization_Mode: \'global\'"
print(message)
print("-"*len(message))
print("\nNormalized One dimensional array:")
print(unity_based_normalization(
        signal = one_dimensional, # type: ignore
        normalization_max = 1,
        normalization_min = 0,
        normalization_mode = "global"
    ))
print("\nNormalized Two dimensional array:")
print(unity_based_normalization(
        signal = two_dimensional, # type: ignore
        normalization_max = 1,
        normalization_min = 0,
        normalization_mode = "global"
    ))
print("\nNormalized Three dimensional array:")
print(unity_based_normalization(
        signal = three_dimensional, # type: ignore
        normalization_max = 1,
        normalization_min = 0,
        normalization_mode = "global"
    ))

Normalization_Mode: 'global'
----------------------------

Normalized One dimensional array:
[0.  0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1. ]

Normalized Two dimensional array:
[[0.  0.2 0.4]
 [0.4 0.5 0.6]
 [0.6 0.8 1. ]]

Normalized Three dimensional array:
[[[0.  0.1 0.2]
  [0.3 0.4 0.5]]

 [[0.6 0.7 0.8]
  [0.8 0.9 1. ]]]


In [None]:
message = "Normalization_Mode: \'local\'"
print(message)
print("-"*len(message))
print("\nNormalized One dimensional array:")
print(unity_based_normalization(
        signal = one_dimensional, # type: ignore
        normalization_max = 1,
        normalization_min = 0,
        normalization_mode = "local"
    ))
print("\nNormalized Two dimensional array:")
print(unity_based_normalization(
        signal = two_dimensional, # type: ignore
        normalization_max = 1,
        normalization_min = 0,
        normalization_mode = "local"
    ))
print("\nNormalized Three dimensional array:")
print(unity_based_normalization(
        signal = three_dimensional, # type: ignore
        normalization_max = 1,
        normalization_min = 0,
        normalization_mode = "local"
    ))

Normalization_Mode: 'local'
---------------------------

Normalized One dimensional array:
[0.  0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1. ]

Normalized Two dimensional array:
[[0.  0.5 1. ]
 [0.  0.5 1. ]
 [0.  0.5 1. ]]

Normalized Three dimensional array:
[[[0.  0.5 1. ]
  [0.  0.5 1. ]]

 [[0.  0.5 1. ]
  [0.  0.5 1. ]]]
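
The difference between the two modes can be sketched with plain NumPy. The following is a minimal sketch,
assuming that 'local' means normalizing along the last axis (which matches the outputs above). The function
`min_max_normalize` is hypothetical and not the implementation of 'unity_based_normalization'.

```python
import numpy as np


def min_max_normalize(signal, new_min=0.0, new_max=1.0, mode="global"):
    """Scale values linearly into the range [new_min, new_max]."""
    signal = np.asarray(signal, dtype=float)
    if mode == "global":
        # One minimum and maximum for the whole array.
        low, high = signal.min(), signal.max()
    else:
        # Assumption: 'local' uses the minimum and maximum of each innermost array.
        low = signal.min(axis=-1, keepdims=True)
        high = signal.max(axis=-1, keepdims=True)
    value_range = np.where(high - low == 0, 1.0, high - low)  # avoid division by zero
    return (signal - low) / value_range * (new_max - new_min) + new_min


two_dimensional = np.array([[0, 2, 4], [4, 5, 6], [6, 8, 10]])

print(min_max_normalize(two_dimensional, mode="global"))
print(min_max_normalize(two_dimensional, mode="local"))
```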


### Alter Sleep Labels

The following function makes sure that the labels stay uniform.

In [None]:
slp = np.array([-2, -1, 0, 1, 2, 3, 4, 5, 6, 7])
print(slp)

current_labels = {"wake": [0, 1], "LS": [2], "DS": [3], "REM": [5], "artifect": ["other"]}
desired_labels = {"wake": 0, "LS": 1, "DS": 2, "REM": 3, "artifect": -1}

print(alter_slp_labels(
        slp_labels = slp, # type: ignore
        current_labels = current_labels,
        desired_labels = desired_labels,
))

[-2 -1  0  1  2  3  4  5  6  7]
[-1 -1  0  0  1  2 -1  3 -1 -1]


In [None]:
slp = np.array(["light_sleep", "deep_sleep", "deep_sleep_2", "WAKE", "REM", "bla", "blub"])
print(slp)

current_labels = {"wake": ["WAKE"], "LS": ["light_sleep"], "DS": ["deep_sleep", "deep_sleep_2"], "REM": ["REM"], "artifect": ["other"]}
desired_labels = {"wake": 0, "LS": 1, "DS": 2, "REM": 3, "artifect": -1}

print(alter_slp_labels(
        slp_labels = slp, # type: ignore
        current_labels = current_labels,
        desired_labels = desired_labels,
))

['light_sleep' 'deep_sleep' 'deep_sleep_2' 'WAKE' 'REM' 'bla' 'blub']
['1' '2' '2' '0' '3' '-1' '-1']
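
The mapping logic shown in the two examples above can be sketched as follows: every value listed in
'current_labels' is replaced by the matching entry of 'desired_labels', and everything else falls into the
"other" category. The helper `remap_labels` and its `fallback_key` parameter are hypothetical. Unlike
'alter_slp_labels', this sketch always returns integers, also for string input.

```python
import numpy as np


def remap_labels(labels, current_labels, desired_labels, fallback_key="artifect"):
    """Map old labels to new ones; unknown labels get the fallback label."""
    labels = np.asarray(labels)
    # Start with the fallback label everywhere, then overwrite known labels.
    result = np.full(labels.shape, desired_labels[fallback_key])
    for stage, old_values in current_labels.items():
        for old_value in old_values:
            if old_value == "other":
                continue  # placeholder for "everything else", handled by the fallback
            result[labels == old_value] = desired_labels[stage]
    return result


current_labels = {"wake": [0, 1], "LS": [2], "DS": [3], "REM": [5], "artifect": ["other"]}
desired_labels = {"wake": 0, "LS": 1, "DS": 2, "REM": 3, "artifect": -1}

print(remap_labels([-2, -1, 0, 1, 2, 3, 4, 5, 6, 7], current_labels, desired_labels))
# [-1 -1  0  0  1  2 -1  3 -1 -1]
```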


Label transformation from the previous sleep stage classification (not mine):

In [None]:
slp = np.array([-2, -1, 0, 1, 2, 3, 4, 5, 6, 7])
print(slp)

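# The order of these in-place assignments matters, each one acts on the result of the previous one.
slp[slp>=1] = slp[slp>=1] - 1 # shift all labels >= 1 down by one
slp[slp==4] = 3 # merge shifted label 4 (originally 5) into 3
slp[slp==5] = 0 # map shifted label 5 (originally 6) to 0
slp[slp==-1] = 0 # set artifact as wake stage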

print(slp)

[-2 -1  0  1  2  3  4  5  6  7]
[-2  0  0  0  1  2  3  3  0  6]
