In [8]:
# # no need for this since it has nothing outside classes and functions
# print(__name__)
# if __name__ == "__main__" and hasattr(__builtins__,'__IPYTHON__') and ('google.colab' in str(get_ipython())):
#     from google.colab import drive
#     drive.mount('/content/drive')
#     %cd /content/drive/MyDrive/PressureReliefWorkArea/SummerWork/
#     !ls

In [9]:
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import torch
# import torch.nn as nn
# import torch.nn.functional as F
# import torch.optim as optim
import numpy as np
from collections.abc import Iterable

%run -n HelperFunctions.ipynb
# import ipynb
# from ipynb.fs.full.HelperFunctions import *

This code loads your CSV file, splits the data into a training set and a test set, and creates a DataLoader for each. The DataLoader can be used to iterate through the data in batches, which is useful for training a neural network.

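Pass the path to your actual CSV file as `file_path` when constructing `JFSKLoader`. Also, note that the file is read with `pd.read_csv(file_path)` using pandas' defaults, so the first row is treated as a header. If your CSV file has no header row, its first data row would be consumed as column names, and you would need to read it with `header=None` instead.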

In [10]:
class JFSKAccelDataset(Dataset):
    """Map-style dataset pairing samples (or windows of samples) with labels.

    Attributes:
        data: indexable collection of samples; after ``group()`` a list of
            windows, each ``sequence_length`` consecutive samples long.
        labels: indexable collection of labels, one per entry of ``data``.
        sequence_length: window length used by ``group()``, or ``None`` when
            the samples are not meant to be grouped.
    """

    def __init__(self, data, labels, sequence_length=None):
        """Store the samples, their labels and the optional window length."""
        self.data = data
        self.labels = labels
        self.sequence_length = sequence_length

    def __len__(self):
        """Return the number of samples (or windows) held by the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(sample, label)`` for position ``idx``.

        Items with two or more dimensions have their first two dimensions
        swapped (sequence <-> channel). Items with fewer than two dimensions
        (ungrouped data) are returned unchanged, because there is no second
        dimension to swap and ``transpose(0, 1)`` would raise.
        """
        sample = self.data[idx]
        label = self.labels[idx]
        # Objects without an ``ndim`` attribute keep the original behaviour
        # (transpose is attempted).
        if getattr(sample, "ndim", 2) < 2:
            return sample, label
        return sample.transpose(0, 1), label

    def group(self):
        """Replace ``data`` with sliding windows and align ``labels`` to them.

        Window ``k`` covers samples ``k .. k + sequence_length - 1`` and is
        labelled with the label at offset ``sequence_length // 2 - 1`` inside
        the window (offset 0 when ``sequence_length`` is 1).

        Raises:
            TypeError: if ``sequence_length`` is ``None``.
            ValueError: if ``sequence_length`` is smaller than 1.
        """
        window = self.sequence_length
        if window is None:
            raise TypeError("JFSKAccelDataset.group() requires sequence_length to be set")
        if window < 1:
            raise ValueError("JFSKAccelDataset.group() requires sequence_length >= 1")

        window_count = max(len(self.data) - window + 1, 0)
        self.data = [self.data[start:start + window] for start in range(window_count)]

        # Clamp the offset at 0: for sequence_length == 1 the unclamped value
        # is -1, which used to turn the label slice into an empty one.
        first_label = max(window // 2 - 1, 0)
        self.labels = self.labels[first_label:first_label + window_count]
        # change to get the majority

In [11]:
class SKInputConverter:
    """Chainable pre-processing steps applied to the input columns of a dataframe.

    The first ``inputnum`` columns of ``dataframe`` are treated as inputs and
    every remaining column is carried along untouched. Each step returns
    ``self`` so calls can be chained; ``result()`` yields the final frame.
    """

    def __init__(self, dataframe, classtype, bufferpref = "Inner"):
        """Store the frame and look up how many input columns ``classtype`` has.

        ``classtype`` is passed to ``SKDescriptors.validate_class_type`` before
        it is used as a key into ``SKDescriptors.NUM_OF_INPUTS_PER_TYPE``.
        """
        self.dataframe = dataframe
        SKDescriptors.validate_class_type(classtype)
        self.classtype = classtype
        self.bufferpref = bufferpref
        self.inputnum = SKDescriptors.NUM_OF_INPUTS_PER_TYPE[classtype]

    def result(self):
        """Return ``(dataframe, inputnum)`` in their current state."""
        return self.dataframe, self.inputnum

    def diff(self):
        """Placeholder step: currently leaves the dataframe unchanged."""
        return self

    def remove_outliers(self, rem_type = None, rem_func = None, *args, **kwargs):
        """Placeholder step: currently ignores its arguments and changes nothing."""
        return self

    def normalize(self, norm_type = None, norm_func = None, *args, **kwargs):
        """Placeholder step: currently ignores its arguments and changes nothing."""
        return self

    def combine(self, comb_type = None, comb_func = None, *args, **kwargs):
        """Collapse the input columns into one column holding each row's Euclidean norm.

        The combined column becomes column 0, the remaining columns follow in
        their original order, all columns are relabelled ``0..n-1`` and
        ``inputnum`` becomes 1.

        Raises:
            NotImplementedError: if any argument is supplied (none are supported yet).
        """
        if comb_type is not None or comb_func is not None or args or kwargs:
            raise NotImplementedError("SKInputConverter.combine() has no implemented parameters")

        inputs = self.dataframe.iloc[:, :self.inputnum]
        others = self.dataframe.iloc[:, self.inputnum:]

        # Row-wise norm (axis=1). ``DataFrame.apply(np.linalg.norm)`` works per
        # column and returns a Series, which cannot be joined with the rest.
        magnitude = pd.Series(
            np.linalg.norm(inputs.to_numpy(dtype=float), axis=1),
            index=self.dataframe.index,
        )

        combined = pd.concat([magnitude, others], axis=1)
        # Relabel from the combined frame's own width so the lengths always match.
        combined.columns = pd.Index(np.arange(combined.shape[1]))

        self.dataframe = combined
        self.inputnum = 1
        return self

In [12]:
class JFSKLoader:
    def __init__(self, file_path, sequence_length = None, repress_classes = True, *args, **kwargs):


        # 1. open file

        # Gather file info
        # self.file_directory, self.beginning_descriptors, self.file_name, self.ending_descriptors, self.file_extension, self.specifier_values = SKFileNameHandler.read_data_file_name(file_path)
        self.file_directory, _, _, _, file_extension, self.specifier_values = SKFileNameHandler.read_data_file_name(file_path)
        classification_type = self.specifier_values[SKDescriptors.CLASSIFICATION_TYPE_FS]
        input_num = SKDescriptors.NUM_OF_INPUTS_PER_TYPE[classification_type]

        match file_extension:
            case ".csv":
                dataframe = pd.read_csv(file_path)
                # code test file: Data/Week 1/Left then Right/Processed/Type3-Freq10-Labeled_Motion-sessions_2023-08-26_17-25-54.csv
                # classifier training file: Data/COMBINED_Type3-Freq10-Labeled_Motion-sessions_23-24_Fall.csv
            case _:
                raise NotImplementedError(f"JFSKLoader is not equipped to open {file_extension} files.")


        # 2. split dataset into data and labels

        # Get data and labels from dataframe
        data = dataframe.iloc[:, :input_num].to_numpy()  # x, y, z data
        labels = dataframe.iloc[:, input_num:].to_numpy()  # labels


        # 3. make adjustments related to the data
        # THIS is where we would use SKInputConverter


        # 4. group
        self.sequence_length = sequence_length
        self.g_dataset = JFSKAccelDataset(data, labels, sequence_length)
        if sequence_length is not None:
            self.g_dataset.group()


        # 5. make adjustments related to the labels
        if repress_classes:
        # if args and args[0]:
        #     if len(args) > 1:
        #         self.repress_classes(*(args[1:]), **kwargs)
        #     else:
        #         self.repress_classes(**kwargs)
        # elif kwargs.pop("repress_classes", False):
            self.repress_classes(*args, **kwargs)


        # 6. randomize
        data_train, data_test, labels_train, labels_test = train_test_split(self.g_dataset.data, self.g_dataset.labels, test_size=0.2, random_state=42)


        # 7. create dataloader


        # Convert data to tensors
        data_train = torch.tensor(np.array(data_train), dtype=torch.float32)  
        data_test = torch.tensor(np.array(data_test), dtype=torch.float32)

        # Convert labels to tensors and get max index (assuming one-hot encoding)
        labels_train = torch.argmax(torch.tensor(np.array(labels_train), dtype=torch.float32), dim=1)
        labels_test = torch.argmax(torch.tensor(np.array(labels_test), dtype=torch.float32), dim=1)

        # Create data loaders
        train_dataset = JFSKAccelDataset(data_train, labels_train, self.sequence_length)
        test_dataset = JFSKAccelDataset(data_test, labels_test, self.sequence_length)

        self.train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
        self.test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)




    # target_classes = ["Other", "Stationary"], rep_format = 'str', rep_func = np.mean, skip_repressed = True, apply_to_all = False, *args, **kwargs
    def repress_classes(self, target_classes = None, rep_format = 'str', rep_func = np.mean, skip_repressed = True, apply_to_all = False, *args, **kwargs):
        classification_type = self.specifier_values[SKDescriptors.CLASSIFICATION_TYPE_FS]
        
        # NOTE: the logic here only works for one-hot vectors
        if not SKDescriptors.NUM_OF_OUTPUTS_PER_TYPE[classification_type] == 1:
            raise NotImplementedError(f"JFSKLoader.repress_classes() with repress_stationary=True may not be equipped to fairly sample stationary data for classification types with outputs that are not one-hot vectors.")

        if not callable(rep_func):
            raise NotImplementedError(f"JFSKLoader.repress_classes()'s rep_func must currently be a function\
                                      \n\tIf you want more complex logic where you'd test for a string or something, feel free to alter the code")

        if target_classes is None:
            target_classes = [SKDescriptors.OTHER_TAG, SKDescriptors.STATIONARY_TAG]
            rep_format = 'tag'

        

        temp_list = []
        match rep_format:
            case 'str' | 'cts':
                for i, cts in enumerate(SKDescriptors.CTS_PER_TYPE[classification_type]):
                    if cts in target_classes:
                        temp_list.append(i)

                if len(temp_list) != len(target_classes):
                    print(f"\nWARNING: The following target_classes were not kept as they do not correspond to classification_type '{classification_type}':\n")
                    for cts in target_classes:
                        if not any(cts == SKDescriptors.CTS_PER_TYPE[classification_type][i] for i in temp_list):
                            print(f"\t{cts}\n")

            case 'tag':
                for i, cts in enumerate(SKDescriptors.CTS_PER_TYPE[classification_type]):
                    for tag in target_classes:
                        if tag in cts:
                            temp_list.append(i)

            case 'num' | 'int':
                forwarn_active = False
                for i in target_classes:
                    if i in np.arange(SKDescriptors.NUM_OF_CLASSES_PER_TYPE[classification_type]):
                        temp_list.append(i)
                        continue

                    if not forwarn_active:
                        print(f"\nWARNING: The following target_classes were not kept as they do not correspond to classification_type '{classification_type}':\n")
                        forwarn_active = True
                    print(f"\t{i}\n")
                
            case _ :
                raise AssertionError("You can only pass 'str', 'tag', 'cts', 'num', or 'int' into the rep_format parameter of JFSKLoader.repress_classes()\
                                     \n\t\t'str' is used when you want to pass the class names (as strings) of classes it should repress\
                                     \n\t\t'tag' is used when you want to pass the class tags (as strings) it should repress\
                                     \n\t\t'cts' is used when you want to pass the classes (as ClassTagSets) it should repress\
                                     \n\t\t'int' and 'num' are used when you want to pass the indices of the classes it should repress")
            
        target_classes = temp_list

        

        if skip_repressed:
            accounted = np.delete(self.g_dataset.labels, target_classes, axis=1)
        else:
            accounted = self.g_dataset.labels

        if apply_to_all:
            adjusted = np.arange(SKDescriptors.NUM_OF_CLASSES_PER_TYPE[classification_type])
        else:
            adjusted = target_classes

        count_ones = lambda x: dict(zip(*np.unique(x, return_counts = True))).get(1, 0)
        class_counts = np.apply_along_axis(count_ones, 0, accounted)
        adjustment_height = int(rep_func(class_counts, *args, **kwargs))
        print(adjustment_height)

        for i in adjusted:
            # class_i_rows_indexed = np.nonzero(self.g_dataset.labels[i][:])
            class_i_rows_indexed = np.nonzero(np.transpose(self.g_dataset.labels)[i][:])
            del_rows_not_indexed = np.random.randint(len(class_i_rows_indexed), size = max(0, len(class_i_rows_indexed) - adjustment_height))
            print(len(class_i_rows_indexed))
            print(np.ndarray(del_rows_not_indexed))
            if del_rows_not_indexed:
                del_rows_indexed = class_i_rows_indexed[del_rows_not_indexed]
            else:
                del_rows_indexed = del_rows_not_indexed
            print(i)
            print(np.shape(self.g_dataset.labels))
            self.g_dataset.data = np.delete(self.g_dataset.data, del_rows_indexed, axis = 0)
            self.g_dataset.labels = np.delete(self.g_dataset.labels, del_rows_indexed, axis = 0)
            print(np.shape(self.g_dataset.labels))


    def count_instances(series):
        return dict(zip(*np.unique(series, return_counts = True))).get(1, 0)

        # ci_arr = np.unique(series, return_counts = True)
        # return dict(zip(ci_arr[0], ci_arr[1])).get(1, 0)


In [13]:
class SKLabelConverter:
    """Validate and perform conversions of label data between classification types."""

    # (input_label_type, output_label_type) pairs for which conversion logic exists.
    VALID_TYPE_CONVERSIONS = (
        (3, 5),
    )
    OVERRIDE_TYPE_VALIDATION = (
        # This is here only for conversions that
            # fail validate_class_type_conversion() but not for simple reasons
            # (simple reasons like accidentally choosing the wrong input/output types
            # or not listing the correct values in the below dictionaries)
        # If you add an entry here you may have to change logic of other parts of the code
            # for instance if the number of columns of the output spreadsheet will be more
            # than the input spreadsheet, you may have to change the dataframe.drop line at the end
    )

    # Currently these do nothing. If we later change how we want the buffers to function
        # (not the length of the buffers but which buffers overlap into other classes),
        # we will be able to do so using this
    VALID_BUFFER_TYPE_CONVERSIONS = ()
    OVERRIDE_BUFFER_TYPE_VALIDATION = (
        # This is here only for BufferType conversions that
            # fail validate_buffer_type_conversion() but not for simple reasons
            # (simple reasons like accidentally choosing the wrong input/output types
            # or not listing the correct values in the above dictionaries)
        # If you add an entry here you may have to change logic of other parts of the code
    )

    def __init__(self, labeled_data_file = None, *args):
        """Create a converter from a data file name or from ready-made specifiers.

        Args:
            labeled_data_file: path handed to
                ``SKFileNameHandler.read_data_file_name`` to obtain the input
                specifiers (and the other file-name parts).
            *args: when ``labeled_data_file`` is ``None``, ``args[0]`` is used
                as the input-specifier mapping instead.

        Raises:
            IndexError: if neither a file nor a specifier mapping is given.
        """
        # Set on both construction paths so the attributes always exist.
        self.output_label_type = -1
        self.output_freq = -1
        self.output_buffer_type = -1
        self.output_buffer_num = -1

        if labeled_data_file is not None:
            self.input_directory, self.input_beginning_descriptors, self.input_file_name, self.input_ending_descriptors, self.input_file_extension, self.input_specifiers = SKFileNameHandler.read_data_file_name(labeled_data_file)
        elif args:
            self.input_specifiers = args[0]
        else:
            raise IndexError("SKLabelConverter needs either labeled_data_file or a specifier mapping as its first extra argument")

        self.type_validated = False
        self.buffer_num_validated = False

    def validate_label_type_conversion(self, input_label_type, output_label_type):
        """Return True when converting between the two label types is permitted.

        The types must be valid and agree with the stored input specifiers, and
        the pair must either be listed in ``VALID_TYPE_CONVERSIONS`` with sound
        input/class counts or be listed in ``OVERRIDE_TYPE_VALIDATION``.
        """
        # validating type values' consistency
        has_valid_types = SKDescriptors.validate_class_type(input_label_type) and SKDescriptors.validate_class_type(output_label_type)
        # input_label_type matches the type stored in the input specifiers
        matches_input_file = self.input_specifiers.get(SKDescriptors.CLASSIFICATION_TYPE_FS, -1) == input_label_type
        # input_label_type corresponds to WithClassNum value stored in self
        consistent_with_class_num = self.input_specifiers.get(SKDescriptors.WITH_CLASS_NUMBER_FS, -1) == SKDescriptors.NUM_OF_CLASSES_PER_TYPE[input_label_type]
        # has valid values (compared to type dictionaries and the conversion file)
        has_consistent_values = has_valid_types and matches_input_file and consistent_with_class_num

        # validating conversion logic
        # is a type conversion for which someone implemented the logic
        is_listed = (input_label_type, output_label_type) in SKLabelConverter.VALID_TYPE_CONVERSIONS
        # is a logical input type conversion
        is_not_to_more_inputs = SKDescriptors.NUM_OF_INPUTS_PER_TYPE.get(input_label_type, 0) >= SKDescriptors.NUM_OF_INPUTS_PER_TYPE.get(output_label_type, 1)
        # is a logical output type conversion when only one class chosen at a time
        is_to_fewer_classes = SKDescriptors.NUM_OF_CLASSES_PER_TYPE.get(input_label_type, 0) > SKDescriptors.NUM_OF_CLASSES_PER_TYPE.get(output_label_type, 0)
        # may be logical output type conversion if we are converting from
            # an output with a "multi-hot" vector to an input with a one-hot vector
        input_is_not_one_hot_type = SKDescriptors.NUM_OF_OUTPUTS_PER_TYPE.get(input_label_type, 1) != 1
        # all conversion logic between types is sound
        has_valid_logic = is_listed and is_not_to_more_inputs and (is_to_fewer_classes or input_is_not_one_hot_type)

        # allowing override
        # the others are to keep someone from accidentally making a "bad conversion,"
            # but this one is to allow more-complex conversions that are possible,
            # given that someone manually listed the conversion in OVERRIDE_TYPE_VALIDATION
        # this does not override the conversion if the file is incorrect or if the types are invalid
        is_overridden = (input_label_type, output_label_type) in SKLabelConverter.OVERRIDE_TYPE_VALIDATION

        return has_consistent_values and (has_valid_logic or is_overridden)

    def validate_buffer_num_conversion(self, input_buffer_num, output_buffer_num):
        """Return True when both buffer numbers are usable with the stored specifiers.

        Both numbers must be non-negative, ``input_buffer_num`` must equal the
        stored buffer number, and the stored buffer type must lie in
        ``1..SKDescriptors.NUM_OF_BUFFER_TYPES``.
        """
        # both buffer nums are non-negative
        has_nonnegative_buffer_nums = input_buffer_num >= 0 and output_buffer_num >= 0
        # input_buffer_num matches the buffer number in the input specifiers
        matches_input_file = self.input_specifiers.get(SKDescriptors.BUFFER_NUMBER_FS, -1) == input_buffer_num
        # BufferType is valid
        has_valid_buffer_type = self.input_specifiers.get(SKDescriptors.BUFFER_TYPE_FS, 0) in np.arange(1, SKDescriptors.NUM_OF_BUFFER_TYPES + 1)
        return has_nonnegative_buffer_nums and matches_input_file and has_valid_buffer_type

    def set_label_type_conversion(self, input_label_type, output_label_type):
        """Validate and record the target label type.

        On failure a message is printed and ``type_validated`` is set to False.
        """
        #NOTE that types have not yet been implemented as tuple labels
        if(not self.validate_label_type_conversion(input_label_type, output_label_type)):
            print(f"Current object/class definitions prohibit the conversion from Type {input_label_type} to Type {output_label_type}.")
            self.type_validated = False
            return
        self.output_label_type = output_label_type
        self.type_validated = True

    def set_buffer_num_conversion(self, input_buffer_num, output_buffer_num):
        """Validate and record the target buffer number.

        On failure a message is printed and ``buffer_num_validated`` is set to False.
        """
        if(not self.validate_buffer_num_conversion(input_buffer_num, output_buffer_num)):
            print(f"Current object/class definitions prohibit the conversion from BufferNum {input_buffer_num} to BufferNum {output_buffer_num}.")
            self.buffer_num_validated = False
            return
        self.output_buffer_num = output_buffer_num
        self.buffer_num_validated = True

    def convert_label_type(self, input_dataframe, is_df = True, to_file = False, labels_only = True, df_datatype = torch.Tensor, feedback = False):
        """Convert labels from the stored input type to ``self.output_label_type``.

        Args:
            input_dataframe: 2-D tensor. With ``is_df`` true its columns are
                per-class label columns (preceded by the input columns when
                ``labels_only`` is false); with ``is_df`` false its entries are
                looked up in ``SKDescriptors.get_superclass_dict(...)``.
            is_df: choose the column-merging path (True) or the lookup path (False).
            to_file: not supported; a true value raises ``NotImplementedError``
                after the conversion has been computed.
            labels_only: when false, the leading input columns are kept in
                front of the converted label columns.
            df_datatype: container type; only ``torch.Tensor`` is supported.
            feedback: print a description of the conversion when true.

        Returns:
            A tensor with one column per new class, each being the sum of the
            old class columns mapped to it (or a copy of the single mapped column).

        Raises:
            AssertionError: if no label type conversion was set successfully, or
                if a new class has no old classes mapped to it.
            NotImplementedError: for unsupported ``df_datatype`` or ``to_file``.
        """
        if not self.type_validated:
            raise AssertionError("Yeah, no. You need to set the label type successfully before trying any conversions")

        input_label_type = self.input_specifiers.get(SKDescriptors.CLASSIFICATION_TYPE_FS, -1)

        if df_datatype is not torch.Tensor:
            raise NotImplementedError(f"SKLabelConverter.convert_label_type is not yet equipped to handle '{(str)(df_datatype)}'.\
                                          \n\t\t\tTry using 'torch.Tensor' or implement logic for a different type")

        def select_cols(tensor, indices):
            """Return the requested columns as a 2-D ``(rows, k)`` tensor."""
            if isinstance(indices, slice):
                # Slicing already keeps the column dimension.
                return tensor[:, indices]
            if isinstance(indices, Iterable) and not isinstance(indices, str):
                return torch.cat([tensor[:, i].unsqueeze(1) for i in indices], dim = 1)
            return tensor[:, indices].unsqueeze(1)

        def merge_cols(cols):
            """Sum the columns row-wise into a single ``(rows, 1)`` column."""
            return torch.sum(cols, dim = 1).unsqueeze(1)

        if not is_df:
            superclasses = SKDescriptors.get_superclass_dict(input_label_type, self.output_label_type)
            output_dataframe = torch.from_numpy(np.vectorize(superclasses.__getitem__)(input_dataframe))
            if feedback:
                print(f"Label conversion from type {input_label_type} to type {self.output_label_type} by changing output column num")
            return output_dataframe

        subclasses = SKDescriptors.get_subclass_dict(input_label_type, self.output_label_type)
        input_num = SKDescriptors.NUM_OF_INPUTS_PER_TYPE[input_label_type]
        if labels_only:
            # Start from a zero-column tensor matching the input's row count and dtype.
            output_dataframe = torch.empty((np.shape(input_dataframe)[0], 0), dtype=input_dataframe.dtype, layout=input_dataframe.layout, requires_grad=input_dataframe.requires_grad)
        else:
            output_dataframe = select_cols(input_dataframe, slice(input_num))
            # NOTE(review): the label slice width is NUM_OF_OUTPUTS_PER_TYPE, as in
            # the original code; confirm this equals the number of label columns.
            input_dataframe = select_cols(input_dataframe, slice(input_num, input_num + SKDescriptors.NUM_OF_OUTPUTS_PER_TYPE[input_label_type]))

        # this is what converts the data
        # if you want custom conversion logic, you might want to start reworking earlier parts of the function
            # and leave this here since this is highly abstract and allows for most needed conversions
        if feedback:
            print(f"Label conversion from type {input_label_type} to type {self.output_label_type}:")
        for (i, cts), l in subclasses.items():
            col_inds = [t[0] for t in l]
            if not col_inds:
                raise AssertionError("Not only did we not find any superclasses, but we failed to detect such with our first assertion test. You will probably need to do some serious searching for this bug.")
            if feedback:
                print(f"\tNew class {i} replaces old classes {col_inds}")
            old_cols = select_cols(input_dataframe, col_inds)
            # A single mapped column is used as-is. Previously new_col was left
            # unset (or stale from the prior iteration) in that case.
            new_col = merge_cols(old_cols) if len(col_inds) > 1 else old_cols
            output_dataframe = torch.cat((output_dataframe, new_col), dim = 1)

        if to_file:
            raise NotImplementedError("Saving altered data labels to a file does not work currently. Reimplementation of this will hopefully be easy, but it is not a priority at the moment of writing this")
        return output_dataframe